In [7]:
# Walk Forward  04122022 Scott Sir

In [8]:
import pandas as pd
import numpy as np
import time
import os
import datetime
import backtest
import lightgbm as lgb
from sklearn.model_selection import GridSearchCV
import matplotlib.pyplot as plt
import warnings
import joblib
from sklearn.feature_selection import RFECV


def add_features(df):
    # candle sign is calculated using closing and opening prices if close is greater than open sign is +1 else sign is -1
    df['candle_sign'] = np.sign(df['Close'] - df['Open'])

    # calculate rolling average of last x candlesizes and signs and shift by 1
    df['daily_returns'] = df['Close'].pct_change()

    # if open = close it is assigned +1
    df.loc[df['candle_sign'] == 0, 'candle_sign'] = 1

    # using the formula calculate the candle size
    df['candle_size'] = np.sign(df['Close'] - df['Open']) * (
            df['High'] - df['Low']) * 100 / df['Low']

    # rolling means of last 7 values (including the current one)
    df['rolling_candle_size'] = df['candle_size'].rolling(7).mean()
    df['rolling_candle_sign'] = df['candle_sign'].rolling(7).mean()
    df['rolling_returns'] = df['daily_returns'].rolling(7).mean()

    # rolling std of last 7 values (including the current one)
    df['rolling_candle_size_std'] = df['candle_size'].rolling(7).std()
    df['rolling_returns_std'] = df['daily_returns'].rolling(7).std()
    df['rolling_std_dev'] = df['candle_size'].rolling(7).std()

    return df


def walk_forward_sets(end_year, train_duration=5, test_duration=1):
    st = time.time()
    path_to_data_folder = "data"
    signal_file_path = os.path.join(path_to_data_folder, "D:\\A Scott Internship\\Programs\\Walkforward 04122022\\Signals for SPY 2000 to 102822.xlsx")
    data_file_path = os.path.join(path_to_data_folder, "D:\\A Scott Internship\\Programs\\Walkforward 04122022\\ETF Ticker Data thru 102822.xlsx")

    df_signal = pd.read_excel(signal_file_path, sheet_name="Signals")
    df_signal['Date/Time'] = pd.to_datetime(df_signal['Date/Time'])

    #df_value = pd.read_excel(signal_file_path, sheet_name="Values")
    #df_value['Date/Time'] = pd.to_datetime(df_value['Date/Time'])

    df_data = pd.read_excel(data_file_path)
    df_data_spy = df_data.loc[df_data['Ticker'] == "SPY"].reset_index(drop=True)

    df_data_spy['Date/Time'] = pd.to_datetime(df_data_spy['Date/Time'])
    df_data_spy.sort_values(by='Date/Time', ascending=True, inplace=True)

    # split data into train and test
    # divide the data into train and test sets
    df_data_spy = add_features(df_data_spy)
    df_data_spy = pd.merge(df_data_spy, df_signal, on='Date/Time', how='left')
    #df_data_spy = pd.merge(df_data_spy, df_value, on='Date/Time', how='left')

    df_data_spy.dropna(inplace=True)
    df_data_spy = df_data_spy.reset_index(drop=True)

    # train_size = int(df_data_spy.shape[0] * 0.8)

    xtrain = df_data_spy.loc[(df_data_spy['Date/Time'] > datetime.datetime(year=end_year - train_duration, month=1, day=1)) & (
            df_data_spy['Date/Time'] < datetime.datetime(year=end_year, month=1, day=1)), :].reset_index(drop=True)
    xtest = df_data_spy.loc[(df_data_spy['Date/Time'] >= datetime.datetime(year=end_year, month=1, day=1)) &
                            (df_data_spy['Date/Time'] <= datetime.datetime(year=end_year+test_duration, month=1,
                                                                           day=1)), :].reset_index(
        drop=True)

    xtrain['target'] = 10
    xtrain.loc[xtrain['daily_returns'] > xtrain['daily_returns'].quantile(0.75), 'target'] = 1
    xtrain.loc[xtrain['daily_returns'] < xtrain['daily_returns'].quantile(0.25), 'target'] = -1
    xtrain['target'] = xtrain['target'].shift(-2)
    return xtrain, xtest

def walkforward_train(xtrain):
    xtrain.drop(['Date/Time', 'Open', 'Close', 'High', 'Low', 'Ticker'], axis=1, inplace=True)

    # drop rows with missing values
    xtrain.dropna(inplace=True)
    xtrain.reset_index(drop=True, inplace=True)

    ytrain = xtrain['target']
    xtrain = xtrain.loc[:, xtrain.columns != 'target']

    # for machine learning algo hyperparameter tuning 3 fold custom cv is used
    params = {
        'max_depth': [5, 6, 7, -1],
        'num_leaves': [12, 24, 32],
        'n_estimators': [100, 200],
        'colsample_bytree': [0.5, 0.7, 0.8, 0.9],
        'subsample': [0.5, 0.7, 0.8, 0.9]
    }


    model = lgb.LGBMClassifier(random_state=121, class_weight='balanced')

    # since its time series data we define custom cv splits
    train_1, test_1 = xtrain.loc[:int(0.8 * xtrain.shape[0])].index, xtrain.loc[int(0.8 * xtrain.shape[0]):].index
    train_2, test_2 = xtrain.loc[:int(0.85 * xtrain.shape[0])].index, xtrain.loc[int(0.85 * xtrain.shape[0]):].index
    train_3, test_3 = xtrain.loc[:int(0.9 * xtrain.shape[0])].index, xtrain.loc[int(0.9 * xtrain.shape[0]):].index
    cv = [(train_1, test_1), (train_2, test_2), (train_3, test_3)]

    clf = GridSearchCV(model, params, cv=cv, n_jobs=-1, scoring='f1_macro')

    clf.fit(xtrain.values, ytrain)

    model = clf.best_estimator_

    rfe = RFECV(model, step=1, cv=cv)
    rfe.fit(xtrain.values, ytrain)
    features = xtrain.loc[:, rfe.support_].columns.values
    joblib.dump(features, 'features.pkl')

    # fit model on complete training data
    model.fit(xtrain[features].values, ytrain)

    # save the model
    joblib.dump(model, 'model_lgb.pkl')

def walkforward_test(test, year):
    features = joblib.load('features.pkl')

    model = joblib.load('model_lgb.pkl')

    test['Date/Time'] = pd.to_datetime(test['Date/Time'])
    test.dropna(inplace=True)

    test.reset_index(drop=True, inplace=True)
    test["pred"] = model.predict(test[features])

    # Results on test set
    # backtest.backtest(test, 'pred', output_filename="output_1year_{}.html".format(year))
    return test


def run_walk_forward():
    test_df = pd.DataFrame()
    for year in range(2016, 2022, 1):
        print(year)
        train, test = walk_forward_sets(year)
        walkforward_train(train)
        test = walkforward_test(test, year)
        test_df = pd.concat([test, test_df], axis=0)
    test_df.reset_index(drop=True, inplace=True)
    test_df.to_csv("check_test_data.csv", index=False)
    backtest.backtest(test_df, 'pred', output_filename="output_1year.html")

if __name__ == '__main__':
    run_walk_forward()

2016
2017
2018
2019
2020
2021


AttributeError: module 'backtest' has no attribute 'backtest'