In [1]:
import sys
import pandas as pd

# Alternate between ../../ and ../../src due to bug
sys.path.append('../..')

from cryptolib.exchange import Binance
from cryptolib.config import config

exchange = Binance(config.BINANCE_API_KEY, config.BINANCE_API_SECRET)

# Checking if the values are the same
print(exchange.get_last_price('ETHUSDT'))
print('Sandbox:', exchange.sandbox)

1878.6
Sandbox: False


### Charting config

In [2]:
import matplotlib.pyplot as plt
from bokeh.plotting import figure, show
from bokeh.models import ColumnDataSource
from bokeh.models.tools import HoverTool
from bokeh.palettes import Category10, Dark2_5
from bokeh.io import output_notebook
import itertools

output_notebook()

def plot_signals_matplot(data, signals):
    plt.figure(figsize=(16, 9))
    plt.plot(data.index, data['close'], label='Closing Price')
    # Plot buy signals as vertical lines
    plt.plot(signals[signals == 1].index, data["close"][signals == 1], "^", markersize=10, color="g", label="Buy")
    # Plot sell signals as vertical lines
    plt.plot(signals[signals == -1].index, data["close"][signals == -1], "v", markersize=10, color="r", label="Sell")
    plt.title("Asset price with buy and sell signals")
    plt.xlabel("Date")
    plt.ylabel("Price")
    plt.legend()
    plt.show()


colors = itertools.cycle(Dark2_5)
def plot_signals(data, signals):
    # Plot the history of orders for each bot in bokeh
    p = figure(width=1200, height=800, x_axis_type="datetime")
    p.title.text = 'Asset price with buy and sell signals'
    p.xaxis.axis_label = 'Date'
    p.yaxis.axis_label = 'Price'
    source = ColumnDataSource(data=dict(x=data.index, y=data['close']))
    p.line(x="x", y="y", source=source, line_width=2, color=next(colors), legend_label="Closing Price", name="price")

    # Plot buy signals as vertical lines
    buy_source = ColumnDataSource(data=dict(x=signals[signals == 1].index, y=data["close"][signals == 1]))
    p.triangle(x="x", y="y", source=buy_source, size=10, color="green", legend_label="Buy")

    # Plot sell signals as vertical lines
    sell_source = ColumnDataSource(data=dict(x=signals[signals == -1].index, y=data["close"][signals == -1]))
    p.inverted_triangle(x="x", y="y", source=sell_source, size=10, color="red", legend_label="Sell")

    p.legend.location = "top_left"
    p.legend.click_policy = "hide"
    p.add_tools(HoverTool(
        tooltips=[
            ('date', '@x{%F}'),
            ('price', '@y{0.00000}')
        ],
        formatters={
            '@x': 'datetime',
        },
        mode='vline',
        names=['price']
    ))

    show(p)

In [55]:
from pytvlwcharts import Chart, TimeScaleOptions

def plot_tv(df):
    chart = Chart(width=1360,
                  height=400,
                  time_scale=TimeScaleOptions(
                        time_visible=True,
                        seconds_visible=False,
                  ))
    chart.mark_candlestick(data=df, scaleMargins={'top': 0.1, 'bottom': 0.4})
    return chart


### Data generator

In [4]:
import numpy as np
import pandas as pd
from datetime import datetime, timedelta

from cryptolib.enums import Interval

class StockMarketSimulation:
    def __init__(self, start_price, trend, points, time_interval: Interval, start_date: datetime):
        self.start_price = start_price
        self.trend = trend
        self.points = points
        self.time_interval = self.convert_interval(time_interval)
        self.start_date = start_date

    def convert_interval(self, interval):
        unit = interval.value[-1]
        value = int(interval.value[:-1])

        if unit == "m":
            return value
        elif unit == "h":
            return value * 60
        else:  # assuming the unit is 'd' for days
            return value * 60 * 24

    def simulate(self):
        times = [self.start_date + timedelta(minutes=i*self.time_interval) for i in range(self.points)]
        standard_normal_values = np.random.standard_normal(size = self.points)

        if self.trend == 'upward':
            drift = 0.01
        elif self.trend == 'downward':
            drift = -0.01
        else:
            drift = 0

        volatility = 0.05

        # Applying geometric Brownian motion
        prices = self.start_price * np.exp(np.cumsum((drift - 0.5 * volatility**2) * self.time_interval
                        + volatility * np.sqrt(self.time_interval) * standard_normal_values))

        ohlc_data = []
        for price in prices:
            open_price = price
            close_price = price + np.random.normal(0, price*0.01)
            high_price = max(open_price, close_price) + abs(np.random.normal(0, price*0.01))
            low_price = min(open_price, close_price) - abs(np.random.normal(0, price*0.01))
            ohlc_data.append([open_price, high_price, low_price, close_price])

        return pd.DataFrame(ohlc_data, columns=['open', 'high', 'low', 'close'], index=times)

# Create an instance of the class and simulate
simulator = StockMarketSimulation(start_price=100, trend='upward', points=1000, 
                                  time_interval=Interval.HOUR_1, start_date=datetime(2023, 5, 1))
stock_data = simulator.simulate()
print(stock_data.index)
print(stock_data)


DatetimeIndex(['2023-05-01 00:00:00', '2023-05-01 01:00:00',
               '2023-05-01 02:00:00', '2023-05-01 03:00:00',
               '2023-05-01 04:00:00', '2023-05-01 05:00:00',
               '2023-05-01 06:00:00', '2023-05-01 07:00:00',
               '2023-05-01 08:00:00', '2023-05-01 09:00:00',
               ...
               '2023-06-11 06:00:00', '2023-06-11 07:00:00',
               '2023-06-11 08:00:00', '2023-06-11 09:00:00',
               '2023-06-11 10:00:00', '2023-06-11 11:00:00',
               '2023-06-11 12:00:00', '2023-06-11 13:00:00',
               '2023-06-11 14:00:00', '2023-06-11 15:00:00'],
              dtype='datetime64[ns]', length=1000, freq=None)
                              open           high            low   
2023-05-01 00:00:00   1.515272e+02   1.531139e+02   1.485212e+02  \
2023-05-01 01:00:00   2.112848e+02   2.125353e+02   2.070150e+02   
2023-05-01 02:00:00   4.205885e+02   4.252777e+02   4.190467e+02   
2023-05-01 03:00:00   7.123843e+02  

### Strategy Analysis

In [3]:
from enum import Enum

class StrategyParamsEnum(Enum):
    """Enum for strategy parameters"""

    MACD = {"short_window": (1, 20), "long_window": (20, 40), "signal_window": (1, 5)}
    BOLLINGER_BANDS = {"window": (1, 40), "std": (1, 10)}
    STOCHASTIC_OSCILLATOR = {"window": (1, 60)}
    WILLIAMS_R = {"window": (1, 60)}
    AVERAGE_DIRECTIONAL_INDEX = {"window": (1, 60)}
    COMMODITY_CHANNEL_INDEX = {"window": (1, 60)}
    RSI = {"window": (1, 60)}
    CHAIKIN_OSCILLATOR = {"short_window": (1, 30), "long_window": (10, 40)}
    ACCUMULATION_DISTRIBUTION = {"window": (1, 60)}
    EASE_OF_MOVEMENT = {"window": (1, 60)}
    FORCE_INDEX = {"window": (1, 60)}
    ULTIMATE_OSCILLATOR = {"window1": (1, 20), "window2": (20, 40), "window3": (40, 60)}
    DONCHIAN_CHANNEL = {"window": (1, 60)}
    KELTNER_CHANNEL = {"window": (1, 60)}
    KNOW_SURE_THING = {"window1": (1,15), "window2": (10, 25), "window3": (20, 35), "window4": (25, 40)}
    TRIX = {"window": (1, 60)}

class Strategy:

    def __init__(self, data):
        self.data = data
        
    def macd(self, short_window=12, long_window=26, signal_window=9):
        """Plots the macd for a cryptocurrency symbol"""
        close = self.data.get("close")
        short_ema = close.ewm(span=short_window, adjust=False).mean()
        long_ema = close.ewm(span=long_window, adjust=False).mean()
        macd = short_ema - long_ema
        signals = macd.ewm(span=signal_window, adjust=False).mean()
        signals = pd.Series(0, index=self.data.index)
        signals[macd > signals] = 1
        signals[macd < signals] = -1
        return signals

    def bollinger_bands(self, window=20, num_stds=2):
        close = self.data.get("close")
        rolling_mean = close.rolling(window=window).mean()
        rolling_std = close.rolling(window=window).std()
        upper_band = rolling_mean + (rolling_std * num_stds)
        lower_band = rolling_mean - (rolling_std * num_stds)
        signals = pd.Series(0, index=self.data.index)
        signals[self.data["close"] > upper_band] = 1
        signals[self.data["close"] < lower_band] = -1
        return signals

    def stochastic_oscillator(self, window=14):
        """Plots the stochastic oscillator for a cryptocurrency symbol"""
        close = self.data.get("close")
        high = self.data.get("high")
        low = self.data.get("low")
        k = 100 * ((close - low.rolling(window).min()) / (high.rolling(window).max() - low.rolling(window).min()))
        d = k.rolling(3).mean()
        signals = pd.Series(0, index=self.data.index)
        signals[k > d] = 1
        signals[k < d] = -1
        return signals

    def williams_r(self, window=14):
        """Plots the williams r for a cryptocurrency symbol"""
        close = self.data.get("close")
        high = self.data.get("high")
        low = self.data.get("low")
        r = 100 * ((high.rolling(window).max() - close) / (high.rolling(window).max() - low.rolling(window).min()))
        signals = pd.Series(0, index=self.data.index)
        signals[r < -80] = 1
        signals[r > -20] = -1
        return signals

    def average_directional_index(self, window=14):
        """Plots the average directional index for a cryptocurrency symbol"""
        close = self.data.get("close")
        high = self.data.get("high")
        low = self.data.get("low")
        tr = high - low
        tr1 = high - close.shift(1)
        tr2 = close.shift(1) - low
        tr = tr.combine(tr1, max).combine(tr2, max)
        tr = tr.rolling(window).sum()
        dm = high - high.shift(1)
        dm1 = low.shift(1) - low
        dm = dm.combine(dm1, max)
        dm = dm.clip(lower=0)
        dm = dm.rolling(window).sum()
        pdi = dm / tr
        mdi = -dm / tr
        dx = (pdi - mdi).abs() / (pdi + mdi)
        adx = dx.rolling(window).mean()
        signals = pd.Series(0, index=self.data.index)
        signals[adx < 20] = 1
        signals[adx > 50] = -1
        return signals

    def commodity_channel_index(self, window=20):
        """Plots the commodity channel index for a cryptocurrency symbol"""
        close = self.data.get("close")
        high = self.data.get("high")
        low = self.data.get("low")
        tp = (high + low + close) / 3
        cci = (tp - tp.rolling(window).mean()) / (0.015 * tp.rolling(window).std())
        signals = pd.Series(0, index=self.data.index)
        signals[cci < -100] = 1
        signals[cci > 100] = -1
        return signals
    
    def rsi(self, window=14):
        """Plots the relative strength index for a cryptocurrency symbol"""
        close = self.data.get("close")
        delta = close.diff()
        up_days = delta.clip(lower=0)
        down_days = -1 * delta.clip(upper=0)
        ema_up = up_days.ewm(com=window - 1, adjust=True).mean()
        ema_down = down_days.ewm(com=window - 1, adjust=True).mean()
        rs = ema_up / ema_down
        rsi = 100 - (100 / (1 + rs))
        signals = pd.Series(0, index=self.data.index)
        signals[rsi < 30] = 1
        signals[rsi > 70] = -1
        return signals
    
    def trix(self, window=14):
        """Plots the trix for a cryptocurrency symbol"""
        close = self.data.get("close")
        ema1 = close.ewm(span=window, adjust=False).mean()
        ema2 = ema1.ewm(span=window, adjust=False).mean()
        ema3 = ema2.ewm(span=window, adjust=False).mean()
        trix = ema3.pct_change()
        signals = pd.Series(0, index=self.data.index)
        signals[trix > 0] = 1
        signals[trix < 0] = -1
        return signals

    def chaikin_oscillator(self, short_window=3, long_window=10):
        """Plots the chaikin oscillator for a cryptocurrency symbol"""
        close = self.data.get("close")
        high = self.data.get("high")
        low = self.data.get("low")
        volume = self.data.get("volume")
        adl = ((close - low) - (high - close)) / (high - low)
        adl = adl * volume
        adl = adl.rolling(short_window).sum() - adl.rolling(long_window).sum()
        signals = pd.Series(0, index=self.data.index)
        signals[adl > 0] = 1
        signals[adl < 0] = -1
        return signals
    
    def know_sure_thing(self, window1=10, window2=15, window3=20, window4=30):
        """Plots the know sure thing for a cryptocurrency symbol"""
        close = self.data.get("close")
        roc1 = close.diff(window1)
        roc2 = close.diff(window2)
        roc3 = close.diff(window3)
        roc4 = close.diff(window4)
        kst = (roc1 * 1) + (roc2 * 2) + (roc3 * 3) + (roc4 * 4)
        kst = kst.rolling(9).mean()
        signals = pd.Series(0, index=self.data.index)
        signals[kst > 0] = 1
        signals[kst < 0] = -1
        return signals
    
    def ichimoku_cloud(self):
        """Plots the ichimoku cloud for a cryptocurrency symbol"""
        close = self.data.get("close")
        high = self.data.get("high")
        low = self.data.get("low")
        conv = (high.rolling(9).max() + low.rolling(9).min()) / 2
        base = (high.rolling(26).max() + low.rolling(26).min()) / 2
        span_a = (conv + base) / 2
        span_b = (high.rolling(52).max() + low.rolling(52).min()) / 2
        signals = pd.Series(0, index=self.data.index)
        signals[close > span_a] = 1
        signals[close < span_b] = -1
        return signals
    
    def parabolic_sar(self):
        """Plots the parabolic sar for a cryptocurrency symbol"""
        close = self.data.get("close")
        high = self.data.get("high")
        low = self.data.get("low")
        sar = close.copy()
        af = 0.02
        max_af = 0.2
        long = True
        ep = low[0]
        hp = high[0]
        lp = low[0]
        for i in range(2, len(close)):
            if long:
                sar[i] = sar[i-1] + af * (hp - sar[i-1])
            else:
                sar[i] = sar[i-1] + af * (lp - sar[i-1])
            if long and low[i] < sar[i]:
                long = False
                sar[i] = hp
                lp = low[i]
                af = 0.02
            elif not long and high[i] > sar[i]:
                long = True
                sar[i] = lp
                hp = high[i]
                af = 0.02
            elif long and high[i] > hp:
                hp = high[i]
                af = min(af + 0.02, max_af)
            elif not long and low[i] < lp:
                lp = low[i]
                af = min(af + 0.02, max_af)
        signals = pd.Series(0, index=self.data.index)
        signals[close > sar] = 1
        signals[close < sar] = -1
        return signals
    
    def inverse_cramer_transform(self):
        """Plots the inverse cramer transform for a cryptocurrency symbol"""
        close = self.data.get("close")
        high = self.data.get("high")
        low = self.data.get("low")
        hl2 = (high + low) / 2
        hl2 = hl2.ewm(10).mean()
        hl2 = hl2.diff()
        hl2 = hl2.ewm(21).mean()
        hl2 = hl2.ewm(4).mean()
        signals = pd.Series(0, index=self.data.index)
        signals[close > hl2] = 1
        signals[close < hl2] = -1
        return signals
    
    def vwap(self):
        """Plots the volume weighted average price for a cryptocurrency symbol"""
        close = self.data.get("close")
        high = self.data.get("high")
        low = self.data.get("low")
        volume = self.data.get("volume")
        tp = (high + low + close) / 3
        vwap = (tp * volume).cumsum() / volume.cumsum()
        signals = pd.Series(0, index=self.data.index)
        signals[close > vwap] = 1
        signals[close < vwap] = -1
        return signals

In [52]:
import numpy as np
import pandas as pd

class StrategyAnalyser:

    CAPITAL = 5000

    def remove_repeated_signals(self, signals):
        """ Removes repeated signals. """
        prior_signal = 0
        i = 1
        while i < len(signals):
            if signals[i] == 1 or signals[i] == -1:
                if signals[i] == prior_signal:
                    signals[i] = 0
                else:
                    prior_signal = signals[i]
                
            i += 1

        # fill in the NaN values with zeros
        signals = signals.fillna(0)

        return signals
    
    def find_best_fit(self, symbol, interval, strategy_params_enum: StrategyParamsEnum):
        """Finds the best fit for the strategy parameters"""
        data = self.fetch_data(symbol, interval)
        spread = exchange.get_spread(symbol)

        best_fit, score = None, 0

        params = list(strategy_params_enum.value.items())
        best_fit, score = self._find_best_fit(params, Strategy(data).macd, spread)

        if strategy_params_enum == StrategyParamsEnum.MACD:
            best_fit, score = self.find_best_fit_macd(strategy_params_enum.value, data, spread)
        elif strategy_params_enum == StrategyParamsEnum.BOLLINGER_BANDS:
            best_fit, score = self.find_best_fit_bollinger_bands(strategy_params_enum.value, data, spread)
        elif strategy_params_enum == StrategyParamsEnum.STOCHASTIC_OSCILLATOR:
            best_fit, score = self.find_best_fit_general(strategy_params_enum.value, Strategy(data).stochastic_oscillator, spread)
        elif strategy_params_enum == StrategyParamsEnum.WILLIAMS_R:
            best_fit, score = self.find_best_fit_general(strategy_params_enum.value, Strategy(data).williams_r, spread)
        elif strategy_params_enum == StrategyParamsEnum.AVERAGE_DIRECTIONAL_INDEX:
            best_fit, score = self.find_best_fit_general(strategy_params_enum.value, Strategy(data).average_directional_index, spread)
        elif strategy_params_enum == StrategyParamsEnum.COMMODITY_CHANNEL_INDEX:
            best_fit, score = self.find_best_fit_general(strategy_params_enum.value, Strategy(data).commodity_channel_index, spread)
        elif strategy_params_enum == StrategyParamsEnum.RSI:
            best_fit, score = self.find_best_fit_general(strategy_params_enum.value, Strategy(data).rsi, spread)
        elif strategy_params_enum == StrategyParamsEnum.CHAIKIN_OSCILLATOR:
            best_fit, score = self.find_best_fit_chaikin(strategy_params_enum.value, data, spread)
        elif strategy_params_enum == StrategyParamsEnum.KNOW_SURE_THING:
            best_fit, score = self.find_best_fit_know_sure_thing(strategy_params_enum.value, data, spread)
        elif strategy_params_enum == StrategyParamsEnum.TRIX:
            best_fit, score = self.find_best_fit_general(strategy_params_enum.value, Strategy(data).trix, spread)

        return best_fit, score

    def _blah(self, params, func, spread):
        pass

    def _find_best_fit(self, params, func, spread):
        best_fit = None
        best_fit_score = -999999999
        
        # If there are no parameters for the strategy, just run it
        if len(params) < 1:
            signals = func()
            score = self.calculate_total_gain(signals, spread)
            return {}, score
        
        # Recursively find the best fit
        


    def find_best_fit_general(self, params, func, spread):
        best_fit = None
        best_fit_score = -999999999
        for window in range(params["window"][0], params["window"][1]):
            signals = func(window)
            score = self.calculate_total_gain(signals, spread)
            if score > best_fit_score:
                best_fit = {
                    "window": window,
                }
                best_fit_score = score
        return best_fit, best_fit_score

    def find_best_fit_macd(self, params, data, spread):
        best_fit = None
        best_fit_score = 0
        for short_window in range(params["short_window"][0], params["short_window"][1]):
            for long_window in range(params["long_window"][0], params["long_window"][1]):
                for signal_window in range(params["signal_window"][0], params["signal_window"][1]):
                    signals = Strategy(data).macd(short_window, long_window, signal_window)
                    score = self.calculate_total_gain(signals, spread)
                    if score > best_fit_score:
                        best_fit = {
                            "short_window": short_window,
                            "long_window": long_window,
                            "signal_window": signal_window,
                        }
                        best_fit_score = score
        return best_fit, best_fit_score

    def find_best_fit_bollinger_bands(self, params, data, spread):
        best_fit = None
        best_fit_score = 0
        for window in range(params["window"][0], params["window"][1]):
            for std in range(params["std"][0], params["std"][1]):
                signals = Strategy(data).bollinger_bands(window, std)
                score = self.calculate_total_gain(signals, spread)
                if score > best_fit_score:
                    best_fit = {
                        "window": window,
                        "std": std,
                    }
                    best_fit_score = score
        return best_fit, best_fit_score
    
    def find_best_fit_know_sure_thing(self, params, data, spread):
        best_fit = None
        best_fit_score = 0
        for window1 in range(params["window1"][0], params["window1"][1]):
            for window2 in range(params["window2"][0], params["window2"][1]):
                for window3 in range(params["window3"][0], params["window3"][1]):
                    for window4 in range(params["window4"][0], params["window4"][1]):
                        signals = Strategy(data).know_sure_thing(window1, window2, window3, window4)
                        score = self.calculate_total_gain(signals, spread)
                        if score > best_fit_score:
                            best_fit = {
                                "window1": window1,
                                "window2": window2,
                                "window3": window3,
                                "window4": window4,
                            }
                            best_fit_score = score
        return best_fit, best_fit_score

    def find_best_fit_chaikin(self, params, data, spread):
        best_fit = None
        best_fit_score = 0
        for window1 in range(params["short_window"][0], params["short_window"][1]):
            for window2 in range(params["long_window"][0], params["long_window"][1]):
                signals = Strategy(data).chaikin_oscillator(window1, window2)
                score = self.calculate_total_gain(signals, spread)
                if score > best_fit_score:
                    best_fit = {
                        "short_window": window1,
                        "long_window": window2,
                    }
                    best_fit_score = score
        return best_fit, best_fit_score

    def fetch_data(self, symbol, interval, limit=1000):
        self.data = exchange.get_historical_klines(symbol, interval, limit)
        return self.data

    def calculate_total_gain(self, signals, spread, fee=0.001):
        investment = self.CAPITAL
        units = 0.0
        position = 0  # 0 for not holding, 1 for holding
        total_gain = 0.0
        buy_price = 0.0 
        for i in range(1, len(signals)):
            if signals[i] == 1 and position == 0:  # Buy
                position = 1
                buy_price = self.data["close"][i]
                units = (investment / buy_price) * (1 - fee)
            elif signals[i] == -1 and position == 1:  # Sell
                position = 0
                sell_price = self.data["close"][i]
                investment = (sell_price * units) * (1 - fee)

        return investment - self.CAPITAL


#### Fitter
Finds the best paramater fit for different intervals for a given strategy.

In [40]:
from cryptolib.enums import Interval

symbols = [
    "XRPUSDT"
]


analyser = StrategyAnalyser()

for symbol in symbols:
    print(f"\n### {symbol}")

    # iterate intervals
    for interval in Interval:
        # find best fit
        best_fit, score = analyser.find_best_fit(symbol, interval.value, StrategyParamsEnum.RSI)

        print(" * * * ")
        print(f"Best fit for {interval.value} is {best_fit} with a P&L score of ${score}")
    
    print(" * * * ")



### XRPUSDT
 * * * 
Best fit for 1m is {'window': 56} with a P&L score of $-1403.0450461649693
 * * * 
Best fit for 3m is {'window': 31} with a P&L score of $-1420.009730240593
 * * * 
Best fit for 5m is {'window': 20} with a P&L score of $-1480.0079787852956
 * * * 
Best fit for 15m is {'window': 46} with a P&L score of $-1404.3951032522432
 * * * 
Best fit for 30m is {'window': 3} with a P&L score of $845.4814231084056


KeyboardInterrupt: 

#### Plotter

In [53]:
analyser = StrategyAnalyser()
data = analyser.fetch_data("XRPUSDT", "4h")

strategy = Strategy(data)

window = None
best = -9999999
for i in range(1, 60):
    signals = strategy.rsi(i)
    total_gain = analyser.calculate_total_gain(signals, 0.0)
    if total_gain > best:
        best = total_gain
        window = i

print(f"Best window: {window} with gain: {best}")

Best window: 8 with gain: 4366.934502182086


In [54]:
analyser = StrategyAnalyser()
data = analyser.fetch_data("XRPUSDT", "4h")

signals = Strategy(data).rsi(8)

print(analyser.calculate_total_gain(signals, 0.0))
signals = analyser.remove_repeated_signals(signals)
print("Number of orders:", len([signal for signal in signals if signal != 0]))

plot_signals(data, signals)

4366.934502182086
Number of orders: 27
