## Install Prerequisites


In [1]:
!pip install gym
!pip install numpy



In [2]:
import sys
import numpy as np
import pandas as pd
import gym
import os
from gym import spaces
from collections import OrderedDict, namedtuple
from pathlib import Path
from dataclasses import dataclass
from typing import Tuple, Union, List, Dict
import matplotlib.pyplot as plt

%matplotlib inline

In [3]:
module_path = os.path.abspath(os.path.join('..'))
if module_path not in sys.path:
    sys.path.append(module_path)

from indicators import (indicators, normalize)

In [4]:
FILES = [Path(os.getcwd()) / "../data/xtbusd_1h_2019.csv"]
COLUMNS = ["idx",
           "Time",
           "Period",
           "Open",
           "High",
           "Low",
           "Close",
           "PriceAverage",
           "TradeAverage",
           "SellAverage",
           "BuyAverage",
           "TradeMedian",
           "Volume",
           "TradeCount"]
INIT_ACCOUNT_BALANCE = 10_000
MAX_ACCOUNT_BALANCE = 1_000_0000
MAX_NUMBER_SHARES = 1000
MAX_PER_COST = 100_000
FEE_PERCENT = 0.001

In [5]:
INDICATOR_PADDING = 50

INDICATORS: Dict[str, indicators.Set] = {
    "Close": indicators.Set([
        indicators.SMA(12),
        # indicators.SMA(24),
        indicators.SMA(50),
        indicators.EMA(12),
        # indicators.EMA(24),
        indicators.EMA(50),
        indicators.BBANDS(16,2),
        indicators.BBANDS(16,3),
    ]),
    "BuySellDiff": indicators.Set([
        indicators.SMA(12),
        # indicators.SMA(24),
        indicators.SMA(50),
        indicators.EMA(12),
        # indicators.EMA(24),
        indicators.EMA(50),
    ], prepend="BuySellDiff"),
    "VolumeEma": indicators.Set([
        indicators.EMA(12),
        # indicators.EMA(24),
        indicators.EMA(50),
    ], prepend="Volume"),
    "ATR": indicators.Set([
        indicators.ATR(14),
    ]),
    "KelBbandDif": indicators.Set([
        indicators.KelBbandDif(),
    ]),
    "RSI": indicators.Set([
        indicators.RSI(10),
        indicators.RSI(20),
    ])
}

In [21]:

class DataFile:

    def __init__(self, file: str, skip_rows: int, columns: List[str], dtype = np.float32):
        self.file = file
        self.skip_rows = skip_rows
        self.columns = columns
        self.data: Union[np.ndarray,None] = None
        self.prices: Union[np.array,None] = None
        self.dtype = dtype
        self.__shape: Tuple[int, int] = (0,0)
        self._len = 0

    def load(self) -> np.ndarray:
        """ loads data from the file into memory and applies our transformations """
        if self.data is not None:
            # already loaded
            return self.data

        df = pd.read_csv(self.file,
                         header=None,
                         names=self.columns,
                         skiprows=self.skip_rows,
                         dtype=self.dtype)

        column_sets = []

        # apply all indicator sets
        df = INDICATORS["Close"].concat(df, df["Close"])
        column_sets.append(["Close", "Open", "High", "Low", "PriceAverage"] + INDICATORS["Close"].output_names())

        # RSIs
        df = INDICATORS["RSI"].concat(df, df["Close"])
        column_sets.append(INDICATORS["RSI"].output_names())

        # Kelter channel bband diff
        df = INDICATORS["KelBbandDif"].concat(df, df["High"], df["Low"], df["Close"])
        column_sets.append(INDICATORS["KelBbandDif"].output_names())

        # Volume moving averages
        df = INDICATORS["VolumeEma"].concat(df, df["Volume"])
        column_sets.append(INDICATORS["VolumeEma"].output_names() + ["Volume"])

        # get the diff between the buy average and the sell average
        df["BuySellDiff"] = df["BuyAverage"] - df["SellAverage"]
        df = INDICATORS["BuySellDiff"].concat(df, df["BuySellDiff"])
        column_sets.append(INDICATORS["BuySellDiff"].output_names() + ["BuySellDiff"])

        # drop all rows with empty values
        df = df.dropna()

        # get the first normalized frame
        tdf = normalize.min_max_dataframe(df[column_sets.pop(0)])

        # loop over the rest of the column sets and concat the resulting frames
        for cols in column_sets:
            tdf = pd.concat((tdf, normalize.min_max_dataframe(df[cols])), axis=1, join="outer")

        self.prices = tdf["Close"].to_numpy(dtype=self.dtype)
        # store the data as a numpy array
        self.data = tdf.to_numpy(dtype=self.dtype)
        self._len = len(self.data)
        # set the shape of the data
        self.__shape = self.data.shape

        print(f"loaded {self._len} rows of data from {self.file}")
        return self.data

    @property
    def shape(self):
        return self.__shape

    def unload(self):
        """ unloads the data from memory """
        print(f"unloaded data from {self.file}")
        self.data = None

    def sample(self, idx: int, window: int) -> np.ndarray:
        """ sample returns a sample from the data set """
        if self.data is None:
            self.load()
        return self.data[idx:idx+window]

    def price_at(self, idx: int):
        """ sample returns a sample from the data set """
        if self.data is None:
            self.load()
        return self.prices[idx]

    def __len__(self):
        return self._len

In [22]:
data_file = DataFile(
    file=FILES[0],
    columns=COLUMNS,
    skip_rows=1
)

data_file.load()
print(data_file.sample(0,1)[0])


       Close      Open      High       Low  PriceAverage  SMA p=12  SMA p=50  \
49  0.059188  0.053954  0.059679  0.053627      0.056456  0.054361   0.05025   

    EMA p=12  EMA p=50  BBANDS UPPER p=16, d=2.0  ...  \
49  0.054832   0.05025                  0.057865  ...   

    KelBbandDif Upper p=20, bd=2.0, km=2  \
49                              0.268727   

    KelBbandDif Lower p=20, bd=2.0, km=2  VolumeEMA p=12 p=12  \
49                              0.278435             0.153768   

    VolumeEMA p=50 p=50    Volume  BuySellDiffSMA p=12 p=12  \
49             0.137994  0.256504                   0.50909   

    BuySellDiffSMA p=50 p=50  BuySellDiffEMA p=12 p=12  \
49                  0.507613                  0.510864   

    BuySellDiffEMA p=50 p=50  BuySellDiff  
49                  0.507613     0.532082  

[1 rows x 27 columns]
loaded 15101 rows of data from /Users/eric/Projects/cryptonomicon/notebooks/../data/xtbusd_1h_2019.csv
[0.05918757 0.05395415 0.05967897 0.05362655 0

In [23]:
@dataclass
class DataSlice:
    file_idx: int
    idx: int


class CryptoCandles(gym.Env):
    """crypto candles environment"""
    
    def __init__(self, csv_files: list, window: int):
        super(CryptoCandles, self).__init__()
        assert window >= 5, "window size must be 5 or greater"
        self.window = window
        self.files = OrderedDict()
        self.slice_map = OrderedDict()

        slice_idx = 0
        self.cols = 0

        for i in range(len(csv_files)):

            self.files[i] = DataFile(
                file=csv_files[i],
                skip_rows=1,
                columns=COLUMNS,
            )

            # load the data file
            self.files[i].load()
            data_len = len(self.files[i])

            if self.cols != 0:
                assert(self.files[i].shape[1] == self.cols), "number of columns in all files must match"
            else:
                self.cols = self.files[i].shape[1]

            # add slices for
            for j in range(data_len):
                if j + self.window > data_len:
                    # cannot add any more
                    break
                self.slice_map[slice_idx] = DataSlice(
                    file_idx=i,
                    idx=j,
                )
                slice_idx += 1

        self._len = len(self.slice_map)

        assert(self._len > 0), "length of chart slices from loaded from the csv file(s) must be greater than 1"

        # action space is discrete, we'll either buy 100%, sell 100% or hold
        self.action_space = spaces.Discrete(3)
        # observation space will contain last *window*

        self.observation_space = spaces.Box(low=0, high=1, shape=(self.window*self.cols+6,), dtype=np.float32)

        # values that will be reset
        self.balance = 0
        self.net_worth = 0
        self.max_net_worth = 0
        self.amount_held = 0.
        self.cost_basis = 0.
        self.total_sold = 0.
        self.total_sales_value = 0.
        self.current_step = 0
        self.balance_history = [INIT_ACCOUNT_BALANCE]
        self.reward = 0
        self.reward_history = []
        self.trade_count = 0
        self.terminal = False
        self.state = None

    def _idx(self, idx=0) -> Tuple[int, int]:
        assert idx <= self._len, f"index {idx} out of range"
        return self.slice_map[idx].file_idx, self.slice_map[idx].idx

    def _sample(self, idx=0) -> np.ndarray:
        f, s = self._idx(idx)
        return self.files[f].sample(s, self.window).flatten()

    def _price_at(self, idx=0) -> np.ndarray:
        f, s = self._idx(idx)
        return self.files[f].price_at(s)

    def get_state(self) -> np.ndarray:
        """ get the current step's observation """
        sample = self._sample(self.current_step)
        addition_data = [
            min(self.balance / MAX_ACCOUNT_BALANCE, 1),
            min(self.max_net_worth / MAX_ACCOUNT_BALANCE, 1),
            min(self.amount_held / MAX_NUMBER_SHARES, 1),
            min(self.cost_basis / MAX_PER_COST, 1),
            min(self.total_sold / MAX_NUMBER_SHARES, 1),
            min(self.total_sales_value / (MAX_NUMBER_SHARES * MAX_PER_COST), 1),
        ]

        for x in addition_data:
            sample = np.append(sample, x)

        return sample

    def _sell(self):
        current_price = self._price_at(self.current_step)
        """ sell the stock """
        if self.amount_held == 0:
            # cannot sell 0
            return

        self.total_sold += self.amount_held
        self.balance += self.amount_held * current_price * (1 + FEE_PERCENT)
        self.total_sales_value += self.amount_held * current_price
        self.amount_held = 0
        self.cost_basis = 0
        self.net_worth = self.balance

    def _buy(self):
        round_to = 0.00001
        current_price = self._price_at(self.current_step)
        # we're buying
        if self.amount_held > 0:
            # cannot buy while holding any
            return

        buy_amount = int((self.balance / current_price) / round_to) * round_to
        self.balance -= buy_amount * current_price
        self.amount_held = buy_amount
        self.cost_basis = current_price
        self.net_worth = buy_amount * current_price

    def take_action(self, action: int):
        """
        take action
        0 = hold
        1 = buy
        2 = sell
        """

        if action < 1:
            # nothing changes, we're holding
            return

        elif action < 2:
            self._buy()
        else:
            self._sell()

        if self.net_worth > self.max_net_worth:
            self.max_net_worth = self.net_worth

    def step(self, action: int):
        prev_net_worth = self.net_worth
        self.take_action(action)
        self.current_step += 1

        if self.current_step >= self._len:
            self.terminal = True
            self.current_step = 0

        self.state = self.get_state()
        # reward is the percentage diff between last and prev net worth
        self.reward = (self.net_worth - prev_net_worth) / ((self.net_worth - prev_net_worth) / 2)
        self.reward_history.append(self.reward)

        if self.net_worth <= 0:
            # we're fucked
            self.terminal = True

        self.balance_history.append(self.balance)

        if self.terminal:
            fig, ax = plt.subplots()
            ax.plot(self.balance_history)
            ax.set(ylabel="Balance", title="Balance History")
            plt.show()

        return self.state, self.reward, self.terminal, {}

    
    def reset(self):
        # values that will be reset
        self.balance = INIT_ACCOUNT_BALANCE
        self.net_worth = INIT_ACCOUNT_BALANCE
        self.max_net_worth = INIT_ACCOUNT_BALANCE
        self.balance_history = [INIT_ACCOUNT_BALANCE]
        self.amount_held = 0.
        self.cost_basis = 0.
        self.total_sold = 0.
        self.total_sales_value = 0.
        self.current_step = 0
        self.reward_history = []
        self.trade_count = 0
        self.terminal = False

        return self.get_state()

    def render(self, mode='human'):
        str = f"STEP # {self.current_step}\n" \
              f"BALANCE: {self.balance}\n" \
              f"NET WORTH: {self.net_worth}\n" \
              f"TRADES: {self.trade_count}"
        print(str)
        return self.state

        

In [16]:
env = CryptoCandles(FILES, 10)
env.slices

OrderedDict([(0,
              {'file': PosixPath('/Users/eric/Projects/cryptonomicon/notebooks/../data/xtbusd_1h_2019.csv'),
               'data': None}),
             (1,
              {'file': PosixPath('/Users/eric/Projects/cryptonomicon/notebooks/../data/xtbusd_1h_2019.csv'),
               'data': None}),
             (2,
              {'file': PosixPath('/Users/eric/Projects/cryptonomicon/notebooks/../data/xtbusd_1h_2019.csv'),
               'data': None}),
             (3,
              {'file': PosixPath('/Users/eric/Projects/cryptonomicon/notebooks/../data/xtbusd_1h_2019.csv'),
               'data': None}),
             (4,
              {'file': PosixPath('/Users/eric/Projects/cryptonomicon/notebooks/../data/xtbusd_1h_2019.csv'),
               'data': None}),
             (5,
              {'file': PosixPath('/Users/eric/Projects/cryptonomicon/notebooks/../data/xtbusd_1h_2019.csv'),
               'data': None}),
             (6,
              {'file': PosixPath('/Users/