Skip to content

Commit

Permalink
Support for data passthrough using ZMQ
Browse files Browse the repository at this point in the history
  • Loading branch information
bmoscon committed Jul 19, 2019
1 parent 54afe64 commit 8392eee
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 7 deletions.
9 changes: 8 additions & 1 deletion config.yaml
Expand Up @@ -41,9 +41,16 @@ book_delta: True
# Number of deltas between full book updates
book_delta_window: 10000

# Where to store the data. Currently arctic, influx, and parquet are supported. More than one can be enabled
# Where to store the data. Currently arctic, influx, elastic, and parquet are supported. More than one can be enabled
storage: [arctic]

# Configurable passthrough for data - data will be sent in realtime (no aggregation in redis)
pass_through:
type: zmq
host: '127.0.0.1'
port: 5678


elastic:
host: 'http://localhost:9200'
user: null
Expand Down
27 changes: 22 additions & 5 deletions cryptostore/collector.py
Expand Up @@ -51,15 +51,32 @@ def run(self):
kwargs = {'host': self.config['kafka']['ip'], 'port': self.config['kafka']['port']}

if TRADES in self.exchange_config:
cb[TRADES] = trade_cb(**kwargs)
cb[TRADES] = [trade_cb(**kwargs)]
if L2_BOOK in self.exchange_config:
cb[L2_BOOK] = book_cb(key=L2_BOOK, depth=depth, **kwargs)
cb[L2_BOOK] = [book_cb(key=L2_BOOK, depth=depth, **kwargs)]
if book_up:
cb[BOOK_DELTA] = book_up(key=L2_BOOK, **kwargs)
cb[BOOK_DELTA] = [book_up(key=L2_BOOK, **kwargs)]
if L3_BOOK in self.exchange_config:
cb[L3_BOOK] = book_cb(key=L3_BOOK, depth=depth, **kwargs)
cb[L3_BOOK] = [book_cb(key=L3_BOOK, depth=depth, **kwargs)]
if book_up:
cb[BOOK_DELTA] = book_up(key=L3_BOOK, **kwargs)
cb[BOOK_DELTA] = [book_up(key=L3_BOOK, **kwargs)]


if 'pass_through' in self.config:
if self.config['pass_through']['type'] == 'zmq':
from cryptofeed.backends.zmq import TradeZMQ, BookDeltaZMQ, BookZMQ
import zmq
host = self.config['pass_through']['host']
port = self.config['pass_through']['port']

if TRADES in cb:
cb[TRADES].append(TradeZMQ(host=host, port=port, zmq_type=zmq.PUB))
if BOOK_DELTA in cb:
cb[BOOK_DELTA].append(BookDeltaZMQ(host=host, port=port, zmq_type=zmq.PUB))
if L2_BOOK in cb:
cb[L2_BOOK].append(BookZMQ(host=host, port=port, zmq_type=zmq.PUB))
if L3_BOOK in cb:
cb[L3_BOOK].append(BookZMQ(host=host, port=port, zmq_type=zmq.PUB))

fh.add_feed(self.exchange, book_interval=window, config=self.exchange_config, callbacks=cb)
fh.run()
3 changes: 2 additions & 1 deletion setup.py
Expand Up @@ -45,7 +45,8 @@
'kafka': ['aiokafka', 'confluent-kafka'],
'arctic': ['arctic'],
'gcs': ['google-cloud-storage'],
'aws': ['boto3']
'aws': ['boto3'],
'zmq': ['pyzmq']
},
entry_points = {
'console_scripts': ['cryptostore=cryptostore.bin.cryptostore:main'],
Expand Down

0 comments on commit 8392eee

Please sign in to comment.