map_blocks returning pd.DataFrame fails with block_info parameter #11127

Open
joshua-gould opened this issue May 17, 2024 · 4 comments
Labels: needs triage (Needs a response from a contributor)

Comments

@joshua-gould

import dask.array as da
import dask.dataframe as dd
import numpy as np
import pandas as pd

np.random.seed(42)
a = da.random.random(size=(100, 100), chunks=(10, 10))


def test1(x):
    return pd.DataFrame({"x": np.arange(4), "y": np.arange(4), "z": ["a", "b", "c", "d"]})


def test2(x, block_info=None):
    return pd.DataFrame({"x": np.arange(4), "y": np.arange(4), "z": ["a", "b", "c", "d"]})


meta = dd.utils.make_meta([("x", np.int64), ("y", np.int64), ("z", object)])
df1 = da.map_blocks(test1, a, drop_axis=1, meta=meta).compute()  # works
df2 = da.map_blocks(test2, a, drop_axis=1, meta=meta).compute()  # fails with AttributeError: 'DataFrame' object has no attribute 'chunks'
@github-actions github-actions bot added the needs triage label May 17, 2024
@quasiben (Member) commented May 22, 2024

Thanks @joshua-gould. I can easily reproduce with what you have above. It seems that when block_info is provided, dask takes a code path that assumes an Array collection:

dask/dask/array/core.py

Lines 901 to 926 in 484fc3f

if has_keyword(func, "block_info"):
    starts = {}
    num_chunks = {}
    shapes = {}

    for i, (arg, in_ind) in enumerate(argpairs):
        if in_ind is not None:
            shapes[i] = arg.shape
            if drop_axis:
                # We concatenate along dropped axes, so we need to treat them
                # as if there is only a single chunk.
                starts[i] = [
                    (
                        cached_cumsum(arg.chunks[j], initial_zero=True)
                        if ind in out_ind
                        else [0, arg.shape[j]]
                    )
                    for j, ind in enumerate(in_ind)
                ]
                num_chunks[i] = tuple(len(s) - 1 for s in starts[i])
            else:
                starts[i] = [
                    cached_cumsum(c, initial_zero=True) for c in arg.chunks
                ]
                num_chunks[i] = arg.numblocks

    out_starts = [cached_cumsum(c, initial_zero=True) for c in out.chunks]

This is obviously a problem, as meta coerces the output collection into a DataFrame: the out.chunks access at the end of the quoted block is exactly what raises the AttributeError above. I don't think we can just swap out partitions for chunks here.

Would it be possible to convert the array to a dataframe and call map_partitions instead?

a.to_dask_dataframe().map_partitions(test2)
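
A rough sketch of what that might look like (untested; it assumes the per-block logic doesn't actually need block_info, reuses the meta from the reproducer, and rechunks along the columns so each partition is one full row-block):

import dask.array as da
import dask.dataframe as dd
import numpy as np
import pandas as pd

a = da.random.random(size=(100, 100), chunks=(10, 10))


def test2(x, block_info=None):
    # map_partitions never passes block_info; x arrives as a pandas DataFrame
    return pd.DataFrame({"x": np.arange(4), "y": np.arange(4), "z": ["a", "b", "c", "d"]})


meta = dd.utils.make_meta([("x", np.int64), ("y", np.int64), ("z", object)])

# rechunk along the columns so each partition is one full row-block;
# recent from_dask_array may do this itself, but being explicit is harmless
ddf = a.rechunk({1: a.shape[1]}).to_dask_dataframe()
df2 = ddf.map_partitions(test2, meta=meta).compute()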

@joshua-gould (Author) commented

I'm trying to extract the indices and values of a 2-d array where the value > 0. I have a solution below using dask.delayed:

import dask
import dask.array as da
import dask.dataframe as dd
import numpy as np
import pandas as pd
from dask.array.core import slices_from_chunks

x = da.random.random((24, 24), chunks=(5, 6))
# numpy solution
_x = x.compute()
_indices = np.where(_x > 0)
df = pd.DataFrame({"value": _x[_indices], "y": _indices[0], "x": _indices[1]})


@dask.delayed
def process_chunk(a, offset):
    indices = np.where(a > 0)
    y = indices[0] + offset[0]
    x = indices[1] + offset[1]
    return pd.DataFrame({"value": a[indices], "y": y, "x": x})


output = []
for s in slices_from_chunks(x.chunks):
    r = process_chunk(x[s], (s[0].start, s[1].start))
    output.append(r)
meta = dd.utils.make_meta([("value", x.dtype), ("y", np.int64), ("x", np.int64)])
ddf = dd.from_delayed(output, meta=meta).compute()

# compare with numpy
df = df.sort_values(["y", "x", "value"]).reset_index(drop=True)
ddf = ddf.sort_values(["y", "x", "value"]).reset_index(drop=True)
pd.testing.assert_frame_equal(df, ddf)

@quasiben (Member) commented

Hmm, could you instead do this with nonzero and a mask?

import dask.array as da
import numpy as np

arr = np.array([[-1, 2, 0], [4, -5, 6], [0, 0, 7]])
arr = da.from_array(arr)
indices = da.nonzero(arr > 0)  # or rely on dispatching with np.nonzero
arr[arr > 0]

@joshua-gould (Author) commented

Thanks for your response. I'm not sure how to create a dask dataframe using this approach. I tried:

import dask.array as da
import dask.dataframe as dd

x = da.random.random((24, 24), chunks=(5, 6))
indices = da.where(x > 0)
vals = x.reshape(-1)[indices[0] * x.shape[1] + indices[1]]

ddf = dd.concat(
    [
        dd.from_array(vals, columns=["value"]),
        dd.from_array(da.stack(indices, axis=1, allow_unknown_chunksizes=True), columns=["y", "x"]),
    ],
    axis=1,
).compute()

But I get the warnings:

dask/array/slicing.py:1089: PerformanceWarning: Increasing number of chunks by factor of 20
  p = blockwise(
dask_expr/_concat.py:146: UserWarning: Concatenating dataframes with unknown divisions.
We're assuming that the indices of each dataframes are aligned. This assumption is not generally safe.
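
For completeness, a rough sketch of one way this might be assembled without the reshape/fancy-index step (untested; it assumes boolean-mask indexing keeps NumPy's element order here, and pays one extra pass over the data to resolve the unknown chunk sizes so that divisions become known):

import dask.array as da
import dask.dataframe as dd

x = da.random.random((24, 24), chunks=(5, 6))

mask = x > 0
vals = x[mask]             # boolean-mask indexing instead of reshape + fancy indexing
ys, xs = da.nonzero(mask)  # row/column indices of the selected elements

# All three 1-d arrays have unknown chunk sizes; resolving them lets dask build
# known divisions, so the "unknown divisions" warning should go away.
for arr in (vals, ys, xs):
    arr.compute_chunk_sizes()

ddf = dd.concat(
    [
        dd.from_dask_array(vals).rename("value"),
        dd.from_dask_array(ys).rename("y"),
        dd.from_dask_array(xs).rename("x"),
    ],
    axis=1,
).compute()

That said, compute_chunk_sizes does an extra pass just to learn the chunk sizes, so the delayed version above may still be the better trade-off.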
