In [None]:
# This Python 3 environment comes with many helpful analytics libraries installed
# It is defined by the kaggle/python Docker image: https://github.com/kaggle/docker-python
# For example, here's several helpful packages to load

import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)

# Input data files are available in the read-only "../input/" directory
# For example, running this (by clicking run or pressing Shift+Enter) will list all files under the input directory

import os
# Lazily build the full path of every file below the Kaggle input directory,
# then print the paths one per line.
input_file_paths = (
    os.path.join(folder, file_name)
    for folder, _, file_names in os.walk('/kaggle/input')
    for file_name in file_names
)
for input_file_path in input_file_paths:
    print(input_file_path)

# You can write up to 20GB to the current directory (/kaggle/working/) that gets preserved as output when you create a version using "Save & Run All" 
# You can also write temporary files to /kaggle/temp/, but they won't be saved outside of the current session

## Working with Paper 2
* This paper does prediction on BSE SENSEX data
* The data spans May 30, 2010 to February 9, 2018
* It uses the MSE and RMSE metrics
* Their best result is MSE = 0.0021, RMSE = 0.0438
* Paper link: https://ieeexplore.ieee.org/document/10397684

## Trying without VIX

In [None]:
import pandas as pd

# Read the raw BSE Sensex export.
data = pd.read_csv("/kaggle/input/d/abirc8010/historical-india-stock-market/BSE Sensex 30 Historical Data.csv")

# Report how many rows were loaded before any filtering.
initial_count = data.shape[0]
print(f"Initial number of rows: {initial_count}")

# Flip the row order, renumber the index, order the columns alphabetically,
# and expose the 'Price' column under the name 'Close'.
data = (
    data.iloc[::-1]
    .reset_index(drop=True)
    .sort_index(axis=1, ascending=True)
    .rename(columns={'Price': 'Close'})
)

# Clean a separate copy so the conversions below do not touch `data`.
df = data.copy()

# Strip thousands separators from the price columns and cast them to float.
for price_col in ("Close", "Open", "High", "Low"):
    without_commas = df[price_col].astype(str).str.replace(",", "")
    df[price_col] = without_commas.astype(float)

# Function to convert 'Vol.' column
def convert_volume(vol):
    """Convert an abbreviated volume value such as "1.5M" into a float.

    Args:
        vol: A string that may contain thousands separators and a "B", "M" or
            "K" magnitude marker, or any value accepted by ``float()``.

    Returns:
        The volume as a float. Blank strings and the "-" placeholder yield
        NaN instead of raising, so missing volumes can be dropped later.

    Raises:
        ValueError: If a non-placeholder string cannot be parsed as a number.
    """
    if not isinstance(vol, str):
        return float(vol)

    text = vol.replace(",", "").strip()
    # Placeholders for "no volume reported" carry no number to parse.
    if text in ("", "-"):
        return float("nan")

    # Checked in the same order as before: billions, millions, thousands.
    for marker, factor in (("B", 1_000_000_000), ("M", 1_000_000), ("K", 1_000)):
        if marker in text:
            return float(text.replace(marker, "")) * factor
    return float(text)

# Expand the abbreviated volume strings into plain numbers.
df["Vol."] = df["Vol."].astype(str).apply(convert_volume)

# Remove the percent sign from 'Change %' and cast to float.
df["Change %"] = df["Change %"].astype(str).str.replace("%", "").astype(float)

# Parse 'Date'. With errors="coerce" unparseable values become NaT, and NaT rows
# fail both comparisons below, so they are filtered out.
# NOTE(review): dayfirst=True assumes day-first dates in the CSV — confirm against the file.
df["Date"] = pd.to_datetime(df["Date"], dayfirst=True, errors="coerce")

# Keep only the rows inside the date range used by the paper.
start_date = "2010-05-30"
end_date = "2018-02-09"
in_range = (df["Date"] >= start_date) & (df["Date"] <= end_date)
# .copy() makes the filtered frame independent of the unfiltered one.
df = df[in_range].copy()

# Print final number of rows
final_count = len(df)
print(f"Final number of rows after filtering: {final_count}")

# Build `data` as a new frame without 'Date'. The previous `data = df` followed by
# an in-place drop aliased both names and mutated a filtered slice, which can
# raise SettingWithCopyWarning.
data = df.drop(columns=["Date"])

# Print final dataset
print(data.head())


In [None]:
import os
import random
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.optim as optim
import matplotlib.pyplot as plt
from sklearn.preprocessing import MinMaxScaler
from sklearn.metrics import r2_score, mean_absolute_error, mean_squared_error, mean_absolute_percentage_error, explained_variance_score
from torch.utils.data import TensorDataset, DataLoader

# -------------------------------
# 1. Set Seeds for Reproducibility
# -------------------------------
def set_random_seed(seed=42):
    """Seed the Python, NumPy and PyTorch generators and configure cuDNN.

    Also stores the seed in the PYTHONHASHSEED environment variable, turns on
    cuDNN's deterministic flag and turns off its benchmark flag.
    """
    os.environ['PYTHONHASHSEED'] = str(seed)
    for seed_fn in (random.seed, np.random.seed, torch.manual_seed):
        seed_fn(seed)
    cudnn = torch.backends.cudnn
    cudnn.benchmark = False
    cudnn.deterministic = True

set_random_seed(30)

# -------------------------------
# 2. Load & Preprocess Data
# -------------------------------
# `data` must already exist (it is produced by the preceding loading cell).
df = data.copy()
all_cols = list(df.columns)

target = "Close"
# Put the target first; the remaining columns keep their existing order.
feature_cols = [target, *(name for name in all_cols if name != target)]
df = df.loc[:, feature_cols].copy()

# Coerce every column to numeric after removing commas. Values that cannot be
# parsed become NaN, and rows containing any NaN are discarded.
for col in feature_cols:
    as_text = df[col].astype(str).str.replace(',', '')
    df[col] = pd.to_numeric(as_text, errors='coerce')
df = df.dropna()

# -------------------------------
# 3. Create Sequences from Time Series Data
# -------------------------------
def create_sequences(data, seq_length=20, target_idx=0):
    """
    Build sliding windows and their next-step targets from a 2-D array.

    Window i covers rows i .. i + seq_length - 1; its target is column
    `target_idx` of row i + seq_length. Returns (windows, targets) as NumPy
    arrays; both are empty when `data` has no more than `seq_length` rows.
    """
    window_count = len(data) - seq_length
    starts = range(window_count)
    windows = [data[start:start + seq_length] for start in starts]
    targets = [data[start + seq_length, target_idx] for start in starts]
    return np.array(windows), np.array(targets)

data_values = df.values  # every column is used as an input feature

# Chronological split: first 80% of the rows for training, the rest for testing.
train_size = int(len(data_values) * 0.8)
train_data, test_data = data_values[:train_size], data_values[train_size:]

# The scaler is fitted on the training rows only; the test rows are transformed
# with those same fitted parameters.
scaler = MinMaxScaler()
train_scaled = scaler.fit_transform(train_data)
test_scaled  = scaler.transform(test_data)

# Window both splits with a sequence length of 20.
seq_length = 20
X_train, y_train = create_sequences(train_scaled, seq_length=seq_length, target_idx=0)
X_test, y_test   = create_sequences(test_scaled, seq_length=seq_length, target_idx=0)

# Hold out the last 12.5% of the training windows for validation.
train_split = int(len(X_train) * 0.875)
X_train, X_val = X_train[:train_split], X_train[train_split:]
y_train, y_val = y_train[:train_split], y_train[train_split:]

print("Train shape:", X_train.shape)
print("Validation shape:", X_val.shape)
print("Test shape:", X_test.shape)

# -------------------------------
# 4. Prepare PyTorch Datasets & DataLoaders
# -------------------------------
# Inputs become float32 tensors; targets additionally get a trailing dimension
# so they line up with the model's (batch, 1) output.
X_train_t, X_val_t, X_test_t = (
    torch.tensor(features, dtype=torch.float32)
    for features in (X_train, X_val, X_test)
)
y_train_t, y_val_t, y_test_t = (
    torch.tensor(targets, dtype=torch.float32).unsqueeze(1)
    for targets in (y_train, y_val, y_test)
)

batch_size = 32
train_dataset = TensorDataset(X_train_t, y_train_t)
val_dataset   = TensorDataset(X_val_t, y_val_t)
test_dataset  = TensorDataset(X_test_t, y_test_t)

# Only the training loader shuffles; validation and test keep their order.
train_loader = DataLoader(train_dataset, shuffle=True, batch_size=batch_size)
val_loader   = DataLoader(val_dataset, shuffle=False, batch_size=batch_size)
test_loader  = DataLoader(test_dataset, shuffle=False, batch_size=batch_size)

# -------------------------------
# 5. Define Merged Custom LSTM Model
# -------------------------------
class CustomLSTMModel(nn.Module):
    """Recurrent regressor built around a hand-written gated cell.

    At every time step the cell computes two sigmoid gates (z_t, m_t) and a
    layer-normalised candidate that adds linear projections of the previous
    hidden state and of the current input. The final hidden state is mapped to
    a return bounded to +/- scale_factor by tanh, and the prediction is the
    first feature of the last input step multiplied by (1 + return).
    """

    def __init__(self, in_channels, seq_length=20, hidden_dim=64, dense_dim=32, scale_factor=0.2):
        """
        Args:
            in_channels: Number of features per time step.
            seq_length: Nominal window length. Stored as ``self.seq_length`` for
                backward compatibility; ``forward`` takes the number of steps
                from the input tensor itself.
            hidden_dim: Size of the hidden and cell states.
            dense_dim: Width of the dense layer placed before the output.
            scale_factor: Bound applied to the predicted return after tanh.
        """
        super(CustomLSTMModel, self).__init__()
        self.seq_length = seq_length
        self.scale_factor = scale_factor
        self.hidden_dim = hidden_dim

        # Gate z_t: blends the old cell state with the new candidate.
        self.W_z = nn.Linear(in_channels, hidden_dim)
        self.U_z = nn.Linear(hidden_dim, hidden_dim, bias=False)
        self.b_z = nn.Parameter(torch.zeros(hidden_dim))

        # Gate m_t: scales tanh(c) to produce the hidden state.
        self.W_m = nn.Linear(in_channels, hidden_dim)
        self.U_m = nn.Linear(hidden_dim, hidden_dim, bias=False)
        self.b_m = nn.Parameter(torch.zeros(hidden_dim))

        # Candidate cell content.
        self.W_c = nn.Linear(in_channels, hidden_dim)
        self.U_c = nn.Linear(hidden_dim, hidden_dim, bias=False)
        self.b_c = nn.Parameter(torch.zeros(hidden_dim))

        # Residual connection: project previous hidden state
        self.residual_linear = nn.Linear(hidden_dim, hidden_dim)
        self.layer_norm = nn.LayerNorm(hidden_dim)

        # Skip connection: project the input to hidden_dim
        self.input_linear = nn.Linear(in_channels, hidden_dim)

        # Dense layers for output processing
        self.dense = nn.Sequential(
            nn.Linear(hidden_dim, dense_dim),
            nn.ReLU()
        )
        self.return_layer = nn.Linear(dense_dim, 1)

    def forward(self, x):
        """Run the cell over every time step of ``x`` and return the prediction.

        Args:
            x: Tensor of shape (batch, steps, in_channels).

        Returns:
            Tensor of shape (batch, 1).
        """
        batch_size, num_steps = x.size(0), x.size(1)

        # States start at zero and follow the input's device and dtype.
        h = torch.zeros(batch_size, self.hidden_dim, device=x.device, dtype=x.dtype)
        c = torch.zeros_like(h)

        # Iterate over the steps actually present in x. Looping over
        # self.seq_length instead would fail on shorter windows and silently
        # skip the tail of longer ones.
        for t in range(num_steps):
            x_t = x[:, t, :]
            z_t = torch.sigmoid(self.W_z(x_t) + self.U_z(h) + self.b_z)
            m_t = torch.sigmoid(self.W_m(x_t) + self.U_m(h) + self.b_m)
            candidate = torch.tanh(self.W_c(x_t) + self.U_c(h) + self.b_c)
            candidate = candidate + self.residual_linear(h)
            candidate = candidate + self.input_linear(x_t)
            candidate = self.layer_norm(candidate)

            c = (1 - z_t) * c + z_t * candidate
            h = m_t * torch.tanh(c)

        # Compute dense layers and output
        dense_out = self.dense(h)
        long_term_return = torch.tanh(self.return_layer(dense_out)) * self.scale_factor

        # Feature 0 of the last step is the base value (the caller places "Close" first).
        last_day = x[:, -1, 0].unsqueeze(1)
        predicted_price = last_day * (1 + long_term_return)
        return predicted_price

# -------------------------------
# 6. Training and Evaluation Pipeline
# -------------------------------
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
in_channels = len(feature_cols)
model = CustomLSTMModel(in_channels, seq_length=seq_length, hidden_dim=64, dense_dim=32, scale_factor=0.2).to(device)
criterion = nn.MSELoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

num_epochs = 100
best_val_loss = float('inf')
patience = 10   # epochs without validation improvement before stopping
counter = 0     # consecutive epochs without improvement

# state_dict() returns references to the live parameter tensors, so a snapshot
# has to be cloned or later optimizer steps overwrite it. Taking one up front
# also keeps best_model_state defined if validation loss never improves.
best_model_state = {name: tensor.detach().clone() for name, tensor in model.state_dict().items()}

for epoch in range(num_epochs):
    # ---- training pass ----
    model.train()
    train_losses = []
    for batch_X, batch_y in train_loader:
        batch_X = batch_X.to(device)
        batch_y = batch_y.to(device)
        optimizer.zero_grad()
        outputs = model(batch_X)
        loss = criterion(outputs, batch_y)
        loss.backward()
        optimizer.step()
        train_losses.append(loss.item())

    # ---- validation pass (no gradients) ----
    model.eval()
    val_losses = []
    with torch.no_grad():
        for batch_X, batch_y in val_loader:
            batch_X = batch_X.to(device)
            batch_y = batch_y.to(device)
            outputs = model(batch_X)
            loss = criterion(outputs, batch_y)
            val_losses.append(loss.item())

    avg_train_loss = np.mean(train_losses)
    avg_val_loss = np.mean(val_losses)
    print(f"Epoch {epoch+1}/{num_epochs} - Train Loss: {avg_train_loss:.6f} - Val Loss: {avg_val_loss:.6f}")

    if avg_val_loss < best_val_loss:
        best_val_loss = avg_val_loss
        # Clone the weights so this checkpoint is frozen at the best epoch.
        best_model_state = {name: tensor.detach().clone() for name, tensor in model.state_dict().items()}
        counter = 0
    else:
        counter += 1
        if counter >= patience:
            print("Early stopping triggered!")
            break

# Restore the weights from the epoch with the lowest validation loss.
model.load_state_dict(best_model_state)

# -------------------------------
# 7. Evaluation on Test Data
# -------------------------------
# Run the model over the test loader without gradients, collecting the
# predictions and the matching targets batch by batch.
model.eval()
predictions, true_values = [], []
with torch.no_grad():
    for batch_X, batch_y in test_loader:
        batch_X, batch_y = batch_X.to(device), batch_y.to(device)
        preds = model(batch_X)
        predictions.append(preds.cpu().numpy())
        true_values.append(batch_y.cpu().numpy())

# Stack the per-batch arrays along the row axis.
predictions = np.concatenate(predictions, axis=0)
true_values = np.concatenate(true_values, axis=0)

def inverse_transform(values):
    """Undo the scaling for column 0 only.

    The fitted `scaler` expects one column per entry of `feature_cols`, so the
    values are written into column 0 of a zero-filled array of that width,
    inverse-transformed, and column 0 of the result is returned.
    """
    padded = np.zeros((len(values), len(feature_cols)))
    padded[:, 0] = values.reshape(-1)
    restored = scaler.inverse_transform(padded)
    return restored[:, 0]

# Map predictions and targets back to the original scale of column 0.
predictions_inv, true_values_inv = inverse_transform(predictions), inverse_transform(true_values)

# Metrics on the inverse-transformed values.
mse = mean_squared_error(true_values_inv, predictions_inv)
rmse = np.sqrt(mse)
mae = mean_absolute_error(true_values_inv, predictions_inv)
mape = mean_absolute_percentage_error(true_values_inv, predictions_inv) * 100
r2 = r2_score(true_values_inv, predictions_inv)
evs = explained_variance_score(true_values_inv, predictions_inv)

# The same metrics on the scaled values the model was trained on.
mse_scaled = mean_squared_error(true_values, predictions)
rmse_scaled = np.sqrt(mse_scaled)
mae_scaled = mean_absolute_error(true_values, predictions)
mape_scaled = mean_absolute_percentage_error(true_values, predictions) * 100
r2_scaled = r2_score(true_values, predictions)
evs_scaled = explained_variance_score(true_values, predictions)

print("📊 Final Results -")
print(f"  R²: {r2:.4f}")
print(f"  MAE: {mae:.4f}")
print(f"  MSE: {mse:.4f}")
print(f"  RMSE: {rmse:.4f}")
print(f"  MAPE: {mape:.2f}%")
print(f"  EVS: {evs:.4f}")

print("\n--- Scaled Metrics ---")
print(f"R² Score: {r2_scaled:.4f}")
print(f"MAE: {mae_scaled:.4f}")
print(f"MSE: {mse_scaled:.4f}")
print(f"RMSE: {rmse_scaled:.4f}")
print(f"Explained Variance Score: {evs_scaled:.4f}")
print(f"MAPE: {mape_scaled:.2f}%")

# Actual vs. predicted values over the test windows.
fig, ax = plt.subplots(figsize=(10, 5))
ax.plot(true_values_inv, label='Actual')
ax.plot(predictions_inv, label='Predicted')
ax.set_xlabel("Time")
ax.set_ylabel("Close Price")
ax.legend()
plt.show()


## Working with Paper 3
* This paper does prediction on Nifty50, Sensex and S&P 500
* The data is collected daily from 2013 to 2022
* The metric used is RMSE
* Their results (RMSE): NIFTY50 = 170.843, SENSEX = 578.746, S&P 500 = 50.1650
* Paper link: https://www.sciencedirect.com/science/article/pii/S1568494624005337

## Doing without VIX

## Working with Sensex

In [None]:
# Load the Sensex export used for the Paper 3 comparison.
data = pd.read_csv("/kaggle/input/personal-dataset/Sensex Paper1.csv")

# Reverse the row order and renumber the index.
data = data.iloc[::-1].reset_index(drop=True)

# sort_index returns a new frame, so the result has to be assigned; calling it
# bare left the columns unsorted. Sorting changes the column order, and with it
# the feature order, relative to the raw file.
data = data.sort_index(axis=1, ascending=True)
data = data.rename(columns={'Price': 'Close'})
df = data.copy()  # clean a separate copy so `data` is not modified by the casts below

# Convert financial columns to numeric (remove commas)
for col in ["Close", "Open", "High", "Low"]:
    df[col] = df[col].astype(str).str.replace(",", "").astype(float)

# Function to convert 'Vol.' column
def convert_volume(vol):
    """Convert an abbreviated volume value such as "1.5M" into a float.

    Args:
        vol: A string that may contain thousands separators and a "B", "M" or
            "K" magnitude marker, or any value accepted by ``float()``.

    Returns:
        The volume as a float. Blank strings and the "-" placeholder yield
        NaN instead of raising, so missing volumes can be dropped later.

    Raises:
        ValueError: If a non-placeholder string cannot be parsed as a number.
    """
    if not isinstance(vol, str):
        return float(vol)  # Convert directly if already a number

    text = vol.replace(",", "").strip()  # Remove any thousand separators
    # Placeholders for "no volume reported" carry no number to parse.
    if text in ("", "-"):
        return float("nan")

    # Checked in the same order as before: billions, millions, thousands.
    for marker, factor in (("B", 1_000_000_000), ("M", 1_000_000), ("K", 1_000)):
        if marker in text:
            return float(text.replace(marker, "")) * factor
    return float(text)

# Turn the abbreviated volume strings into plain floats.
volume_text = df["Vol."].astype(str)
df["Vol."] = volume_text.apply(convert_volume)

# Remove the percent sign from 'Change %' before casting to float.
change_text = df["Change %"].astype(str).str.replace("%", "")
df["Change %"] = change_text.astype(float)

# Show the resulting dtypes and the frame itself.
print(df.dtypes)
print(df)

# From here on `data` refers to the cleaned frame (the same object as `df`).
data = df

#data = pd.merge(data, vix, on='Date', how='inner')
# 'Date' is removed in place, so `df` loses the column as well.
data.drop(columns=['Date'], inplace=True)
data.head()

In [None]:
import os
import random
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.optim as optim
import matplotlib.pyplot as plt
from sklearn.preprocessing import MinMaxScaler
from sklearn.metrics import r2_score, mean_absolute_error, mean_squared_error, mean_absolute_percentage_error, explained_variance_score
from torch.utils.data import TensorDataset, DataLoader

# -------------------------------
# 1. Set Seeds for Reproducibility
# -------------------------------
def set_random_seed(seed=42):
    """Seed the Python, NumPy and PyTorch generators and configure cuDNN.

    Also stores the seed in the PYTHONHASHSEED environment variable, turns on
    cuDNN's deterministic flag and turns off its benchmark flag.
    """
    os.environ['PYTHONHASHSEED'] = str(seed)
    for seed_fn in (random.seed, np.random.seed, torch.manual_seed):
        seed_fn(seed)
    cudnn = torch.backends.cudnn
    cudnn.benchmark = False
    cudnn.deterministic = True

set_random_seed(30)

# -------------------------------
# 2. Load & Preprocess Data
# -------------------------------
# `data` must already exist (it is produced by the preceding loading cell).
df = data.copy()
all_cols = list(df.columns)

target = "Close"
# Put the target first; the remaining columns keep their existing order.
feature_cols = [target, *(name for name in all_cols if name != target)]
df = df.loc[:, feature_cols].copy()

# Coerce every column to numeric after removing commas. Values that cannot be
# parsed become NaN, and rows containing any NaN are discarded.
for col in feature_cols:
    as_text = df[col].astype(str).str.replace(',', '')
    df[col] = pd.to_numeric(as_text, errors='coerce')
df = df.dropna()

# -------------------------------
# 3. Create Sequences from Time Series Data
# -------------------------------
def create_sequences(data, seq_length=20, target_idx=0):
    """
    Build sliding windows and their next-step targets from a 2-D array.

    Window i covers rows i .. i + seq_length - 1; its target is column
    `target_idx` of row i + seq_length. Returns (windows, targets) as NumPy
    arrays; both are empty when `data` has no more than `seq_length` rows.
    """
    window_count = len(data) - seq_length
    starts = range(window_count)
    windows = [data[start:start + seq_length] for start in starts]
    targets = [data[start + seq_length, target_idx] for start in starts]
    return np.array(windows), np.array(targets)

data_values = df.values  # every column is used as an input feature

# Chronological split: first 80% of the rows for training, the rest for testing.
train_size = int(len(data_values) * 0.8)
train_data, test_data = data_values[:train_size], data_values[train_size:]

# The scaler is fitted on the training rows only; the test rows are transformed
# with those same fitted parameters.
scaler = MinMaxScaler()
train_scaled = scaler.fit_transform(train_data)
test_scaled  = scaler.transform(test_data)

# Window both splits with a sequence length of 20.
seq_length = 20
X_train, y_train = create_sequences(train_scaled, seq_length=seq_length, target_idx=0)
X_test, y_test   = create_sequences(test_scaled, seq_length=seq_length, target_idx=0)

# Hold out the last 12.5% of the training windows for validation.
train_split = int(len(X_train) * 0.875)
X_train, X_val = X_train[:train_split], X_train[train_split:]
y_train, y_val = y_train[:train_split], y_train[train_split:]

print("Train shape:", X_train.shape)
print("Validation shape:", X_val.shape)
print("Test shape:", X_test.shape)

# -------------------------------
# 4. Prepare PyTorch Datasets & DataLoaders
# -------------------------------
# Inputs become float32 tensors; targets additionally get a trailing dimension
# so they line up with the model's (batch, 1) output.
X_train_t, X_val_t, X_test_t = (
    torch.tensor(features, dtype=torch.float32)
    for features in (X_train, X_val, X_test)
)
y_train_t, y_val_t, y_test_t = (
    torch.tensor(targets, dtype=torch.float32).unsqueeze(1)
    for targets in (y_train, y_val, y_test)
)

batch_size = 32
train_dataset = TensorDataset(X_train_t, y_train_t)
val_dataset   = TensorDataset(X_val_t, y_val_t)
test_dataset  = TensorDataset(X_test_t, y_test_t)

# Only the training loader shuffles; validation and test keep their order.
train_loader = DataLoader(train_dataset, shuffle=True, batch_size=batch_size)
val_loader   = DataLoader(val_dataset, shuffle=False, batch_size=batch_size)
test_loader  = DataLoader(test_dataset, shuffle=False, batch_size=batch_size)

# -------------------------------
# 5. Define Merged Custom LSTM Model
# -------------------------------
class CustomLSTMModel(nn.Module):
    """Recurrent regressor built around a hand-written gated cell.

    At every time step the cell computes two sigmoid gates (z_t, m_t) and a
    layer-normalised candidate that adds linear projections of the previous
    hidden state and of the current input. The final hidden state is mapped to
    a return bounded to +/- scale_factor by tanh, and the prediction is the
    first feature of the last input step multiplied by (1 + return).
    """

    def __init__(self, in_channels, seq_length=20, hidden_dim=64, dense_dim=32, scale_factor=0.2):
        """
        Args:
            in_channels: Number of features per time step.
            seq_length: Nominal window length. Stored as ``self.seq_length`` for
                backward compatibility; ``forward`` takes the number of steps
                from the input tensor itself.
            hidden_dim: Size of the hidden and cell states.
            dense_dim: Width of the dense layer placed before the output.
            scale_factor: Bound applied to the predicted return after tanh.
        """
        super(CustomLSTMModel, self).__init__()
        self.seq_length = seq_length
        self.scale_factor = scale_factor
        self.hidden_dim = hidden_dim

        # Gate z_t: blends the old cell state with the new candidate.
        self.W_z = nn.Linear(in_channels, hidden_dim)
        self.U_z = nn.Linear(hidden_dim, hidden_dim, bias=False)
        self.b_z = nn.Parameter(torch.zeros(hidden_dim))

        # Gate m_t: scales tanh(c) to produce the hidden state.
        self.W_m = nn.Linear(in_channels, hidden_dim)
        self.U_m = nn.Linear(hidden_dim, hidden_dim, bias=False)
        self.b_m = nn.Parameter(torch.zeros(hidden_dim))

        # Candidate cell content.
        self.W_c = nn.Linear(in_channels, hidden_dim)
        self.U_c = nn.Linear(hidden_dim, hidden_dim, bias=False)
        self.b_c = nn.Parameter(torch.zeros(hidden_dim))

        # Residual connection: project previous hidden state
        self.residual_linear = nn.Linear(hidden_dim, hidden_dim)
        self.layer_norm = nn.LayerNorm(hidden_dim)

        # Skip connection: project the input to hidden_dim
        self.input_linear = nn.Linear(in_channels, hidden_dim)

        # Dense layers for output processing
        self.dense = nn.Sequential(
            nn.Linear(hidden_dim, dense_dim),
            nn.ReLU()
        )
        self.return_layer = nn.Linear(dense_dim, 1)

    def forward(self, x):
        """Run the cell over every time step of ``x`` and return the prediction.

        Args:
            x: Tensor of shape (batch, steps, in_channels).

        Returns:
            Tensor of shape (batch, 1).
        """
        batch_size, num_steps = x.size(0), x.size(1)

        # States start at zero and follow the input's device and dtype.
        h = torch.zeros(batch_size, self.hidden_dim, device=x.device, dtype=x.dtype)
        c = torch.zeros_like(h)

        # Iterate over the steps actually present in x. Looping over
        # self.seq_length instead would fail on shorter windows and silently
        # skip the tail of longer ones.
        for t in range(num_steps):
            x_t = x[:, t, :]
            z_t = torch.sigmoid(self.W_z(x_t) + self.U_z(h) + self.b_z)
            m_t = torch.sigmoid(self.W_m(x_t) + self.U_m(h) + self.b_m)
            candidate = torch.tanh(self.W_c(x_t) + self.U_c(h) + self.b_c)
            candidate = candidate + self.residual_linear(h)
            candidate = candidate + self.input_linear(x_t)
            candidate = self.layer_norm(candidate)

            c = (1 - z_t) * c + z_t * candidate
            h = m_t * torch.tanh(c)

        # Compute dense layers and output
        dense_out = self.dense(h)
        long_term_return = torch.tanh(self.return_layer(dense_out)) * self.scale_factor

        # Feature 0 of the last step is the base value (the caller places "Close" first).
        last_day = x[:, -1, 0].unsqueeze(1)
        predicted_price = last_day * (1 + long_term_return)
        return predicted_price

# -------------------------------
# 6. Training and Evaluation Pipeline
# -------------------------------
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
in_channels = len(feature_cols)
model = CustomLSTMModel(in_channels, seq_length=seq_length, hidden_dim=64, dense_dim=32, scale_factor=0.2).to(device)
criterion = nn.MSELoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

num_epochs = 100
best_val_loss = float('inf')
patience = 10   # epochs without validation improvement before stopping
counter = 0     # consecutive epochs without improvement

# state_dict() returns references to the live parameter tensors, so a snapshot
# has to be cloned or later optimizer steps overwrite it. Taking one up front
# also keeps best_model_state defined if validation loss never improves.
best_model_state = {name: tensor.detach().clone() for name, tensor in model.state_dict().items()}

for epoch in range(num_epochs):
    # ---- training pass ----
    model.train()
    train_losses = []
    for batch_X, batch_y in train_loader:
        batch_X = batch_X.to(device)
        batch_y = batch_y.to(device)
        optimizer.zero_grad()
        outputs = model(batch_X)
        loss = criterion(outputs, batch_y)
        loss.backward()
        optimizer.step()
        train_losses.append(loss.item())

    # ---- validation pass (no gradients) ----
    model.eval()
    val_losses = []
    with torch.no_grad():
        for batch_X, batch_y in val_loader:
            batch_X = batch_X.to(device)
            batch_y = batch_y.to(device)
            outputs = model(batch_X)
            loss = criterion(outputs, batch_y)
            val_losses.append(loss.item())

    avg_train_loss = np.mean(train_losses)
    avg_val_loss = np.mean(val_losses)
    print(f"Epoch {epoch+1}/{num_epochs} - Train Loss: {avg_train_loss:.6f} - Val Loss: {avg_val_loss:.6f}")

    if avg_val_loss < best_val_loss:
        best_val_loss = avg_val_loss
        # Clone the weights so this checkpoint is frozen at the best epoch.
        best_model_state = {name: tensor.detach().clone() for name, tensor in model.state_dict().items()}
        counter = 0
    else:
        counter += 1
        if counter >= patience:
            print("Early stopping triggered!")
            break

# Restore the weights from the epoch with the lowest validation loss.
model.load_state_dict(best_model_state)

# -------------------------------
# 7. Evaluation on Test Data
# -------------------------------
# Run the model over the test loader without gradients, collecting the
# predictions and the matching targets batch by batch.
model.eval()
predictions, true_values = [], []
with torch.no_grad():
    for batch_X, batch_y in test_loader:
        batch_X, batch_y = batch_X.to(device), batch_y.to(device)
        preds = model(batch_X)
        predictions.append(preds.cpu().numpy())
        true_values.append(batch_y.cpu().numpy())

# Stack the per-batch arrays along the row axis.
predictions = np.concatenate(predictions, axis=0)
true_values = np.concatenate(true_values, axis=0)

def inverse_transform(values):
    """Undo the scaling for column 0 only.

    The fitted `scaler` expects one column per entry of `feature_cols`, so the
    values are written into column 0 of a zero-filled array of that width,
    inverse-transformed, and column 0 of the result is returned.
    """
    padded = np.zeros((len(values), len(feature_cols)))
    padded[:, 0] = values.reshape(-1)
    restored = scaler.inverse_transform(padded)
    return restored[:, 0]

# Map predictions and targets back to the original scale of column 0.
predictions_inv, true_values_inv = inverse_transform(predictions), inverse_transform(true_values)

# Metrics on the inverse-transformed values.
mse = mean_squared_error(true_values_inv, predictions_inv)
rmse = np.sqrt(mse)
mae = mean_absolute_error(true_values_inv, predictions_inv)
mape = mean_absolute_percentage_error(true_values_inv, predictions_inv) * 100
r2 = r2_score(true_values_inv, predictions_inv)
evs = explained_variance_score(true_values_inv, predictions_inv)

# The same metrics on the scaled values the model was trained on.
mse_scaled = mean_squared_error(true_values, predictions)
rmse_scaled = np.sqrt(mse_scaled)
mae_scaled = mean_absolute_error(true_values, predictions)
mape_scaled = mean_absolute_percentage_error(true_values, predictions) * 100
r2_scaled = r2_score(true_values, predictions)
evs_scaled = explained_variance_score(true_values, predictions)

print("📊 Final Results -")
print(f"  R²: {r2:.4f}")
print(f"  MAE: {mae:.4f}")
print(f"  MSE: {mse:.4f}")
print(f"  RMSE: {rmse:.4f}")
print(f"  MAPE: {mape:.2f}%")
print(f"  EVS: {evs:.4f}")

print("\n--- Scaled Metrics ---")
print(f"R² Score: {r2_scaled:.4f}")
print(f"MAE: {mae_scaled:.4f}")
print(f"MSE: {mse_scaled:.4f}")
print(f"RMSE: {rmse_scaled:.4f}")
print(f"Explained Variance Score: {evs_scaled:.4f}")
print(f"MAPE: {mape_scaled:.2f}%")

# Actual vs. predicted values over the test windows.
fig, ax = plt.subplots(figsize=(10, 5))
ax.plot(true_values_inv, label='Actual')
ax.plot(predictions_inv, label='Predicted')
ax.set_xlabel("Time")
ax.set_ylabel("Close Price")
ax.legend()
plt.show()


## Working with Nifty50

## Doing without VIX

In [None]:
# Load the Nifty50 export used for the Paper 3 comparison.
data = pd.read_csv("/kaggle/input/personal-dataset/Nifty50 Paper1.csv")

# Reverse the row order and renumber the index.
data = data.iloc[::-1].reset_index(drop=True)

# sort_index returns a new frame, so the result has to be assigned; calling it
# bare left the columns unsorted. Sorting changes the column order, and with it
# the feature order, relative to the raw file.
data = data.sort_index(axis=1, ascending=True)
data = data.rename(columns={'Price': 'Close'})

# Missing volumes are replaced with the fixed string "278.04M".
# NOTE(review): the origin of this constant is not shown here — confirm it.
data['Vol.'] = data['Vol.'].fillna("278.04M")
df = data.copy()  # clean a separate copy so `data` is not modified by the casts below

# Convert financial columns to numeric (remove commas)
for col in ["Close", "Open", "High", "Low"]:
    df[col] = df[col].astype(str).str.replace(",", "").astype(float)

# Function to convert 'Vol.' column
def convert_volume(vol):
    """Convert an abbreviated volume value such as "1.5M" into a float.

    Args:
        vol: A string that may contain thousands separators and a "B", "M" or
            "K" magnitude marker, or any value accepted by ``float()``.

    Returns:
        The volume as a float. Blank strings and the "-" placeholder yield
        NaN instead of raising, so missing volumes can be dropped later.

    Raises:
        ValueError: If a non-placeholder string cannot be parsed as a number.
    """
    if not isinstance(vol, str):
        return float(vol)  # Convert directly if already a number

    text = vol.replace(",", "").strip()  # Remove any thousand separators
    # Placeholders for "no volume reported" carry no number to parse.
    if text in ("", "-"):
        return float("nan")

    # Checked in the same order as before: billions, millions, thousands.
    for marker, factor in (("B", 1_000_000_000), ("M", 1_000_000), ("K", 1_000)):
        if marker in text:
            return float(text.replace(marker, "")) * factor
    return float(text)

# Turn the abbreviated volume strings into plain floats.
volume_text = df["Vol."].astype(str)
df["Vol."] = volume_text.apply(convert_volume)

# Remove the percent sign from 'Change %' before casting to float.
change_text = df["Change %"].astype(str).str.replace("%", "")
df["Change %"] = change_text.astype(float)

# Show the resulting dtypes and the frame itself.
print(df.dtypes)
print(df)

# From here on `data` refers to the cleaned frame (the same object as `df`).
data = df

#data = pd.merge(data, vix, on='Date', how='inner')
# 'Date' is removed in place, so `df` loses the column as well.
data.drop(columns=['Date'], inplace=True)
data.head()

In [None]:
# Report the number of columns in the frame loaded just above. `feature_cols`
# at this point is whatever an earlier training cell left behind, and it does
# not exist if only this section is run.
print(len(data.columns))

In [None]:
import os
import random
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.optim as optim
import matplotlib.pyplot as plt
from sklearn.preprocessing import MinMaxScaler
from sklearn.metrics import r2_score, mean_absolute_error, mean_squared_error, mean_absolute_percentage_error, explained_variance_score
from torch.utils.data import TensorDataset, DataLoader

# -------------------------------
# 1. Set Seeds for Reproducibility
# -------------------------------
def set_random_seed(seed=42):
    """Seed the Python, NumPy and PyTorch generators and configure cuDNN.

    Also stores the seed in the PYTHONHASHSEED environment variable, turns on
    cuDNN's deterministic flag and turns off its benchmark flag.
    """
    os.environ['PYTHONHASHSEED'] = str(seed)
    for seed_fn in (random.seed, np.random.seed, torch.manual_seed):
        seed_fn(seed)
    cudnn = torch.backends.cudnn
    cudnn.benchmark = False
    cudnn.deterministic = True

set_random_seed(30)

# -------------------------------
# 2. Load & Preprocess Data
# -------------------------------
# `data` must already exist (it is produced by the preceding loading cell).
df = data.copy()
all_cols = list(df.columns)

target = "Close"
# Put the target first; the remaining columns keep their existing order.
feature_cols = [target, *(name for name in all_cols if name != target)]
df = df.loc[:, feature_cols].copy()

# Coerce every column to numeric after removing commas. Values that cannot be
# parsed become NaN, and rows containing any NaN are discarded.
for col in feature_cols:
    as_text = df[col].astype(str).str.replace(',', '')
    df[col] = pd.to_numeric(as_text, errors='coerce')
df = df.dropna()

# -------------------------------
# 3. Create Sequences from Time Series Data
# -------------------------------
def create_sequences(data, seq_length=20, target_idx=0):
    """
    Build sliding windows and their next-step targets from a 2-D array.

    Window i covers rows i .. i + seq_length - 1; its target is column
    `target_idx` of row i + seq_length. Returns (windows, targets) as NumPy
    arrays; both are empty when `data` has no more than `seq_length` rows.
    """
    window_count = len(data) - seq_length
    starts = range(window_count)
    windows = [data[start:start + seq_length] for start in starts]
    targets = [data[start + seq_length, target_idx] for start in starts]
    return np.array(windows), np.array(targets)

data_values = df.values  # every column is used as an input feature

# Chronological split: first 80% of the rows for training, the rest for testing.
train_size = int(len(data_values) * 0.8)
train_data, test_data = data_values[:train_size], data_values[train_size:]

# The scaler is fitted on the training rows only; the test rows are transformed
# with those same fitted parameters.
scaler = MinMaxScaler()
train_scaled = scaler.fit_transform(train_data)
test_scaled  = scaler.transform(test_data)

# Window both splits with a sequence length of 20.
seq_length = 20
X_train, y_train = create_sequences(train_scaled, seq_length=seq_length, target_idx=0)
X_test, y_test   = create_sequences(test_scaled, seq_length=seq_length, target_idx=0)

# Hold out the last 12.5% of the training windows for validation.
train_split = int(len(X_train) * 0.875)
X_train, X_val = X_train[:train_split], X_train[train_split:]
y_train, y_val = y_train[:train_split], y_train[train_split:]

print("Train shape:", X_train.shape)
print("Validation shape:", X_val.shape)
print("Test shape:", X_test.shape)

# -------------------------------
# 4. Prepare PyTorch Datasets & DataLoaders
# -------------------------------
# Inputs become float32 tensors; targets additionally get a trailing dimension
# so they line up with the model's (batch, 1) output.
X_train_t, X_val_t, X_test_t = (
    torch.tensor(features, dtype=torch.float32)
    for features in (X_train, X_val, X_test)
)
y_train_t, y_val_t, y_test_t = (
    torch.tensor(targets, dtype=torch.float32).unsqueeze(1)
    for targets in (y_train, y_val, y_test)
)

batch_size = 32
train_dataset = TensorDataset(X_train_t, y_train_t)
val_dataset   = TensorDataset(X_val_t, y_val_t)
test_dataset  = TensorDataset(X_test_t, y_test_t)

# Only the training loader shuffles; validation and test keep their order.
train_loader = DataLoader(train_dataset, shuffle=True, batch_size=batch_size)
val_loader   = DataLoader(val_dataset, shuffle=False, batch_size=batch_size)
test_loader  = DataLoader(test_dataset, shuffle=False, batch_size=batch_size)

# -------------------------------
# 5. Define Merged Custom LSTM Model
# -------------------------------
class CustomLSTMModel(nn.Module):
    """Recurrent regressor that predicts the next value of feature 0 as a bounded relative move.

    The recurrence is a hand-written gated cell: an update gate ``z``, an output gate ``m``
    and a layer-normalised candidate that also receives a projection of the previous hidden
    state and of the current input. The final hidden state is mapped to a "return" in
    ``(-scale_factor, scale_factor)`` which is applied multiplicatively to the last observed
    value of feature 0.
    """

    def __init__(self, in_channels, seq_length=20, hidden_dim=64, dense_dim=32, scale_factor=0.2):
        """
        Args:
            in_channels: number of features per timestep.
            seq_length: nominal window length. Stored for reference only; ``forward``
                iterates over the actual time dimension of its input.
            hidden_dim: width of the hidden and cell states.
            dense_dim: width of the dense layer in front of the output.
            scale_factor: bound on the predicted relative move (tanh output is scaled by it).
        """
        super(CustomLSTMModel, self).__init__()
        self.seq_length = seq_length
        self.scale_factor = scale_factor
        self.hidden_dim = hidden_dim

        # Gate / candidate parameters. The W_* layers already carry a bias, so b_* is a
        # second (redundant) bias; it is kept so parameter names and the state_dict layout
        # stay unchanged.
        self.W_z = nn.Linear(in_channels, hidden_dim)
        self.U_z = nn.Linear(hidden_dim, hidden_dim, bias=False)
        self.b_z = nn.Parameter(torch.zeros(hidden_dim))

        self.W_m = nn.Linear(in_channels, hidden_dim)
        self.U_m = nn.Linear(hidden_dim, hidden_dim, bias=False)
        self.b_m = nn.Parameter(torch.zeros(hidden_dim))

        self.W_c = nn.Linear(in_channels, hidden_dim)
        self.U_c = nn.Linear(hidden_dim, hidden_dim, bias=False)
        self.b_c = nn.Parameter(torch.zeros(hidden_dim))

        # Residual connection: project previous hidden state
        self.residual_linear = nn.Linear(hidden_dim, hidden_dim)
        self.layer_norm = nn.LayerNorm(hidden_dim)

        # Skip connection: project the input to hidden_dim
        self.input_linear = nn.Linear(in_channels, hidden_dim)

        # Dense layers for output processing
        self.dense = nn.Sequential(
            nn.Linear(hidden_dim, dense_dim),
            nn.ReLU()
        )
        self.return_layer = nn.Linear(dense_dim, 1)

    def forward(self, x):
        """Run the recurrence over every timestep of ``x`` and return the prediction.

        Args:
            x: tensor indexed as (batch, time, feature); feature 0 is treated as the
                quantity being predicted.

        Returns:
            Tensor of shape (batch, 1): ``x[:, -1, 0] * (1 + r)`` with
            ``|r| < scale_factor``.
        """
        batch_size, num_steps = x.size(0), x.size(1)

        # Zero initial states, created with x's dtype and device.
        h = x.new_zeros(batch_size, self.hidden_dim)
        c = x.new_zeros(batch_size, self.hidden_dim)

        # Iterate over the real time axis rather than self.seq_length, so the recurrence
        # always ends on the same step that supplies `last_day` below and windows of any
        # length are accepted.
        for t in range(num_steps):
            x_t = x[:, t, :]
            z_t = torch.sigmoid(self.W_z(x_t) + self.U_z(h) + self.b_z)  # update gate
            m_t = torch.sigmoid(self.W_m(x_t) + self.U_m(h) + self.b_m)  # output gate
            candidate = torch.tanh(self.W_c(x_t) + self.U_c(h) + self.b_c)
            candidate = candidate + self.residual_linear(h)
            candidate = candidate + self.input_linear(x_t)
            candidate = self.layer_norm(candidate)

            # Convex blend of the old cell state and the new candidate.
            c = (1 - z_t) * c + z_t * candidate
            h = m_t * torch.tanh(c)

        # Compute dense layers and output
        dense_out = self.dense(h)
        long_term_return = torch.tanh(self.return_layer(dense_out)) * self.scale_factor

        # Use the last day's "Close" price from the input sequence (assumes first feature is "Close").
        # Being multiplicative, the update cannot move away from 0: a window whose last
        # value of feature 0 is exactly 0 always predicts 0.
        last_day = x[:, -1, 0].unsqueeze(1)
        predicted_price = last_day * (1 + long_term_return)
        return predicted_price

# -------------------------------
# 6. Training and Evaluation Pipeline
# -------------------------------
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
in_channels = len(feature_cols)
model = CustomLSTMModel(in_channels, seq_length=seq_length, hidden_dim=64, dense_dim=32, scale_factor=0.2).to(device)
criterion = nn.MSELoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

num_epochs = 100
best_val_loss = float('inf')
patience = 10  # non-improving epochs tolerated before stopping
counter = 0    # consecutive epochs without a new best validation loss


def _snapshot_weights(module):
    """Return an independent copy of ``module``'s parameters and buffers."""
    # state_dict() hands back references to the live tensors, which the optimizer keeps
    # updating in place; cloning freezes the values as they are right now.
    return {name: tensor.detach().clone() for name, tensor in module.state_dict().items()}


# Start from the initial weights so a checkpoint always exists for *this* model, even if
# the validation loss never improves (e.g. it is NaN), instead of failing or picking up a
# `best_model_state` left in the kernel by an earlier cell.
best_model_state = _snapshot_weights(model)

for epoch in range(num_epochs):
    # ---- optimisation pass over the training windows ----
    model.train()
    train_losses = []
    for batch_X, batch_y in train_loader:
        batch_X = batch_X.to(device)
        batch_y = batch_y.to(device)
        optimizer.zero_grad()
        outputs = model(batch_X)
        loss = criterion(outputs, batch_y)
        loss.backward()
        optimizer.step()
        train_losses.append(loss.item())

    # ---- validation pass (no gradients) ----
    model.eval()
    val_losses = []
    with torch.no_grad():
        for batch_X, batch_y in val_loader:
            batch_X = batch_X.to(device)
            batch_y = batch_y.to(device)
            outputs = model(batch_X)
            loss = criterion(outputs, batch_y)
            val_losses.append(loss.item())

    avg_train_loss = np.mean(train_losses)
    avg_val_loss = np.mean(val_losses)
    print(f"Epoch {epoch+1}/{num_epochs} - Train Loss: {avg_train_loss:.6f} - Val Loss: {avg_val_loss:.6f}")

    # ---- early stopping on the mean validation loss ----
    if avg_val_loss < best_val_loss:
        best_val_loss = avg_val_loss
        best_model_state = _snapshot_weights(model)
        counter = 0
    else:
        counter += 1
        if counter >= patience:
            print("Early stopping triggered!")
            break

# Restore the weights of the best validation epoch before evaluating.
model.load_state_dict(best_model_state)

# -------------------------------
# 7. Evaluation on Test Data
# -------------------------------
model.eval()
predicted_batches, target_batches = [], []
with torch.no_grad():
    # Walk the test loader in order and keep every batch as a NumPy array on the CPU.
    for batch_X, batch_y in test_loader:
        batch_X = batch_X.to(device)
        batch_y = batch_y.to(device)
        preds = model(batch_X)
        predicted_batches.append(preds.cpu().numpy())
        target_batches.append(batch_y.cpu().numpy())

# Stack the per-batch (batch, 1) arrays into single column arrays (still in scaled units).
predictions = np.vstack(predicted_batches)
true_values = np.vstack(target_batches)

def inverse_transform(values):
    """Convert scaled values of column 0 back to the scaler's original units.

    The module-level ``scaler`` is given rows that are ``len(feature_cols)`` wide, so the
    values are written into column 0 of an otherwise all-zero matrix, inverse-scaled, and
    column 0 is read back.

    Args:
        values: NumPy array of scaled values; it is flattened before use.

    Returns:
        1-D NumPy array with one unscaled value per input row.
    """
    n_rows, n_features = len(values), len(feature_cols)
    padded = np.zeros((n_rows, n_features))
    padded[:, 0] = values.flatten()
    restored = scaler.inverse_transform(padded)
    return restored[:, 0]

# Bring predictions and targets back to the scaler's original units.
predictions_inv = inverse_transform(predictions)
true_values_inv = inverse_transform(true_values)

# Metrics in original units.
r2 = r2_score(true_values_inv, predictions_inv)
mae = mean_absolute_error(true_values_inv, predictions_inv)
mse = mean_squared_error(true_values_inv, predictions_inv)
rmse = np.sqrt(mse)
evs = explained_variance_score(true_values_inv, predictions_inv)
# sklearn returns MAPE as a fraction; * 100 turns it into a percentage.
mape = mean_absolute_percentage_error(true_values_inv, predictions_inv) * 100

# Same metrics on the scaled values as they came out of the test loader.
r2_scaled = r2_score(true_values, predictions)
mae_scaled = mean_absolute_error(true_values, predictions)
mse_scaled = mean_squared_error(true_values, predictions)
rmse_scaled = np.sqrt(mse_scaled)
evs_scaled = explained_variance_score(true_values, predictions)
# NOTE(review): MAPE divides by the true value, so scaled targets close to 0 can inflate it.
mape_scaled = mean_absolute_percentage_error(true_values, predictions) * 100

print("📊 Final Results -")
print(f"  R²: {r2:.4f}")
print(f"  MAE: {mae:.4f}")
print(f"  MSE: {mse:.4f}")
print(f"  RMSE: {rmse:.4f}")
print(f"  MAPE: {mape:.2f}%")
print(f"  EVS: {evs:.4f}")

print("\n--- Scaled Metrics ---")
print(f"R² Score: {r2_scaled:.4f}")
print(f"MAE: {mae_scaled:.4f}")
print(f"MSE: {mse_scaled:.4f}")
print(f"RMSE: {rmse_scaled:.4f}")
print(f"Explained Variance Score: {evs_scaled:.4f}")
print(f"MAPE: {mape_scaled:.2f}%")

# Actual vs. predicted series over the test windows, in original units.
plt.figure(figsize=(10,5))
plt.plot(true_values_inv, label='Actual')
plt.plot(predictions_inv, label='Predicted')
plt.xlabel("Time")
plt.ylabel("Close Price")
plt.legend()
plt.show()


## Working with S&P500

## Doing without vix

In [None]:


# Read the raw S&P 500 export.
df = pd.read_csv("/kaggle/input/personal-dateset-vix/SP 500 (2).csv")

# 'Change %' is derived here from the two columns it needs; bail out with a message
# if either is missing.
required_columns = {"Price", "Open"}
if required_columns.issubset(df.columns):
    # Move from 'Open' to 'Price', expressed as a percentage of 'Price'.
    open_to_price = df["Price"] - df["Open"]
    df["Change %"] = (open_to_price / df["Price"]) * 100

    # Display first few rows
    print(df.head())
else:
    print("Error: CSV file must contain 'Price' and 'Open' columns.")
df.head()

In [None]:
# Flip the row order and renumber from 0 (presumably the file is newest-first — TODO confirm).
data = df.iloc[::-1].reset_index(drop=True)
print(data.head())

# Columns stay in file order; only 'Price' is renamed so the target is called 'Close'.
data = data.rename(columns={'Price': 'Close'})

# Clean the values on a separate copy of the renamed frame.
df = data.copy()

# Strip thousands separators and cast the price columns to float.
for col in ("Close", "Open", "High", "Low"):
    as_text = df[col].astype(str)
    df[col] = as_text.str.replace(",", "").astype(float)

# Remove any literal '%' from 'Change %' and cast it to float.
change_text = df["Change %"].astype(str)
df["Change %"] = change_text.str.replace("%", "").astype(float)

# Show the resulting dtypes and the cleaned frame.
print(df.dtypes)
print(df)

# Drop 'Date'; from here on `data` and `df` name the same date-free frame.
df = df.drop(columns=['Date'])
data = df
data.head()

In [None]:
import os
import random
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.optim as optim
import matplotlib.pyplot as plt
from sklearn.preprocessing import MinMaxScaler
from sklearn.metrics import r2_score, mean_absolute_error, mean_squared_error, mean_absolute_percentage_error, explained_variance_score
from torch.utils.data import TensorDataset, DataLoader

# -------------------------------
# 1. Set Seeds for Reproducibility
# -------------------------------
def set_random_seed(seed=42):
    """Seed NumPy, Python's ``random`` and PyTorch, and request deterministic cuDNN.

    Args:
        seed: integer used for every generator and written to ``PYTHONHASHSEED``.
    """
    for seed_fn in (np.random.seed, random.seed, torch.manual_seed):
        seed_fn(seed)
    # Python reads PYTHONHASHSEED at interpreter start-up, so this only reaches
    # processes launched after this point.
    os.environ['PYTHONHASHSEED'] = str(seed)
    # Give up cuDNN autotuning in exchange for repeatable kernels.
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False


set_random_seed(20)

# -------------------------------
# 2. Load & Preprocess Data
# -------------------------------
# Make sure to load your DataFrame before this step.
# For example: data = pd.read_csv('your_data.csv')
# `data` must already be defined by an earlier cell.
target = "Close"
all_cols = list(data.columns)

# The target goes first so that column 0 of every array built from `df` is "Close".
feature_cols = [target, *(name for name in all_cols if name != target)]
df = data[feature_cols].copy()

# Force every column to numeric (thousands separators removed). Values that cannot be
# parsed become NaN, and any row containing a NaN is dropped.
for col in feature_cols:
    without_commas = df[col].astype(str).str.replace(',', '')
    df[col] = pd.to_numeric(without_commas, errors='coerce')
df = df.dropna()

# -------------------------------
# 3. Create Sequences from Time Series Data
# -------------------------------
def create_sequences(data, seq_length=20, target_idx=0):
    """Cut a 2-D, row-ordered array into overlapping windows with next-row labels.

    Args:
        data: array indexed as ``data[row, column]``.
        seq_length: number of consecutive rows per window.
        target_idx: column whose value in the row right after a window is that window's label.

    Returns:
        ``(X, y)`` as NumPy arrays, where ``X[i]`` is ``data[i:i + seq_length]`` and
        ``y[i]`` is ``data[i + seq_length, target_idx]``, with one pair for each of the
        ``len(data) - seq_length`` window start positions.
    """
    window_starts = range(len(data) - seq_length)
    windows = [data[start:start + seq_length] for start in window_starts]
    labels = [data[start + seq_length, target_idx] for start in window_starts]
    return np.array(windows), np.array(labels)

# Feature matrix: every column of df is a model input, with the target in column 0.
data_values = df.values

# Chronological hold-out: the first 80% of rows train the model, the rest is the test set.
train_size = int(len(data_values) * 0.8)
train_data, test_data = data_values[:train_size], data_values[train_size:]

# The min-max ranges are fitted on the training rows only and then reused for the
# test rows, so no test-period statistics reach the scaler.
scaler = MinMaxScaler()
train_scaled = scaler.fit_transform(train_data)
test_scaled = scaler.transform(test_data)

# Windows of 20 timesteps; each label is column 0 of the row that follows its window.
seq_length = 20
X_train, y_train = create_sequences(train_scaled, seq_length=seq_length, target_idx=0)
X_test, y_test = create_sequences(test_scaled, seq_length=seq_length, target_idx=0)

# The last 12.5% of the training windows are held back as the validation set.
train_split = int(len(X_train) * 0.875)
X_train, X_val = X_train[:train_split], X_train[train_split:]
y_train, y_val = y_train[:train_split], y_train[train_split:]

for split_name, split_windows in (("Train", X_train), ("Validation", X_val), ("Test", X_test)):
    print(f"{split_name} shape:", split_windows.shape)

# -------------------------------
# 4. Prepare PyTorch Datasets & DataLoaders
# -------------------------------
X_train_t, X_val_t, X_test_t = (
    torch.tensor(windows, dtype=torch.float32) for windows in (X_train, X_val, X_test)
)
# Labels get a trailing singleton axis so they line up with the model's (batch, 1) output.
y_train_t, y_val_t, y_test_t = (
    torch.tensor(labels, dtype=torch.float32)[:, None] for labels in (y_train, y_val, y_test)
)

batch_size = 32
train_dataset = TensorDataset(X_train_t, y_train_t)
val_dataset = TensorDataset(X_val_t, y_val_t)
test_dataset = TensorDataset(X_test_t, y_test_t)

# Only the training loader shuffles; validation and test keep their original order.
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

# -------------------------------
# 5. Define Merged Custom LSTM Model
# -------------------------------
class CustomLSTMModel(nn.Module):
    """Recurrent regressor that predicts the next value of feature 0 as a bounded relative move.

    The recurrence is a hand-written gated cell: an update gate ``z``, an output gate ``m``
    and a layer-normalised candidate that also receives a projection of the previous hidden
    state and of the current input. The final hidden state is mapped to a "return" in
    ``(-scale_factor, scale_factor)`` which is applied multiplicatively to the last observed
    value of feature 0.
    """

    def __init__(self, in_channels, seq_length=20, hidden_dim=64, dense_dim=32, scale_factor=0.2):
        """
        Args:
            in_channels: number of features per timestep.
            seq_length: nominal window length. Stored for reference only; ``forward``
                iterates over the actual time dimension of its input.
            hidden_dim: width of the hidden and cell states.
            dense_dim: width of the dense layer in front of the output.
            scale_factor: bound on the predicted relative move (tanh output is scaled by it).
        """
        super(CustomLSTMModel, self).__init__()
        self.seq_length = seq_length
        self.scale_factor = scale_factor
        self.hidden_dim = hidden_dim

        # Gate / candidate parameters. The W_* layers already carry a bias, so b_* is a
        # second (redundant) bias; it is kept so parameter names and the state_dict layout
        # stay unchanged.
        self.W_z = nn.Linear(in_channels, hidden_dim)
        self.U_z = nn.Linear(hidden_dim, hidden_dim, bias=False)
        self.b_z = nn.Parameter(torch.zeros(hidden_dim))

        self.W_m = nn.Linear(in_channels, hidden_dim)
        self.U_m = nn.Linear(hidden_dim, hidden_dim, bias=False)
        self.b_m = nn.Parameter(torch.zeros(hidden_dim))

        self.W_c = nn.Linear(in_channels, hidden_dim)
        self.U_c = nn.Linear(hidden_dim, hidden_dim, bias=False)
        self.b_c = nn.Parameter(torch.zeros(hidden_dim))

        # Residual connection: project previous hidden state
        self.residual_linear = nn.Linear(hidden_dim, hidden_dim)
        self.layer_norm = nn.LayerNorm(hidden_dim)

        # Skip connection: project the input to hidden_dim
        self.input_linear = nn.Linear(in_channels, hidden_dim)

        # Dense layers for output processing
        self.dense = nn.Sequential(
            nn.Linear(hidden_dim, dense_dim),
            nn.ReLU()
        )
        self.return_layer = nn.Linear(dense_dim, 1)

    def forward(self, x):
        """Run the recurrence over every timestep of ``x`` and return the prediction.

        Args:
            x: tensor indexed as (batch, time, feature); feature 0 is treated as the
                quantity being predicted.

        Returns:
            Tensor of shape (batch, 1): ``x[:, -1, 0] * (1 + r)`` with
            ``|r| < scale_factor``.
        """
        batch_size, num_steps = x.size(0), x.size(1)

        # Zero initial states, created with x's dtype and device.
        h = x.new_zeros(batch_size, self.hidden_dim)
        c = x.new_zeros(batch_size, self.hidden_dim)

        # Iterate over the real time axis rather than self.seq_length, so the recurrence
        # always ends on the same step that supplies `last_day` below and windows of any
        # length are accepted.
        for t in range(num_steps):
            x_t = x[:, t, :]
            z_t = torch.sigmoid(self.W_z(x_t) + self.U_z(h) + self.b_z)  # update gate
            m_t = torch.sigmoid(self.W_m(x_t) + self.U_m(h) + self.b_m)  # output gate
            candidate = torch.tanh(self.W_c(x_t) + self.U_c(h) + self.b_c)
            candidate = candidate + self.residual_linear(h)
            candidate = candidate + self.input_linear(x_t)
            candidate = self.layer_norm(candidate)

            # Convex blend of the old cell state and the new candidate.
            c = (1 - z_t) * c + z_t * candidate
            h = m_t * torch.tanh(c)

        # Compute dense layers and output
        dense_out = self.dense(h)
        long_term_return = torch.tanh(self.return_layer(dense_out)) * self.scale_factor

        # Use the last day's "Close" price from the input sequence (assumes first feature is "Close").
        # Being multiplicative, the update cannot move away from 0: a window whose last
        # value of feature 0 is exactly 0 always predicts 0.
        last_day = x[:, -1, 0].unsqueeze(1)
        predicted_price = last_day * (1 + long_term_return)
        return predicted_price

# -------------------------------
# 6. Training and Evaluation Pipeline
# -------------------------------
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
in_channels = len(feature_cols)
model = CustomLSTMModel(in_channels, seq_length=seq_length, hidden_dim=64, dense_dim=32, scale_factor=0.2).to(device)
criterion = nn.MSELoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

num_epochs = 100
best_val_loss = float('inf')
patience = 10  # non-improving epochs tolerated before stopping
counter = 0    # consecutive epochs without a new best validation loss


def _snapshot_weights(module):
    """Return an independent copy of ``module``'s parameters and buffers."""
    # state_dict() hands back references to the live tensors, which the optimizer keeps
    # updating in place; cloning freezes the values as they are right now.
    return {name: tensor.detach().clone() for name, tensor in module.state_dict().items()}


# Start from the initial weights so a checkpoint always exists for *this* model, even if
# the validation loss never improves (e.g. it is NaN), instead of failing or picking up a
# `best_model_state` left in the kernel by an earlier cell.
best_model_state = _snapshot_weights(model)

for epoch in range(num_epochs):
    # ---- optimisation pass over the training windows ----
    model.train()
    train_losses = []
    for batch_X, batch_y in train_loader:
        batch_X = batch_X.to(device)
        batch_y = batch_y.to(device)
        optimizer.zero_grad()
        outputs = model(batch_X)
        loss = criterion(outputs, batch_y)
        loss.backward()
        optimizer.step()
        train_losses.append(loss.item())

    # ---- validation pass (no gradients) ----
    model.eval()
    val_losses = []
    with torch.no_grad():
        for batch_X, batch_y in val_loader:
            batch_X = batch_X.to(device)
            batch_y = batch_y.to(device)
            outputs = model(batch_X)
            loss = criterion(outputs, batch_y)
            val_losses.append(loss.item())

    avg_train_loss = np.mean(train_losses)
    avg_val_loss = np.mean(val_losses)
    print(f"Epoch {epoch+1}/{num_epochs} - Train Loss: {avg_train_loss:.6f} - Val Loss: {avg_val_loss:.6f}")

    # ---- early stopping on the mean validation loss ----
    if avg_val_loss < best_val_loss:
        best_val_loss = avg_val_loss
        best_model_state = _snapshot_weights(model)
        counter = 0
    else:
        counter += 1
        if counter >= patience:
            print("Early stopping triggered!")
            break

# Restore the weights of the best validation epoch before evaluating.
model.load_state_dict(best_model_state)

# -------------------------------
# 7. Evaluation on Test Data
# -------------------------------
model.eval()
predicted_batches, target_batches = [], []
with torch.no_grad():
    # Walk the test loader in order and keep every batch as a NumPy array on the CPU.
    for batch_X, batch_y in test_loader:
        batch_X = batch_X.to(device)
        batch_y = batch_y.to(device)
        preds = model(batch_X)
        predicted_batches.append(preds.cpu().numpy())
        target_batches.append(batch_y.cpu().numpy())

# Stack the per-batch (batch, 1) arrays into single column arrays (still in scaled units).
predictions = np.vstack(predicted_batches)
true_values = np.vstack(target_batches)

def inverse_transform(values):
    """Convert scaled values of column 0 back to the scaler's original units.

    The module-level ``scaler`` is given rows that are ``len(feature_cols)`` wide, so the
    values are written into column 0 of an otherwise all-zero matrix, inverse-scaled, and
    column 0 is read back.

    Args:
        values: NumPy array of scaled values; it is flattened before use.

    Returns:
        1-D NumPy array with one unscaled value per input row.
    """
    n_rows, n_features = len(values), len(feature_cols)
    padded = np.zeros((n_rows, n_features))
    padded[:, 0] = values.flatten()
    restored = scaler.inverse_transform(padded)
    return restored[:, 0]

# Bring predictions and targets back to the scaler's original units.
predictions_inv = inverse_transform(predictions)
true_values_inv = inverse_transform(true_values)
# Second pair of names for the same arrays — presumably consumed by a later cell; TODO confirm.
y_test_inv3, y_pred_inv3 = true_values_inv, predictions_inv

# Metrics in original units.
r2 = r2_score(true_values_inv, predictions_inv)
mae = mean_absolute_error(true_values_inv, predictions_inv)
mse = mean_squared_error(true_values_inv, predictions_inv)
rmse = np.sqrt(mse)
evs = explained_variance_score(true_values_inv, predictions_inv)
# sklearn returns MAPE as a fraction; * 100 turns it into a percentage.
mape = mean_absolute_percentage_error(true_values_inv, predictions_inv) * 100

# Same metrics on the scaled values as they came out of the test loader.
r2_scaled = r2_score(true_values, predictions)
mae_scaled = mean_absolute_error(true_values, predictions)
mse_scaled = mean_squared_error(true_values, predictions)
rmse_scaled = np.sqrt(mse_scaled)
evs_scaled = explained_variance_score(true_values, predictions)
# NOTE(review): MAPE divides by the true value, so scaled targets close to 0 can inflate it.
mape_scaled = mean_absolute_percentage_error(true_values, predictions) * 100

print("📊 Final Results -")
print(f"  R²: {r2:.4f}")
print(f"  MAE: {mae:.4f}")
print(f"  MSE: {mse:.4f}")
print(f"  RMSE: {rmse:.4f}")
print(f"  MAPE: {mape:.2f}%")
print(f"  EVS: {evs:.4f}")

print("\n--- Scaled Metrics ---")
print(f"R² Score: {r2_scaled:.4f}")
print(f"MAE: {mae_scaled:.4f}")
print(f"MSE: {mse_scaled:.4f}")
print(f"RMSE: {rmse_scaled:.4f}")
print(f"Explained Variance Score: {evs_scaled:.4f}")
print(f"MAPE: {mape_scaled:.2f}%")

# Actual vs. predicted series over the test windows, in original units.
plt.figure(figsize=(10,5))
plt.plot(true_values_inv, label='Actual')
plt.plot(predictions_inv, label='Predicted')
plt.xlabel("Time")
plt.ylabel("Close Price")
plt.legend()
plt.show()

## Working with Paper 4
* This paper does prediction on Nifty50
* The metric used is RMSE
* Their result is RMSE - 171.4
* We train the model on data from 1 January 2020 to 31 December 2023 and test it on data from 1 January 2024 to 15 March 2024.
* Paper link: https://www.semanticscholar.org/paper/Closing-Price-Prediction-for-the-NIFTY-50-Index%3A-A-Singh-Shah/d0eca144e2cd8e86c57eca34e3d9f4943f3c45f2

In [None]:
import pandas as pd
# Raw Nifty 50 exports for Paper 4. Going by the file names, `data` is the training-period
# file and `data2` the test-period file.
data=pd.read_csv("/kaggle/input/personal-dataset/Nifty 50 Train paper 2.csv")
data2=pd.read_csv("/kaggle/input/personal-dataset/Nifty 50 Test paper2.csv")

In [None]:
# Peek at the first rows of both raw frames (the cell output is a tuple of the two heads).
data.head(),data2.head()

In [None]:
def preprocess(data, vol_fill="278.04M"):
    """Turn a raw price export into a row-reversed frame with numeric columns.

    Steps, in order: reverse the row order and renumber the index, fill missing ``Vol.``
    entries with ``vol_fill``, rename ``Price`` to ``Close``, strip thousands separators
    from the price columns, expand ``K``/``M``/``B`` volume suffixes, and strip ``%``
    from ``Change %``. Intermediate frames are printed along the way.

    Args:
        data: DataFrame with the columns ``Price``, ``Open``, ``High``, ``Low``, ``Vol.``
            and ``Change %``. Any other column is passed through unchanged.
        vol_fill: text used for missing ``Vol.`` entries, in the same notation as the
            column itself (e.g. ``"1.5M"``). The default keeps the value that was
            previously hard-coded.

    Returns:
        A new DataFrame; the frame passed in is not modified.

    Raises:
        ValueError: if a price, volume or change entry cannot be parsed as a number.
    """
    suffix_factors = {"B": 1_000_000_000, "M": 1_000_000, "K": 1_000}

    def convert_volume(vol):
        """Parse volume text such as "1.5M" or "12,300" into a float."""
        text = str(vol).replace(",", "")  # remove any thousand separators
        for suffix, factor in suffix_factors.items():
            if suffix in text:
                return float(text.replace(suffix, "")) * factor
        return float(text)  # no suffix: plain number

    # reset_index() returns a fresh frame, so the assignments below never write into a
    # slice of the caller's DataFrame.
    data = data.iloc[::-1].reset_index(drop=True)
    print(data.head())

    data['Vol.'] = data['Vol.'].fillna(vol_fill)
    print(data)

    data = data.rename(columns={'Price': 'Close'})
    print(data.head()), print(data.dtypes)

    df = data.copy()

    # Convert financial columns to numeric (remove commas)
    for col in ["Close", "Open", "High", "Low"]:
        df[col] = df[col].astype(str).str.replace(",", "").astype(float)

    df["Vol."] = df["Vol."].astype(str).apply(convert_volume)

    # Convert 'Change %' column (remove '%' and convert to float)
    df["Change %"] = df["Change %"].astype(str).str.replace("%", "").astype(float)

    # Print final DataFrame
    print(df.dtypes)
    print(df)

    return df
    #data=df
#data.drop("Date",axis=1,inplace=True)

# Replace the raw training frame with its cleaned, row-reversed version.
data=preprocess(data)


In [None]:
# Same cleaning for the second (test-period) frame.
data2=preprocess(data2)

In [None]:
# VIX series: the volume column is discarded and the '%' sign is stripped from
# 'Change %' so the column becomes float.
vix = pd.read_csv("/kaggle/input/vix-paper2/vix paper 2.csv").drop(columns="Vol.")
vix = vix.assign(**{"Change %": vix["Change %"].str.replace("%", "").astype(float)})
vix.head(),vix.dtypes

In [None]:
# Inner-join on 'Date' keeps only the dates present in both frames; the join key is then
# dropped. Column names shared by both frames (other than 'Date') get pandas' default
# _x/_y suffixes.
data = data.merge(vix, on='Date', how='inner').drop(columns="Date")
print(data.head())


In [None]:
# Same inner join for the second frame; the row count shows how many dates survived it.
data2 = data2.merge(vix, on='Date', how='inner').drop(columns="Date")
print(data2.head())
print(len(data2))

## Doing without VIX

In [None]:
# Start again from the raw CSVs so none of the VIX columns merged earlier are carried along.
data = pd.read_csv("/kaggle/input/personal-dataset/Nifty 50 Train paper 2.csv")
data2 = pd.read_csv("/kaggle/input/personal-dataset/Nifty 50 Test paper2.csv")
data, data2 = preprocess(data), preprocess(data2)

# Drop 'Date' from both cleaned frames.
data2 = data2.drop(columns="Date")
data = data.drop(columns="Date")

In [None]:
import os
import random
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.optim as optim
import matplotlib.pyplot as plt
from sklearn.preprocessing import MinMaxScaler
from sklearn.metrics import r2_score, mean_absolute_error, mean_squared_error, mean_absolute_percentage_error, explained_variance_score
from torch.utils.data import TensorDataset, DataLoader

# -------------------------------
# 1. Set Seeds for Reproducibility
# -------------------------------
def set_random_seed(seed=42):
    """Seed NumPy, Python's ``random`` and PyTorch, and request deterministic cuDNN.

    Args:
        seed: integer used for every generator and written to ``PYTHONHASHSEED``.
    """
    for seed_fn in (np.random.seed, random.seed, torch.manual_seed):
        seed_fn(seed)
    # Python reads PYTHONHASHSEED at interpreter start-up, so this only reaches
    # processes launched after this point.
    os.environ['PYTHONHASHSEED'] = str(seed)
    # Give up cuDNN autotuning in exchange for repeatable kernels.
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False


set_random_seed(30)

# -------------------------------
# 2. Load & Preprocess Data
# -------------------------------
# Make sure to load your DataFrame before this step.
# For example: data = pd.read_csv('your_data.csv')
# `data` must already be defined by an earlier cell.
target = "Close"
all_cols = list(data.columns)

# The target goes first so that column 0 of every array built from `df` is "Close".
feature_cols = [target, *(name for name in all_cols if name != target)]
df = data[feature_cols].copy()

# Force every column to numeric (thousands separators removed). Values that cannot be
# parsed become NaN, and any row containing a NaN is dropped.
for col in feature_cols:
    without_commas = df[col].astype(str).str.replace(',', '')
    df[col] = pd.to_numeric(without_commas, errors='coerce')
df = df.dropna()

# -------------------------------
# 3. Create Sequences from Time Series Data
# -------------------------------
def create_sequences(data, seq_length=20, target_idx=0):
    """Cut a 2-D, row-ordered array into overlapping windows with next-row labels.

    Args:
        data: array indexed as ``data[row, column]``.
        seq_length: number of consecutive rows per window.
        target_idx: column whose value in the row right after a window is that window's label.

    Returns:
        ``(X, y)`` as NumPy arrays, where ``X[i]`` is ``data[i:i + seq_length]`` and
        ``y[i]`` is ``data[i + seq_length, target_idx]``, with one pair for each of the
        ``len(data) - seq_length`` window start positions.
    """
    window_starts = range(len(data) - seq_length)
    windows = [data[start:start + seq_length] for start in window_starts]
    labels = [data[start + seq_length, target_idx] for start in window_starts]
    return np.array(windows), np.array(labels)

# Feature matrix: every column of df is a model input, with the target in column 0.
data_values = df.values

# Chronological hold-out: the first 80% of rows train the model, the rest is the test set.
# NOTE(review): the Paper 4 notes say the model is tested on the separate 2024 file, but
# this split scores the last 20% of `data` and `data2` is not used in this cell — confirm
# which evaluation window is intended before comparing against the paper's RMSE.
train_size = int(len(data_values) * 0.8)
train_data, test_data = data_values[:train_size], data_values[train_size:]

# The min-max ranges are fitted on the training rows only and then reused for the
# test rows, so no test-period statistics reach the scaler.
scaler = MinMaxScaler()
train_scaled = scaler.fit_transform(train_data)
test_scaled = scaler.transform(test_data)

# Windows of 20 timesteps; each label is column 0 of the row that follows its window.
seq_length = 20
X_train, y_train = create_sequences(train_scaled, seq_length=seq_length, target_idx=0)
X_test, y_test = create_sequences(test_scaled, seq_length=seq_length, target_idx=0)

# The last 12.5% of the training windows are held back as the validation set.
train_split = int(len(X_train) * 0.875)
X_train, X_val = X_train[:train_split], X_train[train_split:]
y_train, y_val = y_train[:train_split], y_train[train_split:]

for split_name, split_windows in (("Train", X_train), ("Validation", X_val), ("Test", X_test)):
    print(f"{split_name} shape:", split_windows.shape)

# -------------------------------
# 4. Prepare PyTorch Datasets & DataLoaders
# -------------------------------
X_train_t, X_val_t, X_test_t = (
    torch.tensor(windows, dtype=torch.float32) for windows in (X_train, X_val, X_test)
)
# Labels get a trailing singleton axis so they line up with the model's (batch, 1) output.
y_train_t, y_val_t, y_test_t = (
    torch.tensor(labels, dtype=torch.float32)[:, None] for labels in (y_train, y_val, y_test)
)

batch_size = 32
train_dataset = TensorDataset(X_train_t, y_train_t)
val_dataset = TensorDataset(X_val_t, y_val_t)
test_dataset = TensorDataset(X_test_t, y_test_t)

# Only the training loader shuffles; validation and test keep their original order.
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

# -------------------------------
# 5. Define Merged Custom LSTM Model
# -------------------------------
class CustomLSTMModel(nn.Module):
    """Recurrent regressor that predicts the next value of feature 0 as a bounded relative move.

    The recurrence is a hand-written gated cell: an update gate ``z``, an output gate ``m``
    and a layer-normalised candidate that also receives a projection of the previous hidden
    state and of the current input. The final hidden state is mapped to a "return" in
    ``(-scale_factor, scale_factor)`` which is applied multiplicatively to the last observed
    value of feature 0.
    """

    def __init__(self, in_channels, seq_length=20, hidden_dim=64, dense_dim=32, scale_factor=0.2):
        """
        Args:
            in_channels: number of features per timestep.
            seq_length: nominal window length. Stored for reference only; ``forward``
                iterates over the actual time dimension of its input.
            hidden_dim: width of the hidden and cell states.
            dense_dim: width of the dense layer in front of the output.
            scale_factor: bound on the predicted relative move (tanh output is scaled by it).
        """
        super(CustomLSTMModel, self).__init__()
        self.seq_length = seq_length
        self.scale_factor = scale_factor
        self.hidden_dim = hidden_dim

        # Gate / candidate parameters. The W_* layers already carry a bias, so b_* is a
        # second (redundant) bias; it is kept so parameter names and the state_dict layout
        # stay unchanged.
        self.W_z = nn.Linear(in_channels, hidden_dim)
        self.U_z = nn.Linear(hidden_dim, hidden_dim, bias=False)
        self.b_z = nn.Parameter(torch.zeros(hidden_dim))

        self.W_m = nn.Linear(in_channels, hidden_dim)
        self.U_m = nn.Linear(hidden_dim, hidden_dim, bias=False)
        self.b_m = nn.Parameter(torch.zeros(hidden_dim))

        self.W_c = nn.Linear(in_channels, hidden_dim)
        self.U_c = nn.Linear(hidden_dim, hidden_dim, bias=False)
        self.b_c = nn.Parameter(torch.zeros(hidden_dim))

        # Residual connection: project previous hidden state
        self.residual_linear = nn.Linear(hidden_dim, hidden_dim)
        self.layer_norm = nn.LayerNorm(hidden_dim)

        # Skip connection: project the input to hidden_dim
        self.input_linear = nn.Linear(in_channels, hidden_dim)

        # Dense layers for output processing
        self.dense = nn.Sequential(
            nn.Linear(hidden_dim, dense_dim),
            nn.ReLU()
        )
        self.return_layer = nn.Linear(dense_dim, 1)

    def forward(self, x):
        """Run the recurrence over every timestep of ``x`` and return the prediction.

        Args:
            x: tensor indexed as (batch, time, feature); feature 0 is treated as the
                quantity being predicted.

        Returns:
            Tensor of shape (batch, 1): ``x[:, -1, 0] * (1 + r)`` with
            ``|r| < scale_factor``.
        """
        batch_size, num_steps = x.size(0), x.size(1)

        # Zero initial states, created with x's dtype and device.
        h = x.new_zeros(batch_size, self.hidden_dim)
        c = x.new_zeros(batch_size, self.hidden_dim)

        # Iterate over the real time axis rather than self.seq_length, so the recurrence
        # always ends on the same step that supplies `last_day` below and windows of any
        # length are accepted.
        for t in range(num_steps):
            x_t = x[:, t, :]
            z_t = torch.sigmoid(self.W_z(x_t) + self.U_z(h) + self.b_z)  # update gate
            m_t = torch.sigmoid(self.W_m(x_t) + self.U_m(h) + self.b_m)  # output gate
            candidate = torch.tanh(self.W_c(x_t) + self.U_c(h) + self.b_c)
            candidate = candidate + self.residual_linear(h)
            candidate = candidate + self.input_linear(x_t)
            candidate = self.layer_norm(candidate)

            # Convex blend of the old cell state and the new candidate.
            c = (1 - z_t) * c + z_t * candidate
            h = m_t * torch.tanh(c)

        # Compute dense layers and output
        dense_out = self.dense(h)
        long_term_return = torch.tanh(self.return_layer(dense_out)) * self.scale_factor

        # Use the last day's "Close" price from the input sequence (assumes first feature is "Close").
        # Being multiplicative, the update cannot move away from 0: a window whose last
        # value of feature 0 is exactly 0 always predicts 0.
        last_day = x[:, -1, 0].unsqueeze(1)
        predicted_price = last_day * (1 + long_term_return)
        return predicted_price

# -------------------------------
# 6. Training and Evaluation Pipeline
# -------------------------------
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
in_channels = len(feature_cols)
model = CustomLSTMModel(in_channels, seq_length=seq_length, hidden_dim=64, dense_dim=32, scale_factor=0.2).to(device)
criterion = nn.MSELoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

num_epochs = 100
best_val_loss = float('inf')
patience = 10  # non-improving epochs tolerated before stopping
counter = 0    # consecutive epochs without a new best validation loss


def _snapshot_weights(module):
    """Return an independent copy of ``module``'s parameters and buffers."""
    # state_dict() hands back references to the live tensors, which the optimizer keeps
    # updating in place; cloning freezes the values as they are right now.
    return {name: tensor.detach().clone() for name, tensor in module.state_dict().items()}


# Start from the initial weights so a checkpoint always exists for *this* model, even if
# the validation loss never improves (e.g. it is NaN), instead of failing or picking up a
# `best_model_state` left in the kernel by an earlier cell.
best_model_state = _snapshot_weights(model)

for epoch in range(num_epochs):
    # ---- optimisation pass over the training windows ----
    model.train()
    train_losses = []
    for batch_X, batch_y in train_loader:
        batch_X = batch_X.to(device)
        batch_y = batch_y.to(device)
        optimizer.zero_grad()
        outputs = model(batch_X)
        loss = criterion(outputs, batch_y)
        loss.backward()
        optimizer.step()
        train_losses.append(loss.item())

    # ---- validation pass (no gradients) ----
    model.eval()
    val_losses = []
    with torch.no_grad():
        for batch_X, batch_y in val_loader:
            batch_X = batch_X.to(device)
            batch_y = batch_y.to(device)
            outputs = model(batch_X)
            loss = criterion(outputs, batch_y)
            val_losses.append(loss.item())

    avg_train_loss = np.mean(train_losses)
    avg_val_loss = np.mean(val_losses)
    print(f"Epoch {epoch+1}/{num_epochs} - Train Loss: {avg_train_loss:.6f} - Val Loss: {avg_val_loss:.6f}")

    # ---- early stopping on the mean validation loss ----
    if avg_val_loss < best_val_loss:
        best_val_loss = avg_val_loss
        best_model_state = _snapshot_weights(model)
        counter = 0
    else:
        counter += 1
        if counter >= patience:
            print("Early stopping triggered!")
            break

# Restore the weights of the best validation epoch before evaluating.
model.load_state_dict(best_model_state)

# -------------------------------
# 7. Evaluation on Test Data
# -------------------------------
model.eval()
predicted_batches, target_batches = [], []
with torch.no_grad():
    # Walk the test loader in order and keep every batch as a NumPy array on the CPU.
    for batch_X, batch_y in test_loader:
        batch_X = batch_X.to(device)
        batch_y = batch_y.to(device)
        preds = model(batch_X)
        predicted_batches.append(preds.cpu().numpy())
        target_batches.append(batch_y.cpu().numpy())

# Stack the per-batch (batch, 1) arrays into single column arrays (still in scaled units).
predictions = np.vstack(predicted_batches)
true_values = np.vstack(target_batches)

def inverse_transform(values):
    """Undo the scaling of column 0 for an array of scaled values.

    The module-level ``scaler`` is applied to rows of ``len(feature_cols)``
    columns, so the values are placed in column 0 of a zero-filled array,
    inverse-transformed, and column 0 of the result is returned.

    Args:
        values: NumPy array of scaled values; it is flattened before use.

    Returns:
        1-D array with the values of column 0 after ``scaler.inverse_transform``.
    """
    padded = np.zeros((len(values), len(feature_cols)))
    padded[:, 0] = values.ravel()
    restored = scaler.inverse_transform(padded)
    return restored[:, 0]

# Map the scaled predictions/targets back through the fitted scaler.
predictions_inv, true_values_inv = (inverse_transform(arr) for arr in (predictions, true_values))

# Metrics on the inverse-transformed values.
r2, mae, mse, evs = (
    metric(true_values_inv, predictions_inv)
    for metric in (r2_score, mean_absolute_error, mean_squared_error, explained_variance_score)
)
rmse = np.sqrt(mse)
mape = 100 * mean_absolute_percentage_error(true_values_inv, predictions_inv)

# The same metrics on the scaled values.
r2_scaled, mae_scaled, mse_scaled, evs_scaled = (
    metric(true_values, predictions)
    for metric in (r2_score, mean_absolute_error, mean_squared_error, explained_variance_score)
)
rmse_scaled = np.sqrt(mse_scaled)
mape_scaled = 100 * mean_absolute_percentage_error(true_values, predictions)

print("\n".join([
    "📊 Final Results -",
    f"  R²: {r2:.4f}",
    f"  MAE: {mae:.4f}",
    f"  MSE: {mse:.4f}",
    f"  RMSE: {rmse:.4f}",
    f"  MAPE: {mape:.2f}%",
    f"  EVS: {evs:.4f}",
]))

print("\n".join([
    "\n--- Scaled Metrics ---",
    f"R² Score: {r2_scaled:.4f}",
    f"MAE: {mae_scaled:.4f}",
    f"MSE: {mse_scaled:.4f}",
    f"RMSE: {rmse_scaled:.4f}",
    f"Explained Variance Score: {evs_scaled:.4f}",
    f"MAPE: {mape_scaled:.2f}%",
]))

# Actual vs. predicted series on one set of axes.
fig, ax = plt.subplots(figsize=(10, 5))
ax.plot(true_values_inv, label='Actual')
ax.plot(predictions_inv, label='Predicted')
ax.set_xlabel("Time")
ax.set_ylabel("Close Price")
ax.legend()
plt.show()


## Working with Paper 5
* This paper predicts both the Nifty 50 and the BSE Sensex indices.
* The data ranges are:
  * **BSE data:** 24/02/2015 – 27/11/2022
  * **NSE data:** 27/10/2006 – 27/11/2022
* The metrics used are RMSE, MSE and MAE.
* Their best results are:
  * **BSE, 15-day:**
    * MAE (scaled): 0.0522
    * MSE (scaled): 0.0515
    * RMSE (scaled): 0.1456
  * **NSE, 15-day:**
    * MAE (scaled): 0.0712
    * MSE (scaled): 0.0521
    * RMSE (scaled): 0.1365
* Paper link: https://ieeexplore.ieee.org/document/10481618

### Sensex (without vix)

In [None]:

# Load the Paper 5 BSE Sensex export and reverse its row order.
data = pd.read_csv("/kaggle/input/paper-5-deepstock-dataset/BSE Sensex 30 Historical Data ( paper - 5 ).csv")
data = data[::-1].reset_index(drop=True)

# sort_index returns a new frame, so the result has to be assigned; calling it
# without assignment leaves the column order untouched.
data = data.sort_index(axis=1, ascending=True)
data = data.rename(columns={'Price': 'Close'})
df = data.copy()  # Ensure we don't modify the original dataset

# Convert financial columns to numeric (remove commas)
for col in ["Close", "Open", "High", "Low"]:
    df[col] = df[col].astype(str).str.replace(",", "").astype(float)

# Function to convert 'Vol.' column
def convert_volume(vol):
    """Convert an abbreviated volume such as "1.5M" into a plain float.

    Strings have thousands separators removed and a "B", "M" or "K" marker
    (checked in that order) expanded to 1e9, 1e6 or 1e3. Strings without a
    marker, and non-string inputs, are passed straight to ``float``.

    Args:
        vol: Volume as a string (e.g. "3,000K") or a number.

    Returns:
        The volume as a float.

    Raises:
        ValueError: If the remaining text cannot be parsed by ``float``.
    """
    if not isinstance(vol, str):
        return float(vol)

    cleaned = vol.replace(",", "")
    for marker, factor in (("B", 1_000_000_000), ("M", 1_000_000), ("K", 1_000)):
        if marker in cleaned:
            return float(cleaned.replace(marker, "")) * factor
    return float(cleaned)

# Expand abbreviated volumes ("1.5M", "300K", ...) into floats.
volume_text = df["Vol."].astype(str)
df["Vol."] = volume_text.map(convert_volume)

# Strip the percent sign from 'Change %' and store the column as float.
change_text = df["Change %"].astype(str).str.replace("%", "")
df["Change %"] = change_text.astype(float)

# Show the resulting dtypes and frame.
print(df.dtypes)
print(df)

# `data` now refers to the converted frame (same object as `df`).
data = df

# VIX merge intentionally disabled for this "without vix" run:
#data = pd.merge(data, vix, on='Date', how='inner')
data.drop(columns=['Date'], inplace=True)
data.head()

In [None]:
import os
import random
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.optim as optim
import matplotlib.pyplot as plt
from sklearn.preprocessing import MinMaxScaler
from sklearn.metrics import r2_score, mean_absolute_error, mean_squared_error, mean_absolute_percentage_error, explained_variance_score
from torch.utils.data import TensorDataset, DataLoader

# -------------------------------
# 1. Set Seeds for Reproducibility
# -------------------------------
def set_random_seed(seed=42):
    """Seed the Python, NumPy and PyTorch generators with one value.

    Also records the seed in the ``PYTHONHASHSEED`` environment variable and
    sets the cuDNN flags ``deterministic=True`` and ``benchmark=False``.

    Args:
        seed: Integer seed applied to every generator (default 42).
    """
    os.environ['PYTHONHASHSEED'] = str(seed)
    for seed_fn in (np.random.seed, random.seed, torch.manual_seed):
        seed_fn(seed)
    cudnn = torch.backends.cudnn
    cudnn.deterministic = True
    cudnn.benchmark = False

set_random_seed(30)

# -------------------------------
# 2. Load & Preprocess Data
# -------------------------------
# `data` must already be defined by an earlier cell
# (for example: data = pd.read_csv('your_data.csv')).
target = "Close"
all_cols = data.columns.tolist()

# Target first, then every remaining column in its existing order, so that
# column 0 of every downstream array is the target.
feature_cols = [target]
feature_cols.extend(name for name in all_cols if name != target)
df = data[feature_cols].copy()

# Strip thousands separators and coerce every column to numeric; entries that
# cannot be parsed become NaN and their rows are dropped.
for col in feature_cols:
    as_text = df[col].astype(str).str.replace(',', '')
    df[col] = pd.to_numeric(as_text, errors='coerce')
df = df.dropna()

# -------------------------------
# 3. Create Sequences from Time Series Data
# -------------------------------
def create_sequences(data, seq_length=20, target_idx=0):
    """
    Build sliding-window samples from a 2-D time series.

    Window ``i`` is ``data[i:i + seq_length]`` and its target is the value in
    column ``target_idx`` of the row immediately after that window.

    Args:
        data: Array-like of shape (n_rows, n_features), ordered in time.
        seq_length: Number of consecutive rows in each input window.
        target_idx: Column index of the value to predict ("Close" in this
            notebook, where it is placed first).

    Returns:
        Tuple ``(X, y)``: ``X`` has shape (n_windows, seq_length, n_features)
        and ``y`` has shape (n_windows,), where
        ``n_windows = max(len(data) - seq_length, 0)``.
    """
    data = np.asarray(data)
    n_windows = len(data) - seq_length
    if n_windows <= 0:
        # Too few rows for a single window. Return empties that keep the
        # (N, seq_length, n_features) / (N,) layout; np.array([]) would give
        # rank-1 arrays that break code expecting 3-D inputs.
        return (
            np.empty((0, seq_length) + data.shape[1:], dtype=data.dtype),
            np.empty((0,), dtype=data.dtype),
        )

    X = np.stack([data[i:i + seq_length] for i in range(n_windows)])
    # The target of window i is row i + seq_length; copy so the result does
    # not alias the caller's array.
    y = data[seq_length:, target_idx].copy()
    return X, y

data_values = df.values  # every column is used as an input feature

# Chronological split: first 80% of rows for training, remainder for testing.
train_size = int(len(data_values) * 0.8)
train_data, test_data = data_values[:train_size], data_values[train_size:]

# Fit the MinMaxScaler on the training rows only and reuse it for the test rows.
scaler = MinMaxScaler()
train_scaled = scaler.fit_transform(train_data)
test_scaled = scaler.transform(test_data)

# Windows of 20 consecutive rows; the target is column 0 of the following row.
seq_length = 20
X_train, y_train = create_sequences(train_scaled, seq_length=seq_length, target_idx=0)
X_test, y_test = create_sequences(test_scaled, seq_length=seq_length, target_idx=0)

# Hold out the last 12.5% of the training windows for validation.
train_split = int(len(X_train) * 0.875)
X_train, X_val = X_train[:train_split], X_train[train_split:]
y_train, y_val = y_train[:train_split], y_train[train_split:]

for split_name, split_X in (("Train", X_train), ("Validation", X_val), ("Test", X_test)):
    print(f"{split_name} shape:", split_X.shape)

# -------------------------------
# 4. Prepare PyTorch Datasets & DataLoaders
# -------------------------------
# Inputs become float32 tensors; targets get a trailing dimension of size 1.
X_train_t, X_val_t, X_test_t = (
    torch.tensor(arr, dtype=torch.float32) for arr in (X_train, X_val, X_test)
)
y_train_t, y_val_t, y_test_t = (
    torch.tensor(arr, dtype=torch.float32).unsqueeze(1) for arr in (y_train, y_val, y_test)
)

batch_size = 32
train_dataset = TensorDataset(X_train_t, y_train_t)
val_dataset = TensorDataset(X_val_t, y_val_t)
test_dataset = TensorDataset(X_test_t, y_test_t)

# Only the training loader shuffles; validation and test keep their order.
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

# -------------------------------
# 5. Define Merged Custom LSTM Model
# -------------------------------
class CustomLSTMModel(nn.Module):
    """Gated recurrent model that predicts the next value of feature 0.

    The input sequence is processed step by step with a hand-written recurrent
    cell (two sigmoid gates and a tanh candidate that receives residual and
    skip projections followed by layer normalisation). The final hidden state
    is mapped to a bounded relative change, which is applied to the last
    observed value of feature 0.
    """

    def __init__(self, in_channels, seq_length=20, hidden_dim=64, dense_dim=32, scale_factor=0.2):
        """
        Args:
            in_channels: Number of features per timestep.
            seq_length: Nominal window length. Stored for reference only;
                forward() iterates over the length of the input it receives.
            hidden_dim: Width of the hidden and cell states.
            dense_dim: Width of the dense layer in front of the output head.
            scale_factor: Bound on the predicted relative change, since the
                head output is ``tanh(...) * scale_factor``.
        """
        super().__init__()
        self.seq_length = seq_length
        self.scale_factor = scale_factor
        self.hidden_dim = hidden_dim

        # Gate z: blends the previous cell state with the new candidate.
        self.W_z = nn.Linear(in_channels, hidden_dim)
        self.U_z = nn.Linear(hidden_dim, hidden_dim, bias=False)
        self.b_z = nn.Parameter(torch.zeros(hidden_dim))

        # Gate m: scales tanh(cell state) to produce the hidden state.
        self.W_m = nn.Linear(in_channels, hidden_dim)
        self.U_m = nn.Linear(hidden_dim, hidden_dim, bias=False)
        self.b_m = nn.Parameter(torch.zeros(hidden_dim))

        # Candidate cell content.
        self.W_c = nn.Linear(in_channels, hidden_dim)
        self.U_c = nn.Linear(hidden_dim, hidden_dim, bias=False)
        self.b_c = nn.Parameter(torch.zeros(hidden_dim))

        # Residual connection: project previous hidden state
        self.residual_linear = nn.Linear(hidden_dim, hidden_dim)
        self.layer_norm = nn.LayerNorm(hidden_dim)

        # Skip connection: project the input to hidden_dim
        self.input_linear = nn.Linear(in_channels, hidden_dim)

        # Dense layers for output processing
        self.dense = nn.Sequential(
            nn.Linear(hidden_dim, dense_dim),
            nn.ReLU()
        )
        self.return_layer = nn.Linear(dense_dim, 1)

    def forward(self, x):
        """Predict the next value of feature 0 for each sequence in the batch.

        Args:
            x: Tensor of shape (batch, steps, in_channels). Feature 0 must be
                the series being predicted ("Close" in this notebook).

        Returns:
            Tensor of shape (batch, 1): the last value of feature 0 multiplied
            by ``1 + r`` with ``|r| <= scale_factor``.
        """
        batch_size = x.size(0)
        device = x.device

        # Initialize hidden and cell states to zeros
        h = torch.zeros(batch_size, self.hidden_dim, device=device)
        c = torch.zeros(batch_size, self.hidden_dim, device=device)

        # Iterate over the steps actually present in x rather than the
        # configured seq_length, so the recurrence and `last_day` below always
        # refer to the same window (shorter inputs no longer raise, longer
        # inputs are no longer silently truncated).
        for t in range(x.size(1)):
            x_t = x[:, t, :]
            z_t = torch.sigmoid(self.W_z(x_t) + self.U_z(h) + self.b_z)
            m_t = torch.sigmoid(self.W_m(x_t) + self.U_m(h) + self.b_m)
            candidate = torch.tanh(self.W_c(x_t) + self.U_c(h) + self.b_c)
            candidate = candidate + self.residual_linear(h)
            candidate = candidate + self.input_linear(x_t)
            candidate = self.layer_norm(candidate)

            c = (1 - z_t) * c + z_t * candidate
            h = m_t * torch.tanh(c)

        # Map the final hidden state to a relative change in (-scale, +scale).
        dense_out = self.dense(h)
        long_term_return = torch.tanh(self.return_layer(dense_out)) * self.scale_factor

        # Use the last day's "Close" price from the input sequence (assumes first feature is "Close")
        last_day = x[:, -1, 0].unsqueeze(1)
        predicted_price = last_day * (1 + long_term_return)
        return predicted_price

# -------------------------------
# 6. Training and Evaluation Pipeline
# -------------------------------
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
in_channels = len(feature_cols)
model = CustomLSTMModel(in_channels, seq_length=seq_length, hidden_dim=64, dense_dim=32, scale_factor=0.2).to(device)
criterion = nn.MSELoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

num_epochs = 100
best_val_loss = float('inf')
patience = 10   # epochs without a new best validation loss before stopping
counter = 0     # consecutive epochs without improvement

# Start from a snapshot of the initial weights so best_model_state is always
# defined, even if no epoch ever improves on the initial best_val_loss.
best_model_state = {name: tensor.clone() for name, tensor in model.state_dict().items()}

for epoch in range(num_epochs):
    # --- training pass ---
    model.train()
    train_losses = []
    for batch_X, batch_y in train_loader:
        batch_X = batch_X.to(device)
        batch_y = batch_y.to(device)
        optimizer.zero_grad()
        outputs = model(batch_X)
        loss = criterion(outputs, batch_y)
        loss.backward()
        optimizer.step()
        train_losses.append(loss.item())

    # --- validation pass (no gradient tracking) ---
    model.eval()
    val_losses = []
    with torch.no_grad():
        for batch_X, batch_y in val_loader:
            batch_X = batch_X.to(device)
            batch_y = batch_y.to(device)
            outputs = model(batch_X)
            loss = criterion(outputs, batch_y)
            val_losses.append(loss.item())

    avg_train_loss = np.mean(train_losses)
    avg_val_loss = np.mean(val_losses)
    print(f"Epoch {epoch+1}/{num_epochs} - Train Loss: {avg_train_loss:.6f} - Val Loss: {avg_val_loss:.6f}")

    if avg_val_loss < best_val_loss:
        best_val_loss = avg_val_loss
        # state_dict() tensors share storage with the live parameters, so they
        # must be cloned; otherwise later optimizer steps overwrite the
        # checkpoint and the final load restores the last epoch instead.
        best_model_state = {name: tensor.clone() for name, tensor in model.state_dict().items()}
        counter = 0
    else:
        counter += 1
        if counter >= patience:
            print("Early stopping triggered!")
            break

# Restore the weights from the epoch with the lowest validation loss.
model.load_state_dict(best_model_state)

# -------------------------------
# 7. Evaluation on Test Data
# -------------------------------
# Run the trained model over the test loader and collect the (still scaled)
# predictions and targets one batch at a time.
model.eval()
predictions, true_values = [], []
with torch.no_grad():
    for batch_X, batch_y in test_loader:
        batch_X, batch_y = batch_X.to(device), batch_y.to(device)
        preds = model(batch_X)
        # Move results back to host memory before converting to NumPy.
        predictions.append(preds.cpu().numpy())
        true_values.append(batch_y.cpu().numpy())

# Stack the per-batch arrays along the first axis.
predictions, true_values = np.vstack(predictions), np.vstack(true_values)

def inverse_transform(values):
    """Undo the scaling of column 0 for an array of scaled values.

    The module-level ``scaler`` is applied to rows of ``len(feature_cols)``
    columns, so the values are placed in column 0 of a zero-filled array,
    inverse-transformed, and column 0 of the result is returned.

    Args:
        values: NumPy array of scaled values; it is flattened before use.

    Returns:
        1-D array with the values of column 0 after ``scaler.inverse_transform``.
    """
    padded = np.zeros((len(values), len(feature_cols)))
    padded[:, 0] = values.ravel()
    restored = scaler.inverse_transform(padded)
    return restored[:, 0]

# Map the scaled predictions/targets back through the fitted scaler.
predictions_inv, true_values_inv = (inverse_transform(arr) for arr in (predictions, true_values))

# Metrics on the inverse-transformed values.
r2, mae, mse, evs = (
    metric(true_values_inv, predictions_inv)
    for metric in (r2_score, mean_absolute_error, mean_squared_error, explained_variance_score)
)
rmse = np.sqrt(mse)
mape = 100 * mean_absolute_percentage_error(true_values_inv, predictions_inv)

# The same metrics on the scaled values.
r2_scaled, mae_scaled, mse_scaled, evs_scaled = (
    metric(true_values, predictions)
    for metric in (r2_score, mean_absolute_error, mean_squared_error, explained_variance_score)
)
rmse_scaled = np.sqrt(mse_scaled)
mape_scaled = 100 * mean_absolute_percentage_error(true_values, predictions)

print("\n".join([
    "📊 Final Results -",
    f"  R²: {r2:.4f}",
    f"  MAE: {mae:.4f}",
    f"  MSE: {mse:.4f}",
    f"  RMSE: {rmse:.4f}",
    f"  MAPE: {mape:.2f}%",
    f"  EVS: {evs:.4f}",
]))

print("\n".join([
    "\n--- Scaled Metrics ---",
    f"R² Score: {r2_scaled:.4f}",
    f"MAE: {mae_scaled:.4f}",
    f"MSE: {mse_scaled:.4f}",
    f"RMSE: {rmse_scaled:.4f}",
    f"Explained Variance Score: {evs_scaled:.4f}",
    f"MAPE: {mape_scaled:.2f}%",
]))

# Actual vs. predicted series on one set of axes.
fig, ax = plt.subplots(figsize=(10, 5))
ax.plot(true_values_inv, label='Actual')
ax.plot(predictions_inv, label='Predicted')
ax.set_xlabel("Time")
ax.set_ylabel("Close Price")
ax.legend()
plt.show()


In [None]:
# Count every element of every parameter tensor registered on the model.
total_params = 0
for param in model.parameters():
    total_params += param.numel()
print(f"Total Parameters: {total_params}")

## Nifty50 without vix

In [None]:
# Load the Paper 5 Nifty 50 export and reverse its row order.
data = pd.read_csv("/kaggle/input/paper-5-deepstock-dataset/Nifty 50 Historical Data - paper( 5 ).csv")
data = data[::-1].reset_index(drop=True)

# sort_index returns a new frame, so the result has to be assigned; calling it
# without assignment leaves the column order untouched.
data = data.sort_index(axis=1, ascending=True)
data = data.rename(columns={'Price': 'Close'})

# Missing volumes are replaced with the fixed placeholder "278.04M".
# NOTE(review): the origin of this value is not shown in this cell — confirm.
data['Vol.'] = data['Vol.'].fillna("278.04M")
df = data.copy()  # Ensure we don't modify the original dataset

# Convert financial columns to numeric (remove commas)
for col in ["Close", "Open", "High", "Low"]:
    df[col] = df[col].astype(str).str.replace(",", "").astype(float)

# Function to convert 'Vol.' column
def convert_volume(vol):
    """Convert an abbreviated volume such as "1.5M" into a plain float.

    Strings have thousands separators removed and a "B", "M" or "K" marker
    (checked in that order) expanded to 1e9, 1e6 or 1e3. Strings without a
    marker, and non-string inputs, are passed straight to ``float``.

    Args:
        vol: Volume as a string (e.g. "3,000K") or a number.

    Returns:
        The volume as a float.

    Raises:
        ValueError: If the remaining text cannot be parsed by ``float``.
    """
    if not isinstance(vol, str):
        return float(vol)

    cleaned = vol.replace(",", "")
    for marker, factor in (("B", 1_000_000_000), ("M", 1_000_000), ("K", 1_000)):
        if marker in cleaned:
            return float(cleaned.replace(marker, "")) * factor
    return float(cleaned)

# Expand abbreviated volumes ("1.5M", "300K", ...) into floats.
volume_text = df["Vol."].astype(str)
df["Vol."] = volume_text.map(convert_volume)

# Strip the percent sign from 'Change %' and store the column as float.
change_text = df["Change %"].astype(str).str.replace("%", "")
df["Change %"] = change_text.astype(float)

# Show the resulting dtypes and frame.
print(df.dtypes)
print(df)

# `data` now refers to the converted frame (same object as `df`).
data = df

# VIX merge intentionally disabled for this "without vix" run:
#data = pd.merge(data, vix, on='Date', how='inner')
data.drop(columns=['Date'], inplace=True)
data.head()

In [None]:
import os
import random
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.optim as optim
import matplotlib.pyplot as plt
from sklearn.preprocessing import MinMaxScaler
from sklearn.metrics import r2_score, mean_absolute_error, mean_squared_error, mean_absolute_percentage_error, explained_variance_score
from torch.utils.data import TensorDataset, DataLoader

# -------------------------------
# 1. Set Seeds for Reproducibility
# -------------------------------
def set_random_seed(seed=42):
    """Seed the Python, NumPy and PyTorch generators with one value.

    Also records the seed in the ``PYTHONHASHSEED`` environment variable and
    sets the cuDNN flags ``deterministic=True`` and ``benchmark=False``.

    Args:
        seed: Integer seed applied to every generator (default 42).
    """
    os.environ['PYTHONHASHSEED'] = str(seed)
    for seed_fn in (np.random.seed, random.seed, torch.manual_seed):
        seed_fn(seed)
    cudnn = torch.backends.cudnn
    cudnn.deterministic = True
    cudnn.benchmark = False

set_random_seed(30)

# -------------------------------
# 2. Load & Preprocess Data
# -------------------------------
# `data` must already be defined by an earlier cell
# (for example: data = pd.read_csv('your_data.csv')).
target = "Close"
all_cols = data.columns.tolist()

# Target first, then every remaining column in its existing order, so that
# column 0 of every downstream array is the target.
feature_cols = [target]
feature_cols.extend(name for name in all_cols if name != target)
df = data[feature_cols].copy()

# Strip thousands separators and coerce every column to numeric; entries that
# cannot be parsed become NaN and their rows are dropped.
for col in feature_cols:
    as_text = df[col].astype(str).str.replace(',', '')
    df[col] = pd.to_numeric(as_text, errors='coerce')
df = df.dropna()

# -------------------------------
# 3. Create Sequences from Time Series Data
# -------------------------------
def create_sequences(data, seq_length=20, target_idx=0):
    """
    Build sliding-window samples from a 2-D time series.

    Window ``i`` is ``data[i:i + seq_length]`` and its target is the value in
    column ``target_idx`` of the row immediately after that window.

    Args:
        data: Array-like of shape (n_rows, n_features), ordered in time.
        seq_length: Number of consecutive rows in each input window.
        target_idx: Column index of the value to predict ("Close" in this
            notebook, where it is placed first).

    Returns:
        Tuple ``(X, y)``: ``X`` has shape (n_windows, seq_length, n_features)
        and ``y`` has shape (n_windows,), where
        ``n_windows = max(len(data) - seq_length, 0)``.
    """
    data = np.asarray(data)
    n_windows = len(data) - seq_length
    if n_windows <= 0:
        # Too few rows for a single window. Return empties that keep the
        # (N, seq_length, n_features) / (N,) layout; np.array([]) would give
        # rank-1 arrays that break code expecting 3-D inputs.
        return (
            np.empty((0, seq_length) + data.shape[1:], dtype=data.dtype),
            np.empty((0,), dtype=data.dtype),
        )

    X = np.stack([data[i:i + seq_length] for i in range(n_windows)])
    # The target of window i is row i + seq_length; copy so the result does
    # not alias the caller's array.
    y = data[seq_length:, target_idx].copy()
    return X, y

data_values = df.values  # every column is used as an input feature

# Chronological split: first 80% of rows for training, remainder for testing.
train_size = int(len(data_values) * 0.8)
train_data, test_data = data_values[:train_size], data_values[train_size:]

# Fit the MinMaxScaler on the training rows only and reuse it for the test rows.
scaler = MinMaxScaler()
train_scaled = scaler.fit_transform(train_data)
test_scaled = scaler.transform(test_data)

# Windows of 20 consecutive rows; the target is column 0 of the following row.
seq_length = 20
X_train, y_train = create_sequences(train_scaled, seq_length=seq_length, target_idx=0)
X_test, y_test = create_sequences(test_scaled, seq_length=seq_length, target_idx=0)

# Hold out the last 12.5% of the training windows for validation.
train_split = int(len(X_train) * 0.875)
X_train, X_val = X_train[:train_split], X_train[train_split:]
y_train, y_val = y_train[:train_split], y_train[train_split:]

for split_name, split_X in (("Train", X_train), ("Validation", X_val), ("Test", X_test)):
    print(f"{split_name} shape:", split_X.shape)

# -------------------------------
# 4. Prepare PyTorch Datasets & DataLoaders
# -------------------------------
# Inputs become float32 tensors; targets get a trailing dimension of size 1.
X_train_t, X_val_t, X_test_t = (
    torch.tensor(arr, dtype=torch.float32) for arr in (X_train, X_val, X_test)
)
y_train_t, y_val_t, y_test_t = (
    torch.tensor(arr, dtype=torch.float32).unsqueeze(1) for arr in (y_train, y_val, y_test)
)

batch_size = 32
train_dataset = TensorDataset(X_train_t, y_train_t)
val_dataset = TensorDataset(X_val_t, y_val_t)
test_dataset = TensorDataset(X_test_t, y_test_t)

# Only the training loader shuffles; validation and test keep their order.
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

# -------------------------------
# 5. Define Merged Custom LSTM Model
# -------------------------------
class CustomLSTMModel(nn.Module):
    """Gated recurrent model that predicts the next value of feature 0.

    The input sequence is processed step by step with a hand-written recurrent
    cell (two sigmoid gates and a tanh candidate that receives residual and
    skip projections followed by layer normalisation). The final hidden state
    is mapped to a bounded relative change, which is applied to the last
    observed value of feature 0.
    """

    def __init__(self, in_channels, seq_length=20, hidden_dim=64, dense_dim=32, scale_factor=0.2):
        """
        Args:
            in_channels: Number of features per timestep.
            seq_length: Nominal window length. Stored for reference only;
                forward() iterates over the length of the input it receives.
            hidden_dim: Width of the hidden and cell states.
            dense_dim: Width of the dense layer in front of the output head.
            scale_factor: Bound on the predicted relative change, since the
                head output is ``tanh(...) * scale_factor``.
        """
        super().__init__()
        self.seq_length = seq_length
        self.scale_factor = scale_factor
        self.hidden_dim = hidden_dim

        # Gate z: blends the previous cell state with the new candidate.
        self.W_z = nn.Linear(in_channels, hidden_dim)
        self.U_z = nn.Linear(hidden_dim, hidden_dim, bias=False)
        self.b_z = nn.Parameter(torch.zeros(hidden_dim))

        # Gate m: scales tanh(cell state) to produce the hidden state.
        self.W_m = nn.Linear(in_channels, hidden_dim)
        self.U_m = nn.Linear(hidden_dim, hidden_dim, bias=False)
        self.b_m = nn.Parameter(torch.zeros(hidden_dim))

        # Candidate cell content.
        self.W_c = nn.Linear(in_channels, hidden_dim)
        self.U_c = nn.Linear(hidden_dim, hidden_dim, bias=False)
        self.b_c = nn.Parameter(torch.zeros(hidden_dim))

        # Residual connection: project previous hidden state
        self.residual_linear = nn.Linear(hidden_dim, hidden_dim)
        self.layer_norm = nn.LayerNorm(hidden_dim)

        # Skip connection: project the input to hidden_dim
        self.input_linear = nn.Linear(in_channels, hidden_dim)

        # Dense layers for output processing
        self.dense = nn.Sequential(
            nn.Linear(hidden_dim, dense_dim),
            nn.ReLU()
        )
        self.return_layer = nn.Linear(dense_dim, 1)

    def forward(self, x):
        """Predict the next value of feature 0 for each sequence in the batch.

        Args:
            x: Tensor of shape (batch, steps, in_channels). Feature 0 must be
                the series being predicted ("Close" in this notebook).

        Returns:
            Tensor of shape (batch, 1): the last value of feature 0 multiplied
            by ``1 + r`` with ``|r| <= scale_factor``.
        """
        batch_size = x.size(0)
        device = x.device

        # Initialize hidden and cell states to zeros
        h = torch.zeros(batch_size, self.hidden_dim, device=device)
        c = torch.zeros(batch_size, self.hidden_dim, device=device)

        # Iterate over the steps actually present in x rather than the
        # configured seq_length, so the recurrence and `last_day` below always
        # refer to the same window (shorter inputs no longer raise, longer
        # inputs are no longer silently truncated).
        for t in range(x.size(1)):
            x_t = x[:, t, :]
            z_t = torch.sigmoid(self.W_z(x_t) + self.U_z(h) + self.b_z)
            m_t = torch.sigmoid(self.W_m(x_t) + self.U_m(h) + self.b_m)
            candidate = torch.tanh(self.W_c(x_t) + self.U_c(h) + self.b_c)
            candidate = candidate + self.residual_linear(h)
            candidate = candidate + self.input_linear(x_t)
            candidate = self.layer_norm(candidate)

            c = (1 - z_t) * c + z_t * candidate
            h = m_t * torch.tanh(c)

        # Map the final hidden state to a relative change in (-scale, +scale).
        dense_out = self.dense(h)
        long_term_return = torch.tanh(self.return_layer(dense_out)) * self.scale_factor

        # Use the last day's "Close" price from the input sequence (assumes first feature is "Close")
        last_day = x[:, -1, 0].unsqueeze(1)
        predicted_price = last_day * (1 + long_term_return)
        return predicted_price

# -------------------------------
# 6. Training and Evaluation Pipeline
# -------------------------------
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
in_channels = len(feature_cols)
model = CustomLSTMModel(in_channels, seq_length=seq_length, hidden_dim=64, dense_dim=32, scale_factor=0.2).to(device)
criterion = nn.MSELoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

num_epochs = 100
best_val_loss = float('inf')
patience = 10   # epochs without a new best validation loss before stopping
counter = 0     # consecutive epochs without improvement

# Start from a snapshot of the initial weights so best_model_state is always
# defined, even if no epoch ever improves on the initial best_val_loss.
best_model_state = {name: tensor.clone() for name, tensor in model.state_dict().items()}

for epoch in range(num_epochs):
    # --- training pass ---
    model.train()
    train_losses = []
    for batch_X, batch_y in train_loader:
        batch_X = batch_X.to(device)
        batch_y = batch_y.to(device)
        optimizer.zero_grad()
        outputs = model(batch_X)
        loss = criterion(outputs, batch_y)
        loss.backward()
        optimizer.step()
        train_losses.append(loss.item())

    # --- validation pass (no gradient tracking) ---
    model.eval()
    val_losses = []
    with torch.no_grad():
        for batch_X, batch_y in val_loader:
            batch_X = batch_X.to(device)
            batch_y = batch_y.to(device)
            outputs = model(batch_X)
            loss = criterion(outputs, batch_y)
            val_losses.append(loss.item())

    avg_train_loss = np.mean(train_losses)
    avg_val_loss = np.mean(val_losses)
    print(f"Epoch {epoch+1}/{num_epochs} - Train Loss: {avg_train_loss:.6f} - Val Loss: {avg_val_loss:.6f}")

    if avg_val_loss < best_val_loss:
        best_val_loss = avg_val_loss
        # state_dict() tensors share storage with the live parameters, so they
        # must be cloned; otherwise later optimizer steps overwrite the
        # checkpoint and the final load restores the last epoch instead.
        best_model_state = {name: tensor.clone() for name, tensor in model.state_dict().items()}
        counter = 0
    else:
        counter += 1
        if counter >= patience:
            print("Early stopping triggered!")
            break

# Restore the weights from the epoch with the lowest validation loss.
model.load_state_dict(best_model_state)

# -------------------------------
# 7. Evaluation on Test Data
# -------------------------------
# Run the trained model over the test loader and collect the (still scaled)
# predictions and targets one batch at a time.
model.eval()
predictions, true_values = [], []
with torch.no_grad():
    for batch_X, batch_y in test_loader:
        batch_X, batch_y = batch_X.to(device), batch_y.to(device)
        preds = model(batch_X)
        # Move results back to host memory before converting to NumPy.
        predictions.append(preds.cpu().numpy())
        true_values.append(batch_y.cpu().numpy())

# Stack the per-batch arrays along the first axis.
predictions, true_values = np.vstack(predictions), np.vstack(true_values)

def inverse_transform(values):
    """Undo the scaling of column 0 for an array of scaled values.

    The module-level ``scaler`` is applied to rows of ``len(feature_cols)``
    columns, so the values are placed in column 0 of a zero-filled array,
    inverse-transformed, and column 0 of the result is returned.

    Args:
        values: NumPy array of scaled values; it is flattened before use.

    Returns:
        1-D array with the values of column 0 after ``scaler.inverse_transform``.
    """
    padded = np.zeros((len(values), len(feature_cols)))
    padded[:, 0] = values.ravel()
    restored = scaler.inverse_transform(padded)
    return restored[:, 0]

# Map the scaled predictions/targets back through the fitted scaler.
predictions_inv, true_values_inv = (inverse_transform(arr) for arr in (predictions, true_values))

# Metrics on the inverse-transformed values.
r2, mae, mse, evs = (
    metric(true_values_inv, predictions_inv)
    for metric in (r2_score, mean_absolute_error, mean_squared_error, explained_variance_score)
)
rmse = np.sqrt(mse)
mape = 100 * mean_absolute_percentage_error(true_values_inv, predictions_inv)

# The same metrics on the scaled values.
r2_scaled, mae_scaled, mse_scaled, evs_scaled = (
    metric(true_values, predictions)
    for metric in (r2_score, mean_absolute_error, mean_squared_error, explained_variance_score)
)
rmse_scaled = np.sqrt(mse_scaled)
mape_scaled = 100 * mean_absolute_percentage_error(true_values, predictions)

print("\n".join([
    "📊 Final Results -",
    f"  R²: {r2:.4f}",
    f"  MAE: {mae:.4f}",
    f"  MSE: {mse:.4f}",
    f"  RMSE: {rmse:.4f}",
    f"  MAPE: {mape:.2f}%",
    f"  EVS: {evs:.4f}",
]))

print("\n".join([
    "\n--- Scaled Metrics ---",
    f"R² Score: {r2_scaled:.4f}",
    f"MAE: {mae_scaled:.4f}",
    f"MSE: {mse_scaled:.4f}",
    f"RMSE: {rmse_scaled:.4f}",
    f"Explained Variance Score: {evs_scaled:.4f}",
    f"MAPE: {mape_scaled:.2f}%",
]))

# Actual vs. predicted series on one set of axes.
fig, ax = plt.subplots(figsize=(10, 5))
ax.plot(true_values_inv, label='Actual')
ax.plot(predictions_inv, label='Predicted')
ax.set_xlabel("Time")
ax.set_ylabel("Close Price")
ax.legend()
plt.show()


In [None]:
# Count every element of every parameter tensor registered on the model.
total_params = 0
for param in model.parameters():
    total_params += param.numel()
print(f"Total Parameters: {total_params}")

# OUR RANGE: SENSEX (15-3-2008 to 15-3-2024)

In [None]:
# India VIX series: drop its volume column and parse "Change %" as float.
# `vix` is only loaded here; it is not merged into `data` in this cell.
vix = pd.read_csv("/kaggle/input/our-data-15-3-2008-to-15-3-2024/India VIX Historical Data (3).csv")
vix = vix.drop(columns="Vol.")
vix["Change %"] = vix["Change %"].str.replace("%", "").astype(float)

# Load the BSE Sensex export for the same date range and reverse its row order.
data = pd.read_csv("/kaggle/input/our-data-15-3-2008-to-15-3-2024/BSE Sensex 30 Historical Data (3).csv")
data = data[::-1].reset_index(drop=True)

# sort_index returns a new frame, so the result has to be assigned; calling it
# without assignment leaves the column order untouched.
data = data.sort_index(axis=1, ascending=True)
data = data.rename(columns={'Price': 'Close'})
df = data.copy()  # Ensure we don't modify the original dataset

# Convert financial columns to numeric (remove commas)
for col in ["Close", "Open", "High", "Low"]:
    df[col] = df[col].astype(str).str.replace(",", "").astype(float)

# Function to convert 'Vol.' column
def convert_volume(vol):
    """Convert an abbreviated volume such as "1.5M" into a plain float.

    Strings have thousands separators removed and a "B", "M" or "K" marker
    (checked in that order) expanded to 1e9, 1e6 or 1e3. Strings without a
    marker, and non-string inputs, are passed straight to ``float``.

    Args:
        vol: Volume as a string (e.g. "3,000K") or a number.

    Returns:
        The volume as a float.

    Raises:
        ValueError: If the remaining text cannot be parsed by ``float``.
    """
    if not isinstance(vol, str):
        return float(vol)

    cleaned = vol.replace(",", "")
    for marker, factor in (("B", 1_000_000_000), ("M", 1_000_000), ("K", 1_000)):
        if marker in cleaned:
            return float(cleaned.replace(marker, "")) * factor
    return float(cleaned)

# Expand abbreviated volumes ("1.5M", "300K", ...) into floats.
volume_text = df["Vol."].astype(str)
df["Vol."] = volume_text.map(convert_volume)

# Strip the percent sign from 'Change %' and store the column as float.
change_text = df["Change %"].astype(str).str.replace("%", "")
df["Change %"] = change_text.astype(float)

# Show the resulting dtypes and frame.
print(df.dtypes)
print(df)

# `data` now refers to the converted frame (same object as `df`).
data = df

# VIX merge intentionally disabled for this run:
# data = pd.merge(data, vix, on='Date', how='inner')
data.drop(columns=['Date'], inplace=True)
data.head()

In [None]:
import os
import random
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.optim as optim
import matplotlib.pyplot as plt
from sklearn.preprocessing import MinMaxScaler
from sklearn.metrics import r2_score, mean_absolute_error, mean_squared_error, mean_absolute_percentage_error, explained_variance_score
from torch.utils.data import TensorDataset, DataLoader

# -------------------------------
# 1. Set Seeds for Reproducibility
# -------------------------------
def set_random_seed(seed=42):
    """Seed the Python, NumPy and PyTorch generators with one value.

    Also records the seed in the ``PYTHONHASHSEED`` environment variable and
    sets the cuDNN flags ``deterministic=True`` and ``benchmark=False``.

    Args:
        seed: Integer seed applied to every generator (default 42).
    """
    os.environ['PYTHONHASHSEED'] = str(seed)
    for seed_fn in (np.random.seed, random.seed, torch.manual_seed):
        seed_fn(seed)
    cudnn = torch.backends.cudnn
    cudnn.deterministic = True
    cudnn.benchmark = False

set_random_seed(30)

# -------------------------------
# 2. Load & Preprocess Data
# -------------------------------
# `data` must already be defined by an earlier cell
# (for example: data = pd.read_csv('your_data.csv')).
target = "Close"
all_cols = data.columns.tolist()

# Target first, then every remaining column in its existing order, so that
# column 0 of every downstream array is the target.
feature_cols = [target]
feature_cols.extend(name for name in all_cols if name != target)
df = data[feature_cols].copy()

# Strip thousands separators and coerce every column to numeric; entries that
# cannot be parsed become NaN and their rows are dropped.
for col in feature_cols:
    as_text = df[col].astype(str).str.replace(',', '')
    df[col] = pd.to_numeric(as_text, errors='coerce')
df = df.dropna()

# -------------------------------
# 3. Create Sequences from Time Series Data
# -------------------------------
def create_sequences(data, seq_length=20, target_idx=0):
    """
    Build sliding-window samples from a 2-D time series.

    Window ``i`` is ``data[i:i + seq_length]`` and its target is the value in
    column ``target_idx`` of the row immediately after that window.

    Args:
        data: Array-like of shape (n_rows, n_features), ordered in time.
        seq_length: Number of consecutive rows in each input window.
        target_idx: Column index of the value to predict ("Close" in this
            notebook, where it is placed first).

    Returns:
        Tuple ``(X, y)``: ``X`` has shape (n_windows, seq_length, n_features)
        and ``y`` has shape (n_windows,), where
        ``n_windows = max(len(data) - seq_length, 0)``.
    """
    data = np.asarray(data)
    n_windows = len(data) - seq_length
    if n_windows <= 0:
        # Too few rows for a single window. Return empties that keep the
        # (N, seq_length, n_features) / (N,) layout; np.array([]) would give
        # rank-1 arrays that break code expecting 3-D inputs.
        return (
            np.empty((0, seq_length) + data.shape[1:], dtype=data.dtype),
            np.empty((0,), dtype=data.dtype),
        )

    X = np.stack([data[i:i + seq_length] for i in range(n_windows)])
    # The target of window i is row i + seq_length; copy so the result does
    # not alias the caller's array.
    y = data[seq_length:, target_idx].copy()
    return X, y

data_values = df.values  # every column is used as an input feature

# Chronological split: first 80% of rows for training, remainder for testing.
train_size = int(len(data_values) * 0.8)
train_data, test_data = data_values[:train_size], data_values[train_size:]

# Fit the MinMaxScaler on the training rows only and reuse it for the test rows.
scaler = MinMaxScaler()
train_scaled = scaler.fit_transform(train_data)
test_scaled = scaler.transform(test_data)

# Windows of 20 consecutive rows; the target is column 0 of the following row.
seq_length = 20
X_train, y_train = create_sequences(train_scaled, seq_length=seq_length, target_idx=0)
X_test, y_test = create_sequences(test_scaled, seq_length=seq_length, target_idx=0)

# Hold out the last 12.5% of the training windows for validation.
train_split = int(len(X_train) * 0.875)
X_train, X_val = X_train[:train_split], X_train[train_split:]
y_train, y_val = y_train[:train_split], y_train[train_split:]

for split_name, split_X in (("Train", X_train), ("Validation", X_val), ("Test", X_test)):
    print(f"{split_name} shape:", split_X.shape)

# -------------------------------
# 4. Prepare PyTorch Datasets & DataLoaders
# -------------------------------
# Inputs become float32 tensors; targets get a trailing dimension of size 1.
X_train_t, X_val_t, X_test_t = (
    torch.tensor(arr, dtype=torch.float32) for arr in (X_train, X_val, X_test)
)
y_train_t, y_val_t, y_test_t = (
    torch.tensor(arr, dtype=torch.float32).unsqueeze(1) for arr in (y_train, y_val, y_test)
)

batch_size = 32
train_dataset = TensorDataset(X_train_t, y_train_t)
val_dataset = TensorDataset(X_val_t, y_val_t)
test_dataset = TensorDataset(X_test_t, y_test_t)

# Only the training loader shuffles; validation and test keep their order.
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

# -------------------------------
# 5. Define Merged Custom LSTM Model
# -------------------------------
class CustomLSTMModel(nn.Module):
    def __init__(self, in_channels, seq_length=20, hidden_dim=64, dense_dim=32, scale_factor=0.2):
        """
        Hand-rolled gated recurrent regressor (no nn.LSTM involved).

        Each time step computes two sigmoid gates (z_t, m_t) and a tanh candidate; the
        candidate is augmented with linear projections of the previous hidden state
        and of the current input, layer-normalised, and blended into the cell state.
        The final hidden state is mapped to a bounded relative change that is applied
        to feature 0 of the last input step.

        Args:
            in_channels: number of features per time step.
            seq_length: nominal window length; kept as an attribute. forward() walks
                the time dimension of whatever input it receives.
            hidden_dim: width of the hidden and cell states.
            dense_dim: width of the dense layer in front of the output unit.
            scale_factor: upper bound on the magnitude of the predicted relative
                change (the tanh output is multiplied by it).
        """
        super(CustomLSTMModel, self).__init__()
        self.seq_length = seq_length
        self.scale_factor = scale_factor
        self.hidden_dim = hidden_dim

        # Gate / candidate parameters. Each W_* already carries its own bias, so b_* is
        # a second additive bias on the same pre-activation; both are kept so that
        # parameter names and counts stay compatible with existing checkpoints.
        self.W_z = nn.Linear(in_channels, hidden_dim)
        self.U_z = nn.Linear(hidden_dim, hidden_dim, bias=False)
        self.b_z = nn.Parameter(torch.zeros(hidden_dim))

        self.W_m = nn.Linear(in_channels, hidden_dim)
        self.U_m = nn.Linear(hidden_dim, hidden_dim, bias=False)
        self.b_m = nn.Parameter(torch.zeros(hidden_dim))

        self.W_c = nn.Linear(in_channels, hidden_dim)
        self.U_c = nn.Linear(hidden_dim, hidden_dim, bias=False)
        self.b_c = nn.Parameter(torch.zeros(hidden_dim))

        # Residual connection: project previous hidden state
        self.residual_linear = nn.Linear(hidden_dim, hidden_dim)
        self.layer_norm = nn.LayerNorm(hidden_dim)

        # Skip connection: project the input to hidden_dim
        self.input_linear = nn.Linear(in_channels, hidden_dim)

        # Dense layers for output processing
        self.dense = nn.Sequential(
            nn.Linear(hidden_dim, dense_dim),
            nn.ReLU()
        )
        self.return_layer = nn.Linear(dense_dim, 1)

    def forward(self, x):
        """
        Run the recurrence over every time step of `x` and predict the next value.

        Args:
            x: tensor of shape (batch, steps, in_channels); feature 0 must be the
               target series ("Close").

        Returns:
            Tensor of shape (batch, 1): last observed feature-0 value times
            (1 + r), where |r| <= scale_factor.
        """
        batch_size, num_steps = x.size(0), x.size(1)

        # Zero initial states, created on the input's device AND dtype
        h = x.new_zeros(batch_size, self.hidden_dim)
        c = x.new_zeros(batch_size, self.hidden_dim)

        # Walk the actual time dimension: the prediction is anchored on x[:, -1], so the
        # recurrence has to consume exactly the steps that are present in the input.
        for t in range(num_steps):
            x_t = x[:, t, :]
            z_t = torch.sigmoid(self.W_z(x_t) + self.U_z(h) + self.b_z)
            m_t = torch.sigmoid(self.W_m(x_t) + self.U_m(h) + self.b_m)
            candidate = torch.tanh(self.W_c(x_t) + self.U_c(h) + self.b_c)
            candidate = candidate + self.residual_linear(h)
            candidate = candidate + self.input_linear(x_t)
            candidate = self.layer_norm(candidate)

            # Convex blend of the old cell state and the normalised candidate
            c = (1 - z_t) * c + z_t * candidate
            h = m_t * torch.tanh(c)

        # Compute dense layers and output
        dense_out = self.dense(h)
        long_term_return = torch.tanh(self.return_layer(dense_out)) * self.scale_factor

        # Use the last day's "Close" price from the input sequence (assumes first feature is "Close")
        last_day = x[:, -1, 0].unsqueeze(1)
        predicted_price = last_day * (1 + long_term_return)
        return predicted_price

# -------------------------------
# 6. Training and Evaluation Pipeline
# -------------------------------
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
in_channels = len(feature_cols)
model = CustomLSTMModel(in_channels, seq_length=seq_length, hidden_dim=64, dense_dim=32, scale_factor=0.2).to(device)
criterion = nn.MSELoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

num_epochs = 100
best_val_loss = float('inf')
patience = 10  # epochs without validation improvement tolerated before stopping
counter = 0    # consecutive epochs without improvement so far

# state_dict() returns references to the live parameter tensors, which the optimizer
# keeps updating in place; clone them so a snapshot really freezes that epoch's weights.
# Seeding the snapshot here also guarantees that the restore after the loop uses THIS
# model's weights even if no epoch ever improves (e.g. a NaN validation loss), instead
# of a `best_model_state` left over from a previously executed cell.
best_model_state = {k: v.detach().clone() for k, v in model.state_dict().items()}

for epoch in range(num_epochs):
    # ---- one optimisation pass over the training batches ----
    model.train()
    train_losses = []
    for batch_X, batch_y in train_loader:
        batch_X = batch_X.to(device)
        batch_y = batch_y.to(device)
        optimizer.zero_grad()
        outputs = model(batch_X)
        loss = criterion(outputs, batch_y)
        loss.backward()
        optimizer.step()
        train_losses.append(loss.item())

    # ---- validation pass (no gradients) ----
    model.eval()
    val_losses = []
    with torch.no_grad():
        for batch_X, batch_y in val_loader:
            batch_X = batch_X.to(device)
            batch_y = batch_y.to(device)
            outputs = model(batch_X)
            loss = criterion(outputs, batch_y)
            val_losses.append(loss.item())

    avg_train_loss = np.mean(train_losses)
    avg_val_loss = np.mean(val_losses)
    print(f"Epoch {epoch+1}/{num_epochs} - Train Loss: {avg_train_loss:.6f} - Val Loss: {avg_val_loss:.6f}")

    # ---- early stopping on the mean validation loss ----
    if avg_val_loss < best_val_loss:
        best_val_loss = avg_val_loss
        best_model_state = {k: v.detach().clone() for k, v in model.state_dict().items()}
        counter = 0
    else:
        counter += 1
        if counter >= patience:
            print("Early stopping triggered!")
            break

# Restore the weights of the best validation epoch
model.load_state_dict(best_model_state)

# -------------------------------
# 7. Evaluation on Test Data
# -------------------------------
model.eval()
predictions = []
true_values = []
with torch.no_grad():
    for batch_X, batch_y in test_loader:
        batch_X = batch_X.to(device)
        batch_y = batch_y.to(device)
        preds = model(batch_X)
        predictions.append(preds.cpu().numpy())
        true_values.append(batch_y.cpu().numpy())

# Stack the per-batch (batch, 1) arrays into single (N, 1) arrays
predictions = np.vstack(predictions)
true_values = np.vstack(true_values)

def inverse_transform(values):
    """
    Map scaled target values back to original units with the fitted global `scaler`.

    The scaler works on full feature rows, so the values are placed in column 0 of a
    zero-filled matrix with one column per entry of `feature_cols`; only column 0 of
    the inverse-transformed matrix is returned (as a 1-D array).
    """
    n_rows, n_cols = len(values), len(feature_cols)
    padded = np.zeros((n_rows, n_cols))
    padded[:, 0] = values.flatten()
    restored = scaler.inverse_transform(padded)
    return restored[:, 0]

predictions_inv = inverse_transform(predictions)
true_values_inv = inverse_transform(true_values)

# Scores in original price units
metric_fns = (r2_score, mean_absolute_error, mean_squared_error,
              explained_variance_score, mean_absolute_percentage_error)
r2, mae, mse, evs, mape = (fn(true_values_inv, predictions_inv) for fn in metric_fns)
rmse = np.sqrt(mse)
mape = mape * 100  # reported as a percentage

# Same scores on the min-max-scaled values
r2_scaled, mae_scaled, mse_scaled, evs_scaled, mape_scaled = (
    fn(true_values, predictions) for fn in metric_fns
)
rmse_scaled = np.sqrt(mse_scaled)
mape_scaled = mape_scaled * 100  # reported as a percentage

print("📊 Final Results -")
print(f"  R²: {r2:.4f}")
print(f"  MAE: {mae:.4f}")
print(f"  MSE: {mse:.4f}")
print(f"  RMSE: {rmse:.4f}")
print(f"  MAPE: {mape:.2f}%")
print(f"  EVS: {evs:.4f}")

print("\n--- Scaled Metrics ---")
print(f"R² Score: {r2_scaled:.4f}")
print(f"MAE: {mae_scaled:.4f}")
print(f"MSE: {mse_scaled:.4f}")
print(f"RMSE: {rmse_scaled:.4f}")
print(f"Explained Variance Score: {evs_scaled:.4f}")
print(f"MAPE: {mape_scaled:.2f}%")

# Actual vs. predicted close, in original units, over the test windows
fig, ax = plt.subplots(figsize=(10, 5))
ax.plot(true_values_inv, label='Actual')
ax.plot(predictions_inv, label='Predicted')
ax.set_xlabel("Time")
ax.set_ylabel("Close Price")
ax.legend()
plt.show()


In [None]:
# Sum of element counts over every registered parameter tensor of the model
total_params = sum(map(torch.numel, model.parameters()))
print(f"Total Parameters: {total_params}")

# OUR RANGE: NIFTY50 (15-3-2008 to 15-3-2024)

In [None]:
# India VIX: loaded and cleaned here, although the merge into the index data further
# down in this cell is commented out.
vix = pd.read_csv("/kaggle/input/our-data-15-3-2008-to-15-3-2024/India VIX Historical Data (3).csv")
vix.drop("Vol.",axis=1,inplace=True)
vix["Change %"] = vix["Change %"].str.replace("%", "").astype(float)

# Nifty 50: flip the file's row order and renumber the rows.
# NOTE(review): presumably the CSV is stored newest-first, so this yields chronological order — confirm.
data = pd.read_csv("/kaggle/input/our-data-15-3-2008-to-15-3-2024/Nifty 50 Historical Data (2).csv")
data = data[::-1].reset_index(drop=True)

# sort_index returns a NEW frame; the result has to be assigned for the alphabetical
# column order to take effect.
data = data.sort_index(axis=1, ascending=True)
data.rename(columns={'Price': 'Close'}, inplace=True)
# Rows without a volume get a fixed placeholder string (parsed like any other volume later)
data['Vol.']=data['Vol.'].fillna("278.04M")
df = data.copy()  # Ensure we don't modify the original dataset

# Convert financial columns to numeric (remove commas)
for col in ["Close", "Open", "High", "Low"]:
    df[col] = df[col].astype(str).str.replace(",", "").astype(float)

# Function to convert 'Vol.' column
def convert_volume(vol):
    """
    Convert a volume such as "1.5M", "2K", "3B" or "1,234" to a float.

    Strings have thousands separators removed; the first of B / M / K found in the
    text (checked in that order) is stripped and its multiplier applied. Anything
    else, including non-string input, goes straight through float().
    """
    if not isinstance(vol, str):
        return float(vol)  # already numeric

    vol = vol.replace(",", "")  # drop thousands separators
    for suffix, multiplier in (("B", 1_000_000_000), ("M", 1_000_000), ("K", 1_000)):
        if suffix in vol:
            return float(vol.replace(suffix, "")) * multiplier
    return float(vol)

# Parse the textual volumes (e.g. "278.04M") into floats
volume_text = df["Vol."].astype(str)
df["Vol."] = volume_text.apply(convert_volume)

# Convert 'Change %' column (remove '%' and convert to float)
change_text = df["Change %"].astype(str).str.replace("%", "")
df["Change %"] = change_text.astype(float)

# Print final DataFrame
print(df.dtypes)
print(df)

# `data` becomes another name for the same frame, so the drop below affects `df` too
data = df

#data = pd.merge(data, vix, on='Date', how='inner')
data.drop(columns=['Date'], inplace=True)
data.head()

In [None]:
import os
import random
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.optim as optim
import matplotlib.pyplot as plt
from sklearn.preprocessing import MinMaxScaler
from sklearn.metrics import r2_score, mean_absolute_error, mean_squared_error, mean_absolute_percentage_error, explained_variance_score
from torch.utils.data import TensorDataset, DataLoader

# -------------------------------
# 1. Set Seeds for Reproducibility
# -------------------------------
def set_random_seed(seed=42):
    """
    Seed Python's `random`, NumPy and PyTorch, and switch cuDNN to deterministic mode.

    Also stores the seed in the PYTHONHASHSEED environment variable.
    """
    os.environ['PYTHONHASHSEED'] = str(seed)
    for seeder in (np.random.seed, random.seed, torch.manual_seed):
        seeder(seed)

    # Deterministic kernels, and no benchmark-based kernel selection
    cudnn = torch.backends.cudnn
    cudnn.deterministic = True
    cudnn.benchmark = False

set_random_seed(30)

# -------------------------------
# 2. Load & Preprocess Data
# -------------------------------
# `data` must already exist (it is built by the preceding loading cell).
target = "Close"
df = data.copy()
all_cols = df.columns.tolist()

# Target first, then every other column in its existing order
feature_cols = [target, *(name for name in all_cols if name != target)]
df = df[feature_cols].copy()

# Strip thousands separators and coerce each column to numeric; values that cannot be
# parsed become NaN and their rows are dropped
for col in feature_cols:
    as_text = df[col].astype(str).str.replace(',', '')
    df[col] = pd.to_numeric(as_text, errors='coerce')
df = df.dropna()

# -------------------------------
# 3. Create Sequences from Time Series Data
# -------------------------------
def create_sequences(data, seq_length=20, target_idx=0):
    """
    Slice a 2-D array into overlapping windows with next-step targets.

    Window i covers rows i .. i + seq_length - 1; its target is column `target_idx`
    of row i + seq_length. Returns (windows, targets) as NumPy arrays; both are empty
    when `data` has no more than `seq_length` rows.
    """
    n_windows = len(data) - seq_length
    windows = [data[start:start + seq_length] for start in range(n_windows)]
    targets = [data[start + seq_length, target_idx] for start in range(n_windows)]
    return np.array(windows), np.array(targets)

data_values = df.values  # every column of df is a model input

# Chronological split: the first 80% of rows train, the remaining 20% test
train_size = int(len(data_values) * 0.8)
train_data, test_data = data_values[:train_size], data_values[train_size:]

# Min-max scaling of all features; fitted on the training rows only, then applied to the test rows
scaler = MinMaxScaler()
train_scaled = scaler.fit_transform(train_data)
test_scaled = scaler.transform(test_data)

# Windows of 20 consecutive rows; each target is column 0 of the row after its window
seq_length = 20
X_train, y_train = create_sequences(train_scaled, seq_length=seq_length, target_idx=0)
X_test, y_test = create_sequences(test_scaled, seq_length=seq_length, target_idx=0)

# Hold out the last 12.5% of the training windows as the validation set
train_split = int(len(X_train) * 0.875)
X_val, y_val = X_train[train_split:], y_train[train_split:]
X_train, y_train = X_train[:train_split], y_train[:train_split]

print("Train shape:", X_train.shape)
print("Validation shape:", X_val.shape)
print("Test shape:", X_test.shape)

# -------------------------------
# 4. Prepare PyTorch Datasets & DataLoaders
# -------------------------------
# Inputs are converted as-is; targets gain a trailing axis so they become column vectors
X_train_t, X_val_t, X_test_t = (
    torch.tensor(windows, dtype=torch.float32) for windows in (X_train, X_val, X_test)
)
y_train_t, y_val_t, y_test_t = (
    torch.tensor(targets, dtype=torch.float32).unsqueeze(1) for targets in (y_train, y_val, y_test)
)

batch_size = 32
train_dataset = TensorDataset(X_train_t, y_train_t)
val_dataset = TensorDataset(X_val_t, y_val_t)
test_dataset = TensorDataset(X_test_t, y_test_t)

# Only the training batches are shuffled; validation and test keep their order
train_loader = DataLoader(train_dataset, shuffle=True, batch_size=batch_size)
val_loader = DataLoader(val_dataset, shuffle=False, batch_size=batch_size)
test_loader = DataLoader(test_dataset, shuffle=False, batch_size=batch_size)

# -------------------------------
# 5. Define Merged Custom LSTM Model
# -------------------------------
class CustomLSTMModel(nn.Module):
    def __init__(self, in_channels, seq_length=20, hidden_dim=64, dense_dim=32, scale_factor=0.2):
        """
        Hand-rolled gated recurrent regressor (no nn.LSTM involved).

        Each time step computes two sigmoid gates (z_t, m_t) and a tanh candidate; the
        candidate is augmented with linear projections of the previous hidden state
        and of the current input, layer-normalised, and blended into the cell state.
        The final hidden state is mapped to a bounded relative change that is applied
        to feature 0 of the last input step.

        Args:
            in_channels: number of features per time step.
            seq_length: nominal window length; kept as an attribute. forward() walks
                the time dimension of whatever input it receives.
            hidden_dim: width of the hidden and cell states.
            dense_dim: width of the dense layer in front of the output unit.
            scale_factor: upper bound on the magnitude of the predicted relative
                change (the tanh output is multiplied by it).
        """
        super(CustomLSTMModel, self).__init__()
        self.seq_length = seq_length
        self.scale_factor = scale_factor
        self.hidden_dim = hidden_dim

        # Gate / candidate parameters. Each W_* already carries its own bias, so b_* is
        # a second additive bias on the same pre-activation; both are kept so that
        # parameter names and counts stay compatible with existing checkpoints.
        self.W_z = nn.Linear(in_channels, hidden_dim)
        self.U_z = nn.Linear(hidden_dim, hidden_dim, bias=False)
        self.b_z = nn.Parameter(torch.zeros(hidden_dim))

        self.W_m = nn.Linear(in_channels, hidden_dim)
        self.U_m = nn.Linear(hidden_dim, hidden_dim, bias=False)
        self.b_m = nn.Parameter(torch.zeros(hidden_dim))

        self.W_c = nn.Linear(in_channels, hidden_dim)
        self.U_c = nn.Linear(hidden_dim, hidden_dim, bias=False)
        self.b_c = nn.Parameter(torch.zeros(hidden_dim))

        # Residual connection: project previous hidden state
        self.residual_linear = nn.Linear(hidden_dim, hidden_dim)
        self.layer_norm = nn.LayerNorm(hidden_dim)

        # Skip connection: project the input to hidden_dim
        self.input_linear = nn.Linear(in_channels, hidden_dim)

        # Dense layers for output processing
        self.dense = nn.Sequential(
            nn.Linear(hidden_dim, dense_dim),
            nn.ReLU()
        )
        self.return_layer = nn.Linear(dense_dim, 1)

    def forward(self, x):
        """
        Run the recurrence over every time step of `x` and predict the next value.

        Args:
            x: tensor of shape (batch, steps, in_channels); feature 0 must be the
               target series ("Close").

        Returns:
            Tensor of shape (batch, 1): last observed feature-0 value times
            (1 + r), where |r| <= scale_factor.
        """
        batch_size, num_steps = x.size(0), x.size(1)

        # Zero initial states, created on the input's device AND dtype
        h = x.new_zeros(batch_size, self.hidden_dim)
        c = x.new_zeros(batch_size, self.hidden_dim)

        # Walk the actual time dimension: the prediction is anchored on x[:, -1], so the
        # recurrence has to consume exactly the steps that are present in the input.
        for t in range(num_steps):
            x_t = x[:, t, :]
            z_t = torch.sigmoid(self.W_z(x_t) + self.U_z(h) + self.b_z)
            m_t = torch.sigmoid(self.W_m(x_t) + self.U_m(h) + self.b_m)
            candidate = torch.tanh(self.W_c(x_t) + self.U_c(h) + self.b_c)
            candidate = candidate + self.residual_linear(h)
            candidate = candidate + self.input_linear(x_t)
            candidate = self.layer_norm(candidate)

            # Convex blend of the old cell state and the normalised candidate
            c = (1 - z_t) * c + z_t * candidate
            h = m_t * torch.tanh(c)

        # Compute dense layers and output
        dense_out = self.dense(h)
        long_term_return = torch.tanh(self.return_layer(dense_out)) * self.scale_factor

        # Use the last day's "Close" price from the input sequence (assumes first feature is "Close")
        last_day = x[:, -1, 0].unsqueeze(1)
        predicted_price = last_day * (1 + long_term_return)
        return predicted_price

# -------------------------------
# 6. Training and Evaluation Pipeline
# -------------------------------
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
in_channels = len(feature_cols)
model = CustomLSTMModel(in_channels, seq_length=seq_length, hidden_dim=64, dense_dim=32, scale_factor=0.2).to(device)
criterion = nn.MSELoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

num_epochs = 100
best_val_loss = float('inf')
patience = 10  # epochs without validation improvement tolerated before stopping
counter = 0    # consecutive epochs without improvement so far

# state_dict() returns references to the live parameter tensors, which the optimizer
# keeps updating in place; clone them so a snapshot really freezes that epoch's weights.
# Seeding the snapshot here also guarantees that the restore after the loop uses THIS
# model's weights even if no epoch ever improves (e.g. a NaN validation loss), instead
# of a `best_model_state` left over from a previously executed cell.
best_model_state = {k: v.detach().clone() for k, v in model.state_dict().items()}

for epoch in range(num_epochs):
    # ---- one optimisation pass over the training batches ----
    model.train()
    train_losses = []
    for batch_X, batch_y in train_loader:
        batch_X = batch_X.to(device)
        batch_y = batch_y.to(device)
        optimizer.zero_grad()
        outputs = model(batch_X)
        loss = criterion(outputs, batch_y)
        loss.backward()
        optimizer.step()
        train_losses.append(loss.item())

    # ---- validation pass (no gradients) ----
    model.eval()
    val_losses = []
    with torch.no_grad():
        for batch_X, batch_y in val_loader:
            batch_X = batch_X.to(device)
            batch_y = batch_y.to(device)
            outputs = model(batch_X)
            loss = criterion(outputs, batch_y)
            val_losses.append(loss.item())

    avg_train_loss = np.mean(train_losses)
    avg_val_loss = np.mean(val_losses)
    print(f"Epoch {epoch+1}/{num_epochs} - Train Loss: {avg_train_loss:.6f} - Val Loss: {avg_val_loss:.6f}")

    # ---- early stopping on the mean validation loss ----
    if avg_val_loss < best_val_loss:
        best_val_loss = avg_val_loss
        best_model_state = {k: v.detach().clone() for k, v in model.state_dict().items()}
        counter = 0
    else:
        counter += 1
        if counter >= patience:
            print("Early stopping triggered!")
            break

# Restore the weights of the best validation epoch
model.load_state_dict(best_model_state)

# -------------------------------
# 7. Evaluation on Test Data
# -------------------------------
model.eval()
predictions = []
true_values = []
with torch.no_grad():
    for batch_X, batch_y in test_loader:
        batch_X = batch_X.to(device)
        batch_y = batch_y.to(device)
        preds = model(batch_X)
        predictions.append(preds.cpu().numpy())
        true_values.append(batch_y.cpu().numpy())

# Stack the per-batch (batch, 1) arrays into single (N, 1) arrays
predictions = np.vstack(predictions)
true_values = np.vstack(true_values)

def inverse_transform(values):
    """
    Map scaled target values back to original units with the fitted global `scaler`.

    The scaler works on full feature rows, so the values are placed in column 0 of a
    zero-filled matrix with one column per entry of `feature_cols`; only column 0 of
    the inverse-transformed matrix is returned (as a 1-D array).
    """
    n_rows, n_cols = len(values), len(feature_cols)
    padded = np.zeros((n_rows, n_cols))
    padded[:, 0] = values.flatten()
    restored = scaler.inverse_transform(padded)
    return restored[:, 0]

predictions_inv = inverse_transform(predictions)
true_values_inv = inverse_transform(true_values)

# Scores in original price units
metric_fns = (r2_score, mean_absolute_error, mean_squared_error,
              explained_variance_score, mean_absolute_percentage_error)
r2, mae, mse, evs, mape = (fn(true_values_inv, predictions_inv) for fn in metric_fns)
rmse = np.sqrt(mse)
mape = mape * 100  # reported as a percentage

# Same scores on the min-max-scaled values
r2_scaled, mae_scaled, mse_scaled, evs_scaled, mape_scaled = (
    fn(true_values, predictions) for fn in metric_fns
)
rmse_scaled = np.sqrt(mse_scaled)
mape_scaled = mape_scaled * 100  # reported as a percentage

print("📊 Final Results -")
print(f"  R²: {r2:.4f}")
print(f"  MAE: {mae:.4f}")
print(f"  MSE: {mse:.4f}")
print(f"  RMSE: {rmse:.4f}")
print(f"  MAPE: {mape:.2f}%")
print(f"  EVS: {evs:.4f}")

print("\n--- Scaled Metrics ---")
print(f"R² Score: {r2_scaled:.4f}")
print(f"MAE: {mae_scaled:.4f}")
print(f"MSE: {mse_scaled:.4f}")
print(f"RMSE: {rmse_scaled:.4f}")
print(f"Explained Variance Score: {evs_scaled:.4f}")
print(f"MAPE: {mape_scaled:.2f}%")

# Actual vs. predicted close, in original units, over the test windows
fig, ax = plt.subplots(figsize=(10, 5))
ax.plot(true_values_inv, label='Actual')
ax.plot(predictions_inv, label='Predicted')
ax.set_xlabel("Time")
ax.set_ylabel("Close Price")
ax.legend()
plt.show()


# OUR RANGE: S&P 500 (15-3-2008 to 15-3-2024)

In [None]:
import pandas as pd

# Read the S&P 500 history
df = pd.read_csv("/kaggle/input/sp500-dataset/sp500_historical_data (2).csv")

# The derived column needs both price columns to be present
required_cols = {"Price", "Open"}
if required_cols.issubset(df.columns):
    # Open-to-close move expressed as a percentage of the closing price.
    # NOTE(review): the other dataset cells parse a "Change %" column that ships with
    # the file; confirm this intraday definition is meant to be comparable to it.
    price_move = df["Price"] - df["Open"]
    df["Change %"] = (price_move / df["Price"]) * 100

    # Save the updated CSV file
    # df.to_csv("SP 500 (2).csv", index=False)

    # Display first few rows
    print(df.head())
else:
    print("Error: CSV file must contain 'Price' and 'Open' columns.")
df.head()

In [None]:
# Flip the row order of the loaded frame and renumber the rows.
# NOTE(review): presumably the CSV is stored newest-first, so this yields chronological order — confirm.
data = df[::-1].reset_index(drop=True)
print(data.head())

# sort_index returns a NEW frame; the result has to be assigned for the alphabetical
# column order to take effect.
data = data.sort_index(axis=1, ascending=True)
data.rename(columns={'Price': 'Close'}, inplace=True)
df = data.copy()  # Ensure we don't modify the original dataset

# Convert financial columns to numeric (remove commas)
for col in ["Close", "Open", "High", "Low"]:
    df[col] = df[col].astype(str).str.replace(",", "").astype(float)

# Function to convert 'Vol.' column
#df.drop("Vol.",axis=1,inplace=True)

# Convert 'Change %' column (remove '%' and convert to float)
df["Change %"] = df["Change %"].astype(str).str.replace("%", "").astype(float)

# Print final DataFrame
print(df.dtypes)
print(df)

# `data` becomes another name for the same frame, so the drop below affects `df` too
data = df
#data['Date'] = pd.to_datetime(data['Date'], format="%d-%m-%Y")

# For 'vix', the date format is "MM/DD/YYYY"
#vix['Date'] = pd.to_datetime(vix['Date'], format="%m/%d/%Y")
#data = pd.merge(data, vix, on='Date', how='inner')
#row=data[data["Vol."].isna()]
data.drop(['Date'], axis=1, inplace=True)
data.head()

In [None]:
import os
import random
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.optim as optim
import matplotlib.pyplot as plt
from sklearn.preprocessing import MinMaxScaler
from sklearn.metrics import r2_score, mean_absolute_error, mean_squared_error, mean_absolute_percentage_error, explained_variance_score
from torch.utils.data import TensorDataset, DataLoader

# -------------------------------
# 1. Set Seeds for Reproducibility
# -------------------------------
def set_random_seed(seed=42):
    """
    Seed Python's `random`, NumPy and PyTorch, and switch cuDNN to deterministic mode.

    Also stores the seed in the PYTHONHASHSEED environment variable.
    """
    os.environ['PYTHONHASHSEED'] = str(seed)
    for seeder in (np.random.seed, random.seed, torch.manual_seed):
        seeder(seed)

    # Deterministic kernels, and no benchmark-based kernel selection
    cudnn = torch.backends.cudnn
    cudnn.deterministic = True
    cudnn.benchmark = False

set_random_seed(30)

# -------------------------------
# 2. Load & Preprocess Data
# -------------------------------
# `data` must already exist (it is built by the preceding loading cell).
target = "Close"
df = data.copy()
all_cols = df.columns.tolist()

# Target first, then every other column in its existing order
feature_cols = [target, *(name for name in all_cols if name != target)]
df = df[feature_cols].copy()

# Strip thousands separators and coerce each column to numeric; values that cannot be
# parsed become NaN and their rows are dropped
for col in feature_cols:
    as_text = df[col].astype(str).str.replace(',', '')
    df[col] = pd.to_numeric(as_text, errors='coerce')
df = df.dropna()

# -------------------------------
# 3. Create Sequences from Time Series Data
# -------------------------------
def create_sequences(data, seq_length=20, target_idx=0):
    """
    Slice a 2-D array into overlapping windows with next-step targets.

    Window i covers rows i .. i + seq_length - 1; its target is column `target_idx`
    of row i + seq_length. Returns (windows, targets) as NumPy arrays; both are empty
    when `data` has no more than `seq_length` rows.
    """
    n_windows = len(data) - seq_length
    windows = [data[start:start + seq_length] for start in range(n_windows)]
    targets = [data[start + seq_length, target_idx] for start in range(n_windows)]
    return np.array(windows), np.array(targets)

data_values = df.values  # every column of df is a model input

# Chronological split: the first 80% of rows train, the remaining 20% test
train_size = int(len(data_values) * 0.8)
train_data, test_data = data_values[:train_size], data_values[train_size:]

# Min-max scaling of all features; fitted on the training rows only, then applied to the test rows
scaler = MinMaxScaler()
train_scaled = scaler.fit_transform(train_data)
test_scaled = scaler.transform(test_data)

# Windows of 20 consecutive rows; each target is column 0 of the row after its window
seq_length = 20
X_train, y_train = create_sequences(train_scaled, seq_length=seq_length, target_idx=0)
X_test, y_test = create_sequences(test_scaled, seq_length=seq_length, target_idx=0)

# Hold out the last 12.5% of the training windows as the validation set
train_split = int(len(X_train) * 0.875)
X_val, y_val = X_train[train_split:], y_train[train_split:]
X_train, y_train = X_train[:train_split], y_train[:train_split]

print("Train shape:", X_train.shape)
print("Validation shape:", X_val.shape)
print("Test shape:", X_test.shape)

# -------------------------------
# 4. Prepare PyTorch Datasets & DataLoaders
# -------------------------------
# Inputs are converted as-is; targets gain a trailing axis so they become column vectors
X_train_t, X_val_t, X_test_t = (
    torch.tensor(windows, dtype=torch.float32) for windows in (X_train, X_val, X_test)
)
y_train_t, y_val_t, y_test_t = (
    torch.tensor(targets, dtype=torch.float32).unsqueeze(1) for targets in (y_train, y_val, y_test)
)

batch_size = 32
train_dataset = TensorDataset(X_train_t, y_train_t)
val_dataset = TensorDataset(X_val_t, y_val_t)
test_dataset = TensorDataset(X_test_t, y_test_t)

# Only the training batches are shuffled; validation and test keep their order
train_loader = DataLoader(train_dataset, shuffle=True, batch_size=batch_size)
val_loader = DataLoader(val_dataset, shuffle=False, batch_size=batch_size)
test_loader = DataLoader(test_dataset, shuffle=False, batch_size=batch_size)

# -------------------------------
# 5. Define Merged Custom LSTM Model
# -------------------------------
class CustomLSTMModel(nn.Module):
    def __init__(self, in_channels, seq_length=20, hidden_dim=64, dense_dim=32, scale_factor=0.2):
        """
        Hand-rolled gated recurrent regressor (no nn.LSTM involved).

        Each time step computes two sigmoid gates (z_t, m_t) and a tanh candidate; the
        candidate is augmented with linear projections of the previous hidden state
        and of the current input, layer-normalised, and blended into the cell state.
        The final hidden state is mapped to a bounded relative change that is applied
        to feature 0 of the last input step.

        Args:
            in_channels: number of features per time step.
            seq_length: nominal window length; kept as an attribute. forward() walks
                the time dimension of whatever input it receives.
            hidden_dim: width of the hidden and cell states.
            dense_dim: width of the dense layer in front of the output unit.
            scale_factor: upper bound on the magnitude of the predicted relative
                change (the tanh output is multiplied by it).
        """
        super(CustomLSTMModel, self).__init__()
        self.seq_length = seq_length
        self.scale_factor = scale_factor
        self.hidden_dim = hidden_dim

        # Gate / candidate parameters. Each W_* already carries its own bias, so b_* is
        # a second additive bias on the same pre-activation; both are kept so that
        # parameter names and counts stay compatible with existing checkpoints.
        self.W_z = nn.Linear(in_channels, hidden_dim)
        self.U_z = nn.Linear(hidden_dim, hidden_dim, bias=False)
        self.b_z = nn.Parameter(torch.zeros(hidden_dim))

        self.W_m = nn.Linear(in_channels, hidden_dim)
        self.U_m = nn.Linear(hidden_dim, hidden_dim, bias=False)
        self.b_m = nn.Parameter(torch.zeros(hidden_dim))

        self.W_c = nn.Linear(in_channels, hidden_dim)
        self.U_c = nn.Linear(hidden_dim, hidden_dim, bias=False)
        self.b_c = nn.Parameter(torch.zeros(hidden_dim))

        # Residual connection: project previous hidden state
        self.residual_linear = nn.Linear(hidden_dim, hidden_dim)
        self.layer_norm = nn.LayerNorm(hidden_dim)

        # Skip connection: project the input to hidden_dim
        self.input_linear = nn.Linear(in_channels, hidden_dim)

        # Dense layers for output processing
        self.dense = nn.Sequential(
            nn.Linear(hidden_dim, dense_dim),
            nn.ReLU()
        )
        self.return_layer = nn.Linear(dense_dim, 1)

    def forward(self, x):
        """
        Run the recurrence over every time step of `x` and predict the next value.

        Args:
            x: tensor of shape (batch, steps, in_channels); feature 0 must be the
               target series ("Close").

        Returns:
            Tensor of shape (batch, 1): last observed feature-0 value times
            (1 + r), where |r| <= scale_factor.
        """
        batch_size, num_steps = x.size(0), x.size(1)

        # Zero initial states, created on the input's device AND dtype
        h = x.new_zeros(batch_size, self.hidden_dim)
        c = x.new_zeros(batch_size, self.hidden_dim)

        # Walk the actual time dimension: the prediction is anchored on x[:, -1], so the
        # recurrence has to consume exactly the steps that are present in the input.
        for t in range(num_steps):
            x_t = x[:, t, :]
            z_t = torch.sigmoid(self.W_z(x_t) + self.U_z(h) + self.b_z)
            m_t = torch.sigmoid(self.W_m(x_t) + self.U_m(h) + self.b_m)
            candidate = torch.tanh(self.W_c(x_t) + self.U_c(h) + self.b_c)
            candidate = candidate + self.residual_linear(h)
            candidate = candidate + self.input_linear(x_t)
            candidate = self.layer_norm(candidate)

            # Convex blend of the old cell state and the normalised candidate
            c = (1 - z_t) * c + z_t * candidate
            h = m_t * torch.tanh(c)

        # Compute dense layers and output
        dense_out = self.dense(h)
        long_term_return = torch.tanh(self.return_layer(dense_out)) * self.scale_factor

        # Use the last day's "Close" price from the input sequence (assumes first feature is "Close")
        last_day = x[:, -1, 0].unsqueeze(1)
        predicted_price = last_day * (1 + long_term_return)
        return predicted_price

# -------------------------------
# 6. Training and Evaluation Pipeline
# -------------------------------
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
in_channels = len(feature_cols)
model = CustomLSTMModel(in_channels, seq_length=seq_length, hidden_dim=64, dense_dim=32, scale_factor=0.2).to(device)
criterion = nn.MSELoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

num_epochs = 100
best_val_loss = float('inf')
patience = 10  # epochs without validation improvement tolerated before stopping
counter = 0    # consecutive epochs without improvement so far

# state_dict() returns references to the live parameter tensors, which the optimizer
# keeps updating in place; clone them so a snapshot really freezes that epoch's weights.
# Seeding the snapshot here also guarantees that the restore after the loop uses THIS
# model's weights even if no epoch ever improves (e.g. a NaN validation loss), instead
# of a `best_model_state` left over from a previously executed cell.
best_model_state = {k: v.detach().clone() for k, v in model.state_dict().items()}

for epoch in range(num_epochs):
    # ---- one optimisation pass over the training batches ----
    model.train()
    train_losses = []
    for batch_X, batch_y in train_loader:
        batch_X = batch_X.to(device)
        batch_y = batch_y.to(device)
        optimizer.zero_grad()
        outputs = model(batch_X)
        loss = criterion(outputs, batch_y)
        loss.backward()
        optimizer.step()
        train_losses.append(loss.item())

    # ---- validation pass (no gradients) ----
    model.eval()
    val_losses = []
    with torch.no_grad():
        for batch_X, batch_y in val_loader:
            batch_X = batch_X.to(device)
            batch_y = batch_y.to(device)
            outputs = model(batch_X)
            loss = criterion(outputs, batch_y)
            val_losses.append(loss.item())

    avg_train_loss = np.mean(train_losses)
    avg_val_loss = np.mean(val_losses)
    print(f"Epoch {epoch+1}/{num_epochs} - Train Loss: {avg_train_loss:.6f} - Val Loss: {avg_val_loss:.6f}")

    # ---- early stopping on the mean validation loss ----
    if avg_val_loss < best_val_loss:
        best_val_loss = avg_val_loss
        best_model_state = {k: v.detach().clone() for k, v in model.state_dict().items()}
        counter = 0
    else:
        counter += 1
        if counter >= patience:
            print("Early stopping triggered!")
            break

# Restore the weights of the best validation epoch
model.load_state_dict(best_model_state)

# -------------------------------
# 7. Evaluation on Test Data
# -------------------------------
model.eval()
predictions = []
true_values = []
with torch.no_grad():
    for batch_X, batch_y in test_loader:
        batch_X = batch_X.to(device)
        batch_y = batch_y.to(device)
        preds = model(batch_X)
        predictions.append(preds.cpu().numpy())
        true_values.append(batch_y.cpu().numpy())

# Stack the per-batch (batch, 1) arrays into single (N, 1) arrays
predictions = np.vstack(predictions)
true_values = np.vstack(true_values)

def inverse_transform(values):
    """
    Map scaled target values back to original units with the fitted global `scaler`.

    The scaler works on full feature rows, so the values are placed in column 0 of a
    zero-filled matrix with one column per entry of `feature_cols`; only column 0 of
    the inverse-transformed matrix is returned (as a 1-D array).
    """
    n_rows, n_cols = len(values), len(feature_cols)
    padded = np.zeros((n_rows, n_cols))
    padded[:, 0] = values.flatten()
    restored = scaler.inverse_transform(padded)
    return restored[:, 0]

predictions_inv = inverse_transform(predictions)
true_values_inv = inverse_transform(true_values)
# Extra aliases for this dataset's results
y_test_inv3, y_pred_inv3 = true_values_inv, predictions_inv

# Scores in original price units
metric_fns = (r2_score, mean_absolute_error, mean_squared_error,
              explained_variance_score, mean_absolute_percentage_error)
r2, mae, mse, evs, mape = (fn(true_values_inv, predictions_inv) for fn in metric_fns)
rmse = np.sqrt(mse)
mape = mape * 100  # reported as a percentage

# Same scores on the min-max-scaled values
r2_scaled, mae_scaled, mse_scaled, evs_scaled, mape_scaled = (
    fn(true_values, predictions) for fn in metric_fns
)
rmse_scaled = np.sqrt(mse_scaled)
mape_scaled = mape_scaled * 100  # reported as a percentage

print("📊 Final Results -")
print(f"  R²: {r2:.4f}")
print(f"  MAE: {mae:.4f}")
print(f"  MSE: {mse:.4f}")
print(f"  RMSE: {rmse:.4f}")
print(f"  MAPE: {mape:.2f}%")
print(f"  EVS: {evs:.4f}")

print("\n--- Scaled Metrics ---")
print(f"R² Score: {r2_scaled:.4f}")
print(f"MAE: {mae_scaled:.4f}")
print(f"MSE: {mse_scaled:.4f}")
print(f"RMSE: {rmse_scaled:.4f}")
print(f"Explained Variance Score: {evs_scaled:.4f}")
print(f"MAPE: {mape_scaled:.2f}%")

# Actual vs. predicted close, in original units, over the test windows
fig, ax = plt.subplots(figsize=(10, 5))
ax.plot(true_values_inv, label='Actual')
ax.plot(predictions_inv, label='Predicted')
ax.set_xlabel("Time")
ax.set_ylabel("Close Price")
ax.legend()
plt.show()


# OUR RANGE: NIKKEI 225 (15-3-2008 to 15-3-2024)

In [None]:
'''vix = pd.read_csv("/kaggle/input/our-data-15-3-2008-to-15-3-2024/India VIX Historical Data (3).csv")
vix.drop("Vol.",axis=1,inplace=True)
vix["Change %"] = vix["Change %"].str.replace("%", "").astype(float)
vix.head(),vix.dtypes'''
data=pd.read_csv("/kaggle/input/nikkie/Nikkei 225 Historical Data.csv")
display(data)

# Flip the file's row order and renumber the rows.
# NOTE(review): presumably the CSV is stored newest-first, so this yields chronological order — confirm.
data = data[::-1].reset_index(drop=True)

# sort_index returns a NEW frame; the result has to be assigned for the alphabetical
# column order to take effect.
data = data.sort_index(axis=1, ascending=True)
data.rename(columns={'Price': 'Close'}, inplace=True)
# Volume is not used for this dataset
data.drop(["Vol."], axis=1, inplace=True)
df = data.copy()  # Ensure we don't modify the original dataset

# Convert financial columns to numeric (remove commas)
for col in ["Close", "Open", "High", "Low"]:
    df[col] = df[col].astype(str).str.replace(",", "").astype(float)

# Function to convert 'Vol.' column
'''
def convert_volume(vol):
    if isinstance(vol, str):  # Ensure it's a string before replacing
        vol = vol.replace(",", "")  # Remove any thousand separators
        if "B" in vol:
            return float(vol.replace("B", "")) * 1_000_000_000
        elif "M" in vol:
            return float(vol.replace("M", "")) * 1_000_000
        elif "K" in vol:
            return float(vol.replace("K", "")) * 1_000
    return float(vol)  # Convert directly if already a number

df["Vol."] = df["Vol."].astype(str).apply(convert_volume)
'''
# Convert 'Change %' column (remove '%' and convert to float)
df["Change %"] = df["Change %"].astype(str).str.replace("%", "").astype(float)

# Print final DataFrame
print(df.dtypes)
print(df)

# `data` becomes another name for the same frame, so the drop below affects `df` too
data = df

# data = pd.merge(data, vix, on='Date', how='inner')
data.drop(['Date'], axis=1, inplace=True)
data.head()

In [None]:
import os
import random
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.optim as optim
import matplotlib.pyplot as plt
from sklearn.preprocessing import MinMaxScaler
from sklearn.metrics import r2_score, mean_absolute_error, mean_squared_error, mean_absolute_percentage_error, explained_variance_score
from torch.utils.data import TensorDataset, DataLoader

# -------------------------------
# 1. Set Seeds for Reproducibility
# -------------------------------
def set_random_seed(seed=42):
    """Seed every random number source this notebook relies on.

    Seeds Python's ``random`` module, NumPy's global generator and PyTorch,
    exports ``PYTHONHASHSEED``, and puts cuDNN into deterministic,
    non-benchmarking mode.

    Args:
        seed: Integer seed shared by all generators (default 42).
    """
    # Environment flag first, then the three generators.
    os.environ['PYTHONHASHSEED'] = str(seed)

    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)

    # Trade cuDNN autotuning for repeatable kernel selection.
    torch.backends.cudnn.benchmark = False
    torch.backends.cudnn.deterministic = True

# Fix every RNG before any stochastic step (weight init, batch shuffling).
set_random_seed(30)

# -------------------------------
# 2. Load & Preprocess Data
# -------------------------------
# `data` must already be defined by the loading cell that precedes this one,
# e.g. data = pd.read_csv('your_data.csv')
df = data.copy()
all_cols = list(df.columns)

# The target goes first so that column 0 is always "Close"; the remaining
# columns keep their existing relative order.
target = "Close"
feature_cols = [target, *(name for name in all_cols if name != target)]
df = df.loc[:, feature_cols].copy()

# Coerce each column to numeric (thousands separators stripped). Values that
# cannot be parsed become NaN, and any row containing a NaN is then discarded.
for col in feature_cols:
    as_text = df[col].astype(str).str.replace(',', '')
    df[col] = pd.to_numeric(as_text, errors='coerce')
df = df.dropna()

# -------------------------------
# 3. Create Sequences from Time Series Data
# -------------------------------
def create_sequences(data, seq_length=20, target_idx=0):
    """Slice a 2-D time series into overlapping windows with next-step targets.

    Window ``i`` is ``data[i : i + seq_length]`` and its target is
    ``data[i + seq_length, target_idx]``, i.e. the value of column
    ``target_idx`` on the timestep right after the window.

    Args:
        data: 2-D array-like of shape (n_timesteps, n_features). Lists are
            accepted and converted with ``np.asarray``.
        seq_length: Number of consecutive timesteps per window.
        target_idx: Column index of the value to predict ("Close" in this notebook).

    Returns:
        Tuple ``(X, y)`` where ``X`` has shape
        ``(n_windows, seq_length, n_features)`` and ``y`` has shape
        ``(n_windows,)``, with ``n_windows = n_timesteps - seq_length``.
        When the series is too short for a single window, both arrays are
        empty but still correctly shaped (``X`` is ``(0, seq_length, n_features)``)
        so downstream tensor code sees consistent dimensions.
    """
    data = np.asarray(data)
    n_windows = len(data) - seq_length

    if n_windows <= 0:
        # Not enough rows for even one (window, target) pair: return empty
        # arrays that keep the expected rank instead of a shapeless (0,) array.
        empty_X = np.empty((0, seq_length) + data.shape[1:], dtype=data.dtype)
        empty_y = np.empty((0,), dtype=data.dtype)
        return empty_X, empty_y

    X = np.stack([data[start:start + seq_length] for start in range(n_windows)])
    # Targets are the rows that follow each window; copy so the result does
    # not alias the caller's array.
    y = data[seq_length:, target_idx].copy()
    return X, y

data_values = df.values  # every column (target included) is a model input

# Chronological 80/20 split: the earliest rows train, the latest rows test.
train_size = int(len(data_values) * 0.8)
train_data, test_data = data_values[:train_size], data_values[train_size:]

# Fit the MinMaxScaler on the training rows only, then apply the same
# transformation to the test rows.
scaler = MinMaxScaler()
train_scaled = scaler.fit_transform(train_data)
test_scaled = scaler.transform(test_data)

# Window both partitions separately with a sequence length of 20
seq_length = 20
X_train, y_train = create_sequences(train_scaled, seq_length=seq_length, target_idx=0)
X_test, y_test = create_sequences(test_scaled, seq_length=seq_length, target_idx=0)

# Hold out the last 12.5% of the training windows for validation
# (the first 87.5% remain for training).
train_split = int(len(X_train) * 0.875)
X_train, X_val = X_train[:train_split], X_train[train_split:]
y_train, y_val = y_train[:train_split], y_train[train_split:]

print("Train shape:", X_train.shape)
print("Validation shape:", X_val.shape)
print("Test shape:", X_test.shape)

# -------------------------------
# 4. Prepare PyTorch Datasets & DataLoaders
# -------------------------------
def _feature_tensor(windows):
    """Convert an array of input windows to a float32 tensor."""
    return torch.tensor(windows, dtype=torch.float32)


def _target_tensor(targets):
    """Convert a 1-D target array to a float32 column tensor of shape (n, 1)."""
    return torch.tensor(targets, dtype=torch.float32).unsqueeze(1)


X_train_t, y_train_t = _feature_tensor(X_train), _target_tensor(y_train)
X_val_t, y_val_t = _feature_tensor(X_val), _target_tensor(y_val)
X_test_t, y_test_t = _feature_tensor(X_test), _target_tensor(y_test)

batch_size = 32
train_dataset = TensorDataset(X_train_t, y_train_t)
val_dataset = TensorDataset(X_val_t, y_val_t)
test_dataset = TensorDataset(X_test_t, y_test_t)

# Only the training batches are shuffled; validation and test keep their order.
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

# -------------------------------
# 5. Define Merged Custom LSTM Model
# -------------------------------
class CustomLSTMModel(nn.Module):
    """Recurrent model that predicts the next "Close" as a bounded return on the last one.

    The recurrence is written out by hand instead of using ``nn.LSTM``. At every
    timestep two sigmoid gates (``z_t``, ``m_t``) and a tanh candidate are
    computed from the current input and the previous hidden state. The
    candidate additionally receives a projection of the previous hidden state
    and a projection of the raw input before being layer-normalised.

    The final hidden state is mapped to a return in
    ``[-scale_factor, +scale_factor]`` and applied multiplicatively to the last
    timestep's feature 0, which must therefore be the target ("Close") column.
    """

    def __init__(self, in_channels, seq_length=20, hidden_dim=64, dense_dim=32, scale_factor=0.2):
        """
        Args:
            in_channels: Number of features per timestep.
            seq_length: Nominal window length. Stored for reference only;
                ``forward`` processes however many timesteps the input contains.
            hidden_dim: Size of the hidden and cell states.
            dense_dim: Width of the dense layer between the recurrence and the output.
            scale_factor: Maximum absolute predicted return (the tanh output is
                multiplied by this value).
        """
        super(CustomLSTMModel, self).__init__()
        self.seq_length = seq_length
        self.scale_factor = scale_factor
        self.hidden_dim = hidden_dim

        # NOTE: the layers below are created in a fixed order on purpose — weight
        # initialisation consumes the global RNG, so reordering would change the
        # seeded results.

        # Gate z: blends the previous cell state with the new candidate.
        self.W_z = nn.Linear(in_channels, hidden_dim)
        self.U_z = nn.Linear(hidden_dim, hidden_dim, bias=False)
        self.b_z = nn.Parameter(torch.zeros(hidden_dim))

        # Gate m: scales tanh(cell state) to produce the hidden state.
        self.W_m = nn.Linear(in_channels, hidden_dim)
        self.U_m = nn.Linear(hidden_dim, hidden_dim, bias=False)
        self.b_m = nn.Parameter(torch.zeros(hidden_dim))

        # Candidate cell content.
        self.W_c = nn.Linear(in_channels, hidden_dim)
        self.U_c = nn.Linear(hidden_dim, hidden_dim, bias=False)
        self.b_c = nn.Parameter(torch.zeros(hidden_dim))

        # Residual connection: project previous hidden state
        self.residual_linear = nn.Linear(hidden_dim, hidden_dim)
        self.layer_norm = nn.LayerNorm(hidden_dim)

        # Skip connection: project the input to hidden_dim
        self.input_linear = nn.Linear(in_channels, hidden_dim)

        # Dense layers for output processing
        self.dense = nn.Sequential(
            nn.Linear(hidden_dim, dense_dim),
            nn.ReLU()
        )
        self.return_layer = nn.Linear(dense_dim, 1)

    def forward(self, x):
        """Run the recurrence over the whole input window and predict the next value.

        Args:
            x: Tensor of shape (batch, n_steps, in_channels); feature 0 must be
                the target column.

        Returns:
            Tensor of shape (batch, 1): ``x[:, -1, 0] * (1 + r)`` with
            ``|r| <= scale_factor``.
        """
        batch_size, n_steps = x.size(0), x.size(1)

        # Zero initial states, created on the input's device and with its dtype.
        h = x.new_zeros(batch_size, self.hidden_dim)
        c = x.new_zeros(batch_size, self.hidden_dim)

        # Iterate over the timesteps actually present in `x` (not the nominal
        # `self.seq_length`), so the recurrence always ends on the same step
        # that `last_day` below is read from.
        for t in range(n_steps):
            x_t = x[:, t, :]
            z_t = torch.sigmoid(self.W_z(x_t) + self.U_z(h) + self.b_z)
            m_t = torch.sigmoid(self.W_m(x_t) + self.U_m(h) + self.b_m)
            candidate = torch.tanh(self.W_c(x_t) + self.U_c(h) + self.b_c)
            candidate = candidate + self.residual_linear(h)
            candidate = candidate + self.input_linear(x_t)
            candidate = self.layer_norm(candidate)

            c = (1 - z_t) * c + z_t * candidate
            h = m_t * torch.tanh(c)

        # Map the final hidden state to a return bounded by +/- scale_factor.
        dense_out = self.dense(h)
        long_term_return = torch.tanh(self.return_layer(dense_out)) * self.scale_factor

        # Apply the return to the last day's "Close" (assumes first feature is "Close").
        last_day = x[:, -1, 0].unsqueeze(1)
        predicted_price = last_day * (1 + long_term_return)
        return predicted_price

# -------------------------------
# 6. Training and Evaluation Pipeline
# -------------------------------
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
in_channels = len(feature_cols)
model = CustomLSTMModel(in_channels, seq_length=seq_length, hidden_dim=64, dense_dim=32, scale_factor=0.2).to(device)
criterion = nn.MSELoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

num_epochs = 100
best_val_loss = float('inf')
patience = 10  # epochs without validation improvement tolerated before stopping
counter = 0    # consecutive epochs without improvement so far


def _snapshot_weights(module):
    """Return an independent copy of `module`'s state dict.

    `state_dict()` hands back references to the live parameter tensors, which
    the optimizer keeps updating in place. Storing that dict directly would
    therefore not preserve the weights of the best epoch; each tensor has to be
    cloned.
    """
    return {name: tensor.detach().clone() for name, tensor in module.state_dict().items()}


# Start from the initial weights so `best_model_state` is always defined, even if
# the validation loss never improves (e.g. it is NaN from the first epoch).
best_model_state = _snapshot_weights(model)

for epoch in range(num_epochs):
    # ---- training pass ----
    model.train()
    train_losses = []
    for batch_X, batch_y in train_loader:
        batch_X = batch_X.to(device)
        batch_y = batch_y.to(device)
        optimizer.zero_grad()
        outputs = model(batch_X)
        loss = criterion(outputs, batch_y)
        loss.backward()
        optimizer.step()
        train_losses.append(loss.item())

    # ---- validation pass (no gradients) ----
    model.eval()
    val_losses = []
    with torch.no_grad():
        for batch_X, batch_y in val_loader:
            batch_X = batch_X.to(device)
            batch_y = batch_y.to(device)
            outputs = model(batch_X)
            loss = criterion(outputs, batch_y)
            val_losses.append(loss.item())

    avg_train_loss = np.mean(train_losses)
    avg_val_loss = np.mean(val_losses)
    print(f"Epoch {epoch+1}/{num_epochs} - Train Loss: {avg_train_loss:.6f} - Val Loss: {avg_val_loss:.6f}")

    # ---- early stopping on the mean validation loss ----
    if avg_val_loss < best_val_loss:
        best_val_loss = avg_val_loss
        # Clone the weights: keeping `model.state_dict()` itself would alias the
        # live parameters and be overwritten by the following epochs.
        best_model_state = _snapshot_weights(model)
        counter = 0
    else:
        counter += 1
        if counter >= patience:
            print("Early stopping triggered!")
            break

# Restore the weights of the epoch with the lowest validation loss.
model.load_state_dict(best_model_state)

# -------------------------------
# 7. Evaluation on Test Data
# -------------------------------
model.eval()
predictions, true_values = [], []
with torch.no_grad():
    for batch_X, batch_y in test_loader:
        preds = model(batch_X.to(device))
        predictions.append(preds.cpu().numpy())
        true_values.append(batch_y.cpu().numpy())

# Join the per-batch (batch, 1) arrays into single (n_samples, 1) arrays.
predictions = np.concatenate(predictions, axis=0)
true_values = np.concatenate(true_values, axis=0)

def inverse_transform(values):
    """Convert scaled target values back to the scaler's original units.

    The scaler was fitted on all feature columns, so the values are placed in
    column 0 (the target) of a zero-filled matrix of the full feature width,
    inverse-transformed, and column 0 is returned.
    """
    padded = np.zeros((len(values), len(feature_cols)))
    padded[:, 0] = np.ravel(values)
    restored = scaler.inverse_transform(padded)
    return restored[:, 0]

predictions_inv = inverse_transform(predictions)
true_values_inv = inverse_transform(true_values)
y_test_inv4, y_pred_inv4 = true_values_inv, predictions_inv


def _regression_scores(actual, predicted):
    """Return (R², MAE, MSE, RMSE, EVS, MAPE in percent) for one pair of arrays."""
    squared_error = mean_squared_error(actual, predicted)
    return (
        r2_score(actual, predicted),
        mean_absolute_error(actual, predicted),
        squared_error,
        np.sqrt(squared_error),
        explained_variance_score(actual, predicted),
        mean_absolute_percentage_error(actual, predicted) * 100,
    )


# Metrics in original units, then the same metrics in scaler space.
r2, mae, mse, rmse, evs, mape = _regression_scores(true_values_inv, predictions_inv)
(r2_scaled, mae_scaled, mse_scaled,
 rmse_scaled, evs_scaled, mape_scaled) = _regression_scores(true_values, predictions)

print("📊 Final Results -")
print(f"  R²: {r2:.4f}")
print(f"  MAE: {mae:.4f}")
print(f"  MSE: {mse:.4f}")
print(f"  RMSE: {rmse:.4f}")
print(f"  MAPE: {mape:.2f}%")
print(f"  EVS: {evs:.4f}")

print("\n--- Scaled Metrics ---")
print(f"R² Score: {r2_scaled:.4f}")
print(f"MAE: {mae_scaled:.4f}")
print(f"MSE: {mse_scaled:.4f}")
print(f"RMSE: {rmse_scaled:.4f}")
print(f"Explained Variance Score: {evs_scaled:.4f}")
print(f"MAPE: {mape_scaled:.2f}%")

# Actual vs. predicted close, in original units, over the test windows.
plt.figure(figsize=(10,5))
plt.plot(true_values_inv, label='Actual')
plt.plot(predictions_inv, label='Predicted')
plt.xlabel("Time")
plt.ylabel("Close Price")
plt.legend()
plt.show()
