Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

More Informative N Jobs Print #511

Merged
merged 14 commits into from
Apr 26, 2019
18 changes: 14 additions & 4 deletions featuretools/computational_backends/calculate_feature_matrix.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import os
import shutil
import time
import warnings
from builtins import zip
from collections import defaultdict
from datetime import datetime
Expand Down Expand Up @@ -537,11 +538,17 @@ def linear_calculate_chunks(chunks, features, approximate, training_window,
return feature_matrix


def scatter_warning(num_scattered_workers, num_workers):
if num_scattered_workers != num_workers:
scatter_warning = "EntitySet was only scattered to {} out of {} workers"
warnings.warn(scatter_warning.format(num_scattered_workers, num_workers))


def parallel_calculate_chunks(chunks, features, approximate, training_window,
verbose, save_progress, entityset, n_jobs,
no_unapproximated_aggs, cutoff_df_time_var,
target_time, pass_columns, dask_kwargs=None):
from distributed import as_completed
from distributed import as_completed, Future
from dask.base import tokenize

client = None
Expand Down Expand Up @@ -569,12 +576,15 @@ def parallel_calculate_chunks(chunks, features, approximate, training_window,
pickled_feats = cloudpickle.dumps(features)
_saved_features = client.scatter(pickled_feats)
client.replicate([_es, _saved_features])
num_scattered_workers = len(client.who_has([Future(es_token)]).get(es_token, []))
num_workers = len(client.scheduler_info()['workers'].values())

scatter_warning(num_scattered_workers, num_workers)
if verbose:
end = time.time()
scatter_time = end - start
scatter_string = "EntitySet scattered to workers in {:.3f} seconds"
print(scatter_string.format(scatter_time))

scatter_string = "EntitySet scattered to {} workers in {:.3f} seconds"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The issue wanted to remove the decimal part of the "in X seconds" message. Maybe round to integer and if it took less than 1 second display "<1" or "under 1"?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added in the rounding but I decided to not add in the "<1" so that I don't have to deal with a complex code cov situation. In theory I could create a context manager to time it, and then run an on complete function which then prints out a special string but it feels too complex for a very minor piece of functionality.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it'd be easiest to just round up to the near integer seconds. This is just diagnostic info, so +/- 1 second doesn't matter that much

print(scatter_string.format(num_scattered_workers, scatter_time))
# map chunks
# TODO: consider handling task submission dask kwargs
_chunks = client.map(calculate_chunk,
Expand Down
13 changes: 11 additions & 2 deletions featuretools/computational_backends/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -214,8 +214,17 @@ def create_client_and_cluster(n_jobs, num_tasks, dask_kwargs, entityset_size):
diagnostics_port = dask_kwargs['diagnostics_port']
del dask_kwargs['diagnostics_port']

workers = n_jobs_to_workers(n_jobs)
workers = min(workers, num_tasks)
cpu_workers = n_jobs_to_workers(n_jobs)
workers = min(cpu_workers, num_tasks)
if n_jobs != -1 and workers < n_jobs:
warning_string = "{} workers requested, but only {} workers created."
warning_string = warning_string.format(n_jobs, workers)
if cpu_workers < n_jobs:
warning_string += " Not enough cpu cores({}).".format(cpu_workers)

if num_tasks < n_jobs:
warning_string += " Not enough tasks({}).".format(num_tasks)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you change this to " Not enough chunks (X), consider reducing the chunk size"

warnings.warn(warning_string)

# Distributed default memory_limit for worker is 'auto'. It calculates worker
# memory limit as total virtual memory divided by the number
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import os
import shutil
import tempfile
import warnings
from builtins import range
from datetime import datetime
from itertools import combinations
Expand All @@ -19,6 +20,9 @@

import featuretools as ft
from featuretools import EntitySet, Timedelta, calculate_feature_matrix, dfs
from featuretools.computational_backends.calculate_feature_matrix import (
scatter_warning
)
from featuretools.computational_backends.utils import (
bin_cutoff_times,
calc_num_per_chunk,
Expand All @@ -44,7 +48,17 @@ def int_es():
return make_ecommerce_entityset(with_integer_time_index=True)


def test_scatter_warning():
with warnings.catch_warnings(record=True) as w:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

pytest.warns could probably work here

# Cause all warnings to always be triggered.
warnings.simplefilter("always")

scatter_warning(1, 2)
assert len(w) == 1

# TODO test mean ignores nan values

rwedge marked this conversation as resolved.
Show resolved Hide resolved

def test_calc_feature_matrix(entityset):
times = list([datetime(2011, 4, 9, 10, 30, i * 6) for i in range(5)] +
[datetime(2011, 4, 9, 10, 31, i * 9) for i in range(4)] +
Expand Down
12 changes: 12 additions & 0 deletions featuretools/tests/utils_tests/test_computational_backend_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
import warnings

from featuretools.computational_backends.utils import create_client_and_cluster


def test_create_client_and_cluster():
with warnings.catch_warnings(record=True) as w:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same comment as with test_scatter_warning

# Cause all warnings to always be triggered.
warnings.simplefilter("always")

create_client_and_cluster(1000, 2, {}, 1)
assert len(w) == 1