# Financial Stock Price Prediction with LSTM

This notebook demonstrates how to use LSTM neural networks to predict stock prices using historical financial data from Yahoo Finance. We'll adapt the LSTM architecture for time series regression and financial forecasting.

In [None]:
# Import Required Libraries
import os
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader

import yfinance as yf
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.preprocessing import MinMaxScaler
from sklearn.metrics import mean_squared_error, mean_absolute_error
import warnings
warnings.filterwarnings('ignore')

# Fix the NumPy and PyTorch RNG seeds so repeated runs start from the same state
np.random.seed(42)
torch.manual_seed(42)

# Environment report: confirms the imports worked and shows the compute backend
print(
    "All libraries imported successfully!",
    f"PyTorch version: {torch.__version__}",
    f"Device available: {'GPU' if torch.cuda.is_available() else 'CPU'}",
    sep="\n",
)

## Data Collection with yfinance

In [None]:
# Download stock data using yfinance
ticker_symbol = "AAPL"  # Apple Inc. stock
start_date = "2020-01-01"
end_date = "2024-12-01"

print(f"Downloading {ticker_symbol} stock data from {start_date} to {end_date}...")

# Download the data
stock_data = yf.download(ticker_symbol, start=start_date, end=end_date)

# Fail early with a clear message: without this guard an empty result only
# surfaces as an IndexError on `stock_data.index[0]` a few lines below.
if stock_data.empty:
    raise ValueError(
        f"No data returned for {ticker_symbol} between {start_date} and {end_date}; "
        "check the ticker symbol, the date range and the network connection."
    )

# Recent yfinance releases can return two-level columns (price field, ticker)
# even for a single ticker. The rest of the notebook indexes columns by plain
# field name ('Close', 'Volume', ...), so keep only the first level.
# NOTE(review): assumes the default column grouping, where level 0 is the price field.
if isinstance(stock_data.columns, pd.MultiIndex):
    stock_data.columns = stock_data.columns.get_level_values(0)

# Display basic information about the dataset
print(f"\nDataset shape: {stock_data.shape}")
print(f"Date range: {stock_data.index[0]} to {stock_data.index[-1]}")
print(f"Columns: {list(stock_data.columns)}")

# Display first few rows
print("\nFirst 5 rows of the data:")
print(stock_data.head())

# Check for missing values
print(f"\nMissing values per column:")
print(stock_data.isnull().sum())

## Data Preprocessing and Feature Engineering

In [None]:
# Build every engineered column in one chained expression. `assign` returns a
# new frame, so `stock_data` is left untouched; each lambda sees the columns
# added before it, which is why Volatility can use Price_Change.
df = stock_data.assign(
    # Closing price is the prediction target
    Close_Price=lambda d: d['Close'],
    # Day-over-day relative change of the close
    Price_Change=lambda d: d['Close'].pct_change(),
    # 10-day average traded volume
    Volume_MA_10=lambda d: d['Volume'].rolling(window=10).mean(),
    # Intraday range and open-to-close move, both in percent
    High_Low_Pct=lambda d: (d['High'] - d['Low']) / d['Close'] * 100,
    Open_Close_Pct=lambda d: (d['Close'] - d['Open']) / d['Open'] * 100,
    # Moving averages of the close over 5, 10 and 20 rows
    MA_5=lambda d: d['Close'].rolling(window=5).mean(),
    MA_10=lambda d: d['Close'].rolling(window=10).mean(),
    MA_20=lambda d: d['Close'].rolling(window=20).mean(),
    # Volatility: rolling standard deviation of the daily change
    Volatility=lambda d: d['Price_Change'].rolling(window=10).std(),
).dropna()  # the rolling windows leave NaNs in the leading rows

# Columns fed to the model; Close_Price comes first because it is the target column
feature_columns = ['Close_Price', 'Volume', 'High_Low_Pct', 'Open_Close_Pct',
                   'MA_5', 'MA_10', 'MA_20', 'Volatility']
df_features = df.loc[:, feature_columns].copy()

print(f"Shape after preprocessing: {df_features.shape}")
print(f"Features selected: {feature_columns}")
print("\nProcessed data sample:")
print(df_features.head())

# Plot the closing price over the retained date range
fig, ax = plt.subplots(figsize=(12, 6))
ax.plot(df.index, df['Close_Price'])
ax.set_title(f'{ticker_symbol} Stock Price Over Time')
ax.set_xlabel('Date')
ax.set_ylabel('Close Price ($)')
ax.grid(True)
plt.show()

## Create Dataset Class for Time Series

In [None]:
# Normalize the data using MinMaxScaler.
# The scaler is fitted on the training portion only. Fitting on the full series
# would let the test period's minima and maxima shape the training features,
# which is look-ahead leakage. As a consequence, scaled test values may fall
# outside [0, 1]; that is expected.
scaler = MinMaxScaler(feature_range=(0, 1))

# Must stay in sync with the 80/20 train/test split applied to `scaled_data` below
scaler_fit_rows = int(len(df_features) * 0.8)
scaler.fit(df_features.values[:scaler_fit_rows])
scaled_data = scaler.transform(df_features.values)

print(f"Data shape after scaling: {scaled_data.shape}")
print(f"Sample scaled data (first 3 rows):")
print(scaled_data[:3])

class StockDataset(Dataset):
    """Sliding-window dataset over a 2-D array of time-ordered feature rows.

    Sample ``i`` pairs the window ``data[i : i + sequence_length]`` with the
    value of ``target_column`` in the row immediately after that window.

    Args:
        data: 2-D array supporting ``data[a:b]`` and ``data[row, col]``
            indexing (the notebook passes the scaled NumPy feature matrix).
        sequence_length: Number of consecutive rows in each input window.
        target_column: Column index of the value to predict (default 0).
    """

    def __init__(self, data, sequence_length, target_column=0):
        self.data = data
        self.sequence_length = sequence_length
        self.target_column = target_column

    def __len__(self):
        # Clamp at zero: a series shorter than one window has no samples, and
        # a negative value would make len() raise.
        return max(0, len(self.data) - self.sequence_length)

    def __getitem__(self, idx):
        """Return ``(sequence, target)`` as float32 tensors for sample ``idx``.

        ``sequence`` holds ``sequence_length`` rows of features and ``target``
        is a one-element tensor. Negative indices count from the end.

        Raises:
            IndexError: If ``idx`` is outside the valid sample range.
        """
        n_samples = len(self)
        if idx < 0:
            # Without this, a negative idx would slice an empty or misaligned window
            idx += n_samples
        if not 0 <= idx < n_samples:
            raise IndexError(f"sample index out of range for dataset of size {n_samples}")

        window_end = idx + self.sequence_length
        # Feature window
        sequence = torch.tensor(self.data[idx:window_end], dtype=torch.float32)
        # Target: the row right after the window, target column only
        target = torch.tensor([self.data[window_end, self.target_column]],
                              dtype=torch.float32)

        return sequence, target

# Chronological 80/20 split: the earliest 80% of rows train the model, the rest test it
train_size = int(len(scaled_data) * 0.8)
train_data, test_data = scaled_data[:train_size], scaled_data[train_size:]

print(f"Training data shape: {train_data.shape}")
print(f"Test data shape: {test_data.shape}")

# Look-back window: how many past days feed each prediction
sequence_length = 60

# Wrap both splits in sliding-window datasets
train_dataset, test_dataset = (
    StockDataset(split, sequence_length) for split in (train_data, test_data)
)

print(f"Training samples: {len(train_dataset)}")
print(f"Test samples: {len(test_dataset)}")

# Sanity check: pull one sample and show its tensor shapes
sample_seq, sample_target = train_dataset[0]
print(f"Sample sequence shape: {sample_seq.shape}")
print(f"Sample target shape: {sample_target.shape}")

## Define LSTM Model for Financial Prediction

In [None]:
class FinancialLSTM(nn.Module):
    """Stacked LSTM followed by dropout and a linear head, for sequence regression.

    Args:
        input_size: Number of features per time step.
        hidden_size: Width of each LSTM layer's hidden state.
        num_layers: Number of stacked LSTM layers.
        output_size: Number of values predicted per sequence (default 1).
        dropout: Dropout probability applied before the linear head and,
            when ``num_layers > 1``, between the stacked LSTM layers.
    """

    def __init__(self, input_size, hidden_size, num_layers, output_size=1, dropout=0.2):
        super().__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers

        # nn.LSTM only applies dropout between stacked layers; with a single
        # layer a non-zero value has no effect and makes PyTorch emit a warning.
        lstm_dropout = dropout if num_layers > 1 else 0.0
        self.lstm = nn.LSTM(input_size, hidden_size, num_layers,
                            batch_first=True, dropout=lstm_dropout)

        # Dropout for regularization ahead of the output layer
        self.dropout = nn.Dropout(dropout)

        # Output layer
        self.linear = nn.Linear(hidden_size, output_size)

    def forward(self, x):
        """Map a batch of sequences to one prediction vector per sequence.

        Args:
            x: Tensor laid out as (batch, time, features); ``batch_first=True``
                is set on the LSTM.

        Returns:
            Tensor of shape (batch, output_size).
        """
        # No explicit (h0, c0): nn.LSTM defaults to zero initial states that
        # match the input's device and dtype. The hand-built float32 zeros used
        # previously broke the model after `.double()`.
        lstm_out, _ = self.lstm(x)

        # Keep only the output at the last time step
        last_output = lstm_out[:, -1, :]

        # Regularize, then project to the output size
        return self.linear(self.dropout(last_output))

# Model configuration
input_size = len(feature_columns)  # one input channel per selected feature
hidden_size = 64
num_layers = 2
output_size = 1  # a single regression output: the closing price

# Build the network from the configuration above
model = FinancialLSTM(
    input_size=input_size,
    hidden_size=hidden_size,
    num_layers=num_layers,
    output_size=output_size,
)

# Show the layer layout and the number of trainable parameters
print("Model Architecture:", model, sep="\n")
print(f"\nTotal parameters: {sum(p.numel() for p in model.parameters() if p.requires_grad)}")

## Set Hyperparameters and Prepare Training

In [None]:
# Training hyperparameters
batch_size = 32
learning_rate = 0.001
num_epochs = 100
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Create data loaders.
# Training windows are shuffled; test windows keep their original order so
# predictions collected from `test_loader` stay aligned with time.
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

# Move model to device
model = model.to(device)

# Loss function and optimizer
criterion = nn.MSELoss()
optimizer = optim.Adam(model.parameters(), lr=learning_rate)

# Learning rate scheduler: halve the LR once the monitored loss has stopped
# improving for more than `patience` epochs.
# `verbose=True` is deliberately not passed: the argument is deprecated in
# recent PyTorch releases (NOTE(review): newer versions appear to drop it
# entirely; confirm against the installed version). The current LR can be read
# from `optimizer.param_groups[0]['lr']` when it needs to be reported.
scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='min',
                                                 factor=0.5, patience=10)

print(f"Device: {device}")
print(f"Batch size: {batch_size}")
print(f"Learning rate: {learning_rate}")
print(f"Number of epochs: {num_epochs}")
print(f"Training batches: {len(train_loader)}")
print(f"Test batches: {len(test_loader)}")

## Training Loop Implementation

In [None]:
# Training function
def train_model(model, train_loader, criterion, optimizer, device):
    """Run one optimisation pass over ``train_loader`` and return the mean loss.

    The returned value is averaged per sample rather than per batch, so a
    shorter final batch does not carry extra weight. This assumes ``criterion``
    returns the batch mean, as the ``nn.MSELoss()`` used in this notebook does.

    Args:
        model: Network to train; switched to training mode.
        train_loader: Iterable yielding ``(sequences, targets)`` tensor batches.
        criterion: Loss callable invoked as ``criterion(outputs, targets)``.
        optimizer: Optimizer that updates ``model``'s parameters.
        device: Device the batches are moved to before the forward pass.

    Returns:
        Mean training loss over all samples seen in this pass.

    Raises:
        ZeroDivisionError: If ``train_loader`` yields no samples.
    """
    model.train()
    loss_sum = 0.0
    sample_count = 0

    for sequences, targets in train_loader:
        sequences = sequences.to(device)
        targets = targets.to(device)

        # Zero the gradients
        optimizer.zero_grad()

        # Forward pass
        outputs = model(sequences)
        loss = criterion(outputs, targets)

        # Backward pass
        loss.backward()

        # Gradient clipping to prevent exploding gradients
        torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)

        optimizer.step()

        # Weight the batch mean by the batch size to recover a per-sample average
        batch_samples = targets.size(0)
        loss_sum += loss.item() * batch_samples
        sample_count += batch_samples

    return loss_sum / sample_count

# Validation function
def validate_model(model, test_loader, criterion, device):
    """Evaluate ``model`` on ``test_loader`` and return the mean loss.

    Runs in evaluation mode with gradient tracking disabled. The returned value
    is averaged per sample rather than per batch, so a shorter final batch does
    not carry extra weight. This assumes ``criterion`` returns the batch mean,
    as the ``nn.MSELoss()`` used in this notebook does.

    Args:
        model: Network to evaluate; switched to evaluation mode.
        test_loader: Iterable yielding ``(sequences, targets)`` tensor batches.
        criterion: Loss callable invoked as ``criterion(outputs, targets)``.
        device: Device the batches are moved to before the forward pass.

    Returns:
        Mean loss over all samples in ``test_loader``.

    Raises:
        ZeroDivisionError: If ``test_loader`` yields no samples.
    """
    model.eval()
    loss_sum = 0.0
    sample_count = 0

    with torch.no_grad():
        for sequences, targets in test_loader:
            sequences = sequences.to(device)
            targets = targets.to(device)

            outputs = model(sequences)
            loss = criterion(outputs, targets)

            # Weight the batch mean by the batch size to recover a per-sample average
            batch_samples = targets.size(0)
            loss_sum += loss.item() * batch_samples
            sample_count += batch_samples

    return loss_sum / sample_count

# Per-epoch loss history, kept for later inspection and plotting
train_losses, val_losses = [], []

print("Starting training...")
print("=" * 60)

for epoch in range(num_epochs):
    # One optimisation pass, then one evaluation pass.
    # NOTE(review): the evaluation pass runs on `test_loader`, so the same data
    # drives the LR schedule and the final reported metrics.
    train_loss = train_model(model, train_loader, criterion, optimizer, device)
    val_loss = validate_model(model, test_loader, criterion, device)

    # Record both losses for this epoch
    train_losses.append(train_loss)
    val_losses.append(val_loss)

    # The plateau scheduler decides on an LR change from the validation loss
    scheduler.step(val_loss)

    # Report every 10th epoch
    if not (epoch + 1) % 10:
        print(f'Epoch [{epoch+1}/{num_epochs}], Train Loss: {train_loss:.6f}, Val Loss: {val_loss:.6f}')

print("Training completed!")
print("=" * 60)

## Model Evaluation and Predictions

In [None]:
# Collect model outputs and ground-truth targets over the whole test loader
model.eval()
predictions, actuals = [], []

with torch.no_grad():
    for sequences, targets in test_loader:
        sequences = sequences.to(device)
        targets = targets.to(device)

        outputs = model(sequences)
        predictions.extend(outputs.cpu().numpy())
        actuals.extend(targets.cpu().numpy())

# Stack the per-sample values into (n_samples, 1) column vectors
predictions = np.array(predictions).reshape(-1, 1)
actuals = np.array(actuals).reshape(-1, 1)

# Undo the scaling. The scaler was fitted on every feature column, so the
# values go into column 0 (the closing price) of a zero-filled array of the
# full feature width, and only that column is read back.
dummy_pred = np.zeros((len(predictions), len(feature_columns)))
dummy_pred[:, 0] = predictions.ravel()
pred_prices = scaler.inverse_transform(dummy_pred)[:, 0]

dummy_actual = np.zeros((len(actuals), len(feature_columns)))
dummy_actual[:, 0] = actuals.ravel()
actual_prices = scaler.inverse_transform(dummy_actual)[:, 0]

# Error metrics in price units
mse = mean_squared_error(actual_prices, pred_prices)
rmse = np.sqrt(mse)
mae = mean_absolute_error(actual_prices, pred_prices)

# Mean absolute percentage error, in percent
mape = np.abs((actual_prices - pred_prices) / actual_prices).mean() * 100

# Report; the "Accuracy" line is simply 100 minus MAPE
print("Model Performance Metrics:")
print("=" * 40)
print(f"Mean Squared Error (MSE): {mse:.4f}")
print(f"Root Mean Squared Error (RMSE): {rmse:.4f}")
print(f"Mean Absolute Error (MAE): {mae:.4f}")
print(f"Mean Absolute Percentage Error (MAPE): {mape:.2f}%")
print(f"Accuracy: {100 - mape:.2f}%")

## Visualize Results

In [None]:
# 2x2 dashboard: loss curves, recent price trace, scatter, error histogram
fig, axes = plt.subplots(2, 2, figsize=(15, 10))
ax_loss, ax_series, ax_scatter, ax_errors = axes.ravel()

# Panel 1: training vs validation loss per epoch
ax_loss.plot(train_losses, label='Training Loss', color='blue')
ax_loss.plot(val_losses, label='Validation Loss', color='red')
ax_loss.set(title='Training and Validation Loss', xlabel='Epoch', ylabel='Loss')
ax_loss.legend()
ax_loss.grid(True)

# Panel 2: actual vs predicted prices, limited to the most recent points for clarity
last_n = min(100, len(actual_prices))
ax_series.plot(actual_prices[-last_n:], label='Actual', color='green', linewidth=2)
ax_series.plot(pred_prices[-last_n:], label='Predicted', color='orange', linewidth=2)
ax_series.set(title=f'Actual vs Predicted Prices (Last {last_n} days)',
              xlabel='Days', ylabel='Price ($)')
ax_series.legend()
ax_series.grid(True)

# Panel 3: scatter of actual vs predicted, with the y = x line as reference
price_bounds = [actual_prices.min(), actual_prices.max()]
ax_scatter.scatter(actual_prices, pred_prices, alpha=0.5, color='purple')
ax_scatter.plot(price_bounds, price_bounds, 'r--', linewidth=2)
ax_scatter.set(xlabel='Actual Prices ($)', ylabel='Predicted Prices ($)',
               title='Actual vs Predicted Prices Scatter Plot')
ax_scatter.grid(True)

# Panel 4: distribution of signed errors (actual minus predicted)
errors = actual_prices - pred_prices
ax_errors.hist(errors, bins=30, alpha=0.7, color='skyblue', edgecolor='black')
ax_errors.set(xlabel='Prediction Error ($)', ylabel='Frequency',
              title='Distribution of Prediction Errors')
ax_errors.axvline(x=0, color='red', linestyle='--', linewidth=2)
ax_errors.grid(True)

plt.tight_layout()
plt.show()

# Text summary of the test-set predictions
print("\nPrediction Summary:")
print("=" * 50)
print(f"Total test samples: {len(actual_prices)}")
print(f"Actual price range: ${actual_prices.min():.2f} - ${actual_prices.max():.2f}")
print(f"Predicted price range: ${pred_prices.min():.2f} - ${pred_prices.max():.2f}")
print(f"Average actual price: ${actual_prices.mean():.2f}")
print(f"Average predicted price: ${pred_prices.mean():.2f}")
print(f"Standard deviation of errors: ${errors.std():.2f}")

# Table of the final five test samples, addressed from the end of the arrays
print(f"\nLast 5 predictions vs actual:")
print("Date\t\tActual\t\tPredicted\tError")
print("-" * 50)
for i in range(-5, 0):
    error = actual_prices[i] - pred_prices[i]
    print(f"Day {len(actual_prices)+i}\t${actual_prices[i]:.2f}\t\t${pred_prices[i]:.2f}\t\t{error:.2f}")