<a href="https://colab.research.google.com/github/MattJBorowski1991/AAPL_LSTM_simple/blob/main/AAPL_price_predictor.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
!pip install yfinance pandas matplotlib



In [2]:
# Apple Stock Price Prediction - Data Preparation
# This script downloads Apple stock data and prepares it for deep learning

import yfinance as yf
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.preprocessing import MinMaxScaler
from tqdm.notebook import tqdm

# Set random seed for reproducibility
np.random.seed(42)

# Download Apple's complete stock history
print("Downloading Apple stock data from Yahoo Finance...")
aapl_data = yf.download("AAPL", period="max")

# Display info about the dataset
print(f"Dataset shape: {aapl_data.shape}")
print(f"Date range: {aapl_data.index.min()} to {aapl_data.index.max()}")
print(f"Total days: {aapl_data.shape[0]}")
print(f"Total datapoints: {aapl_data.shape[0] * aapl_data.shape[1]}")

# Preview the data
print("\nData Preview:")
print(aapl_data.head())

# Check for missing values
print("\nMissing values:")
print(aapl_data.isnull().sum())

# Calculate technical indicators
print("\nCalculating technical indicators...")
df = aapl_data.copy()

# 1. RSI (14-day)
delta = df['Close'].diff()
gain = delta.where(delta > 0, 0)
loss = -delta.where(delta < 0, 0)
avg_gain = gain.rolling(window=14).mean()
avg_loss = loss.rolling(window=14).mean()
rs = avg_gain / avg_loss
df['RSI_14'] = 100 - (100 / (1 + rs))

# 2. Add date-based features
df['Day_of_Week'] = df.index.dayofweek
df['Month'] = df.index.month
df['Day_of_Month'] = df.index.day

# 3. Price patterns - gaps
df['Gap_Up'] = ((df['Open'] > df['Close'].shift(1)) * 1)
df['Gap_Down'] = ((df['Open'] < df['Close'].shift(1)) * 1)

# 4. Add our target variable - next day's opening price
df['Next_Day_Open'] = df['Open'].shift(-1)

# Remove rows with NaN values (from rolling calculations)
df_clean = df.dropna()
print(f"Original features: {aapl_data.shape[1]}")
print(f"Expanded features: {df.shape[1]}")
print(f"Total datapoints after feature engineering: {df_clean.shape[0] * df_clean.shape[1]}")

# Show all features
print("\nAvailable features in the dataset:")
for i, col in enumerate(df_clean.columns):
    print(f"{i+1}. {col}")

# Prepare data for model training
print("\nPreparing data for training...")

# Define features and target
X = df_clean.drop(['Next_Day_Open'], axis=1)
y = df_clean['Next_Day_Open']

# Feature scaling
print("Applying MinMax scaling to features...")
scaler_X = MinMaxScaler()
scaler_y = MinMaxScaler()

X_scaled = scaler_X.fit_transform(X)
y_scaled = scaler_y.fit_transform(y.values.reshape(-1, 1)).flatten()

# Create sequences for time series prediction
def create_sequences(X, y, time_steps=90):
    X_seq, y_seq = [], []
    for i in range(len(X) - time_steps):
        X_seq.append(X[i:i + time_steps])
        y_seq.append(y[i + time_steps])
    return np.array(X_seq), np.array(y_seq)

# Define sequence length (lookback period)
sequence_length = 90  # Using 90 days of data to predict the next day

print(f"Creating sequences with lookback period of {sequence_length} days...")
X_seq, y_seq = create_sequences(X_scaled, y_scaled, sequence_length)

print(f"Sequence shape: {X_seq.shape}")
print(f"Target shape: {y_seq.shape}")

# Train-test split (90-10)
train_size = int(len(X_seq) * 0.9)
X_train, X_test = X_seq[:train_size], X_seq[train_size:]
y_train, y_test = y_seq[:train_size], y_seq[train_size:]

print(f"Training set: {X_train.shape}")
print(f"Testing set: {X_test.shape}")

# Save processed data
np.save('X_train.npy', X_train)
np.save('y_train.npy', y_train)
np.save('X_test.npy', X_test)
np.save('y_test.npy', y_test)

# Save scalers for later use
import pickle
with open('scaler_X.pkl', 'wb') as f:
    pickle.dump(scaler_X, f)
with open('scaler_y.pkl', 'wb') as f:
    pickle.dump(scaler_y, f)

print("\nData preparation complete! Files saved and ready for model training.")

# Save the feature list for reference
with open('feature_list.txt', 'w') as f:
    for feature in X.columns:
        f.write(f"{feature}\n")

print("Feature list saved to 'feature_list.txt'")

Downloading Apple stock data from Yahoo Finance...
YF.download() has changed argument auto_adjust default to True


[*********************100%***********************]  1 of 1 completed


Dataset shape: (11182, 5)
Date range: 1980-12-12 00:00:00 to 2025-04-24 00:00:00
Total days: 11182
Total datapoints: 55910

Data Preview:
Price          Close      High       Low      Open     Volume
Ticker          AAPL      AAPL      AAPL      AAPL       AAPL
Date                                                         
1980-12-12  0.098726  0.099155  0.098726  0.098726  469033600
1980-12-15  0.093575  0.094005  0.093575  0.094005  175884800
1980-12-16  0.086707  0.087136  0.086707  0.087136  105728000
1980-12-17  0.088853  0.089282  0.088853  0.088853   86441600
1980-12-18  0.091429  0.091858  0.091429  0.091429   73449600

Missing values:
Price   Ticker
Close   AAPL      0
High    AAPL      0
Low     AAPL      0
Open    AAPL      0
Volume  AAPL      0
dtype: int64

Calculating technical indicators...
Original features: 5
Expanded features: 12
Total datapoints after feature engineering: 134016

Available features in the dataset:
1. ('Close', 'AAPL')
2. ('High', 'AAPL')
3. ('Low', 'A

In [15]:
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
# ----- ADDED IMPORTS FOR HYPERPARAMETER SEARCH -----
import random
from itertools import product
import pandas as pd
# --------------------------------------------------

# Load prepared data
X_train = np.load('X_train.npy')
y_train = np.load('y_train.npy')
X_test = np.load('X_test.npy')
y_test = np.load('y_test.npy')

# Define custom dataset class
class StockDataset(Dataset):
    def __init__(self, X, y):
        self.X = torch.tensor(X, dtype=torch.float32)
        self.y = torch.tensor(y, dtype=torch.float32)

    def __len__(self):
        return len(self.X)

    def __getitem__(self, idx):
        return self.X[idx], self.y[idx].unsqueeze(-1)

# Create data loaders
train_dataset = StockDataset(X_train, y_train)
test_dataset = StockDataset(X_test, y_test)

def create_data_loaders(batch_size):
    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
    test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)
    return train_loader, test_loader

# Define model architecture
class StockPredictor(nn.Module):
    def __init__(self, hidden_size, num_layers, dropout_rate):
        super(StockPredictor, self).__init__()
        self.lstm = nn.LSTM(input_size=X_train.shape[2], hidden_size=hidden_size, num_layers=num_layers, batch_first=True)
        self.bn = nn.BatchNorm1d(hidden_size)
        self.dropout = nn.Dropout(dropout_rate)
        self.fc = nn.Linear(hidden_size, 1)

    def forward(self, x):
        h0 = torch.zeros(self.lstm.num_layers, x.size(0), self.lstm.hidden_size).to(x.device)
        c0 = torch.zeros(self.lstm.num_layers, x.size(0), self.lstm.hidden_size).to(x.device)
        out, _ = self.lstm(x, (h0, c0))
        out = self.bn(out[:, -1, :])
        out = self.dropout(out)
        out = self.fc(out)
        return out

# Define hyperparameter ranges
hyperparams = {
    'batch_size': [32],
    'hidden_size': [64],
    'num_layers': [2],
    'dropout_rate': [0.25],
    'learning_rate': [0.001],
    'weight_decay': [0.001],
    'num_epochs': [10, 15, 20]
}


# Best Model Configuration:

# ITERATION 1
# batch_size       64.000000
# hidden_size      64.000000
# num_layers        3.000000
# dropout_rate      0.200000
# learning_rate     0.001000
# weight_decay      0.001000
# num_epochs       15.000000
# test_loss         0.000956

# ITERATION 2
# Best Model Configuration:
# batch_size       32.000000
# hidden_size      64.000000
# num_layers        2.000000
# dropout_rate      0.250000
# learning_rate     0.001000
# weight_decay      0.001000
# num_epochs        7.000000
# test_loss         0.000447
# Name: 4, dtype: float64
# 2nd Best Model Configuration:
# batch_size       128.000000
# hidden_size       64.000000
# num_layers         3.000000
# dropout_rate       0.250000
# learning_rate      0.001000
# weight_decay       0.000100
# num_epochs         6.000000
# test_loss          0.000491
# Name: 3, dtype: float64

# ITERATION 3 (inferior to ITERATION 2. Go back)
# Best Model Configuration:
# batch_size       64.000000
# hidden_size      64.000000
# num_layers        2.000000
# dropout_rate      0.300000
# learning_rate     0.001000
# weight_decay      0.001000
# num_epochs       10.000000
# test_loss         0.000512
# Name: 8, dtype: float64

# ITERATION 4 (Slightly better than ITERATION 2. Train more.)
# Best Model Configuration:
# batch_size       32.000000
# hidden_size      64.000000
# num_layers        2.000000
# dropout_rate      0.250000
# learning_rate     0.001000
# weight_decay      0.001000
# num_epochs        8.000000
# test_loss         0.000401
# Name: 3, dtype: float64

# ITERATION 5
# Best Model Configuration:
# batch_size       32.000000
# hidden_size      64.000000
# num_layers        2.000000
# dropout_rate      0.250000
# learning_rate     0.001000
# weight_decay      0.001000
# num_epochs       10.000000
# test_loss         0.000398
# Name: 6, dtype: float64


# Number of random samples to try
num_samples = 10

# Store results
results = []

# Random sampling loop
for i in range(num_samples):
    print(f"\nTraining model {i+1}/{num_samples}")

    # Randomly sample hyperparameters
    config = {
        'batch_size': random.choice(hyperparams['batch_size']),
        'hidden_size': random.choice(hyperparams['hidden_size']),
        'num_layers': random.choice(hyperparams['num_layers']),
        'dropout_rate': random.choice(hyperparams['dropout_rate']),
        'learning_rate': random.choice(hyperparams['learning_rate']),
        'weight_decay': random.choice(hyperparams['weight_decay']),
        'num_epochs': random.choice(hyperparams['num_epochs'])
    }

    # Create data loaders with sampled batch size
    train_loader, test_loader = create_data_loaders(config['batch_size'])

    # Initialize model with sampled parameters
    model = StockPredictor(
        hidden_size=config['hidden_size'],
        num_layers=config['num_layers'],
        dropout_rate=config['dropout_rate']
    )

    # Initialize optimizer and loss function
    criterion = nn.MSELoss()
    optimizer = optim.Adam(
        model.parameters(),
        lr=config['learning_rate'],
        weight_decay=config['weight_decay']
    )

    # Train the model
    for epoch in range(config['num_epochs']):
        model.train()
        total_loss = 0
        for batch in train_loader:
            X_batch, y_batch = batch
            optimizer.zero_grad()
            outputs = model(X_batch)
            loss = criterion(outputs, y_batch)
            loss.backward()
            optimizer.step()
            total_loss += loss.item()
        print(f"Epoch {epoch+1}/{config['num_epochs']}, Loss: {total_loss / len(train_loader):.6f}")

    # Evaluate the model
    model.eval()
    test_loss = 0
    with torch.no_grad():
        for batch in test_loader:
            X_batch, y_batch = batch
            outputs = model(X_batch)
            loss = criterion(outputs, y_batch)
            test_loss += loss.item()

    avg_test_loss = test_loss / len(test_loader)
    print(f"Test Loss: {avg_test_loss:.6f}")

    # Store results
    results.append({
        **config,
        'test_loss': avg_test_loss
    })

# Convert results to DataFrame and find best model
results_df = pd.DataFrame(results)
best_model = results_df.loc[results_df['test_loss'].idxmin()]

# Print results
print("\nHyperparameter Search Results:")
print(results_df)
print("\nBest Model Configuration:")
print(best_model)
# -------------------------------------------------------


Training model 1/10
Epoch 1/10, Loss: 0.035424
Epoch 2/10, Loss: 0.001720
Epoch 3/10, Loss: 0.000592
Epoch 4/10, Loss: 0.000395
Epoch 5/10, Loss: 0.000275
Epoch 6/10, Loss: 0.000281
Epoch 7/10, Loss: 0.000271
Epoch 8/10, Loss: 0.000229
Epoch 9/10, Loss: 0.000264
Epoch 10/10, Loss: 0.000223
Test Loss: 0.000423

Training model 2/10
Epoch 1/10, Loss: 0.024402
Epoch 2/10, Loss: 0.000862
Epoch 3/10, Loss: 0.000376
Epoch 4/10, Loss: 0.000276
Epoch 5/10, Loss: 0.000261
Epoch 6/10, Loss: 0.000273
Epoch 7/10, Loss: 0.000221
Epoch 8/10, Loss: 0.000254
Epoch 9/10, Loss: 0.000252
Epoch 10/10, Loss: 0.000256
Test Loss: 0.000575

Training model 3/10
Epoch 1/20, Loss: 0.023465
Epoch 2/20, Loss: 0.001012
Epoch 3/20, Loss: 0.000372
Epoch 4/20, Loss: 0.000305
Epoch 5/20, Loss: 0.000298
Epoch 6/20, Loss: 0.000258
Epoch 7/20, Loss: 0.000278
Epoch 8/20, Loss: 0.000239
Epoch 9/20, Loss: 0.000260
Epoch 10/20, Loss: 0.000258
Epoch 11/20, Loss: 0.000283
Epoch 12/20, Loss: 0.000268
Epoch 13/20, Loss: 0.000272


In [16]:
sorted_results = results_df.sort_values(by='test_loss')
second_best_model = sorted_results.iloc[1]  # 2nd best model
third_best_model = sorted_results.iloc[2]   # 3rd best model

# Print 2nd and 3rd best models
print("\n2nd Best Model Configuration:")
print(second_best_model)
print("\n3rd Best Model Configuration:")
print(third_best_model)
# ----------------------------------------------------


2nd Best Model Configuration:
batch_size       32.000000
hidden_size      64.000000
num_layers        2.000000
dropout_rate      0.250000
learning_rate     0.001000
weight_decay      0.001000
num_epochs       10.000000
test_loss         0.000423
Name: 0, dtype: float64

3rd Best Model Configuration:
batch_size       32.000000
hidden_size      64.000000
num_layers        2.000000
dropout_rate      0.250000
learning_rate     0.001000
weight_decay      0.001000
num_epochs       15.000000
test_loss         0.000434
Name: 5, dtype: float64
