Add more S3 functions/support
saeedamen committed Jul 12, 2021
1 parent beea796 commit 13a3bca
Showing 6 changed files with 209 additions and 57 deletions.
3 changes: 3 additions & 0 deletions README.md
@@ -132,6 +132,9 @@ individual data providers)

# Coding log

* 12 Jul 2021
* Can now read CSV conf files for tickers from S3 buckets, with improved S3 support (AWS credentials can now be specified as a parameter)
* Additional file functions (eg. list_files)
* 05 Jul 2021
* Now (optionally) writes Parquet files in chunks (user specified size) to avoid memory issues with pyarrow
* Default is to use pandas.to_parquet (with pyarrow), and to fall back on chunked writing if that fails
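
As an illustrative sketch only (not part of this commit), the new S3 support might be exercised roughly as follows; the `IOEngine` class name, import path, bucket names and credential values are assumptions/placeholders:

```python
# Illustrative sketch (not from this commit); names and paths are placeholders
from findatapy.market.ioengine import IOEngine

io_engine = IOEngine()

cloud_credentials = {"aws_access_key": "MY_KEY",
                     "aws_secret_key": "MY_SECRET",
                     "aws_session_token": "MY_TOKEN"}

# Read a Parquet file straight from an S3 bucket
df = io_engine.read_parquet("s3://my-bucket/fx/eurusd.parquet",
                            cloud_credentials=cloud_credentials)

# Write it back (the Parquet and CSV writers both accept the credentials dict)
io_engine.to_parquet(df, "s3://my-bucket/fx/eurusd-copy.parquet",
                     cloud_credentials=cloud_credentials)

# New in this commit: list files in an S3 folder
files = io_engine.list_files("s3://my-bucket/fx/*.parquet",
                             cloud_credentials=cloud_credentials)
```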
119 changes: 105 additions & 14 deletions findatapy/market/ioengine.py
@@ -18,6 +18,7 @@
import datetime
from dateutil.parser import parse
import shutil
import copy

import math

@@ -463,7 +464,7 @@ def write_time_series_cache_to_disk(self, fname, data_frame,
if fname[-5:] != '.gzip':
fname = fname + '.parquet'

self.to_parquet(data_frame, fname, aws_region=constants.aws_region, parquet_compression=parquet_compression,
self.to_parquet(data_frame, fname, cloud_credentials=constants.cloud_credentials, parquet_compression=parquet_compression,
use_pyarrow_directly=use_pyarrow_directly)
# data_frame.to_parquet(fname, compression=parquet_compression)

@@ -899,27 +900,86 @@ def sanitize_path(self, path):
"""
if "s3://" in path:
path = path.replace("s3://", "")

path = path.replace("//", "/")

return "s3://" + path

return path

def read_parquet(self, path):
def read_parquet(self, path, cloud_credentials=constants.cloud_credentials):
"""Reads a Pandas DataFrame from a local or s3 path
Parameters
----------
path : str
Path of Parquet file (can be S3)
cloud_credentials : dict (optional)
Credentials for logging into the cloud
Returns
-------
DataFrame
"""
return pd.read_parquet(self.sanitize_path(path))

def to_parquet(self, df, path, filename=None, aws_region=constants.aws_region,
if "s3://" in path:
storage_options = self._convert_cred(cloud_credentials, convert_to_s3fs=True)

return pd.read_parquet(self.sanitize_path(path), storage_options=storage_options)
else:
return pd.read_parquet(path)
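
The S3 branch above passes a `storage_options` dict to pandas, which hands it on to s3fs. A rough stand-alone equivalent, with placeholder credentials and bucket name, might look like this (assuming a pandas version that supports `storage_options` and an s3fs installation):

```python
# Rough equivalent of the S3 branch above, using pandas directly
# (illustrative only; values and paths are placeholders)
import pandas as pd

storage_options = {"anon": False,
                   "key": "MY_KEY",        # s3fs name for the AWS access key
                   "secret": "MY_SECRET",  # s3fs name for the AWS secret key
                   "token": None}          # optional session token

df = pd.read_parquet("s3://my-bucket/fx/eurusd.parquet",
                     storage_options=storage_options)
```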


def _create_cloud_filesystem(self, cloud_credentials, filesystem_type):

cloud_credentials = self._convert_cred(cloud_credentials)

if 's3_pyarrow' == filesystem_type:
return pyarrow.fs.S3FileSystem(anon=cloud_credentials['aws_anon'],
access_key=cloud_credentials['aws_access_key'],
secret_key=cloud_credentials['aws_secret_key'],
session_token=cloud_credentials['aws_session_token'])

elif 's3_filesystem' == filesystem_type:
return S3FileSystem(anon=cloud_credentials['aws_anon'],
key=cloud_credentials['aws_access_key'],
secret=cloud_credentials['aws_secret_key'],
token=cloud_credentials['aws_session_token'])

def _convert_cred(self, cloud_credentials, convert_to_s3fs=False):
"""Backfills the credential dictionary (usually for AWS login)
"""

cloud_credentials = copy.copy(cloud_credentials)

boolean_keys = {'aws_anon' : False}

mappings = {'aws_anon' : 'anon',
'aws_access_key': 'key',
'aws_secret_key': 'secret',
'aws_session_token': 'token'
}

for m in mappings.keys():
if m not in cloud_credentials.keys():
if m in boolean_keys:
cloud_credentials[m] = boolean_keys[m]
else:
cloud_credentials[m] = None

# Converts the field names eg. aws_access_key => key etc.
if convert_to_s3fs:

cloud_credentials_temp = {}

for m in cloud_credentials.keys():
cloud_credentials_temp[mappings[m]] = cloud_credentials[m]

return cloud_credentials_temp

return cloud_credentials
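
To make the backfilling concrete, here is a hypothetical round trip through `_convert_cred`; the `IOEngine` class name, its no-argument constructor and the interactive use of a private method are assumptions for illustration only:

```python
# Hypothetical illustration of _convert_cred (not part of the commit)
from findatapy.market.ioengine import IOEngine

io_engine = IOEngine()  # assumed to construct without arguments

partial = {"aws_access_key": "MY_KEY", "aws_secret_key": "MY_SECRET"}

# Missing entries are backfilled: aws_anon -> False, aws_session_token -> None
print(io_engine._convert_cred(partial))
# {'aws_access_key': 'MY_KEY', 'aws_secret_key': 'MY_SECRET',
#  'aws_anon': False, 'aws_session_token': None}

# With convert_to_s3fs=True the keys are renamed to the s3fs/pandas
# storage_options convention (aws_access_key -> key, etc.)
print(io_engine._convert_cred(partial, convert_to_s3fs=True))
# {'key': 'MY_KEY', 'secret': 'MY_SECRET', 'anon': False, 'token': None}
```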

def to_parquet(self, df, path, filename=None, cloud_credentials=constants.cloud_credentials,
parquet_compression=constants.parquet_compression, use_pyarrow_directly=False):
"""Write a DataFrame to a local or s3 path as a Parquet file
@@ -934,11 +994,11 @@ def to_parquet(self, df, path, filename=None, aws_region=constants.aws_region,
filename : str (optional)
Filename to be used (will be combined with the specified paths)
aws_region : str (optional)
AWS region for s3 dump
cloud_credentials : dict (optional)
AWS credentials for S3 dump
parquet_compression : str (optional)
Parquet compression type to use when writting
Parquet compression type to use when writing
"""
logger = LoggerManager.getLogger(__name__)

@@ -979,6 +1039,9 @@ def to_parquet(self, df, path, filename=None, aws_region=constants.aws_region,
except:
pass


cloud_credentials = self._convert_cred(cloud_credentials)

# Tends to be slower than using pandas/pyarrow directly, but for very large files, we might have to split
# before writing to disk
def pyarrow_dump(df, path):
@@ -1003,7 +1066,8 @@ def pyarrow_dump(df, path):
counter = 1

if 's3://' in p:
s3 = pyarrow.fs.S3FileSystem(region=aws_region)
s3 = self._create_cloud_filesystem(cloud_credentials, 's3_pyarrow')

p_in_s3 = p.replace("s3://", "")

for df_ in df_list:
@@ -1131,15 +1195,25 @@ def chunk_dataframes(self, obj, chunk_size_mb=constants.chunk_size_mb):

return obj_list

def to_csv(self, df, path):
def read_csv(self, path, cloud_credentials=constants.cloud_credentials):

if "s3://" in path:
s3 = self._create_cloud_filesystem(cloud_credentials, 's3_filesystem')

path = self.sanitize_path(path)
path_in_s3 = self.sanitize_path(path).replace("s3://", "")

s3 = S3FileSystem(anon=False)
# Use 'w' for py3, 'wb' for py2
with s3.open(path_in_s3, 'r') as f:
return pd.read_csv(f)
else:
return pd.read_csv(path)

path_in_s3 = path.replace("s3://", "")
def to_csv(self, df, path, cloud_credentials=constants.cloud_credentials):

if "s3://" in path:
s3 = self._create_cloud_filesystem(cloud_credentials, 's3_filesystem')

path_in_s3 = self.sanitize_path(path).replace("s3://", "")

# Use 'w' for py3, 'wb' for py2
with s3.open(path_in_s3, 'w') as f:
@@ -1151,7 +1225,7 @@ def path_exists(self, path):
if 's3://' in path:
path_in_s3 = path.replace("s3://", "")

return S3FileSystem(anon=False).exists(path_in_s3)
return S3FileSystem().exists(path_in_s3)
else:
return os.path.exists(path)

@@ -1175,6 +1249,23 @@ def path_join(self, folder, file):

return folder

def list_files(self, path, cloud_credentials=constants.cloud_credentials):
if "s3://" in path:
s3 = self._create_cloud_filesystem(cloud_credentials, 's3_filesystem')

path_in_s3 = self.sanitize_path(path).replace("s3://", "")

files = ['s3://' + x for x in s3.glob(path_in_s3)]

else:
files = glob.glob(path)

list.sort(files)

return files
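
A short, hedged usage sketch for the new `list_files` helper; the `IOEngine` class name, bucket, folder and file names are assumptions/placeholders:

```python
# Illustrative usage of list_files / path_exists / read_csv (placeholder paths)
from findatapy.market.ioengine import IOEngine

io_engine = IOEngine()

# Local paths go through glob.glob
local_files = io_engine.list_files("/tmp/tickers/*.csv")

# S3 paths go through s3fs glob and come back with the s3:// prefix restored
s3_files = io_engine.list_files("s3://my-bucket/tickers/*.csv",
                                cloud_credentials={"aws_access_key": "MY_KEY",
                                                   "aws_secret_key": "MY_SECRET"})

if io_engine.path_exists("s3://my-bucket/tickers/fx_tickers.csv"):
    df = io_engine.read_csv("s3://my-bucket/tickers/fx_tickers.csv")
```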



#######################################################################################################################

class SpeedCache(object):
20 changes: 13 additions & 7 deletions findatapy/util/configmanager.py
@@ -112,10 +112,12 @@ def populate_time_series_dictionaries(data_constants=None):
for tickers_list_file in time_series_tickers_list_file:

if os.path.isfile(tickers_list_file):
reader = csv.DictReader(open(tickers_list_file))
df_tickers.append(pd.read_csv(tickers_list_file))
# reader = csv.DictReader(open(tickers_list_file))
df = pd.read_csv(tickers_list_file)
df = df.dropna(how='all')
df_tickers.append(df)

for line in reader:
for index, line in df.iterrows():
category = line["category"]
data_source = line["data_source"]
freq_list = line["freq"].split(',')
@@ -191,9 +193,11 @@ def populate_time_series_dictionaries(data_constants=None):
ConfigManager._data_frame_time_series_tickers = df_tickers

## Populate fields conversions
reader = csv.DictReader(open(data_constants.time_series_fields_list))
# reader = csv.DictReader(open(data_constants.time_series_fields_list))
df = pd.read_csv(data_constants.time_series_fields_list)
df = df.dropna(how='all')

for line in reader:
for index, line in df.iterrows():
data_source = line["data_source"]
fields = line["fields"]
vendor_fields = line["vendor_fields"]
@@ -205,9 +209,11 @@
ConfigManager._dict_time_series_fields_list_library_to_vendor[data_source + '.' + fields] = vendor_fields

## Populate categories fields list
reader = csv.DictReader(open(data_constants.time_series_categories_fields))
# reader = csv.DictReader(open(data_constants.time_series_categories_fields))
df = pd.read_csv(data_constants.time_series_categories_fields)
df = df.dropna(how='all')

for line in reader:
for index, line in df.iterrows():
category = line["category"]
data_source = line["data_source"]
freq = line["freq"]
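
The config parsing in configmanager.py above now goes through pandas rather than csv.DictReader; a minimal sketch of that pattern follows (the file path is a placeholder, and the same `pd.read_csv` call also accepts an `s3://` URL when s3fs is installed, which csv.DictReader cannot do):

```python
# Sketch of the parsing pattern adopted in configmanager.py (placeholder path)
import pandas as pd

tickers_list_file = "conf/time_series_tickers_list.csv"

# Blank rows are dropped before iterating over the ticker definitions
df = pd.read_csv(tickers_list_file)
df = df.dropna(how='all')

for index, line in df.iterrows():
    category = line["category"]
    data_source = line["data_source"]
    freq_list = line["freq"].split(',')
```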
32 changes: 26 additions & 6 deletions findatapy/util/dataconstants.py
@@ -25,12 +25,23 @@

def path_join(folder, file):
if 's3://' in folder:
if folder[-1] == '/':
return folder + file
else:
return folder + '/' + file
folder = folder.replace("s3://", "")
folder = folder + "/" + file

folder = folder.replace("//", "/")

folder = "s3://" + folder

else:
return os.path.join(folder, file)
if file[0] == '/':
file = file[1::]

folder = os.path.join(folder, file)

folder = folder.replace("\\\\", "/")
folder = folder.replace("\\", "/")

return folder
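
To make the revised behaviour concrete, a couple of worked examples (illustrative paths only, assuming `path_join` is importable from this module as a top-level function):

```python
# Worked examples for the revised path_join (illustrative paths)
from findatapy.util.dataconstants import path_join

# S3: the "s3://" prefix is stripped, the pieces are joined, doubled slashes
# are collapsed and the prefix is put back
print(path_join("s3://my-bucket/parquet/", "fx/eurusd.parquet"))
# s3://my-bucket/parquet/fx/eurusd.parquet

# Local: a leading "/" on the file part is dropped and backslashes are
# normalised to forward slashes
print(path_join("/data/cache", "/eurusd.parquet"))
# /data/cache/eurusd.parquet
```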

def key_store(service_name):
key = None
@@ -79,7 +90,16 @@ class DataConstants(object):
use_cache_compression = True

parquet_compression = 'gzip' # 'gzip' or 'snappy'
aws_region = None

# Note: for AWS these credentials can be set globally with the AWS CLI, without having to specify them here
cloud_credentials = {'aws_anon' : False}

# eg. {"aws_access_key": "asdfksesdf",
#      "aws_secret_key": "asfsdf",
#      "aws_session_token": "adsfsdf"}

###### FOR ALIAS TICKERS
# Config file for time series categories
19 changes: 17 additions & 2 deletions findatapy/util/tickerfactory.py
@@ -90,15 +90,30 @@ def aggregate_ticker_excel(self, excel_file, out_csv, sheets=[], skiprows=None,

df_list = []

logger = LoggerManager.getLogger(__name__)

for sh in sheets:
df_list.append(pd.read_excel(excel_file, sheet_name=sh, skiprows=skiprows))
logger.info("Reading from " + sh + " in " + excel_file)

df = pd.read_excel(excel_file, sheet_name=sh, skiprows=skiprows)
df = df.dropna(how='all')
df_list.append(df)

df = pd.concat(df_list)

if cols is not None:
df = df[cols]

df = df.reset_index()
df.to_csv(out_csv)

if isinstance(out_csv, list):
for out in out_csv:
logger.info("Writing to " + out)

df.to_csv(out)

else:
logger.info("Writing to " + out_csv)
df.to_csv(out_csv)

return df
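
For context, a hedged usage sketch of the updated method, where `out_csv` may now be a single path or a list of output paths; the `TickerFactory` class name, workbook, sheet and output file names are assumptions/placeholders:

```python
# Illustrative call (not from the commit); class, file and sheet names are
# placeholders
from findatapy.util.tickerfactory import TickerFactory

ticker_factory = TickerFactory()

# out_csv can now be a single path or a list of paths, each written in turn
df = ticker_factory.aggregate_ticker_excel(
    "tickers_master.xlsx",
    ["fx_tickers.csv", "backup/fx_tickers.csv"],
    sheets=["fx", "fx-vol"])
```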
