In [12]:
# data analysis
import numpy as np
import pandas as pd
import pandas_ta
import matplotlib.pyplot as plt
import plotly.graph_objects as go

# data communication
import urllib.parse
from urllib.parse import urlencode
from typing import Optional, Dict, Any
import requests
from requests import Session, Request, Response
import configparser
import hmac
import hashlib
import json

# utilities
import time
import math
import logging
import unittest
from datetime import datetime, timedelta
import calendar
import apscheduler
from apscheduler.schedulers.blocking import BlockingScheduler

In [13]:
logging.basicConfig(filename='scalp_bot.log', level=logging.WARNING)

In [14]:
class BinanceClient:
    """Provides an interface for the trading bot to communicate with the Binance exchange via its' API."""
    
    def __init__(self, endpoint):
        self.maker_fees_USD_futures = 0.0002
        self.taker_fees_USD_futures = 0.0004
        self.session = None
        self.config = None
        self.api_key = None
        self.api_secret = None
        self.endpoint = endpoint
        
        
    def open_session(self):
        """Opens a session with the Binance server."""
        
        self.session = Session()
        
        
    def read_config(self):
        """Initializes the Binance client."""
        
        self.config = configparser.ConfigParser()
        self.config.read('config.ini')
        
        self.api_key = self.config['BINANCE API']['api_key']
        self.api_secret = self.config['BINANCE API']['api_secret']
        
        
    def process_response(self, response: Response) -> Any:
        """Processes the response the server sends to the clients request."""
        
        try:
            data = response.json()
        except ValueError:
            response.raise_for_status()
            raise
        else:
            return data
    
    
    def sign_request(self, request: Request) -> None:
        """Signs confidential requests that need the users API key and API secret."""
        
        ts = int(time.time() * 1000)
        request.params['timestamp'] = str(ts)
        signature_payload = urlencode(request.params).encode("utf-8")
        signature = hmac.new(self.api_secret.encode("utf-8"), signature_payload, hashlib.sha256).hexdigest()
        request.headers['X-MBX-APIKEY'] = self.api_key
        request.params['signature'] = signature
        
        
    def request(self, method: str, path: str,**kwargs,) -> Any:
        """Executes a non-confidential request to the Binance server."""
        
        request = Request(method, self.endpoint + path, **kwargs)
        response = self.session.send(request.prepare())
        return self.process_response(response)
    
    
    def signed_request(self, method: str, path: str, params, **kwargs) -> Any:
        """Executes a confidential request to the Binance server"""
        
        request = Request(method, self.endpoint + path, params=params, **kwargs)
        self.sign_request(request)
        response = self.session.send(request.prepare())
        return self.process_response(response)
        
        
    def get(self, path: str, params: Optional[Dict[str, Any]] = None) -> Any:
        """Executes an HTTPS GET request."""
        
        return self.request('GET', path, params=params)
    
    
    def post(self, path: str, params: Optional[Dict[str, Any]] = None) -> Any:
        """Executes an HTTPS POST request."""
        
        return self.signed_request('POST', path, params=params)
    
    
    def delete(self, path: str, params: Optional[Dict[str, Any]] = None) -> Any:
        """Executes an HTTPS DELETE request."""
        
        return self.signed_request('DELETE', path, json=params)
    
    
    def place_order(self, market: str, side: str, size: float, reduce_only: bool = False,
                    order_type: str = 'MARKET') -> dict:
        """Places a buy or sell order with the exchange."""
        
        return self.post('fapi/v1/order', params={
            'symbol': market,
            'side': side,
            'type': order_type,
            'quantity': size,
            "reduceOnly": reduce_only
        })

In [15]:
class MarketStructure:
    """Represents the market structure of a crypto coin at one point in time."""
    
    def __init__(self, prev_high: tuple, prev_low: tuple, 
                 provisional_high: tuple, provisional_low: tuple,
                 recent_break: bool = False):
        self.msb = False
        self.stay_in_range = False
        self.continuation = False
        self.trend = False
        self.prev_high = prev_high
        self.prev_low = prev_low
        self.provisional_high = provisional_high
        self.provisional_low = provisional_low
        self.recent_break = recent_break
        
        
    def _break_up_trend(self, row):
        """Sets the new market structure after break of an up trend."""
        
        self.trend = False
        self.prev_high = self.provisional_high
        self.prev_low = tuple(row[['low', 'open time']])
        self.provisional_low = tuple(row[['low', 'open time']])
        logging.info('Up Trend Broken')
        
        
    def _break_down_trend(self, row):
        """Sets the new market structure after break of a down trend."""
        
        self.trend = True
        self.prev_high = tuple(row[['high', 'open time']])
        self.prev_low = self.provisional_low
        self.provisional_high = tuple(row[['high', 'open time']])
        logging.info('Down Trend Broken')
        
        
    def _continue_up_trend(self, row):
        """Sets the new market structure after an up trend continues."""
        
        self.prev_low = self.provisional_low
        self.prev_high = tuple(row[['high', 'open time']])
        self.provisional_high = tuple(row[['high', 'open time']])
        logging.info('Continuing Up Trend')
        
        
    def _continue_down_trend(self, row):
        """Sets the new market structure after a down trend continues."""
        
        self.prev_high = self.provisional_high
        self.prev_low = tuple(row[['low', 'open time']])
        self.provisional_low = tuple(row[['low', 'open time']])
        logging.info('Continuing Down Trend')
        
    def _stay_in_range(self, row):
        """Keeps track of provisional information while range bound."""
        
        if row['close'] - row['open'] >= 0:
            self.provisional_high = tuple(row[['high', 'open time']])
        if row['close'] - row['open'] < 0:
            self.provisional_low = tuple(row[['low', 'open time']])
        logging.info('Staying in Range')
        
        
    def next_candle(self, row):
        """Investigates how the market structure changes with the next incoming price data candle."""
        
        close = row['close']
        self.msb = False
        self.continuation = False
        self.stay_in_range = False
        trend = self.trend
        
        if close > self.prev_high[0]:
            if self.trend == False:
                self._break_down_trend(row)
                self.msb = True
            else:
                self._continue_up_trend(row)
                self.continuation = True
        elif close < self.prev_low[0]:
            if self.trend == False:
                self._continue_down_trend(row)
                self.continuation = True
            else:
                self._break_up_trend(row)
                self.msb = True
        else:
            self._stay_in_range(row)
            self.stay_in_range = True
            
        return trend, self.msb, self.continuation, self.stay_in_range

In [16]:
class Trade:
    """Implements generic properties of trades that are common amongst all particular trading strategies
       such as buying and selling."""
    
    def __init__(self, bc: BinanceClient, ms: MarketStructure, equity: float, 
                 max_risk: float, max_leverage: float = 1.0):
        self.ms = ms
        self.bc = bc
        self._equity = equity
        self._max_risk = max_risk
        self._max_leverage = max_leverage
        self.equity_curve = [equity]
        self.position_size = [0]
        self.position = 0
        self.num_trades = 0
        self.wins = 0
        self.win_rate = 0
        self.kelly = 0.02
        self.open = 0
        self.close = 0
        self.long_trigger = False
        self.long_position = False
        self.short_trigger = False
        self.short_position = False
        self.entry = None
        self.stop_loss = None
        self.target = None

        
    @property
    def equity(self):
        """Returns the current equity value of the strategy."""
        
        if self.long_position or self.short_position:
            return (self.position*self.close) + self._equity
        else:
            return self._equity
    
    
    def plot_equity_curve(self):
        """Plots the equity curve of the strategy."""
        
        plt.plot(self.equity_curve)
        plt.show()
        
        
    def plot_position_size(self):
        """Plots the position size of the strategy over time."""
        
        plt.plot(self.position_size)
        plt.show()
        
        
    def long(self, price, risk):
        """Enters a long trade."""
        
        trade_size = min(self._max_leverage*self._equity, 
                         (self._max_risk/risk) * self._equity)
        coins = ((1.-self.bc.taker_fees_USD_futures)*trade_size)/price
        self.position += coins
        self._equity -= trade_size
        logging.info('Entered New Long Position')
        
        
    def short(self, price, risk):
        """Enters a short trade."""
        
        trade_size = min(self._max_leverage*self._equity, 
                         (self._max_risk/risk) * self._equity)
        coins = ((1-self.bc.taker_fees_USD_futures)*trade_size)/price
        self.position -= coins
        self._equity += (1.-self.bc.taker_fees_USD_futures) * trade_size
        logging.info('Entered New Short Position')
        
        
    def close_trade(self, price):
        """Closes an open long or short positoin."""
        
        cash = self.position*price
        self._equity += (1.-self.bc.taker_fees_USD_futures) * cash
        self.position = 0

In [22]:
class ContinuationTrade(Trade):
    """Implements the logical rules for a trend reversal trade triggered by an initial market structure break."""
             
    def __init__(self, bc: BinanceClient, ms: MarketStructure, equity: float, 
                 max_risk: float, max_leverage: float, risk_reward: float = 2.0):
        super(ContinuationTrade, self).__init__(bc, ms, equity, max_risk, max_leverage)
        self._risk_reward = risk_reward
        
            
    def setup_trade(self, row):
        """Activates trade triggers and sets stop losses."""
        
        trend, msb, continuation, stay_in_range = self.ms.next_candle(row)
        self.close = row['close']
        
        if continuation and trend:
            self.entry = 0.66*self.ms.prev_low[0] + 0.33*self.ms.prev_high[0]
            self.stop_loss = self.ms.prev_low[0]
            self.long_trigger = True
        elif continuation and (not trend):
            self.entry = 0.66*self.ms.prev_high[0] + 0.33*self.ms.prev_low[0]
            self.stop_loss = self.ms.prev_high[0]
            self.short_trigger = True
                
        self.position_size.append(self.position)
        self.equity_curve.append(self.equity)
            
            
    def next_candle_setup(self, row):
        """Initializes the strategy by iterating through historical data without executing trades."""
        self.setup_trade(row)
    
    
    def next_candle_trade_high_tf(self, row):
        """Checks for valid trade set ups with new live data and execute live trades."""
        
        price = row['close']
        
        if self.long_position:
            if price < self.stop_loss:
                self.close_trade(row['close'])
                logging.info('Stop Loss Hit on Long Position')
                self.long_position = False
                
        if self.short_trigger:
            if price <= self.ms.prev_low[0] and not self.ms.continuation:
                self.short_trigger = False
            if price >= self.ms.prev_high[0]:
                self.short_trigger = False
                
        if self.short_position:
            if price > self.stop_loss:
                self.close_trade(price)
                logging.info('Stop Loss Hit on Short Position')
                self.short_position = False
                
        
    def next_candle_trade_low_tf(self, row):
        """..."""
        
        if self.long_trigger:
            if row['low'] <= self.ms.prev_low[0]:
                self.long_trigger = False
            elif row['high'] >= self.ms.prev_high[0] and not self.ms.continuation:
                self.long_trigger = False
            else:
                if row['low'] <= self.entry:
                    target = self.ms.prev_high[0] #+ 0.4*(self.entry - self.stop_loss)
                    risk = self.entry - self.stop_loss
                    reward_risk = (target - self.entry)/risk
                    if reward_risk >= self._risk_reward:
                        self.long(self.entry, risk/price)
                        self.target = target #self.ms.prev_high[0]
                        self.num_trades += 1
                        self.long_position = True
                        self.long_trigger = False
            
        if self.long_position:
            if row['high'] > self.target:
                self.close_trade(self.target)
                logging.info('Take Profit on Long Position')
                self.long_position = False
                self.wins += 1
                self.win_rate = self.wins/self.num_trades
                
        if self.short_trigger:
            if row['high'] >= self.entry:
                risk = self.stop_loss - price
                reward_risk = (price - self.ms.prev_low[0])/risk
                if reward_risk >= self._risk_reward:
                    self.short(price, risk/price)
                    self.target = self.ms.prev_low[0]
                    self.num_trades += 1
                    self.short_position = True
                    self.short_trigger = False
                    
        if self.short_position:
            if row['low'] < self.target:
                self.close_trade(self.target)
                logging.info('Take Profit on Short Position')
                self.short_position = False
                self.wins += 1
                self.win_rate = self.wins/self.num_trades

In [18]:
def adjust_ts(ts):
    """Adjusts timestamps from milliseconds to seconds."""
    
    ts /= 1000
    return ts

In [19]:
def backtest(bc: BinanceClient, ms: MarketStructure, strat: Trade, 
             tick_interval: str, market_name: str, start: datetime):
    """Goes through a give set of historical data and applies the trading strategy to this data, 
       tracking results."""

    start_time = start
    end_time = start_time + timedelta(days=10)
        
    while start_time < datetime.now():
    
        response = bc.get(f'fapi/v1/klines?symbol={market_name}&interval={tick_interval}'
                        + f'&startTime={int(start_time.timestamp()*1000)}'
                        + f'&endTime={int(end_time.timestamp()*1000)}&limit={1000}')
        df = pd.DataFrame(response, columns = ['open time', 'open', 'high', 'low', 'close', 
                                               'volume', 'close time', 'quote asset volume', 
                                               'number of trades', 'taker buy base asset volume',
                                               'taker buy quote asset volume', 'unused field'])
        
        df['open time'] = df['open time'].apply(adjust_ts)
        df['open time'] = df['open time'].apply(datetime.fromtimestamp)
        df[['open', 'high', 'low', 'close', 'volume']] \
            = df[['open', 'high', 'low', 'close', 'volume']].astype(float)
        
        for index, row in df.iterrows():
            strat.next_candle_setup(row)
    
        start_time = end_time
        end_time = end_time + timedelta(days=30)
        
    #strat.plot_position_size()

In [20]:
ath = (69190.0, '2021-11-10 14:00:00+00:00')
prev_low = (66251.0, '2021-11-10 12:00:00+00:00')
start = datetime(2021, 11, 10, 15, 0, 0, 0)
timeframe = '1h'
market_name = 'BTCBUSD'

initial_equity = 30
max_risk_per_trade = 0.02
max_leverage = 1.0
        
bc = BinanceClient(endpoint = 'https://fapi.binance.com/')
bc.open_session()
bc.read_config()

ms = MarketStructure(ath, prev_low, ath, prev_low)
rtit = ReversalTrade(bc, ms, initial_equity, max_risk_per_trade, max_leverage)

backtest(bc, ms, rtit, timeframe, market_name, start)

In [21]:
logging.getLogger().setLevel(logging.INFO)

def hourly_close():
    response = bc.get(f'fapi/v1/klines?symbol={market_name}&interval={timeframe}')
    df = pd.DataFrame(response, columns = ['open time', 'open', 'high', 'low', 'close', 
                                            'volume', 'close time', 'quote asset volume', 
                                            'number of trades', 'taker buy base asset volume',
                                            'taker buy quote asset volume', 'unused field'])
    df['open time'] = df['open time'].apply(adjust_ts)
    df['open time'] = df['open time'].apply(datetime.fromtimestamp)
    df[['open', 'high', 'low', 'close', 'volume']] \
        = df[['open', 'high', 'low', 'close', 'volume']].astype(float)
    print('hourly')
    #rtit.next_candle_trade_high_tf(df.iloc[-1])
    #rtit.next_candle_trade_low_tf
    #rtit.next_candle_setup(df.iloc[-1])
    

def minute_close():
    response = bc.get(f'fapi/v1/klines?symbol={market_name}&interval=1m')
    df = pd.DataFrame(response, columns = ['open time', 'open', 'high', 'low', 'close', 
                                            'volume', 'close time', 'quote asset volume', 
                                            'number of trades', 'taker buy base asset volume',
                                            'taker buy quote asset volume', 'unused field'])
    df['open time'] = df['open time'].apply(adjust_ts)
    df['open time'] = df['open time'].apply(datetime.fromtimestamp)
    df[['open', 'high', 'low', 'close', 'volume']] \
        = df[['open', 'high', 'low', 'close', 'volume']].astype(float)
    print('minute')
    #rtit.next_candle_trade_low_tf(df.iloc[-1])

sched = BlockingScheduler()
sched.add_job(hourly_close, 'cron', hour='0-23')
sched.add_job(minute_close, 'cron', minute='1-59')
sched.start()

KeyboardInterrupt: 