Skip to content

Commit

Permalink
Use polygon and fix out-of-market-hour bars
Browse files Browse the repository at this point in the history
  • Loading branch information
umitanuki committed Aug 30, 2018
1 parent 2b962d3 commit fb70479
Show file tree
Hide file tree
Showing 4 changed files with 207 additions and 52 deletions.
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,6 @@ dist
*.egg-info
.pytest_cache
*.pkl
.envrc
.vscode
.coverage
252 changes: 200 additions & 52 deletions pylivetrader/backend/alpaca.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,15 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import pandas as pd

import numpy as np
import uuid
import alpaca_trade_api as tradeapi
from alpaca_trade_api.rest import APIError
import concurrent.futures
from requests.exceptions import HTTPError
import numpy as np
import pandas as pd
from trading_calendars import get_calendar
import uuid

from .base import BaseBackend

Expand All @@ -40,6 +43,7 @@

from logbook import Logger


log = Logger('Alpaca')

NY = 'America/New_York'
Expand All @@ -48,14 +52,75 @@
one_day_offset = pd.Timedelta('1 day')


def skip_http_error(statuses):
    '''
    A decorator factory to wrap a function with try..except and
    swallow specific HTTP errors.

    statuses: tuple[int], the HTTP status codes to swallow
    Return: decorator. The decorated function returns None (after
            logging a warning) when an HTTPError with one of the
            given statuses is raised; any other HTTPError is
            re-raised unchanged.

        @skip_http_error((404, 503))
        def fetch():
            ...
    '''
    # local import: keeps this block self-contained
    import functools

    assert isinstance(statuses, tuple)

    def decorator(func):
        # preserve the wrapped function's name/docstring
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            try:
                return func(*args, **kwargs)
            except HTTPError as e:
                # An HTTPError may carry no response object; there is
                # then no status to match, so re-raise the original
                # error rather than fail on attribute access.
                status_code = getattr(e.response, 'status_code', None)
                if status_code is None or status_code not in statuses:
                    raise
                log.warn(str(e))
                return None
        return wrapper
    return decorator


def parallelize(mapfunc, workers=10):
    '''
    Build a function that applies mapfunc to every symbol using a
    thread pool, one task per symbol.

    mapfunc: callable taking a single symbol
    workers: maximum number of threads in the pool
    Return: func(symbols: list[str]) => dict[str -> result]
    '''

    def wrapper(symbols):
        pool = concurrent.futures.ThreadPoolExecutor(max_workers=workers)
        with pool as executor:
            # future -> symbol, so each finished task can be attributed
            pending = {
                executor.submit(mapfunc, name): name for name in symbols
            }
            # collect in completion order; result() re-raises any
            # exception raised inside mapfunc
            collected = {
                pending[done]: done.result()
                for done in concurrent.futures.as_completed(pending)
            }
        return collected

    return wrapper


class Backend(BaseBackend):

def __init__(self, key_id=None, secret=None, base_url=None):
    '''
    Create the Alpaca REST client and load the NYSE trading calendar.

    key_id, secret, base_url: forwarded positionally to
    tradeapi.REST; each defaults to None.
    '''
    rest_client = tradeapi.REST(key_id, secret, base_url)
    self._api = rest_client
    nyse_calendar = get_calendar('NYSE')
    self._cal = nyse_calendar

def _symbols2assets(self, symbols):
'''
Utility for debug/testing
'''

assets = {a.symbol: a for a in self.get_equities()}
return [assets[symbol] for symbol in symbols if symbol in assets]

def get_equities(self):
assets = []
t = normalize_date(pd.Timestamp('now', tz='America/New_York'))
t = normalize_date(pd.Timestamp('now', tz=NY))
raw_assets = self._api.list_assets(asset_class='us_equity')
for raw_asset in raw_assets:

Expand Down Expand Up @@ -98,11 +163,11 @@ def positions(self):
symbols.append(symbol)
position_map[symbol] = z_position

quotes = self._api.list_quotes(symbols)
for quote in quotes:
price = quote.last
dt = quote.last_timestamp
z_position = position_map[quote.symbol]
trades = self._symbol_trades(symbols)
for symbol, trade in trades.items():
price = trade.price
dt = trade.timestamp
z_position = position_map[symbol]
z_position.last_sale_price = float(price)
z_position.last_sale_date = dt
return z_positions
Expand Down Expand Up @@ -214,8 +279,8 @@ def cancel_order(self, zp_order_id):
return

def get_last_traded_dt(self, asset):
quote = self._api.get_quote(asset.symbol)
return pd.Timestamp(quote.last_timestamp)
trade = self._api.polygon.last_trade(asset.symbol)
return trade.timestamp

def get_spot_value(self, assets, field, dt, data_frequency):
assert(field in (
Expand All @@ -226,51 +291,66 @@ def get_spot_value(self, assets, field, dt, data_frequency):
else:
symbols = [asset.symbol for asset in assets]
if field in ('price', 'last_traded'):
quotes = self._api.list_quotes(symbols)
if assets_is_scalar:
if field == 'price':
if len(quotes) == 0:
return np.nan
return quotes[-1].last
else:
if len(quotes) == 0:
return pd.NaT
return quotes[-1].last_timestamp
results = self._get_spot_trade(symbols, field)
else:
results = self._get_spot_bars(symbols, field)
return results[0] if assets_is_scalar else results

def _get_spot_trade(self, symbols, field):
assert(field in ('price', 'last_traded'))
symbol_trades = self._symbol_trades(symbols)

def get_for_symbol(symbol_trades, symbol):
trade = symbol_trades.get(symbol)
if field == 'price':
if trade is None:
return np.nan
return trade.price
else:
return [
quote.last if field == 'price' else quote.last_timestamp
for quote in quotes
]
if trade is None:
return pd.NaT
return trade.timestamp

bars_list = self._api.list_bars(symbols, '1Min', limit=1)
if assets_is_scalar:
if len(bars_list) == 0:
return [get_for_symbol(symbol_trades, symbol) for symbol in symbols]

def _get_spot_bars(self, symbols, field):
symbol_bars = self._symbol_bars(symbols, 'minute', limit=1)

def get_for_symbol(symbol_bars, symbol, field):
bars = symbol_bars.get(symbol)
if bars is None or len(bars) == 0:
return np.nan
return bars_list[0].bars[-1]._raw[field]
bars_map = {a.symbol: a for a in bars_list}
return [
bars_map[symbol].bars[-1]._raw[field]
return bars[field].values[-1]

results = [
get_for_symbol(symbol_bars, symbol, field)
for symbol in symbols
]
return results

def get_bars(self, assets, data_frequency, bar_count=500):
'''
Interface method.
Return: pd.Dataframe() with MultiIndex [asset -> OHLCV]
'''
assets_is_scalar = not isinstance(assets, (list, set, tuple))
is_daily = 'd' in data_frequency # 'daily' or '1d'
if assets_is_scalar:
symbols = [assets.symbol]
else:
symbols = [asset.symbol for asset in assets]
timeframe = '1D' if is_daily else '1Min'

bars_list = self._api.list_bars(symbols, timeframe, limit=bar_count)
bars_map = {a.symbol: a for a in bars_list}
symbol_bars = self._symbol_bars(
symbols, data_frequency, limit=bar_count)

if is_daily:
intra_bars = {}
intra_list = self._api.list_bars(symbols, '1Min', limit=1000)
for bars in intra_list:
symbol_bars_minute = self._symbol_bars(
symbols, 'minute', limit=1000)
for symbol, bars in symbol_bars_minute.items():
symbol = bars.symbol
df = _fix_tz(bars.df)
df = bars.df
mask = (df.index.time >= pd.Timestamp('9:30').time()) & (
df.index.time < pd.Timestamp('16:00').time())
agged = df[mask].resample('1D').agg(dict(
Expand All @@ -285,23 +365,91 @@ def get_bars(self, assets, data_frequency, bar_count=500):
dfs = []
for asset in assets if not assets_is_scalar else [assets]:
symbol = asset.symbol
df = bars_map[symbol].df.copy()
df = _fix_tz(df)
df = symbol_bars.get(symbol)
if df is None:
dfs.append(
pd.DataFrame(
[],
columns=[
'open',
'high',
'low',
'close',
'volume']))
continue
if is_daily:
agged = intra_bars[symbol]
if agged.index[-1] not in df.index:
agged = intra_bars.get(symbol)
if agged is not None and len(
agged.index) > 0 and agged.index[-1] not in df.index:
assert agged.index[-1] > df.index[-1]
df = df.append(agged.iloc[-1])
df.columns = pd.MultiIndex.from_product([[asset, ], df.columns])
dfs.append(df)

if len(dfs) > 0:
return pd.concat(dfs, axis=1)

return pd.DataFrame()


def _fix_tz(df):
if df.index.tz is None:
df.index = df.index.tz_localize('UTC').tz_convert(NY)
return df
return pd.concat(dfs, axis=1)

def _symbol_bars(
        self,
        symbols,
        frequency,
        _from=None,
        to=None,
        limit=None):
    '''
    Query historic_agg either minute or daily in parallel
    for multiple symbols, and return in dict.

    symbols: list[str]
    frequency: str ('daily' or 'minute'; the short forms '1d' and
               '1m' are accepted as aliases)
    _from: str or pd.Timestamp
    to: str or pd.Timestamp
    limit: str or int, maximum number of bars kept per symbol
    return: dict[str -> pd.DataFrame]. Symbols whose request was
            skipped (HTTP 404/504) are omitted from the dict.
    '''
    sizes = {
        'daily': 'day',
        '1d': 'day',
        'minute': 'minute',
        '1m': 'minute',
    }
    assert frequency in sizes
    size = sizes[frequency]

    # limit is documented as "str or int"; normalize once so the
    # arithmetic and the slicing below work for both
    if limit is not None:
        limit = int(limit)

    # temp workaround for less bars after masking by
    # market hours
    query_limit = None if limit is None else limit * 2

    @skip_http_error((404, 504))
    def fetch(symbol):
        df = self._api.polygon.historic_agg(
            size, symbol, _from, to, query_limit).df

        # Empty results have no first/last timestamp to build the
        # mask from, so they are returned as-is.
        if size == 'minute' and not df.empty:
            # zipline -> right label
            # API result -> left label (beginning of bucket)
            df.index += pd.Timedelta('1min')

            # mask out bars outside market hours; only meaningful
            # for minute bars, since daily timestamps are not
            # market minutes
            mask = self._cal.minutes_in_range(
                df.index[0], df.index[-1],
            ).tz_convert(NY)
            df = df.reindex(mask)

        if limit is not None:
            df = df.iloc[-limit:]
        return df

    fetched = parallelize(fetch, workers=25)(symbols)
    # fetch yields None when its HTTP error was swallowed; drop those
    # so every value in the result is a DataFrame
    return {
        symbol: df for symbol, df in fetched.items() if df is not None
    }

def _symbol_trades(self, symbols):
    '''
    Query last_trade in parallel for multiple symbols and
    return in dict.

    symbols: list[str]
    return: dict[str -> polygon.Trade]. Symbols whose request was
            skipped (HTTP 404/504) are omitted from the dict.
    '''

    @skip_http_error((404, 504))
    def fetch(symbol):
        return self._api.polygon.last_trade(symbol)

    fetched = parallelize(fetch, workers=25)(symbols)
    # fetch yields None when its HTTP error was swallowed; drop those
    # so callers iterating the dict never see a missing trade
    return {
        symbol: trade
        for symbol, trade in fetched.items()
        if trade is not None
    }
3 changes: 3 additions & 0 deletions setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,6 @@ exclude =

[aliases]
test = pytest

[tool:pytest]
addopts = --verbose --cov=pylivetrader --cov-report=term-missing
1 change: 1 addition & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
],
tests_require=[
'pytest',
'pytest-cov',
],
setup_requires=["flake8", "pytest-runner"],
extras_require={}
Expand Down

0 comments on commit fb70479

Please sign in to comment.