### load required packages 

In [None]:
import numpy as np
import pandas as pd

import statsmodels.api as sm

import matplotlib.pyplot as plt
import plotly.express as px
import plotly.graph_objects as go
from plotly.subplots import make_subplots
from mpl_toolkits.mplot3d import Axes3D

from statsmodels.tsa.stattools import grangercausalitytests
from statsmodels.tsa.stattools import kpss
from statsmodels.tsa.stattools import adfuller
from statsmodels.tsa.stattools import acf, pacf

### load data

In [None]:
# Read the raw tables; each one is joined on its 'date' column below.
dai = pd.read_csv('index_df.csv')
crix = pd.read_csv('new_crix.csv').rename(columns={'price': 'crix'})
eth = (
    pd.read_csv('ETH-USD.csv')
    .rename(columns={'Date': 'date', 'Adj Close': 'eth', 'Volume': 'eth_vol'})
    .drop(columns=['Open', 'High', 'Low', 'Close'])
)
# dai_crypto = pd.read_csv('DAI-USD.csv').rename(columns={'Date':'date', 'Adj Close':'dai_crypto', 'Volume':'dai_vol'}).drop(columns=['Open','High', 'Low', 'Close'])

# Keep only the dates present in every table, then index the result by date.
# (To include dai_crypto, add `.merge(dai_crypto, on='date')` to the chain.)
df_price = (
    dai.merge(crix, on='date')
    .merge(eth, on='date')
    .set_index('date')
)

### return

In [None]:
# Simple one-step return for every column: r_t = p_t / p_(t-1) - 1.
# The first row has no predecessor (all NaN), so it is dropped.
df_return = (df_price.div(df_price.shift(1)) - 1).iloc[1:]

In [None]:
# Two stacked panels with no gap between them: hh_index returns on top,
# crix returns below.
fig = make_subplots(rows=2, cols=1, vertical_spacing=0)

fig.update_layout(height=800, width=1600,
                  title='',
                  showlegend=False,
                  font=dict(family='Times New Roman', size=20))

for panel_row, column in enumerate(('hh_index', 'crix'), start=1):
    fig.add_trace(
        go.Scatter(x=df_return.index,
                   y=df_return[column],
                   line=dict(color='blue')),
        row=panel_row, col=1,
    )

# Black frame on all four sides of every panel, no grid lines.
frame_style = dict(showline=True, linewidth=1, linecolor='black',
                   mirror=True, showgrid=False)
fig.update_xaxes(tickformat="%b\n%Y", **frame_style)
fig.update_yaxes(**frame_style)

# Only the bottom panel shows the date axis (labelled every three months).
fig.update_xaxes(showticklabels=False, row=1, col=1)
fig.update_xaxes(title='Date', dtick='M3', row=2, col=1)

# No y-axis titles on either panel.
for panel_row in (1, 2):
    fig.update_yaxes(title='', row=panel_row, col=1)

# Transparent background, black text.
fig.update_layout(plot_bgcolor='rgba(0,0,0,0)',
                  paper_bgcolor='rgba(0,0,0,0)',
                  font_color='black')

fig.show()

### autocorrelation 

In [None]:
def create_corr_plot(series, date, save_path, plot_pacf=False):
    """Plot a series together with its (partial) autocorrelation function.

    The top panel shows ``series`` against ``date``; the bottom panel shows
    the ACF (or PACF) for lags 0-40 as a stem plot with a shaded 95 %
    confidence band centred on zero.

    Args:
        series: pandas Series to analyse. Missing values are dropped before
            the correlations are estimated, but the raw series is plotted.
        date: x values for the top panel (same length as ``series``).
        save_path: where to write the figure image. ``None`` shows the figure
            without saving it.
        plot_pacf: if True, plot the partial autocorrelation instead of the
            autocorrelation.

    Returns:
        None. The figure is displayed and optionally written to disk.
    """
    estimator = pacf if plot_pacf else acf
    corr, conf_int = estimator(series.dropna(), alpha=0.05, nlags=40)

    # statsmodels returns the interval around each estimate; subtracting the
    # estimate re-centres the band on zero so that stems leaving the band
    # are the significant ones.
    lower_y = conf_int[:, 0] - corr
    upper_y = conf_int[:, 1] - corr
    lags = np.arange(len(corr))
    band_color = 'rgba(173, 216, 230,0.4)'

    fig = make_subplots(rows=2, cols=1, vertical_spacing=0.18)
    fig.update_layout(height=800, width=1200,
                      title='',
                      showlegend=False,
                      font=dict(family='Times New Roman', size=20))

    # Top panel: the raw series.
    fig.add_trace(go.Scatter(y=series,
                             x=date,
                             mode='lines',
                             line=dict(color='blue', width=1)),
                  row=1, col=1)

    # Bottom panel: one vertical stem per lag ...
    for lag, value in zip(lags, corr):
        fig.add_trace(go.Scatter(y=(0, value),
                                 x=(lag, lag),
                                 mode='lines',
                                 line=dict(color='black', width=2)),
                      row=2, col=1)

    # ... a marker on top of each stem ...
    fig.add_trace(go.Scatter(y=corr,
                             x=lags,
                             mode='markers',
                             marker=dict(color='blue', size=8)),
                  row=2, col=1)

    # ... and the confidence band. The lower bound is added right after the
    # upper bound so that fill='tonexty' shades the area between the two.
    fig.add_trace(go.Scatter(y=upper_y,
                             x=lags,
                             mode='lines',
                             line=dict(color=band_color, width=1)),
                  row=2, col=1)
    fig.add_trace(go.Scatter(y=lower_y,
                             x=lags,
                             mode='lines',
                             fillcolor=band_color,
                             fill='tonexty',
                             line_color=band_color),
                  row=2, col=1)

    fig.update_xaxes(title='Date', dtick='M3', row=1, col=1)
    fig.update_xaxes(title='Lags', range=[-1, 42], row=2, col=1)
    fig.update_yaxes(title='Return', zerolinecolor='black', row=1, col=1)
    # Label the panel with the statistic that was actually computed.
    fig.update_yaxes(title='PACF' if plot_pacf else 'ACF',
                     zerolinecolor='black', row=2, col=1)

    # Black frame on all four sides of every panel, no grid lines.
    fig.update_xaxes(showline=True, linewidth=1,
                     linecolor='black',
                     mirror=True,
                     tickformat="%b\n%Y",
                     showgrid=False)
    fig.update_yaxes(showline=True, linewidth=1,
                     linecolor='black',
                     mirror=True,
                     showgrid=False)
    fig.update_layout({'plot_bgcolor': 'rgba(0,0,0,0)',
                       'paper_bgcolor': 'rgba(0,0,0,0)'},
                      font_color='black')

    fig.show()
    # write_image() rejects None, so only export when a target was given.
    if save_path is not None:
        fig.write_image(save_path)

In [None]:
# Autocorrelation plot for the dcs_index return series.
create_corr_plot(
    series=df_return['dcs_index'],
    date=df_return.index,
    save_path=None,
)

### stationarity

In [None]:
def kpss_test(df):
    """Run statsmodels' KPSS test on ``df`` and print the outcome.

    Args:
        df: object exposing ``.values`` (e.g. a pandas Series) holding the
            observations to test.

    Prints the test statistic, the p-value, the number of lags used and the
    table of critical values. Returns None.
    """
    result = kpss(df.values)
    # kpss() yields (statistic, p-value, lags, critical-value dict).
    stat, pval, lags, crit = result
    print(f'KPSS Statistic: {stat}')
    print(f'p-value: {pval}')
    print(f'num lags: {lags}')
    print('Critial Values:')
    for level in crit:
        print(f'   {level} : {crit[level]}')

def adf_test(df):
    """Run the augmented Dickey-Fuller test on ``df`` and print the outcome.

    Args:
        df: object exposing ``.values`` (e.g. a pandas Series) holding the
            observations to test.

    Prints the test statistic, the p-value and the critical values.
    Returns None.
    """
    outcome = adfuller(df.values)
    # Positions used here: 0 = statistic, 1 = p-value, 4 = critical values.
    adf_stat = outcome[0]
    p_value = outcome[1]
    critical_values = outcome[4]

    print('ADF Statistics: %f' % adf_stat)
    print('p-value: %f' % p_value)
    print('Critical values:')
    for level in critical_values:
        print('\t%s: %.3f' % (level, critical_values[level]))

### granger causality


In [None]:
def is_GrangerCause(data=None, maxlag=10):
    """Report whether the second column of ``data`` Granger-causes the first.

    statsmodels' ``grangercausalitytests`` tests the null hypothesis that the
    series in the SECOND column does not Granger-cause the series in the
    FIRST column, so the printed verdict reads "<column 2> Granger causes
    <column 1>".

    Lags 1..maxlag are examined in increasing order. Causality is reported
    for the first lag at which all four tests (likelihood ratio, SSR F,
    SSR chi2 and parameter F) reject the null at the 5 % level.

    Args:
        data: two-column DataFrame, ordered as (effect, candidate cause).
        maxlag: largest lag to test.

    Returns:
        None. The verdict (and, on success, the lag and test results) is
        printed.
    """
    gc = grangercausalitytests(data, maxlag=maxlag, verbose=False)
    effect, cause = data.columns[0], data.columns[1]
    test_names = ('lrtest', 'ssr_ftest', 'ssr_chi2test', 'params_ftest')

    for lag in range(1, maxlag + 1):
        results = gc[lag][0]
        # Index 1 of every test tuple is its p-value.
        if all(results[name][1] < 0.05 for name in test_names):
            print('Yes: {} Granger causes {}'.format(cause, effect))
            # Report the actual lag (results are keyed from 1, not 0).
            print('maxlag = {}\nResults: {}'.format(lag, results))
            break
    else:
        # No lag passed all four tests.
        print('No: {} does not Granger cause {}'.format(cause, effect))

In [None]:
# Summarised verdict over the default lag range, followed by the detailed
# statsmodels results for lags up to 4.
is_GrangerCause(data=df_return.loc[:, ['dcs_index', 'eth_vol']])
gc = grangercausalitytests(df_return.loc[:, ['dcs_index', 'eth_vol']], maxlag=4)


### convergent cross mapping 

In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from scipy.spatial import distance
from scipy.stats import pearsonr
class ccm:
    """Convergent cross mapping (CCM) check for causality X -> Y.

    A time-delay embedding ("shadow manifold") of Y is built, and X is then
    estimated at each timestep from the nearest neighbours of the
    corresponding point on that manifold. The correlation between estimated
    and true X is the cross-map skill reported by ``causality()``.

    Args
        X: timeseries for variable X that could cause Y
        Y: timeseries for variable Y that could be caused by X
        tau: time lag. default = 1
        E: shadow manifold embedding dimension. default = 2
        L: time period/duration to consider (longer = more data).
           default = length of X
    """

    def __init__(self, X, Y, tau=1, E=2, L=None):
        """Store the inputs and pre-compute the shadow manifold of Y.

        X and Y are converted to plain arrays so that every later integer
        index is positional, whatever index a pandas Series input carries.
        """
        self.X = np.asarray(X)
        self.Y = np.asarray(Y)
        self.tau = tau
        self.E = E
        self.L = len(self.X) if L is None else L
        # shadow manifold for Y (we want to know if info from X is in Y)
        self.My = self.shadow_manifold(self.Y)
        # pairwise distances between the points of that manifold
        self.t_steps, self.dists = self.get_distances(self.My)

    def shadow_manifold(self, V):
        """Build the time-delay embedding of ``V``.

        Args
            V: some time series vector (only the first ``self.L`` values
               are used)
        Returns
            {t: [V[t], V[t-tau], V[t-2*tau], ..., V[t-(E-1)*tau]]} for every
            t from (E-1)*tau up to L-1.
        """
        V = np.asarray(V)[:self.L]  # make sure we cut at L
        first_t = (self.E - 1) * self.tau  # earliest t with a full lag vector
        return {
            t: [V[t - k * self.tau] for k in range(self.E)]
            for t in range(first_t, self.L)
        }

    # get pairwise distances between vectors in the time series
    def get_distances(self, M):
        """Compute all pairwise distances between manifold points.

        Args
            M: The shadow manifold from the time series
        Returns
            t_steps: timesteps (the keys of M, in order)
            dists: n x n matrix showing distances of each vector at t_step
                   (rows) from other vectors (columns)
        """
        t_steps = np.array(list(M.keys()))
        vecs = np.array(list(M.values()))
        dists = distance.cdist(vecs, vecs)
        return t_steps, dists

    def get_nearest_distances(self, t, t_steps, dists):
        """Find the nearest neighbours of the manifold point at time ``t``.

        Args:
            t: timestep of vector whose nearest neighbors we want to compute
            t_steps: time steps of all vectors in the manifold M, output of
                     get_distances()
            dists: distance matrix showing distance of each vector (row)
                   from other vectors (columns). output of get_distances()
        Returns:
            nearest_timesteps: array of timesteps of the (up to) E+1 vectors
                               that are nearest to the vector at time t
            nearest_distances: array of the corresponding distances
        Raises:
            ValueError: if ``t`` is not one of ``t_steps``.
        """
        matches = np.flatnonzero(t_steps == t)
        if matches.size == 0:
            raise ValueError(f'timestep {t} is not on the shadow manifold')
        t_ind = matches[0]
        dist_t = dists[t_ind]  # distances from the vector at time t

        # Drop the point itself explicitly: when other points coincide with
        # it (distance 0), it is not guaranteed to be first in the sort.
        order = np.argsort(dist_t, kind='stable')
        order = order[order != t_ind]
        nearest_inds = order[:self.E + 1]

        return t_steps[nearest_inds], dist_t[nearest_inds]

    def predict(self, t):
        """Estimate X at time ``t`` from the neighbours of My at time ``t``.

        Args
            t: timestep at manifold of y, My, to predict X at same time step
        Returns
            X_true: the true value of X at time t
            X_hat: the predicted value of X at time t using the manifold My
        """
        eps = 0.000001  # epsilon minimum distance possible
        nearest_timesteps, nearest_distances = self.get_nearest_distances(
            t, self.t_steps, self.dists)

        # Exponential weights, scaled by the closest distance (floored at eps
        # to avoid dividing by zero), normalised to sum to one.
        u = np.exp(-nearest_distances / np.max([eps, nearest_distances[0]]))
        w = u / np.sum(u)

        X_true = self.X[t]
        X_cor = self.X[nearest_timesteps]  # X at the neighbours' timesteps
        # NOTE: X_hat is NaN whenever the inputs or distances contain NaN.
        X_hat = (w * X_cor).sum()

        return X_true, X_hat

    def _cross_map(self):
        """Return (true X, predicted X) arrays for every timestep on My."""
        pairs = [self.predict(t) for t in self.t_steps]
        X_true = np.array([pair[0] for pair in pairs])
        X_hat = np.array([pair[1] for pair in pairs])
        return X_true, X_hat

    def causality(self):
        '''
        Args:
            None
        Returns:
            (r, p): how much X causes Y, as the Pearson correlation between
            predicted X and true X, and its p-value (significance)
        '''
        X_true, X_hat = self._cross_map()
        r, p = pearsonr(X_true, X_hat)
        return r, p

    def visualize_cross_mapping(self):
        """
        Visualize the shadow manifolds and some cross mappings.

        Both panels show the lag plots of X and Y (value at t against value
        at t - tau). The left panel maps three random points of the Y
        manifold, with their neighbours, onto the X manifold; the right panel
        does the same from X to Y.

        Raises:
            ValueError: if E < 2, since a 2-D lag plot needs two coordinates.
        """
        if self.E < 2:
            raise ValueError('visualize_cross_mapping requires E >= 2')

        f, axs = plt.subplots(1, 2, figsize=(24, 10))

        # Lag-plot coordinates; slicing (rather than indexing t - tau from
        # t = 1) avoids wrapping around to the end of the series.
        X_t, X_lag = self.X[self.tau:], self.X[:len(self.X) - self.tau]
        Y_t, Y_lag = self.Y[self.tau:], self.Y[:len(self.Y) - self.tau]

        # (source, target) series of the cross mapping shown on each panel
        directions = [(self.Y, self.X), (self.X, self.Y)]

        for ax, (A, B) in zip(axs, directions):
            #===============================================
            # Shadow Manifolds Visualization
            ax.scatter(X_t, X_lag, s=50, label='$M_x$',
                       color='forestgreen')
            ax.scatter(Y_t, Y_lag, s=50, label='$M_y$',
                       c='#F49EC4')

            #===============================================
            # Cross Mapping Visualization
            Ma = self.shadow_manifold(A)
            Mb = self.shadow_manifold(B)
            t_steps_A, dists_A = self.get_distances(Ma)

            # Plot cross mapping for a few random time steps
            timesteps = list(Ma.keys())
            n_examples = min(3, len(timesteps))
            for t in np.random.choice(timesteps, size=n_examples, replace=False):
                near_t_A, _ = self.get_nearest_distances(t, t_steps_A, dists_A)

                for t_near in near_t_A:
                    # points on Ma
                    A_t, A_lag = Ma[t_near][0], Ma[t_near][1]
                    ax.scatter(A_t, A_lag, c='b', marker='s', s=150)

                    # corresponding points on Mb
                    B_t, B_lag = Mb[t_near][0], Mb[t_near][1]
                    ax.scatter(B_t, B_lag, c='r', marker='*', s=150)

                    # connections
                    ax.plot([A_t, B_t], [A_lag, B_lag], c='r', linestyle=':')

            # NOTE: the axis labels are written for a lag of 1; the plotted
            # lag is self.tau.
            ax.set_xlabel('$x{(t)}$, $y{(t)}$', size=30)
            ax.set_ylabel('$x{(t-1)}$, $y{(t-1)}$', size=30)
            ax.patch.set_alpha(0)

        plt.show()

    def plot_ccm_correls(self):
        """
        Scatter the true X values against the X values predicted from My.

        Every timestep on the shadow manifold of Y contributes one point.

        Returns
            None. Just the scatter plot between predicted X|M_y and true X
        """
        X_My_true, X_My_pred = self._cross_map()

        plt.figure(figsize=(15, 15), dpi=80)
        plt.scatter(X_My_true, X_My_pred, s=30, c='blue')
        plt.xlabel('$x(t)$', size=30)
        plt.ylabel(r'$\hat{x}(t)|M_y$', size=30)
        plt.xlim([-0.5, 0.5])
        plt.ylim([-0.5, 0.5])

        plt.show()

In [None]:
from matplotlib import rc
# Global matplotlib font: serif family (Times), 30 pt.
font = dict(family='serif', serif=['Times'], size=30)
rc('font', **font)

In [None]:
# CCM between dcs_index returns (X) and eth_vol returns (Y), lag 2, E = 2.
ccm1 = ccm(X=df_return['dcs_index'], Y=df_return['eth_vol'], tau=2, E=2)
ccm1.visualize_cross_mapping()
# Compute the cross-map correlation once and display it as the cell output.
corr_, p = ccm1.causality()
corr_, p

In [None]:
# Scatter the true X values against the values cross-mapped from the Y manifold.
ccm1.plot_ccm_correls()

In [None]:
X = df_return['dcs_index']
Y = df_return['eth']
L_range = range(5, len(X), 10)  # L values to test
tau = 2
E = 2

# Cross-map correlation for every library length L, in both directions.
Xhat_My, Yhat_Mx = [], []
for L in L_range:
    ccm_XY = ccm(X, Y, tau, E, L)  # Testing for X -> Y
    ccm_YX = ccm(Y, X, tau, E, L)  # Testing for Y -> X
    # Keep the p-values from the same run instead of recomputing the whole
    # cross mapping after the loop; the last iteration's values are printed.
    r_XY, p_XY = ccm_XY.causality()
    r_YX, p_YX = ccm_YX.causality()
    Xhat_My.append(r_XY)
    Yhat_Mx.append(r_YX)

print('X->Y r', np.round(Xhat_My[-1], 2), 'p value', np.round(p_XY, 4))
print('Y->X r', np.round(Yhat_Mx[-1], 2), 'p value', np.round(p_YX, 4))

# plot convergence as L->inf. Convergence is necessary to conclude causality
fig = make_subplots()

fig['layout'].update(height=800, width=800,
                     title='',
                     showlegend=False,
                     font=dict(family='Times New Roman', size=24))

# X -> Y as a solid blue line, Y -> X as a dotted red line.
fig.add_trace(go.Scatter(x=[*L_range],
                         y=Xhat_My,
                         line=dict(color='blue', width=2)))
fig.add_trace(go.Scatter(x=[*L_range],
                         y=Yhat_Mx,
                         line=dict(color='red', dash='dot', width=2)))

# Black frame on all four sides, no grid lines.
fig.update_xaxes(showline=True, linewidth=1,
                 linecolor='black',
                 mirror=True,
                 tickformat="%b\n%Y",
                 showgrid=False)
fig.update_yaxes(showline=True, linewidth=1,
                 linecolor='black',
                 mirror=True,
                 showgrid=False)

# Update xaxis properties
fig.update_xaxes(title='L', dtick=50)

# Update yaxis properties
fig.update_yaxes(title='Correlation')

fig.update_layout({'plot_bgcolor': 'rgba(0,0,0,0)',
                   'paper_bgcolor': 'rgba(0,0,0,0)'},
                  font_color='black')

fig.show()
# fig.write_image('images/ccm_conv_1.pdf')

### delay embedding

In [None]:

# Number of points to use (more can be slower to render);
# -1 keeps all but the last point.
POINTS = -1
# Delay, integer
TAU = 3

time_series = df_return['dcs_index'][:POINTS]

# `-TAU if TAU else len(time_series)` keeps the slice end valid for TAU == 0,
# where a plain `-0` end would give an empty slice.

# 2-D embedding: (x(t - tau), x(t))
delay_coordinates1 = [
    time_series[:-TAU if TAU else len(time_series)],    # x(t - tau)
    time_series[TAU:]                                   # x(t)
]

# 3-D embedding: (x(t - tau), x(t), x(t - 2 tau))
delay_coordinates2 = [
    time_series[TAU:-TAU if TAU else len(time_series)],  # x(t - tau)
    time_series[2 * TAU:],                               # x(t)
    time_series[:-2 * TAU if TAU else len(time_series)]  # x(t - 2 tau)
]

## visualize embedding
# Layout: the time series spans the whole top row; the bottom row holds the
# 2-D embedding (left) and the 3-D embedding (right). The cell covered by the
# column span must be None, otherwise a second, empty subplot is created on
# top of the spanning one.
fig = make_subplots(rows=2, cols=2,
                    vertical_spacing=0.1,
                    horizontal_spacing=0.01,
                    specs=[[{'type': 'xy', 'colspan': 2}, None],
                           [{'type': 'xy'}, {'type': 'scene'}]],
                    row_heights=[0.5, 0.5]
)
fig['layout'].update(height=1000, width=1000,
                     title='',
                     showlegend=False,
                     font=dict(family='Times New Roman', size=14))

# 3d scatter
fig.add_trace(go.Scatter3d(x=delay_coordinates2[0],
                           y=delay_coordinates2[1],
                           z=delay_coordinates2[2],
                           mode='markers',
                           marker=dict(color='blue', size=5, opacity=1,
                                       line=dict(width=1, color='red'))),
              row=2, col=2)
fig.update_layout(template="none", scene_camera_eye=dict(x=1.5, y=1.5, z=1))
fig.update_layout(scene=dict(
    xaxis_title=u'x(t-\u03C4)',
    yaxis_title='x(t)',
    zaxis_title=u'x(t-2\u03C4)',),)

# initial time series, plotted against its sample number
fig.add_trace(go.Scatter(y=time_series,
                         x=[*range(len(time_series))],
                         mode='markers',
                         marker=dict(color='blue', size=5)),
              row=1, col=1)
# 2d scatter
fig.add_trace(go.Scatter(x=delay_coordinates1[0],
                         y=delay_coordinates1[1],
                         mode='markers',
                         marker=dict(color='blue', size=5,
                                     line=dict(width=0.1, color='red'))),
              row=2, col=1)

fig.update_xaxes(title=u'x(t-\u03C4)', row=2, col=1, range=[-0.4, 0.8])
fig.update_yaxes(title='x(t)', row=2, col=1, range=[-0.4, 0.8])
fig.update_xaxes(title='t', row=1, col=1, range=[0, 600])
fig.update_yaxes(title='x(t)', row=1, col=1)

# Black frame on all four sides of every 2-D panel, no grid lines.
fig.update_xaxes(showline=True, linewidth=1,
                 linecolor='black',
                 mirror=True,
                 tickformat="%b\n%Y",
                 showgrid=False)
fig.update_yaxes(showline=True, linewidth=1,
                 linecolor='black',
                 mirror=True,
                 showgrid=False)

fig.update_layout({'plot_bgcolor': 'rgba(0,0,0,0)',
                   'paper_bgcolor': 'rgba(0,0,0,0)'},
                  font_color='black')

# fig.update_xaxes(zerolinecolor='red')
# # fig.update_xaxes(zerolinecolor='red', row=2, col=1)
# fig.update_yaxes(zerolinecolor='red', row=2, col=1)

fig.show()

# fig.write_image('images/taken_example.pdf')
