[![Open in Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/github/Mund99/Financial_Price_Forecasting/blob/main/ARIMA_model.ipynb)

# Utils.py

In [None]:
import pandas as pd
import yfinance as yf
import statsmodels.api as sm
import matplotlib.pyplot as plt
from sklearn import metrics


def get_stock_data(ticker_symbol, start_date, end_date):
    """
    Retrieve historical stock data from Yahoo Finance via ``yfinance``.

    Parameters:
    - ticker_symbol: Ticker symbol of the stock (e.g., "601988.SS").
    - start_date: Start date in the format "YYYY-MM-DD".
    - end_date: End date in the format "YYYY-MM-DD".

    Returns:
    - DataFrame returned by ``yf.download`` for the requested range. No
      resampling or frequency assignment is performed here, so the index
      contains exactly the dates yfinance returned.
      When a single ticker string is requested and the result has two-level
      columns, the columns are flattened to their first level so that
      ``df['Close']`` is a Series rather than a one-column DataFrame.
    """
    # Download historical stock data
    df = yf.download(ticker_symbol, start=start_date, end=end_date)

    # Recent yfinance releases return MultiIndex columns even for one ticker.
    # Level 0 holds the field name ('Close', 'Open', ...) under the default
    # column grouping, so keeping only that level restores flat column labels.
    # A list of tickers is left untouched because its second level is needed.
    if isinstance(ticker_symbol, str) and isinstance(df.columns, pd.MultiIndex):
        df.columns = df.columns.get_level_values(0)

    return df

# Example usage:
# stock_data = get_stock_data("AAPL", "2001-01-01", "2022-01-01")


def perform_adf_test(data):
    """
    Run the Augmented Dickey-Fuller (ADF) test and print a readable report.

    Parameters:
    - data: Time series data (1D array, Series, or DataFrame column).

    Returns:
    - None

    The report lists the ADF statistic, the p-value, the number of lags used,
    the number of observations used and the critical values, followed by a
    one-line verdict: the series is reported as stationary when the p-value is
    at most 0.05, and as not stationary otherwise.
    """
    # The first five entries of the result are the ones shown in the report
    adf_stat, p_value, used_lags, n_obs, critical_values = sm.tsa.adfuller(data)[:5]

    # Headline figures
    header_lines = (
        "ADF Test Results:",
        f"ADF Statistic: {adf_stat}",
        f"P-value: {p_value}",
        f"Lags Used: {used_lags}",
        f"Number of Observations Used: {n_obs}",
        "\nCritical Values:",
    )
    for line in header_lines:
        print(line)

    # One line per significance level
    for level, threshold in critical_values.items():
        print(f"\t{level}: {threshold}")

    # Verdict at the 0.05 significance level
    verdict = (
        "\nReject the null hypothesis. The data is stationary."
        if p_value <= 0.05
        else "\nFail to reject the null hypothesis. The data is not stationary."
    )
    print(verdict)

# Example usage:
# Assuming 'data' is a pandas Series or DataFrame column containing your time series data
# perform_adf_test(data)


def plot_acf_pacf(data, acf_lags=40, pacf_lags=40, figsize=(10, 8)):
    """
    Plot ACF and PACF for a given time series data.

    The two plots are stacked vertically in one figure: the autocorrelation
    function on top and the partial autocorrelation function below.

    Parameters:
    - data: Time series data (1D array, Series, or DataFrame column).
    - acf_lags: Number of lags to include in the ACF plot. Default is 40.
    - pacf_lags: Number of lags to include in the PACF plot. Default is 40.
    - figsize: Size of the figure (width, height). Default is (10, 8).

    Returns:
    - None (displays the plots).
    """
    fig, (ax1, ax2) = plt.subplots(2, 1, figsize=figsize)

    # Plot ACF
    sm.graphics.tsa.plot_acf(data, lags=acf_lags, ax=ax1)
    ax1.set_title('Autocorrelation Function (ACF)')
    # The horizontal axis is the lag; the coefficient is on the vertical axis
    ax1.set_xlabel("Lag")
    ax1.set_ylabel("Correlation Coefficient")

    # Plot PACF
    sm.graphics.tsa.plot_pacf(data, lags=pacf_lags, ax=ax2)
    ax2.set_title('Partial Autocorrelation Function (PACF)')
    ax2.set_xlabel("Lag")
    ax2.set_ylabel("Correlation Coefficient")

    # Display the plots
    plt.tight_layout()
    plt.show()

# Example usage:
# Assuming 'data' is a pandas Series or DataFrame column containing your time series data
# plot_acf_pacf(data, acf_lags=30, pacf_lags=20, figsize=(12, 6))


def evaluation_metric(ytest, yhat, return_dict=False):
    """
    Compute MSE, RMSE, MAE and R2 for a set of predictions.

    Parameters:
    - ytest: True target values (ground truth).
    - yhat: Predicted target values.
    - return_dict: If True, return the metrics instead of printing them.

    Returns:
    - Dictionary with the keys 'MSE', 'RMSE', 'MAE' and 'R2' when
      return_dict=True; otherwise None, after printing each metric with five
      decimal places.
    """
    mse = metrics.mean_squared_error(ytest, yhat)

    # RMSE is derived from the MSE rather than computed separately
    scores = {
        'MSE': mse,
        'RMSE': mse**0.5,
        'MAE': metrics.mean_absolute_error(ytest, yhat),
        'R2': metrics.r2_score(ytest, yhat),
    }

    if return_dict:
        return scores

    # Dict order matches the report order: MSE, RMSE, MAE, R2
    for label, score in scores.items():
        print('%s: %.5f' % (label, score))

# Example usage:
# metrics_result = evaluation_metric(y_test, y_pred, return_dict=True)

# 0. Import necessary libraries 

In [None]:
# import utils
import pandas as pd
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split
from sklearn import metrics
import statsmodels.api as sm
import time 
import numpy as np

# 1. Obtain Data

## 1.1. Import Data

In [None]:
# Download the price history for ticker 601988.SS over the study period
bank_china_stock_data = get_stock_data(
    ticker_symbol="601988.SS",
    start_date="2007-01-04",
    end_date="2022-03-18",
)

## 1.2. Explore Data

In [None]:
print(f"Data Shape: {bank_china_stock_data.shape}")
# DataFrame.info() writes its report to stdout itself and returns None, so it
# is called directly; wrapping it in print() would emit a stray "None" line.
bank_china_stock_data.info()
# Last expression of the cell: the notebook renders the first rows
bank_china_stock_data.head()

In [None]:
# Line chart of the closing price over the whole downloaded range
plt.figure(figsize=(12, 6))
plt.plot(
    bank_china_stock_data['Close'],
    label='Bank of China Close Price',
    color='blue',
)

# Title and axis labels are applied to the current axes in one call
plt.gca().set(
    title='Bank of China Closing Price',
    xlabel='Date',
    ylabel='Closing Price',
)
plt.grid(True)
plt.legend()

plt.show()

In [None]:
# Stationarity check (ADF test) on the raw closing price.
# A separate one-column copy is used so later columns can be added to it
# without touching the downloaded data.
adf_test_df = bank_china_stock_data.loc[:, ['Close']].copy()
perform_adf_test(adf_test_df.loc[:, 'Close'])

In [None]:
# First-order differencing (d=1). The first row has no predecessor, so its
# difference is NaN; rows containing missing values are removed afterwards.
adf_test_df = adf_test_df.assign(Close_diff=adf_test_df['Close'].diff()).dropna()

# Repeat the ADF test, this time on the differenced series
perform_adf_test(adf_test_df['Close_diff'])

In [None]:
# ACF / PACF of the raw closing price, used to help pick the p and q orders
plot_acf_pacf(
    bank_china_stock_data['Close'],
    acf_lags=80,
    pacf_lags=40,
    figsize=(10, 6),
)

In [None]:
# ACF / PACF of the first-differenced closing price
plot_acf_pacf(
    adf_test_df['Close_diff'],
    acf_lags=30,
    pacf_lags=40,
    figsize=(10, 6),
)

## 1.3. Split Data

In [None]:
# Share of the rows assigned to the training set
train_percentage = 0.8

# Row position separating the two sets (fractional part truncated by int())
split_index = int(train_percentage * len(bank_china_stock_data))

# Positional split: the leading rows train the model, the trailing rows test it
train_data, test_data = (
    bank_china_stock_data.iloc[:split_index],
    bank_china_stock_data.iloc[split_index:],
)

# Report the size of each set
print(f"Number of training samples: {len(train_data)}")
print(f"Number of testing samples: {len(test_data)}")

# 2. Build Model

## 2.1. Baseline

In [None]:
# Naive baseline: predict the mean training close for every training row and
# measure the resulting mean absolute error on the training set.
train_mean = train_data['Close'].mean()
pred_baseline = [train_mean] * len(train_data)
mae_baseline = metrics.mean_absolute_error(train_data['Close'], pred_baseline)

# Built-in round() accepts both NumPy scalars and plain Python floats, whereas
# the .round() method only exists on NumPy scalars.
print("Train Mean: {}".format(round(train_mean, 2)))
print("Baseline MAE: {}".format(round(mae_baseline, 2)))

In [None]:
# Training closing price against the constant baseline (mean) prediction
plt.figure(figsize=(12, 6))
plt.plot(
    train_data.index,
    train_data['Close'],
    label='True Close Price',
    color='blue',
)
plt.plot(
    train_data.index,
    pred_baseline,
    label='Baseline Predictions',
    color='red',
    linestyle='--',
)

# Title and axis labels, applied to the current axes in one call
plt.gca().set(
    title='True Close Price vs. Baseline Predictions',
    xlabel='Date',
    ylabel='Closing Price',
)
plt.grid(True)

# Legend built from the label= arguments above
plt.legend()

# Render the figure
plt.show()

## 2.2. Train ARIMA Model

In [None]:
# Fit an ARIMA model of order (p=2, d=1, q=0) on the training closing prices
order = (2, 1, 0)
model = sm.tsa.ARIMA(
    endog=train_data['Close'].values,
    order=order,
).fit()
# Last expression of the cell: the notebook renders the fit summary
model.summary()

In [None]:
# Candidate AR (p) and MA (q) orders to explore; d is fixed at 1 in every fit
p_values = [0, 1, 2, 3]
q_values = [0, 1, 2]

# mae_grid[p] collects the in-sample MAEs for that p, one per q in q_values order
mae_grid = {}

# Best combination found so far (lowest in-sample MAE)
best_p = None
best_q = None
best_mae = float('inf')

# The training series is identical for every fit, so extract it only once
train_close = train_data['Close'].values

# Loop through different pairs of p and q
for p in p_values:
    mae_grid[p] = []
    for q in q_values:
        # perf_counter() is the clock intended for measuring elapsed time
        start_time = time.perf_counter()

        # Train an ARIMA model
        order = (p, 1, q)
        model = sm.tsa.ARIMA(train_close, order=order).fit()

        elapsed_time = round(time.perf_counter() - start_time, 2)

        # In-sample predictions: the fitted values on the training data
        y_pred = model.fittedvalues

        # Built-in round() works whether the metric comes back as a NumPy
        # scalar or a plain Python float (.round() exists only on the former)
        mae = round(metrics.mean_absolute_error(train_close, y_pred), 4)
        mae_grid[p].append(mae)

        # Strict '<' keeps the earliest combination when MAEs tie
        if mae < best_mae:
            best_mae = mae
            best_p = p
            best_q = q

        print("Order = {}, MAE={}, Elapsed Time={} seconds".format(order, mae, elapsed_time))

print("\nBest p:", best_p)
print("Best q:", best_q)
print("Best MAE:", best_mae)

## 2.3. Prediction

In [None]:
# Perform walk-forward validation on the test data

# pred_arima_wfv collects the one-step-ahead forecasts; history holds every
# observation the model is allowed to see, starting with the training closes.
pred_arima_wfv = []
history = list(train_data['Close'].values)

for i, (timestamp, true_value) in enumerate(test_data['Close'].items()):
    # Refit the ARIMA(1, 1, 1) model on everything observed so far
    model_fit = sm.tsa.ARIMA(history, order=(1, 1, 1)).fit()

    # Forecast the next value (yhat); forecast() returns a length-1 result here
    yhat = float(model_fit.forecast()[0])
    pred_arima_wfv.append(yhat)

    # Append the TRUE value (not the prediction) to history, so the next
    # refit is based on actual observations only
    history.append(true_value)

    # Print the results for each iteration
    print("{}: {} True value: {:.2f}, Predicted value: {:.4f}".format(i+1, timestamp.date(), true_value, yhat))

In [None]:
# Print MSE, RMSE, MAE and R2 of the walk-forward forecasts on the test set
evaluation_metric(
    ytest=test_data['Close'].values,
    yhat=pred_arima_wfv,
)

# 3. Interpret

In [None]:
# Test-set closing price against the walk-forward ARIMA forecasts
plt.figure(figsize=(12, 6))
plt.plot(
    test_data.index,
    test_data['Close'],
    label='True Close Price',
    color='blue',
)
plt.plot(
    test_data.index,
    pred_arima_wfv,
    label='ARIMA Predictions',
    color='red',
    linestyle='--',
)

# Title and axis labels, applied to the current axes in one call
plt.gca().set(
    title='True Close Price vs. ARIMA Model Predictions',
    xlabel='Date',
    ylabel='Closing Price',
)
plt.grid(True)

# Legend built from the label= arguments above
plt.legend()

# Render the figure
plt.show()

In [None]:
# Side-by-side table of actual and forecast closes, indexed by the test dates
arima_prediction_df = pd.DataFrame(
    {
        'True Value': test_data['Close'].values,
        'Predicted Value': pred_arima_wfv,
    },
    # The index is labelled 'Date'
    index=test_data.index.rename('Date'),
)