### ------------------ Module 5: Machine Learning Model ------------------

- What: Create class to coordinate model usage
- When: 05 August 2022   

$\color{red}{\text{Define Parameters:}}$

In [1]:
# Data source (optional): directory, then file name
Data_Path, Data_Name = './data/', 'BTCUSDT_1MINUTE.feather'

# Trained model (required): directory, then saved model name
Model_Path, Model_Name = './Models/', 'BTC_20210601_20211001'

# Serialised class (required): directory, then output file name
Class_Path, Class_Name = './Classes/', 'Indicator_Class.txt'

#### 1. Load some key libraries

In [2]:
import os
import pandas as pd
import numpy as np
import pickle
import dill

from sklearn.ensemble import RandomForestClassifier
from datetime import date,datetime

# Progress marker used at the end of each cell.
def Keep_Track():
    """Print a confirmation line stamped with the current local time."""
    stamp = datetime.now().strftime("%a %d %b @ %H:%M:%S")
    message = "Executed successfully. " + stamp + "\n"
    print(message)


Keep_Track()

Executed successfully. Sat 06 Aug @ 11:04:37



#### 2. Build a class which will coordinate model usage

In [3]:
## Build a class that will coordinate the usage of the model.

class Indicator_Model():
    """Load price data and a pickled model, build lagged features and score them.

    The data source is chosen in ``get_data``:
      * ``Symbol is None``          -> ``Data_Path`` is itself a DataFrame.
      * ``Data_Path == 'Binance'``  -> recent 1-minute klines are fetched for ``Symbol``.
      * otherwise                   -> ``Symbol`` is a ``.feather`` / ``.csv`` file name
                                       inside the ``Data_Path`` directory.

    Data and model are both loaded from ``__init__``. Failures are reported with
    ``print`` and recorded in the status flags (``DataLoaded``, ``DataError``,
    ``ModelLoaded``) instead of being raised.
    """

    # Timestamp format shared by every status field.
    __Stamp_Format = "%a %d %b @ %H:%M:%S"

    # Columns every input frame must provide.
    __Required_Columns = ('Open Time', 'Close', 'Volume', 'High', 'Low')

    # Identify the source of the data and the trained model
    def __init__(self, Data_Path, Model_Path, Symbol, ModelName):
        """Store the locations, reset the status flags, then load data and model.

        Parameters
        ----------
        Data_Path : directory holding the data file, the string 'Binance',
            or a DataFrame when ``Symbol`` is None.
        Model_Path : directory holding ``<ModelName>.pkl``.
        Symbol : data file name, Binance ticker, or None.
        ModelName : model file name without the ``.pkl`` suffix.
        """
        # Path to the data
        self.Data_Path = Data_Path

        # Path to the model
        self.Model_Path = Model_Path

        # Data file name
        self.Symbol = Symbol

        # Saved model name
        self.ModelName = ModelName

        # Model and data status
        self.DataLoaded = False
        self.DataError = True
        self.ModelLoaded = False
        self.Anyforecast = False

        self.DataLoadedWhen = '-'
        self.ModelLoadedWhen = '-'
        self.ModelExecutedWhen = '-'
        self.ForecastWhen = '-'

        self.FileName = ['None', 'None']

        # Look-back windows (in rows) used by every feature builder.
        self.__Lag = [60, 180]

        # The order here fixes the column order of the feature matrix passed to the model.
        self.__factor_functions = [
            self.__Closing, self.__Volume, self.__cal_close_avg, self.__cal_bk_return,
            self.__cal_high_low, self.__cal_max_mean, self.__cal_min_mean, self.__cal_volume_avg,
            self.__cal_max_mean_volume, self.__cal_min_mean_volume, self.__cal_std,
            self.__r_zscore, self.__Parkinson,
        ]

        self.get_data()
        self.get_model()

    # Function to provide some details on the state of the class.
    def get_info(self):
        """Print the data, model and forecast status of this instance."""
        print('* Data Loaded :', self.DataLoaded)
        print('  - Symbol    :', self.FileName[0])
        print('  - Type      :', self.FileName[-1])
        print('  - Data Error:', self.DataError)
        if self.DataLoaded:
            print('  - Loaded @  :', self.DataLoadedWhen)
            print('  - Records   :', len(self.data))
            print('  - Max date  :', self.Max_Date)
            print('  - Min date  :', self.Min_Date)

        print('* Model Loaded:', self.ModelLoaded)
        print('  - Model name:', self.ModelName)
        if self.ModelLoaded:
            print('  - Loaded @  :', self.ModelLoadedWhen)

        print('* Forecast(s) :', self.Anyforecast)
        if self.Anyforecast:
            print('  - Forecast @ :', self.ForecastWhen)

        print('')
        self.__Keep_Track()

    # Keep track of code execution
    def __Keep_Track(self):
        """Print a time-stamped confirmation line."""
        print("  Executed successfully. " + self.__now() + "\n")

    def __now(self):
        """Return the current local time formatted for the status fields."""
        return datetime.now().strftime(self.__Stamp_Format)

    # Check the data contains the columns needed
    def __DataCheck(self, Data):
        """Return True when every required column name is found in ``Data``."""
        return all(elem in Data for elem in self.__Required_Columns)

    def __validate_and_index(self, df):
        """Validate ``df`` and return the frame that should be stored.

        On success the date range and load time are recorded, the status flags
        are updated and a frame indexed by 'Open Time' is returned. When columns
        are missing the error flag is set and an un-indexed copy is returned, so
        that adding the 'Ticker' column later never touches the caller's object.
        """
        if not self.__DataCheck(df):
            self.DataError = True
            print('* File error - fields missing')
            return df.copy()

        self.DataError = False
        self.Max_Date = df['Open Time'].max()
        self.Min_Date = df['Open Time'].min()
        indexed = df.set_index('Open Time')
        self.DataLoadedWhen = self.__now()
        self.DataLoaded = True
        return indexed

    # Load the data -- allow for various file types.
    def get_data(self):
        """Load the price data into ``self.data`` from the configured source.

        Nothing is raised: problems are printed and reflected in ``DataError``
        and ``DataLoaded``.
        """
        try:
            if self.Symbol is None:
                # In this case we want to use a dataframe directly (i.e. not load it from file)
                frame = self.__validate_and_index(self.Data_Path)
                self.FileName = ['User', 'None']
                ticker = 'SelfLoad'

            elif self.Data_Path == 'Binance':  # Load direct from Binance
                raw = self.get_binance(self.Symbol)
                if not isinstance(raw, pd.DataFrame):
                    # get_binance reports failure by returning 0 rather than raising.
                    self.DataError = True
                    print('  !!! Error loading data !!!')
                    return
                frame = self.__validate_and_index(raw)
                self.FileName = ['Binance', 'None']
                ticker = self.Symbol
                print('* Binance Data     :', self.Symbol)

            else:
                # splitext copes with names containing several dots or none at all.
                stem, suffix = os.path.splitext(self.Symbol)
                self.FileName = [stem, suffix[1:]]
                self.extension = self.FileName[1]
                file_path = os.path.join(self.Data_Path, self.Symbol)

                if self.extension == 'feather':
                    print('* Feather data file:', self.Symbol)
                    raw = pd.read_feather(file_path)
                elif self.extension == 'csv':
                    print('* CSV data file    :', self.Symbol)
                    raw = pd.read_csv(file_path)
                else:
                    # Nothing was read, so there is nothing to store.
                    self.DataError = True
                    print('type not supported')
                    return

                frame = self.__validate_and_index(raw)
                ticker = self.FileName[0]

            frame['Ticker'] = ticker
            self.data = frame

        except Exception as exc:
            self.DataError = True
            print('  !!! Error loading data !!!')
            print('  - Reason:', repr(exc))

    # Load the Model
    def get_model(self):
        """Unpickle ``<Model_Path>/<ModelName>.pkl`` into ``self.Model``.

        Nothing is raised: a failure is printed and ``ModelLoaded`` stays False.
        """
        try:
            model_file = os.path.join(self.Model_Path, f'{self.ModelName}.pkl')
            # NOTE: unpickling can execute arbitrary code - only load trusted model files.
            with open(model_file, 'rb') as handle:
                self.Model = pickle.load(handle)
            print('* Model loaded     :', self.ModelName)
            self.ModelLoaded = True
            self.ModelLoadedWhen = self.__now()
        except Exception as exc:
            print('* Error loading model!')
            print('  - Reason:', repr(exc))

    # ------------------------------------------------------------------
    # Feature builders.
    # Each one is handed to groupby(...).apply, which passes the group as
    # the single positional argument. That argument is ignored: every
    # builder reads self.data_run directly, so the result is only
    # meaningful when data_run holds a single ticker.
    # ------------------------------------------------------------------
    def __lagged(self, prefix, compute):
        """Return a frame with one column per lag, named '<prefix>_<lag>'."""
        return pd.concat({f'{prefix}_{lag}': compute(lag) for lag in self.__Lag}, axis=1)

    def __cal_close_avg(self, unused):
        # Current close / rolling mean of close
        close = self.data_run.Close
        return self.__lagged('CurCloseDivAvg', lambda lag: close / close.rolling(lag).mean())

    def __cal_volume_avg(self, unused):
        # Current volume / rolling mean of volume
        volume = self.data_run.Volume
        return self.__lagged('CurVolumeDivAvg', lambda lag: volume / volume.rolling(lag).mean())

    def __cal_bk_return(self, unused):
        # Period return up to now
        close = self.data_run.Close
        return self.__lagged('BackReturn', lambda lag: close.pct_change(lag))

    def __cal_high_low(self, unused):
        # Rolling max of high / rolling min of low
        high, low = self.data_run.High, self.data_run.Low
        return self.__lagged('HighDivLow', lambda lag: high.rolling(lag).max() / low.rolling(lag).min())

    def __cal_max_mean(self, unused):
        # Rolling max of high / rolling mean of close
        high, close = self.data_run.High, self.data_run.Close
        return self.__lagged('MaxDivMean', lambda lag: high.rolling(lag).max() / close.rolling(lag).mean())

    def __cal_min_mean(self, unused):
        # Rolling min of low / rolling mean of close
        low, close = self.data_run.Low, self.data_run.Close
        return self.__lagged('MinDivMean', lambda lag: low.rolling(lag).min() / close.rolling(lag).mean())

    def __cal_max_mean_volume(self, unused):
        # Rolling max / rolling mean for volume
        volume = self.data_run.Volume
        return self.__lagged('MaxDivMeanVolume', lambda lag: volume.rolling(lag).max() / volume.rolling(lag).mean())

    def __cal_min_mean_volume(self, unused):
        # Rolling min / rolling mean for volume
        volume = self.data_run.Volume
        return self.__lagged('MinDivMeanVolume', lambda lag: volume.rolling(lag).min() / volume.rolling(lag).mean())

    def __cal_std(self, unused):
        # Rolling standard deviation of close
        close = self.data_run.Close
        return self.__lagged('Std', lambda lag: close.rolling(lag).std())

    def __Closing(self, unused):
        # Close shifted back by each lag, followed by the current close
        close = self.data_run.Close
        columns = {f'Close_{lag}': close.shift(lag) for lag in self.__Lag}

        # Include current by default (kept last to preserve the column order)
        columns['Close_0'] = close
        return pd.concat(columns, axis=1)

    def __Volume(self, unused):
        # Volume shifted back by each lag, followed by the current volume
        volume = self.data_run.Volume
        columns = {f'Volume_{lag}': volume.shift(lag) for lag in self.__Lag}

        # Include current by default (kept last to preserve the column order)
        columns['Volume_0'] = volume
        return pd.concat(columns, axis=1)

    def __r_zscore(self, unused):
        # Rolling z-score of log volume
        log_volume = np.log(self.data_run.Volume)
        return self.__lagged(
            'Z_Volume',
            lambda lag: (log_volume - log_volume.rolling(lag).mean()) / log_volume.rolling(lag).std(),
        )

    def __Parkinson(self, unused):
        # High/low range volatility over each window, scaled by 365*1440
        # (the number of minutes in a 365-day year).
        log_range_sq = np.log(self.data_run.High / self.data_run.Low) ** 2
        return self.__lagged(
            'Parkinson',
            lambda lag: 0.5 * np.sqrt(log_range_sq.rolling(lag).sum()) / np.sqrt(np.log(2) * lag / (365 * 1440)),
        )

    # Generate the features needed by the model with the dataset provided.
    def __generate_factorsMM(self):
        """Run every feature builder on ``self.data_run`` and join the results column-wise."""
        # group_keys=False keeps the result indexed by 'Open Time' only, whatever
        # the pandas default is; otherwise 'Ticker' could be added to the index
        # and end up inside the feature matrix after reset_index().
        grouped = self.data_run.groupby('Ticker', group_keys=False)
        return pd.concat([grouped.apply(f) for f in self.__factor_functions], axis=1)

    def Calculate_Indicator(self, records):
        """Score the most recent ``records`` rows with the loaded model.

        Returns a DataFrame with columns 'Forecast Time', 'Open Time' and
        'Probability' (second column of ``predict_proba``). Returns None when
        ``records`` is not positive, when there is too little data, or when
        any step fails; the reason is printed.
        """
        try:
            # The longest look-back window needs this many extra rows of history.
            warm_up = max(self.__Lag)

            if records <= 0:
                print('! Need to forecast for at least 1 timestamp')
                return None

            if len(self.data) < records + warm_up:
                print('! Insufficient data for', records, 'forecast(s)')
                return None

            # Limit the dataset to the range of interest (needed for performance)
            self.data_run = self.data.tail(records + warm_up)

            # Build the features, expose 'Open Time' as a column and drop the
            # warm-up rows, which contain missing values.
            self.GeneratedFactors = self.__generate_factorsMM().reset_index().dropna()

            result = self.GeneratedFactors[['Open Time']].copy()

            # Drop the date column
            features = self.GeneratedFactors.drop(columns=['Open Time']).values

            # Run the model
            probabilities = self.Model.predict_proba(features)
            result['Probability'] = probabilities[:, 1]

            self.Anyforecast = True
            self.ForecastWhen = self.__now()
            result['Forecast Time'] = self.ForecastWhen
            result = result.reset_index(drop=True)
            return result[['Forecast Time', 'Open Time', 'Probability']]

        except Exception as exc:
            print('* Error in forecast')
            print(' - Reason:', repr(exc))
            if not self.DataLoaded:
                print(' - Please load data')
            if not self.ModelLoaded:
                print(' - Please load a model')
            return None

    def get_binance(self, ticker):
        """Fetch the last 181 one-minute klines for ``ticker`` as a DataFrame.

        Returns 0 when anything goes wrong (callers rely on that sentinel).
        """
        try:
            # NOTE(review): `Client` is not imported in the visible notebook cells;
            # presumably python-binance's Client - confirm it is in scope where the
            # class is used, otherwise this branch always fails with NameError.
            client = Client('', '')

            # Query binance API; 181 rows is just enough history for one forecast.
            raw = client.get_historical_klines(ticker, client.KLINE_INTERVAL_1MINUTE, '181 minutes ago UTC')

            # Convert to Dataframe
            df = pd.DataFrame(raw)

            # Define columns
            df.columns = ['Open Time', 'Open', 'High', 'Low', 'Close', 'Volume', 'Close Time',
                          'Quote Asset Volume', 'Number of Trades', 'TB Base Volume',
                          'TB Quote Volume', 'ignore']

            # Timestamps are divided by 1000 and then read as seconds
            df['Open Time'] = pd.to_datetime(df['Open Time'] / 1000, unit='s')
            df['Close Time'] = pd.to_datetime(df['Close Time'] / 1000, unit='s')

            # Change object values into numeric
            numeric_columns = ['Open', 'High', 'Low', 'Close', 'Volume', 'Quote Asset Volume',
                               'TB Base Volume', 'TB Quote Volume']
            df[numeric_columns] = df[numeric_columns].apply(pd.to_numeric, axis=1)

            return df

        except Exception as exc:
            print('* Binance download failed:', repr(exc))
            return 0
                
                
# Print a time-stamped marker once the class definition cell has finished running.
Keep_Track()       

Executed successfully. Sat 06 Aug @ 11:04:47



#### 3. Serialise the class

In [4]:
# Serialise the class definition itself (not an instance) to bytes.
dat = dill.dumps(Indicator_Model)

# Write the bytes to <Class_Path>/<Class_Name>. The context manager closes the
# file even if the write raises.
with open(os.path.join(Class_Path, Class_Name), "wb") as text_file:
    n = text_file.write(dat)  # number of bytes written

Keep_Track()

Executed successfully. Sat 06 Aug @ 11:04:58

