In [143]:
import pandas as pd
import numpy as np
import polars as pl
import matplotlib.pyplot as plt
from scipy import signal, stats
from plotly import express as plx
import json
import csv

from sklearn import linear_model
from tqdm import tqdm


In [49]:
COINBASE_TICKERS = '../data/coinbase_tickers.txt'
COINBASE_BOOKS = '../data/coinbase_books.txt'

KRAKEN_TICKERS = '../data/kraken_tickers.txt'
KRAKEN_BOOKS = '../data/kraken_books.txt'


DEPTH = 100
SCALE = int(1e8)

Explore the lag within the books as they update much faster than tickers do

In [69]:
schema = {
    'ticker':pl.String,
    'timestamp':pl.UInt64,
    'type':pl.String,
    'price':pl.UInt64,
    'quantity':pl.UInt64
}

cb_book = (pl.read_csv(COINBASE_BOOKS, schema=schema))
cb_bids = cb_book.filter(pl.col('type') == 'bid').drop('type')
cb_asks = cb_book.filter(pl.col('type') == 'offer').drop('type')

kk_book = (pl.scan_csv(KRAKEN_BOOKS, schema=schema)).collect()
kk_bids = kk_book.filter(pl.col('type') == 'bid').drop('type')
kk_asks = kk_book.filter(pl.col('type') == 'ask').drop('type')

print("size of cb_bids", cb_bids.height)
print("size of cb_asks", cb_asks.height)
print("size of kk_bids", kk_bids.height)
print("size of kk_asks", kk_asks.height)

size of cb_bids 33548
size of cb_asks 42740
size of kk_bids 15935
size of kk_asks 21757
[String, UInt64, UInt64, UInt64]


In [86]:
def calculate_best(data, is_bid):

    best_signal = []
    all_ts = data.select('timestamp').unique().sort('timestamp')
    first_ts = all_ts[0, "timestamp"]
    curbook = data.filter(pl.col('timestamp') == first_ts)

    for ts in tqdm(all_ts['timestamp']):
        #update the book
        signal = data.filter(pl.col('timestamp') == ts)
        curbook = (curbook
                   .join(signal.select(['price', 'quantity']), on='price', how='left', suffix='_new')
                   .with_columns(pl.col('quantity_new').fill_null(pl.col('quantity')).alias('quantity'))
                   .drop('quantity_new')
                   .filter(pl.col('quantity') != 0)
                   .sort('price', descending=is_bid)
        )
        # get best
        minbook = curbook.head(DEPTH)
        if not minbook.is_empty() and ts != first_ts:
            best = {
                'timestamp': ts,
                'bid_price': curbook['price'][0],
                'bid_quantity': curbook['quantity'][0]
            } if is_bid else {
                'timestamp': ts,
                'ask_price': curbook['price'][0],
                'ask_quantity': curbook['quantity'][0]
            }
            best_signal.append(best)


    return pl.DataFrame(best_signal).cast(pl.UInt64)


cb_bids_best = calculate_best(cb_bids, True)
cb_asks_best = calculate_best(cb_asks, False)
kk_bids_best = calculate_best(kk_bids, True)
kk_asks_best = calculate_best(kk_asks, False)



100%|██████████| 12370/12370 [00:07<00:00, 1565.93it/s]
100%|██████████| 11855/11855 [00:08<00:00, 1435.28it/s]
100%|██████████| 9617/9617 [00:06<00:00, 1592.65it/s]
100%|██████████| 12125/12125 [00:07<00:00, 1572.06it/s]


In [87]:
def ffill(data):
    # Convert 'timestamp' from milliseconds to a datetime column
    data = data.with_columns(
        pl.col('timestamp').cast(pl.Datetime('ms')).alias('datetime')
    )
    data = data.drop(pl.col('timestamp'))
    # Generate a full range of datetime values at 1 ms intervals
    start = data['datetime'].min()
    end = data['datetime'].max()
    full_range = pl.DataFrame({
        'datetime':pl.datetime_range(start=start, end=end, interval='1ms', closed='both', eager=True)
        }).cast(pl.Datetime('ms'))
    
    data = full_range.join(data, on='datetime', how='left')
    data = (data
            .fill_null(strategy='forward')
            .with_columns(pl.col('datetime').dt.epoch(time_unit='ms').cast(pl.UInt64).alias('timestamp'))
            .drop('datetime'))
    return data

cb_bids_best_ffill = ffill(cb_bids_best)
cb_asks_best_ffill = ffill(cb_asks_best)
kk_bids_best_ffill = ffill(kk_bids_best)
kk_asks_best_ffill = ffill(cb_asks_best)

In [88]:
def ffill_agregate(dataframes):
    start = max(df['timestamp'].min() for df in dataframes)
    end = min(df['timestamp'].max() for df in dataframes)

    aligned_dfs = [df.filter((pl.col('timestamp') >= start) & (pl.col('timestamp') <= end)) for df in dataframes]
    return aligned_dfs

final = ffill_agregate([cb_bids_best_ffill,
                      cb_asks_best_ffill,
                      kk_bids_best_ffill,
                      kk_asks_best_ffill])

In [137]:
def weighted_best_price(best_bids, best_ask):

    ### super ghetto to prevent overflow, but since this is an dumb estimate of true price anyway it shouldnt really matter lol
    df = (
        best_bids
        .join(best_ask, on="timestamp", how="inner")
        .with_columns([
            ((pl.col("bid_price")/1000 * pl.col("bid_quantity")/1000) + 
             (pl.col("ask_price")/1000 * pl.col("ask_quantity")/1000)).alias("weighted_value"),
            (pl.col("bid_quantity") + pl.col("ask_quantity")).alias("total_quantity"),
        ])
        .with_columns((pl.col("weighted_value") / pl.col("total_quantity") * 1000000).cast(pl.UInt64).alias("price"))
        .select(["timestamp", "price"])
    )
    return df

cb_price = weighted_best_price(final[0], final[1])
kk_price = weighted_best_price(final[2], final[3])

In [145]:
def log_return(price):
    df = (
        price
        .with_columns([
            pl.col('price').log().diff().alias("log_return")
        ])
        .select(["timestamp", "log_return"])
    )
    return df.filter(pl.col('log_return').is_null() != True)

cb_lr = log_return(cb_price)
kk_lr = log_return(kk_price)

In [None]:
## TODO: EVERYTHING ABOVE THIS NEEDS TO BE OPTIMIZED CUZ RN IT CAN'T RUN!!!

# TODO: indicators

 best-ask to best-bid price spread
 ask-bid imbalance (ask_vol - bid_vol) / (ask_vol + bid_vol)

##### for MA, do for both best (best_signal_ffilled) and weighted avg (wavg_signal_ffileed)
- MA (3 ms)
- MA (10 ms)
- MA (100 ms)
- MA (1000 ms)
- EMA (alpha = 0.01)
- EMA (alpha = 0.05)
- EMA (alpha = 0.10)
- EMA (alpha = 0.33)
- EMA (alpha = 0.67)
- EMA (alpha = 0.90)

- kalman filter

*also I have a few ideas for slope interpolation of weighted avg to patch in gaps in noisy best_price, but we will ignore for now*

# Model pseudo code

next, code up logistic regression to tell us if we should trade or not
`LOGISTIC(indicators) => y(t)`
or `LINEAR(indicators) => x(t), calculate z scores => y(t)`

    x(T) = statistically segnificant signals at T timestep(forward filled)
    x(T) is statsitically segniciant at T if X(T) > MEAN + Z*STD  
    where MEAN & STD come from X(T-1), X(T-2), ... X(T-N)

Y = `COINBASE_BEST_ASK(T+(LAG))` < the `KRAKEN_BEST_BID within (T+(LAG), T+(LAG)+(HOLDTIME)]`

then optimize `HOLDTIME` and `Z` & `N` with the objective function (i'm thinking gridsearch to keep it simple)

    OBJ = 0
    WE_TRADE = Y(T) >= 0.5
    if(WE_TRADE): 
      check if ask price @ T+LAG > bid price T1 @ T+LAG to T+LAG+HOLDTIME:
          if so, (OBJ += (ASK_PRICE(T+LAG) - BID_PRICE(T+LAG+T1))
          if not, (OBJ += ASK_PRICE(T+LAG) - BID_PRICE(T+LAG+T1))

    MAX OBJ W.R.T `HOLDTIME` and `Z` & `N`




In [None]:
# ID-ing signiciant signals, ideally use logistical regression for this part
window_size = 30  # law of large numbers
# TODO, not price, its like cb_ask_price
X1 = wavg_signal_ffilled['price'].rolling(window_size)
mu = X1.mean(skipna=True)
std = X1.std(skipna=True)
z_th = 1

zscores = X1.apply(lambda x: stats.zscore(x)[-1] if len(x) == window_size else np.nan)

sig = zscores > z_th # peaks
peaks = wavg_signal_ffilled['price'][sig]


In [None]:

HOLDTIME = 100 # 100 ms
LAG = 100 # 100 ms

X = peaks.iloc[:-(HOLDTIME+LAG)]
peakprice = best_signal_ffilled['kkbidprice'].rolling(HOLDTIME + LAG, min_periods=(HOLDTIME+LAG)).dropna().max()
finalprice = best_signal_ffilled['kkbidprice'].iloc[HOLDTIME+LAG:]

trade_ratio = (peakprice > finalprice).sum() / len(peakprice)
trade_ratio


Make the following two cells into a method with parameters `HOLDTIME` and 

In [None]:
from matplotlib import pyplot as plt
plt.plot(best_signal_ffilled['price_cb_asks'])
plt.plot(best_signal_ffilled['price_cb_bids'])
plt.ylim(0,1)

In [None]:
# plot ask lag
import plotly.graph_objects as go
fig = go.Figure()
fig.add_trace(go.Scatter(x=best_signal_ffilled['timestamp'], y=best_signal_ffilled['price_' + "cb" + "_asks"]/SCALE, mode='lines', name='Coinbase'))
fig.add_trace(go.Scatter(x=best_signal_ffilled['timestamp'], y=best_signal_ffilled['price_' + "kk" + "_asks"]/SCALE, mode='lines', name='Kraken'))
fig.show()

In [None]:
# plot bids
import plotly.graph_objects as go
fig = go.Figure()
fig.add_trace(go.Scatter(x=best_signal_ffilled['timestamp'], y=best_signal_ffilled['price_' + "cb" + "_bids"]/SCALE, mode='lines', name='Coinbase'))
fig.add_trace(go.Scatter(x=best_signal_ffilled['timestamp'], y=best_signal_ffilled['price_' + "kk" + "_bids"]/SCALE, mode='lines', name='Kraken'))
fig.show()

In [None]:

import plotly.graph_objects as go
fig = go.Figure()
fig.add_trace(go.Scatter(x=midpt_signal['timestamp'], y=midpt_signal['midpt_' + "cb"]/SCALE, mode='lines', name='Coinbase'))
fig.add_trace(go.Scatter(x=midpt_signal['timestamp'], y=midpt_signal['midpt_' + "kk"]/SCALE, mode='lines', name='Kraken'))
fig.show()

In [109]:
def max_delayed_crosscorrelation(s1, s2):
    correlation = signal.correlate(s1, s2, mode='full', method='auto')
    
    max_corr_index = np.argmax(correlation)
    max_corr_value = correlation[max_corr_index]
    
    delay = max_corr_index - (len(s1) - 1)
    
    return max_corr_value, delay

In [None]:
print('asks', max_delayed_crosscorrelation(best_signal_ffilled['price_cb_asks'], best_signal_ffilled['price_kk_asks']))
print('bids', max_delayed_crosscorrelation(best_signal_ffilled['price_cb_bids'], best_signal_ffilled['price_kk_bids']))
print('midpt', max_delayed_crosscorrelation(midpt_signal['midpt_cb'], midpt_signal['midpt_kk']))