In [1]:
import sys
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import statsmodels.api as sm
from statsmodels.tsa.stattools import adfuller, grangercausalitytests, coint
from statsmodels.tsa.vector_ar.vecm import coint_johansen

In [2]:
def adf_test(timeseries):
    """Run the Augmented Dickey-Fuller test and return a labelled summary.

    Parameters
    ----------
    timeseries : array-like
        Passed unchanged to ``adfuller`` with ``autolag='AIC'``.

    Returns
    -------
    pandas.Series
        The first four elements of the ``adfuller`` output, labelled
        'Test Statistic', 'p-value', '#Lags Used' and
        'Number of Observations Used', followed by one
        'Critical Value (<key>)' entry for every key of the mapping found at
        position 4 of that output.
    """
    outcome = adfuller(timeseries, autolag='AIC')
    headline_labels = [
        'Test Statistic',
        'p-value',
        '#Lags Used',
        'Number of Observations Used',
    ]
    summary = pd.Series(outcome[0:4], index=headline_labels)
    # Append the critical values after the headline figures, one entry per level.
    critical_values = outcome[4]
    for level in critical_values:
        summary['Critical Value (%s)' % level] = critical_values[level]
    return summary

In [3]:
def is_nonstationary(result):
    """Tell whether the ADF test fails to reject a unit root at the 5% level.

    The ADF test is left-tailed: the unit-root null is rejected only when the
    test statistic lies *below* (is more negative than) the critical value.
    The comparison is therefore signed; comparing absolute values would
    misclassify a large positive statistic as evidence of stationarity.

    Parameters
    ----------
    result : mapping or pandas.Series
        Must provide the entries 'Test Statistic' and 'Critical Value (5%)'.

    Returns
    -------
    bool
        True when the null hypothesis is not rejected (series treated as
        non-stationary), False when it is rejected.
    """
    rejects_unit_root = result['Test Statistic'] < result['Critical Value (5%)']
    # `not` also normalises a numpy boolean into a plain Python bool.
    return not rejects_unit_root

In [4]:
def print_adf_result(result):
    """Print an ADF test summary and the resulting 5%-level verdict.

    Parameters
    ----------
    result : pandas.Series
        Must contain the entries '#Lags Used', 'Test Statistic', 'p-value'
        and 'Critical Value (5%)'; it is indexed with a list of labels, so a
        plain dict is not sufficient.

    Returns
    -------
    None
        Everything is written to standard output.
    """
    print ('Augmented Dickey-Fuller Unit Root\nNull Hypothesis : Time Series in Non-Stationary')
    print (result[['#Lags Used','Test Statistic','p-value','Critical Value (5%)']])
    print("At 5% significance level")
    # The ADF test is left-tailed: reject the unit-root null only when the
    # statistic is more negative than the critical value. A signed comparison
    # keeps a large positive statistic from being reported as stationary.
    if result['Test Statistic'] < result['Critical Value (5%)']:
        print ("Reject Null Hypothesis - Time Series is Stationary")
    else:
        print ("Failed to Reject Null Hypothesis - Time Series is Non-Stationary")

In [5]:
def print_coint_result(result):
    """Print a cointegration test result and the 5%-level verdict.

    Parameters
    ----------
    result : sequence
        ``result[0]`` is printed as the t-statistic, ``result[1]`` as the
        p-value, and ``result[2][0]``, ``result[2][1]``, ``result[2][2]`` as
        the 1%, 5% and 10% critical values respectively.

    Returns
    -------
    None
        Everything is written to standard output.
    """
    print("Engel Granger Cointegration\nNull Hypothesis : No Cointegration")
    print("t-Statistic :\t\t", result[0])
    print("p-value :\t\t", result[1])
    critical_labels = (
        "Critical Value 1% :\t",
        "Critical Value 5% :\t",
        "Critical Value 10% :\t",
    )
    for position, label in enumerate(critical_labels):
        print(label, result[2][position])
    print("At 5% significance level")
    # The decision is based on the p-value alone, not on the critical values.
    verdict = (
        "Reject Null Hypothesis - Cointegration is present"
        if result[1] <= 0.05
        else "Failed to reject Null Hypothesis - No Cointegration"
    )
    print(verdict)

In [6]:
def make_stationary(timeseries):
    """Difference a series until the ADF check stops flagging it.

    Works on a deep copy, so the caller's series is left untouched. The copy
    is differenced (dropping the NaN each difference introduces) for as long
    as ``is_nonstationary(adf_test(...))`` is true; a series that already
    passes is returned without any differencing.

    Parameters
    ----------
    timeseries : pandas.Series
        Series to transform.

    Returns
    -------
    pandas.Series
        The copy after zero or more rounds of differencing.
    """
    current = timeseries.copy(deep=True)
    while True:
        if not is_nonstationary(adf_test(current)):
            return current
        # One more round of differencing; the leading NaN is discarded.
        current = current.diff().dropna()

In [7]:
def print_granger_result(result):
    """Print the Granger-causality outcome for the lag with the smallest p-value.

    Parameters
    ----------
    result : dict
        Mapping from lag to a sequence whose first element is a dict holding
        a 'params_ftest' entry; element 0 of that entry is printed as the
        F-statistic and element 1 as the p-value.

    Returns
    -------
    None
        Everything is written to standard output.

    Raises
    ------
    ValueError
        If ``result`` is empty.
    """
    def lag_pvalue(lag):
        return result[lag][0]['params_ftest'][1]

    # Select the lag by its actual key instead of by list position + 1, so the
    # reported lag is right even when the keys are not exactly 1..n in order.
    # Ties resolve to the first lag encountered, as before.
    best_lag = min(result, key=lag_pvalue)
    f_statistic, p_value = result[best_lag][0]['params_ftest'][0:2]

    print("Granger Causality")
    print("Best number of Lags :\t", best_lag)
    print("F-statistic :\t", f_statistic)
    print("p-value :\t", p_value)
    print("At 5% significance level")
    if p_value < 0.05:
        print("Causality is significant")
    else:
        print("Causality is insignificant")

In [8]:
def run_country(cntry):
    """Run and print the full test battery for one country.

    Reads ``<cntry>.csv`` (indexed by its "Series Name" column), keeps the
    money, government-expenditure and GDP columns, drops incomplete rows and
    log-transforms the rest. It then prints, in order: ADF results on the
    logged series, ADF results on the series returned by ``make_stationary``,
    cointegration of GDP with M2 and with GE, and Granger causality in both
    directions for the GDP/M2 and GDP/GE pairs.

    Parameters
    ----------
    cntry : str
        Country key; one of 'ind', 'pak', 'sri', 'saf', 'bra'. It selects both
        the printed country name and the CSV file name.

    Returns
    -------
    None
        Everything is written to standard output.

    Raises
    ------
    KeyError
        If ``cntry`` is not one of the known keys.
    """
    country_names = {'ind':'INDIA','pak':'PAKISTAN','sri':'SRI LANKA','saf':'SOUTH AFRICA','bra':'BRAZIL'}
    print(country_names[cntry],'\n')

    # Source column -> short working name; insertion order fixes column order.
    short_names = {"M2-LCU":"M2","Government-expenditure-US$":"GE","GDP-US$":"GDP"}
    raw = pd.read_csv(cntry + '.csv', index_col="Series Name")
    df = raw[list(short_names)].rename(columns=short_names)
    df = df.dropna().apply(np.log)

    # NOTE(review): the M2 heading says "current US$" while the column read is
    # "M2-LCU" -- confirm which unit the data actually uses.
    level_headings = (
        ('M2', "Broad Money (current US$) (M2)"),
        ('GE', "Government Expenditure (current US$) (GE)"),
        ('GDP', "Gross Domestic Product (current US$) (GDP)"),
    )
    for column, heading in level_headings:
        print(heading)
        print_adf_result(adf_test(df[column]))
        print('\n')

    # Assigning back into df aligns on the index, so rows lost to
    # differencing appear as NaN and are dropped again before each test.
    for column in ('M2', 'GE', 'GDP'):
        df[column + '_s'] = make_stationary(df[column])

    # NOTE(review): headings say "First Difference", but make_stationary
    # decides how many times to difference -- confirm the label always holds.
    differenced_headings = (
        ('M2_s', "First Difference - Broad Money (current US$)"),
        ('GE_s', "First Difference - Government Expenditure (current US$)"),
        ('GDP_s', "First Difference - Gross Domestic Product (current US$)"),
    )
    for column, heading in differenced_headings:
        print(heading)
        print_adf_result(adf_test(df[column].dropna()))
        print('\n')

    # Cointegration is tested on the logged levels, GDP against each regressor.
    cointegration_runs = (
        ("Cointegration GDP-M2", 'M2'),
        ("\nCointegration GDP-GE", 'GE'),
    )
    for heading, other in cointegration_runs:
        print(heading)
        print_coint_result(coint(df['GDP'], df[other], trend='c', maxlag=15))

    # Each column pair is listed as [effect, cause] to match its heading.
    causality_runs = (
        ("\nCausality from M2 to GDP", ['GDP_s','M2_s']),
        ("\nCausality from GDP to M2", ['M2_s','GDP_s']),
        ("\nCausality from GE to GDP", ['GDP_s','GE_s']),
        ("\nCausality from GDP to GE", ['GE_s','GDP_s']),
    )
    for heading, columns in causality_runs:
        print(heading)
        outcome = grangercausalitytests(df[columns].dropna(), maxlag=15, verbose=False)
        print_granger_result(outcome)
    print('\n\n\n')

In [None]:
# Write everything the analysis prints into results.txt, one country after
# another, then hand standard output back to the notebook.
countries = ['ind','pak','sri','saf','bra']

original_stdout = sys.stdout
with open("results.txt", "w") as results_file:
    sys.stdout = results_file
    try:
        for cntry in countries:
            run_country(cntry)
    finally:
        # Always restore the previous stream: leaving sys.stdout pointing at a
        # closed file would make every later print in this kernel fail, and a
        # failure in one country must not leave output redirected.
        sys.stdout = original_stdout