In [1]:
import statsmodels.tsa.stattools as ts
import pandas as pd
import statistics
from pandas_datareader import data as pdr
import time
import statsmodels.api as sm
import statistics as stat
from graphframes import *
from pyspark.sql.types import *

#author Mirco A. Mannucci
#author Deborah Tylor


#SP 500 ingestion

data = pd.read_html('https://en.wikipedia.org/wiki/List_of_S%26P_500_companies')
table = data[0]

tickers =table[0].tolist()
tickers = tickers[1:]
#comment out line below to exclude SP 500 Index ticker
tickers.append('^GSPC')

start = '2013-01-01'
end = '2013-12-31'


s=time.time()
d = {}
for ticker in tickers:   
  try:
    d[ticker] = pdr.DataReader(ticker, "yahoo", start=start, end=end)
  except:
    print("An exception occurred downloading ticker " + ticker)
e=time.time()
print('downloaded ticker data in time: ' + str(e-s))

pan = pd.Panel(d)
df = pan.minor_xs("Close")


#last prices 
df_lastprice =df.tail(1)
df.drop(df.tail(1).index,inplace=True)
lst_nodes=[]
for colx in df_lastprice.columns:
    lst_nodes.append([colx, df_lastprice[colx].values[0]])
    
df_nodes = pd.DataFrame(lst_nodes)
df_nodes.columns = ['id', 'lastprice']
df.drop(df.tail(1).index,inplace=True)


def cointegration_test(y, X):    
    X = sm.add_constant(X) # adding a constant
    
    # Step 1: regress one variable on the other 
    ols_result = sm.OLS(y, X).fit() 

    # extract beta_0, beta_1, mean of residual,
    #and standard deviation    
    beta0 = ols_result.params[0]    
    beta1 = ols_result.params[1]
    mean = stat.median( ols_result.resid)
    stdev =stat.stdev( ols_result.resid)
    
    # Step 2: obtain the residual (ols_resuld.resid)
    residuals = ols_result.resid 
    # Step 3: apply Augmented Dickey-Fuller test to see whether 
    # the residual is stationary  
    adf_result = ts.adfuller(residuals)    
    pvalue = adf_result[1]    
    return [beta0, beta1, mean, stdev,  pvalue]


#calculate cointegration to build edges 
#saves entire model and pvalue

res = {}
s=time.time()
for colx in df.columns:    
    for coly in df.columns:
        try:            
            X =df[colx].values
            Y =df[coly].values
            res[colx, coly] = cointegration_test(Y, X)
        except:
            print("An exception occurred calculating coint for " + colx + " " + coly)           
e=time.time()
print("results of cointegration in time: " + str(e-s))

lst_res=[]
for key in res:
    #make undirected edge list
    lst_res.append([key[0],
                    key[1],
                    res[key][0],
                    res[key][1], 
                    res[key][2], 
                    res[key][3], 
                    res[key][4]  
                    ])
df_coint = pd.DataFrame(lst_res)
df_coint.columns = ['src',
                    'dst', 
                    'beta0', 
                    'beta1', 
                    'mean', 
                    'stdev', 
                    'pvalue']
df_coint.head()

#eliminates all edges whose pvalus is not NAN and above threshold 0.05
df_coint = df_coint[df_coint.pvalue.notnull()]
df_coint = df_coint[df_coint.pvalue > 0.05]

#conversion from Panda dataframes to Spark datatrames 

myNodesSchema = StructType([ StructField("id", StringType(), True)\
                            ,StructField("lastprice", FloatType(), True)])
nodes_df = spark.createDataFrame(df_nodes,schema=myNodesSchema)

myEdgesSchema = StructType([ StructField("src", StringType(), True)\
                            ,StructField("dst", StringType(), True)\
                            ,StructField("beta0",  FloatType(), True)\
                            ,StructField("beta1",  FloatType(), True)\
                            ,StructField("mean",   FloatType(), True)\
                            ,StructField("stdev",  FloatType(), True)\
                            ,StructField("pvalue", FloatType(), True)])

edges_df = spark.createDataFrame(df_coint,schema=myEdgesSchema)
cointgraph = GraphFrame(nodes_df,  edges_df)


#last step: PREGEL calculation of alerts (using aggregateMessage() function from GraphFrames)

from pyspark.sql.functions  import *
from graphframes.lib import AggregateMessages as AM

msgToDst = (  (AM.dst["lastprice"] - (AM.edge["beta1"] * AM.src["lastprice"] + AM.edge["beta0"])) -AM.edge["mean"])/AM.edge["stdev"]
agg= cointgraph.aggregateMessages(
  max(AM.msg).alias("alertStatus"),
    sendToDst= msgToDst
)

agg.show()




