# LSTM with Attention for Stock Prediction - Basic Tutorial

This notebook implements a corrected LSTM model with an attention mechanism for stock price prediction.

**Author:** Dr. Ernesto Lee | [drlee.io](https://drlee.io)

**Repository:** [github.com/fenago/lstm-attention-stock-prediction](https://github.com/fenago/lstm-attention-stock-prediction)

---

## What's Fixed in This Version?

- ✅ Working attention mechanism (Functional API)
- ✅ Proper scaler handling (no data leakage)
- ✅ Correct data splitting
- ✅ Production-ready code
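
To make the "no data leakage" point concrete, here is a minimal NumPy sketch of min-max scaling fitted on the full series versus on the training slice only. The price values and the helper names `minmax_fit` and `minmax_apply` are hypothetical and only illustrate the idea; the notebook itself uses `MinMaxScaler`.

```python
import numpy as np

# Hypothetical upward-trending price series (illustrative values only)
prices = np.array([10.0, 12.0, 11.0, 14.0, 16.0, 18.0, 21.0, 25.0])
split_idx = int(len(prices) * 0.75)          # first 6 rows train, last 2 test
train, test = prices[:split_idx], prices[split_idx:]

def minmax_fit(values):
    """Return the (min, max) pair a min-max scaler would learn."""
    return float(values.min()), float(values.max())

def minmax_apply(values, lo, hi):
    """Scale values with previously learned bounds."""
    return (values - lo) / (hi - lo)

# Leaky: bounds come from the full series, so the test maximum is known in advance
leaky_lo, leaky_hi = minmax_fit(prices)
test_leaky = minmax_apply(test, leaky_lo, leaky_hi)

# Leak-free: bounds come from the training slice only
lo, hi = minmax_fit(train)
test_clean = minmax_apply(test, lo, hi)

print("leaky test:", test_leaky)
print("clean test:", test_clean)
```

With the leak-free variant, scaled test values can fall outside the 0 to 1 range whenever test prices exceed the training range. That is expected and is the price of not peeking at future data.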

---

## 1. Installation

Install all required packages:

In [None]:
!pip install tensorflow keras yfinance numpy pandas matplotlib scikit-learn -q

## 2. Imports

In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import yfinance as yf
import pickle
from datetime import datetime, timedelta
from sklearn.preprocessing import MinMaxScaler
from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score

import tensorflow as tf
from tensorflow import keras
from keras.models import Model
from keras.layers import Input, LSTM, Dense, Dropout, BatchNormalization
from keras.layers import AdditiveAttention, Concatenate, Lambda
from keras.callbacks import EarlyStopping, ReduceLROnPlateau

import warnings
warnings.filterwarnings('ignore')

# Set seeds for reproducibility
np.random.seed(42)
tf.random.set_seed(42)

print(f"TensorFlow Version: {tf.__version__}")

## 3. Stock Predictor Class

Complete implementation with all bug fixes:

In [None]:
class StockPredictorLSTMAttention:
    """
    LSTM with Attention Mechanism for Stock Price Prediction
    
    This corrected implementation fixes all issues from the original article.
    """

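    def __init__(self, sequence_length=60, prediction_days=4, features=None):
        self.sequence_length = sequence_length
        self.prediction_days = prediction_days
        # Copy the list so instances never share a mutable default argument
        self.features = list(features) if features is not None else ['Close']
        self.n_features = len(self.features)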
        self.scaler = None
        self.model = None

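    def fetch_data(self, ticker='AAPL', start_date='2020-01-01', end_date='2024-01-01'):
        """Fetch stock data from Yahoo Finance"""
        print(f"Fetching {ticker} data from {start_date} to {end_date}...")
        data = yf.download(ticker, start=start_date, end=end_date)
        # Newer yfinance releases can return (field, ticker) MultiIndex columns
        # even for a single ticker; keep only the field level ('Close', ...)
        if isinstance(data.columns, pd.MultiIndex):
            data.columns = data.columns.get_level_values(0)
        # fillna(method=...) is deprecated in pandas 2.x; use ffill()/bfill()
        data = data.ffill().bfill()
        print(f"Fetched {len(data)} trading days")
        return data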

    def prepare_data(self, data, train_split=0.8):
        """Prepare data with PROPER train/test split and NO look-ahead bias"""
        feature_data = data[self.features].values

        # Split BEFORE scaling (critical!)
        split_idx = int(len(feature_data) * train_split)
        train_data = feature_data[:split_idx]
        test_data = feature_data[split_idx:]

        # Fit scaler on training data ONLY
        self.scaler = MinMaxScaler(feature_range=(0, 1))
        train_scaled = self.scaler.fit_transform(train_data)
        test_scaled = self.scaler.transform(test_data)

        # Create sequences
        X_train, y_train = self._create_sequences(train_scaled)
        combined_data = np.concatenate([train_scaled[-self.sequence_length:], test_scaled])
        X_test, y_test = self._create_sequences(combined_data)

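        # combined_data starts sequence_length rows before the split, so the first
        # test target is the row at split_idx: one date per test row
        test_dates = data.index[split_idx:]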

        print(f"Training sequences: {X_train.shape}")
        print(f"Testing sequences: {X_test.shape}")

        return X_train, y_train, X_test, y_test, test_dates

    def _create_sequences(self, data):
        """Create sequences for LSTM training"""
        X, y = [], []
        for i in range(self.sequence_length, len(data)):
            X.append(data[i - self.sequence_length:i])
            y.append(data[i, :])
        return np.array(X), np.array(y)

    def build_model(self, lstm_units=[64, 32], dropout_rate=0.2):
        """Build LSTM model with WORKING attention mechanism"""
        inputs = Input(shape=(self.sequence_length, self.n_features))

        # First LSTM layer
        lstm_out1 = LSTM(lstm_units[0], return_sequences=True)(inputs)
        lstm_out1 = Dropout(dropout_rate)(lstm_out1)
        lstm_out1 = BatchNormalization()(lstm_out1)

        # Second LSTM layer
        lstm_out2 = LSTM(lstm_units[1], return_sequences=True)(lstm_out1)
        lstm_out2 = Dropout(dropout_rate)(lstm_out2)
        lstm_out2 = BatchNormalization()(lstm_out2)

        # Attention mechanism (PROPERLY IMPLEMENTED)
        attention_out = AdditiveAttention()([lstm_out2, lstm_out2])

        # Combine attention output with LSTM output
        concat = Concatenate()([lstm_out2, attention_out])

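        # Global average pooling over the time axis (same result as a mean over axis=1);
        # a built-in layer avoids reloading a Lambda that wraps a Python lambda
        pooled = keras.layers.GlobalAveragePooling1D()(concat)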

        # Dense layers
        dense1 = Dense(32, activation='relu')(pooled)
        dense1 = Dropout(dropout_rate)(dense1)

        # Output layer
        outputs = Dense(self.n_features)(dense1)

        # Create model
        model = Model(inputs=inputs, outputs=outputs)

        # Compile
        model.compile(
            optimizer=keras.optimizers.Adam(learning_rate=0.001),
            loss='mse',
            metrics=['mae']
        )

        self.model = model
        return model

    def train(self, X_train, y_train, X_val=None, y_val=None, epochs=100, batch_size=32, verbose=1):
        """Train the model"""
        callbacks = [
            EarlyStopping(monitor='val_loss' if X_val is not None else 'loss',
                         patience=15, restore_best_weights=True, verbose=1),
            ReduceLROnPlateau(monitor='val_loss' if X_val is not None else 'loss',
                            factor=0.5, patience=7, min_lr=1e-7, verbose=1)
        ]

        validation_data = (X_val, y_val) if X_val is not None else None

        history = self.model.fit(
            X_train, y_train,
            validation_data=validation_data,
            epochs=epochs,
            batch_size=batch_size,
            callbacks=callbacks,
            verbose=verbose
        )

        return history

    def evaluate(self, X_test, y_test):
        """Evaluate model performance"""
        predictions = self.model.predict(X_test)

        predictions_original = self.scaler.inverse_transform(predictions)
        y_test_original = self.scaler.inverse_transform(y_test)

        metrics = {}
        for i, feature in enumerate(self.features):
            mae = mean_absolute_error(y_test_original[:, i], predictions_original[:, i])
            rmse = np.sqrt(mean_squared_error(y_test_original[:, i], predictions_original[:, i]))
            r2 = r2_score(y_test_original[:, i], predictions_original[:, i])

            metrics[feature] = {'MAE': mae, 'RMSE': rmse, 'R2': r2}

            print(f"\n{feature}:")
            print(f"  MAE:  ${mae:.2f}")
            print(f"  RMSE: ${rmse:.2f}")
            print(f"  R²:   {r2:.4f}")

        return metrics, predictions_original, y_test_original

    def predict_next_n_days(self, data, n_days=4):
        """Predict next N days using the SAVED scaler"""
        last_sequence = data[self.features].values[-self.sequence_length:]
        last_sequence_scaled = self.scaler.transform(last_sequence)

        predictions = []
        current_sequence = last_sequence_scaled.copy()

        for _ in range(n_days):
            current_batch = current_sequence.reshape(1, self.sequence_length, self.n_features)
            next_pred = self.model.predict(current_batch, verbose=0)
            predictions.append(next_pred[0])
            current_sequence = np.vstack([current_sequence[1:], next_pred[0]])

        predictions_original = self.scaler.inverse_transform(np.array(predictions))
        return predictions_original

    def save_model(self, model_path='lstm_model.h5', scaler_path='scaler.pkl'):
        """Save model and scaler"""
        self.model.save(model_path)
        with open(scaler_path, 'wb') as f:
            pickle.dump(self.scaler, f)
        print(f"Model and scaler saved!")

    def load_model(self, model_path='lstm_model.h5', scaler_path='scaler.pkl'):
        """Load saved model and scaler"""
        self.model = keras.models.load_model(model_path)
        with open(scaler_path, 'rb') as f:
            self.scaler = pickle.load(f)
        print(f"Model and scaler loaded!")
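
The windowing used by `_create_sequences` and the way `prepare_data` builds the test set can be checked on a toy series. The sketch below copies that logic into a standalone function; `create_sequences`, the 10-row series, and the small `seq_len` are illustrative assumptions, not part of the class.

```python
import numpy as np

def create_sequences(data, sequence_length):
    """Standalone copy of the windowing logic: seq_len rows in, next row out."""
    X, y = [], []
    for i in range(sequence_length, len(data)):
        X.append(data[i - sequence_length:i])
        y.append(data[i, :])
    return np.array(X), np.array(y)

# Toy single-feature series 0..9, shaped (rows, features) like the scaled data
series = np.arange(10, dtype=float).reshape(-1, 1)
seq_len = 3
split_idx = 7
train, test = series[:split_idx], series[split_idx:]

X_train, y_train = create_sequences(train, seq_len)

# Prepend the last seq_len training rows so the first test row has a full window
combined = np.concatenate([train[-seq_len:], test])
X_test, y_test = create_sequences(combined, seq_len)

print(X_train.shape, y_train.shape)   # (4, 3, 1) (4, 1)
print(X_test.shape, y_test.shape)     # (3, 3, 1) (3, 1)
print(y_test.ravel())                 # [7. 8. 9.]
```

Because the training tail is prepended, every test row becomes a target exactly once, and no test value is ever used as a training target.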

## 4. Configuration

In [None]:
# Configuration
TICKER = 'AAPL'  # Change to your preferred stock
START_DATE = '2020-01-01'
END_DATE = '2024-01-01'
SEQUENCE_LENGTH = 60
PREDICTION_DAYS = 4
FEATURES = ['Close']  # Start with Close only

## 5. Initialize and Fetch Data

In [None]:
# Initialize predictor
predictor = StockPredictorLSTMAttention(
    sequence_length=SEQUENCE_LENGTH,
    prediction_days=PREDICTION_DAYS,
    features=FEATURES
)

# Fetch data
data = predictor.fetch_data(TICKER, START_DATE, END_DATE)

# Display first few rows
data.head()

## 6. Prepare Data

In [None]:
# Prepare data (proper splitting!)
X_train, y_train, X_test, y_test, test_dates = predictor.prepare_data(data, train_split=0.8)

# Split training data for validation
val_split = 0.2
val_idx = int(len(X_train) * (1 - val_split))
X_train_final = X_train[:val_idx]
y_train_final = y_train[:val_idx]
X_val = X_train[val_idx:]
y_val = y_train[val_idx:]

print(f"\nFinal split:")
print(f"  Training: {X_train_final.shape[0]} samples")
print(f"  Validation: {X_val.shape[0]} samples")
print(f"  Test: {X_test.shape[0]} samples")
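
As a sanity check on the numbers printed above, the sample counts follow directly from the row count and the split settings. The helper below is a hypothetical sketch of that arithmetic; `split_sizes` and the row count of 1000 are assumptions for illustration, not values taken from the download.

```python
def split_sizes(n_rows, sequence_length=60, train_split=0.8, val_split=0.2):
    """Sample counts produced by the split logic for n_rows of input data."""
    split_idx = int(n_rows * train_split)
    n_train_seq = split_idx - sequence_length      # windows inside the training rows
    n_test_seq = n_rows - split_idx                # one window per test row
    val_idx = int(n_train_seq * (1 - val_split))
    return {
        "train": val_idx,
        "validation": n_train_seq - val_idx,
        "test": n_test_seq,
    }

# n_rows=1000 is a hypothetical row count, not the actual size of the download
print(split_sizes(1000))
```

The training set loses `sequence_length` rows to the first window, while the test set does not, because its first window is borrowed from the end of the training data.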

## 7. Build Model

In [None]:
# Build model
model = predictor.build_model(lstm_units=[64, 32], dropout_rate=0.2)
print(f"Total parameters: {model.count_params():,}")

# Show model architecture
model.summary()
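
For intuition about what the `AdditiveAttention` layer computes on the LSTM outputs, here is a simplified NumPy sketch of additive self-attention for a single sequence. It is an approximation under stated assumptions: the Keras layer also applies a learned scale vector by default, which is omitted here, and the function name and random inputs are hypothetical.

```python
import numpy as np

def additive_attention(query, value):
    """Unscaled additive attention for one sequence.

    query: (Tq, d), value: (Tv, d); the key is taken to be the value.
    """
    # scores[i, j] = sum over features of tanh(query[i] + key[j])
    scores = np.tanh(query[:, None, :] + value[None, :, :]).sum(axis=-1)
    # Softmax over the key axis, shifted by the row max for numerical stability
    scores = scores - scores.max(axis=-1, keepdims=True)
    weights = np.exp(scores)
    weights = weights / weights.sum(axis=-1, keepdims=True)
    # Each output step is a weighted average of the value vectors
    return weights @ value, weights

rng = np.random.default_rng(0)
hidden = rng.normal(size=(5, 4))       # stand-in for LSTM outputs: 5 steps, 4 units
context, weights = additive_attention(hidden, hidden)
print(context.shape, weights.shape)    # (5, 4) (5, 5)
```

The output keeps the sequence shape of the input, which is why the model can concatenate it with the LSTM output before pooling over time.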

## 8. Train Model

In [None]:
# Train
history = predictor.train(
    X_train_final, y_train_final,
    X_val, y_val,
    epochs=100,
    batch_size=32
)
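
The `EarlyStopping` callback configured in `train` halts training once the monitored loss has stopped improving for `patience` epochs. The pure-Python sketch below mimics that rule in simplified form; it ignores options such as `min_delta`, and the function name and loss values are hypothetical.

```python
def early_stopping_epoch(val_losses, patience=15):
    """Return (stop_epoch, best_epoch) for a list of per-epoch validation losses.

    Simplified: any strict decrease counts as an improvement (no min_delta).
    """
    best_loss = float("inf")
    best_epoch = -1
    wait = 0
    for epoch, loss in enumerate(val_losses):
        if loss < best_loss:
            best_loss, best_epoch, wait = loss, epoch, 0
        else:
            wait += 1
            if wait >= patience:
                return epoch, best_epoch    # training halts here
    return len(val_losses) - 1, best_epoch  # ran out of epochs first

# Hypothetical loss curve: improves for 3 epochs, then plateaus
losses = [0.9, 0.5, 0.4, 0.41, 0.42, 0.45, 0.43]
print(early_stopping_epoch(losses, patience=3))
```

With `restore_best_weights=True`, as in this notebook, the model ends up with the weights from the best epoch rather than from the epoch where training stopped.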

## 9. Plot Training History

In [None]:
# Plot training history
fig, axes = plt.subplots(1, 2, figsize=(15, 5))

# Loss
axes[0].plot(history.history['loss'], label='Train Loss')
axes[0].plot(history.history['val_loss'], label='Validation Loss')
axes[0].set_title('Model Loss')
axes[0].set_xlabel('Epoch')
axes[0].set_ylabel('Loss')
axes[0].legend()
axes[0].grid(True, alpha=0.3)

# MAE
axes[1].plot(history.history['mae'], label='Train MAE')
axes[1].plot(history.history['val_mae'], label='Validation MAE')
axes[1].set_title('Model MAE')
axes[1].set_xlabel('Epoch')
axes[1].set_ylabel('MAE')
axes[1].legend()
axes[1].grid(True, alpha=0.3)

plt.tight_layout()
plt.show()

## 10. Evaluate on Test Set

In [None]:
# Evaluate
metrics, predictions, actuals = predictor.evaluate(X_test, y_test)
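
The three metrics reported by `evaluate` can be reproduced by hand, which helps when interpreting them. The sketch below computes MAE, RMSE, and R² with NumPy; the function name and the four price pairs are made up for illustration.

```python
import numpy as np

def regression_metrics(y_true, y_pred):
    """MAE, RMSE and R^2 computed directly from their definitions."""
    y_true = np.asarray(y_true, dtype=float)
    y_pred = np.asarray(y_pred, dtype=float)
    errors = y_true - y_pred
    mae = np.mean(np.abs(errors))
    rmse = np.sqrt(np.mean(errors ** 2))
    ss_res = np.sum(errors ** 2)                      # unexplained variation
    ss_tot = np.sum((y_true - y_true.mean()) ** 2)    # total variation
    r2 = 1.0 - ss_res / ss_tot
    return {"MAE": mae, "RMSE": rmse, "R2": r2}

# Made-up prices: every prediction is off by exactly one dollar
actual = [100.0, 102.0, 104.0, 106.0]
predicted = [101.0, 101.0, 105.0, 105.0]
result = regression_metrics(actual, predicted)
print(result)
```

MAE and RMSE are in dollars here because `evaluate` inverse-transforms the predictions before scoring, while R² is unitless.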

## 11. Visualize Predictions

In [None]:
# Plot predictions
plt.figure(figsize=(16, 7))

# Plot last 200 days
plt.plot(data.index[-200:], data['Close'].values[-200:],
         label='Historical Price', color='blue', alpha=0.7, linewidth=2)

# Plot test predictions
plt.plot(test_dates, actuals[:, 0],
         label='Actual (Test)', color='green', marker='o', markersize=4)

plt.plot(test_dates, predictions[:, 0],
         label='Predicted (Test)', color='red', marker='x', markersize=5)

plt.axvline(x=test_dates[0], color='black', linestyle='--', linewidth=2,
            label='Train/Test Split', alpha=0.6)

plt.title(f'{TICKER} Stock Price Prediction - LSTM with Attention', fontsize=16, fontweight='bold')
plt.xlabel('Date', fontsize=13)
plt.ylabel('Price ($)', fontsize=13)
plt.legend(fontsize=11)
plt.grid(True, alpha=0.3)
plt.xticks(rotation=45)
plt.tight_layout()
plt.show()

## 12. Predict Future Prices

In [None]:
# Predict the next PREDICTION_DAYS trading days (4 with the configuration above)
future_predictions = predictor.predict_next_n_days(data, n_days=PREDICTION_DAYS)

last_date = data.index[-1]
last_price = data['Close'].iloc[-1]

print(f"Last known date: {last_date.date()}")
print(f"Last known price: ${last_price:.2f}")
print("\nFuture Predictions:")

for i, pred in enumerate(future_predictions, 1):
    pred_price = pred[0]
    change = pred_price - last_price
    pct_change = (change / last_price) * 100
    print(f"Day {i}: ${pred_price:.2f} (change: ${change:+.2f}, {pct_change:+.2f}%)")
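
`predict_next_n_days` forecasts recursively: each prediction is appended to the input window and used to predict the following day. The sketch below isolates that loop with a toy stand-in for the model; `recursive_forecast` and `mean_of_last_two` are hypothetical names, and the toy predictor has nothing to do with the trained LSTM.

```python
import numpy as np

def recursive_forecast(predict_one, window, n_steps):
    """Roll a one-step predictor forward n_steps, feeding predictions back in.

    predict_one: callable mapping a (seq_len, n_features) window to an
                 (n_features,) prediction; a stand-in for model.predict.
    """
    window = np.array(window, dtype=float)
    outputs = []
    for _ in range(n_steps):
        next_row = predict_one(window)
        outputs.append(next_row)
        # Drop the oldest row and append the prediction as the newest row
        window = np.vstack([window[1:], next_row])
    return np.array(outputs)

def mean_of_last_two(window):
    """Hypothetical toy predictor: average of the two most recent rows."""
    return window[-2:].mean(axis=0)

start = np.array([[1.0], [2.0], [4.0]])
forecast = recursive_forecast(mean_of_last_two, start, n_steps=3)
print(forecast.ravel())
```

Since later steps are computed from earlier predictions rather than observed prices, errors can compound, so forecasts further out deserve less trust than the first one.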

## 13. Save Model

In [None]:
# Save model and scaler
predictor.save_model('aapl_lstm_model.h5', 'aapl_scaler.pkl')

## 14. Load and Use Saved Model (Optional)

In [None]:
# Example: Load saved model later
# new_predictor = StockPredictorLSTMAttention(sequence_length=60, prediction_days=4, features=['Close'])
# new_predictor.load_model('aapl_lstm_model.h5', 'aapl_scaler.pkl')
# predictions = new_predictor.predict_next_n_days(data, n_days=4)

---

## Next Steps

1. **Try different stocks** - Change the `TICKER` variable
2. **Add more features** - Set `FEATURES = ['Open', 'High', 'Low', 'Close', 'Volume']` in the configuration cell
3. **Advanced version** - Check out the advanced tutorial with 35 technical indicators
4. **Deploy** - Use the saved model for production

---

## Resources

- **GitHub Repository:** [fenago/lstm-attention-stock-prediction](https://github.com/fenago/lstm-attention-stock-prediction)
- **Medium Article:** Complete tutorial with explanations
- **Advanced Tutorial:** Next notebook with technical indicators

---

## Disclaimer

⚠️ **This code is for educational purposes only.** Do not use for actual trading without proper risk management and professional financial advice.
