Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

More Informative N Jobs Print #511

Merged
merged 14 commits into from Apr 26, 2019
20 changes: 15 additions & 5 deletions featuretools/computational_backends/calculate_feature_matrix.py
Expand Up @@ -5,6 +5,7 @@
import os
import shutil
import time
import warnings
from builtins import zip
from collections import defaultdict
from datetime import datetime
Expand Down Expand Up @@ -537,11 +538,17 @@ def linear_calculate_chunks(chunks, features, approximate, training_window,
return feature_matrix


def scatter_warning(num_scattered_workers, num_workers):
    """Warn when the EntitySet was not scattered to every Dask worker.

    Args:
        num_scattered_workers (int): number of workers that actually
            received a copy of the scattered EntitySet.
        num_workers (int): total number of workers in the cluster.
    """
    if num_scattered_workers != num_workers:
        # Named `warning_string` (not `scatter_warning`) so the local does
        # not shadow this function's own name.
        warning_string = "EntitySet was only scattered to {} out of {} workers"
        warnings.warn(warning_string.format(num_scattered_workers, num_workers))


def parallel_calculate_chunks(chunks, features, approximate, training_window,
verbose, save_progress, entityset, n_jobs,
no_unapproximated_aggs, cutoff_df_time_var,
target_time, pass_columns, dask_kwargs=None):
from distributed import as_completed
from distributed import as_completed, Future
from dask.base import tokenize

client = None
Expand Down Expand Up @@ -569,12 +576,15 @@ def parallel_calculate_chunks(chunks, features, approximate, training_window,
pickled_feats = cloudpickle.dumps(features)
_saved_features = client.scatter(pickled_feats)
client.replicate([_es, _saved_features])
num_scattered_workers = len(client.who_has([Future(es_token)]).get(es_token, []))
num_workers = len(client.scheduler_info()['workers'].values())

scatter_warning(num_scattered_workers, num_workers)
if verbose:
end = time.time()
scatter_time = end - start
scatter_string = "EntitySet scattered to workers in {:.3f} seconds"
print(scatter_string.format(scatter_time))

scatter_time = round(end - start)
Copy link
Contributor

@rwedge rwedge Apr 25, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of rounding up, `round` rounds to the nearest integer. To avoid getting zero we could use a function that rounds up, or take the max of the rounded number and 1.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the motivation for wanting to tell the user it took 1 second if it actually took 0.001 seconds?

Copy link
Contributor

@rwedge rwedge Apr 26, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought saying something took 0 seconds would seem strange to the user since clearly it took at least some time to do, but it's probably fine so let's just leave it as is.

scatter_string = "EntitySet scattered to {} workers in {} seconds"
print(scatter_string.format(num_scattered_workers, scatter_time))
# map chunks
# TODO: consider handling task submission dask kwargs
_chunks = client.map(calculate_chunk,
Expand Down
14 changes: 12 additions & 2 deletions featuretools/computational_backends/utils.py
Expand Up @@ -214,8 +214,18 @@ def create_client_and_cluster(n_jobs, num_tasks, dask_kwargs, entityset_size):
diagnostics_port = dask_kwargs['diagnostics_port']
del dask_kwargs['diagnostics_port']

workers = n_jobs_to_workers(n_jobs)
workers = min(workers, num_tasks)
cpu_workers = n_jobs_to_workers(n_jobs)
workers = min(cpu_workers, num_tasks)
if n_jobs != -1 and workers < n_jobs:
warning_string = "{} workers requested, but only {} workers created."
warning_string = warning_string.format(n_jobs, workers)
if cpu_workers < n_jobs:
warning_string += " Not enough cpu cores ({}).".format(cpu_workers)

if num_tasks < n_jobs:
chunk_warning = " Not enough chunks ({}), consider reducing the chunk size"
warning_string += chunk_warning.format(num_tasks)
warnings.warn(warning_string)

# Distributed default memory_limit for worker is 'auto'. It calculates worker
# memory limit as total virtual memory divided by the number
Expand Down
Expand Up @@ -19,6 +19,9 @@

import featuretools as ft
from featuretools import EntitySet, Timedelta, calculate_feature_matrix, dfs
from featuretools.computational_backends.calculate_feature_matrix import (
scatter_warning
)
from featuretools.computational_backends.utils import (
bin_cutoff_times,
calc_num_per_chunk,
Expand All @@ -44,7 +47,13 @@ def int_es():
return make_ecommerce_entityset(with_integer_time_index=True)


# TODO test mean ignores nan values
def test_scatter_warning():
    """A single UserWarning is raised when fewer workers got the EntitySet."""
    expected = r'EntitySet was only scattered to .* out of .* workers'
    with pytest.warns(UserWarning, match=expected) as captured:
        scatter_warning(1, 2)
    assert len(captured) == 1

rwedge marked this conversation as resolved.
Show resolved Hide resolved

def test_calc_feature_matrix(entityset):
times = list([datetime(2011, 4, 9, 10, 30, i * 6) for i in range(5)] +
[datetime(2011, 4, 9, 10, 31, i * 9) for i in range(4)] +
Expand Down
10 changes: 10 additions & 0 deletions featuretools/tests/utils_tests/test_computational_backend_utils.py
@@ -0,0 +1,10 @@
import pytest

from featuretools.computational_backends.utils import create_client_and_cluster


def test_create_client_and_cluster():
    """Requesting far more workers than can be created raises one warning."""
    expected = r'.*workers requested, but only .* workers created'
    with pytest.warns(UserWarning, match=expected) as captured:
        create_client_and_cluster(1000, 2, {}, 1)
    assert len(captured) == 1