Fixing s3 support [Resolves #364]
Catwalk
Removed smart_open; now using s3fs (see the sketch below)
Reason: smart_open uses boto instead of boto3, and boto lacks several features (such as AWS STS support)

- S3Store works, as does S3ModelStorageEngine
- CSVMatrixStore supports storing to S3 using s3fs
- HDFMatrixStore no longer supports storing to S3: H5 matrices can't be stored in S3 with s3fs. This is a limitation of PyTables, not of s3fs (HDF5 doesn't support buffers, so it can't be used with the s3fs context manager, and forcing it raises a Unicode exception)
- Use joblib instead of pickle
- Standardize on joblib pickling and remove the now-unused catwalk util code

Metta
- Added support for storing matrices in S3 (CSV only)

Other
- Add maxtasksperchild=1 to the MultiCoreExperiment pool to recycle worker memory
- Upgrade to Python 3.6
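
For context, here is a minimal sketch of the storage pattern this commit standardizes on: an s3fs file object combined with joblib pickling. The bucket, key, and model below are placeholders for illustration, not code from this repository.

import joblib
import s3fs
from sklearn.linear_model import LogisticRegression

# Hypothetical S3 location; s3fs resolves credentials through boto3/botocore,
# which is what makes AWS STS roles usable.
path = 's3://some-bucket/models/abc123'
fs = s3fs.S3FileSystem()

model = LogisticRegression()

# Write: s3fs yields a file-like object, joblib serializes into it.
with fs.open(path, 'wb') as f:
    joblib.dump(model, f, compress=True)

# Read it back the same way.
with fs.open(path, 'rb') as f:
    restored = joblib.load(f)

The same file-object approach works for CSV matrices, but not for HDF5: PyTables writes to a filename rather than a buffer, which is why HDFMatrixStore loses S3 support in this commit.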
nanounanue authored and thcrock committed Feb 1, 2018
1 parent 92d8f2b commit 7bfaf85
Showing 18 changed files with 375 additions and 283 deletions.
2 changes: 1 addition & 1 deletion .python-version
@@ -1 +1 @@
triage-3.5.3
triage-3.6.2
2 changes: 1 addition & 1 deletion .travis.yml
@@ -1,6 +1,6 @@
# This file was autogenerated and will overwrite each time you run travis_pypi_setup.py
language: python
python: 3.5
python: 3.6
addons:
postgresql: '9.5'
services:
3 changes: 2 additions & 1 deletion requirement/main.txt
@@ -4,7 +4,7 @@ PyYAML
psycopg2
python-dateutil
scipy
scikit-learn==0.18.2
scikit-learn
tables==3.3.0
matplotlib
pandas
@@ -17,3 +17,4 @@ retrying
Dickens==1.0.1
signalled-timeout==1.0.0
smart_open==1.5.3
s3fs==0.1.2
8 changes: 5 additions & 3 deletions src/tests/catwalk_tests/test_individual_importance_uniform.py
@@ -30,7 +30,7 @@ def test_uniform_distribution_entity_id_index():
for imp in feature_importances:
data_dict[imp.feature] = [0.5, 0.5]
test_store = InMemoryMatrixStore(
matrix=pandas.DataFrame.from_dict(data_dict).set_index(['entity_id']),
matrix=pandas.DataFrame.from_dict(data_dict),
metadata=sample_metadata()
)
session.commit()
@@ -68,9 +68,11 @@ def test_uniform_distribution_entity_date_index():
data_dict = {'entity_id': [1, 1], 'as_of_date': ['2016-01-01', '2017-01-01']}
for imp in feature_importances:
data_dict[imp.feature] = [0.5, 0.5]
metadata = sample_metadata()
metadata['indices'] = ['entity_id', 'as_of_date']
test_store = InMemoryMatrixStore(
matrix=pandas.DataFrame.from_dict(data_dict).set_index(['entity_id', 'as_of_date']),
metadata=sample_metadata()
matrix=pandas.DataFrame.from_dict(data_dict),
metadata=metadata
)
session.commit()
results = uniform_distribution(
2 changes: 1 addition & 1 deletion src/tests/catwalk_tests/test_integration.py
@@ -68,7 +68,7 @@ def test_integration():
for as_of_date in as_of_dates
]

model_storage_engine = S3ModelStorageEngine(s3_conn, project_path)
model_storage_engine = S3ModelStorageEngine(project_path)

experiment_hash = save_experiment_and_get_hash({}, db_engine)
# instantiate pipeline objects
16 changes: 5 additions & 11 deletions src/tests/catwalk_tests/test_model_trainers.py
@@ -1,6 +1,5 @@
import boto3
import pandas
import pickle
import testing.postgresql
import datetime
import sqlalchemy
@@ -10,7 +9,6 @@
from moto import mock_s3
from sqlalchemy import create_engine
from triage.component.catwalk.db import ensure_db
from triage.component.catwalk.utils import model_cache_key

from triage.component.catwalk.model_trainers import ModelTrainer
from triage.component.catwalk.storage import InMemoryModelStorageEngine,\
@@ -51,7 +49,7 @@ def test_model_trainer():
'indices': ['entity_id'],
}
project_path = 'econ-dev/inspections'
model_storage_engine = S3ModelStorageEngine(s3_conn, project_path)
model_storage_engine = S3ModelStorageEngine(project_path)
trainer = ModelTrainer(
project_path=project_path,
experiment_hash=None,
@@ -79,11 +77,7 @@ def test_model_trainer():
engine.execute('select model_hash from results.models')
]
assert len(records) == 4

cache_keys = [
model_cache_key(project_path, model_row[0], s3_conn)
for model_row in records
]
hashes = [row[0] for row in records]

# 2. that the model groups are distinct
records = [
@@ -94,8 +88,8 @@

# 3. that all four models are cached
model_pickles = [
pickle.loads(cache_key.get()['Body'].read())
for cache_key in cache_keys
model_storage_engine.get_store(model_hash).load()
for model_hash in hashes
]
assert len(model_pickles) == 4
assert len([x for x in model_pickles if x is not None]) == 4
@@ -203,7 +197,7 @@ def test_n_jobs_not_new_model():
trainer = ModelTrainer(
project_path='econ-dev/inspections',
experiment_hash=None,
model_storage_engine=S3ModelStorageEngine(s3_conn, 'econ-dev/inspections'),
model_storage_engine=S3ModelStorageEngine('econ-dev/inspections'),
db_engine=engine,
model_group_keys=['label_name', 'label_timespan']
)
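
The change above means model pickles are read back through the storage engine rather than through raw S3 cache keys. A minimal usage sketch, using only the calls exercised in this test; the project path and model hash are placeholders, and the engine presumably builds its own s3fs access internally now that it no longer takes an s3_conn argument.

from triage.component.catwalk.storage import S3ModelStorageEngine

# Placeholder project path and hash, mirroring the test above.
model_storage_engine = S3ModelStorageEngine('econ-dev/inspections')
model = model_storage_engine.get_store('some-model-hash').load()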
6 changes: 3 additions & 3 deletions src/tests/catwalk_tests/test_predictors.py
@@ -33,7 +33,7 @@ def test_predictor():
s3_conn = boto3.resource('s3')
s3_conn.create_bucket(Bucket='econ-dev')
project_path = 'econ-dev/inspections'
model_storage_engine = S3ModelStorageEngine(s3_conn, project_path)
model_storage_engine = S3ModelStorageEngine(project_path)
_, model_id = \
fake_trained_model(project_path, model_storage_engine, db_engine)
predictor = Predictor(project_path, model_storage_engine, db_engine)
@@ -148,7 +148,7 @@ def test_predictor_composite_index():
'end_time': AS_OF_DATE,
'label_timespan': '3month',
'metta-uuid': '1234',
'indices': ['entity_id'],
'indices': ['entity_id', 'as_of_date'],
}
matrix_store = InMemoryMatrixStore(matrix, metadata)
predict_proba = predictor.predict(
@@ -238,7 +238,7 @@ def test_predictor_retrieve():
'end_time': AS_OF_DATE,
'label_timespan': '3month',
'metta-uuid': '1234',
'indices': ['entity_id'],
'indices': ['entity_id', 'as_of_date'],
}
matrix_store = InMemoryMatrixStore(matrix, metadata)
predict_proba = predictor.predict(
46 changes: 21 additions & 25 deletions src/tests/catwalk_tests/test_storage.py
@@ -4,8 +4,7 @@
import yaml
from collections import OrderedDict

import boto
import pandas
import pandas as pd
from moto import mock_s3, mock_s3_deprecated

from triage.component.catwalk.storage import (
@@ -24,11 +23,11 @@ def __init__(self, val):


def test_S3Store():
import boto3
with mock_s3():
s3_conn = boto3.resource('s3')
s3_conn.create_bucket(Bucket='a-bucket')
store = S3Store(s3_conn.Object('a-bucket', 'a-path'))
import boto3
client = boto3.client('s3')
client.create_bucket(Bucket='test_bucket', ACL='public-read-write')
store = S3Store(path=f"s3://test_bucket/a_path")
assert not store.exists()
store.write(SomeClass('val'))
assert store.exists()
@@ -71,7 +70,7 @@ def matrix_store(self):
('m_feature', [0.4, 0.5]),
('label', [0, 1])
])
df = pandas.DataFrame.from_dict(data_dict)
df = pd.DataFrame.from_dict(data_dict)
metadata = {
'label_name': 'label',
'indices': ['entity_id'],
@@ -85,7 +84,7 @@ def matrix_store(self):
tmphdf = os.path.join(tmpdir, 'df.h5')
with open(tmpyaml, 'w') as outfile:
yaml.dump(metadata, outfile, default_flow_style=False)
df.to_csv(tmpcsv, index=False)
df.to_csv(tmpcsv)
df.to_hdf(tmphdf, 'matrix')
csv = CSVMatrixStore(matrix_path=tmpcsv, metadata_path=tmpyaml)
hdf = HDFMatrixStore(matrix_path=tmphdf, metadata_path=tmpyaml)
@@ -105,7 +104,8 @@ def matrix_store(self):
assert csv.labels().to_dict() == inmemory.labels().to_dict()
assert hdf.labels().to_dict() == inmemory.labels().to_dict()

matrix_store = [inmemory, hdf, csv]
matrix_store = [inmemory, csv, hdf]

return matrix_store

def test_MatrixStore_resort_columns(self):
@@ -176,7 +176,7 @@ def test_as_of_dates_entity_index(self):
'feature_two': [0.5, 0.6],
}
matrix = InMemoryMatrixStore(
matrix=pandas.DataFrame.from_dict(data),
matrix=pd.DataFrame.from_dict(data),
metadata={'end_time': '2016-01-01', 'indices': ['entity_id']}
)

@@ -190,29 +190,25 @@ def test_as_of_dates_entity_date_index(self):
'as_of_date': ['2016-01-01', '2016-01-01', '2017-01-01', '2017-01-01']
}
matrix = InMemoryMatrixStore(
matrix=pandas.DataFrame.from_dict(data),
matrix=pd.DataFrame.from_dict(data),
metadata={'indices': ['entity_id', 'as_of_date']}
)

self.assertEqual(matrix.as_of_dates, ['2016-01-01', '2017-01-01'])

def test_s3_save(self):
with mock_s3_deprecated():
s3_conn = boto.connect_s3()
bucket_name = 'fake-matrix-bucket'
s3_conn.create_bucket(bucket_name)
with mock_s3():
import boto3
client = boto3.client('s3')
client.create_bucket(Bucket='fake-matrix-bucket', ACL='public-read-write')

matrix_store_list = self.matrix_store()

for matrix_store in matrix_store_list:
matrix_store.save(project_path='s3://fake-matrix-bucket', name='test')

# HDF
hdf = HDFMatrixStore(matrix_path='s3://fake-matrix-bucket/test.h5', metadata_path='s3://fake-matrix-bucket/test.yaml')
# CSV
csv = CSVMatrixStore(matrix_path='s3://fake-matrix-bucket/test.csv', metadata_path='s3://fake-matrix-bucket/test.yaml')
if isinstance(matrix_store, CSVMatrixStore):
matrix_store.save(project_path='s3://fake-matrix-bucket', name='test')
# CSV
csv = CSVMatrixStore(matrix_path='s3://fake-matrix-bucket/test.csv', metadata_path='s3://fake-matrix-bucket/test.yaml')

assert csv.metadata == matrix_store_list[0].metadata
assert csv.matrix.to_dict() == matrix_store_list[0].matrix.to_dict()
assert hdf.metadata == matrix_store_list[0].metadata
assert hdf.matrix.to_dict() == matrix_store_list[0].matrix.to_dict()
assert csv.metadata == matrix_store_list[0].metadata
assert csv.matrix.to_dict() == matrix_store_list[0].matrix.to_dict()
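
For reference, the CSV round trip exercised by test_s3_save can be reproduced with s3fs, pandas, and yaml directly. The bucket and file names below are illustrative only and assume credentials (or a moto mock) are already in place; this is a sketch, not test code from the repository.

import pandas as pd
import s3fs
import yaml

fs = s3fs.S3FileSystem()
matrix_path = 's3://fake-matrix-bucket/test.csv'
metadata_path = 's3://fake-matrix-bucket/test.yaml'

df = pd.DataFrame({'entity_id': [1, 2], 'feature_one': [0.5, 0.6], 'label': [0, 1]})
metadata = {'label_name': 'label', 'indices': ['entity_id']}

# Write the matrix and its metadata through s3fs file objects.
with fs.open(matrix_path, 'wb') as f:
    f.write(df.to_csv(None).encode('utf-8'))
with fs.open(metadata_path, 'wb') as f:
    f.write(yaml.dump(metadata, default_flow_style=False).encode('utf-8'))

# Read both back and confirm the round trip.
with fs.open(matrix_path, 'rb') as f:
    df_roundtrip = pd.read_csv(f, index_col=0)
with fs.open(metadata_path, 'rb') as f:
    metadata_roundtrip = yaml.safe_load(f.read())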
34 changes: 29 additions & 5 deletions src/triage/component/architect/builders.py
@@ -3,6 +3,9 @@
import pandas
import os

import s3fs
from urllib.parse import urlparse

from triage.component import metta


@@ -25,8 +28,11 @@ def validate(self):

def build_all_matrices(self, build_tasks):
logging.info('Building %s matrices', len(build_tasks.keys()))
for matrix_uuid, task_arguments in build_tasks.items():

for i, (matrix_uuid, task_arguments) in enumerate(build_tasks.items()):
logging.info(f"Building matrix {matrix_uuid} ({i}/{len(build_tasks.keys())})")
self.build_matrix(**task_arguments)
logging.debug(f"Matrix {matrix_uuid} built")

def _outer_join_query(
self,
@@ -224,9 +230,23 @@ def build_matrix(
matrix_directory,
'{}.csv'.format(matrix_uuid)
)
if not self.replace and os.path.exists(matrix_filename):
logging.info('Skipping %s because matrix already exists', matrix_filename)
return

# The output directory is local or in s3
path_parsed = urlparse(matrix_filename)
scheme = path_parsed.scheme  # '' or 'file' means a local file, 's3' means S3

if scheme in ('', 'file'):
if not self.replace and os.path.exists(matrix_filename):
logging.info('Skipping %s because matrix already exists', matrix_filename)
return
elif scheme == 's3':
if not self.replace and s3fs.S3FileSystem().exists(matrix_filename):
logging.info('Skipping %s because matrix already exists', matrix_filename)
return
else:
raise ValueError(f"""URL scheme not supported:
{scheme} (from {matrix_filename})
""")

logging.info('Creating matrix %s > %s', matrix_metadata['matrix_id'], matrix_filename)
# make the entity time table and query the labels and features tables
@@ -248,8 +268,9 @@ def build_matrix(
entity_date_table_name,
matrix_uuid
)
logging.info(f"Feature data extracted for matrix {matrix_uuid}")
try:
logging.info('Extracting label data frmo database into file for '
logging.info('Extracting label data from database into file for '
'matrix %s', matrix_uuid)
labels_csv_name = self.write_labels_data(
label_name,
@@ -260,13 +281,15 @@ def build_matrix(
)
features_csv_names.insert(0, labels_csv_name)

logging.info(f"Label data extracted for matrix {matrix_uuid}")
# stitch together the csvs
logging.info('Merging feature files for matrix %s', matrix_uuid)
output = self.merge_feature_csvs(
features_csv_names,
matrix_directory,
matrix_uuid
)
logging.info(f"Features data merged for matrix {matrix_uuid}")
finally:
# clean up files and database before finishing
for csv_name in features_csv_names:
@@ -281,6 +304,7 @@
directory=self.matrix_directory,
format='csv'
)
logging.info("Matrix {matrix_uuid} archived (using metta)")
finally:
if isinstance(output, str):
os.remove(output)
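
A standalone sketch of the local-vs-S3 dispatch that build_matrix now performs; the helper name is made up for illustration, and only urlparse and s3fs.S3FileSystem().exists come from the diff above.

import os
from urllib.parse import urlparse

import s3fs


def matrix_exists(matrix_filename):
    # Hypothetical helper mirroring the branching in build_matrix.
    scheme = urlparse(matrix_filename).scheme  # '' or 'file' means local, 's3' means S3
    if scheme in ('', 'file'):
        return os.path.exists(matrix_filename)
    elif scheme == 's3':
        return s3fs.S3FileSystem().exists(matrix_filename)
    else:
        raise ValueError(f'URL scheme not supported: {scheme} (from {matrix_filename})')

With a helper like this, the replace check reduces to a single call: skip the build when not self.replace and matrix_exists(matrix_filename).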
2 changes: 1 addition & 1 deletion src/triage/component/architect/planner.py
@@ -221,5 +221,5 @@ def build_all_matrices(self, *args, **kwargs):
self.builder.build_all_matrices(*args, **kwargs)

def build_matrix(self, *args, **kwargs):
logging.info('Building matrix with args %s', args)
logging.info(f"Building matrix with {args}, {kwargs}")
self.builder.build_matrix(*args, **kwargs)
10 changes: 5 additions & 5 deletions src/triage/component/catwalk/README.rst
@@ -35,7 +35,7 @@ Below is a complete sample usage of the three Catwalk components::
from sqlalchemy import create_engine

from triage.component import metta
from triage.component.catwalk.storage import FSModelStorageEngine, MettaCSVMatrixStore
from triage.component.catwalk.storage import FSModelStorageEngine, CSVMatrixStore
from triage.component.catwalk.model_trainers import ModelTrainer
from triage.component.catwalk.predictors import Predictor
from triage.component.catwalk.evaluation import ModelEvaluator
@@ -66,9 +66,9 @@ Below is a complete sample usage of the three Catwalk components::
}
train_matrix_uuid = metta.archive_matrix(train_metadata, train_matrix, format='csv')

# The MettaCSVMatrixStore bundles the matrix and metadata together
# The CSVMatrixStore bundles the matrix and metadata together
# for catwalk to use
train_matrix_store = MettaCSVMatrixStore(
train_matrix_store = CSVMatrixStore(
matrix_path='{}.csv'.format(train_matrix_uuid),
metadata_path='{}.yaml'.format(train_matrix_uuid)
)
@@ -91,9 +91,9 @@ Below is a complete sample usage of the three Catwalk components::
}
test_matrix_uuid = metta.archive_matrix(test_metadata, test_matrix, format='csv')

# The MettaCSVMatrixStore bundles the matrix and metadata together
# The CSVMatrixStore bundles the matrix and metadata together
# for catwalk to use
test_matrix_store = MettaCSVMatrixStore(
test_matrix_store = CSVMatrixStore(
matrix_path='{}.csv'.format(test_matrix_uuid),
metadata_path='{}.yaml'.format(test_matrix_uuid)
)
7 changes: 5 additions & 2 deletions src/triage/component/catwalk/model_trainers.py
@@ -422,7 +422,8 @@ def process_train_task(
elif not saved_model_id:
reason = 'model metadata not found'

logging.info('Training %s/%s: %s', class_path, parameters, reason)
logging.info(f"Training {class_path} with parameters {parameters}"
f"(reason to train: {reason})")
model_id = self._train_and_store_model(
matrix_store,
class_path,
@@ -464,7 +465,8 @@ def generate_train_tasks(

for class_path, parameters in self._generate_model_configs(grid_config):
model_hash = self._model_hash(matrix_store.metadata, class_path, parameters)
logging.info('Computed model hash %s', model_hash)
logging.info(f"Computed model hash for {class_path} "
f"with parameters {parameters}: {model_hash}")

if any(task['model_hash'] == model_hash for task in tasks):
logging.info('Skipping model_hash %s because another'
@@ -480,4 +482,5 @@
'misc_db_parameters': misc_db_parameters,
})
logging.info('Found %s unique model training tasks', len(tasks))

return tasks