95 changes: 57 additions & 38 deletions python/pyspark/sql/connect/client/core.py
@@ -983,52 +983,71 @@ def to_pandas(
schema = schema or from_arrow_schema(table.schema, prefer_timestamp_ntz=True)
assert schema is not None and isinstance(schema, StructType)

# SPARK-51112: If the table is empty, we avoid using pyarrow to_pandas to create the
# DataFrame, as it may fail with a segmentation fault. Instead, we create an empty pandas
# DataFrame manually with the correct schema.
if table.num_rows == 0:
pdf = pd.DataFrame(columns=schema.names, index=range(0))
else:
# Rename columns to avoid duplicated column names.
renamed_table = table.rename_columns([f"col_{i}" for i in range(table.num_columns)])

pandas_options = {"coerce_temporal_nanoseconds": True}
if self_destruct == "true":
# Configure PyArrow to use as little memory as possible:
# self_destruct - free columns as they are converted
# split_blocks - create a separate Pandas block for each column
# use_threads - convert one column at a time
pandas_options.update(
{
"self_destruct": True,
"split_blocks": True,
"use_threads": False,
}
)
pdf = renamed_table.to_pandas(**pandas_options)
pdf.columns = schema.names
# Rename columns to avoid duplicated column names during processing
temp_col_names = [f"col_{i}" for i in range(len(schema.names))]
table = table.rename_columns(temp_col_names)
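
The temporary names exist only for the duration of the conversion: a Spark schema may contain duplicated column names, and converting column-by-column is easier when every column has a unique label. A minimal sketch of the rename-then-restore pattern, using a standalone pyarrow table rather than the Spark Connect internals:

import pandas as pd
import pyarrow as pa

# Hypothetical table with duplicated column names (e.g. after a self-join).
table = pa.Table.from_arrays(
    [pa.array([1, 2]), pa.array([10, 20])],
    names=["id", "id"],
)

# Give every column a unique placeholder name while converting ...
temp_col_names = [f"col_{i}" for i in range(table.num_columns)]
pdf = table.rename_columns(temp_col_names).to_pandas()

# ... and restore the original (possibly duplicated) names afterwards.
pdf.columns = table.schema.names
print(list(pdf.columns))  # ['id', 'id']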

# Pandas DataFrame created from PyArrow uses datetime64[ns] for date type
# values, but we should use datetime.date to match the behavior when
# Arrow optimization is disabled.
pandas_options = {"coerce_temporal_nanoseconds": True}
if self_destruct == "true" and table.num_rows > 0:
# Configure PyArrow to use as little memory as possible:
# self_destruct - free columns as they are converted
# split_blocks - create a separate Pandas block for each column
# use_threads - convert one column at a time
pandas_options.update(
{
"self_destruct": True,
"split_blocks": True,
"use_threads": False,
}
)
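
The new num_rows > 0 guard keeps these options off the empty-table path; they only pay off when there is data to convert. A rough sketch of what the same options do on a plain pyarrow table (assuming a recent pyarrow; coerce_temporal_nanoseconds is a pyarrow 13+ keyword):

import pyarrow as pa

table = pa.table({"a": list(range(1_000)), "b": [float(i) for i in range(1_000)]})

# Low-memory conversion sketch (only safe if the table is throwaway):
# self_destruct frees each Arrow column as soon as it has been converted,
# split_blocks gives every column its own pandas block (no consolidation copy),
# use_threads=False converts one column at a time.
pdf = table.to_pandas(
    coerce_temporal_nanoseconds=True,  # recent-pyarrow keyword
    self_destruct=True,
    split_blocks=True,
    use_threads=False,
)
# After self_destruct the table's buffers are released; don't reuse `table`.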

if len(pdf.columns) > 0:
if len(schema.names) > 0:
error_on_duplicated_field_names: bool = False
if struct_in_pandas == "legacy" and any(
_has_type(f.dataType, StructType) for f in schema.fields
):
error_on_duplicated_field_names = True
struct_in_pandas = "dict"
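
For nested data, struct_in_pandas="dict" surfaces struct values as plain Python dicts on the pandas side; the "legacy" mode above falls back to that representation and additionally rejects duplicated field names. A toy illustration of the dict-shaped result, using plain pyarrow rather than the Spark converter:

import pyarrow as pa

# A struct column converted to pandas becomes an object column of dicts,
# which is the shape the "dict" mode above produces for nested rows.
t = pa.table({"s": [{"x": 1, "y": "a"}, {"x": 2, "y": "b"}]})
pdf = t.to_pandas()
print(pdf["s"][0])  # {'x': 1, 'y': 'a'}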

pdf = pd.concat(
[
_create_converter_to_pandas(
field.dataType,
field.nullable,
timezone=timezone,
struct_in_pandas=struct_in_pandas,
error_on_duplicated_field_names=error_on_duplicated_field_names,
)(pser)
for (_, pser), field, pa_field in zip(pdf.items(), schema.fields, table.schema)
],
axis="columns",
)
# SPARK-51112: If the table is empty, we avoid using pyarrow to_pandas to create the
# DataFrame, as it may fail with a segmentation fault.
if table.num_rows == 0:
# For empty tables, create empty Series with converters to preserve dtypes
pdf = pd.concat(
[
_create_converter_to_pandas(
field.dataType,
field.nullable,
timezone=timezone,
struct_in_pandas=struct_in_pandas,
error_on_duplicated_field_names=error_on_duplicated_field_names,
)(pd.Series([], name=temp_col_names[i], dtype="object"))
for i, field in enumerate(schema.fields)
],
axis="columns",
)
else:
pdf = pd.concat(
[
_create_converter_to_pandas(
field.dataType,
field.nullable,
timezone=timezone,
struct_in_pandas=struct_in_pandas,
error_on_duplicated_field_names=error_on_duplicated_field_names,
)(arrow_col.to_pandas(**pandas_options))
for arrow_col, field in zip(table.columns, schema.fields)
],
axis="columns",
)
# Restore original column names (including duplicates)
pdf.columns = schema.names
else:
# empty columns
pdf = table.to_pandas(**pandas_options)
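
The net effect is that every column goes through its field converter individually, and the empty-table case never reaches pyarrow's to_pandas. A simplified stand-in for that per-column path, without the Spark-specific converters:

import pandas as pd
import pyarrow as pa

def arrow_to_pandas_per_column(table: pa.Table, names: list) -> pd.DataFrame:
    # Toy version of the per-column path: convert each ChunkedArray on its own
    # and stitch the result together column-wise.
    if table.num_rows == 0:
        # Empty input: build empty Series directly instead of calling
        # pyarrow's to_pandas (the situation SPARK-51112 works around).
        series = [pd.Series([], name=n, dtype="object") for n in names]
    else:
        series = [col.to_pandas() for col in table.columns]
    pdf = pd.concat(series, axis="columns")
    pdf.columns = names  # duplicates are allowed on the pandas side
    return pdf

t = pa.Table.from_arrays([pa.array([1, 2]), pa.array(["x", "y"])], names=["a", "a"])
print(arrow_to_pandas_per_column(t, list(t.schema.names)).shape)  # (2, 2)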

if len(metrics) > 0:
pdf.attrs["metrics"] = metrics
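
The collected metrics ride along on the returned frame through DataFrame.attrs, pandas' general-purpose metadata dict. A small usage sketch (the metrics payload here is made up):

import pandas as pd

pdf = pd.DataFrame({"a": [1, 2]})
# DataFrame.attrs is an experimental metadata dict; it is not guaranteed to
# survive every pandas operation, so read it from the frame you received.
pdf.attrs["metrics"] = {"numOutputRows": 2}  # hypothetical payload
print(pdf.attrs["metrics"])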