In [125]:
#Grab data from google finance api
#Daily stock data will be grab for all tickers in tickerFile from today until lastDay
#and written to riak bucket 'stocks'

today = datetime(datetime.now().year, datetime.now().month, datetime.now().day)#todays year,month,day
lastDay = datetime(2000,1,1)#last day to download stock data from
tickerFile = 'NYSE.txt'#file contains all ticker,name pairs on NYSE
dataSource = 'google'#download from 'google' or 'yahoo

In [126]:
stocks = pd.read_csv(tickerFile,sep='\t',header=None)#read in stock ticker,name pairs

In [127]:
tickers = list(stocks[0])#extract tickers

In [130]:
#These operations will populate riak with data
t0 = time.time()
dataGet = sc.parallelize(tickers[0:100]).map(lambda x: getDataByTicker(x,dataSource,lastDay,today)).collect()#get data and write to riak for each ticker
t1 = time.time()
total = t1-t0
total

133.64578199386597

In [131]:
dataGet

[3893,
 3893,
 5,
 183,
 2787,
 3415,
 1118,
 2400,
 3893,
 3575,
 639,
 3893,
 407,
 3344,
 3893,
 2823,
 601,
 536,
 336,
 0,
 3893,
 3893,
 2736,
 2483,
 3893,
 3893,
 3407,
 3893,
 2046,
 3504,
 1109,
 795,
 0,
 8,
 23,
 1190,
 3893,
 3893,
 252,
 3532,
 696,
 3893,
 2391,
 3893,
 2391,
 3893,
 3893,
 2524,
 0,
 2908,
 3893,
 3893,
 3893,
 2161,
 3893,
 3893,
 3893,
 3893,
 4,
 0,
 3375,
 2063,
 3893,
 0,
 3893,
 0,
 1191,
 0,
 0,
 0,
 0,
 1090,
 0,
 3485,
 2035,
 3121,
 2244,
 2932,
 3893,
 605,
 4,
 220,
 3794,
 2812,
 1317,
 1317,
 1317,
 1108,
 3893,
 3892,
 1869,
 537,
 2908,
 2162,
 805,
 538,
 410,
 3426,
 2978,
 2693]

In [120]:
minVol = 20000#minimum volatilty to filter on
minDays = 2000#minimum amount of data points needed
zThresh = 2
beginDay = 0
ndays = 100
critLevel = '5%' #can be '1%', '5%', or '10%'
delKeys = deleteAllKeys('tradeEntries')
#Gather the data into rdd and transform so that pairAnalysis can be run on each pair of stocks

#Spark 
#1:For each ticker we grab the data from riak using riakGetStock
#2:Filter out ticker,data pairs that have less than minDays worth of data
#3:Filter out all ticker,data pairs that have a mean volatility less than minVol
#4:Sort each tickers,data pairs data by date with most recent data at the beginning of the array using mySort
#5:Cut all ticker,data pairs data to be of length minDays using myFilter and cache the rdd in memory

t0 = time.time()#time begin
d = sc.parallelize(tickers[0:100]).map(lambda x: (x, riakGetStock(x)))\
    .filter(lambda x: len(x[1]) > minDays)\
    .filter(lambda x: numpy.mean([i[1] for i in x[1]]) > minVol)\
    .map(lambda x: (x[0],mySort(x[1])))\
    .map(lambda x: (x[0],myFilter(x[1],minDays))).cache()

#Analyze all stock pairs and return the results    
    
#Spark    
#from the cahced rdd d create a cartesian product of all possible ticker pairs
#then for each ticker pair run pairAnalysis which returns either a number or a list of values
#collect the rdd

pairs = d.cartesian(d)\
    .map(lambda x: pairAnalysis(x,ndays,beginDay,zThresh))\
    .filter(lambda x: type(x) is list)\
    .map(lambda x: writeSinglePair(x))\
    .cache().collect()
t1 = time.time()#time end

total = t1-t0
total

Successful delete: AGCO_AGM
Successful delete: AET_AGI
Successful delete: AET_ACH
Successful delete: AAP_AEM
Successful delete: AAP_AB
Successful delete: AF_AFB
Successful delete: AAP_AEO
Successful delete: AGCO_AGI
Successful delete: ABG_AGI
Successful delete: AAP_ABC
Successful delete: AHS_AGC
Successful delete: AET_AGC


76.94734597206116

In [123]:
pairs

[['AAP',
  'AB',
  '2015-06-23',
  163.89,
  30.0,
  2.0325065800785458,
  -0.92833169871411769,
  -1.179614628199488e-13,
  5.6997511318368153],
 ['AAP',
  'ABC',
  '2015-06-23',
  163.89,
  111.81,
  2.1268201231154578,
  -0.37135467337889794,
  6.1936589190736407e-14,
  5.7942745656899959],
 ['AAP',
  'AEM',
  '2015-06-23',
  163.89,
  30.1,
  2.1993243517763457,
  0.9659809460300588,
  1.1250449460931123e-13,
  5.5689604525184668],
 ['AAP',
  'AEO',
  '2015-06-23',
  163.89,
  17.77,
  2.449653665451744,
  -1.5470843605914886,
  3.8962753023952246e-13,
  5.5651553786213874],
 ['ABG',
  'AGI',
  '2015-06-23',
  92.14,
  5.84,
  2.2281646250866216,
  4.5407296405716675,
  1.2551026884466409e-14,
  4.8757504685847444],
 ['AET',
  'ACH',
  '2015-06-23',
  128.2,
  13.16,
  2.3481449968304062,
  2.1464193559242299,
  -2.4738255888223647e-14,
  9.2004750163219349],
 ['AF',
  'AFB',
  '2015-06-23',
  14.02,
  13.42,
  2.1323444361678949,
  -0.65921620714302764,
  2.9956481739645822e-15,
 

In [122]:
getAllKV('tradeEntries')

{'AAP_AB': '{"CloseB": 30.0, "ZScore": 2.0325065800785458, "CloseA": 163.89, "SignalMean": -1.179614628199488e-13, "SignalSD": 5.6997511318368153, "Date": "2015-06-23", "StockA": "AAP", "StockB": "AB", "Beta": -0.92833169871411769}',
 'AAP_ABC': '{"CloseB": 111.81, "ZScore": 2.1268201231154578, "CloseA": 163.89, "SignalMean": 6.1936589190736407e-14, "SignalSD": 5.7942745656899959, "Date": "2015-06-23", "StockA": "AAP", "StockB": "ABC", "Beta": -0.37135467337889794}',
 'AAP_AEM': '{"CloseB": 30.1, "ZScore": 2.1993243517763457, "CloseA": 163.89, "SignalMean": 1.1250449460931123e-13, "SignalSD": 5.5689604525184668, "Date": "2015-06-23", "StockA": "AAP", "StockB": "AEM", "Beta": 0.9659809460300588}',
 'AAP_AEO': '{"CloseB": 17.77, "ZScore": 2.449653665451744, "CloseA": 163.89, "SignalMean": 3.8962753023952246e-13, "SignalSD": 5.5651553786213874, "Date": "2015-06-23", "StockA": "AAP", "StockB": "AEO", "Beta": -1.5470843605914886}',
 'ABG_AGI': '{"CloseB": 5.84, "ZScore": 2.2281646250866216,

In [124]:
import sys
import numpy
import time
from operator import add
from pyspark import SparkContext
import pandas as pd
import datetime
import json
import riak
import urllib2
import pytz
import pandas as pd
from bs4 import BeautifulSoup
from datetime import datetime
from pandas.io.data import DataReader
import riak
from riak import RiakClient, RiakNode, RiakObject
import numpy as np
import statsmodels.api as stat
import statsmodels.tsa.stattools as ts

#start is furthest day back and end is most recent day.  Grabs all data in between and stores in riak as json
#runs through a file of ticker values
def getData(tickerFile, dataSource, start, end):

    rc = RiakClient(pb_port=8087, protocol='pbc')#set up riak connection
    added = []#list of successful adds
    notAdded = []#list of unsuccessful adds
    stock = pd.read_csv(tickerFile,sep='\t',header=None)#read in stock tickers

    #loop over all stock tickers
    for i in range(0,len(stock.head(100))):
        
        ticker = stock.ix[i,0]
        if getDataByTicker(ticker,dataSource,start,end) == 0:
            notAdded.append(ticker)
        else:
            added.append(ticker)
    return added, notAdded

#start is furthest day back and end is closest to today, Store single stock data in riak
#only grabs one stock
def getDataByTicker(ticker, dataSource, start, end):

    rc = RiakClient(pb_port=8087, protocol='pbc')
    #get daily data for each ticker
    gtemp = pd.DataFrame()
    bucket = rc.bucket('stocks')
    try:
        gtemp = DataReader(ticker,  dataSource, start, end)
    except:
        pass
        
        #didnt get any data
    if len(gtemp) == 0:
        return 0
    #got data
    else:
        
        for j in range(0,len(gtemp.index)):
            
            #upload json to Riak Bucket
            date = gtemp.index[j].date()
            riakKey = str(ticker + '_' + str(date))
            riakVal = {'OPEN': gtemp.values[j,0],\
                        'HIGH': gtemp.values[j,1],\
                        'LOW': gtemp.values[j,2], \
                        'CLOSE': gtemp.values[j,3], \
                        'VOLUME': gtemp.values[j,4],\
                        'DATE': str(date),\
                        'TICKER': str(ticker)}
                
            obj = RiakObject(rc, bucket, riakKey)
                
            obj.add_index("ticker_bin", str(ticker))
            obj.add_index("year_int", int(date.year))
            obj.add_index("month_int", int(date.month))
            obj.add_index("day_int", int(date.day))
                
            obj.content_type = 'text/json'
            #obj.data = riakVal
            obj.data = json.dumps(riakVal)
            obj.store()

    return len(gtemp.index)

#searches riak bucket via 2i query and returns a dict of the data
def riakSearchData(searchBucket, searchTerm, searchVal1, searchVal2):
    myData = {}#empty dict
    myBucket = RiakClient(pb_port=8087, protocol='pbc').bucket(searchBucket)
    #check wether 1 or 2 search terms
    if searchVal2 != None:
        for key in myBucket.get_index(searchTerm, searchVal1, searchVal2): #get all keys with 2i match
            myData[key] = json.loads(myBucket.get(key).data)#store data for each key
    else:
        for key in myBucket.get_index(searchTerm, searchVal1):#get all keys with 2i match
            myData[key] = json.loads(myBucket.get(key).data)#store data for each key
    return myData

#store an individual key value pair in a bucket
def storeKV(myBucket, myKey, myVal):
    riak.RiakClient(pb_port=8087, protocol='pbc').bucket(myBucket).new(myKey, data = myVal).store()
    return

#delete a key from a bucket, provide feedback to ensure deletion
def deleteKey(delBucket, delKey):
    riak.RiakClient(pb_port=8087, protocol='pbc').bucket(delBucket).delete(delKey)
    if riak.RiakClient(pb_port=8087, protocol='pbc').bucket(delBucket).get(delKey).data == None:
        print 'Successful delete: %s' % delKey
    else:
        print 'Failed delete: %s' % delKey
    return

#delete key from bucket, no feedback
def quickDeleteKey(delBucket,delKey):
    riak.RiakClient(pb_port=8087, protocol='pbc').bucket(delBucket).delete(delKey)
    return

#delete all keys in a bucket, no feedback  
def quickDeleteAllKeys(delBucket):
    for keys in  riak.RiakClient(pb_port=8087, protocol='pbc').bucket(delBucket).stream_keys():
        for delKey in keys:
            quickDeleteKey(delBucket, delKey)      
    print 'Done'
    return

#delete all keys in a bucket, with feedback
def deleteAllKeys(delBucket):
    delList = []
    for keys in  riak.RiakClient(pb_port=8087, protocol='pbc').bucket(delBucket).stream_keys():
        for delKey in keys:
            deleteKey(delBucket, delKey)
            delList.append(delKey)
    return delList

#get all key value pairs from a bucket
def getAllKV(myBucket):
    myData = {}
    riak_bucket = riak.RiakClient(pb_port=8087, protocol='pbc').bucket(myBucket)
    for keys in riak_bucket.stream_keys():
        for key in keys:
            tempData = riak_bucket.get(key).data
            #print('Key: %s Value: %s' % (key, tempData))
            myData[key] = tempData
    return myData

#get single value for a key in a bucket
def getValue(myBucket, myKey):
    myVal = json.loads(riak.RiakClient(pb_port=8087, protocol='pbc').bucket(myBucket).get(myKey).data)
    return myVal

#Take a tuple of tuples in and return something
def pairAnalysis(pairTuple, ndays, beginDay = 0, zThresh = 2, critLevel = '5%'):
    
    #pair tuple looks like ([tickerA, [data]],[tickerB,[data]])
    #input is assumed to be same length and sorted by date with most recent date first
    
    #unwrap first stock ticker and data
    stockA = pairTuple[0]
    stockAData = list(stockA[1])
    
    #unwrap the data for stockA
    stockADates = [x[2] for x in stockAData]
    stockAClose = [x[0] for x in stockAData]
    stockAVolume = [x[1] for x in stockAData]
    
    #unwrap second stock ticker and data
    stockB = pairTuple[1]
    stockBData = list(stockB[1])
   
    #unwrap stockB data
    stockBDates = [x[2] for x in stockBData]
    stockBClose = [x[0] for x in stockBData]
    stockBVolume = [x[1] for x in stockBData]
    
    pair = pairCalc(stockAClose,stockBClose,beginDay,ndays, zThresh, critLevel)
    #if pair tradeable, add some more info
    if type(pair) is list:
            pair.insert(0,stockADates[beginDay])
            pair.insert(0,stockB[0])
            pair.insert(0,stockA[0])
            return pair
    else:
        return pair

def pairCalc(tsA,tsB,beginDay,ndays,zThresh = 2, critLevel = '5%'):
    
    #perform engle granger cointegration test
    coint = eg_test(tsA[beginDay:ndays],tsB[beginDay:ndays], critLevel)
    
    #if coint return 0, then the two timeseries are not cointegrated
    if (coint[0] != 1):
        return 0
    #else calculate stuff
    else:
        #signal = tsA[0] - beta*tsB[0] - CONSTANT = normal gaussian with mean 0
        signal = [a - coint[1][1]*b - coint[1][0] for a in tsA[beginDay:ndays] for b in tsB[beginDay:ndays]]
        sigMean = numpy.mean(signal)
        sigStd = numpy.std(signal)
        #zscore is (signal - signalMean) / signalStd
        zscore = (signal[beginDay] - sigMean)/sigStd
        #if current zscore is larger than zThresh, possible pair to trade
        if abs(zscore) > zThresh:
            return [tsA[0],tsB[0], zscore, coint[1][1], sigMean, sigStd]
    return 1

#write tradeable pair back into riak
def writePairs(pairList):
    
    #tradeable pairs are lists
    tradeable = [x for x in pairList if type(x) is list]
    
    for pair in tradeable:
        writeSinglePair(pair)
    
    #return a list of written pairs
    return tradeable
       
#write a signle pair to riak
#assumes pair is in a list of values
def writeSinglePair(pair):
    
    rc = RiakClient(pb_port=8087, protocol='pbc')
    bucket = rc.bucket('tradeEntries')
    
    #create key value pairs to stock in riak
    key = str(str(pair[0])+ '_' + str(pair[1]))
    val = {'StockA': pair[0], \
                'StockB': pair[1], \
                'Date': pair[2],\
                'CloseA': pair[3], \
                'CloseB': pair[4], \
                'ZScore': pair[5],\
                'Beta': pair[6],\
                'SignalMean': pair[7],\
                'SignalSD': pair[8]}
    myDate = pair[2].split('-')
    obj = RiakObject(rc, bucket, key)
        
    #add 2i tags
    obj.add_index("stocka_bin", str(pair[0]))
    obj.add_index("stockb_bin", str(pair[3]))
    obj.add_index("year_int", int(myDate[0]))
    obj.add_index("month_int", int(myDate[1]))
    obj.add_index("day_int", int(myDate[2]))
    obj.content_type = 'text/json'
    obj.data = val
    obj.data = json.dumps(val)
    #store
    obj.store()
    
    #return a list of written pairs
    return pair   
    
#return 1 if the two series are cointegrated and 0 otherwise, return regression parameters either way
#assumes y,x are aligned and of equal length
#critLevel can be '1%', '5%' or '10%'
def eg_test(y, x,critLevel):
    
    #must add a constant row of 1s to dependent variable, its a multidimensional regression thing
    x = stat.add_constant(x)
    #get residuals
    result = stat.OLS(y, x).fit()
    #regression parameters, slope and intercept
    regPar = result.params
    #run augmented dickey fuller test of stationarity of residuals
    #null hypothesis is stationaity of timeseries
    adfResults = ts.adfuller(result.resid, maxlag=0, regression='c', autolag=None, store=False, regresults=True)
    #test statistic
    tstat = adfResults[0]
    #critical value
    critVal = adfResults[2][critLevel]
    #if test stat is less than critical value, accept null hyptohesis of stationarity
    if tstat < critVal:
        return [1,regPar]
    else:
        return [0,regPar]

#get all values for a stock from riak  
#return close,volume,date values in a list of list
def riakGetStock(searchVal):
    myData = []
    myBucket = RiakClient(pb_port=8087, protocol='pbc').bucket('stocks')
    for key in myBucket.get_index('ticker_bin', searchVal): # get all from 2002 to 2012
        value = json.loads(myBucket.get(key).data)
        myData.append([(value['CLOSE']), (value['VOLUME']), str(value['DATE'])])
    return myData

#quick function to sort a list of list on the inner list 3 value(date)
def mySort(s):
    sortList = list(s)
    sortList.sort(key=lambda x: x[2], reverse=True)
    return sortList

#cut length of time series to n
def myFilter(s,n):
    return list(s[0:n])