Parquet serialization (#152)
* starting

* works

* moved data to properties

* trying

* tests pass

* mostly working

* tests pass

* modified deployment

* entityset optional arg

* rm extraneous files

* added comments

* updates

* remove unnecessary import

* update per roy suggestion

* added comment about entityset metadata memory saving

* working

* got rid of lots of stuff

* 1 entityset test fails

* tests pass

* lint passed

* added __ne__ method to entityset

* lint:

* fixed old merge conflict

* tests pass

* removed class_from_dtype method

* added docstrings

* remove import

* remove add variable to variable_types property

* entity_stores to entity_dict

* update_data recalculates last_time_indexes and better serialization of entitysets

* serialization using pickle, parquet, and saving metadata in a json

* tests pass, lots of warnings though

* type warning removed

* added ft.read_pickle and ft.read_parquet

* linted

* reverted

* fixed data type not understood error

* updated test

* commit

* fixed everything except multiindex multiple types not raising typeerror

* updated reqs to run tests on ci

* linted

* linted

* almost

* tests pass

* raise useful error if attempting to use parquet with unicode column names in python 2.7

* Removed old cfm lines from merge conflict

* switched to pyarrow

* tests pass in py2 and py3

* tests pass

* fixed lint

* explicit metadata dict

* linted

* serialize to write namechange

* use public api

* to_parquet test working

* initial pass through

* all tests passing

* fix linting

* change test name

* update update_data

* update per code review

* tests pass except stats

* tests pass

* linted

* remove bad file

* added docstrings

* remove metadata_filename arg

* linted
bschreck committed Jun 22, 2018
1 parent b4dd270 commit e3deb21
Showing 11 changed files with 391 additions and 164 deletions.
10 changes: 9 additions & 1 deletion docs/source/api_reference.rst
@@ -258,11 +258,19 @@ EntitySet load and prepare data

EntitySet serialization
-------------------------------
.. currentmodule:: featuretools
.. autosummary::
:toctree: generated/

read_pickle
read_parquet

.. currentmodule:: featuretools.entityset
.. autosummary::
:toctree: generated/

EntitySet.to_pickle
EntitySet.read_pickle
EntitySet.to_parquet

EntitySet query methods
-----------------------
1 change: 1 addition & 0 deletions featuretools/entityset/api.py
@@ -2,4 +2,5 @@
from .entity import Entity
from .entityset import EntitySet
from .relationship import Relationship
from .serialization import read_entityset, read_parquet, read_pickle
from .timedelta import Timedelta
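
The serialization entry points documented and exported above can be exercised with a short round trip. A minimal sketch, assuming a toy DataFrame and temporary output paths (both illustrative, not part of this commit):

    import pandas as pd
    import featuretools as ft

    df = pd.DataFrame({
        "id": [0, 1, 2],
        "value": [10.5, 20.1, 30.2],
        "time": pd.to_datetime(["2018-01-01", "2018-01-02", "2018-01-03"]),
    })

    es = ft.EntitySet(id="example")
    es.entity_from_dataframe(entity_id="data", dataframe=df,
                             index="id", time_index="time")

    # Both writers create a directory at the given path and return the entityset.
    es.to_pickle("/tmp/es_pickle")
    es.to_parquet("/tmp/es_parquet")

    # The module-level readers rebuild the EntitySet from that directory.
    es_from_pickle = ft.read_pickle("/tmp/es_pickle")
    es_from_parquet = ft.read_parquet("/tmp/es_parquet")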
44 changes: 27 additions & 17 deletions featuretools/entityset/entity.py
@@ -41,7 +41,7 @@ class Entity(object):
index = None
indexed_by = None

def __init__(self, id, df, entityset, variable_types=None, name=None,
def __init__(self, id, df, entityset, variable_types=None,
index=None, time_index=None, secondary_time_index=None,
last_time_index=None, encoding=None, relationships=None,
already_sorted=False, created_index=None, verbose=False):
@@ -56,7 +56,6 @@ def __init__(self, id, df, entityset, variable_types=None, name=None,
entity_id to variable_types dict with which to initialize an
entity's store.
An entity's variable_types dict maps string variable ids to types (:class:`.Variable`).
name (str): Name of entity.
index (str): Name of id column in the dataframe.
time_index (str): Name of time column in the dataframe.
secondary_time_index (dict[str -> str]): Dictionary mapping columns
@@ -80,7 +79,6 @@ def __init__(self, id, df, entityset, variable_types=None, name=None,
self.created_index = created_index
self.convert_all_variable_data(variable_types)
self.id = id
self.name = name
self.entityset = entityset
self.indexed_by = {}
variable_types = variable_types or {}
@@ -92,6 +90,7 @@ def __init__(self, id, df, entityset, variable_types=None, name=None,
if ti not in cols:
cols.append(ti)

relationships = relationships or []
link_vars = [v.id for rel in relationships for v in [rel.parent_variable, rel.child_variable]
if v.entity.id == self.id]

@@ -125,7 +124,6 @@ def __init__(self, id, df, entityset, variable_types=None, name=None,
if v.id == self.index][0]
self.variables = [index_variable] + [v for v in self.variables
if v.id != self.index]

self.update_data(df=self.df,
already_sorted=already_sorted,
recalculate_last_time_indexes=False,
@@ -538,18 +536,21 @@ def infer_variable_types(self, ignore=None, link_vars=None):

return inferred_types

def update_data(self, df=None, data=None, already_sorted=False,
def update_data(self, df, already_sorted=False,
reindex=True, recalculate_last_time_indexes=True):
'''Update entity's internal dataframe, optionally making sure data is sorted,
reference indexes to other entities are consistent, and last_time_indexes
are consistent.
'''
if len(df.columns) != len(self.variables):
raise ValueError("Updated dataframe contains {} columns, expecting {}".format(len(df.columns),
len(self.variables)))
for v in self.variables:
if v.id not in df.columns:
raise ValueError("Updated dataframe is missing new {} column".format(v.id))

if data is not None:
self.data = data
elif df is not None:
self.df = df

if data or df is not None:
# Make sure column ordering matches variable ordering
self.df = self.df[[v.id for v in self.variables]]

# Make sure column ordering matches variable ordering
self.df = df[[v.id for v in self.variables]]
self.set_index(self.index)
self.set_time_index(self.time_index, already_sorted=already_sorted)
self.set_secondary_time_index(self.secondary_time_index)
@@ -648,7 +649,12 @@ def delete_variable(self, variable_id):
def set_time_index(self, variable_id, already_sorted=False):
if variable_id is not None:
# check time type
time_type = _check_time_type(self.df[variable_id].iloc[0])
if self.df.empty:
time_to_check = vtypes.DEFAULT_DTYPE_VALUES[self[variable_id]._default_pandas_dtype]
else:
time_to_check = self.df[variable_id].iloc[0]

time_type = _check_time_type(time_to_check)
if time_type is None:
raise TypeError("%s time index not recognized as numeric or"
" datetime" % (self.id))
@@ -665,7 +671,7 @@ def set_time_index(self, variable_id, already_sorted=False):
# sort by time variable, then by index
self.df.sort_values([variable_id, self.index], inplace=True)

t = vtypes.TimeIndex
t = vtypes.NumericTimeIndex
if col_is_datetime(self.df[variable_id]):
t = vtypes.DatetimeTimeIndex
self.convert_variable_type(variable_id, t, convert_data=False)
@@ -695,7 +701,11 @@ def set_index(self, variable_id, unique=True):
def set_secondary_time_index(self, secondary_time_index):
if secondary_time_index is not None:
for time_index in secondary_time_index:
time_type = _check_time_type(self.df[time_index].iloc[0])
if self.df.empty:
time_to_check = vtypes.DEFAULT_DTYPE_VALUES[self[time_index]._default_pandas_dtype]
else:
time_to_check = self.df[time_index].iloc[0]
time_type = _check_time_type(time_to_check)
if time_type is None:
raise TypeError("%s time index not recognized as numeric or"
" datetime" % (self.id))
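
The two guards above, in set_time_index and set_secondary_time_index, exist because metadata entitysets carry empty DataFrames, so .iloc[0] would raise IndexError. A minimal sketch of the pattern, with an illustrative stand-in for DEFAULT_DTYPE_VALUES (the real mapping lives in featuretools.variable_types) and a hypothetical helper name sample_time_value:

    import numpy as np
    import pandas as pd

    # Illustrative stand-in for vtypes.DEFAULT_DTYPE_VALUES: a representative
    # sample value per default pandas dtype.
    DEFAULT_DTYPE_VALUES = {
        np.datetime64: pd.Timestamp("2018-01-01"),
        np.float64: 0.0,
        object: "",
    }

    def sample_time_value(df, column, default_dtype):
        # On a metadata entityset the dataframe is empty, so df[column].iloc[0]
        # would raise IndexError; fall back to a dtype-appropriate placeholder
        # that can still be classified as numeric or datetime.
        if df.empty:
            return DEFAULT_DTYPE_VALUES[default_dtype]
        return df[column].iloc[0]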
183 changes: 109 additions & 74 deletions featuretools/entityset/entityset.py
@@ -7,11 +7,11 @@
import cloudpickle
import numpy as np
import pandas as pd
from pandas.api.types import is_dtype_equal
from pandas.api.types import is_dtype_equal, is_numeric_dtype

from .entity import Entity
from .relationship import Relationship
from .serialization import read_pickle, to_pickle
from .serialization import load_entity_data, write_entityset

import featuretools.variable_types.variable as vtypes
from featuretools.utils.gen_utils import make_tqdm_iterator
@@ -172,22 +172,24 @@ def entities(self):

@property
def metadata(self):
'''Defined as a property because an EntitySet's metadata
is used in many places, for instance, for each feature in a feature list.
'''Version of this EntitySet with all data replaced with empty DataFrames.
An EntitySet's metadata is used in many places, for instance,
for each feature in a feature list.
To avoid copying the full metadata object to each feature,
we generate a new metadata object and check whether it is the same as the existing one;
if it is, we return the existing one. Thus, all features in the feature list
reference the same object rather than copies, which saves a lot of memory.
'''
new_metadata = self.from_metadata(self.create_metadata_dict(),
data_root=None)
if self._metadata is None:
self._metadata = self._gen_metadata()
self._metadata = new_metadata
else:
new_metadata = self._gen_metadata()
# Don't want to keep making new copies of metadata
# Only make a new one if something was changed
if not self._metadata.__eq__(new_metadata):
if self._metadata != new_metadata:
self._metadata = new_metadata

return self._metadata

@property
@@ -199,12 +201,77 @@ def is_metadata(self):
return all(e.df.empty for e in self.entity_dict.values())

def to_pickle(self, path):
to_pickle(self, path)
'''Write entityset to disk in the pickle format, location specified by `path`.
Args:
* path (str): location on disk to write to (will be created as a directory)
'''

write_entityset(self, path, serialization_method='pickle')
return self

def to_parquet(self, path):
'''Write entityset to disk in the parquet format, location specified by `path`.
Args:
* path (str): location on disk to write to (will be created as a directory)
'''

write_entityset(self, path, serialization_method='parquet')
return self

def create_metadata_dict(self):
return {
'id': self.id,
'relationships': [{
'parent_entity': r.parent_entity.id,
'parent_variable': r.parent_variable.id,
'child_entity': r.child_entity.id,
'child_variable': r.child_variable.id,
} for r in self.relationships],
'entity_dict': {eid: {
'index': e.index,
'time_index': e.time_index,
'secondary_time_index': e.secondary_time_index,
'encoding': e.encoding,
'variables': {
v.id: v.create_metadata_dict()
for v in e.variables
},
'has_last_time_index': e.last_time_index is not None
} for eid, e in self.entity_dict.items()},
}

@classmethod
def read_pickle(cls, path):
return read_pickle(path)
def from_metadata(cls, metadata, data_root=None):
es = EntitySet(metadata['id'])
set_last_time_indexes = False
for eid, entity in metadata['entity_dict'].items():
df, variable_types = cls._load_dummy_entity_data_and_variable_types(entity)
if data_root is not None:
df = load_entity_data(entity, data_root)
es.entity_from_dataframe(eid,
df,
index=entity['index'],
time_index=entity['time_index'],
secondary_time_index=entity['secondary_time_index'],
encoding=entity['encoding'],
variable_types=variable_types)
if entity['has_last_time_index']:
set_last_time_indexes = True
for vid, v in entity['variables'].items():
if v['interesting_values'] and len(v['interesting_values']):
es[eid][vid].interesting_values = v['interesting_values']
for rel in metadata['relationships']:
es.add_relationship(Relationship(
es[rel['parent_entity']][rel['parent_variable']],
es[rel['child_entity']][rel['child_variable']],
))
if set_last_time_indexes:
es.add_last_time_indexes()
return es

###########################################################################
# Public getter/setter methods #########################################
@@ -273,6 +340,13 @@ def add_relationship(self, relationship):
parent_e.convert_variable_type(variable_id=parent_v,
new_type=vtypes.Index,
convert_data=False)
# Empty dataframes (as a result of accessing Entity.metadata)
# default to object dtypes for discrete variables, but
# indexes/ids default to ints. In this case, we convert
# the empty column's type to int
if (child_e.df.empty and child_e.df[child_v].dtype == object and
is_numeric_dtype(parent_e.df[parent_v])):
child_e.df[child_v] = pd.Series(name=child_v, dtype=np.int64)

parent_dtype = parent_e.df[parent_v].dtype
child_dtype = child_e.df[child_v].dtype
@@ -1116,69 +1190,6 @@ def gen_relationship_var(self, child_eid, parent_eid):
# Private methods ######################################################
###########################################################################

def _gen_metadata(self):
new_entityset = object.__new__(EntitySet)
new_entityset_dict = {}
for k, v in self.__dict__.items():
if k not in ["entity_dict", "relationships"]:
new_entityset_dict[k] = v
new_entityset_dict["entity_dict"] = {}
for eid, e in self.entity_dict.items():
metadata_e = self._entity_metadata(e)
new_entityset_dict['entity_dict'][eid] = metadata_e
new_entityset_dict["relationships"] = []
for r in self.relationships:
metadata_r = self._relationship_metadata(r)
new_entityset_dict['relationships'].append(metadata_r)
new_entityset.__dict__ = copy.deepcopy(new_entityset_dict)
for e in new_entityset.entity_dict.values():
e.entityset = new_entityset
for v in e.variables:
v.entity = new_entityset[v.entity_id]
for r in new_entityset.relationships:
r.entityset = new_entityset
return new_entityset

@classmethod
def _entity_metadata(cls, e):
new_dict = {}
for k, v in e.__dict__.items():
if k not in ["data", "entityset", "variables"]:
new_dict[k] = v
new_dict["data"] = {
"df": e.df.head(0),
"last_time_index": None,
"indexed_by": {}
}
new_dict["variables"] = [cls._variable_metadata(v)
for v in e.variables]
new_dict = copy.deepcopy(new_dict)
new_entity = object.__new__(Entity)
new_entity.__dict__ = new_dict
return new_entity

@classmethod
def _relationship_metadata(cls, r):
new_dict = {}
for k, v in r.__dict__.items():
if k != "entityset":
new_dict[k] = v
new_dict = copy.deepcopy(new_dict)
new_r = object.__new__(Relationship)
new_r.__dict__ = new_dict
return new_r

@classmethod
def _variable_metadata(cls, var):
new_dict = {}
for k, v in var.__dict__.items():
if k != "entity":
new_dict[k] = v
new_dict = copy.deepcopy(new_dict)
new_v = object.__new__(type(var))
new_v.__dict__ = new_dict
return new_v

def _import_from_dataframe(self,
entity_id,
dataframe,
@@ -1362,6 +1373,30 @@ def _add_multigenerational_link_vars(self, frames, start_entity_id,
right=child_df,
on=r.child_variable.id)

@classmethod
def _load_dummy_entity_data_and_variable_types(cls, metadata):
variable_types = {}
defaults = []
columns = []
variable_names = {}
for elt in dir(vtypes):
try:
cls = getattr(vtypes, elt)
if issubclass(cls, vtypes.Variable):
variable_names[cls._dtype_repr] = cls
except TypeError:
pass
for vid, vmetadata in metadata['variables'].items():
if vmetadata['dtype_repr']:
vtype = variable_names.get(vmetadata['dtype_repr'], vtypes.Variable)
variable_types[vid] = vtype
defaults.append(vtypes.DEFAULT_DTYPE_VALUES[vtype._default_pandas_dtype])
else:
defaults.append(vtypes.DEFAULT_DTYPE_VALUES[object])
columns.append(vid)
df = pd.DataFrame({c: [d] for c, d in zip(columns, defaults)}).head(0)
return df, variable_types


def make_index_variable_name(entity_id):
return entity_id + "_id"

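Taken together, create_metadata_dict and from_metadata give a structure-only round trip for an EntitySet; the on-disk readers presumably follow the same path, additionally supplying a data_root so load_entity_data can restore each entity's DataFrame. A minimal sketch, with an illustrative entityset:

    import pandas as pd
    import featuretools as ft

    es = ft.EntitySet(id="example")
    es.entity_from_dataframe(entity_id="data",
                             dataframe=pd.DataFrame({"id": [0, 1], "value": [1.0, 2.0]}),
                             index="id")

    # Plain dict of ids, indexes, variable types, and relationships
    # (suitable for writing out as JSON alongside the entity data).
    metadata = es.create_metadata_dict()

    # With data_root=None the entities come back with empty DataFrames;
    # this is the structure-only EntitySet the metadata property caches.
    structure_only = ft.EntitySet.from_metadata(metadata, data_root=None)
    assert structure_only.is_metadata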