In [28]:
import time
from typing import Dict, List
import pandas as pd
import numpy as np
import datetime
import glob

STOREPath =  "../data/factorData/"
OHLCPath = "../data/lakeAPIData/OHLC/"

# Use a custom list
# TARGET = ['BNB-USDT', 'BTC-USDT', 'ETH-USDT', 'XRP-USDT']

# Dynamically create the list from file names
fileLists = glob.glob(OHLCPath + "*")
TARGET = [file.split("/")[-1].split(".")[0] for file in fileLists]

fileLists = glob.glob(OHLCPath + "*")


def extractDateFromTime(timestamp:pd.Timestamp)->datetime.date:
    year = timestamp.year
    month = timestamp.month
    day = timestamp.day
    date = datetime.datetime(year=year, month=month, day=day)
    return date 

data = []

for file in fileLists:

    minuteLevelOHLC = pd.read_csv(file, compression="gzip")
    minuteLevelOHLC = minuteLevelOHLC.convert_dtypes()
    minuteLevelOHLC['origin_time'] = pd.to_datetime(minuteLevelOHLC['origin_time'])
    minuteLevelOHLC['date'] = minuteLevelOHLC['origin_time'].apply(lambda x : extractDateFromTime(x))
    data += [minuteLevelOHLC]

data = pd.concat(data)
data['return'] = (data['close'] - data['open'])/data['open']
#Market return is defined as the average return of all the cryptocurrencies return
mktReturn = data.groupby("origin_time").agg({'return':np.mean})
mktReturn.rename(columns={'return':'mktReturn'}, inplace=True)
data = pd.merge(data, mktReturn, on='origin_time', how='left')
data = data.dropna()


  mktReturn = data.groupby("origin_time").agg({'return':np.mean})


In [29]:
TOTALNUMOFCOINS = len(set(data['symbol'].values))
LOWLEVEL = int(TOTALNUMOFCOINS/3)
HIGHLEVEL= int(TOTALNUMOFCOINS - TOTALNUMOFCOINS/3)

In [30]:
def generateLiqBucketSymbols(data:pd.DataFrame)->Dict:
    """"""
    Liq = data.groupby("symbol").agg({'trades':np.mean}).sort_values(by='trades')
    Liq.rename(columns={'trades':'Liq'}, inplace=True)
    LOWLiq = Liq['Liq'].values[0:LOWLEVEL].max()
    HIGHLiq = Liq['Liq'].values[0:HIGHLEVEL].max()
    lowLiqSymbols = Liq[Liq['Liq']<=LOWLiq].reset_index()['symbol'].values
    mediumLiqSymbols = Liq[(Liq['Liq']>LOWLiq)&(Liq['Liq']<HIGHLiq)].reset_index()['symbol'].values
    highLiqSymbols = Liq[Liq['Liq']>=HIGHLiq].reset_index()['symbol'].values
    return {'low': lowLiqSymbols, 'medium':mediumLiqSymbols, 'high':highLiqSymbols}

LiqBucketSyms = generateLiqBucketSymbols(data)

  Liq = data.groupby("symbol").agg({'trades':np.mean}).sort_values(by='trades')


In [31]:
def getBeta(xVar:str, yVar:str, df:pd.DataFrame) -> float:
    df = df.sort_values("received_time").set_index("received_time")
    x = df[xVar].values
    y = df[yVar].values
    beta = np.sum( (x - np.mean(x))*(y - np.mean(y)) ) / np.sum((x - np.mean(x))**2)
    return beta 


Beta = data.groupby("symbol").apply(lambda x : getBeta('return','mktReturn', x))
Beta = pd.DataFrame({'beta':Beta})

def generateVolBucketSymbols(Beta:pd.DataFrame) ->Dict:
    Beta = Beta.sort_values(by='beta')
    LOWBeta = Beta['beta'].values[0:LOWLEVEL].max()
    HIGHBeta = Beta['beta'].values[0:HIGHLEVEL].max()
    lowBetaSymbols = Beta[Beta['beta']<=LOWBeta].reset_index()['symbol'].values
    mediumBetaSymbols = Beta[(Beta['beta']>LOWBeta)&(Beta['beta']<HIGHBeta)].reset_index()['symbol'].values
    highBetaSymbols = Beta[Beta['beta']>=HIGHBeta].reset_index()['symbol'].values
    return {'low':lowBetaSymbols, 'medium':mediumBetaSymbols, 'high':highBetaSymbols}

VolBucketSyms = generateVolBucketSymbols(Beta)

In [32]:
import functools

def getMomentum(basicRet:str, df:pd.DataFrame, windows:List = [60,2*60, 3*60,6*60,12*60]) -> float:
    df = df.sort_values(by='received_time')
    for window in windows:
        df[basicRet+str(window)] = df.rolling(window)['return'].sum()
    df['momentum'] = functools.reduce(lambda a, b: a+b, [df[basicRet+str(window)] for window in windows])

    return df 

momentum = data.groupby("symbol").apply(lambda x: getMomentum('return',x)).reset_index(drop=True)
momentum = momentum.groupby(["symbol","date"]).agg({'momentum':np.mean}).reset_index()
#last day momentum
momentum = momentum[momentum['date'] == '2024-01-30']

def generateMomBucketSymbols(Mom:pd.DataFrame, date:str)->Dict:
    Mom = Mom[Mom['date']==date].sort_values(by='momentum')
    LOWMom = Mom['momentum'].values[0:LOWLEVEL].max()
    HIGHMom = Mom['momentum'].values[0:HIGHLEVEL].max()
    lowMomSymbols = Mom[Mom['momentum']<=LOWMom].reset_index()['symbol'].values
    mediumMomSymbols = Mom[(Mom['momentum']>LOWMom)&(Mom['momentum']<HIGHMom)].reset_index()['symbol'].values
    highMomSymbols = Mom[Mom['momentum']>=HIGHMom].reset_index()['symbol'].values
    return {'low':lowMomSymbols, 'medium':mediumMomSymbols, 'high':highMomSymbols}

MomBucketSyms = generateMomBucketSymbols(momentum,'2024-01-30')

  momentum = momentum.groupby(["symbol","date"]).agg({'momentum':np.mean}).reset_index()


In [33]:
class riskFactor(object):
    
    def __init__(self, name, componentSymbols, freq = '1m'):
        self._factorName = name
        self._symbols = componentSymbols
        self._freq = '1m'
        return 
    
    @property
    def name(self):
        return self._factorName
    
    @property
    def symbols(self):
        return self._symbols
        
    @property
    def freq(self):
        return self._freq
    
    @property
    def factorRetTS(self):
        return self._factorTS
    
    def getFactorReturnTS(self, data:pd.DataFrame)->pd.DataFrame:
        """
        Calculate risk factor returns given frequency and symbols
        Parameters
        ----------
        data: a panel dataframe with all symbols and their corresponding returns

        Returns: a timeSeries table of risk factor returns and time
        -------

        """
        factorRet = data[data['symbol'].isin(self._symbols)].groupby("origin_time").agg({'return':np.mean})
        factorRet.rename(columns={'return':self._factorName}, inplace=True)
        self._factorTS = factorRet
        return 
        
    
    def storeData(self):
        self._factorTS.to_csv(STOREPath+self._factorName+".csv", index = False)
        

class cryptoAsset(object):
    
    def __init__(self, symbol:str, sourceData:pd.DataFrame):
        self._symbol = symbol 
        self._sourceData = sourceData
        
    @property
    def symbol(self):
        return self._symbol
    
    @property
    def retTs(self):
        data = self._sourceData[self._sourceData['symbol'] == self._symbol][['date','origin_time','symbol','return']]
        return data
    
        
BNB  = cryptoAsset('BNB-USDT', data)
BTC  = cryptoAsset('BTC-USDT', data)
ETH  = cryptoAsset('ETH-USDT', data)
XRP  = cryptoAsset('XRP-USDT', data)


## Systematic Risk Factors

>* Liquidity: High,Medium, Low, liquidity is defined as the average trading volume
>* Momentum: High, Medium, Low, momentum is defined as the crypto's past 3 hour, 6 hours, 12 hours, 18 hours
>* Market: Average crypto coins return
>* Volatility: High, Medium, Low, defined as the beta coefficient to the market risk

In [34]:
marketRisk = riskFactor("market", list(set(data['symbol'].values)))
marketRisk.getFactorReturnTS(data)
marketRisk.storeData()
#Liquidity Risk Bucket
lowLiqRisk = riskFactor("Liq:low", LiqBucketSyms['low'])
lowLiqRisk.getFactorReturnTS(data)
lowLiqRisk.storeData()

medLiqRisk = riskFactor("Liq:med", LiqBucketSyms['medium'])
medLiqRisk.getFactorReturnTS(data)
medLiqRisk.storeData()

highLiqRisk = riskFactor("Liq:high", LiqBucketSyms['high'])
highLiqRisk.getFactorReturnTS(data)
highLiqRisk.storeData()
#Volatility Risk Bucket
lowVolRisk = riskFactor("Vol:low", VolBucketSyms['low'])
lowVolRisk.getFactorReturnTS(data)
lowVolRisk.storeData()

medVolRisk = riskFactor("Vol:med", VolBucketSyms['medium'])
medVolRisk.getFactorReturnTS(data)
medVolRisk.storeData()

highVolRisk = riskFactor("Vol:high", VolBucketSyms['high'])
highVolRisk.getFactorReturnTS(data)
highVolRisk.storeData()
#Momentum Risk Bucket
lowMomRisk = riskFactor("Mom:low", MomBucketSyms['low'])
lowMomRisk.getFactorReturnTS(data)
lowMomRisk.storeData()

medMomRisk = riskFactor("Mom:med", MomBucketSyms['medium'])
medMomRisk.getFactorReturnTS(data)
medMomRisk.storeData()

highMomRisk = riskFactor("Mom:high", MomBucketSyms['high'])
highMomRisk.getFactorReturnTS(data)
highMomRisk.storeData()

  factorRet = data[data['symbol'].isin(self._symbols)].groupby("origin_time").agg({'return':np.mean})
  factorRet = data[data['symbol'].isin(self._symbols)].groupby("origin_time").agg({'return':np.mean})
  factorRet = data[data['symbol'].isin(self._symbols)].groupby("origin_time").agg({'return':np.mean})
  factorRet = data[data['symbol'].isin(self._symbols)].groupby("origin_time").agg({'return':np.mean})
  factorRet = data[data['symbol'].isin(self._symbols)].groupby("origin_time").agg({'return':np.mean})
  factorRet = data[data['symbol'].isin(self._symbols)].groupby("origin_time").agg({'return':np.mean})
  factorRet = data[data['symbol'].isin(self._symbols)].groupby("origin_time").agg({'return':np.mean})
  factorRet = data[data['symbol'].isin(self._symbols)].groupby("origin_time").agg({'return':np.mean})
  factorRet = data[data['symbol'].isin(self._symbols)].groupby("origin_time").agg({'return':np.mean})
  factorRet = data[data['symbol'].isin(self._symbols)].groupby("origin_time").agg(

In [38]:
from numpy.linalg import inv
import os

class factorRiskModel(object):
    
    def __init__(self, riskFactors:List[riskFactor], coin:cryptoAsset ):
        self._coin = coin
        self._riskFactors = riskFactors
        self._factorNames = None
    
    @property
    def symbol(self):
        return self._coin.symbol
    
    @property
    def riskFactors(self):
        factors = []
        for rf in self._riskFactors:
            factors += [rf.name]
        return factors
    
    def rollingReg(self, df:pd.DataFrame, xvar:List[str], yvar:str, window:int)->pd.DataFrame:
        from numpy.linalg import inv
        Y = df[yvar].rolling(window)
        X = df[xvar].rolling(window)

        total_windows = len(df) - window + 1 
        print(f"Total windows to process: {total_windows}")

        i = 1 
        for x, y in zip(X, Y):
            if i % 10 == 0: 
                print(f"Processing window {i}/{total_windows}") #basically shows the loading progress

            if x.dropna().empty or y.dropna().empty:
                i += 1
                continue 

            x = x.values.astype('float').reshape(-1, len(xvar))
            ones = np.ones((x.shape[0], 1))
            x = np.concatenate([ones, x], axis=1).reshape(-1, len(xvar) + 1)
            y = y.values.astype('float').reshape(-1, 1)

            # Regularization paramemter in order to avoid the divide by zero stuff
            reg_param = 1e-6
            coefs = np.dot(inv(np.dot(x.T, x) + reg_param * np.eye(x.shape[1])), np.dot(x.T, y))
            Yhat = np.dot(x, coefs)
            yMean = np.mean(y)

            var_y = np.sum((y - yMean)**2)
            if var_y != 0:
                Rsqrd = 1 - np.sum((y - Yhat)**2) / var_y
                if x.shape[0] > len(xvar) + 1:
                    RsqrdAdj = 1 - (1 - Rsqrd) * (x.shape[0] - 1) / (x.shape[0] - len(xvar) - 1)
                else:
                    RsqrdAdj = 0  
            else:
                Rsqrd = 0  
                RsqrdAdj = 0 

            df.loc[window + i, 'Rsqrd'] = Rsqrd
            df.loc[window + i, 'RsqrdAdj'] = RsqrdAdj
            for j in range(len(coefs)):
                if j == 0:
                    df.loc[window + i, 'constant'] = coefs[j][0]
                else:
                    df.loc[window + i, xvar[j - 1] + '_coef'] = coefs[j][0]

            i += 1
        return df




    def calcRiskExposure(self, window=720)->pd.DataFrame:
        """
        calculate a rolling time series regression to get the dynamic risk exposure per coin to systematic risk factors
        Parameters
        ----------
        window

        Returns a time series of risk exposures
        -------

        """
        data = self._coin.retTs
        for rf in self._riskFactors:
            data = pd.merge(data, rf.factorRetTS, on = 'origin_time', how = 'left')
        
        #1440=60*24 hour rolling risk model
        rollFactorExposure = self.rollingReg(data.dropna(), self.riskFactors, 'return', 1440).dropna()

        return rollFactorExposure
    
    def saveRiskExposure(self, exposure_df: pd.DataFrame, file_path: str):
        print("Saving now....")
        directory = os.path.dirname(file_path)
        print(f"Attempting to save to {file_path}") 

        if not os.path.exists(directory):
            print("does not exist")
            os.makedirs(directory)
        exposure_df.to_csv(file_path, index=False)

    
riskFactors = [marketRisk,lowLiqRisk,medLiqRisk,highLiqRisk,lowVolRisk,medVolRisk,highVolRisk,lowMomRisk,medMomRisk,highMomRisk]



In [40]:
from concurrent.futures import ThreadPoolExecutor, as_completed

def process_coin(risk_model, coin, exposure_path):
    try:
        print(f"Processing: {coin.symbol}")
        factor_model = risk_model(riskFactors, coin)
        exposure = factor_model.calcRiskExposure()
        factor_model.saveRiskExposure(exposure, exposure_path)
        return f"Completed processing for {coin.symbol}"
    except Exception as e:
        return f"Error processing {coin.symbol}: {e}"

coins = [(BTC, '../data/factorData/exposures/BTCFactorExposure.csv'),
         (BNB, '../data/factorData/exposures/BNBFactorExposure.csv'),
         (ETH, '../data/factorData/exposures/ETHFactorExposure.csv'),
         (XRP, '../data/factorData/exposures/XRPFactorExposure.csv')]

with ThreadPoolExecutor(max_workers=4) as executor:
    futures = [executor.submit(process_coin, factorRiskModel, coin, path) for coin, path in coins]
    for future in as_completed(futures):
        print(future.result())



Processing: BTC-USDT
Total windows to process: 86394
Processing window 10/86394
Processing window 20/86394
Processing window 30/86394
Processing window 40/86394
Processing window 50/86394
Processing window 60/86394
Processing window 70/86394
Processing window 80/86394
Processing window 90/86394
Processing window 100/86394
Processing window 110/86394
Processing window 120/86394
Processing window 130/86394
Processing window 140/86394
Processing window 150/86394
Processing window 160/86394
Processing window 170/86394
Processing window 180/86394
Processing window 190/86394
Processing window 200/86394
Processing window 210/86394
Processing window 220/86394
Processing window 230/86394
Processing window 240/86394
Processing window 250/86394
Processing window 260/86394
Processing window 270/86394
Processing window 280/86394
Processing window 290/86394
Processing window 300/86394
Processing window 310/86394
Processing window 320/86394
Processing window 330/86394
Processing window 340/86394
Proc