-
Notifications
You must be signed in to change notification settings - Fork 873
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Parquet serialization #152
Changes from 70 commits
77c0200
0b86de0
aa09c85
48e2794
f3b9c9c
958e999
3cb63bb
9883f8a
e95ab5b
c866ace
a17bb11
d683359
b8cf2a5
95b3900
9279055
6bb2245
e406bc7
2ea566a
12fded1
a42026d
f23b1a5
062dd7e
d0eb395
7b3e14f
2d9e7bf
5a80afc
e48211d
0015d84
e3646c6
3c0a8a6
6b3788a
af9f846
fcb9f41
df5a761
14657a8
aa531de
824d492
79c3675
ec5e203
6ca0e60
cad1ff9
72f18f0
0d1793d
430766f
cb6d1a3
a278b82
e3fba88
764bbc8
608ffcc
a259ee3
d2cc944
9cb9cd0
da8393e
1c9be57
8f1b298
cfba565
8d8d10f
0b07945
b76ce4a
fed43a7
2ece52a
94feaf4
d222d4e
bb4df2a
7e05b84
5d82132
42acfee
b404415
94a6268
cf7048d
6e74bbe
8b58943
783bb6a
63ab80b
b6037ba
c70b64c
7a28213
5123311
3a2cbc5
5333480
7cf4ab1
7dbecb7
ea36a3d
e7a1ce0
e28f28a
ec64175
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -41,7 +41,7 @@ class Entity(object): | |
index = None | ||
indexed_by = None | ||
|
||
def __init__(self, id, df, entityset, variable_types=None, name=None, | ||
def __init__(self, id, df, entityset, variable_types=None, | ||
index=None, time_index=None, secondary_time_index=None, | ||
last_time_index=None, encoding=None, relationships=None, | ||
already_sorted=False, created_index=None, verbose=False): | ||
|
@@ -56,7 +56,6 @@ def __init__(self, id, df, entityset, variable_types=None, name=None, | |
entity_id to variable_types dict with which to initialize an | ||
entity's store. | ||
An entity's variable_types dict maps string variable ids to types (:class:`.Variable`). | ||
name (str): Name of entity. | ||
index (str): Name of id column in the dataframe. | ||
time_index (str): Name of time column in the dataframe. | ||
secondary_time_index (dict[str -> str]): Dictionary mapping columns | ||
|
@@ -80,7 +79,6 @@ def __init__(self, id, df, entityset, variable_types=None, name=None, | |
self.created_index = created_index | ||
self.convert_all_variable_data(variable_types) | ||
self.id = id | ||
self.name = name | ||
self.entityset = entityset | ||
self.indexed_by = {} | ||
variable_types = variable_types or {} | ||
|
@@ -92,6 +90,7 @@ def __init__(self, id, df, entityset, variable_types=None, name=None, | |
if ti not in cols: | ||
cols.append(ti) | ||
|
||
relationships = relationships or [] | ||
link_vars = [v.id for rel in relationships for v in [rel.parent_variable, rel.child_variable] | ||
if v.entity.id == self.id] | ||
|
||
|
@@ -120,6 +119,11 @@ def __init__(self, id, df, entityset, variable_types=None, name=None, | |
if self.index is not None and self.index not in inferred_variable_types: | ||
self.add_variable(self.index, vtypes.Index) | ||
|
||
# make sure index is at the beginning | ||
index_variable = [v for v in self.variables | ||
if v.id == self.index][0] | ||
self.variables = [index_variable] + [v for v in self.variables | ||
if v.id != self.index] | ||
self.update_data(df=self.df, | ||
already_sorted=already_sorted, | ||
recalculate_last_time_indexes=False, | ||
|
@@ -563,10 +567,23 @@ def infer_variable_types(self, ignore=None, link_vars=None): | |
|
||
def update_data(self, df=None, data=None, already_sorted=False, | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Looks like the data argument is never actually used |
||
reindex=True, recalculate_last_time_indexes=True): | ||
to_check = None | ||
if df is not None: | ||
to_check = df | ||
elif data is not None: | ||
to_check = data['df'] | ||
|
||
if to_check is not None and len(to_check.columns) != len(self.variables): | ||
raise ValueError("Updated dataframe contains {} columns, expecting {}".format(len(to_check.columns), | ||
len(self.variables))) | ||
for v in self.variables: | ||
if v.id not in to_check.columns: | ||
raise ValueError("Updated dataframe is missing new {} column".format(v.id)) | ||
if data is not None: | ||
self.data = data | ||
elif df is not None: | ||
self.df = df | ||
self.df = self.df[[v.id for v in self.variables]] | ||
self.set_index(self.index) | ||
self.set_time_index(self.time_index, already_sorted=already_sorted) | ||
self.set_secondary_time_index(self.secondary_time_index) | ||
|
@@ -684,7 +701,7 @@ def set_time_index(self, variable_id, already_sorted=False): | |
# sort by time variable, then by index | ||
self.df.sort_values([variable_id, self.index], inplace=True) | ||
|
||
t = vtypes.TimeIndex | ||
t = vtypes.NumericTimeIndex | ||
if col_is_datetime(self.df[variable_id]): | ||
t = vtypes.DatetimeTimeIndex | ||
self.convert_variable_type(variable_id, t, convert_data=False) | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -10,7 +10,10 @@ | |
|
||
from .entity import Entity | ||
from .relationship import Relationship | ||
from .serialization import read_pickle, to_pickle | ||
from .serialization import (load_entity_data, | ||
read_parquet, | ||
read_pickle, | ||
write_entityset) | ||
|
||
import featuretools.variable_types.variable as vtypes | ||
from featuretools.utils.gen_utils import make_tqdm_iterator | ||
|
@@ -165,22 +168,22 @@ def entities(self): | |
|
||
@property | ||
def metadata(self): | ||
'''Defined as a property because an EntitySet's metadata | ||
is used in many places, for instance, for each feature in a feature list. | ||
'''An EntitySet's metadata is used in many places, for instance, | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. the first line of the docstring should be a one-liner defining |
||
for each feature in a feature list. | ||
To prevent using copying the full metadata object to each feature, | ||
we generate a new metadata object and check if it's the same as the existing one, | ||
and if it is return the existing one. Thus, all features in the feature list | ||
would reference the same object, rather than copies. This saves a lot of memory | ||
''' | ||
new_metadata = self.from_metadata(self.create_metadata_dict(), | ||
load_data=False) | ||
if self._metadata is None: | ||
self._metadata = self._gen_metadata() | ||
self._metadata = new_metadata | ||
else: | ||
new_metadata = self._gen_metadata() | ||
# Don't want to keep making new copies of metadata | ||
# Only make a new one if something was changed | ||
if not self._metadata.__eq__(new_metadata): | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. why not |
||
self._metadata = new_metadata | ||
|
||
return self._metadata | ||
|
||
@property | ||
|
@@ -192,13 +195,74 @@ def is_metadata(self): | |
return all(e.df.empty for e in self.entity_dict.values()) | ||
|
||
def to_pickle(self, path): | ||
to_pickle(self, path) | ||
write_entityset(self, path, to_parquet=False) | ||
return self | ||
|
||
def to_parquet(self, path): | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. let's add an engine parameter like pandas: https://pandas.pydata.org/pandas-docs/stable/generated/pandas.DataFrame.to_parquet.html |
||
write_entityset(self, path, to_parquet=True) | ||
return self | ||
|
||
@classmethod | ||
def read_pickle(cls, path): | ||
return read_pickle(path) | ||
|
||
@classmethod | ||
def read_parquet(cls, path): | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. we should be like pandas and make the There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. let's update API reference too |
||
return read_parquet(path) | ||
|
||
def create_metadata_dict(self): | ||
return { | ||
'id': self.id, | ||
'relationships': [{ | ||
'parent_entity': r.parent_entity.id, | ||
'parent_variable': r.parent_variable.id, | ||
'child_entity': r.child_entity.id, | ||
'child_variable': r.child_variable.id, | ||
} for r in self.relationships], | ||
'entity_dict': {eid: { | ||
'index': e.index, | ||
'time_index': e.time_index, | ||
'secondary_time_index': e.secondary_time_index, | ||
'encoding': e.encoding, | ||
'variables': { | ||
v.id: v.create_metadata_dict() | ||
for v in e.variables | ||
}, | ||
'has_last_time_index': e.last_time_index is not None | ||
} for eid, e in self.entity_dict.items()}, | ||
} | ||
|
||
@classmethod | ||
def from_metadata(cls, metadata, root=None, load_data=False): | ||
es = EntitySet(metadata['id']) | ||
set_last_time_indexes = False | ||
add_interesting_values = False | ||
for eid, entity in metadata['entity_dict'].items(): | ||
df, variable_types = load_entity_data(entity, root=root, | ||
dummy=not load_data) | ||
if any(v['interesting_values'] is not None and len(v['interesting_values']) | ||
for v in entity['variables'].values()): | ||
add_interesting_values = True | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. add exact interesting values that were serialized |
||
es.entity_from_dataframe(eid, | ||
df, | ||
index=entity['index'], | ||
time_index=entity['time_index'], | ||
secondary_time_index=entity['secondary_time_index'], | ||
encoding=entity['encoding'], | ||
variable_types=variable_types) | ||
if entity['has_last_time_index']: | ||
set_last_time_indexes = True | ||
for rel in metadata['relationships']: | ||
es.add_relationship(Relationship( | ||
es[rel['parent_entity']][rel['parent_variable']], | ||
es[rel['child_entity']][rel['child_variable']], | ||
)) | ||
if set_last_time_indexes: | ||
es.add_last_time_indexes() | ||
if add_interesting_values: | ||
es.add_interesting_values() | ||
return es | ||
|
||
########################################################################### | ||
# Public getter/setter methods ######################################### | ||
########################################################################### | ||
|
@@ -1102,69 +1166,6 @@ def gen_relationship_var(self, child_eid, parent_eid): | |
# Private methods ###################################################### | ||
########################################################################### | ||
|
||
def _gen_metadata(self): | ||
new_entityset = object.__new__(EntitySet) | ||
new_entityset_dict = {} | ||
for k, v in self.__dict__.items(): | ||
if k not in ["entity_dict", "relationships"]: | ||
new_entityset_dict[k] = v | ||
new_entityset_dict["entity_dict"] = {} | ||
for eid, e in self.entity_dict.items(): | ||
metadata_e = self._entity_metadata(e) | ||
new_entityset_dict['entity_dict'][eid] = metadata_e | ||
new_entityset_dict["relationships"] = [] | ||
for r in self.relationships: | ||
metadata_r = self._relationship_metadata(r) | ||
new_entityset_dict['relationships'].append(metadata_r) | ||
new_entityset.__dict__ = copy.deepcopy(new_entityset_dict) | ||
for e in new_entityset.entity_dict.values(): | ||
e.entityset = new_entityset | ||
for v in e.variables: | ||
v.entity = new_entityset[v.entity_id] | ||
for r in new_entityset.relationships: | ||
r.entityset = new_entityset | ||
return new_entityset | ||
|
||
@classmethod | ||
def _entity_metadata(cls, e): | ||
new_dict = {} | ||
for k, v in e.__dict__.items(): | ||
if k not in ["data", "entityset", "variables"]: | ||
new_dict[k] = v | ||
new_dict["data"] = { | ||
"df": e.df.head(0), | ||
"last_time_index": None, | ||
"indexed_by": {} | ||
} | ||
new_dict["variables"] = [cls._variable_metadata(v) | ||
for v in e.variables] | ||
new_dict = copy.deepcopy(new_dict) | ||
new_entity = object.__new__(Entity) | ||
new_entity.__dict__ = new_dict | ||
return new_entity | ||
|
||
@classmethod | ||
def _relationship_metadata(cls, r): | ||
new_dict = {} | ||
for k, v in r.__dict__.items(): | ||
if k != "entityset": | ||
new_dict[k] = v | ||
new_dict = copy.deepcopy(new_dict) | ||
new_r = object.__new__(Relationship) | ||
new_r.__dict__ = new_dict | ||
return new_r | ||
|
||
@classmethod | ||
def _variable_metadata(cls, var): | ||
new_dict = {} | ||
for k, v in var.__dict__.items(): | ||
if k != "entity": | ||
new_dict[k] = v | ||
new_dict = copy.deepcopy(new_dict) | ||
new_v = object.__new__(type(var)) | ||
new_v.__dict__ = new_dict | ||
return new_v | ||
|
||
def _import_from_dataframe(self, | ||
entity_id, | ||
dataframe, | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I thought we already merged this into master?