
Conversation

@Yicong-Huang (Contributor) commented Nov 10, 2025

What changes were proposed in this pull request?

This PR optimizes the to_pandas() method in Spark Connect client to avoid creating an intermediate pandas DataFrame during Arrow to pandas conversion.

Key changes:

  • Convert Arrow columns directly to pandas Series using arrow_col.to_pandas() instead of converting the entire table first with table.to_pandas()
  • Eliminate temporary column renaming (col_0, col_1, etc.) since we no longer create an intermediate DataFrame
  • Apply Spark-specific type converters directly to each Series without going through an intermediate DataFrame (see the sketch below)
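
A minimal sketch of the idea (assumptions: convert_column below is a hypothetical stand-in for Spark's type-specific converters, not the actual PySpark internals). Instead of materializing the whole Arrow table with table.to_pandas() and post-processing an intermediate DataFrame, each Arrow column is converted to a pandas Series on its own and the converter is applied per Series before assembling the result:

import pyarrow as pa
import pandas as pd

# Example Arrow table standing in for a batch received by the Connect client.
table = pa.table({"a": [1, 2, 3], "b": ["x", "y", "z"]})

def convert_column(series: pd.Series, field: pa.Field) -> pd.Series:
    # Hypothetical placeholder for Spark-specific conversion (dates, structs, ...).
    return series

# Old approach (intermediate DataFrame): pdf = table.to_pandas(), then rename
# and fix up columns. New approach: per-column conversion, no intermediate DataFrame.
series_list = [
    convert_column(column.to_pandas(), field)
    for column, field in zip(table.columns, table.schema)
]
pdf = pd.concat(series_list, axis=1, keys=[field.name for field in table.schema])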

Why are the changes needed?

This optimization brings Spark Connect's to_pandas() implementation in line with the regular Spark DataFrame optimization made in PR #52680 (SPARK-53967).

Benefits:

  1. Reduced memory usage: Eliminates allocation of intermediate DataFrame
  2. Better performance: Fewer data copies, better memory locality
  3. Consistency: Makes Spark Connect code path match the optimized regular Spark DataFrame path

Does this PR introduce any user-facing change?

No. This is a pure performance optimization with no API or behavior changes.

How was this patch tested?

Benchmark setup (for manual testing):

  • 1M rows × 102 columns
  • Mixed types: ~25 complex columns (Date, Timestamp, Struct) + ~77 simple columns (Int, Double, String)
  • Batch size: 5,000 rows per batch
  • Config: Arrow enabled, self-destruct enabled
from pyspark.sql import SparkSession
from pyspark.sql import functions as sf
import time

spark = SparkSession.builder.remote("sc://localhost:15002").getOrCreate()

spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")
spark.conf.set("spark.sql.execution.arrow.pyspark.selfDestruct.enabled", "true")
spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", "5000")  # Small batches: 5k rows (~1.5MB/batch)

# Large dataset: 1M rows with MIXED data types
df = spark.range(1000000).select(
    sf.col("id"),
    (sf.col("id") % 2).alias("key"), 
    sf.col("id").alias("v")
)

# Add various column types to test conversion performance. These types need Spark-specific conversion:
df = df.withColumns({
    "date_col_1": sf.date_add(sf.to_date(sf.lit("2024-01-01")), (sf.col("id") % sf.lit(365)).cast("int")),
    "date_col_2": sf.date_add(sf.to_date(sf.lit("2023-01-01")), (sf.col("id") % sf.lit(180)).cast("int")),
    "timestamp_col": sf.current_timestamp(),
    "struct_col_1": sf.struct(sf.col("id").cast("long").alias("a"), (sf.col("id") * sf.lit(2)).cast("long").alias("b")),
    "struct_col_2": sf.struct((sf.col("id") % sf.lit(10)).cast("int").alias("x"), (sf.col("id") / sf.lit(100.0)).alias("y")),
    "array_col": sf.array(sf.lit(1), sf.lit(2), sf.lit(3)),
    "double_col_1": sf.col("id") / sf.lit(3.14),
    "double_col_2": sf.col("id") * sf.lit(1.5) + sf.lit(100),
    "int_col": (sf.col("id") % sf.lit(1000)).cast("int"),
})

# Add more mixed columns - some simple, some complex
for i in range(45):
    if i % 5 == 0:
        df = df.withColumn(f"mixed_{i}", 
            sf.date_add(sf.to_date(sf.lit("2024-01-01")), (sf.col("id") % sf.lit(i + 1)).cast("int")))
    elif i % 5 == 1:
        df = df.withColumn(f"mixed_{i}", 
            sf.struct(sf.lit(i).alias("idx"), (sf.col("id") % sf.lit(i + 1)).cast("long").alias("val")))
    elif i % 5 == 2:
        df = df.withColumn(f"mixed_{i}", 
            sf.concat(sf.lit(f"str_{i}_"), (sf.col("id") % sf.lit(100)).cast("string")))
    else:
        df = df.withColumn(f"mixed_{i}", (sf.col("id") * sf.lit(i) + sf.lit(i)) % sf.lit(1000))

# Add some constant strings for variety
for i in range(45):
    df = df.withColumn(f"const_{i}", sf.lit(f"c{i}"))

df = df.drop("id")
df.cache()
df.count()

# Warm up
pdf = df.toPandas()
del pdf

# Benchmark
start = time.perf_counter()
total_rows = 0
total_sum = 0

for i in range(20):
    # Convert to pandas
    pdf = df.toPandas()
    total_rows += len(pdf)
    total_sum += pdf['v'].sum()
    del pdf
    
    if (i + 1) % 5 == 0:
        elapsed = time.perf_counter() - start
        print(f"  {i + 1}/20 completed ({elapsed:.1f}s elapsed, ~{elapsed/(i+1):.2f}s per iteration)")

elapsed = time.perf_counter() - start
print(f"Total: {elapsed:.1f}s for 20 iterations ({elapsed / 20:.2f}s per iteration), rows={total_rows}, checksum={total_sum}")

Manual benchmarking results: 6.5% improvement with mixed data types (dates, timestamps, structs, arrays, and simple types)

  • Before: 129.3s for 20 iterations (6.46s per iteration)
  • After: 120.9s for 20 iterations (6.04s per iteration)

Was this patch authored or co-authored using generative AI tooling?

Yes. Co-generated by Cursor.

@HyukjinKwon HyukjinKwon changed the title [SPARK-54183] remove one intermediate temp data frame [SPARK-54183][PYTHON] Remove one intermediate temp data frame Nov 10, 2025
@Yicong-Huang Yicong-Huang changed the title [SPARK-54183][PYTHON] Remove one intermediate temp data frame [SPARK-54183][SQL][PYTHON][CONNECT] Avoid one intermediate temp data frame during spark connect toPandas() Nov 10, 2025
@zhengruifeng previously approved these changes Nov 11, 2025
@zhengruifeng dismissed their stale review on Nov 11, 2025 at 07:39: test failure

@zhengruifeng (Contributor) commented

The test failure seems related, @Yicong-Huang.

@zhengruifeng (Contributor) left a comment

Probably we can consolidate this code path with the Arrow one in classic Spark (#52680).

@zhengruifeng zhengruifeng changed the title [SPARK-54183][SQL][PYTHON][CONNECT] Avoid one intermediate temp data frame during spark connect toPandas() [SPARK-54183][PYTHON][CONNECT] Avoid one intermediate temp data frame during spark connect toPandas() Nov 12, 2025
@zhengruifeng (Contributor) commented

merged to master
