In [1]:
%matplotlib inline

#import packages needed
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import torch
import torch.nn as nn
from torch.utils.data import DataLoader,TensorDataset
from sklearn.metrics import mean_squared_error
from sklearn.preprocessing import StandardScaler,MinMaxScaler
import math
from matplotlib.lines import Line2D
from torchsummaryX import summary

# Auto-reload the custom modules so edits to them are picked up without restarting the kernel
%load_ext autoreload
%autoreload 2
import deep_learning_module
import data_module


ModuleNotFoundError: No module named 'deep_learning_module'

In [None]:
# Load the raw Superstore orders (relative path, resolved against the kernel's working directory)
df = pd.read_csv('../../datasets/superstore.csv')

In [None]:
# Keep only the Technology rows. The explicit copy makes `technology` an
# independent frame, so the edits made to it in later cells neither touch `df`
# nor raise SettingWithCopyWarning.
technology = df.loc[df['Category'] == 'Technology'].copy()
technology

In [None]:
# Parse the dates before comparing them: the column is only converted to
# datetime in a later cell, so min()/max() on the raw values would compare
# text lexicographically instead of chronologically.
order_dates = pd.to_datetime(technology['Order Date'])
order_dates.min(), order_dates.max()

In [None]:
# Columns that are not needed for the sales forecast
cols = ['Row ID', 'Order ID', 'Ship Date', 'Ship Mode', 'Customer ID', 'Customer Name', 'Segment', 'Country', 'City', 'State', 'Postal Code', 'Region', 'Product ID', 'Category', 'Sub-Category', 'Product Name', 'Quantity', 'Discount', 'Profit']

# Rebind instead of dropping in place: `technology` was derived from `df`, and
# an in-place drop on such a frame can raise SettingWithCopyWarning.
technology = technology.drop(columns=cols)

# Count the missing values left in each remaining column
technology.isnull().sum()


In [None]:
# Convert 'Order Date' to datetime and make it the index in one non-mutating
# chain, so no column is assigned on a frame that may be a slice of `df`.
technology = (
    technology
    .assign(**{'Order Date': pd.to_datetime(technology['Order Date'])})
    .set_index('Order Date')
)


In [None]:
# Collapse all orders placed on the same date into one total 'Sales' value,
# then turn the date back into a regular column
technology = (
    technology
    .groupby('Order Date')['Sales']
    .agg('sum')
    .reset_index()
)
technology

In [None]:
# Index the daily totals by 'Order Date'; the resampling cell below works on this index
technology = technology.set_index('Order Date')
technology.index


In [None]:
# Resample to month-start ('MS') frequency: each value of `y` is the mean of
# the daily sales totals that fall in that month
y = technology['Sales'].resample('MS').mean()
y.shape

In [None]:
# Line plot of the monthly series, labelled through the Axes returned by pandas
ax = y.plot(figsize=(20,10))
ax.set_xlabel("Order Date")
ax.set_ylabel("Technology Sales")
plt.show()

In [None]:
# --- Data preparation ---
# Fraction of the series (from the start) used as training data
split_ratio = 0.7
# Window size for the data sequence (sliding window)
window_size = 2
# Number of future steps to forecast (1 -> single-step, >1 -> multi-step)
n_step = 2

# --- Training ---
# Batch size handed to the DataLoaders
batch_size = 2
# Number of epochs handed to the training routine
num_epochs = 60


In [None]:
# Number of leading observations that go into the training split
split_data = round(len(y)*split_ratio)
split_data

In [None]:
# Chronological split by position: the first `split_data` observations are for
# training, everything after them is for testing
train_data, test_data = y.iloc[:split_data], y.iloc[split_data:]

# Matching time indexes, kept for plotting later
train_time, test_time = y.index[:split_data], y.index[split_data:]

print("train_data_shape", train_data.shape, sep="\n")
print("test_data_shape", test_data.shape, sep="\n")

In [None]:
# Fit the standard scaler on the training data only, then apply the statistics
# learned from the training data to both the training and the test data.
scaler = StandardScaler()
scaler.fit(train_data.values.reshape(-1, 1))

# The scaler expects a 2-D array, hence the reshape to a single column
train_data_standard = scaler.transform(train_data.values.reshape(-1, 1))
test_data_standard = scaler.transform(test_data.values.reshape(-1, 1))

In [None]:
# Show the shapes of the scaled arrays
train_standard_shape = train_data_standard.shape
test_standard_shape = test_data_standard.shape
print(f"train_data_standard shape : {train_standard_shape}")
print(f"test_data_standard shape : {test_standard_shape}")

In [None]:
# Turn each scaled split into (feature, label) sequences.
# NOTE(review): presumably each sample holds `window_size` input steps and
# `n_step` target steps — confirm in data_module.univariate_multi_step.
### BEGIN SOLUTION
trainX ,trainY =  data_module.univariate_multi_step(train_data_standard,window_size,n_step)
testX , testY = data_module.univariate_multi_step(test_data_standard,window_size,n_step)
### END SOLUTION
print(f"trainX shape:{trainX.shape} trainY shape:{trainY.shape}\n")
print(f"testX shape:{testX.shape} testY shape:{testY.shape}")

In [None]:
def key_assign(trainingX,testingX,trainingY,testingY):
    """
    Bundle the features and labels of each split into its own dictionary.

    Arguments:
    trainingX -- feature for training data
    testingX -- feature for testing data
    trainingY -- label for training data
    testingY -- label for testing data

    Returns:
    train_data_dict -- dictionary with keys "train_data_x_feature" and "train_data_y_label"
    test_data_dict -- dictionary with keys "test_data_x_feature" and "test_data_y_label"
    """
    ### BEGIN SOLUTION

    # Train set: feature first, then label
    train_data_dict = dict(train_data_x_feature=trainingX,
                           train_data_y_label=trainingY)

    # Test set: feature first, then label
    test_data_dict = dict(test_data_x_feature=testingX,
                          test_data_y_label=testingY)

    ### END SOLUTION
    return train_data_dict, test_data_dict

# Store the sequenced features and labels of each split under descriptive keys
train_data_dictionary , test_data_dictionary = key_assign(trainingX = trainX,
                                 testingX = testX,
                                 trainingY = trainY,
                                 testingY = testY)


In [None]:
def transform(train_data_dict, test_data_dict):
    """
    Convert every NumPy array stored in the two dictionaries to a torch tensor.

    The dictionaries are updated in place and the same objects are returned.

    Arguments:
    train_data_dict -- train data dictionary
    test_data_dict -- test data dictionary

    Returns:
    train_data_dict -- train data dictionary, values converted to tensors
    test_data_dict -- test data dictionary, values converted to tensors
    """
    ### BEGIN SOLUTION
    # Train dictionary first, then test dictionary
    for data_dict in (train_data_dict, test_data_dict):
        for key, array in data_dict.items():
            # .type(torch.Tensor) casts to the default tensor type
            data_dict[key] = torch.from_numpy(array).type(torch.Tensor)
    ### END SOLUTION

    return train_data_dict, test_data_dict

# Convert the arrays to tensors. transform() replaces the dictionary values in
# place, so re-running this cell on already-converted dictionaries will fail
# (torch.from_numpy only accepts NumPy arrays).
train_data_dictionary,test_data_dictionary = transform(train_data_dictionary,test_data_dictionary)

In [None]:
def sanity_check(data_1,data_2):
    """
    Print "<key> shape : <shape>" for every entry of data_1, then of data_2.

    Arguments:
    data_1 -- (dict) type of data
    data_2 -- (dict) type of data
    """

    ### BEGIN SOLUTION

    for data in (data_1, data_2):
        for key, value in data.items():
            print(key + " shape : " + str(value.shape))

    ### END SOLUTION
# Sanity check: print the shapes of the train and test features and labels
sanity_check(train_data_dictionary,test_data_dictionary)

In [None]:
# Create Iterator
def iterator(train_data_dict,test_data_dict,batch_size):
    """
    Wrap the train and test tensors in DataLoaders that keep the sample order.

    Arguments:
    train_data_dict -- train data dictionary ("train_data_x_feature", "train_data_y_label")
    test_data_dict -- test data dictionary ("test_data_x_feature", "test_data_y_label")
    batch_size -- number of samples per batch

    Returns:
    train_iter -- train data iterator
    test_iter -- test data iterator
    """
    ### BEGIN SOLUTION
    def build_loader(features, labels):
        # shuffle=False keeps the samples in their original order
        return DataLoader(TensorDataset(features, labels),
                          batch_size=batch_size,
                          shuffle=False)

    train_iter = build_loader(train_data_dict["train_data_x_feature"],
                              train_data_dict["train_data_y_label"])
    test_iter = build_loader(test_data_dict["test_data_x_feature"],
                             test_data_dict["test_data_y_label"])
    ### END SOLUTION

    return train_iter, test_iter

# Build the batch iterators from the tensor dictionaries
train_iter , test_iter = iterator(train_data_dictionary,test_data_dictionary,batch_size)

In [None]:
# Seed PyTorch's random number generator before the model is built
torch.manual_seed(123)

# Arguments for the LSTM model
hidden_dim = 1
n_feature = 1
# n_step is intentionally NOT re-assigned here: the value from the
# hyperparameter cell was already used to build the data sequences, and a
# second assignment would let the model and the data silently disagree.

# 1 for vanilla LSTM, >1 means stacked LSTM
num_layers = 1


### BEGIN SOLUTION
# Vanilla / stacked LSTM
model = deep_learning_module.LSTM(n_feature = n_feature ,
                         hidden_dim = hidden_dim ,
                         num_layers = num_layers,
                         n_step = n_step)
# Bidirectional LSTM (alternative; uncomment to use instead of the model above)
# model = deep_learning_module.BidirectionalLSTM(n_feature = n_feature ,
#                          hidden_dim = hidden_dim ,
#                          num_layers = num_layers,
#                          n_step = n_step)
### END SOLUTION

In [None]:
# Loss function: mean squared error between forecast and label
loss_fn = torch.nn.MSELoss()

# Optimiser: Adam over all model parameters with a learning rate of 0.001
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

# Dropout (currently disabled)
# dropout = nn.Dropout()

In [None]:
# Dummy all-zero batch, passed to summary() together with the model
inputs = torch.zeros((batch_size,window_size,1),dtype=torch.float) # batch size , seq_length , input_dim
print(summary(model,inputs))

In [None]:
# Start Training
# Re-seed right before training.
# NOTE(review): test_iter is passed as the third argument, so val_loss is
# presumably measured on the test split — confirm in deep_learning_module.training.
torch.manual_seed(123)
train_loss,val_loss = deep_learning_module.training(num_epochs,train_iter,test_iter,optimizer,loss_fn,model)

In [None]:
# Plot the training and validation loss over all epochs
data_module.learning_curve(num_epochs,train_loss,val_loss)

In [None]:
# Zoom into the final epochs of the learning curve. The window is derived from
# num_epochs so it stays valid when the epoch count is changed in the
# hyperparameter cell (with num_epochs = 60 this is epochs 50 to 60).
zoom_epochs = 10
data_module.zoom_learning_curve(start_epoch = max(0, num_epochs - zoom_epochs),
                                end_epoch = num_epochs,
                                training_loss = train_loss,
                                validation_loss = val_loss)

In [None]:
# Section 1 : Feed in the train and test data to the model
# Switch to evaluation mode first so training-only layers (e.g. dropout) are
# disabled, and skip gradient tracking because nothing is being optimised here.
model.eval()
with torch.no_grad():
    y_train_prediction = model(train_data_dictionary["train_data_x_feature"])
    y_test_prediction = model(test_data_dictionary["test_data_x_feature"])
    

In [None]:
def key_assign_evaluation(y_train_prediction,
                          y_test_prediction,
                          train_data_dictionary,
                          test_data_dictionary):
    """
    Collect the predictions and the matching labels into two dictionaries.

    Arguments:
    y_train_prediction -- (tensor) prediction for training data
    y_test_prediction -- (tensor) prediction for test data
    train_data_dictionary -- (dict) train data dictionary; "train_data_y_label" is read
    test_data_dictionary -- (dict) test data dictionary; "test_data_y_label" is read

    Returns:
    prediction -- (dict) keys "train_data_prediction" and "test_data_prediction"
    output_data -- (dict) keys "train_data_output" and "test_data_output" (the labels)
    """
    ### BEGIN SOLUTION
    prediction = {}
    prediction["train_data_prediction"] = y_train_prediction
    prediction["test_data_prediction"] = y_test_prediction

    output_data = {}
    output_data["train_data_output"] = train_data_dictionary["train_data_y_label"]
    output_data["test_data_output"] = test_data_dictionary["test_data_y_label"]
    ### END SOLUTION
    return prediction, output_data

# Group the model predictions and the true labels of both splits for evaluation
prediction , output_data = key_assign_evaluation(y_train_prediction,y_test_prediction,
                                                 train_data_dictionary,
                                                 test_data_dictionary)

In [None]:
# Print the prediction and label shapes side by side to check that they line up
sanity_check(data_1 = prediction,data_2 = output_data)

In [None]:
# Section 2 : Reshape both to the original data dimension
def squeeze_dimension(output):
    """
    Squeeze dimension 2 of every tensor in the dictionary.

    The dictionary is updated in place and the same object is returned.

    Arguments:
    output -- (dict) output_data

    Returns:
    output -- (dict) output_data with dimension 2 squeezed
    """
    ### BEGIN SOLUTION
    for key, tensor in output.items():
        # Only removes dimension 2 when its size is 1
        output[key] = tensor.squeeze(2)
    ### END SOLUTION
    return output

# Squeeze dimension 2 of the labels. The tensors in output_data are replaced
# in place by squeeze_dimension().
output_data = squeeze_dimension(output_data)

In [None]:
# Check the label shapes after squeezing (the empty dict means only one dictionary is printed)
sanity_check(data_1 = output_data,data_2 = {})

In [None]:
# Section 3 : Invert the scaling back to the original data values
def inverse_scaler(scaled_data,scaler):
    """
    Undo the scaling of every tensor in the dictionary.

    Each value is detached, converted to a NumPy array and passed through
    scaler.inverse_transform. The dictionary is updated in place and the same
    object is returned.

    Arguments:
    scaled_data -- (dict) data that being scaled
    scaler -- scaler

    Returns:
    scaled_data -- (dict) data after inverse scale
    """
    ### BEGIN SOLUTION
    for item, tensor in scaled_data.items():
        as_numpy = tensor.detach().numpy()
        scaled_data[item] = scaler.inverse_transform(as_numpy)
    ### END SOLUTION
    return scaled_data
    
# Bring predictions and labels back to the original sales scale.
# inverse_scaler() replaces the tensors in these dictionaries in place, so
# re-running this cell without rebuilding them fails on the .detach() call.
prediction = inverse_scaler(prediction,scaler)
output_data  = inverse_scaler(output_data ,scaler)


In [None]:
# Confirm that predictions and labels still have matching shapes after the inverse scaling
sanity_check(data_1 = prediction,data_2 = output_data )

In [None]:
def list_forecast_value(output_data,prediction):
    """
    Print each test label next to the corresponding forecast, one pair per line.

    Arguments:
    output_data --  (dict) output data dictionary; "test_data_output" is read
    prediction -- (dict) prediction output dictionary; "test_data_prediction" is read
    """
    ### BEGIN SOLUTION
    print("Test Data\t\t\tForecast")
    paired_rows = zip(output_data["test_data_output"],
                      prediction["test_data_prediction"])
    for actual_row, forecast_row in paired_rows:
        print(f"{actual_row}\t\t{forecast_row}")
    ### END SOLUTION
        
# Print the test labels next to the model forecasts
list_forecast_value(output_data,prediction)

In [None]:
# Section 4 : Calculate the RMSE of train and test data
def rmse(prediction,output_data):
    """
    Compute the root mean squared error between predictions and labels.

    Arguments:
    prediction -- (dict) prediction output dictionary
    output_data --  (dict) output data dictionary

    Returns:
    trainScore - RMSE of train dataset
    testScore - RMSE of test dataset
    """
    def _score(prediction_key, output_key):
        # RMSE is the square root of the mean squared error
        squared_error = mean_squared_error(prediction[prediction_key],
                                           output_data[output_key])
        return math.sqrt(squared_error)

    trainScore = _score("train_data_prediction", "train_data_output")
    testScore = _score("test_data_prediction", "test_data_output")
    return trainScore, testScore

# Report the RMSE of both splits, rounded to two decimals
trainScore,testScore = rmse(prediction,output_data)
print('Train Score: %.2f RMSE' % (trainScore))
print('Test Score: %.2f RMSE' % (testScore))

In [None]:
# Axis labels and title handed to the forecast plot
plot_details = {
    "x-axis": "Date",
    "y-axis": "Values",
    "title": "Technology Sales",
}

In [None]:
# Plot forecast plot for multi-step
def multi_step_plot(original_test_data,
                    after_sequence_test_data ,
                    forecast_data,test_time,window_size,
                    n_step ,
                    details = None,
                    original_plot = False):

    """
    Plot the result of the multi-step forecast

    Each row of the test labels and of the forecast is drawn as its own short
    line segment; row i is placed at time positions i .. i + n_step - 1 of the
    time index that remains after the first `window_size` entries are skipped.

    Arguments:

    original_test_data -- test data before sequence (only drawn when original_plot is True)

    after_sequence_test_data -- (dict) output data dictionary; "test_data_output" is read

    forecast_data -- (dict) prediction data dictionary; "test_data_prediction" is read

    test_time --  time index for test data before sliding window (data sequence)

    window_size -- window size for the data sequence

    n_step -- the number of future step , 1 -> single >1 -> multi-step

    details -- (dict, optional) details for plot; any of "x-axis" ,"y-axis", "title".
               Missing keys are skipped; None or an empty dict adds nothing.

    original_plot -- (boolean) True ->observe how sliding window (data sequence) take place in the test data

    """

    test_labels = after_sequence_test_data['test_data_output']
    test_forecast = forecast_data["test_data_prediction"]

    # Plot Setting
    plt.figure(figsize=(10,6))
    plt.xticks(rotation=45)

    # Store test and forecast data into DataFrame type
    column_names = ["timestep_" + str(i) for i in range(test_labels.shape[1])]
    y_test_dataframe = pd.DataFrame(test_labels,columns = column_names)
    y_test_pred_dataframe = pd.DataFrame(test_forecast,columns = column_names)

    # Create time index for data after sequence
    time_index_after_sequence = test_time[window_size:]

    # Legend entries are collected alongside what is actually drawn, so the
    # legend never advertises a line that is not on the figure
    legend_handles = [Line2D([0], [0], color="green", lw=4),
                      Line2D([0], [0], color="red", lw=4)]
    legend_labels = ['Test Data After Sequencing', 'Forecast Data']

    # Test Data plot before sliding window(data sequencing)
    if original_plot:
        plt.plot(test_time,original_test_data,marker='x',color="blue")
        legend_handles.append(Line2D([0], [0], color="blue", lw=4))
        legend_labels.append('Test Data Before Sequencing')

    # Plot the data step by step; the row number is also the offset of that
    # row's first time step inside time_index_after_sequence
    for row in range(len(y_test_dataframe)):

        time_index = time_index_after_sequence[row:row+n_step]

        # Plot the test data
        plt.plot(time_index,y_test_dataframe.iloc[row],color="green",marker='o')

        # Plot the forecast data
        plt.plot(time_index,y_test_pred_dataframe.iloc[row],color="red",marker='o')

    plt.legend(legend_handles, legend_labels)

    # Extra details - Optional; apply only the entries that were provided
    if details:
        if "x-axis" in details:
            plt.xlabel(details["x-axis"])
        if "y-axis" in details:
            plt.ylabel(details["y-axis"])
        if "title" in details:
            plt.title(details["title"])
    

In [None]:
# Plot the test labels against the multi-step forecast.
# original_plot = False leaves out the test series from before sequencing.
multi_step_plot(original_test_data = test_data,
                after_sequence_test_data = output_data ,
                forecast_data = prediction,
                test_time = test_time,
                window_size = window_size ,
                n_step = n_step,
                details = plot_details,
                original_plot = False)