diff --git a/python/pyspark/sql/connect/client/core.py b/python/pyspark/sql/connect/client/core.py
index 2a2ac0e6b539..c38fd761a64f 100644
--- a/python/pyspark/sql/connect/client/core.py
+++ b/python/pyspark/sql/connect/client/core.py
@@ -99,8 +99,9 @@
 )
 from pyspark.sql.connect.observation import Observation
 from pyspark.sql.connect.utils import get_python_ver
-from pyspark.sql.pandas.types import _create_converter_to_pandas, from_arrow_schema
-from pyspark.sql.types import DataType, StructType, _has_type
+from pyspark.sql.pandas.types import from_arrow_schema
+from pyspark.sql.pandas.conversion import _convert_arrow_table_to_pandas
+from pyspark.sql.types import DataType, StructType
 from pyspark.util import PythonEvalType
 from pyspark.storagelevel import StorageLevel
 from pyspark.errors import PySparkValueError, PySparkAssertionError, PySparkNotImplementedError
@@ -984,8 +985,8 @@ def to_pandas(
         # Get all related configs in a batch
         (
             timezone,
-            struct_in_pandas,
-            self_destruct,
+            structHandlingMode,
+            selfDestruct,
         ) = self.get_configs(
             "spark.sql.session.timeZone",
             "spark.sql.execution.pandas.structHandlingMode",
@@ -993,7 +994,7 @@
         )
 
         table, schema, metrics, observed_metrics, _ = self._execute_and_fetch(
-            req, observations, self_destruct == "true"
+            req, observations, selfDestruct == "true"
         )
         assert table is not None
         ei = ExecutionInfo(metrics, observed_metrics)
@@ -1001,71 +1002,14 @@
         schema = schema or from_arrow_schema(table.schema, prefer_timestamp_ntz=True)
         assert schema is not None and isinstance(schema, StructType)
 
-        # Rename columns to avoid duplicated column names during processing
-        temp_col_names = [f"col_{i}" for i in range(len(schema.names))]
-        table = table.rename_columns(temp_col_names)
-
-        # Pandas DataFrame created from PyArrow uses datetime64[ns] for date type
-        # values, but we should use datetime.date to match the behavior with when
-        # Arrow optimization is disabled.
-        pandas_options = {"coerce_temporal_nanoseconds": True}
-        if self_destruct == "true" and table.num_rows > 0:
-            # Configure PyArrow to use as little memory as possible:
-            # self_destruct - free columns as they are converted
-            # split_blocks - create a separate Pandas block for each column
-            # use_threads - convert one column at a time
-            pandas_options.update(
-                {
-                    "self_destruct": True,
-                    "split_blocks": True,
-                    "use_threads": False,
-                }
-            )
-
-        if len(schema.names) > 0:
-            error_on_duplicated_field_names: bool = False
-            if struct_in_pandas == "legacy" and any(
-                _has_type(f.dataType, StructType) for f in schema.fields
-            ):
-                error_on_duplicated_field_names = True
-                struct_in_pandas = "dict"
-
-            # SPARK-51112: If the table is empty, we avoid using pyarrow to_pandas to create the
-            # DataFrame, as it may fail with a segmentation fault.
-            if table.num_rows == 0:
-                # For empty tables, create empty Series with converters to preserve dtypes
-                pdf = pd.concat(
-                    [
-                        _create_converter_to_pandas(
-                            field.dataType,
-                            field.nullable,
-                            timezone=timezone,
-                            struct_in_pandas=struct_in_pandas,
-                            error_on_duplicated_field_names=error_on_duplicated_field_names,
-                        )(pd.Series([], name=temp_col_names[i], dtype="object"))
-                        for i, field in enumerate(schema.fields)
-                    ],
-                    axis="columns",
-                )
-            else:
-                pdf = pd.concat(
-                    [
-                        _create_converter_to_pandas(
-                            field.dataType,
-                            field.nullable,
-                            timezone=timezone,
-                            struct_in_pandas=struct_in_pandas,
-                            error_on_duplicated_field_names=error_on_duplicated_field_names,
-                        )(arrow_col.to_pandas(**pandas_options))
-                        for arrow_col, field in zip(table.columns, schema.fields)
-                    ],
-                    axis="columns",
-                )
-            # Restore original column names (including duplicates)
-            pdf.columns = schema.names
-        else:
-            # empty columns
-            pdf = table.to_pandas(**pandas_options)
+        pdf = _convert_arrow_table_to_pandas(
+            arrow_table=table,
+            schema=schema,
+            timezone=timezone,
+            struct_handling_mode=structHandlingMode,
+            date_as_object=False,
+            self_destruct=selfDestruct == "true",
+        )
 
         if len(metrics) > 0:
             pdf.attrs["metrics"] = metrics
diff --git a/python/pyspark/sql/pandas/conversion.py b/python/pyspark/sql/pandas/conversion.py
index a4ccf4da6e8a..1d85dc9b12a8 100644
--- a/python/pyspark/sql/pandas/conversion.py
+++ b/python/pyspark/sql/pandas/conversion.py
@@ -20,7 +20,9 @@
     Callable,
     List,
     Optional,
+    Sequence,
     Union,
+    cast,
     no_type_check,
     overload,
     TYPE_CHECKING,
@@ -36,6 +38,7 @@
     MapType,
     TimestampType,
     StructType,
+    _has_type,
     DataType,
     _create_row,
     StringType,
@@ -53,6 +56,119 @@
     from pyspark.sql import DataFrame
 
 
+def _convert_arrow_table_to_pandas(
+    arrow_table: "pa.Table",
+    schema: "StructType",
+    *,
+    timezone: Optional[str] = None,
+    struct_handling_mode: Optional[str] = None,
+    date_as_object: bool = False,
+    self_destruct: bool = False,
+) -> "PandasDataFrameLike":
+    """
+    Helper function to convert Arrow table columns to a pandas DataFrame.
+
+    This function applies Spark-specific type converters to Arrow columns and concatenates
+    them into a pandas DataFrame.
+
+    Parameters
+    ----------
+    arrow_table : pyarrow.Table
+        The Arrow table to convert
+    schema : StructType
+        The schema of the DataFrame
+    timezone : str or None
+        The timezone to use for timestamp conversions (can be None if not configured)
+    struct_handling_mode : str or None
+        How to handle struct types in pandas ("dict", "row", or "legacy", can be None
+        if not configured). If "legacy", it will be converted to "dict" and error checking
+        for duplicated field names will be enabled when StructType fields are present.
+    date_as_object : bool
+        Whether to convert date values to Python datetime.date objects (default: False)
+    self_destruct : bool
+        Whether to enable memory-efficient self-destruct mode for large tables (default: False)
+
+    Returns
+    -------
+    pandas.DataFrame
+        The converted pandas DataFrame
+    """
+    import pandas as pd
+    from pyspark.sql.pandas.types import _create_converter_to_pandas
+
+    # Build pandas options
+    # Pandas DataFrame created from PyArrow uses datetime64[ns] for date type
+    # values, but we should use datetime.date to match the behavior with when
+    # Arrow optimization is disabled.
+    pandas_options = {"coerce_temporal_nanoseconds": True}
+    if date_as_object:
+        pandas_options["date_as_object"] = True
+
+    # Handle empty columns case
+    if len(schema.fields) == 0:
+        return arrow_table.to_pandas(**pandas_options)
+
+    # Rename columns to avoid duplicated column names during processing
+    temp_col_names = [f"col_{i}" for i in range(len(schema.names))]
+    arrow_table = arrow_table.rename_columns(temp_col_names)
+
+    # Configure self-destruct mode for memory efficiency
+    if self_destruct and arrow_table.num_rows > 0:
+        # Configure PyArrow to use as little memory as possible:
+        # self_destruct - free columns as they are converted
+        # split_blocks - create a separate Pandas block for each column
+        # use_threads - convert one column at a time
+        pandas_options.update(
+            {
+                "self_destruct": True,
+                "split_blocks": True,
+                "use_threads": False,
+            }
+        )
+
+    # Handle legacy struct handling mode
+    error_on_duplicated_field_names = False
+    if struct_handling_mode == "legacy" and any(
+        _has_type(f.dataType, StructType) for f in schema.fields
+    ):
+        error_on_duplicated_field_names = True
+        struct_handling_mode = "dict"
+
+    # SPARK-51112: If the table is empty, we avoid using pyarrow to_pandas to create the
+    # DataFrame, as it may fail with a segmentation fault.
+    if arrow_table.num_rows == 0:
+        # For empty tables, create empty Series to preserve dtypes
+        column_data = (
+            pd.Series([], name=temp_col_names[i], dtype="object") for i in range(len(schema.fields))
+        )
+    else:
+        # For non-empty tables, convert arrow columns directly
+        column_data = (arrow_col.to_pandas(**pandas_options) for arrow_col in arrow_table.columns)
+
+    # Apply Spark-specific type converters to each column
+    pdf = pd.concat(
+        objs=cast(
+            Sequence[pd.Series],
+            (
+                _create_converter_to_pandas(
+                    field.dataType,
+                    field.nullable,
+                    timezone=timezone,
+                    struct_in_pandas=struct_handling_mode,
+                    error_on_duplicated_field_names=error_on_duplicated_field_names,
+                )(series)
+                for series, field in zip(column_data, schema.fields)
+            ),
+        ),
+        axis="columns",
+    )
+
+    # Restore original column names (including duplicates)
+    pdf.columns = schema.names
+
+    return pdf
+
+
 class PandasConversionMixin:
     """
     Mix-in for the conversion from Spark to pandas and PyArrow. Currently, only
@@ -128,68 +244,29 @@ def toPandas(self) -> "PandasDataFrameLike":
             try:
                 import pyarrow as pa
 
-                self_destruct = arrowPySparkSelfDestructEnabled == "true"
-                batches = self._collect_as_arrow(split_batches=self_destruct)
+                batches = self._collect_as_arrow(
+                    split_batches=arrowPySparkSelfDestructEnabled == "true"
+                )
 
-                # Rename columns to avoid duplicated column names.
-                temp_col_names = [f"col_{i}" for i in range(len(self.columns))]
                 if len(batches) > 0:
-                    table = pa.Table.from_batches(batches).rename_columns(temp_col_names)
+                    table = pa.Table.from_batches(batches)
                 else:
                     # empty dataset
-                    table = arrow_schema.empty_table().rename_columns(temp_col_names)
+                    table = arrow_schema.empty_table()
 
                 # Ensure only the table has a reference to the batches, so that
                 # self_destruct (if enabled) is effective
                 del batches
 
-                # Pandas DataFrame created from PyArrow uses datetime64[ns] for date type
-                # values, but we should use datetime.date to match the behavior with when
-                # Arrow optimization is disabled.
-                pandas_options = {
-                    "date_as_object": True,
-                    "coerce_temporal_nanoseconds": True,
-                }
-                if self_destruct:
-                    # Configure PyArrow to use as little memory as possible:
-                    # self_destruct - free columns as they are converted
-                    # split_blocks - create a separate Pandas block for each column
-                    # use_threads - convert one column at a time
-                    pandas_options.update(
-                        {
-                            "self_destruct": True,
-                            "split_blocks": True,
-                            "use_threads": False,
-                        }
-                    )
-
-                if len(self.columns) > 0:
-                    timezone = sessionLocalTimeZone
-                    struct_in_pandas = pandasStructHandlingMode
-
-                    error_on_duplicated_field_names = False
-                    if struct_in_pandas == "legacy":
-                        error_on_duplicated_field_names = True
-                        struct_in_pandas = "dict"
-
-                    pdf = pd.concat(
-                        [
-                            _create_converter_to_pandas(
-                                field.dataType,
-                                field.nullable,
-                                timezone=timezone,
-                                struct_in_pandas=struct_in_pandas,
-                                error_on_duplicated_field_names=error_on_duplicated_field_names,
-                            )(arrow_col.to_pandas(**pandas_options))
-                            for arrow_col, field in zip(table.columns, self.schema.fields)
-                        ],
-                        axis="columns",
-                    )
-                else:
-                    # empty columns
-                    pdf = table.to_pandas(**pandas_options)
+                pdf = _convert_arrow_table_to_pandas(
+                    arrow_table=table,
+                    schema=self.schema,
+                    timezone=sessionLocalTimeZone,
+                    struct_handling_mode=pandasStructHandlingMode,
+                    date_as_object=True,
+                    self_destruct=arrowPySparkSelfDestructEnabled == "true",
+                )
-                pdf.columns = self.columns
 
                 return pdf
             except Exception as e:
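
Below is a minimal usage sketch (not part of the patch) of the new _convert_arrow_table_to_pandas helper, assuming a PySpark build that already includes this change; the schema, column data, and argument values are made up for illustration.

    import pyarrow as pa
    from pyspark.sql.pandas.conversion import _convert_arrow_table_to_pandas
    from pyspark.sql.types import StructType, StructField, IntegerType, StringType

    # Duplicate field names are legal in a Spark schema; the helper renames columns
    # internally and restores the duplicates at the end.
    schema = StructType(
        [
            StructField("id", IntegerType(), True),
            StructField("id", StringType(), True),
        ]
    )
    table = pa.table({"a": pa.array([1, 2], type=pa.int32()), "b": pa.array(["x", "y"])})

    pdf = _convert_arrow_table_to_pandas(
        arrow_table=table,
        schema=schema,
        timezone="UTC",
        struct_handling_mode="dict",
        date_as_object=True,
        self_destruct=False,
    )
    print(pdf.columns.tolist())  # ['id', 'id']

    # SPARK-51112 path: an empty table skips pyarrow's to_pandas and builds empty Series instead.
    empty = _convert_arrow_table_to_pandas(
        arrow_table=table.schema.empty_table(),
        schema=schema,
        timezone="UTC",
        struct_handling_mode="dict",
    )
    print(len(empty))  # 0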
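
For reference, a standalone sketch (again not part of the patch) of what the self_destruct branch asks PyArrow to do; the keyword arguments below are standard pyarrow.Table.to_pandas options.

    import pyarrow as pa

    table = pa.table({"a": list(range(1000)), "b": [str(i) for i in range(1000)]})
    pdf = table.to_pandas(
        self_destruct=True,  # free each Arrow column as soon as it has been converted
        split_blocks=True,  # one pandas block per column, avoiding a consolidation copy
        use_threads=False,  # convert one column at a time to keep peak memory low
    )
    # After a self-destruct conversion the Arrow table must not be used again.
    print(pdf.shape)  # (1000, 2)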