Remove unused Dask code and update docs #1012

Merged 1 commit on Jun 3, 2020
3 changes: 2 additions & 1 deletion docs/source/changelog.rst
@@ -4,11 +4,13 @@ Changelog
---------
.. **Future Release**
* Enhancements
* Support use of Dask DataFrames in entitysets (:pr:`783`)
* Fixes
* Changes
* Documentation Changes
* Testing Changes
Thanks to the following people for contributing to this release:
:user:`frances-h`, :user:`rwedge`, :user:`thehomebrewnerd`

**v0.15.0 May 29, 2020**
* Enhancements
@@ -18,7 +20,6 @@ Changelog
* Add ``include_cutoff_time`` arg - control whether data at cutoff times are included in feature calculations (:pr:`959`)
* Allow ``variable_types`` to be referenced by their ``type_string``
for the ``entity_from_dataframe`` function (:pr:`988`)
* Support use of Dask DataFrames in entitysets (:pr:`783`)
* Fixes
* Fix errors with Equals and NotEquals primitives when comparing categoricals or different dtypes (:pr:`968`)
* Normalized type_strings of ``Variable`` classes so that the ``find_variable_types`` function produces a
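For context on the ``entity_from_dataframe`` changelog entry above, a minimal sketch (hypothetical data; assumes the featuretools 0.15 API, in which a variable type may be given by its ``type_string`` instead of the class):

import pandas as pd
import featuretools as ft

df = pd.DataFrame({"id": [0, 1, 2], "age": [34, 51, 27]})
es = ft.EntitySet(id="customers_es")
# "numeric" is assumed here to be the type_string for featuretools.variable_types.Numeric
es = es.entity_from_dataframe(entity_id="customers",
                              dataframe=df,
                              index="id",
                              variable_types={"age": "numeric"})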
2 changes: 1 addition & 1 deletion docs/source/guides/dfs_with_dask_entitysets.rst
@@ -80,7 +80,7 @@ To obtain a list of the primitives that can be used with a Dask ``EntitySet``, y

Primitive Limitations
*********************
At this time, custom primitives created with ``featuretools.primitives.make_trans_primitive()`` or ``featuretools.primitives.make_agg_primitive()`` cannot be used for running deep feature synthesis on a Dask ``EntitySet``. While it is possible to create custom primitives for use with a Dask ``EntitySet`` by extending the proper primitive class, there are several potential problems in doing so, and those issues are beyond the scope of this guide.
At this time, custom primitives created with ``featuretools.primitives.make_trans_primitive()`` or ``featuretools.primitives.make_agg_primitive()`` cannot be used for running deep feature synthesis on a Dask ``EntitySet``. Additionally, multivariable and time-dependent aggregation primitives are not currently supported. While it is possible to create custom primitives for use with a Dask ``EntitySet`` by extending the proper primitive class, there are several potential problems in doing so, and those issues are beyond the scope of this guide.
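For context, a minimal sketch (hypothetical primitive; assumes the featuretools 0.15 ``make_trans_primitive`` API) of the kind of custom primitive described above, which can be used with a pandas ``EntitySet`` but not for deep feature synthesis on a Dask ``EntitySet``:

import featuretools as ft
from featuretools.variable_types import Numeric

def absolute_value(column):
    # receives a pandas Series; primitives built this way are pandas-only
    return column.abs()

CustomAbsolute = ft.primitives.make_trans_primitive(
    function=absolute_value,
    input_types=[Numeric],
    return_type=Numeric,
    name="custom_absolute")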

Entity Limitations
******************
80 changes: 11 additions & 69 deletions featuretools/computational_backends/feature_set_calculator.py
@@ -669,63 +669,17 @@ def last_n(df):
# Apply the non-aggregable functions to generate a new dataframe, and merge
# it with the existing one
if len(to_apply):
    if isinstance(base_frame, dd.DataFrame):
        aggregations = {}
        apply_rename = {}
        multi_output = {}
        aggregable_frame = base_frame[base_frame.columns]
        for f in to_apply:
            variable_ids = [bf.get_name() for bf in f.base_features]

            # combine input columns into a single column for primitives that take more than one input
            if len(variable_ids) > 1:
                aggregable_frame[",".join(variable_ids)] = pd.Series(
                    (zip(*[aggregable_frame[v] for v in variable_ids]))
                )

            func = f.get_dask_aggregation()
            funcname = func.__name__
            variable_id = ",".join(variable_ids)
            if variable_id not in aggregations:
                aggregations[variable_id] = []
            aggregations[variable_id].append(func)

            apply_rename[u"{}-{}".format(variable_id, funcname)] = f.get_name()

            # primitives whose results should be split into multiple columns
            if f.number_output_features > 1:
                multi_output[f.get_name()] = f

        to_merge = aggregable_frame.groupby(aggregable_frame[groupby_var]).agg(aggregations)

        # rename aggregation columns:
        to_merge.columns = [apply_rename["-".join(x)] for x in to_merge.columns.ravel()]
        to_merge = to_merge[list(apply_rename.values())]

        # separate outputs of multi-outputs into separate columns:
        to_merge = to_merge.apply(dask_split_output,
                                  axis=1,
                                  columns=to_merge.columns,
                                  multi_output_prims=multi_output)

        child_merge_var = to_merge.index.name
        # Make sure the merge columns have the same data type
        to_merge = to_merge.reset_index()
        to_merge[child_merge_var] = to_merge[child_merge_var].astype(frame[parent_merge_var].dtype)
        frame = dd.merge(left=frame, right=to_merge.reset_index(),
                         left_on=parent_merge_var, right_on=child_merge_var, how='left')
    else:
        wrap = agg_wrapper(to_apply, self.time_last)
        # groupby_var can be both the name of the index and a column,
        # to silence pandas warning about ambiguity we explicitly pass
        # the column (in actuality grouping by both index and group would
        # work)
        to_merge = base_frame.groupby(base_frame[groupby_var],
                                      observed=True,
                                      sort=False).apply(wrap)
        frame = pd.merge(left=frame, right=to_merge,
                         left_index=True,
                         right_index=True, how='left')
    wrap = agg_wrapper(to_apply, self.time_last)
    # groupby_var can be both the name of the index and a column,
    # to silence pandas warning about ambiguity we explicitly pass
    # the column (in actuality grouping by both index and group would
    # work)
    to_merge = base_frame.groupby(base_frame[groupby_var],
                                  observed=True,
                                  sort=False).apply(wrap)
    frame = pd.merge(left=frame, right=to_merge,
                     left_index=True,
                     right_index=True, how='left')

progress_callback(len(to_apply) / float(self.num_features))

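As an aside, the retained pandas-only path above boils down to a groupby-apply-merge pattern; a self-contained sketch with hypothetical column names and a hypothetical aggregation:

import pandas as pd

# child rows keyed by a parent id
base_frame = pd.DataFrame({"customer_id": [1, 1, 2], "amount": [10, 20, 5]})
# parent frame indexed by the same id
frame = pd.DataFrame(index=pd.Index([1, 2], name="customer_id"))

def wrap(df):
    # one aggregated row per group; the group key becomes the index
    return pd.Series({"SUM(amount)": df["amount"].sum()})

to_merge = base_frame.groupby(base_frame["customer_id"], sort=False).apply(wrap)
frame = pd.merge(left=frame, right=to_merge,
                 left_index=True, right_index=True, how='left')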
@@ -831,18 +785,6 @@ def wrap(df):
return wrap


def dask_split_output(row, columns, multi_output_prims):
    d = {}
    for col, val in zip(columns, row):
        feature_values = []
        if col in multi_output_prims:
            feature_values.append((multi_output_prims[col], val))
        else:
            feature_values.append((col, val))
        d = update_feature_columns(feature_values, d)
    return pd.Series(d)


def set_default_column(frame, f):
    for name in f.get_feature_names():
        frame[name] = f.default_value
4 changes: 0 additions & 4 deletions featuretools/tests/primitive_tests/test_transform_features.py
@@ -579,8 +579,6 @@ def test_latlong(pd_es):
    longitude = ft.Feature(log_latlong_feat, primitive=Longitude)
    features = [latitude, longitude]
    df = ft.calculate_feature_matrix(entityset=pd_es, features=features, instance_ids=range(15))
    if isinstance(df, dd.DataFrame):
        df = df.compute()
    latvalues = df[latitude.get_name()].values
    lonvalues = df[longitude.get_name()].values
    assert len(latvalues) == 15
@@ -750,8 +748,6 @@ def test_percentile(pd_es):
    feature_set = FeatureSet([p])
    calculator = FeatureSetCalculator(pd_es, feature_set)
    df = calculator.run(np.array(range(10, 17)))
    if isinstance(df, dd.DataFrame):
        df = df.compute()
    true = pd_es['log'].df[v.get_name()].rank(pct=True)
    true = true.loc[range(10, 17)]
    for t, a in zip(true.values, df[p.get_name()].values):
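For context on the ``Percentile`` comparison above, a minimal sketch (hypothetical values) of the pandas ``rank(pct=True)`` semantics the test uses as ground truth:

import pandas as pd

vals = pd.Series([3, 1, 2, 2])
# percentile rank: average rank of each value divided by the number of values
print(vals.rank(pct=True))  # 1.000, 0.250, 0.625, 0.625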