**This script creates a class for classical machine learning on time series data.**

## Standard Import


In [None]:
## Standard imports
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

import sklearn

import os
import math

%matplotlib inline


In [None]:
from sklearn.model_selection import TimeSeriesSplit
from sklearn.model_selection import cross_val_score
from sklearn.model_selection import GridSearchCV


from sklearn import linear_model
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.neural_network import MLPClassifier

In [None]:
# from google.colab import drive
# drive.mount('/content/drive')
# DATA_DIR="/content/drive/MyDrive/IAQF - Five+1 Guys/2022/Data"

# ML Framework class (need to pass dataset and model)

In [None]:
class MLFramework():
  """Helper that wires a time-indexed DataFrame to a scikit-learn style model.

  The instance holds the data (`self.df`), the name of the target column and
  the list of feature columns.  The split methods slice `self.df` by comparing
  its index against cut-off dates and store the pieces as `X_train`, `X_test`,
  `y_train`, `y_test` (and `X_val` / `y_val` for the three-way split).  The
  remaining methods fit / score / cross-validate `self.model` on those pieces.

  NOTE(review): a DataFrame passed through `df` is used as-is (it is neither
  copied nor sorted); only data loaded from `DF_DIR` is sorted by index.
  Callers passing `df` should presumably provide it in chronological order.
  """

  def __init__(self, target_var: str, features: list, df=None, DF_DIR: str = None):
    """Store the dataset and the column roles.

    Args:
      target_var: name of the column used as the prediction target.
      features: list of column names used as model inputs.
      df: an already-loaded DataFrame; takes precedence over `DF_DIR`.
      DF_DIR: path of a CSV file, read with the first column as a parsed
        date index and then sorted by index.  Only used when `df` is None.

    Raises:
      ValueError: if neither `df` nor `DF_DIR` is supplied.
    """
    if df is not None:
      self.df = df
    elif DF_DIR is not None:
      self.df = pd.read_csv(DF_DIR, index_col=0, parse_dates=True)
      self.df.sort_index(inplace=True)
    else:
      # Fail with a clear message instead of handing None to read_csv.
      raise ValueError('MLFramework.__init__: either df or DF_DIR must be provided')
    self.target_var = target_var
    self.features = features
    self.tsSplit = None

  def standardFlow(self, model, metric_CV=None):
    """Run the default pipeline and print the scores.

    Performs the default train/test split, registers `model`, prints the
    time-series cross-validation scores, then the in-sample and out-of-sample
    scores.  The in-sample step fits the model on the full training set, which
    is what the out-of-sample step then evaluates.
    """
    self.train_test_split()
    self.setModel(model)
    CVScore = self.CVScore_TS(metric_CV=metric_CV)
    print("CV Score: ", CVScore)
    print("Insample Result: ", self.insampleResult())
    print("Outsample Result: ", self.outsampleResult())
    print("Remark: model is fitted with the whole trainning dataset ")

  def insampleResult(self):
    """Fit `self.model` on the training set and return its `score` on that set."""
    self.model.fit(self.X_train, self.y_train)
    return self.model.score(self.X_train, self.y_train)

  def outsampleResult(self):
    """Return `self.model.score` on the test set.

    The model is not fitted here; call `insampleResult` (or fit the model
    yourself) beforehand.
    """
    return self.model.score(self.X_test, self.y_test)

  def train_test_split(self, StDate_test=pd.to_datetime('2018-01-01'), ratio=None):
    """Split `self.df` into a training part and a test part by index value.

    Rows whose index is strictly before the cut-off go to the training set,
    all other rows go to the test set.

    Args:
      StDate_test: first index value of the test set.  Ignored when `ratio`
        is given.
      ratio: fraction (strictly between 0 and 1) of rows placed before the
        cut-off.  The cut-off is the index value of row `int(nrow * ratio)`;
        if earlier rows share that same index value they also fall into the
        test set, because the split compares index values.

    Raises:
      ValueError: if `ratio` is outside (0, 1), if the frame is empty when
        `ratio` is used, or if `StDate_test` lies after the last index value.
    """
    if ratio is not None:
      if not 0 < ratio < 1:
        raise ValueError('MLFramework.train_test_split: ratio must be strictly between 0 and 1')
      nrow = self.df.shape[0]
      if nrow == 0:
        raise ValueError('MLFramework.train_test_split: cannot split an empty dataset')
      # Take the index LABEL of the boundary row.  (`df.iloc[i].index` would
      # be the column labels of that row, not its timestamp.)
      boundary = min(int(nrow * ratio), nrow - 1)
      StDate_test = self.df.index[boundary]
    elif self.df.index.max() < StDate_test:
      raise ValueError('MLFramework.train_test_split: Incorrect StDate_test')

    train_idx = self.df.index < StDate_test

    self.X_train = self.df.loc[train_idx, self.features]
    self.X_test = self.df.loc[~train_idx, self.features]
    self.y_train = self.df.loc[train_idx, self.target_var]
    self.y_test = self.df.loc[~train_idx, self.target_var]

  def setModel(self, model):
    """Register the estimator used by the fit / score / CV methods."""
    self.model = model

  def CVScore_TS(self, metric_CV=None, n_splits=5):
    """Cross-validate `self.model` on the training set with `TimeSeriesSplit`.

    Args:
      metric_CV: value forwarded as `scoring` to `cross_val_score`, see
        https://scikit-learn.org/stable/modules/model_evaluation.html#scoring-parameter
      n_splits: number of splits of the `TimeSeriesSplit`.

    Returns:
      The value returned by `cross_val_score`.

    Side effects: stores `metric_CV` and the splitter (`self.tsSplit`) on the
    instance; the splitter is reused by `gridSearch_TS`.
    """
    self.metric_CV = metric_CV
    self.tsSplit = TimeSeriesSplit(n_splits=n_splits)
    # The splitter object is passed directly; scikit-learn calls its split()
    # on the training data itself.
    return cross_val_score(self.model, self.X_train, self.y_train,
                           cv=self.tsSplit, scoring=metric_CV)

  def gridSearch_TS(self, parameters: dict, metric_CV=None, n_splits=5):
    """Grid-search `parameters` for `self.model` using time-series splits.

    Args:
      parameters: parameter grid forwarded to `GridSearchCV`.
      metric_CV: value forwarded as `scoring` to `GridSearchCV`.
      n_splits: number of splits, used ONLY when no splitter exists yet.  If
        `CVScore_TS` (or an earlier grid search) already created
        `self.tsSplit`, that splitter is reused and `n_splits` is ignored.

    Returns:
      `cv_results_` of the fitted search as a DataFrame.  The fitted search
      object itself is kept in `self.gs`.
    """
    if self.tsSplit is None:
      self.tsSplit = TimeSeriesSplit(n_splits=n_splits)

    self.gs = GridSearchCV(self.model, parameters, scoring=metric_CV, cv=self.tsSplit)
    self.gs.fit(self.X_train, self.y_train)

    print("Best param from Grid Search:", self.gs.best_params_)
    print("CV score for the best param:", self.gs.best_score_)
    return pd.DataFrame(self.gs.cv_results_)

  @staticmethod
  def _one_hot(labels):
    """Return the dummy-encoded `labels` restricted to the columns -1, 0, 1."""
    return pd.get_dummies(labels)[[-1, 0, 1]]

  def target_transform_cat(self):
    """One-hot encode the targets into columns -1, 0 and 1.

    Writes `y_train_trans` and `y_test_trans`, plus `y_val_trans` when a
    validation target (`y_val`) exists on the instance.

    NOTE(review): each split must contain all three labels; the column
    selection raises KeyError when one of -1 / 0 / 1 is absent from a split.
    """
    self.y_train_trans = self._one_hot(self.y_train)
    self.y_test_trans = self._one_hot(self.y_test)
    if hasattr(self, 'y_val'):
      self.y_val_trans = self._one_hot(self.y_val)

  def train_val_test_split(self, StDate_val=pd.to_datetime('2014-01-01'), StDate_test=pd.to_datetime('2018-01-01')):
    """Split `self.df` into train / validation / test parts by index value.

    Args:
      StDate_val: first index value of the validation set; earlier rows form
        the training set.
      StDate_test: first index value of the test set; rows from `StDate_val`
        up to (excluding) this value form the validation set.

    Returns:
      The tuple (X_train, X_val, X_test, y_train, y_val, y_test); the same
      objects are also stored on the instance.
    """
    index = self.df.index
    train_idx = index < StDate_val
    test_idx = index >= StDate_test
    val_idx = (index >= StDate_val) & (index < StDate_test)

    self.X_train = self.df.loc[train_idx, self.features]
    self.X_val = self.df.loc[val_idx, self.features]
    self.X_test = self.df.loc[test_idx, self.features]

    self.y_train = self.df.loc[train_idx, self.target_var]
    self.y_val = self.df.loc[val_idx, self.target_var]
    self.y_test = self.df.loc[test_idx, self.target_var]

    return self.X_train, self.X_val, self.X_test, self.y_train, self.y_val, self.y_test

In [None]:
# df = pd.read_csv(os.path.join(DATA_DIR,'Russell3000.csv'), index_col=0, parse_dates=True)
# target_var='Adj Close'
# features = ['Open','High','Low']

# model = linear_model.Lasso(alpha=0.01)

# ML_trial = MLFramework( target_var=target_var, features=features , df =df)
# ML_trial.standardFlow(model = model)


In [None]:
# params = {'alpha':np.arange(0.001,1,0.01)}
# ML_trial.gridSearch_TS(params)

In [None]:
# np.arange(0.001,1,0.01)

# Features

## 

In [None]:
# class features():
#   def __init__(self,df)
#   def return()

# Read dataset for checking

In [None]:
# file = os.path.join(DATA_DIR,'DailyData.csv')
# test=pd.read_csv(file, parse_dates=True, index_col=0)

In [None]:
# test.describe()

In [None]:
# test.loc[test.isna().any(axis=1),test.isna().any(axis=0)]

In [None]:
# test.dtypes

In [None]:
# filename = os.path.join(DATA_DIR,'Russell3000TotalReturn.xlsx')
# test2 = pd.read_excel(filename,skiprows=6,index_col=0,parse_dates=True)
# test2

In [None]:
# test2.index

In [None]:
# test2.index[0]

## Try with labels

In [None]:
# df = pd.read_csv(os.path.join(DATA_DIR,'Russell3000.csv'), index_col=0, parse_dates=True)
# df.head()


## Method 1: detect an increase/decrease of at least r% in a future horizon of n units time.
Let up=0.1 and down=-0.15. Look ahead 10 days.

In [None]:
# df['RET10'] = df["Adj Close"].shift(-10) / df["Adj Close"] - 1 #return of future 10 days

# def sig(value, up, down):
#   if value > up:
#     return 1
#   elif value < down:
#     return -1
#   return 0

# up = 0.1
# down = -0.15

# df['POS'] = df['RET10'].apply(sig, args=(up, down,))
# df

In [None]:
# df['POS'].cumsum().plot()

In [None]:
# target_var='POS'
# features = ['Open','High','Low']

In [None]:
# model = MLPClassifier(random_state=1, max_iter=300)

# ML_trial = MLFramework( target_var=target_var, features=features , df =df)
# ML_trial.standardFlow(model = model)

## Method 2: Lunde and Timmermann

In [None]:
# def defbullbear(df, varname, lambda1, lambda2):
#   peaks = [df.index[0]]
#   troughs = [df.index[0]]
#   flag = 1
#   for i in range(df.shape[0]):
#     if flag == 1:
#       if df.iloc[i][varname] > df.loc[peaks[-1]][varname]:
#         peaks[-1] = df.index[i]
#       elif df.iloc[i][varname] < (1 - lambda2) * df.loc[peaks[-1]][varname]:
#         troughs.append(df.index[i])
#         flag = -1
#     else:
#       if df.iloc[i][varname] < df.loc[troughs[-1]][varname]:
#         troughs[-1] = df.index[i]
#       elif df.iloc[i][varname] > (1 + lambda1) * df.loc[troughs[-1]][varname]:
#         peaks.append(df.index[i])
#         flag = 1
#   df["State"] = np.nan
#   df.loc[peaks, "State"] = 1
#   df.loc[troughs, "State"] = -1
#   df.fillna(method="bfill", inplace=True)
#   return df

In [None]:
# df = defbullbear(df, "Adj Close", 0.1, 0.05) # lambda1=0.1, lambda2=0.05

In [None]:
# # create figure and axis objects with subplots()
# fig,ax = plt.subplots(figsize=(20,12))
# # make a plot
# ax.plot(df['Adj Close'], color="red", marker="o")
# # set x-axis label
# ax.set_xlabel("year",fontsize=14)
# # set y-axis label
# ax.set_ylabel("Close",color="red",fontsize=14)

# # twin object for two different y-axes on the same plot
# ax2=ax.twinx()
# # make a plot with different y-axis using second axis object
# ax2.plot(df["State"], color="blue",marker="o")
# ax2.set_ylabel("State",color="blue",fontsize=14)
# plt.show()

In [None]:
# df["State"].hist()

In [None]:
# target_var='State'
# features = ['Open','High','Low']

# model = linear_model.Lasso(alpha=0.01)

# ML_trial = MLFramework( target_var=target_var, features=features , df =df)
# ML_trial.standardFlow(model = model)

In [None]:
# model = MLPClassifier(random_state=1, max_iter=300)

# ML_trial = MLFramework( target_var=target_var, features=features , df =df)
# ML_trial.standardFlow(model = model)