<a href="https://colab.research.google.com/github/harshitt018/Advance-Data-Science/blob/main/LSTM_and_RNN_on_Google_Stock_Price.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [2]:
import kagglehub

# Download latest version
path = kagglehub.dataset_download("medharawat/google-stock-price")

print("Path to dataset files:", path)

Downloading from https://www.kaggle.com/api/v1/datasets/download/medharawat/google-stock-price?dataset_version_number=1...


100%|██████████| 23.4k/23.4k [00:00<00:00, 39.4MB/s]

Extracting files...
Path to dataset files: /root/.cache/kagglehub/datasets/medharawat/google-stock-price/versions/1





In [49]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.preprocessing import MinMaxScaler
from sklearn.metrics import accuracy_score, confusion_matrix
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, SimpleRNN, Dropout, Dense, BatchNormalization
from tensorflow.keras.callbacks import EarlyStopping, ReduceLROnPlateau
from tensorflow.keras.optimizers import Adam
import tensorflow as tf
import warnings

warnings.filterwarnings("ignore")
tf.random.set_seed(42)
np.random.seed(42)

# Load datasets
train_df = pd.read_csv("/root/.cache/kagglehub/datasets/medharawat/google-stock-price/versions/1/Google_Stock_Price_Train.csv")
test_df = pd.read_csv("/root/.cache/kagglehub/datasets/medharawat/google-stock-price/versions/1/Google_Stock_Price_Test.csv")

# Columns to clean
cols = ['Open', 'High', 'Low', 'Close', 'Volume']
for df in [train_df, test_df]:
    df[cols] = df[cols].apply(lambda x: x.astype(str).str.replace(',', '').astype(float))

In [50]:
def add_technical_indicators(df):
    df['MA_10'] = df['Close'].rolling(window=10).mean()
    df['MA_20'] = df['Close'].rolling(window=20).mean()

    # RSI calculation
    delta = df['Close'].diff()
    gain = delta.clip(lower=0)
    loss = -delta.clip(upper=0)
    avg_gain = gain.rolling(window=14).mean()
    avg_loss = loss.rolling(window=14).mean()
    rs = avg_gain / avg_loss
    df['RSI'] = 100 - (100 / (1 + rs))

    df.fillna(method='bfill', inplace=True)
    return df

train_df = add_technical_indicators(train_df)
test_df = add_technical_indicators(test_df)

feature_cols = ['Open', 'High', 'Low', 'Close', 'Volume', 'MA_10', 'MA_20', 'RSI']
train_df = train_df[feature_cols]
test_df = test_df[feature_cols]

In [51]:
scaler = MinMaxScaler()
train_scaled = scaler.fit_transform(train_df)
test_scaled = scaler.transform(test_df)

def create_sequences(data, seq_len=60):
    X, y_price, y_dir = [], [], []
    for i in range(seq_len, len(data)):
        X.append(data[i-seq_len:i])
        y_price.append(data[i, 0])
        y_dir.append(1 if data[i, 0] > data[i-1, 0] else 0)
    return np.array(X), np.array(y_price), np.array(y_dir)

SEQ_LEN = 60
X_train, y_train_price, y_train_dir = create_sequences(train_scaled, SEQ_LEN)

# Adjust SEQ_LEN if test data is smaller
SEQ_LEN_test = min(SEQ_LEN, len(test_scaled)-1)
X_test, y_test_price, y_test_dir = create_sequences(test_scaled, SEQ_LEN_test)

print(f"Train sequences: {X_train.shape}, Test sequences: {X_test.shape}")

Train sequences: (1198, 60, 8), Test sequences: (1, 19, 8)


In [52]:
def build_price_model(model_type='LSTM'):
    model = Sequential()
    if model_type == 'LSTM':
        model.add(LSTM(32, return_sequences=True, input_shape=(X_train.shape[1], X_train.shape[2])))
        model.add(BatchNormalization())
        model.add(Dropout(0.3))
        model.add(LSTM(32))
        model.add(BatchNormalization())
        model.add(Dropout(0.3))
    elif model_type == 'RNN':
        model.add(SimpleRNN(32, return_sequences=True, input_shape=(X_train.shape[1], X_train.shape[2])))
        model.add(BatchNormalization())
        model.add(Dropout(0.3))
        model.add(SimpleRNN(32))
        model.add(BatchNormalization())
        model.add(Dropout(0.3))
    model.add(Dense(1))
    model.compile(optimizer=Adam(0.001), loss='mean_squared_error')
    return model

def build_direction_model(model_type='LSTM'):
    model = Sequential()
    if model_type == 'LSTM':
        model.add(LSTM(32, return_sequences=True, input_shape=(X_train.shape[1], X_train.shape[2])))
        model.add(BatchNormalization())
        model.add(Dropout(0.3))
        model.add(LSTM(32))
        model.add(BatchNormalization())
        model.add(Dropout(0.3))
    elif model_type == 'RNN':
        model.add(SimpleRNN(32, return_sequences=True, input_shape=(X_train.shape[1], X_train.shape[2])))
        model.add(BatchNormalization())
        model.add(Dropout(0.3))
        model.add(SimpleRNN(32))
        model.add(BatchNormalization())
        model.add(Dropout(0.3))
    model.add(Dense(1, activation='sigmoid'))
    model.compile(optimizer=Adam(0.001), loss='binary_crossentropy', metrics=['accuracy'])
    return model

In [55]:
# Price Prediction Training
print("Training LSTM Price Prediction Model...")
history_lstm_price = model_lstm.fit(
    X_train, y_train_price,
    epochs=30, batch_size=32,
    validation_split=0.1,
    callbacks=callbacks,
    verbose=1,
    shuffle=True
)

print("Training RNN Price Prediction Model...")
history_rnn_price = model_rnn.fit(
    X_train, y_train_price,
    epochs=30, batch_size=32,
    validation_split=0.1,
    callbacks=callbacks,
    verbose=1,
    shuffle=True
)

# Direction Classification Training
print("Training LSTM Direction Classification Model...")
history_lstm_dir = model_lstm_dir.fit(
    X_train, y_train_dir,
    epochs=30, batch_size=32,
    validation_split=0.1,
    callbacks=callbacks,
    verbose=1,
    shuffle=True
)

print("Training RNN Direction Classification Model...")
history_rnn_dir = model_rnn_dir.fit(
    X_train, y_train_dir,
    epochs=30, batch_size=32,
    validation_split=0.1,
    callbacks=callbacks,
    verbose=1,
    shuffle=True
)

Training LSTM Price Prediction Model...
Epoch 1/30
[1m34/34[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m4s[0m 23ms/step - loss: 2.0145 - val_loss: 0.3415 - learning_rate: 0.0010
Epoch 2/30
[1m34/34[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m2s[0m 14ms/step - loss: 1.2680 - val_loss: 0.2461 - learning_rate: 0.0010
Epoch 3/30
[1m34/34[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 12ms/step - loss: 1.1147 - val_loss: 0.1424 - learning_rate: 0.0010
Epoch 4/30
[1m34/34[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 29ms/step - loss: 0.8939 - val_loss: 0.1760 - learning_rate: 0.0010
Epoch 5/30
[1m34/34[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 26ms/step - loss: 0.7945 - val_loss: 0.1147 - learning_rate: 0.0010
Epoch 6/30
[1m34/34[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 14ms/step - loss: 0.7143 - val_loss: 0.1150 - learning_rate: 0.0010
Epoch 7/30
[1m34/34[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 13ms/step - loss: 0.6431 - 

In [56]:
# Predict direction
pred_lstm_dir = (model_lstm_dir.predict(X_test) > 0.5).astype(int).flatten()
pred_rnn_dir = (model_rnn_dir.predict(X_test) > 0.5).astype(int).flatten()

# Accuracy
acc_lstm_dir = accuracy_score(y_test_dir, pred_lstm_dir)
acc_rnn_dir = accuracy_score(y_test_dir, pred_rnn_dir)

print(f"LSTM Direction Accuracy: {acc_lstm_dir*100:.2f}%")
print(f"RNN Direction Accuracy: {acc_rnn_dir*100:.2f}%")

[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 182ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 572ms/step
LSTM Direction Accuracy: 100.00%
RNN Direction Accuracy: 100.00%
