Merge pull request #93 from AsyncAlgoTrading/iexcache
add caching for iex data
timkpaine committed Sep 11, 2020
2 parents e7659f5 + 26cd55e commit be6daa3
Showing 3 changed files with 48 additions and 5 deletions.
12 changes: 10 additions & 2 deletions aat/core/engine/engine.py
@@ -108,7 +108,7 @@ def __init__(self, **config):
         self._handler_subscriptions = {m: [] for m in EventType.__members__.values()}

         # setup `now` handler for backtest
-        self._latest = datetime.fromtimestamp(0) if self.trading_type in (TradingType.BACKTEST, TradingType.SIMULATION) else datetime.now()
+        self._latest = datetime.fromtimestamp(0) if self._offline() else datetime.now()

         # register internal management event handler before all strategy handlers
         self.registerHandler(self.manager)
@@ -149,6 +149,9 @@ def __init__(self, **config):
         self.log.critical('.......')
         self.api_application.listen(self.port)

+    def _offline(self):
+        return self.trading_type in (TradingType.BACKTEST, TradingType.SIMULATION)
+
     def registerHandler(self, handler):
         '''register a handler and all callbacks that handler implements
@@ -228,7 +231,7 @@ async def run(self):
             await self.processEvent(event)

             # TODO move out of critical path
-            if self.trading_type in (TradingType.BACKTEST, TradingType.SIMULATION):
+            if self._offline():
                 # use time of last event
                 self._latest = event.target.timestamp if hasattr(event, 'target') and hasattr(event.target, 'timestamp') else self._latest
             else:
@@ -286,6 +289,11 @@ async def processEvent(self, event, strategy=None):
     async def tick(self):
         '''helper method to ensure periodic methods execute periodically in absence
         of market data'''
+
+        # TODO periodic strategies in backtest/simulation
+        if self._offline():
+            return
+
         while True:
             yield Event(type=EventType.HEARTBEAT, target=None)
             await asyncio.sleep(1)
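
For reference, a minimal standalone sketch of the pattern this commit refactors toward: a single `_offline()` helper now gates both the simulated clock and the wall-clock heartbeat generator. Class and method names mirror the diff; the trimmed-down TradingType enum and the tuple yielded in place of aat's Event are assumptions for illustration only:

import asyncio
from datetime import datetime
from enum import Enum


class TradingType(Enum):
    LIVE = 'live'
    SIMULATION = 'simulation'
    BACKTEST = 'backtest'


class Engine:
    def __init__(self, trading_type):
        self.trading_type = trading_type
        # offline modes start the clock at the epoch and advance it from event timestamps
        self._latest = datetime.fromtimestamp(0) if self._offline() else datetime.now()

    def _offline(self):
        # single source of truth for "not connected to a live market"
        return self.trading_type in (TradingType.BACKTEST, TradingType.SIMULATION)

    async def tick(self):
        # wall-clock heartbeats are meaningless against historical time, so bail out early
        if self._offline():
            return
        while True:
            yield ('HEARTBEAT', None)  # stand-in for Event(type=EventType.HEARTBEAT, target=None)
            await asyncio.sleep(1)

Iterated with `async for`, a LIVE engine yields one heartbeat per second, while a BACKTEST engine's generator exits immediately, which is exactly why the diff adds the early return to `tick`.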
1 change: 1 addition & 0 deletions aat/core/order_book/order_book_lite.py
@@ -15,6 +15,7 @@ class OrderBookLite(OrderBookBase):
         exchange_name (str): name of the exchange
         callback (Function): callback on events
     '''
+
     def __init__(self,
                  instrument,
                  exchange_name='',
40 changes: 37 additions & 3 deletions aat/exchange/public/iex.py
@@ -1,4 +1,6 @@
 import asyncio
+import os
+import os.path
 import pandas as pd # type: ignore
 import pyEX # type: ignore
 from collections import deque
@@ -30,12 +32,13 @@
 class IEX(Exchange):
     '''Investor's Exchange'''

-    def __init__(self, trading_type, verbose, api_key, is_sandbox, timeframe='1y', start_date=None, end_date=None):
+    def __init__(self, trading_type, verbose, api_key, is_sandbox, timeframe='1y', start_date=None, end_date=None, cache_data=True):
         super().__init__(ExchangeType('iex'))
         self._trading_type = trading_type
         self._verbose = verbose
         self._api_key = api_key
         self._is_sandbox = is_sandbox
+        self._cache_data = cache_data

         if trading_type == TradingType.LIVE:
             assert not is_sandbox
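
A hedged usage sketch of the new `cache_data` flag (the TradingType import path and the placeholder token are assumptions, not taken from the diff):

from aat.config import TradingType  # assumed import path
from aat.exchange.public.iex import IEX

# sandbox backtest source that bypasses the on-disk cache and re-downloads on every run
iex = IEX(TradingType.BACKTEST, verbose=False, api_key='Tpk_placeholder',
          is_sandbox=True, timeframe='1y', cache_data=False)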
@@ -125,9 +128,24 @@ def _callback(record):
         if self._timeframe != '1d':
             for i in tqdm(self._subscriptions, desc="Fetching data..."):
                 if i.name in insts:
+                    # already fetched the data, multiple subscriptions
                     continue

-                df = self._client.chartDF(i.name, timeframe=self._timeframe)
+                if self._cache_data:
+                    # first, check if we have this data and it's cached already
+                    os.makedirs('_aat_data', exist_ok=True)
+                    data_filename = os.path.join('_aat_data', 'iex_{}_{}_{}_{}.pkl'.format(i.name, self._timeframe, datetime.now().strftime('%Y%m%d'), 'sand' if self._is_sandbox else ''))
+
+                    if os.path.exists(data_filename):
+                        print('using cached IEX data for {}'.format(i.name))
+                        df = pd.read_pickle(data_filename)
+                    else:
+                        df = self._client.chartDF(i.name, timeframe=self._timeframe)
+                        df.to_pickle(data_filename)
+
+                else:
+                    df = self._client.chartDF(i.name, timeframe=self._timeframe)
+
                 df = df[['close', 'volume']]
                 df.columns = ['close:{}'.format(i.name), 'volume:{}'.format(i.name)]
                 dfs.append(df)
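
The cache key bakes in symbol, timeframe, today's date, and a sandbox marker, so sandbox and production data never collide and a cached file is only reused within the same calendar day. A minimal standalone sketch of the same check-then-fetch-then-persist pattern; `cached_chart` and `fetch` are hypothetical names, and only the chartDF-style callable, `read_pickle`, and `to_pickle` usage come from the diff:

import os
import os.path
from datetime import datetime

import pandas as pd


def cached_chart(fetch, symbol, timeframe, is_sandbox, cache_dir='_aat_data'):
    '''Return a DataFrame, reusing a same-day pickle when one exists.

    `fetch` is any zero-argument callable returning a DataFrame
    (a hypothetical stand-in for pyEX's chartDF).'''
    os.makedirs(cache_dir, exist_ok=True)
    filename = os.path.join(cache_dir, 'iex_{}_{}_{}_{}.pkl'.format(
        symbol, timeframe, datetime.now().strftime('%Y%m%d'),
        'sand' if is_sandbox else ''))

    if os.path.exists(filename):
        # cache hit: this symbol/timeframe was already fetched today
        return pd.read_pickle(filename)

    df = fetch()
    df.to_pickle(filename)  # cache miss: persist for subsequent runs
    return df

Called as, e.g., `cached_chart(lambda: client.chartDF('AAPL', timeframe='1y'), 'AAPL', '1y', is_sandbox=False)`, repeated backtests on the same day skip the network entirely.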
@@ -142,17 +160,33 @@ def _callback(record):
         else:
             for i in tqdm(self._subscriptions, desc="Fetching data..."):
                 if i.name in insts:
+                    # already fetched the data, multiple subscriptions
                     continue

                 date = self._start_date
                 subdfs = []
                 while date <= self._end_date:
-                    df = self._client.chartDF(i.name, timeframe='1d', date=date.strftime('%Y%m%d'))
+                    if self._cache_data:
+                        # first, check if we have this data and it's cached already
+                        os.makedirs('_aat_data', exist_ok=True)
+                        data_filename = os.path.join('_aat_data', 'iex_{}_{}_{}_{}.pkl'.format(i.name, self._timeframe, date, 'sand' if self._is_sandbox else ''))
+
+                        if os.path.exists(data_filename):
+                            print('using cached IEX data for {} - {}'.format(i.name, date))
+                            df = pd.read_pickle(data_filename)
+                        else:
+                            df = self._client.chartDF(i.name, timeframe='1d', date=date.strftime('%Y%m%d'))
+                            df.to_pickle(data_filename)
+                    else:
+                        df = self._client.chartDF(i.name, timeframe='1d', date=date.strftime('%Y%m%d'))

                     if not df.empty:
                         df = df[['average', 'volume']]
                         df.columns = ['close:{}'.format(i.name), 'volume:{}'.format(i.name)]
                         subdfs.append(df)

                     date += timedelta(days=1)

                 dfs.append(pd.concat(subdfs))
                 insts.add(i.name)
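
The daily branch assembles minute bars one session at a time and concatenates them, and because each pickle is keyed on the historical date rather than today's date, re-runs only hit the API for sessions not yet on disk. A minimal sketch of the assembly loop, with `fetch_day` as a hypothetical stand-in for the per-date chartDF call:

from datetime import timedelta

import pandas as pd


def fetch_range(fetch_day, start, end):
    '''Concatenate per-day frames over [start, end], skipping empty sessions.

    Assumes at least one non-empty session in the range; `fetch_day(day)`
    is a hypothetical callable returning that day's DataFrame.'''
    frames = []
    day = start
    while day <= end:
        df = fetch_day(day)
        if not df.empty:
            # weekends and holidays come back empty and are dropped
            frames.append(df)
        day += timedelta(days=1)
    return pd.concat(frames)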