In [None]:
import statsmodels.api as sm
from statsmodels.tsa.stattools import coint
from scipy.stats import norm, logistic, genextreme
from copulas.multivariate import GaussianMultivariate

import itertools

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import statsmodels.api as sm
from sklearn.linear_model import LinearRegression
from statsmodels.regression.rolling import RollingOLS
import glob
import os
import time
import optuna
from statsmodels.tsa.stattools import coint
from copulas.multivariate import GaussianMultivariate
optuna.logging.set_verbosity(optuna.logging.WARNING)

# Directory holding one CSV per instrument. Each file must expose 'time' and
# 'close' columns and be named like "...data_<SYMBOL>-USDT-SWAP.csv".
path = r"C:\Users\manan\Downloads\Pt 1\data\\DataBETA"

# glob yields matches in a file-system-dependent order; sort them so the
# column order of merged_df (and therefore the six instruments kept by the
# iloc slice below) is reproducible across machines.
csv_files = sorted(glob.glob(os.path.join(path, "*.csv")))
if not csv_files:
    # Fail early with a clear message instead of an opaque pd.concat error.
    raise FileNotFoundError(f"No CSV files found in {path!r}")

# One close-price Series per file, indexed by the parsed 'time' column.
dfl = []
for csv_path in csv_files:
    frame = pd.read_csv(csv_path)
    frame.index = pd.to_datetime(frame['time'])
    dfl.append(frame['close'])

# position -> file name
file_data_dict = {
    idx: os.path.basename(csv_path) for idx, csv_path in enumerate(csv_files)
}
# position -> symbol, i.e. the text between 'data_' and '-USDT-SWAP.csv'
modified_dict = {
    idx: file_name.split('data_')[1].split('-USDT-SWAP.csv')[0]
    for idx, file_name in file_data_dict.items()
}

# position -> single-column DataFrame named after the symbol
df_dict = {
    idx: pd.DataFrame({modified_dict[idx]: series})
    for idx, series in enumerate(dfl)
}
row_col_names = list(modified_dict.values())
# Symbol-by-symbol matrix pre-filled with 1.
df = pd.DataFrame(1, index=row_col_names, columns=row_col_names)

# Align all close series on their timestamps, keep only rows where every
# instrument has a price, then restrict to the first six instruments.
merged_df = pd.concat(dfl, axis=1)
merged_df.columns = [modified_dict[idx] for idx in range(len(dfl))]
merged_df = merged_df.dropna()
merged_df = merged_df.iloc[:, :6]

# Step 1: Load and prepare returns data
# Simple period-over-period returns; the first row is NaN and is dropped.
returns_df = merged_df.pct_change().dropna()

# Step 2: Define helper functions for marginal fitting and copula selection
def fit_marginal_distribution(series, dist_type='normal'):
    """Fit a scipy.stats distribution to ``series`` and return its parameters.

    Args:
        series: Sample data passed straight to the distribution's ``fit``.
        dist_type: One of ``'normal'``, ``'logistic'`` or ``'genextreme'``.

    Returns:
        The tuple returned by the matching scipy ``fit`` call.

    Raises:
        ValueError: If ``dist_type`` is not one of the supported names.
    """
    supported = (
        ('normal', norm),
        ('logistic', logistic),
        ('genextreme', genextreme),
    )
    for name, distribution in supported:
        if dist_type == name:
            return distribution.fit(series)
    raise ValueError("Unsupported distribution type")

def get_residuals(series, params, dist_type='normal'):
    """Standardise ``series`` with the location and scale found in ``params``.

    Args:
        series: Values to standardise (anything supporting ``-`` and ``/``).
        params: Fitted parameters. ``'normal'`` reads the first two entries,
            ``'logistic'`` expects exactly ``(loc, scale)`` and
            ``'genextreme'`` expects exactly ``(c, loc, scale)``.
        dist_type: One of ``'normal'``, ``'logistic'`` or ``'genextreme'``.

    Returns:
        ``(series - loc) / scale``.

    Raises:
        ValueError: If ``dist_type`` is not one of the supported names.
    """
    if dist_type == 'normal':
        # Indexing (rather than unpacking) tolerates extra trailing entries.
        loc, scale = params[0], params[1]
    elif dist_type == 'logistic':
        loc, scale = params
    elif dist_type == 'genextreme':
        # The shape parameter is not needed for standardisation.
        _shape, loc, scale = params
    else:
        raise ValueError("Unsupported distribution type")
    return (series - loc) / scale

# Step 3: Check cointegration between all pairs
# The Engle-Granger test (statsmodels `coint`) assumes both inputs are I(1),
# i.e. non-stationary levels. Returns are already stationary, so testing them
# rejects the null for almost every pair; the test is therefore run on the
# price levels in merged_df, which has the same columns as returns_df.
cointegrated_pairs = []
p_value_threshold = 0.05

for col1, col2 in itertools.combinations(merged_df.columns, 2):
    # coint returns (test statistic, p-value, critical values); only the
    # p-value is needed to decide whether to keep the pair.
    _, p_value, _ = coint(merged_df[col1], merged_df[col2])
    if p_value < p_value_threshold:
        cointegrated_pairs.append((col1, col2))


# Step 4: Fit a Gaussian copula per selected pair and derive trading signals.
# copula_signals maps (stock1, stock2) -> list of (signal, stock1, stock2)
# tuples, one per row of returns_df, where signal is 'Buy', 'Sell' or 'Hold'.
copula_signals = {}
threshold = 0.975  # Arbitrary threshold; adjust based on strategy requirements

for pair in cointegrated_pairs:
    stock1, stock2 = pair
    print(f"Processing pair: {stock1}, {stock2}")

    # Fit a normal marginal to each leg's returns.
    params1 = fit_marginal_distribution(returns_df[stock1], dist_type='normal')
    params2 = fit_marginal_distribution(returns_df[stock2], dist_type='normal')

    # Probability-integral transform of the standardised returns.
    # norm.cdf returns a plain ndarray (no .iloc), so the values are wrapped
    # in a DataFrame whose columns match what the copula is fitted on.
    u1 = norm.cdf(get_residuals(returns_df[stock1], params1, dist_type='normal'))
    u2 = norm.cdf(get_residuals(returns_df[stock2], params2, dist_type='normal'))
    uniforms = pd.DataFrame({'U1': u1, 'U2': u2})

    # Fit copula (Gaussian example)
    copula = GaussianMultivariate()  # Use GaussianMultivariate instead of GaussianCopula
    copula.fit(uniforms)

    # Evaluate the joint CDF for every observation in one call.
    # cumulative_distribution needs a DataFrame/ndarray (a bare list has no
    # .shape); atleast_1d covers the single-row case, which yields a scalar.
    joint_probs = np.atleast_1d(copula.cumulative_distribution(uniforms))

    # Generate copula-based signals
    signals = []
    for joint_prob in joint_probs:
        if joint_prob > threshold:
            # Joint CDF near 1: both legs sit in their upper tails together.
            label = 'Sell'
        elif joint_prob < 1 - threshold:
            # Joint CDF near 0: the observation lies in the joint lower tail.
            label = 'Buy'
        else:
            label = 'Hold'
        signals.append((label, stock1, stock2))
    copula_signals[pair] = signals



# Step 5: Backtest the signals with a fixed notional per leg (no compounding).
initial_cash = 100000
n_pairs = len(copula_signals)
# Capital is split evenly across pairs and across the two legs of each pair.
# With no selected pairs nothing is allocated, which avoids dividing by zero.
position_size = initial_cash / n_pairs / 2 if n_pairs else 0.0

# Net PnL per period, summed over all pairs.
period_pnl = np.zeros(len(returns_df))
# 'Buy' is long stock1 / short stock2, 'Sell' is the reverse; any other
# signal (e.g. 'Hold') carries no position.
signal_direction = {'Buy': 1.0, 'Sell': -1.0}

for (stock1, stock2), signals in copula_signals.items():
    spread_returns = (returns_df[stock1] - returns_df[stock2]).to_numpy()
    direction = np.array(
        [signal_direction.get(signal, 0.0) for signal, _, _ in signals],
        dtype=float,
    )
    n_periods = min(len(direction), len(spread_returns))
    if n_periods < 2:
        # Need at least one signal followed by one realised return.
        continue
    # Avoid look-ahead: the signal at period t is computed from the return of
    # period t, so it can only be traded on the return of period t + 1.
    period_pnl[1:n_periods] += (
        position_size * direction[:n_periods - 1] * spread_returns[1:n_periods]
    )

cash = initial_cash + float(period_pnl.sum())
# Per-period strategy returns relative to the initial capital; their running
# sum is the cumulative return plotted below and ends at total_return.
returns = (period_pnl / initial_cash).tolist()

total_return = cash / initial_cash - 1
print(f"Total Return from Copula-based Pairs Trading: {total_return * 100:.2f}%")

import matplotlib.pyplot as plt
plt.plot(np.cumsum(returns), label="Copula-based Strategy")
plt.xlabel("Trading Days")
plt.ylabel("Cumulative Return")
plt.legend()
plt.show()


Processing pair: 1INCH, AAVE
