In [None]:
# Inputs
exp = 'exp6'
islet = 'exp6-i13'
file = islet + '.xlsx'
filepath = 'C:\\Users\\fiona\\Desktop\\School\\Nanyang Technological University\\Dissertation\\Data\\Pauline\\Data Only\\' + exp + '\\' + islet + '\\'
filename = filepath + file
coordfile = filepath + 'coords ' + file
newfilename = filepath + 'MVAR_' + file

# Import modules
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
%matplotlib inline
import openpyxl
from openpyxl import load_workbook

# Import Statsmodels
from statsmodels.tsa.api import VAR
from statsmodels.tsa.stattools import grangercausalitytests
from statsmodels.tsa.vector_ar.vecm import coint_johansen
from statsmodels.tsa.stattools import adfuller
from statsmodels.tools.eval_measures import rmse, aic

# Import data
df = pd.read_excel(filename)
coordata = pd.read_excel(coordfile)

df.to_excel(newfilename, sheet_name=file, index = False)
df

In [None]:
# Plot individual fluorescence over time
plt.plot(df)
plt.xlabel('Time')
plt.ylabel('Fluorescence')
plt.savefig('calcium traces.png', dpi=150)

wb = openpyxl.load_workbook(newfilename)
ws = wb.active    
img = openpyxl.drawing.image.Image('calcium traces.png')
ws.add_image(img, 'B3')
wb.save(newfilename)

plt.show()

In [None]:
# Testing causation

maxlag=30
test = 'ssr_chi2test'
def grangers_causation_matrix(data, variables, test='ssr_chi2test', verbose=False):    
    df = pd.DataFrame(np.zeros((len(variables), len(variables))), columns=variables, index=variables)
    for c in df.columns:
        for r in df.index:
            test_result = grangercausalitytests(data[[r, c]], maxlag=maxlag, verbose=False)
            p_values = [round(test_result[i+1][0][test][1],4) for i in range(maxlag)]
            if verbose: print(f'Y = {r}, X = {c}, P Values = {p_values}')
            min_p_value = np.min(p_values)
            df.loc[r, c] = min_p_value
    df.columns = [var + '_x' for var in variables]
    df.index = [var + '_y' for var in variables]
    return df

book = load_workbook(newfilename)
writer = pd.ExcelWriter(newfilename, engine = 'openpyxl')
writer.book = book

df2 = grangers_causation_matrix(df, variables = df.columns)

def nonsig_background(cell_value):

    highlight = 'background-color: yellow;'
    default = ''

    if cell_value > 0.05:
        return highlight
    return default

df2 = df2.style.applymap(nonsig_background)

df2.to_excel(writer, sheet_name='Granger Causation Matrix')
writer.save()
writer.close()
df2

In [None]:
# Testing cointegration using Johansen test

def cointegration_test(df, alpha=0.05):
    out = coint_johansen(df,-1,5)
    d = {'0.90':0, '0.95':1, '0.99':2}
    traces = out.lr1
    cvts = out.cvt[:, d[str(1-alpha)]]
    def adjust(val, length= 6): return str(val).ljust(length)

    # Summary
    print('Name   ::  Test Stat > C(95%)    =>   Signif  \n', '--'*20)
    for col, trace, cvt in zip(df.columns, traces, cvts):
        print(adjust(col), ':: ', adjust(round(trace,2), 9), ">", adjust(cvt, 8), ' =>  ' , trace > cvt)

cointegration_test(df)

In [None]:
nobs = 4
df_train, df_test = df[0:-nobs], df[-nobs:]

# Check size
print(df_train.shape)  # (119, 8)
print(df_test.shape)  # (4, 8)

In [None]:
# Testing cointegration using ADF

def adfuller_test(series, signif=0.05, name='', verbose=False):
    r = adfuller(series, autolag='AIC')
    output = {'test_statistic':round(r[0], 4), 'pvalue':round(r[1], 4), 'n_lags':round(r[2], 4), 'n_obs':r[3]}
    p_value = output['pvalue'] 
    def adjust(val, length= 6): return str(val).ljust(length)

    # Print Summary
    print(f'    Augmented Dickey-Fuller Test on "{name}"', "\n   ", '-'*47)
    print(f' Null Hypothesis: Data has unit root. Non-Stationary.')
    print(f' Significance Level    = {signif}')
    print(f' Test Statistic        = {output["test_statistic"]}')
    print(f' No. Lags Chosen       = {output["n_lags"]}')

    for key,val in r[4].items():
        print(f' Critical value {adjust(key)} = {round(val, 3)}')

    if p_value <= signif:
        print(f" => P-Value = {p_value}. Rejecting Null Hypothesis.")
        print(f" => Series is Stationary.")
    else:
        print(f" => P-Value = {p_value}. Weak evidence to reject the Null Hypothesis.")
        print(f" => Series is Non-Stationary.")
        
# ADF Test on each column
for name, column in df_train.iteritems():
    adfuller_test(column, name=column.name)
    print('\n')

In [None]:
# Testing cointegration using ADF (1st difference)

df_differenced = df_train.diff().dropna()

# ADF Test on each column of 1st Differences Dataframe
for name, column in df_differenced.iteritems():
    adfuller_test(column, name=column.name)
    print('\n')

In [None]:
model = VAR(df_differenced)
for i in range (1,101):
    result = model.fit(i)
    print('Lag Order =', i)
    print('AIC : ', result.aic)
    print('BIC : ', result.bic)
    print('FPE : ', result.fpe)
    print('HQIC: ', result.hqic, '\n')

In [None]:
x = model.select_order(maxlags=20)
x.summary()

In [None]:
model_fitted = model.fit(30)
model_fitted.summary()

In [None]:
from statsmodels.stats.stattools import durbin_watson
out = durbin_watson(model_fitted.resid)

for col, val in zip(df.columns, out):
    print(adjust(col), ':', round(val, 2))