In [1]:
import pandas as pd
import numpy as np
from statsmodels.tsa.api import VAR
from statsmodels.tsa.stattools import adfuller
from sklearn.metrics import mean_absolute_error

# ----- 1. Read the CSV with MultiIndex Columns -----
# The CSV file has two header rows:
#  - First row: Data type (e.g., Close, High, etc.)
#  - Second row: Ticker (e.g., AAPL, AMD, AMZN, MSFT, NVDA)
df = pd.read_csv('yahoo_finance_historical_data_10year.csv', header=[0, 1], index_col=0)
df.index = pd.to_datetime(df.index)
print("Full Data Preview:")
print(df.head())

# ----- 2. Extract Close Prices for the 5 Stocks -----
# Extract the 'Close' prices and reorder columns as needed.
close_prices = df['Close'][['AAPL', 'MSFT', 'AMZN', 'AMD', 'NVDA']]
print("\nClose Prices Preview:")
print(close_prices.head())

# ----- 3. Compute Daily Returns -----
# Convert prices to daily returns using percentage change.
returns = close_prices.pct_change().dropna()
print("\nDaily Returns Preview:")
print(returns.head())

# ----- 4. (Optional) Check for Stationarity -----
def adf_test(series, series_name):
    result = adfuller(series)
    print(f"\nADF Test for {series_name}:")
    print(f"  ADF Statistic: {result[0]:.4f}")
    print(f"  p-value: {result[1]:.4f}")
    if result[1] <= 0.05:
        print(f"  {series_name} appears to be stationary.")
    else:
        print(f"  {series_name} appears to be non-stationary. Consider further differencing or transformation.")

for col in returns.columns:
    adf_test(returns[col], col)

# ----- 5. Train/Test Split for Forecast Evaluation -----
# Split the returns into 80% training and 20% testing samples.
train_size = int(len(returns) * 0.8)
train = returns.iloc[:train_size]
test = returns.iloc[train_size:]
print(f"\nTraining set: {train.shape}, Test set: {test.shape}")

# ----- 6. Fit the VAR Model on Training Data -----
model = VAR(train)
lag_order_results = model.select_order(maxlags=15)
print("\nLag Order Selection Results:")
print(lag_order_results.summary())

# Use the lag order suggested by the AIC criterion.
optimal_lag = lag_order_results.aic
model_fitted = model.fit(optimal_lag)
print("\nVAR Model Summary:")
print(model_fitted.summary())

# ----- 7. Rolling Forecast on Test Data -----
# We forecast one step at a time in a rolling fashion.
predictions = []
actuals = []

# For each time step in the test set:
for i in range(len(test)):
    # Use the last 'optimal_lag' observations from the combined train and previous test predictions.
    if i == 0:
        input_data = train.values[-optimal_lag:]
    else:
        # If the test set is long enough, use the last 'optimal_lag' rows from test.
        if i >= optimal_lag:
            input_data = test.values[i - optimal_lag:i]
        else:
            # Otherwise, combine the tail of the train set with the available test data.
            input_data = np.vstack([train.values[-(optimal_lag - i):], test.values[:i]])

    forecast = model_fitted.forecast(y=input_data, steps=1)
    predictions.append(forecast[0])
    actuals.append(test.values[i])

# Convert predictions and actuals into DataFrames
pred_df = pd.DataFrame(predictions, index=test.index, columns=returns.columns)
actual_df = pd.DataFrame(actuals, index=test.index, columns=returns.columns)

print("\nForecasted Returns (first 5 rows):")
print(pred_df.head())
print("\nActual Returns (first 5 rows):")
print(actual_df.head())

# ----- 8. Calculate Mean Absolute Error (MAE) -----
# Compute MAE for each stock and overall.
mae_per_stock = {col: mean_absolute_error(actual_df[col], pred_df[col]) for col in returns.columns}
overall_mae = mean_absolute_error(actual_df.values, pred_df.values)

print("\nMean Absolute Error (MAE) per stock:")
for stock, error in mae_per_stock.items():
    print(f"  {stock}: {error:.6f}")
print(f"\nOverall Mean Absolute Error (MAE): {overall_mae:.6f}")

# ----- 9. Calculate Directional Accuracy -----
# Directional accuracy checks if the forecasted return direction (up/down) matches the actual.
direction_matches = (np.sign(actual_df.values) == np.sign(pred_df.values))
# Calculate accuracy per stock (column)
directional_accuracy = direction_matches.mean(axis=0)
overall_directional_accuracy = directional_accuracy.mean()

print("\nDirectional Accuracy per stock:")
for stock, acc in zip(returns.columns, directional_accuracy):
    print(f"  {stock}: {acc*100:.2f}%")
print(f"\nOverall Directional Accuracy: {overall_directional_accuracy*100:.2f}%")


Full Data Preview:
Price           Close                                             High        \
Ticker           AAPL   AMD       AMZN       MSFT      NVDA       AAPL   AMD   
Date                                                                           
2014-01-02  17.215368  3.95  19.898500  30.996416  0.373932  17.336750  3.98   
2014-01-03  16.837219  4.00  19.822001  30.787884  0.369452  17.233110  4.00   
2014-01-06  16.929035  4.13  19.681499  30.137262  0.374403  17.018359  4.18   
2014-01-07  16.807959  4.18  19.901501  30.370813  0.380533  16.992209  4.25   
2014-01-08  16.914406  4.18  20.096001  29.828621  0.385720  16.979764  4.26   

Price                                     ...       Open                   \
Ticker         AMZN       MSFT      NVDA  ...       AAPL   AMD       AMZN   
Date                                      ...                               
2014-01-02  19.9680  31.196610  0.376761  ...  17.294734  3.85  19.940001   
2014-01-03  20.1355  31.046466  

  self._init_dates(dates, freq)
