In [110]:
# ANALYSIS TASK 3
#  Is there any correlation in the time series data to see if the crypto trends reflect the stock trends in price and volume changes?

In [111]:
from pyspark import SparkContext
from pyspark.sql import SQLContext
sc = SparkContext.getOrCreate()
sqlContext = SQLContext(sc)
data_main = sqlContext.read.csv('data.csv',header=False)

In [112]:
# Adding an index column
dates = data_main.rdd.map(lambda x: x[0]).collect()
# Order of columns
# 'INDEX','DATE', 'C_PCP', 'C_TMVCP', 'N_PCP', 'N_TMVCP', 'SP_PCP', 'SP_TMVCP', 'NT_PCP', 'NT_TMVCP'
data_main = data_main.rdd.map(lambda x: (dates.index(x[0]),x[0],float(x[1]),float(x[2]),float(x[3]),float(x[4]),float(x[5]),float(x[6]),float(x[7]),float(x[8])) )
data_main.toDF().show(5)

+---+----------+-------------------+------------------+-------------------+------------------+--------------------+-------------------+-------------------+-------------------+
| _1|        _2|                 _3|                _4|                 _5|                _6|                  _7|                 _8|                 _9|                _10|
+---+----------+-------------------+------------------+-------------------+------------------+--------------------+-------------------+-------------------+-------------------+
|  0|02-07-2018|  5.121125072620959| 9.193665598572519|-1.4197329157191518|               0.0|   -6.92142945367664|                0.0|-1.3828243249559322|                0.0|
|  1|03-07-2018|-1.7263876931070241|  3.88579704445608|-0.7503701677520215|-34.69465041770667|-0.38754218086889713|-31.618496889332604|-1.4233110883018818| -32.83890509077797|
|  2|05-07-2018|-0.7984669906309387| 5.925269446030441| 1.2241598859767095| 70.59531435527903|  0.9503111182226929|  58.

In [113]:
# Before covid
from datetime import datetime
def reformat_date(date_time_str):
    dateitem = datetime.strptime(date_time_str, '%d-%m-%Y')
    return dateitem

date = datetime.strptime('31-12-2019', '%d-%m-%Y')
data_bc = data_main.filter(lambda x: reformat_date(x[1])<date)
bc_correlations = []

In [114]:
# cross correlation - time lagged
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# price change percent
# crypto and nasdaq100
data = data_bc.map(lambda x: (x[0],x[1],x[4],x[2]))
# print(data.count())
data_df = data.toDF(('INDEX','DATE','N_PCP','C_PCP'))
data_df.registerTempTable('TABLE')
# we are trying to study the effect of N_PCP change trends on C_PCP change trends
# creating time lagged values for C_PCP for upto 30 days
columns = []
columns.append('C_PCP')
cross_correlations = []
data_dfx = data_df
for i in range(1,31):
    window = Window.orderBy("Index")
    leadCol = F.lead("C_PCP", i).over(window)
    columns.append("C_PCP+"+str(i))
    data_dfx = data_dfx.withColumn("C_PCP+"+str(i), leadCol)
    data_dfx.registerTempTable('DATA')
for column in columns:
    query = sqlContext.sql('SELECT CORR(N_PCP,{0}) as CORRELATION FROM DATA'.format(column))
    cross_correlations.append(query.select('CORRELATION').collect()[0].CORRELATION)
#print(cross_correlations)
bc_correlations.append(cross_correlations)

In [115]:
# price change percent
# crypto and sp100
data = data_bc.map(lambda x: (x[0],x[1],x[6],x[2]))
data_df = data.toDF(('INDEX','DATE','SP_PCP','C_PCP'))
data_df.registerTempTable('TABLE')
# we are trying to study the effect of SP_PCP change trends on C_PCP change trends
# creating time lagged values for C_PCP for upto 30 days
columns = []
columns.append('C_PCP')
cross_correlations = []
data_dfx = data_df
for i in range(1,31):
    window = Window.orderBy("Index")
    leadCol = F.lead("C_PCP", i).over(window)
    columns.append("C_PCP+"+str(i))
    data_dfx = data_dfx.withColumn("C_PCP+"+str(i), leadCol)
    data_dfx.registerTempTable('DATA')
for column in columns:
    query = sqlContext.sql('SELECT CORR(SP_PCP,{0}) as CORRELATION FROM DATA'.format(column))
    cross_correlations.append(query.select('CORRELATION').collect()[0].CORRELATION)
#print(cross_correlations)
bc_correlations.append(cross_correlations)

In [116]:
# price change percent
# crypto and Nasdaq100Tech
data = data_bc.map(lambda x: (x[0],x[1],x[8],x[2]))
data_df = data.toDF(('INDEX','DATE','NT_PCP','C_PCP'))
data_df.registerTempTable('TABLE')
# we are trying to study the effect of NT_PCP change trends on C_PCP change trends
# creating time lagged values for C_PCP for upto 30 days
columns = []
columns.append('C_PCP')
cross_correlations = []
data_dfx = data_df
for i in range(1,31):
    window = Window.orderBy("Index")
    leadCol = F.lead("C_PCP", i).over(window)
    columns.append("C_PCP+"+str(i))
    data_dfx = data_dfx.withColumn("C_PCP+"+str(i), leadCol)
    data_dfx.registerTempTable('DATA')
for column in columns:
    query = sqlContext.sql('SELECT CORR(NT_PCP,{0}) as CORRELATION FROM DATA'.format(column))
    cross_correlations.append(query.select('CORRELATION').collect()[0].CORRELATION)
#print(cross_correlations)
bc_correlations.append(cross_correlations)

In [117]:
# tmv change percent
# crypto and nasdaq100
data = data_bc.map(lambda x: (x[0],x[1],x[5],x[3]))
data_df = data.toDF(('INDEX','DATE','N_TMVCP','C_TMVCP'))
data_df.registerTempTable('TABLE')
# we are trying to study the effect of N_TMVCP change trends on C_TMVCP change trends
# creating time lagged values for C_TMVCP for upto 30 days
columns = []
columns.append('C_TMVCP')
cross_correlations = []
data_dfx = data_df
for i in range(1,31):
    window = Window.orderBy("Index")
    leadCol = F.lead("C_TMVCP", i).over(window)
    columns.append("C_TMVCP+"+str(i))
    data_dfx = data_dfx.withColumn("C_TMVCP+"+str(i), leadCol)
    data_dfx.registerTempTable('DATA')
for column in columns:
    query = sqlContext.sql('SELECT CORR(N_TMVCP,{0}) as CORRELATION FROM DATA'.format(column))
    cross_correlations.append(query.select('CORRELATION').collect()[0].CORRELATION)
#print(cross_correlations)
bc_correlations.append(cross_correlations)

In [118]:
# tmv change percent
# crypto and sp100
data = data_bc.map(lambda x: (x[0],x[1],x[7],x[3]))
data_df = data.toDF(('INDEX','DATE','SP_TMVCP','C_TMVCP'))
data_df.registerTempTable('TABLE')
# we are trying to study the effect of N_TMVCP change trends on C_TMVCP change trends
# creating time lagged values for C_TMVCP for upto 30 days
columns = []
columns.append('C_TMVCP')
cross_correlations = []
data_dfx = data_df
for i in range(1,31):
    window = Window.orderBy("Index")
    leadCol = F.lead("C_TMVCP", i).over(window)
    columns.append("C_TMVCP+"+str(i))
    data_dfx = data_dfx.withColumn("C_TMVCP+"+str(i), leadCol)
    data_dfx.registerTempTable('DATA')
for column in columns:
    query = sqlContext.sql('SELECT CORR(SP_TMVCP,{0}) as CORRELATION FROM DATA'.format(column))
    cross_correlations.append(query.select('CORRELATION').collect()[0].CORRELATION)
#print(cross_correlations)
bc_correlations.append(cross_correlations)

In [119]:
# tmv change percent
# crypto and Nadaq100Tech
data = data_bc.map(lambda x: (x[0],x[1],x[9],x[3]))
data_df = data.toDF(('INDEX','DATE','NT_TMVCP','C_TMVCP'))
data_df.registerTempTable('TABLE')
# we are trying to study the effect of N_TMVCP change trends on C_TMVCP change trends
# creating time lagged values for C_TMVCP for upto 30 days
columns = []
columns.append('C_TMVCP')
cross_correlations = []
data_dfx = data_df
for i in range(1,31):
    window = Window.orderBy("Index")
    leadCol = F.lead("C_TMVCP", i).over(window)
    columns.append("C_TMVCP+"+str(i))
    data_dfx = data_dfx.withColumn("C_TMVCP+"+str(i), leadCol)
    data_dfx.registerTempTable('DATA')
for column in columns:
    query = sqlContext.sql('SELECT CORR(NT_TMVCP,{0}) as CORRELATION FROM DATA'.format(column))
    cross_correlations.append(query.select('CORRELATION').collect()[0].CORRELATION)
#print(cross_correlations)
bc_correlations.append(cross_correlations)

In [120]:
# AFTER COVID
te = datetime.strptime('31-12-2019', '%d-%m-%Y')
data_ac = data_main.filter(lambda x: reformat_date(x[1])>=date)
ac_correlations = []

In [121]:
# cross correlation - time lagged
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# price change percent
# crypto and nasdaq100
data = data_ac.map(lambda x: (x[0],x[1],x[4],x[2]))
# print(data.count())
data_df = data.toDF(('INDEX','DATE','N_PCP','C_PCP'))
data_df.registerTempTable('TABLE')
# we are trying to study the effect of N_PCP change trends on C_PCP change trends
# creating time lagged values for C_PCP for upto 30 days
columns = []
columns.append('C_PCP')
cross_correlations = []
data_dfx = data_df
for i in range(1,31):
    window = Window.orderBy("Index")
    leadCol = F.lead("C_PCP", i).over(window)
    columns.append("C_PCP+"+str(i))
    data_dfx = data_dfx.withColumn("C_PCP+"+str(i), leadCol)
    data_dfx.registerTempTable('DATA')
for column in columns:
    query = sqlContext.sql('SELECT CORR(N_PCP,{0}) as CORRELATION FROM DATA'.format(column))
    cross_correlations.append(query.select('CORRELATION').collect()[0].CORRELATION)
#print(cross_correlations)
ac_correlations.append(cross_correlations)

In [122]:
# price change percent
# crypto and sp100
data = data_ac.map(lambda x: (x[0],x[1],x[6],x[2]))
data_df = data.toDF(('INDEX','DATE','SP_PCP','C_PCP'))
data_df.registerTempTable('TABLE')
# we are trying to study the effect of SP_PCP change trends on C_PCP change trends
# creating time lagged values for C_PCP for upto 30 days
columns = []
columns.append('C_PCP')
cross_correlations = []
data_dfx = data_df
for i in range(1,31):
    window = Window.orderBy("Index")
    leadCol = F.lead("C_PCP", i).over(window)
    columns.append("C_PCP+"+str(i))
    data_dfx = data_dfx.withColumn("C_PCP+"+str(i), leadCol)
    data_dfx.registerTempTable('DATA')
for column in columns:
    query = sqlContext.sql('SELECT CORR(SP_PCP,{0}) as CORRELATION FROM DATA'.format(column))
    cross_correlations.append(query.select('CORRELATION').collect()[0].CORRELATION)
#print(cross_correlations)
ac_correlations.append(cross_correlations)

In [123]:
# price change percent
# crypto and Nasdaq100Tech
data = data_ac.map(lambda x: (x[0],x[1],x[8],x[2]))
data_df = data.toDF(('INDEX','DATE','NT_PCP','C_PCP'))
data_df.registerTempTable('TABLE')
# we are trying to study the effect of NT_PCP change trends on C_PCP change trends
# creating time lagged values for C_PCP for upto 30 days
columns = []
columns.append('C_PCP')
cross_correlations = []
data_dfx = data_df
for i in range(1,31):
    window = Window.orderBy("Index")
    leadCol = F.lead("C_PCP", i).over(window)
    columns.append("C_PCP+"+str(i))
    data_dfx = data_dfx.withColumn("C_PCP+"+str(i), leadCol)
    data_dfx.registerTempTable('DATA')
for column in columns:
    query = sqlContext.sql('SELECT CORR(NT_PCP,{0}) as CORRELATION FROM DATA'.format(column))
    cross_correlations.append(query.select('CORRELATION').collect()[0].CORRELATION)
#print(cross_correlations)
ac_correlations.append(cross_correlations)

In [124]:
# tmv change percent
# crypto and nasdaq100
data = data_ac.map(lambda x: (x[0],x[1],x[5],x[3]))
data_df = data.toDF(('INDEX','DATE','N_TMVCP','C_TMVCP'))
data_df.registerTempTable('TABLE')
# we are trying to study the effect of N_TMVCP change trends on C_TMVCP change trends
# creating time lagged values for C_TMVCP for upto 30 days
columns = []
columns.append('C_TMVCP')
cross_correlations = []
data_dfx = data_df
for i in range(1,31):
    window = Window.orderBy("Index")
    leadCol = F.lead("C_TMVCP", i).over(window)
    columns.append("C_TMVCP+"+str(i))
    data_dfx = data_dfx.withColumn("C_TMVCP+"+str(i), leadCol)
    data_dfx.registerTempTable('DATA')
for column in columns:
    query = sqlContext.sql('SELECT CORR(N_TMVCP,{0}) as CORRELATION FROM DATA'.format(column))
    cross_correlations.append(query.select('CORRELATION').collect()[0].CORRELATION)
#print(cross_correlations)
ac_correlations.append(cross_correlations)

In [125]:
# tmv change percent
# crypto and sp100
data = data_ac.map(lambda x: (x[0],x[1],x[7],x[3]))
data_df = data.toDF(('INDEX','DATE','SP_TMVCP','C_TMVCP'))
data_df.registerTempTable('TABLE')
# we are trying to study the effect of N_TMVCP change trends on C_TMVCP change trends
# creating time lagged values for C_TMVCP for upto 30 days
columns = []
columns.append('C_TMVCP')
cross_correlations = []
data_dfx = data_df
for i in range(1,31):
    window = Window.orderBy("Index")
    leadCol = F.lead("C_TMVCP", i).over(window)
    columns.append("C_TMVCP+"+str(i))
    data_dfx = data_dfx.withColumn("C_TMVCP+"+str(i), leadCol)
    data_dfx.registerTempTable('DATA')
for column in columns:
    query = sqlContext.sql('SELECT CORR(SP_TMVCP,{0}) as CORRELATION FROM DATA'.format(column))
    cross_correlations.append(query.select('CORRELATION').collect()[0].CORRELATION)
#print(cross_correlations)
ac_correlations.append(cross_correlations)

In [126]:
# tmv change percent
# crypto and Nadaq100Tech
data = data_ac.map(lambda x: (x[0],x[1],x[9],x[3]))
data_df = data.toDF(('INDEX','DATE','NT_TMVCP','C_TMVCP'))
data_df.registerTempTable('TABLE')
# we are trying to study the effect of N_TMVCP change trends on C_TMVCP change trends
# creating time lagged values for C_TMVCP for upto 30 days
columns = []
columns.append('C_TMVCP')
cross_correlations = []
data_dfx = data_df
for i in range(1,31):
    window = Window.orderBy("Index")
    leadCol = F.lead("C_TMVCP", i).over(window)
    columns.append("C_TMVCP+"+str(i))
    data_dfx = data_dfx.withColumn("C_TMVCP+"+str(i), leadCol)
    data_dfx.registerTempTable('DATA')
for column in columns:
    query = sqlContext.sql('SELECT CORR(NT_TMVCP,{0}) as CORRELATION FROM DATA'.format(column))
    cross_correlations.append(query.select('CORRELATION').collect()[0].CORRELATION)
#print(cross_correlations)
ac_correlations.append(cross_correlations)

In [None]:
# Averaging out the cross correlations to have an overall view

In [127]:
for x in bc_correlations:
    print(sum(x)/len(x))

0.10301176301910034
0.03886703418526572
0.10811318930723292
-0.054234646011012806
-0.03607717065023008
-0.07585510348936104


In [None]:
# Cross correlation very low for the cases before covid 19 
# Very close to 0 in most cases

In [128]:
for x in ac_correlations:
    print(sum(x)/len(x))

0.32496428480914186
0.30090143539740305
0.32127257026528894
0.04416860015625667
0.07765885806094824
0.020390698683324285


In [None]:
# Significant rise in cross correlation noted after covid
# So, yes, the drop and rise after covid in stock and crypto markets have had a mild correlation