In [1]:
import numpy as np
import pandas as pd
import math
import matplotlib.pyplot as plt
import yfinance as yf

from sklearn.preprocessing import MinMaxScaler
from sklearn.metrics import mean_squared_error, mean_absolute_error

import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, Dense, Dropout

import os
import random

# ---------------------------------------------------------------
# Reproducibility: seed NumPy, TensorFlow and Python's `random`
# with the same fixed value, in that order.
# ---------------------------------------------------------------
for seed_rng in (np.random.seed, tf.random.set_seed, random.seed):
    seed_rng(42)

# ---------------------------------------------------------------
# GPU: ask TensorFlow to grow GPU memory on demand.
# A RuntimeError from the configuration call is reported, not raised,
# so the notebook keeps running.
# ---------------------------------------------------------------
gpus = tf.config.experimental.list_physical_devices('GPU')
try:
    for gpu in gpus:
        tf.config.experimental.set_memory_growth(gpu, True)
    if gpus:
        # Only announce success when at least one GPU was configured.
        print("GPU enabled and memory growth set.")
except RuntimeError as e:
    print(e)

# ---------------------------------------------------------------
# Notebook-wide constants
# ---------------------------------------------------------------
TICKER = "AAPL"    # symbol passed to yfinance
LOOKBACK = 60      # number of past closes fed to the model per sample
BATCH_SIZE = 32    # mini-batch size for model.fit
EPOCHS = 20        # training epochs per model

print("Setup complete. Ready to download data and train LSTM models.")




Setup complete. Ready to download data and train LSTM models.


In [None]:
# Download daily price history for TICKER (2005-01-01 up to, not including, 2025-01-01)
data = yf.download(TICKER, start="2005-01-01", end="2025-01-01")

# Stop here with a clear message instead of failing later on an empty array
# (e.g. close_prices.min() on zero rows).
if data.empty:
    raise RuntimeError(
        f"No price data returned for {TICKER}; check the ticker symbol and network connection."
    )

# Newer yfinance releases may label columns with a (field, ticker) MultiIndex
# even for a single ticker. Collapse it to the plain field names so that
# data['Close'] is a Series and the CSV gets a single header row.
if isinstance(data.columns, pd.MultiIndex):
    data.columns = data.columns.get_level_values(0)

print(f"Downloaded {len(data)} rows for {TICKER}.")

# Drop missing values
data = data.dropna()
print(f"After dropping NA: {len(data)} rows remaining.")

# Use only the closing price, as an (n_rows, 1) NumPy column vector.
# Note: from here on close_prices is an ndarray, not a pandas object.
close_prices = data['Close'].values.reshape(-1, 1)

# Basic statistics
print("Closing price stats:")
print(f"Min: {close_prices.min():.2f}")
print(f"Max: {close_prices.max():.2f}")
print(f"Mean: {close_prices.mean():.2f}")
print(f"Std: {close_prices.std():.2f}")

# Preview the first 10 rows
print(data.head(10))

# Save for future use
data.to_csv(f"{TICKER}_historical.csv")
print(f"Saved historical data to {TICKER}_historical.csv")


In [None]:
# --- Figure 1: closing price over the whole downloaded history ---
fig, ax = plt.subplots(figsize=(14,6))
ax.plot(data['Close'], label='Closing Price', color='blue')
ax.set_title(f"{TICKER} Closing Price Over Time")
ax.set_xlabel("Date")
ax.set_ylabel("Price USD")
ax.legend()
ax.grid(True)
plt.show()

# --- Figure 2: close with 50- and 200-row rolling means ---
# The rolling means are stored as new columns on `data`.
data['MA50'] = data['Close'].rolling(window=50).mean()
data['MA200'] = data['Close'].rolling(window=200).mean()

fig, ax = plt.subplots(figsize=(14,6))
ax.plot(data['Close'], label='Close', alpha=0.5)
ax.plot(data['MA50'], label='50-day MA', color='orange')
ax.plot(data['MA200'], label='200-day MA', color='red')
ax.set_title(f"{TICKER} Moving Averages")
ax.set_xlabel("Date")
ax.set_ylabel("Price USD")
ax.legend()
plt.show()

# --- Figure 3: distribution of day-over-day percentage changes ---
# pct_change leaves a NaN in the first row, hence the dropna before plotting.
data['Returns'] = data['Close'].pct_change()
fig, ax = plt.subplots(figsize=(10,4))
ax.hist(data['Returns'].dropna(), bins=100, alpha=0.7, color='green')
ax.set_title(f"{TICKER} Daily Returns Distribution")
ax.set_xlabel("Return")
ax.set_ylabel("Frequency")
plt.show()


In [None]:
# MinMax scaling: map the closing prices linearly onto [0, 1]
# NOTE(review): the scaler is fitted on the full series, before the
# train/test split performed later in this cell, so the test period's
# min/max influence the scaling of the training data.
scaler = MinMaxScaler(feature_range=(0,1))
scaled_data = scaler.fit_transform(close_prices)
print(f"Scaled data shape: {scaled_data.shape}")

# Function to create sequences
def create_sequences(data, lookback=LOOKBACK):
    """Turn a series into overlapping lookback windows and next-step targets.

    Parameters
    ----------
    data : 2-D array-like
        Only column 0 is read; it holds the series to window.
    lookback : int
        Number of consecutive values in each input window (must be >= 1).

    Returns
    -------
    X : np.ndarray, shape (n_samples, lookback)
        Row k is data[k : k + lookback, 0].
    y : np.ndarray, shape (n_samples,)
        Element k is data[k + lookback, 0], the value right after window k.
        n_samples is max(len(data) - lookback, 0).

    Raises
    ------
    ValueError
        If lookback is smaller than 1.
    """
    if lookback < 1:
        raise ValueError(f"lookback must be >= 1, got {lookback}")

    series = np.asarray(data)[:, 0]
    n_samples = len(series) - lookback

    if n_samples <= 0:
        # Not enough rows for even one (window, target) pair. Return empty
        # arrays that still carry the window axis, so callers that reshape
        # using X.shape[1] keep working instead of raising IndexError.
        return (
            np.empty((0, lookback), dtype=series.dtype),
            np.empty((0,), dtype=series.dtype),
        )

    # sliding_window_view yields len - lookback + 1 windows; the last one has
    # no following value to predict, so keep only the first n_samples.
    windows = np.lib.stride_tricks.sliding_window_view(series, lookback)[:n_samples]

    # Copy so the results are independent, writable arrays rather than
    # read-only views into `data`.
    return windows.copy(), series[lookback:].copy()

# Build the supervised (window, next value) samples from the scaled series
X_all, y_all = create_sequences(scaled_data)
print(f"X shape: {X_all.shape}, y shape: {y_all.shape}")

# Chronological hold-out: the first 80% of samples train, the rest test
split_index = int(len(X_all) * 0.8)
X_train, y_train = X_all[:split_index], y_all[:split_index]
X_test, y_test = X_all[split_index:], y_all[split_index:]

# Add a trailing axis of size 1 -> [samples, timesteps, features] for the LSTM
X_train = X_train.reshape(X_train.shape[0], X_train.shape[1], 1)
X_test = X_test.reshape(X_test.shape[0], X_test.shape[1], 1)
print(f"Training samples: {X_train.shape[0]}, Testing samples: {X_test.shape[0]}")


In [None]:
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, Dense, Dropout

def build_lstm_model(input_shape=(LOOKBACK,1)):
    """Create and compile the two-layer LSTM regressor used in this notebook.

    Layer stack, in order: LSTM(64, returning the full sequence) -> Dropout(0.2)
    -> LSTM(32, returning only the last step) -> Dropout(0.2)
    -> Dense(25, relu) -> Dense(1).

    Parameters
    ----------
    input_shape : tuple
        Shape of one sample, passed to the first LSTM layer as
        (timesteps, features).

    Returns
    -------
    A compiled Sequential model (Adam optimizer, mean-squared-error loss).
    """
    layer_stack = [
        LSTM(64, return_sequences=True, input_shape=input_shape),
        Dropout(0.2),
        LSTM(32, return_sequences=False),
        Dropout(0.2),
        Dense(25, activation='relu'),
        Dense(1),  # single regression output
    ]
    model = Sequential(layer_stack)

    model.compile(optimizer='adam', loss='mean_squared_error')
    return model

# Test build
model = build_lstm_model()
# summary() prints the table itself and returns None, so wrapping it in
# print() would add a stray "None" line to the output.
model.summary()


In [None]:
# Fit and score an LSTM on the most recent ~1 trading year (252 rows)

days = 252
subset = close_prices[-days:]
# NOTE(review): this refits the notebook-wide `scaler` on the whole window
# (training and test rows alike) and replaces whatever it was fitted on before.
scaled_subset = scaler.fit_transform(subset)

# Windowed samples, with a trailing feature axis for the LSTM
X_sub, y_sub = create_sequences(scaled_subset)
X_sub = X_sub.reshape(X_sub.shape[0], X_sub.shape[1], 1)

# Chronological 80/20 split: earliest samples train, latest samples test
split_index = int(len(X_sub) * 0.8)
X_train_sub, y_train_sub = X_sub[:split_index], y_sub[:split_index]
X_test_sub, y_test_sub = X_sub[split_index:], y_sub[split_index:]

model_1y = build_lstm_model()
history_1y = model_1y.fit(
    X_train_sub,
    y_train_sub,
    epochs=EPOCHS, batch_size=BATCH_SIZE, verbose=1,
)

# Undo the scaling on both predictions and targets before scoring
preds_1y = scaler.inverse_transform(model_1y.predict(X_test_sub))
y_test_rescaled = scaler.inverse_transform(y_test_sub.reshape(-1,1))

mae_1y = mean_absolute_error(y_test_rescaled, preds_1y)
rmse_1y = math.sqrt(mean_squared_error(y_test_rescaled, preds_1y))

print(f"1-Year Dataset → RMSE: {rmse_1y:.4f}, MAE: {mae_1y:.4f}")


In [None]:
# Fit and score an LSTM on the most recent ~3 trading years (3 x 252 rows)

days = 3*252
subset = close_prices[-days:]
# NOTE(review): this refits the notebook-wide `scaler` on the whole window
# (training and test rows alike) and replaces whatever it was fitted on before.
scaled_subset = scaler.fit_transform(subset)

# Windowed samples, with a trailing feature axis for the LSTM
X_sub, y_sub = create_sequences(scaled_subset)
X_sub = X_sub.reshape(X_sub.shape[0], X_sub.shape[1], 1)

# Chronological 80/20 split: earliest samples train, latest samples test
split_index = int(len(X_sub) * 0.8)
X_train_sub, y_train_sub = X_sub[:split_index], y_sub[:split_index]
X_test_sub, y_test_sub = X_sub[split_index:], y_sub[split_index:]

model_3y = build_lstm_model()
history_3y = model_3y.fit(
    X_train_sub,
    y_train_sub,
    epochs=EPOCHS, batch_size=BATCH_SIZE, verbose=1,
)

# Undo the scaling on both predictions and targets before scoring
preds_3y = scaler.inverse_transform(model_3y.predict(X_test_sub))
y_test_rescaled = scaler.inverse_transform(y_test_sub.reshape(-1,1))

mae_3y = mean_absolute_error(y_test_rescaled, preds_3y)
rmse_3y = math.sqrt(mean_squared_error(y_test_rescaled, preds_3y))

print(f"3-Year Dataset → RMSE: {rmse_3y:.4f}, MAE: {mae_3y:.4f}")


In [None]:
# Fit and score an LSTM on the most recent ~5 trading years (5 x 252 rows)

days = 5*252
subset = close_prices[-days:]
# NOTE(review): this refits the notebook-wide `scaler` on the whole window
# (training and test rows alike) and replaces whatever it was fitted on before.
scaled_subset = scaler.fit_transform(subset)

# Windowed samples, with a trailing feature axis for the LSTM
X_sub, y_sub = create_sequences(scaled_subset)
X_sub = X_sub.reshape(X_sub.shape[0], X_sub.shape[1], 1)

# Chronological 80/20 split: earliest samples train, latest samples test
split_index = int(len(X_sub) * 0.8)
X_train_sub, y_train_sub = X_sub[:split_index], y_sub[:split_index]
X_test_sub, y_test_sub = X_sub[split_index:], y_sub[split_index:]

model_5y = build_lstm_model()
history_5y = model_5y.fit(
    X_train_sub,
    y_train_sub,
    epochs=EPOCHS, batch_size=BATCH_SIZE, verbose=1,
)

# Undo the scaling on both predictions and targets before scoring
preds_5y = scaler.inverse_transform(model_5y.predict(X_test_sub))
y_test_rescaled = scaler.inverse_transform(y_test_sub.reshape(-1,1))

mae_5y = mean_absolute_error(y_test_rescaled, preds_5y)
rmse_5y = math.sqrt(mean_squared_error(y_test_rescaled, preds_5y))

print(f"5-Year Dataset → RMSE: {rmse_5y:.4f}, MAE: {mae_5y:.4f}")


In [None]:
# Fit and score an LSTM on the most recent ~10 trading years (10 x 252 rows)

days = 10*252
subset = close_prices[-days:]
# NOTE(review): this refits the notebook-wide `scaler` on the whole window
# (training and test rows alike) and replaces whatever it was fitted on before.
# Being the last training cell, it leaves `scaler` fitted on this 10-year window.
scaled_subset = scaler.fit_transform(subset)

# Windowed samples, with a trailing feature axis for the LSTM
X_sub, y_sub = create_sequences(scaled_subset)
X_sub = X_sub.reshape(X_sub.shape[0], X_sub.shape[1], 1)

# Chronological 80/20 split: earliest samples train, latest samples test
split_index = int(len(X_sub) * 0.8)
X_train_sub, y_train_sub = X_sub[:split_index], y_sub[:split_index]
X_test_sub, y_test_sub = X_sub[split_index:], y_sub[split_index:]

model_10y = build_lstm_model()
history_10y = model_10y.fit(
    X_train_sub,
    y_train_sub,
    epochs=EPOCHS, batch_size=BATCH_SIZE, verbose=1,
)

# Undo the scaling on both predictions and targets before scoring
preds_10y = scaler.inverse_transform(model_10y.predict(X_test_sub))
y_test_rescaled = scaler.inverse_transform(y_test_sub.reshape(-1,1))

mae_10y = mean_absolute_error(y_test_rescaled, preds_10y)
rmse_10y = math.sqrt(mean_squared_error(y_test_rescaled, preds_10y))

print(f"10-Year Dataset → RMSE: {rmse_10y:.4f}, MAE: {mae_10y:.4f}")


In [None]:
# Rank the four training windows by test RMSE and keep the best two

# One (label, RMSE, MAE, fitted model) tuple per training window
results = list(zip(
    ('1Y', '3Y', '5Y', '10Y'),
    (rmse_1y, rmse_3y, rmse_5y, rmse_10y),
    (mae_1y, mae_3y, mae_5y, mae_10y),
    (model_1y, model_3y, model_5y, model_10y),
))

# Lowest RMSE first; the sort is stable, so ties keep the order above
results_sorted = sorted(results, key=lambda entry: entry[1])
top2 = results_sorted[:2]

print("Top 2 dataset sizes (lowest RMSE):")
for label, rmse, mae, _ in top2:
    print(f"{label} → RMSE={rmse:.4f}, MAE={mae:.4f}")


In [None]:
# Download TICKER's 2024 closing prices for the follow-up evaluation
# NOTE(review): this window lies inside the 2005-01-01..2025-01-01 range that
# was downloaded for training, and the 1-year training window (last 252 rows)
# covers roughly the same sessions, so these prices are not strictly "unseen".
# Confirm the intended hold-out period before trusting the scores below.

future_data = yf.download(TICKER, start="2024-01-01", end="2025-01-01")
future_series = future_data['Close'].dropna()
# (n_rows, 1) column vector, the layout the scaler and create_sequences expect
future_close = future_series.values.reshape(-1,1)

plt.figure(figsize=(12,5))
# Plot against the date index so the x-axis really shows dates, matching its
# "Date" label (plotting the bare array would show row numbers instead).
plt.plot(future_series.index, future_close[:, 0], label="Future 2024 Prices", color='purple')
plt.title(f"{TICKER} Closing Price 2024 (Unseen Data)")
plt.xlabel("Date")
plt.ylabel("Price USD")
plt.legend()
plt.grid(True)
plt.show()


In [None]:
# Evaluate top 2 models on unseen 2024 data

def test_future(model, scaler, future_close, lookback=LOOKBACK):
    """Score a trained model on a separate price series.

    Parameters
    ----------
    model : object with a predict(X) method
        Receives X of shape (n_samples, lookback, 1).
    scaler : fitted scaler with transform / inverse_transform
        Used both to scale the inputs and to map predictions back to prices.
        It should be the scaler the model's training data was scaled with;
        otherwise inputs and outputs are on the wrong scale.
    future_close : np.ndarray, shape (n_rows, 1)
        Prices to evaluate on.
    lookback : int
        Window length used to build each input sample.

    Returns
    -------
    preds : predictions mapped back to price units
    real : the matching actual prices
    rmse : root-mean-squared error between real and preds
    mae : mean absolute error between real and preds

    Raises
    ------
    ValueError
        If future_close has too few rows to form a single sample.
    """
    if len(future_close) <= lookback:
        # Without this check the failure surfaces later as an opaque
        # IndexError when the empty sample array is reshaped.
        raise ValueError(
            f"Need more than {lookback} prices to evaluate, got {len(future_close)}."
        )

    scaled_f = scaler.transform(future_close)
    # Local target name deliberately avoids `yf`, the notebook's yfinance alias.
    X_f, y_f = create_sequences(scaled_f, lookback)
    X_f = X_f.reshape((X_f.shape[0], X_f.shape[1],1))
    preds = model.predict(X_f)
    preds = scaler.inverse_transform(preds)
    real = scaler.inverse_transform(y_f.reshape(-1,1))
    rmse = math.sqrt(mean_squared_error(real, preds))
    mae = mean_absolute_error(real, preds)
    return preds, real, rmse, mae

# Each model was trained on prices scaled by a MinMaxScaler fitted to its own
# window, close_prices[-days:]. The shared `scaler` only remembers its most
# recent fit, so rebuild the matching scaler per model instead of reusing it.
for label, _, _, model in top2:
    # Labels are '1Y', '3Y', '5Y', '10Y'; each training window is years * 252 rows.
    window_days = int(label.rstrip('Y')) * 252
    model_scaler = MinMaxScaler(feature_range=(0,1)).fit(close_prices[-window_days:])
    preds_f, real_f, rmse_f, mae_f = test_future(model, model_scaler, future_close)
    print(f"{label} model on 2024 → RMSE={rmse_f:.4f}, MAE={mae_f:.4f}")


In [None]:
# Plot predictions for top model

best_label, _, _, best_model = top2[0]

# The best model was trained on prices scaled by a MinMaxScaler fitted to its
# own window, close_prices[-days:]. The shared `scaler` holds whichever fit
# ran last, so rebuild the matching one here.
# Labels are '1Y', '3Y', '5Y', '10Y'; each training window is years * 252 rows.
best_days = int(best_label.rstrip('Y')) * 252
best_scaler = MinMaxScaler(feature_range=(0,1)).fit(close_prices[-best_days:])

preds_f, real_f, _, _ = test_future(best_model, best_scaler, future_close)

plt.figure(figsize=(12,5))
plt.plot(real_f, label="Actual Future Prices")
plt.plot(preds_f, label=f"Predicted by {best_label} model", linestyle='--')
plt.title(f"{TICKER} 2024 Prediction vs Actual ({best_label} dataset)")
plt.xlabel("Days")
plt.ylabel("Price USD")
plt.legend()
plt.grid(True)
plt.show()


In [None]:
# Save best model and scaler
import joblib
from tensorflow.keras.models import load_model

# Persist the scaler that matches the best model. That model was trained on
# prices scaled by a MinMaxScaler fitted to its own window,
# close_prices[-days:], whereas the shared `scaler` holds whichever fit ran
# last. Saving a mismatched pair would put later predictions on the wrong scale.
# Labels are '1Y', '3Y', '5Y', '10Y'; each training window is years * 252 rows.
best_days = int(best_label.rstrip('Y')) * 252
best_scaler = MinMaxScaler(feature_range=(0,1)).fit(close_prices[-best_days:])

best_model.save(f"{TICKER}_best_model.h5")
joblib.dump(best_scaler, f"{TICKER}_scaler.save")

print(f"Saved model: {TICKER}_best_model.h5")
print(f"Saved scaler: {TICKER}_scaler.save")

# Test loading: read both artifacts back to confirm they round-trip
loaded_model = load_model(f"{TICKER}_best_model.h5")
loaded_scaler = joblib.load(f"{TICKER}_scaler.save")
print("Loaded model and scaler successfully.")


In [None]:
# Predict next day using last 60 days

def predict_next_day(model, scaler, recent_prices, lookback=LOOKBACK):
    """Predict the value that follows the last `lookback` prices.

    Parameters
    ----------
    model : object with a predict(X) method
        Receives X of shape (1, lookback, 1).
    scaler : fitted scaler with transform / inverse_transform
        Scales the input window and maps the prediction back to price units.
    recent_prices : sequence or array of prices
        Either 1-D or shaped (n, 1); only the last `lookback` entries are used.
    lookback : int
        Number of trailing prices fed to the model.

    Returns
    -------
    The predicted price as a scalar (element [0][0] of the inverse-scaled
    model output).

    Raises
    ------
    ValueError
        If the trailing window does not contain exactly `lookback` values,
        e.g. because fewer prices were supplied.
    """
    window = np.array(recent_prices[-lookback:]).reshape(-1,1)
    if len(window) != lookback:
        # Fail with a clear message rather than an opaque reshape error below.
        raise ValueError(
            f"Expected {lookback} price values for the input window, got {len(window)}."
        )

    scaled = scaler.transform(window)
    X = scaled.reshape(1, lookback, 1)  # one sample: [samples, timesteps, features]
    pred_scaled = model.predict(X)
    prediction = scaler.inverse_transform(pred_scaled)
    return prediction[0][0]

# close_prices is already a NumPy array (it was built with .values.reshape),
# so slice it directly; calling .values on it raises AttributeError.
latest_60 = close_prices[-LOOKBACK:]
next_day_price = predict_next_day(loaded_model, loaded_scaler, latest_60)
print(f"Predicted next-day price for {TICKER}: ${next_day_price:.2f}")


In [None]:
# Predict next 5 days iteratively: each prediction is appended to the price
# history and becomes part of the input window for the following step.

future_preds = []
# close_prices is already a NumPy array (it was built with .values.reshape),
# so slice it directly; calling .values on it raises AttributeError.
# flatten() returns a 1-D copy, so close_prices itself is never modified.
recent_prices = close_prices[-LOOKBACK:].flatten()

for i in range(5):
    pred = predict_next_day(loaded_model, loaded_scaler, recent_prices)
    future_preds.append(pred)
    # predict_next_day only reads the last LOOKBACK entries, so appending
    # slides the window forward by one step.
    recent_prices = np.append(recent_prices, pred)

print("Next 5-day predictions:", [f"${p:.2f}" for p in future_preds])

plt.figure(figsize=(12,5))
plt.plot(range(1,6), future_preds, marker='o', linestyle='--', color='orange')
plt.title(f"{TICKER} Next 5-Day Price Prediction")
plt.xlabel("Day")
plt.ylabel("Predicted Price USD")
plt.grid(True)
plt.show()
