diff --git a/distributed/shuffle/_worker_plugin.py b/distributed/shuffle/_worker_plugin.py index 097aa2237e..c634867349 100644 --- a/distributed/shuffle/_worker_plugin.py +++ b/distributed/shuffle/_worker_plugin.py @@ -951,7 +951,7 @@ def split_by_worker( # bytestream such that it cannot be deserialized anymore t = pa.Table.from_pandas(df, preserve_index=True) t = t.sort_by("_worker") - codes = np.asarray(t.select(["_worker"]))[0] + codes = np.asarray(t["_worker"]) t = t.drop(["_worker"]) del df @@ -983,7 +983,7 @@ def split_by_partition(t: pa.Table, column: str) -> dict[Any, pa.Table]: partitions.sort() t = t.sort_by(column) - partition = np.asarray(t.select([column]))[0] + partition = np.asarray(t[column]) splits = np.where(partition[1:] != partition[:-1])[0] + 1 splits = np.concatenate([[0], splits])