Skip to content

Commit

Permalink
Fixed bug, so won't attempt to deserialise empty objects from Redis
Browse files Browse the repository at this point in the history
  • Loading branch information
saeedamen committed Nov 5, 2019
1 parent d8c929e commit b679ced
Showing 1 changed file with 22 additions and 15 deletions.
37 changes: 22 additions & 15 deletions findatapy/market/ioengine.py
Original file line number Diff line number Diff line change
Expand Up @@ -267,24 +267,28 @@ def write_time_series_cache_to_disk(self, fname, data_frame,
r = redis.StrictRedis(host=db_server, port=db_port, db=0, socket_timeout=timeout,
socket_connect_timeout=timeout)

if isinstance(data_frame, pandas.DataFrame):
# msgpack/blosc is deprecated
# r.set(fname, data_frame.to_msgpack(compress='blosc'))
if data_frame is not None:
if isinstance(data_frame, pandas.DataFrame):
# msgpack/blosc is deprecated
# r.set(fname, data_frame.to_msgpack(compress='blosc'))

# now uses pyarrow
context = pa.default_serialization_context()
# now uses pyarrow
context = pa.default_serialization_context()

ser = context.serialize(data_frame).to_buffer()
ser = context.serialize(data_frame).to_buffer()

if use_cache_compression:
comp = pa.compress(ser, codec='lz4', asbytes=True)
siz = len(ser) # siz = 3912
if use_cache_compression:
comp = pa.compress(ser, codec='lz4', asbytes=True)
siz = len(ser) # siz = 3912

r.set('comp_' + str(siz) + '_' + fname, comp)
else:
r.set(fname, ser.to_pybytes())
r.set('comp_' + str(siz) + '_' + fname, comp)
else:
r.set(fname, ser.to_pybytes())

self.logger.info("Pushed " + fname + " to Redis")
else:
self.logger.info("Object " + fname + " is empty, not pushed to Redis.")

self.logger.info("Pushed " + fname + " to Redis")
except Exception as e:
self.logger.warning("Couldn't push " + fname + " to Redis: " + str(e))

Expand Down Expand Up @@ -554,7 +558,6 @@ def read_time_series_cache_from_disk(self, fname, engine='hdf5', start_date=None
if True:
r = redis.StrictRedis(host=db_server, port=db_port, db=0)


# is there a compressed key?
k = r.keys('comp_*_' + fname_single)

Expand All @@ -569,7 +572,11 @@ def read_time_series_cache_from_disk(self, fname, engine='hdf5', start_date=None

msg = context.deserialize(dec)
else:
msg = context.deserialize(r.get(fname_single))
msg = r.get(fname_single)

print(fname_single)
if msg is not None:
msg = context.deserialize(msg)
#self.logger.warning("Key " + fname_single + " not in Redis cache?")

else:
Expand Down

0 comments on commit b679ced

Please sign in to comment.