84 changes: 14 additions & 70 deletions python/pyspark/sql/connect/client/core.py
@@ -99,8 +99,9 @@
)
from pyspark.sql.connect.observation import Observation
from pyspark.sql.connect.utils import get_python_ver
from pyspark.sql.pandas.types import _create_converter_to_pandas, from_arrow_schema
from pyspark.sql.types import DataType, StructType, _has_type
from pyspark.sql.pandas.types import from_arrow_schema
from pyspark.sql.pandas.conversion import _convert_arrow_table_to_pandas
from pyspark.sql.types import DataType, StructType
from pyspark.util import PythonEvalType
from pyspark.storagelevel import StorageLevel
from pyspark.errors import PySparkValueError, PySparkAssertionError, PySparkNotImplementedError
@@ -984,88 +985,31 @@ def to_pandas(
# Get all related configs in a batch
(
timezone,
struct_in_pandas,
self_destruct,
structHandlingMode,
selfDestruct,
) = self.get_configs(
"spark.sql.session.timeZone",
"spark.sql.execution.pandas.structHandlingMode",
"spark.sql.execution.arrow.pyspark.selfDestruct.enabled",
)

table, schema, metrics, observed_metrics, _ = self._execute_and_fetch(
req, observations, self_destruct == "true"
req, observations, selfDestruct == "true"
)
assert table is not None
ei = ExecutionInfo(metrics, observed_metrics)

schema = schema or from_arrow_schema(table.schema, prefer_timestamp_ntz=True)
assert schema is not None and isinstance(schema, StructType)

# Rename columns to avoid duplicated column names during processing
temp_col_names = [f"col_{i}" for i in range(len(schema.names))]
table = table.rename_columns(temp_col_names)

# Pandas DataFrame created from PyArrow uses datetime64[ns] for date type
# values, but we should use datetime.date to match the behavior with when
# Arrow optimization is disabled.
pandas_options = {"coerce_temporal_nanoseconds": True}
if self_destruct == "true" and table.num_rows > 0:
# Configure PyArrow to use as little memory as possible:
# self_destruct - free columns as they are converted
# split_blocks - create a separate Pandas block for each column
# use_threads - convert one column at a time
pandas_options.update(
{
"self_destruct": True,
"split_blocks": True,
"use_threads": False,
}
)

if len(schema.names) > 0:
error_on_duplicated_field_names: bool = False
if struct_in_pandas == "legacy" and any(
_has_type(f.dataType, StructType) for f in schema.fields
):
error_on_duplicated_field_names = True
struct_in_pandas = "dict"

# SPARK-51112: If the table is empty, we avoid using pyarrow to_pandas to create the
# DataFrame, as it may fail with a segmentation fault.
if table.num_rows == 0:
# For empty tables, create empty Series with converters to preserve dtypes
pdf = pd.concat(
[
_create_converter_to_pandas(
field.dataType,
field.nullable,
timezone=timezone,
struct_in_pandas=struct_in_pandas,
error_on_duplicated_field_names=error_on_duplicated_field_names,
)(pd.Series([], name=temp_col_names[i], dtype="object"))
for i, field in enumerate(schema.fields)
],
axis="columns",
)
else:
pdf = pd.concat(
[
_create_converter_to_pandas(
field.dataType,
field.nullable,
timezone=timezone,
struct_in_pandas=struct_in_pandas,
error_on_duplicated_field_names=error_on_duplicated_field_names,
)(arrow_col.to_pandas(**pandas_options))
for arrow_col, field in zip(table.columns, schema.fields)
],
axis="columns",
)
Contributor:
Could we not also unify the `# Restore original column names (including duplicates)` / `pdf.columns = schema.names` / `else: # empty columns` / `pdf = table.to_pandas(**pandas_options)` logic?

Contributor Author:
Yes, after reading more, I moved more of the common logic into the helper function to unify them. Thanks!

# Restore original column names (including duplicates)
pdf.columns = schema.names
else:
# empty columns
pdf = table.to_pandas(**pandas_options)
pdf = _convert_arrow_table_to_pandas(
arrow_table=table,
schema=schema,
timezone=timezone,
struct_handling_mode=structHandlingMode,
date_as_object=False,
self_destruct=selfDestruct == "true",
)

if len(metrics) > 0:
pdf.attrs["metrics"] = metrics
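For reference, a minimal sketch of the unified call path introduced above, run outside the Spark Connect client (it assumes the `_convert_arrow_table_to_pandas` helper added by this PR is importable and that pyarrow and pandas are installed; the table, schema, and config literals below are illustrative, not taken from the diff):

import pyarrow as pa
from pyspark.sql.types import StructType, StructField, LongType, StringType
from pyspark.sql.pandas.conversion import _convert_arrow_table_to_pandas

# A toy Arrow table and a matching Spark schema (illustrative values).
table = pa.table({"id": [1, 2], "name": ["a", "b"]})
schema = StructType(
    [StructField("id", LongType()), StructField("name", StringType())]
)

# Same call shape as in to_pandas() above, with the session configs spelled
# out as literals instead of being read via self.get_configs(...).
pdf = _convert_arrow_table_to_pandas(
    arrow_table=table,
    schema=schema,
    timezone="UTC",
    struct_handling_mode="legacy",
    date_as_object=False,
    self_destruct=False,
)
print(pdf.dtypes)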
181 changes: 129 additions & 52 deletions python/pyspark/sql/pandas/conversion.py
@@ -20,7 +20,9 @@
Callable,
List,
Optional,
Sequence,
Union,
cast,
no_type_check,
overload,
TYPE_CHECKING,
@@ -36,6 +38,7 @@
MapType,
TimestampType,
StructType,
_has_type,
DataType,
_create_row,
StringType,
@@ -53,6 +56,119 @@
from pyspark.sql import DataFrame


def _convert_arrow_table_to_pandas(
arrow_table: "pa.Table",
schema: "StructType",
*,
timezone: Optional[str] = None,
struct_handling_mode: Optional[str] = None,
date_as_object: bool = False,
self_destruct: bool = False,
) -> "PandasDataFrameLike":
"""
Helper function to convert Arrow table columns to a pandas DataFrame.

This function applies Spark-specific type converters to Arrow columns and concatenates
them into a pandas DataFrame.

Parameters
----------
arrow_table : pyarrow.Table
The Arrow table to convert
schema : StructType
The schema of the DataFrame
timezone : str or None
The timezone to use for timestamp conversions (can be None if not configured)
struct_handling_mode : str or None
How to handle struct types in pandas ("dict", "row", or "legacy", can be None
if not configured). If "legacy", it will be converted to "dict" and error checking
for duplicated field names will be enabled when StructType fields are present.
date_as_object : bool
Whether to convert date values to Python datetime.date objects (default: False)
self_destruct : bool
Whether to enable memory-efficient self-destruct mode for large tables (default: False)

Returns
-------
pandas.DataFrame
The converted pandas DataFrame
"""
import pandas as pd
from pyspark.sql.pandas.types import _create_converter_to_pandas

# Build pandas options
# A pandas DataFrame created from PyArrow uses datetime64[ns] for date type
# values, but we should use datetime.date to match the behavior when
# Arrow optimization is disabled.
pandas_options = {"coerce_temporal_nanoseconds": True}
if date_as_object:
pandas_options["date_as_object"] = True

# Handle empty columns case
if len(schema.fields) == 0:
return arrow_table.to_pandas(**pandas_options)

# Rename columns to avoid duplicated column names during processing
temp_col_names = [f"col_{i}" for i in range(len(schema.names))]
arrow_table = arrow_table.rename_columns(temp_col_names)

# Configure self-destruct mode for memory efficiency
if self_destruct and arrow_table.num_rows > 0:
# Configure PyArrow to use as little memory as possible:
# self_destruct - free columns as they are converted
# split_blocks - create a separate Pandas block for each column
# use_threads - convert one column at a time
pandas_options.update(
{
"self_destruct": True,
"split_blocks": True,
"use_threads": False,
}
)

# Handle legacy struct handling mode
error_on_duplicated_field_names = False
if struct_handling_mode == "legacy" and any(
_has_type(f.dataType, StructType) for f in schema.fields
):
error_on_duplicated_field_names = True
struct_handling_mode = "dict"

# SPARK-51112: If the table is empty, we avoid using pyarrow to_pandas to create the
# DataFrame, as it may fail with a segmentation fault.
if arrow_table.num_rows == 0:
# For empty tables, create empty Series to preserve dtypes
column_data = (
pd.Series([], name=temp_col_names[i], dtype="object") for i in range(len(schema.fields))
)
else:
# For non-empty tables, convert arrow columns directly
column_data = (arrow_col.to_pandas(**pandas_options) for arrow_col in arrow_table.columns)

# Apply Spark-specific type converters to each column
pdf = pd.concat(
objs=cast(
Sequence[pd.Series],
(
_create_converter_to_pandas(
field.dataType,
field.nullable,
timezone=timezone,
struct_in_pandas=struct_handling_mode,
error_on_duplicated_field_names=error_on_duplicated_field_names,
)(series)
for series, field in zip(column_data, schema.fields)
),
),
axis="columns",
)

# Restore original column names (including duplicates)
pdf.columns = schema.names

return pdf


class PandasConversionMixin:
"""
Mix-in for the conversion from Spark to pandas and PyArrow. Currently, only
@@ -128,68 +244,29 @@ def toPandas(self) -> "PandasDataFrameLike":
try:
import pyarrow as pa

self_destruct = arrowPySparkSelfDestructEnabled == "true"
batches = self._collect_as_arrow(split_batches=self_destruct)
batches = self._collect_as_arrow(
split_batches=arrowPySparkSelfDestructEnabled == "true"
)

# Rename columns to avoid duplicated column names.
temp_col_names = [f"col_{i}" for i in range(len(self.columns))]
if len(batches) > 0:
table = pa.Table.from_batches(batches).rename_columns(temp_col_names)
table = pa.Table.from_batches(batches)
else:
# empty dataset
table = arrow_schema.empty_table().rename_columns(temp_col_names)
table = arrow_schema.empty_table()

# Ensure only the table has a reference to the batches, so that
# self_destruct (if enabled) is effective
del batches

# Pandas DataFrame created from PyArrow uses datetime64[ns] for date type
# values, but we should use datetime.date to match the behavior with when
# Arrow optimization is disabled.
pandas_options = {
"date_as_object": True,
"coerce_temporal_nanoseconds": True,
}
if self_destruct:
# Configure PyArrow to use as little memory as possible:
# self_destruct - free columns as they are converted
# split_blocks - create a separate Pandas block for each column
# use_threads - convert one column at a time
pandas_options.update(
{
"self_destruct": True,
"split_blocks": True,
"use_threads": False,
}
)

if len(self.columns) > 0:
timezone = sessionLocalTimeZone
struct_in_pandas = pandasStructHandlingMode

error_on_duplicated_field_names = False
if struct_in_pandas == "legacy":
error_on_duplicated_field_names = True
struct_in_pandas = "dict"

pdf = pd.concat(
[
_create_converter_to_pandas(
field.dataType,
field.nullable,
timezone=timezone,
struct_in_pandas=struct_in_pandas,
error_on_duplicated_field_names=error_on_duplicated_field_names,
)(arrow_col.to_pandas(**pandas_options))
for arrow_col, field in zip(table.columns, self.schema.fields)
],
axis="columns",
)
else:
# empty columns
pdf = table.to_pandas(**pandas_options)
pdf = _convert_arrow_table_to_pandas(
arrow_table=table,
schema=self.schema,
timezone=sessionLocalTimeZone,
struct_handling_mode=pandasStructHandlingMode,
date_as_object=True,
self_destruct=arrowPySparkSelfDestructEnabled == "true",
)

pdf.columns = self.columns
return pdf

except Exception as e:
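As a side note on the self_destruct branch of the new helper: the three PyArrow options it enables trade conversion speed for lower peak memory. Below is a standalone sketch of the same options applied directly to a toy table (the data and column names are made up; the keyword arguments are standard `pyarrow.Table.to_pandas` options):

import pyarrow as pa

table = pa.table({"x": list(range(1000)), "y": [float(i) for i in range(1000)]})

pdf = table.to_pandas(
    coerce_temporal_nanoseconds=True,
    self_destruct=True,   # free each Arrow column once it has been converted
    split_blocks=True,    # one pandas block per column, avoiding a consolidation copy
    use_threads=False,    # convert one column at a time to bound peak memory
)

# After a self-destruct conversion the Arrow table must not be used again,
# which is why the diff drops the `batches` reference before converting.
print(pdf.shape)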