From 87fef7c2829c806a0a0b469d5c64c60ce76c51ca Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Fri, 9 Jun 2023 15:18:45 +0200 Subject: [PATCH 1/7] Add types mapper --- distributed/shuffle/_arrow.py | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/distributed/shuffle/_arrow.py b/distributed/shuffle/_arrow.py index 6a0bc81e8d..b2da72eb94 100644 --- a/distributed/shuffle/_arrow.py +++ b/distributed/shuffle/_arrow.py @@ -46,6 +46,7 @@ def check_minimal_arrow_version() -> None: def convert_partition(data: bytes, meta: pd.DataFrame) -> pd.DataFrame: + import pandas as pd import pyarrow as pa file = BytesIO(data) @@ -56,6 +57,15 @@ def convert_partition(data: bytes, meta: pd.DataFrame) -> pd.DataFrame: shards.append(sr.read_all()) table = pa.concat_tables(shards) df = table.to_pandas(self_destruct=True) + + def default_types_mapper(pyarrow_dtype: pa.DataType) -> object: + # Avoid converting strings from `string[pyarrow]` to `string[python]` + # if we have *some* `string[pyarrow]` + if pyarrow_dtype == pa.string() and pd.StringDtype("pyarrow") in meta.values: + return pd.StringDtype("pyarrow") + return None + + df = table.to_pandas(self_destruct=True, types_mapper=default_types_mapper) return df.astype(meta.dtypes) From 3f46581e689008f4b0c6e9613f31c239405353f8 Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Mon, 12 Jun 2023 10:00:01 +0200 Subject: [PATCH 2/7] large_string --- distributed/shuffle/_arrow.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/distributed/shuffle/_arrow.py b/distributed/shuffle/_arrow.py index b2da72eb94..3bede63218 100644 --- a/distributed/shuffle/_arrow.py +++ b/distributed/shuffle/_arrow.py @@ -61,7 +61,10 @@ def convert_partition(data: bytes, meta: pd.DataFrame) -> pd.DataFrame: def default_types_mapper(pyarrow_dtype: pa.DataType) -> object: # Avoid converting strings from `string[pyarrow]` to `string[python]` # if we have *some* `string[pyarrow]` - if pyarrow_dtype == pa.string() and pd.StringDtype("pyarrow") in meta.values: + if ( + pyarrow_dtype in {pa.large_string(), pa.string()} + and pd.StringDtype("pyarrow") in meta.values + ): return pd.StringDtype("pyarrow") return None From 21f129d6a0c6f17c127523e675e9648ae441517c Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Mon, 12 Jun 2023 16:43:15 +0200 Subject: [PATCH 3/7] Minor --- distributed/shuffle/_arrow.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/distributed/shuffle/_arrow.py b/distributed/shuffle/_arrow.py index 3bede63218..ebe95af246 100644 --- a/distributed/shuffle/_arrow.py +++ b/distributed/shuffle/_arrow.py @@ -63,7 +63,7 @@ def default_types_mapper(pyarrow_dtype: pa.DataType) -> object: # if we have *some* `string[pyarrow]` if ( pyarrow_dtype in {pa.large_string(), pa.string()} - and pd.StringDtype("pyarrow") in meta.values + and pd.StringDtype("pyarrow") in meta.dtypes.values ): return pd.StringDtype("pyarrow") return None From 684d98870de8947f334284831080e350a12b6d66 Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Mon, 12 Jun 2023 17:56:48 +0200 Subject: [PATCH 4/7] [skip-caching] From 38898f7b80375acf8175125dcc9c3a3df9e6e5db Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Mon, 12 Jun 2023 18:01:23 +0200 Subject: [PATCH 5/7] Copy only if necessary --- distributed/shuffle/_arrow.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/distributed/shuffle/_arrow.py b/distributed/shuffle/_arrow.py index ebe95af246..f00bd05cae 100644 --- a/distributed/shuffle/_arrow.py +++ b/distributed/shuffle/_arrow.py @@ -69,7 +69,7 @@ def default_types_mapper(pyarrow_dtype: pa.DataType) -> object: return None df = table.to_pandas(self_destruct=True, types_mapper=default_types_mapper) - return df.astype(meta.dtypes) + return df.astype(meta.dtypes, copy=False) def list_of_buffers_to_table(data: list[bytes]) -> pa.Table: From f793bcc6e11478add801c9e7a4fc9d8dfeda832c Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Tue, 13 Jun 2023 12:31:05 +0200 Subject: [PATCH 6/7] [skip-caching] From 8e93913af526283a8a5804ed85c53bb630d51682 Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Wed, 14 Jun 2023 15:40:25 +0200 Subject: [PATCH 7/7] [skip-caching]