[GLUTEN-10215][VL] Delta write: Native statistics tracker to eliminate C2R overhead #11419

Merged
zhztheplayer merged 9 commits into apache:main from zhztheplayer:wip-delta-write-stats
Feb 13, 2026

@zhztheplayer (Member) commented Jan 14, 2026

Description

Currently, a columnar-to-row (C2R) converter extracts every row from each columnar batch being written in order to calculate and gather Delta file statistics. This is inefficient, given that we expect all write-related operations to be offloaded to native code.

The patch adds a native job statistics tracker for Delta write to eliminate this C2R overhead. The tracker is backed by an asynchronous, barrier-enabled Velox aggregation task (for more information about Velox task barriers, see this), in which all the data to be written is globally aggregated into statistics rows, one for each written Parquet file.
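The idea of aggregating written data into one statistics row per file can be sketched in plain Scala. This is only a conceptual illustration, not the Velox-backed implementation: `WrittenRow`, `FileStats`, and `PerFileStats` are hypothetical names, and the chosen statistics (record count, min, max, null count) are an assumption about what a per-file statistics row might contain.

```scala
// Hypothetical input: the file a row is written to, plus one nullable data column.
final case class WrittenRow(file: String, value: Option[Long])

// Hypothetical per-file statistics row (assumed fields, for illustration only).
final case class FileStats(numRecords: Long, min: Option[Long], max: Option[Long], nullCount: Long)

object PerFileStats {
  // Aggregate all rows into exactly one statistics row per written file.
  def aggregate(rows: Seq[WrittenRow]): Map[String, FileStats] =
    rows.groupBy(_.file).map { case (file, fileRows) =>
      val present = fileRows.flatMap(_.value)
      file -> FileStats(
        numRecords = fileRows.size.toLong,
        min = if (present.isEmpty) None else Some(present.min),
        max = if (present.isEmpty) None else Some(present.max),
        nullCount = fileRows.count(_.value.isEmpty).toLong)
    }
}
```

In the PR the equivalent grouping and aggregation run inside a native Velox task, so the rows never need to be converted to Spark's row format.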

Existing tests under the directory backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/test cover the change.

Depends on #11405

Related issue: #10215

Also fixes #11514

Performance

The PR is benchmarked by writing TPC-DS SF10 tables in Delta format. The test environment has 8 cores and 20 GiB of RAM. The typical size of each written file is ~10 MiB.

Vanilla | Before | After
467.8s  | 478.4s | 290s

Before this PR, Gluten's observed speedup on Delta write was -2.22% (slightly slower than vanilla Spark).
After this PR, Gluten's observed speedup on Delta write is 61.31%.
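Both percentages are consistent with defining speedup as vanilla time divided by Gluten time, minus one. That formula is inferred from the reported numbers rather than stated in the PR; a minimal sketch (`DeltaWriteSpeedup` and `speedupPercent` are hypothetical names):

```scala
object DeltaWriteSpeedup {
  // Relative speedup of Gluten over vanilla Spark, in percent.
  // Assumed definition: (vanilla / gluten - 1) * 100.
  def speedupPercent(vanillaSeconds: Double, glutenSeconds: Double): Double =
    (vanillaSeconds / glutenSeconds - 1.0) * 100.0

  def main(args: Array[String]): Unit = {
    // Timings taken from the benchmark table above.
    println(f"before: ${speedupPercent(467.8, 478.4)}%.2f%%")
    println(f"after:  ${speedupPercent(467.8, 290.0)}%.2f%%")
  }
}
```

A negative value means Gluten was slower than vanilla Spark for that run.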

Detailed per-table benchmark results are as follows:

[image: perf]

@github-actions bot added the CORE and VELOX labels Jan 14, 2026
@github-actions

Run Gluten Clickhouse CI on x86

@zhztheplayer zhztheplayer force-pushed the wip-delta-write-stats branch from 07d26c7 to f7fe766 Compare January 14, 2026 16:10
@zhztheplayer zhztheplayer force-pushed the wip-delta-write-stats branch from 8cb9dab to e23699c Compare January 15, 2026 10:11
@zhztheplayer zhztheplayer force-pushed the wip-delta-write-stats branch from bd9fce1 to 12d26ba Compare January 23, 2026 13:53
@zhztheplayer zhztheplayer force-pushed the wip-delta-write-stats branch from 12d26ba to 05a72e8 Compare January 23, 2026 16:35
@zhztheplayer zhztheplayer force-pushed the wip-delta-write-stats branch from 05a72e8 to 1a28083 Compare January 23, 2026 16:36
@zhztheplayer zhztheplayer force-pushed the wip-delta-write-stats branch from 1a28083 to fad9fdd Compare January 23, 2026 16:39
@zhztheplayer zhztheplayer force-pushed the wip-delta-write-stats branch from fad9fdd to f9fca33 Compare January 27, 2026 10:59
@zhztheplayer zhztheplayer force-pushed the wip-delta-write-stats branch from 4182a6f to 9e0028f Compare January 28, 2026 09:47

…R overhead

The patch adds a native job statistics tracker for Delta write to eliminate C2R overhead.

More PR description WIP.
@zhztheplayer zhztheplayer force-pushed the wip-delta-write-stats branch from 3d26b84 to d463e92 Compare February 5, 2026 17:40
@zhztheplayer zhztheplayer changed the title [GLUTEN-10215][VL] Delta write: Native statistics tracker to avoid C2R overhead [GLUTEN-10215][VL] Delta write: Native statistics tracker to eliminate C2R overhead Feb 5, 2026
@zhztheplayer zhztheplayer marked this pull request as ready for review February 6, 2026 14:01
@FelixYBW (Contributor)

See if it's useful:

Analysis of PR #11419 - Partition Key Generation Issue

PR Overview

Title: [GLUTEN-10215][VL] Delta write: Native statistics tracker to eliminate C2R overhead

Purpose: Adds a native job statistics tracker for Delta write to eliminate Columnar-to-Row (C2R) conversion overhead by using Velox's native aggregation capabilities.

Issue: Wrong Partition Key Generation

Root Cause Analysis

The PR introduces a new native statistics tracker (GlutenDeltaJobStatsNativeTracker) that has an empty implementation for the newPartition method:

override def newPartition(partitionValues: InternalRow): Unit = {}

Location in patch: Line 633

Problem Explanation

Before the PR:

The fallback tracker properly delegates partition information:

override def newPartition(partitionValues: InternalRow): Unit =
  delegate.newPartition(partitionValues)

After the PR:

The native tracker ignores partition values:

override def newPartition(partitionValues: InternalRow): Unit = {}

Why This Causes Wrong Partition Keys

  1. Missing Partition Context: When newPartition() is called with partition values, the native tracker doesn't store or propagate this information to the underlying Delta statistics collection mechanism.

  2. File-to-Partition Mapping Lost: The tracker creates accumulators per file path but doesn't associate them with their partition values:

    override def newFile(filePath: String): Unit = {
      accumulators.getOrElseUpdate(
        filePath,
        new VeloxTaskStatsAccumulator(evaluator, resultThreadRunner, dataCols, statsColExpr)
      )
    }

  3. Statistics Without Partition Info: When statistics are collected, they lack the partition context needed to generate correct partition keys in the Delta log.

Impact

  • Partition columns are not correctly tracked in the statistics
  • Delta Lake metadata may have incorrect or missing partition information
  • Query performance degradation due to incorrect partition pruning
  • Data correctness issues if partition-based operations rely on this metadata

Solution

Option 1: Store and Use Partition Values (Recommended)

private class GlutenDeltaTaskStatsNativeTracker(
    delegate: WriteTaskStatsTracker,
    dataCols: Seq[Attribute],
    statsColExpr: Expression,
    resultThreadRunner: ThreadPoolExecutor)
  extends WriteTaskStatsTracker {

  private val accumulators = mutable.Map[String, VeloxTaskStatsAccumulator]()
  private val fileToPartition = mutable.Map[String, InternalRow]()  // ADD THIS
  // Partition values most recently announced via newPartition (null until the first call)
  private var currentPartitionValues: InternalRow = _  // ADD THIS
  private val evaluator = NativePlanEvaluator.create(
    BackendsApiManager.getBackendName,
    Map.empty[String, String].asJava)

  override def newPartition(partitionValues: InternalRow): Unit = {
    // Store current partition values for subsequent file operations
    currentPartitionValues = partitionValues  // ADD THIS
  }

  override def newFile(filePath: String): Unit = {
    accumulators.getOrElseUpdate(
      filePath,
      new VeloxTaskStatsAccumulator(evaluator, resultThreadRunner, dataCols, statsColExpr)
    )
    // Associate file with its partition
    if (currentPartitionValues != null) {  // ADD THIS
      fileToPartition(filePath) = currentPartitionValues.copy()
    }
  }

  override def getFinalStats(taskCommitTime: Long): WriteTaskStats = {
    // Use fileToPartition mapping when building statistics
    // to ensure correct partition keys
    // ... implementation needs to pass partition info to delegate
    ???  // placeholder: intentionally left unimplemented in this sketch
  }
}

Option 2: Delegate to Underlying Tracker

If the native tracker doesn't need to handle partitions directly:

override def newPartition(partitionValues: InternalRow): Unit = {
  delegate.newPartition(partitionValues)
}

Option 3: Hybrid Approach

Store partition values AND delegate:

private var currentPartitionValues: InternalRow = _

override def newPartition(partitionValues: InternalRow): Unit = {
  currentPartitionValues = partitionValues
  delegate.newPartition(partitionValues)
}
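The hybrid option can be illustrated with a self-contained sketch. It uses simplified, hypothetical stand-ins (`PartitionValues`, `SimpleTaskStatsTracker`, `RecordingTracker`, `HybridTracker`) instead of Spark's `InternalRow` and `WriteTaskStatsTracker`, so it only shows the store-and-forward pattern and makes no claim about what the actual tracker needs.

```scala
import scala.collection.mutable

// Hypothetical, simplified stand-in for Spark's InternalRow.
final case class PartitionValues(values: Seq[Any])

// Hypothetical, simplified stand-in for WriteTaskStatsTracker.
trait SimpleTaskStatsTracker {
  def newPartition(partitionValues: PartitionValues): Unit
  def newFile(filePath: String): Unit
}

// Records every call so that forwarding can be observed.
final class RecordingTracker extends SimpleTaskStatsTracker {
  val partitions = mutable.ArrayBuffer.empty[PartitionValues]
  val files = mutable.ArrayBuffer.empty[String]
  override def newPartition(partitionValues: PartitionValues): Unit = {
    partitions += partitionValues
  }
  override def newFile(filePath: String): Unit = {
    files += filePath
  }
}

// Hybrid approach: remember the current partition AND forward to the delegate.
final class HybridTracker(delegate: SimpleTaskStatsTracker) extends SimpleTaskStatsTracker {
  private var currentPartition: Option[PartitionValues] = None
  val fileToPartition = mutable.Map.empty[String, PartitionValues]

  override def newPartition(partitionValues: PartitionValues): Unit = {
    currentPartition = Some(partitionValues)
    delegate.newPartition(partitionValues)
  }

  override def newFile(filePath: String): Unit = {
    // Associate the new file with the partition announced most recently, if any.
    currentPartition.foreach(p => fileToPartition.update(filePath, p))
    delegate.newFile(filePath)
  }
}
```

Using `Option` instead of a nullable field avoids the explicit null check that appears in Option 1.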

Testing Recommendations

  1. Add partition-specific tests:

    test("native stats tracker preserves partition values") {
      // Write data with multiple partitions
      // Verify partition keys in Delta log match expected values
    }

  2. Verify statistics correctness:

    test("partition statistics are correctly computed") {
      // Check min/max values per partition
      // Verify row counts per partition
    }

  3. Test partition pruning:

    test("queries with partition filters use correct statistics") {
      // Write partitioned data
      // Query with partition filter
      // Verify only relevant partitions are scanned
    }

Related Code Sections

Comparison with Other Trackers

  1. Fallback Tracker (Lines 411-413):

    override def newPartition(partitionValues: InternalRow): Unit =
      delegate.newPartition(partitionValues)

    ✅ Correctly delegates

  2. Row Counting Tracker (Lines 491-492):

    override def newPartition(partitionValues: InternalRow): Unit =
      delegate.newPartition(partitionValues)

    ✅ Correctly delegates

  3. Native Tracker (Line 633):

    override def newPartition(partitionValues: InternalRow): Unit = {}

    PROBLEM: Empty implementation

Conclusion

The issue is in the GlutenDeltaJobStatsNativeTracker.GlutenDeltaTaskStatsNativeTracker class where the newPartition() method has an empty implementation. This causes partition values to be lost, resulting in incorrect partition key generation in the Delta Lake metadata.

Fix: Implement proper partition value handling in the native tracker, either by:

  1. Storing and using partition values internally
  2. Delegating to the underlying tracker
  3. Both (recommended for robustness)

The fix should ensure partition values are correctly associated with files and propagated to the final statistics.

@zhztheplayer (Member Author)

@FelixYBW Just saw this. This was actually not the root cause: override def newPartition(partitionValues: InternalRow): Unit = { } was intentionally left empty, because the vanilla Delta stats writer also does nothing in this method.

The problem was with VeloxBlockStripes and I've already pushed the fix. Now everything should be fine.
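Why an empty newPartition can be harmless is easiest to see in a toy tracker whose statistics are keyed by file path alone. The sketch below is hypothetical (`PerFileRowCounter` is not a Gluten or Delta class) and assumes, for illustration, that the only statistic is a per-file row count.

```scala
import scala.collection.mutable

// Hypothetical simplified tracker: statistics are keyed by file path only.
final class PerFileRowCounter {
  private val counts = mutable.LinkedHashMap.empty[String, Long]
  private var currentFile: Option[String] = None

  // Intentionally a no-op: the per-file statistics below never read partition values.
  def newPartition(partitionValues: Seq[Any]): Unit = ()

  def newFile(filePath: String): Unit = {
    counts.getOrElseUpdate(filePath, 0L)
    currentFile = Some(filePath)
  }

  // Count one row against the file that is currently open, if any.
  def newRow(): Unit =
    currentFile.foreach(f => counts.update(f, counts(f) + 1L))

  def finalStats: Map[String, Long] = counts.toMap
}
```

Because every statistic is attached to a file path, dropping the partition values in this toy model loses nothing that the final per-file statistics depend on.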

@FelixYBW (Contributor)

Confirmed, the perf gain is close to Iceberg now:
[image]

@zhztheplayer zhztheplayer merged commit d45c8f9 into apache:main Feb 13, 2026
61 of 62 checks passed
ReemaAlzaid pushed a commit to ReemaAlzaid/incubator-gluten that referenced this pull request Feb 14, 2026
Development

Successfully merging this pull request may close these issues.

[VL] Add task barrier support for NativePlanEvaluator to make Velox task reusable