In [None]:
import pandas as pd
from sklearn.preprocessing import MinMaxScaler
import numpy as np
import pickle
import matplotlib.pyplot as plt

In [None]:
import tensorflow as tf
from keras.layers import Dense, Input, GlobalMaxPooling1D, LSTM, GRU
from keras.models import Model
from keras.callbacks import ModelCheckpoint
from sklearn.metrics import mean_absolute_percentage_error

In [None]:
class LSTMModel:
    """Train and evaluate a two-layer LSTM that forecasts several steps ahead.

    The pipeline (see ``dfModel``) scales the input frame, appends cyclic
    time-of-day / time-of-year features derived from the index, slices the
    result into sliding windows, fits a Keras model and reports the MAPE of
    the first forecast step on the held-out windows.
    """

    def __init__(self, Ntest, T, Nsplit, epochs, Layer1, Layer2, featureNumber, stockname):
        """Store the hyper-parameters of the experiment.

        Args:
            Ntest: Forecast horizon, i.e. number of output steps per window.
            T: Number of input time steps per window (stored as ``Tx``).
            Nsplit: Number of trailing rows / windows held out for testing.
            epochs: Number of training epochs passed to ``model.fit``.
            Layer1: Units of the first LSTM layer.
            Layer2: Units of the second LSTM layer.
            featureNumber: Number of features per time step fed to the model
                (the scaled input columns plus the four cyclic time columns).
            stockname: Label used in the plot title and in the names of the
                scaler and checkpoint files written to the working directory.
        """
        # output number
        self.Ntest = Ntest
        # input number
        self.Tx = T
        # test-train split
        self.Nsplit = Nsplit
        self.epochs = epochs
        self.Layer1 = Layer1
        self.Layer2 = Layer2
        self.featureNumber = featureNumber
        self.stockname = stockname

    def splitDataset(self, df):
        """Split ``df`` into leading train rows and the last ``Nsplit`` test rows.

        Returns:
            ``(df, train, test, train_idx, test_idx)`` where the two ``*_idx``
            values are boolean arrays over ``df.index`` built by comparing
            every index label with the last training label.

        Note:
            ``Nsplit`` must leave at least one training row; otherwise
            ``train.index[-1]`` raises ``IndexError``.
        """
        train = df.iloc[:-self.Nsplit]
        test = df.iloc[-self.Nsplit:]

        # Boolean arrays used to index df rows.
        last_train_label = train.index[-1]
        train_idx = df.index <= last_train_label
        test_idx = df.index > last_train_label

        return df, train, test, train_idx, test_idx

    def supervisedDataset(self, df, T, Ntest, scaler):
        """Scale ``df``, add cyclic time features and build sliding windows.

        Side effects: fits ``scaler`` in place and pickles it to
        ``minmax_scaler_<stockname>`` in the working directory.

        Args:
            df: Input frame; its index is mapped through
                ``pd.Timestamp.timestamp``, so it must hold timestamps.
                Column 0 is the forecast target.
            T: Number of input steps per window.
            Ntest: Number of target steps per window.
            scaler: Object exposing ``fit_transform``.

        Returns:
            ``(df_scaled, Xtrain, Xtest, Ytrain, Ytest)``. ``X*`` has shape
            ``(windows, T, featureNumber)`` and ``Y*`` has shape
            ``(windows, Ntest)``; the last ``Nsplit`` windows are the test set.

        Raises:
            ValueError: If the number of columns after feature engineering
                differs from ``featureNumber``.
        """
        # NOTE(review): the scaler is fitted on every row, including the rows
        # that later form the test windows, so test statistics leak into the
        # scaling. Left unchanged because it alters the numeric results.
        df_scaled = pd.DataFrame(scaler.fit_transform(df.values), index=df.index)

        scaler_name = "minmax_scaler_{}".format(self.stockname)
        with open(scaler_name, 'wb') as f:
            pickle.dump(scaler, f)

        # Encode the timestamp as points on daily and yearly circles so the
        # model sees periodicity instead of a monotonically growing number.
        seconds = np.asarray(df_scaled.index.map(pd.Timestamp.timestamp), dtype=float)
        day = 60 * 60 * 24
        year = 365.2425 * day

        df_scaled['Day sin'] = np.sin(seconds * (2 * np.pi / day))
        df_scaled['Day cos'] = np.cos(seconds * (2 * np.pi / day))
        df_scaled['Year sin'] = np.sin(seconds * (2 * np.pi / year))
        df_scaled['Year cos'] = np.cos(seconds * (2 * np.pi / year))

        Tx = T
        Ty = Ntest

        # Make supervised dataset
        series = df_scaled.dropna().to_numpy()
        if series.shape[1] != self.featureNumber:
            # Reshaping with a wrong feature count would either fail with an
            # opaque message or silently scramble the windows.
            raise ValueError(
                "featureNumber is {} but the dataset has {} feature columns".format(
                    self.featureNumber, series.shape[1]))
        series_out = series[:, 0]

        # range() of a non-positive count is empty, which yields zero windows.
        window_starts = range(len(series) - Tx - Ty + 1)
        X = [series[t:t + Tx] for t in window_starts]
        Y = [series_out[t + Tx:t + Tx + Ty] for t in window_starts]

        X = np.array(X).reshape(-1, Tx, self.featureNumber)
        Y = np.array(Y).reshape(-1, Ty)

        print("X.shape", X.shape, "Y.shape", Y.shape)

        Xtrain_m, Ytrain_m = X[:-self.Nsplit], Y[:-self.Nsplit]
        Xtest_m, Ytest_m = X[-self.Nsplit:], Y[-self.Nsplit:]

        return df_scaled, Xtrain_m, Xtest_m, Ytrain_m, Ytest_m

    def build_model_rnn(self, Xtrain_m, Ytrain_m, Xtest_m, Ytest_m):
        """Build, fit and reload the best checkpoint of the LSTM network.

        The network is LSTM(Layer1) -> LSTM(Layer2) -> global max pooling over
        time -> Dense(Ntest), trained with MSE loss and the Adam optimizer.
        The checkpoint with the lowest validation loss is written to
        ``bestmodel_<stockname>.h5`` and the loss curves are plotted.

        Returns:
            The model loaded back from the best checkpoint file.
        """
        i = Input(shape=(self.Tx, self.featureNumber))
        x = LSTM(self.Layer1, return_sequences=True)(i)
        x = LSTM(self.Layer2, return_sequences=True)(x)
        x = GlobalMaxPooling1D()(x)
        x = Dense(self.Ntest)(x)
        model = Model(i, x)

        model_name = "bestmodel_{}.h5".format(self.stockname)
        check_point = ModelCheckpoint(model_name, monitor='val_loss', save_best_only=True)

        model.compile(
            loss='mse',
            optimizer='adam',
        )

        # NOTE(review): the test windows double as the validation set, so the
        # checkpoint is selected on the same data that is later evaluated.
        r = model.fit(
            Xtrain_m,
            Ytrain_m,
            epochs=self.epochs,
            validation_data=(Xtest_m, Ytest_m),
            callbacks=[check_point],
        )

        plt.plot(r.history['loss'], label='train loss')
        plt.plot(r.history['val_loss'], label='test loss')
        plt.title(self.stockname)
        plt.legend()
        plt.show()

        # load best model
        best_model = tf.keras.models.load_model(model_name)

        return best_model

    def predict(self, model, Xtrain, Xtest):
        """Return ``(model.predict(Xtrain), model.predict(Xtest))``."""
        Ptrain = model.predict(Xtrain)
        Ptest = model.predict(Xtest)

        return Ptrain, Ptest

    @staticmethod
    def _inverse_target(scaler, values):
        """Undo the scaling of target (column 0) values of arbitrary shape.

        The scaler was fitted on all input columns, so ``values`` cannot be
        handed to ``inverse_transform`` directly: every entry is placed in
        column 0 of a zero-padded array of the fitted width, inverted, and
        read back from column 0.
        """
        values = np.asarray(values, dtype=float)
        n_features = getattr(scaler, "n_features_in_", None)
        if n_features is None:
            # Older scikit-learn releases do not expose n_features_in_.
            n_features = len(scaler.scale_)
        padded = np.zeros((values.size, n_features))
        padded[:, 0] = values.ravel()
        return scaler.inverse_transform(padded)[:, 0].reshape(values.shape)

    def _prediction_masks(self, n_rows, n_train, n_test):
        """Return boolean row masks for the first-step train/test forecasts.

        Window ``t`` consumes rows ``t .. t+Tx-1``, so its first forecast step
        belongs to row ``t + Tx``. Training windows therefore cover rows
        ``Tx .. Tx+n_train-1`` and the test windows the ``n_test`` rows after.

        Raises:
            ValueError: If the forecasts do not fit into ``n_rows`` rows.
        """
        first_test_row = self.Tx + n_train
        if first_test_row + n_test > n_rows:
            raise ValueError(
                "{} predictions after {} warm-up rows do not fit into {} rows".format(
                    n_train + n_test, self.Tx, n_rows))

        train_mask = np.zeros(n_rows, dtype=bool)
        train_mask[self.Tx:first_test_row] = True
        test_mask = np.zeros(n_rows, dtype=bool)
        test_mask[first_test_row:first_test_row + n_test] = True
        return train_mask, test_mask

    def evalutePredictions(self, Ptrain, Ptest, scaler, train_idx, test_idx, df, Ytest_m):
        """Un-scale the forecasts, attach them to ``df`` and print the MAPE.

        Only the first forecast step of every window is evaluated. It is
        written to the new columns ``multistep_train`` / ``multistep_test`` of
        ``df`` (modified in place) on the rows the forecasts refer to.

        Args:
            Ptrain, Ptest: Scaled predictions of shape ``(windows, Ntest)``.
            scaler: The fitted scaler used to build the dataset.
            train_idx, test_idx: Boolean arrays over the rows of ``df``. As
                before they are overwritten in place; they now receive the
                rows that actually carry a first-step forecast.
            df: Unscaled frame; must contain a ``'price'`` column
                (presumably the first column, since targets are taken from
                column 0 - confirm against the caller's data).
            Ytest_m: Scaled test targets of shape ``(windows, Ntest)``.

        Returns:
            ``(df, mape_percent)`` where the MAPE compares ``df['price']``
            with ``multistep_test`` on the test rows.
        """
        Ptrain_first = self._inverse_target(scaler, Ptrain)[:, 0]
        Ptest_first = self._inverse_target(scaler, Ptest)[:, 0]
        Ytest_first = self._inverse_target(scaler, Ytest_m)[:, 0]

        # Derive the row masks from window positions. The previous slicing
        # cleared the whole test mask for Ntest == 1 (slice start -0 == 0)
        # and miscounted rows whenever Tx != Ntest.
        train_mask, test_mask = self._prediction_masks(
            len(df), len(Ptrain_first), len(Ptest_first))
        train_idx[:] = train_mask
        test_idx[:] = test_mask

        df.loc[train_idx, 'multistep_train'] = Ptrain_first
        df.loc[test_idx, 'multistep_test'] = Ptest_first

        mape1 = mean_absolute_percentage_error(
            df.loc[test_idx, 'price'], df.loc[test_idx, 'multistep_test'])
        print("1-step MAPE:", mape1 * 100)

        # Same metric computed from the un-scaled window targets rather than
        # the frame; serves as a cross-check of the row alignment.
        mape2 = mean_absolute_percentage_error(Ytest_first, Ptest_first)
        print("1-step MAPE:", mape2 * 100)
        return df, mape1 * 100

    def dfModel(self, df, resolution):
        """Run the whole pipeline on ``df`` and return the evaluation.

        Args:
            df: Raw frame with a timestamp index; rows containing NaN are
                dropped first.
            resolution: Currently unused; kept for interface compatibility.

        Returns:
            ``(df, mape, Ytest_m, Ptest)``: the unscaled frame with the
            forecast columns added, the first-step test MAPE in percent, and
            the scaled test targets and scaled test predictions.
        """
        df = df.dropna()

        # scaling
        # Create a MinMaxScaler instance
        scaler = MinMaxScaler()

        df_first, _, _, train_idx, test_idx = self.splitDataset(df)

        _, Xtrain_m, Xtest_m, Ytrain_m, Ytest_m = self.supervisedDataset(
            df, self.Tx, self.Ntest, scaler)

        best_model = self.build_model_rnn(Xtrain_m, Ytrain_m, Xtest_m, Ytest_m)

        Ptrain, Ptest = self.predict(best_model, Xtrain_m, Xtest_m)

        df, mape = self.evalutePredictions(
            Ptrain, Ptest, scaler, train_idx, test_idx, df_first, Ytest_m)

        return df, mape, Ytest_m, Ptest