Support cupy.ndarray to cudf.DataFrame dispatching in `dask.dataframe` (#9579)
rjzamora committed Dec 16, 2022
1 parent d943293 commit 0d8e12b
Showing 5 changed files with 56 additions and 1 deletion.
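
In short: once the `array.backend` option is set to "cupy", converting an
array collection to a DataFrame collection yields cudf-backed meta instead of
unconditionally falling back to pandas. A minimal sketch of the behavior this
commit enables (not part of the diff; assumes cupy, cudf, and dask_cudf are
installed):

    import dask
    import dask.array as da
    import dask.dataframe as dd

    with dask.config.set({"array.backend": "cupy"}):
        darr = da.ones(10)        # darr._meta is a cupy.ndarray

    ddf = dd.from_array(darr)     # cupy.ndarray now dispatches to cudf
    print(type(ddf._meta))        # a cudf Series rather than a pandas one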
30 changes: 30 additions & 0 deletions dask/dataframe/backends.py
@@ -15,6 +15,7 @@
union_categoricals,
)

from dask.array.core import Array
from dask.array.dispatch import percentile_lookup
from dask.array.percentile import _percentile
from dask.backends import CreationDispatch, DaskBackendEntrypoint
@@ -30,6 +31,7 @@
is_categorical_dtype_dispatch,
make_meta_dispatch,
make_meta_obj,
meta_lib_from_array,
meta_nonempty,
pyarrow_schema_dispatch,
tolist_dispatch,
@@ -433,6 +435,18 @@ def _nonempty_series(s, idx=None):
return out


@meta_lib_from_array.register(Array)
def _meta_lib_from_array_da(x):
# Use x._meta for dask arrays
return meta_lib_from_array(x._meta)


@meta_lib_from_array.register(np.ndarray)
def _meta_lib_from_array_numpy(x):
# numpy -> pandas
return pd


@union_categoricals_dispatch.register(
(pd.DataFrame, pd.Series, pd.Index, pd.Categorical)
)
@@ -713,3 +727,19 @@ class PandasBackendEntrypoint(DataFrameBackendEntrypoint):
@percentile_lookup.register_lazy("cudf")
def _register_cudf():
import dask_cudf # noqa: F401


@meta_lib_from_array.register_lazy("cupy")
def _register_cupy_to_cudf():
# Handle cupy.ndarray -> cudf.DataFrame dispatching
try:
import cudf
import cupy

@meta_lib_from_array.register(cupy.ndarray)
def meta_lib_from_array_cupy(x):
# cupy -> cudf
return cudf

except ImportError:
pass
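
The three registrations above compose: a Dask `Array` recurses on its `_meta`,
`numpy.ndarray` resolves to pandas, and the lazy hook maps `cupy.ndarray` to
cudf once a cupy type is first dispatched on. A hedged sketch of the
resolution chain (the pandas assertions run anywhere; the cudf line assumes
cupy and cudf are installed):

    import numpy as np
    import dask.array as da
    import dask.dataframe  # noqa: F401 -- loads the registrations above
    from dask.dataframe.dispatch import meta_lib_from_array

    assert meta_lib_from_array(np.ones(3)).__name__ == "pandas"  # numpy -> pandas
    assert meta_lib_from_array(da.ones(3)).__name__ == "pandas"  # Array -> ._meta
    # with cupy/cudf installed: meta_lib_from_array(cupy.ones(3)) is cudf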
1 change: 1 addition & 0 deletions dask/dataframe/dispatch.py
@@ -13,6 +13,7 @@
make_meta_dispatch = Dispatch("make_meta_dispatch")
make_meta_obj = Dispatch("make_meta_obj")
meta_nonempty = Dispatch("meta_nonempty")
meta_lib_from_array = Dispatch("meta_lib_from_array")
hash_object_dispatch = Dispatch("hash_object_dispatch")
group_split_dispatch = Dispatch("group_split_dispatch")
get_parallel_type = Dispatch("get_parallel_type")
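
`Dispatch` is Dask's small type-based dispatch utility: implementations
register per type, and `register_lazy` defers registration until a type from
the named module is first dispatched on. A toy sketch of the mechanism (the
`repr_dispatch` name is hypothetical, for illustration only):

    from dask.utils import Dispatch

    repr_dispatch = Dispatch("repr_dispatch")

    @repr_dispatch.register(int)
    def _repr_int(x):
        # chosen when the argument is an int
        return f"int:{x}"

    @repr_dispatch.register_lazy("decimal")
    def _register_decimal():
        # runs only once a decimal.Decimal is dispatched on
        import decimal

        @repr_dispatch.register(decimal.Decimal)
        def _repr_decimal(x):
            return f"decimal:{x}"

    print(repr_dispatch(1))  # -> "int:1"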
3 changes: 2 additions & 1 deletion dask/dataframe/io/io.py
@@ -24,6 +24,7 @@
has_parallel_type,
new_dd_object,
)
from dask.dataframe.dispatch import meta_lib_from_array
from dask.dataframe.io.utils import DataFrameIOFunction
from dask.dataframe.utils import (
check_meta,
@@ -58,7 +59,7 @@ def _meta_from_array(x, columns=None, index=None, meta=None):
index = index._meta

if meta is None:
-        meta = pd.DataFrame()
+        meta = meta_lib_from_array(x).DataFrame()

if getattr(x.dtype, "names", None) is not None:
# record array has named columns
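
This one-line change is the hook that makes array-to-dataframe conversion
backend-aware: the empty meta frame now comes from whichever library
`meta_lib_from_array` resolves for the input, with no backend-specific
branching in `_meta_from_array` itself. A rough sketch of the default path
(pandas only; runs without any GPU libraries):

    import numpy as np
    import dask.dataframe as dd

    ddf = dd.from_array(np.arange(10))  # meta_lib_from_array resolves to pandas
    print(type(ddf._meta))              # a pandas Series
    # a cupy.ndarray input would resolve to cudf here instead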
22 changes: 22 additions & 0 deletions dask/dataframe/io/tests/test_io.py
@@ -481,6 +481,28 @@ def test_from_dask_array_unknown_width_error():
dd.from_dask_array(dx)


@pytest.mark.gpu
@pytest.mark.parametrize(
"array_backend, df_backend",
[("cupy", "cudf"), ("numpy", "pandas")],
)
def test_from_array_dispatching(array_backend, df_backend):
# Check array -> dataframe dispatching
array_lib = pytest.importorskip(array_backend)
df_lib = pytest.importorskip(df_backend)

with config.set({"array.backend": array_backend}):
darr = da.ones(10)
assert isinstance(darr._meta, array_lib.ndarray)

ddf1 = dd.from_array(darr) # Invokes `from_dask_array`
ddf2 = dd.from_array(darr.compute())

assert isinstance(ddf1._meta, df_lib.Series)
assert isinstance(ddf2._meta, df_lib.Series)
assert_eq(ddf1, ddf2)


def test_to_bag():
a = pd.DataFrame(
{"x": ["a", "b", "c", "d"], "y": [2, 3, 4, 5]},
1 change: 1 addition & 0 deletions docs/source/how-to/selecting-the-collection-backend.rst
@@ -128,6 +128,7 @@ Dask-Dataframe compute-based dispatch functions (as defined in ``dask.dataframe.dispatch``)
- is_categorical_dtype_dispatch
- make_meta_dispatch
- make_meta_obj
- meta_lib_from_array
- meta_nonempty
- pyarrow_schema_dispatch
- tolist_dispatch
