Adding progress_callback parameter to dfs() and calculate_feature_matrix(), removing time remaining #739

Merged (14 commits, Sep 16, 2019)
2 changes: 2 additions & 0 deletions docs/source/changelog.rst
@@ -7,6 +7,7 @@ Changelog
* Improve how files are copied and written (:pr:`721`)
* Add number of rows to graph in entityset.plot (:pr:`727`)
* Enable feature-specific top_n value using a dictionary in encode_features (:pr:`735`)
* Added progress_callback parameter to dfs() and calculate_feature_matrix() (:pr:`739`)
* Fixes
* Fixed entity set deserialization (:pr:`720`)
* Added error message when DateTimeIndex is a variable but not set as the time_index (:pr:`723`)
@@ -15,6 +16,7 @@ Changelog
* Don't delete the whole destination folder while saving entityset (:pr:`717`)
* Changes
* Raise warning and not error on schema version mismatch (:pr:`718`)
* Removed time remaining from displayed progress bar in dfs() and calculate_feature_matrix() (:pr:`739`)
* Documentation Changes
* Updated URL for Compose (:pr:`716`)
* Testing Changes
77 changes: 52 additions & 25 deletions featuretools/computational_backends/calculate_feature_matrix.py
@@ -38,15 +38,15 @@
logger = logging.getLogger('featuretools.computational_backend')

PBAR_FORMAT = "Elapsed: {elapsed} | Progress: {l_bar}{bar}"
PBAR_FORMAT_REMAINING = PBAR_FORMAT + "| Remaining: {remaining}"


def calculate_feature_matrix(features, entityset=None, cutoff_time=None, instance_ids=None,
entities=None, relationships=None,
cutoff_time_in_index=False,
training_window=None, approximate=None,
save_progress=None, verbose=False,
chunk_size=None, n_jobs=1, dask_kwargs=None):
chunk_size=None, n_jobs=1,
dask_kwargs=None, progress_callback=None):
"""Calculates a matrix for a given set of instance ids and calculation times.

Args:
Expand Down Expand Up @@ -115,6 +115,14 @@ def calculate_feature_matrix(features, entityset=None, cutoff_time=None, instanc
Valid keyword arguments for LocalCluster will also be accepted.

save_progress (str, optional): path to save intermediate computational results.

progress_callback (callable): function to be called with incremental progress updates.
Has the following parameters:

update: percentage change in progress (float between 0 and 100) since the last call
progress_percent: percentage of the total computation completed (float between 0 and 100)
time_elapsed: total time in seconds elapsed since the start of the calculation

"""
assert (isinstance(features, list) and features != [] and
all([isinstance(feature, FeatureBase) for feature in features])), \
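For reference, a minimal sketch of a callback matching the signature documented above. The feature list 'features', the entity set 'es', and the print format are assumptions for illustration, not part of the PR:

import featuretools as ft

def log_progress(update, progress_percent, time_elapsed):
    # update: percentage points gained since the previous call
    # progress_percent: cumulative percentage of the computation completed
    # time_elapsed: seconds elapsed since the calculation started
    print("+%.1f%% -> %.1f%% after %.1fs" % (update, progress_percent, time_elapsed))

# 'features' and 'es' are assumed to be an existing feature list and EntitySet
fm = ft.calculate_feature_matrix(features, entityset=es,
                                 progress_callback=log_progress)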
@@ -230,14 +238,17 @@ def calculate_feature_matrix(features, entityset=None, cutoff_time=None, instanc
cutoff_time_to_pass = cutoff_time

chunk_size = _handle_chunk_size(chunk_size, cutoff_time.shape[0])
tqdm_options = {'total': (cutoff_time.shape[0] / .95), # make total 5% higher to allot time for wrapping up at end
'bar_format': PBAR_FORMAT,
'disable': True}

# make total 5% higher to allot time for wrapping up at end
progress_bar = make_tqdm_iterator(
total=cutoff_time.shape[0] / .95,
smoothing=.05, # arbitrary selection close to 0, which would be no smoothing
bar_format=PBAR_FORMAT,
disable=(not verbose)
)
if verbose:
tqdm_options.update({'disable': False})
elif progress_callback is not None:
# lets us use progress bar updates without printing anywhere (writes to os.devnull)
tqdm_options.update({'file': open(os.devnull, 'w'), 'disable': False})

progress_bar = make_tqdm_iterator(**tqdm_options)

if n_jobs != 1 or dask_kwargs is not None:
feature_matrix = parallel_calculate_chunks(cutoff_time=cutoff_time_to_pass,
@@ -253,7 +264,8 @@ def calculate_feature_matrix(features, entityset=None, cutoff_time=None, instanc
target_time=target_time,
pass_columns=pass_columns,
progress_bar=progress_bar,
dask_kwargs=dask_kwargs or {})
dask_kwargs=dask_kwargs or {},
progress_callback=progress_callback)
else:
feature_matrix = calculate_chunk(cutoff_time=cutoff_time_to_pass,
chunk_size=chunk_size,
@@ -266,7 +278,8 @@ def calculate_feature_matrix(features, entityset=None, cutoff_time=None, instanc
cutoff_df_time_var=cutoff_df_time_var,
target_time=target_time,
pass_columns=pass_columns,
progress_bar=progress_bar)
progress_bar=progress_bar,
progress_callback=progress_callback)

feature_matrix.sort_index(level='time', kind='mergesort', inplace=True)
if not cutoff_time_in_index:
@@ -276,7 +289,13 @@ def calculate_feature_matrix(features, entityset=None, cutoff_time=None, instanc
shutil.rmtree(os.path.join(save_progress, 'temp'))

# force to 100% since the last 5 percent was held back for wrap-up
previous_progress = progress_bar.n
progress_bar.update(progress_bar.total - progress_bar.n)

if progress_callback is not None:
update, progress_percent, time_elapsed = update_progress_callback_parameters(progress_bar, previous_progress)
progress_callback(update, progress_percent, time_elapsed)

progress_bar.refresh()
progress_bar.close()

@@ -285,7 +304,7 @@ def calculate_feature_matrix(features, entityset=None, cutoff_time=None, instanc

def calculate_chunk(cutoff_time, chunk_size, feature_set, entityset, approximate, training_window,
save_progress, no_unapproximated_aggs, cutoff_df_time_var, target_time,
pass_columns, progress_bar=None):
pass_columns, progress_bar=None, progress_callback=None):
if not isinstance(feature_set, FeatureSet):
feature_set = cloudpickle.loads(feature_set)

@@ -312,19 +331,21 @@ def calculate_chunk(cutoff_time, chunk_size, feature_set, entityset, approximate
@save_csv_decorator(save_progress)
def calc_results(time_last, ids, precalculated_features=None, training_window=None):

progress_callback = None
update_progress_callback = None

if progress_bar is not None:
def progress_callback(done):
def update_progress_callback(done):
previous_progress = progress_bar.n
progress_bar.update(done * group.shape[0])

if progress_callback is not None:
update, progress_percent, time_elapsed = update_progress_callback_parameters(progress_bar, previous_progress)
progress_callback(update, progress_percent, time_elapsed)
calculator = FeatureSetCalculator(entityset,
feature_set,
time_last,
training_window=training_window,
precalculated_features=precalculated_features)

matrix = calculator.run(ids, progress_callback=progress_callback)
matrix = calculator.run(ids, progress_callback=update_progress_callback)
return matrix

# if all aggregations have been approximated, can calculate all together
@@ -341,9 +362,6 @@ def progress_callback(done):
inner_grouped = _chunk_dataframe_groups(inner_grouped, chunk_size)

for time_last, group in inner_grouped:
if len(feature_matrix) == 1:
progress_bar.bar_format = PBAR_FORMAT_REMAINING
progress_bar.refresh()

# sort group by instance id
ids = group['instance_id'].sort_values().values
@@ -487,13 +505,10 @@ def scatter_warning(num_scattered_workers, num_workers):
def parallel_calculate_chunks(cutoff_time, chunk_size, feature_set, approximate, training_window,
save_progress, entityset, n_jobs, no_unapproximated_aggs,
cutoff_df_time_var, target_time, pass_columns,
progress_bar, dask_kwargs=None):
progress_bar, dask_kwargs=None, progress_callback=None):
from distributed import as_completed, Future
from dask.base import tokenize

progress_bar.bar_format = PBAR_FORMAT_REMAINING
progress_bar.refresh()

client = None
cluster = None
try:
@@ -557,15 +572,20 @@ def parallel_calculate_chunks(cutoff_time, chunk_size, feature_set, approximate,
cutoff_df_time_var=cutoff_df_time_var,
target_time=target_time,
pass_columns=pass_columns,
progress_bar=None)
progress_bar=None,
progress_callback=progress_callback)

feature_matrix = []
iterator = as_completed(_chunks).batches()
for batch in iterator:
results = client.gather(batch)
for result in results:
feature_matrix.append(result)
previous_progress = progress_bar.n
progress_bar.update(result.shape[0])
if progress_callback is not None:
update, progress_percent, time_elapsed = update_progress_callback_parameters(progress_bar, previous_progress)
progress_callback(update, progress_percent, time_elapsed)

except Exception:
raise
@@ -630,3 +650,10 @@ def _handle_chunk_size(chunk_size, total_size):
chunk_size = int(chunk_size)

return chunk_size


def update_progress_callback_parameters(progress_bar, previous_progress):
update = (progress_bar.n - previous_progress) / progress_bar.total * 100
progress_percent = (progress_bar.n / progress_bar.total) * 100
time_elapsed = progress_bar.format_dict["elapsed"]
return (update, progress_percent, time_elapsed)
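To make the helper's arithmetic concrete, a small standalone sketch; FakeBar is a hypothetical stand-in exposing the three attributes the helper reads from a tqdm bar:

class FakeBar:
    def __init__(self, n, total, elapsed):
        self.n = n                      # units of work completed so far
        self.total = total              # total units (inflated 5% by the caller)
        self.format_dict = {"elapsed": elapsed}

# 100 cutoff times inflated by 5%: total = 100 / .95, about 105.26
bar = FakeBar(n=50, total=100 / .95, elapsed=2.5)
update, progress_percent, time_elapsed = update_progress_callback_parameters(bar, 40)
# update = (50 - 40) / 105.26 * 100, about 9.5 percentage points since the last call
# progress_percent = 50 / 105.26 * 100, about 47.5 percent complete; time_elapsed == 2.5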
@@ -95,7 +95,6 @@ def run(self, instance_ids, progress_callback=None):
# do nothing for the progress callback if not provided
def progress_callback(*args):
pass

feature_trie = self.feature_set.feature_trie

df_trie = Trie(path_constructor=RelationshipPath)
16 changes: 13 additions & 3 deletions featuretools/synthesis/dfs.py
@@ -34,7 +34,8 @@ def dfs(entities=None,
n_jobs=1,
dask_kwargs=None,
verbose=False,
return_variable_types=None):
return_variable_types=None,
progress_callback=None):
'''Calculates a feature matrix and features given a dictionary of entities
and a list of relationships.

@@ -159,6 +160,13 @@ def dfs(entities=None,
Numeric, Discrete, and Boolean. If given as
the string 'all', use all available variable types.

progress_callback (callable): function to be called with incremental progress updates.
Has the following parameters:

update: percentage change in progress (float between 0 and 100) since the last call
progress_percent: percentage of the total computation completed (float between 0 and 100)
time_elapsed: total time in seconds elapsed since the start of the calculation

Examples:
.. code-block:: python

@@ -214,7 +222,8 @@ def dfs(entities=None,
chunk_size=chunk_size,
n_jobs=n_jobs,
dask_kwargs=dask_kwargs,
verbose=verbose)
verbose=verbose,
progress_callback=progress_callback)
else:
feature_matrix = calculate_feature_matrix(features,
entityset=entityset,
Expand All @@ -227,5 +236,6 @@ def dfs(entities=None,
chunk_size=chunk_size,
n_jobs=n_jobs,
dask_kwargs=dask_kwargs,
verbose=verbose)
verbose=verbose,
progress_callback=progress_callback)
return feature_matrix, features
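A minimal sketch of threading the new parameter through dfs(); the entities and relationships variables are assumed to be defined as in the dfs documentation, and record_progress is a hypothetical logger:

import featuretools as ft

progress_log = []  # collects (update, progress_percent, time_elapsed) tuples

def record_progress(update, progress_percent, time_elapsed):
    progress_log.append((update, progress_percent, time_elapsed))

feature_matrix, features = ft.dfs(entities=entities,
                                  relationships=relationships,
                                  target_entity="transactions",
                                  progress_callback=record_progress)

# the per-call deltas sum to roughly 100 once the run completes
print(sum(update for update, _, _ in progress_log))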
@@ -1213,3 +1213,31 @@ def test_chunk_dataframe_groups():
assert third[0] == 2 and third[1].shape[0] == 2
fourth = next(chunked_grouped)
assert fourth[0] == 3 and fourth[1].shape[0] == 1


def test_calls_progress_callback(es):
class MockProgressCallback:
def __init__(self):
self.total_update = 0
self.total_progress_percent = 0

def __call__(self, update, progress_percent, time_elapsed):
self.total_update += update
self.total_progress_percent = progress_percent

mock_progress_callback = MockProgressCallback()
es = ft.demo.load_mock_customer(return_entityset=True, random_seed=0)
trans_per_session = ft.Feature(es["transactions"]["transaction_id"], parent_entity=es["sessions"], primitive=Count)
trans_per_customer = ft.Feature(es["transactions"]["transaction_id"], parent_entity=es["customers"], primitive=Count)
features = [trans_per_customer, ft.Feature(trans_per_session, parent_entity=es["customers"], primitive=Max)]
ft.calculate_feature_matrix(features, entityset=es, progress_callback=mock_progress_callback)

assert np.isclose(mock_progress_callback.total_update, 100.0)
assert np.isclose(mock_progress_callback.total_progress_percent, 100.0)

# test with multiple jobs
mock_progress_callback = MockProgressCallback()
ft.calculate_feature_matrix(features, entityset=es, progress_callback=mock_progress_callback, n_jobs=3)

assert np.isclose(mock_progress_callback.total_update, 100.0)
assert np.isclose(mock_progress_callback.total_progress_percent, 100.0)
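Why total_update converges to 100 in these tests: each update is the delta (n - previous) / total * 100, so the deltas telescope to the final n / total * 100, and the forced final update pushes n all the way to total. A standalone illustration with made-up increments:

# total inflated by 5% as in calculate_feature_matrix; 20 units of real work
total = 20 / .95
increments = [5, 5, 5, 5]        # bar updates during the real work (n reaches 20)
increments.append(total - 20)    # the forced jump to 100% at the end

n, deltas = 0, []
for inc in increments:
    previous, n = n, n + inc
    deltas.append((n - previous) / total * 100)

assert abs(sum(deltas) - 100.0) < 1e-9  # the deltas telescope to exactly 100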
34 changes: 33 additions & 1 deletion featuretools/tests/synthesis/test_dfs_method.py
@@ -1,5 +1,5 @@
# -*- coding: utf-8 -*-

import numpy as np
import pandas as pd
import pytest
from distributed.utils_test import cluster
@@ -207,3 +207,35 @@ def test_accepts_pandas_training_window(datetime_es):
training_window=pd.Timedelta(90, "D"))

assert (feature_matrix.index == [2, 3, 4]).all()


def test_calls_progress_callback(entities, relationships):
class MockProgressCallback:
def __init__(self):
self.total_update = 0
self.total_progress_percent = 0

def __call__(self, update, progress_percent, time_elapsed):
self.total_update += update
self.total_progress_percent = progress_percent

mock_progress_callback = MockProgressCallback()

feature_matrix, features = dfs(entities=entities,
relationships=relationships,
target_entity="transactions",
progress_callback=mock_progress_callback)

assert np.isclose(mock_progress_callback.total_update, 100.0)
assert np.isclose(mock_progress_callback.total_progress_percent, 100.0)

# test with multiple jobs
mock_progress_callback = MockProgressCallback()
feature_matrix, features = dfs(entities=entities,
relationships=relationships,
target_entity="transactions",
n_jobs=3,
progress_callback=mock_progress_callback)

assert np.isclose(mock_progress_callback.total_update, 100.0)
assert np.isclose(mock_progress_callback.total_progress_percent, 100.0)