[fix](streaming-job) fix filteredRows always 0 on single-table S3 streaming #62816
Conversation
…xn commit
For single-table S3 streaming insert jobs running with
enable_insert_strict=false + insert_max_filter_ratio>0, BE correctly
filters bad rows but jobStatistic.filteredRows stays at 0 because the
txn commit path had no channel for filteredRows end-to-end:
- StreamingTaskCommitAttachmentPB has no filtered_rows field.
- LoadStatistic has no filteredRows field; the value only lives as a
  local int in AbstractInsertExecutor for strict/ratio checks (see the
  sketch after this list).
- beforeCommitted() builds the attachment from loadStatistic.get*() so
there is nothing to read.
- updateJobStatisticAndOffset() / updateCloudJobStatisticAndOffset()
accumulate every other stat but skip filteredRows.
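
A minimal sketch of that dead end, assuming the usual Doris load-counter keys (`dpp.abnorm.ALL` / `dpp.norm.ALL`) and a simplified ratio gate; the names are illustrative, not the exact FE code:

```java
import java.util.Map;

class FilteredRowsDeadEndSketch {
    // Simplified from the description above: the BE-reported counter is read
    // into a local, used once for the strict/ratio gate, and then dropped.
    static void checkFilterRatio(Map<String, String> loadCounters, double maxFilterRatio)
            throws Exception {
        int filteredRows = Integer.parseInt(loadCounters.getOrDefault("dpp.abnorm.ALL", "0"));
        int loadedRows = Integer.parseInt(loadCounters.getOrDefault("dpp.norm.ALL", "0"));
        if (filteredRows > (filteredRows + loadedRows) * maxFilterRatio) {
            throw new Exception("too many filtered rows");
        }
        // End of the line: nothing copies filteredRows into LoadStatistic or the
        // txn commit attachment, so jobStatistic.filteredRows stays 0.
    }
}
```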
The multi-table CDC non-txn path via CommitOffsetRequest already does
the symmetric accumulate into nonTxnJobStatistic correctly; this PR
wires the same channel for the single-table txn path:
1. cloud.proto: add optional int64 filtered_rows = 7 to
StreamingTaskCommitAttachmentPB.
2. StreamingTaskTxnCommitAttachment: add filteredRows field
(@SerializedName("fr")), extend full-args constructor, read from PB
in PB constructor, include in toString().
3. TxnUtil.streamingTaskTxnCommitAttachmentToPb: populate the new PB
field.
4. LoadStatistic: add filteredRows field + getter/setter; expose it in
toJson() so the loadStatistic column surfaces it too.
5. AbstractInsertExecutor.execImpl: after reading filteredRows from
coordinator.getLoadCounters(), persist it into
insertLoadJob.loadStatistic (symmetric to how BrokerLoadJob pushes
DPP_ABNORMAL_ALL into its own job state).
6. StreamingInsertJob.beforeCommitted: pass loadStatistic.getFilteredRows()
into the new attachment constructor arg.
7. StreamingInsertJob.updateJobStatisticAndOffset: accumulate
jobStatistic.setFilteredRows(old + attachment.getFilteredRows()).
8. StreamingInsertJob.updateCloudJobStatisticAndOffset: overwrite
   jobStatistic.setFilteredRows(attachment.getFilteredRows()) to match
   the existing latest-snapshot semantics of that method (both update
   paths are sketched below).
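
A condensed sketch of items 7 and 8, with simplified types: the EditLog path sees one delta per committed txn and accumulates, while the cloud path receives a cumulative snapshot from the meta service and overwrites.

```java
class StreamingInsertJobSketch {
    static class JobStatistic { long filteredRows; }
    static class Attachment { long filteredRows; }

    final JobStatistic jobStatistic = new JobStatistic();

    // item 7: live commit and FE EditLog replay; each attachment is one txn's delta
    void updateJobStatisticAndOffset(Attachment att) {
        jobStatistic.filteredRows += att.filteredRows; // accumulate
    }

    // item 8: cloud MS replay; the attachment already holds the cumulative total
    void updateCloudJobStatisticAndOffset(Attachment att) {
        jobStatistic.filteredRows = att.filteredRows; // overwrite (latest snapshot)
    }
}
```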
After this fix, filteredRows is correct for live accumulation, FE
EditLog replay (replayOnCommitted) and cloud MS replay
(replayOnCloudMode), all three paths reading the same PB.
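
The shared PB field, shown with the accessor names protoc would generate for `optional int64 filtered_rows = 7` (a fragment; `attachment` stands in for the FE-side object):

```java
// write side (TxnUtil.streamingTaskTxnCommitAttachmentToPb): populate the field
StreamingTaskCommitAttachmentPB pb = StreamingTaskCommitAttachmentPB.newBuilder()
        .setScannedRows(attachment.getScannedRows())
        .setFilteredRows(attachment.getFilteredRows())
        .build();

// read side (the PB constructor shared by all three replay paths): guard the optional
long filteredRows = pb.hasFilteredRows() ? pb.getFilteredRows() : 0L;
```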
Added regression test test_streaming_insert_job_filtered_rows which
loads example_[0-1].csv into a table with c2 INT NOT NULL (non-parseable
names force every row to be filtered, mirroring the pattern in
test_streaming_mysql_job_errormsg), and asserts
scannedRows=20, filteredRows=20, fileNumber=2 and an empty table.
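
For an all-rows-filtered load to still commit, the test's filter-ratio setting must tolerate 100% filtering; a quick check of that arithmetic (the 1.0 value is an assumption about the test's session setting, and the gate mirrors the sketch above):

```java
public class FilterRatioCheck {
    public static void main(String[] args) {
        long scanned = 20, filtered = 20, loaded = scanned - filtered; // every row filtered
        double maxFilterRatio = 1.0; // assumption: the insert_max_filter_ratio the test sets
        // commit is allowed only while filtered stays within the ratio of total rows
        boolean commits = filtered <= maxFilterRatio * (filtered + loaded); // 20 <= 20.0
        System.out.println(commits + ": job succeeds, filteredRows=20, table left empty");
    }
}
```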
/review
Blocking issue:
- Cloud replay/reset still loses `filteredRows`. `replayOnCloudMode()` now reads `filteredRows` from `GetStreamingTaskCommitAttachResponse`, but the meta-service code that stores streaming-job progress only accumulates/preserves `scanned_rows`, `load_bytes`, `num_files`, and `file_bytes` in `cloud/src/meta-service/meta_service_txn.cpp`. That means the live cloud path can look correct, because the commit response echoes the request attachment, while FE restart, `replayOnCloudMode()`, and offset reset in cloud mode still reload `filteredRows` as `0`.
Critical checkpoint conclusions:
- Goal: Partially met. The shared-nothing txn path is fixed, but cloud replay is still incorrect.
- Scope: Small and focused.
- Concurrency: No new locking or thread-safety issue found; the FE changes follow existing callback and write-lock patterns.
- Lifecycle/Persistence: Not fully correct in cloud mode because the new field is not persisted in the cloud streaming-job snapshot.
- Parallel paths: The non-txn CDC path already handled `filteredRows`; cloud txn replay remains inconsistent with the other replay paths.
- Compatibility: The proto addition is backward-compatible.
- Tests: Added regression coverage for the local txn path, but there is no cloud/meta-service coverage for the new field.
- Observability: FE job/load statistics improve on the direct path, but cloud replay still reports the wrong value.
- Data writes/transactions: No data-visibility or transaction-atomicity regression found beyond incorrect persisted job statistics.
- User focus: No additional user-provided focus points.
I did not find another distinct blocking issue beyond the cloud replay hole.
…aming-job snapshot

The previous commit threaded filteredRows from BE through the FE txn commit path, but cloud replay still reloaded 0 because the meta-service streaming-job snapshot did not store filtered_rows:
- update_streaming_job_meta() accumulated/initialized only scanned_rows / load_bytes / num_files / file_bytes.
- reset_streaming_job_offset() preserved only the same four fields when rewriting the snapshot with a new offset.

So in cloud mode, while the live commit response echoed the request attachment correctly, FE restart, replayOnCloudMode() and offset reset all reloaded filtered_rows as 0.

Mirror the four-stat handling for the new field:
1. update_streaming_job_meta(): accumulate filtered_rows in the prev-existed branch and initialize it in the first-write branch, matching scanned_rows.
2. reset_streaming_job_offset(): preserve prev_job_info.filtered_rows() alongside the other accumulated stats.

Tests:
- UpdateStreamingJobMetaFilteredRowsAccumulateTest: two commits then a get, asserts filtered_rows accumulates (30 + 20 = 50) the same way scanned_rows does, then resets offset and verifies filtered_rows is preserved.
- GetStreamingTaskCommitAttachTest / ResetStreamingJobOffsetTest: populate filtered_rows on the seed commit and assert it round-trips through get and survives reset_streaming_job_offset.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
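The touched logic is C++ in cloud/src/meta-service/meta_service_txn.cpp; to keep the examples in one language, here is a Java stand-in for the two branches the commit describes, with every name a simplified placeholder:

```java
class StreamingJobMetaSketch {
    static class JobProgress { long offset; long scannedRows; long filteredRows; }
    static class Attachment { long scannedRows; long filteredRows; }

    // update_streaming_job_meta(): prev-existed branch accumulates,
    // first-write branch initializes, matching scanned_rows
    static JobProgress update(JobProgress prev, Attachment att) {
        JobProgress next = new JobProgress();
        next.scannedRows  = (prev != null ? prev.scannedRows  : 0) + att.scannedRows;
        next.filteredRows = (prev != null ? prev.filteredRows : 0) + att.filteredRows; // newly mirrored
        return next;
    }

    // reset_streaming_job_offset(): rewrite the offset, preserve accumulated stats
    static JobProgress reset(JobProgress prev, long newOffset) {
        JobProgress next = new JobProgress();
        next.offset = newOffset;
        next.scannedRows = prev.scannedRows;
        next.filteredRows = prev.filteredRows; // previously dropped, so replay reloaded 0
        return next;
    }
}
```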
…on comments

- Remove UpdateStreamingJobMetaFilteredRowsAccumulateTest. The filtered_rows round-trip through update / get / reset is already covered by the EXPECT_EQ filtered_rows assertions added to GetStreamingTaskCommitAttachTest and ResetStreamingJobOffsetTest; the new test was duplicating that coverage with extra non-filtered fields whose accumulation is not what this PR changes.
- Trim test_streaming_insert_job_filtered_rows.groovy comments: drop the issue number and cross-test reference, keep only the two invariants the reader needs to understand the schema and ratio choice.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
/review
run buildall
Findings
- High: `AbstractInsertExecutor` still parses `DPP_ABNORMAL_ALL` with `Integer.parseInt` into an `int` (fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/AbstractInsertExecutor.java:216). `Coordinator`/`LoadContext` accumulate that counter as a `long`, and this PR now carries it through `long`/`int64` fields. Once a load filters more than `Integer.MAX_VALUE` rows, FE will fail before the new `loadStatistic` / txn-attachment path is built, so the end-to-end `filteredRows` propagation is still incorrect for large jobs (see the sketch after this list).
- Medium: `StreamingInsertJob` still only updates `COUNTER_STREAMING_JOB_FILTER_ROWS` in the non-txn CDC path. After this PR, single-table txn jobs will show non-zero `jobStatistic.filteredRows`, but the FE metrics endpoint still reports `streaming_job_filter_rows=0` for that same path, and cloud replay never resets the counter to the cumulative value either.
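The High finding distilled (a standalone sketch, not the reviewed source): once the BE counter string exceeds 32 bits, `Integer.parseInt` throws before any of the newly wired `long` plumbing runs.

```java
public class CounterOverflowSketch {
    public static void main(String[] args) {
        String counter = String.valueOf(Integer.MAX_VALUE + 1L); // "2147483648"
        long widened = Long.parseLong(counter);   // fine: matches the long/int64 channel
        System.out.println(widened);
        int narrowed = Integer.parseInt(counter); // NumberFormatException: FE fails
                                                  // before the attachment is built
    }
}
```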
Critical Checkpoints
- Goal / correctness: The PR does wire `filteredRows` through the single-table txn attachment, FE replay, and cloud meta-service snapshot/reset paths, and the previously raised cloud replay issue appears fixed in the current diff. However, the large-counter overflow above means the goal is still not fully met end-to-end.
- Scope / minimality: The change is small and focused on the filtered-row bookkeeping path.
- Concurrency: No new concurrency model or lock-order risk was introduced in the reviewed paths; FE changes stay under the existing job lock / callback flow and cloud changes stay inside the existing meta-service txn update path.
- Lifecycle / persistence: The new optional proto field is wire-compatible, and the cloud snapshot write/read/reset logic now includes `filtered_rows`. I did not find a new static lifecycle issue.
- Config: No new configuration items are added.
- Compatibility: Adding `optional int64 filtered_rows = 7` is compatible with readers that ignore unknown protobuf fields.
- Parallel paths: The multi-table non-txn path already handled filtered rows, and the cloud snapshot path is now updated too. The remaining parity gaps are the int-limited source parsing in the single-table executor and the missing metric update in the txn path.
- Special conditional checks: No new special conditional branch looks incorrect.
- Test coverage: The new S3 regression test plus the meta-service get/reset assertions improve coverage for the intended fix. I did not find coverage for very large filtered-row counters or the metric parity issue.
- Test result files: No `.out` result files were changed.
- Observability: `jobs("type"="insert")` becomes more accurate, but `/metrics` is still inconsistent for single-table txn jobs.
- Transaction / persistence: Attachment serialization/deserialization and the cloud snapshot round-trip now look aligned in the touched code. No additional failover/persistence gap was found beyond the issues above.
- Data writes / atomicity: This PR changes bookkeeping rather than row visibility; I did not find a new atomicity regression in the reviewed paths.
- FE/BE variable passing: The new `filtered_rows` field is threaded through the touched FE/cloud paths for single-table streaming commits.
- Performance: No meaningful performance regression stood out in the new code.
- User focus: No additional user-provided focus points.
run nonConcurrent
Single-table txn commit and cloud replay accumulated jobStatistic.filteredRows
but never touched COUNTER_STREAMING_JOB_FILTER_ROWS, so /metrics still reported
0 while jobs("type"="insert") showed the right value. Mirror the TOTAL_ROWS /
LOAD_BYTES pattern in both paths so the two observation points stay aligned.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
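One plausible shape of the mirroring described above (the counter name comes from the review; the stand-in `increase` helper and the cumulative-delta handling on the cloud path are assumptions, not the merged code):

```java
class MetricMirrorSketch {
    static class Counter {
        long value;
        void increase(long delta) { value += delta; } // stands in for the FE metric counter
    }
    static final Counter COUNTER_STREAMING_JOB_FILTER_ROWS = new Counter();

    // live commit / EditLog replay: the attachment carries one txn's delta
    static void onTxnCommitted(long attachmentFilteredRows) {
        COUNTER_STREAMING_JOB_FILTER_ROWS.increase(attachmentFilteredRows);
    }

    // cloud replay: the attachment is cumulative, so bump only by what is new
    static void onCloudReplay(long cumulativeFilteredRows, long alreadyCounted) {
        COUNTER_STREAMING_JOB_FILTER_ROWS.increase(cumulativeFilteredRows - alreadyCounted);
    }
}
```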
run buildall
PR approved by at least one committer and no changes requested.
…eaming (#62816)

Fix `filteredRows` always reported as 0 in `jobs("type"="insert")` for single-table S3 streaming insert jobs under `enable_insert_strict=false` + `insert_max_filter_ratio>0`. The filter count is now propagated from BE through the txn commit attachment into job statistics, and survives FE EditLog replay and cloud meta-service round-trip. Added regression test `test_streaming_insert_job_filtered_rows`.
What problem does this PR solve?
Problem Summary:
For single-table S3 streaming insert jobs running under `session.enable_insert_strict=false` + `session.insert_max_filter_ratio>0`, BE correctly filters bad rows but `jobStatistic.filteredRows` stays at `0` in the `jobs("type"="insert")` view. The issue reproduces in the live path (before any restart) and also after FE EditLog replay, because the whole commit chain never carried `filteredRows` end-to-end for the txn path.

Root cause: the single-table txn commit pipeline has no `filteredRows` channel.
- `StreamingTaskCommitAttachmentPB` has no `filtered_rows` field (only `scanned_rows` / `load_bytes` / `num_files` / `file_bytes`).
- `LoadStatistic` has no `filteredRows` field; the value only lives as a local `int` in `AbstractInsertExecutor` for strict-mode / `insert_max_filter_ratio` checks and is never pushed anywhere persistent.
- `StreamingInsertJob.beforeCommitted()` builds the attachment from `loadStatistic.get*()`, so there is nothing to read even if the attachment class had a field.
- `updateJobStatisticAndOffset(StreamingTaskTxnCommitAttachment, boolean)` and `updateCloudJobStatisticAndOffset()` accumulate every other stat but skip `filteredRows`.

The multi-table CDC non-txn path (`CommitOffsetRequest` → `updateNoTxnJobStatisticAndOffset()`) already accumulates `filteredRows` into `nonTxnJobStatistic` correctly; the single-table txn path needed the same wiring.

Fix

Thread `filteredRows` along the same channel scanned/loadBytes/fileNumber/fileSize already use:
1. `cloud.proto`: add `optional int64 filtered_rows = 7` to `StreamingTaskCommitAttachmentPB`.
2. `StreamingTaskTxnCommitAttachment`: add `filteredRows` field (`@SerializedName("fr")`), extend the full-args constructor, read from PB in the PB constructor, include in `toString()`.
3. `TxnUtil.streamingTaskTxnCommitAttachmentToPb`: populate the new PB field.
4. `LoadStatistic`: add `filteredRows` field + getter/setter; surface it in `toJson()` so the `loadStatistic` column of `jobs("type"="insert")` shows it too.
5. `AbstractInsertExecutor.execImpl`: after reading `filteredRows` from `coordinator.getLoadCounters()`, persist it into `insertLoadJob.getLoadStatistic().setFilteredRows(...)` (symmetric to how `BrokerLoadJob` pushes `DPP_ABNORMAL_ALL` into its own job state).
6. `StreamingInsertJob.beforeCommitted`: pass `loadStatistic.getFilteredRows()` into the new attachment constructor arg.
7. `StreamingInsertJob.updateJobStatisticAndOffset(attachment, isReplay)` (live + FE EditLog replay): accumulate `jobStatistic.setFilteredRows(old + attachment.getFilteredRows())`.
8. `StreamingInsertJob.updateCloudJobStatisticAndOffset` (cloud MS replay): overwrite `jobStatistic.setFilteredRows(attachment.getFilteredRows())`, matching the existing latest-snapshot semantics of that method.

After the fix all three read paths (live accumulate, `replayOnCommitted`, `replayOnCloudMode`) see the same PB field, so `filteredRows` is correct whether BE or FE is restarted.

Added regression test `test_streaming_insert_job_filtered_rows`: loads `example_[0-1].csv` into a table with `c2 INT NOT NULL`. Non-parseable name strings on a NOT NULL int column force every row to be filtered (mirrors the `age int NOT NULL` + `'abc'` pattern from `test_streaming_mysql_job_errormsg`). Asserts `scannedRows=20`, `filteredRows=20`, `fileNumber=2`, and the target table ends up empty. Before this fix the test fails at `filteredRows == 20` (observed `0`); after the fix it passes.

Release note

Fix `filteredRows` always reported as 0 in `jobs("type"="insert")` for single-table S3 streaming insert jobs under `enable_insert_strict=false` + `insert_max_filter_ratio>0`. The filter count is now propagated from BE through the txn commit attachment into job statistics, and survives FE EditLog replay and cloud meta-service round-trip.

Check List (For Author)

Test
Behavior changed: `jobs("type"="insert").jobStatistic.filteredRows` (and `loadStatistic.FilteredRows`) now report the actual number of rows filtered by BE on the single-table streaming commit path, instead of always 0.

Does this need documentation?

Check List (For Reviewer who merge this PR)