![QuantConnect Logo](https://cdn.quantconnect.com/web/i/icon.png)
<hr>

# Copula estimation using Kernel Desnity Estimation




**Objective:** 
Here, we will investigate whether using Kernel Density Estimation seems effective for modelling a copula using empirical data, rather than using a parametric model. 

In [1]:
from statsmodels.distributions.empirical_distribution import ECDF
from copulas.bivariate.base import Bivariate
from copulas.multivariate import GaussianMultivariate

class Parameter():
    def __init__(self,
                resolution,
                start_year, 
                start_month=1, 
                start_day=1,
                n_years=0,
                n_months=0, 
                n_days=0): 
        
        self.Resolution = resolution
        
        end_year = start_year + n_years
        end_month = start_month + n_months
        end_day = start_day + n_days 
        self.Start = datetime(start_year,start_month,start_day,9,30,0)
        self.End = datetime(end_year, end_month, end_day,16,30,0)
    
class Data():
    def __init__(self, df):
        self._df = df.dropna()
    
    @property 
    def prices(self):
        return self._df
    
    @property
    def returns(self):
        return self._df.diff() 
    
    @property
    def columns(self):
        return self._df.columns
    
    def plot(self, *args, **kwargs): 
        return self._df.plot(*args, **kwargs)
    
class EmpiricalCopula():
    def __init__(self, X):
        '''
            args:
                X: numpy array of shape (n_pairs, 2)
        '''
        self.data = X
        self._len = X.shape[0]
    
    def __len__(self):
        return self._len
    
    @staticmethod 
    def _get_bin_width(self, data):
        return freedman_diaconis(data)
    
    def cdf(self, u, v):
        '''returns C(u,v) = P(u <= U, v <= V)'''
        cnt = np.sum((self.data[:,0] <= u) & (self.data[:,1] <= v))
        return float(cnt / self._len)
    
    def pdf(self, u, v):
        pass 
    
    def cond_pdf(u, given_p):
        '''P(U<=u | V=v) = dC(u,v)/dv = P(U<=u, V=v)/p(V=v)'''
        pass 
        
def emp_copula_test():
    u = np.expand_dims(np.arange(0,100,1)+1, axis=1)
    v = np.expand_dims(np.arange(0,100,1)+1, axis=1)
    x = np.concatenate((u,v),axis=1)
    print("u:",u.shape)
    print("v:",v.shape)
    print("x:",x.shape)
        
    test_copula = EmpiricalCopula(x)
    assert len(test_copula) == len(u), f"{len(test_copula)} != {len(u)}"
    
    pvalue = test_copula.cdf(50,50)
    assert pvalue == 0.5, f"p = {pvalue} not 0.5"
    pvalue = test_copula.cdf(25,25)
    assert pvalue == 0.25, f"p = {pvalue} not 0.25"
    pvalue = test_copula.cdf(100,0)
    assert pvalue == 0.0, f"p={pvalue} not 0"
    pvalue = test_copula.cdf(0,100)
    assert pvalue == 0.0, f"p={pvalue} not 0"
    pvalue = test_copula.cdf(20,100)
    assert pvalue == 0.2, f"p={pvalue} not 0.2"
    pvalue = test_copula.cdf(100,20)
    assert pvalue == 0.2, f"p={pvalue} not 0.2"
    pvalue = test_copula.cdf(100,100)
    assert pvalue == 1.0, f"p={pvalue} not 1.0"
    
emp_copula_test()

In [2]:
import random

def sample_from_bivariate(x_domain, y_domain, weights, n_samples):
    x_domain = x_domain.ravel()
    y_domain = y_domain.ravel()
    weights = np.nan_to_num(weights.ravel()) # account for nans
    
    population = np.array([x_domain, y_domain]).T
    print(population)
    return np.array(random.choices(population, weights, k=n_samples))


In [3]:
# Utils
def np_remove_nan(x): 
    return x[np.logical_not(np.isnan(x).any(axis=1))]

def np_remove_inf(x):
    return x[np.logical_not(np.isinf(x).any(axis=1))]

def np_remove_inf_1D(x):
    return x[~np.isinf(x)]

def np_remove_nan_1D(x):
    return x[~np.isinf(x)]

# test remove nan 
data = np.array([[np.inf,np.inf],
                 [1,2],
                 [1,3],
                 [np.nan, 4],
                 [5,6]])

data_no_nan = np_remove_nan(data)
data_no_inf = np_remove_inf(data)
data_no_non_num = np_remove_nan(np_remove_inf(data))

check_for_nan = lambda x: True not in np.isnan(x)
check_for_inf = lambda x: True not in np.isinf(x)

assert check_for_nan(data_no_nan)
assert check_for_inf(data_no_inf)
assert check_for_nan(data_no_non_num) and check_for_inf(data_no_non_num)
assert data_no_non_num.shape == (3,2)

# print("-"*15)
# print("input")
# print(data)
# print("-"*15)
# print(data_no_nan)
# print("-"*15)
# print(data_no_inf)
# print("-"*15)
# print(data_no_non_num)

In [4]:
## model comparison criteria

def AIC(log_lik, n_params):
    '''
        AIC = 2k - 2*ln(L)
        Args: 
            log_lik: total log likelihood over all sampoles
            n_samples: number of samples
            n_params: number of parameters
    '''
    return 2 * n_params - 2 * log_lik

def BIC(log_lik, n_samples, n_params):
    '''
        BIC = k*ln(N) - 2*ln(L)
        where 
            k = number of parameters
            N = number of samples 
            L = likelihood
        Args: 
            log_lik: total log likelihood over all sampoles
            n_samples: number of samples
            n_params: number of parameters
    '''
    return k * np.log(n_samples) - 2 * log_lik

# test AICimport, BIC
_n_samples = 1000
_dist1_params = {"loc":0, "scale":1}
_dist2_params = {"loc":10, "scale":1}

from scipy.stats import norm
samples_dist1 = norm.rvs(**_dist1_params, size=_n_samples)
samples_dist2 = norm.rvs(**_dist2_params, size=_n_samples)

loglik_samp1_on_dist1 = np.sum(norm.logpdf(samples_dist1, **_dist1_params))
loglik_samp2_on_dist1 = np.sum(norm.logpdf(samples_dist2, **_dist1_params))
loglik_samp2_on_dist2 = np.sum(norm.logpdf(samples_dist2, **_dist2_params))
loglik_samp1_on_dist2 = np.sum(norm.logpdf(samples_dist1, **_dist2_params))

aic_samp1_on_dist1 = AIC(loglik_samp1_on_dist1, len(_dist1_params))
aic_samp2_on_dist1 = AIC(loglik_samp2_on_dist1, len(_dist1_params))
aic_samp2_on_dist2 = AIC(loglik_samp2_on_dist2, len(_dist2_params))
aic_samp1_on_dist2 = AIC(loglik_samp1_on_dist2, len(_dist2_params))

_msg = "AIC values have unexpected magnitudes"
assert aic_samp1_on_dist1 < aic_samp2_on_dist1, _msg
assert aic_samp2_on_dist2 < aic_samp2_on_dist1, _msg

__plot__ = False 
if __plot__: 
    fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(20,10))
    fig.suptitle('Normal samples')
    ax1.hist(samples_dist1, bins=100)
    ax2.hist(samples_dist2, bins=100)

In [5]:
# QuantBook Analysis Tool 
# For more information see [https://www.quantconnect.com/docs/research/overview]
qb = QuantBook()

# parameters
START_YEAR = 2005
START_MONTH = 1
START_DAY = 1
N_YEARS = 5
N_MONTHS = 0
N_DAYS = 0
RESOLUTION = Resolution.Daily
PARAM = Parameter(RESOLUTION, START_YEAR, START_MONTH, START_DAY,
                  N_YEARS, N_MONTHS, N_DAYS)

# Specify list of correlated tickers for S&P 500 
tickers = ["SPY","XLK", "VGT", "IYW", "IGV"]

# register tickers to quantbook
for ticker in tickers: 
    qb.AddEquity(ticker)
    
# get historical prices
history = qb.History(tickers, PARAM.Start, PARAM.End, PARAM.Resolution)

# Unpack dataframe
Open = history['open'].unstack(level=0)
Close = history['close'].unstack(level=0)
High = history['high'].unstack(level=0)
Low = history['low'].unstack(level=0)

# create data object
close = Data(Close)


In [6]:
# Empirically estimate marginal distributions
marginal_dist = {}
for asset in close.columns:
    marginal_dist[asset] = ECDF(close.returns[asset])

marginal_values = pd.DataFrame()
for asset in close.columns: 
    marginal_values[asset] = close.returns[asset].apply(marginal_dist[asset])

In [7]:
# Calculate Kendall Tau
corr = close.returns.corr(method='kendall')
corr.style.background_gradient(cmap='viridis')

In [8]:
def get_max_corr_pair(corr_matrix): 
    sol = (corr.where(np.triu(np.ones(corr.shape), k=1).astype(np.bool))
                 .stack()
                 .sort_values(ascending=False))
    
    col1 = close.columns[sol.index[sol == max(sol)].labels[0]].values[0]
    col2 = close.columns[sol.index[sol == max(sol)].labels[1]].values[0]
    return(col1, col2)

pair_override = None #('IYW RUTTRZ1RC7L1','XLK RGRPZX100F39')
pair = get_max_corr_pair(corr)
# pair = pair_override if not None else pair 
print("Pair with maximum correlation: {}".format(pair))

data = np.array(marginal_values[list(pair)].values)
close.prices[list(pair)].plot(figsize=(20,10))
close.returns[list(pair)].plot(figsize=(20,10))

### Copula Estimate using Transformation Method

Let $\Phi$ be the standard normal cdf and $\phi$ its density. Then the random vector $(X,Y) = (\Phi^{-1}(U), \Phi^{-1}(V))$ has normally distributed margins and is supported on the full $\mathbb{R}^2$. By Sklar's Theroem, its density $f$ can be written as: 

$$f(x,y) = c(\Phi(x), \Phi(y))\phi(x)\phi(y)$$

To estimate this density, we transform samples $(U_i,V_i)$ to $(X_i,Y_i) = (\Phi^{-1}(U_i), \Phi^{-1}(V_i))$ for $i = 1,...,n$. To this transformed sample, we now apply the standard kernel density estimator for $(x,y) \in \mathbb{R}^2$: 

$$\hat{f}_n(x,y) = \frac{1}{n}\sum_{i=1}^{n}K_{b_n}(x-X_i)K_{b_n}(y-Y_i)$$

Hence, the copula density can be expressed as : 

$$
    \hat{c}_n(u,v) = \frac{\sum_{i=1}^{n}K_{b_n}\big(\Phi^{-1}(u) - \Phi^{-1}(U_i)\big)K_{b_n}\big(\Phi^{-1}(v) - \Phi^{-1}(V_i)\big)}{n\phi(\Phi^{-1}(u))\phi(\Phi^{-1}(v))}
$$
for all $(u,v) \in [0,1]^2$

In [9]:
#-----------------------------------------------------------------------------------------
# Fit KDE estiamte on transformed data
#-----------------------------------------------------------------------------------------
from time import time
from scipy.stats import norm
import statsmodels.api as sm

# transform samples to R^2 domain
X_ = norm.ppf(data)
X_ = np_remove_inf(X_)

# fit a kernel density estimate 
_BANDWIDTH = 'cv_ls' #'normal_reference' # 'cv_ml', 'cv_ls'
kernel = sm.nonparametric.KDEMultivariate
t = time()
model = kernel(data=X_, var_type='cc', bw=_BANDWIDTH)
elapsed = time() - t 

print("Model fit in {} seconds".format(elapsed))
print("Model bandwidth {}".format(model.bw))

In [10]:
#-------------------------------------------------------------------------------------------------
# generate pdf in R^2 domain
#-------------------------------------------------------------------------------------------------
from time import time 

_width = 0.01
_axis_offset = 1
x_valuesR = np.arange(np.floor(np.min(X_)), np.ceil(np.max(X_)),_width)
y_valuesR = np.arange(np.floor(np.min(X_)-_axis_offset), np.ceil(np.max(X_)+_axis_offset),_width)

x_meshR, y_meshR = np.meshgrid(x_valuesR, y_valuesR)
data_predictR = pd.DataFrame({"U":x_meshR.ravel(), "V":y_meshR.ravel()})

t = time()
z_valuesR = model.pdf(data_predict=data_predictR)
elapsed = time() - t
print("{} seconds for {} data".format(elapsed, len(data_predictR)))

In [11]:
#--------------------------------------------------------------------------------------
# Plot the transformed distribution and KDE estimate in R^2 domain
#--------------------------------------------------------------------------------------
z_meshR = z_valuesR.reshape(x_meshR.shape)

# contour plot of resulting kde estimate of copula pdf in [0,1]^2
plt.figure(figsize=(12.5,10))
plt.contourf(x_meshR, y_meshR, z_meshR, 100, cmap='viridis')
plt.colorbar()
plt.title("Contour plot of copula pdf using Kernel Density Estimation in $\mathbb{R}^2$ domain")

# get samples from distribution in R^2 and plot 
n_samples=len(X_)
samples = sample_from_bivariate(x_meshR, y_meshR, z_meshR, n_samples)
plt.figure(figsize=(10,10))
plt.scatter(samples[:,0], samples[:,1], s=0.3)
plt.title("Scatter Plot of samples from Kernel Density Estimate")

# plot dataset in R^2 domain
plt.figure(figsize=(10,10))
plt.scatter(X_[:,0], X_[:,1], s=0.3)
plt.title("Scatter Plot of Dataset")

In [12]:
#----------------------------------------------------------------------------------------------
# Plot the resulting KDE by Transformation Method in [0,1]^2 
#----------------------------------------------------------------------------------------------
# define domain in [0,1]^2
x_valuesU = np.arange(0,1,0.0025)
y_valuesU = np.arange(0,1,0.0025)

# transform to R^2 domain and make meshgrid
x_hat_valuesR = norm.ppf(x_valuesU)
y_hat_valuesR = norm.ppf(y_valuesU)
x_hat_meshR, y_hat_meshR = np.meshgrid(x_hat_valuesR, y_hat_valuesR)

# make predictions in R^2 domain and make meshgrid
data_predictR = pd.DataFrame({"Xhat":x_hat_meshR.ravel(), "Yhat":y_hat_meshR.ravel()})
z_hat_meshR = model.pdf(data_predict=data_predictR)

# transform predictions in R^2 back to [0,1]^2 domain 
z_valuesU = np.array(z_hat_meshR / (data_predictR.Xhat.apply(norm.pdf)*data_predictR.Yhat.apply(norm.pdf)))

# make meshgrid in [0,1]^2 domain for plotting
x_meshU, y_meshU = np.meshgrid(x_valuesU, y_valuesU)
z_meshU = z_valuesU.reshape(x_hat_meshR.shape)

# collect sample data from KDE
samples = sample_from_bivariate(x_meshU, y_meshU, z_meshU, n_samples)

# 3d plot of resulting kde estimate of copula pdf in [0,1]^2
from mpl_toolkits import mplot3d

fig = plt.figure(figsize=(10,10))
ax = fig.gca(projection='3d')
surf = ax.plot_surface(x_meshU, y_meshU, z_meshU,
                       linewidth=0, antialiased=False, cmap='viridis')
ax.view_init(20, 270)

# contour plot of resulting kde estimate of copula pdf in [0,1]^2
plt.figure(figsize=(10,8))
plt.contourf(x_meshU, y_meshU, z_meshU, 200, cmap='viridis')
plt.xlim([0,1])
plt.ylim([0,1])
plt.colorbar()
plt.title("Contour plot of copula pdf using Kernel Density Estimation via Transformation Method")

fig, (ax1, ax2) = plt.subplots(1,2,figsize=(16,8))
ax1.scatter(samples[:,0], samples[:,1],s=0.3)
ax2.scatter(data[:,0], data[:,1],s=0.3)
ax1.set_title("KDE Sample scatter plot")
ax2.set_title("Empirical data scatter plot")

Computationally, it seems we want to avoid using the KDE model itself to compute or sample from as it is computationally expensive. 

Taking note of [kdecopula: An R Package for the Kernel Estimation
of Bivariate Copula Densities](https://arxiv.org/pdf/1603.04229.pdf), we will fit our KDE model to the transformed data and use spline interpolation to estimate the copula density. We will then use this spline estimate to carry out further operations and avoid using the KDE model. 


In [13]:
from time import time
from scipy.stats import norm
import statsmodels.api as sm

#-------------------------------------------------------------------------------------------
# Fit Kernel Density Estimate to the data
#-------------------------------------------------------------------------------------------
# transform samples to R^2 domain
data = np.array(marginal_values[list(pair)].values)
X_ = norm.ppf(data)
X_ = np_remove_inf(X_)

# fit a kernel density estimate 
_BANDWIDTH = 'cv_ls' #'normal_reference' # 'cv_ml', 'cv_ls'
kernel = sm.nonparametric.KDEMultivariate

# fit the model to the transformed data
t = time()
model = kernel(data=X_, var_type='cc', bw=_BANDWIDTH)
elapsed = time() - t 

print("Model fit in {} seconds".format(elapsed))
print("Model bandwidth {}".format(model.bw))

#-------------------------------------------------------------------------------------------
# Generate pdf grid from model
#-------------------------------------------------------------------------------------------

# define domain in [0,1]^2
x_valuesU = np.arange(0,1,0.0025)
y_valuesU = np.arange(0,1,0.0025)

# transform to R^2 domain and make meshgrid
x_hat_valuesR_raw, y_hat_valuesR_raw = norm.ppf(x_valuesU), norm.ppf(y_valuesU)
x_hat_valuesR, y_hat_valuesR = np_remove_inf_1D(x_hat_valuesR_raw), np_remove_inf_1D(x_hat_valuesR_raw)
x_hat_meshR, y_hat_meshR = np.meshgrid(x_hat_valuesR, y_hat_valuesR)

# make predictions in R^2 domain and make meshgrid
data_predictR = pd.DataFrame({"Xhat":x_hat_meshR.ravel(), "Yhat":y_hat_meshR.ravel()})
z_hat_meshR = model.pdf(data_predict=data_predictR)

# transform predictions in R^2 back to [0,1]^2 domain 
z_valuesU = np.array(z_hat_meshR / (data_predictR.Xhat.apply(norm.pdf)*data_predictR.Yhat.apply(norm.pdf)))

# make meshgrid in [0,1]^2 domain for plotting
x_meshU, y_meshU = np.meshgrid(x_valuesU, y_valuesU)
z_meshU = z_values.reshape(x_hat_meshR.shape)

In [None]:
data.shape

In [None]:
#-----------------------------------------------------------------------------------------------
# Use interpolation on pdf estimate 
# 1. Fit the interpolation on the R^2 domain as the interpolation itself is in a real domain
# 2. Fit the interpolation on the [0,1]^2 domain and use bounding boxes to restrict the domain
#-----------------------------------------------------------------------------------------------
import scipy

# Real domain
interp_model = scipy.interpolate.RectBivariateSpline(x_hat_valuesR, y_hat_valuesR, z_meshU)

In [None]:
Z = interp_model(x_hat_valuesR, y_hat_valuesR)

fig, ax = plt.subplots(nrows=1, ncols=2, subplot_kw={'projection': '3d'})
ax[0].plot_wireframe(x_hat_valuesR, y_hat_valuesR, z_meshU, color='k')
ax[1].plot_wireframe(x_hat_valuesR, y_hat_valuesR, Z, color='k')

In [None]:
x_hat_valuesR