Skip to content

Commit

Permalink
Merge pull request #94 from AsyncAlgoTrading/csv
Browse files Browse the repository at this point in the history
Add csv exchange, add runtime tests, add simple command line parser
  • Loading branch information
timkpaine committed Sep 12, 2020
2 parents be6daa3 + dba2946 commit ec938bb
Show file tree
Hide file tree
Showing 11 changed files with 316 additions and 103 deletions.
3 changes: 3 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,9 @@ testpycpp: ## Make unit tests
testjs: ## Make js tests
cd js; yarn test

testruns: ## Run a few examples as a live end-to-end test
$(PYTHON) -m aat.strategy.sample.readonly

lint: lintpy lintjs lintcpp ## run all linters

lintpy: ## run python linter
Expand Down
125 changes: 37 additions & 88 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -332,108 +332,57 @@ class Exchange(_MarketData, _OrderEntry):
```

#### Extending
Writing a custom exchange is very easy, you just need to implement the market data interface, the order entry interface, or both. Here is a simple example of implementing a market data exchange on top of IEX Cloud, with support for simulated order entry by accepting any trade submitted at the price asked for:
Writing a custom exchange is very easy, you just need to implement the market data interface, the order entry interface, or both. Here is a simple example of implementing a market data exchange on top of a CSV File, with support for simulated order entry by accepting any trade submitted at the price asked for:

```python3
class IEX(Exchange):
'''Investor's Exchange'''
import csv
from typing import List
from aat.config import EventType, InstrumentType, Side
from aat.core import ExchangeType, Event, Instrument, Trade, Order
from aat.exchange import Exchange

def __init__(self, trading_type, verbose, api_key, is_sandbox):
super().__init__(ExchangeType('iex'))
self._trading_type = trading_type
self._verbose = verbose
self._api_key = api_key
self._is_sandbox = is_sandbox
self._subscriptions = []

# "Order" management
self._queued_orders = deque()
self._order_id = 1
class CSV(Exchange):
'''CSV File Exchange'''

# *************** #
# General methods #
# *************** #
async def connect(self):
'''connect to exchange. should be asynchronous.
For OrderEntry-only, can just return None
'''
self._client = pyEX.Client(self._api_key, 'sandbox' if self._is_sandbox else 'v1')
def __init__(self, trading_type, verbose, filename: str):
super().__init__(ExchangeType('csv-{}'.format(filename)))
self._trading_type = trading_type
self._verbose = verbose
self._filename = filename
self._data: List[Trade] = []
self._order_id = 0

# ******************* #
# Market Data Methods #
# ******************* #
async def instruments(self):
'''get list of available instruments'''
instruments = []
symbols = self._client.symbols()
for record in symbols:
if not record['isEnabled']:
continue
symbol = record['symbol']
brokerExchange = record['exchange']
type = _iex_instrument_types[record['type']]
currency = Instrument(type=InstrumentType.CURRENCY, name=record['currency'])

try:
inst = Instrument(name=symbol, type=type, exchange=self.exchange(), brokerExchange=brokerExchange, currency=currency)
except AssertionError:
# Happens sometimes on sandbox
continue
instruments.append(inst)
return instruments
return list(set(_.instrument for _ in self._data))

def subscribe(self, instrument):
self._subscriptions.append(instrument)
async def connect(self):
with open(self._filename) as csvfile:
self._reader = csv.DictReader(csvfile, delimiter=',')

for row in self._reader:
self._data.append(Trade(volume=float(row['volume']),
price=float(row['close']),
maker_orders=[],
taker_order=Order(volume=float(row['volume']),
price=float(row['close']),
side=Side.BUY,
exchange=self.exchange(),
instrument=Instrument(
row['symbol'].split('-')[0],
InstrumentType(row['symbol'].split('-')[1].upper())
)
)
))

async def tick(self):
'''return data from exchange'''
dfs = []
for i in self._subscriptions:
df = self._client.chartDF(i.name, timeframe='6m')
df = df[['close', 'volume']]
df.columns = ['close:{}'.format(i.name), 'volume:{}'.format(i.name)]
dfs.append(df)

data = pd.concat(dfs, axis=1)
data.sort_index(inplace=True)
data = data.groupby(data.index).last()
data.drop_duplicates(inplace=True)
data.fillna(method='ffill', inplace=True)
for item in self._data:
yield Event(EventType.TRADE, item)

for index in data.index:
for i in self._subscriptions:
volume = data.loc[index]['volume:{}'.format(i.name)]
price = data.loc[index]['close:{}'.format(i.name)]

o = Order(volume=volume, price=price, side=Side.BUY, instrument=i, exchange=self.exchange())
o.timestamp = index.to_pydatetime()

t = Trade(volume=volume, price=price, taker_order=o, maker_orders=[])

yield Event(type=EventType.TRADE, target=t)
await asyncio.sleep(0)

while self._queued_orders:
order = self._queued_orders.popleft()
order.timestamp = index

t = Trade(volume=order.volume, price=order.price, taker_order=order, maker_orders=[])
t.my_order = order

yield Event(type=EventType.TRADE, target=t)
await asyncio.sleep(0)

# ******************* #
# Order Entry Methods #
# ******************* #
async def newOrder(self, order: Order):
'''submit a new order to the exchange. should set the given order's `id` field to exchange-assigned id
For MarketData-only, can just return None
'''
if self._trading_type == TradingType.LIVE:
raise NotImplementedError("Live OE not available for IEX")
raise NotImplementedError("Live OE not available for CSV")

order.id = self._order_id
self._order_id += 1
Expand Down
4 changes: 4 additions & 0 deletions aat/config/enums.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,10 @@ class BaseEnum(Enum):
def __str__(self):
return f'{self.value}'

@classmethod
def members(cls):
return list(cls.__members__.keys())


class TradingType(BaseEnum):
LIVE = 'LIVE'
Expand Down
65 changes: 61 additions & 4 deletions aat/config/parser.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
import argparse
import importlib
import itertools
import os
import os.path
from configparser import ConfigParser
Expand Down Expand Up @@ -29,8 +31,22 @@ def _config_to_dict(filename: str) -> Dict[str, Dict[str, Union[str, List[str],
return ret


def _args_to_dict(args):
ret = {}
ret['general'] = {}
ret['general']['verbose'] = args.verbose
ret['general']['trading_type'] = args.trading_type
ret['exchange'] = {'exchanges': list(_.split(',') for _ in itertools.chain.from_iterable(args.exchanges))}
ret['strategy'] = {'strategies': list(itertools.chain.from_iterable(args.strategies))}
return ret


def getStrategies(strategies: List) -> List:
strategy_instances = []

if not strategies:
raise Exception('Must provide strategies')

for strategy in strategies:
if isinstance(strategy, list):
mod, clazz = strategy[0].split(':')
Expand All @@ -46,6 +62,10 @@ def getStrategies(strategies: List) -> List:

def getExchanges(exchanges: List, trading_type, verbose: bool = False) -> List:
exchange_instances = []

if not exchanges:
raise Exception('Must provide exchanges')

for exchange in exchanges:
if isinstance(exchange, list):
mod, clazz = exchange[0].split(':')
Expand All @@ -60,8 +80,45 @@ def getExchanges(exchanges: List, trading_type, verbose: bool = False) -> List:


def parseConfig(argv: list) -> dict:
from aat import TradingType

parser = argparse.ArgumentParser()

parser.add_argument(
'--config',
help='Config file',
default='')

parser.add_argument(
'--verbose',
action='store_true',
help='Run in verbose mode',
default=False)

parser.add_argument(
'--trading_type',
help='Trading Type in ("live", "sandbox", "simulation", "backtest")',
choices=[_.lower() for _ in TradingType.members()],
default='simulation')

parser.add_argument(
'--strategies',
action='append',
nargs='+',
help='Strategies to run in form <path.to.module:Class,args,for,strat>',
default=[])

parser.add_argument(
'--exchanges',
action='append',
nargs='+',
help='Exchanges to run on',
default=[])

args = parser.parse_args(argv)

# Every engine run requires a static config object
if len(argv) != 2:
print('usage: <python executable> -m aat <config file>')
raise Exception(f'Invalid command line: {argv}')
return _config_to_dict(argv[-1])
if args.config:
return _config_to_dict(args.config)

return _args_to_dict(args)
2 changes: 1 addition & 1 deletion aat/exchange/exchange.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ class Exchange(_MarketData, _OrderEntry):
exchanges can be queried for data, or send data
'''

def __init__(self, exchange: str):
def __init__(self, exchange):
self._exchange = exchange

def exchange(self):
Expand Down
2 changes: 2 additions & 0 deletions aat/exchange/generic/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
from .csv import CSV # noqa: F401
from .kafka import Kafka # noqa: F401
94 changes: 94 additions & 0 deletions aat/exchange/generic/csv.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
import csv
from collections import deque
from typing import List
from aat.config import EventType, InstrumentType, Side, TradingType
from aat.core import ExchangeType, Event, Instrument, Trade, Order
from aat.exchange import Exchange


class CSV(Exchange):
'''CSV File Exchange'''

def __init__(self, trading_type, verbose, filename: str):
super().__init__(ExchangeType('csv-{}'.format(filename)))
self._trading_type = trading_type
self._verbose = verbose
self._filename = filename
self._data: List[Trade] = []
self._queued_orders = deque() # type: ignore
self._order_id = 1

async def instruments(self):
'''get list of available instruments'''
return list(set(_.instrument for _ in self._data))

async def connect(self):
with open(self._filename) as csvfile:
self._reader = csv.DictReader(csvfile, delimiter=',')

for row in self._reader:
self._data.append(Trade(volume=float(row['volume']),
price=float(row['close']),
maker_orders=[],
taker_order=Order(volume=float(row['volume']),
price=float(row['close']),
side=Side.BUY,
exchange=self.exchange(),
instrument=Instrument(
row['symbol'].split('-')[0],
InstrumentType(row['symbol'].split('-')[1].upper())
)
)
))

async def tick(self):
for item in self._data:
yield Event(EventType.TRADE, item)

async def newOrder(self, order: Order):
if self._trading_type == TradingType.LIVE:
raise NotImplementedError("Live OE not available for CSV")

order.id = self._order_id
self._order_id += 1
self._queued_orders.append(order)
return order


class CSV2(Exchange):
'''CSV File Exchange'''

def __init__(self, trading_type, verbose, filename: str, value_field=''):
super().__init__(ExchangeType('csv-{}'.format(filename)))
self._trading_type = trading_type
self._verbose = verbose
self._filename = filename
self._data: List[Trade] = []
self._value_field = value_field

async def instruments(self):
'''get list of available instruments'''
return list(set(_.instrument for _ in self._data))

async def connect(self):
with open(self._filename) as csvfile:
self._reader = csv.DictReader(csvfile, delimiter=',')

for row in self._reader:
if self._value_field:
value = row[self._value_field]
if 'value' in row:
value = row['value']
elif 'price' in row:
value = row['price']
elif 'close' in row:
# OHLC data
value = row['close']
else:
raise Exception('Must provide a value field or "value", "price", "close"')

_ = value
# TODO make this smarter or more configureable


Exchange.registerExchange('csv', CSV)
14 changes: 14 additions & 0 deletions aat/exchange/generic/kafka.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
from aat.exchange import Exchange
from aat.core import ExchangeType


class Kafka(Exchange):
'''Kafka Exchange'''

def __init__(self, trading_type, verbose):
super().__init__(ExchangeType('kafka'))
self._trading_type = trading_type
self._verbose = verbose


Exchange.registerExchange('kafka', Kafka)

0 comments on commit ec938bb

Please sign in to comment.