
[SPARK-55523][SQL] Fix inputMetrics.recordsRead to report rows instead of CachedBatches #54684

Open: moomindani wants to merge 1 commit into apache:master from moomindani:SPARK-55523

Conversation

@moomindani (Contributor)

What changes were proposed in this pull request?

`InMemoryTableScanExec.doExecute()` iterates over `CachedBatch` objects backed by `RDD.getOrCompute`, which increments `inputMetrics.recordsRead` by 1 per `CachedBatch` on a cache hit, counting batches instead of actual rows.

This fix registers a `TaskCompletionListener` inside `mapPartitionsInternal` that corrects the count once all batches in a partition are consumed:

  • Cache hit: `getOrCompute` added `numBatches`; the listener adds `(rowCount - numBatches)` so the final value equals the actual row count.
  • Cache miss: the source scan already incremented `recordsRead` by `rowCount`; the listener adds `(rowCount - rowCount) = 0`, leaving the value unchanged.

No changes to `RDD.scala`, so non-SQL RDD caching metrics are unaffected.
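The correction arithmetic can be illustrated with a small, Spark-free model (hypothetical names; the real change sits inside `InMemoryTableScanExec.doExecute`'s `mapPartitionsInternal`). A wrapper iterator counts rows and batches as the partition streams by; once the partition is fully consumed, a completion hook adds `(rowCount - numBatches)` so the metric lands on the true row count:

```scala
// Spark-free model of the fix; CountingIterator is an illustrative stand-in
// for the wrapper around the cached-batch iterator.
final class CountingIterator[A](underlying: Iterator[(A, Long)]) extends Iterator[A] {
  var rowCount = 0L
  var numBatches = 0L
  def hasNext: Boolean = underlying.hasNext
  def next(): A = {
    val (batch, rows) = underlying.next()
    numBatches += 1
    rowCount += rows
    batch
  }
}

var recordsRead = 0L                                          // simulated inputMetrics.recordsRead
val batches = Iterator(("batch1", 1000L), ("batch2", 2605L))  // 2 batches, 3,605 rows
val it = new CountingIterator(batches)
it.foreach(_ => recordsRead += 1)                  // getOrCompute: +1 per batch on a cache hit
recordsRead += it.rowCount - it.numBatches         // completion-listener correction
assert(recordsRead == 3605L)                       // rows, not batches
println(recordsRead)
```

On a cache miss the already-counted amount equals `rowCount`, so the same correction adds zero and the metric is unchanged.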

Why are the changes needed?

Users and monitoring tools relying on `TaskMetrics.inputMetrics.recordsRead` (e.g. the Spark UI, external observability systems) see incorrect values when reading from a cached DataFrame. For example, 3,605 rows stored in a single `CachedBatch` would report `recordsRead = 1` instead of `3605`.

Does this PR introduce any user-facing change?

Yes: `inputMetrics.recordsRead` for queries reading from a cached DataFrame now reports the correct row count instead of the batch count.

How was this patch tested?

Added a test in `InMemoryColumnarQuerySuite` that caches 1000 rows, materializes the cache, then reads from cache and asserts `inputMetrics.recordsRead == 1000` via a `SparkListener`.
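A hedged sketch of that style of check (illustrative only, not the actual suite code; `RecordsReadCheck` is an invented name, it requires `spark-sql` on the classpath, and the crude `Thread.sleep` stands in for the test suite's internal listener-bus synchronization):

```scala
import java.util.concurrent.atomic.AtomicLong
import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}
import org.apache.spark.sql.SparkSession

object RecordsReadCheck {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").getOrCreate()

    // Accumulate recordsRead across finished tasks via a SparkListener.
    val recordsRead = new AtomicLong(0L)
    spark.sparkContext.addSparkListener(new SparkListener {
      override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit =
        recordsRead.addAndGet(taskEnd.taskMetrics.inputMetrics.recordsRead)
    })

    val df = spark.range(1000).toDF("id").cache()
    df.collect()          // materialize the cache
    Thread.sleep(2000)    // crude wait for task-end events to be delivered
    recordsRead.set(0L)   // reset before the cached read

    df.collect()          // this scan reads from the cache
    Thread.sleep(2000)    // crude wait again
    // With the fix, the cached read reports 1000 rows rather than the
    // (much smaller) number of CachedBatches.
    assert(recordsRead.get() == 1000L)
    spark.stop()
  }
}
```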

Notes

`doExecuteColumnar()` has the same underlying issue (batch-level counting via `getOrCompute`) and could be addressed as a follow-up.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
