Fixing s3 support #364

Merged (29 commits, Feb 1, 2018)
Commits
6257873  Added s3fs as requirement (used for S3Store and CSVMatrixStore)  (nanounanue, Jan 14, 2018)
9ce8d5f  boto -> boto3. New S3Store object for models (uses s3fs) and removed  (nanounanue, Jan 14, 2018)
68e2794  Fixed recursion error. H5 matrices can't be stored in S3 with s3fs (it  (nanounanue, Jan 15, 2018)
1098644  Not needed  (nanounanue, Jan 15, 2018)
bef3117  Testing with 3.6  (nanounanue, Jan 15, 2018)
bf24c36  Removed line: Throws 404 (metadata doesn't exists yet)  (nanounanue, Jan 15, 2018)
f515ca9  Forgot this: head_of_matrix is read from s3 too, duh  (nanounanue, Jan 15, 2018)
f6e2786  Catching error  (nanounanue, Jan 15, 2018)
c1709ff  Cleaning empty. Returning an Empty dataframe  (nanounanue, Jan 15, 2018)
cfb925b  existx -> exists  (nanounanue, Jan 15, 2018)
cfd6751  Fixed typo  (nanounanue, Jan 15, 2018)
597defc  We are not supporting Python 2.7 so...  (nanounanue, Jan 15, 2018)
a8a5b37  Added s3 support to metta (only for csv files)  (nanounanue, Jan 15, 2018)
c64a788  Corrected some minor errors in both READMEs  (nanounanue, Jan 15, 2018)
1e11214  Added logging  (nanounanue, Jan 17, 2018)
5816be3  Fixed logging messages  (nanounanue, Jan 18, 2018)
f7b2535  Another error in the messages :/. The fun thing is that this error  (nanounanue, Jan 18, 2018)
a8807fe  Missing "f"  (nanounanue, Jan 18, 2018)
b5e06b1  Getting the size of the data frame in bytes  (nanounanue, Jan 18, 2018)
73acf94  Skipping creation of matrices on disk if they are already there (s3 v…  (nanounanue, Jan 18, 2018)
cd8c53d  Better use of logging  (nanounanue, Jan 18, 2018)
7aa94a7  Added  (nanounanue, Jan 18, 2018)
45f9f50  unpinned sklearn  (nanounanue, Jan 18, 2018)
e03cfe9  Fixing some of the comments (except the refactoring ones)  (nanounanue, Jan 30, 2018)
f332833  Using joblib instead of pickle. Also storing the models using compres…  (nanounanue, Jan 30, 2018)
65d0045  Removing unnecessary init  (nanounanue, Jan 30, 2018)
e467cd1  Test fixes  (thcrock, Feb 1, 2018)
afd79f2  Flake8 fixes  (thcrock, Feb 1, 2018)
6b490a3  Standardize on joblib pickling, remove now unused catwalk util code  (thcrock, Feb 1, 2018)
2 changes: 1 addition & 1 deletion .python-version
@@ -1 +1 @@
-triage-3.5.3
+triage-3.6.2
2 changes: 1 addition & 1 deletion .travis.yml
@@ -1,6 +1,6 @@
 # This file was autogenerated and will overwrite each time you run travis_pypi_setup.py
 language: python
-python: 3.5
+python: 3.6
 addons:
   postgresql: '9.5'
 services:
3 changes: 2 additions & 1 deletion requirement/main.txt
@@ -4,7 +4,7 @@ PyYAML
 psycopg2
 python-dateutil
 scipy
-scikit-learn==0.18.2
+scikit-learn
 tables==3.3.0
 matplotlib
 pandas
@@ -17,3 +17,4 @@ retrying
 Dickens==1.0.1
 signalled-timeout==1.0.0
 smart_open==1.5.3
+s3fs==0.1.2

[Member, on the smart_open==1.5.3 line] Do we still need smart_open or can we remove it?

[Contributor Author] We can remove it.
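For context on what the new s3fs dependency enables: with s3fs installed, pandas can hand off s3:// URLs to it, which is presumably what lets CSVMatrixStore and the matrix builders target S3 (a minimal sketch under that assumption; the bucket and key are made up):

    import pandas as pd
    import s3fs

    # Reading: pandas delegates s3:// paths to s3fs when it is installed
    df = pd.read_csv('s3://example-bucket/matrices/abc123.csv')

    # Writing in this era goes through an s3fs file handle
    fs = s3fs.S3FileSystem()
    with fs.open('s3://example-bucket/matrices/abc123.csv', 'wb') as f:
        f.write(df.to_csv(None).encode())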
8 changes: 5 additions & 3 deletions src/tests/catwalk_tests/test_individual_importance_uniform.py
@@ -30,7 +30,7 @@ def test_uniform_distribution_entity_id_index():
     for imp in feature_importances:
         data_dict[imp.feature] = [0.5, 0.5]
     test_store = InMemoryMatrixStore(
-        matrix=pandas.DataFrame.from_dict(data_dict).set_index(['entity_id']),
+        matrix=pandas.DataFrame.from_dict(data_dict),
         metadata=sample_metadata()
     )
     session.commit()
@@ -68,9 +68,11 @@ def test_uniform_distribution_entity_date_index():
     data_dict = {'entity_id': [1, 1], 'as_of_date': ['2016-01-01', '2017-01-01']}
     for imp in feature_importances:
         data_dict[imp.feature] = [0.5, 0.5]
+    metadata = sample_metadata()
+    metadata['indices'] = ['entity_id', 'as_of_date']
     test_store = InMemoryMatrixStore(
-        matrix=pandas.DataFrame.from_dict(data_dict).set_index(['entity_id', 'as_of_date']),
-        metadata=sample_metadata()
+        matrix=pandas.DataFrame.from_dict(data_dict),
+        metadata=metadata
     )
     session.commit()
     results = uniform_distribution(
2 changes: 1 addition & 1 deletion src/tests/catwalk_tests/test_integration.py
@@ -68,7 +68,7 @@ def test_integration():
         for as_of_date in as_of_dates
     ]

-    model_storage_engine = S3ModelStorageEngine(s3_conn, project_path)
+    model_storage_engine = S3ModelStorageEngine(project_path)

     experiment_hash = save_experiment_and_get_hash({}, db_engine)
     # instantiate pipeline objects
16 changes: 5 additions & 11 deletions src/tests/catwalk_tests/test_model_trainers.py
@@ -1,6 +1,5 @@
 import boto3
 import pandas
-import pickle
 import testing.postgresql
 import datetime
 import sqlalchemy
@@ -10,7 +9,6 @@
 from moto import mock_s3
 from sqlalchemy import create_engine
 from triage.component.catwalk.db import ensure_db
-from triage.component.catwalk.utils import model_cache_key

 from triage.component.catwalk.model_trainers import ModelTrainer
 from triage.component.catwalk.storage import InMemoryModelStorageEngine,\
@@ -51,7 +49,7 @@ def test_model_trainer():
         'indices': ['entity_id'],
     }
     project_path = 'econ-dev/inspections'
-    model_storage_engine = S3ModelStorageEngine(s3_conn, project_path)
+    model_storage_engine = S3ModelStorageEngine(project_path)
     trainer = ModelTrainer(
         project_path=project_path,
         experiment_hash=None,
@@ -79,11 +77,7 @@ def test_model_trainer():
         engine.execute('select model_hash from results.models')
     ]
     assert len(records) == 4
-
-    cache_keys = [
-        model_cache_key(project_path, model_row[0], s3_conn)
-        for model_row in records
-    ]
+    hashes = [row[0] for row in records]

     # 2. that the model groups are distinct
     records = [
@@ -94,8 +88,8 @@

     # 3. that all four models are cached
     model_pickles = [
-        pickle.loads(cache_key.get()['Body'].read())
-        for cache_key in cache_keys
+        model_storage_engine.get_store(model_hash).load()
+        for model_hash in hashes
     ]
     assert len(model_pickles) == 4
     assert len([x for x in model_pickles if x is not None]) == 4
@@ -203,7 +197,7 @@ def test_n_jobs_not_new_model():
     trainer = ModelTrainer(
         project_path='econ-dev/inspections',
         experiment_hash=None,
-        model_storage_engine=S3ModelStorageEngine(s3_conn, 'econ-dev/inspections'),
+        model_storage_engine=S3ModelStorageEngine('econ-dev/inspections'),
         db_engine=engine,
         model_group_keys=['label_name', 'label_timespan']
     )
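The serialization change exercised above (commits f332833 and 6b490a3) swaps pickle for joblib with compression; a minimal sketch of that pattern (the filename is illustrative, not taken from the diff):

    import joblib
    from sklearn.linear_model import LogisticRegression

    model = LogisticRegression()
    joblib.dump(model, 'abc123def456.joblib', compress=True)  # compressed on disk
    restored = joblib.load('abc123def456.joblib')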
6 changes: 3 additions & 3 deletions src/tests/catwalk_tests/test_predictors.py
@@ -33,7 +33,7 @@ def test_predictor():
     s3_conn = boto3.resource('s3')
     s3_conn.create_bucket(Bucket='econ-dev')
     project_path = 'econ-dev/inspections'
-    model_storage_engine = S3ModelStorageEngine(s3_conn, project_path)
+    model_storage_engine = S3ModelStorageEngine(project_path)
     _, model_id = \
         fake_trained_model(project_path, model_storage_engine, db_engine)
     predictor = Predictor(project_path, model_storage_engine, db_engine)
@@ -148,7 +148,7 @@ def test_predictor_composite_index():
         'end_time': AS_OF_DATE,
         'label_timespan': '3month',
         'metta-uuid': '1234',
-        'indices': ['entity_id'],
+        'indices': ['entity_id', 'as_of_date'],
     }
     matrix_store = InMemoryMatrixStore(matrix, metadata)
     predict_proba = predictor.predict(
@@ -238,7 +238,7 @@ def test_predictor_retrieve():
         'end_time': AS_OF_DATE,
         'label_timespan': '3month',
         'metta-uuid': '1234',
-        'indices': ['entity_id'],
+        'indices': ['entity_id', 'as_of_date'],
     }
     matrix_store = InMemoryMatrixStore(matrix, metadata)
     predict_proba = predictor.predict(
46 changes: 21 additions & 25 deletions src/tests/catwalk_tests/test_storage.py
@@ -4,8 +4,7 @@
 import yaml
 from collections import OrderedDict

-import boto
-import pandas
+import pandas as pd
 from moto import mock_s3, mock_s3_deprecated

 from triage.component.catwalk.storage import (
@@ -24,11 +23,11 @@ def __init__(self, val):


 def test_S3Store():
-    import boto3
     with mock_s3():
-        s3_conn = boto3.resource('s3')
-        s3_conn.create_bucket(Bucket='a-bucket')
-        store = S3Store(s3_conn.Object('a-bucket', 'a-path'))
+        import boto3
[Member, on the "import boto3" line] Not a big deal, just curious if boto3 really has to be imported dynamically in the test, rather than at the module level.

[Contributor] I agree. I believe I introduced this pattern a while ago to attempt to fix something with mocking, but am unsure if it is needed.
+        client = boto3.client('s3')
+        client.create_bucket(Bucket='test_bucket', ACL='public-read-write')
+        store = S3Store(path=f"s3://test_bucket/a_path")
         assert not store.exists()
         store.write(SomeClass('val'))
         assert store.exists()
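For reference, the path-based S3Store exercised here presumably sits on top of the s3fs filesystem API; a minimal sketch of that underlying pattern (an assumption, since the actual S3Store internals are not shown in this diff):

    import s3fs

    fs = s3fs.S3FileSystem()
    path = 's3://test_bucket/a_path'
    if not fs.exists(path):                  # what store.exists() would check
        with fs.open(path, 'wb') as f:       # what store.write() would use
            f.write(b'serialized object bytes')
    assert fs.exists(path)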
@@ -71,7 +70,7 @@ def matrix_store(self):
             ('m_feature', [0.4, 0.5]),
             ('label', [0, 1])
         ])
-        df = pandas.DataFrame.from_dict(data_dict)
+        df = pd.DataFrame.from_dict(data_dict)
         metadata = {
             'label_name': 'label',
             'indices': ['entity_id'],
@@ -85,7 +84,7 @@ def matrix_store(self):
         tmphdf = os.path.join(tmpdir, 'df.h5')
         with open(tmpyaml, 'w') as outfile:
             yaml.dump(metadata, outfile, default_flow_style=False)
-            df.to_csv(tmpcsv, index=False)
+            df.to_csv(tmpcsv)
             df.to_hdf(tmphdf, 'matrix')
             csv = CSVMatrixStore(matrix_path=tmpcsv, metadata_path=tmpyaml)
             hdf = HDFMatrixStore(matrix_path=tmphdf, metadata_path=tmpyaml)
@@ -105,7 +104,8 @@ def matrix_store(self):
         assert csv.labels().to_dict() == inmemory.labels().to_dict()
         assert hdf.labels().to_dict() == inmemory.labels().to_dict()

-        matrix_store = [inmemory, hdf, csv]
+        matrix_store = [inmemory, csv, hdf]
+
         return matrix_store

     def test_MatrixStore_resort_columns(self):
@@ -176,7 +176,7 @@ def test_as_of_dates_entity_index(self):
             'feature_two': [0.5, 0.6],
         }
         matrix = InMemoryMatrixStore(
-            matrix=pandas.DataFrame.from_dict(data),
+            matrix=pd.DataFrame.from_dict(data),
             metadata={'end_time': '2016-01-01', 'indices': ['entity_id']}
         )

@@ -190,29 +190,25 @@ def test_as_of_dates_entity_date_index(self):
             'as_of_date': ['2016-01-01', '2016-01-01', '2017-01-01', '2017-01-01']
         }
         matrix = InMemoryMatrixStore(
-            matrix=pandas.DataFrame.from_dict(data),
+            matrix=pd.DataFrame.from_dict(data),
             metadata={'indices': ['entity_id', 'as_of_date']}
         )

         self.assertEqual(matrix.as_of_dates, ['2016-01-01', '2017-01-01'])

     def test_s3_save(self):
-        with mock_s3_deprecated():
-            s3_conn = boto.connect_s3()
-            bucket_name = 'fake-matrix-bucket'
-            s3_conn.create_bucket(bucket_name)
+        with mock_s3():
+            import boto3
+            client = boto3.client('s3')
+            client.create_bucket(Bucket='fake-matrix-bucket', ACL='public-read-write')

             matrix_store_list = self.matrix_store()

             for matrix_store in matrix_store_list:
-                matrix_store.save(project_path='s3://fake-matrix-bucket', name='test')
-
-            # HDF
-            hdf = HDFMatrixStore(matrix_path='s3://fake-matrix-bucket/test.h5', metadata_path='s3://fake-matrix-bucket/test.yaml')
-            # CSV
-            csv = CSVMatrixStore(matrix_path='s3://fake-matrix-bucket/test.csv', metadata_path='s3://fake-matrix-bucket/test.yaml')
+                if isinstance(matrix_store, CSVMatrixStore):
+                    matrix_store.save(project_path='s3://fake-matrix-bucket', name='test')
+            # CSV
+            csv = CSVMatrixStore(matrix_path='s3://fake-matrix-bucket/test.csv', metadata_path='s3://fake-matrix-bucket/test.yaml')

-            assert csv.metadata == matrix_store_list[0].metadata
-            assert csv.matrix.to_dict() == matrix_store_list[0].matrix.to_dict()
-            assert hdf.metadata == matrix_store_list[0].metadata
-            assert hdf.matrix.to_dict() == matrix_store_list[0].matrix.to_dict()
+            assert csv.metadata == matrix_store_list[0].metadata
+            assert csv.matrix.to_dict() == matrix_store_list[0].matrix.to_dict()
[Member] So have we removed testing of non-csv stores? Or is it that you've added assertions which can only apply to csv? (And we can't apply these to, say, HDF?)

[Contributor Author] I am not testing HDF. We should move to Arrow or Parquet, or at least compressed CSV.

[Contributor] We still have HDF testing. This is for S3 storage specifically. The code now disallows storage of HDF on S3 by throwing an error upfront, so I think this testing is fine for the current state of the code.
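A minimal sketch of the storage alternatives floated in this thread, assuming the pandas API of this era (to_parquet requires pyarrow or fastparquet, neither of which is in the requirements changed above):

    import pandas as pd

    df = pd.DataFrame({'entity_id': [1, 2], 'm_feature': [0.4, 0.5]})
    df.to_csv('matrix.csv.gz', compression='gzip')  # compressed CSV
    df.to_parquet('matrix.parquet')                 # Arrow-backed Parquet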

34 changes: 29 additions & 5 deletions src/triage/component/architect/builders.py
@@ -3,6 +3,9 @@
 import pandas
 import os

+import s3fs
+from urllib.parse import urlparse
+
 from triage.component import metta


@@ -25,8 +28,11 @@ def validate(self):

     def build_all_matrices(self, build_tasks):
         logging.info('Building %s matrices', len(build_tasks.keys()))
-        for matrix_uuid, task_arguments in build_tasks.items():
+
+        for i, (matrix_uuid, task_arguments) in enumerate(build_tasks.items()):
+            logging.info(f"Building matrix {matrix_uuid} ({i}/{len(build_tasks.keys())})")
             self.build_matrix(**task_arguments)
+            logging.debug(f"Matrix {matrix_uuid} built")
[Member, on the f-string logging] I really don't have a firm opinion on this, now that we have the wonderful f-string; however, logging advises lazy %s interpolation because, particularly in the case of debug messages, these messages might simply be discarded, and there's no reason to waste cycles eagerly casting these objects to str and interpolating them:

    logging.debug("Matrix %s built", matrix_uuid)

or:

    logging.debug("Matrix %(matrix_uuid)s built", {'matrix_uuid': matrix_uuid})

(For that matter, %s interpolation simply isn't going away, no matter how much we might like it to; and lazy %s interpolation is alive and strong, thanks to logging and database drivers.)

All that said, particularly for higher-level log messages, it's hard to argue with the elegance and simplicity of f-strings.
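A self-contained sketch (not code from this PR) makes the reviewer's point concrete: with lazy interpolation the discarded DEBUG record is never formatted, while an f-string pays the formatting cost up front:

    import logging

    logging.basicConfig(level=logging.INFO)  # DEBUG records are discarded

    class Expensive:
        def __str__(self):
            print('expensive __str__ ran')
            return 'expensive'

    logging.debug('Matrix %s built', Expensive())  # lazy: __str__ never runs
    logging.debug(f'Matrix {Expensive()} built')   # eager: __str__ runs anyway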


     def _outer_join_query(
         self,
@@ -224,9 +230,23 @@ def build_matrix(
             matrix_directory,
             '{}.csv'.format(matrix_uuid)
         )
-        if not self.replace and os.path.exists(matrix_filename):
-            logging.info('Skipping %s because matrix already exists', matrix_filename)
-            return
+
+        # The output directory is local or in s3
+        path_parsed = urlparse(matrix_filename)
+        scheme = path_parsed.scheme  # If '' of 'file' is a regular file or 's3'
+
+        if scheme in ('', 'file'):
+            if not self.replace and os.path.exists(matrix_filename):
+                logging.info('Skipping %s because matrix already exists', matrix_filename)
[Member] All the additional logging is great. I don't have the hands-on experience with debugging an experiment, so I can't say how "spammy" these might seem outside of the DEBUG context; just curious if they might be.

[Contributor] We did decide a while ago that we would err on the side of being too spammy, at least for now.
+                return
+        elif scheme == 's3':
+            if not self.replace and s3fs.S3FileSystem().exists(matrix_filename):
+                logging.info('Skipping %s because matrix already exists', matrix_filename)
+                return
+        else:
+            raise ValueError(f"""URL scheme not supported:
+              {scheme} (from {matrix_filename})
+            """)

         logging.info('Creating matrix %s > %s', matrix_metadata['matrix_id'], matrix_filename)
         # make the entity time table and query the labels and features tables
@@ -248,8 +268,9 @@ def build_matrix(
             entity_date_table_name,
             matrix_uuid
         )
+        logging.info(f"Feature data extracted for matrix {matrix_uuid}")
         try:
-            logging.info('Extracting label data frmo database into file for '
+            logging.info('Extracting label data from database into file for '
                          'matrix %s', matrix_uuid)
             labels_csv_name = self.write_labels_data(
                 label_name,
@@ -260,13 +281,15 @@ def build_matrix(
             )
             features_csv_names.insert(0, labels_csv_name)

+            logging.info(f"Label data extracted for matrix {matrix_uuid}")
             # stitch together the csvs
             logging.info('Merging feature files for matrix %s', matrix_uuid)
             output = self.merge_feature_csvs(
                 features_csv_names,
                 matrix_directory,
                 matrix_uuid
             )
+            logging.info(f"Features data merged for matrix {matrix_uuid}")
         finally:
             # clean up files and database before finishing
             for csv_name in features_csv_names:
@@ -281,6 +304,7 @@ def build_matrix(
                 directory=self.matrix_directory,
                 format='csv'
             )
+            logging.info("Matrix {matrix_uuid} archived (using metta)")
         finally:
             if isinstance(output, str):
                 os.remove(output)
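The local-versus-S3 dispatch added above hinges on how urllib.parse.urlparse reports schemes; a quick sketch (paths are illustrative):

    from urllib.parse import urlparse

    assert urlparse('/tmp/matrices/abc.csv').scheme == ''             # plain path: local
    assert urlparse('file:///tmp/matrices/abc.csv').scheme == 'file'  # file URL: local
    assert urlparse('s3://a-bucket/matrices/abc.csv').scheme == 's3'  # S3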
2 changes: 1 addition & 1 deletion src/triage/component/architect/planner.py
@@ -221,5 +221,5 @@ def build_all_matrices(self, *args, **kwargs):
         self.builder.build_all_matrices(*args, **kwargs)

     def build_matrix(self, *args, **kwargs):
-        logging.info('Building matrix with args %s', args)
+        logging.info(f"Building matrix with {args}, {kwargs}")
         self.builder.build_matrix(*args, **kwargs)
10 changes: 5 additions & 5 deletions src/triage/component/catwalk/README.rst
@@ -35,7 +35,7 @@ Below is a complete sample usage of the three Catwalk components::
     from sqlalchemy import create_engine

     from triage.component import metta
-    from triage.component.catwalk.storage import FSModelStorageEngine, MettaCSVMatrixStore
+    from triage.component.catwalk.storage import FSModelStorageEngine, CSVMatrixStore
     from triage.component.catwalk.model_trainers import ModelTrainer
     from triage.component.catwalk.predictors import Predictor
     from triage.component.catwalk.evaluation import ModelEvaluator
@@ -66,9 +66,9 @@
     }
     train_matrix_uuid = metta.archive_matrix(train_metadata, train_matrix, format='csv')

-    # The MettaCSVMatrixStore bundles the matrix and metadata together
+    # The CSVMatrixStore bundles the matrix and metadata together
     # for catwalk to use
-    train_matrix_store = MettaCSVMatrixStore(
+    train_matrix_store = CSVMatrixStore(
         matrix_path='{}.csv'.format(train_matrix_uuid),
         metadata_path='{}.yaml'.format(train_matrix_uuid)
     )
@@ -91,9 +91,9 @@
     }
     test_matrix_uuid = metta.archive_matrix(test_metadata, test_matrix, format='csv')

-    # The MettaCSVMatrixStore bundles the matrix and metadata together
+    # The CSVMatrixStore bundles the matrix and metadata together
     # for catwalk to use
-    test_matrix_store = MettaCSVMatrixStore(
+    test_matrix_store = CSVMatrixStore(
         matrix_path='{}.csv'.format(test_matrix_uuid),
         metadata_path='{}.yaml'.format(test_matrix_uuid)
     )
7 changes: 5 additions & 2 deletions src/triage/component/catwalk/model_trainers.py
@@ -422,7 +422,8 @@ def process_train_task(
         elif not saved_model_id:
             reason = 'model metadata not found'

-        logging.info('Training %s/%s: %s', class_path, parameters, reason)
+        logging.info(f"Training {class_path} with parameters {parameters}"
+                     f"(reason to train: {reason})")
         model_id = self._train_and_store_model(
             matrix_store,
             class_path,
@@ -464,7 +465,8 @@ def generate_train_tasks(

         for class_path, parameters in self._generate_model_configs(grid_config):
             model_hash = self._model_hash(matrix_store.metadata, class_path, parameters)
-            logging.info('Computed model hash %s', model_hash)
+            logging.info(f"Computed model hash for {class_path} "
+                         f"with parameters {parameters}: {model_hash}")

             if any(task['model_hash'] == model_hash for task in tasks):
                 logging.info('Skipping model_hash %s because another'
@@ -480,4 +482,5 @@ def generate_train_tasks(
                 'misc_db_parameters': misc_db_parameters,
             })
         logging.info('Found %s unique model training tasks', len(tasks))
+
         return tasks