
Fix perf issue in dd.Series.isin #4727

Merged
jcrist merged 1 commit into dask:master from isin-perf-fix on Apr 24, 2019
Conversation

@jcrist (Member) commented Apr 23, 2019

Previously this was slow for large lists/large npartitions for two
reasons:

- The `values` object was part of every task in the graph, increasing
  the serialization cost between workers, and traversal cost when
  evaluating on a worker.
- The `values` object is usually a builtin python collection (usually a
  list), which coupled with the above would significantly slow down our
  optimization passes as a large list was traversed once for every
  partition.

We now wrap `values` in `delayed`, avoiding both of these problems.

Also expanded test coverage for `isin`, explicitly forbidding a few odd
arguments we don't support.

Fixes #4726, #3198.
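
A rough sketch of the pattern described above (illustrative data and sizes, not the PR's exact code): wrapping a plain Python list in `delayed` puts it into the graph once, under its own key, so the per-partition tasks only hold a reference to it instead of a copy.

import dask
import dask.dataframe as dd
import pandas as pd

ddf = dd.from_pandas(pd.DataFrame({"x": range(1000)}), npartitions=100)
values = list(range(50000))  # a large builtin list

# Before: the list is embedded in all 100 tasks and traversed per partition
slow = ddf.x.map_partitions(pd.Series.isin, values, meta=("x", "bool"))

# After: the list enters the graph once; tasks only reference its key
fast = ddf.x.map_partitions(pd.Series.isin, dask.delayed(values), meta=("x", "bool"))

Both compute the same result; the difference is in graph size and in how long optimization and serialization take.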

@jcrist (Member, Author) commented Apr 24, 2019

Ping @martindurant for review

# We wrap values in a delayed for two reasons:
# - avoid serializing data in every task
# - avoid cost of traversal of large list in optimizations
return self.map_partitions(M.isin, delayed(values), meta=meta)
@mrocklin (Member) Apr 24, 2019

I think that we call `delayed` already if the object is large. This happens in `map_partitions` if you look at the `normalize_arg` call.

dask/dask/array/core.py, lines 288 to 305 in 21f8de8:

def normalize_arg(x):
    """ Normalize user provided arguments to blockwise or map_blocks

    We do a few things:

    1. If they are string literals that might collide with blockwise_token then we
       quote them
    2. IF they are large (as defined by sizeof) then we put them into the
       graph on their own by using dask.delayed
    """
    if is_dask_collection(x):
        return x
    elif isinstance(x, str) and re.match(r'_\d+', x):
        return delayed(x)
    elif sizeof(x) > 1e6:
        return delayed(x)
    else:
        return x

If this isn't happening in some cases then it might make sense to review what's there and change things at that point instead.

(though, to be clear, I think that this change is fine)
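
For reference, a minimal usage sketch of that code path (assuming `normalize_arg` is importable from `dask.array.core` as in the quoted snippet, and that a 200,000-element list of ints measures above the 1e6-byte `sizeof` threshold):

from dask.array.core import normalize_arg
from dask.delayed import Delayed

small = [1, 2, 3]
big = list(range(200000))

normalize_arg(small)                     # small, not a dask collection: returned as-is
isinstance(normalize_arg(big), Delayed)  # True: large by sizeof, so wrapped with dask.delayed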

@jcrist (Member, Author) Apr 24, 2019

We were using `elemwise` before, which doesn't have that code path. In this case I think the nbytes threshold (currently 1e6) is too low for large lists (a common argument type) - the cost of traversing the list in the graph, for both optimization and execution, becomes expensive with many partitions.
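
For a rough sense of scale (approximate numbers; the exact value depends on the Python and dask versions), a list that is cheap by `sizeof` can still be expensive once it is embedded in, and traversed for, every one of thousands of tasks:

from dask.sizeof import sizeof

values = list(range(10000))  # 10,000 small ints
sizeof(values)               # on the order of a few hundred KB, i.e. under the
                             # 1e6-byte threshold, so it would stay inline in every task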

@mrocklin (Member) Apr 24, 2019

Should we improve normalize_arg to put any list of length greater than 10 into a task with delayed?

if isinstance(x, list) and len(x) > 10:
    return delayed(x)

Mostly I'm just looking for ways to generalize the improvement you've found here. I suspect that it burns us in other situations as well.

@jcrist (Member, Author) Apr 24, 2019

Maybe? I'd prefer to experiment with that later - there are a few things like this I'd like to clean up in dask-dataframe. I'll make an issue.

@jcrist (Member, Author) Apr 24, 2019

bad_types = (_Frame,)
if isinstance(values, bad_types):
    raise NotImplementedError(
        "Passing a %r to `isin`" % typename(type(values))
@mrocklin (Member) Apr 24, 2019

I'm a little bit surprised that we can't call df.isin(my_pandas_series). Can you help explain what was going on here?

@jcrist (Member, Author) Apr 24, 2019

For `pd.Series.isin` all arguments are treated the same, but for dataframes pandas does something weird with alignment along the index that we can't really support (see the comment above here).
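
A small illustration of the pandas behavior being referred to (based on my reading of pandas' documented semantics for `DataFrame.isin` with a Series; worth double-checking against the pandas version in use):

import pandas as pd

s = pd.Series([1, 2, 3])
s.isin(pd.Series([3, 2, 1]))   # plain membership test: [True, True, True]

df = pd.DataFrame({"a": [1, 2, 3]})
df.isin(pd.Series([3, 2, 1]))  # values aligned on the index, row by row:
                               # column a -> [False, True, False]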

@mrocklin (Member) Apr 24, 2019

I think that it would be good to guide the user here a bit towards correct behavior. "Ok, you've told me I can't pass in a pandas series, what can I pass in?"

Or, if they're passing in a series, maybe we can coerce it to a numpy array for them?

@jcrist (Member, Author) Apr 24, 2019

> Or, if they're passing in a series, maybe we can coerce it to a numpy array for them?

That wouldn't match pandas then.

@jcrist (Member, Author) commented Apr 24, 2019

Good to merge?

@mrocklin (Member) commented Apr 24, 2019

+1 from me. I wouldn't mind a more guide-y error message, but this is definitely an improvement.

@jcrist merged commit 20d523a into dask:master on Apr 24, 2019 (2 checks passed)
@jcrist deleted the isin-perf-fix branch on Apr 24, 2019
jorge-pessoa pushed a commit to jorge-pessoa/dask that referenced this issue May 14, 2019
@gsakkis (Contributor) commented May 31, 2019

FYI I was bitten by this after upgrading from 1.2.0 because I was passing a `dict_keys` object (`dict.keys()`). It wouldn't be so bad if it raised e.g. a `TypeError` with a message naming the offending type, but instead it just hung there, which made it tricky to figure out where and why.

@TomAugspurger (Member) commented May 31, 2019

@gsakkis (Contributor) commented May 31, 2019

Neither; they made it not work at all - it hangs there waiting forever.

@TomAugspurger (Member) commented May 31, 2019

@jcrist (Member, Author) commented May 31, 2019

@gsakkis can you provide an example that reproduces your issue?

@gsakkis (Contributor) commented May 31, 2019

It turns out it's happening with the distributed scheduler only. Quick and dirty test:

diff --git a/dask/dataframe/tests/test_dataframe.py b/dask/dataframe/tests/test_dataframe.py
index 0e18f510..d82701ef 100644
--- a/dask/dataframe/tests/test_dataframe.py
+++ b/dask/dataframe/tests/test_dataframe.py
@@ -932,6 +932,21 @@ def test_isin():
             d.isin(obj)
 
 
+def test_isin_distributed():
+    import distributed
+    client = distributed.Client()
+
+    b_dict = {1: 'foo', 2: 'bar'}
+
+    # these work
+    d1 = d.isin(set(b_dict)).compute()
+    d2 = d[d['b'].isin(set(b_dict))].compute()
+
+    # these hang on dask>=1.2.1
+    d3 = d.isin(b_dict.keys()).compute()
+    d4 = d[d['b'].isin(b_dict.keys())].compute()
+
+
 def test_len():
     assert len(d) == len(full)
     assert len(d.a) == len(full.a)

@jcrist (Member, Author) commented May 31, 2019

Cool, thanks, I'll take a look at this later.

@TomAugspurger (Member) commented Jun 3, 2019
