# ARIMA-GARCH-SCA-LSTM Hybrid Model for Stock Price Forecasting

This notebook implements an ARIMA-GARCH-SCA-LSTM hybrid model that forecasts stock prices from their log returns. The hybrid is motivated by the need to model both the linear and the non-linear dynamics of financial time series, stock market returns in particular.

## Why This Model?

### 1. **ARIMA-GARCH for Trend and Volatility**
Stock returns are known to have both linear and non-linear dependencies, and volatility clustering is a well-documented phenomenon. The ARIMA (AutoRegressive Integrated Moving Average) model is widely used to capture the linear dependencies and trends in time series data, while the GARCH (Generalized Autoregressive Conditional Heteroskedasticity) model captures time-varying volatility (volatility clustering).

- **ARIMA**: Captures the linear patterns in the log returns, that is, the part of each return that can be predicted from past returns and past forecast errors. This makes it a reasonable choice for the stable, predictable component of the series.
  
- **GARCH**: Stock returns tend to alternate between calm and turbulent periods: large changes are often followed by large changes, and small changes by small changes. GARCH models this clustering by letting the variance of the returns change over time.
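
For reference, a textbook ARMA(\(p\), \(q\)) mean equation with GARCH(1, 1) errors for the log return \(r_t\) can be written as below. The symbols are generic notation assumed here for illustration, not fitted values or orders taken from this notebook.

\[
\begin{aligned}
r_t &= c + \sum_{i=1}^{p} \phi_i \, r_{t-i} + \sum_{j=1}^{q} \theta_j \, \varepsilon_{t-j} + \varepsilon_t, \\
\varepsilon_t &= \sigma_t z_t, \qquad z_t \sim \text{i.i.d.}(0, 1), \\
\sigma_t^2 &= \omega + \alpha \, \varepsilon_{t-1}^2 + \beta \, \sigma_{t-1}^2 .
\end{aligned}
\]

Here \(\omega > 0\) and \(\alpha, \beta \ge 0\). A large shock \(\varepsilon_{t-1}\) raises the next variance \(\sigma_t^2\), which is how the model reproduces volatility clustering.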

### 2. **LSTM for Non-Linear Dependencies**
While ARIMA-GARCH is powerful for modeling linear dependencies and volatility, it is not well-suited to capture the non-linear relationships in stock price movements. For that, we use a Long Short-Term Memory (LSTM) network, a type of recurrent neural network (RNN) that excels in learning long-term dependencies in time series data.

- **LSTM**: The LSTM is trained on the residuals of the ARIMA-GARCH fit, which contain whatever non-linear structure the linear model leaves behind. Predicting these residuals lets the hybrid recover patterns that ARIMA-GARCH alone may miss, which can improve overall accuracy.
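
As a rough illustration of the residual-learning idea, the sketch below turns a residual series into supervised (window, next value) pairs of the kind an LSTM consumes. The function name `make_residual_windows`, the toy residuals, and the `look_back` value are assumptions made for this example and are not part of the notebook's pipeline.

```python
import numpy as np

def make_residual_windows(residuals, look_back=5):
    """Turn a 1-D residual series into LSTM inputs and next-step targets."""
    residuals = np.asarray(residuals, dtype=float)
    X, y = [], []
    for i in range(len(residuals) - look_back):
        X.append(residuals[i:i + look_back])   # the previous `look_back` residuals
        y.append(residuals[i + look_back])     # the residual to predict
    # LSTM layers expect inputs shaped (samples, timesteps, features)
    X = np.array(X).reshape(-1, look_back, 1)
    return X, np.array(y)

# Toy residuals standing in for the ARIMA-GARCH residual series
toy_residuals = np.arange(10, dtype=float)
X, y = make_residual_windows(toy_residuals, look_back=3)
print(X.shape, y.shape)
```

In a residual hybrid of this kind, the final forecast is typically the linear model's mean forecast plus the LSTM's predicted residual for the same time step.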

### 3. **Sine Cosine Algorithm (SCA) for Hyperparameter Optimization**
The Sine Cosine Algorithm is employed to optimize the hyperparameters for both ARIMA-GARCH and LSTM. Selecting the best hyperparameters is crucial for maximizing the model's performance, but traditional grid search methods are computationally expensive. SCA provides a more efficient approach, allowing us to automatically tune key parameters such as:

- For ARIMA-GARCH: ARIMA's \(p\), \(d\), \(q\) orders and GARCH's lag orders.
- For LSTM: Hidden layer size, learning rate, and dropout rate.

SCA alternates between exploration and exploitation, balancing the search between diverse areas of the hyperparameter space and refining near-optimal solutions. This enables the model to capture both stable trends and volatile spikes efficiently.
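
The exploration and exploitation phases can be seen on a toy problem. The sketch below follows the commonly cited SCA formulation, in which the step amplitude `r1` decays linearly from a constant `a` to zero. The names `sca_minimize` and `sphere`, the choice `a = 2.0`, and the test function are assumptions for illustration; this is not the tuning code used later in the notebook.

```python
import numpy as np

def sca_minimize(obj_func, lb, ub, n_agents=20, max_iter=200, a=2.0, seed=0):
    """Minimal Sine Cosine Algorithm sketch for box-constrained minimization."""
    rng = np.random.default_rng(seed)
    lb, ub = np.asarray(lb, dtype=float), np.asarray(ub, dtype=float)
    dim = lb.size

    # Random initial population, evaluated once to get a starting best position
    positions = rng.uniform(lb, ub, size=(n_agents, dim))
    fitness = np.array([obj_func(p) for p in positions])
    best_idx = int(np.argmin(fitness))
    best_pos, best_fit = positions[best_idx].copy(), float(fitness[best_idx])

    for t in range(max_iter):
        r1 = a * (1 - t / max_iter)                            # large early steps, small late steps
        r2 = rng.uniform(0, 2 * np.pi, size=(n_agents, dim))   # angle of the move
        r3 = rng.uniform(0, 2, size=(n_agents, dim))           # random weight on the best position
        r4 = rng.uniform(0, 1, size=(n_agents, dim))           # sine/cosine switch

        step = np.abs(r3 * best_pos - positions)
        positions = np.where(r4 < 0.5,
                             positions + r1 * np.sin(r2) * step,
                             positions + r1 * np.cos(r2) * step)
        positions = np.clip(positions, lb, ub)                 # keep agents inside the bounds

        fitness = np.array([obj_func(p) for p in positions])
        idx = int(np.argmin(fitness))
        if fitness[idx] < best_fit:
            best_pos, best_fit = positions[idx].copy(), float(fitness[idx])

    return best_pos, best_fit

def sphere(x):
    """Toy objective with its minimum of 0 at the origin."""
    return float(np.sum(x ** 2))

best_pos, best_fit = sca_minimize(sphere, lb=[-5, -5, -5], ub=[5, 5, 5])
print(best_pos, best_fit)
```

For hyperparameter tuning, each objective evaluation means fitting a model, so the population size and iteration count would normally be far smaller than in this toy run.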

## Summary of Model Benefits:

1. **Comprehensive Modeling**: ARIMA-GARCH handles the linear structure and the time-varying volatility, and the LSTM handles the non-linear patterns left in the residuals, so each component of the return series is assigned to the model best suited to it.
  
2. **Adaptability**: By using SCA for hyperparameter tuning, the model is adaptable to different datasets, optimizing performance without manual intervention.
  
3. **Financial Time Series Specificity**: The model architecture directly addresses key features of financial data, including volatility clustering and long-term dependencies. Sudden market shocks remain hard to predict, but the volatility component lets the model adjust to them once they occur.

## Notebook Workflow:

1. **Data Preprocessing**:
   - Normalize log returns.
   - Split the time series into training and testing sets.
  
2. **ARIMA-GARCH Model**:
   - Fit the ARIMA-GARCH model to capture linear trends and volatility.
   - Optimize ARIMA-GARCH parameters using SCA.

3. **Residual Extraction**:
   - Extract the residuals from the ARIMA-GARCH model for further analysis.

4. **LSTM Model**:
   - Train the LSTM model on the residuals from ARIMA-GARCH.
   - Optimize LSTM parameters using SCA.
  
5. **Forecasting**:
   - Combine ARIMA-GARCH and LSTM outputs for final stock price predictions.

6. **Evaluation**:
   - Evaluate the combined forecast on the test set using metrics such as RMSE and MAE, and assess the robustness of the predictions.
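
The two metrics named in the evaluation step can be computed directly with NumPy. The helper names `rmse` and `mae` and the sample values below are made up for illustration.

```python
import numpy as np

def rmse(y_true, y_pred):
    """Root mean squared error: penalizes large misses more heavily."""
    y_true, y_pred = np.asarray(y_true, dtype=float), np.asarray(y_pred, dtype=float)
    return float(np.sqrt(np.mean((y_true - y_pred) ** 2)))

def mae(y_true, y_pred):
    """Mean absolute error: the average size of a miss."""
    y_true, y_pred = np.asarray(y_true, dtype=float), np.asarray(y_pred, dtype=float)
    return float(np.mean(np.abs(y_true - y_pred)))

# Hypothetical log returns and forecasts
y_true = [0.010, -0.020, 0.005]
y_pred = [0.012, -0.015, 0.000]
print(f"RMSE: {rmse(y_true, y_pred):.6f}, MAE: {mae(y_true, y_pred):.6f}")
```

RMSE is never smaller than MAE on the same errors, and a large gap between the two points to a few large misses rather than uniformly mediocre forecasts.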

This model provides a strong foundation for forecasting stock price movements by leveraging advanced statistical models (ARIMA-GARCH) and deep learning techniques (LSTM) with efficient hyperparameter optimization (SCA).


In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import keras

# ARIMA and GARCH models
from statsmodels.tsa.arima.model import ARIMA
from arch import arch_model

# LSTM for deep learning
from keras import models
from keras import layers
from keras.optimizers import Adam  # public import path; keras.src is internal
from sklearn.preprocessing import MinMaxScaler

# Optimization algorithm
import random

# Performance metrics
from sklearn.metrics import mean_squared_error
from scipy.stats import t

import warnings
warnings.filterwarnings('ignore')


## 1. Data Preprocessing

The first step in our workflow is to preprocess the stock price data. This involves:

1. Loading the stock price data, which was fetched with the `yfinance` API and saved as a processed CSV file.
2. Calculating log returns for the close prices to stabilize the variance.
3. Splitting the data into training and test sets for model development and evaluation.
4. Scaling the log returns for input to the neural network (LSTM).

We also visualize the stock price and the log returns to understand the data's underlying behavior.
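
The preprocessing steps can be sketched on a handful of made-up prices. The price values and the `scale` helper are assumptions for illustration; the column name `Log_Return_Close_Close` matches the one used in this notebook, and the manual formula mirrors what `MinMaxScaler(feature_range=(-1, 1))` computes.

```python
import numpy as np
import pandas as pd

# Hypothetical closing prices; the notebook itself loads real data from CSV
dates = pd.date_range("2024-01-01", periods=6, freq="D")
prices = pd.DataFrame({"Close": [100.0, 101.0, 99.0, 102.0, 103.0, 101.5]}, index=dates)

# Log return r_t = ln(P_t / P_{t-1}); the first row has no predecessor, so it is dropped
prices["Log_Return_Close_Close"] = np.log(prices["Close"]).diff()
returns = prices.dropna()

# Chronological split: no shuffling, so the test period lies strictly after the training period
split_index = int(len(returns) * 0.8)
train, test = returns.iloc[:split_index], returns.iloc[split_index:]

# Min-max scaling to [-1, 1] using training statistics only
r_min = train["Log_Return_Close_Close"].min()
r_max = train["Log_Return_Close_Close"].max()

def scale(series):
    """Map values so that the training minimum becomes -1 and the training maximum +1."""
    return 2 * (series - r_min) / (r_max - r_min) - 1

train_scaled = scale(train["Log_Return_Close_Close"])
test_scaled = scale(test["Log_Return_Close_Close"])
print(train_scaled.round(3).tolist(), test_scaled.round(3).tolist())
```

Taking the scaling statistics from the training set alone keeps information from the test period out of the model inputs. Test values can then fall slightly outside [-1, 1], which is expected.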


In [None]:
def load_stock_data(ticker, folder='data/processed'):
    """
    Load stock price data from a saved CSV file.
    
    Parameters:
    ticker (str): Stock ticker symbol.
    folder (str): Folder path where CSVs are stored.
    
    Returns:
    pd.DataFrame: DataFrame containing stock data and log returns.
    """
    filename = f"{folder}/{ticker}_processed.csv"
    data = pd.read_csv(filename, index_col='Date', parse_dates=True)

    return data

In [None]:
def fit_arima(data, order=(1, 0, 1)):
    """
    Fit ARIMA model to log returns.
    
    Parameters:
    data (pd.DataFrame): DataFrame containing log returns.
    order (tuple): ARIMA (p,d,q) order.
    
    Returns:
    model: Fitted ARIMA model.
    """
    model = ARIMA(data['Log_Return_Close_Close'], order=order)
    arima_result = model.fit()
    return arima_result

In [None]:
def fit_garch(arima_residuals, order=(1, 1)):
    """
    Fit GARCH model to ARIMA residuals.
    
    Parameters:
    arima_residuals (np.array): Residuals from ARIMA model.
    order (tuple): GARCH (p, q) order.
    
    Returns:
    model: Fitted GARCH model.
    """
    garch_model = arch_model(arima_residuals, vol='Garch', p=order[0], q=order[1])
    garch_result = garch_model.fit(disp='off')
    return garch_result


In [None]:
def prepare_lstm_data(data, look_back=5):
    """
    Prepare data for LSTM model.
    
    Parameters:
    data (pd.DataFrame): DataFrame containing log returns.
    look_back (int): Number of time steps to consider for the LSTM input.
    
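    Returns:
    X, y (np.ndarray): Input windows of shape (samples, look_back, 1) and their next-step targets.
    scaler (MinMaxScaler): Fitted scaler, needed to map predictions back to the original scale.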
    """
    # Scale data for LSTM
    scaler = MinMaxScaler(feature_range=(-1, 1))
    log_returns_scaled = scaler.fit_transform(data['Log_Return_Close_Close'].values.reshape(-1, 1))

    # Prepare the dataset with look-back
    X, y = [], []
    for i in range(len(log_returns_scaled) - look_back):
        X.append(log_returns_scaled[i:i+look_back])
        y.append(log_returns_scaled[i+look_back])

    X, y = np.array(X), np.array(y)
    return X, y, scaler


In [None]:
def build_lstm(input_shape, units=50):
    """
    Build and compile LSTM model.
    
    Parameters:
    input_shape (tuple): Shape of the input data for LSTM.
    units (int): Number of LSTM units.
    
    Returns:
    model: Compiled LSTM model.
    """
    model = keras.Sequential()
    model.add(layers.LSTM(units=units, input_shape=input_shape))
    model.add(layers.Dense(1))
    model.compile(loss='mean_squared_error', optimizer='adam')
    return model

In [None]:
def sine_cosine_algorithm(obj_func, dim, lb, ub, max_iter=100, n_agents=10):
    """
    Sine Cosine Algorithm (SCA) for global optimization.
    
    Parameters:
    obj_func (function): Objective function to minimize.
    dim (int): Number of dimensions (parameters to optimize).
    lb (list): Lower bounds for the parameters.
    ub (list): Upper bounds for the parameters.
    max_iter (int): Maximum number of iterations.
    n_agents (int): Number of agents (candidate solutions).
    
    Returns:
    list: Best solution found.
    float: Best fitness value.
    """
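    lb, ub = np.asarray(lb, dtype=float), np.asarray(ub, dtype=float)

    # Initialize the population (random solutions within bounds)
    positions = np.random.uniform(lb, ub, (n_agents, dim))

    # Evaluate the initial population so the first update moves toward a real best position
    fitness = np.array([obj_func(pos) for pos in positions])
    best_idx = int(np.argmin(fitness))
    best_pos = positions[best_idx].copy()
    best_fitness = float(fitness[best_idx])

    a = 2.0  # initial step amplitude, as in the standard SCA formulation

    for iteration in range(max_iter):
        # r1 shrinks linearly from a to 0, shifting the search from exploration to exploitation
        r1 = a - iteration * (a / max_iter)

        for i in range(n_agents):
            for j in range(dim):
                r2 = np.random.uniform(0, 2 * np.pi)  # angle of the sine/cosine move
                r3 = np.random.uniform(0, 2)          # random weight on the best position
                r4 = np.random.rand()                 # switch between sine and cosine

                distance = abs(r3 * best_pos[j] - positions[i, j])
                if r4 < 0.5:
                    positions[i, j] += r1 * np.sin(r2) * distance
                else:
                    positions[i, j] += r1 * np.cos(r2) * distance

                # Enforce boundaries
                positions[i, j] = np.clip(positions[i, j], lb[j], ub[j])

        # Evaluate fitness for all agents
        for i in range(n_agents):
            fitness_i = obj_func(positions[i])
            if fitness_i < best_fitness:
                best_fitness = fitness_i
                best_pos = positions[i].copy()

        print(f"Iteration {iteration + 1}/{max_iter}, Best Fitness: {best_fitness}")

    return best_pos, best_fitness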

In [None]:
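# Define objective function to minimize
def objective_function(params, data):
    """
    Fitness for SCA: in-sample MSE of an LSTM trained on the ARIMA residuals.

    sine_cosine_algorithm calls obj_func(position) with a single argument, so bind
    `data` first, e.g. lambda params: objective_function(params, data).
    """
    # SCA positions are continuous; truncate them to valid integer settings
    p, q = max(0, int(params[0])), max(0, int(params[1]))
    lstm_units = max(1, int(params[2]))

    # Fit ARIMA with (p, 0, q)
    arima_result = fit_arima(data, order=(p, 0, q))

    # Fit GARCH on ARIMA residuals (volatility estimate; not used in the fitness below)
    garch_result = fit_garch(arima_result.resid)

    # Train the LSTM on the ARIMA residuals so that the fitness depends on (p, q).
    # The residuals reuse the column name that prepare_lstm_data expects.
    residuals = arima_result.resid.to_frame(name='Log_Return_Close_Close')
    X_train, y_train, scaler = prepare_lstm_data(residuals)
    lstm_model = build_lstm(X_train.shape[1:], units=lstm_units)
    lstm_model.fit(X_train, y_train, epochs=10, batch_size=1, verbose=0)

    # Predict with LSTM and evaluate on training set
    predictions = lstm_model.predict(X_train, verbose=0)
    predictions_inverse = scaler.inverse_transform(predictions)
    y_train_inverse = scaler.inverse_transform(y_train.reshape(-1, 1))

    error = mean_squared_error(y_train_inverse, predictions_inverse)
    return error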


In [None]:
def split_data(data, train_size=0.8):
    """
    Split the data into training and testing sets.
    
    Parameters:
    data (pd.DataFrame): The input dataset.
    train_size (float): Proportion of the dataset to include in the training set (default is 0.8).
    
    Returns:
    train_data (pd.DataFrame): Training set.
    test_data (pd.DataFrame): Testing set.
    """
    split_index = int(len(data) * train_size)
    train_data = data[:split_index]
    test_data = data[split_index:]

    return train_data, test_data


In [None]:
def arima_garch_model(train_data):
    """
    Fit an ARIMA-GARCH model to the training data.
    
    Parameters:
    train_data (pd.DataFrame): The input training dataset.
    
    Returns:
    arima_garch_predictions (np.array): In-sample conditional-mean predictions from the ARIMA-GARCH model.
    """
    # Fit ARIMA model; d=0 because log returns are already first differences of log prices
    arima_model = ARIMA(train_data['Log_Return_Close_Close'], order=(5, 0, 0))
    arima_result = arima_model.fit()

    # Fit GARCH model to the residuals; zero mean because ARIMA has already removed the mean
    garch_model = arch_model(arima_result.resid, mean='Zero', vol='Garch', p=1, q=1)
    garch_result = garch_model.fit(disp="off")

    # In-sample predictions are the ARIMA fitted values. GARCH describes the spread around
    # that mean (garch_result.conditional_volatility), so it is not added to the prediction.
    arima_garch_predictions = arima_result.fittedvalues.to_numpy()

    return arima_garch_predictions


In [None]:
def combine_predictions(arima_garch_predictions, lstm_predictions, weight=0.5):
    """
    Combine the predictions from ARIMA-GARCH and LSTM models.
    
    Parameters:
    arima_garch_predictions (np.array): Predictions from the ARIMA-GARCH model.
    lstm_predictions (np.array): Predictions from the LSTM model.
    weight (float): The weighting to apply to the ARIMA-GARCH model (default is 0.5 for equal weighting).
    
    Returns:
    combined_predictions (np.array): The combined predictions.
    """
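    # Flatten both inputs: Keras returns shape (n, 1), which would broadcast against (n,) into (n, n)
    arima_garch_predictions = np.asarray(arima_garch_predictions).ravel()
    lstm_predictions = np.asarray(lstm_predictions).ravel()

    # The LSTM has no prediction for the first look_back steps, so align on the most recent values
    n = min(len(arima_garch_predictions), len(lstm_predictions))
    combined_predictions = (weight * arima_garch_predictions[-n:]) + ((1 - weight) * lstm_predictions[-n:])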

    return combined_predictions


In [None]:
def run_arima_garch_sca_lstm_pipeline(ticker, data):
    """
    Run the entire pipeline for the ARIMA-GARCH-SCA-LSTM model.

    Parameters:
    ticker (str): Stock ticker symbol.
    data (pd.DataFrame): Preprocessed stock data with log returns and other features.

    Returns:
    pd.Series: Final combined forecast.
    """
    # Step 1: Split data into train and test sets
    train_data, test_data = split_data(data)

    # Step 2: Run ARIMA-GARCH model on the training data
    arima_garch_predictions = arima_garch_model(train_data)

    # Step 3: Run LSTM model (after SCA optimization) on the training data
    lstm_predictions = run_sca_lstm(train_data)  # run_sca_lstm combines SCA optimization and LSTM model

    # Step 4: Combine ARIMA-GARCH and LSTM predictions
    final_predictions = combine_predictions(arima_garch_predictions, lstm_predictions)

    # Step 5: Evaluate on test data (if you have evaluation code or metrics)
    # evaluate_model(test_data, final_predictions)

    return final_predictions


In [None]:
def run_sca_lstm(data, population_size=10, generations=50):
    """
    Run Sine Cosine Algorithm (SCA) to optimize and train an LSTM model.
    
    Parameters:
    data (pd.DataFrame): Training data.
    population_size (int): Number of candidates in the population for SCA.
    generations (int): Number of generations for SCA optimization.
    
    Returns:
    np.ndarray: In-sample LSTM predictions on the original (unscaled) scale.
    """
    # Step 1: Initialize SCA population
    sca_population = initialize_sca(population_size, dimension=3)  # e.g., hidden units, learning rate, etc.

    # Step 2: Optimize LSTM hyperparameters using SCA
    best_hyperparams = optimize_sca(sca_population, data, generations)

    # Step 3: Train the LSTM model with the optimized hyperparameters
    lstm_model = train_lstm(data, best_hyperparams)

    # Step 4: Predict on look-back windows (the model cannot consume the raw DataFrame),
    # then undo the scaling applied by prepare_lstm_data
    X, _, scaler = prepare_lstm_data(data)
    lstm_predictions = scaler.inverse_transform(lstm_model.predict(X, verbose=0)).ravel()

    return lstm_predictions


In [None]:
def initialize_sca(population_size, dimension):
    """
    Initialize the population for the Sine Cosine Algorithm (SCA).

    Parameters:
    population_size (int): Number of candidates in the population.
    dimension (int): Number of hyperparameters to optimize.

    Returns:
    np.array: Initialized population of size (population_size, dimension).
    """
    # Randomly initialize population within predefined bounds
    # For example, [hidden units, learning rate, dropout] - adjust bounds as needed
    lower_bound = [10, 0.001, 0.1]  # min values for each parameter
    upper_bound = [100, 0.01, 0.5]  # max values for each parameter

    population = np.random.uniform(low=lower_bound, high=upper_bound, size=(population_size, dimension))
    return population


In [None]:
def update_candidate(candidate, gen, max_gen):
    """
    Update the candidate solution using sine and cosine functions.
    
    Parameters:
    candidate (np.array): Current candidate solution.
    gen (int): Current generation number.
    max_gen (int): Maximum number of generations.
    
    Returns:
    np.array: Updated candidate solution.
    """
    # Step amplitude shrinks linearly from 2 to 0 over the generations,
    # shifting the search from exploration to exploitation
    r1 = 2.0 * (1 - gen / max_gen)
    r2 = np.random.uniform(0, 2 * np.pi)  # angle of the sine/cosine move
    r3 = np.random.uniform(0, 2)          # random weight on the reference point
    r4 = np.random.uniform(0, 1)          # switch between the sine and cosine branch

    # No best-so-far position is passed in, so the candidate itself serves as the reference point
    distance = np.abs(r3 * candidate - candidate)

    # Update each hyperparameter in the candidate using the sine/cosine mechanism
    if r4 < 0.5:
        updated_candidate = candidate + r1 * np.sin(r2) * distance
    else:
        updated_candidate = candidate + r1 * np.cos(r2) * distance

    return updated_candidate

In [None]:
def evaluate_candidate(candidate, data):
    """
    Evaluate the fitness of a candidate hyperparameter set by training an LSTM model.
    
    Parameters:
    candidate (np.array): Hyperparameter set to evaluate (e.g., [hidden_units, learning_rate, dropout]).
    data (pd.DataFrame): Training data.
    
    Returns:
    float: Fitness score (lower is better).
    """
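    # Hold out the most recent 20% of the data for validation (chronological, no shuffling)
    split_index = int(len(data) * 0.8)
    train_part, val_part = data.iloc[:split_index], data.iloc[split_index:]

    # Train the LSTM model with the given candidate hyperparameters
    model = train_lstm(train_part, candidate)

    # Evaluate on the held-out windows; prepare_lstm_data returns (X, y, scaler).
    # The validation slice is rescaled on its own here, which is a simplification.
    X_val, y_val, _ = prepare_lstm_data(val_part)
    loss = model.evaluate(X_val, y_val, verbose=0)

    return loss  # Lower loss means better fitness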


In [None]:
def optimize_sca(population, data, generations):
    """
    Optimize hyperparameters using the Sine Cosine Algorithm (SCA).

    Parameters:
    population (np.array): Initialized population of hyperparameter sets.
    data (pd.DataFrame): Training data for the LSTM model.
    generations (int): Number of generations to optimize over.

    Returns:
    np.array: Best hyperparameters found by SCA.
    """
    best_candidate = None
    best_fitness = float('inf')  # We're minimizing the error (fitness)

    for gen in range(generations):
        for i in range(population.shape[0]):
            # Update the candidate solution using the sine and cosine components (this is SCA logic)
            candidate = update_candidate(population[i], gen, generations)

            # Store the move so the next generation continues from the updated position
            population[i] = candidate

            # Evaluate the candidate by training an LSTM model with those hyperparameters
            fitness = evaluate_candidate(candidate, data)

            # Track the best candidate (copied, so later updates cannot overwrite it)
            if fitness < best_fitness:
                best_fitness = fitness
                best_candidate = candidate.copy()

    return best_candidate


In [None]:
def train_lstm(data, hyperparameters):
    """
    Train an LSTM model using the optimized hyperparameters.

    Parameters:
    data (pd.DataFrame): Training data for the LSTM model.
    hyperparameters (list): Optimized hyperparameters [hidden_units, learning_rate, dropout].

    Returns:
    keras.Model: Trained LSTM model.
    """

    # Extract hyperparameters, clamping them to valid ranges because SCA moves are continuous
    hidden_units = max(1, int(hyperparameters[0]))
    learning_rate = max(1e-6, float(hyperparameters[1]))
    dropout_rate = float(np.clip(hyperparameters[2], 0.0, 0.9))

    # Prepare look-back windows; prepare_lstm_data returns (X, y, scaler)
    X_train, y_train, _ = prepare_lstm_data(data)

    # Build LSTM model; the input shape is (look_back, 1), taken from the prepared windows
    model = keras.models.Sequential()
    model.add(keras.Input(shape=X_train.shape[1:]))
    model.add(layers.LSTM(units=hidden_units, return_sequences=True))
    model.add(layers.Dropout(dropout_rate))
    model.add(layers.LSTM(units=hidden_units))
    model.add(layers.Dropout(dropout_rate))
    model.add(layers.Dense(1))  # Output layer: one-step-ahead prediction of the scaled series

    # Compile the model with Adam optimizer and mean squared error loss
    optimizer = keras.optimizers.Adam(learning_rate=learning_rate)
    model.compile(optimizer=optimizer, loss='mean_squared_error')

    # Train the model
    model.fit(X_train, y_train, epochs=10, batch_size=32, verbose=0)  # Tune epochs and batch size as needed

    return model


In [None]:
def run_workflow_sca_lstm(data, generations=50, population_size=10):
    """
    Workflow to run Sine Cosine Algorithm (SCA) combined with LSTM model training.
    
    Parameters:
    data (pd.DataFrame): Stock log returns data.
    generations (int): Number of generations for optimization.
    population_size (int): Number of candidate solutions in each generation.
    
    Returns:
    best_candidate: The best candidate hyperparameters found.
    """
    # Step 1: Initialize the population (random candidates)
    population = initialize_sca(population_size, dimension=3)

    best_candidate = None
    best_score = float('inf')

    for gen in range(generations):
        print(f"Generation {gen + 1}/{generations}")

        for candidate in population:
            # Step 2: Evaluate each candidate (train LSTM and compute fitness score)
            fitness_score = evaluate_candidate(candidate, data)

            # Step 3: Update the best solution if current candidate is better
            if fitness_score < best_score:
                best_score = fitness_score
                best_candidate = candidate.copy()

        # Step 4: Update population using SCA (move every candidate with the sine/cosine rule)
        population = np.array([update_candidate(candidate, gen, generations) for candidate in population])

    print(f"Best candidate found: {best_candidate} with score {best_score}")
    return best_candidate
