From cd50afdfb8be0e025b6221a698651a947faa898e Mon Sep 17 00:00:00 2001
From: Yicong-Huang <17627829+Yicong-Huang@users.noreply.github.com>
Date: Wed, 5 Nov 2025 09:30:44 -0800
Subject: [PATCH 1/7] refactor: remove intermedia data frame

---
 python/pyspark/sql/pandas/conversion.py | 56 ++++++++++++++-----------
 1 file changed, 32 insertions(+), 24 deletions(-)

diff --git a/python/pyspark/sql/pandas/conversion.py b/python/pyspark/sql/pandas/conversion.py
index a4ccf4da6e8a..44a1e6bbf094 100644
--- a/python/pyspark/sql/pandas/conversion.py
+++ b/python/pyspark/sql/pandas/conversion.py
@@ -208,35 +208,43 @@ def toPandas(self) -> "PandasDataFrameLike":

         # Below is toPandas without Arrow optimization.
         rows = self.collect()
-        if len(rows) > 0:
-            pdf = pd.DataFrame.from_records(
-                rows, index=range(len(rows)), columns=self.columns  # type: ignore[arg-type]
-            )
-        else:
-            pdf = pd.DataFrame(columns=self.columns)

-        if len(pdf.columns) > 0:
+        if len(self.columns) > 0:
             timezone = sessionLocalTimeZone
             struct_in_pandas = pandasStructHandlingMode

-            return pd.concat(
-                [
-                    _create_converter_to_pandas(
-                        field.dataType,
-                        field.nullable,
-                        timezone=timezone,
-                        struct_in_pandas=(
-                            "row" if struct_in_pandas == "legacy" else struct_in_pandas
-                        ),
-                        error_on_duplicated_field_names=False,
-                        timestamp_utc_localized=False,
-                    )(pser)
-                    for (_, pser), field in zip(pdf.items(), self.schema.fields)
-                ],
-                axis="columns",
-            )
-        else:
+            # Avoid intermediate pandas DataFrame creation by directly converting columns
+            if len(rows) > 0:
+                # Extract columns from rows
+                columns_data = list(zip(*rows))
+                pdf = pd.concat(
+                    [
+                        _create_converter_to_pandas(
+                            field.dataType,
+                            field.nullable,
+                            timezone=timezone,
+                            struct_in_pandas=(
+                                "row" if struct_in_pandas == "legacy" else struct_in_pandas
+                            ),
+                            error_on_duplicated_field_names=False,
+                            timestamp_utc_localized=False,
+                        )(pd.Series(col_data))
+                        for col_data, field in zip(columns_data, self.schema.fields)
+                    ],
+                    axis="columns",
+                )
+            else:
+                # Empty dataset
+                pdf = pd.DataFrame(
+                    {
+                        col_name: pd.Series(dtype=object)
+                        for col_name in self.columns
+                    }
+                )
+            pdf.columns = self.columns
             return pdf
+        else:
+            return pd.DataFrame()

     def toArrow(self) -> "pa.Table":
         from pyspark.sql.dataframe import DataFrame

From 26ad2cc075dd2dd6efc2e3de17693cfb45173662 Mon Sep 17 00:00:00 2001
From: Yicong-Huang <17627829+Yicong-Huang@users.noreply.github.com>
Date: Wed, 5 Nov 2025 09:32:55 -0800
Subject: [PATCH 2/7] fix: format

---
 python/pyspark/sql/pandas/conversion.py | 7 +------
 1 file changed, 1 insertion(+), 6 deletions(-)

diff --git a/python/pyspark/sql/pandas/conversion.py b/python/pyspark/sql/pandas/conversion.py
index 44a1e6bbf094..3590cc204f7b 100644
--- a/python/pyspark/sql/pandas/conversion.py
+++ b/python/pyspark/sql/pandas/conversion.py
@@ -235,12 +235,7 @@ def toPandas(self) -> "PandasDataFrameLike":
                 )
             else:
                 # Empty dataset
-                pdf = pd.DataFrame(
-                    {
-                        col_name: pd.Series(dtype=object)
-                        for col_name in self.columns
-                    }
-                )
+                pdf = pd.DataFrame({col_name: pd.Series(dtype=object) for col_name in self.columns})
             pdf.columns = self.columns
             return pdf
         else:
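The core move in PATCH 1 is to transpose the collected row tuples into per-column data with zip(*rows) and feed each column through its field converter, rather than round-tripping through an intermediate DataFrame built by pd.DataFrame.from_records. Below is a minimal, Spark-free sketch of that column-wise path; `rows`, `columns`, and `convert` are toy stand-ins for `self.collect()`, `self.columns`, and the converters produced by `_create_converter_to_pandas`, not real PySpark APIs.

```python
import pandas as pd

# Toy stand-ins (assumptions, not PySpark state or APIs).
rows = [(1, "a"), (2, "b"), (3, "c")]
columns = ["id", "label"]

def convert(pser: pd.Series) -> pd.Series:
    # Placeholder converter; the real per-field converters handle timezones,
    # nested structs, and nullability based on the schema field.
    return pser

# Transpose row tuples into per-column tuples in a single pass...
columns_data = list(zip(*rows))

# ...then build the frame column-by-column, skipping the intermediate
# from_records DataFrame that PATCH 1 removes.
pdf = pd.concat(
    [convert(pd.Series(col_data)) for col_data in columns_data],
    axis="columns",
)
pdf.columns = columns
print(pdf)
```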
From dd32ab3f2e8312acbbd6d10291df02e67f34f26f Mon Sep 17 00:00:00 2001
From: Yicong-Huang <17627829+Yicong-Huang@users.noreply.github.com>
Date: Thu, 6 Nov 2025 13:54:51 -0800
Subject: [PATCH 3/7] fix: empty df code path

---
 python/pyspark/sql/pandas/conversion.py | 41 +++++++++++++------------
 1 file changed, 22 insertions(+), 19 deletions(-)

diff --git a/python/pyspark/sql/pandas/conversion.py b/python/pyspark/sql/pandas/conversion.py
index 3590cc204f7b..57d200aada5b 100644
--- a/python/pyspark/sql/pandas/conversion.py
+++ b/python/pyspark/sql/pandas/conversion.py
@@ -217,29 +217,32 @@ def toPandas(self) -> "PandasDataFrameLike":
             if len(rows) > 0:
                 # Extract columns from rows
                 columns_data = list(zip(*rows))
-                pdf = pd.concat(
-                    [
-                        _create_converter_to_pandas(
-                            field.dataType,
-                            field.nullable,
-                            timezone=timezone,
-                            struct_in_pandas=(
-                                "row" if struct_in_pandas == "legacy" else struct_in_pandas
-                            ),
-                            error_on_duplicated_field_names=False,
-                            timestamp_utc_localized=False,
-                        )(pd.Series(col_data))
-                        for col_data, field in zip(columns_data, self.schema.fields)
-                    ],
-                    axis="columns",
-                )
+                series_list = [pd.Series(col_data) for col_data in columns_data]
             else:
-                # Empty dataset
-                pdf = pd.DataFrame({col_name: pd.Series(dtype=object) for col_name in self.columns})
+                # Empty rows - create empty DataFrame and extract empty Series
+                pdf_temp = pd.DataFrame(columns=self.columns)
+                series_list = [pser for _, pser in pdf_temp.items()]
+
+            pdf = pd.concat(
+                [
+                    _create_converter_to_pandas(
+                        field.dataType,
+                        field.nullable,
+                        timezone=timezone,
+                        struct_in_pandas=(
+                            "row" if struct_in_pandas == "legacy" else struct_in_pandas
+                        ),
+                        error_on_duplicated_field_names=False,
+                        timestamp_utc_localized=False,
+                    )(series)
+                    for series, field in zip(series_list, self.schema.fields)
+                ],
+                axis="columns",
+            )
             pdf.columns = self.columns
             return pdf
         else:
-            return pd.DataFrame()
+            return pd.DataFrame(columns=[], index=range(len(rows)))

     def toArrow(self) -> "pa.Table":
         from pyspark.sql.dataframe import DataFrame

From 55c2634f64462e94d08ce0396231515930c4f2b7 Mon Sep 17 00:00:00 2001
From: Yicong-Huang <17627829+Yicong-Huang@users.noreply.github.com>
Date: Mon, 10 Nov 2025 13:41:58 -0800
Subject: [PATCH 4/7] refactor: simplify arrow to pandas conversion

---
 python/pyspark/sql/pandas/conversion.py | 20 +++++++++-----------
 1 file changed, 9 insertions(+), 11 deletions(-)

diff --git a/python/pyspark/sql/pandas/conversion.py b/python/pyspark/sql/pandas/conversion.py
index 57d200aada5b..4aa9259414b1 100644
--- a/python/pyspark/sql/pandas/conversion.py
+++ b/python/pyspark/sql/pandas/conversion.py
@@ -213,16 +213,14 @@ def toPandas(self) -> "PandasDataFrameLike":
             timezone = sessionLocalTimeZone
             struct_in_pandas = pandasStructHandlingMode

-            # Avoid intermediate pandas DataFrame creation by directly converting columns
+            # Extract columns from rows and apply converters
             if len(rows) > 0:
-                # Extract columns from rows
-                columns_data = list(zip(*rows))
-                series_list = [pd.Series(col_data) for col_data in columns_data]
+                # Convert to list of lists (faster than tuples for Series construction)
+                columns_data = [list(col) for col in zip(*rows)]
             else:
-                # Empty rows - create empty DataFrame and extract empty Series
-                pdf_temp = pd.DataFrame(columns=self.columns)
-                series_list = [pser for _, pser in pdf_temp.items()]
+                columns_data = [[] for _ in self.schema.fields]

+            # Use concat with converted Series (pandas concat is highly optimized)
             pdf = pd.concat(
                 [
                     _create_converter_to_pandas(
@@ -234,12 +232,12 @@ def toPandas(self) -> "PandasDataFrameLike":
                         ),
                         error_on_duplicated_field_names=False,
                         timestamp_utc_localized=False,
-                    )(series)
-                    for series, field in zip(series_list, self.schema.fields)
+                    )(pd.Series(col_data, dtype=object))
+                    for col_data, field in zip(columns_data, self.schema.fields)
                 ],
-                axis="columns",
+                axis=1,
+                keys=self.columns,
             )
-            pdf.columns = self.columns
             return pdf
         else:
             return pd.DataFrame(columns=[], index=range(len(rows)))
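PATCH 4 drops the post-hoc `pdf.columns = self.columns` assignment in favor of `pd.concat(..., axis=1, keys=self.columns)`, which labels the columns as part of the concatenation, and simplifies the empty-result branch to one empty list per schema field. A small standalone sketch of both branches follows, assuming toy `columns` and `columns_data` in place of Spark state:

```python
import pandas as pd

# Toy stand-ins for self.columns and the transposed row data (assumptions).
columns = ["id", "label"]
columns_data = [[1, 2, 3], ["a", "b", "c"]]

# keys= names the columns during the concat itself, so no separate
# pdf.columns assignment is needed afterwards.
pdf = pd.concat(
    [pd.Series(col_data, dtype=object) for col_data in columns_data],
    axis=1,
    keys=columns,
)

# Empty-rows branch: one empty object-dtype Series per field still yields
# a zero-row frame with every column label in place.
empty = pd.concat(
    [pd.Series([], dtype=object) for _ in columns],
    axis=1,
    keys=columns,
)

print(pdf.shape, empty.shape)  # (3, 2) (0, 2)
```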
From dc4339216b0b596e60f9083eae308fc02b5a0814 Mon Sep 17 00:00:00 2001
From: Yicong-Huang <17627829+Yicong-Huang@users.noreply.github.com>
Date: Mon, 10 Nov 2025 13:54:12 -0800
Subject: [PATCH 5/7] chore: clean up

---
 python/pyspark/sql/pandas/conversion.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/python/pyspark/sql/pandas/conversion.py b/python/pyspark/sql/pandas/conversion.py
index 4aa9259414b1..e7c89c315e87 100644
--- a/python/pyspark/sql/pandas/conversion.py
+++ b/python/pyspark/sql/pandas/conversion.py
@@ -220,7 +220,7 @@ def toPandas(self) -> "PandasDataFrameLike":
             else:
                 columns_data = [[] for _ in self.schema.fields]

-            # Use concat with converted Series (pandas concat is highly optimized)
+            # Build DataFrame from columns
             pdf = pd.concat(
                 [
                     _create_converter_to_pandas(

From 5d7a57827b10e53f432238f285fd902fbb53f164 Mon Sep 17 00:00:00 2001
From: Yicong-Huang <17627829+Yicong-Huang@users.noreply.github.com>
Date: Tue, 11 Nov 2025 14:20:11 -0800
Subject: [PATCH 6/7] feat: use iterator to replace list

---
 python/pyspark/sql/pandas/conversion.py | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/python/pyspark/sql/pandas/conversion.py b/python/pyspark/sql/pandas/conversion.py
index e7c89c315e87..fa0f0aafd3e1 100644
--- a/python/pyspark/sql/pandas/conversion.py
+++ b/python/pyspark/sql/pandas/conversion.py
@@ -215,10 +215,10 @@ def toPandas(self) -> "PandasDataFrameLike":

             # Extract columns from rows and apply converters
             if len(rows) > 0:
-                # Convert to list of lists (faster than tuples for Series construction)
-                columns_data = [list(col) for col in zip(*rows)]
+                # Use iterator to avoid materializing intermediate data structure
+                columns_data = zip(*rows)
             else:
-                columns_data = [[] for _ in self.schema.fields]
+                columns_data = ([] for _ in self.schema.fields)

             # Build DataFrame from columns
             pdf = pd.concat(

From 6a7b7a38ff68175a70719ade10cc196d39f4542d Mon Sep 17 00:00:00 2001
From: Yicong-Huang <17627829+Yicong-Huang@users.noreply.github.com>
Date: Tue, 11 Nov 2025 17:11:07 -0800
Subject: [PATCH 7/7] fix: mypy issue

---
 python/pyspark/sql/pandas/conversion.py | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git a/python/pyspark/sql/pandas/conversion.py b/python/pyspark/sql/pandas/conversion.py
index fa0f0aafd3e1..e90f1c305002 100644
--- a/python/pyspark/sql/pandas/conversion.py
+++ b/python/pyspark/sql/pandas/conversion.py
@@ -18,6 +18,7 @@
 from typing import (
     Any,
     Callable,
+    Iterator,
     List,
     Optional,
     Union,
@@ -216,9 +217,9 @@ def toPandas(self) -> "PandasDataFrameLike":
             # Extract columns from rows and apply converters
             if len(rows) > 0:
                 # Use iterator to avoid materializing intermediate data structure
-                columns_data = zip(*rows)
+                columns_data: Iterator[Any] = iter(zip(*rows))
             else:
-                columns_data = ([] for _ in self.schema.fields)
+                columns_data = iter([] for _ in self.schema.fields)

             # Build DataFrame from columns
             pdf = pd.concat(
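PATCH 6 makes the transposed columns lazy, and PATCH 7 then wraps both branches in iter() so that `columns_data` carries the single static type `Iterator[Any]` that mypy requires. A minimal sketch of that typing pattern, with `rows` and `num_fields` as toy stand-ins for the collected rows and the schema width:

```python
from typing import Any, Iterator

# Toy stand-ins for self.collect() output and len(self.schema.fields).
rows = [(1, "a"), (2, "b")]
num_fields = 2

if len(rows) > 0:
    # zip(*rows) is already lazy; iter() makes that explicit and gives
    # both branches the same declared type, which is the mypy fix.
    columns_data: Iterator[Any] = iter(zip(*rows))
else:
    # A generator of empty per-field lists also satisfies Iterator[Any].
    columns_data = iter([] for _ in range(num_fields))

print([list(col) for col in columns_data])  # [[1, 2], ['a', 'b']]
```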