From 6a42779aae6438da60355d4f2d0e8b50a1378835 Mon Sep 17 00:00:00 2001 From: Yicong-Huang <17627829+Yicong-Huang@users.noreply.github.com> Date: Thu, 13 Nov 2025 13:41:38 -0800 Subject: [PATCH 01/10] refactor: merge arrow code path for Connect and Classic toPandas Extract shared conversion logic into _convert_arrow_table_to_pandas helper function in conversion.py to avoid code duplication between Classic and Connect. Key changes: - Add _convert_arrow_table_to_pandas helper function in conversion.py - Update Classic toPandas to handle empty tables explicitly (SPARK-51112) - Only apply self_destruct options when table has rows - Connect imports the shared helper from conversion.py This unifies the optimizations from SPARK-53967 and SPARK-54183: - Avoid intermediate pandas DataFrame during conversion - Convert Arrow columns directly to Series with type converters - Better memory efficiency with self_destruct on non-empty tables Co-authored-by: cursor --- python/pyspark/sql/connect/client/core.py | 41 +++------ python/pyspark/sql/pandas/conversion.py | 101 +++++++++++++++++++--- python/pyspark/sql/pandas/types.py | 74 ++++++++++++++++ 3 files changed, 171 insertions(+), 45 deletions(-) diff --git a/python/pyspark/sql/connect/client/core.py b/python/pyspark/sql/connect/client/core.py index 2a2ac0e6b539..28ce292f034b 100644 --- a/python/pyspark/sql/connect/client/core.py +++ b/python/pyspark/sql/connect/client/core.py @@ -100,6 +100,7 @@ from pyspark.sql.connect.observation import Observation from pyspark.sql.connect.utils import get_python_ver from pyspark.sql.pandas.types import _create_converter_to_pandas, from_arrow_schema +from pyspark.sql.pandas.conversion import _convert_arrow_table_to_pandas from pyspark.sql.types import DataType, StructType, _has_type from pyspark.util import PythonEvalType from pyspark.storagelevel import StorageLevel @@ -1030,37 +1031,15 @@ def to_pandas( error_on_duplicated_field_names = True struct_in_pandas = "dict" - # SPARK-51112: If the table is empty, we avoid using pyarrow to_pandas to create the - # DataFrame, as it may fail with a segmentation fault. 
- if table.num_rows == 0: - # For empty tables, create empty Series with converters to preserve dtypes - pdf = pd.concat( - [ - _create_converter_to_pandas( - field.dataType, - field.nullable, - timezone=timezone, - struct_in_pandas=struct_in_pandas, - error_on_duplicated_field_names=error_on_duplicated_field_names, - )(pd.Series([], name=temp_col_names[i], dtype="object")) - for i, field in enumerate(schema.fields) - ], - axis="columns", - ) - else: - pdf = pd.concat( - [ - _create_converter_to_pandas( - field.dataType, - field.nullable, - timezone=timezone, - struct_in_pandas=struct_in_pandas, - error_on_duplicated_field_names=error_on_duplicated_field_names, - )(arrow_col.to_pandas(**pandas_options)) - for arrow_col, field in zip(table.columns, schema.fields) - ], - axis="columns", - ) + pdf = _convert_arrow_table_to_pandas( + table, + schema.fields, + temp_col_names, + timezone, + struct_in_pandas, + error_on_duplicated_field_names, + pandas_options, + ) # Restore original column names (including duplicates) pdf.columns = schema.names else: diff --git a/python/pyspark/sql/pandas/conversion.py b/python/pyspark/sql/pandas/conversion.py index a4ccf4da6e8a..b41beead34a9 100644 --- a/python/pyspark/sql/pandas/conversion.py +++ b/python/pyspark/sql/pandas/conversion.py @@ -36,6 +36,7 @@ MapType, TimestampType, StructType, + StructField, DataType, _create_row, StringType, @@ -53,6 +54,81 @@ from pyspark.sql import DataFrame +def _convert_arrow_table_to_pandas( + arrow_table: "pa.Table", + schema_fields: List["StructField"], + temp_col_names: List[str], + timezone: str, + struct_in_pandas: str, + error_on_duplicated_field_names: bool, + pandas_options: dict, +) -> "PandasDataFrameLike": + """ + Helper function to convert Arrow table columns to a pandas DataFrame. + + This function applies Spark-specific type converters to Arrow columns and concatenates + them into a pandas DataFrame. For empty tables (num_rows == 0), it creates empty Series + with converters to preserve dtypes and avoid potential segmentation faults. + + Parameters + ---------- + arrow_table : pyarrow.Table + The Arrow table to convert + schema_fields : list of StructField + The schema fields corresponding to the columns + temp_col_names : list of str + Temporary column names to use during conversion + timezone : str + The timezone to use for timestamp conversions + struct_in_pandas : str + How to handle struct types in pandas ("dict", "row", or "legacy") + error_on_duplicated_field_names : bool + Whether to error on duplicated field names in structs + pandas_options : dict + Options to pass to Arrow's to_pandas method + + Returns + ------- + pandas.DataFrame + The converted pandas DataFrame + """ + import pandas as pd + from pyspark.sql.pandas.types import _create_converter_to_pandas + + # SPARK-51112: If the table is empty, we avoid using pyarrow to_pandas to create the + # DataFrame, as it may fail with a segmentation fault. 
+ if arrow_table.num_rows == 0: + # For empty tables, create empty Series with converters to preserve dtypes + return pd.concat( + [ + _create_converter_to_pandas( + field.dataType, + field.nullable, + timezone=timezone, + struct_in_pandas=struct_in_pandas, + error_on_duplicated_field_names=error_on_duplicated_field_names, + )(pd.Series([], name=temp_col_names[i], dtype="object")) + for i, field in enumerate(schema_fields) + ], + axis="columns", + ) + else: + # For non-empty tables, convert arrow columns directly to pandas + return pd.concat( + [ + _create_converter_to_pandas( + field.dataType, + field.nullable, + timezone=timezone, + struct_in_pandas=struct_in_pandas, + error_on_duplicated_field_names=error_on_duplicated_field_names, + )(arrow_col.to_pandas(**pandas_options)) + for arrow_col, field in zip(arrow_table.columns, schema_fields) + ], + axis="columns", + ) + + class PandasConversionMixin: """ Mix-in for the conversion from Spark to pandas and PyArrow. Currently, only @@ -150,7 +226,7 @@ def toPandas(self) -> "PandasDataFrameLike": "date_as_object": True, "coerce_temporal_nanoseconds": True, } - if self_destruct: + if self_destruct and table.num_rows > 0: # Configure PyArrow to use as little memory as possible: # self_destruct - free columns as they are converted # split_blocks - create a separate Pandas block for each column @@ -172,24 +248,21 @@ def toPandas(self) -> "PandasDataFrameLike": error_on_duplicated_field_names = True struct_in_pandas = "dict" - pdf = pd.concat( - [ - _create_converter_to_pandas( - field.dataType, - field.nullable, - timezone=timezone, - struct_in_pandas=struct_in_pandas, - error_on_duplicated_field_names=error_on_duplicated_field_names, - )(arrow_col.to_pandas(**pandas_options)) - for arrow_col, field in zip(table.columns, self.schema.fields) - ], - axis="columns", + pdf = _convert_arrow_table_to_pandas( + table, + self.schema.fields, + temp_col_names, + timezone, + struct_in_pandas, + error_on_duplicated_field_names, + pandas_options, ) + # Restore original column names (including duplicates) + pdf.columns = self.columns else: # empty columns pdf = table.to_pandas(**pandas_options) - pdf.columns = self.columns return pdf except Exception as e: diff --git a/python/pyspark/sql/pandas/types.py b/python/pyspark/sql/pandas/types.py index 9583c8ac7288..441663f2959a 100644 --- a/python/pyspark/sql/pandas/types.py +++ b/python/pyspark/sql/pandas/types.py @@ -1584,3 +1584,77 @@ def convert_pandas_using_numpy_type( np_type = _to_numpy_type(field.dataType) df[field.name] = df[field.name].astype(np_type) return df + + +def _convert_arrow_table_to_pandas( + arrow_table: "pa.Table", + schema_fields: List[StructField], + temp_col_names: List[str], + timezone: str, + struct_in_pandas: str, + error_on_duplicated_field_names: bool, + pandas_options: dict, +) -> "PandasDataFrameLike": + """ + Helper function to convert Arrow table columns to a pandas DataFrame. + + This function applies Spark-specific type converters to Arrow columns and concatenates + them into a pandas DataFrame. For empty tables (num_rows == 0), it creates empty Series + with converters to preserve dtypes and avoid potential segmentation faults. 
+ + Parameters + ---------- + arrow_table : pyarrow.Table + The Arrow table to convert + schema_fields : list of StructField + The schema fields corresponding to the columns + temp_col_names : list of str + Temporary column names to use during conversion + timezone : str + The timezone to use for timestamp conversions + struct_in_pandas : str + How to handle struct types in pandas ("dict", "row", or "legacy") + error_on_duplicated_field_names : bool + Whether to error on duplicated field names in structs + pandas_options : dict + Options to pass to Arrow's to_pandas method + + Returns + ------- + pandas.DataFrame + The converted pandas DataFrame + """ + import pandas as pd + + # SPARK-51112: If the table is empty, we avoid using pyarrow to_pandas to create the + # DataFrame, as it may fail with a segmentation fault. + if arrow_table.num_rows == 0: + # For empty tables, create empty Series with converters to preserve dtypes + return pd.concat( + [ + _create_converter_to_pandas( + field.dataType, + field.nullable, + timezone=timezone, + struct_in_pandas=struct_in_pandas, + error_on_duplicated_field_names=error_on_duplicated_field_names, + )(pd.Series([], name=temp_col_names[i], dtype="object")) + for i, field in enumerate(schema_fields) + ], + axis="columns", + ) + else: + # For non-empty tables, convert arrow columns directly to pandas + return pd.concat( + [ + _create_converter_to_pandas( + field.dataType, + field.nullable, + timezone=timezone, + struct_in_pandas=struct_in_pandas, + error_on_duplicated_field_names=error_on_duplicated_field_names, + )(arrow_col.to_pandas(**pandas_options)) + for arrow_col, field in zip(arrow_table.columns, schema_fields) + ], + axis="columns", + ) From 4ac26563ee1216fdf06dea43528036d0947f7c01 Mon Sep 17 00:00:00 2001 From: Yicong-Huang <17627829+Yicong-Huang@users.noreply.github.com> Date: Thu, 13 Nov 2025 13:53:51 -0800 Subject: [PATCH 02/10] revert change on types.py --- python/pyspark/sql/pandas/types.py | 74 ------------------------------ 1 file changed, 74 deletions(-) diff --git a/python/pyspark/sql/pandas/types.py b/python/pyspark/sql/pandas/types.py index 441663f2959a..9583c8ac7288 100644 --- a/python/pyspark/sql/pandas/types.py +++ b/python/pyspark/sql/pandas/types.py @@ -1584,77 +1584,3 @@ def convert_pandas_using_numpy_type( np_type = _to_numpy_type(field.dataType) df[field.name] = df[field.name].astype(np_type) return df - - -def _convert_arrow_table_to_pandas( - arrow_table: "pa.Table", - schema_fields: List[StructField], - temp_col_names: List[str], - timezone: str, - struct_in_pandas: str, - error_on_duplicated_field_names: bool, - pandas_options: dict, -) -> "PandasDataFrameLike": - """ - Helper function to convert Arrow table columns to a pandas DataFrame. - - This function applies Spark-specific type converters to Arrow columns and concatenates - them into a pandas DataFrame. For empty tables (num_rows == 0), it creates empty Series - with converters to preserve dtypes and avoid potential segmentation faults. 
- - Parameters - ---------- - arrow_table : pyarrow.Table - The Arrow table to convert - schema_fields : list of StructField - The schema fields corresponding to the columns - temp_col_names : list of str - Temporary column names to use during conversion - timezone : str - The timezone to use for timestamp conversions - struct_in_pandas : str - How to handle struct types in pandas ("dict", "row", or "legacy") - error_on_duplicated_field_names : bool - Whether to error on duplicated field names in structs - pandas_options : dict - Options to pass to Arrow's to_pandas method - - Returns - ------- - pandas.DataFrame - The converted pandas DataFrame - """ - import pandas as pd - - # SPARK-51112: If the table is empty, we avoid using pyarrow to_pandas to create the - # DataFrame, as it may fail with a segmentation fault. - if arrow_table.num_rows == 0: - # For empty tables, create empty Series with converters to preserve dtypes - return pd.concat( - [ - _create_converter_to_pandas( - field.dataType, - field.nullable, - timezone=timezone, - struct_in_pandas=struct_in_pandas, - error_on_duplicated_field_names=error_on_duplicated_field_names, - )(pd.Series([], name=temp_col_names[i], dtype="object")) - for i, field in enumerate(schema_fields) - ], - axis="columns", - ) - else: - # For non-empty tables, convert arrow columns directly to pandas - return pd.concat( - [ - _create_converter_to_pandas( - field.dataType, - field.nullable, - timezone=timezone, - struct_in_pandas=struct_in_pandas, - error_on_duplicated_field_names=error_on_duplicated_field_names, - )(arrow_col.to_pandas(**pandas_options)) - for arrow_col, field in zip(arrow_table.columns, schema_fields) - ], - axis="columns", - ) From 28e755bed86b607123179d48018798c990d9f3cf Mon Sep 17 00:00:00 2001 From: Yicong-Huang <17627829+Yicong-Huang@users.noreply.github.com> Date: Thu, 13 Nov 2025 13:53:51 -0800 Subject: [PATCH 03/10] revert change on types.py --- python/pyspark/sql/connect/client/core.py | 2 +- python/pyspark/sql/pandas/conversion.py | 12 ++++++------ 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/python/pyspark/sql/connect/client/core.py b/python/pyspark/sql/connect/client/core.py index 28ce292f034b..fd6e124ef7f0 100644 --- a/python/pyspark/sql/connect/client/core.py +++ b/python/pyspark/sql/connect/client/core.py @@ -99,7 +99,7 @@ ) from pyspark.sql.connect.observation import Observation from pyspark.sql.connect.utils import get_python_ver -from pyspark.sql.pandas.types import _create_converter_to_pandas, from_arrow_schema +from pyspark.sql.pandas.types import from_arrow_schema from pyspark.sql.pandas.conversion import _convert_arrow_table_to_pandas from pyspark.sql.types import DataType, StructType, _has_type from pyspark.util import PythonEvalType diff --git a/python/pyspark/sql/pandas/conversion.py b/python/pyspark/sql/pandas/conversion.py index b41beead34a9..b4940b85caaa 100644 --- a/python/pyspark/sql/pandas/conversion.py +++ b/python/pyspark/sql/pandas/conversion.py @@ -250,12 +250,12 @@ def toPandas(self) -> "PandasDataFrameLike": pdf = _convert_arrow_table_to_pandas( table, - self.schema.fields, - temp_col_names, - timezone, - struct_in_pandas, - error_on_duplicated_field_names, - pandas_options, + schema_fields=self.schema.fields, + temp_col_names=temp_col_names, + timezone=timezone, + struct_in_pandas=struct_in_pandas, + error_on_duplicated_field_names=error_on_duplicated_field_names, + pandas_options=pandas_options, ) # Restore original column names (including duplicates) pdf.columns = self.columns From 
2037aa8fc8141b42af6d76fdc104b9d47d535171 Mon Sep 17 00:00:00 2001 From: Yicong-Huang <17627829+Yicong-Huang@users.noreply.github.com> Date: Fri, 14 Nov 2025 10:19:47 -0800 Subject: [PATCH 04/10] fix: handle comments --- python/pyspark/sql/connect/client/core.py | 14 +++++++------- python/pyspark/sql/pandas/conversion.py | 3 ++- 2 files changed, 9 insertions(+), 8 deletions(-) diff --git a/python/pyspark/sql/connect/client/core.py b/python/pyspark/sql/connect/client/core.py index fd6e124ef7f0..4fbc3ee45510 100644 --- a/python/pyspark/sql/connect/client/core.py +++ b/python/pyspark/sql/connect/client/core.py @@ -1032,13 +1032,13 @@ def to_pandas( struct_in_pandas = "dict" pdf = _convert_arrow_table_to_pandas( - table, - schema.fields, - temp_col_names, - timezone, - struct_in_pandas, - error_on_duplicated_field_names, - pandas_options, + arrow_table=table, + schema_fields=schema.fields, + temp_col_names=temp_col_names, + timezone=timezone, + struct_in_pandas=struct_in_pandas, + error_on_duplicated_field_names=error_on_duplicated_field_names, + pandas_options=pandas_options, ) # Restore original column names (including duplicates) pdf.columns = schema.names diff --git a/python/pyspark/sql/pandas/conversion.py b/python/pyspark/sql/pandas/conversion.py index b4940b85caaa..1c65c908e9d9 100644 --- a/python/pyspark/sql/pandas/conversion.py +++ b/python/pyspark/sql/pandas/conversion.py @@ -55,6 +55,7 @@ def _convert_arrow_table_to_pandas( + *, arrow_table: "pa.Table", schema_fields: List["StructField"], temp_col_names: List[str], @@ -249,7 +250,7 @@ def toPandas(self) -> "PandasDataFrameLike": struct_in_pandas = "dict" pdf = _convert_arrow_table_to_pandas( - table, + arrow_table=table, schema_fields=self.schema.fields, temp_col_names=temp_col_names, timezone=timezone, From a76043da5b3cbf0c405da06445bc3c0126af13a8 Mon Sep 17 00:00:00 2001 From: Yicong-Huang <17627829+Yicong-Huang@users.noreply.github.com> Date: Fri, 14 Nov 2025 14:07:15 -0800 Subject: [PATCH 05/10] fix: mypy --- python/pyspark/sql/pandas/conversion.py | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/python/pyspark/sql/pandas/conversion.py b/python/pyspark/sql/pandas/conversion.py index 1c65c908e9d9..513f00f3c980 100644 --- a/python/pyspark/sql/pandas/conversion.py +++ b/python/pyspark/sql/pandas/conversion.py @@ -59,8 +59,8 @@ def _convert_arrow_table_to_pandas( arrow_table: "pa.Table", schema_fields: List["StructField"], temp_col_names: List[str], - timezone: str, - struct_in_pandas: str, + timezone: Optional[str], + struct_in_pandas: Optional[str], error_on_duplicated_field_names: bool, pandas_options: dict, ) -> "PandasDataFrameLike": @@ -79,10 +79,11 @@ def _convert_arrow_table_to_pandas( The schema fields corresponding to the columns temp_col_names : list of str Temporary column names to use during conversion - timezone : str - The timezone to use for timestamp conversions - struct_in_pandas : str - How to handle struct types in pandas ("dict", "row", or "legacy") + timezone : str or None + The timezone to use for timestamp conversions (can be None if not configured) + struct_in_pandas : str or None + How to handle struct types in pandas ("dict", "row", or "legacy", can be None + if not configured) error_on_duplicated_field_names : bool Whether to error on duplicated field names in structs pandas_options : dict From 331eb415adcadc5dda4f2da3250b7336653cc3e1 Mon Sep 17 00:00:00 2001 From: Yicong Huang <17627829+Yicong-Huang@users.noreply.github.com> Date: Fri, 14 Nov 2025 14:56:30 -0800 Subject: 
[PATCH 06/10] Remove asterisk from function parameters --- python/pyspark/sql/pandas/conversion.py | 1 - 1 file changed, 1 deletion(-) diff --git a/python/pyspark/sql/pandas/conversion.py b/python/pyspark/sql/pandas/conversion.py index 513f00f3c980..5956da8a95d4 100644 --- a/python/pyspark/sql/pandas/conversion.py +++ b/python/pyspark/sql/pandas/conversion.py @@ -55,7 +55,6 @@ def _convert_arrow_table_to_pandas( - *, arrow_table: "pa.Table", schema_fields: List["StructField"], temp_col_names: List[str], From 9dd85d2be5b2471c516b27d0f417c4b35e3c41a0 Mon Sep 17 00:00:00 2001 From: Yicong-Huang <17627829+Yicong-Huang@users.noreply.github.com> Date: Sun, 16 Nov 2025 20:09:50 -0800 Subject: [PATCH 07/10] refactor: further simplify logic --- python/pyspark/sql/connect/client/core.py | 57 ++----- python/pyspark/sql/pandas/conversion.py | 188 +++++++++++----------- 2 files changed, 104 insertions(+), 141 deletions(-) diff --git a/python/pyspark/sql/connect/client/core.py b/python/pyspark/sql/connect/client/core.py index 4fbc3ee45510..2bba0745d8f6 100644 --- a/python/pyspark/sql/connect/client/core.py +++ b/python/pyspark/sql/connect/client/core.py @@ -985,8 +985,8 @@ def to_pandas( # Get all related configs in a batch ( timezone, - struct_in_pandas, - self_destruct, + structHandlingMode, + selfDestruct, ) = self.get_configs( "spark.sql.session.timeZone", "spark.sql.execution.pandas.structHandlingMode", @@ -994,7 +994,7 @@ def to_pandas( ) table, schema, metrics, observed_metrics, _ = self._execute_and_fetch( - req, observations, self_destruct == "true" + req, observations, selfDestruct == "true" ) assert table is not None ei = ExecutionInfo(metrics, observed_metrics) @@ -1002,49 +1002,14 @@ def to_pandas( schema = schema or from_arrow_schema(table.schema, prefer_timestamp_ntz=True) assert schema is not None and isinstance(schema, StructType) - # Rename columns to avoid duplicated column names during processing - temp_col_names = [f"col_{i}" for i in range(len(schema.names))] - table = table.rename_columns(temp_col_names) - - # Pandas DataFrame created from PyArrow uses datetime64[ns] for date type - # values, but we should use datetime.date to match the behavior with when - # Arrow optimization is disabled. 
- pandas_options = {"coerce_temporal_nanoseconds": True} - if self_destruct == "true" and table.num_rows > 0: - # Configure PyArrow to use as little memory as possible: - # self_destruct - free columns as they are converted - # split_blocks - create a separate Pandas block for each column - # use_threads - convert one column at a time - pandas_options.update( - { - "self_destruct": True, - "split_blocks": True, - "use_threads": False, - } - ) - - if len(schema.names) > 0: - error_on_duplicated_field_names: bool = False - if struct_in_pandas == "legacy" and any( - _has_type(f.dataType, StructType) for f in schema.fields - ): - error_on_duplicated_field_names = True - struct_in_pandas = "dict" - - pdf = _convert_arrow_table_to_pandas( - arrow_table=table, - schema_fields=schema.fields, - temp_col_names=temp_col_names, - timezone=timezone, - struct_in_pandas=struct_in_pandas, - error_on_duplicated_field_names=error_on_duplicated_field_names, - pandas_options=pandas_options, - ) - # Restore original column names (including duplicates) - pdf.columns = schema.names - else: - # empty columns - pdf = table.to_pandas(**pandas_options) + pdf = _convert_arrow_table_to_pandas( + arrow_table=table, + schema=schema, + timezone=timezone, + struct_handling_mode=structHandlingMode, + date_as_object=False, + self_destruct=selfDestruct == "true", + ) if len(metrics) > 0: pdf.attrs["metrics"] = metrics diff --git a/python/pyspark/sql/pandas/conversion.py b/python/pyspark/sql/pandas/conversion.py index 5956da8a95d4..199b5602d7a3 100644 --- a/python/pyspark/sql/pandas/conversion.py +++ b/python/pyspark/sql/pandas/conversion.py @@ -36,7 +36,7 @@ MapType, TimestampType, StructType, - StructField, + _has_type, DataType, _create_row, StringType, @@ -56,37 +56,35 @@ def _convert_arrow_table_to_pandas( arrow_table: "pa.Table", - schema_fields: List["StructField"], - temp_col_names: List[str], - timezone: Optional[str], - struct_in_pandas: Optional[str], - error_on_duplicated_field_names: bool, - pandas_options: dict, + schema: "StructType", + *, + timezone: Optional[str] = None, + struct_handling_mode: Optional[str] = None, + date_as_object: bool = False, + self_destruct: bool = False, ) -> "PandasDataFrameLike": """ Helper function to convert Arrow table columns to a pandas DataFrame. This function applies Spark-specific type converters to Arrow columns and concatenates - them into a pandas DataFrame. For empty tables (num_rows == 0), it creates empty Series - with converters to preserve dtypes and avoid potential segmentation faults. + them into a pandas DataFrame. Parameters ---------- arrow_table : pyarrow.Table The Arrow table to convert - schema_fields : list of StructField - The schema fields corresponding to the columns - temp_col_names : list of str - Temporary column names to use during conversion + schema : StructType + The schema of the DataFrame timezone : str or None The timezone to use for timestamp conversions (can be None if not configured) - struct_in_pandas : str or None + struct_handling_mode : str or None How to handle struct types in pandas ("dict", "row", or "legacy", can be None - if not configured) - error_on_duplicated_field_names : bool - Whether to error on duplicated field names in structs - pandas_options : dict - Options to pass to Arrow's to_pandas method + if not configured). If "legacy", it will be converted to "dict" and error checking + for duplicated field names will be enabled when StructType fields are present. 
+ date_as_object : bool + Whether to convert date values to Python datetime.date objects (default: False) + self_destruct : bool + Whether to enable memory-efficient self-destruct mode for large tables (default: False) Returns ------- @@ -96,38 +94,74 @@ def _convert_arrow_table_to_pandas( import pandas as pd from pyspark.sql.pandas.types import _create_converter_to_pandas + # Build pandas options + # Pandas DataFrame created from PyArrow uses datetime64[ns] for date type + # values, but we should use datetime.date to match the behavior with when + # Arrow optimization is disabled. + pandas_options = {"coerce_temporal_nanoseconds": True} + if date_as_object: + pandas_options["date_as_object"] = True + + # Handle empty columns case + if len(schema.fields) == 0: + return arrow_table.to_pandas(**pandas_options) + + # Rename columns to avoid duplicated column names during processing + temp_col_names = [f"col_{i}" for i in range(len(schema.names))] + arrow_table = arrow_table.rename_columns(temp_col_names) + + # Configure self-destruct mode for memory efficiency + if self_destruct and arrow_table.num_rows > 0: + # Configure PyArrow to use as little memory as possible: + # self_destruct - free columns as they are converted + # split_blocks - create a separate Pandas block for each column + # use_threads - convert one column at a time + pandas_options.update( + { + "self_destruct": True, + "split_blocks": True, + "use_threads": False, + } + ) + + # Handle legacy struct handling mode + error_on_duplicated_field_names = False + if struct_handling_mode == "legacy" and any( + _has_type(f.dataType, StructType) for f in schema.fields + ): + error_on_duplicated_field_names = True + struct_handling_mode = "dict" + # SPARK-51112: If the table is empty, we avoid using pyarrow to_pandas to create the # DataFrame, as it may fail with a segmentation fault. 
if arrow_table.num_rows == 0: - # For empty tables, create empty Series with converters to preserve dtypes - return pd.concat( - [ - _create_converter_to_pandas( - field.dataType, - field.nullable, - timezone=timezone, - struct_in_pandas=struct_in_pandas, - error_on_duplicated_field_names=error_on_duplicated_field_names, - )(pd.Series([], name=temp_col_names[i], dtype="object")) - for i, field in enumerate(schema_fields) - ], - axis="columns", + # For empty tables, create empty Series to preserve dtypes + column_data = ( + pd.Series([], name=temp_col_names[i], dtype="object") for i in range(len(schema.fields)) ) else: - # For non-empty tables, convert arrow columns directly to pandas - return pd.concat( - [ - _create_converter_to_pandas( - field.dataType, - field.nullable, - timezone=timezone, - struct_in_pandas=struct_in_pandas, - error_on_duplicated_field_names=error_on_duplicated_field_names, - )(arrow_col.to_pandas(**pandas_options)) - for arrow_col, field in zip(arrow_table.columns, schema_fields) - ], - axis="columns", - ) + # For non-empty tables, convert arrow columns directly + column_data = (arrow_col.to_pandas(**pandas_options) for arrow_col in arrow_table.columns) + + # Apply Spark-specific type converters to each column + pdf = pd.concat( + ( + _create_converter_to_pandas( + field.dataType, + field.nullable, + timezone=timezone, + struct_in_pandas=struct_handling_mode, + error_on_duplicated_field_names=error_on_duplicated_field_names, + )(series) + for series, field in zip(column_data, schema.fields) + ), + axis="columns", + ) + + # Restore original column names (including duplicates) + pdf.columns = schema.names + + return pdf class PandasConversionMixin: @@ -205,64 +239,28 @@ def toPandas(self) -> "PandasDataFrameLike": try: import pyarrow as pa - self_destruct = arrowPySparkSelfDestructEnabled == "true" - batches = self._collect_as_arrow(split_batches=self_destruct) + batches = self._collect_as_arrow( + split_batches=arrowPySparkSelfDestructEnabled == "true" + ) - # Rename columns to avoid duplicated column names. - temp_col_names = [f"col_{i}" for i in range(len(self.columns))] if len(batches) > 0: - table = pa.Table.from_batches(batches).rename_columns(temp_col_names) + table = pa.Table.from_batches(batches) else: # empty dataset - table = arrow_schema.empty_table().rename_columns(temp_col_names) + table = arrow_schema.empty_table() # Ensure only the table has a reference to the batches, so that # self_destruct (if enabled) is effective del batches - # Pandas DataFrame created from PyArrow uses datetime64[ns] for date type - # values, but we should use datetime.date to match the behavior with when - # Arrow optimization is disabled. 
- pandas_options = { - "date_as_object": True, - "coerce_temporal_nanoseconds": True, - } - if self_destruct and table.num_rows > 0: - # Configure PyArrow to use as little memory as possible: - # self_destruct - free columns as they are converted - # split_blocks - create a separate Pandas block for each column - # use_threads - convert one column at a time - pandas_options.update( - { - "self_destruct": True, - "split_blocks": True, - "use_threads": False, - } - ) - - if len(self.columns) > 0: - timezone = sessionLocalTimeZone - struct_in_pandas = pandasStructHandlingMode - - error_on_duplicated_field_names = False - if struct_in_pandas == "legacy": - error_on_duplicated_field_names = True - struct_in_pandas = "dict" - - pdf = _convert_arrow_table_to_pandas( - arrow_table=table, - schema_fields=self.schema.fields, - temp_col_names=temp_col_names, - timezone=timezone, - struct_in_pandas=struct_in_pandas, - error_on_duplicated_field_names=error_on_duplicated_field_names, - pandas_options=pandas_options, - ) - # Restore original column names (including duplicates) - pdf.columns = self.columns - else: - # empty columns - pdf = table.to_pandas(**pandas_options) + pdf = _convert_arrow_table_to_pandas( + arrow_table=table, + schema=self.schema, + timezone=sessionLocalTimeZone, + struct_handling_mode=pandasStructHandlingMode, + date_as_object=True, + self_destruct=arrowPySparkSelfDestructEnabled == "true", + ) return pdf From bd77465301e12d48e012849fce5099e974447a45 Mon Sep 17 00:00:00 2001 From: Yicong Huang <17627829+Yicong-Huang@users.noreply.github.com> Date: Sun, 16 Nov 2025 23:39:53 -0800 Subject: [PATCH 08/10] Remove unused import of _has_type --- python/pyspark/sql/connect/client/core.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/pyspark/sql/connect/client/core.py b/python/pyspark/sql/connect/client/core.py index 2bba0745d8f6..c38fd761a64f 100644 --- a/python/pyspark/sql/connect/client/core.py +++ b/python/pyspark/sql/connect/client/core.py @@ -101,7 +101,7 @@ from pyspark.sql.connect.utils import get_python_ver from pyspark.sql.pandas.types import from_arrow_schema from pyspark.sql.pandas.conversion import _convert_arrow_table_to_pandas -from pyspark.sql.types import DataType, StructType, _has_type +from pyspark.sql.types import DataType, StructType from pyspark.util import PythonEvalType from pyspark.storagelevel import StorageLevel from pyspark.errors import PySparkValueError, PySparkAssertionError, PySparkNotImplementedError From 1f8049fbff02ea17857647adbd113c28e402be75 Mon Sep 17 00:00:00 2001 From: Yicong-Huang <17627829+Yicong-Huang@users.noreply.github.com> Date: Mon, 17 Nov 2025 14:01:44 -0800 Subject: [PATCH 09/10] fix: mypy complain --- python/pyspark/sql/pandas/conversion.py | 24 +++++++++++++++--------- 1 file changed, 15 insertions(+), 9 deletions(-) diff --git a/python/pyspark/sql/pandas/conversion.py b/python/pyspark/sql/pandas/conversion.py index 199b5602d7a3..ac2628177c0d 100644 --- a/python/pyspark/sql/pandas/conversion.py +++ b/python/pyspark/sql/pandas/conversion.py @@ -20,7 +20,10 @@ Callable, List, Optional, + Sequence, Union, + Iterable, + cast, no_type_check, overload, TYPE_CHECKING, @@ -145,15 +148,18 @@ def _convert_arrow_table_to_pandas( # Apply Spark-specific type converters to each column pdf = pd.concat( - ( - _create_converter_to_pandas( - field.dataType, - field.nullable, - timezone=timezone, - struct_in_pandas=struct_handling_mode, - error_on_duplicated_field_names=error_on_duplicated_field_names, - )(series) - for series, 
field in zip(column_data, schema.fields) + objs=cast( + Sequence[pd.Series], + ( + _create_converter_to_pandas( + field.dataType, + field.nullable, + timezone=timezone, + struct_in_pandas=struct_handling_mode, + error_on_duplicated_field_names=error_on_duplicated_field_names, + )(series) + for series, field in zip(column_data, schema.fields) + ), ), axis="columns", ) From 1a5fad926f244cb4e525be7896bdcd70ae7138f0 Mon Sep 17 00:00:00 2001 From: Yicong-Huang <17627829+Yicong-Huang@users.noreply.github.com> Date: Mon, 17 Nov 2025 14:58:40 -0800 Subject: [PATCH 10/10] fix: format --- python/pyspark/sql/pandas/conversion.py | 1 - 1 file changed, 1 deletion(-) diff --git a/python/pyspark/sql/pandas/conversion.py b/python/pyspark/sql/pandas/conversion.py index ac2628177c0d..1d85dc9b12a8 100644 --- a/python/pyspark/sql/pandas/conversion.py +++ b/python/pyspark/sql/pandas/conversion.py @@ -22,7 +22,6 @@ Optional, Sequence, Union, - Iterable, cast, no_type_check, overload,
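
For reviewers: a minimal sketch of the unified code path after this series is applied. It assumes a PySpark build containing these patches; the toy schema, Arrow table, and option values below are illustrative only and do not appear in the patches. Both Classic toPandas and Connect to_pandas now funnel through _convert_arrow_table_to_pandas, so the SPARK-51112 empty-table guard and the column-wise conversion from SPARK-53967/SPARK-54183 apply to both call sites.

import pyarrow as pa

from pyspark.sql.pandas.conversion import _convert_arrow_table_to_pandas
from pyspark.sql.types import LongType, StringType, StructField, StructType

# Illustrative schema and Arrow table (not taken from the patches).
schema = StructType(
    [
        StructField("id", LongType(), nullable=False),
        StructField("name", StringType(), nullable=True),
    ]
)
table = pa.table(
    {
        "id": pa.array([1, 2], type=pa.int64()),
        "name": pa.array(["a", None], type=pa.string()),
    }
)

# Non-empty path: each Arrow column is converted directly to a pandas
# Series and run through the Spark type converters, skipping the
# intermediate DataFrame the pre-refactor code built.
pdf = _convert_arrow_table_to_pandas(
    table,
    schema,
    timezone="UTC",
    struct_handling_mode="dict",
    date_as_object=True,  # Classic passes True, Connect passes False
    self_destruct=False,
)
assert list(pdf.columns) == ["id", "name"]

# Empty path (SPARK-51112): pyarrow's to_pandas is bypassed and empty
# object-dtype Series are fed through the same converters, so the
# zero-row case avoids the segfault while preserving column names.
empty = _convert_arrow_table_to_pandas(table.slice(0, 0), schema)
assert len(empty) == 0 and list(empty.columns) == ["id", "name"]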