Parallel verbose fixes #282

Merged
merged 4 commits on Oct 12, 2018
docs/source/guides/parallel.rst (5 changes: 4 additions & 1 deletion)
@@ -34,6 +34,9 @@ The 'cluster' value can either be the actual cluster object or a string of the a
dask_kwargs={'cluster': cluster.scheduler.address},
verbose=True)

+.. note::
+
+    When using a persistent cluster, Featuretools publishes a copy of the ``EntitySet`` to the cluster the first time it calculates a feature matrix. Based on the ``EntitySet``'s metadata, the cluster will reuse it for successive computations. This means that if two ``EntitySets`` have the same metadata but different row values (e.g. new data is added to the ``EntitySet``), Featuretools won't recopy the second ``EntitySet`` in later calls. A simple way to avoid this scenario is to give each ``EntitySet`` a unique id.

Using the distributed dashboard
-------------------------------
@@ -52,4 +55,4 @@ The dashboard requires an additional python package, bokeh, to work. Once bokeh

Parallel Computation by Partitioning Data
-----------------------------------------
-As an alternative to Featuretools' parallelization, the data can be partitioned and the feature calculations run on multiple cores or a cluster using Dask or Apache Spark with PySpark. This approach may be necessary with a large `Entityset` because the current parallel implementation sends the entire `EntitySet` to each worker which may exhaust the worker memory. For more information on partitioning the data and using Dask or Spark, see :doc:`/guides/performance`. Dask and Spark allow Featuretools to scale to multiple cores on a single machine or multiple machines on a cluster.
+As an alternative to Featuretools' parallelization, the data can be partitioned and the feature calculations run on multiple cores or a cluster using Dask or Apache Spark with PySpark. This approach may be necessary with a large ``EntitySet`` because the current parallel implementation sends the entire ``EntitySet`` to each worker which may exhaust the worker memory. For more information on partitioning the data and using Dask or Spark, see :doc:`/guides/performance`. Dask and Spark allow Featuretools to scale to multiple cores on a single machine or multiple machines on a cluster.
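
The note added above recommends giving each EntitySet a unique id when computing against a persistent cluster. Below is a minimal sketch of what that looks like from user code, assuming a long-running scheduler reachable at 127.0.0.1:8786 and a feature list from an earlier ft.dfs call; the address and the id value are illustrative only.

import featuretools as ft

# Give each distinct data set its own id so the cluster does not mistake it
# for a previously published EntitySet with matching metadata.
es = ft.EntitySet(id="transactions_2018_10")
# ... add entities and relationships to `es` here ...

# feature_matrix = ft.calculate_feature_matrix(
#     features,                                    # e.g. from ft.dfs(..., features_only=True)
#     entityset=es,
#     dask_kwargs={'cluster': '127.0.0.1:8786'},   # address of the persistent cluster
#     verbose=True)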
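
For the partitioning alternative described in the last paragraph of this file, here is a hedged sketch of the idea rather than anything taken from the Featuretools docs: compute one feature matrix per data partition as an independent Dask task, so no single worker ever holds the full data set. load_partition and entityset_from_partition are hypothetical helpers.

import dask
import featuretools as ft

@dask.delayed
def feature_matrix_for_partition(path, feature_defs):
    df = load_partition(path)             # hypothetical: read one partition of the raw data
    es = entityset_from_partition(df)     # hypothetical: build a small EntitySet from it
    return ft.calculate_feature_matrix(feature_defs, entityset=es)

# paths = ["part-000.parquet", "part-001.parquet", ...]
# partial_matrices = dask.compute(
#     *[feature_matrix_for_partition(p, feature_defs) for p in paths])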
featuretools/computational_backends/calculate_feature_matrix.py (16 changes: 10 additions & 6 deletions)
@@ -544,10 +544,13 @@ def parallel_calculate_chunks(chunks, features, approximate, training_window,
            entityset_size=entityset.__sizeof__())
    # scatter the entityset
    # denote future with leading underscore
-   start = time.time()
+   if verbose:
+       start = time.time()
    es_token = "EntitySet-{}".format(tokenize(entityset))
    if es_token in client.list_datasets():
-       print("Using EntitySet persisted on the cluster as dataset %s" % (es_token))
+       if verbose:
+           msg = "Using EntitySet persisted on the cluster as dataset {}"
+           print(msg.format(es_token))
        _es = client.get_dataset(es_token)
    else:
        _es = client.scatter([entityset])[0]
@@ -557,10 +560,11 @@ def parallel_calculate_chunks(chunks, features, approximate, training_window,
    pickled_feats = cloudpickle.dumps(features)
    _saved_features = client.scatter(pickled_feats)
    client.replicate([_es, _saved_features])
-   end = time.time()
-   scatter_time = end - start
-   scatter_string = "EntitySet scattered to workers in {:.3f} seconds"
-   print(scatter_string.format(scatter_time))
+   if verbose:
+       end = time.time()
+       scatter_time = end - start
+       scatter_string = "EntitySet scattered to workers in {:.3f} seconds"
+       print(scatter_string.format(scatter_time))

    # map chunks
    # TODO: consider handling task submission dask kwargs
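
The changed block above follows a publish-once, reuse-later pattern keyed on a hash of the EntitySet, with all timing and printing now gated behind verbose. Below is a stripped-down sketch of that pattern using plain dask.distributed calls; it is not Featuretools' own code, and the payload is a stand-in.

from dask.base import tokenize
from distributed import Client


def scatter_once(client, data, verbose=False):
    # Key the published dataset by a content hash so identical data is reused.
    token = "EntitySet-{}".format(tokenize(data))
    if token in client.list_datasets():
        if verbose:
            print("Reusing dataset {}".format(token))
        return client.get_dataset(token)
    future = client.scatter([data])[0]           # copy the data to the cluster once
    client.publish_dataset(**{token: future})    # register it under the token
    if verbose:
        print("Published dataset {}".format(token))
    return future


if __name__ == "__main__":
    client = Client()                            # assumption: a local cluster, for illustration
    _data = scatter_once(client, {"rows": [1, 2, 3]}, verbose=True)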
requirements.txt (4 changes: 2 additions & 2 deletions)
@@ -7,6 +7,6 @@ pyyaml>=3.12
cloudpickle>=0.4.0
future>=0.16.0
pympler>=0.5
-dask>=0.17.5
-distributed>=1.21.8
+dask>=0.19.4
+distributed>=1.23.3
psutil