# Enhanced CNN Stock Market Prediction

Improvements over the original model:
1. Additional technical indicators
2. Enhanced CNN architecture with batch normalization
3. Better training process with validation
4. Learning rate scheduling

In [None]:
import yfinance as yf
import pandas as pd
import numpy as np
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
from sklearn.preprocessing import MinMaxScaler
from datetime import datetime, timedelta
import matplotlib.pyplot as plt

In [None]:
def add_technical_indicators(data):
    """Return a copy of ``data`` extended with technical-indicator columns.

    Added columns: ``MA5``, ``MA20`` (simple moving averages of ``Close``),
    ``RSI`` (14-period, simple-average variant), ``MACD`` (EMA12 - EMA26) and
    the 20-period Bollinger Bands ``BB_middle``, ``BB_upper``, ``BB_lower``.

    Args:
        data: DataFrame with at least a ``Close`` column. It is not modified.

    Returns:
        A new DataFrame with the indicator columns appended. Gaps are forward
        filled, and the leading warm-up rows for which an indicator is still
        undefined (the first 19 rows for the 20-period windows) are dropped,
        so the indicator columns contain no NaN.
    """
    indicator_columns = ['MA5', 'MA20', 'RSI', 'MACD',
                         'BB_middle', 'BB_upper', 'BB_lower']

    # Work on a copy so the caller's frame is not mutated as a side effect.
    data = data.copy()
    close = data['Close']

    # Calculate moving averages
    data['MA5'] = close.rolling(window=5).mean()
    data['MA20'] = close.rolling(window=20).mean()

    # Calculate RSI
    delta = close.diff()
    gain = delta.where(delta > 0, 0.0).rolling(window=14).mean()
    loss = (-delta).where(delta < 0, 0.0).rolling(window=14).mean()
    rs = gain / loss  # inf when there were no losses -> RSI evaluates to 100
    rsi = 100 - (100 / (1 + rs))
    # A completely flat window gives 0/0 = NaN; report the neutral value 50.
    data['RSI'] = rsi.mask((gain == 0) & (loss == 0), 50.0)

    # Calculate MACD
    exp1 = close.ewm(span=12, adjust=False).mean()
    exp2 = close.ewm(span=26, adjust=False).mean()
    data['MACD'] = exp1 - exp2

    # Calculate Bollinger Bands (the middle band is the 20-period mean)
    rolling_std = close.rolling(window=20).std()
    data['BB_middle'] = data['MA20']
    data['BB_upper'] = data['BB_middle'] + 2 * rolling_std
    data['BB_lower'] = data['BB_middle'] - 2 * rolling_std

    # Forward fill interior gaps. Forward filling cannot repair the leading
    # warm-up rows (there is nothing before them), so drop those instead of
    # letting NaN reach the scaler and the model.
    return data.ffill().dropna(subset=indicator_columns)

# Fetch intraday data. Yahoo Finance only serves sub-daily intervals such as
# 30m for roughly the most recent 60 days, so a longer lookback makes the
# request fail or come back empty; stay just inside that limit.
stock_symbol = "RELIANCE.NS"
end_date = datetime.now()
start_date = end_date - timedelta(days=59)

data = yf.download(stock_symbol, start=start_date.strftime('%Y-%m-%d'),
                   end=end_date.strftime('%Y-%m-%d'), interval="30m")

# Newer yfinance releases may return (field, ticker) MultiIndex columns even
# for a single symbol; flatten them so data['Close'] is a plain Series.
if isinstance(data.columns, pd.MultiIndex):
    data.columns = data.columns.get_level_values(0)

# Fail here with a clear message instead of deep inside the scaler/model.
if data.empty:
    raise ValueError(f"No price data returned for {stock_symbol}")

# Add technical indicators
data = add_technical_indicators(data)
print("Data shape:", data.shape)
print("\nFeatures:", list(data.columns))

In [None]:
class EnhancedDataset(Dataset):
    """Sliding-window dataset over min-max-scaled price and indicator columns.

    Item ``i`` pairs ``sequence_length`` consecutive scaled rows (starting at
    row ``i``) with the scaled 'Close' value of the row immediately after
    that window.
    """

    def __init__(self, data, sequence_length=15):
        """Scale the selected columns of ``data`` and cut them into windows.

        Args:
            data: frame indexable by the column names in ``feature_columns``.
            sequence_length: number of consecutive rows per input window.
        """
        self.sequence_length = sequence_length

        # Column order matters: position 3 ('Close') is used as the target.
        self.feature_columns = [
            'Open', 'High', 'Low', 'Close', 'Volume',
            'MA5', 'MA20', 'RSI', 'MACD',
            'BB_middle', 'BB_upper', 'BB_lower',
        ]
        self.data = data[self.feature_columns].values

        # The scaler is fitted on every row and kept for inverse transforms.
        self.scaler = MinMaxScaler(feature_range=(0,1))
        self.scaled_data = self.scaler.fit_transform(self.data)

        # One window per start offset that still has a following row.
        window_starts = range(len(self.data) - sequence_length)
        self.sequences = [
            self.scaled_data[start:start + sequence_length]
            for start in window_starts
        ]
        self.targets = [
            self.scaled_data[start + sequence_length, 3]  # Close price
            for start in window_starts
        ]

    def __len__(self):
        """Return the number of (window, target) pairs."""
        return len(self.sequences)

    def __getitem__(self, idx):
        """Return window ``idx`` and its target as float tensors."""
        window = torch.FloatTensor(self.sequences[idx])
        label = torch.FloatTensor([self.targets[idx]])
        return (window, label)

# Create dataset
dataset = EnhancedDataset(data)

# Split into train and validation sets (80-20), chronologically.
# Neighbouring windows share all but one row, so a random split would place
# near-duplicates of training windows in the validation set and make the
# validation loss look better than it is. Train on the earliest 80% of the
# windows and validate on the most recent 20%.
train_size = int(0.8 * len(dataset))
val_size = len(dataset) - train_size
train_dataset = torch.utils.data.Subset(dataset, range(train_size))
val_dataset = torch.utils.data.Subset(dataset, range(train_size, len(dataset)))

# Create data loaders (shuffling inside the training portion is fine)
batch_size = 32
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=batch_size)

In [None]:
class EnhancedCNN(nn.Module):
    """Three-stage 2-D CNN followed by an MLP head producing one value.

    Each stage is Conv2d -> BatchNorm2d -> ReLU -> [MaxPool2d] -> Dropout.
    The feature map is globally average-pooled to 32 values before the head.

    Note: ``input_channels`` is accepted but not used by any layer; the first
    convolution always takes a single input channel.
    """

    def __init__(self, input_channels=12, hidden_size1=128, hidden_size2=64, dropout_prob=0.3):
        super().__init__()

        def conv_stage(in_ch, out_ch, kernel, padding, pooled):
            # Layers are created in the same order as they run, which keeps
            # the Sequential indices (and state_dict keys) stable.
            stage = [
                nn.Conv2d(in_channels=in_ch, out_channels=out_ch,
                          kernel_size=kernel, stride=1, padding=padding),
                nn.BatchNorm2d(out_ch),
                nn.ReLU(),
            ]
            if pooled:
                stage.append(nn.MaxPool2d(kernel_size=(2,2), stride=1))
            stage.append(nn.Dropout(p=dropout_prob))
            return stage

        self.features = nn.Sequential(
            *conv_stage(1, hidden_size1, (3,3), 1, pooled=True),
            *conv_stage(hidden_size1, hidden_size2, (3,3), 1, pooled=True),
            *conv_stage(hidden_size2, 32, (2,2), 0, pooled=False),
        )

        # Collapse whatever spatial extent is left to a single cell per channel.
        self.adaptive_pool = nn.AdaptiveAvgPool2d((1, 1))

        head_layers = [
            nn.Linear(32, 64),
            nn.ReLU(),
            nn.Dropout(p=dropout_prob),
            nn.Linear(64, 32),
            nn.ReLU(),
            nn.Linear(32, 1),
        ]
        self.classifier = nn.Sequential(*head_layers)

    def forward(self, x):
        """Map a 4-D input batch to one output value per sample."""
        feature_maps = self.features(x)
        pooled = self.adaptive_pool(feature_maps)
        flat = torch.flatten(pooled, 1)
        return self.classifier(flat)

# Initialize model and training components
model = EnhancedCNN(input_channels=len(dataset.feature_columns))
criterion = nn.MSELoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
# Halve the learning rate after 5 epochs without validation improvement.
# The `verbose` argument is deprecated on PyTorch LR schedulers (and may be
# rejected by newer releases), so it is not passed; inspect the current rate
# with optimizer.param_groups[0]['lr'] or scheduler.get_last_lr() instead.
scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='min',
                                                       factor=0.5, patience=5)

In [None]:
def train_epoch(model, train_loader, criterion, optimizer):
    """Run one optimisation pass over ``train_loader``.

    Puts the model in training mode, takes one optimizer step per batch and
    returns the mean of the per-batch losses.
    """
    model.train()
    batch_losses = []
    for batch_inputs, batch_targets in train_loader:
        optimizer.zero_grad()
        # Insert an axis at position 1 (the channel dimension for the model).
        predictions = model(batch_inputs.unsqueeze(1))
        batch_loss = criterion(predictions, batch_targets)
        batch_loss.backward()
        optimizer.step()
        batch_losses.append(batch_loss.item())
    return sum(batch_losses) / len(train_loader)

def validate(model, val_loader, criterion):
    """Return the mean per-batch loss over ``val_loader``.

    Puts the model in evaluation mode and runs every batch with gradient
    tracking disabled; no parameters are updated.
    """
    model.eval()
    with torch.no_grad():
        # Each batch gets an axis inserted at position 1 before the forward pass.
        loss_sum = sum(
            criterion(model(batch_inputs.unsqueeze(1)), batch_targets).item()
            for batch_inputs, batch_targets in val_loader
        )
    return loss_sum / len(val_loader)

# Training loop: train, validate, adapt the learning rate, and checkpoint the
# weights whenever the validation loss reaches a new minimum.
num_epochs = 50
best_val_loss = float('inf')
train_losses, val_losses = [], []

for epoch in range(num_epochs):
    train_loss = train_epoch(model, train_loader, criterion, optimizer)
    val_loss = validate(model, val_loader, criterion)

    # Keep the per-epoch history for the loss plot.
    train_losses.append(train_loss)
    val_losses.append(val_loss)

    # The scheduler watches the validation loss.
    scheduler.step(val_loss)

    # Persist the best model seen so far.
    if val_loss < best_val_loss:
        best_val_loss = val_loss
        checkpoint = {
            'epoch': epoch,
            'model_state_dict': model.state_dict(),
            'optimizer_state_dict': optimizer.state_dict(),
            'train_loss': train_loss,
            'val_loss': val_loss,
        }
        torch.save(checkpoint, 'enhanced_stock_prediction_model.pth')

    print(
        f'Epoch {epoch+1}/{num_epochs}',
        f'Train Loss: {train_loss:.6f}',
        f'Val Loss: {val_loss:.6f}',
        '-' * 40,
        sep='\n',
    )

# Plot training history: both loss curves on one set of axes.
fig, ax = plt.subplots(figsize=(10, 6))
for history, curve_label in ((train_losses, 'Training Loss'),
                             (val_losses, 'Validation Loss')):
    ax.plot(history, label=curve_label)
ax.set_title('Model Loss Over Time')
ax.set_xlabel('Epoch')
ax.set_ylabel('Loss')
ax.legend()
plt.show()

In [None]:
# Save the scaler for future predictions
import joblib
joblib.dump(dataset.scaler, 'enhanced_stock_scaler.pkl')

# Derive the window length and target column from the dataset instead of
# repeating the literals 15 and 3.
window = dataset.sequence_length
close_idx = dataset.feature_columns.index('Close')

if len(dataset.scaled_data) <= window:
    raise ValueError(
        f"Need more than {window} rows to evaluate a prediction, "
        f"got {len(dataset.scaled_data)}"
    )

# Make a prediction
model.eval()
with torch.no_grad():
    # Use the window that ends one row before the last one, so the value the
    # model predicts is the final observed close. Feeding the last `window`
    # rows would forecast a bar that is not in the data, and comparing that
    # forecast with the last close (itself part of the input) is not an error
    # measurement.
    last_sequence = dataset.scaled_data[-(window + 1):-1]
    sequence_tensor = torch.FloatTensor(last_sequence).unsqueeze(0).unsqueeze(0)

    # Make prediction
    prediction = model(sequence_tensor)

    # Convert prediction back to original scale. The scaler works on full
    # feature rows, so place the value in the Close column of a dummy row.
    dummy_array = np.zeros((1, len(dataset.feature_columns)))
    dummy_array[0, close_idx] = prediction.item()
    predicted_price = dataset.scaler.inverse_transform(dummy_array)[0, close_idx]

    # Get actual price from the dataset's unscaled feature matrix, which is a
    # plain array regardless of how the source frame's columns are indexed.
    actual_price = float(dataset.data[-1, close_idx])

    print(f'Predicted Price: ₹{predicted_price:.2f}')
    print(f'Actual Price: ₹{actual_price:.2f}')
    print(f'Prediction Error: ₹{abs(actual_price - predicted_price):.2f}')