In [None]:
from google.colab import drive
drive.mount('/content/drive', force_remount=True)

Mounted at /content/drive


In [None]:
stock_name = "TATAPOWER.NS"

In [None]:
import os

In [None]:
# Per-stock output directory on the mounted Drive (CSV, pickled scalers and
# checkpoint weights are written below it by later cells).
model_dir = f"/content/drive/MyDrive/sp_models/{stock_name}"

# exist_ok=True keeps the cell idempotent on re-run and avoids the
# check-then-create race of a separate os.path.exists() test.
os.makedirs(model_dir, exist_ok=True)


In [None]:
import yfinance as yf
import numpy as np
import pandas as pd
import tensorflow as tf
import os

In [None]:
# Download daily bars for the configured ticker. Using stock_name (instead of a
# repeated string literal) keeps the downloaded data consistent with the
# per-stock output directory and CSV file name derived from the same variable.
data = yf.download(stock_name, start="2018-01-01", interval="1d")

[*********************100%***********************]  1 of 1 completed


In [None]:
csv_file = os.path.join(model_dir, f"{stock_name}.csv")

In [None]:
data.to_csv(csv_file)

In [None]:
data.shape

(1322, 6)

In [None]:
data.head(3)

Unnamed: 0_level_0,Open,High,Low,Close,Adj Close,Volume
Date,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1
2018-01-01,93.800003,100.0,92.400002,97.150002,83.893021,33659894
2018-01-02,98.25,100.25,96.150002,99.699997,86.095039,29174233
2018-01-03,100.300003,101.599998,99.599998,100.800003,87.044937,23056832


Understanding Trends within the Data

In [None]:
# Defensive step: make sure rows are ordered by their (date) index
data = data.sort_index()

In [None]:
# Drop rows whose index label repeats, keeping the first occurrence of each
repeated_label = data.index.duplicated(keep='first')
data = data.loc[~repeated_label]

In [None]:
data.tail(3)

Unnamed: 0_level_0,Open,High,Low,Close,Adj Close,Volume
Date,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1
2023-05-08,201.5,203.199997,199.5,202.699997,202.699997,7724617
2023-05-09,203.75,205.100006,202.399994,203.300003,203.300003,7956883
2023-05-10,203.5,204.350006,202.199997,204.149994,204.149994,3776084


In [None]:
# Check for missing values 
data.isnull().sum()

Open         0
High         0
Low          0
Close        0
Adj Close    0
Volume       0
dtype: int64

In [None]:
# Get the statistics of the data
data.describe()

Unnamed: 0,Open,High,Low,Close,Adj Close,Volume
count,1322.0,1322.0,1322.0,1322.0,1322.0,1322.0
mean,118.647844,120.442057,116.548903,118.414751,115.13093,22061700.0
std,71.890963,72.815892,70.738208,71.712002,73.201211,29250950.0
min,27.25,28.6,27.0,27.299999,25.914843,437631.0
25%,63.400002,64.5,61.7875,63.275001,59.669462,6078360.0
50%,80.75,81.699997,79.724998,80.649998,72.565243,12053660.0
75%,205.225002,207.062496,202.337505,204.775002,204.775002,27181780.0
max,292.350006,298.049988,280.350006,289.799988,287.498932,360661800.0


In [None]:
import plotly.graph_objects as go

# Line chart of the closing price over time
close_trace = go.Scatter(x=data.index, y=data['Close'], mode='lines')

fig = go.Figure()
fig.add_trace(close_trace)
fig.update_layout(
    height=500,
    width=900,
    xaxis_title='Date',
    yaxis_title='Close',
)
fig.show()

In [None]:
# Line chart of the traded volume over time
volume_trace = go.Scatter(x=data.index, y=data['Volume'], mode='lines')

fig = go.Figure()
fig.add_trace(volume_trace)
fig.update_layout(
    height=500,
    width=900,
    xaxis_title='Date',
    yaxis_title='Volume',
)
fig.show()

Data Preparation

In [None]:
from sklearn.preprocessing import MinMaxScaler 
import pickle 
from tqdm.notebook import tnrange

In [None]:
# Keep just the two columns used as model inputs
model_columns = ['Close', 'Volume']
data = data[model_columns]
data.head(3)

Unnamed: 0_level_0,Close,Volume
Date,Unnamed: 1_level_1,Unnamed: 2_level_1
2018-01-01,97.150002,33659894
2018-01-02,99.699997,29174233
2018-01-03,100.800003,23056832


In [None]:
# Size of the test set: number of rows dated on or after 2021-03-01
in_test_period = data.index >= '2021-03-01'
test_length = int(in_test_period.sum())

In [None]:
def CreateFeatures_and_Targets(data, feature_length):
    """Build sliding-window features and next-step "Close" targets.

    Window ``i`` holds rows ``i .. i + feature_length - 1`` of every column of
    ``data``; its target is the "Close" value of row ``i + feature_length``.

    Parameters
    ----------
    data : pandas.DataFrame
        Frame containing a "Close" column; all columns are used as features.
    feature_length : int
        Number of consecutive rows in each window (must be >= 0).

    Returns
    -------
    X : numpy.ndarray, shape (n_windows, feature_length, n_columns)
    Y : numpy.ndarray, shape (n_windows,)
        where ``n_windows = max(len(data) - feature_length, 0)``.

    Raises
    ------
    ValueError
        If ``feature_length`` is negative.
    """
    if feature_length < 0:
        raise ValueError(f"feature_length must be non-negative, got {feature_length}")

    # Convert to NumPy once instead of re-slicing the DataFrame per iteration.
    values = data.values
    close = data["Close"].values
    n_windows = max(len(values) - feature_length, 0)

    if n_windows == 0:
        # Keep the 3-D / 1-D ranks so callers can still inspect X.shape[2].
        empty_X = np.empty((0, feature_length, values.shape[1]), dtype=values.dtype)
        empty_Y = np.empty((0,), dtype=close.dtype)
        return empty_X, empty_Y

    X = np.stack([values[i : i + feature_length] for i in range(n_windows)])
    # Copy so the targets do not alias the DataFrame's underlying buffer.
    Y = close[feature_length : feature_length + n_windows].copy()

    return X , Y

In [None]:
X , Y = CreateFeatures_and_Targets(data , 32)

  0%|          | 0/1290 [00:00<?, ?it/s]

In [None]:
# Check the shapes
X.shape , Y.shape

((1290, 32, 2), (1290,))

In [None]:
# Chronological split: the last `test_length` windows are held out for testing.
# An explicit, clamped split index is used instead of negative slicing because
# X[:-0] would produce an empty training set when test_length is 0.
split_at = max(len(X) - test_length, 0)
Xtrain , Xtest = X[:split_at] , X[split_at:]
Ytrain , Ytest = Y[:split_at] , Y[split_at:]

In [None]:
# Check Training Dataset Shape 
Xtrain.shape , Ytrain.shape

((748, 32, 2), (748,))

In [None]:
# Check Testing Dataset Shape
Xtest.shape , Ytest.shape

((542, 32, 2), (542,))

In [None]:
# Create a Scaler to Scale Vectors with Multiple Dimensions
class MultiDimensionScaler():
    """Min-max scale a 3-D array with one ``MinMaxScaler`` per index of axis 2.

    For each ``i`` the 2-D slab ``X[:, :, i]`` is handed to its own
    ``MinMaxScaler``. The fitted scalers are kept in ``self.scalers`` in
    axis-2 order, so ``transform`` can reuse them on new data.

    Both methods return a new floating-point array and never modify the
    array passed in.
    """

    def __init__(self):
        # One fitted scaler per index along axis 2, filled by fit_transform.
        self.scalers = []

    @staticmethod
    def _float_copy(X):
        """Return a floating-point copy of ``X`` that is safe to write into."""
        X = np.asarray(X)
        if np.issubdtype(X.dtype, np.floating):
            return X.copy()
        # Integer input would truncate scaled values on in-place assignment.
        return X.astype(float)

    def fit_transform(self , X):
        """Fit a fresh scaler per axis-2 slab of ``X`` and return the scaled copy.

        Any scalers from a previous fit are discarded first, so calling this
        method again refits the instance instead of appending stale scalers.
        """
        scaled = self._float_copy(X)
        self.scalers = []
        for i in range(scaled.shape[2]):
            scaler = MinMaxScaler()
            scaled[:, :, i] = scaler.fit_transform(scaled[:, :, i])
            self.scalers.append(scaler)
        return scaled

    def transform(self , X):
        """Scale a copy of ``X`` with the scalers fitted by ``fit_transform``.

        Raises ``IndexError`` if ``X`` has more axis-2 entries than there are
        fitted scalers.
        """
        scaled = self._float_copy(X)
        for i in range(scaled.shape[2]):
            scaled[:, :, i] = self.scalers[i].transform(scaled[:, :, i])
        return scaled

In [None]:
Feature_Scaler = MultiDimensionScaler()
Xtrain = Feature_Scaler.fit_transform(Xtrain)
Xtest = Feature_Scaler.transform(Xtest)

In [None]:
Target_Scaler = MinMaxScaler()
Ytrain = Target_Scaler.fit_transform(Ytrain.reshape(-1,1))
Ytest = Target_Scaler.transform(Ytest.reshape(-1,1))

In [None]:


checkpoint_path = os.path.join(model_dir, 'best_weights.h5')


In [None]:
def save_object(obj , name : str):
    """Pickle ``obj`` to ``<model_dir>/<name>.pck``.

    Parameters
    ----------
    obj : object
        Any picklable object.
    name : str
        File name without extension; ".pck" is appended.
    """
    name_f = os.path.join(model_dir, f"{name}.pck")
    # The context manager closes the file even if pickling raises.
    with open(name_f, "wb") as pickle_out:
        pickle.dump(obj, pickle_out)

def load_object(name : str):
    """Load and return the object pickled at ``<model_dir>/<name>.pck``.

    Parameters
    ----------
    name : str
        File name without extension; ".pck" is appended.

    Returns
    -------
    object
        The unpickled object.
    """
    name_f = os.path.join(model_dir, f"{name}.pck")
    # NOTE: pickle.load can execute arbitrary code; only load files that were
    # written by this notebook (or another trusted source).
    with open(name_f, "rb") as pickle_in:
        return pickle.load(pickle_in)

In [None]:
# Save your objects for future purposes 
save_object(Feature_Scaler , "Feature_Scaler")
save_object(Target_Scaler , "Target_Scaler")

Model Building

In [None]:
from tensorflow.keras.callbacks import ModelCheckpoint , ReduceLROnPlateau

save_best = ModelCheckpoint(checkpoint_path, monitor='val_loss', save_best_only=True, save_weights_only=True)
reduce_lr = ReduceLROnPlateau(monitor='val_loss', factor=0.25,patience=4, min_lr=0.00001,verbose = 1)

In [None]:
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense , Dropout , LSTM , Bidirectional

# Stacked recurrent regressor: bidirectional LSTM -> LSTM -> dense head with a
# single linear output.
model = Sequential()

# input_shape is passed to the Bidirectional wrapper (the layer that is actually
# added to the Sequential model), following the Keras documentation, rather than
# to the wrapped LSTM. (32, 2) matches the windows built above: 32 rows x 2 columns.
model.add(Bidirectional(LSTM(512 , return_sequences=True , recurrent_dropout=0.1),
                        input_shape=(32, 2)))
model.add(LSTM(256 , recurrent_dropout=0.1))
model.add(Dropout(0.15))
model.add(Dense(64 , activation='elu'))
model.add(Dropout(0.15))
model.add(Dense(32 , activation='elu'))
model.add(Dense(1 , activation='linear'))



In [None]:
#optimizer = tf.keras.optimizers.Adam(learning_rate=0.002)
optimizer = tf.keras.optimizers.SGD(learning_rate = 0.002)
model.compile(loss='mse', optimizer=optimizer)

In [None]:
# Train for 10 epochs, one sample per gradient step, without shuffling the samples.
# NOTE(review): validation_data is the test split, and the reduce_lr / save_best
# callbacks both monitor val_loss, so learning-rate scheduling and checkpoint
# selection are driven by test data. Consider a separate validation split.
history = model.fit(Xtrain, Ytrain,
            epochs=10,
            batch_size = 1,
            verbose=1,
            shuffle=False ,
            validation_data=(Xtest , Ytest),
            callbacks=[reduce_lr , save_best])

In [None]:
# Checking the model Structure 
model.summary()

In [None]:
# Load the best weights
weight_path = checkpoint_path
model.load_weights(weight_path)

Visualize prediction on Test Set

In [None]:
Predictions = model.predict(Xtest)

In [None]:
Predictions = Target_Scaler.inverse_transform(Predictions)
Actual = Target_Scaler.inverse_transform(Ytest)

In [None]:
Predictions = np.squeeze(Predictions , axis = 1)
Actual = np.squeeze(Actual , axis = 1)

In [None]:
# Creating Sample Test Dataframe
test_dataframe_dict = {'Actual' : list(Actual) , 'Predicted' : list(Predictions)}
test_df = pd.DataFrame.from_dict(test_dataframe_dict)

test_df.index = data.index[-test_length:]

In [None]:
test_df.head()

In [None]:
# Compare actual and predicted values over the test-set dates
fig = go.Figure()

fig.add_trace(go.Scatter(x = test_df.index , y = Actual , mode = 'lines' , name='Actual'))
fig.add_trace(go.Scatter(x = test_df.index , y = Predictions , mode = 'lines' , name='Predicted'))
fig.show()

Visualize Prediction on whole data

In [None]:
Total_features = np.concatenate((Xtrain , Xtest) , axis = 0)

In [None]:
Total_Targets = np.concatenate((Ytrain , Ytest) , axis = 0)

In [None]:
Predictions = model.predict(Total_features)

In [None]:
Predictions = Target_Scaler.inverse_transform(Predictions)
Actual = Target_Scaler.inverse_transform(Total_Targets)

In [None]:
Predictions = np.squeeze(Predictions , axis = 1)
Actual = np.squeeze(Actual , axis = 1)

In [None]:

# Targets/predictions exist only for rows that have a full look-back window, so
# they are shorter than the full index. Align the date axis with the last
# len(Actual) rows instead of plotting against every date from the start.
plot_dates = data.index[len(data.index) - len(Actual):]

fig = go.Figure()

fig.add_trace(go.Scatter(x = plot_dates , y = Actual , mode = 'lines' , name='Actual'))
fig.add_trace(go.Scatter(x = plot_dates , y = Predictions , mode = 'lines' , name='Predicted'))
fig.show()

In [None]:
# Mean absolute percentage error between the actual and predicted values.
# NOTE(review): when cells are run in order, Actual / Predictions here are the
# combined train+test arrays built in the preceding cells, so this is not a
# test-only metric. Confirm that is intended.
y_true = Actual
y_pred = Predictions
loss = tf.keras.losses.mean_absolute_percentage_error(y_true, y_pred)
print(loss)

In [None]:
# Mean squared error and its square root (RMSE) for the y_true / y_pred arrays
# defined in the previous cell.
from math import sqrt
mse = tf.keras.losses.mean_squared_error(y_true, y_pred)
rmse = sqrt(mse)
print(mse, rmse)