Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix bindings #719

Merged
merged 5 commits into from
Jun 20, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGES.rst
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ Unreleased
- Implemented the benford distribution feature (#689)
- Reworked the notebooks (#701, #704)
- Speed up the result pivoting (#705)
- Add a test for the dask bindings (#719)
- Bugfixes:
- Fixed readthedocs (#695, #696)
- Fix spark and dask after #705 and for non-id named id columns (#712)
Expand Down
33 changes: 33 additions & 0 deletions tests/integrations/test_bindings.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
from unittest import TestCase

from tsfresh.convenience.bindings import dask_feature_extraction_on_chunk
from tsfresh.feature_extraction.settings import MinimalFCParameters

from dask import dataframe as dd
import pandas as pd


class DaskBindingsTestCase(TestCase):
    """Smoke test for the dask convenience binding with minimal settings."""

    def test_feature_extraction(self):
        # Small long-format time series: two ids, a single kind, three values each.
        raw = pd.DataFrame({
            "my_id": [1, 1, 1, 2, 2, 2],
            "my_kind": ["a"] * 6,
            "my_value": [1, 2, 3, 4, 5, 6],
        })
        data = dd.from_pandas(raw, chunksize=3)

        grouped = data.groupby(["my_id", "my_kind"])

        extracted = dask_feature_extraction_on_chunk(
            grouped,
            column_id="my_id",
            column_kind="my_kind",
            column_value="my_value",
            column_sort=None,
            default_fc_parameters=MinimalFCParameters(),
        )

        # Pivoting in dask needs "variable" to be a known categorical first.
        extracted = extracted.categorize(columns=["variable"])
        extracted = extracted.reset_index(drop=True)

        pivoted = extracted.pivot_table(index="my_id", columns="variable",
                                        values="value", aggfunc="sum")
        pivoted = pivoted.compute()

        # Expect one column per minimal feature calculator and one row per id.
        self.assertEqual(len(pivoted.columns), len(MinimalFCParameters()))
        self.assertEqual(len(pivoted), 2)
15 changes: 9 additions & 6 deletions tsfresh/convenience/bindings.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,10 @@ def _feature_extraction_on_chunk_helper(df, column_id, column_kind,
elif default_fc_parameters is None and kind_to_fc_parameters is not None:
default_fc_parameters = {}

chunk = df[column_id].iloc[0], df[column_kind].iloc[0], df.sort_values(column_sort)[column_value]
if column_sort is not None:
df = df.sort_values(column_sort)

chunk = df[column_id].iloc[0], df[column_kind].iloc[0], df[column_value]
features = _do_extraction_on_chunk(chunk, default_fc_parameters=default_fc_parameters,
kind_to_fc_parameters=kind_to_fc_parameters)
features = pd.DataFrame(features, columns=[column_id, "variable", "value"])
Expand All @@ -31,7 +34,7 @@ def _feature_extraction_on_chunk_helper(df, column_id, column_kind,


def dask_feature_extraction_on_chunk(df, column_id, column_kind,
column_sort, column_value,
column_value, column_sort=None,
default_fc_parameters=None, kind_to_fc_parameters=None):
"""
Extract features on a grouped dask dataframe given the column names and the extraction settings.
Expand Down Expand Up @@ -99,7 +102,7 @@ def dask_feature_extraction_on_chunk(df, column_id, column_kind,
:type column_id: str

:param column_sort: The name of the sort column.
:type column_sort: str
:type column_sort: str or None

:param column_kind: The name of the column keeping record on the kind of the value.
:type column_kind: str
Expand All @@ -121,8 +124,8 @@ def dask_feature_extraction_on_chunk(df, column_id, column_kind,


def spark_feature_extraction_on_chunk(df, column_id, column_kind,
column_sort, column_value,
default_fc_parameters, kind_to_fc_parameters=None):
column_value, column_sort=None,
default_fc_parameters=None, kind_to_fc_parameters=None):
"""
Extract features on a grouped spark dataframe given the column names and the extraction settings.
This wrapper function should only be used if you have a spark dataframe as input.
Expand Down Expand Up @@ -184,7 +187,7 @@ def spark_feature_extraction_on_chunk(df, column_id, column_kind,
:type column_id: str

:param column_sort: The name of the sort column.
:type column_sort: str
:type column_sort: str or None

:param column_kind: The name of the column keeping record on the kind of the value.
:type column_kind: str
Expand Down