[BUG] map_partition of a delayed DataFrame partition #4980

@madsbk

What happened:
I get a CancelledError:

concurrent.futures._base.CancelledError: ('f-0f531032308c06167e4aa2b8add1e324', 1)

What you expected to happen:
I expected to get the result of mapping the function f over the partitions of the DataFrame.

Minimal Complete Verifiable Example:

import pandas as pd
import dask
import dask.dataframe as dd
from distributed import Client, LocalCluster, wait

def inc(x):
    return x + 1

def f(df, x):
    print("f working?: ", type(x))
    return x

def main(client):
    delayed_df = dd.from_pandas(
        pd.DataFrame({"a": range(5)}), npartitions=2
    ).to_delayed()
    dl = delayed_df[0].persist()
    # Replacing `delayed_df` with the simpler delayed `inc` makes everything work:
    # dl = dask.delayed(inc)(42)
    wait(dl)

    df: dd.DataFrame = dd.from_pandas(pd.DataFrame({"a": range(5)}), npartitions=2)
    df: dd.DataFrame = df.map_partitions(f, dl, meta=df._meta)
    # print("main() - df.dask.layers: ", df.dask.layers)
    df = df.persist(optimize_graph=False)
    df.compute()

if __name__ == "__main__":
    with LocalCluster(
        scheduler_port=0, asynchronous=False, n_workers=1, nthreads=1, processes=True
    ) as cluster:
        with Client(cluster, asynchronous=False) as client:
            main(client)

Am I doing something stupid here? Calling dl = delayed_df[0].persist() to get a delayed handle to the first partition of df should be fine, right?
