
Cannot process datasets created by an older version of Dask #11160

Open
dbalabka opened this issue Jun 4, 2024 · 9 comments
Labels
needs triage Needs a response from a contributor

Comments

dbalabka commented Jun 4, 2024

Describe the issue:
After upgrading Dask from 2023.9.3 to a recent version (2024.5.2 or 2024.4.1), we cannot load existing parquet files created by the previous version.

I'm getting an error during the to_parquet operation (when dask-expr is enabled):

.../.venv/lib/python3.10/site-packages/dask_expr/_collection.py:301: UserWarning: Dask annotations {'retries': 5} detected. Annotations will be ignored when using query-planning.
  warnings.warn(
Traceback (most recent call last):
  File ".../demand_forecasting/dask/data.py", line 292, in _write_to_gcs
    ddf.to_parquet(url, **kwargs)
  File ".../.venv/lib/python3.10/site-packages/dask_expr/_collection.py", line 3266, in to_parquet
    return to_parquet(self, path, **kwargs)
  File ".../.venv/lib/python3.10/site-packages/dask_expr/io/parquet.py", line 653, in to_parquet
    out = out.compute(**compute_kwargs)
  File ".../.venv/lib/python3.10/site-packages/dask_expr/_collection.py", line 476, in compute
    return DaskMethodsMixin.compute(out, **kwargs)
  File ".../.venv/lib/python3.10/site-packages/dask/base.py", line 375, in compute
    (result,) = compute(self, traverse=False, **kwargs)
  File ".../.venv/lib/python3.10/site-packages/dask/base.py", line 661, in compute
    results = schedule(dsk, keys, **kwargs)
  File "/opt/conda/lib/python3.10/site-packages/dask/dataframe/dispatch.py", line 68, in concat
  File "/opt/conda/lib/python3.10/site-packages/dask/dataframe/backends.py", line 688, in concat_pandas
  File "/opt/conda/lib/python3.10/site-packages/pandas/core/reshape/concat.py", line 489, in _get_ndims
TypeError: cannot concatenate object of type '<class 'tuple'>'; only Series and DataFrame objs are valid

Disabling dask-expr leads to a different error, during the repartition operation:

Traceback (most recent call last):
  File ".../dataset/partition.py", line 142, in repartition
    ddf = ddf.repartition(partition_size=partition_size)
  File ".../.venv/lib/python3.10/site-packages/dask/dataframe/core.py", line 1802, in repartition
    return repartition_size(self, partition_size)
  File ".../.venv/lib/python3.10/site-packages/dask/dataframe/core.py", line 8104, in repartition_size
    mem_usages = df.map_partitions(total_mem_usage, deep=True).compute()
  File ".../.venv/lib/python3.10/site-packages/dask/base.py", line 375, in compute
    (result,) = compute(self, traverse=False, **kwargs)
  File ".../.venv/lib/python3.10/site-packages/dask/base.py", line 661, in compute
    results = schedule(dsk, keys, **kwargs)
  File "/opt/conda/lib/python3.10/site-packages/dask/dataframe/shuffle.py", line 839, in partitioning_index
  File "/opt/conda/lib/python3.10/site-packages/dask/dataframe/backends.py", line 523, in hash_object_pandas
  File "/opt/conda/lib/python3.10/site-packages/pandas/core/common.py", line 573, in require_length_match
ValueError: Length of values (0) does not match length of index (12295809)

The dataset consists of 3 parquet files (e.g., dataset.parquet/part.X.parquet) with the following Dask dtypes:

Col1, Int64
Col2, Int16
Col3, Int32
Col4, datetime64[us]

Pandas dtypes:

Col1, int64[pyarrow]
Col2, int16[pyarrow]
Col3, int32[pyarrow]
Col4, timestamp[ns][pyarrow]

The index is named __null_dask_index__.

An important observation is that the TypeError disappears if I take only part of the dataset, as follows:

ddf.loc[:100000]

However, with dask-expr disabled, an error still occurs:

ValueError: Length of values (0) does not match length of index (100001)

Minimal Complete Verifiable Example:

  1. Use any parquet data.
  2. Try to shuffle on a column that does not exist and export the data (a local sketch follows the snippet):
ddf = (dd
       .read_parquet('gs://.../....parquet')
       .shuffle(on='does_not_exist', npartitions=64)
       .repartition(partition_size='100MB')
       .to_parquet('data_test.parquet'))
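For a fully local reproduction without GCS, here is a minimal sketch; the toy frame, file names, and partition count are placeholders rather than the original dataset:

import pandas as pd
import dask.dataframe as dd

# Placeholder input: any small parquet file exercises the same code path.
pd.DataFrame({"a": range(1000)}).to_parquet("data.parquet")

(dd
 .read_parquet("data.parquet")
 .shuffle(on="does_not_exist", npartitions=4)  # column intentionally missing
 .repartition(partition_size="100MB")
 .to_parquet("data_test.parquet"))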

Anything else we need to know?:

Environment:

  • Dask version: 2024.5.2
  • Python version: 3.10
  • Operating System: WSL, Ubuntu 22.04
  • Install method (conda, pip, source): poetry
  • pandas==2.2.2
  • pyarrow==14.0.2
github-actions bot added the needs triage label Jun 4, 2024
fjetter (Member) commented Jun 4, 2024

What pyarrow version did you use to create the dataset, and which one are you using to read it?

dbalabka (Author) commented Jun 4, 2024

@fjetter, thanks for the quick reply. We installed pyarrow 14.0.2.

An important observation is that the TypeError disappears if I take only part of the dataset, as follows:

ddf.loc[:100000]

However, with dask-expr disabled, an error still occurs:

ValueError: Length of values (0) does not match length of index (100001)

(I've added a note to the description)

dbalabka (Author) commented:

@fjetter, in the case of TypeError: cannot concatenate object of type '<class 'tuple'>'; only Series and DataFrame objs are valid, I'm getting the following values here:
https://github.com/dask/dask/blob/main/dask/dataframe/backends.py#L688

I've added debug code on all nodes as follows:

from dask.distributed import print
print(dfs3[0], dfs3[1], dfs3, join)
out = pd.concat(dfs3, join=join, sort=False)

dfs3[0] is a DataFrame; dfs3[1] is a tuple with the following value:

('repartition-split-100000000-fe2ba693bab4c1021c4766fe26e0d5dc', 4)

dfs3 is a list that looks as follows (a DataFrame followed by tuples running consecutively from index 4 to 63):

[...[48081 rows x 4 columns],
 ('repartition-split-100000000-fe2ba693bab4c1021c4766fe26e0d5dc', 4),
 ('repartition-split-100000000-fe2ba693bab4c1021c4766fe26e0d5dc', 5),
 ...
 ('repartition-split-100000000-fe2ba693bab4c1021c4766fe26e0d5dc', 62),
 ('repartition-split-100000000-fe2ba693bab4c1021c4766fe26e0d5dc', 63)]
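For context, those tuples look like unmaterialized Dask task keys, i.e. (layer-name, partition-index) references that should have been resolved to DataFrames before reaching pd.concat. A small sketch of what such keys look like, using a toy frame rather than the original dataset:

import pandas as pd
import dask.dataframe as dd

# Every partition of a Dask collection is addressed in the task graph by
# a (name, partition-index) tuple: the same shape as the entries in dfs3.
ddf = dd.from_pandas(pd.DataFrame({"x": range(8)}), npartitions=4)
print(ddf.__dask_keys__())
# e.g. [('<layer-name>', 0), ('<layer-name>', 1), ('<layer-name>', 2), ('<layer-name>', 3)]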

dbalabka (Author) commented:

@fjetter, in the case of ValueError: Length of values (0) does not match length of index (12295809), I'm getting the following values here:
https://github.com/dask/dask/blob/main/dask/dataframe/backends.py#L523-L525

I've added debug code on all nodes as follows:

import pandas as pd
from dask.dataframe.dispatch import hash_object_dispatch

@hash_object_dispatch.register((pd.DataFrame, pd.Series, pd.Index))
def hash_object_pandas(
    obj, index=True, encoding="utf8", hash_key=None, categorize=True
):
    # Debug: log the inputs arriving on each worker before hashing.
    from dask.distributed import print
    print(obj)
    print(index)
    print(encoding)
    print(hash_key)
    print(categorize)
    return pd.util.hash_pandas_object(
        obj, index=index, encoding=encoding, hash_key=hash_key, categorize=categorize
    )

Here is the output:

Empty DataFrame
Columns: []
Index: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99, ...]

[12295809 rows x 0 columns]
False
utf8
None
True
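The output above points at the root cause: after shuffling on a missing column, the partitions reaching hash_object_pandas have rows but zero columns. The same ValueError can be reproduced directly in pandas; a minimal sketch, not the exact frames Dask builds:

import pandas as pd

# A zero-column DataFrame with a populated index, like the partition above.
df = pd.DataFrame(index=range(5))

# With index=False there are no values to hash, so pandas builds an empty
# uint64 array and then fails to wrap it in a Series over a 5-row index:
# ValueError: Length of values (0) does not match length of index (5)
pd.util.hash_pandas_object(df, index=False)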

dbalabka (Author) commented:

@fjetter, the problem is that I'm shuffling on a column that does not exist. Dask should raise a proper error message in this case. I've added reproduction steps. Please see:

  1. Use any parquet data.
  2. Try to shuffle on a non-existing column and export the data:
ddf = (dd
       .read_parquet('gs://.../....parquet')
       .shuffle(on='does_not_exist', npartitions=64)
       .repartition(partition_size='100MB')
       .to_parquet('data_test.parquet'))
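Until Dask raises a clearer error itself, a hypothetical guard can fail fast with a readable message (shuffle_checked is a made-up helper, not a Dask API):

def shuffle_checked(ddf, on, **kwargs):
    # Validate the shuffle column(s) up front instead of hitting the opaque
    # ValueError deep inside hash_object_pandas.
    cols = [on] if isinstance(on, str) else list(on)
    missing = [c for c in cols if c not in ddf.columns]
    if missing:
        raise KeyError(f"shuffle column(s) not found: {missing}")
    return ddf.shuffle(on=on, **kwargs)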

fjetter (Member) commented Jun 12, 2024

Thanks for chasing this down! I agree this should raise; I opened #11174 to address it.

dbalabka (Author) commented Jun 12, 2024

@fjetter, the problem isn't strictly related to dask-expr. A different problem appears even when dask-expr is off via dask.config.set({"dataframe.query-planning": False}), as described in the ticket's description.
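For anyone reproducing the non-expr path: as far as I know, this setting only takes effect when applied before dask.dataframe is first imported, e.g.:

import dask

# Must run before the first `import dask.dataframe` in the process.
dask.config.set({"dataframe.query-planning": False})

import dask.dataframe as dd  # legacy (non-expr) implementation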

fjetter (Member) commented Jun 12, 2024

Indeed; I've transferred the issue to dask/dask.

dbalabka (Author) commented:

@fjetter, after spending some time testing the solution, I've found that there are two separate problems:

Problem 1
The following problem still appears when dask-expr is enabled; I don't have a reproduction step for it yet:
TypeError: cannot concatenate object of type '<class 'tuple'>'; only Series and DataFrame objs are valid
Please see more details here: #11160 (comment)

Problem 2
The following problem appears only when dask-expr is disabled and an incorrect on=<column> is provided to the shuffle() method:
ValueError: Length of values (0) does not match length of index (100001)
Please see #11160 (comment)
