Permalink
Browse files

add generate_data_in_chunks method (#435)

* add generate_data_chunk_format method
* new unit test for generate chunk data
  • Loading branch information...
MaxBenChrist committed Sep 21, 2018
1 parent 52e50bd commit aa89e364d9b618beec1597b477b59e8a480d9d08
View
@@ -100,7 +100,7 @@ python, just head over to our
instructions.
.. |Documentation Status| image:: https://readthedocs.org/projects/tsfresh/badge/?version=latest
:target: http://tsfresh.readthedocs.io/en/latest/?badge=latest
:target: https://tsfresh.readthedocs.io/en/latest/?badge=latest
.. |Build Status| image:: https://travis-ci.org/blue-yonder/tsfresh.svg?branch=master
:target: https://travis-ci.org/blue-yonder/tsfresh
.. |Coverage Status| image:: https://coveralls.io/repos/github/blue-yonder/tsfresh/badge.svg?branch=master
@@ -12,7 +12,7 @@
from mock import Mock
from tests.fixtures import DataTestCase
from tsfresh.feature_extraction.extraction import extract_features
from tsfresh.feature_extraction.extraction import extract_features, generate_data_chunk_format
from tsfresh.feature_extraction.settings import ComprehensiveFCParameters
import tempfile
@@ -33,7 +33,6 @@ def test_extract_features(self):
extracted_features = extract_features(df, column_id="id", column_sort="sort", column_kind="kind",
column_value="val",
n_jobs=self.n_jobs)
self.assertIsInstance(extracted_features, pd.DataFrame)
self.assertTrue(np.all(extracted_features.a__maximum == np.array([71, 77])))
self.assertTrue(np.all(extracted_features.a__sum_values == np.array([691, 1017])))
@@ -137,6 +136,7 @@ def test_extract_features_with_and_without_parallelization(self):
features_parallel = extract_features(df, column_id="id", column_sort="sort", column_kind="kind",
column_value="val",
n_jobs=self.n_jobs)
features_serial = extract_features(df, column_id="id", column_sort="sort", column_kind="kind",
column_value="val", n_jobs=0)
@@ -215,4 +215,37 @@ def test_distributor_map_reduce_and_close_are_called(self):
distributor=mock)
self.assertTrue(mock.close.called)
self.assertTrue(mock.map_reduce.called)
self.assertTrue(mock.map_reduce.called)
class GenerateDataChunkTestCase(DataTestCase):
    """Unit tests for ``generate_data_chunk_format``.

    Checks that a normalized dataframe is split into one
    ``(id, kind, pd.Series)`` tuple per individual time series.
    """

    def assert_data_chunk_object_equal(self, result, expected):
        """Assert two chunk lists are equal, ignoring chunk order.

        Both lists are re-indexed by an ``"<id>_<kind>"`` string key so the
        comparison does not depend on the order groupby emits the chunks.
        """
        result_by_key = {str(chunk[0]) + "_" + str(chunk[1]): chunk[2] for chunk in result}
        expected_by_key = {str(chunk[0]) + "_" + str(chunk[1]): chunk[2] for chunk in expected}
        for key in result_by_key.keys():
            pd.testing.assert_series_equal(result_by_key[key], expected_by_key[key])

    def test_simple_data_sample_two_timeseries(self):
        # One id (10) with two kinds ("a", "b") -> expect two chunks.
        frame = pd.DataFrame({"id": [10, 10, 10, 10],
                              "kind": ["a", "a", "b", "b"],
                              "val": [36, 71, 78, 37]})
        frame.set_index("id", drop=False, inplace=True)
        frame.index.name = None

        chunks = generate_data_chunk_format(frame, "id", "kind", "val")

        expected = [
            (10, 'a', pd.Series([36, 71], index=[10, 10], name="val")),
            (10, 'b', pd.Series([78, 37], index=[10, 10], name="val")),
        ]
        self.assert_data_chunk_object_equal(chunks, expected)

    def test_simple_data_sample_four_timeseres(self):
        # Two ids x two kinds from the shared fixture -> four chunks.
        frame = self.create_test_data_sample()
        # todo: investigate the names that are given
        frame.index.name = None
        frame.sort_values(by=["id", "kind", "sort"], inplace=True)

        chunks = generate_data_chunk_format(frame, "id", "kind", "val")

        expected = [
            (10, 'a', pd.Series([36, 71, 27, 62, 56, 58, 67, 11, 2, 24, 45, 30, 0, 9, 41, 28, 33, 19, 29, 43],
                                index=[10] * 20, name="val")),
            (10, 'b', pd.Series([78, 37, 23, 44, 6, 3, 21, 61, 39, 31, 53, 16, 66, 50, 40, 47, 7, 42, 38, 55],
                                index=[10] * 20, name="val")),
            (500, 'a', pd.Series([76, 72, 74, 75, 32, 64, 46, 35, 15, 70, 57, 65, 51, 26, 5, 25, 10, 69, 73, 77],
                                 index=[500] * 20, name="val")),
            (500, 'b', pd.Series([8, 60, 12, 68, 22, 17, 18, 63, 49, 34, 20, 52, 48, 14, 79, 4, 1, 59, 54, 13],
                                 index=[500] * 20, name="val")),
        ]
        self.assert_data_chunk_object_equal(chunks, expected)
@@ -58,7 +58,7 @@ def test_with_dictionaries_two_rows_sorted(self):
self.assertEqual(result_df[result_df[column_kind] == "a"].iloc[0].to_dict(), {"_variables": "a", "value": 2, "id": "id_1"})
def test_with_df(self):
def test_with_df_1(self):
# give everyting
test_df = pd.DataFrame([{"id": 0, "kind": "a", "value": 3, "sort": 1}])
result_df, column_id, column_kind, column_value = \
@@ -72,6 +72,7 @@ def test_with_df(self):
self.assertEqual(list(result_df[result_df[column_kind] == "a"]["value"]), [3])
self.assertEqual(list(result_df[result_df[column_kind] == "a"]["id"]), [0])
def test_with_df_2(self):
# give no kind
test_df = pd.DataFrame([{"id": 0, "value": 3, "sort": 1}])
result_df, column_id, column_kind, column_value = \
@@ -85,6 +86,7 @@ def test_with_df(self):
self.assertEqual(list(result_df[result_df[column_kind] == "value"]["value"]), [3])
self.assertEqual(list(result_df[result_df[column_kind] == "value"]["id"]), [0])
def test_with_df_3(self):
# Let the function find the values
test_df = pd.DataFrame([{"id": 0, "a": 3, "b": 5, "sort": 1}])
result_df, column_id, column_kind, column_value = \
@@ -170,6 +170,52 @@ def extract_features(timeseries_container, default_fc_parameters=None,
return result
def generate_data_chunk_format(df, column_id, column_kind, column_value):
    """Split the dataframe ``df`` into a list of individual time series.

    E.g. the DataFrame

        ==== ====== =========
          id   kind       val
        ==== ====== =========
           1      a  -0.21761
           1      a  -0.613667
           1      a  -2.07339
           2      b  -0.576254
           2      b  -1.21924
        ==== ====== =========

    is turned into

        [(1, 'a', pd.Series([-0.217610, -0.613667, -2.073386]),
         (2, 'b', pd.Series([-0.576254, -1.219238])]

    Each resulting tuple holds the id, the kind and the pd.Series of values
    of one single time series.

    :param df: The dataframe in the normalized format which is used for extraction.
    :type df: pd.DataFrame
    :param column_id: The name of the id column to group by.
    :type column_id: str
    :param column_kind: The name of the column keeping record on the kind of the value.
    :type column_kind: str
    :param column_value: The name for the column keeping the value itself.
    :type column_value: str

    :return: the data in chunks
    :rtype: list
    """
    # groupby on (id, kind) yields a 2-tuple key per group; unpack it and
    # append the group's value series to form the (id, kind, series) chunk.
    grouped = df.groupby([column_id, column_kind])[column_value]
    return [(chunk_id, chunk_kind, chunk_values)
            for (chunk_id, chunk_kind), chunk_values in grouped]
def _do_extraction(df, column_id, column_value, column_kind,
default_fc_parameters, kind_to_fc_parameters,
n_jobs, chunk_size, disable_progressbar, distributor):
@@ -221,10 +267,10 @@ def _do_extraction(df, column_id, column_value, column_kind,
:return: the extracted features
:rtype: pd.DataFrame
"""
data_in_chunks = [x + (y,) for x, y in df.groupby([column_id, column_kind])[column_value]]
if distributor is None:
data_in_chunks = generate_data_chunk_format(df, column_id, column_kind, column_value)
if distributor is None:
if n_jobs == 0:
distributor = MapDistributor(disable_progressbar=disable_progressbar,
progressbar_title="Feature Extraction")
@@ -5,6 +5,7 @@
Utility functions for handling the DataFrame conversions to the internal normalized format
(see ``normalize_input_to_internal_representation``) or on how to handle ``NaN`` and ``inf`` in the DataFrames.
"""
import gc
import warnings
import numpy as np
@@ -281,6 +282,7 @@ def _normalize_input_to_internal_representation(timeseries_container, column_id,
df[column_kind] = kind
timeseries_container = pd.concat(timeseries_container.values())
gc.collect()
# Check ID column
if column_id is None:
@@ -296,7 +298,6 @@ def _normalize_input_to_internal_representation(timeseries_container, column_id,
if column_sort is not None:
if timeseries_container[column_sort].isnull().any():
raise ValueError("You have NaN values in your sort column.")
timeseries_container = timeseries_container.sort_values(column_sort).drop(column_sort, axis=1)
# Check that either kind and value is None or both not None.
if column_kind is None and column_value is not None:
@@ -307,10 +308,22 @@ def _normalize_input_to_internal_representation(timeseries_container, column_id,
raise ValueError("If passing the kind, you also have to pass the value.")
if column_kind is None and column_value is None:
column_kind = "_variables"
column_value = "_values"
timeseries_container = pd.melt(timeseries_container, id_vars=[column_id],
value_name=column_value, var_name=column_kind)
if column_sort is not None:
column_kind = "_variables"
column_value = "_values"
sort = timeseries_container[column_sort].values
timeseries_container = pd.melt(timeseries_container.drop(column_sort, axis=1),
id_vars=[column_id],
value_name=column_value, var_name=column_kind)
timeseries_container[column_sort] = np.repeat(sort, (len(timeseries_container) // len(sort)))
else:
column_kind = "_variables"
column_value = "_values"
column_sort = "_sort"
sort = range(len(timeseries_container))
timeseries_container = pd.melt(timeseries_container, id_vars=[column_id],
value_name=column_value, var_name=column_kind)
timeseries_container[column_sort] = np.repeat(sort, (len(timeseries_container) // len(sort)))
# Check kind column
if column_kind not in timeseries_container.columns:
@@ -326,6 +339,12 @@ def _normalize_input_to_internal_representation(timeseries_container, column_id,
if timeseries_container[column_value].isnull().any():
raise ValueError("You have NaN values in your value column.")
if column_sort:
timeseries_container = timeseries_container.sort_values([column_id, column_kind, column_sort])
timeseries_container = timeseries_container.drop(column_sort, axis=1)
else:
timeseries_container = timeseries_container.sort_values([column_id, column_kind])
return timeseries_container, column_id, column_kind, column_value

0 comments on commit aa89e36

Please sign in to comment.