In [3]:
import pandas as pd
import numpy as np
import json

from strategy.cointegration import (
    extract_close_prices,
    calculate_cointegration,
    calculate_spread,
    calculate_zscore
)

In [79]:
symbol_1 = "BLZUSDT"
symbol_2 = "SLPUSDT"
filename = "1_price_histories.json"
window = 21

long_coin = symbol_1
other_coin = symbol_2 if long_coin == symbol_1 else symbol_1
zscore_threshold = 1.1
maker_fee_percent = -0.00025 # rebate would be -ve version of this value
start_trading_capital = 1000.0
slippage_assumption = 0.001 # difference in asset price at order placement and order fulfilment

In [80]:
prices_1 = []
prices_2 = []
with open(filename) as json_file:
    price_data = json.load(json_file)
    if len(price_data) > 0:
        prices_1 = extract_close_prices(price_data[symbol_1]["result"])
        prices_2 = extract_close_prices(price_data[symbol_2]["result"])
# print(prices_1[:5])
# print(prices_2[:5])

In [81]:
# Extract close prices for both symbols

df = pd.DataFrame(columns=[symbol_1, symbol_2])
df[symbol_1] = prices_1
df[symbol_2] = prices_2
# print(df)

# Add column for indicating whether z-score is -ve / +ve

coint = calculate_cointegration(prices_1, prices_2)
spread = calculate_spread(prices_1, prices_2, coint.hedge_ratio)
zscore = calculate_zscore(spread, window)

df["Z-score"] = zscore
df = df[df["Z-score"].notna()]
df["Z-score sign"] = np.where(df['Z-score'] > 0, 1, -1)
df.reset_index(inplace=True, drop=True)

# print(df[df["Z-score"] < 0])
# df

In [82]:
# Calculate next symbol price (the 1st price where z-score flips)

def calculate_next_price(df, symbol):
    next_prices = []
    for index in range(len(df)):
        next_price = df[symbol].iloc[index]
        zscore_sign = df["Z-score sign"].iloc[index]
        lookup_window = df[df.index > index]
        
        # get list of indexes where z-score sign is flipped
        next_price_candidates = lookup_window.loc[lookup_window["Z-score sign"] == -zscore_sign, symbol]
        
        # if there's no z-score flip, then use current price
        if len(next_price_candidates) > 0:
            # get price of first occurring instance of flipped z-score sign
            next_price = df[symbol].iloc[next_price_candidates.index[0]]
        next_prices.append(next_price)
        
    return next_prices

df[f"{symbol_1} next price"] = calculate_next_price(df, symbol_1)
df[f"{symbol_2} next price"] = calculate_next_price(df, symbol_2)
# pd.set_option('display.max_rows', 200)
# pd.set_option('display.max_columns', None)
# df

In [83]:
# Populate long coin column

df["Long coin"] = df.apply(lambda row: long_coin if row["Z-score"] < 0 else other_coin, axis=1)
# df

In [84]:
# Populate trigger column

# using the mean-reverted tactic here, whereby we only place long/short order when z-score crosses the zero-line
# a different approach would be to continue placing long/short orders whilst the z-score is on either side of the zero-line
# ^ referred to as the n-step approach in the Udemy notes
def calculate_trigger(df):
    triggers = [True]
    for index in range(1, len(df)):
        previous_zscore_sign = df["Z-score sign"].iloc[index-1]
        zscore_sign = df["Z-score sign"].iloc[index]
        zscore = df["Z-score"].iloc[index]
        
        trigger = abs(zscore) > zscore_threshold and previous_zscore_sign != zscore_sign
        triggers.append(trigger)
        
        previous_zscore = zscore
    return triggers

# df["Trigger"] = df.apply(lambda row: abs(row["Z-score"]) > zscore_threshold, axis=1)
df["Trigger"] = calculate_trigger(df)
# df

In [93]:
# Populate long at column

df["Long at"] = df.apply(lambda row: row[row["Long coin"]] if row["Trigger"] else 0.0, axis=1)
# df

In [95]:
# Populate close at column

df["Close at"] = df.apply(lambda row: row[f"{row['Long coin']} next price"] if row["Trigger"] else 0.0, axis=1)
# df

In [96]:
# Populate return column
# Basically represents profit/loss if trigger = 1, else capital remains the same

df["Return"] = df.apply(lambda row: row["Close at"] / row["Long at"] if row["Trigger"] else 1.0, axis=1)
# df

In [115]:
# Populate capital column

def calculate_capital(df):
    rebates = []
    slippages = []
    capitals = []
    previous_capital = None
    for index in range(len(df)):
        trigger = df["Trigger"].iloc[index]
        return_rate = df["Return"].iloc[index]
        trading_capital = previous_capital or start_trading_capital
        
        rebate, slippage = 0, 0
        if trigger:
            rebate = trading_capital * -maker_fee_percent
            slippage = trading_capital * slippage_assumption
        
        current_capital = (trading_capital * return_rate) + rebate - slippage
        
        capitals.append(current_capital)
        rebates.append(rebate)
        slippages.append(slippage)
        
        previous_capital = current_capital
    return capitals, rebates, slippages

df["Capital"], rebates, slippages = calculate_capital(df)
df["Rebate"] = rebates
df["Slippage"] = slippages

df[df["Return"] > 1]

Unnamed: 0,BLZUSDT,SLPUSDT,Z-score,Z-score sign,BLZUSDT next price,SLPUSDT next price,Long coin,Trigger,Long at,Close at,Return,Capital,Rebate,Slippage
113,0.057,0.00257,-1.185681,-1,0.05755,0.00255,BLZUSDT,True,0.057,0.05755,1.009649,830.963929,0.205909,0.823634
146,0.05649,0.00269,-3.961121,-1,0.0599,0.00259,BLZUSDT,True,0.05649,0.0599,1.060365,880.501566,0.207741,0.830964


In [118]:
# Now do the same for short trades

df_short = df[[symbol_1, symbol_2, "Z-score", "Z-score sign",
               f"{symbol_1} next price", f"{symbol_2} next price", "Trigger"]].copy()

df_short["Short coin"] = df_short.apply(lambda row: other_coin if row["Z-score"] < 0 else long_coin, axis=1)

df_short["Short at"] = df_short.apply(lambda row: row[row["Short coin"]] if row["Trigger"] else 0.0, axis=1)
df_short["Close short at"] = df_short.apply(lambda row: row[f"{row['Short coin']} next price"] if row["Trigger"] else 0.0, axis=1)

df_short["Return"] = df_short.apply(lambda row: row["Short at"] / row["Close short at"] if row["Trigger"] else 1.0, axis=1)
df_short["Capital"], short_rebates, short_slippages = calculate_capital(df_short)

df_short[df_short["Return"] > 1]

Unnamed: 0,BLZUSDT,SLPUSDT,Z-score,Z-score sign,BLZUSDT next price,SLPUSDT next price,Trigger,Short coin,Short at,Close short at,Return,Capital
0,0.08071,0.00358,0.927355,1,0.0733,0.00324,True,BLZUSDT,0.08071,0.0733,1.101091,1100.341405
32,0.06491,0.00297,-2.215837,-1,0.05917,0.00269,True,SLPUSDT,0.00297,0.00269,1.104089,1214.049827
113,0.057,0.00257,-1.185681,-1,0.05755,0.00255,True,SLPUSDT,0.00257,0.00255,1.007843,1222.661249
146,0.05649,0.00269,-3.961121,-1,0.0599,0.00259,True,SLPUSDT,0.00269,0.00259,1.03861,1268.951251
148,0.0599,0.00259,1.728201,1,0.05692,0.00256,True,BLZUSDT,0.0599,0.05692,1.052354,1334.434442


In [119]:
long_profit = df["Capital"].iloc[-1] - start_trading_capital
short_profit = df_short["Capital"].iloc[-1] - start_trading_capital
net_profit = long_profit + short_profit

print("Long profit:", long_profit)
print("Short profit:", short_profit)
print("Net profit:", net_profit)

roi = net_profit / (start_trading_capital * 2)
print("ROI:", roi)

trigger_sum = len(df[df["Trigger"] == True].index)

long_wins = len(df[(df["Trigger"] == True) & (df["Return"] > 1)].index)
long_win_rate = (long_wins / trigger_sum) * 100
print("Long win rate:", long_win_rate, "%")

short_wins = len(df_short[(df_short["Trigger"] == True) & (df_short["Return"] > 1)].index)
short_win_rate = (short_wins / trigger_sum) * 100
print("Short win rate:", short_win_rate, "%")

avg_win_rate = (long_win_rate + short_win_rate) / 2

Long profit: -130.35767016124498
Short profit: 334.4344415805597
Net profit: 204.0767714193147
ROI: 0.10203838570965734
Long win rate: 40.0 %
Short win rate: 100.0 %
