Merge pull request #20 from bmoscon/elastic
Elasticsearch improvements
bmoscon committed Jul 24, 2019
2 parents 0e5ef96 + 213cdf4 commit 61d42f5
Showing 6 changed files with 37 additions and 10 deletions.
5 changes: 4 additions & 1 deletion config.yaml
```diff
@@ -52,9 +52,12 @@ pass_through:
 
 
 elastic:
-    host: 'http://localhost:9200'
+    host: 'http://127.0.0.1:9200'
     user: null
     token: null
+    shards: 10
+    replicas: 0
+    refresh_interval: '30s'
 
 influx:
     host: 'http://127.0.0.1:8086'
```
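The three new keys are standard Elasticsearch index settings, applied when the index is created (see the elastic.py change below). A minimal sketch of the settings document they produce, using the default values from this config; everything here simply mirrors the diff:

```python
# Sketch of the index-settings document built from the new config keys.
# Values mirror the config defaults above.
import json

settings = {
    'settings': {
        'index': {
            'number_of_shards': 10,       # config: shards
            'number_of_replicas': 0,      # config: replicas
            'refresh_interval': '30s',    # config: refresh_interval
        }
    }
}

print(json.dumps(settings, indent=2))
```

Fewer replicas and a longer refresh interval trade redundancy and search freshness for faster bulk ingest, a sensible default for a write-heavy store.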
8 changes: 8 additions & 0 deletions cryptostore/aggregator/redis.py
```diff
@@ -39,12 +39,20 @@ def read(self, exchange, dtype, pair):
 
         LOG.info("%s: Read %d messages from Redis", key, len(data[0][1]))
         ret = []
+
         for update_id, update in data[0][1]:
             if dtype in {L2_BOOK, L3_BOOK}:
                 update = json.loads(update['data'])
                 update = book_flatten(update, update['timestamp'], update['delta'])
+                for u in update:
+                    for k in ('size', 'amount', 'price', 'timestamp'):
+                        if k in u:
+                            u[k] = float(u[k])
                 ret.extend(update)
             if dtype == TRADES:
+                for k in ('size', 'amount', 'price', 'timestamp'):
+                    if k in update:
+                        update[k] = float(update[k])
                 ret.append(update)
             self.ids[key].append(update_id)
```
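The added loops normalize numeric fields to float as soon as messages leave Redis, so the downstream stores (Arctic, Parquet, Elasticsearch) no longer each re-coerce strings. A runnable sketch of the coercion; the sample update dicts are invented for illustration:

```python
# Float coercion as performed in read(); the update dicts are invented
# samples, not real feed output.
updates = [
    {'side': 'bid', 'price': '9341.5', 'size': '0.02', 'timestamp': '1563920000.1'},
    {'side': 'ask', 'price': '9342.0', 'size': '1.7', 'timestamp': '1563920000.2'},
]

for u in updates:
    for k in ('size', 'amount', 'price', 'timestamp'):
        if k in u:
            u[k] = float(u[k])

assert all(isinstance(u['price'], float) for u in updates)
```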
2 changes: 1 addition & 1 deletion cryptostore/collector.py
```diff
@@ -42,7 +42,7 @@ def run(self):
             trade_cb = TradeStream
             book_cb = BookStream
             book_up = BookDeltaStream if not depth and self.config['book_delta'] else None
-            kwargs = {'host': self.config['redis']['ip'], 'port': self.config['redis']['port']}
+            kwargs = {'host': self.config['redis']['ip'], 'port': self.config['redis']['port'], 'numeric_type': float}
         elif cache == 'kafka':
             from cryptofeed.backends.kafka import TradeKafka, BookKafka, BookDeltaKafka
             trade_cb = TradeKafka
```
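Passing `numeric_type: float` asks the cryptofeed Redis backends to serialize prices and sizes as floats rather than strings, which is what lets the aggregator and stores drop their own casts. A hedged sketch of the assumed wiring; the splatting of `kwargs` into the backend constructors is inferred from the surrounding code, not shown in this hunk:

```python
# Assumed wiring based on the surrounding code: the same kwargs dict is
# splatted into each cryptofeed Redis backend, so all of them emit floats.
from cryptofeed.backends.redis import TradeStream, BookStream

kwargs = {'host': '127.0.0.1', 'port': 6379, 'numeric_type': float}
trade_cb = TradeStream(**kwargs)   # trade prices/amounts serialized as float
book_cb = BookStream(**kwargs)     # book prices/sizes serialized as float
```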
5 changes: 1 addition & 4 deletions cryptostore/data/arctic.py
```diff
@@ -26,14 +26,11 @@ def write(self, exchange, data_type, pair, timestamp):
 
         if data_type == TRADES:
             df['id'] = df['id'].astype(str)
-            df['size'] = df.amount.astype('float64')
-            df['price'] = df.price.astype('float64')
+            df['size'] = df.amount
             df['date'] = pd.to_datetime(df['timestamp'], unit='s')
             df = df.drop(['pair', 'feed', 'amount'], axis=1)
             chunk_size = 'H'
         elif data_type in { L2_BOOK, L3_BOOK }:
-            df['size'] = df['size'].astype('float64')
-            df['price'] = df.price.astype('float64')
             df['date'] = pd.to_datetime(df['timestamp'], unit='s')
             chunk_size = 'T'
```
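With the collector now emitting floats, the `astype('float64')` casts are redundant: pandas infers float64 from Python floats on its own. A small demonstration with made-up trade data:

```python
# Demonstration (invented data): pandas infers float64 without an explicit
# astype once the values are already Python floats.
import pandas as pd

df = pd.DataFrame({'amount': [0.02, 1.7], 'price': [9341.5, 9342.0],
                   'timestamp': [1563920000.1, 1563920000.2]})
df['size'] = df.amount                                  # no astype needed
df['date'] = pd.to_datetime(df['timestamp'], unit='s')
print(df.dtypes)                                        # amount/price/size: float64
```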
25 changes: 23 additions & 2 deletions cryptostore/data/elastic.py
```diff
@@ -6,12 +6,16 @@
 '''
 import json
 import itertools
+import logging
 
 import requests
 
 from cryptostore.data.store import Store
 
 
+LOG = logging.getLogger('cryptostore')
+
+
 def chunk(iterable, length):
     return (iterable[i : i + length] for i in range(0, len(iterable), length))
 
@@ -22,16 +26,33 @@ def __init__(self, config: dict):
         self.host = config.host
         self.user = config.user
         self.token = config.token
+        self.settings = {'settings': {
+                             "index" : {
+                                 "number_of_shards" : config.shards,
+                                 "number_of_replicas" : config.replicas,
+                                 "refresh_interval": config.refresh_interval
+                             }
+                         }
+                        }
 
     def aggregate(self, data):
         self.data = data
 
     def write(self, exchange, data_type, pair, timestamp):
-        for c in chunk(self.data, 100000):
-            data = itertools.chain(*zip([json.dumps({ "index":{} })] * len(c), [json.dumps(d) for d in c]))
+        if requests.head(f"{self.host}/{data_type}").status_code != 200:
+            r = requests.put(f"{self.host}/{data_type}", data=json.dumps(self.settings), auth=(self.user, self.token), headers={'content-type': 'application/json'})
+            if r.status_code != 200:
+                LOG.error("Elasticsearch Index creation failed: %s", r.text)
+                r.raise_for_status()
+
+        LOG.info("Writing %d documents to Elasticsearch", len(self.data))
+        for c in chunk(self.data, 10000):
+            data = itertools.chain(*zip(['{"index": {}}'] * len(c), [json.dumps(d) for d in c]))
             data = '\n'.join(data)
             data = f"{data}\n"
             r = requests.post(f"{self.host}/{data_type}/{data_type}/_bulk", auth=(self.user, self.token), data=data, headers={'content-type': 'application/x-ndjson'})
+            if r.status_code != 200:
+                LOG.error("Elasticsearch insert failed: %s", r.text)
+                r.raise_for_status()
         self.data = None
```
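The bulk insert uses Elasticsearch's NDJSON `_bulk` format: an action line (`{"index": {}}`) precedes each document, and the payload must end with a newline. A standalone sketch of how the payload is assembled, with invented documents:

```python
# Builds a _bulk payload the same way write() does; documents are invented.
import itertools
import json

docs = [{'price': 9341.5, 'size': 0.02}, {'price': 9342.0, 'size': 1.7}]
lines = itertools.chain(*zip(['{"index": {}}'] * len(docs),
                             [json.dumps(d) for d in docs]))
payload = '\n'.join(lines) + '\n'   # trailing newline is required by _bulk
print(payload)
```

Hardcoding the action line as a string instead of calling `json.dumps` per document also avoids re-serializing the same constant thousands of times per chunk, and the chunk size drops from 100,000 to 10,000, presumably to keep individual bulk requests to a size Elasticsearch handles comfortably.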
2 changes: 0 additions & 2 deletions cryptostore/data/parquet.py
```diff
@@ -52,8 +52,6 @@ def aggregate(self, data):
             for entry in data:
                 for key in entry:
                     val = entry[key]
-                    if key in {'timestamp', 'amount', 'size', 'price'}:
-                        val = float(val)
                     cols[key].append(val)
         arrays = [pa.array(cols[col]) for col in cols]
         table = pa.Table.from_arrays(arrays, names=names)
```
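The same upstream float coercion lets parquet.py drop its per-value casts: pyarrow infers a double column directly from Python floats. A minimal check with invented values:

```python
# Minimal check (invented values): pa.array infers double from floats,
# so no per-value float() cast is needed before building the table.
import pyarrow as pa

col = pa.array([9341.5, 9342.0])
print(col.type)   # double
```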
