## Transformer

In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

import os
from sklearn.preprocessing import MinMaxScaler

import warnings
warnings.filterwarnings('ignore')

# df = pd.read_csv(
#     "S&P500_Close.csv",
#     sep=',',na_values=['-1'], index_col='Date',parse_dates=True)


# Location of the weekly potato price CSV. The original machine-specific path
# stays as the default so existing runs behave the same; set the
# POTATO_DATA_PATH environment variable to run the notebook elsewhere.
DATA_PATH = os.environ.get(
    "POTATO_DATA_PATH",
    r"C:\Users\haris\OneDrive\Desktop\NEW\Potato_weekly_kalman_new.csv",
)

# The 'Date' column is parsed as datetimes and used as the index.
df = pd.read_csv(DATA_PATH, sep=',', index_col='Date', parse_dates=True)

# Quick sanity check of both ends of the series.
print("Starting file:")
print(df.head(10))

print("Ending file:")
print(df.tail(10))

# Last expression of the cell: the number of rows is shown as the cell output.
len(df)

In [None]:
# Test for Stationarity

# ADF test
from statsmodels.tsa.stattools import adfuller
def adf_test(timeseries):
    """Run the Augmented Dickey-Fuller test on `timeseries` and print a summary.

    `adfuller` is called with autolag='AIC'. The first four returned values
    are labelled and printed together with one 'Critical Value (...)' entry
    per key of the fifth returned item. Nothing is returned.
    """
    print ('Results of Dickey-Fuller Test:')
    adf_result = adfuller(timeseries, autolag='AIC')

    summary_labels = [
        'Test Statistic',
        'p-value',
        '#Lags Used',
        'Number of Observations Used',
    ]
    report = pd.Series(adf_result[:4], index=summary_labels)

    # Append one row per critical value reported by adfuller.
    critical_values = adf_result[4]
    for level in critical_values:
        report['Critical Value (%s)' % level] = critical_values[level]

    print (report)


# Call the function and run the test
adf_test(df['Price_Kalman'])


# KPSS test
from statsmodels.tsa.stattools import kpss
def kpss_test(timeseries):
    """Run the KPSS test on `timeseries` and print a summary.

    `kpss` is called with regression='c' and nlags="auto". The first three
    returned values are labelled and printed together with one
    'Critical Value (...)' entry per key of the fourth returned item.
    Nothing is returned.
    """
    print ('Results of KPSS Test:')
    kpss_result = kpss(timeseries, regression='c', nlags="auto")

    summary_labels = ['Test Statistic', 'p-value', '#Lags Used']
    report = pd.Series(kpss_result[:3], index=summary_labels)

    # Append one row per critical value reported by kpss.
    critical_values = kpss_result[3]
    for level in critical_values:
        report['Critical Value (%s)' % level] = critical_values[level]

    print (report)


# Call the function and run the test
kpss_test(df['Price_Kalman'])

In [None]:
# Test for non-linearity
# BDS test - output=(array(Test statistic), array(p-value))
# max_dim>=2

from statsmodels.tsa.stattools import bds

# (max_dim, epsilon) combinations to evaluate; `distance` is left at the
# library default. Each result is printed explicitly: as bare expressions only
# the last call of the cell would be displayed and the others silently lost.
bds_settings = [(4, 0.5), (2, 1), (2, 1.5), (2, 2)]

for bds_max_dim, bds_epsilon in bds_settings:
    # bds returns (test statistic, p-value)
    bds_statistic, bds_pvalue = bds(
        df['Price_Kalman'], max_dim=bds_max_dim, epsilon=bds_epsilon
    )
    print("BDS test (max_dim={}, epsilon={}):".format(bds_max_dim, bds_epsilon))
    print("  Test statistic:", bds_statistic)
    print("  p-value:", bds_pvalue)

In [None]:
# Normality of the data
import numpy as np
from scipy import stats
from scipy.stats import shapiro
from numpy.random import randn

# Both normality tests are run on the same price column.
price_series = df['Price_Kalman']

# Shapiro-Wilk test
Shapiro_Wilks_test = shapiro(price_series)
# Jarque-Bera test
jarque_bera_test = stats.jarque_bera(price_series)

# Print the results in the order Shapiro-Wilk, then Jarque-Bera.
for normality_result in (Shapiro_Wilks_test, jarque_bera_test):
    print(normality_result)

In [None]:
# demonstrate data normalization with sklearn
from sklearn.preprocessing import MinMaxScaler

# Create a min-max scaler, then fit it on every column of df and transform
# the same data in a single step.
# NOTE(review): the scaler is fitted on the full frame (all rows, all columns),
# and the train/test split cell reads df['Price_Kalman'] directly rather than
# `normalized` - confirm whether scaled inputs were intended.
scaler = MinMaxScaler()
normalized = scaler.fit_transform(df)
# inverse transform
# inverse = scaler.inverse_transform(normalized)


In [None]:
# Splitting the data into train and test sets (90% - 10% split)
# Chronological split: the first 90% of rows train the model, the rest test it.
train_size = int(0.9 * len(df))

# Raw (unscaled) price values, sliced by position.
price_column = df['Price_Kalman']
df_train = price_column.iloc[:train_size].values
df_test = price_column.iloc[train_size:].values

# Plain Python lists are what the windowing helper consumes.
spots_train, spots_test = df_train.tolist(), df_test.tolist()

for split_message, split_values in (
    ("Training set has {} observations.", spots_train),
    ("Test set has {} observations.", spots_test),
):
    print(split_message.format(len(split_values)))

# #plotting the data
# df_train.plot()
# df_test.plot()

In [None]:
import numpy as np

def to_sequences(seq_size, obs):
    """Turn a 1-D series into supervised (window, next value) pairs.

    Parameters
    ----------
    seq_size : int
        Number of consecutive observations in each input window.
    obs : sequence
        Observations in time order (e.g. a list of floats).

    Returns
    -------
    (numpy.ndarray, numpy.ndarray)
        `x` holds one window per row with every observation wrapped in its
        own single-element list, i.e. shape (n_windows, seq_size, 1) for
        scalar observations; `y` holds the observation that immediately
        follows each window. n_windows is len(obs) - seq_size; when that is
        not positive both arrays are empty.
    """
    windows = []
    targets = []

    # Use the seq_size argument rather than a module-level constant, so the
    # function honours whatever window length the caller asks for.
    for start in range(len(obs) - seq_size):
        stop = start + seq_size
        # Wrap each value in a list to add the trailing feature axis.
        windows.append([[value] for value in obs[start:stop]])
        targets.append(obs[stop])

    return np.array(windows), np.array(targets)
    
    
# Number of past observations fed to the model for each prediction.
SEQUENCE_SIZE = 7

# Build the supervised windows separately for the training and test series.
x_train, y_train = to_sequences(SEQUENCE_SIZE, spots_train)
x_test, y_test = to_sequences(SEQUENCE_SIZE, spots_test)

# Report the shapes: inputs first, then targets, for each split.
for shape_message, sequence_array in (
    ("Shape of training set: {}", x_train),
    ("Shape of training set: {}", y_train),
    ("Shape of test set: {}", x_test),
    ("Shape of test set: {}", y_test),
):
    print(shape_message.format(sequence_array.shape))

In [None]:
# Transformer

from tensorflow import keras
from tensorflow.keras import layers

def transformer_encoder(inputs, head_size, num_heads, ff_dim, dropout=0):
    """Apply one encoder block (self-attention + feed-forward) to `inputs`.

    Each of the two sub-layers normalises its input first and adds a residual
    connection afterwards.

    Parameters
    ----------
    inputs : tensor
        Input to the block; its last dimension sets the number of filters of
        the final convolution so the residual sum is shape-compatible.
    head_size : int
        Passed to MultiHeadAttention as `key_dim`.
    num_heads : int
        Number of attention heads.
    ff_dim : int
        Number of filters of the first (ReLU) 1x1 convolution.
    dropout : float, optional
        Rate used inside the attention layer and by both Dropout layers.

    Returns
    -------
    tensor
        Output of the feed-forward sub-layer plus its residual.
    """
    # --- Self-attention sub-layer ---
    normed_inputs = layers.LayerNormalization(epsilon=1e-6)(inputs)
    self_attention = layers.MultiHeadAttention(
        num_heads=num_heads, key_dim=head_size, dropout=dropout
    )
    # Query and value are the same tensor: self-attention.
    attended = self_attention(normed_inputs, normed_inputs)
    attended = layers.Dropout(dropout)(attended)
    attention_residual = attended + inputs

    # --- Feed-forward sub-layer (kernel_size=1 convolutions) ---
    feature_count = inputs.shape[-1]
    feed_forward = layers.LayerNormalization(epsilon=1e-6)(attention_residual)
    feed_forward = layers.Conv1D(
        filters=ff_dim, kernel_size=1, activation="relu"
    )(feed_forward)
    feed_forward = layers.Dropout(dropout)(feed_forward)
    # Project back to the input feature count so the residual can be added.
    feed_forward = layers.Conv1D(filters=feature_count, kernel_size=1)(feed_forward)
    return feed_forward + attention_residual

In [None]:
def build_model(
    input_shape,
    head_size,
    num_heads,
    ff_dim,
    num_transformer_blocks,
    mlp_units,
    dropout=0,
    mlp_dropout=0,
):
    """Assemble the Keras model: encoder stack, pooling, MLP head, 1 output.

    Parameters
    ----------
    input_shape : tuple
        Shape of a single sample (without the batch dimension).
    head_size, num_heads, ff_dim : int
        Forwarded unchanged to every `transformer_encoder` block.
    num_transformer_blocks : int
        How many encoder blocks are stacked.
    mlp_units : iterable of int
        Width of each ReLU Dense layer in the head; each one is followed by
        a Dropout layer.
    dropout : float, optional
        Dropout rate used inside the encoder blocks.
    mlp_dropout : float, optional
        Dropout rate used after each Dense layer of the head.

    Returns
    -------
    keras.Model
        Uncompiled model ending in a single-unit Dense layer with no
        activation.
    """
    model_input = keras.Input(shape=input_shape)

    # Stack the encoder blocks.
    encoded = model_input
    for _block_index in range(num_transformer_blocks):
        encoded = transformer_encoder(encoded, head_size, num_heads, ff_dim, dropout)

    # NOTE(review): data_format="channels_first" makes the pooling layer treat
    # axis 1 as the channel axis - confirm this is the intended pooling axis
    # for (timesteps, features) windows.
    head = layers.GlobalAveragePooling1D(data_format="channels_first")(encoded)

    # Fully connected head.
    for units in mlp_units:
        head = layers.Dense(units, activation="relu")(head)
        head = layers.Dropout(mlp_dropout)(head)

    prediction = layers.Dense(1)(head)
    return keras.Model(model_input, prediction)

In [None]:
# Per-sample input shape: everything after the batch axis of x_train.
input_shape = x_train.shape[1:]

# Architecture hyperparameters, gathered in one place.
model_params = dict(
    head_size=512,
    num_heads=4,
    ff_dim=4,
    num_transformer_blocks=4,
    mlp_units=[128],
    mlp_dropout=0.4,
    dropout=0.25,
)
model = build_model(input_shape, **model_params)

# Mean-squared-error regression, optimised with Adam.
adam_optimizer = keras.optimizers.Adam(learning_rate=1e-3)
model.compile(loss="mean_squared_error", optimizer=adam_optimizer)

model.summary()

# Stop after 10 epochs without improvement and roll back to the best weights.
early_stopping = keras.callbacks.EarlyStopping(
    patience=10, restore_best_weights=True
)
callbacks = [early_stopping]

# 20% of the training windows are held out for validation during fitting.
fit_params = dict(
    validation_split=0.2,
    epochs=100,
    batch_size=64,
    callbacks=callbacks,
)
history = model.fit(x_train, y_train, **fit_params)

# Loss on the held-out test windows.
model.evaluate(x_test, y_test, verbose=1)

###########################
from sklearn.metrics import r2_score
import numpy as np
from sklearn import metrics
trainpred = model.predict(x_train)
testpred = model.predict(x_test)


def mape(actual, predicted):
    """Return the mean absolute percentage error between two arrays, in %."""
    actual, predicted = np.array(actual), np.array(predicted)
    return np.mean(np.abs((actual - predicted) / actual)) * 100


def _regression_scores(actual, predicted):
    """Return (RMSE, MAPE, R-squared) for one set of targets/predictions."""
    rmse = np.sqrt(metrics.mean_squared_error(actual, predicted))
    return rmse, mape(actual, predicted), r2_score(actual, predicted)


# The windows were built from the raw df['Price_Kalman'] values (no scaling
# was applied before training), so targets and predictions are already in
# price units. Passing them through scaler.inverse_transform would distort
# the error metrics, so they are used as they are.
# reshape(-1, 1) gives a column vector for any dataset size instead of relying
# on a hard-coded row count.

# Train targets / predictions
trainY = y_train.reshape(-1, 1)
print(trainY.shape)

trainPredict = trainpred
print(trainPredict.shape)

# Train RMSE, MAPE and R-squared
score, score1, r2 = _regression_scores(trainY, trainPredict)
print("Score (Train RMSE): {}".format(score))
print("Score (Train MAPE): {}".format(score1))
print('r2 score for perfect train model is', r2)


# Test targets / predictions
testY = y_test.reshape(-1, 1)
print(testY.shape)

testPredict = testpred
print(testPredict.shape)

# Test RMSE, MAPE and R-squared
score, score1, r2 = _regression_scores(testY, testPredict)
print("Score (Test RMSE): {}".format(score))
print("Score (Test MAPE): {}".format(score1))
print('r2 score for perfect test model is', r2)

from pandas import read_csv
from pandas import to_datetime
from matplotlib import pyplot

# Overlay the actual test values and the model's predictions on one chart.
for plotted_series, series_label in (
    (testY, 'Actual'),
    (testPredict, 'Predicted'),
):
    pyplot.plot(plotted_series, label=series_label)

pyplot.legend(loc='lower left')
pyplot.show()

In [None]:
import matplotlib.pyplot as plt
# Per-epoch losses recorded by model.fit.
recorded_losses = history.history
loss_train = recorded_losses['loss']
loss_val = recorded_losses['val_loss']

# Training curve in green, validation curve in blue.
for loss_curve, line_format, curve_label in (
    (loss_train, 'g', 'Training loss'),
    (loss_val, 'b', 'validation loss'),
):
    plt.plot(loss_curve, line_format, label=curve_label)

plt.title('Training and Validation loss')
plt.xlabel('Epochs')
plt.ylabel('Loss')
plt.legend()
plt.show()