In [1]:
import findspark
findspark.init()
import pyspark

In [2]:
import sys
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession
from pyspark.sql import SparkSession, functions, types, Window
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.ml import Pipeline

import statsmodels.api as sm
from statsmodels.tsa.stattools import adfuller, coint
#assert sys.version_info >= (3, 5) # make sure we have Python 3.5+
import matplotlib.pyplot as plt
import pandas as pd
import numpy as np

In [3]:
def to_null(c):
    """Build a Column expression that turns unusable cells of column ``c`` into NULL.

    A cell is kept as-is when it is not NULL, not NaN and not an empty or
    whitespace-only string. The ``when`` has no ``otherwise`` branch, so every
    other cell evaluates to NULL.

    Parameters
    ----------
    c : str
        Name of the column to scrub.

    Returns
    -------
    pyspark.sql.Column
        The (un-aliased) scrubbed column expression.
    """
    cell = functions.col(c)
    is_missing = cell.isNull() | functions.isnan(cell) | (functions.trim(cell) == "")
    return functions.when(~is_missing, cell)


In [4]:
def clean(minutedt):
    """Scrub the raw rows and pivot them into one close-price column per stock.

    Steps, in order:

    1. Blank out NULL / NaN / empty cells with ``to_null`` and drop every row
       that contains one.
    2. Derive ``symbol`` as the part of ``windcode`` before the first ``.``.
    3. Pivot to one row per ``DateTime`` with ``SUM(close)`` for each of the ten
       hard-coded symbols, exposed as columns ``a`` .. ``j``, ordered by
       ``DateTime``.
    4. Scrub again so that only timestamps with a value for all ten symbols
       survive.

    Uses the module-level ``spark`` session and (re)registers the temp views
    ``minutedt``, ``notnull``, ``pivoted`` and ``dt`` along the way.

    Parameters
    ----------
    minutedt : pyspark.sql.DataFrame
        Rows with the columns ``DateTime``, ``windcode`` and ``close``.

    Returns
    -------
    pyspark.sql.DataFrame
        Columns ``DateTime, a, b, c, d, e, f, g, h, i, j`` with no missing cells.
    """
    def drop_unusable_rows(frame):
        # Blank out unusable cells column by column, then discard rows holding any NULL.
        scrubbed = [to_null(name).alias(name) for name in frame.columns]
        return frame.select(scrubbed).na.drop()

    usable = drop_unusable_rows(minutedt)
    usable.createOrReplaceTempView('minutedt')

    # Long form: one row per (DateTime, symbol).
    long_sql = """ SELECT DateTime, SUBSTRING_INDEX(windcode, '.',1) AS symbol,close FROM minutedt """
    long_form = spark.sql(long_sql)
    long_form.createOrReplaceTempView('notnull')

    # Wide form: one column per symbol, aliased a..j.
    pivot_sql = """SELECT * FROM 
                    ( 
                    SELECT DateTime, symbol, close
                    FROM notnull
                    ) 
                    PIVOT 
                    (
                    SUM(close)
                    FOR symbol in ("000008" AS a, "000009" AS b, '000010' AS c, '000011' AS d, '000012' AS e, "000014" AS f, "000016" AS g, '000017' AS h, '000020' AS i, '000021' AS j)                    
                    ) 
                    ORDER BY DateTime """
    wide = spark.sql(pivot_sql).cache()
    wide.createOrReplaceTempView('pivoted')

    selected = spark.sql("""SELECT DateTime, a,b,c,d,e,f,g,h ,i,j FROM pivoted """)
    dt = drop_unusable_rows(selected)
    dt.createOrReplaceTempView('dt')

    return dt

In [5]:
def find_coint(regdt, min_pvalue=0.005):
    """Screen every pair of price columns for a stationary log-price spread.

    For each unordered pair of columns ``(i, j)`` of ``regdt`` (in column
    order) the function

    1. fits ``log(i) = slope * log(j) + intercept`` with Spark ML
       ``LinearRegression``,
    2. takes the residual ``log(i) - slope * log(j) - intercept`` as the spread,
    3. runs an Augmented Dickey-Fuller test (``autolag='AIC'``) on the spread,
    4. prints the fit and the ADF p-value.

    The work is done directly on the DataFrame that is passed in; no temp view
    is read or created.

    Parameters
    ----------
    regdt : pyspark.sql.DataFrame
        Every column is treated as one price series.
    min_pvalue : float, optional
        A pair is reported when its ADF p-value is strictly below this value.
        Defaults to 0.005.

    Returns
    -------
    list of list
        One ``[i, j, pvalue, slope, intercept]`` entry per accepted pair, in the
        order the pairs were scanned.
    """
    from itertools import combinations

    # The estimator is the same for every pair, so build it once.
    pipeline = Pipeline(stages=[
        VectorAssembler(inputCols=['x'], outputCol="features"),
        LinearRegression(featuresCol='features', labelCol='y'),
    ])

    result = []
    for i, j in combinations(regdt.columns, 2):
        logdt = regdt.select(
            functions.log(functions.col(i)).alias('y'),
            functions.log(functions.col(j)).alias('x'),
        )

        fitted = pipeline.fit(logdt).stages[-1]
        # Plain Python floats: safe to use in Column arithmetic and to format into SQL later.
        slope = float(fitted.coefficients[0])
        intercept = float(fitted.intercept)

        residual = (functions.col('y') - functions.col('x') * slope - intercept).alias('spread')
        spread = logdt.select(residual).toPandas()['spread']

        adf_pvalue = adfuller(spread, autolag='AIC')[1]
        print(i,j,'slope: ',slope,'intercept: ', intercept,'pvalue: ', adf_pvalue)

        if adf_pvalue < min_pvalue:
            result.append([i, j, adf_pvalue, slope, intercept])

    return result

In [6]:
def training(coint_pair, entry_z=1.8, stop_z=2.8, exit_z=0.2):
    """Back-test every accepted pair and write one trade-history CSV per pair.

    For each entry of ``coint_pair`` the spread
    ``log(pair1) - log(pair2) * slope - intercept`` is evaluated on the temp
    view ``train`` to get its mean and standard deviation, the trading bands
    are derived from them, and ``trade`` is run on the same spread evaluated on
    the temp view ``test``. The result is saved as
    ``retcsv_<pair1>_<pair2>.csv`` in the working directory.

    Uses the module-level ``spark`` session; the views ``train`` and ``test``
    must already be registered.

    Parameters
    ----------
    coint_pair : list
        Entries shaped like ``[pair1, pair2, pvalue, slope, intercept]``; only
        positions 0, 1, 3 and 4 are read.
    entry_z : float, optional
        A position is opened when the spread is this many standard deviations
        away from the mean. Defaults to 1.8.
    stop_z : float, optional
        Stop-loss distance in standard deviations. Defaults to 2.8.
    exit_z : float, optional
        Take-profit distance in standard deviations. Defaults to 0.2.

    Returns
    -------
    int
        Always 0.
    """
    spread_query = """ SELECT DateTime, {0}, {1}, (log({0}) - log({1}) * {2} - {3}) AS spread FROM {4} """

    for entry in coint_pair:
        pair1, pair2, slope, intercept = entry[0], entry[1], entry[3], entry[4]

        # Spread statistics on the train window, fetched in a single Spark job.
        train_spread = spark.sql(spread_query.format(pair1, pair2, slope, intercept, 'train'))
        mu, std = train_spread.agg(
            functions.avg('spread'), functions.stddev('spread')
        ).first()

        # Entry band, stop-loss band and take-profit band around the mean.
        up = mu + std * entry_z
        down = mu - std * entry_z
        upsl = mu + std * stop_z
        downsl = mu - std * stop_z
        uptp = mu + std * exit_z
        downtp = mu - std * exit_z

        # Simulate trading on the test window with the train-window parameters.
        test1 = spark.sql(spread_query.format(pair1, pair2, slope, intercept, 'test'))
        test3 = trade(test1, pair1, pair2, up, down, upsl, downsl, uptp, downtp, mu, slope)

        # Save the trading history / result.
        test3.to_csv('retcsv_{}_{}.csv'.format(pair1,pair2), index = True)

    return 0
    

In [7]:
def _signals_and_positions(levels):
    """Derive the trade signal and the held position for every row from the spread levels.

    ``levels`` is a list of ints in ``-3 .. 3``. A signal is raised only when the
    level moves between two specific adjacent bands from one row to the next:

    ========  ======  ==================================================
    move      signal  meaning
    ========  ======  ==================================================
    1 -> 2    -2      open: short pair1, long pair2
    1 -> 0     2      take profit on the short-pair1 trade
    2 -> 3     3      stop loss
    -1 -> -2   1      open: long pair1, short pair2
    -1 -> 0   -1      take profit on the long-pair1 trade
    -2 -> -3  -3      stop loss
    ========  ======  ==================================================

    The position (1 = long pair1, -1 = short pair1, 0 = flat) is carried
    forward from the previous row unless the signal changes it. Row 0 always
    has signal 0 and position 0.

    Returns ``(signals, positions)`` as two lists of the same length as ``levels``.
    """
    transition_signal = {
        (1, 2): -2,
        (1, 0): 2,
        (2, 3): 3,
        (-1, -2): 1,
        (-1, 0): -1,
        (-2, -3): -3,
    }
    signals = [0] * len(levels)
    positions = [0] * len(levels)
    for row in range(1, len(levels)):
        signal = transition_signal.get((levels[row - 1], levels[row]), 0)
        held = positions[row - 1]
        if signal == 1:
            held = 1
        elif signal == -2:
            held = -1
        elif signal == -1 and held == 1:
            held = 0
        elif signal == 2 and held == -1:
            held = 0
        elif signal in (3, -3):
            held = 0
        signals[row] = signal
        positions[row] = held
    return signals, positions


def _book_portfolio(positions, price1, price2, slope, size, initial_cash):
    """Turn the position path into share counts and a running cash balance.

    Whenever the position changes, the holding of the previous row is unwound
    at the current row's prices and, if the new position is not flat, a new
    holding is opened at the same prices. This also covers a direct flip
    between long and short. The pair2 leg is sized as
    ``round(-pair1_shares * slope * price1 / price2)``.

    Returns ``(pair1_shares, pair2_shares, cash)`` as three lists.
    """
    shares1 = [held * size for held in positions]
    shares2 = [0.0] * len(positions)
    cash = [float(initial_cash)] * len(positions)
    for row in range(1, len(positions)):
        leg2 = shares2[row - 1]
        money = cash[row - 1]
        if positions[row] != positions[row - 1]:
            # Unwind what was held on the previous row (a no-op when it was flat).
            money += shares1[row - 1] * price1[row] + leg2 * price2[row]
            leg2 = 0.0
            if positions[row] != 0:
                leg2 = float(round(-shares1[row] * slope * price1[row] / price2[row]))
                money -= shares1[row] * price1[row] + leg2 * price2[row]
        shares2[row] = leg2
        cash[row] = money
    return shares1, shares2, cash


def trade(test1,pair1,pair2, up, down, upsl,downsl,uptp,downtp, mu ,slope, size=1000, initial_cash=5000.0):
    """Simulate trading one pair on its spread series and return the full history.

    Each row's spread is first bucketed into a level:

    * ``3``  : spread >= upsl
    * ``2``  : up <= spread < upsl
    * ``1``  : uptp <= spread < up
    * ``0``  : everything else (including a NULL spread)
    * ``-1`` : down <= spread < downtp
    * ``-2`` : downsl <= spread < down
    * ``-3`` : spread < downsl

    Level changes between consecutive rows then drive the signals, the held
    position and the portfolio bookkeeping. The simulation walks the rows in
    the order Spark returns them, so ``test1`` must already be in
    chronological order.

    Parameters
    ----------
    test1 : pyspark.sql.DataFrame
        Must contain ``DateTime``, the two price columns named by ``pair1`` and
        ``pair2``, and ``spread``.
    pair1, pair2 : str
        Names of the two price columns.
    up, down : float
        Upper / lower entry thresholds on the spread.
    upsl, downsl : float
        Upper / lower stop-loss thresholds.
    uptp, downtp : float
        Upper / lower take-profit thresholds.
    mu : float
        Accepted for call compatibility; not used by the simulation.
    slope : float
        Hedge ratio used to size the pair2 leg.
    size : int, optional
        Number of pair1 shares per unit of position. Defaults to 1000.
    initial_cash : float, optional
        Starting cash balance. Defaults to 5000.

    Returns
    -------
    pandas.DataFrame
        Columns ``DateTime, <pair1>, <pair2>, spread, level, position, signal,
        pair1_share, pair2_share, cash, asset`` where
        ``asset = cash + pair1_share * <pair1> + pair2_share * <pair2>``.
    """
    up, down = float(up), float(down)
    upsl, downsl = float(upsl), float(downsl)
    uptp, downtp = float(uptp), float(downtp)

    # Bucket the spread into levels; the first matching branch wins.
    spread = functions.col('spread')
    level = (
        functions.when(spread >= upsl, 3)
        .when((spread >= up) & (spread < upsl), 2)
        .when((spread >= uptp) & (spread < up), 1)
        .when(spread < downsl, -3)
        .when((spread < down) & (spread >= downsl), -2)
        .when((spread < downtp) & (spread >= down), -1)
        .otherwise(0)
    )
    history = test1.select('DateTime', pair1, pair2, 'spread', level.alias('level')).toPandas()

    # The path-dependent steps run on plain lists and are written back one column at a time.
    signals, positions = _signals_and_positions(history['level'].tolist())
    history['position'] = positions
    history['signal'] = signals

    shares1, shares2, cash = _book_portfolio(
        positions, history[pair1].tolist(), history[pair2].tolist(), slope, size, initial_cash
    )
    history['pair1_share'] = shares1
    history['pair2_share'] = shares2
    history['cash'] = cash

    history['asset'] = (
        history['cash']
        + history['pair1_share'] * history[pair1]
        + history['pair2_share'] * history[pair2]
    )

    return history
    
    

In [8]:
#import spark.sqlContext.implicits._
def main():
    """Run the whole pipeline: load, clean, split, screen pairs, back-test.

    Reads ``15stock.csv`` (columns ``DateTime``, ``windcode``, ``close``, read
    with an explicit schema) through the module-level ``spark`` session, pivots
    it with ``clean``, splits it at ``2019-10-01 08:00:00`` into a train part
    (strictly before) and a test part (at or after), screens the train part for
    cointegrated pairs with ``find_coint`` and back-tests the accepted pairs
    with ``training``.

    Registers the temp views ``train``, ``test`` and ``regdt``.

    Returns
    -------
    int
        Always 0.
    """
    inputs =  '15stock.csv'
    minute_schema = types.StructType([
        types.StructField('DateTime', types.StringType()),
        types.StructField('windcode', types.StringType()),
        types.StructField('close', types.DoubleType()),
    ])

    minutedt = spark.read.csv(inputs, schema=minute_schema)
    dt = clean(minutedt)

    # DateTime is a string column, so the split is a string comparison.
    split_at = '2019-10-01 08:00:00'

    # Training dataset: everything strictly before the split.
    train = dt.filter(functions.col('DateTime') < split_at).cache()
    train.createOrReplaceTempView('train')
    # Test dataset: the split instant itself belongs here, so no row is lost.
    test = dt.filter(functions.col('DateTime') >= split_at).cache()
    test.createOrReplaceTempView('test')

    # All price series of the train window, without the time column.
    regdt = train.select('j', 'i', 'h', 'g', 'f', 'e', 'd', 'c', 'b', 'a').cache()
    regdt.createOrReplaceTempView('regdt')

    coint_pair = find_coint(regdt)
    # training() already iterates over every pair, so one call is enough.
    training(coint_pair)

    return 0

In [9]:
if __name__ == '__main__':
    # Build (or reuse) the session; the functions above read it as the global `spark`.
    spark = SparkSession.builder.appName('example code').getOrCreate()
    #assert spark.version >= '2.4' # make sure we have Spark 2.4+
    sc = spark.sparkContext
    # Keep Spark's console output down to warnings and errors.
    sc.setLogLevel('WARN')
    main()

j i slope:  0.6497526517518525 intercept:  0.4368152439272683 pvalue:  0.6913730698746592
j h slope:  1.0339348713179866 intercept:  0.4178605375480168 pvalue:  0.6400402787669912
j g slope:  0.6259066028269915 intercept:  1.1022599099641026 pvalue:  0.6622326986950244
j f slope:  0.7303258761524786 intercept:  0.331430279851079 pvalue:  0.8081818173753167
j e slope:  0.34791444989088577 intercept:  1.4809087623362143 pvalue:  0.7420195558000938
j d slope:  0.4458952108076766 intercept:  0.9701209677086502 pvalue:  0.7873277415581783
j c slope:  -0.15915861766796438 intercept:  2.2724734223101097 pvalue:  0.6061277000298217
j b slope:  0.7954815861680784 intercept:  0.7278908758374565 pvalue:  0.8972235131876087
j a slope:  0.23001473728679137 intercept:  1.7041211065975062 pvalue:  0.6970746998635708
i h slope:  0.7048537365501637 intercept:  1.3752956730648742 pvalue:  0.10438411402655601
i g slope:  0.47429598782812754 intercept:  1.7693236945857425 pvalue:  0.008066218279761683
i f