# Short-Term Stock Price Prediction with LSTM

This notebook implements a machine learning pipeline for predicting Indonesian stock prices (1-3 days ahead) using LSTM neural networks.

## Features:
- Multi-target prediction (1, 2, 3 days ahead)
- Advanced feature engineering optimized for short-term prediction
- Hyperparameter optimization
- Comprehensive evaluation metrics
- Caching system for efficiency

## Target Stocks:
- BBCA.JK (Bank Central Asia)
- UNVR.JK (Unilever Indonesia)
- ITMG.JK (Indo Tambangraya Megah)

## 1. Import Libraries and Setup

In [None]:
import numpy as np
import pandas as pd
import yfinance as yf
import matplotlib.pyplot as plt
import seaborn as sns
import os
import pickle
import warnings
import time
from datetime import datetime, timedelta
from tqdm import tqdm

# Sklearn imports
from sklearn.preprocessing import MinMaxScaler, StandardScaler
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score, mean_absolute_percentage_error
from sklearn.model_selection import TimeSeriesSplit

# TensorFlow/Keras imports
from tensorflow.keras.models import Sequential, Model, load_model
from tensorflow.keras.layers import LSTM, Dense, Dropout, Input, BatchNormalization, GRU
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.callbacks import EarlyStopping, ModelCheckpoint, ReduceLROnPlateau
from tensorflow.keras import regularizers
import tensorflow as tf

# Optimization imports
from skopt import gp_minimize
from skopt.space import Real, Integer, Categorical
from skopt.utils import use_named_args

# Configuration
warnings.filterwarnings('ignore')
tf.compat.v1.logging.set_verbosity(tf.compat.v1.logging.ERROR)

# Set random seeds for reproducibility
np.random.seed(42)
tf.random.set_seed(42)

print("All libraries imported successfully!")
print(f"TensorFlow version: {tf.__version__}")
print(f"NumPy version: {np.__version__}")
print(f"Pandas version: {pd.__version__}")

## 2. Configuration and Constants

In [None]:
# Target stocks (Yahoo Finance tickers; the ".JK" suffix is used for every entry)
STOCKS = [
    'BBCA.JK',  # Bank Central Asia
    'UNVR.JK',  # Unilever Indonesia
    'ITMG.JK',  # Indo Tambangraya Megah
]

# Human-readable company names used in log output and result tables.
# UNVR.JK is the Indonesian listed entity (Unilever Indonesia), not the global parent.
STOCK_NAMES = {
    'BBCA.JK': 'Bank Central Asia',
    'UNVR.JK': 'Unilever Indonesia',
    'ITMG.JK': 'Indo Tambangraya Megah'
}

# Configuration optimized for short-term prediction
SHORT_TERM_CONFIG = {
    'sequence_length': 30,  # Number of past rows fed to the model per sample
    'prediction_days': [1, 2, 3],  # Predict 1, 2, 3 days ahead
    'data_years': 20,  # 20 years of historical data
    'validation_split': 0.15,  # Fraction of sequences held out for validation
    'test_split': 0.1  # Fraction of sequences held out for the final test
}

print("Configuration loaded:")
print(f"Stocks: {STOCKS}")
print(f"Prediction targets: {SHORT_TERM_CONFIG['prediction_days']} days")
print(f"Sequence length: {SHORT_TERM_CONFIG['sequence_length']} days")
print(f"Historical data: {SHORT_TERM_CONFIG['data_years']} years")

## 3. Cache Manager Class

This class handles caching of data and models to improve efficiency and avoid re-downloading data.

In [None]:
class CacheManager:
    """File-based cache for stock data, trained models and arbitrary Python objects.

    Everything lives under ``cache_dir``: pickled objects as ``*.pkl`` files and
    Keras models as ``*.h5`` files.

    NOTE: pickle files are executed on load; only point this cache at a directory
    whose contents you trust.
    """

    def __init__(self, cache_dir='cache'):
        """Store ``cache_dir`` and create it (including parents) when missing."""
        self.cache_dir = cache_dir
        # exist_ok avoids the check-then-create race of os.path.exists + makedirs.
        os.makedirs(cache_dir, exist_ok=True)

    def _dump_pickle(self, obj, path):
        """Pickle ``obj`` to ``path`` without ever leaving a half-written file there.

        The object is serialized to a sibling temporary file first and only moved
        over ``path`` once serialization succeeded, so a failed dump keeps any
        previously cached content intact. The original exception is re-raised.
        """
        tmp_path = f"{path}.tmp"
        try:
            with open(tmp_path, 'wb') as f:
                pickle.dump(obj, f)
            os.replace(tmp_path, path)
        finally:
            # Only present when the dump or the rename failed.
            if os.path.exists(tmp_path):
                os.remove(tmp_path)

    def get_stock_data_path(self, ticker, years):
        """Return the pickle path used for ``ticker`` history covering ``years`` years."""
        return os.path.join(self.cache_dir, f"{ticker}_{years}y_data.pkl")

    def get_model_path(self, model_name):
        """Return the ``.h5`` path used for the model called ``model_name``."""
        return os.path.join(self.cache_dir, f"{model_name}.h5")

    def is_cache_valid(self, path, max_age_hours=24):
        """Return True when ``path`` exists and was modified less than ``max_age_hours`` ago."""
        if not os.path.exists(path):
            return False
        file_time = datetime.fromtimestamp(os.path.getmtime(path))
        age = datetime.now() - file_time
        return age.total_seconds() < max_age_hours * 3600

    def save_stock_data(self, ticker, years, data):
        """Pickle ``data`` under the path derived from ``ticker`` and ``years``."""
        self._dump_pickle(data, self.get_stock_data_path(ticker, years))

    def load_stock_data(self, ticker, years):
        """Return cached stock data, or None when missing, stale or unreadable."""
        path = self.get_stock_data_path(ticker, years)
        if self.is_cache_valid(path):
            try:
                with open(path, 'rb') as f:
                    return pickle.load(f)
            except Exception as e:
                # Best effort: a broken cache entry is treated like a cache miss.
                print(f"Error loading cached data: {e}")
        return None

    def save_model(self, model, model_name):
        """Save ``model`` via its own ``save`` method to the model path for ``model_name``."""
        path = self.get_model_path(model_name)
        model.save(path)

    def load_model(self, model_name):
        """Return the cached model for ``model_name``, or None when missing or unreadable."""
        path = self.get_model_path(model_name)
        if os.path.exists(path):
            try:
                # Resolves to the module-level Keras ``load_model`` import, not this method.
                return load_model(path)
            except Exception as e:
                print(f"Error loading cached model: {e}")
        return None

    def save_object(self, obj, name):
        """Pickle an arbitrary object to ``<cache_dir>/<name>.pkl``."""
        self._dump_pickle(obj, os.path.join(self.cache_dir, f"{name}.pkl"))

    def load_object(self, name):
        """Return the object pickled under ``name``, or None when missing or unreadable."""
        path = os.path.join(self.cache_dir, f"{name}.pkl")
        if os.path.exists(path):
            try:
                with open(path, 'rb') as f:
                    return pickle.load(f)
            except Exception as e:
                print(f"Error loading cached object: {e}")
        return None

# Initialize cache manager
# Module-level instance shared by the data-retrieval and pipeline cells below.
# With the default argument, files are stored under "cache" relative to the
# current working directory.
cache = CacheManager()
print("Cache manager initialized successfully!")

## 4. Data Retrieval Functions

In [None]:
def get_stock_data(ticker, years=20):
    """Return price history for ``ticker``, preferring the local cache.

    A valid cache entry is returned as-is. Otherwise the history for the last
    ``365 * years`` days is requested from Yahoo Finance, tagged with a
    ``Ticker`` column, written to the cache and returned.

    Returns None when the download yields no rows or raises.
    """
    cached_frame = cache.load_stock_data(ticker, years)
    if cached_frame is not None:
        print(f"Using cached data for {ticker}")
        return cached_frame

    print(f"Downloading {ticker} data for {years} years from Yahoo Finance...")

    date_format = '%Y-%m-%d'
    window_end = datetime.now()
    window_start = window_end - timedelta(days=365 * years)

    try:
        history = yf.Ticker(ticker).history(
            start=window_start.strftime(date_format),
            end=window_end.strftime(date_format),
        )

        if history.empty:
            print(f"No data available for {ticker}")
            return None

        history['Ticker'] = ticker
        cache.save_stock_data(ticker, years, history)
        return history
    except Exception as e:
        print(f"Error downloading data for {ticker}: {e}")
        return None

# Smoke test: fetch one year of BBCA.JK and show what came back
print("Testing data retrieval...")
sample_data = get_stock_data('BBCA.JK', 1)  # Test with 1 year
if sample_data is None:
    print("Failed to retrieve sample data")
else:
    print(f"Sample data shape: {sample_data.shape}")
    print(f"Date range: {sample_data.index.min()} to {sample_data.index.max()}")
    print(sample_data.head())

## 5. Feature Engineering

This section adds technical indicators and features optimized for short-term prediction (1-3 days).

In [None]:
def add_short_term_features(data):
    """
    Derive short-horizon technical features from an OHLCV frame.

    Works on a copy of ``data`` (the input is left untouched). The frame must
    provide ``Open``, ``High``, ``Low``, ``Close`` and ``Volume`` columns and an
    index exposing datetime attributes (``dayofweek``, ``month``, ...).
    Columns are appended in a fixed order and every row containing a NaN in
    any column is dropped before returning.
    """
    df = data.copy()
    open_, high, low = df['Open'], df['High'], df['Low']
    close, volume = df['Close'], df['Volume']

    # --- RSI over short windows (the price delta is shared by all windows) ---
    price_delta = close.diff()
    gains = price_delta.where(price_delta > 0, 0)
    losses = -price_delta.where(price_delta < 0, 0)
    for window in (5, 9, 14):
        mean_gain = gains.rolling(window=window).mean()
        # A zero average loss is swapped for a tiny value to avoid dividing by zero
        mean_loss = losses.rolling(window=window).mean().replace(0, 0.00001)
        relative_strength = mean_gain / mean_loss
        df[f'RSI_{window}'] = 100 - (100 / (1 + relative_strength))

    # --- Simple and exponential moving averages ---
    for window in (3, 5, 7, 10, 15, 20):
        df[f'MA_{window}'] = close.rolling(window=window).mean()
        df[f'EMA_{window}'] = close.ewm(span=window).mean()

    # --- MACD with 5/13 spans and a 5-span signal line ---
    macd_line = close.ewm(span=5, adjust=False).mean() - close.ewm(span=13, adjust=False).mean()
    df['MACD_Short'] = macd_line
    df['MACD_Short_Signal'] = macd_line.ewm(span=5, adjust=False).mean()
    df['MACD_Short_Hist'] = df['MACD_Short'] - df['MACD_Short_Signal']

    # --- Rolling volatility and percentage change ---
    for window in (3, 5, 7, 10):
        df[f'Volatility_{window}d'] = close.rolling(window=window).std()
        df[f'Price_Change_{window}d'] = close.pct_change(window)

    # --- Bollinger Bands (middle +/- 2 standard deviations) ---
    for window in (10, 15, 20):
        windowed_close = close.rolling(window=window)
        middle = windowed_close.mean()
        spread = windowed_close.std()
        upper = middle + (spread * 2)
        lower = middle - (spread * 2)
        df[f'BB_Middle_{window}'] = middle
        df[f'BB_Std_{window}'] = spread
        df[f'BB_Upper_{window}'] = upper
        df[f'BB_Lower_{window}'] = lower
        df[f'BB_Position_{window}'] = (close - lower) / (upper - lower)

    # --- Volume averages first, then volume relative to those averages ---
    volume_means = {window: volume.rolling(window=window).mean() for window in (5, 10)}
    for window, mean_volume in volume_means.items():
        df[f'Volume_MA_{window}'] = mean_volume
    for window, mean_volume in volume_means.items():
        df[f'Volume_Ratio_{window}'] = volume / mean_volume

    # --- Momentum and day range relative to an earlier close ---
    day_range = high - low
    for shift in (1, 2, 3, 5, 7):
        earlier_close = close.shift(shift)
        df[f'Price_Momentum_{shift}d'] = close / earlier_close - 1
        df[f'High_Low_Ratio_{shift}d'] = day_range / earlier_close

    # --- Candle-shape features, all expressed relative to the close ---
    df['Daily_Return'] = close.pct_change()
    df['High_Close_Ratio'] = high / close
    df['Low_Close_Ratio'] = low / close
    df['Open_Close_Ratio'] = open_ / close
    df['Body_Size'] = abs(close - open_) / close
    df['Upper_Shadow'] = (high - np.maximum(close, open_)) / close
    df['Lower_Shadow'] = (np.minimum(close, open_) - low) / close

    # --- Lagged close, volume and daily return ---
    lag_sources = (('Close', close), ('Volume', volume), ('Return', df['Daily_Return']))
    for lag in (1, 2, 3, 5, 7, 10):
        for label, series in lag_sources:
            df[f'{label}_Lag_{lag}'] = series.shift(lag)

    # --- Rolling mean / std / min / max of the close ---
    for window in (5, 10, 15):
        windowed_close = close.rolling(window=window)
        df[f'Close_Rolling_Mean_{window}'] = windowed_close.mean()
        df[f'Close_Rolling_Std_{window}'] = windowed_close.std()
        df[f'Close_Rolling_Min_{window}'] = windowed_close.min()
        df[f'Close_Rolling_Max_{window}'] = windowed_close.max()

    # --- Calendar attributes taken from the index ---
    index = df.index
    df['Day_of_Week'] = index.dayofweek
    df['Month'] = index.month
    df['Quarter'] = index.quarter
    df['Is_Month_End'] = index.is_month_end.astype(int)
    df['Is_Quarter_End'] = index.is_quarter_end.astype(int)

    # --- Sine/cosine encoding of weekday (period 7) and month (period 12) ---
    weekday_angle = 2 * np.pi * df['Day_of_Week'] / 7
    df['Day_of_Week_sin'] = np.sin(weekday_angle)
    df['Day_of_Week_cos'] = np.cos(weekday_angle)
    month_angle = 2 * np.pi * df['Month'] / 12
    df['Month_sin'] = np.sin(month_angle)
    df['Month_cos'] = np.cos(month_angle)

    # Rows without a full look-back window contain NaN and are removed
    return df.dropna()

# Smoke test: run feature engineering on the sample frame and report the column growth
if sample_data is None:
    print("Cannot test feature engineering without sample data")
else:
    print("Testing feature engineering...")
    enhanced_sample = add_short_term_features(sample_data)
    print(f"Original features: {sample_data.shape[1]}")
    print(f"Enhanced features: {enhanced_sample.shape[1]}")
    print(f"New feature count: {enhanced_sample.shape[1] - sample_data.shape[1]}")
    print("\nSample of new features:")
    # Columns that exist only in the engineered frame, in creation order
    new_features = [col for col in enhanced_sample.columns if col not in sample_data.columns]
    print(new_features[:10])  # Show first 10 new features

## 6. Data Normalization

In [None]:
def optimize_normalization(data, ticker):
    """
    Scale every feature column of ``data`` and return the fitted scalers.

    ``Open``, ``High``, ``Low``, ``Close`` and ``Volume`` are scaled to [0, 1]
    with ``MinMaxScaler``; every other column except ``Ticker`` is standardized
    with ``StandardScaler`` (columns containing NaN are left unscaled).
    Each scaler is fitted on all rows of ``data``.

    Returns ``(scaled_copy, {ticker: {column_name: fitted_scaler}})``.
    """
    df = data.copy()
    minmax_features = ['Open', 'High', 'Low', 'Close', 'Volume']
    not_standardized = minmax_features + ['Ticker']
    standard_features = [col for col in df.columns if col not in not_standardized]

    fitted = {}

    def _fit_and_apply(column, scaler):
        # Fit on the column as a 2-D array, remember the scaler, overwrite the column
        column_values = df[column].values.reshape(-1, 1)
        scaler.fit(column_values)
        fitted[column] = scaler
        df[column] = scaler.transform(column_values).flatten()

    # Raw price/volume columns -> [0, 1]
    for column in minmax_features:
        if column in df.columns:
            _fit_and_apply(column, MinMaxScaler(feature_range=(0, 1)))

    # Derived columns -> zero mean, unit variance
    for column in standard_features:
        if column in df.columns and not df[column].isna().any():
            _fit_and_apply(column, StandardScaler())

    return df, {ticker: fitted}

print("Normalization function defined successfully!")

## 7. Sequence Creation for Multi-Target Prediction

In [None]:
def create_multi_target_sequences(data, sequence_length=30, prediction_days=(1, 2, 3)):
    """
    Build sliding-window samples and multi-horizon ``Close`` targets.

    Each sample is ``sequence_length`` consecutive rows of every column except
    ``Ticker``. For a window whose last row is ``t``, the target for horizon
    ``d`` is ``Close`` at row ``t + d``, i.e. exactly ``d`` rows after the last
    observed row. This matches how ``make_prediction`` feeds the most recent
    rows at inference time.

    Args:
        data: DataFrame with a ``Close`` column; a ``Ticker`` column is ignored.
        sequence_length: Number of rows per input window.
        prediction_days: Horizons (in rows, each >= 1) to use as targets.

    Returns:
        ``(X, y)`` with shapes ``(n, sequence_length, n_features)`` and
        ``(n, len(prediction_days))``. Samples whose targets contain NaN are
        removed. When the frame is too short, both arrays are empty but keep
        these shapes, so callers can rely on ``len(X)``.

    Raises:
        ValueError: If ``prediction_days`` is empty or contains a value < 1.
    """
    horizons = list(prediction_days)
    if not horizons:
        raise ValueError("prediction_days must contain at least one horizon")
    if min(horizons) < 1:
        raise ValueError("prediction_days must all be >= 1")

    features = [col for col in data.columns if col != 'Ticker']
    values = data[features].values
    close_idx = features.index('Close')  # looked up once instead of per target

    X, y = [], []
    # ``end`` is exclusive: the window covers rows [end - sequence_length, end),
    # so the last observed row is end - 1 and horizon d lands on end - 1 + d.
    for end in range(sequence_length, len(values) - max(horizons) + 1):
        X.append(values[end - sequence_length:end])
        y.append([values[end - 1 + days, close_idx] for days in horizons])

    if not X:
        # Not enough rows for a single window plus its furthest target.
        return (
            np.empty((0, sequence_length, len(features))),
            np.empty((0, len(horizons))),
        )

    X = np.array(X)
    y = np.array(y, dtype=float)

    # Drop samples whose target is NaN (possible when the input frame has gaps)
    valid_indices = ~np.isnan(y).any(axis=1)
    return X[valid_indices], y[valid_indices]

print("Sequence creation function defined successfully!")

## 8. Model Building Functions

In [None]:
def build_short_term_lstm_model(sequence_length, num_features, num_targets=3):
    """
    Build and compile the default three-layer LSTM regressor.

    The network stacks LSTM layers of 128, 64 and 32 units (dropout 0.2 after
    each, batch normalization after the first two), followed by Dense layers
    of 64 and 32 ReLU units and a linear output of ``num_targets`` values.
    It is compiled with Adam (learning rate 0.001), MSE loss and an MAE metric.
    """
    model = Sequential()

    # Recurrent stack: the widest layer comes first and receives the input shape
    model.add(LSTM(128, return_sequences=True, input_shape=(sequence_length, num_features)))
    model.add(Dropout(0.2))
    model.add(BatchNormalization())

    model.add(LSTM(64, return_sequences=True))
    model.add(Dropout(0.2))
    model.add(BatchNormalization())

    # The last recurrent layer returns only its final state
    model.add(LSTM(32, return_sequences=False))
    model.add(Dropout(0.2))

    # Dense head with one output per prediction horizon
    model.add(Dense(64, activation='relu'))
    model.add(Dropout(0.2))
    model.add(Dense(32, activation='relu'))
    model.add(Dense(num_targets, name='predictions'))

    model.compile(optimizer=Adam(learning_rate=0.001), loss='mse', metrics=['mae'])
    return model

def build_optimized_short_term_model(sequence_length, num_features, num_targets, params):
    """
    Build and compile an LSTM regressor from a hyperparameter dictionary.

    ``params`` supplies the layer sizes (``lstm_units_1..3``,
    ``dense_units_1..2``), dropout rates, ``l2_reg``, ``learning_rate``,
    ``num_lstm_layers`` (a third LSTM layer is added when it is > 2) and
    ``use_batch_norm``. The optional third LSTM layer reuses the second layer's
    dropout settings. The model is compiled with Adam, MSE loss and an MAE metric.
    """
    has_third_lstm = int(params['num_lstm_layers']) > 2
    stack = []

    def _maybe_batch_norm():
        # Batch normalization follows each LSTM layer only when enabled
        if params['use_batch_norm']:
            stack.append(BatchNormalization())

    # First LSTM layer carries the input shape
    stack.append(LSTM(
        units=int(params['lstm_units_1']),
        return_sequences=True,
        input_shape=(sequence_length, num_features),
        dropout=params['lstm_dropout_1'],
        recurrent_dropout=params['recurrent_dropout_1'],
    ))
    _maybe_batch_norm()

    # Second LSTM layer keeps the sequence only when a third layer follows
    stack.append(LSTM(
        units=int(params['lstm_units_2']),
        return_sequences=has_third_lstm,
        dropout=params['lstm_dropout_2'],
        recurrent_dropout=params['recurrent_dropout_2'],
    ))
    _maybe_batch_norm()

    if has_third_lstm:
        stack.append(LSTM(
            units=int(params['lstm_units_3']),
            return_sequences=False,
            dropout=params['lstm_dropout_2'],
            recurrent_dropout=params['recurrent_dropout_2'],
        ))
        _maybe_batch_norm()

    # Two L2-regularized ReLU Dense layers, each followed by dropout
    for units_key in ('dense_units_1', 'dense_units_2'):
        stack.append(Dense(
            units=int(params[units_key]),
            activation='relu',
            kernel_regularizer=regularizers.l2(params['l2_reg']),
        ))
        stack.append(Dropout(params['dense_dropout']))

    # Linear output, one value per target
    stack.append(Dense(num_targets))

    model = Sequential()
    for layer in stack:
        model.add(layer)

    model.compile(
        optimizer=Adam(learning_rate=params['learning_rate']),
        loss='mse',
        metrics=['mae'],
    )
    return model

print("Model building functions defined successfully!")

## 9. Hyperparameter Optimization

In [None]:
def optimize_short_term_hyperparameters(X_train, y_train, X_val, y_val, num_targets=3,
                                        n_calls=50, n_random_starts=10):
    """
    Search model hyperparameters with Gaussian-process optimization.

    Each trial builds a model with ``build_optimized_short_term_model``, trains
    it for up to 100 epochs with early stopping (patience 10) and is scored by
    its lowest validation loss. Trials that raise or produce a non-finite loss
    are scored with a large penalty (1e10) so the search can continue.

    Args:
        X_train, y_train: Training sequences and targets.
        X_val, y_val: Validation sequences and targets.
        num_targets: Number of model outputs.
        n_calls: Total number of objective evaluations.
        n_random_starts: Number of random evaluations before the surrogate
            model is used.

    Returns:
        Dict mapping hyperparameter name to its best value; integer dimensions
        are returned as plain ``int``.
    """
    failure_penalty = 1e10

    # Search space optimized for short-term prediction
    dimensions = [
        Integer(32, 256, name='lstm_units_1'),
        Integer(16, 128, name='lstm_units_2'),
        Integer(8, 64, name='lstm_units_3'),
        Integer(16, 128, name='dense_units_1'),
        Integer(8, 64, name='dense_units_2'),
        Real(0.1, 0.4, name='lstm_dropout_1'),
        Real(0.1, 0.4, name='lstm_dropout_2'),
        Real(0.0, 0.3, name='recurrent_dropout_1'),
        Real(0.0, 0.3, name='recurrent_dropout_2'),
        Real(0.1, 0.4, name='dense_dropout'),
        Real(0.0001, 0.01, prior='log-uniform', name='learning_rate'),
        Real(0.0001, 0.01, prior='log-uniform', name='l2_reg'),
        Integer(2, 3, name='num_lstm_layers'),
        Categorical([True, False], name='use_batch_norm'),
        Integer(16, 128, name='batch_size')
    ]

    # Starting point evaluated first (passed to the optimizer as x0)
    default_params = {
        'lstm_units_1': 128,
        'lstm_units_2': 64,
        'lstm_units_3': 32,
        'dense_units_1': 64,
        'dense_units_2': 32,
        'lstm_dropout_1': 0.2,
        'lstm_dropout_2': 0.2,
        'recurrent_dropout_1': 0.1,
        'recurrent_dropout_2': 0.1,
        'dense_dropout': 0.2,
        'learning_rate': 0.001,
        'l2_reg': 0.001,
        'num_lstm_layers': 2,
        'use_batch_norm': True,
        'batch_size': 32
    }

    @use_named_args(dimensions=dimensions)
    def objective(**params):
        # Every trial builds a fresh model; release the Keras global state left
        # behind by the previous one so memory does not grow with each trial.
        tf.keras.backend.clear_session()

        try:
            model = build_optimized_short_term_model(
                X_train.shape[1],
                X_train.shape[2],
                num_targets,
                params
            )

            early_stopping = EarlyStopping(
                monitor='val_loss',
                patience=10,
                restore_best_weights=True
            )

            history = model.fit(
                X_train, y_train,
                epochs=100,
                batch_size=int(params['batch_size']),
                validation_data=(X_val, y_val),
                callbacks=[early_stopping],
                verbose=0
            )

            val_loss = float(np.min(history.history['val_loss']))

        except Exception as e:
            print(f"Error in model evaluation: {e}")
            return failure_penalty

        # A diverged run yields NaN/inf; the surrogate model cannot be fitted
        # on such values, so treat it like a failed trial.
        if not np.isfinite(val_loss):
            print("Error in model evaluation: non-finite validation loss")
            return failure_penalty

        return val_loss

    print("Starting hyperparameter optimization for short-term prediction...")
    result = gp_minimize(
        func=objective,
        dimensions=dimensions,
        n_calls=n_calls,
        n_random_starts=n_random_starts,
        x0=[default_params[dim.name] for dim in dimensions],
        random_state=42,
        verbose=True
    )

    best_params = {dim.name: value for dim, value in zip(dimensions, result.x)}

    # Integer dimensions come back as NumPy integers; convert them to plain int
    for dim in dimensions:
        if isinstance(dim, Integer):
            best_params[dim.name] = int(best_params[dim.name])

    return best_params

print("Hyperparameter optimization function defined successfully!")

## 10. Model Evaluation Functions

In [None]:
def evaluate_short_term_model(model, X_test, y_test, scalers, ticker, prediction_days=(1, 2, 3)):
    """
    Evaluate the model separately for every prediction horizon.

    Predictions and targets are mapped back to price units with the ``Close``
    scaler stored in ``scalers[ticker]`` before the metrics are computed.

    Args:
        model: Object with a ``predict`` method returning one column per horizon.
        X_test: Input sequences.
        y_test: Scaled targets, one column per entry of ``prediction_days``.
        scalers: Mapping ``{ticker: {feature: fitted_scaler}}``.
        ticker: Key into ``scalers``.
        prediction_days: Horizons, in the same order as the target columns.

    Returns:
        Dict keyed by ``"<days>d"``; each value holds ``MSE``, ``RMSE``,
        ``MAE``, ``MAPE`` (in percent), ``R2``, ``Direction_Accuracy`` (in
        percent, NaN with fewer than two samples) and the inverse-transformed
        ``y_true`` / ``y_pred`` arrays.
    """
    y_pred = model.predict(X_test)

    # The targets are scaled Close prices, so the Close scaler inverts them
    close_scaler = scalers[ticker]['Close']

    results = {}

    for column, days in enumerate(prediction_days):
        # Back to price units for this horizon
        y_true_day_inv = close_scaler.inverse_transform(y_test[:, column].reshape(-1, 1)).flatten()
        y_pred_day_inv = close_scaler.inverse_transform(y_pred[:, column].reshape(-1, 1)).flatten()

        mse = mean_squared_error(y_true_day_inv, y_pred_day_inv)
        rmse = np.sqrt(mse)
        mae = mean_absolute_error(y_true_day_inv, y_pred_day_inv)
        # scikit-learn returns MAPE as a fraction; every consumer of this dict
        # prints or plots it with a percent sign, so convert it here.
        mape = mean_absolute_percentage_error(y_true_day_inv, y_pred_day_inv) * 100
        r2 = r2_score(y_true_day_inv, y_pred_day_inv)

        # Direction accuracy: share of consecutive steps where the predicted
        # series moves in the same direction as the true series.
        if len(y_true_day_inv) > 1:
            actual_direction = np.sign(np.diff(y_true_day_inv))
            pred_direction = np.sign(np.diff(y_pred_day_inv))
            direction_accuracy = np.mean(actual_direction == pred_direction) * 100
        else:
            # No consecutive pair exists, so the metric is undefined
            direction_accuracy = np.nan

        results[f'{days}d'] = {
            'MSE': mse,
            'RMSE': rmse,
            'MAE': mae,
            'MAPE': mape,
            'R2': r2,
            'Direction_Accuracy': direction_accuracy,
            'y_true': y_true_day_inv,
            'y_pred': y_pred_day_inv
        }

    return results

print("Evaluation function defined successfully!")

## 11. Visualization Functions

In [None]:
def plot_short_term_predictions(results, ticker, prediction_days=(1, 2, 3)):
    """
    Plot actual-vs-predicted scatter panels and a time-series comparison.

    One scatter panel is drawn per horizon, followed by a single panel that
    overlays the last (up to) 100 predictions of every horizon on the actual
    prices of the first horizon. The grid grows with the number of horizons,
    so any non-empty ``prediction_days`` is supported.

    Args:
        results: Output of ``evaluate_short_term_model`` (keys ``"<days>d"``).
        ticker: Label used in the panel titles.
        prediction_days: Horizons to plot; each must be a key of ``results``.

    Raises:
        ValueError: If ``prediction_days`` is empty.
    """
    horizons = list(prediction_days)
    if not horizons:
        raise ValueError("prediction_days must contain at least one horizon")

    # One panel per horizon plus one for the time series, two panels per row
    n_panels = len(horizons) + 1
    n_cols = 2
    n_rows = (n_panels + n_cols - 1) // n_cols
    fig, axes = plt.subplots(n_rows, n_cols, figsize=(15, 5 * n_rows), squeeze=False)
    axes = axes.flatten()

    for ax, days in zip(axes, horizons):
        day_results = results[f'{days}d']

        y_true = day_results['y_true']
        y_pred = day_results['y_pred']

        # Scatter of actual vs predicted with the ideal y = x line
        ax.scatter(y_true, y_pred, alpha=0.5)
        ax.plot([y_true.min(), y_true.max()], [y_true.min(), y_true.max()], 'r--', lw=2)
        ax.set_xlabel('Actual Price')
        ax.set_ylabel('Predicted Price')
        ax.set_title(f'{ticker} - {days} Day Prediction\nR² = {day_results["R2"]:.4f}, Direction Acc = {day_results["Direction_Accuracy"]:.1f}%')
        ax.grid(True, alpha=0.3)

    # Time series for the last 100 predictions
    ax = axes[len(horizons)]
    reference_days = horizons[0]
    actual = results[f'{reference_days}d']['y_true']
    n_show = min(100, len(actual))
    x_axis = np.arange(n_show)

    for days in horizons:
        y_pred = results[f'{days}d']['y_pred'][-n_show:]
        # Sample j of horizon d targets the price (d - reference) steps after
        # the reference horizon's target, so shift it to line up with "Actual".
        ax.plot(x_axis + (days - reference_days), y_pred, label=f'{days}d Pred', linestyle='--')

    ax.plot(x_axis, actual[-n_show:], label='Actual', color='black', linewidth=2)
    ax.set_xlabel('Time')
    ax.set_ylabel('Price')
    ax.set_title('Time Series Comparison (Last 100 points)')
    ax.legend()
    ax.grid(True, alpha=0.3)

    # Hide grid cells that hold no panel (odd number of panels)
    for unused_ax in axes[n_panels:]:
        unused_ax.set_visible(False)

    plt.tight_layout()
    plt.show()

print("Visualization function defined successfully!")

## 12. Main Pipeline Function

In [None]:
def _plot_training_history(history, ticker):
    """Plot training/validation loss and MAE curves from a Keras ``History``."""
    plt.figure(figsize=(12, 4))
    panels = (('loss', 'val_loss', 'Loss'), ('mae', 'val_mae', 'MAE'))
    for position, (train_key, val_key, label) in enumerate(panels, start=1):
        plt.subplot(1, 2, position)
        plt.plot(history.history[train_key], label=f'Train {label}')
        plt.plot(history.history[val_key], label=f'Val {label}')
        plt.title(f'{ticker} - Training {label}')
        plt.xlabel('Epoch')
        plt.ylabel(label)
        plt.legend()
        plt.grid(True, alpha=0.3)

    plt.tight_layout()
    plt.show()


def short_term_prediction_pipeline(stocks=STOCKS, optimize=True, train_new_model=True):
    """
    Main pipeline for short-term prediction (1, 2, 3 days)

    For every ticker: load data, engineer features, normalize, build
    sequences, split chronologically into train/validation/test, obtain a
    model (cached, tuned or default), evaluate it and plot the results.
    Tickers with too little data are skipped.

    Args:
        stocks: Tickers to process.
        optimize: When True, tune hyperparameters before the final training.
        train_new_model: When False, a cached model is used if one can be loaded.

    Returns:
        Dict ``{ticker: {'model', 'results', 'scalers', 'config'}}`` for every
        ticker that was processed.
    """
    config = SHORT_TERM_CONFIG
    num_targets = len(config['prediction_days'])
    all_results = {}

    for ticker in stocks:
        print(f"\n===== Processing {ticker} ({STOCK_NAMES.get(ticker, ticker)}) =====")

        # 1. Get data
        data = get_stock_data(ticker, config['data_years'])
        if data is None or len(data) < 1000:
            print(f"Insufficient data for {ticker}")
            continue

        # 2. Feature engineering
        print("Adding short-term features...")
        enhanced_data = add_short_term_features(data)

        if len(enhanced_data) < 500:
            print(f"Insufficient data after feature engineering for {ticker}")
            continue

        # 3. Normalization
        # NOTE(review): the scalers are fitted on the whole series, i.e. before
        # the train/validation/test split below, so statistics of the held-out
        # rows leak into training and the test metrics are optimistic.
        # Consider fitting on the training rows only.
        print("Normalizing data...")
        normalized_data, scalers = optimize_normalization(enhanced_data, ticker)

        # 4. Create sequences
        print("Creating sequences for multi-target prediction...")
        X, y = create_multi_target_sequences(
            normalized_data.drop(columns=['Ticker']),
            sequence_length=config['sequence_length'],
            prediction_days=config['prediction_days']
        )

        if len(X) < 100:
            print(f"Insufficient sequences for {ticker}")
            continue

        # 5. Split data chronologically: oldest -> train, then validation, newest -> test
        test_size = int(len(X) * config['test_split'])
        val_size = int(len(X) * config['validation_split'])

        X_train = X[:-test_size-val_size]
        y_train = y[:-test_size-val_size]
        X_val = X[-test_size-val_size:-test_size]
        y_val = y[-test_size-val_size:-test_size]
        X_test = X[-test_size:]
        y_test = y[-test_size:]

        print(f"Data split - Train: {len(X_train)}, Val: {len(X_val)}, Test: {len(X_test)}")

        # 6. Model training
        model_name = f"short_term_lstm_{ticker.replace('.', '_')}"
        model = None

        if not train_new_model:
            model = cache.load_model(model_name)

        if model is None:
            batch_size = 32  # used by the default model; replaced by the tuned value below

            if optimize:
                print("Optimizing hyperparameters...")
                params = optimize_short_term_hyperparameters(
                    X_train, y_train, X_val, y_val, num_targets
                )
                print(f"Best parameters: {params}")

                model = build_optimized_short_term_model(
                    X_train.shape[1],
                    X_train.shape[2],
                    num_targets,
                    params
                )
                # Train the final model with the batch size the search selected,
                # not a fixed value the other parameters were never tuned for.
                batch_size = int(params.get('batch_size', batch_size))
            else:
                print("Using default model...")
                model = build_short_term_lstm_model(
                    X_train.shape[1],
                    X_train.shape[2],
                    num_targets
                )

            # Train model
            callbacks = [
                EarlyStopping(monitor='val_loss', patience=15, restore_best_weights=True),
                ReduceLROnPlateau(monitor='val_loss', factor=0.5, patience=5, min_lr=0.00001)
            ]

            print("Training model...")
            history = model.fit(
                X_train, y_train,
                epochs=150,
                batch_size=batch_size,
                validation_data=(X_val, y_val),
                callbacks=callbacks,
                verbose=1
            )

            # Save model
            cache.save_model(model, model_name)

            # Plot training history
            _plot_training_history(history, ticker)

        # 7. Evaluation
        print("Evaluating model...")
        results = evaluate_short_term_model(
            model, X_test, y_test, scalers, ticker, config['prediction_days']
        )

        # Print results
        print(f"\n{ticker} ({STOCK_NAMES.get(ticker, ticker)}) Results:")
        print("-" * 60)
        for days in config['prediction_days']:
            day_results = results[f'{days}d']
            print(f"{days} Day Prediction:")
            print(f"  R²: {day_results['R2']:.4f}")
            print(f"  RMSE: {day_results['RMSE']:.4f}")
            print(f"  MAE: {day_results['MAE']:.4f}")
            print(f"  MAPE: {day_results['MAPE']:.2f}%")
            print(f"  Direction Accuracy: {day_results['Direction_Accuracy']:.1f}%")

        # 8. Visualization
        plot_short_term_predictions(results, ticker, config['prediction_days'])

        # 9. Store results
        all_results[ticker] = {
            'model': model,
            'results': results,
            'scalers': scalers,
            'config': config
        }

    return all_results

print("Main pipeline function defined successfully!")

## 13. Execute the Pipeline

Now let's run the complete pipeline. You can adjust the parameters below:

In [None]:
# Run settings - adjust before executing this cell
stocks_to_process = STOCKS  # Can be changed to process specific stocks
enable_optimization = True   # Set to False for faster execution without hyperparameter tuning
train_new_models = True     # Set to False to use cached models if available

# Echo the effective settings, one per line
print("\n".join([
    "Starting Short-Term Stock Prediction Pipeline",
    f"Stocks: {stocks_to_process}",
    f"Prediction targets: {SHORT_TERM_CONFIG['prediction_days']} days",
    f"Historical data: {SHORT_TERM_CONFIG['data_years']} years",
    f"Sequence length: {SHORT_TERM_CONFIG['sequence_length']} days",
    f"Hyperparameter optimization: {enable_optimization}",
    f"Train new models: {train_new_models}",
]))

# Run the pipeline
results = short_term_prediction_pipeline(
    stocks=stocks_to_process,
    optimize=enable_optimization,
    train_new_model=train_new_models,
)

## 14. Summary Results

In [None]:
# Print a compact per-stock, per-horizon metric summary
separator = "=" * 80
print("\n" + separator)
print("SUMMARY RESULTS")
print(separator)

for ticker, result in results.items():
    print(f"\n{ticker} ({STOCK_NAMES.get(ticker, ticker)}):")
    per_horizon = result['results']
    for days in SHORT_TERM_CONFIG['prediction_days']:
        metrics = per_horizon[f'{days}d']
        print(f"  {days}d: R²={metrics['R2']:.4f}, RMSE={metrics['RMSE']:.4f}, Dir.Acc={metrics['Direction_Accuracy']:.1f}%")

print("\nPipeline completed successfully!")

## 15. Additional Analysis (Optional)

You can add more analysis cells here for deeper insights:

In [None]:
# Flatten the nested results into one row per (stock, horizon) for easy comparison
import pandas as pd

metric_columns = ('R2', 'RMSE', 'MAE', 'MAPE', 'Direction_Accuracy')

summary_data = [
    {
        'Stock': ticker,
        'Company': STOCK_NAMES.get(ticker, ticker),
        'Prediction_Days': days,
        **{metric: result['results'][f'{days}d'][metric] for metric in metric_columns},
    }
    for ticker, result in results.items()
    for days in SHORT_TERM_CONFIG['prediction_days']
]

summary_df = pd.DataFrame(summary_data)
print("\nDetailed Results Summary:")
print(summary_df.to_string(index=False))

# Persist the table next to the notebook
summary_df.to_csv('stock_prediction_results.csv', index=False)
print("\nResults saved to 'stock_prediction_results.csv'")

In [None]:
# Plot comparative performance: one bar chart per metric, one bar per (stock, horizon) row
fig, axes = plt.subplots(2, 2, figsize=(15, 10))

bar_positions = range(len(summary_df))
tick_labels = [f"{row['Stock']}\n{row['Prediction_Days']}d" for _, row in summary_df.iterrows()]

# (summary column, panel title, y-axis label), laid out row by row
metric_panels = [
    ('R2', 'R² Score Comparison', 'R² Score'),
    ('RMSE', 'RMSE Comparison', 'RMSE'),
    ('Direction_Accuracy', 'Direction Accuracy Comparison', 'Direction Accuracy (%)'),
    ('MAPE', 'MAPE Comparison', 'MAPE (%)'),
]

for panel, (column, title, y_label) in zip(axes.flatten(), metric_panels):
    panel.bar(bar_positions, summary_df[column])
    panel.set_title(title)
    panel.set_ylabel(y_label)
    panel.set_xticks(bar_positions)
    panel.set_xticklabels(tick_labels, rotation=45)

plt.tight_layout()
plt.show()

## 16. Model Saving and Loading for Future Use

In [None]:
# Save all results for future use
# `results` holds, per ticker, the trained model object together with metrics,
# scalers and config; everything is pickled into "final_results.pkl" in the cache directory.
# NOTE(review): this pickles the Keras model objects themselves - whether that works
# depends on the installed TensorFlow/Keras version; confirm, or store only metrics/scalers.
cache.save_object(results, 'final_results')
print("All results saved to cache!")

# Example of how to load and use a specific model for new predictions
def make_prediction(ticker, new_data, prediction_days=(1, 2, 3)):
    """
    Make new predictions using trained model

    ``new_data`` goes through the same feature engineering as the training
    data and is scaled with the scalers stored for ``ticker`` in the
    module-level ``results``; the most recent ``sequence_length`` rows are fed
    to the model.

    Args:
        ticker: Key into the module-level ``results`` dict.
        new_data: Raw price frame accepted by ``add_short_term_features``.
        prediction_days: Labels for the model outputs, in output order.

    Returns:
        Dict ``{"<days>d": predicted_close}``, or None when no model exists
        for ``ticker`` or too few rows remain after feature engineering.

    Raises:
        ValueError: If more labels are given than the model has outputs.
    """
    if ticker not in results:
        print(f"No trained model available for {ticker}")
        return None

    model = results[ticker]['model']
    scalers = results[ticker]['scalers']
    config = results[ticker]['config']
    sequence_length = config['sequence_length']

    # Process new data (feature engineering + normalization)
    enhanced_data = add_short_term_features(new_data)

    # Apply the scalers fitted during training; nothing is re-fitted on the new data
    normalized_data = enhanced_data.copy()
    for feature, scaler in scalers[ticker].items():
        if feature in normalized_data.columns:
            values = normalized_data[feature].values.reshape(-1, 1)
            normalized_data[feature] = scaler.transform(values).flatten()

    # Create sequence for prediction
    features = [col for col in normalized_data.columns if col != 'Ticker']
    values = normalized_data[features].values

    if len(values) < sequence_length:
        print(f"Need at least {config['sequence_length']} days of data")
        return None

    # Use last sequence for prediction
    X = values[-sequence_length:].reshape(1, sequence_length, -1)

    # Make prediction
    pred_normalized = model.predict(X)

    # Inverse transform. The Close scaler was fitted on a single column, so the
    # outputs are passed as a column vector (as in evaluate_short_term_model)
    # instead of relying on broadcasting a (1, n_outputs) array.
    close_scaler = scalers[ticker]['Close']
    predictions = close_scaler.inverse_transform(pred_normalized.reshape(-1, 1)).flatten()

    labels = list(prediction_days)
    if len(labels) > len(predictions):
        raise ValueError(
            f"Model produces {len(predictions)} outputs but {len(labels)} prediction_days were given"
        )

    # Return predictions with labels
    return {f'{days}d': value for days, value in zip(labels, predictions)}

print("Prediction function defined! Use make_prediction(ticker, new_data) to get predictions.")