Skip to content

[SPARK-56933][SQL] Cache SQL metrics in MergeRowsExec interpreted iterator#55967

Closed
szehon-ho wants to merge 8 commits into
apache:masterfrom
szehon-ho:spark-cache-merge-rows-metrics
Closed

[SPARK-56933][SQL] Cache SQL metrics in MergeRowsExec interpreted iterator#55967
szehon-ho wants to merge 8 commits into
apache:masterfrom
szehon-ho:spark-cache-merge-rows-metrics

Conversation

@szehon-ho
Copy link
Copy Markdown
Member

@szehon-ho szehon-ho commented May 18, 2026

What changes were proposed in this pull request?

Cache SQLMetric references in MergeRowIterator and update them directly in the hot loop. Previously, each row called longMetric("…"), which performs a metrics(name) map lookup on every increment (up to 2–3 lookups per delete/update row). Metrics are lazy val fields so a partition only resolves metrics it actually increments.

This matches the pattern used elsewhere (e.g. FilterEvaluatorFactory passes a SQLMetric into the partition evaluator). The whole-stage codegen path is unchanged; it already resolves metrics once via metricTerm.

codegenBenchmark in SqlBasedBenchmark now accepts optional warmupTime, minTime, and per-case numIters. MergeRowsExecBenchmark uses 7s warmup and a 7s timed window for all whole-stage on/off cases.

Why are the changes needed?

MergeRowsExec updates multiple MERGE metrics per output row on the interpreted path (doExecute / MergeRowIterator). For delete-heavy workloads with little projection work, repeated map lookups were a noticeable fraction of per-row cost. Production MERGE typically runs with whole-stage codegen enabled, but the interpreted path is still used when codegen is disabled or unsupported.

Does this PR introduce any user-facing change?

No.

How was this patch tested?

Existing MergeRowsExec / MERGE tests (CI).

Local benchmark (MergeRowsExecBenchmark, 20M rows, Apple M4 Max, JDK 21). Both sides used the same benchmark harness (7s warmupTime, 7s minTime, wholestageOffNumIters = 0 / wholestageOnNumIters = 0 via extended codegenBenchmark). Compared MergeRowsExec without the cache (1ad4fa420cd, parent of the cache commit) vs with the cache (this PR), checking out only that file between runs.

SPARK_LOCAL_HOSTNAME=127.0.0.1 build/sbt -batch \
  -Dspark.driver.host=127.0.0.1 -Dspark.driver.bindAddress=127.0.0.1 \
  "sql/Test/runMain org.apache.spark.sql.execution.benchmark.MergeRowsExecBenchmark"

Whole-stage off (interpreted path) — best time (ms):

Case Without cache With cache (this PR) Change
matched update only 5475 5238 −4%
not matched insert only 7612 7337 −4%
matched update + not matched insert 5795 4315 −26%
matched delete 2914 546 −81%
conditional clauses 3872 1251 −68%
matched + not matched + not matched by source 3813 1119 −71%
split update (delete + insert) 1844 1400 −24%

Matched-update-only and insert-only are roughly unchanged on the interpreted path in this run; the largest wins are on delete-heavy and multi-metric cases.

Whole-stage on (codegen) — unchanged within noise (e.g. matched delete best ~13 ms; matched update only ~333–338 ms).

Was this patch authored or co-authored using generative AI tooling?

No.

Resolve SQLMetric references once per partition in MergeRowIterator instead
of calling longMetric on every row. This avoids repeated metrics map lookups
on the interpreted path; the codegen path is unchanged.
@szehon-ho szehon-ho changed the title [SQL] Cache SQL metrics in MergeRowsExec interpreted iterator [SPARK-56933][SQL] Cache SQL metrics in MergeRowsExec interpreted iterator May 18, 2026
szehon-ho added 4 commits May 18, 2026 16:47
Document why metrics are cached per partition and use longMetric directly
in MergeRowIterator instead of MergeRowsExec.this.
… cases

Use a dedicated benchmark helper with 15s JIT warm-up and a 15s timed
window for whole-stage on/off cases, and support running matched-update-only
in isolation for local A/B testing.
… MergeRowsExec

Use MergeRowsExec.this.longMetric in MergeRowIterator so metric resolution
clearly refers to the outer operator instance.
Run all benchmark cases from runBenchmarkSuite without main-args filtering.
Comment on lines +523 to +534
private val numTargetRowsCopied = MergeRowsExec.this.longMetric("numTargetRowsCopied")
private val numTargetRowsInserted = MergeRowsExec.this.longMetric("numTargetRowsInserted")
private val numTargetRowsDeleted = MergeRowsExec.this.longMetric("numTargetRowsDeleted")
private val numTargetRowsUpdated = MergeRowsExec.this.longMetric("numTargetRowsUpdated")
private val numTargetRowsMatchedUpdated =
MergeRowsExec.this.longMetric("numTargetRowsMatchedUpdated")
private val numTargetRowsMatchedDeleted =
MergeRowsExec.this.longMetric("numTargetRowsMatchedDeleted")
private val numTargetRowsNotMatchedBySourceUpdated =
MergeRowsExec.this.longMetric("numTargetRowsNotMatchedBySourceUpdated")
private val numTargetRowsNotMatchedBySourceDeleted =
MergeRowsExec.this.longMetric("numTargetRowsNotMatchedBySourceDeleted")
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we make all these lazy to not look them up when not needed?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done — metrics are now lazy val so we only resolve (and cache) metrics that are actually incremented in a given partition.


// Resolve metrics once per partition; longMetric(name) does a map lookup on each call.
// See SPARK-56933.
private val numTargetRowsCopied = MergeRowsExec.this.longMetric("numTargetRowsCopied")
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: I believe MergeRowsExec.this.longMetric() can be replaced with just longMetric()

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done — using longMetric() directly in MergeRowIterator.

Comment on lines +113 to +114
withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "false") { f }
withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "true") { f }
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not familiar with the benchmarking infra, but it seems warmupTime parameter of the Benchmark is controlling this already, making these redundant?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed — removed the redundant pre-run calls. MergeRowsExecBenchmark now only uses the extended codegenBenchmark helper (warmupTime / minTime); the Benchmark class already warms up each case before timed iterations.

* Like [[codegenBenchmark]], but with JIT warm-up and a longer timed window so interpreted
* (whole-stage off) results are more stable when comparing metric caching changes.
*/
private def mergeRowsCodegenBenchmark(name: String, cardinality: Long)(f: => Unit): Unit = {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am unsure about the benchmark numbers in the PR description. It says it's comparing origin/master to this PR, but here we are making changes to how these benchmarks are run. Are we doing apples-to-apples comparison here? Maybe it would be better to compare the following two:

  • This PR
  • This PR without the changes in MergeRowsExec

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yea, it is actually right. i had to modify the benchmark to increase the warmup, in order to get more consistent numbers. But I did run these two cases on the same new benchmark on this pr

  • spark code of origin/master
  • spark code of this pr (with cached metrics)

That being said, let me see if i need to do the benchmark changes

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated the PR description with a new A/B run on the same harness for both sides: MergeRowsExec at 1ad4fa420cd (before the cache) vs this PR (with cache), with only that file swapped between runs. Both used 7s warmup and 7s timed window via extended codegenBenchmark. The earlier table compared against origin/master with mixed harness settings and is replaced.

szehon-ho added 2 commits May 19, 2026 16:32
Resolve each SQLMetric on first use per partition and call longMetric()
directly in MergeRowIterator per review feedback.
…enchmark

Add warmupTime, minTime, and optional numIters to codegenBenchmark. Use
7s warmup and timed window in MergeRowsExecBenchmark without a separate
helper or redundant pre-runs.
Copy link
Copy Markdown
Contributor

@ZiyaZa ZiyaZa left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM. Left one comment with potential further improvement.


// For group based merge, copy is inserted if row matches no other case
private def incrementCopyMetric(): Unit = longMetric("numTargetRowsCopied") += 1
private def incrementCopyMetric(): Unit = numTargetRowsCopied += 1
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm now wondering if these += 1 can also be optimized be further. Doing += on a SQLMetric is technically doing a function call to SQLMetric.add which does some more work in addition to simply adding the value. Maybe JVM is smart enough to optimize it, I'm not sure. Do you think it's worth exploring?

We could replace the new private lazy val SQLMetric fields above with simple integers initially all set to 0, and increment those only. And at the end of applyInstructions, we can lookup and increment all metrics for which the value is > 0.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's an interesting idea. Yea I can play around with it as well. How about a separate pr, so as to checkmark the progress?

Comment on lines +63 to +65
warmupTime: FiniteDuration = 2.seconds,
minTime: FiniteDuration = 2.seconds,
minNumIters: Int = 2,
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: I would order these parameters the same way as in Benchmark constructor to prevent confusion:

Suggested change
warmupTime: FiniteDuration = 2.seconds,
minTime: FiniteDuration = 2.seconds,
minNumIters: Int = 2,
minNumIters: Int = 2,
warmupTime: FiniteDuration = 2.seconds,
minTime: FiniteDuration = 2.seconds,

Copy link
Copy Markdown
Member Author

@szehon-ho szehon-ho May 20, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done — reordered codegenBenchmark parameters to match Benchmark (minNumIters, warmupTime, minTime)

Reorder minNumIters, warmupTime, and minTime to match Benchmark constructor
order per review. Clarify scaladoc to reference only codegenBenchmark args.
@cloud-fan
Copy link
Copy Markdown
Contributor

thanks, merging to master/4.x/4.2!

@cloud-fan cloud-fan closed this in f08523b May 21, 2026
cloud-fan pushed a commit that referenced this pull request May 21, 2026
…rator

### What changes were proposed in this pull request?

Cache `SQLMetric` references in `MergeRowIterator` and update them directly in the hot loop. Previously, each row called `longMetric("…")`, which performs a `metrics(name)` map lookup on every increment (up to 2–3 lookups per delete/update row). Metrics are `lazy val` fields so a partition only resolves metrics it actually increments.

This matches the pattern used elsewhere (e.g. `FilterEvaluatorFactory` passes a `SQLMetric` into the partition evaluator). The whole-stage codegen path is unchanged; it already resolves metrics once via `metricTerm`.

`codegenBenchmark` in `SqlBasedBenchmark` now accepts optional `warmupTime`, `minTime`, and per-case `numIters`. `MergeRowsExecBenchmark` uses 7s warmup and a 7s timed window for all whole-stage on/off cases.

### Why are the changes needed?

`MergeRowsExec` updates multiple MERGE metrics per output row on the interpreted path (`doExecute` / `MergeRowIterator`). For delete-heavy workloads with little projection work, repeated map lookups were a noticeable fraction of per-row cost. Production MERGE typically runs with whole-stage codegen enabled, but the interpreted path is still used when codegen is disabled or unsupported.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Existing `MergeRowsExec` / MERGE tests (CI).

**Local benchmark** (`MergeRowsExecBenchmark`, 20M rows, Apple M4 Max, JDK 21). **Both sides used the same benchmark harness** (7s `warmupTime`, 7s `minTime`, `wholestageOffNumIters = 0` / `wholestageOnNumIters = 0` via extended `codegenBenchmark`). Compared `MergeRowsExec` **without** the cache (`1ad4fa420cd`, parent of the cache commit) vs **with** the cache (this PR), checking out only that file between runs.

```bash
SPARK_LOCAL_HOSTNAME=127.0.0.1 build/sbt -batch \
  -Dspark.driver.host=127.0.0.1 -Dspark.driver.bindAddress=127.0.0.1 \
  "sql/Test/runMain org.apache.spark.sql.execution.benchmark.MergeRowsExecBenchmark"
```

**Whole-stage off (interpreted path)** — best time (ms):

| Case | Without cache | With cache (this PR) | Change |
|------|--------------:|---------------------:|-------:|
| matched update only | 5475 | 5238 | −4% |
| not matched insert only | 7612 | 7337 | −4% |
| matched update + not matched insert | 5795 | 4315 | −26% |
| matched delete | 2914 | 546 | −81% |
| conditional clauses | 3872 | 1251 | −68% |
| matched + not matched + not matched by source | 3813 | 1119 | −71% |
| split update (delete + insert) | 1844 | 1400 | −24% |

Matched-update-only and insert-only are roughly unchanged on the interpreted path in this run; the largest wins are on delete-heavy and multi-metric cases.

**Whole-stage on (codegen)** — unchanged within noise (e.g. matched delete best ~13 ms; matched update only ~333–338 ms).

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #55967 from szehon-ho/spark-cache-merge-rows-metrics.

Authored-by: Szehon Ho <szehon.apache@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
(cherry picked from commit f08523b)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
cloud-fan pushed a commit that referenced this pull request May 21, 2026
…rator

### What changes were proposed in this pull request?

Cache `SQLMetric` references in `MergeRowIterator` and update them directly in the hot loop. Previously, each row called `longMetric("…")`, which performs a `metrics(name)` map lookup on every increment (up to 2–3 lookups per delete/update row). Metrics are `lazy val` fields so a partition only resolves metrics it actually increments.

This matches the pattern used elsewhere (e.g. `FilterEvaluatorFactory` passes a `SQLMetric` into the partition evaluator). The whole-stage codegen path is unchanged; it already resolves metrics once via `metricTerm`.

`codegenBenchmark` in `SqlBasedBenchmark` now accepts optional `warmupTime`, `minTime`, and per-case `numIters`. `MergeRowsExecBenchmark` uses 7s warmup and a 7s timed window for all whole-stage on/off cases.

### Why are the changes needed?

`MergeRowsExec` updates multiple MERGE metrics per output row on the interpreted path (`doExecute` / `MergeRowIterator`). For delete-heavy workloads with little projection work, repeated map lookups were a noticeable fraction of per-row cost. Production MERGE typically runs with whole-stage codegen enabled, but the interpreted path is still used when codegen is disabled or unsupported.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Existing `MergeRowsExec` / MERGE tests (CI).

**Local benchmark** (`MergeRowsExecBenchmark`, 20M rows, Apple M4 Max, JDK 21). **Both sides used the same benchmark harness** (7s `warmupTime`, 7s `minTime`, `wholestageOffNumIters = 0` / `wholestageOnNumIters = 0` via extended `codegenBenchmark`). Compared `MergeRowsExec` **without** the cache (`1ad4fa420cd`, parent of the cache commit) vs **with** the cache (this PR), checking out only that file between runs.

```bash
SPARK_LOCAL_HOSTNAME=127.0.0.1 build/sbt -batch \
  -Dspark.driver.host=127.0.0.1 -Dspark.driver.bindAddress=127.0.0.1 \
  "sql/Test/runMain org.apache.spark.sql.execution.benchmark.MergeRowsExecBenchmark"
```

**Whole-stage off (interpreted path)** — best time (ms):

| Case | Without cache | With cache (this PR) | Change |
|------|--------------:|---------------------:|-------:|
| matched update only | 5475 | 5238 | −4% |
| not matched insert only | 7612 | 7337 | −4% |
| matched update + not matched insert | 5795 | 4315 | −26% |
| matched delete | 2914 | 546 | −81% |
| conditional clauses | 3872 | 1251 | −68% |
| matched + not matched + not matched by source | 3813 | 1119 | −71% |
| split update (delete + insert) | 1844 | 1400 | −24% |

Matched-update-only and insert-only are roughly unchanged on the interpreted path in this run; the largest wins are on delete-heavy and multi-metric cases.

**Whole-stage on (codegen)** — unchanged within noise (e.g. matched delete best ~13 ms; matched update only ~333–338 ms).

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #55967 from szehon-ho/spark-cache-merge-rows-metrics.

Authored-by: Szehon Ho <szehon.apache@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
(cherry picked from commit f08523b)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants