# Data Preprocessing for Training

## Trading Bot Settings

In [1]:
import os
from config import TRANSACT_FEE_RATE, STOP_LOSS_RATE, TRAILING_RATE

# Resolve the data directory and step up one directory exactly once per kernel
# session. Without the guard, re-running this cell climbs one directory higher
# each time and re-points DATA_DPATH at "<new cwd>/data".
if "DATA_DPATH" not in globals():
    DATA_DPATH = os.path.join(os.getcwd(), "data")  # "data" folder next to the starting directory
    # Presumably needed so the `src` package imported by the next cells resolves
    # from the parent directory -- confirm against the project layout.
    os.chdir(os.path.pardir)

In [27]:
from src.market import MarketBase
from src.market.bitfinex import BitfinexSpot
from src.market_actor import MarketActorStub
from src.market_listener import MarketListenerStub
from src.advance_order.convertible_stop_loss import ConvertibleStopLossLogic

# Market, stub actor and stub listener used by the signal simulations further down.
market = BitfinexSpot("BTC", "USD")
market_actor = MarketActorStub(TRANSACT_FEE_RATE, echo_mode=False)
market_listener = MarketListenerStub(market)

# Stop-loss logic shared by the long and the short simulation; rates come from config.
advance_order_logic = ConvertibleStopLossLogic(
    market_actor,
    market_listener,
    STOP_LOSS_RATE,
    trailing_rate=TRAILING_RATE,
    offset_as_rate=True,
    use_orderbook=False,
)

## Loading Data Pipeline Image

In [28]:
import pickle

# NOTE: pickle.load can execute arbitrary code embedded in the file; only load a
# pipeline image that was produced by this project.
with open(os.path.join(DATA_DPATH, "data_pipeline.pickle"), "rb") as pipeline_file:
    data_pipeline = pickle.load(pipeline_file)

# The file handle is not needed any more once the image is deserialised.
candle_buffers = data_pipeline.get("candle_buffers")
features_compiler = data_pipeline.get("features_compiler")

# Fail here with a clear message instead of much later with an opaque error on
# None when one of the entries is absent from the image.
missing_entries = [
    entry_name
    for entry_name, entry in (("candle_buffers", candle_buffers),
            ("features_compiler", features_compiler))
    if entry is None
]
if missing_entries:
    raise KeyError(f"data_pipeline.pickle is missing: {', '.join(missing_entries)}")

## Fetching Candle History

In [29]:
import asyncio
import time
import numpy as np
import pandas as pd

from bfxapi import Client

async def get_candles(bfx: "Client", market: "MarketBase", epochs: int = 100,
    frame_resolution: str = "1m") -> pd.DataFrame:
    """Download historical candles and return them as a time-indexed frame.

    ``epochs`` requests of up to 10000 candles each are issued concurrently;
    request ``k`` ends ``k * 10000`` frames before the current minute.

    Args:
        bfx: Client exposing ``rest.get_public_candles``.
        market: Market whose ``get_ticker()`` names the symbol to query.
        epochs: Number of 10000-candle requests to issue.
        frame_resolution: Candle time frame, passed to the API as ``tf``.

    Returns:
        DataFrame indexed by ``Timestamp`` (seconds, ascending, unique) with the
        columns ``Open``, ``Close``, ``High``, ``Low`` and ``Volume``. The frame
        is empty when no candles were returned.
    """
    columns = ["Timestamp", "Open", "Close", "High", "Low", "Volume"]
    limit = 10000

    # Length of one frame in ms, so consecutive requests tile the history for
    # any resolution instead of assuming one-minute frames. "1M" uses a lower
    # bound (28 days) and unknown values fall back to one minute; a step that
    # is too short only makes requests overlap, which is de-duplicated below.
    minute_ms = 60_000
    frame_ms = {
        "1m": minute_ms, "5m": 5 * minute_ms, "15m": 15 * minute_ms,
        "30m": 30 * minute_ms, "1h": 60 * minute_ms, "3h": 180 * minute_ms,
        "6h": 360 * minute_ms, "12h": 720 * minute_ms, "1D": 1440 * minute_ms,
        "7D": 7 * 1440 * minute_ms, "1W": 7 * 1440 * minute_ms,
        "14D": 14 * 1440 * minute_ms, "1M": 28 * 1440 * minute_ms,
    }.get(frame_resolution, minute_ms)

    end = int(time.time())
    end -= (end % 60) # Truncate to minutes
    end *= 1000 # Convert to ms

    batches = await asyncio.gather(*[
        bfx.rest.get_public_candles(symbol=market.get_ticker(), start=0,
                end=(end - epoch * limit * frame_ms), limit=limit, tf=frame_resolution)
        for epoch in range(epochs)
    ])

    # Drop empty responses: np.concatenate rejects both an empty list of
    # batches and a mix of empty and two-dimensional batches.
    batches = [np.asarray(batch) for batch in batches if len(batch) > 0]
    if not batches:
        return pd.DataFrame(columns=columns).set_index("Timestamp")

    candles = pd.DataFrame(np.concatenate(batches), columns=columns)
    candles["Timestamp"] = (candles["Timestamp"] // 1000).astype(int) # ms to seconds
    candles = candles.set_index("Timestamp").sort_index()

    # A request reaches further back than its nominal window when frames are
    # missing from the history, so neighbouring requests can return the same
    # candle twice; keep one row per timestamp.
    return candles[~candles.index.duplicated(keep="first")]

In [30]:
candles = await get_candles(Client(), market, epochs=1)

candles

Unnamed: 0_level_0,Open,Close,High,Low,Volume
Timestamp,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1
1668790620,16650.0,16641.0,16650.0,16641.0,0.007074
1668790680,16642.0,16641.0,16642.0,16636.0,0.021766
1668790740,16641.0,16642.0,16644.0,16641.0,0.051292
1668790800,16642.0,16644.0,16644.0,16641.0,0.065620
1668790860,16651.0,16652.0,16652.0,16651.0,0.025000
...,...,...,...,...,...
1669393200,16492.0,16481.0,16492.0,16481.0,0.108947
1669393260,16480.0,16468.0,16480.0,16468.0,1.311781
1669393320,16468.0,16468.0,16469.0,16465.0,2.563234
1669393380,16468.0,16467.0,16470.0,16467.0,0.039476


## Generating Observations

In [31]:
from tqdm import tqdm
from src.candle_buffer import CandleBuffer
from src.indicator import FeaturesCompiler
from src.tradebook import Tradebook

def extract_observations(candle_buffers: dict[int, CandleBuffer], features_compiler: FeaturesCompiler,
    candles: pd.DataFrame):
    """Replay candles through the candle buffers and collect feature vectors.

    Every candle is appended to a tradebook as one trade priced at the mean of
    Open and Close with the candle's Volume, then each buffer is updated with
    the Open, High, Low and Close prices in that order. Once every buffer
    reports a size equal to its capacity, one copy of
    ``features_compiler.get()`` is recorded per candle.

    Args:
        candle_buffers: Buffers to update; only the dict values are used.
        features_compiler: Source of the feature vector for each observation.
        candles: Frame indexed by timestamp with ``Open``, ``High``, ``Low``,
            ``Close`` and ``Volume`` columns.

    Returns:
        Tuple ``(timestamps, observations)``: an int array of the candle
        timestamps that produced an observation and a float array with one
        feature vector per such timestamp.
    """
    tradebook = Tradebook(10) # To facilitate update of orderbook
    timestamps, observations = [], []
    buffer_ready = False

    # Stream the rows; `total` keeps the progress bar without building a list of all rows.
    for timestamp, data in tqdm(candles.iterrows(), total=len(candles)):
        update_timestamp = timestamp - 1 # The passed timestamp denotes the end of time frame
        tradebook.append_trade(update_timestamp, (data.get("Open") + data.get("Close")) / 2,
                data.get("Volume"))

        for candle_buffer in candle_buffers.values():
            for price_key in ("Open", "High", "Low", "Close"):
                candle_buffer.update(update_timestamp, data.get(price_key), tradebook)

        # Readiness is sticky: once all buffers are full it is not checked again.
        # With no buffers at all the flag stays False, so nothing is recorded.
        if not buffer_ready:
            buffer_ready = bool(candle_buffers) and all(
                candle_buffer.get_size() == candle_buffer.get_capacity()
                for candle_buffer in candle_buffers.values()
            )

        if buffer_ready:
            timestamps.append(timestamp)
            observations.append(features_compiler.get().copy()) # Create a copy from the memoryview

    return np.array(timestamps, dtype=int), np.array(observations, dtype=float)

In [32]:
# Replay the fetched candles; yields the observation timestamps and the feature matrix.
obs_timestamps, observations = extract_observations(
    candle_buffers=candle_buffers,
    features_compiler=features_compiler,
    candles=candles,
)

100%|██████████| 10000/10000 [00:36<00:00, 271.96it/s]


In [33]:
# Column 0 of the feature matrix (features[0]) belongs to CryptoFearAndGreed.
# The values produced by the replay need to be replaced with historical ones;
# display the column first to see what the replay produced.
observations[:, 0]

array([-0.6, -0.6, -0.6, ..., -0.6, -0.6, -0.6])

In [34]:
from src.indicator.fear_and_greed import CryptoFearAndGreed

# Walk the Fear & Greed history alongside the observation timestamps. The
# history cursor only moves forward, and only once an observation is strictly
# later than the timestamp of the next history entry.
fng_timestamps, fng_values = CryptoFearAndGreed.get_hist_data()
max_index = fng_timestamps.shape[0] - 1
fng_index = 0

for obs_index in range(len(obs_timestamps)):
    timestamp = obs_timestamps[obs_index]

    while fng_index != max_index:
        if not (timestamp > fng_timestamps[fng_index + 1]):
            break
        fng_index += 1

    # Overwrite feature column 0 with the history value selected for this observation.
    observations[obs_index, 0] = fng_values[fng_index]

observations[:, 0]

array([-0.52, -0.52, -0.52, ..., -0.6 , -0.6 , -0.6 ])

## Truncate Candles to Number of Observations

In [35]:
# Keep only the trailing candles, one per observation, so both stay row-aligned.
# An explicit positional start is used because `candles[-0:]` would keep every
# row (instead of none) when there are no observations.
candles = candles.iloc[max(len(candles) - observations.shape[0], 0):]

## Generating Outputs (Trading Signals)

In [41]:
# Price series consumed by the signal simulations, as plain arrays.
close_values, high_values, low_values = (
    candles[column].values for column in ("Close", "High", "Low")
)

In [45]:
# Iterate for long signals: open a long position at every candle's close and
# replay the following candles until its advance order is filled.
long_signals = []
num_candles = close_values.shape[0]

for open_timestamp in tqdm(range(num_candles)):
    position = market_actor.open_position(market, close_values[open_timestamp], size=1)
    advance_order = advance_order_logic.open_advance_order(position)

    for timestamp in range(open_timestamp + 1, num_candles):
        # Simulate unfavourable development by updating low before high
        replay_prices = (low_values[timestamp], high_values[timestamp], close_values[timestamp])
        for price in replay_prices:
            market_listener.set_current_price(price)
            advance_order.update()

        if advance_order.filled:
            break

    # The remaining candles never filled the order: stop here, which leaves
    # long_signals shorter than the candle series.
    if not advance_order.filled:
        break

    # Signal is 1 when the closed position ends with a positive USD size, else 0.
    long_signals.append(int(advance_order.position.balances.get_size("USD") > 0))

long_signals = np.array(long_signals, dtype=int)
np.sum(long_signals) / long_signals.shape[0] * 100 # Percentage of signals

 88%|████████▊ | 6299/7163 [00:05<00:00, 1174.45it/s]


25.14684870614383

In [43]:
# Iterate for short signals: open a short position at every candle's close and
# replay the following candles until its advance order is filled.
short_signals = []
num_candles = close_values.shape[0]

for open_timestamp in tqdm(range(num_candles)):
    position = market_actor.open_position(market, close_values[open_timestamp], size=-1)
    advance_order = advance_order_logic.open_advance_order(position)

    for timestamp in range(open_timestamp + 1, num_candles):
        # Simulate unfavourable development by updating high before low
        replay_prices = (high_values[timestamp], low_values[timestamp], close_values[timestamp])
        for price in replay_prices:
            market_listener.set_current_price(price)
            advance_order.update()

        if advance_order.filled:
            break

    # The remaining candles never filled the order: stop here, which leaves
    # short_signals shorter than the candle series.
    if not advance_order.filled:
        break

    # Signal is 1 when the closed position ends with a positive USD size, else 0.
    short_signals.append(int(advance_order.position.balances.get_size("USD") > 0))

short_signals = np.array(short_signals, dtype=int)
np.sum(short_signals) / short_signals.shape[0] * 100 # Percentage of signals

 90%|█████████ | 6457/7163 [00:06<00:00, 1024.87it/s]


41.334985287285114

## Saving Datasets

In [44]:
# Write each dataset into the data directory.
# NOTE(review): the signal loops stop at the first position that never fills,
# so the signal arrays can be shorter than `observations` -- confirm that the
# consumer of these files aligns their lengths.
for dataset_filename, dataset in (
    ("observations.npy", observations),
    ("long_signals.npy", long_signals),
    ("short_signals.npy", short_signals),
):
    np.save(os.path.join(DATA_DPATH, dataset_filename), dataset)