## Backtesting the SMA crossover strategy

### Go to project root

In [1]:
import os
from pathlib import Path

# Resolve the project root only once per kernel session. Without this guard,
# re-running the cell would start from the already-changed working directory
# and climb three more levels each time.
if "project_root" not in globals():
    # The notebook's working directory when the cell first runs.
    cwd = os.getcwd()
    current_path = Path(cwd)
    # The project root is three directory levels above the notebook directory.
    project_root = current_path.parent.parent.parent

# Change the current working directory to the project root.
os.chdir(project_root)

In [2]:
from collections import deque

from trazy_analysis.bot.event_loop import EventLoop
from trazy_analysis.broker.broker_manager import BrokerManager
from trazy_analysis.broker.simulated_broker import SimulatedBroker
from trazy_analysis.broker.binance_fee_model import BinanceFeeModel
from trazy_analysis.common.clock import SimulatedClock
from trazy_analysis.feed.feed import CsvFeed, Feed
from trazy_analysis.indicators.indicators_manager import IndicatorsManager
from trazy_analysis.models.asset import Asset
from trazy_analysis.order_manager.order_creator import OrderCreator
from trazy_analysis.order_manager.order_manager import OrderManager
from trazy_analysis.order_manager.position_sizer import PositionSizer
from trazy_analysis.strategy.strategies.sma_crossover_strategy import (
    SmaCrossoverStrategy,
)
from datetime import datetime, timedelta
import pytz
from trazy_analysis.db_storage.influxdb_storage import InfluxDbStorage
from trazy_analysis.common.ccxt_connector import CcxtConnector
from trazy_analysis.market_data.historical.ccxt_historical_data_handler import CcxtHistoricalDataHandler

from trazy_analysis.feed.feed import ExternalStorageFeed
from trazy_analysis.statistics.pyfolio_statistics import PyfolioStatistics
from trazy_analysis.statistics.statistics import Statistics
from trazy_analysis.statistics.quantstats_statistics import QuantstatsStatistics

import pyfolio as pf
import black_box as bb
import rbfopt
import numpy as np

  'Module "zipline.assets" not found; multipliers will not be applied'


In [3]:
%load_ext autoreload
%autoreload

In [None]:
# The single instrument used by every backtest in this notebook.
asset = Asset(symbol="BTCUSDT", exchange="BINANCE")

In [6]:
# Fixed start of the backtest window (naive datetime, no tzinfo).
# NOTE(review): `start` is reassigned by the LOOKBACK_PERIOD cell further down,
# so this value only matters if that cell is skipped.
start = datetime(2022, 5, 7, 5, 5)

In [7]:
# Fixed end of the backtest window (naive datetime, no tzinfo).
# NOTE(review): `end` is reassigned by the LOOKBACK_PERIOD cell further down,
# so this value only matters if that cell is skipped.
end = datetime(2022, 5, 7, 5, 24)

datetime.datetime(2022, 5, 7, 5, 5)

In [None]:
# Rolling backtest window: the LOOKBACK_PERIOD that ends at the current UTC time.
LOOKBACK_PERIOD = timedelta(days=1)
end = datetime.now(tz=pytz.UTC)
start = end - LOOKBACK_PERIOD

In [None]:
# Display the start of the backtest window.
start

In [None]:
# Display the end of the backtest window.
end

In [None]:
# Storage backend shared by the historical download and by every feed below.
# It is constructed with its default arguments.
db_storage = InfluxDbStorage()

In [None]:
# No credentials are supplied: every BINANCE credential field is None.
binance_credentials = {"key": None, "secret": None, "password": None}
exchanges_api_keys = {"BINANCE": binance_credentials}

# Save ticker data for `asset` over [start, end] into `db_storage`.
ccxt_connector = CcxtConnector(exchanges_api_keys=exchanges_api_keys)
historical_data_handler = CcxtHistoricalDataHandler(ccxt_connector)
historical_data_handler.save_ticker_data_in_db_storage(asset, db_storage, start, end)

In [None]:
# Run one backtest of SmaCrossoverStrategy with its default parameters.
assets = [asset]
# The same `events` deque is handed to the feed, the broker, the order manager
# and the event loop.
events = deque()

# One-minute candles for [start, end], read from `db_storage`.
feed: Feed = ExternalStorageFeed(
    assets = assets,
    events = events,
    time_unit = timedelta(minutes=1),
    start = start,
    end = end,
    db_storage = db_storage,
    file_storage = None,
    market_cal = None,
)

# Strategy class mapped to the list of parameter sets to run (one set here).
strategies = {SmaCrossoverStrategy: [SmaCrossoverStrategy.DEFAULT_PARAMETERS]}
clock = SimulatedClock()
# Simulated BINANCE broker with 10000.0 of initial funds, all of it subscribed to the portfolio.
broker = SimulatedBroker(clock, events, initial_funds=10000.0, fee_model=BinanceFeeModel())
broker.subscribe_funds_to_portfolio(10000.0)
broker_manager = BrokerManager(brokers={"BINANCE": broker}, clock=clock)
position_sizer = PositionSizer(broker_manager=broker_manager, integer_size=False)
order_creator = OrderCreator(broker_manager=broker_manager)
order_manager = OrderManager(
    events=events,
    broker_manager=broker_manager,
    position_sizer=position_sizer,
    order_creator=order_creator,
)
# Indicators are built from the feed's candles.
indicators_manager = IndicatorsManager(preload=True, initial_data=feed.candles)
event_loop = EventLoop(
    events=events,
    assets=assets,
    feed=feed,
    order_manager=order_manager,
    strategies_parameters=strategies,
    indicators_manager=indicators_manager,
)
event_loop.loop()

# Portfolio cash balance once the loop has finished.
print(broker.get_portfolio_cash_balance())

In [None]:
# Display the event loop's equity table for SmaCrossoverStrategy on BINANCE.
event_loop.equity_dfs["SmaCrossoverStrategy"]["BINANCE"]

In [None]:
# Display the event loop's positions table for BINANCE.
event_loop.positions_dfs["BINANCE"]

In [None]:
# Display the event loop's transactions table for BINANCE.
event_loop.transactions_dfs["BINANCE"]

In [None]:
import pandas as pd

# Keep only the equity rows whose 'returns' value is non-zero and print them
# with no row or column truncation.
binance_equity = event_loop.equity_dfs['SmaCrossoverStrategy']['BINANCE']
nonzero_returns = binance_equity[binance_equity['returns'] != 0]
with pd.option_context('display.max_rows', None, 'display.max_columns', None):
    print(nonzero_returns)

In [None]:
# Result tables of the backtest above, pulled out for the statistics cells.
equity = event_loop.equity_dfs['SmaCrossoverStrategy']['BINANCE']
positions = event_loop.positions_dfs["BINANCE"]
transactions = event_loop.transactions_dfs["BINANCE"]
# The pyfolio cells below take a `returns` series, which was never defined;
# it is the 'returns' column of the equity table.
returns = equity['returns']

In [None]:
# Compute pyfolio performance statistics and show them as a one-column table.
backtest_stats = pf.timeseries.perf_stats(
    returns=returns,
    positions=positions,
    transactions=transactions,
)
backtest_stats.to_frame(name="Backtest results")

In [None]:
# Render pyfolio's round-trip tear sheet from the returns, positions and transactions.
pf.create_round_trip_tear_sheet(returns=returns, positions=positions, transactions=transactions)

In [None]:
# Display the returns passed to the pyfolio cells above.
returns

In [None]:
# Build the tear sheet through the PyfolioStatistics wrapper.
report_inputs = dict(
    equity=equity,
    positions=positions,
    transactions=transactions,
    title='Sma Crossover',
)
tearsheet = PyfolioStatistics(**report_inputs)
tearsheet.get_tearsheet()

In [None]:
# Build the tear sheet through the Statistics class.
report_inputs = dict(
    equity=equity,
    positions=positions,
    transactions=transactions,
    title='Sma Crossover',
)
tearsheet = Statistics(**report_inputs)
tearsheet.get_tearsheet()

In [None]:
# Build the tear sheet through the QuantstatsStatistics wrapper.
report_inputs = dict(
    equity=equity,
    positions=positions,
    transactions=transactions,
    title='Sma Crossover',
)
tearsheet = QuantstatsStatistics(**report_inputs)
tearsheet.get_tearsheet()

### Test optimizers

In [None]:
def sma_cross_backtest(short_sma, long_sma=None, trailing_stop_order_pct=None):
    """Run one SMA-crossover backtest and return the final portfolio cash balance.

    The parameters may be given as three separate arguments, or as a single
    sequence ``(short_sma, long_sma, trailing_stop_order_pct)`` in the first
    position. The single-sequence form is how several cells in this notebook
    call the function, and how optimizers that pass one parameter vector call
    their objective.

    The feed is built from the notebook-level ``asset``, ``start``, ``end`` and
    ``db_storage`` variables, so those must be defined before calling.

    Args:
        short_sma: Short SMA setting (truncated with ``int``), or the
            3-element sequence described above.
        long_sma: Long SMA setting (truncated with ``int``).
        trailing_stop_order_pct: Value forwarded to ``OrderCreator`` as
            ``trailing_stop_order_pct``.

    Returns:
        The value of ``broker.get_portfolio_cash_balance()`` once the event
        loop has finished.

    Raises:
        TypeError: If exactly one of ``long_sma`` / ``trailing_stop_order_pct``
            is omitted, or if a lone first argument is not iterable.
        ValueError: If a lone first argument does not hold exactly three items.
    """
    if long_sma is None and trailing_stop_order_pct is None:
        # Single parameter-vector call: sma_cross_backtest((short, long, pct)).
        short_sma, long_sma, trailing_stop_order_pct = short_sma
    elif long_sma is None or trailing_stop_order_pct is None:
        raise TypeError(
            "sma_cross_backtest expects either (short_sma, long_sma, "
            "trailing_stop_order_pct) or a single 3-element sequence"
        )

    # Optimizers may propose floats or numpy scalars for the SMA settings.
    short_sma = int(short_sma)
    long_sma = int(long_sma)
    print(short_sma)
    print(long_sma)
    print(trailing_stop_order_pct)
    assets = [asset]
    # The same deque is handed to the feed, broker, order manager and event loop.
    events = deque()

    feed: Feed = ExternalStorageFeed(
        assets = assets,
        events = events,
        time_unit = timedelta(minutes=1),
        start = start,
        end = end,
        db_storage = db_storage,
        file_storage = None,
        market_cal = None,
    )

    strategies = {SmaCrossoverStrategy: [{"short_sma": short_sma, "long_sma": long_sma}]}
    clock = SimulatedClock()
    # Each run gets a fresh simulated broker funded with 10000.0.
    broker = SimulatedBroker(clock, events, initial_funds=10000.0, fee_model=BinanceFeeModel())
    broker.subscribe_funds_to_portfolio(10000.0)
    broker_manager = BrokerManager(brokers={"BINANCE": broker}, clock=clock)
    position_sizer = PositionSizer(broker_manager=broker_manager, integer_size=False)
    order_creator = OrderCreator(broker_manager=broker_manager, with_cover=True, trailing_stop_order_pct=trailing_stop_order_pct)
    order_manager = OrderManager(
        events=events,
        broker_manager=broker_manager,
        position_sizer=position_sizer,
        order_creator=order_creator,
    )
    indicators_manager = IndicatorsManager(preload=True, initial_data=feed.candles)
    event_loop = EventLoop(
        events=events,
        assets=assets,
        feed=feed,
        order_manager=order_manager,
        strategies_parameters=strategies,
        indicators_manager=indicators_manager,
    )
    event_loop.loop()

    # NOTE(review): this is the cash balance only; whether value held in open
    # positions at the end of the run is included is not visible here - confirm.
    return broker.get_portfolio_cash_balance()

In [None]:
# Pass the three parameters as separate arguments, matching the function signature.
sma_cross_backtest(51, 200, 0.149)

In [None]:
# Pass the three parameters as separate arguments, matching the function signature.
sma_cross_backtest(62, 126, 0.192691)

In [None]:
# Pass the three parameters as separate arguments, matching the function signature.
sma_cross_backtest(64, 124, 0.192691)

In [None]:
# Pass the three parameters as separate arguments, matching the function signature.
sma_cross_backtest(58, 128, 0.075825)

In [None]:
import time

In [None]:
!pip install ipython-autotime

### Black box

In [None]:
def sma_cross_backtest_black_box(params):
    """Objective for ``bb.search_min``: the negated final cash balance.

    ``search_min`` calls its objective with one parameter vector and minimises
    the result, so the vector is unpacked here and the balance negated to turn
    the search into a maximisation of the balance.

    Args:
        params: Sequence ``[short_sma, long_sma, trailing_stop_order_pct]``.

    Returns:
        ``-sma_cross_backtest(short_sma, long_sma, trailing_stop_order_pct)``.
    """
    short_sma, long_sma, trailing_stop_order_pct = params
    return -sma_cross_backtest(short_sma, long_sma, trailing_stop_order_pct)


best_params = bb.search_min(f = sma_cross_backtest_black_box,  # given function
                            domain = [  # ranges of each parameter
                                [5., 75.],
                                [100., 200.],
                                [0.0, 0.20]
                                ],
                            budget = 4,  # total number of function calls available
                            batch = 4,  # number of calls that will be evaluated in parallel
                            resfile = 'output.csv')  # file the results are written to

In [None]:
# Print each entry of the best parameter vector on its own line.
for best_value in best_params:
    print(best_value)

In [None]:
# Re-run the backtest with the best vector; unpack it into the three
# positional parameters the function expects.
sma_cross_backtest(*best_params)

In [None]:
# `Discrete` is only imported by a later cell, so import it here to keep this
# cell runnable on a fresh kernel.
# NOTE(review): assumes the project's Discrete (not sherpa.Discrete) was meant - confirm.
from trazy_analysis.optimization.parameter import Discrete

# Inspect the class attributes of Discrete.
Discrete.__dict__

In [None]:
# NOTE(review): scratch call left over from debugging. It passes None as the
# file name, which is presumably rejected by np.savetxt - confirm and remove.
np.savetxt(None, [])

In [None]:
from trazy_analysis.optimization.optimizer import BlackBoxOptimizer
from trazy_analysis.optimization.parameter import (
    Choice,
    Continuous,
    Discrete,
    Ordinal,
    Parameter,
)

# Same search through the project's BlackBoxOptimizer wrapper.
optimizer = BlackBoxOptimizer()
black_box_space = {
    "short_sma": Discrete([5, 75]),
    "long_sma": Discrete([100, 200]),
    "trailing_stop_order_pct": Continuous([0.1, 0.5]),
}
best_params = optimizer.maximize(
    func=sma_cross_backtest,
    space=black_box_space,
    nb_iter=4,
    max_evals=4,
)

In [None]:
# Display the value returned by BlackBoxOptimizer.maximize above.
best_params

### skopt

In [None]:
from skopt import gp_minimize
from skopt.space import Real, Integer
from skopt.utils import use_named_args

space = [Integer(5, 75, name='short_sma'),
          Integer(100, 200, name='long_sma'),
          Real(0., 0.20, "uniform", name='trailing_stop_order_pct')]


@use_named_args(space)
def sma_cross_backtest_skopt(short_sma, long_sma, trailing_stop_order_pct):
    """Objective for ``gp_minimize``: the negated final cash balance.

    ``gp_minimize`` calls its objective with one list of values and minimises
    the result. ``use_named_args`` maps that list onto the named dimensions of
    ``space``, and the balance is negated so the search maximises it.
    """
    return -sma_cross_backtest(short_sma, long_sma, trailing_stop_order_pct)


res_gp = gp_minimize(sma_cross_backtest_skopt, space, n_calls=52, random_state=0)

In [None]:
# res_gp.x holds one value per dimension of the skopt search space, in order.
best_short_sma, best_long_sma, best_trailing_stop_pct = res_gp.x
print("""Best parameters:
- short_sma=%d
- long_sma=%d
- trailing_stop_order_pct=%.6f""" % (best_short_sma, best_long_sma, best_trailing_stop_pct))

In [None]:
# Re-run the backtest at the best point; unpack the list into the three
# positional parameters the function expects.
sma_cross_backtest(*res_gp.x)

In [None]:
from skopt.plots import plot_objective
# Plot the skopt result stored in res_gp.
plot_objective(res_gp)

In [None]:
from trazy_analysis.optimization.optimizer import SkoptOptimizer
from trazy_analysis.optimization.parameter import (
    Choice,
    Continuous,
    Discrete,
    Ordinal,
    Parameter,
)

# Same search through the project's SkoptOptimizer wrapper.
optimizer = SkoptOptimizer()
skopt_space = {
    "short_sma": Discrete([5, 75]),
    "long_sma": Discrete([100, 200]),
    "trailing_stop_order_pct": Continuous([0.1, 0.5]),
}
best_params = optimizer.maximize(
    func=sma_cross_backtest,
    space=skopt_space,
    nb_iter=10,
    max_evals=1,
)

In [None]:
# Display the value returned by SkoptOptimizer.maximize above.
best_params

### Hyperopt

In [None]:
from hyperopt import fmin, tpe, hp, Trials

# One hyperopt expression per backtest parameter, keyed by parameter name.
space = {
    'short_sma': hp.randint('short_sma', 5, 75),
    'long_sma': hp.randint('long_sma', 100, 200),
    'trailing_stop_order_pct': hp.uniform('trailing_stop_order_pct', 0., 0.20),
}
trials = Trials()


def sma_cross_backtest_hyperopt(params_dict):
    """Objective for ``fmin``: print the sampled parameters and return the negated balance.

    ``params_dict`` is expanded as keyword arguments of ``sma_cross_backtest``.
    The result is negated because ``fmin`` minimises.
    """
    print(params_dict)
    final_balance = sma_cross_backtest(**params_dict)
    return -final_balance


best = fmin(
    fn=sma_cross_backtest_hyperopt,
    space=space,
    algo=tpe.suggest,
    max_evals=52,
    trials=trials,
)

In [None]:
# Display the value returned by hyperopt's fmin above.
best

In [None]:
from hyperopt.plotting import main_plot_history

# Plot the history recorded in the Trials object filled by fmin.
main_plot_history(trials)

In [None]:
from trazy_analysis.optimization.optimizer import HyperOptimizer
from trazy_analysis.optimization.parameter import (
    Choice,
    Continuous,
    Discrete,
    Ordinal,
    Parameter,
)

# Same search through the project's HyperOptimizer wrapper.
optimizer = HyperOptimizer()
hyperopt_space = {
    "short_sma": Discrete([5, 75]),
    "long_sma": Discrete([100, 200]),
    "trailing_stop_order_pct": Continuous([0.1, 0.5]),
}
best_params = optimizer.maximize(
    func=sma_cross_backtest,
    space=hyperopt_space,
    nb_iter=52,
    max_evals=1,
)

In [None]:
# Display the value returned by HyperOptimizer.maximize above.
best_params

### Optuna

In [None]:
import optuna


def sma_cross_backtest_optuna(trial):
    """Optuna objective: sample the three parameters from ``trial`` and return the backtest result.

    The suggestions are requested in the order short_sma, long_sma,
    trailing_stop_order_pct, and the value returned by ``sma_cross_backtest``
    is passed through unchanged.
    """
    return sma_cross_backtest(
        trial.suggest_int('short_sma', 5, 75),
        trial.suggest_int('long_sma', 100, 200),
        trial.suggest_float('trailing_stop_order_pct', 0, 0.20, log=False),
    )

In [None]:
# Create a new study persisted in a local SQLite file. The objective returns
# the final cash balance, so the study must maximise it; the default direction
# is "minimize", which would search for the worst parameters.
study = optuna.create_study(storage="sqlite:///db.sqlite3", direction="maximize")
study.optimize(sma_cross_backtest_optuna, n_trials=52)

In [None]:
from trazy_analysis.optimization.optimizer import OptunaOptimizer
from trazy_analysis.optimization.parameter import (
    Choice,
    Continuous,
    Discrete,
    Ordinal,
    Parameter,
)

# Same search through the project's OptunaOptimizer wrapper.
optimizer = OptunaOptimizer()
optuna_space = {
    "short_sma": Discrete([5, 75]),
    "long_sma": Discrete([100, 200]),
    "trailing_stop_order_pct": Continuous([0.1, 0.5]),
}
best_params = optimizer.maximize(
    func=sma_cross_backtest,
    space=optuna_space,
    nb_iter=52,
    max_evals=1,
)

In [None]:
# Display the value returned by OptunaOptimizer.maximize above.
best_params

### Ray Tune

In [None]:
from ray import tune


def sma_cross_backtest_ray(config):
    """Ray Tune trainable: run one backtest for ``config`` and report the result as ``profit``."""
    result = sma_cross_backtest(
        config["short_sma"],
        config["long_sma"],
        config["trailing_stop_order_pct"],
    )
    tune.report(profit=result)


# Sampling distribution for each backtest parameter.
search_space = {
    "short_sma": tune.randint(5, 75),
    "long_sma": tune.randint(100, 200),
    "trailing_stop_order_pct": tune.uniform(0., 0.20),
}

# 26 sampled configurations, maximising the reported profit, 4 CPUs per trial.
analysis = tune.run(
    sma_cross_backtest_ray,
    metric="profit",
    num_samples=26,
    config=search_space,
    mode="max",
    resources_per_trial={"cpu": 4},
)

In [None]:
# Select the trial with the highest reported profit. The run above optimised
# with mode="max", so asking for mode="min" here would return the worst config.
best_config = analysis.get_best_config(
    metric="profit", mode="max")
print("Best config: ", best_config)

# Get a dataframe for analyzing trial results.
df = analysis.results_df

In [None]:
# Re-run the backtest with the best Ray config, passing the three parameters
# as separate arguments to match the function signature.
sma_cross_backtest(
    best_config["short_sma"],
    best_config["long_sma"],
    best_config["trailing_stop_order_pct"],
)

In [None]:
from trazy_analysis.optimization.optimizer import RayOptimizer
from trazy_analysis.optimization.parameter import (
    Choice,
    Continuous,
    Discrete,
    Ordinal,
    Parameter,
)

# Same search through the project's RayOptimizer wrapper.
optimizer = RayOptimizer()
ray_space = {
    "short_sma": Discrete([5, 75]),
    "long_sma": Discrete([100, 200]),
    "trailing_stop_order_pct": Continuous([0.1, 0.5]),
}
best_params = optimizer.maximize(
    func=sma_cross_backtest,
    space=ray_space,
    nb_iter=16,
    max_evals=1,
)

### Bayesian optimization

In [None]:
from bayes_opt import BayesianOptimization


def sma_cross_backtest_bayes_opt(short_sma, long_sma, trailing_stop_order_pct):
    """Objective for BayesianOptimization: forward the arguments to ``sma_cross_backtest`` unchanged."""
    final_balance = sma_cross_backtest(short_sma, long_sma, trailing_stop_order_pct)
    return final_balance


# Bounded region of parameter space
pbounds = {
    'short_sma': (5, 75),
    'long_sma': (100, 200),
    'trailing_stop_order_pct': (0, 0.20),
}

optimizer = BayesianOptimization(f=sma_cross_backtest_bayes_opt, pbounds=pbounds, random_state=0)

# One initial point, then 15 optimisation iterations.
t = optimizer.maximize(init_points=1, n_iter=15)

In [None]:
# Display the optimizer's `max` attribute after the run above.
optimizer.max

In [None]:
from trazy_analysis.optimization.optimizer import BayesianOptimizer
from trazy_analysis.optimization.parameter import (
    Choice,
    Continuous,
    Discrete,
    Ordinal,
    Parameter,
)

# Same search through the project's BayesianOptimizer wrapper.
optimizer = BayesianOptimizer()
bayesian_space = {
    "short_sma": Discrete([5, 75]),
    "long_sma": Discrete([100, 200]),
    "trailing_stop_order_pct": Continuous([0.1, 0.5]),
}
best_params = optimizer.maximize(
    func=sma_cross_backtest,
    space=bayesian_space,
    nb_iter=15,
    max_evals=1,
)

In [None]:
# Display the value returned by BayesianOptimizer.maximize above.
best_params

### Sherpa

In [None]:
import sherpa
# Search space: two discrete SMA parameters and a continuous trailing-stop percentage.
parameters = [
    sherpa.Discrete('short_sma', [5, 75]),
    sherpa.Discrete('long_sma', [100, 200]),
    sherpa.Continuous('trailing_stop_order_pct', [0.05, 0.20]),
]
alg = sherpa.algorithms.GPyOpt(max_num_trials=16)
# lower_is_better=True, so the objective reported below is the negated balance.
study = sherpa.Study(parameters=parameters,
                     algorithm=alg,
                     lower_is_better=True)

# NOTE(review): `iteration` keeps counting across trials rather than restarting
# for each trial - confirm this is what add_observation expects.
iteration = 1
for trial in study:
    short_sma = trial.parameters["short_sma"]
    long_sma = trial.parameters["long_sma"]
    trailing_stop_order_pct = trial.parameters["trailing_stop_order_pct"]
    # Despite its name, `profit` holds the negated backtest result.
    profit = -sma_cross_backtest(short_sma, long_sma, trailing_stop_order_pct)
    study.add_observation(trial=trial,
                          iteration=iteration,
                          objective=profit
    )
    iteration += 1

# NOTE(review): finalize is left disabled; confirm whether trials need to be
# finalized for the study's results to be complete.
#study.finalize(trial)

In [None]:
from trazy_analysis.optimization.optimizer import SherpaOptimizer
from trazy_analysis.optimization.parameter import (
    Choice,
    Continuous,
    Discrete,
    Ordinal,
    Parameter,
)

# Same search through the project's SherpaOptimizer wrapper.
optimizer = SherpaOptimizer()
sherpa_space = {
    "short_sma": Discrete([5, 75]),
    "long_sma": Discrete([100, 200]),
    "trailing_stop_order_pct": Continuous([0.1, 0.5]),
}
best_params = optimizer.maximize(
    func=sma_cross_backtest,
    space=sherpa_space,
    nb_iter=15,
    max_evals=1,
)

In [None]:
# Display the value returned by SherpaOptimizer.maximize above.
best_params

### GPyOpt

In [None]:
import GPyOpt

# GPyOpt reads a 'discrete' domain as the explicit set of allowed values, so
# (5, 75) would restrict short_sma to the two values 5 and 75. Enumerate every
# integer in the intended ranges instead, keeping both endpoints.
bounds = [
    {'name': 'short_sma', 'type': 'discrete', 'domain': tuple(range(5, 76))},
    {'name': 'long_sma', 'type': 'discrete', 'domain': tuple(range(100, 201))},
    {'name': 'trailing_stop_order_pct', 'type': 'continuous', 'domain': (0.,0.20)},
]

def sma_cross_backtest_gpy_opt(params):
    """Objective for GPyOpt: the negated final cash balance.

    The three parameters are read from the first row of ``params``
    (``params[0][0]`` to ``params[0][2]``). The result is negated because the
    optimizer minimises.
    """
    short_sma = params[0][0]
    long_sma = params[0][1]
    trailing_stop_order_pct = params[0][2]
    return -sma_cross_backtest(short_sma, long_sma, trailing_stop_order_pct)

myProblem = GPyOpt.methods.BayesianOptimization(sma_cross_backtest_gpy_opt, bounds)
myProblem.run_optimization(16)

In [None]:
# Display the optimizer's `x_opt` attribute after the run above.
myProblem.x_opt

In [None]:
# Display the optimizer's `fx_opt` attribute. The objective is the negated
# backtest result, so this value is on that negated scale.
myProblem.fx_opt

In [None]:
from trazy_analysis.optimization.optimizer import GPyOptimizer
from trazy_analysis.optimization.parameter import (
    Choice,
    Continuous,
    Discrete,
    Ordinal,
    Parameter,
)

# Same search through the project's GPyOptimizer wrapper.
optimizer = GPyOptimizer()
gpyopt_space = {
    "short_sma": Discrete([5, 75]),
    "long_sma": Discrete([100, 200]),
    "trailing_stop_order_pct": Continuous([0.1, 0.5]),
}
best_params = optimizer.maximize(
    func=sma_cross_backtest,
    space=gpyopt_space,
    nb_iter=52,
    max_evals=1,
)

In [None]:
# Display the value returned by GPyOptimizer.maximize above.
best_params