Serialize and Deserialize from S3/URLs #685

Merged Aug 2, 2019
Changes from all commits (86 commits)
c6b4e7f
Added smart-open to requirements
jeremyliweishih Jul 23, 2019
6add696
Added import to replace open with smart_open
jeremyliweishih Jul 23, 2019
f6de2a8
Added smart_open for writing
jeremyliweishih Jul 23, 2019
c6879b6
First changes to serialize
jeremyliweishih Jul 24, 2019
2388c46
Fixed serialization
jeremyliweishih Jul 24, 2019
29650d1
Added first half of tests using moto
jeremyliweishih Jul 24, 2019
186717a
Fixed serializing
jeremyliweishih Jul 24, 2019
42f05f0
Added url deserialization
jeremyliweishih Jul 24, 2019
84c5b52
Added s3 tests
jeremyliweishih Jul 24, 2019
1c3b8b3
Fixed import order
jeremyliweishih Jul 24, 2019
08b53f5
Added s3 test for feature serialization using moto
jeremyliweishih Jul 24, 2019
71e1f13
Added read from real s3_url test and url tests
jeremyliweishih Jul 25, 2019
244ef09
Added using default profile for s3 or use session parameter
jeremyliweishih Jul 25, 2019
02b40ca
fixed real s3 test name
jeremyliweishih Jul 25, 2019
f4f1cae
Separated s3 and URLs
jeremyliweishih Jul 25, 2019
3bcb41d
Added support to s3 and URLs to feature serialization
jeremyliweishih Jul 25, 2019
6de475a
Added tests for feature serialization
jeremyliweishih Jul 25, 2019
80dd85b
Fixed import order
jeremyliweishih Jul 25, 2019
44b0438
Merge branch 'master' into read_from_url
jeremyliweishih Jul 25, 2019
f188315
Merge branch 'master' of https://github.com/featuretools/featuretools …
jeremyliweishih Jul 25, 2019
cd9e86b
Merge branch 'read_from_url' of https://github.com/featuretools/featu…
jeremyliweishih Jul 25, 2019
a879bed
Added pathlib for py < 3
jeremyliweishih Jul 25, 2019
736e634
Fixed version of pathlib
jeremyliweishih Jul 25, 2019
45b1130
Added s3fs for anon users
jeremyliweishih Jul 25, 2019
e679b30
Fixed s3 without credentials for serialization
jeremyliweishih Jul 25, 2019
a2b27e3
Fixed s3 without credentials for feature serialization
jeremyliweishih Jul 25, 2019
77ccccd
Fixed tests for s3 without credentials
jeremyliweishih Jul 25, 2019
5b4f557
Fixed tests for s3 without credentials for feature serialization
jeremyliweishih Jul 25, 2019
85cfb42
Fixed lint issues
jeremyliweishih Jul 25, 2019
ff42ff4
Fixed reading bytes from json for py3.5
jeremyliweishih Jul 26, 2019
e9b2e43
Added encoding for json loading
jeremyliweishih Jul 26, 2019
a5104d3
Added sorting for unit tests as feature ordering is different in 3.5 …
jeremyliweishih Jul 26, 2019
99edd99
Added changes to changelog
jeremyliweishih Jul 26, 2019
cac9074
Edited comments and made stylistic changes
jeremyliweishih Jul 26, 2019
7ab3ded
Fixed s3fs version
jeremyliweishih Jul 26, 2019
0a9fa2c
Fixed lint issues
jeremyliweishih Jul 26, 2019
27e9f06
Replaced urllib for python 2
jeremyliweishih Jul 26, 2019
60cfb2d
Fixed space in changelog
jeremyliweishih Jul 26, 2019
66e0ef6
Fixed checking url
jeremyliweishih Jul 26, 2019
10bd93c
Changed to use backports.tempfile for py2.7
jeremyliweishih Jul 26, 2019
9ca4bba
Fixed checking URLs for features
jeremyliweishih Jul 26, 2019
5397307
Fixed writing to s3 using s3fs
jeremyliweishih Jul 26, 2019
6c1b83b
Added AWS profile as parameter for serialization
jeremyliweishih Jul 29, 2019
7e35b4c
Added tests for checking AWS profile
jeremyliweishih Jul 29, 2019
99777ad
Added aws_profile as a parameter
jeremyliweishih Jul 29, 2019
cc4ce08
Added profile_name as parameter
jeremyliweishih Jul 29, 2019
3485a8a
Fixed lint
jeremyliweishih Jul 29, 2019
1168ab4
Fixed comments with path_name parameter
jeremyliweishih Jul 29, 2019
e34482a
Slight fix to logic
jeremyliweishih Jul 29, 2019
9f603dd
Removed try in tests
jeremyliweishih Jul 29, 2019
5004ca8
Added fixtures for mock_s3 tests
jeremyliweishih Jul 29, 2019
16b5769
Moved moto to test-requirements
jeremyliweishih Jul 30, 2019
1d0a63a
Cleaned up verbose directory creation
jeremyliweishih Jul 30, 2019
076a15e
Fixed forward slash for arc name
jeremyliweishih Jul 30, 2019
c73bc0a
Added helper functions to clean code
jeremyliweishih Jul 30, 2019
ab0cce5
Moved url checking functions to utils
jeremyliweishih Jul 30, 2019
25eef1f
Fixed if style
jeremyliweishih Jul 30, 2019
2c184c5
Fixed test config name
jeremyliweishih Jul 30, 2019
254121e
Changed behavior for aws credentials
jeremyliweishih Jul 30, 2019
88974b5
Fixed checking profile tests and resetting environment variables
jeremyliweishih Jul 30, 2019
626be31
Removed unused imports
jeremyliweishih Jul 30, 2019
9d28710
Fixed profile tests
jeremyliweishih Jul 30, 2019
3a9a699
Added python 2 specific importing
jeremyliweishih Jul 30, 2019
a8c3e0b
Fixed wrong function call
jeremyliweishih Jul 30, 2019
8bcbac7
Fixed requirements to be python2 only for backports
jeremyliweishih Jul 30, 2019
6a88969
Fixed checking python version in requirements
jeremyliweishih Jul 30, 2019
6e4baf4
Fixed if logic
jeremyliweishih Jul 30, 2019
4256a6c
Fixed order of tests for python2
jeremyliweishih Jul 30, 2019
544d89c
Added tests to explicitly test anon profiles
jeremyliweishih Jul 30, 2019
44968d3
Fixed logic and made more consistent
jeremyliweishih Jul 31, 2019
9ec6588
Moved open statements into utils and fixed logic
jeremyliweishih Jul 31, 2019
bbf87eb
Made tests consistent with other serialization tests
jeremyliweishih Jul 31, 2019
5940734
Removed faulty encoding parameter for py2
jeremyliweishih Jul 31, 2019
73d7ab5
Added profile specific tests
jeremyliweishih Jul 31, 2019
7b4702b
Fix lint
jeremyliweishih Jul 31, 2019
7db7701
Fixed descriptions
jeremyliweishih Aug 2, 2019
cdd8b49
Fixed imports
jeremyliweishih Aug 2, 2019
4970a4e
Moved repetitive code into function
jeremyliweishih Aug 2, 2019
dc9f1a7
Add todo
jeremyliweishih Aug 2, 2019
281c9cc
Clean up setting up test profile
jeremyliweishih Aug 2, 2019
7b18ebc
Move repetitive code into function
jeremyliweishih Aug 2, 2019
6a04a11
Reduced repetitive tests
jeremyliweishih Aug 2, 2019
95644c1
Added expectation of tar format
jeremyliweishih Aug 2, 2019
bec2e3c
Fixed description
jeremyliweishih Aug 2, 2019
d29411a
Parameterize testing s3
jeremyliweishih Aug 2, 2019
eac8821
Merge branch 'master' into read_from_url
jeremyliweishih Aug 2, 2019
1 change: 1 addition & 0 deletions docs/source/changelog.rst
@@ -6,6 +6,7 @@ Changelog
* Enhancements
* Added drop_first as param in encode_features (:pr:`647`)
* Generate transform features of direct features (:pr:`623`)
* Added serializing and deserializing from S3 and deserializing from URLs (:pr:`685`)
* Fixes
* Fix performance regression in DFS (:pr:`637`)
* Fix deserialization of feature relationship path (:pr:`665`)
57 changes: 49 additions & 8 deletions featuretools/entityset/deserialize.py
@@ -1,13 +1,27 @@
import json
import os
import tarfile
from pathlib import Path

import boto3
import pandas as pd

from featuretools.entityset.relationship import Relationship
from featuretools.entityset.serialize import FORMATS
from featuretools.utils.gen_utils import check_schema_version
from featuretools.utils.gen_utils import (
check_schema_version,
is_python_2,
use_s3fs_es,
use_smartopen_es
)
from featuretools.utils.wrangle import _is_s3, _is_url
from featuretools.variable_types.variable import find_variable_types

if is_python_2():
from backports import tempfile
else:
import tempfile


def description_to_variable(description, entity=None):
'''Deserialize variable from variable description.
@@ -132,14 +146,15 @@ def read_entity_data(description, path):


def read_data_description(path):
'''Read data description from disk.
'''Read data description from disk, S3 path, or URL.

Args:
path (str): Location on disk to read `data_description.json`.
path (str): Location on disk, S3 path, or URL to read `data_description.json`.

Returns:
description (dict) : Description of :class:`.EntitySet`.
'''

path = os.path.abspath(path)
assert os.path.exists(path), '"{}" does not exist'.format(path)
file = os.path.join(path, 'data_description.json')
@@ -149,12 +164,38 @@ def read_data_description(path):
return description


def read_entityset(path, **kwargs):
'''Read entityset from disk.
def read_entityset(path, profile_name=None, **kwargs):
'''Read entityset from disk, S3 path, or URL.

Args:
path (str): Directory on disk to read `data_description.json`.
path (str): Directory on disk, S3 path, or URL to read `data_description.json`.
profile_name (str, bool): The AWS profile to use when reading from S3. Defaults to None, which uses whatever AWS credentials can be found.
Set to False to use an anonymous profile.
kwargs (keywords): Additional keyword arguments to pass to the underlying deserialization method.
'''
data_description = read_data_description(path)
return description_to_entityset(data_description, **kwargs)
if _is_url(path) or _is_s3(path):
with tempfile.TemporaryDirectory() as tmpdir:
file_name = Path(path).name
file_path = os.path.join(tmpdir, file_name)
transport_params = {}
session = boto3.Session()

if _is_url(path):
use_smartopen_es(file_path, path)
elif isinstance(profile_name, str):
transport_params = {'session': boto3.Session(profile_name=profile_name)}
use_smartopen_es(file_path, path, transport_params)
elif profile_name is False:
use_s3fs_es(file_path, path)
elif session.get_credentials() is not None:
use_smartopen_es(file_path, path)
else:
use_s3fs_es(file_path, path)

tar = tarfile.open(str(file_path))
tar.extractall(path=tmpdir)
data_description = read_data_description(tmpdir)
return description_to_entityset(data_description, **kwargs)
else:
data_description = read_data_description(path)
return description_to_entityset(data_description, **kwargs)
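
For reference, a minimal usage sketch of the new read path (the bucket, file, and URL names below are hypothetical, not from this PR):

import featuretools as ft

# profile_name=None (the default) falls back to whatever AWS credentials
# boto3 can find; a profile name string uses that named profile; False
# reads anonymously through s3fs.
es = ft.read_entityset("s3://my-bucket/es-archive.tar", profile_name=None)

# URLs are supported for reading only; writing to a URL raises a ValueError.
es = ft.read_entityset("https://example.com/es-archive.tar")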
23 changes: 16 additions & 7 deletions featuretools/entityset/entityset.py
@@ -143,38 +143,47 @@ def metadata(self):
def reset_data_description(self):
self._data_description = None

def to_pickle(self, path, compression=None):
'''Write entityset to disk in the pickle format, location specified by `path`.
def to_pickle(self, path, compression=None, profile_name=None):
'''Write entityset in the pickle format, location specified by `path`.
Path can be a local path or an S3 path.
If writing to S3, a tar archive of files will be written.

Args:
path (str): location on disk to write to (will be created as a directory)
compression (str) : Name of the compression to use. Possible values are: {'gzip', 'bz2', 'zip', 'xz', None}.
profile_name (str, bool) : Name of the AWS profile to use, False to use an anonymous profile, or None to use default credentials.
'''
serialize.write_data_description(self, path, format='pickle', compression=compression)
serialize.write_data_description(self, path, format='pickle', compression=compression, profile_name=profile_name)
return self

def to_parquet(self, path, engine='auto', compression=None):
def to_parquet(self, path, engine='auto', compression=None, profile_name=None):
'''Write entityset to disk in the parquet format, location specified by `path`.
Path can be a local path or an S3 path.
If writing to S3, a tar archive of files will be written.

Args:
path (str): location on disk to write to (will be created as a directory)
engine (str) : Name of the engine to use. Possible values are: {'auto', 'pyarrow', 'fastparquet'}.
compression (str) : Name of the compression to use. Possible values are: {'snappy', 'gzip', 'brotli', None}.
profile_name (str, bool) : Name of the AWS profile to use, False to use an anonymous profile, or None to use default credentials.
'''
serialize.write_data_description(self, path, format='parquet', engine=engine, compression=compression)
serialize.write_data_description(self, path, format='parquet', engine=engine, compression=compression, profile_name=profile_name)
return self

def to_csv(self, path, sep=',', encoding='utf-8', engine='python', compression=None):
def to_csv(self, path, sep=',', encoding='utf-8', engine='python', compression=None, profile_name=None):
'''Write entityset to disk in the csv format, location specified by `path`.
Path can be a local path or an S3 path.
If writing to S3, a tar archive of files will be written.

Args:
path (str) : Location on disk to write to (will be created as a directory)
sep (str) : String of length 1. Field delimiter for the output file.
encoding (str) : A string representing the encoding to use in the output file, defaults to 'utf-8'.
engine (str) : Name of the engine to use. Possible values are: {'c', 'python'}.
compression (str) : Name of the compression to use. Possible values are: {'gzip', 'bz2', 'zip', 'xz', None}.
profile_name (str, bool) : Name of the AWS profile to use, False to use an anonymous profile, or None to use default credentials.
'''
serialize.write_data_description(self, path, format='csv', index=False, sep=sep, encoding=encoding, engine=engine, compression=compression)
serialize.write_data_description(self, path, format='csv', index=False, sep=sep, encoding=encoding, engine=engine, compression=compression, profile_name=profile_name)
return self

def to_dictionary(self):
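
As a hedged illustration of how these writers might be called (bucket and profile names are hypothetical):

import featuretools as ft

es = ft.demo.load_mock_customer(return_entityset=True)

# Local write, unchanged from previous behavior.
es.to_csv("my_entityset")

# S3 write as a tar archive, using a named AWS profile.
es.to_pickle("s3://my-bucket/my_entityset", profile_name="test")

# Anonymous S3 write via s3fs (the bucket must permit public writes).
es.to_parquet("s3://my-bucket/my_entityset", profile_name=False)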
71 changes: 61 additions & 10 deletions featuretools/entityset/serialize.py
@@ -1,6 +1,22 @@
import datetime
import json
import os
import shutil
import tarfile

import boto3

from featuretools.utils.gen_utils import (
is_python_2,
use_s3fs_es,
use_smartopen_es
)
from featuretools.utils.wrangle import _is_s3, _is_url

if is_python_2():
from backports import tempfile
else:
import tempfile

FORMATS = ['csv', 'pickle', 'parquet']
SCHEMA_VERSION = "1.0.0"
@@ -58,7 +74,7 @@ def entityset_to_description(entityset):


def write_entity_data(entity, path, format='csv', **kwargs):
'''Write entity data to disk.
'''Write entity data to disk or an S3 path.

Args:
entity (Entity) : Instance of :class:`.Entity`.
@@ -97,23 +113,58 @@ def write_entity_data(entity, path, format='csv', **kwargs):
return {'location': location, 'type': format, 'params': kwargs}


def write_data_description(entityset, path, **kwargs):
'''Serialize entityset to data description and write to disk.
def write_data_description(entityset, path, profile_name=None, **kwargs):
'''Serialize entityset to data description and write to disk or an S3 path.

Args:
entityset (EntitySet) : Instance of :class:`.EntitySet`.
path (str) : Location on disk to write `data_description.json` and entity data.
kwargs (keywords) : Additional keyword arguments to pass as keywords arguments to the underlying serialization method.
path (str) : Location on disk or S3 path to write `data_description.json` and entity data.
profile_name (str, bool): The AWS profile to use when writing to S3. Defaults to None, which uses whatever AWS credentials can be found.
Set to False to use an anonymous profile.
kwargs (keywords) : Additional keyword arguments to pass to the underlying serialization method.
'''
path = os.path.abspath(path)
if os.path.exists(path):
shutil.rmtree(path)
for dirname in [path, os.path.join(path, 'data')]:
os.makedirs(dirname)
if _is_s3(path):
with tempfile.TemporaryDirectory() as tmpdir:
os.makedirs(os.path.join(tmpdir, 'data'))
dump_data_description(entityset, tmpdir, **kwargs)
file_path = create_archive(tmpdir)

transport_params = {}
session = boto3.Session()
if isinstance(profile_name, str):
transport_params = {'session': boto3.Session(profile_name=profile_name)}
use_smartopen_es(file_path, path, transport_params, read=False)
elif profile_name is False:
use_s3fs_es(file_path, path, read=False)
elif session.get_credentials() is not None:
use_smartopen_es(file_path, path, read=False)
else:
use_s3fs_es(file_path, path, read=False)
elif _is_url(path):
raise ValueError("Writing to URLs is not supported")
else:
path = os.path.abspath(path)
if os.path.exists(path):
shutil.rmtree(path)
os.makedirs(os.path.join(path, 'data'))
dump_data_description(entityset, path, **kwargs)


def dump_data_description(entityset, path, **kwargs):
description = entityset_to_description(entityset)
for entity in entityset.entities:
loading_info = write_entity_data(entity, path, **kwargs)
description['entities'][entity.id]['loading_info'].update(loading_info)
file = os.path.join(path, 'data_description.json')
with open(file, 'w') as file:
json.dump(description, file)


def create_archive(tmpdir):
file_name = "es-{date:%Y-%m-%d_%H:%M:%S}.tar".format(date=datetime.datetime.now())
file_path = os.path.join(tmpdir, file_name)
tar = tarfile.open(str(file_path), 'w')
tar.add(str(tmpdir) + '/data_description.json', arcname='/data_description.json')
tar.add(str(tmpdir) + '/data', arcname='/data')
tar.close()
return file_path
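
The profile handling in write_data_description (mirrored in read_entityset above) reduces to a small decision table. The sketch below restates just that resolution logic; it assumes only boto3, and the function name and return values are illustrative, not part of the PR:

import boto3

def resolve_s3_transport(profile_name):
    # Named profile: smart_open with a session bound to that profile.
    if isinstance(profile_name, str):
        return 'smart_open', {'session': boto3.Session(profile_name=profile_name)}
    # Explicitly anonymous: fall back to s3fs.
    if profile_name is False:
        return 's3fs', {}
    # Default credentials available: smart_open with the default session.
    if boto3.Session().get_credentials() is not None:
        return 'smart_open', {}
    # No credentials at all: anonymous s3fs.
    return 's3fs', {}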
42 changes: 33 additions & 9 deletions featuretools/feature_base/features_deserializer.py
@@ -1,5 +1,7 @@
import json

import boto3

from featuretools.entityset.deserialize import \
description_to_entityset as deserialize_es
from featuretools.feature_base.feature_base import (
@@ -12,23 +14,31 @@
TransformFeature
)
from featuretools.primitives.utils import PrimitivesDeserializer
from featuretools.utils.gen_utils import check_schema_version
from featuretools.utils.gen_utils import (
check_schema_version,
use_s3fs_features,
use_smartopen_features
)
from featuretools.utils.wrangle import _is_s3, _is_url


def load_features(features):
"""Loads the features from a filepath, an open file, or a JSON formatted string.
def load_features(features, profile_name=None):
"""Loads the features from a filepath, S3 path, URL, an open file, or a JSON formatted string.

Args:
features (str or :class:`.FileObject`): The location where the features have been saved,
which must include the filename; or a JSON formatted string; or a readable file
handle where the features have been saved.

profile_name (str, bool): The AWS profile to use when reading from S3. Defaults to None, which uses whatever AWS credentials can be found.
Set to False to use an anonymous profile.

Returns:
features (list[:class:`.FeatureBase`]): Feature definitions list.

Note:
Features saved in one version of Featuretools are not guaranteed to work in another.
After upgrading Featuretools, features may need to be generated again.
Features saved in one version of Featuretools or Python are not guaranteed to work in another.
After upgrading Featuretools or Python, features may need to be generated again.

Example:
.. ipython:: python
Expand All @@ -51,7 +61,7 @@ def load_features(features):
.. seealso::
:func:`.save_features`
"""
return FeaturesDeserializer.load(features).to_list()
return FeaturesDeserializer.load(features, profile_name).to_list()


class FeaturesDeserializer(object):
@@ -73,13 +83,27 @@ def __init__(self, features_dict):
self._primitives_deserializer = PrimitivesDeserializer()

@classmethod
def load(cls, features):
def load(cls, features, profile_name):
if isinstance(features, str):
try:
features_dict = json.loads(features)
except ValueError:
with open(features, 'r') as f:
features_dict = json.load(f)
if _is_url(features):
features_dict = use_smartopen_features(features)
elif _is_s3(features):
session = boto3.Session()
if isinstance(profile_name, str):
transport_params = {'session': boto3.Session(profile_name=profile_name)}
features_dict = use_smartopen_features(features, transport_params)
elif profile_name is False:
features_dict = use_s3fs_features(features)
elif session.get_credentials() is not None:
features_dict = use_smartopen_features(features)
else:
features_dict = use_s3fs_features(features)
else:
with open(features, 'r') as f:
features_dict = json.load(f)
return cls(features_dict)
return cls(json.load(features))
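
A short usage sketch of the extended loader (paths are hypothetical):

import featuretools as ft

# Load feature definitions from S3 without credentials...
features = ft.load_features("s3://my-bucket/features.json", profile_name=False)

# ...or from a URL, which needs no profile at all.
features = ft.load_features("https://example.com/features.json")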

Expand Down