# How to perform what-if scenarios for trading strategies with Amazon FinSpace

This notebook will: 

1. Create and connect to a FinSpace managed cluster
2. Load the data view selected in FinSpace web application into a Spark DataFrame and access data using Spark SQL
3. Train a machine learning model with Spark ML
4. Run multiple what-if trading strategies

Before executing the notebook: 
1. Select the FinSpace PySpark Kernel in the top right corner of this notebook
2. Wait until the FinSpace PySpark Kernel is available (this usually takes less than 5 minutes)


## 1. Connect to FinSpace Cluster

In [None]:
%local
from aws.finspace.cluster import FinSpaceClusterManager

# if this was already run, no need to run again
if 'finspace_clusters' not in globals():
    finspace_clusters = FinSpaceClusterManager()
    finspace_clusters.auto_connect()
else:
    print(f'connected to cluster: {finspace_clusters.get_connected_cluster_id()}')

## 2. Load data view into Spark DataFrame

In [None]:
#####----------------------------------------------------------
##### REPLACE WITH CORRECT IDS!
##### Dataset: "US Equity Time-Bar Summary - 1 min, 14 Symbols - Sample"
#####----------------------------------------------------------
# Both values are placeholders; they are passed to read_data_view() when the
# data view is loaded, so fill them in before running the rest of the notebook.
dataset_id = ""
data_view_id = ""

### Display Dataset
Use the FinSpace API to load the data view as a Spark DataFrame.

In [None]:
from aws.finspace.analytics import FinSpaceAnalyticsManager

# Fail early with a clear message when the placeholder IDs were never filled in,
# instead of sending empty identifiers to the FinSpace API.
if not dataset_id or not data_view_id:
    raise ValueError("Set dataset_id and data_view_id to the IDs of your FinSpace data view before running this cell")

finspace_analytics = FinSpaceAnalyticsManager(spark = spark)

# Load the selected data view; the result is used as a Spark DataFrame below.
df = finspace_analytics.read_data_view(dataset_id = dataset_id, data_view_id = data_view_id)

### Display the DataFrame's Schema

In [None]:
# Print the column names and types of the loaded data view.
df.printSchema()

### Preview Dataset
Sample a few rows of the DataFrame

In [None]:
# Print the first 5 rows of the data view.
df.show(5)

## 3. Query data and prepare features for model training


In [None]:
import pandas as pd
import matplotlib.pyplot as plt
import datetime as dt
import pyspark.sql.functions as F
import pyspark.sql.types as T

from aws.finspace.timeseries.spark.analytics import *
from aws.finspace.timeseries.spark.windows import *

from aws.finspace.timeseries.spark.util import string_to_timestamp_micros

In [None]:
# Please adjust date range as needed
sDate = dt.datetime(2020, 1, 1)
eDate = dt.datetime(2020, 2, 28)

# Keep only the AMZN "TRADE NB" rows whose date lies between sDate and eDate.
is_trade_bar = df.eventtype == "TRADE NB"
in_date_range = df.date.between(sDate, eDate)
is_amzn = df.ticker == "AMZN"

df = df.filter(is_trade_bar & in_date_range & is_amzn)

In [None]:
# Print the first 7 rows that survived the filter.
df.show(7)

In [None]:
# Register the filtered DataFrame as the temporary view "df" so it can be queried with Spark SQL.
df.createOrReplaceTempView("df")

In [None]:
# Columns used to partition the rows when the analytic is computed
partitionList = ["ticker", "eventtype"]

# Analytics parameters
tenor = 10
numStd = 2

# Names of the time-bar columns
timeCol  = 'end'
priceCol = 'vwap'
highCol  = 'high'
lowCol   = 'low'
volCol   = 'volume'

# EMA definition built from the tenor, the time column and the price column
emaDef = exponential_moving_average(tenor, timeCol, priceCol)

# Example: use MACD instead of the simpler EMA
#macdDef = moving_average_converge_diverge_hist( 12, 26, 9, timeCol, priceCol )

# Compute the analytic under the name "exponential_moving_average"; the result
# is reused by several later cells, so cache it right away.
df = compute_analytics_on_features(
    df,
    "exponential_moving_average",
    emaDef,
    partition_col_list = partitionList,
).cache()

In [None]:
# Print the schema again to check the DataFrame returned by compute_analytics_on_features.
df.printSchema()

In [None]:
# Print the first 11 rows of the DataFrame with the computed analytic.
df.show(11)

In [None]:
# Build the label side of the training set. Each row's vwap becomes the label of the
# bar that started 5 minutes EARLIER, so features observed at time t are paired with
# the price at t + 5 minutes. (Adding the interval instead pairs every bar with the
# price from 5 minutes in the past, i.e. the model would learn to "predict" history.)
# "futurestart" is the join key that is matched against df.start: it holds the start
# time of the feature bar this label belongs to.
LABEL_HORIZON = F.expr('INTERVAL 5 MINUTES')

futureDF = df.withColumn("futurestart", df.start - LABEL_HORIZON)
futureDF = futureDF.withColumnRenamed("vwap", "label")
futureDF.show(4)

In [None]:
# Attach a label to every bar: join each row of df with the futureDF row whose
# "futurestart" equals the bar's start time, and keep that row's renamed vwap
# column ("label") together with its futurestart and start columns.
df = df.alias('df')
futureDF = futureDF.alias('futureDF')

join_condition = df.start == futureDF.futurestart
selected_columns = ['df.*', 'futureDF.label', 'futureDF.futurestart', 'futureDF.start']

fullDF = df.join(futureDF, join_condition).select(*selected_columns)
fullDF.show(4)

In [None]:
# Final look at the features before training: take the first 3 rows and
# transpose them so that every column is shown on its own line.
preview_rows = fullDF.take(3)
pd.DataFrame(preview_rows, columns=fullDF.columns).T

## 4. Train multiple RandomForest models with different parameters via Spark ML

In [None]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.tuning import ParamGridBuilder
import numpy as np
from pyspark.ml.tuning import CrossValidator
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml import Pipeline
from time import *

# A later cell runs `import time`, which rebinds the name `time` to the module; calling
# time() then fails when this cell is re-run. Import the timer under its own name instead.
from time import perf_counter

# Seed for the train/test split and the cross-validation folds, so runs are reproducible.
SPLIT_SEED = 42

# Take only subset without null
df = fullDF.collect()[50:100]
# Take full dataset
#df = fullDF.collect()[50:]

df = spark.createDataFrame(df)

feature_list = ["activity_count", "vwap", "open", "high", "low", "close", "volume", "exponential_moving_average"]

# Pipeline: assemble the feature columns into one vector, then fit a random forest on "label".
assembler = VectorAssembler(inputCols=feature_list, outputCol="features")

rf = RandomForestRegressor().setFeaturesCol("features").setLabelCol("label")

pipeline = Pipeline(stages=[assembler, rf])

# Split ONCE, before the loop. Splitting inside the loop gave every model a different
# training set and kept only the last testData, so earlier models could have been trained
# on rows that are later replayed in the backtest.
(trainingData, testData) = df.randomSplit([0.8, 0.2], seed=SPLIT_SEED)

#hyperparameters values: numTrees start, numTrees stop, numTrees num, maxDepth start, maxDepth stop, maxDepth num, numFolds
set1=[ 5, 25, 3, 5, 25, 3, 3]
set2=[ 10, 50, 3, 5, 25, 3, 9]
set3=[ 5, 25, 3, 5, 25, 3, 9]

params=[set1, set2, set3]
models=[]

# The evaluator does not depend on the hyperparameter set, so build it once.
evaluator = RegressionEvaluator()

for hp_set in params:

    print("set" + str(hp_set) )
    trees_start, trees_stop, trees_num, depth_start, depth_stop, depth_num, num_folds = hp_set

    # Evenly spaced integer grids for both hyperparameters
    num_trees_grid = [int(x) for x in np.linspace(start = trees_start, stop = trees_stop, num = trees_num)]
    max_depth_grid = [int(x) for x in np.linspace(start = depth_start, stop = depth_stop, num = depth_num)]

    paramGrid = ParamGridBuilder() \
        .addGrid(rf.numTrees, num_trees_grid) \
        .addGrid(rf.maxDepth, max_depth_grid) \
        .build()

    crossval = CrossValidator(estimator=pipeline,
                              estimatorParamMaps=paramGrid,
                              evaluator=evaluator,
                              numFolds=num_folds,
                              seed=SPLIT_SEED)

    starttime = perf_counter()

    m=crossval.fit(trainingData)

    models.append(m)

    endtime = perf_counter()
    trainingtime = endtime - starttime
    print("Training time: %.3f seconds" % (trainingtime))


## 5. Install backtesting library and define trading strategy


In [None]:
# Install the backtrader package through the SparkContext.
# NOTE(review): no version is pinned, so a later run may pick up a different release — consider pinning.
sc.install_pypi_package("backtrader")

Define helper classes for using the backtesting library. They contain code to calculate performance metrics, to read time series data from the Spark cluster, and to define a base template for a trading strategy.

In [None]:
import backtrader as bt
import backtrader.feeds as btfeeds
import backtrader.analyzers as btanalyzers
from backtrader.feed import DataBase
from backtrader import date2num
from backtrader import TimeFrame
import os
import pytz
from pytz import timezone
import json
import time
import itertools
import datetime

# More documentation about backtrader: https://www.backtrader.com/

class AlgoStrategy():
    """Run one backtrader strategy class in its own Cerebro engine and report its performance.

    The strategy class passed in must provide ``init_broker(broker)`` and
    ``add_data(cerebro)``; both are called on the class itself while the engine is set up.
    After ``run()`` the instance exposes ``total_closed``, ``strike_rate``,
    ``max_drawdown``, ``sqn``, ``sharpe_ratio`` and ``pnl``.
    """

    def __init__(self,strategy):
        self.cerebro = bt.Cerebro()
        strategy.init_broker(self.cerebro.broker)
        data=strategy.add_data(self.cerebro)
        # Stored on the strategy class so that strategy code can reach the feed object.
        strategy.data=data

        self.cerebro.addstrategy(strategy)

        # Remember the starting value to compute the total PnL after the run.
        self.portfolioStartValue=self.cerebro.broker.getvalue()
        self.cerebro.addanalyzer(btanalyzers.DrawDown, _name='dd')
        self.cerebro.addanalyzer(btanalyzers.SharpeRatio_A, _name='sharpe')
        self.cerebro.addanalyzer(btanalyzers.SQN, _name='sqn')
        self.cerebro.addanalyzer(btanalyzers.TradeAnalyzer, _name='ta')

    @staticmethod
    def _stat(analysis, *path, default=0):
        """Return the nested entry ``analysis[path[0]][path[1]]...`` or ``default`` if any key is missing.

        Uses membership tests only, so no entry is ever created in the analysis as a side effect.
        """
        node = analysis
        for key in path:
            if not isinstance(node, dict) or key not in node:
                return default
            node = node[key]
        return node

    def performance(self):
        """Print the trade statistics of the finished run and store the key metrics on ``self``."""
        analyzer=self.thestrat.analyzers.ta.get_analysis()
        dd_analyzer=self.thestrat.analyzers.dd.get_analysis()

        # backtrader's TradeAnalyzer only creates entries such as total.closed, won.total or
        # streak.* once trades have actually been opened/closed. A short backtest can finish
        # without them, so read every statistic with a default of 0 instead of raising.
        total_open = self._stat(analyzer, 'total', 'open')
        total_closed = self._stat(analyzer, 'total', 'closed')
        total_won = self._stat(analyzer, 'won', 'total')
        total_lost = self._stat(analyzer, 'lost', 'total')
        win_streak = self._stat(analyzer, 'streak', 'won', 'longest')
        lose_streak = self._stat(analyzer, 'streak', 'lost', 'longest')
        pnl_net = round(self._stat(analyzer, 'pnl', 'net', 'total'),2)
        strike_rate=0
        if total_closed>0:
            strike_rate = (total_won / total_closed) * 100
        #Designate the rows
        h1 = ['Total Open', 'Total Closed', 'Total Won', 'Total Lost']
        h2 = ['Strike Rate','Win Streak', 'Losing Streak', 'PnL Net']
        h3 = ['DrawDown Pct','MoneyDown', '', '']
        self.total_closed=total_closed
        self.strike_rate=strike_rate
        self.max_drawdown=dd_analyzer.max.drawdown
        r1 = [total_open, total_closed,total_won,total_lost]
        r2 = [('%.2f%%' %(strike_rate)), win_streak, lose_streak, pnl_net]
        r3 = [('%.2f%%' %(dd_analyzer.max.drawdown)), dd_analyzer.max.moneydown, '', '']
        #Check which set of headers is the longest.
        header_length = max(len(h1),len(h2),len(h3))
        #Print the rows (one extra, empty leading column acts as left margin)
        print_list = [h1,r1,h2,r2,h3,r3]
        row_format ="{:<15}" * (header_length + 1)
        print("Trade Analysis Results:")
        for row in print_list:
            print(row_format.format('',*row))

        analyzer=self.thestrat.analyzers.sqn.get_analysis()
        sharpe_analyzer=self.thestrat.analyzers.sharpe.get_analysis()
        self.sqn = analyzer.sqn
        self.sharpe_ratio = sharpe_analyzer['sharperatio']
        # The Sharpe analyzer can report None; treat that as 0 so the value can be formatted.
        if self.sharpe_ratio is None:
            self.sharpe_ratio=0
        self.pnl = self.cerebro.broker.getvalue()-self.portfolioStartValue
        print('[SQN:%.2f, Sharpe Ratio:%.2f, Final Portfolio:%.2f, Total PnL:%.2f]' % (self.sqn,self.sharpe_ratio,self.cerebro.broker.getvalue(),self.pnl))

    def run(self):
        """Execute the backtest, keep the first strategy instance and print its performance."""
        thestrats = self.cerebro.run()
        self.thestrat = thestrats[0]
        self.performance()

class MyFeed(DataBase):
    """backtrader data feed that replays the rows of the global ``testData`` Spark DataFrame.

    Every collected row is kept in ``self.list``; rows already delivered are also stored
    in ``self.m`` keyed by their ``end`` time so strategies can look up the full record.
    """
    def __init__(self):
        super(MyFeed, self).__init__()
        columns = ["end", "activity_count", "vwap", "open", "high", "low", "close", "volume", "exponential_moving_average"]
        # Sort by bar time: fromdate/todate below take the first and last row, and the bars
        # are replayed in list order, which is only meaningful for time-ordered rows. A
        # DataFrame coming out of randomSplit() carries no ordering guarantee.
        self.list=testData.select(*columns).orderBy("end").collect()
        if not self.list:
            raise ValueError("testData contains no rows; nothing to backtest")
        self.n=0  # index of the next row to deliver

        self.fromdate=self.list[0]['end']
        self.todate=self.list[-1]['end']
        self.timeframe=bt.TimeFrame.Minutes
        print("from=%s,to=%s" % (self.fromdate,self.todate))

        self.m={}  # delivered rows, keyed by their 'end' value

    def start(self):
        # Nothing to do for this data feed type
        pass

    def stop(self):
        # Nothing to do for this data feed type
        pass

    def _load(self):
        """Copy the next row into the feed's lines; return False once all rows were delivered."""
        if self.n>=len(self.list):
            return False

        r=self.list[self.n]
        self.lines.datetime[0] = date2num(r['end'])

        self.lines.open[0] = r['open']
        self.lines.high[0] = r['high']
        self.lines.low[0] = r['low']
        self.lines.close[0] = r['close']
        self.lines.volume[0] = r['volume']
        self.m[r['end']]=r

        self.n=self.n+1
        return True
        
class StrategyTemplate(bt.Strategy):
    """Base class for the notebook's strategies: tracks month and day changes on every bar."""

    def __init__(self):
        self.dataclose = self.datas[0].close
        # -1 means "no bar seen yet"
        self.lastMonth=-1
        self.lastDay=-1

    @staticmethod
    def init_broker(broker):
        """Hook for subclasses to configure the broker; does nothing here."""
        pass

    @staticmethod
    def add_data(cerebro):
        """Hook for subclasses to attach a data feed; does nothing here."""
        pass

    def next(self):
        """Update the month/day bookkeeping for the current bar."""
        bar_time=self.datas[0].datetime.datetime(0)

        # Start of month: remember the portfolio value at the first bar of each month
        month_changed = self.lastMonth!=bar_time.month
        if month_changed:
            if self.lastMonth!=-1:
                # change since the previous month started (only used by optional debug output)
                chg=self.broker.getvalue()-self.monthCash
            self.lastMonth=bar_time.month
            self.monthCash=self.broker.getvalue()

        # Start of day
        day_changed = self.lastDay!=bar_time.day
        if day_changed:
            self.lastDay=bar_time.day

Define the trading strategy that we want to backtest. The method `next` defines the trade logic that is applied to each time series record.

In [None]:
class MyStrategy(StrategyTemplate):
    """Long/short strategy driven by the price predicted by a trained Spark ML model.

    The parameters are read from ``self.cerebro.strat_params``:
    ``size`` (order size), ``pct_chg`` (signal threshold in percent) and
    ``model`` (fitted model used for the prediction).
    """

    def __init__(self):  # Initiation
        super(MyStrategy, self).__init__()

    # Declared static like in StrategyTemplate: both hooks are called on the class.
    @staticmethod
    def init_broker(broker):
        """Start with 1,000,000 in cash and no commission."""
        broker.setcash(1000000.0)
        broker.setcommission(commission=0.0)

    @staticmethod
    def add_data(cerebro):
        """Attach a MyFeed instance to the engine and return it."""
        data = MyFeed()
        cerebro.adddata(data)
        return data

    def next(self):  # Processing
        """Predict the price for the current bar and open or flip the position accordingly."""
        super(MyStrategy, self).next()
        dt=self.datas[0].datetime.datetime(0)
        # NOTE(review): relies on backtrader returning exactly the datetime that MyFeed
        # stored as key — confirm there is no precision or timezone drift.
        r=self.data.m[dt]
        print(r)
        size=self.cerebro.strat_params['size']
        threshold_PctChg=self.cerebro.strat_params['pct_chg']

        model=self.cerebro.strat_params['model']
        df=spark.createDataFrame([r])
        closePrice=r['close']
        predictedPrice = model.transform(df).collect()[0]['prediction']
        expectedPctChg=(predictedPrice-closePrice)/closePrice*100.0

        goLong=expectedPctChg>threshold_PctChg
        goShort=expectedPctChg<-threshold_PctChg
        print("expectedPctChg=%s,goLong=%s,goShort=%s" % (expectedPctChg,goLong,goShort))

        if not self.position:
            if goLong:
                self.buy(size=size) # Go long
            elif goShort:
                # Only short on an explicit short signal; a neutral signal
                # (inside the threshold band) must not open a position.
                self.sell(size=size) # Go short
        elif self.position.size>0 and goShort:
            # Sell twice the size: close the long and open a short in one order
            self.sell(size=size*2)
        elif self.position.size<0 and goLong:
            # Buy twice the size: close the short and open a long in one order
            self.buy(size=size*2)

## 6. Create what-if scenarios and run trading strategies with different configurations

In [None]:
# create scenarios: one configuration per (model, threshold step) combination.
# Every combination is collected; appending after the inner loop would keep only
# the last threshold step of each model as soon as more than one step is used.
PCT_CHG_STEPS = range(0,1)  # pct_chg = 0.10 * step

scenarios=[
    {"size":100,"pct_chg":0.10*s,"model":model}
    for model in models
    for s in PCT_CHG_STEPS
]
print(scenarios)

In [None]:
# run scenarios and remember the configuration with the highest PnL
best_config=None
best_pnl=None
n=0
for n, c in enumerate(scenarios, start=1):
    print("*** [%s] RUN SCENARIO:%s" % (n,c))
    config=c
    algo=AlgoStrategy(MyStrategy)
    # the strategy reads its parameters from the engine object
    algo.cerebro.strat_params=config
    algo.run()
    is_new_best = best_pnl is None or algo.pnl>best_pnl
    if is_new_best:
        best_config, best_pnl = c, algo.pnl

# best scenario
print("*** BEST SCENARIO:pnl=%s,config=%s" % (best_pnl,best_config))