[Data] Add Dataset.to_dask() parameter to toggle consistent metadata check (ray-project#37163)

Currently, `Dataset.to_dask()` checks that the metadata is consistent across the resulting partitions, via `dask.dataframe.from_delayed(verify_meta=True)`. This check can sometimes raise errors such as:

```
ValueError: Metadata mismatch found in `from_delayed`.
```
To make the method more flexible, we expose the `verify_meta` parameter on `Dataset.to_dask()`, allowing users to skip this metadata check.
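
As a minimal illustration (not part of the commit itself), the sketch below mirrors the new test case: it builds a Dataset from blocks with different schemas, which would trip Dask's metadata verification, and skips the check via `verify_meta=False`.

```python
import pandas as pd
import ray
from ray.util.dask import ray_dask_get

# Blocks with different schemas; with verify_meta=True (the default),
# dask.dataframe.from_delayed would raise a metadata mismatch error.
ds = ray.data.from_pandas([
    pd.DataFrame({"one": [1, 2, 3], "two": ["a", "b", "c"]}),
    pd.DataFrame({"three": [4, 5, 6], "four": ["e", "f", "g"]}),
])

# Skip the consistency check so the conversion succeeds anyway.
ddf = ds.to_dask(verify_meta=False)

# Compute with the Dask-on-Ray scheduler, as in the test below.
print(ddf.compute(scheduler=ray_dask_get))
```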

Signed-off-by: Scott Lee <sjl@anyscale.com>
scottjlee authored and harborn committed Aug 17, 2023
1 parent 42fe136 commit 2941c82
Showing 2 changed files with 19 additions and 0 deletions.
4 changes: 4 additions & 0 deletions python/ray/data/dataset.py
@@ -3621,6 +3621,7 @@ def to_dask(
Tuple[Any],
None,
] = None,
verify_meta: bool = True,
) -> "dask.DataFrame":
"""Convert this :class:`~ray.data.Dataset` into a
`Dask DataFrame <https://docs.dask.org/en/stable/generated/dask.dataframe.DataFrame.html#dask.dataframe.DataFrame>`_.
@@ -3642,6 +3643,8 @@ def to_dask(
tuple of ``(name, dtype)`` can be used.
By default, this is inferred from the underlying Dataset schema,
with this argument supplying an optional override.
verify_meta: If True, Dask will check that the partitions have consistent
metadata. Defaults to True.
Returns:
A `Dask DataFrame`_ created from this dataset.
@@ -3714,6 +3717,7 @@ def block_to_df(block: Block):
ddf = dd.from_delayed(
[block_to_df(block) for block in self.get_internal_block_refs()],
meta=meta,
verify_meta=verify_meta,
)
return ddf

15 changes: 15 additions & 0 deletions python/ray/data/tests/test_ecosystem.py
@@ -71,6 +71,21 @@ def test_to_dask(ray_start_regular_shared, ds_format):
# Implicit Dask-on-Ray.
assert df.equals(ddf.compute())

# Test case with blocks that have different schemas, where we must
# skip the metadata check to avoid a Dask metadata mismatch error.
df1 = pd.DataFrame({"one": [1, 2, 3], "two": ["a", "b", "c"]})
df2 = pd.DataFrame({"three": [4, 5, 6], "four": ["e", "f", "g"]})
df = pd.concat([df1, df2])
ds = ray.data.from_pandas([df1, df2])
if ds_format == "arrow":
ds = ds.map_batches(lambda df: df, batch_format="pyarrow", batch_size=None)
ddf = ds.to_dask(verify_meta=False)

# Explicit Dask-on-Ray
assert df.equals(ddf.compute(scheduler=ray_dask_get))
# Implicit Dask-on-Ray.
assert df.equals(ddf.compute())


def test_to_dask_tensor_column_cast_pandas(ray_start_regular_shared):
# Check that tensor column casting occurs when converting a Dataset to a Dask