
Commit

Extra support for S3 writing/reading of Parquet
saeedamen committed Jul 4, 2021
1 parent 605dfd2 commit d004b4a
Showing 5 changed files with 203 additions and 84 deletions.
4 changes: 4 additions & 0 deletions README.md
@@ -132,6 +132,10 @@ individual data providers)

 # Coding log
 
+* 04 Jul 2021
+    * Added extra support for reading/writing to S3 buckets of Parquet files
+* 02 Jul 2021
+    * Can download multiple CSVs in ZIP with time series data (DataVendorWeb)
 * 29 Jun 2021
     * Added downloads badge
 * 17 Jun 2021
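As a rough usage sketch of the new S3 Parquet support (the bucket and file names below are placeholders, and it assumes pyarrow is installed and AWS credentials are already configured in the environment):

```python
import pandas as pd

from findatapy.market.ioengine import IOEngine

# Hypothetical daily FX DataFrame (placeholder values)
df = pd.DataFrame({'EURUSD.close': [1.18, 1.19]},
                  index=pd.to_datetime(['2021-07-01', '2021-07-02']))

io = IOEngine()

# Write Parquet to a local folder or to an s3:// bucket; the optional filename
# parameter is joined onto each path (path can also be a list of destinations)
io.to_parquet(df, 's3://my-example-bucket/fx/', filename='fx-daily.parquet')

# Read it back - read_parquet sanitizes the path (eg. stray double slashes) first
df_s3 = io.read_parquet('s3://my-example-bucket/fx/fx-daily.parquet')
```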
142 changes: 87 additions & 55 deletions findatapy/market/datavendorweb.py
@@ -2418,83 +2418,115 @@ def __init__(self):
         super(DataVendorFlatFile, self).__init__()
 
     # implement method in abstract superclass
-    def load_ticker(self, market_data_request):
+    def load_ticker(self, market_data_request, index_col=0):
         logger = LoggerManager().getLogger(__name__)
 
-        data_source = market_data_request.data_source
+        data_source_list = market_data_request.data_source
         data_engine = market_data_request.data_engine
 
-        if data_engine is not None:
+        if isinstance(data_source_list, list):
+            pass
+        else:
+            data_source_list = [data_source_list]
+
+        data_frame_list = []
+
+        for data_source in data_source_list:
+
+            if data_engine is not None:
 
-            logger.info("Request " + market_data_request.data_source + " data via " + data_engine)
+                logger.info("Request " + market_data_request.data_source + " data via " + data_engine)
 
-            # If a file path has been specified
-            if '*' in data_engine:
-                w = data_engine.split("*.")
+                # If a file path has been specified
+                if '*' in data_engine:
+                    w = data_engine.split("*.")
 
-                folder = w[0]
-                file_format = w[-1]
+                    folder = w[0]
+                    file_format = w[-1]
 
-                # For intraday/tick files each ticker is stored in a separate file
-                if market_data_request.freq == 'intraday' or market_data_request.freq == 'tick':
-                    path = market_data_request.environment + "." \
-                           + market_data_request.category + "." + market_data_request.data_source + "." + market_data_request.freq \
-                           + "." + market_data_request.cut + "." + market_data_request.tickers[0] + "." + file_format
-                else:
-                    path = market_data_request.environment + "." \
-                           + market_data_request.category + "." + market_data_request.data_source + "." + market_data_request.freq \
-                           + "." + market_data_request.cut + "." + file_format
+                    # For intraday/tick files each ticker is stored in a separate file
+                    if market_data_request.freq == 'intraday' or market_data_request.freq == 'tick':
+                        path = market_data_request.environment + "." \
+                               + market_data_request.category + "." + data_source + "." + market_data_request.freq \
+                               + "." + market_data_request.cut + "." + market_data_request.tickers[0] + "." + file_format
+                    else:
+                        path = market_data_request.environment + "." \
+                               + market_data_request.category + "." + data_source + "." + market_data_request.freq \
+                               + "." + market_data_request.cut + "." + file_format
 
-                full_path = os.path.join(folder, path)
-            else:
-                # Otherwise a database like arctic has been specified
+                    full_path = os.path.join(folder, path)
+                else:
+                    # Otherwise a database like arctic has been specified
 
-                # For intraday/tick files each ticker is stored in a separate file
-                if market_data_request.freq == 'intraday' or market_data_request.freq == 'tick':
-                    full_path = market_data_request.environment + "." \
-                                + market_data_request.category + "." + market_data_request.data_source + "." + market_data_request.freq \
-                                + "." + market_data_request.cut + "." + market_data_request.tickers[0]
-                else:
-                    full_path = market_data_request.environment + "." \
-                                + market_data_request.category + "." + market_data_request.data_source + "." + market_data_request.freq \
-                                + "." + market_data_request.cut
+                    # For intraday/tick files each ticker is stored in a separate file
+                    if market_data_request.freq == 'intraday' or market_data_request.freq == 'tick':
+                        full_path = market_data_request.environment + "." \
+                                    + market_data_request.category + "." + data_source + "." + market_data_request.freq \
+                                    + "." + market_data_request.cut + "." + market_data_request.tickers[0]
+                    else:
+                        full_path = market_data_request.environment + "." \
+                                    + market_data_request.category + "." + data_source + "." + market_data_request.freq \
+                                    + "." + market_data_request.cut
 
-        else:
-            logger.info("Request " + market_data_request.data_source + " data")
+            else:
+                logger.info("Request " + data_source + " data")
 
-            full_path = data_source
+                full_path = data_source
 
-        if ".csv" in market_data_request.data_source:
-            data_frame = pandas.read_csv(full_path, index_col=0, parse_dates=True,
-                                         infer_datetime_format=True)
-        elif ".h5" in market_data_request.data_source:
-            data_frame = IOEngine().read_time_series_cache_from_disk(full_path, engine='hdf5')
-        elif ".parquet" in market_data_request.data_source or '.gzip' in market_data_request.data_source:
-            data_frame = IOEngine().read_time_series_cache_from_disk(full_path, engine='parquet')
-        else:
-            data_frame = IOEngine().read_time_series_cache_from_disk(full_path, engine=data_engine)
+            if ".zip" in data_source:
+                import zipfile
+
+                if "http" in full_path:
+                    from requests import get
+                    request = get(full_path)
+                    zf = zipfile.ZipFile(BytesIO(request.content))
+                else:
+                    zf = zipfile.ZipFile(full_path)
+
+                name_list = zipfile.ZipFile.namelist(zf)
+
+                df_list = []
+
+                for name in name_list:
+                    df_list.append(pd.read_csv(zf.open(name), index_col=index_col, parse_dates=True,
+                                               infer_datetime_format=True))
+
+                data_frame = pd.concat(df_list)
+            elif ".csv" in data_source:
+                data_frame = pandas.read_csv(full_path, index_col=index_col, parse_dates=True,
+                                             infer_datetime_format=True)
+            elif ".h5" in data_source:
+                data_frame = IOEngine().read_time_series_cache_from_disk(full_path, engine='hdf5')
+            elif ".parquet" in data_source or '.gzip' in data_source:
+                data_frame = IOEngine().read_time_series_cache_from_disk(full_path, engine='parquet')
+            else:
+                data_frame = IOEngine().read_time_series_cache_from_disk(full_path, engine=data_engine)
 
-        if data_frame is None or data_frame.index is []: return None
+            if data_frame is None or data_frame.index is []: return None
 
-        if data_frame is not None:
-            tickers = data_frame.columns
+            if data_frame is not None:
+                tickers = data_frame.columns
 
-        if data_frame is not None:
-            # Tidy up tickers into a format that is more easily translatable
-            # we can often get multiple fields returned (even if we don't ask for them!)
-            # convert to lower case
-            ticker_combined = []
+            if data_frame is not None:
+                # Tidy up tickers into a format that is more easily translatable
+                # we can often get multiple fields returned (even if we don't ask for them!)
+                # convert to lower case
+                ticker_combined = []
 
-            for i in range(0, len(tickers)):
-                if "." in tickers[i]:
-                    ticker_combined.append(tickers[i])
-                else:
-                    ticker_combined.append(tickers[i] + ".close")
+                for i in range(0, len(tickers)):
+                    if "." in tickers[i]:
+                        ticker_combined.append(tickers[i])
+                    else:
+                        ticker_combined.append(tickers[i] + ".close")
 
-            data_frame.columns = ticker_combined
-            data_frame.index.name = 'Date'
+                data_frame.columns = ticker_combined
+                data_frame.index.name = 'Date'
 
-            logger.info("Completed request from " + market_data_request.data_source + " for " + str(ticker_combined))
+                logger.info("Completed request from " + str(data_source) + " for " + str(ticker_combined))
+
+                data_frame_list.append(data_frame)
+
+        data_frame = pd.concat(data_frame_list)
 
         return data_frame

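The new ZIP branch above stacks every CSV inside an archive into a single DataFrame. A standalone sketch of the same idea (the URL is a placeholder, and it assumes each CSV has a date-like first column to use as the index):

```python
import zipfile
from io import BytesIO

import pandas as pd
from requests import get


def read_zipped_csvs(full_path, index_col=0):
    # Fetch the ZIP over HTTP, otherwise open it from local disk
    if "http" in full_path:
        zf = zipfile.ZipFile(BytesIO(get(full_path).content))
    else:
        zf = zipfile.ZipFile(full_path)

    # Read every CSV member and concatenate into one time series DataFrame
    df_list = [pd.read_csv(zf.open(name), index_col=index_col, parse_dates=True,
                           infer_datetime_format=True)
               for name in zf.namelist()]

    return pd.concat(df_list)


# df = read_zipped_csvs("https://example.com/fx_tick_data.zip")
```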
126 changes: 104 additions & 22 deletions findatapy/market/ioengine.py
@@ -881,10 +881,72 @@ def create_cache_file_name(self, filename):
     def get_engine(self, engine='hdf5_fixed'):
         pass
 
+    def sanitize_path(self, path):
+        """Will remove unnecessary // from a file path (eg. in the middle)
+        Parameters
+        ----------
+        path : str
+            path to be sanitized
+        Returns
+        -------
+        str
+        """
+        if "s3://" in path:
+            path = path.replace("s3://", "")
+
+            path = path.replace("//", "/")
+
+            return "s3://" + path
+
+        return path
+
     def read_parquet(self, path):
-        return pd.read_parquet(path)
+        """Reads a Pandas DataFrame from a local or s3 path
+        Parameters
+        ----------
+        path : str
+        Returns
+        -------
+        DataFrame
+        """
+        return pd.read_parquet(self.sanitize_path(path))
 
-    def to_parquet(self, df, path, aws_region=constants.aws_region, parquet_compression=constants.parquet_compression):
+    def to_parquet(self, df, path, filename=None, aws_region=constants.aws_region, parquet_compression=constants.parquet_compression):
+        """Write a DataFrame to a local or s3 path as a Parquet file
+        Parameters
+        ----------
+        df : DataFrame
+            DataFrame to be written
+        path : str(list)
+            Paths where the DataFrame will be written
+        filename : str (optional)
+            Filename to be used (will be combined with the specified paths)
+        aws_region : str (optional)
+            AWS region for s3 dump
+        parquet_compression : str (optional)
+            Parquet compression type to use when writting
+        """
+        if isinstance(path, list):
+            pass
+        else:
+            path = [path]
+
+        if filename is not None:
+            new_path = []
+
+            for p in path:
+                new_path.append(self.path_join(p, filename))
+
+            path = new_path
+
         constants = DataConstants()
 
@@ -910,24 +972,43 @@ def to_parquet(self, df, path, aws_region=constants.aws_region, parquet_compress
         except:
             pass
 
-        if 's3://' in path:
-            s3 = pyarrow.fs.S3FileSystem(region=aws_region)
-            table = pa.Table.from_pandas(df)
-
-            path_in_s3 = path.replace("s3://", "")
-
-            with s3.open_output_stream(path_in_s3) as f:
-                pq.write_table(table, f, compression=parquet_compression, coerce_timestamps=constants.default_time_units, allow_truncated_timestamps=True,
-                               )
-
-        else:
-            # Using pandas.to_parquet, doesn't let us pass in parameters to allow coersion of timestamps
-            # ie. ns -> us
-            table = pa.Table.from_pandas(df)
-
-            pq.write_table(table, path, compression=parquet_compression,
-                           coerce_timestamps=constants.default_time_units, allow_truncated_timestamps=True)
-            # df.to_parquet(path, compression=parquet_compression)
+        for p in path:
+            p = self.sanitize_path(p)
+
+            if 's3://' in p:
+                s3 = pyarrow.fs.S3FileSystem(region=aws_region)
+                table = pa.Table.from_pandas(df)
+
+                path_in_s3 = p.replace("s3://", "")
+
+                with s3.open_output_stream(path_in_s3) as f:
+                    pq.write_table(table, f, compression=parquet_compression, coerce_timestamps=constants.default_time_units, allow_truncated_timestamps=True,
+                                   )
+
+            else:
+                # Using pandas.to_parquet, doesn't let us pass in parameters to allow coersion of timestamps
+                # ie. ns -> us
+                table = pa.Table.from_pandas(df)
+
+                pq.write_table(table, p, compression=parquet_compression,
+                               coerce_timestamps=constants.default_time_units, allow_truncated_timestamps=True)
+                # df.to_parquet(path, compression=parquet_compression)
+
+    def to_csv(self, df, path):
+
+        if "s3://" in path:
+
+            path = self.sanitize_path(path)
+
+            s3 = S3FileSystem(anon=False)
+
+            path_in_s3 = path.replace("s3://", "")
+
+            # Use 'w' for py3, 'wb' for py2
+            with s3.open(path_in_s3, 'w') as f:
+                df.to_csv(f)
+        else:
+            df.to_csv(path)
 
     def path_exists(self, path):
         if 's3://' in path:
@@ -939,14 +1020,15 @@ def path_exists(self, path):

     def path_join(self, folder, file):
         if 's3://' in folder:
-            if folder[-1] == '/':
-                return folder + file
-            else:
-                return folder + '/' + file
-        else:
-            return os.path.join(folder, file)
+            folder = folder.replace("s3://", "")
+            folder = folder + file
+
+            folder = folder.replace("//", "/")
+
+            return "s3://" + folder
+
+        else:
+            return os.path.join(folder, file)
 
 #######################################################################################################################

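To make the s3:// path handling concrete, a small sketch of how the new helpers behave (bucket and file names are placeholders):

```python
from findatapy.market.ioengine import IOEngine

io = IOEngine()

# path_join strips the s3:// scheme, concatenates, collapses double slashes and re-adds the scheme
# eg. 's3://my-example-bucket/fx/' + 'fx-daily.parquet' -> 's3://my-example-bucket/fx/fx-daily.parquet'
print(io.path_join('s3://my-example-bucket/fx/', 'fx-daily.parquet'))

# sanitize_path removes accidental double slashes in the middle of an s3 path
print(io.sanitize_path('s3://my-example-bucket//fx//fx-daily.parquet'))

# Local paths simply fall back to os.path.join
print(io.path_join('/tmp/fx', 'fx-daily.parquet'))
```

Note that path_join relies on the folder ending with a separator (as in the example above), since it concatenates the two pieces directly before collapsing any double slashes.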
14 changes: 8 additions & 6 deletions findatapy/market/marketdatagenerator.py
@@ -68,8 +68,9 @@ def get_data_vendor(self, md_request):
         data_source = md_request.data_source
         data_engine = md_request.data_engine
 
-        # Special case for files
-        if '.csv' in data_source or '.h5' in data_source or '.parquet' in data_source or data_engine is not None:
+        # Special case for files (csv, h5, parquet or zip)
+        if '.csv' in str(data_source) or '.h5' in str(data_source) or '.parquet' in str(data_source) or '.zip' in str(data_source) \
+                or data_engine is not None:
             from findatapy.market.datavendorweb import DataVendorFlatFile
             data_vendor = DataVendorFlatFile()
         else:
@@ -180,7 +181,7 @@ def fetch_market_data(self, market_data_request, kill_session = True):

         tickers = market_data_request.tickers
 
-        if tickers is None :
+        if tickers is None:
             create_tickers = True
         elif isinstance(tickers, str):
             if tickers == '': create_tickers = True
@@ -539,12 +540,13 @@ def download_daily(self, market_data_request):
         # By default use other
         thread_no = constants.market_thread_no['other']
 
-        if market_data_request.data_source in constants.market_thread_no:
+        if str(market_data_request.data_source) in constants.market_thread_no:
             thread_no = constants.market_thread_no[market_data_request.data_source]
 
         # Daily data does not include ticker in the key, as multiple tickers in the same file
-        if thread_no == 1 or '.csv' in market_data_request.data_source or \
-                '.h5' in market_data_request.data_source or '.parquet' in market_data_request.data_source or market_data_request.data_engine is not None:
+        if thread_no == 1 or '.csv' in str(market_data_request.data_source) or \
+                '.h5' in str(market_data_request.data_source) or '.parquet' in str(market_data_request.data_source) \
+                or '.zip' in str(market_data_request.data_source) or market_data_request.data_engine is not None:
             # data_frame_agg = data_vendor.load_ticker(md_request)
             data_frame_agg = self.fetch_single_time_series(market_data_request)
         else:
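A minimal sketch of the widened special case (the helper name below is illustrative, not findatapy API; it simply mirrors the substring checks above, which now go through str() so they also work when data_source is a list of files rather than a single string):

```python
def uses_flat_file_vendor(data_source, data_engine=None):
    # Files (csv, h5, parquet, zip) and any explicit data_engine are routed to DataVendorFlatFile
    ds = str(data_source)

    return ('.csv' in ds or '.h5' in ds or '.parquet' in ds
            or '.zip' in ds or data_engine is not None)


# eg. a list of Parquet paths matches too, because the check is done on str(data_source)
print(uses_flat_file_vendor(['fx1.parquet', 'fx2.parquet']))  # True
print(uses_flat_file_vendor('quandl'))                        # False
```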
1 change: 0 additions & 1 deletion findatapy/timeseries/calculations.py
@@ -41,7 +41,6 @@
 import copy
 from datetime import timedelta
 
-
 class Calculations(object):
     """Calculations on time series, such as calculating strategy returns and various wrappers on pd for rolling sums etc.
