1. Data Feed       -> Passing to strategy
2. Strategy        -> Signal creation
3. Portfolio       -> Signal approval and order creation
4. Order execution -> Order execution, feedback

### Abstract classes

In [1]:
from abc import ABC, abstractmethod
import threading, queue
from glob import glob
import time
import csv
import datetime as dt
from typing import List, Any, Set
from dataclasses import dataclass

from enum import Enum

import os

Бесконечно слушает данные, предоставляя бары по запросу в порядке FIFO.

### Data Handler

In [2]:
### ============= DataHandler =============

class DataHandler(ABC):
    """Abstract threaded data feed backed by a bounded FIFO queue.

    A worker thread repeatedly calls :meth:`update` and puts every result on
    ``self.data``; consumers pull the results, oldest first, with :meth:`get`.
    Subclasses only have to implement :meth:`update`.
    """

    # Class-level defaults; pause()/unpause()/exit() shadow them per instance.
    _paused = False
    _exited = False

    def __init__(self, queue_size):
        """Create the queue and the (not yet started) worker thread.

        Args:
            queue_size: ``maxsize`` of the underlying ``queue.Queue``.
        """
        self.data = queue.Queue(queue_size)
        self.t = threading.Thread(target=self.__run)

    def pause(self):
        """Ask the worker to stop producing; it then polls the flag once a second."""
        self._paused = True

    def unpause(self):
        """Let the worker resume producing."""
        self._paused = False

    def exit(self):
        """Ask the worker loop to terminate.

        The flag is only checked at the top of the loop, so a worker that is
        blocked in ``put`` on a full queue notices it after an item is consumed.
        """
        self._exited = True

    def run(self):
        """Clear the pause flag and start the worker thread."""
        self.unpause()
        self.t.start()

    def __run(self):
        """Worker loop: produce with ``update`` until ``exit`` is requested."""
        while not self._exited:

            if self._paused:
                time.sleep(1)
                continue

            bars = self.update()
            # Blocks while the queue is full, which throttles the producer.
            self.data.put(bars)

    def get(self, n: int = 1) -> List[Any]:
        """Return the next ``n`` queued items, blocking until all are available."""
        return [self.data.get() for _ in range(n)]

    @abstractmethod
    def update(self):
        """Produce the next item for the queue; must be overridden.

        Raises:
            NotImplementedError: when a subclass delegates to this base
                implementation instead of providing its own.
        """
        # The exception must be raised, not returned: a returned exception
        # object would be queued as if it were data.
        raise NotImplementedError("Implement update method!")

In [10]:
@dataclass
class Bars:
    """One price bar for a single symbol.

    Every field except ``symbol`` is converted in ``__post_init__``, so the
    constructor can be fed raw string values.
    """
    symbol:    None          # stored as given, never converted
    startTime: None          # parsed from '%Y-%m-%dT%H:%M:%S%z' text, then made naive
    time:      dt.datetime   # built from a millisecond timestamp
    close:     float
    open:      float
    high:      float
    low:       float
    volume:    float

    def __post_init__(self):
        """Convert the raw field values to datetimes and floats."""
        parsed_start = dt.datetime.strptime(self.startTime, '%Y-%m-%dT%H:%M:%S%z')
        # Drop the UTC offset without shifting the wall-clock value.
        self.startTime = parsed_start.replace(tzinfo=None)

        # `time` arrives in milliseconds; fromtimestamp expects seconds.
        millis = float(self.time)
        self.time = dt.datetime.fromtimestamp(millis / 1000)

        for field_name in ('close', 'open', 'high', 'volume', 'low'):
            setattr(self, field_name, float(getattr(self, field_name)))
        

class MyDataHandler(DataHandler):
    """Data handler that replays bars from one CSV file per symbol."""

    def __init__(self, paths, events, symbols=None, queue_size: int = 5, feed_name: str = 'DataFeed'):
        """Open every CSV file and prepare one row iterator per symbol.

        Args:
            paths: Non-empty sequence of CSV file paths.
            events: Accepted for interface compatibility; not used by this class.
            symbols: Optional list of ``(symbol, path)`` pairs. When omitted,
                each symbol is the file name of a path up to its first dot.
            queue_size: Maximum number of bar batches buffered in the queue.
            feed_name: Accepted for interface compatibility; not used by this class.
        """
        super().__init__(queue_size)

        assert len(paths) > 0
        self.paths       = paths
        self.symbols     = symbols if symbols else [(os.path.basename(path).split('.')[0], path)
                                                    for path in self.paths]
        self._files      = []
        self._iterators  = []
        try:
            for symbol, path in self.symbols:
                # newline='' is what the csv module asks for on files it reads.
                handle = open(path, 'r', newline='')
                # Track the handle so close() can release it later.
                self._files.append(handle)
                self._iterators.append((symbol, csv.DictReader(handle)))
        except Exception:
            # Do not leak the files opened before the failure.
            self.close()
            raise
        self.last_prices = list()

    def close(self):
        """Close every CSV file opened by this handler; safe to call repeatedly."""
        for handle in self._files:
            handle.close()
        self._files = []

    def get_latest_bars(self, symbols):
        """Return the bars from the most recent update whose symbol is in ``symbols``."""
        return [bars for bars in self.last_prices if bars.symbol in symbols]

    def update(self):
        """Read the next row of every CSV file and return the resulting bars.

        Each row's columns are passed to ``Bars`` as keyword arguments.

        Raises:
            StopIteration: when any of the files has no rows left.
        """
        self.last_prices = [Bars(**next(iterator), symbol=symbol) for symbol, iterator in self._iterators]
        return self.last_prices
    


### Usage

In [11]:
# Collect every CSV file in ../data; each file becomes one symbol.
paths = glob('../data/*.csv')
# `events` is passed as None; the queue buffers up to 20 bar batches.
dh = MyDataHandler(paths, None, queue_size=20)
# Start the background thread that fills the queue.
dh.run()
# Block until four batches are available and display them.
dh.get(4)

[[Bars(symbol='ABNBUSD', startTime=datetime.datetime(2020, 12, 9, 4, 30), time=datetime.datetime(2020, 12, 9, 7, 30), close=59.865, open=60.0775, high=60.0775, low=59.865, volume=0.0),
  Bars(symbol='ETHUSDT', startTime=datetime.datetime(2020, 3, 28, 14, 40), time=datetime.datetime(2020, 3, 28, 17, 40), close=128.455, open=128.82, high=128.84, low=128.455, volume=0.0),
  Bars(symbol='BTCUSDT', startTime=datetime.datetime(2020, 3, 28, 14, 40), time=datetime.datetime(2020, 3, 28, 17, 40), close=6224.25, open=6240.75, high=6241.0, low=6223.5, volume=0.0)],
 [Bars(symbol='ABNBUSD', startTime=datetime.datetime(2020, 12, 9, 4, 35), time=datetime.datetime(2020, 12, 9, 7, 35), close=59.99, open=59.865, high=59.99, low=59.865, volume=0.0),
  Bars(symbol='ETHUSDT', startTime=datetime.datetime(2020, 3, 28, 14, 45), time=datetime.datetime(2020, 3, 28, 17, 45), close=128.565, open=128.455, high=128.67, low=128.42, volume=0.0),
  Bars(symbol='BTCUSDT', startTime=datetime.datetime(2020, 3, 28, 14, 45

### Portfolio

#### Position, Signal, OrderTypes, DirectionTypes

In [12]:
class OrderTypes(Enum):
    """Order kinds; used as the type of ``enter_order_type`` on Signal and Position."""
    # Each member maps to a distinct integer value.
    limit = 0
    market = 1
    stop_loss = 2
    market_stop_loss = 3
    trailing_stop = 4
    take_profit = 5
    limit_take_profit = 6
    
class DirectionTypes(Enum):
    """Trade direction carried by a Signal or Position."""
    SHORT = -1
    EXIT = 0
    LONG = 1
    
@dataclass
class Signal:
    """Trade proposal: a symbol plus optional price, volume, direction and entry details."""
    symbol: str
    # Every field below defaults to None and is not validated here.
    price: float = None
    volume: float = None
    direction_type: DirectionTypes = None
    enter_date: dt.datetime = None
    enter_order_type: OrderTypes = None
    
@dataclass
class Position:
    """A holding in one symbol with its price, volume and derived market value.

    ``price``, ``volume`` and ``direction_type`` default to ``None`` but are
    validated in ``__post_init__``, so in practice they must be supplied.
    """
    symbol: str
    price: float = None
    volume: float = None
    direction_type: DirectionTypes = None

    # None means "use the time at which the position is created". A default
    # of dt.datetime.now() would be evaluated only once, at class creation.
    enter_date: dt.datetime = None
    enter_order_type: OrderTypes = None

    filled: bool = False
    market_value: float = None

    def __post_init__(self):
        """Fill in the entry date, validate the fields and compute the market value."""
        if self.enter_date is None:
            self.enter_date = dt.datetime.now()

        assert isinstance(self.price, (float, int))
        assert isinstance(self.volume, (float, int))
        assert isinstance(self.direction_type, DirectionTypes)
        assert isinstance(self.enter_date, (dt.datetime, dt.date))
        self.update_market_value()

    def set_price(self, new_price):
        """Replace the price with a positive number and refresh the market value."""
        assert isinstance(new_price, (float, int))
        assert new_price > 0
        self.price = new_price
        self.update_market_value()

    def add_position_value(self, value):
        """Add ``value`` to the volume and refresh the market value."""
        assert isinstance(value, (float, int))
        self.volume += value
        self.update_market_value()

    def update_market_value(self):
        """Recompute ``market_value`` as ``price * volume``."""
        self.market_value = self.price * self.volume

    def set_filled(self):
        """Mark the position as filled."""
        self.filled = True

    def __hash__(self):
        # Hash the pair directly: plain dates pass the validation above but
        # have no timestamp() method.
        return hash((self.symbol, self.enter_date))

    def __repr__(self):
        return 'Position(enter_date={}, price={}, volume={}, direction={})'.format(
            self.enter_date, self.price, self.volume, self.direction_type)

In [77]:
class Portfolio(ABC):
    """Abstract portfolio: tracks positions and defines the signal/fill hooks."""

    # Declared for type checkers only; the containers are created per instance
    # in __new__ so that portfolios never share state.
    positions: List["Position"]
    positions_names: Set[str]

    def __new__(cls, *args, **kwargs):
        """Give every instance its own position containers.

        Done in ``__new__`` rather than ``__init__`` so that it also works for
        subclasses whose ``__init__`` does not call ``super().__init__()``.
        """
        instance = super().__new__(cls)
        instance.positions = []
        instance.positions_names = set()
        return instance

    def add_position(self, position):
        """Record ``position`` and remember its symbol."""
        self.positions.append(position)
        self.positions_names.add(position.symbol)

    @abstractmethod
    def process_signal(self):
        """Handle a trading signal; the results go to the execution simulator."""
        pass

    @abstractmethod
    def process_fill(self):
        """Handle a fill; the results go to the holdings."""
        pass
    
    
class MyPortfolio(Portfolio):
    """Portfolio that approves signals against cash and per-symbol exposure limits."""

    def __init__(self, data_handler: DataHandler, initial_cash: float = 100_000,
                 start_date: dt.datetime = dt.datetime(2020, 1, 1),
                 maximum_per_share: float = 0.5, one_lot_value: float = 0.1):
        """Set up the cash balance, limits and data source.

        Args:
            data_handler: Source of the latest bars (``get_latest_bars``).
            initial_cash: Starting cash; also the initial portfolio value.
            start_date: Signals dated before this are rejected. A ``date`` or
                a ``datetime`` is accepted.
            maximum_per_share: Largest allowed fraction of the portfolio value
                held in a single symbol after a LONG signal.
            one_lot_value: Stored as-is; not used by this class.
        """
        super().__init__()
        # Give every portfolio its own containers so positions are never
        # shared between instances.
        self.positions = []
        self.positions_names = set()

        self.cash = initial_cash
        self.start_date = start_date
        self.max_per_share = maximum_per_share
        self.total_value = self.cash
        self.dh = data_handler
        self.value = self.cash
        self.one_lot_value = one_lot_value

    @staticmethod
    def _as_datetime(moment):
        """Return ``moment`` as a datetime so dates and datetimes can be compared."""
        if isinstance(moment, dt.datetime):
            return moment
        return dt.datetime.combine(moment, dt.time.min)

    def update_positions_price(self):
        """Refresh position prices from the latest bars and recompute ``value``."""
        bars = self.dh.get_latest_bars(self.positions_names)
        # When a symbol occurs more than once, the last bar wins.
        latest_close = {bar.symbol: bar.close for bar in bars}

        for position in self.positions:
            if position.symbol in latest_close:
                position.set_price(latest_close[position.symbol])

        self.value = sum(position.price * position.volume for position in self.positions) + self.cash

    def process_signal(self, signal: Signal):
        """Approve or reject ``signal``.

        Returns:
            The signal itself when it is approved, otherwise ``None``.
        """
        self.update_positions_price()

        # A LONG must not push the symbol's exposure above the allowed share.
        if signal.direction_type == DirectionTypes.LONG:
            held = sum(pos.price * pos.volume for pos in self.positions
                       if pos.symbol == signal.symbol)
            exposure = held + signal.price * signal.volume

            # A zero portfolio value cannot carry any exposure (and would
            # otherwise raise ZeroDivisionError).
            if not self.value or exposure / self.value > self.max_per_share:
                return None

        if self._as_datetime(signal.enter_date) < self._as_datetime(self.start_date):
            return None

        # NOTE(review): this check also applies to SHORT and EXIT signals;
        # confirm that is intended.
        if signal.price * signal.volume > self.cash:
            return None

        if signal.direction_type == DirectionTypes.EXIT and signal.symbol not in self.positions_names:
            return None

        return signal

    def process_fill(self, position: Position):
        """Record a filled position in the portfolio."""
        self.add_position(position)
        
    

In [88]:
# start_date and the signal's enter_date are both plain `date` objects here,
# so process_signal can compare them with each other.
start_date = dt.datetime(year=2020, month=1, day=1).date()

s = Signal(symbol='BTCUSDT',
           price=2000,
           volume=1,
           direction_type=DirectionTypes.LONG,
           enter_date=dt.datetime.now().date())

# Reuses the data handler `dh` started in the usage cell.
p = MyPortfolio(data_handler=dh, start_date=start_date, maximum_per_share=0.2)
# `signal` is the approved Signal, or None if it was rejected.
signal = p.process_signal(s)

### Broker
Хранит историю сделок, готовит отчёты и при необходимости отменяет ордера.

In [None]:
class Broker(ABC):
    """Base class for order-execution back-ends."""

    def process_order(self):
        """Placeholder hook for handling an order; does nothing yet.

        Takes ``self`` so that it can be called on an instance.
        """
        pass
    
class MyBroker(Broker):
    """Concrete broker placeholder; adds nothing to Broker yet."""

In [85]:
# Display the result of p.process_signal(s); None would mean it was rejected.
signal

Signal(symbol='BTCUSDT', price=2000, volume=1, direction_type=<DirectionTypes.LONG: 1>, enter_date=datetime.date(2021, 10, 28), enter_order_type=None)

In [86]:
# Display the positions held by the portfolio.
p.positions

[]

### Strategy

In [24]:
class Strategy(ABC):
    """Interface for signal-creating strategies."""

    @abstractmethod
    def generate_signal(self):
        """Create trading signal(s); concrete strategies must override this."""


class BuyAndHoldStrategy(Strategy):
    """Buy-and-hold strategy skeleton; signal creation is not implemented yet."""

    def __init__(self, dh: DataHandler, portfolio: Portfolio):
        """Keep references to the data handler and the portfolio."""
        self.dh = dh
        # The portfolio argument was previously discarded.
        self.portfolio = portfolio

    def generate_signal(self):
        """Collect the symbols of the data handler; returns ``None`` for now."""
        # The symbols live on the data handler, not on the strategy.
        # Assumes dh.symbols is a list of (symbol, path) pairs, as built by
        # MyDataHandler — TODO confirm for other handlers.
        symbols = [symbol for symbol, _ in self.dh.symbols]
        # TODO: build and return signals for `symbols`.
        
        
# Note: rebinds `s`, which held the demo Signal in the portfolio cell.
s = BuyAndHoldStrategy(dh, p)

In [None]:
def backtest(events: "queue.Queue", bars: "DataHandler", strategy: "Strategy",
             portfolio: "Portfolio", broker: "Broker") -> None:
    """Run the event-driven backtest loop until the data feed is exhausted.

    On every outer iteration the feed is advanced with ``bars.update_bars()``
    and the event queue is then drained, dispatching each event on its
    ``type``.

    NOTE(review): the methods called here (``continue_backtest``,
    ``update_bars``, ``calculate_signals``, ``update_timeindex``,
    ``update_signal``, ``execute_order``, ``update_fill``) and ``EventTypes``
    are not defined in the visible part of this file — confirm where they
    come from.

    Args:
        events: Queue of pending events; ``None`` entries are skipped.
        bars: Data feed exposing ``continue_backtest`` and ``update_bars()``.
        strategy: Receives MARKET events through ``calculate_signals``.
        portfolio: Receives MARKET, SIGNAL and FILL events.
        broker: Receives ORDER events through ``execute_order``.
    """
    # Annotations are strings so that defining the function neither creates a
    # Queue nor needs every annotated class to exist yet.
    while bars.continue_backtest:
        # Update the bars (specific backtest code, as opposed to live trading)
        bars.update_bars()

        # Handle the events until the queue is empty
        while True:
            try:
                event = events.get(False)
            except queue.Empty:
                break

            if event is None:
                continue

            if event.type == EventTypes.MARKET:
                strategy.calculate_signals(event)
                portfolio.update_timeindex(event)

            elif event.type == EventTypes.SIGNAL:
                portfolio.update_signal(event)

            elif event.type == EventTypes.ORDER:
                broker.execute_order(event)

            elif event.type == EventTypes.FILL:
                portfolio.update_fill(event)

In [24]:
# Scratch experiment: a queue that holds at most one item.
data = queue.Queue(1)

In [25]:
# Fills the single slot of the queue.
data.put(1)

In [27]:
# get() blocks while the queue is empty; presumably that is why this cell
# had to be interrupted (the cells were run out of order).
data.get()

KeyboardInterrupt: 

In [19]:
# put() blocks while the queue is full; presumably that is why this cell
# had to be interrupted.
data.put(2)

KeyboardInterrupt: 

In [5]:
# IPython shell escape: list the CSV files available in ../data.
!ls ../data

ABNBUSD.csv BTCUSDT.csv ETHUSDT.csv
