In [1]:
import pandas as pd
import numpy as np
from backtesting import Backtest, Strategy
from data_storage import create_connection
from stockstats import StockDataFrame
from risk_metrics import Risk_Metrics

In [2]:
# Create the database connection through the project's create_connection helper.
# This single connection object is handed to every backtest runner in this notebook.
connection = create_connection("../database/crypto_billionairs.db")

In [3]:
def momentum(df, lag):
    """Return the percentage change of ``df`` over ``lag`` periods.

    Args:
        df: pandas object exposing ``pct_change`` (Series or DataFrame).
        lag: Number of periods between the compared observations.

    Returns:
        Object of the same shape as ``df`` holding the ``lag``-period
        percentage change; the leading ``lag`` entries are NaN.
    """
    lagged_change = df.pct_change(periods=lag)
    return lagged_change

In [4]:
class Momentum_Hypothesis_Test(Strategy):
    """Strategy that trades precomputed signal columns with inverted entries.

    The four signal series come from the module-level ``init_*`` functions,
    which are looked up by name when ``init`` runs. Those functions accept the
    DataFrame passed to them but do not use it.

    Take care: mean reversal is implemented here. A long signal (``1``) opens
    a short position and a short signal (``-1``) opens a long position.
    """

    def init(self):
        """Register the four signal series as indicators and trim the data frame."""
        self.init_long_signal = self.I(init_buy_signal, self.data.df)
        self.init_close_long_signal = self.I(init_close_long_signal, self.data.df)
        self.init_short_signal = self.I(init_short_signal, self.data.df)
        self.init_close_short_signal = self.I(init_close_short_signal, self.data.df)

        # Keep only the OHLCV columns. The `columns=` keyword replaces the
        # positional axis argument (`drop(labels, 1, ...)`), which pandas 2.0
        # no longer accepts.
        extra_columns = self.data.df.columns.difference(['Open', 'High', 'Low', 'Close', "Volume"])
        self.data.df.drop(columns=extra_columns, inplace=True)

    def next(self):
        """Act on the most recent value of each signal (checked in priority order)."""
        # Take care: mean reversal is implemented here!
        # `[-1]` reads the latest signal value explicitly, and `not ...`
        # replaces the fragile `is False` identity check.
        if self.init_long_signal[-1] == 1 and not self.position.is_long:
            self.position.close()
            self.sell()

        elif self.init_close_long_signal[-1] == 1:
            self.position.close()

        elif self.init_short_signal[-1] == -1 and not self.position.is_short:
            self.position.close()
            self.buy()

        elif self.init_close_short_signal[-1] == -1:
            self.position.close()
            

In [5]:
def init_buy_signal(nothing):
    """Return the ``buy_indicator`` column of the global ``df_temp``, lagged by one row.

    The ``nothing`` argument is accepted but never used.
    """
    buy_column = df_temp['buy_indicator']
    return buy_column.shift(1)

def init_short_signal(nothing):
    """Return the ``short_indicator`` column of the global ``df_temp``, lagged by one row.

    The ``nothing`` argument is accepted but never used.
    """
    short_column = df_temp['short_indicator']
    return short_column.shift(1)

def init_close_long_signal(nothing):
    """Return the ``close_buy_indicator`` column of the global ``df_temp``, lagged by one row.

    The ``nothing`` argument is accepted but never used.
    """
    close_long_column = df_temp['close_buy_indicator']
    return close_long_column.shift(1)

def init_close_short_signal(nothing):
    """Return the ``close_short_indicator`` column of the global ``df_temp``, lagged by one row.

    The ``nothing`` argument is accepted but never used.
    """
    close_short_column = df_temp['close_short_indicator']
    return close_short_column.shift(1)

# If shift(1) is removed, the hypothesis is tested as formulated (hypothesis 2).
# If shift(1) is kept, hypothesis 3 is tested instead.

In [6]:
#this function does the heuristic backtest
def run_backtesting_raw(db_connection):
    """Backtest ``Momentum_Hypothesis_Test`` on every daily feature table and store the results.

    Every table whose name contains ``_1day_features`` is processed, except
    trade, equity-curve, pooling and ensemble tables. For each one the last
    365 rows are backtested, and the trades and the equity curve are written
    back to the database. One row of risk metrics per table is collected into
    the table ``cryptocurrencies_risk_metrics_1m_abnormal_day``.

    Args:
        db_connection: Connection accepted by ``pandas.read_sql_query`` and
            ``DataFrame.to_sql``.

    Returns:
        None. All results are persisted through ``db_connection``.

    Side effects:
        Rebinds the module-level ``df_temp`` on every iteration, because the
        strategy's signal functions read their columns from that global.
    """
    global df_temp

    risk_columns = ["table_name", "timehorizon", "return", "buy and hold return", "annualized_return", "sharpe_ratio_annualized", "sortino_ratio_annualized", "maximum_drawdown", "calmar_ratio_annualized", "trades_count", "win_rate"]
    excluded_markers = ('trades', 'equity_curve', "_pooling", "_ensemble")

    table_names = pd.read_sql_query("SELECT name FROM sqlite_master WHERE type='table' ORDER BY name;", db_connection)
    filtered_table_names = [
        name for name in table_names['name'].tolist()
        if "_1day_features" in name and not any(marker in name for marker in excluded_markers)
    ]
    print(filtered_table_names)

    # Collect one record per table and build the frame once at the end,
    # rather than growing a DataFrame row by row.
    risk_rows = []

    for table in filtered_table_names:

        df_temp = pd.read_sql_query(f"select * from {table}", db_connection)
        df_temp = df_temp[-365:].copy()
        df_backtesting = pd.DataFrame()
        df_backtesting[['Open', 'High', 'Low', 'Close', "Volume"]] = df_temp[['open', 'high', 'low', 'close', 'volume']]

        bt = Backtest(df_backtesting, Momentum_Hypothesis_Test, cash=100_000, commission=.001)
        stats = bt.run()
        trades = pd.DataFrame(stats['_trades'])
        trades.to_sql(f"trades_{table}", db_connection, if_exists="replace")

        equity_curve = pd.DataFrame(stats["_equity_curve"]).reset_index()
        equity_curve.to_sql(f"{table}_equity_curve", db_connection, if_exists="replace")

        risk = Risk_Metrics(trades, df_temp, 0, stats)
        # NOTE(review): the fifth entry ("annualized_return") is the raw
        # "Return [%]" here, whereas the other runners in this notebook store
        # risk._annualize(stats["Return [%]"]) -- confirm which is intended.
        risk_metrics_list = [f"{table}", len(df_backtesting), stats["Return [%]"], stats["Buy & Hold Return [%]"], stats["Return [%]"],
                             risk.sharpe_ratio(), risk.sortino_ratio(),
                             risk.max_drawdown(), risk.calmar_ratio(), len(trades), stats["Win Rate [%]"]]
        print(risk_metrics_list)
        risk_rows.append(risk_metrics_list)

    df_risk = pd.DataFrame(risk_rows, columns=risk_columns)
    df_risk.to_sql("cryptocurrencies_risk_metrics_1m_abnormal_day", db_connection, if_exists="replace")

In [7]:
%%capture
# Run the heuristic backtest on the shared connection; the %%capture magic on
# the first line of this cell captures the printed output.
run_backtesting_raw(connection)

In [8]:
class Momentum_AR_long_short(Strategy):
    """Long/short strategy driven by precomputed signal columns.

    The signal series come from the module-level ``init_*`` functions, which
    are looked up by name when ``init`` runs. This notebook defines functions
    with those names more than once, so the definitions from the most recently
    executed cell are the ones used.

    A long signal (``1``) opens a long position and a short signal (``-1``)
    opens a short position; the close signals flatten the position.
    """

    def init(self):
        """Register the four signal series as indicators and trim the data frame."""
        self.init_long_signal = self.I(init_buy_signal, self.data.df)
        self.init_close_long_signal = self.I(init_close_long_signal, self.data.df)
        self.init_short_signal = self.I(init_short_signal, self.data.df)
        self.init_close_short_signal = self.I(init_close_short_signal, self.data.df)

        # Keep only the OHLCV columns. The `columns=` keyword replaces the
        # positional axis argument (`drop(labels, 1, ...)`), which pandas 2.0
        # no longer accepts.
        extra_columns = self.data.df.columns.difference(['Open', 'High', 'Low', 'Close', "Volume"])
        self.data.df.drop(columns=extra_columns, inplace=True)

    def next(self):
        """Act on the most recent value of each signal (checked in priority order)."""
        # Each signal is read explicitly at `[-1]` and converted float -> int
        # before comparison, so fractional values are truncated as before.
        # presumably the float() step also copes with values stored as text -- TODO confirm.
        # A NaN signal still raises ValueError in int().
        if int(float(self.init_long_signal[-1])) == 1 and not self.position.is_long:
            self.position.close()
            self.buy()

        elif int(float(self.init_close_long_signal[-1])) == 1:
            self.position.close()

        elif int(float(self.init_short_signal[-1])) == -1 and not self.position.is_short:
            self.position.close()
            self.sell()

        elif int(float(self.init_close_short_signal[-1])) == -1:
            self.position.close()

In [9]:
def init_buy_signal(nothing):
    """Return the ``buy_short_indicator`` column of the global ``df_temp_ml`` (not lagged).

    The ``nothing`` argument is accepted but never used.

    NOTE: this shares its name with an earlier definition in the notebook;
    once this cell has run, this version is the one in effect.
    NOTE(review): unlike the close signals, this series is not shifted by one
    row -- confirm that the column is already aligned to avoid look-ahead.
    """
    signal_frame = df_temp_ml
    return signal_frame['buy_short_indicator']

def init_short_signal(nothing):
    """Return the ``buy_short_indicator`` column of the global ``df_temp_ml`` (not lagged).

    This is the same column the long-entry signal function returns.
    The ``nothing`` argument is accepted but never used.

    NOTE: this shares its name with an earlier definition in the notebook;
    once this cell has run, this version is the one in effect.
    """
    signal_frame = df_temp_ml
    return signal_frame['buy_short_indicator']

def init_close_long_signal(nothing):
    """Return the ``close_buy_short_indicator`` column of the global ``df_temp_ml``, lagged by one row.

    The ``nothing`` argument is accepted but never used.

    NOTE: this shares its name with an earlier definition in the notebook;
    once this cell has run, this version is the one in effect.
    """
    close_column = df_temp_ml['close_buy_short_indicator']
    return close_column.shift(1)

def init_close_short_signal(nothing):
    """Return the ``close_buy_short_indicator`` column of the global ``df_temp_ml``, lagged by one row.

    This is the same column the close-long signal function returns.
    The ``nothing`` argument is accepted but never used.

    NOTE: this shares its name with an earlier definition in the notebook;
    once this cell has run, this version is the one in effect.
    """
    close_column = df_temp_ml['close_buy_short_indicator']
    return close_column.shift(1)

In [10]:
#this function does the backtest for notebook 2
def run_backtesting_ml(db_connection):
    """Backtest ``Momentum_AR_long_short`` on the per-model prediction tables.

    A table is selected when its name contains one of the model markers
    (``_svc``, ``_knn``, ``_random_forest``, ``_logistic_regression``,
    ``_mlp``) and none of ``trades``, ``equity_curve``, ``pooling`` or
    ``resampled``. The last 365 rows of each table are backtested. One row of
    risk metrics per table is written to
    ``cryptocurrencies_risk_metrics_1m_abnormal_day_ml``. Trades and equity
    curves are not persisted by this runner.

    Args:
        db_connection: Connection accepted by ``pandas.read_sql_query`` and
            ``DataFrame.to_sql``.

    Returns:
        None. Results are persisted through ``db_connection``.

    Side effects:
        Rebinds the module-level ``df_temp_ml`` on every iteration, because
        the strategy's signal functions read their columns from that global.
    """
    global df_temp_ml

    risk_columns = ["table_name", "timehorizon", "return", "buy and hold return", "annualized_return", "sharpe_ratio_annualized", "sortino_ratio_annualized", "maximum_drawdown", "calmar_ratio_annualized", "trades_count", "win_rate"]
    model_markers = ("_svc", "_knn", "_random_forest", "_logistic_regression", "_mlp")
    excluded_markers = ('trades', 'equity_curve', 'pooling', 'resampled')

    table_names = pd.read_sql_query("SELECT name FROM sqlite_master WHERE type='table' ORDER BY name;", db_connection)
    filtered_table_names = [
        name for name in table_names['name'].tolist()
        if any(marker in name for marker in model_markers)
        and not any(marker in name for marker in excluded_markers)
    ]
    print(filtered_table_names)

    # Collect one record per table and build the frame once at the end,
    # rather than growing a DataFrame row by row.
    risk_rows = []

    for table in filtered_table_names:

        df_temp_ml = pd.read_sql_query(f"select * from {table}", db_connection)
        df_temp_ml = df_temp_ml[-365:].copy()

        df_backtesting = pd.DataFrame()
        df_backtesting[['Open', 'High', 'Low', 'Close', "Volume"]] = df_temp_ml[['open', 'high', 'low', 'close', 'volume']]

        bt = Backtest(df_backtesting, Momentum_AR_long_short, cash=100_000, commission=.001)
        stats = bt.run()
        trades = pd.DataFrame(stats['_trades'])

        risk = Risk_Metrics(trades, df_temp_ml, 0, stats)

        try:
            risk_metrics_list = [f"{table}", len(df_backtesting), stats["Return [%]"], stats["Buy & Hold Return [%]"], risk._annualize(stats["Return [%]"]),
                                 risk.sharpe_ratio(), risk.sortino_ratio(),
                                 risk.max_drawdown(), risk.calmar_ratio(), len(trades), stats["Win Rate [%]"]]
        except Exception as exc:
            # Best effort: a table whose metrics cannot be computed is reported
            # and skipped. `Exception` (not a bare except) lets KeyboardInterrupt
            # through, and the error itself is now shown.
            print(table, len(trades), repr(exc))
            continue

        print(risk_metrics_list)
        risk_rows.append(risk_metrics_list)

    df_risk = pd.DataFrame(risk_rows, columns=risk_columns)
    df_risk.to_sql("cryptocurrencies_risk_metrics_1m_abnormal_day_ml", db_connection, if_exists="replace")

In [11]:
%%capture
# Run the per-model backtest on the shared connection; the %%capture magic on
# the first line of this cell captures the printed output.
run_backtesting_ml(connection)

In [12]:
#this function does the backtest for notebook 3
def run_backtesting_oversampling(db_connection):
    """Backtest ``Momentum_AR_long_short`` on every table whose name contains ``resampled``.

    The last 365 rows of each selected table are backtested. One row of risk
    metrics per table is written to
    ``cryptocurrencies_risk_metrics_1m_abnormal_day_ml_oversampling``. Trades
    and equity curves are not persisted by this runner.

    Args:
        db_connection: Connection accepted by ``pandas.read_sql_query`` and
            ``DataFrame.to_sql``.

    Returns:
        None. Results are persisted through ``db_connection``.

    Side effects:
        Rebinds the module-level ``df_temp_ml`` on every iteration, because
        the strategy's signal functions read their columns from that global.
    """
    global df_temp_ml

    risk_columns = ["table_name", "timehorizon", "return", "buy and hold return", "annualized_return", "sharpe_ratio_annualized", "sortino_ratio_annualized", "maximum_drawdown", "calmar_ratio_annualized", "trades_count", "win_rate"]

    table_names = pd.read_sql_query("SELECT name FROM sqlite_master WHERE type='table' ORDER BY name;", db_connection)
    filtered_table_names = [name for name in table_names['name'].tolist() if 'resampled' in name]
    print(filtered_table_names)

    # Collect one record per table and build the frame once at the end,
    # rather than growing a DataFrame row by row.
    risk_rows = []

    for table in filtered_table_names:

        df_temp_ml = pd.read_sql_query(f"select * from {table}", db_connection)
        df_temp_ml = df_temp_ml[-365:].copy()

        df_backtesting = pd.DataFrame()
        df_backtesting[['Open', 'High', 'Low', 'Close', "Volume"]] = df_temp_ml[['open', 'high', 'low', 'close', 'volume']]

        bt = Backtest(df_backtesting, Momentum_AR_long_short, cash=100_000, commission=.001)
        stats = bt.run()
        trades = pd.DataFrame(stats['_trades'])

        risk = Risk_Metrics(trades, df_temp_ml, 0, stats)

        try:
            risk_metrics_list = [f"{table}", len(df_backtesting), stats["Return [%]"], stats["Buy & Hold Return [%]"], risk._annualize(stats["Return [%]"]),
                                 risk.sharpe_ratio(), risk.sortino_ratio(),
                                 risk.max_drawdown(), risk.calmar_ratio(), len(trades), stats["Win Rate [%]"]]
        except Exception as exc:
            # Best effort: a table whose metrics cannot be computed is reported
            # and skipped. `Exception` (not a bare except) lets KeyboardInterrupt
            # through, and the error itself is now shown.
            print(table, len(trades), repr(exc))
            continue

        print(risk_metrics_list)
        risk_rows.append(risk_metrics_list)

    df_risk = pd.DataFrame(risk_rows, columns=risk_columns)
    df_risk.to_sql("cryptocurrencies_risk_metrics_1m_abnormal_day_ml_oversampling", db_connection, if_exists="replace")

In [13]:
%%capture
# Run the oversampling backtest on the shared connection; the %%capture magic
# on the first line of this cell captures the printed output.
run_backtesting_oversampling(connection)

In [14]:
#this function does the backtest for the thresholds
def run_backtesting_ml_threshold(db_connection):
    """Backtest ``Momentum_AR_long_short`` on every table whose name contains ``no_``.

    All rows of each selected table are backtested; no tail window is applied
    here. One row of risk metrics per table is written to
    ``cryptocurrencies_risk_metrics_1m_abnormal_day_ml_threshold``. Trades and
    equity curves are not persisted by this runner.

    Args:
        db_connection: Connection accepted by ``pandas.read_sql_query`` and
            ``DataFrame.to_sql``.

    Returns:
        None. Results are persisted through ``db_connection``.

    Side effects:
        Rebinds the module-level ``df_temp_ml`` on every iteration, because
        the strategy's signal functions read their columns from that global.
    """
    global df_temp_ml

    risk_columns = ["table_name", "timehorizon", "return", "buy and hold return", "annualized_return", "sharpe_ratio_annualized", "sortino_ratio_annualized", "maximum_drawdown", "calmar_ratio_annualized", "trades_count", "win_rate"]

    table_names = pd.read_sql_query("SELECT name FROM sqlite_master WHERE type='table' ORDER BY name;", db_connection)
    filtered_table_names = [name for name in table_names['name'].tolist() if "no_" in name]

    # Collect one record per table and build the frame once at the end,
    # rather than growing a DataFrame row by row.
    risk_rows = []

    for table in filtered_table_names:

        df_temp_ml = pd.read_sql_query(f"select * from {table}", db_connection)

        # NOTE(review): a tail window was previously disabled here and read
        # `[-356:]`, which looks like a typo for the 365 used by other
        # runners -- confirm before re-enabling.
        #df_temp_ml = df_temp_ml[-356:]

        df_backtesting = pd.DataFrame()
        df_backtesting[['Open', 'High', 'Low', 'Close', "Volume"]] = df_temp_ml[['open', 'high', 'low', 'close', 'volume']]

        bt = Backtest(df_backtesting, Momentum_AR_long_short, cash=100_000, commission=.001)
        stats = bt.run()
        trades = pd.DataFrame(stats['_trades'])

        risk = Risk_Metrics(trades, df_temp_ml, 0, stats)

        try:
            risk_metrics_list = [f"{table}", len(df_backtesting), stats["Return [%]"], stats["Buy & Hold Return [%]"], risk._annualize(stats["Return [%]"]),
                                 risk.sharpe_ratio(), risk.sortino_ratio(),
                                 risk.max_drawdown(), risk.calmar_ratio(), len(trades), stats["Win Rate [%]"]]
        except Exception as exc:
            # Best effort: a table whose metrics cannot be computed is reported
            # and skipped. `Exception` (not a bare except) lets KeyboardInterrupt
            # through, and the error itself is now shown.
            print(table, len(trades), repr(exc))
            continue

        print(risk_metrics_list)
        risk_rows.append(risk_metrics_list)

    df_risk = pd.DataFrame(risk_rows, columns=risk_columns)
    df_risk.to_sql("cryptocurrencies_risk_metrics_1m_abnormal_day_ml_threshold", db_connection, if_exists="replace")

In [15]:
%%capture
# Run the threshold backtest on the shared connection; the %%capture magic on
# the first line of this cell captures the printed output.
run_backtesting_ml_threshold(connection)

In [16]:
#this function does the backtest for the pooling models
def run_backtesting_ml_pooling(db_connection):
    """Backtest ``Momentum_AR_long_short`` on the pooling-model tables.

    A table is selected when its name contains ``_pooling`` and none of
    ``trades``, ``equity_curve``, ``final``, ``dataset`` or
    ``cryptocurrencies``. All rows of each selected table are backtested; no
    tail window is applied here. One row of risk metrics per table is written
    to ``cryptocurrencies_risk_metrics_1m_abnormal_day_ml_pooling``. Trades
    and equity curves are not persisted by this runner.

    Args:
        db_connection: Connection accepted by ``pandas.read_sql_query`` and
            ``DataFrame.to_sql``.

    Returns:
        None. Results are persisted through ``db_connection``.

    Side effects:
        Rebinds the module-level ``df_temp_ml`` on every iteration, because
        the strategy's signal functions read their columns from that global.
    """
    global df_temp_ml

    risk_columns = ["table_name", "timehorizon", "return", "buy and hold return", "annualized_return", "sharpe_ratio_annualized", "sortino_ratio_annualized", "maximum_drawdown", "calmar_ratio_annualized", "trades_count", "win_rate"]
    excluded_markers = ('trades', 'equity_curve', "final", "dataset", "cryptocurrencies")

    table_names = pd.read_sql_query("SELECT name FROM sqlite_master WHERE type='table' ORDER BY name;", db_connection)
    filtered_table_names = [
        name for name in table_names['name'].tolist()
        if "_pooling" in name and not any(marker in name for marker in excluded_markers)
    ]
    print(filtered_table_names)

    # Collect one record per table and build the frame once at the end,
    # rather than growing a DataFrame row by row.
    risk_rows = []

    for table in filtered_table_names:

        df_temp_ml = pd.read_sql_query(f"select * from {table}", db_connection)

        # NOTE(review): a tail window was previously disabled here and read
        # `[-356:]`, which looks like a typo for the 365 used by other
        # runners -- confirm before re-enabling.
        #df_temp_ml = df_temp_ml[-356:]

        df_backtesting = pd.DataFrame()
        df_backtesting[['Open', 'High', 'Low', 'Close', "Volume"]] = df_temp_ml[['open', 'high', 'low', 'close', 'volume']]

        bt = Backtest(df_backtesting, Momentum_AR_long_short, cash=100_000, commission=.001)
        stats = bt.run()
        trades = pd.DataFrame(stats['_trades'])

        risk = Risk_Metrics(trades, df_temp_ml, 0, stats)

        try:
            risk_metrics_list = [f"{table}", len(df_backtesting), stats["Return [%]"], stats["Buy & Hold Return [%]"], risk._annualize(stats["Return [%]"]),
                                 risk.sharpe_ratio(), risk.sortino_ratio(),
                                 risk.max_drawdown(), risk.calmar_ratio(), len(trades), stats["Win Rate [%]"]]
        except Exception as exc:
            # Best effort: a table whose metrics cannot be computed is reported
            # and skipped. `Exception` (not a bare except) lets KeyboardInterrupt
            # through, and the error itself is now shown.
            print(table, len(trades), repr(exc))
            continue

        print(risk_metrics_list)
        risk_rows.append(risk_metrics_list)

    df_risk = pd.DataFrame(risk_rows, columns=risk_columns)
    df_risk.to_sql("cryptocurrencies_risk_metrics_1m_abnormal_day_ml_pooling", db_connection, if_exists="replace")

In [17]:
%%capture
# Run the pooling-model backtest on the shared connection; the %%capture magic
# on the first line of this cell captures the printed output.
run_backtesting_ml_pooling(connection)

In [18]:
#this function does the backtest for the final chosen thresholds
def run_backtesting_ml_final(db_connection):
    """Backtest ``Momentum_AR_long_short`` on the final ensemble-pooling tables.

    A table is selected when its name contains ``ensemble_pooling_final_`` and
    neither ``equity_`` nor ``_trades``. The last 365 rows of each table are
    backtested. The trades and the equity curve of each run are written back
    to the database as ``{table}_trades`` and ``{table}_equity_curve``. One
    row of risk metrics per table is written to
    ``cryptocurrencies_risk_metrics_1m_abnormal_day_ml_final``.

    A table whose backtest, persistence or metric computation raises is
    reported and skipped; the remaining tables are still processed.

    Args:
        db_connection: Connection accepted by ``pandas.read_sql_query`` and
            ``DataFrame.to_sql``.

    Returns:
        None. Results are persisted through ``db_connection``.

    Side effects:
        Rebinds the module-level ``df_temp_ml`` on every iteration, because
        the strategy's signal functions read their columns from that global.
    """
    global df_temp_ml

    risk_columns = ["table_name", "timehorizon", "return", "buy and hold return", "annualized_return", "sharpe_ratio_annualized", "sortino_ratio_annualized", "maximum_drawdown", "calmar_ratio_annualized", "trades_count", "win_rate"]

    table_names = pd.read_sql_query("SELECT name FROM sqlite_master WHERE type='table' ORDER BY name;", db_connection)
    filtered_table_names = [
        name for name in table_names['name'].tolist()
        if "ensemble_pooling_final_" in name
        and not any(marker in name for marker in ("equity_", "_trades"))
    ]
    print(filtered_table_names)

    # Collect one record per table and build the frame once at the end,
    # rather than growing a DataFrame row by row.
    risk_rows = []

    for table in filtered_table_names:

        df_temp_ml = pd.read_sql_query(f"select * from {table}", db_connection)
        df_temp_ml = df_temp_ml[-365:].copy()

        df_backtesting = pd.DataFrame()
        df_backtesting[['Open', 'High', 'Low', 'Close', "Volume"]] = df_temp_ml[['open', 'high', 'low', 'close', 'volume']]

        bt = Backtest(df_backtesting, Momentum_AR_long_short, cash=100_000, commission=.001)

        # Reset per table so the error report below never reads an unbound
        # name (failure on the first table) or a stale frame from a previous
        # table (failure before `trades` is assigned).
        trades = None
        try:
            stats = bt.run()
            trades = pd.DataFrame(stats['_trades'])
            trades.to_sql(f"{table}_trades", db_connection, if_exists="replace")

            equity_curve = pd.DataFrame(stats["_equity_curve"])
            equity_curve.to_sql(f"{table}_equity_curve", db_connection, if_exists="replace")

            risk = Risk_Metrics(trades, df_temp_ml, 0, stats)

            risk_metrics_list = [f"{table}", len(df_backtesting), stats["Return [%]"], stats["Buy & Hold Return [%]"], risk._annualize(stats["Return [%]"]),
                                 risk.sharpe_ratio(), risk.sortino_ratio(),
                                 risk.max_drawdown(), risk.calmar_ratio(), len(trades), stats["Win Rate [%]"]]
        except Exception as exc:
            # Best effort: report the table, its trade count if the backtest
            # got that far, and the error, then move on. `Exception` (not a
            # bare except) lets KeyboardInterrupt through.
            trade_count = len(trades) if trades is not None else None
            print(table, trade_count, repr(exc))
            continue

        print(risk_metrics_list)
        risk_rows.append(risk_metrics_list)

    df_risk = pd.DataFrame(risk_rows, columns=risk_columns)
    df_risk.to_sql("cryptocurrencies_risk_metrics_1m_abnormal_day_ml_final", db_connection, if_exists="replace")

In [19]:
%%capture
# Run the final ensemble-pooling backtest on the shared connection; the
# %%capture magic on the first line of this cell captures the printed output.
run_backtesting_ml_final(connection)