In [None]:
from scipy import stats
from scipy.linalg import inv
from copulas import *
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns

qb = QuantBook()

In [None]:
tickers = ["XLF",   # Financial Select Sector SPDR Fund
           "COF",   # Capital One Financial Corporation
           "GS",    # Goldman Sachs Group, Inc.
           "JPM",   # J P Morgan Chase & Co
           "WFC"]   # Wells Fargo & Company 
symbols = [qb.AddEquity(ticker, Resolution.Daily).Symbol for ticker in tickers]
history = qb.History(symbols, datetime(2021, 1, 1), datetime(2022, 1, 1))

In [None]:
# copula test
def permutation(n):
    main_list=[np.concatenate([[k%2]*int((2**n)/(2**i)) for k in range(2**i)]) for i in range(1,n+1)]
    return np.array(main_list).T*2-1

def Sig(n):
    '''
    Sigma Matrix
    '''
    a=permutation(n)
    sub = a @ a.T
    add = n
    F=(add+sub)/2
    D=(add-sub)/2
    return ((2/15)**F)*((1/30)**D)

fh = lambda x,j: (x-1)*(3*x-1) if j==1 else x*(2-3*x)

def Tpn(rank_df):
    '''
    t stats
    '''
    n,p=rank_df.shape
    T_per=permutation(p)
    pos1=fh(rank_df.div(n+1),1)
    neg1=fh(rank_df.div(n+1),-1)
    TNP2=[pd.DataFrame(np.where(np.array(i)==1,pos1,neg1),columns=rank_df.columns,index=rank_df.index).prod(axis=1).mean() for i in T_per]
    return np.array(TNP2)

def Independence_test(df):
    '''
    copula test of independence of serveral time series
    H0: those timeseries are independent
    return stats and p-value
    
    df: dataframe of (n*p) time series dataframe
    '''
    rank_df=df.rank()
    n,p=rank_df.shape
    T=Tpn(rank_df).reshape([-1,1])
    S_inv=inv(Sig(p))
    T_stats=(T.T@S_inv@T)[0][0]*n
    p_value=1-stats.chi2.cdf(T_stats, p**2)
    return T_stats,p_value

In [None]:
# ETF
tickers= ["VDE", "USO", "XES", "XOP", "UNG", "ICLN", "ERX","UCO", "AMJ", "BNO", "AMLP", "TAN", 
                "GLD", "IAU", "SLV", "GDX", "AGQ", "PPLT", "NUGT","JNUG","QQQQ",
                "QQQ", "IGV", "QTEC", "FDN", "FXL", "TECL", "SOXL", "SKYY", "KWEB",
                "IEF", "SHY", "TLT", "IEI", "TLH", "BIL", "SPTL","TMF", "SCHO", 
                "SCHR", "SPTS", "GOVT", "SPLV", "UVXY", "EEMV", "EFAV", "USMV",
                "XLB", "XLE", "XLF", "XLI", "XLK", "XLP", "XLU", "XLV", "XLY","SPY","MDY",
                "IEV","EEM", "ILF", "VIXY","ERY", "SCO", "DGAZ","DUST", "JDST","TECS", 
                "SOXS","SHV", "TBT", "TBF", "TMV","SVXY","SQQQ","TQQQ",
                "VGSH","VGIT","VGLT"]
# Liquidate ETF 
symbols = []

for ticker in tickers:
    sym = qb.AddEquity(ticker)
    symbols.append(sym.Symbol)

start_date = DateTime(2017,1,1)
end_date =  DateTime(2021,11,1)
history = qb.History(symbols, start_date,end_date,Resolution.Daily)['close']
history = history.unstack(level = 0)
log_rtns = (np.log(history) - np.log(history.shift(1))).dropna()

In [None]:
# financial service stock selection
tickers = ['WFC','JPM','BRKB R735QTJ8XC9X', 'CMB R735QTJ8XC9X', 'NOB R735QTJ8XC9X', 'NB R735QTJ8XC9X', 
'V U12VRGLO8PR9', 'MA TIX2XDPLFR6T', 'RY R735QTJ8XC9X', 'GS RKEOGCOG6RFP', 
'TD R735QTJ8XC9X', 'USB R735QTJ8XC9X']

symbols = []

for ticker in tickers:
    sym = qb.AddEquity(ticker)
    symbols.append(sym.Symbol)

start_date = DateTime(2017,1,1)
end_date =  DateTime(2018,1,1)
history = qb.History(symbols, start_date,end_date,Resolution.Daily)['close']
history = history.unstack(level = 0)
log_rtns = (np.log(history) - np.log(history.shift(1))).dropna()

In [None]:
# basic material stock selection

tickers = ['DOW R735QTJ8XC9X', 'LYB UQRGJ93635GL', 'ECL R735QTJ8XC9X', 'PX R735QTJ8XC9X', 'APD R735QTJ8XC9X', 
'CRHCY R735QTJ8XC9X', 'PPG R735QTJ8XC9X', 'SHW R735QTJ8XC9X', 'PCU R735QTJ8XC9X', 'FCX R735QTJ8XC9X','IYM']

symbols = []

for ticker in tickers:
    sym = qb.AddEquity(ticker)
    symbols.append(sym.Symbol)

start_date = DateTime(2017,1,1)
end_date =  DateTime(2021,11,1)
history = qb.History(symbols, start_date,end_date,Resolution.Daily)['close']
history = history.unstack(level = 0)
log_rtns = (np.log(history) - np.log(history.shift(1))).dropna()

In [None]:
# HealthCare stock selection

tickers = ['JNJ R735QTJ8XC9X', 'PFE R735QTJ8XC9X', 'MRK R735QTJ8XC9X', 'UNH R735QTJ8XC9X', 'AMGN R735QTJ8XC9X', 
'ABBV VCY032R250MD', 'MDT R735QTJ8XC9X', 'BMY R735QTJ8XC9X', 'GILD R735QTJ8XC9X', 'WAG R735QTJ8XC9X']

symbols = []

for ticker in tickers:
    sym = qb.AddEquity(ticker)
    symbols.append(sym.Symbol)

start_date = DateTime(2017,1,1)
end_date =  DateTime(2018,1,1)
history = qb.History(symbols, start_date,end_date,Resolution.Daily)['close']
history = history.unstack(level = 0)
log_rtns = (np.log(history) - np.log(history.shift(1))).dropna()

In [None]:
# Utilities Industry Selection

tickers = ['FPL R735QTJ8XC9X', 'DUK R735QTJ8XC9X', 'SO R735QTJ8XC9X', 'D R735QTJ8XC9X', 'PE R735QTJ8XC9X', 
'AEP R735QTJ8XC9X', 'SRE RBYFBGNAJMUD', 'EIX R735QTJ8XC9X', 'PPL R735QTJ8XC9X', 
'ED R735QTJ8XC9X', 'PEG R735QTJ8XC9X', 'NSP R735QTJ8XC9X', 'WEC R735QTJ8XC9X', 
'DTE R735QTJ8XC9X', 'NU R735QTJ8XC9X']

symbols = []

for ticker in tickers:
    sym = qb.AddEquity(ticker)
    symbols.append(sym.Symbol)
start_date = DateTime(2017,1,1)
end_date =  DateTime(2018,1,1)
history = qb.History(symbols, start_date,end_date,Resolution.Daily)['close']
history = history.unstack(level = 0)
log_rtns = (np.log(history) - np.log(history.shift(1))).dropna()

In [None]:
# Energy Industry Selection

tickers = ['XON R735QTJ8XC9X',
'SLB R735QTJ8XC9X', 'IPPIF R735QTJ8XC9X', 'P R735QTJ8XC9X', 'EOG R735QTJ8XC9X',
 'SU R735QTJ8XC9X', 'EPD RCQZA696RIHX', 'OXY R735QTJ8XC9X', 'PSX V67S42RJBF51', 'CED RWTPESR2XAAT']

symbols = []

for ticker in tickers:
    sym = qb.AddEquity(ticker)
    symbols.append(sym.Symbol)
start_date = DateTime(2017,1,1)
end_date =  DateTime(2018,1,1)
history = qb.History(symbols, start_date,end_date,Resolution.Daily)['close']
history = history.unstack(level = 0)
log_rtns = (np.log(history) - np.log(history.shift(1))).dropna()

# Kendall Correlation

In [None]:
results = pd.DataFrame(columns=['tau'])
for s1 in log_rtns.columns:
    for s2 in log_rtns.columns:
        if s1 != s2 and (f'{s2}-{s1}' not in results.index):
            results.loc[f'{s1}-{s2}'] = stats.kendalltau(log_rtns[s1], log_rtns[s2])[0]

In [None]:
def parse_pair(pair):
    s1 = pair[:pair.find('-')]
    s2 = pair[pair.find('-')+1:]
    return s1,s2

selected_stocks = []
selected_pairs = []
tau_benchmark = 0

for pair in results.loc[(results['tau']>tau_benchmark)|(results['tau']<-1*tau_benchmark),:].sort_values(by = 'tau',ascending=False).index:
    s1,s2 = parse_pair(pair)
    selected_pairs.append(pair)
    if (s1 not in selected_stocks):
        selected_stocks.append(s1)
    if (s2 not in selected_stocks):
        selected_stocks.append(s2)

In [None]:
results.sort_values(by=['tau'])

# Co - integration Test

In [None]:
import statsmodels.api as sm
from statsmodels.tsa.stattools import coint, adfuller
def adf_pvalue(x,y):
    x = np.log(x)
    y = np.log(y)

    X = sm.add_constant(x)
    model = sm.OLS(y,X)
    results = model.fit()

    sigma = math.sqrt(results.mse_resid)

    results = model.fit()

    slope = results.params[1]
    intercept = results.params[0]

    res = results.resid
    zscore = res/sigma
    adf = adfuller(res)
    
    return adf[:2][1]

In [None]:
coint_dig = pd.DataFrame(columns=['p_value'])
for pair in selected_pairs:
    s1,s2 = parse_pair(pair)
    stock1,stock2 = history[s1],history[s2]
    coint_dig.loc[f'{s1}-{s2}'] = adf_pvalue(stock1.values,stock2.values)
    coint_dig.loc[f'{s2}-{s1}'] = adf_pvalue(stock2.values,stock1.values)

In [None]:
coint_dig[coint_dig['p_value']<0.01].sort_values(by='p_value')

In [None]:
selected_pairs = coint_dig[coint_dig['p_value']<0.01].sort_values(by='p_value').index.to_list()
selected_pairs

In [None]:
# QQ Plot Distance

In [None]:
from statsmodels.distributions.empirical_distribution import ECDF

In [None]:
def cal_dig_distance(pair):
    
    s1,s2 = parse_pair(pair)
    stock1,stock2 = log_rtns[s1],log_rtns[s2]

    ecdf_x,ecdf_y = ECDF(stock1),ECDF(stock2)
    u, v = ecdf_x(stock1), ecdf_x(stock2)

    total_dig = 0

    for x,y in zip(u,v):
        
        p3=np.array([x,y])
        p1=np.array([0,0])
        p2=np.array([1,1])
        dig_distance = np.abs(np.linalg.norm(np.cross(p2-p1, p1-p3)))/np.linalg.norm(p2-p1)

        total_dig += dig_distance

    return total_dig/len(u)

dig_dist = pd.DataFrame(columns=['QQplot'])
for pair in selected_pairs:
    s1,s2 = parse_pair(pair)
    dig_dist.loc[f'{s1}-{s2}'] = cal_dig_distance(pair)

In [None]:
dig_dist.sort_values(by='QQplot')

In [None]:
selected_stocks = []
selected_pairs = []

for pair in dig_dist.loc[dig_dist['QQplot']<1,:].sort_values(by='QQplot').index:
    s1,s2 = parse_pair(pair)
    selected_pairs.append(pair)
    if (s1 not in selected_stocks):
        selected_stocks.append(s1)
    if (s2 not in selected_stocks):
        selected_stocks.append(s2)

In [None]:
dig_dist.sort_values(by='QQplot')

In [None]:
for pair in ['ABBV VCY032R250MD-BMY R735QTJ8XC9X']:
    s1,s2 = parse_pair(pair)
    stock1,stock2 = log_rtns[s1],log_rtns[s2]

    ecdf_x,ecdf_y = ECDF(stock1),ECDF(stock2)
    u, v = ecdf_x(stock1), ecdf_x(stock2)

    plt.scatter(u,v)
    plt.axline([0, 0], [1, 1],c = 'red')
    plt.title(pair)
    plt.show()

# Copula Test

In [None]:
selected_pairs =coint_dig[coint_dig['p_value']<0.01].index.to_list()

In [None]:
copula_dig = pd.DataFrame(columns=['test_stats'])

for pair in selected_pairs:

    s1,s2 = parse_pair(pair)
    data = log_rtns[[s1,s2]]
    copula_dig.loc[f'{s1}-{s2}'] = Independence_test(data)[0]

In [None]:
copula_dig.sort_values(by = 'test_stats')

In [None]:
for pair in selected_pairs:
    s1,s2 = parse_pair(pair)
    stock1,stock2 = log_rtns[s1],log_rtns[s2]

    ecdf_x,ecdf_y = ECDF(stock1),ECDF(stock2)
    u, v = ecdf_x(stock1), ecdf_x(stock2)

    plt.scatter(u,v)
    plt.axline([0, 0], [1, 1],c = 'red')
    plt.title(pair)
    plt.show()

In [None]:
import numpy as np
dist = pd.DataFrame(columns=['ED'])
for pair in selected_pairs:
    s1,s2 = parse_pair(pair)
    stock1,stock2 = log_rtns[s1],log_rtns[s2]
    dist.loc[f'{s1}-{s2}'] = np.linalg.norm(stock1-stock2)

In [None]:
dist.sort_values(by='ED',ascending=True)

In [None]:
marginals_df = pd.DataFrame(index=selected_stocks, columns=['Distribution', 'AIC', 'BIC', 'KS_pvalue'])

for stock in selected_stocks:
    data = log_rtns[stock]
    dists = ['Normal', "Student's t", 'Logistic', 'Extreme','Uniform']
    best_aic = np.inf
    for dist,name in zip([stats.norm, stats.t, stats.genlogistic, stats.genextreme,stats.uniform], dists):
        params = dist.fit(data)
        dist_fit = dist(*params)
        log_like = np.log(dist_fit.pdf(data)).sum()
        aic = 2*len(params) - 2 * log_like
        if aic<best_aic:
            best_dist = name
            best_aic = aic
            best_bic = len(params) * np.log(len(data)) - 2 * log_like
            ks_pval = stats.kstest(data, dist_fit.cdf, N=100)[1]
            
    marginals_df.loc[stock] = [best_dist, best_aic, best_bic, ks_pval]

In [None]:
from copulas.multivariate.gaussian import GaussianMultivariate
from copulas.bivariate.clayton import Clayton
from copulas.bivariate.frank import Frank
from copulas.bivariate.gumbel import Gumbel

In [None]:
copulas_df = pd.DataFrame(index=selected_pairs, columns=['copula', 'aic'])
returns_form = log_rtns

for pair in selected_pairs:
    s1,s2 = parse_pair(pair)
    # fit marginals
    params_s1 = stats.t.fit(returns_form[s1])
    dist_s1 = stats.t(*params_s1)
    params_s2 = stats.t.fit(returns_form[s2])
    dist_s2 = stats.t(*params_s2)
    # apply probability integral transform
    u = dist_s1.cdf(returns_form[s1])
    v = dist_s2.cdf(returns_form[s2])
    
    best_aic = np.inf

    for copula in [GaussianMultivariate(),Clayton(), Gumbel(), Frank()]:
        data = pd.DataFrame(np.array([u,v]).T,columns=['u','v'])
        copula.fit(data.values)
        L = copula.log_probability_density(data.values).sum()
        aic = 2 - 2 * L
        print(aic)
        if aic < best_aic:
            best_aic = aic
            best_copula = str(copula)
            
    copulas_df.loc[pair] = [best_copula,best_aic]

In [None]:
copulas_df

In [None]:
data

In [None]:
copula = Clayton()
copula.fit(data.values)

In [None]:
log_prob = copula.log_probability_density(data.values)

In [None]:
log_prob = log_prob[~np.c(log_prob)].sum()

In [None]:
log_prob

In [None]:
copula.partial_derivative(data.values)

In [None]:
copula.partial_derivative(data[['v','u']].values)

In [None]:
s1,s2 = parse_pair(pair)
s3 = 'VDE T2FCD04TATET'
# fit marginals
params_s1 = stats.t.fit(returns_form[s1])
dist_s1 = stats.t(*params_s1)
params_s2 = stats.t.fit(returns_form[s2])
dist_s2 = stats.t(*params_s2)
params_s3 = stats.t.fit(returns_form[s3])
dist_s3 = stats.t(*params_s3)
# apply probability integral transform
u = dist_s1.cdf(returns_form[s1])
v = dist_s2.cdf(returns_form[s2])
y = dist_s3.cdf(returns_form[s3])

data = pd.DataFrame(np.array([u,v,y]).T,columns=['u','v','y'])

In [None]:
from copulas.multivariate.vine import VineCopula
copula = VineCopula('center')