
Possible bug in using apply in dask dataframes #2774

Closed
sachinruk opened this issue Oct 13, 2017 · 7 comments

Comments

@sachinruk

sachinruk commented Oct 13, 2017

This is a crosspost from https://stackoverflow.com/questions/46720983/incompatibility-of-apply-in-dask-and-pandas-dataframes. I'd appreciate it if it could be answered there in case it isn't a bug.

A sample of the triggers column in my Dask dataframe looks like the following:

0    [Total Traffic, DNS, UDP]
1                    [TCP RST]
2              [Total Traffic]
3                 [IP Private]
4                       [ICMP]
Name: triggers, dtype: object

I wish to create a one-hot encoded version of the above arrays (putting a 1 against the DNS column in row 1, for example) by doing the following. pop_triggers contains all possible values of triggers.

for trig in pop_triggers:
    df[trig] = df.triggers.apply(lambda x: 1 if trig in x else 0)

However, the Total Traffic, DNS, etc. columns all contain the value 0 rather than 1 where the relevant trigger is present. When I copy it into a pandas dataframe and do the same operation, they get the expected values.

a = df[[ 'Total Traffic', 'UDP', 'NTP Amplification', 'triggers', 'ICMP']].head()
for trig in pop_triggers:
    a[trig] = a.triggers.apply(lambda x: 1 if trig in x else 0)

What am I missing here? Is it because dask is lazy that somehow it's not filling in the values as expected?

Possible bug below:
I investigated some places where the flag was set in the first place (which turned out to be far fewer than I expected) and got some really weird results. See below:

df2 = df[df['Total Traffic']==1]
df2[['triggers']+pop_triggers].head()

output:

        triggers	Total Traffic	UDP	DNS
9380	[ICMP, IP null, IP Private, TCP null, TCP SYN,...	1	1	1
9388	[ICMP, IP null, IP Private, TCP null, TCP SYN,...	1	1	1
19714	[ICMP, IP null, IP Private, UDP, NTP Amplifica...	1	1	1
21556	[IP null]	1	1	1
21557	[IP null]	1	1	1

Minimal working example:

triggers = [['Total Traffic', 'DNS', 'UDP'],['TCP RST'],['Total Traffic'],['IP Private'],['ICMP']]*10
df2 = dd.from_pandas(pd.DataFrame({'triggers':triggers}), npartitions=16)
pop_triggers= ['Total Traffic', 'UDP', 'DNS', 'TCP SYN', 'TCP null', 'ICMP']
for trig in pop_triggers:
    df2[trig] = df2.triggers.apply(lambda x: 1 if trig in x else 0)
df2.head()

Output:

triggers	Total Traffic	UDP	DNS	TCP SYN	TCP null	ICMP
0	[Total Traffic, DNS, UDP]	0	0	0	0	0	0
1	[TCP RST]	0	0	0	0	0	0
2	[Total Traffic]	0	0	0	0	0	0
3	[IP Private]	0	0	0	0	0	0
@TomAugspurger
Member

The metadata is maybe getting messed up, since you're repeatedly assigning a dd.Series whose .name is triggers. Adding meta=(trigger, 'bool') solves the issue (for me anyway).

In [50]: xs = [df2.triggers.apply(lambda x: trigger in x, meta=(trigger, 'bool')) for trigger in pop_triggers]

In [51]: dd.concat(xs, axis=1).compute()
Out[51]:
    Total Traffic    UDP    DNS  TCP SYN  TCP null   ICMP
0           False  False  False    False     False  False
1           False  False  False    False     False  False
2           False  False  False    False     False  False
3           False  False  False    False     False  False
4            True   True   True     True      True   True
..            ...    ...    ...      ...       ...    ...
45          False  False  False    False     False  False
46          False  False  False    False     False  False
47          False  False  False    False     False  False
48          False  False  False    False     False  False
49           True   True   True     True      True   True

[50 rows x 6 columns]

FYI, if you can avoid it, you don't want to be storing lists in pandas / dask dataframes. Typically it's better to do that kind of pre-processing outside pandas.
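
For what it's worth, a minimal sketch of what that pre-processing could look like, reusing the names from the minimal example above: the flag columns are built eagerly with plain Python before the data is ever handed to pandas/dask.

import pandas as pd

triggers = [['Total Traffic', 'DNS', 'UDP'], ['TCP RST'], ['Total Traffic'],
            ['IP Private'], ['ICMP']]
pop_triggers = ['Total Traffic', 'UDP', 'DNS', 'TCP SYN', 'TCP null', 'ICMP']

# Build the one-hot columns with plain Python, then construct the frame.
flags = pd.DataFrame(
    [{trig: int(trig in row) for trig in pop_triggers} for row in triggers]
)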

I won't have a chance to look into this bug more until later next week. Feel free to have a look at what's going on if you're interested.

@aavanian

@TomAugspurger if you look at your results, you will see something's wrong (in addition to the metadata issue). It seems that the trigger variable used in the lambda remains mutable until the compute() call, and so at compute() time the lambda is evaluated against the last value that trigger held.

If you implement the lambda as a named function, I guess the explicit closure bypasses the issue (see my answer to the OP's SO question, link above).

I'm not familiar enough with dask's internals to say if it's a limitation or a bug.

@TomAugspurger
Member

TomAugspurger commented Oct 16, 2017 via email

@jcrist
Member

jcrist commented Oct 16, 2017

It seems that the trigger variable used in the lambda remains mutable until the compute() call
...
I'm not familiar enough with dask's internals to say if it's a limitation or a bug.

This isn't a bug; this is just how Python works. Closures evaluate based on the defining scope: if you change the value of trig in that scope, then the closure will evaluate differently.

In [1]: [(lambda: a)() for a in range(3)] # call immediately
Out[1]: [0, 1, 2]

In [2]: funcs = [lambda: a for a in range(3)] # call later

In [3]: [f() for f in funcs]
Out[3]: [2, 2, 2]

The issue here is that this code would run fine in pandas, since there is an evaluation in each loop iteration, but in dask all the evaluations are delayed until later, and thus all use the same value for trig.
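
As an aside, one standard Python idiom for pinning the loop variable at definition time is a default argument; a minimal sketch:

In [4]: funcs = [lambda a=a: a for a in range(3)]  # a is bound when the lambda is defined

In [5]: [f() for f in funcs]
Out[5]: [0, 1, 2]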

The solution you posted on stackoverflow using args= is one way to deal with this, although note that the function is the same in all applications (and thus the definition doesn't need to be in a loop). If that's not performant enough I'd swap out the apply/assign loop and just use map_partitions directly on a single function that does the loop/assign internally.
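
For concreteness, a minimal sketch of that args= route applied to the minimal example above (the has_trigger name is made up here; meta is passed only to avoid dask guessing the output metadata):

def has_trigger(x, trig):
    # trig is passed explicitly as an argument, so no closure over the
    # loop variable is involved
    return 1 if trig in x else 0

for trig in pop_triggers:
    df2[trig] = df2.triggers.apply(has_trigger, args=(trig,), meta=(trig, 'int64'))

df2.head()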

Also note that in general it's best to avoid creating closures when working with dask, as they don't serialize well and may degrade performance if using the distributed scheduler.

@aavanian

OK, thanks for the clarification. It was still not 100% clear to me at first reading, but with an explicit function defined in the loop, the function is redefined over and over; it's indeed when going out of scope that the functions become distinct from each other, each bound with its own closure. So my solution worked, but not for the reason I thought...

Well noted on closures with dask; I would have been wary of using them anyway, too many layers of complexity!

As for map_partitions, unrelated to this, it should generally be more performant than apply, right (assuming no changes to index or divisions)?

@jcrist
Member

jcrist commented Oct 16, 2017

As for map_partitions, unrelated to this, it should generally be more performant than apply, right (assuming no changes to index or divisions)?

Yes, but the speedup could be negligible or significant depending on the task. Most dask dataframe operations are implemented using a core set of functions (map_partitions being one of them). The reasons for maybe rewriting to use map_partitions directly are:

  • Fewer tasks in the graph. The dask scheduler has a pretty small overhead per-task, but for very large graphs this can be an issue.
  • Faster graph build time. There is a bit of overhead to apply each step, which could be skipped. This again is usually negligible.
  • Avoiding intermediates. Each assign/apply call may create intermediate arrays which may be copied. Doing things all in one step may reduce memory usage.

In general I recommend sticking with the pandas api until you find tasks that are slow, and then drop down to using the dask-specific operations to optimize (e.g. map_partitions).
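
For illustration, a hedged sketch of the map_partitions route on the same example (the add_trigger_flags name is made up; dask may warn about inferred metadata, in which case meta can be passed explicitly):

def add_trigger_flags(pdf):
    # pdf is a plain pandas DataFrame (one partition), so everything here is
    # evaluated eagerly and the closure problem above doesn't arise
    pdf = pdf.copy()
    for trig in pop_triggers:
        pdf[trig] = pdf.triggers.apply(lambda x: 1 if trig in x else 0)
    return pdf

df2 = df2.map_partitions(add_trigger_flags)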

@jcrist
Member

jcrist commented Jan 30, 2018

Closing, as this is not really a bug in dask but more an issue with how python's closures work. Feel free to reopen if you disagree.

@jcrist jcrist closed this as completed Jan 30, 2018