In [23]:
import pandas as pd
import numpy as np
import yfinance as yf
import matplotlib.pyplot as plt
import scipy.optimize as spop
from sklearn.covariance import LedoitWolf
import pandas as pd
import rpy2.robjects as ro
from rpy2.robjects import pandas2ri
from rpy2.robjects.packages import importr
from rpy2.robjects.conversion import localconverter
import warnings
warnings.filterwarnings('ignore')

In [11]:
data = yf.download(tickers=["AAPL",'F','GE','XLY','XLE'],period='1d',start="2021-12-31", end='2022-12-31')['Adj Close']

[*********************100%%**********************]  5 of 5 completed


In [12]:
data = data.pct_change().dropna()

In [59]:
def historical_covariance(returns: pd.DataFrame) -> pd.DataFrame:
    """
    Calculate the historical covariance matrix of returns.
    
    :param returns: DataFrame containing asset returns.
    :return: Covariance matrix as a DataFrame.
    """
    return returns.cov()

def ewma_covariance(returns: pd.DataFrame, lambda_: float) -> pd.DataFrame:
    """
    Calculate the EWMA covariance matrix of returns.
    
    :param returns: DataFrame containing asset returns.
    :param lambda_: Decay factor for EWMA.
    :return: EWMA covariance matrix as a DataFrame.
    """
    return returns.ewm(alpha=1-lambda_).cov().iloc[-len(returns.columns):].droplevel(0)

def factor_model_covariance(returns: pd.DataFrame, factors: pd.DataFrame) -> pd.DataFrame:
    """
    Estimate the covariance matrix of returns using macroeconomic factors.

    :param returns: DataFrame containing asset returns.
    :param factors: DataFrame containing factor data (returns of factors).
    :return: Estimated covariance matrix of asset returns based on factor models.
    """
    # Add a constant to the factor data for intercept (modeling asset return = B0 + B1*F1 + ... + Bn*Fn)
    factors = sm.add_constant(factors)
    
    # Prepare matrices for betas and residuals
    betas = pd.DataFrame(index=returns.columns, columns=factors.columns)
    residuals = pd.DataFrame(index=returns.index, columns=returns.columns)

    # Regression to find betas for each asset
    for asset in returns.columns:
        model = sm.OLS(returns[asset], factors).fit()
        betas.loc[asset] = model.params
        residuals[asset] = model.resid

    # Calculate covariance of residuals and factor returns
    cov_matrix_resid = residuals.cov()
    factor_covariance_matrix = factors.iloc[:, 1:].cov()  # exclude intercept term

    # Compute covariance matrix of asset returns using factor model
    asset_cov_matrix = betas.iloc[:, 1:].dot(factor_covariance_matrix).dot(betas.iloc[:, 1:].T) + cov_matrix_resid

    return asset_cov_matrix

def ledoit_wolf_covariance_and_shrinkage(returns: pd.DataFrame):
    """
    Calculate the covariance matrix using the Ledoit-Wolf shrinkage estimator
    and return the shrinkage intensity.
    
    :param returns: DataFrame containing asset returns.
    :return: A tuple containing the shrinkage covariance matrix as a DataFrame
             and the shrinkage intensity.
    """
    lw = LedoitWolf()
    lw.fit(returns)
    shrinkage_cov_matrix = lw.covariance_
    shrinkage_intensity = lw.shrinkage_

    # Convert the numpy array back to a pandas DataFrame for better compatibility and readability
    cov_matrix_df = pd.DataFrame(shrinkage_cov_matrix, index=returns.columns, columns=returns.columns)

    return cov_matrix_df, shrinkage_intensity
    
def dcc_garch(returns):
    pandas2ri.activate()
    
    rugarch = importr('rugarch')
    rmgarch = importr('rmgarch')

    with localconverter(ro.default_converter + pandas2ri.converter):
        r_returns = ro.conversion.py2rpy(returns)
    
    ro.r('''
    library(rugarch)
    library(rmgarch)

    fit_dcc_model <- function(data) {
        spec <- ugarchspec(mean.model = list(armaOrder = c(0,0), include.mean = TRUE),
                           variance.model = list(model = "sGARCH", garchOrder = c(1, 1)),
                           distribution.model = "norm")
        multispec <- multispec(replicate(ncol(data), spec))
        dcc_spec <- dccspec(uspec = multispec, dccOrder = c(1, 1), distribution = "mvnorm")
        dcc_fit <- dccfit(dcc_spec, data = data)
        
        # Extract conditional variances and correlations
        conditional_variances <- sigma(dcc_fit)^2
        conditional_correlations <- rcor(dcc_fit)
        
        # Initialize an array to store the conditional covariances
        n_assets <- ncol(data)
        n_time <- dim(conditional_correlations)[3]
        conditional_covariances <- array(0, dim = c(n_assets, n_assets, n_time))
        
        # Calculate the conditional covariances
        for (t in 1:n_time) {
            D_t <- diag(sqrt(conditional_variances[t, ]))
            corr_t <- conditional_correlations[, , t]
            # Debugging statements to check dimensions
            print(paste("Time:", t))
            print(paste("Dimensions of D_t:", nrow(D_t), ncol(D_t)))
            print(paste("Dimensions of corr_t:", nrow(corr_t), ncol(corr_t)))
            
            if (nrow(D_t) == nrow(corr_t) && ncol(D_t) == ncol(corr_t)) {
                conditional_covariances[, , t] <- D_t %*% corr_t %*% D_t
            } else {
                stop(paste("Dimension mismatch between D_t and conditional_correlations at time", t))
            }
        }
        
        return(conditional_covariances)
    }
    ''')
    
    fit_dcc_model = ro.globalenv['fit_dcc_model']
    result = fit_dcc_model(r_returns)
    
    with localconverter(ro.default_converter + pandas2ri.converter):
        result_as_array = ro.conversion.rpy2py(result)
    
    return result_as_array

In [55]:
historical_covariance(data)

Ticker,AAPL,F,GE,XLE,XLY
Ticker,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1
AAPL,0.000506,0.000399,0.000274,0.000159,0.000394
F,0.000399,0.00096,0.000422,0.00027,0.000469
GE,0.000274,0.000422,0.000484,0.000202,0.000289
XLE,0.000159,0.00027,0.000202,0.000494,0.000156
XLY,0.000394,0.000469,0.000289,0.000156,0.000466


In [56]:
ewma_covariance(data,.94)

Ticker,AAPL,F,GE,XLE,XLY
Ticker,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1
AAPL,0.000498,0.000438,0.000239,0.000216,0.000302
F,0.000438,0.000761,0.000287,0.000282,0.000375
GE,0.000239,0.000287,0.00033,0.000214,0.000151
XLE,0.000216,0.000282,0.000214,0.000334,0.000166
XLY,0.000302,0.000375,0.000151,0.000166,0.000279


In [57]:
ledoit_wolf_covariance_and_shrinkage(data)[0]

Ticker,AAPL,F,GE,XLE,XLY
Ticker,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1
AAPL,0.000506,0.000387,0.000266,0.000154,0.000382
F,0.000387,0.000946,0.000409,0.000261,0.000455
GE,0.000266,0.000409,0.000485,0.000195,0.00028
XLE,0.000154,0.000261,0.000195,0.000494,0.000151
XLY,0.000382,0.000455,0.00028,0.000151,0.000468


In [None]:
import rpy2.robjects as ro
from rpy2.robjects import pandas2ri
from rpy2.robjects.packages import importr
from rpy2.robjects.conversion import localconverter
import numpy as np

def run_dcc_garch(returns):
    pandas2ri.activate()
    
    rugarch = importr('rugarch')
    rmgarch = importr('rmgarch')

    with localconverter(ro.default_converter + pandas2ri.converter):
        r_returns = ro.conversion.py2rpy(returns)
    
    ro.r('''
    library(rugarch)
    library(rmgarch)

    fit_dcc_model <- function(data) {
        spec <- ugarchspec(mean.model = list(armaOrder = c(0,0), include.mean = TRUE),
                           variance.model = list(model = "sGARCH", garchOrder = c(1, 1)),
                           distribution.model = "norm")
        multispec <- multispec(replicate(ncol(data), spec))
        dcc_spec <- dccspec(uspec = multispec, dccOrder = c(1, 1), distribution = "mvnorm")
        dcc_fit <- dccfit(dcc_spec, data = data)
        
        # Extract conditional variances and correlations
        conditional_variances <- t(sigma(dcc_fit)^2)
        conditional_correlations <- rcor(dcc_fit)
        
        # Initialize an array to store the conditional covariances
        n_assets <- ncol(data)
        n_time <- dim(conditional_correlations)[3]
        conditional_covariances <- array(0, dim = c(n_assets, n_assets, n_time))
        
        # Calculate the conditional covariances
        for (t in 1:n_time) {
            D_t <- diag(sqrt(conditional_variances[t, ]))
            corr_t <- conditional_correlations[, , t]
            # Debugging statements to check dimensions
            print(paste("Time:", t))
            print(paste("Dimensions of D_t:", nrow(D_t), ncol(D_t)))
            print(paste("Dimensions of corr_t:", nrow(corr_t), ncol(corr_t)))
            
            if (nrow(D_t) == nrow(corr_t) && ncol(D_t) == ncol(corr_t)) {
                conditional_covariances[, , t] <- D_t %*% corr_t %*% D_t
            } else {
                stop(paste("Dimension mismatch between D_t and conditional_correlations at time", t))
            }
        }
        
        return(conditional_covariances)
    }
    ''')
    
    fit_dcc_model = ro.globalenv['fit_dcc_model']
    result = fit_dcc_model(r_returns)
    
    with localconverter(ro.default_converter + pandas2ri.converter):
        result_as_array = ro.conversion.rpy2py(result)
    
    return result_as_array

# Usage: Assuming `returns` is a pandas DataFrame with your returns data.


In [60]:
dcc_garch(data)

[1] "Time: 1"
[1] "Dimensions of D_t: 1 5"
[1] "Dimensions of corr_t: 5 5"


R[write to console]: Error in (function (data)  : 
  Dimension mismatch between D_t and conditional_correlations at time 1



RRuntimeError: Error in (function (data)  : 
  Dimension mismatch between D_t and conditional_correlations at time 1
