Added tests, replaced blosc with pyarrow
saeedamen committed Nov 2, 2019
1 parent 6110e68 commit 14b359e
Showing 6 changed files with 96 additions and 14 deletions.
5 changes: 5 additions & 0 deletions README.md
@@ -84,6 +84,11 @@ In findatapy/examples you will find several demos

# Coding log

* 02 Nov 2019
* Added BoE as a data source
* Removed blosc/msgpack caching (pandas has deprecated to_msgpack) and replaced it with pyarrow
* Uses keyring library for API keys (unless specified in DataCred); a usage sketch follows this log
* Began to add tests for IO
* 03 Oct 2019
* Removed API key from cache
* Removed timezone when storing in Arctic (can cause issues with later versions of pandas)
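
A minimal sketch of the keyring usage the 02 Nov entry refers to, with hypothetical service/user names (findatapy's actual key names may differ): the API key is stored once in the operating system's credential store and read back at runtime, rather than being hard-coded or kept in a config file.

    import keyring

    # One-off: store the key in the OS credential store (hypothetical names)
    keyring.set_password("quandl", "cuemacro", "MY_QUANDL_API_KEY")

    # At runtime: fetch the key instead of hard-coding it
    quandl_api_key = keyring.get_password("quandl", "cuemacro")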
32 changes: 25 additions & 7 deletions findatapy/market/ioengine.py
@@ -24,6 +24,13 @@
except:
    pass

# pyarrow is optional at import time, but it is needed for Redis caching
try:
    import pyarrow as pa
except:
    pass

from openpyxl import load_workbook
import os.path

@@ -261,9 +268,15 @@ def write_time_series_cache_to_disk(self, fname, data_frame,
                                  socket_connect_timeout=timeout)

            if isinstance(data_frame, pandas.DataFrame):
                r.set(fname, data_frame.to_msgpack(compress='blosc'))
                # msgpack/blosc is deprecated
                # r.set(fname, data_frame.to_msgpack(compress='blosc'))

                # now uses pyarrow
                context = pa.default_serialization_context()

                self.logger.debug("Pushed " + fname + " to Redis")
                r.set(fname, context.serialize(data_frame).to_buffer().to_pybytes())

                self.logger.info("Pushed " + fname + " to Redis")
        except Exception as e:
            self.logger.warning("Couldn't push " + fname + " to Redis: " + str(e))
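
For reference, a standalone sketch of the serialization round trip used above. It assumes a pyarrow version of this era that still provides default_serialization_context (the API was later deprecated and removed in pyarrow 2.0):

    import pandas as pd
    import pyarrow as pa

    df = pd.DataFrame({'bid': [1.10, 1.11], 'ask': [1.12, 1.13]})

    # Serialize the DataFrame to raw bytes, suitable for r.set(fname, ...)
    context = pa.default_serialization_context()
    payload = context.serialize(df).to_buffer().to_pybytes()

    # Deserializing the same bytes recovers an identical DataFrame
    assert context.deserialize(payload).equals(df)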

@@ -335,7 +348,7 @@ def write_time_series_cache_to_disk(self, fname, data_frame,
        # append data only works for HDF5 stored as tables (but this is much slower than fixed format)
        # removes duplicated entries at the end
        if append_data:
            store = pandas.HDFStore(h5_filename, format=hdf5_format, complib="blosc", complevel=9)
            store = pandas.HDFStore(h5_filename, format=hdf5_format, complib="zlib", complevel=9)

            if ('intraday' in fname):
                data_frame = data_frame.astype('float32')
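
The complib switch above only changes the HDF5 compressor: zlib ships with HDF5 itself, so the separate blosc dependency is no longer needed for caching. A minimal sketch of an appendable table-format store with zlib (filename and key are illustrative; requires the 'tables' package):

    import pandas as pd

    df = pd.DataFrame({'close': [100.0, 101.5]})

    # append requires table format, which is slower than fixed format
    # but supports incremental writes
    store = pd.HDFStore('example_cache.h5', complib='zlib', complevel=9)
    store.append('data', df)
    store.close()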
@@ -367,7 +380,7 @@ def write_time_series_cache_to_disk(self, fname, data_frame,
            except:
                pass

            store = pandas.HDFStore(h5_filename_temp, format=hdf5_format, complib="blosc", complevel=9)
            store = pandas.HDFStore(h5_filename_temp, format=hdf5_format, complib="zlib", complevel=9)

            if ('intraday' in fname):
                data_frame = data_frame.astype('float32')
@@ -529,18 +542,23 @@ def read_time_series_cache_from_disk(self, fname, engine='hdf5', start_date=None

            try:
                r = redis.StrictRedis(host=db_server, port=db_port, db=0)
                msg = r.get(fname_single)

                # msg = r.get(fname_single)

                # for pyarrow
                context = pa.default_serialization_context()

                msg = context.deserialize(r.get(fname_single))

            except:
                self.logger.info("Cache not existent for " + fname_single + " in Redis")
                msg = None  # ensure msg is defined when the Redis lookup fails

            if msg is None:
                data_frame = None
            else:
                self.logger.info('Load Redis cache: ' + fname_single)

                data_frame = msg  # pandas.read_msgpack(msg)

        elif (engine == 'arctic'):
            socketTimeoutMS = 2 * 1000
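
Putting the write and read paths together, a minimal Redis round trip under the same assumptions (a local Redis server on the default port, pyarrow < 2.0):

    import pandas as pd
    import pyarrow as pa
    import redis

    r = redis.StrictRedis(host='127.0.0.1', port=6379, db=0)
    context = pa.default_serialization_context()

    df = pd.DataFrame({'spot': [1.25, 1.26]})
    r.set('example_key', context.serialize(df).to_buffer().to_pybytes())

    msg = r.get('example_key')  # None if the key never existed or has expired
    df_back = context.deserialize(msg) if msg is not None else None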
5 changes: 3 additions & 2 deletions setup.py
@@ -5,7 +5,7 @@
tickers, using configuration files. There is also functionality which is particularly useful for those downloading FX market data."""

setup(name='findatapy',
      version='0.1.1',
      version='0.1.2',
      description='Market data library',
      author='Saeed Amen',
      author_email='saeed@cuemacro.com',
@@ -31,6 +31,7 @@
          'pathos',
          'redis',
          'numba',
          'blosc',
          'pyarrow',
          'keyring',
          'openpyxl'],
      zip_safe=False)
13 changes: 8 additions & 5 deletions tests/test_filtering.py
@@ -1,21 +1,24 @@
import pytest
import pandas
import pandas as pd

from findatapy.timeseries import Filter

def test_filtering_by_dates():

    filter = Filter()

    # filter S&P500 between specific working days
    start_date = '01 Oct 2008'
    finish_date = '29 Oct 2008'

    # read CSV from disk, and make sure to parse dates
    df = pandas.read_csv("S&P500.csv", parse_dates=['Date'], index_col=['Date'])
    df = pd.read_csv("S&P500.csv", parse_dates=['Date'], index_col=['Date'])
    df = filter.filter_time_series_by_date(start_date=start_date, finish_date=finish_date, data_frame=df)

    assert df.index[0] == pandas.to_datetime(start_date)
    assert df.index[-1] == pandas.to_datetime(finish_date)
    assert df.index[0] == pd.to_datetime(start_date)
    assert df.index[-1] == pd.to_datetime(finish_date)

if __name__ == '__main__':
    pytest.main()

# test_filtering_by_dates()
33 changes: 33 additions & 0 deletions tests/test_io.py
@@ -0,0 +1,33 @@
import pytest
import pandas as pd

from findatapy.util.dataconstants import DataConstants

data_constants = DataConstants()

redis_server = data_constants.db_cache_server
redis_port = data_constants.db_cache_port

def test_redis_caching():
    # Note: you need to install Redis in order for this to work!

    # read CSV from disk, and make sure to parse dates
    df = pd.read_csv("S&P500.csv", parse_dates=['Date'], index_col=['Date'])
    df.index = pd.to_datetime(df.index)

    from findatapy.market.ioengine import IOEngine

    io = IOEngine()

    # Write DataFrame to Redis (using pyarrow format)
    io.write_time_series_cache_to_disk('test_key', df, engine='redis', db_server=redis_server, db_port=redis_port)

    # Read back DataFrame from Redis (using pyarrow format)
    df_out = io.read_time_series_cache_from_disk('test_key', engine='redis', db_server=redis_server, db_port=redis_port)

    pd.testing.assert_frame_equal(df, df_out)

if __name__ == '__main__':
    pytest.main()

# test_redis_caching()
22 changes: 22 additions & 0 deletions tests/test_market_download.py
@@ -0,0 +1,22 @@
import pytest
import pandas as pd

from findatapy.market import Market, MarketDataGenerator, MarketDataRequest
from findatapy.util.dataconstants import DataConstants

market = Market(market_data_generator=MarketDataGenerator())

data_constants = DataConstants()
quandl_api_key = data_constants.quandl_api_key

def test_quandl_download():
    md_request = MarketDataRequest(start_date='month', category='fx', data_source='quandl', tickers=['AUDJPY'],
                                   quandl_api_key=quandl_api_key)

    df = market.fetch_market(md_request)

    assert df is not None

if __name__ == '__main__':
    pytest.main()