From 4e48729a97622e90522cb9d4dab54e23fdff3b70 Mon Sep 17 00:00:00 2001
From: Yicong-Huang <17627829+Yicong-Huang@users.noreply.github.com>
Date: Fri, 7 Nov 2025 16:44:56 -0800
Subject: [PATCH 1/6] refactor: remove one intermediate temp data frame

---
 python/pyspark/sql/connect/client/core.py | 52 +++++++++++------------
 1 file changed, 26 insertions(+), 26 deletions(-)

diff --git a/python/pyspark/sql/connect/client/core.py b/python/pyspark/sql/connect/client/core.py
index 6f92af394913..1ae856d6cafb 100644
--- a/python/pyspark/sql/connect/client/core.py
+++ b/python/pyspark/sql/connect/client/core.py
@@ -989,9 +989,6 @@ def to_pandas(
         if table.num_rows == 0:
             pdf = pd.DataFrame(columns=schema.names, index=range(0))
         else:
-            # Rename columns to avoid duplicated column names.
-            renamed_table = table.rename_columns([f"col_{i}" for i in range(table.num_columns)])
-
             pandas_options = {"coerce_temporal_nanoseconds": True}
             if self_destruct == "true":
                 # Configure PyArrow to use as little memory as possible:
@@ -1005,30 +1002,33 @@ def to_pandas(
                         "use_threads": False,
                     }
                 )
-            pdf = renamed_table.to_pandas(**pandas_options)
-            pdf.columns = schema.names

-            if len(pdf.columns) > 0:
-                error_on_duplicated_field_names: bool = False
-                if struct_in_pandas == "legacy" and any(
-                    _has_type(f.dataType, StructType) for f in schema.fields
-                ):
-                    error_on_duplicated_field_names = True
-                    struct_in_pandas = "dict"
-
-                pdf = pd.concat(
-                    [
-                        _create_converter_to_pandas(
-                            field.dataType,
-                            field.nullable,
-                            timezone=timezone,
-                            struct_in_pandas=struct_in_pandas,
-                            error_on_duplicated_field_names=error_on_duplicated_field_names,
-                        )(pser)
-                        for (_, pser), field, pa_field in zip(pdf.items(), schema.fields, table.schema)
-                    ],
-                    axis="columns",
-                )
+            if len(schema.names) > 0:
+                error_on_duplicated_field_names: bool = False
+                if struct_in_pandas == "legacy" and any(
+                    _has_type(f.dataType, StructType) for f in schema.fields
+                ):
+                    error_on_duplicated_field_names = True
+                    struct_in_pandas = "dict"
+
+                pdf = pd.concat(
+                    [
+                        _create_converter_to_pandas(
+                            field.dataType,
+                            field.nullable,
+                            timezone=timezone,
+                            struct_in_pandas=struct_in_pandas,
+                            error_on_duplicated_field_names=error_on_duplicated_field_names,
+                        )(arrow_col.to_pandas(**pandas_options))
+                        for arrow_col, field in zip(table.columns, schema.fields)
+                    ],
+                    axis="columns",
+                )
+            else:
+                # empty columns
+                pdf = table.to_pandas(**pandas_options)
+
+            pdf.columns = schema.names

         if len(metrics) > 0:
             pdf.attrs["metrics"] = metrics

From d6ebd750095f5d3298e9c92ba56734b37b6dd25f Mon Sep 17 00:00:00 2001
From: Yicong-Huang <17627829+Yicong-Huang@users.noreply.github.com>
Date: Mon, 10 Nov 2025 17:26:31 -0800
Subject: [PATCH 2/6] fix: handle empty table

---
 python/pyspark/sql/connect/client/core.py | 79 +++++++++++------------
 1 file changed, 38 insertions(+), 41 deletions(-)

diff --git a/python/pyspark/sql/connect/client/core.py b/python/pyspark/sql/connect/client/core.py
index 1ae856d6cafb..b58e7507acc7 100644
--- a/python/pyspark/sql/connect/client/core.py
+++ b/python/pyspark/sql/connect/client/core.py
@@ -986,49 +986,46 @@ def to_pandas(
         # SPARK-51112: If the table is empty, we avoid using pyarrow to_pandas to create the
         # DataFrame, as it may fail with a segmentation fault. Instead, we create an empty pandas
         # DataFrame manually with the correct schema.
-        if table.num_rows == 0:
-            pdf = pd.DataFrame(columns=schema.names, index=range(0))
-        else:
-            pandas_options = {"coerce_temporal_nanoseconds": True}
-            if self_destruct == "true":
-                # Configure PyArrow to use as little memory as possible:
-                # self_destruct - free columns as they are converted
-                # split_blocks - create a separate Pandas block for each column
-                # use_threads - convert one column at a time
-                pandas_options.update(
-                    {
-                        "self_destruct": True,
-                        "split_blocks": True,
-                        "use_threads": False,
-                    }
-                )
+        pandas_options = {"coerce_temporal_nanoseconds": True}
+        if self_destruct == "true" and table.num_rows > 0:
+            # Configure PyArrow to use as little memory as possible:
+            # self_destruct - free columns as they are converted
+            # split_blocks - create a separate Pandas block for each column
+            # use_threads - convert one column at a time
+            pandas_options.update(
+                {
+                    "self_destruct": True,
+                    "split_blocks": True,
+                    "use_threads": False,
+                }
+            )

-            if len(schema.names) > 0:
-                error_on_duplicated_field_names: bool = False
-                if struct_in_pandas == "legacy" and any(
-                    _has_type(f.dataType, StructType) for f in schema.fields
-                ):
-                    error_on_duplicated_field_names = True
-                    struct_in_pandas = "dict"
-
-                pdf = pd.concat(
-                    [
-                        _create_converter_to_pandas(
-                            field.dataType,
-                            field.nullable,
-                            timezone=timezone,
-                            struct_in_pandas=struct_in_pandas,
-                            error_on_duplicated_field_names=error_on_duplicated_field_names,
-                        )(arrow_col.to_pandas(**pandas_options))
-                        for arrow_col, field in zip(table.columns, schema.fields)
-                    ],
-                    axis="columns",
-                )
-            else:
-                # empty columns
-                pdf = table.to_pandas(**pandas_options)
+        if len(schema.names) > 0:
+            error_on_duplicated_field_names: bool = False
+            if struct_in_pandas == "legacy" and any(
+                _has_type(f.dataType, StructType) for f in schema.fields
+            ):
+                error_on_duplicated_field_names = True
+                struct_in_pandas = "dict"
+
+            pdf = pd.concat(
+                [
+                    _create_converter_to_pandas(
+                        field.dataType,
+                        field.nullable,
+                        timezone=timezone,
+                        struct_in_pandas=struct_in_pandas,
+                        error_on_duplicated_field_names=error_on_duplicated_field_names,
+                    )(arrow_col.to_pandas(**pandas_options))
+                    for arrow_col, field in zip(table.columns, schema.fields)
+                ],
+                axis="columns",
+            )
+        else:
+            # empty columns
+            pdf = table.to_pandas(**pandas_options)

-            pdf.columns = schema.names
+        pdf.columns = schema.names

         if len(metrics) > 0:
             pdf.attrs["metrics"] = metrics

From ef55e10d3d9741a87850ab0b01307e64c9dca01b Mon Sep 17 00:00:00 2001
From: Yicong-Huang <17627829+Yicong-Huang@users.noreply.github.com>
Date: Mon, 10 Nov 2025 23:45:34 -0800
Subject: [PATCH 3/6] fix: test

---
 python/pyspark/sql/connect/client/core.py | 11 ++++-------
 1 file changed, 4 insertions(+), 7 deletions(-)

diff --git a/python/pyspark/sql/connect/client/core.py b/python/pyspark/sql/connect/client/core.py
index b58e7507acc7..9d1584eaedce 100644
--- a/python/pyspark/sql/connect/client/core.py
+++ b/python/pyspark/sql/connect/client/core.py
@@ -1008,9 +1008,9 @@ def to_pandas(
                 error_on_duplicated_field_names = True
                 struct_in_pandas = "dict"

-            pdf = pd.concat(
-                [
-                    _create_converter_to_pandas(
+            pdf = pd.DataFrame(
+                {
+                    field.name: _create_converter_to_pandas(
                         field.dataType,
                         field.nullable,
                         timezone=timezone,
                         struct_in_pandas=struct_in_pandas,
                         error_on_duplicated_field_names=error_on_duplicated_field_names,
                     )(arrow_col.to_pandas(**pandas_options))
                     for arrow_col, field in zip(table.columns, schema.fields)
-                ],
-                axis="columns",
+                }
             )
         else:
             # empty columns
             pdf = table.to_pandas(**pandas_options)

-        pdf.columns = schema.names
-

         if len(metrics) > 0:
             pdf.attrs["metrics"] = metrics
         if len(observed_metrics) > 0:

From 9efd516035d400140953d8e59a6a44964c60862c Mon Sep 17 00:00:00 2001
From: Yicong-Huang <17627829+Yicong-Huang@users.noreply.github.com>
Date: Tue, 11 Nov 2025 17:25:47 -0800
Subject: [PATCH 4/6] fix: handle duplicate names

---
 python/pyspark/sql/connect/client/core.py | 9 +++++----
 1 file changed, 5 insertions(+), 4 deletions(-)

diff --git a/python/pyspark/sql/connect/client/core.py b/python/pyspark/sql/connect/client/core.py
index 9d1584eaedce..2e6d7389e1f5 100644
--- a/python/pyspark/sql/connect/client/core.py
+++ b/python/pyspark/sql/connect/client/core.py
@@ -1008,9 +1008,9 @@ def to_pandas(
                 error_on_duplicated_field_names = True
                 struct_in_pandas = "dict"

-            pdf = pd.DataFrame(
-                {
-                    field.name: _create_converter_to_pandas(
+            pdf = pd.concat(
+                [
+                    _create_converter_to_pandas(
                         field.dataType,
                         field.nullable,
                         timezone=timezone,
                         struct_in_pandas=struct_in_pandas,
                         error_on_duplicated_field_names=error_on_duplicated_field_names,
                     )(arrow_col.to_pandas(**pandas_options))
                     for arrow_col, field in zip(table.columns, schema.fields)
-                }
+                ],
+                axis="columns",
             )
         else:
             # empty columns

From bd77bc27add79fd593dfe6faea37b9d967200bf9 Mon Sep 17 00:00:00 2001
From: Yicong-Huang <17627829+Yicong-Huang@users.noreply.github.com>
Date: Tue, 11 Nov 2025 19:22:27 -0800
Subject: [PATCH 5/6] fix: handle original names

---
 python/pyspark/sql/connect/client/core.py | 1 +
 1 file changed, 1 insertion(+)

diff --git a/python/pyspark/sql/connect/client/core.py b/python/pyspark/sql/connect/client/core.py
index 2e6d7389e1f5..05630701e2a4 100644
--- a/python/pyspark/sql/connect/client/core.py
+++ b/python/pyspark/sql/connect/client/core.py
@@ -1021,6 +1021,7 @@ def to_pandas(
                 ],
                 axis="columns",
             )
+            pdf.columns = schema.names
         else:
             # empty columns
             pdf = table.to_pandas(**pandas_options)

From cb5b0ccba29de70516f3fff8057b47ef0f6579c0 Mon Sep 17 00:00:00 2001
From: Yicong-Huang <17627829+Yicong-Huang@users.noreply.github.com>
Date: Tue, 11 Nov 2025 19:40:54 -0800
Subject: [PATCH 6/6] fix: handle empty table

---
 python/pyspark/sql/connect/client/core.py | 55 ++++++++++++++++--------
 1 file changed, 39 insertions(+), 16 deletions(-)

diff --git a/python/pyspark/sql/connect/client/core.py b/python/pyspark/sql/connect/client/core.py
index 05630701e2a4..40a271119f1c 100644
--- a/python/pyspark/sql/connect/client/core.py
+++ b/python/pyspark/sql/connect/client/core.py
@@ -983,9 +983,13 @@ def to_pandas(
         schema = schema or from_arrow_schema(table.schema, prefer_timestamp_ntz=True)
         assert schema is not None and isinstance(schema, StructType)

-        # SPARK-51112: If the table is empty, we avoid using pyarrow to_pandas to create the
-        # DataFrame, as it may fail with a segmentation fault. Instead, we create an empty pandas
-        # DataFrame manually with the correct schema.
+        # Rename columns to avoid duplicated column names during processing
+        temp_col_names = [f"col_{i}" for i in range(len(schema.names))]
+        table = table.rename_columns(temp_col_names)
+
+        # Pandas DataFrame created from PyArrow uses datetime64[ns] for date type
+        # values, but we should use datetime.date to match the behavior when
+        # Arrow optimization is disabled.
         pandas_options = {"coerce_temporal_nanoseconds": True}
         if self_destruct == "true" and table.num_rows > 0:
             # Configure PyArrow to use as little memory as possible:
@@ -1008,19 +1012,38 @@ def to_pandas(
                 error_on_duplicated_field_names = True
                 struct_in_pandas = "dict"

-            pdf = pd.concat(
-                [
-                    _create_converter_to_pandas(
-                        field.dataType,
-                        field.nullable,
-                        timezone=timezone,
-                        struct_in_pandas=struct_in_pandas,
-                        error_on_duplicated_field_names=error_on_duplicated_field_names,
-                    )(arrow_col.to_pandas(**pandas_options))
-                    for arrow_col, field in zip(table.columns, schema.fields)
-                ],
-                axis="columns",
-            )
+            # SPARK-51112: If the table is empty, we avoid using pyarrow to_pandas to create the
+            # DataFrame, as it may fail with a segmentation fault.
+            if table.num_rows == 0:
+                # For empty tables, create empty Series with converters to preserve dtypes
+                pdf = pd.concat(
+                    [
+                        _create_converter_to_pandas(
+                            field.dataType,
+                            field.nullable,
+                            timezone=timezone,
+                            struct_in_pandas=struct_in_pandas,
+                            error_on_duplicated_field_names=error_on_duplicated_field_names,
+                        )(pd.Series([], name=temp_col_names[i], dtype="object"))
+                        for i, field in enumerate(schema.fields)
+                    ],
+                    axis="columns",
+                )
+            else:
+                pdf = pd.concat(
+                    [
+                        _create_converter_to_pandas(
+                            field.dataType,
+                            field.nullable,
+                            timezone=timezone,
+                            struct_in_pandas=struct_in_pandas,
+                            error_on_duplicated_field_names=error_on_duplicated_field_names,
+                        )(arrow_col.to_pandas(**pandas_options))
+                        for arrow_col, field in zip(table.columns, schema.fields)
+                    ],
+                    axis="columns",
+                )
+            # Restore original column names (including duplicates)
             pdf.columns = schema.names
         else:
             # empty columns
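
The conversion pattern the series converges on can be seen in isolation. The following is a minimal sketch, not PySpark code: it assumes only pandas and pyarrow >= 13 (where to_pandas gained coerce_temporal_nanoseconds), uses a made-up two-column table, and leaves out the _create_converter_to_pandas post-processing that the real code applies to each Series. Note that ChunkedArray.to_pandas returns unnamed Series, so pd.concat labels the columns positionally; the single bulk assignment to pdf.columns at the end is what safely restores the schema names, duplicates included.

    import pandas as pd
    import pyarrow as pa

    # Toy table with duplicated column names -- the case the patches guard against.
    table = pa.table(
        [pa.array([1, 2, 3]), pa.array(["a", "b", "c"])],
        names=["id", "id"],
    )

    original_names = table.column_names
    # Unique placeholder names during processing, mirroring PATCH 6/6.
    table = table.rename_columns([f"col_{i}" for i in range(table.num_columns)])

    pandas_options = {"coerce_temporal_nanoseconds": True}

    # Convert one column (a pyarrow ChunkedArray) at a time, then stitch the
    # resulting Series together along the column axis.
    pdf = pd.concat(
        [arrow_col.to_pandas(**pandas_options) for arrow_col in table.columns],
        axis="columns",
    )
    # Restore the original names in one shot, duplicates included.
    pdf.columns = original_names
    print(pdf)

Converting per column is what lets PATCH 1/6 drop the intermediate renamed DataFrame, and it pairs naturally with the self_destruct/split_blocks/use_threads options, which free each Arrow column as it is converted.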
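The empty-table branch in PATCH 6/6 follows the same shape: build the zero-row frame column by column instead of calling Table.to_pandas on the empty table (the SPARK-51112 segfault). A minimal sketch of that idea, with a hypothetical (name, dtype) list standing in for Spark's StructType and for the dtype decisions that _create_converter_to_pandas makes in the real code:

    import pandas as pd

    # Hypothetical stand-in for the schema's fields and their target dtypes.
    fields = [("id", "int64"), ("ts", "datetime64[ns]")]

    # Zero-row DataFrame assembled from empty Series, so each column keeps a
    # real dtype instead of collapsing to object, and pyarrow's to_pandas is
    # never invoked on the empty table.
    pdf = pd.concat(
        [pd.Series([], name=name, dtype=dtype) for name, dtype in fields],
        axis="columns",
    )
    print(pdf.dtypes)  # id: int64, ts: datetime64[ns]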