Skip to content

Commit

Permalink
Ticker support on cryptostore + fix redis host issue
Browse files Browse the repository at this point in the history
  • Loading branch information
bmoscon committed Sep 15, 2019
1 parent 64bac58 commit a5e53c0
Show file tree
Hide file tree
Showing 6 changed files with 28 additions and 12 deletions.
3 changes: 3 additions & 0 deletions config.yaml
Expand Up @@ -44,13 +44,16 @@ exchanges:
max_depth: 10
book_delta: true
trades: [XBTUSD]
ticker: [XBTUSD]
COINBASE:
retries: -1
l3_book:
symbols: [BTC-USD, ETH-USD]
book_delta: true
book_interval: 10000
trades: [BTC-USD, ETH-USD, ETH-BTC]
ticker: [BTC-USD]


# Where to store the data. Currently arctic, influx, elastic, and parquet are supported. More than one can be enabled
storage: [arctic, influx]
Expand Down
4 changes: 2 additions & 2 deletions cryptostore/aggregator/kafka.py
Expand Up @@ -7,7 +7,7 @@
import json
import logging

from cryptofeed.defines import L2_BOOK, L3_BOOK, TRADES
from cryptofeed.defines import L2_BOOK, L3_BOOK, TRADES, TICKER

from cryptostore.engines import StorageEngines
from cryptostore.aggregator.cache import Cache
Expand Down Expand Up @@ -59,7 +59,7 @@ def read(self, exchange, dtype, pair):
if dtype in {L2_BOOK, L3_BOOK}:
update = book_flatten(update, update['timestamp'], update['delta'])
ret.extend(update)
if dtype == TRADES:
if dtype in {TRADES, TICKER}:
ret.append(update)
return ret

Expand Down
6 changes: 3 additions & 3 deletions cryptostore/aggregator/redis.py
Expand Up @@ -9,7 +9,7 @@
import json
import time

from cryptofeed.defines import TRADES, L2_BOOK, L3_BOOK
from cryptofeed.defines import TRADES, L2_BOOK, L3_BOOK, TICKER

from cryptostore.aggregator.util import book_flatten
from cryptostore.aggregator.cache import Cache
Expand Down Expand Up @@ -51,8 +51,8 @@ def read(self, exchange, dtype, pair):
if k in u:
u[k] = float(u[k])
ret.extend(update)
if dtype == TRADES:
for k in ('size', 'amount', 'price', 'timestamp'):
if dtype in {TRADES, TICKER}:
for k in ('size', 'amount', 'price', 'timestamp', 'bid', 'ask'):
if k in update:
update[k] = float(update[k])
ret.append(update)
Expand Down
14 changes: 9 additions & 5 deletions cryptostore/collector.py
Expand Up @@ -9,7 +9,7 @@
import logging

from cryptofeed import FeedHandler
from cryptofeed.defines import TRADES, L2_BOOK, L3_BOOK, BOOK_DELTA
from cryptofeed.defines import TRADES, L2_BOOK, L3_BOOK, BOOK_DELTA, TICKER


LOG = logging.getLogger('cryptostore')
Expand Down Expand Up @@ -48,23 +48,27 @@ def run(self):
self.exchange_config[callback_type] = self.exchange_config[callback_type]['symbols']

if cache == 'redis':
if self.config['redis']['ip']:
kwargs = {'ip': self.config['redis']['ip'], 'port': self.config['redis']['port'], 'numeric_type': float}
if 'ip' in self.config['redis'] and self.config['redis']['ip']:
kwargs = {'host': self.config['redis']['ip'], 'port': self.config['redis']['port'], 'numeric_type': float}
else:
kwargs = {'socket': self.config['redis']['socket'], 'numeric_type': float}
from cryptofeed.backends.redis import TradeStream, BookStream, BookDeltaStream
from cryptofeed.backends.redis import TradeStream, BookStream, BookDeltaStream, TickerStream
trade_cb = TradeStream
book_cb = BookStream
book_up = BookDeltaStream if delta else None
ticker_cb = TickerStream
elif cache == 'kafka':
from cryptofeed.backends.kafka import TradeKafka, BookKafka, BookDeltaKafka
from cryptofeed.backends.kafka import TradeKafka, BookKafka, BookDeltaKafka, TickerKafka
trade_cb = TradeKafka
book_cb = BookKafka
book_up = BookDeltaKafka if delta else None
ticker_cb = TickerKafka
kwargs = {'host': self.config['kafka']['ip'], 'port': self.config['kafka']['port']}

if callback_type == TRADES:
cb[TRADES] = [trade_cb(**kwargs)]
elif callback_type == TICKER:
cb[TICKER] = [ticker_cb(**kwargs)]
elif callback_type == L2_BOOK:
cb[L2_BOOK] = [book_cb(key=L2_BOOK, **kwargs)]
if book_up:
Expand Down
6 changes: 5 additions & 1 deletion cryptostore/data/arctic.py
Expand Up @@ -5,7 +5,7 @@
associated with this software.
'''
import pandas as pd
from cryptofeed.defines import TRADES, L2_BOOK, L3_BOOK
from cryptofeed.defines import TRADES, L2_BOOK, L3_BOOK, TICKER

from cryptostore.data.store import Store
from cryptostore.engines import StorageEngines
Expand All @@ -32,6 +32,10 @@ def write(self, exchange, data_type, pair, timestamp):
df['date'] = pd.to_datetime(df['timestamp'], unit='s')
df = df.drop(['pair', 'feed', 'amount'], axis=1)
chunk_size = 'H'
elif data_type == TICKER:
df['date'] = pd.to_datetime(df['timestamp'], unit='s')
df = df.drop(['pair', 'feed'], axis=1)
chunk_size = 'D'
elif data_type in { L2_BOOK, L3_BOOK }:
df['date'] = pd.to_datetime(df['timestamp'], unit='s')
chunk_size = 'T'
Expand Down
7 changes: 6 additions & 1 deletion cryptostore/data/influx.py
Expand Up @@ -7,7 +7,7 @@
from decimal import Decimal
from collections import defaultdict

from cryptofeed.defines import TRADES, L2_BOOK, L3_BOOK
from cryptofeed.defines import TRADES, L2_BOOK, L3_BOOK, TICKER
import requests

from cryptostore.data.store import Store
Expand Down Expand Up @@ -44,6 +44,11 @@ def write(self, exchange, data_type, pair, timestamp):
ts += 1
used_ts[pair].add(ts)
agg.append(f'{data_type}-{exchange},pair={pair} side="{entry["side"]}",id="{entry["id"]}",amount={entry["amount"]},price={entry["price"]},timestamp={entry["timestamp"]} {ts}')
elif data_type == TICKER:
for entry in self.data:
ts = int(Decimal(entry["timestamp"]) * 1000000000)
agg.append(f'{data_type}-{exchange},pair={pair} bid={entry["bid"]},ask={entry["ask"]},timestamp={entry["timestamp"]} {ts}')

elif data_type == L2_BOOK:
if len(self.data):
ts = int(Decimal(self.data[0]["timestamp"]) * 1000000000)
Expand Down

0 comments on commit a5e53c0

Please sign in to comment.