Multiprocessing #170

Merged
merged 86 commits into master from multiprocessing on Jun 21, 2018

Conversation

@rwedge
Contributor

rwedge commented Jun 18, 2018

Adds support for calculating feature matrices on multiple cores using Dask distributed. When njobs is not equal to 1, calculate_feature_matrix will create njobs distributed workers and use them to calculate the feature matrix. An existing Dask cluster can be reused by passing its address in as dask_kwargs['cluster'].
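A minimal usage sketch (`features` and `es` are assumed to be an existing feature list and EntitySet; the parameter is spelled `njobs` as in this PR, later renamed `n_jobs` in review; the scheduler address is made up):

```python
import featuretools as ft

# Spin up 4 local Dask workers to compute the feature matrix in parallel.
feature_matrix = ft.calculate_feature_matrix(features=features,
                                             entityset=es,
                                             njobs=4)

# Or reuse an existing Dask cluster by passing its scheduler address.
feature_matrix = ft.calculate_feature_matrix(features=features,
                                             entityset=es,
                                             njobs=4,
                                             dask_kwargs={'cluster': '127.0.0.1:8786'})
```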

@@ -86,6 +89,15 @@ def calculate_feature_matrix(features, entityset=None, cutoff_time=None, instanc
percentage of all instances. If passed the string "cutoff time",
rows are split per cutoff time.
njobs (int, optional): number of parallel threads to use when


@kmax12

kmax12 Jun 18, 2018

Member

for consistency with sklearn, let's call this parameter n_jobs.

also, this controls the number of parallel processes, not threads, correct?

pass_columns=pass_columns)
feature_matrix = []
# TODO: handle errored / failed


@kmax12

kmax12 Jun 18, 2018

Member

is this still a todo?

return feature_matrix
def dask_calculate_chunk(chunk, saved_features, entityset,


@kmax12

kmax12 Jun 18, 2018

Member

I see this function doesn't do much. I wonder if we can refactor the inputs to calculate_chunk without needing a separate dask_calculate_chunk.


@rwedge

rwedge Jun 18, 2018

Contributor

For the feature list, we could check if features is a string instead of a list and unpickle the string. For PandasBackend, we could make backend optional and create it inside calculate_chunk when backend is None.
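A rough sketch of that refactor (the `PandasBackend` import path and the exact signature are assumptions):

```python
import cloudpickle

from featuretools.computational_backends.pandas_backend import PandasBackend


def calculate_chunk(chunk, features, entityset, pass_columns, backend=None):
    # Dask workers receive the feature list as a cloudpickle payload;
    # restore it to a list of feature objects before use.
    if isinstance(features, bytes):
        features = cloudpickle.loads(features)
    # Build the backend lazily so Dask callers don't have to ship one.
    if backend is None:
        backend = PandasBackend(entityset, features)
    # ... existing chunk calculation using `backend` ...
```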

@@ -188,6 +188,9 @@ def __eq__(self, other, deep=False):
return True
def __sizeof__(self):
return self.df.__sizeof__() + self.last_time_index.__sizeof__()


@kmax12

kmax12 Jun 18, 2018

Member

can we use self.data.__sizeof__()? that would be more future-proof as well
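i.e., something like this (a sketch, assuming `self.data` bundles the entity's stored attributes):

```python
def __sizeof__(self):
    # Sizing self.data stays correct if new attributes are added later,
    # unlike summing df and last_time_index explicitly.
    return self.data.__sizeof__()
```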

def __sizeof__(self):
return sum([entity.__sizeof__() for entity in self.entities])
def __dask_tokenize__(self):


@kmax12

kmax12 Jun 18, 2018

Member

would it be possible to use the metadata to create this?


@bschreck

bschreck Jun 21, 2018

Contributor

This looks resolved; that's exactly what it's doing.
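For reference, a hedged sketch of tokenizing on metadata (`dask.base.tokenize` is real; hashing the pickled metadata is one way to get a deterministic token):

```python
import cloudpickle
from dask.base import tokenize


def __dask_tokenize__(self):
    # Hash the data-free metadata copy instead of the full EntitySet so
    # Dask can cheaply recognize when two entitysets are the same.
    return tokenize(cloudpickle.dumps(self.metadata))
```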

pass_columns=pass_columns,
dask_kwargs=dask_kwargs or {})
else:
backend = PandasBackend(entityset, features)


@kmax12

kmax12 Jun 20, 2018

Member

for consistency, perhaps this else block should be in a function, like parallel_calculate_chunks above
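A hypothetical shape for that refactor (the function name is invented to mirror `parallel_calculate_chunks`; it assumes the `calculate_chunk` signature sketched earlier):

```python
def linear_calculate_chunks(chunks, features, entityset, pass_columns):
    # Single-process path: one backend, chunks computed in order.
    backend = PandasBackend(entityset, features)
    return [calculate_chunk(chunk, features, entityset, pass_columns,
                            backend=backend)
            for chunk in chunks]
```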

Using the distributed dashboard
-------------------------------
Dask.distributed has a web-based diagnostics dashboard that can be used to analyze the state of the workers and tasks. An in-depth description of the web interface can be found `here <https://distributed.readthedocs.io/en/latest/web.html>`_. The dashboard requires an additional Python package, bokeh, to work. Once bokeh is installed, the web interface will be launched by default when a LocalCluster is created. The cluster created by featuretools when using ``n_jobs`` does not enable the web interface automatically. To enable it, the port for the main web interface must be specified in ``dask_kwargs``::
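
    # a sketch: 'diagnostics_port' is the assumed dask_kwargs key for
    # exposing the dashboard on a given port
    import featuretools as ft

    fm = ft.calculate_feature_matrix(features=features,
                                     entityset=es,
                                     n_jobs=2,
                                     dask_kwargs={'diagnostics_port': 8787})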


@kmax12

kmax12 Jun 21, 2018

Member

can we put a screenshot of the dashboard in the documentation?

client.publish_dataset(**{_es.key: _es})
# save features to a tempfile and scatter it
pickled_feats = cloudpickle.dumps(features)


@bschreck

bschreck Jun 21, 2018

Contributor

Why is features handled differently than entityset? I think I know, but I just want to double-check. For EntitySet, we want to cache it on the cluster for further computation, but feature objects are small and fast to compute, so we don't want them to stay there; we just send them out for the current job. Is that right?


@rwedge

rwedge Jun 21, 2018

Contributor

Yeah, my thought is that features are more likely to change between computations than the EntitySet, and features are small enough that resending them shouldn't slow things down very much.
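The pattern under discussion, sketched (`distributed`'s `Client` API; `_es` and `features` are the names used in the diff, and the cluster address is hypothetical):

```python
import cloudpickle
from distributed import Client

client = Client('127.0.0.1:8786')

# The EntitySet is large, slow to ship, and reused across calls, so
# publish it once on the cluster under a stable key.
client.publish_dataset(**{_es.key: _es})

# Features are small and change often, so pickle and scatter them for
# this job only instead of publishing them.
pickled_feats = cloudpickle.dumps(features)
scattered_feats = client.scatter(pickled_feats, broadcast=True)
```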

@kmax12


Member

kmax12 commented Jun 21, 2018

Looks great. Merging!

@kmax12 kmax12 merged commit b4dd270 into master Jun 21, 2018

2 checks passed

ci/circleci: Your tests passed on CircleCI!
license/cla: Contributor License Agreement is signed.

@rwedge rwedge referenced this pull request Jun 22, 2018

Merged

v0.2.0 #173

@kmax12 kmax12 deleted the multiprocessing branch Aug 15, 2018
