<a href="https://colab.research.google.com/github/ConnorWatts/AFML-Exercises/blob/main/AdvancesinFML3.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

Answers to Exercises from Advances in Financial Machine Learning Chapter 3

In [78]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import multiprocessing as mp
import time
import sys
import datetime as dt
from datetime import datetime

Preliminary functions from the book

In [79]:
# CUSUM FILTER

def getTEvents(gRaw,h):
  """Symmetric CUSUM filter with a constant threshold.

  Accumulates the first differences of ``gRaw`` separately on the upside
  (``sPos``) and the downside (``sNeg``). Whenever one of the running sums
  crosses ``h`` in absolute value, the timestamp is recorded as an event and
  that sum is reset to zero.

  :param gRaw: (pd.Series) raw series to filter, indexed by timestamp
  :param h: (float) threshold the cumulative sums must exceed
  :return: (pd.DatetimeIndex) timestamps at which an event was triggered
  """
  tEvents, sPos, sNeg = [], 0, 0
  # the first difference is always NaN, so it is skipped
  for i, change in gRaw.diff().iloc[1:].items():
    sPos, sNeg = max(0, sPos + change), min(0, sNeg + change)
    if sNeg < -h:
      sNeg = 0
      tEvents.append(i)
    elif sPos > h:
      sPos = 0
      tEvents.append(i)
  # the return must sit outside the loop, otherwise only the first
  # observation is ever examined
  return pd.DatetimeIndex(tEvents)

In [80]:
# DAILY VOLS

def getDailyVol(close,span0=100):
  """Exponentially weighted standard deviation of roughly one-day returns.

  Each timestamp is paired with an earlier bar located via a one-day
  look-back search on the index; the return between the two bars is then
  smoothed with an EWM standard deviation.

  :param close: (pd.Series) prices indexed by timestamp
  :param span0: (int) span of the exponentially weighted window
  :return: (pd.Series) volatility estimate, indexed by the later timestamp
  """
  one_day = pd.Timedelta(days = 1)
  positions = close.index.searchsorted(close.index - one_day)
  # position 0 means there is no earlier bar to compare against
  positions = positions[positions > 0]
  n_kept = positions.shape[0]
  earlier = pd.Series(close.index[positions - 1],
                      index = close.index[close.shape[0] - n_kept:])
  returns = close.loc[earlier.index] / close.loc[earlier.values].values - 1
  return returns.ewm(span=span0).std()

In [81]:
# ADDING A VERTICAL BARRIER

def addVerticalBarrier(t_events, df, num_days=1):
    """Map each event to the first bar at or after ``event + num_days``.

    Events whose barrier would fall past the last bar of ``df`` are left out
    of the result (they are the trailing events).

    :param t_events: (pd.DatetimeIndex) event timestamps
    :param df: pandas object whose index holds the bar timestamps
    :param num_days: (int) holding period in days
    :return: (pd.Series) barrier timestamps indexed by event timestamp
    """
    horizon = pd.Timedelta(days=num_days)
    positions = df.index.searchsorted(t_events + horizon)
    # drop positions that point beyond the last available bar
    in_range = positions[positions < df.shape[0]]
    return pd.Series(df.index[in_range], index=t_events[:in_range.shape[0]])

In [82]:
def applyPtSlOnT1(close,events,ptSl,molecule):
  """Find the first profit-taking / stop-loss touch before each event's t1.

  :param close: (pd.Series) prices indexed by timestamp
  :param events: (pd.DataFrame) indexed by event start, with columns
      't1' (end of the window, may be NaT), 'trgt' (unit barrier width)
      and 'side' (sign applied to the path returns)
  :param ptSl: (list) [profit-taking multiple, stop-loss multiple]; a
      non-positive multiple disables the corresponding barrier
  :param molecule: subset of ``events.index`` processed by this call
  :return: (pd.DataFrame) copy of the 't1' column for ``molecule`` plus the
      columns 'sl' and 'pt' holding the first touch time (NaT when untouched)
  """
  events_ = events.loc[molecule]
  out = events_[['t1']].copy(deep=True)
  # a disabled barrier becomes a NaN level, which no return can cross
  if ptSl[0] > 0:
    pt = ptSl[0] * events_['trgt']
  else:
    pt = pd.Series(index=events.index, dtype=float)
  if ptSl[1] > 0:
    sl = -ptSl[1] * events_['trgt']
  else:
    sl = pd.Series(index=events.index, dtype=float)
  # events without a t1 are evaluated up to the last available price;
  # .items() replaces .iteritems(), which was removed in pandas 2.0
  for loc, t1 in events_['t1'].fillna(close.index[-1]).items():
    df0 = close[loc:t1]  # path prices
    df0 = (df0 / close[loc] - 1) * events_.at[loc, 'side']  # path returns
    out.loc[loc, 'sl'] = df0[df0 < sl[loc]].index.min()  # earliest stop loss
    out.loc[loc, 'pt'] = df0[df0 > pt[loc]].index.min()  # earliest profit taking
  return out

In [83]:
def getEvents(close,tEvents,ptSl,trgt,minRet,numThreads,t1=False,side=None):
  """Find the time of the first barrier touch for every event.

  :param close: (pd.Series) prices indexed by timestamp
  :param tEvents: (pd.DatetimeIndex) event start timestamps
  :param ptSl: (list) [profit-taking multiple, stop-loss multiple]. When
      ``side`` is None only ``ptSl[0]`` is used, for both barriers.
  :param trgt: (pd.Series) unit width of the horizontal barriers
  :param minRet: (float) events with a target at or below this are discarded
  :param numThreads: (int) forwarded as ``num_threads`` to ``mp_pandas_obj``
  :param t1: (pd.Series or False) vertical barrier timestamps; False
      disables the vertical barrier
  :param side: (pd.Series or None) side of each bet; None means the side is
      unknown and a unit side is used
  :return: (pd.DataFrame) indexed by event start with 't1' (first touch
      time) and 'trgt'; 'side' is kept only when it was supplied
  """
  #1) get target
  trgt = trgt.loc[tEvents]
  trgt = trgt[trgt>minRet]

  #2) get t1 (max holding period)
  if t1 is False: t1 = pd.Series(pd.NaT,index=tEvents)

  #3) form events object, apply stop loss on t1
  if side is None:
    side_, pt_sl = pd.Series(1., index=trgt.index), [ptSl[0], ptSl[0]]
  else:
    side_, pt_sl = side.loc[trgt.index], ptSl[:2]
  events = pd.concat({'t1':t1,'trgt':trgt,'side':side_},
                     axis=1).dropna(subset=['trgt'])
  # pass the resolved pt_sl (previously computed but never used)
  df0 = MultiProcessingFunctions.mp_pandas_obj(func=applyPtSlOnT1,
                    pd_obj=('molecule',events.index),
                    num_threads=numThreads, close=close, events=events,
                    ptSl=pt_sl)
  events['t1'] = df0.dropna(how='all').min(axis=1)  # pd.min ignores NaT
  # keep a user-supplied side so getBins can apply its 'side' branch
  if side is None: events = events.drop('side',axis = 1)
  return events

In [84]:
def getBins(events,close):
  """Label each event with the sign of the return realised at its t1.

  :param events: (pd.DataFrame) indexed by event start with a 't1' column
      and optionally a 'side' column
  :param close: (pd.Series) prices indexed by timestamp
  :return: (pd.DataFrame) columns 'ret' (return from start to t1, multiplied
      by 'side' when present) and 'bin' (sign of 'ret'; when 'side' is
      present, non-positive returns are labelled 0)
  """
  # 1) align prices with events
  valid = events.dropna(subset=['t1'])
  end_times = valid['t1'].values
  stamps = valid.index.union(end_times).drop_duplicates()
  prices = close.reindex(stamps, method='bfill')
  # 2) create out object
  out = pd.DataFrame(index=valid.index)
  out['ret'] = prices.loc[end_times].values / prices.loc[valid.index] - 1
  has_side = 'side' in valid
  if has_side:
    out['ret'] *= valid['side']
  out['bin'] = np.sign(out['ret'])
  if has_side:
    out.loc[out['ret'] <= 0, 'bin'] = 0
  return out

In [85]:
def dropLabels(events,minPtc=0.05):
  """Repeatedly drop the rarest label until every label is frequent enough.

  :param events: (pd.DataFrame) must contain a 'bin' column of labels
  :param minPtc: (float) minimum fraction of observations a label needs
  :return: (pd.DataFrame) ``events`` without the rows of the dropped labels;
      stops once the rarest label exceeds ``minPtc`` or fewer than three
      labels remain
  """
  while True:
    df0 = events['bin'].value_counts(normalize=True)
    if df0.min() > minPtc or df0.shape[0]<3:break
    # idxmin returns the label itself; argmin returns its integer position,
    # which never matches a label and would leave the loop running forever
    rarest = df0.idxmin()
    print ("dropped labels, %s, %s" % (rarest,df0.min()) )
    events=events[events['bin']!=rarest]
  return events

In [86]:
class MultiProcessingFunctions:
    """Static helpers that split a workload into chunks and run them sequentially or in a process pool.

    Adapted from
    https://colab.research.google.com/drive/1FmnCJ1CI98khBu88kezLXKqvS7U8Nw_h?usp=sharing#scrollTo=k6s5ZC0eGEbW
    """

    def __init__(self):
        pass

    @staticmethod
    def lin_parts(num_atoms, num_threads):
        """Partition ``num_atoms`` atoms into contiguous molecules of near-equal size.

        An atom is an indivisible task; a molecule is the group of atoms
        handled by one job.

        :param num_atoms: (int) number of atoms
        :param num_threads: (int) maximum number of molecules
        :return: (np.ndarray of int) boundaries; molecule ``i`` covers
            ``parts[i]:parts[i + 1]``
        """
        parts = np.linspace(0, num_atoms, min(num_threads, num_atoms) + 1)
        parts = np.ceil(parts).astype(int)
        return parts

    @staticmethod
    def nested_parts(num_atoms, num_threads, upper_triangle=False):
        """Partition atoms for a nested (triangular) loop so molecules carry similar work.

        :param num_atoms: (int) number of atoms
        :param num_threads: (int) maximum number of molecules
        :param upper_triangle: (bool) reverse the partition sizes, for the
            case where the first rows are the heaviest
        :return: (np.ndarray of int) molecule boundaries, starting at 0
        """
        # the recursion needs a starting boundary; an empty list would make
        # parts[-1] raise IndexError on the first iteration
        parts = [0]
        num_threads_ = min(num_threads, num_atoms)

        for _ in range(num_threads_):
            part = 1 + 4 * (parts[-1] ** 2 + parts[-1] + num_atoms * (num_atoms + 1.) / num_threads_)
            part = (-1 + part ** .5) / 2.
            parts.append(part)

        parts = np.round(parts).astype(int)

        if upper_triangle:  # the first rows are heaviest
            parts = np.cumsum(np.diff(parts)[::-1])
            parts = np.append(np.array([0]), parts)
        return parts

    @staticmethod
    def mp_pandas_obj(func, pd_obj, num_threads=24, mp_batches=1, lin_mols=True, **kargs):
        """Run ``func`` over chunks of ``pd_obj[1]`` and combine the results.

        :param func: (callable) function to be parallelized
        :param pd_obj: (vector) Element 0 is the name of the argument used to
            pass the molecule; Element 1 is the list of atoms to be grouped
            into molecules
        :param num_threads: (int) number of worker processes; 1 runs the jobs
            sequentially in the current process
        :param mp_batches: (int) number of batches
        :param lin_mols: (bool) use linear (True) or nested (False) partitioning
        :param kargs: extra keyword arguments forwarded to ``func``
        :return: the job outputs concatenated and sorted by index when they
            are DataFrames or Series; otherwise the raw list of outputs
            (an empty list when there are no atoms)
        """
        if lin_mols:
            parts = MultiProcessingFunctions.lin_parts(len(pd_obj[1]), num_threads * mp_batches)
        else:
            parts = MultiProcessingFunctions.nested_parts(len(pd_obj[1]), num_threads * mp_batches)

        jobs = []
        for i in range(1, len(parts)):
            job = {pd_obj[0]: pd_obj[1][parts[i - 1]:parts[i]], 'func': func}
            job.update(kargs)
            jobs.append(job)

        # nothing to do: avoid indexing into empty job / output lists
        if not jobs:
            return []

        if num_threads == 1:
            out = MultiProcessingFunctions.process_jobs_(jobs)
        else:
            out = MultiProcessingFunctions.process_jobs(jobs, num_threads=num_threads)

        # pd.concat replaces the repeated .append calls (removed in pandas 2.0)
        if isinstance(out[0], (pd.DataFrame, pd.Series)):
            return pd.concat(out).sort_index()
        return out

    @staticmethod
    def process_jobs_(jobs):
        """Run jobs sequentially, for debugging."""
        return [MultiProcessingFunctions.expand_call(job) for job in jobs]

    @staticmethod
    def expand_call(kargs):
        """Call ``kargs['func']`` with the remaining entries as keyword arguments."""
        # work on a copy so the caller's job dict keeps its 'func' entry
        kargs = dict(kargs)
        func = kargs.pop('func')
        return func(**kargs)

    @staticmethod
    def report_progress(job_num, num_jobs, time0, task):
        """Write a one-line progress report to stderr as asynchronous jobs complete.

        :param job_num: (int) number of jobs finished so far (>= 1)
        :param num_jobs: (int) total number of jobs
        :param time0: (float) ``time.time()`` value taken when processing started
        :param task: (str) label printed in the message
        """
        msg = [float(job_num) / num_jobs, (time.time() - time0)/60.]
        msg.append(msg[1] * (1/msg[0] - 1))
        time_stamp = str(dt.datetime.fromtimestamp(time.time()))

        msg = time_stamp + ' ' + str(round(msg[0]*100, 2)) + '% '+task+' done after ' + \
            str(round(msg[1], 2)) + ' minutes. Remaining ' + str(round(msg[2], 2)) + ' minutes.'

        # carriage return keeps intermediate reports on a single line
        if job_num < num_jobs:
            sys.stderr.write(msg+'\r')
        else:
            sys.stderr.write(msg+'\n')

        return

    @staticmethod
    def process_jobs(jobs, task=None, num_threads=24):
        """Run jobs in a process pool; each job must contain a 'func' callback for expand_call.

        :param jobs: (list of dict) keyword arguments per job, including 'func'
        :param task: (str) label for progress messages; defaults to the name
            of the first job's function
        :param num_threads: (int) number of worker processes
        :return: (list) job outputs in completion order
        """
        if not jobs:
            return []
        if task is None:
            task = jobs[0]['func'].__name__

        out, time0 = [], time.time()
        # the context manager terminates the workers if a job raises
        with mp.Pool(processes=num_threads) as pool:
            outputs = pool.imap_unordered(MultiProcessingFunctions.expand_call, jobs)
            for i, out_ in enumerate(outputs, 1):
                out.append(out_)
                MultiProcessingFunctions.report_progress(i, len(jobs), time0, task)
            pool.close()
            pool.join()  # this is needed to prevent memory leaks
        return out

3.1) Form dollar bars for E-mini S&P 500 futures (this notebook instead loads pre-computed BTC dollar bars from `BTC_1m_2018_dollar.parq`)

In [87]:
# Load the dollar bars from a parquet file in the working directory.
# NOTE(review): the file name suggests BTC bars rather than E-mini S&P 500 futures - confirm.

dollar_bars_df = pd.read_parquet('BTC_1m_2018_dollar.parq')  
# 'timestamp' is interpreted as milliseconds since the epoch and becomes the index
dollar_bars_df['timestamp']= pd.to_datetime((dollar_bars_df['timestamp']).astype(float), unit='ms')
dollar_bars_df = dollar_bars_df.set_index('timestamp')

In [88]:
dollar_bars_df.head()

Unnamed: 0_level_0,open,high,low,close
timestamp,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1
2018-08-03 00:03:00.060,7515.94,7528.91,7505.64,7512.9
2018-08-03 00:07:00.060,7506.27,7512.9,7487.0,7487.52
2018-08-03 00:11:00.060,7487.99,7505.25,7480.72,7484.42
2018-08-03 00:17:00.060,7486.62,7502.64,7480.01,7490.92
2018-08-03 00:23:00.060,7494.43,7503.02,7485.0,7487.0


a) Apply a symmetric CUSUM filter where the threshold is the standard deviation of daily returns

In [89]:
# for this we use the preliminary functions getDailyVol and a modified getTEvents

def getTEventsStd(gRaw,h):
  """Symmetric CUSUM filter on log returns with a per-timestamp threshold.

  Variant of getTEvents in which the threshold is looked up for each
  timestamp instead of being a single constant.

  :param gRaw: (pd.Series) positive prices indexed by timestamp
  :param h: (pd.Series or float) threshold per timestamp; a Series must
      contain every timestamp of the log returns. A scalar is applied to
      all timestamps.
  :return: (pd.DatetimeIndex) timestamps at which an event was triggered
  """
  tEvents, sPos, sNeg = [], 0, 0
  # dropna() already removes the leading NaN, so every remaining return is
  # used (slicing with [1:] as well would skip the first valid return)
  diff = np.log(gRaw).diff().dropna()
  per_timestamp = isinstance(h, pd.Series)
  for i, change in diff.items():
    threshold = h.loc[i] if per_timestamp else h
    sPos, sNeg = max(0, sPos + change), min(0, sNeg + change)
    # a NaN threshold makes both comparisons False, so no event is recorded
    if sNeg < -threshold:
      sNeg = 0
      tEvents.append(i)
    elif sPos > threshold:
      sPos = 0
      tEvents.append(i)
  return pd.DatetimeIndex(tEvents)

First we find the standard deviations of the daily returns



In [90]:
# Closing prices of the dollar bars; used as the price series throughout.
close = dollar_bars_df['close']
# EWM volatility of ~1-day returns with a span of 50 observations
daily_vols = getDailyVol(close,span0=50)

daily_vols.head()

timestamp
2018-08-04 00:04:00.060         NaN
2018-08-04 00:07:00.060    0.000120
2018-08-04 00:19:00.060    0.002984
2018-08-04 00:26:00.060    0.003371
2018-08-04 00:38:00.060    0.008358
Name: close, dtype: float64

Now we use these vols to find the events

In [91]:
# Restrict prices to the timestamps that have a volatility estimate, so a threshold can be looked up for every return.
cusum_events = getTEventsStd(close[daily_vols.index],daily_vols)

b) Use snippet 3.4 on a pandas series t1, where numDays = 1

In [92]:
# For each CUSUM event, the first bar at or after event time + 1 day.
vertical_barriers = addVerticalBarrier(t_events=cusum_events,
                                         df=close, num_days=1)


c) On those sampled features, apply the triple-barrier method, where ptSl = [1,1] and t1 is the series you created in b)

In [93]:
# [profit-taking multiple, stop-loss multiple] applied to the target
ptSl = [1,1]
# events whose target is at or below this value are discarded by getEvents
minRet = 0.0005
# trgt is the per-event unit width of the horizontal barriers; getEvents is called with daily_vols for it below.

In [94]:
# First barrier touched per CUSUM event: horizontal barriers are scaled by
# daily_vols, the vertical barrier comes from vertical_barriers.
# numThreads=10 runs applyPtSlOnT1 in a pool of 10 worker processes.
triple_barrier_events = getEvents(close=close,
                                  tEvents=cusum_events,
                                  ptSl=ptSl,
                                  trgt = daily_vols,
                                  minRet=minRet,
                                  numThreads=10,
                                  t1=vertical_barriers)

2022-11-03 14:15:47.892568 100.0% applyPtSlOnT1 done after 0.58 minutes. Remaining 0.0 minutes.


d) Apply getBins to generate the labels

In [95]:
labelled_events = getBins(triple_barrier_events,close)

In [96]:
labelled_events.bin.value_counts()

 1.0    7527
-1.0    7302
Name: bin, dtype: int64

3.2) From exercise 1, use Snippet 3.8 to drop rare labels.

In [97]:
dropped_labels = dropLabels(labelled_events,minPtc=0.05)

In [98]:
dropped_labels

Unnamed: 0,ret,bin
2018-08-04 01:57:00.060,-0.010972,-1.0
2018-08-04 06:00:00.060,-0.008068,-1.0
2018-08-04 08:14:00.060,-0.005991,-1.0
2018-08-04 09:33:00.060,-0.005543,-1.0
2018-08-04 10:21:00.060,-0.006553,-1.0
...,...,...
2020-08-02 18:35:00.060,-0.005947,-1.0
2020-08-02 19:02:00.060,-0.003903,-1.0
2020-08-02 19:17:00.060,-0.004966,-1.0
2020-08-02 19:39:00.060,-0.010559,-1.0


3.3) Adjust the getBins function (Snippet 3.5) to return a 0 whenever the vertical barrier is the first touched

In [99]:
def getBins0(events,close,t1=None):
  """Label events like getBins, returning 0 when the vertical barrier was touched first.

  :param events: (pd.DataFrame) indexed by event start with a 't1' column
      (time of the first barrier touch) and optionally a 'side' column
  :param close: (pd.Series) prices indexed by timestamp
  :param t1: (pd.Series or None) vertical barrier timestamps indexed by
      event start. When given, every event whose first touch time equals its
      vertical barrier is labelled 0. When None (default) the labels are the
      same as those of getBins.
  :return: (pd.DataFrame) columns 'ret' and 'bin'
  """
  # 1) align prices with events
  events_ = events.dropna(subset=['t1'])
  px = events_.index.union(events_['t1'].values).drop_duplicates()
  px = close.reindex(px,method='bfill')
  # 2) create out object
  out= pd.DataFrame(index=events_.index)
  out['ret'] = px.loc[events_['t1'].values].values/px.loc[events_.index]-1
  if 'side' in events_: out['ret'] *= events_['side']
  out['bin'] = np.sign(out['ret'])
  if 'side' in events_: out.loc[out['ret'] <= 0, 'bin'] = 0
  # 3) zero out events that ended on the vertical barrier
  if t1 is not None:
    vertical = t1.reindex(events_.index)
    # events without a vertical barrier (NaT) never compare equal.
    # NOTE: a horizontal barrier touched on the very same bar as the vertical
    # barrier cannot be told apart here and is also labelled 0.
    hit_vertical = (events_['t1'] == vertical).values
    out.loc[hit_vertical, 'bin'] = 0
  return out

In [100]:
triple_barrier_events.dropna(subset=['t1'])

Unnamed: 0,t1,trgt
2018-08-04 01:57:00.060,2018-08-04 10:33:00.060,0.010506
2018-08-04 06:00:00.060,2018-08-04 12:05:00.060,0.007128
2018-08-04 08:14:00.060,2018-08-04 10:17:00.060,0.005732
2018-08-04 09:33:00.060,2018-08-04 10:33:00.060,0.004493
2018-08-04 10:21:00.060,2018-08-04 12:05:00.060,0.004243
...,...,...
2020-08-02 18:35:00.060,2020-08-02 19:39:00.060,0.003847
2020-08-02 19:02:00.060,2020-08-02 19:17:00.060,0.003772
2020-08-02 19:17:00.060,2020-08-02 19:39:00.060,0.003690
2020-08-02 19:39:00.060,2020-08-02 21:35:00.060,0.005341


3.4) Develop a trend-following strategy based on a popular technical analysis statistic (e.g. crossing moving averages). For each observation, the model suggests a side, but not a size of the bet

In [106]:
def sma(data,w1,w2,mp):
  """Compute two simple moving averages of the same series.

  :param data: (pd.Series) values to smooth
  :param w1: (int) window of the first average
  :param w2: (int) window of the second average
  :param mp: (int) min_periods passed to both rolling windows
  :return: (tuple) the two rolling means, in the order (w1, w2)
  """
  first, second = (data.rolling(window, min_periods = mp).mean() for window in (w1, w2))
  return first, second

In [112]:
# Moving-average crossover: side is +1 when the 20-bar average is above the 50-bar average, otherwise -1.
window_1, window_2 = 20, 50
# NOTE: `data` is the same object as dollar_bars_df (no copy), so the new
# columns and the in-place dropna below also modify dollar_bars_df.
data = dollar_bars_df
# mp = 1 sets min_periods to 1 for both rolling means
data['20_SMA'] , data['50_SMA'] = sma(data['close'],window_1,window_2,mp = 1)
# equal averages fall into the -1.0 branch because the comparison is strict
data['side'] = np.where(data['20_SMA'] > data['50_SMA'], 1.0, -1.0)
data.dropna(axis=0, how='any', inplace=True)  
data.head()

Unnamed: 0_level_0,open,high,low,close,20_SMA,50_SMA,side
timestamp,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1
2018-08-03 00:03:00.060,7515.94,7528.91,7505.64,7512.9,7512.9,7512.9,-1.0
2018-08-03 00:07:00.060,7506.27,7512.9,7487.0,7487.52,7500.21,7500.21,-1.0
2018-08-03 00:11:00.060,7487.99,7505.25,7480.72,7484.42,7494.946667,7494.946667,-1.0
2018-08-03 00:17:00.060,7486.62,7502.64,7480.01,7490.92,7493.94,7493.94,-1.0
2018-08-03 00:23:00.060,7494.43,7503.02,7485.0,7487.0,7492.552,7492.552,-1.0
