<a href="https://colab.research.google.com/github/electropavuk/crypto_trader/blob/master/ipynb/system.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
from enum import IntEnum
import collections
from statistics import mean

import pandas as pd
import matplotlib.pyplot as plt
import numpy as np
import time


# Widen pandas' console rendering so wide/long frames are printed in full.
pd.set_option('display.max_rows', 100)
pd.set_option('display.max_columns', 100)
pd.set_option('display.width', 1000)

# Print more edge items and longer lines for numpy arrays.
np.set_printoptions(edgeitems=10, linewidth=200)
# Turn off pandas' chained-assignment (SettingWithCopy) warning.
pd.options.mode.chained_assignment = None 

# Project root on the mounted Google Drive and the directory holding the
# BTCUSDT history files read by load_history().
google_drive_dir = '/content/drive/MyDrive/Colab Notebooks/crypto_trader/'
data_dir = google_drive_dir + 'data/act_data/BTCUSDT/'

In [2]:
class Decision(IntEnum):
    """Trading decision encoded as a signed integer.

    Being an IntEnum, members behave like the ints -1, 0 and 1, so they can
    be used directly in numeric aggregation (e.g. averaged by an expert).
    """

    SELL = -1
    WAIT = 0
    BUY = 1

# Trader

In [3]:
class Trader:
    """Placeholder for the trader; it currently holds no state or behaviour."""

    def __init__(self):
        """Create a trader instance without setting any attributes."""

# Data

In [4]:
class DataMaintainer:
    """Data Maintainer class for efficient data stream update.

    Every key maps either to a bounded ``collections.deque`` (a column) or to
    a nested ``DataMaintainer`` (a sub-section), so a column can be addressed
    with a sequence of keys, e.g. ``maintainer['History', 'Close']``.

    Args:
        maxlen: Int. Default maximum length of deques created by ``add``.

    Attributes:
        _data: Dictionary that maps key to deque or another DataMaintainer instance.
        maxlen: Int. Default maximum length of deque for columns.
    """

    def __init__(self, maxlen=2000):
        # A defaultdict lets construct_location() create missing sub-sections
        # on access. Read/append paths must therefore test membership before
        # indexing, otherwise a typo would silently create an empty section.
        self._data = collections.defaultdict(DataMaintainer)
        self.maxlen = maxlen

    def __getitem__(self, keys):
        """Return the deque or nested maintainer addressed by ``keys``.

        Args:
            keys: A single key, or a tuple/list of subsequent keys for
                multikey access.

        Raises:
            KeyError: If a key on the path does not exist.
        """
        if not isinstance(keys, (tuple, list)):
            keys = (keys,)
        key, *other = keys
        if key not in self._data:
            raise KeyError(key)
        item = self._data[key]
        return item[other] if other else item

    def construct_location(self, keys):
        """Return the maintainer at ``keys``, creating missing ones on the way.

        Args:
            keys: Non-empty iterable of subsequent keys.
        """
        key, *other = keys
        return self._data[key] if not other else self._data[key].construct_location(other)

    def add(self, data, keys, location=None, maxlen=None):
        """Add data into the DataMaintainer.

        Args:
            data: Iterable. Contains columns to add.
            keys: Iterable. Contains keys for columns in same order.
            location: List. Contains subsequent keys for multikey access.
            maxlen: Int. Maximum length of deque for columns. Defaults to
                this maintainer's ``maxlen``.
        """
        subunit = self.construct_location(location) if location else self
        if maxlen is None:
            maxlen = self.maxlen
        # A nested section remembers the limit it was filled with. This
        # maintainer's own default must NOT be overwritten by a one-off
        # ``maxlen`` argument, or every later add() would inherit it.
        if subunit is not self:
            subunit.maxlen = maxlen
        for key, column in zip(keys, data):
            subunit._data[key] = collections.deque(column, maxlen=maxlen)

    def append(self, data, keys='auto'):
        """Append elements in data to deque specified by key.

        Args:
            data: Iterable. Elements to be added.
            keys: Iterable (optional). Contains keys for columns in same order.
                  By default keys are in the same order they were added.

        Raises:
            KeyError: If a key does not exist. Nothing is appended in that case.
        """
        # isinstance guard: comparing an array-like ``keys`` with a string
        # would be evaluated element-wise and has no single truth value.
        if isinstance(keys, str) and keys == 'auto':
            keys = self._data.keys()
        pairs = list(zip(keys, data))
        # Validate first so a bad key neither creates a section through the
        # defaultdict nor leaves the columns with unequal lengths.
        for key, _ in pairs:
            if key not in self._data:
                raise KeyError(key)
        for key, value in pairs:
            self._data[key].append(value)

    def show(self, location='', show_last=10):
        """Print the last rows of every column, nested sections first.

        Args:
            location: String. Path prefix printed as the section title.
            show_last: Int. Number of trailing rows to print per section;
                also applied to nested sections.
        """
        maintainers, deques = [], []
        for key, obj in self._data.items():
            target = maintainers if isinstance(obj, DataMaintainer) else deques
            target.append((key, obj))

        for key, maintainer in maintainers:
            maintainer.show(f'{location}/{key}', show_last)
        print('\n' + location)
        for key, _ in deques:
            print('{:>20}'.format(key), end='')
        print('\n' * 2)
        cols = [list(val)[-show_last:] for _, val in deques]
        # zip() stops at the shortest column, so only complete rows are shown.
        for attrs in zip(*cols):
            for val in attrs:
                print('{:>20.5f}'.format(val), end='')
            print()

# Experts

In [5]:
class BaseExpert:
    """Base Expert class for decision making.

    An expert owns a collection of inner experts and aggregates their
    estimations into one value.

    Attributes:
        name: String. Human readable name of the expert.
        _inner_experts: List of experts whose estimations are aggregated.
        _weights: np.ndarray of shape (number of inner experts, 1). Kept in
            sync by ``set_experts``; not yet used by ``estimate``.
    """

    def __init__(self):
        self.name = 'Base Expert'
        # Start empty (not None) so update() is safe before set_experts()
        # and the weights can be sized without calling len() on None.
        self._inner_experts = []
        self._weights = np.ones((0, 1))

    def set_experts(self, experts):
        """Replace the inner experts and reset their weights to ones.

        Args:
            experts: Iterable of experts exposing ``estimate()`` and ``update()``.
        """
        # Materialize once: the collection is iterated on every
        # estimate()/update() call and its length sizes the weights.
        self._inner_experts = list(experts)
        self._weights = np.ones((len(self._inner_experts), 1))

    def estimate(self):
        """Return the mean of the inner experts' estimations.

        With no inner experts the mean of an empty array is returned (nan).
        """
        estimations = np.array([expert.estimate() for expert in self._inner_experts])
        # NOTE: the weighted form `estimations @ self._weights` is not enabled
        # yet; a plain average is used instead.
        return np.mean(estimations)

    def update(self):
        """Propagate an update to every inner expert."""
        for expert in self._inner_experts:
            expert.update()


class PairExpert(BaseExpert):
    """Expert class for handling specific trading pair.

    Args:
        base: String. Name of base currency.
        quote: String. Name of quote currency.
    """

    def __init__(self, base, quote):
        # Initialise the inherited state first so estimate()/update() do not
        # fail on a missing _inner_experts attribute.
        super().__init__()
        self.base = base
        self.quote = quote
        self.name = f'{base}/{quote} Expert'


class TimeFrameExpert(BaseExpert):
    """Expert class for handling specific timeframe.

    Args:
        timeframe: String. Name of timeframe (Example: '1h').
    """

    def __init__(self, timeframe):
        super().__init__()
        self.timeframe = timeframe
        self.name = f'{timeframe} Expert'


class RuleExpert(BaseExpert):
    """Expert class for handling specific trading rule.

    This is a leaf of the expert tree: it has no inner experts and delegates
    the decision to its rule.

    Args:
        indicators: List of BaseIndicator. Indicators to which rule is applied.
        rule: BaseRule. Trading rule that applies to indicators.
    """

    def __init__(self, indicators, rule):
        super().__init__()
        self._indicators = indicators
        self._rule = rule
        self.name = f'{self._rule.name} {str([indicator.name for indicator in self._indicators])}'

    def set_experts(self, experts=None):
        """Reject the call: a rule expert has no inner experts.

        The optional parameter keeps the signature compatible with
        ``BaseExpert.set_experts`` so both call forms fail the same way.

        Raises:
            SystemError: Always.
        """
        raise SystemError('Do not call this method')

    def estimate(self):
        """Return the rule's decision for the indicators, passed positionally."""
        return self._rule.decide(*self._indicators)

    def update(self):
        """Update every indicator this expert depends on."""
        for indicator in self._indicators:
            indicator.update()

# Indicators

In [6]:
class BaseIndicator:
    """Base Indicator Class

    Subclasses implement ``get_parameters``, ``init_state`` and ``update``.

    Args:
        data: DataMaintainer. If needed the [data] object will supply the
            indicator with necessary information.

    Attributes:
        name: String. Indicator name; class-level default, see ``set_name``.
        _data: The data object passed to the constructor.
        _state: Current indicator state, None until a subclass sets it.
    """
    # TODO: the original notes mention a `_min_history` attribute (minimum
    # number of candlesticks required for state maintenance); it is not
    # implemented anywhere in this class.

    name = 'Base Indicator'

    def __init__(self, data):
        self._data = data
        self._state = None

    def set_name(self):
        """Return the name combined with the parameters; does not assign it."""
        return f'{self.name} {self.get_parameters()}'

    def get_parameters(self):
        """Return the indicator parameters; must be overridden."""
        raise NotImplementedError()

    def init_state(self):
        """Calculate initial state"""
        raise NotImplementedError()

    def update(self):
        """Advance the state by one step; must be overridden."""
        raise NotImplementedError()


class MovingAverageIndicator(BaseIndicator):
    """Moving average over the ``['History', 'Close']`` column.

    The running value is stored in ``data`` under the indicator's own name
    (class name plus parameters, built by ``set_name``).

    While fewer than ``period`` values have been seen, the state is the sum
    of the seen values divided by ``period`` (not by their count).

    Args:
        data: DataMaintainer. Must already contain ``['History', 'Close']``.
        period: Int. Number of values in the averaging window.
    """

    name = 'MA'

    def __init__(self, data, period):
        super().__init__(data)
        self.period = period
        # The instance attribute shadows the class-level name with the
        # parameterised one returned by set_name().
        self.name = self.set_name()

        self.data = data
        self.data.add(data=[[]], keys=[self.name], maxlen=self.period)
        self._close = self.data['History', 'Close']
        self._column = self.data[self.name]
        self.init_state()

    def init_state(self):
        """Compute the running average over the close history present so far."""
        self._state = 0
        self._window = collections.deque([], maxlen=self._column.maxlen)
        # Start from an empty output column so the method can be re-run.
        self._column.clear()
        for val in self._close:
            if len(self._window) == self._window.maxlen:
                self._state -= self._window.popleft() / self.period
            self._state += val / self.period
            self._window.append(val)
            self._column.append(self._state)

    def update(self):
        """Fold the latest close value into the running average."""
        latest = self._close[-1]
        if len(self._window) == self._window.maxlen:
            self._state += (-self._window.popleft() + latest) / self.period
        else:
            # Window still warming up: nothing leaves it yet. Popping here
            # would drop a value too early, or fail on an empty window.
            self._state += latest / self.period
        self._window.append(latest)
        self._column.append(self._state)

    def get_parameters(self):
        """Return the parameters that identify this indicator."""
        return [self.period]

# Rules

In [7]:
class BaseRule:
    """Common interface of trading rules.

    Attributes:
        name: String. Rule name.
        _state: Rule state, None by default.
        _patience: Int, 1 by default. Read by crossover rules to size the
            window they inspect.
    """

    name = 'Base Rule'

    def __init__(self):
        self._patience = 1
        self._state = None

    def update(self):
        """Update the rule state; must be overridden."""
        raise NotImplementedError

    def decide(self):
        """Produce a decision; must be overridden."""
        raise NotImplementedError

class BaseCrossoverRule(BaseRule):
    """Base class for rules triggered when one series crosses another."""

    def cross(self, a, b):
        """Detect whether ``a`` crossed ``b`` within the last observations.

        The last ``_patience + 1`` values of both series are compared
        element-wise (``a > b``).

        Args:
            a: Object exposing ``nlast(n)``; the result must support
                ``reshape(-1)`` and element-wise comparison.
            b: Same kind of object as ``a``.

        Returns:
            'up' if ``a`` was not above ``b`` at the first compared point and
            is above it at all later ones, 'down' if it was above at first
            and at none of the later ones, otherwise False.
        """
        span = self._patience + 1
        series_a = a.nlast(span).reshape(-1)
        series_b = b.nlast(span).reshape(-1)
        first, *rest = series_a > series_b
        if first:
            return False if any(rest) else 'down'
        return 'up' if all(rest) else False


class BaseTrasholdRule(BaseRule):
    """Placeholder rule base (presumably for threshold rules); adds nothing to BaseRule yet."""


class BaseDirectionChangeRule(BaseRule):
    """Placeholder rule base (presumably for direction-change rules); adds nothing to BaseRule yet."""


class MACrossoverRule(BaseCrossoverRule):
    """Crossover rule for a pair of moving averages."""

    def decide(self, slow, fast):
        """Map the crossing of ``fast`` over ``slow`` to a Decision.

        Args:
            slow: The slow series, as accepted by ``cross``.
            fast: The fast series, as accepted by ``cross``.

        Returns:
            Decision.BUY when fast crosses slow upwards, Decision.SELL when
            it crosses downwards, Decision.WAIT otherwise.
        """
        outcomes = {'up': Decision.BUY, 'down': Decision.SELL}
        direction = self.cross(fast, slow)
        return outcomes.get(direction, Decision.WAIT)


# Simulation

In [8]:
class Simulation:
    """Replays the tail of a price history through the indicator pipeline."""

    def simulate(self, history, timeframe, n, money=100, commision=.00075):
        """Warm up on the older rows, then feed the last ``n`` rows one by one.

        The resulting data is printed with ``DataMaintainer.show``.

        Args:
            history: pandas.DataFrame. Must contain a 'Close' column.
            timeframe: String. Currently unused.
            n: Int. Number of trailing rows replayed as new data.
            money: Number. Currently unused.
            commision: Float. Currently unused.
        """
        if n:
            old_history, new_history = history.iloc[:-n], history.iloc[-n:]
        else:
            # A zero offset cannot be expressed with negative slicing:
            # iloc[:-0] is empty and iloc[-0:] is the whole frame, which
            # would invert the split.
            old_history, new_history = history, history.iloc[:0]

        data = DataMaintainer()
        data.add(data=old_history.values.T, keys=list(history), location=['History'])

        ind = MovingAverageIndicator(data, 7)

        for _, row in new_history.iterrows():
            data['History'].append(row)
            ind.update()

        data.show()

def load_history(filename):
    """Read the CSV file named ``filename`` from ``data_dir`` into a DataFrame."""
    csv_path = data_dir + filename
    return pd.read_csv(csv_path)

# Load the '1h' history file and replay its last n rows through the simulation.
history = load_history('1h.csv')
# Number of trailing rows to replay (presumably one year of hourly candles — verify against the file).
n = 24 * 365
simulation = Simulation()
simulation.simulate(history, '1h', n)


/History
           Open time                Open                High                 Low               Close              Volume          Close time  Quote asset volume    Number of tradesTaker buy base asset volumeTaker buy quote asset volume              Ignore


 1627491600000.00000         40119.86000         40247.99000         38800.00000         38961.00000          6662.62756 1627495199999.00000     262720962.27113        146692.00000          2991.69068     118003342.20849             0.00000
 1627495200000.00000         38961.01000         40500.00000         38772.00000         40440.99000          7207.66161 1627498799999.00000     286746694.92022        123100.00000          3828.36463     152369647.71128             0.00000
 1627498800000.00000         40440.99000         40680.07000         39929.49000         40388.61000          4529.76023 1627502399999.00000     182776436.59890        182393.00000          2228.32465      89924406.93804             0.00000
 16275024

# Main