Simulate the Donchian High entry signal and calculate performance statistics.

In [None]:
import pandas as pd
import numpy as np
import datetime as dt
from itertools import product
#Directory locations used throughout the notebook:
#  inlib   - per-symbol input datasets, read as inlib+'/'+symbol+'.p'
#  worklib - per-symbol intermediate results written while scoring
#  outlib  - final combined dataset for each signal variant
inlib = '/data/2_dataprep/symboldata'
worklib = '/scratch' #Ephemeral directory
outlib = '/data/3_entry'
#Collection of symbols to process; inv_DonchianHigh iterates over it
symbolList = pd.read_pickle('/data/2_dataprep/symbolList.p')

In [None]:
#Donchian High entry signal
def entry_DonchianHigh(data,p_high,p_pullback,p_days_pullback,
                       price,variation):
    '''
    Outputs a dataset of dates and prices that triggered the Donchian High signal.
    
    A breakout is a row whose price is strictly above the highest price of the
    previous p_high rows. For each breakout, the first row from the breakout row
    up to p_days_pullback rows later whose price is strictly below
    breakout_price*(1 - p_pullback*variation) becomes the entry.
    
    data: dataset containing historical prices for an individual stock
          (needs the columns 'date', 'adjClose', price and variation)
    p_high: the number of days used to calculate the Donchian breakout criteria
    p_pullback: pullback multiple of variation
    p_days_pullback: number of days after the Donchian breakout for the pullback to be effective
    price: price measure to consider (eg. adjClose, close)
    variation: variation measure used to calculate pullback criteria
    
    Returns the breakout rows that led to an entry, with three extra columns:
    entry_date (date of the pullback row), entry_price (adjClose of the pullback
    row, whatever price measure is used) and entry_<price> (price of the pullback
    row). Columns whose name starts with '__' are dropped, entries without a
    positive entry_price are removed, only the first breakout is kept for each
    entry_date and the result is sorted by entry_date. The input is not modified.
    '''
    temp = data.copy()
    #DataFrame.sort was replaced by sort_values in newer pandas; use whichever exists.
    #Kept inline because only this function is pushed to the engines.
    sort_frame = getattr(temp,'sort_values',None) or temp.sort
    temp = sort_frame(['date'])
    temp.index = range(temp.shape[0]) #Row label == row position from here on
    
    #Calculate Donchian cutoff: highest price of the previous p_high rows
    prices = temp[price]
    if hasattr(prices,'rolling'):
        rolling_high = prices.rolling(window=p_high,min_periods=p_high).max()
    else: #Legacy pandas without Series.rolling
        rolling_high = pd.rolling_max(prices,window=p_high,min_periods=p_high)
    cutoff = rolling_high.shift()
    
    #Loop across rows that meet the Donchian cutoff
    price_values = prices.values
    variation_values = temp[variation].values
    last_row = temp.shape[0]-1
    breakout_rows = []
    pullback_rows = []
    for i in np.flatnonzero((prices > cutoff).values):
        #Check if the pullback criteria is met (a NaN criteria never matches)
        pullback_criteria = price_values[i]*(1.0-p_pullback*variation_values[i])
        window = price_values[i:min(i+p_days_pullback,last_row)+1] #Inclusive of both ends
        hits = np.flatnonzero(window < pullback_criteria)
        if hits.size > 0:
            breakout_rows.append(i)
            pullback_rows.append(i+hits[0]) #First row satisfying the pullback
    breakout_rows = np.array(breakout_rows,dtype=int)
    pullback_rows = np.array(pullback_rows,dtype=int)
    
    #Attach the pullback row's values to the breakout row that triggered it
    keep_cols = [col for col in temp.columns if not col.startswith('__')]
    temp2 = temp.iloc[breakout_rows][keep_cols].copy()
    picked = temp.iloc[pullback_rows]
    picked.index = temp2.index #Align each pullback row with its breakout row
    temp2['entry_date'] = picked['date']
    temp2['entry_price'] = picked['adjClose']
    temp2['entry_'+price] = picked[price]
    
    #Cleanup
    temp2 = temp2[temp2['entry_price']>0].drop_duplicates(['entry_date'])
    sort_frame = getattr(temp2,'sort_values',None) or temp2.sort
    return sort_frame(['entry_date'])

In [None]:
#Calculate the performance of entry signals at various intervals
def entryPerformance(data,entry_data,transaction,intervals):
    '''
    Attaches performance statistics at various intervals to the input entry_data dataset.
    
    data: dataset containing historical prices for an individual stock
          (needs the columns 'date' and 'adjClose')
    entry_data: dataset containing entry trades for an individual stock
                (needs the columns 'entry_date' and 'entry_price')
    transaction: transaction cost percent for each trade
    intervals: list of durations in days to compute performance metrics
    
    Returns a copy of entry_data with three columns per interval j, each an
    annualised log return (365.25*log(value/entry_price)/j) measured on the
    adjClose prices dated after entry_date and up to entry_date + j days, net of
    transaction*entry_price:
      p_MFE_j - based on the highest price in the window
      p_MAE_j - based on the lowest price in the window
      p_DER_j - based on the last price in the window
    Values stay NaN when the window runs past the latest available date or
    contains no prices.
    '''
    temp = entry_data.copy()
    tempdata = data[['date','adjClose']].copy()
    #DataFrame.sort was replaced by sort_values in newer pandas; use whichever exists.
    #Kept inline because only this function is pushed to the engines.
    sort_frame = getattr(tempdata,'sort_values',None) or tempdata.sort
    tempdata = sort_frame(['date'])
    tempdata.index = range(tempdata.shape[0])
    max_date = tempdata['date'].max() #Latest date available
    
    #Initialise columns
    for j in intervals:
        temp['p_MFE_'+str(j)] = np.nan
        temp['p_MAE_'+str(j)] = np.nan
        temp['p_DER_'+str(j)] = np.nan
    if temp.shape[0] == 0: return temp #Returns original dataset if no trades
    
    #Loop across entry trades and intervals
    for i,j in product(temp.index,intervals):
        entry_date = temp.loc[i,'entry_date']
        entry_price = temp.loc[i,'entry_price']
        final_date = entry_date + dt.timedelta(days=j) #Final date for performance evaluation
        if final_date > max_date: continue #Skip to the next loop if not enough data
        
        #Subset data: prices strictly after the entry and up to the final date
        in_window = (tempdata['date'] > entry_date) & (tempdata['date'] <= final_date)
        window_prices = tempdata.loc[in_window,'adjClose']
        if window_prices.shape[0]==0: continue #Prevents errors if there are gaps in data
        
        #Calculate maximum favourable/adverse excursion
        cost = transaction*entry_price
        highest = window_prices.max()-cost
        lowest = window_prices.min()-cost
        temp.loc[i,'p_MFE_'+str(j)] = 365.25*np.log(highest/entry_price)/j
        temp.loc[i,'p_MAE_'+str(j)] = 365.25*np.log(lowest/entry_price)/j
        
        #Calculate duration ending return (tempdata is date-sorted, so last row = latest date)
        ending = window_prices.iloc[-1]-cost
        temp.loc[i,'p_DER_'+str(j)] = 365.25*np.log(ending/entry_price)/j
    return temp

In [None]:
#Connect to the IPython parallel cluster and create the views used below
#NOTE(review): IPython.parallel was moved to the separate ipyparallel package
#from IPython 4 onwards - confirm which IPython version this notebook targets
from IPython.parallel import Client
rc = Client()
l_view = rc.load_balanced_view() #Used to submit the per-symbol scoring tasks
d_view = rc[:] #View over all engines, used to push the functions to them

In [None]:
%%px
import pandas as pd
import numpy as np
import datetime as dt
from itertools import product
import socket
import platform
#Same directory locations as on the client, defined again in this %%px cell
#so that the names also exist in the engines' namespaces
inlib = '/data/2_dataprep/symboldata'
worklib = '/scratch' #Ephemeral directory
outlib = '/data/3_entry'

In [None]:
#Send the entry signal and performance functions to every engine under their own names
d_view.push(dict(entry_DonchianHigh=entry_DonchianHigh,
                 entryPerformance=entryPerformance));
#Store each engine's id on that engine as machine_id (returned by entryScore)
for i in rc.ids:
    dv = rc[i]
    dv.push(dict(machine_id=i))

In [None]:
#Function to score an individual symbol
def entryScore(fileloc,symbol,entryFunction,transaction,intervals,outloc,**kwargs):
    '''
    Scores one symbol: finds its entry signals, attaches performance statistics
    and saves the result.
    
    fileloc: path of the pickled dataset holding the symbol's price history
    symbol: company to process (not used inside the function)
    entryFunction: entry signal function, called as entryFunction(data=..., **kwargs)
    transaction: transaction costs as a percentage of each trade
    intervals: durations to calculate performance metrics
    outloc: path where the scored dataset is pickled
    kwargs: extra parameters forwarded unchanged to entryFunction
    
    Returns machine_id, the id pushed to the engine running this call, so the
    caller can see which machine processed the symbol.
    '''
    prices = pd.read_pickle(fileloc)
    signals = entryFunction(data=prices,**kwargs) #Triggered entry signals
    #Attach the performance statistics and write the final dataset in one step
    entryPerformance(prices,signals,transaction,intervals).to_pickle(outloc)
    return machine_id

In [None]:
def inv_DonchianHigh(label,p_high,p_pullback,p_days_pullback,price,variation):
    '''
    Scores every symbol in symbolList with the Donchian High entry signal in
    parallel and saves the combined result to outlib+'/'+label+'.p'.
    
    label: name of this run; prefixes the per-symbol files in worklib and names the final file
    p_high, p_pullback, p_days_pullback, price, variation: passed to entry_DonchianHigh
    
    Performance is evaluated with a transaction cost of 0.01 over the intervals
    50, 100, 150, 200, 250, 300, 350, 700 and 1050 days.
    
    Symbols whose task or result file fails are reported and left out of the
    combined dataset. Raises RuntimeError if no symbol could be processed.
    
    NOTE(review): the engines write to worklib and this function reads the same
    paths, so worklib is presumably visible to both the client and the engines -
    confirm for multi-machine clusters.
    '''
    #Loop across symbols and process in parallel
    allResults = []
    for symbol in symbolList:
        outloc = worklib+'/'+label+'_'+symbol+'.p'
        asyncResult = l_view.apply(entryScore,
                                   fileloc=inlib+'/'+symbol+'.p',symbol=symbol,
                                   entryFunction=entry_DonchianHigh,
                                   transaction=0.01,intervals=[50,100,150,200,250,300,350,700,1050],
                                   outloc=outloc,
                                   p_high=p_high,p_pullback=p_pullback,p_days_pullback=p_days_pullback,
                                   price=price,variation=variation)
        allResults.append((symbol,outloc,asyncResult))
    
    #Check results and collect the per-symbol datasets
    frames = []
    for symbol,outloc,result in allResults:
        try:
            result.get() #Waits for the task; raises if it failed on the engine
            frames.append(pd.read_pickle(outloc))
        except Exception as err: #Keep going, but let KeyboardInterrupt through
            print('Error processing symbol: '+symbol+' ('+repr(err)+')')
    
    #Without any result there is nothing to combine or write
    if not frames:
        raise RuntimeError('No symbols were processed successfully for: '+label)
    
    #Output final dataset (one concat instead of appending frame by frame)
    final = pd.concat(frames)
    final.to_pickle(outlib+'/'+label+'.p')
    print('Procesed: '+label)

In [None]:
%%time
#inv_DonchianHigh('DonchianHigh_350_0_30',350,0,30,'adjClose','z_varclose_avg30')
#inv_DonchianHigh('DonchianHigh_300_0_30',300,0,30,'adjClose','z_varclose_avg30')
#inv_DonchianHigh('DonchianHigh_250_0_30',250,0,30,'adjClose','z_varclose_avg30')
#inv_DonchianHigh('DonchianHigh_200_0_30',200,0,30,'adjClose','z_varclose_avg30')
#inv_DonchianHigh('DonchianHigh_150_0_30',150,0,30,'adjClose','z_varclose_avg30')
#inv_DonchianHigh('DonchianHigh_350_3_30',350,3,30,'adjClose','z_varclose_avg30')
#inv_DonchianHigh('DonchianHigh_300_3_30',300,3,30,'adjClose','z_varclose_avg30')
#inv_DonchianHigh('DonchianHigh_300_3_30_close',300,3,30,'close','z_varclose_avg30')
#inv_DonchianHigh('DonchianHigh_250_3_30',250,3,30,'adjClose','z_varclose_avg30')
#inv_DonchianHigh('DonchianHigh_200_3_30',200,3,30,'adjClose','z_varclose_avg30')
#inv_DonchianHigh('DonchianHigh_150_3_30',150,3,30,'adjClose','z_varclose_avg30')
#inv_DonchianHigh('DonchianHigh_350_3_60',350,3,60,'adjClose','z_varclose_avg30')
#inv_DonchianHigh('DonchianHigh_300_3_60',300,3,60,'adjClose','z_varclose_avg30')
#inv_DonchianHigh('DonchianHigh_250_3_60',250,3,60,'adjClose','z_varclose_avg30')
#inv_DonchianHigh('DonchianHigh_200_3_60',200,3,60,'adjClose','z_varclose_avg30')
#inv_DonchianHigh('DonchianHigh_150_3_60',150,3,60,'adjClose','z_varclose_avg30')
#inv_DonchianHigh('DonchianHigh_350_5_30',350,5,30,'adjClose','z_varclose_avg30')
#inv_DonchianHigh('DonchianHigh_300_5_30',300,5,30,'adjClose','z_varclose_avg30')
#inv_DonchianHigh('DonchianHigh_250_5_30',250,5,30,'adjClose','z_varclose_avg30')
#inv_DonchianHigh('DonchianHigh_200_5_30',200,5,30,'adjClose','z_varclose_avg30')
#inv_DonchianHigh('DonchianHigh_150_5_30',150,5,30,'adjClose','z_varclose_avg30')
#inv_DonchianHigh('DonchianHigh_350_5_60',350,5,60,'adjClose','z_varclose_avg30')
#inv_DonchianHigh('DonchianHigh_300_5_60',300,5,60,'adjClose','z_varclose_avg30')
#inv_DonchianHigh('DonchianHigh_250_5_60',250,5,60,'adjClose','z_varclose_avg30')
#inv_DonchianHigh('DonchianHigh_200_5_60',200,5,60,'adjClose','z_varclose_avg30')
#inv_DonchianHigh('DonchianHigh_150_5_60',150,5,60,'adjClose','z_varclose_avg30')