
[bug] dask.dataframe.DataFrame.merge fails for inner join #4643

Closed
trstovall opened this issue Mar 28, 2019 · 15 comments

@trstovall

Setup

from pandas import DataFrame
from dask.delayed import delayed
from dask.dataframe import from_delayed

A = from_delayed([
  delayed(DataFrame)({'x': range(i, i+5), 'a': range(i, i+5)})
  for i in range(0, 10, 5)
])

B = from_delayed([
  delayed(DataFrame)({'x': range(i, i+5), 'b': range(i, i+5)})
  for i in range(0, 10, 5)
])

C = A.merge(
  B, on=('x',), how='inner'
)
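
For comparison, here is the same merge done eagerly in plain pandas (a minimal sketch of the expected result; the concat mirrors the two delayed partitions above):

import pandas as pd

# Build the same two frames eagerly.
a = pd.concat([pd.DataFrame({'x': range(i, i + 5), 'a': range(i, i + 5)})
               for i in range(0, 10, 5)])
b = pd.concat([pd.DataFrame({'x': range(i, i + 5), 'b': range(i, i + 5)})
               for i in range(0, 10, 5)])

c = a.merge(b, on='x', how='inner')
print(len(c))  # 10 -- every x in A matches exactly one x in B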

Traceback

>>> C.compute()
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/conda/lib/python3.6/site-packages/dask/base.py", line 156, in compute
    (result,) = compute(self, traverse=False, **kwargs)
  File "/conda/lib/python3.6/site-packages/dask/base.py", line 398, in compute
    results = schedule(dsk, keys, **kwargs)
  File "/conda/lib/python3.6/site-packages/dask/threaded.py", line 76, in get
    pack_exception=pack_exception, **kwargs)
  File "/conda/lib/python3.6/site-packages/dask/local.py", line 462, in get_async
    raise_exception(exc, tb)
  File "/conda/lib/python3.6/site-packages/dask/compatibility.py", line 112, in reraise
    raise exc
  File "/conda/lib/python3.6/site-packages/dask/local.py", line 230, in execute_task
    result = _execute_task(task, data)
  File "/conda/lib/python3.6/site-packages/dask/core.py", line 119, in _execute_task
    return func(*args2)
  File "/conda/lib/python3.6/site-packages/dask/optimization.py", line 942, in __call__
    dict(zip(self.inkeys, args)))
  File "/conda/lib/python3.6/site-packages/dask/core.py", line 149, in get
    result = _execute_task(task, cache)
  File "/conda/lib/python3.6/site-packages/dask/core.py", line 119, in _execute_task
    return func(*args2)
  File "/conda/lib/python3.6/site-packages/dask/compatibility.py", line 93, in apply
    return func(*args, **kwargs)
  File "/conda/lib/python3.6/site-packages/dask/dataframe/core.py", line 3794, in apply_and_enforce
    df = func(*args, **kwargs)
  File "/conda/lib/python3.6/site-packages/dask/dataframe/shuffle.py", line 417, in partitioning_index
    return hash_pandas_object(df, index=False) % int(npartitions)
  File "/conda/lib/python3.6/site-packages/pandas/core/util/hashing.py", line 117, in hash_pandas_object
    h = Series(h, index=obj.index, dtype='uint64', copy=False)
  File "/conda/lib/python3.6/site-packages/pandas/core/series.py", line 262, in __init__
    .format(val=len(data), ind=len(index)))
ValueError: Length of passed values is 0, index implies 5

Version

>>> pandas.__version__
'0.23.4'
>>> dask.__version__
'1.1.4'
@mrocklin
Member

mrocklin commented Mar 28, 2019 via email

@trstovall
Author

This bug doesn't occur when pandas.DataFrame is replaced by cudf.DataFrame.

@jakirkham
Member

jakirkham commented Jun 3, 2019

Any thoughts on this one, @TomAugspurger?

@TomAugspurger
Member

Due to pandas-dev/pandas#24318, we need to avoid calling hash_pandas_object on empty DataFrames. It's not clear what pandas should do in that case, so I think dask needs to work around it.

I suspect shuffle and partitioning_index would both need to be updated to handle empty partitions somehow, but I haven't looked into what they should do (and don't plan to any time soon).
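
For illustration, the pandas-level failure can be triggered directly (a minimal sketch; the traceback above is consistent with hashing a frame that has rows but no columns, and the behavior described is as of pandas 0.23/0.24, per pandas-dev/pandas#24318):

import pandas as pd
from pandas.core.util.hashing import hash_pandas_object

df = pd.DataFrame({'x': range(5)})

# Selecting zero columns keeps the 5-row index but leaves nothing to hash,
# so the combined hash array has length 0. On pandas 0.23/0.24 this raises:
#   ValueError: Length of passed values is 0, index implies 5
hash_pandas_object(df[[]], index=False)

A dask-side guard would presumably need to special-case this shape before hashing, for example by returning zeros when there are no columns to hash.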

@saravananpsg

I am facing the same issue when joining multiple dask DataFrames with an inner join on a multi-core CPU.

Version:
Dask - '2.20.0'
Pandas - '0.24.2'

Code Snippet:

from functools import reduce

import dask.dataframe as dd
import pandas as pd

# csv_files and get_value are defined elsewhere in my pipeline.
dfs = []
for file in csv_files:
    df = pd.read_csv(file)
    df = dd.from_pandas(df, npartitions=20)
    df['key'] = df.map_partitions(
        lambda mdf: mdf.apply(lambda row: get_value(row), axis=1)
    ).compute(scheduler='processes')
    df = df.set_index('key')  # set_index returns a new frame
    dfs.append(df)

df_merged = reduce(lambda left, right: dd.merge(left, right,
                                                left_index=True,
                                                right_index=True,
                                                on=['key'],
                                                how='inner',
                                                suffixes=('', '_y')), dfs)

The same inner join works fine with scheduler='sync'; however, scheduler='processes' yields incorrect results.
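
Concretely, the discrepancy shows up like this (a hypothetical check on df_merged from the snippet above):

# The same graph computed with both schedulers should agree.
sync_result = df_merged.compute(scheduler='sync')
proc_result = df_merged.compute(scheduler='processes')
assert sync_result.equals(proc_result)  # fails: extra unmatched rows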

@saravananpsg

@mrocklin @TomAugspurger any advice on this?

@TomAugspurger
Member

@saravananpsg does #4643 (comment) answer your question?

This issue raises a traceback, but you mention incorrect results. Are you sure your issue is the same?

@saravananpsg

saravananpsg commented Oct 14, 2020

@TomAugspurger Yes. It may be related to this issue.

In my case, the inner join results look more like an outer join: I get additional unmatched rows when joining multiple dataframes on a distributed system. I tried with and without task-level shuffling (shuffle='task') during the merge. No luck though :-(

I also tried to

  • convert back to a pandas dataframe and reset the index
  • merge the results in pandas; the results are still not what an inner join should produce

Is there any workaround for this?

Kindly provide your thoughts on this!

@lfdversluis

I see the merge function is still buggy, for both outer and inner joins. Is there any way to mitigate this? I tried using align to make sure the dataframes have the same indices/columns, but that seems unsupported at the moment.

@saravananpsg

One workaround is to break the pipeline into stages (for instance, wherever you compute intermediate data): write those results out to files and read them back in, which wipes away the problematic partition index. See the sketch below.
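
A minimal sketch of that staging, assuming df and other are the dask DataFrames being merged and using a hypothetical file name (to_parquet/read_parquet are standard dask.dataframe API):

import dask.dataframe as dd

# Stage 1: persist the keyed frame; this materializes the computation.
df.to_parquet('stage1.parquet')

# Stage 2: read it back. The frame comes back with fresh partitioning,
# so the merge no longer runs against the original partition index.
df = dd.read_parquet('stage1.parquet')
df_merged = df.merge(other, on='key', how='inner')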

@jsignell
Member

jsignell commented Dec 9, 2020

I think this is fixed on dask master. I was just unable to reproduce the MRE on dask master with pandas 1.1.4.

@jakirkham
Member

We also just released on Friday, so this may already be fixed in a release. I would make sure to upgrade and see if there's still an issue.

@jsignell
Member

I am going to close this, but feel free to reopen or comment if you can still reproduce.

@lampda

lampda commented Jan 8, 2022

Has this bug been fixed already?

When I do an outer merge, I get this error message:
"ValueError: Length of passed values is 0, index implies 48679."

My pandas version is '1.1.5' and my dask version is '2021.03.0'.

@jakirkham
Member

@lampda I would suggest raising a new issue with a minimal reproducer.
