In [1]:
  # Needed to save Plotly images as PNG
#!pip install -U kaleido

In [2]:
#!pip install -U plotly


In [3]:
# -*- coding: utf-8 -*-

!pip install arch yfinance keras --quiet

import numpy as np
import pandas as pd
import yfinance as yf
from arch import arch_model
import matplotlib.pyplot as plt
from sklearn.preprocessing import MinMaxScaler
from sklearn.metrics import mean_squared_error
from keras.models import Sequential
#from keras.layers import LSTM, Dense
from itertools import combinations
import plotly.graph_objects as go
from plotly.subplots import make_subplots
from statsmodels.graphics.tsaplots import plot_acf

# ----------------------------- Data Fetching -----------------------------

class StockData:
    def __init__(self, tickers, start_date, end_date):
        self.tickers = tickers
        self.start_date = start_date
        self.end_date = end_date
        self.returns_scaled = None

    def fetch_data(self):
        data = yf.download(self.tickers, start=self.start_date, end=self.end_date)['Close']
        returns = np.log(data / data.shift(1)).dropna()
        self.returns_scaled = returns * 100

# ----------------------------- Data Splitting -----------------------------

class DataSplitter:
    def __init__(self, data, train_size=0.7, val_size=0.15):
        self.data = data
        self.train_size = train_size
        self.val_size = val_size

    def split(self):
        n = len(self.data)
        train_end = int(n * self.train_size)
        val_end = int(n * (self.train_size + self.val_size))
        return self.data.iloc[:train_end], self.data.iloc[train_end:val_end], self.data.iloc[val_end:]

# ----------------------------- GARCH Model -----------------------------

class GARCHModel:
    def __init__(self, returns_train):
        self.returns_train = returns_train
        self.models = {}
        self.results = {}
        self.sigmas_train = pd.DataFrame()
        self.standardized_residuals_train = pd.DataFrame()

    def fit_models(self):
        for ticker in self.returns_train.columns:
            model = arch_model(self.returns_train[ticker], vol='Garch', p=1, q=1)
            result = model.fit(disp="off")
            self.models[ticker] = model
            self.results[ticker] = result

            sigma = result.conditional_volatility
            self.sigmas_train[ticker] = sigma
            self.standardized_residuals_train[ticker] = self.returns_train[ticker] / sigma

        self.sigmas_train.index = self.returns_train.index
        self.standardized_residuals_train.index = self.returns_train.index

    def forecast(self, horizon=5):
        forecasts = {}
        for ticker in self.returns_train.columns:
            result = self.results[ticker]
            forecast_result = result.forecast(horizon=horizon)
            fcast = forecast_result.variance.iloc[-1] ** 0.5
            forecasts[ticker] = fcast
        return forecasts
    def recursive_forecast(self, horizon=5):
        # Recursive forecast for each asset's conditional variance
        forecasts = {}
        for ticker in self.returns_train.columns:
            result = self.results[ticker]
            alpha = result.params['alpha[1]']
            beta = result.params['beta[1]']
            omega = result.params['omega']
            last_sigma2 = result.conditional_volatility.iloc[-1] ** 2
            last_return2 = self.returns_train[ticker].iloc[-1] ** 2

            forecast_path = []
            sigma2 = last_sigma2
            for k in range(horizon):
                sigma2 = omega + alpha * last_return2 + beta * sigma2
                forecast_path.append(np.sqrt(sigma2))
                last_return2 = sigma2  # recursively replace with own forecast
            forecasts[ticker] = forecast_path
        return pd.DataFrame(forecasts)


# ----------------------------- DCC-GARCH -----------------------------

class DCCGARCH:
    def __init__(self, standardized_residuals, tickers):
        self.z = standardized_residuals.dropna().to_numpy()
        self.tickers = tickers
        self.T = self.z.shape[0]
        self.n = self.z.shape[1]
        self.R_matrices = []

    def dcc_recursion(self, a=0.01, b=0.97):
        Q_bar = np.cov(self.z.T)
        Q_t = Q_bar.copy()
        for t in range(self.T):
            z_t1 = self.z[t - 1].reshape(-1, 1) if t > 0 else np.zeros((self.n, 1))
            Q_t = (1 - a - b) * Q_bar + a * z_t1 @ z_t1.T + b * Q_t
            D_inv = np.diag(1 / np.sqrt(np.diag(Q_t)))
            R_t = D_inv @ Q_t @ D_inv
            self.R_matrices.append(R_t)

    def extract_pairwise_correlations(self):
        correlations = {}
        pairs = list(combinations(range(self.n), 2))
        for i, j in pairs:
            label = f"{self.tickers[i]}-{self.tickers[j]}"
            correlations[label] = [R[i, j] for R in self.R_matrices]
        self.dcc_correlations = correlations
    def forecast_dcc(self, k=5, a=0.01, b=0.97):
        Q_bar = np.cov(self.z.T)
        Q_t = Q_bar.copy()
        last_z = self.z[-1].reshape(-1, 1)

        Qt_list = []
        for step in range(k):
            if step == 0:
                Q_t = (1 - a - b) * Q_bar + a * last_z @ last_z.T + b * Q_t
            else:
                Q_t = (1 - a - b) * Q_bar + (a + b) * Q_t
            Qt_list.append(Q_t)

        Pt_list = []
        for Qt in Qt_list:
            D_inv = np.diag(1 / np.sqrt(np.diag(Qt)))
            Pt = D_inv @ Qt @ D_inv
            Pt_list.append(Pt)
        return Pt_list


# ----------------------------- Plotting -----------------------------

class PlotManager:
    def __init__(self, returns_scaled, sigmas, dcc_correlations, tickers):
        self.returns_scaled = returns_scaled
        self.sigmas = sigmas
        self.dcc_correlations = dcc_correlations
        self.tickers = tickers

    def plot_all(self):
        self.plot_returns()
        self.plot_volatilities()
        self.plot_dcc_correlations()
        self.plot_combined()
    def plot_returns(self):
      fig = go.Figure()
      for ticker in self.tickers:
          fig.add_trace(go.Scatter(x=self.returns_scaled.index, y=self.returns_scaled[ticker], mode='lines', name=f"{ticker} Returns"))
      fig.update_layout(title='Log Returns', xaxis_title='Date', yaxis_title='Returns (%)')

      fig.write_html("dcc_correlations.html")
      fig.show()

    def plot_acf_returns(self, lags=40):
        for ticker in self.tickers:
            plt.figure(figsize=(8, 4))
            plot_acf(self.returns_scaled[ticker].dropna(), lags=lags)
            plt.title(f"ACF of Returns: {ticker}")
            plt.tight_layout()
            plt.show()

    def plot_volatilities(self):
        fig = go.Figure()
        for ticker in self.tickers:
            fig.add_trace(go.Scatter(x=self.sigmas.index, y=self.sigmas[ticker], mode='lines', name=f"{ticker} GARCH Volatility", line=dict(dash='dash')))
        fig.update_layout(title='GARCH(1,1) Volatility', xaxis_title='Date', yaxis_title='Volatility (%)')
        fig.write_html("garch_volatility.html")
        fig.show()

    def plot_dcc_correlations(self):
        fig = go.Figure()
        for label, corr in self.dcc_correlations.items():
            fig.add_trace(go.Scatter(x=self.returns_scaled.index[-len(corr):], y=corr, mode='lines', name=f'DCC Correlation: {label}'))
        fig.update_layout(title='DCC Correlations', xaxis_title='Date', yaxis_title='Correlation')
        fig.write_html("dcc_correlation.html")
        fig.show()

    def plot_combined(self):
        fig = make_subplots(specs=[[{"secondary_y": True}]])
        for ticker in self.tickers:
            fig.add_trace(go.Scatter(x=self.returns_scaled.index, y=self.returns_scaled[ticker], mode='lines', name=f"{ticker} Returns"), secondary_y=False)
            fig.add_trace(go.Scatter(x=self.sigmas.index, y=self.sigmas[ticker], mode='lines', name=f"{ticker} Volatility", line=dict(dash='dash')), secondary_y=False)
        for label, corr in self.dcc_correlations.items():
            fig.add_trace(go.Scatter(x=self.returns_scaled.index[-len(corr):], y=corr, mode='lines', name=f"DCC Correlation: {label}"), secondary_y=True)
        fig.update_layout(title='Combined View: Returns, Volatility, and DCC Correlation', xaxis_title='Date', yaxis_title='Returns / Volatility', yaxis2_title='DCC Correlation', height=700, width=1000)
        fig.write_html("combine.html")
        fig.show()

# ----------------------------- Pipeline -----------------------------
    def plot_acf_residuals(self, standardized_residuals, lags=40):
            for ticker in self.tickers:
                plt.figure(figsize=(8, 4))
                plot_acf(standardized_residuals[ticker].dropna(), lags=lags)
                plt.title(f"ACF of Standardized Residuals: {ticker}")
                plt.tight_layout()
                plt.show()


[?25l   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/978.3 kB[0m [31m?[0m eta [36m-:--:--[0m[2K   [91m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m[91m╸[0m [32m972.8/978.3 kB[0m [31m31.8 MB/s[0m eta [36m0:00:01[0m[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m978.3/978.3 kB[0m [31m22.4 MB/s[0m eta [36m0:00:00[0m
[?25h

In [4]:
def compute_covariance_matrices(vol_forecast, corr_forecast):
    # vol_forecast: DataFrame of shape (horizon, n)
    # corr_forecast: list of np.array matrices (n x n)
    cov_matrices = []
    for t in range(len(corr_forecast)):
        std_diag = np.diag(vol_forecast.iloc[t])
        cov = std_diag @ corr_forecast[t] @ std_diag
        cov_matrices.append(cov)
    return cov_matrices


In [5]:
def run_analysis():
    tickers = ['BMW.DE', 'SIE.DE', 'ALV.DE']
    start_date = "2014-01-01"
    end_date = "2024-06-01"

    stock_data = StockData(tickers, start_date, end_date)
    stock_data.fetch_data()

    garch_model = GARCHModel(stock_data.returns_scaled)
    garch_model.fit_models()

    dcc_model = DCCGARCH(garch_model.standardized_residuals_train, tickers)
    dcc_model.dcc_recursion()
    dcc_model.extract_pairwise_correlations()

    plotter = PlotManager(
        returns_scaled=stock_data.returns_scaled,
        sigmas=garch_model.sigmas_train,
        dcc_correlations=dcc_model.dcc_correlations,
        tickers=tickers
    )
    plotter.plot_all()
        # GARCH Recursive Forecast
    plotter.plot_acf_returns(lags=40)
    plotter.plot_acf_residuals(garch_model.standardized_residuals_train, lags=40)

    horizon = 5
    vol_forecast_df = garch_model.recursive_forecast(horizon=horizon)

    # DCC Forecast
    corr_forecast_list = dcc_model.forecast_dcc(k=horizon)

    # Compute Σ_{t+k}
    cov_matrices = compute_covariance_matrices(vol_forecast_df, corr_forecast_list)

    print("\nForecasted Covariance Matrices (Σ_{t+k}):")
    for i, Sigma in enumerate(cov_matrices, 1):
        print(f"\nStep t+{i}:\n{Sigma}")

if __name__ == "__main__":
    run_analysis()

Output hidden; open in https://colab.research.google.com to view.