In [1]:
# Loading the dataset suzuki_data.csv with dimensions: 5252x14
# Stock price data of Pak Suzuki Motor Company Limited (Pakistan)
# Stock prices daily data from 01 Jan 2001 to 17 Nov 2022 

import numpy as np
import pandas as pd
from sklearn.preprocessing import MinMaxScaler

df = pd.read_csv('suzuki_data.csv')

## Setting Hyperparameters
MAX_epochs = 100       # Maximum number of epochs
tol = 0.000001         # Tolerance
alpha = 0.01           # Learning rate
m = 100                # Number of instances in a minibatch
window_size = 10
train_test_ratio = 0.95

# Setting Layers and Number of neurons in each layer
ldim = [window_size, 20, 30, 10, 5, 1]
L = len(ldim) - 1    # Number of layers (other than the input layer)

ldim = np.array(ldim, dtype=int)
np.random.seed(1)

print("Original Dataset Head:")
print(df.head())

print("Original Dataset Tail:")
print(df.tail(5))

# Loading and making the data ready in desired format

df = df.dropna()                # data cleaning as desired

# Selecting the relevant column for training and prediction
df_main = df['Close']
df_main = np.array(df_main).reshape(-1,1)

# normalizing the data between 0 to 1
normalizing = MinMaxScaler(feature_range=(0,1))
normalized_data = normalizing.fit_transform(df_main)

## Spliting the time series data into train and test sets
## Temporal order of the data is preserved

train_size = int(len(normalized_data) * train_test_ratio)
train_data = normalized_data[:train_size]
test_data  = normalized_data[train_size:]

# Function for creating datasets using Sliding Window Technique
# This is a common technique for time series prediction using FNN
# Slider window size is the time step

def dataset_creator(window_size, data):
    X, Y = [], []
    for i in range(window_size, len(data)):
        X.append(data[i-window_size:i, 0])
        Y.append(data[i, 0])
    return np.array(X), np.array(Y)

# Creating the training and test sets
X_train, Y_train = dataset_creator(window_size, train_data)
X_test, Y_test = dataset_creator(window_size, test_data)


Original Dataset Head:
         Date   Open   High    Low  Close   Volume
0  2001-01-01  10.25  10.25  10.25  10.25   1000.0
1  2001-01-02  10.25  11.50  10.70  11.30  10500.0
2  2001-01-03  11.30  11.30  10.75  10.75   6500.0
3  2001-01-04  10.75  11.30  11.25  11.25   3000.0
4  2001-01-05  11.25  11.30  11.05  11.05   6500.0
Original Dataset Tail:
            Date    Open    High     Low   Close    Volume
5247  2022-11-11  161.50  162.98  159.00  160.84   18222.0
5248  2022-11-14  159.00  159.85  156.94  157.25   52296.0
5249  2022-11-15  158.50  164.50  157.01  162.27  151394.0
5250  2022-11-16  163.00  164.00  160.00  160.29   49327.0
5251  2022-11-17  161.69  161.70  158.60  159.21   43334.0


In [2]:
##============= Formatting the training set =============##

N = X_train.shape[0]   # Total number of training instances
gamma = int( np.ceil(N/m) )

## converting Pandas objects to NumPy arrays and desired dimensions 

MM = X_train.T
YY = Y_train.reshape(1, -1)

print("Training Set Features ready with dimensions: " , MM.shape)
print("Training Set Target ready with dimensions: " , YY.shape)

##------------- Formatting the training set -------------##


##============= Formatting the test set =============##

Nt = X_test.shape[0]   # Total number of training instances

## converting pandas objects to numpy arrays and desired dimensions 
Mt = X_test.T
Yt = Y_test.T

print("Test Set Features ready with dimensions: " , Mt.shape )
print("Test Set Target ready with dimensions: " , Yt.shape )

##------------- Formatting the test set -------------##


Training Set Features ready with dimensions:  (10, 4979)
Training Set Target ready with dimensions:  (1, 4979)
Test Set Features ready with dimensions:  (10, 253)
Test Set Target ready with dimensions:  (253,)


In [3]:
## Defining activitation functions and their derivatives

def sigmoid(Z):
    return 1 / (1 + np.exp(-Z))

def sigmoid_derivative(Z):
    return sigmoid(Z) * (1 - sigmoid(Z))

def relu(Z):
    return np.maximum(0, Z)

def relu_derivative(Z):
    return np.where(Z <= 0, 0, 1)


## Defining loss functions and their derivatives

def quadratic_cost(P, Q):
    return 0.5 * np.linalg.norm(P - Q) ** 2

def quadratic_cost_derivative(P, Q):
    return Q - P

def binary_cross_entropy(P, Q):
    err = -(np.dot(P,np.log(Q)) + np.dot((1-P),np.log(1-Q)))
    #err = -(P*np.log(Q) + (1-P)*np.log(1-Q))
    #err = np.squeeze(err)
    return  err
    
def binary_cross_entropy_derivative(P, Q):
    return (Q-P)/((1-Q) * Q)


In [4]:
## Defining the function that trains/optimizes model parameters

def trainer(L, N, m, gamma, alpha, Nt, MAX_epochs, ldim, MM, YY):

  ##  Initializing the parameters (weights and biases)
    W = np.zeros( (L+1, np.max(ldim) , np.max(ldim)) )
    B = np.zeros( (L+1, np.max(ldim) , 1))
    dW = np.zeros( (L+1, np.max(ldim) , np.max(ldim)) )
    dB = np.zeros( (L+1, np.max(ldim) , 1) )

    for l in range(1, L + 1):
        W[ l , 0:ldim[l] , 0:ldim[l-1] ] = np.random.randn( 
                                        ldim[l], ldim[l-1] ) * 0.01
        B[ l , 0:ldim[l] , : ] = np.zeros( (ldim[l], 1) )


    ##  Creating auxiliary data structures

    Y_hat = np.zeros( (L+1, np.max(ldim), m) )
    Z     = np.zeros( (L+1, np.max(ldim), m) )
    dZ    = np.zeros( (L+1, np.max(ldim), m) )

    ## Loop over epochs

    for k in range(1, MAX_epochs+1):
    
        # print('\n========================= Epoch number: ', k)
        # Making a random rearrangement of training instances
        permut = np.random.permutation(N)
        # permut = np.array([i for i in range(0,N)])
        MM = MM[:, permut]
        YY = YY[:, permut]

        ## Loop over iterations / steps
    
        start_idx = -m
        end_idx = 0
        for g in range(1, gamma+1):
           
            start_idx = start_idx + m
            end_idx = end_idx + m
        
            if end_idx>N:
                end_idx = N
        
            mc = end_idx-start_idx
        
            # M (n0xm) is sampled from MM (n0xN)
            M = MM[:, start_idx:end_idx]
            # Y (1xm) is sampled from YY (1xN)
            Y = YY[:, start_idx:end_idx]
      
            ## Carrying out computtaions of the Forward Pass
            C = 0
            Y_hat[0, 0:ldim[0], 0:mc] = M  
        
            for l in range(1, L + 1):
                wx = W[l, 0:ldim[l], 0:ldim[l-1] ]
                yold = Y_hat[l-1, 0:ldim[l-1], 0:mc]
                bx = B[l, 0:ldim[l], :]
            
                zx = np.dot(wx, yold) + bx
                Z[l, 0:ldim[l], 0:mc] = zx           
            
                #if l==L:
                yx = sigmoid(zx)
                #else:
                #    yx = relu(zx)
                
                Y_hat[l, 0:ldim[l], 0:mc] = yx

            ## computing cost function
            for p in range(0,mc):
                cp = quadratic_cost(Y[0:ldim[L],p], Y_hat[L,0:ldim[L], p])
                #cp = binary_cross_entropy( Y[0:ldim[L], p],
                                                #Y_hat[L, 0:ldim[L], p])
                C += cp   
            
            C /= mc
        
        
            ## Computing gradients through backpropagation
            
            dY_hat_L = quadratic_cost_derivative(Y[0:ldim[L], 0:mc],
                                                 Y_hat[L, 0:ldim[L], 0:mc])
            #dY_hat_L = binary_cross_entropy_derivative(Y[0:ldim[L], 0:mc],
                                                #Y_hat[L, 0:ldim[L], 0:mc])
            
            dZ[L, 0:ldim[L], 0:mc] = dY_hat_L * sigmoid_derivative(
                                                    Z[L, 0:ldim[L], 0:mc])
            
            for l in range(L-1, 0, -1):
                wt = W[l+1, 0:ldim[l+1], 0:ldim[l]].T
                dzt = dZ[l+1, 0:ldim[l+1],  0:mc]
                zt = Z[l, 0:ldim[l], 0:mc]
                #dZ[l,0:ldim[l],0:mc]= np.dot(wt,dzt)*relu_derivative(zt)
                dZ[l,0:ldim[l],0:mc]= np.dot(wt,dzt)*sigmoid_derivative(zt)

            for l in range(L, 0, -1):
                dZl = dZ[l, 0:ldim[l], 0:mc]
                Y_prev = Y_hat[l-1, 0:ldim[l-1], 0:mc]
                dW[l, 0:ldim[l] , 0:ldim[l-1]] = (np.dot(dZl, Y_prev.T))/mc
                dB[l, 0:ldim[l] , :] =  (np.sum(dZl, 
                                axis=1, keepdims=True))/mc
            
            ## Updating parameters using the Gradient Descent method
            for l in range(L, 0, -1):
                W[ l, 0:ldim[l] , 0:ldim[l-1] ] = W[ l, 0:ldim[l] ,
                    0:ldim[l-1] ] - alpha*dW[ l, 0:ldim[l] , 0:ldim[l-1] ]
                B[ l, 0:ldim[l] , : ] = B[ l, 0:ldim[l] , : ] \
                                            - alpha*dB[ l, 0:ldim[l] , : ]
            
        ## Loop terminated, over iterations / steps
    
    ## Loop terminated, over epochs
   
    print(f'After {k} epochs, the training Error (Cost function)= {C}')
    
    return W, B
    # function trainer() ended
    

In [5]:
## Defining the function that make prediction

def predicting(L, ldim, W, B, Mt, Nt):

    C = 0
    Y_hat1 = np.zeros( (L+1, np.max(ldim), Nt) )
    Y_hat1[0, 0:ldim[0], 0:Nt] = Mt

    for l in range(1, L + 1):
            wx = W[l, 0:ldim[l], 0:ldim[l-1] ]
            yold = Y_hat1[l-1, 0:ldim[l-1], 0:Nt]
            bx = B[l, 0:ldim[l], :]
            
            zx = np.dot(wx, yold) + bx
            #Z1[l, 0:ldim[l], 0:Nt] = zx           
            #if l==L:
            yx = sigmoid(zx)
            #else:
            #    yx = relu(zx)
                
            Y_hat1[l, 0:ldim[l], 0:Nt] = yx
            
    return yx
    # function predicting() ended

In [6]:
## Defining the function for calculating MAE and RMSE

def  reg_errors(Y_test, Y_predict_normalized):

    Y_predict = normalizing.inverse_transform(Y_predict_normalized)
    Y_test1 = normalizing.inverse_transform(Y_test.reshape(-1,1))

    from sklearn.metrics import mean_absolute_error
    from sklearn.metrics import mean_squared_error

    mae = mean_absolute_error(Y_test1,Y_predict.T)
    mse = mean_squared_error(Y_test1,Y_predict.T)
    rmse = np.sqrt(mse)

    print("MAE in the prediction for the test data:", mae)
    print("RMSE in the prediction for the test data: ", rmse)
    
    # function reg_errors() ended

In [8]:
print('\n========== Training epochs started ==========\n')
W, B = trainer(L, N, m, gamma, alpha, Nt, MAX_epochs, ldim, MM, YY)
print('\n========== Training epochs completed ==========')
print("\n")


## using the optimized parameters for making predictions
## The Forward Pass with the test data
print("\n~~~~~ Forward Pass with the test data started ~~~~~") 
Y_predict_normalized = predicting(L, ldim, W, B, Mt, Nt)
print("\n================== Prediction Completed ==================") 
    
    
print("\n======================= Test Errors =======================")
reg_errors(Y_test, Y_predict_normalized)



After 100 epochs, the training Error (Cost function)= 0.015396965552653855




~~~~~ Forward Pass with the test data started ~~~~~


MAE in the prediction for the test data: 30.344541839324346
RMSE in the prediction for the test data:  37.36703073558258
