ReaderExecutor (1/N): minimal pipeline read path behind use_reader_executor#106570
CheSema wants to merge 15 commits into
First, deliberately minimal step toward replacing the matryoshka of read buffers with a pipeline executor, gated by the experimental `use_reader_executor` query setting (default off).

`ReadPipeline::build` routes local-file and object-storage reads that have no cache, decryption, or distributed cache through a new `ReaderExecutor`: it maps a logical read position to a `StoredObject` via `OffsetMap` and serves bytes one block at a time straight from an `ISourceReader` into an owned buffer, exposed to callers through `PipelineReadBuffer`. Any unsupported configuration falls back to the legacy path.

No Rope, caches, prefetch, connection reuse, memory tracking, or metrics yet; those arrive in later steps.

Tested by gtests (source readers, offset map, executor read correctness, pipeline buffer) and `04316_reader_executor_basic`, which also asserts via `system.text_log` that the executor path is taken.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Workflow [PR], commit [69db38a] Summary: ✅
Drop line-by-line narration, notes that belong in the commit message, and past/comparative-behavior remarks; keep class and key-method/member docs. Write method names as `f` rather than `f()`. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Address review on the experimental `use_reader_executor` path:

- `DiskLocal::prepareRead` marks an unstatable file `UnknownSize` so the executor opens it and surfaces the real error (e.g. `FILE_DOESNT_EXIST`) instead of reading it as an empty file.
- `ReadPipeline` falls back to the legacy path for object-storage objects of unknown size (`bytes_size == 0`) rather than returning an empty read.
- The fallback guard also rejects `async_prefetch`, so a requested read-ahead stage is not silently dropped.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
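The fallback conditions for object-storage reads can be pictured as a single predicate. This is only a sketch under assumptions: `ReadStages`, `ObjectInfo`, and `canUseReaderExecutor` are hypothetical names made up for illustration, not the actual `ReadPipeline` code, and the local-file `UnknownSize` case is deliberately left out.

```cpp
#include <cstddef>
#include <vector>

// Hypothetical summary of the stages a read was configured with.
struct ReadStages
{
    bool filesystem_cache = false;
    bool distributed_cache = false;
    bool decryption = false;
    bool async_prefetch = false;
};

// Hypothetical stand-in for an object-storage object; 0 means "size unknown".
struct ObjectInfo
{
    std::size_t bytes_size;
};

// Returns true only when every condition for the executor path holds;
// any other configuration should take the legacy read-buffer chain.
bool canUseReaderExecutor(bool setting_enabled, const ReadStages & stages, const std::vector<ObjectInfo> & objects)
{
    if (!setting_enabled)
        return false;

    // Stages the minimal executor does not implement.
    if (stages.filesystem_cache || stages.distributed_cache || stages.decryption || stages.async_prefetch)
        return false;

    // An object of unknown size must not be served as an empty read.
    for (const auto & object : objects)
        if (object.bytes_size == 0)
            return false;

    return true;
}
```

Keeping the guard as one function makes the "fall back rather than silently drop a stage" rule easy to test in isolation.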
Set `setReadUntilPosition(object_offset + want)` before reading so a remote source fetches exactly the chunk size instead of an open-ended tail that is then cancelled (one cancelled GET per block). Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
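A minimal sketch of the idea, assuming a toy in-memory source: `FakeSource` and `readBlock` are hypothetical and only model how bounding the request before the read changes the range a remote source would fetch. They are not the `ISourceReader` interface.

```cpp
#include <algorithm>
#include <cstddef>
#include <cstring>
#include <string>

// Hypothetical source whose next request covers [position, read_until).
struct FakeSource
{
    std::string data;
    std::size_t position = 0;
    std::size_t read_until = std::string::npos; // npos models an open-ended request

    void seek(std::size_t offset) { position = offset; }
    void setReadUntilPosition(std::size_t end) { read_until = end; }

    // Number of bytes the next request would cover.
    std::size_t requestedBytes() const
    {
        std::size_t end = std::min(read_until, data.size());
        return end > position ? end - position : 0;
    }

    std::size_t read(char * to, std::size_t n)
    {
        std::size_t count = std::min(n, requestedBytes());
        if (count == 0)
            return 0;
        std::memcpy(to, data.data() + position, count);
        position += count;
        return count;
    }
};

// Bound the request to exactly one chunk before reading it.
std::size_t readBlock(FakeSource & source, std::size_t object_offset, std::size_t want, char * to)
{
    source.seek(object_offset);
    source.setReadUntilPosition(object_offset + want);
    return source.read(to, want);
}
```

Without the `setReadUntilPosition` call, `requestedBytes` would cover the whole tail of the object even though only `want` bytes are consumed.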
`MergeTreeReaderStream` installs a `ProfileCallback` on the pipeline's buffer to drive `MergeTreeReadPool` slow-read backoff. The read happens in `readNextChunk`, so time it and invoke the inherited `profile_callback` with the served bytes; otherwise enabling `use_reader_executor` silently disables slow-read backoff for local reads. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
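The timing wrapper might look roughly like the sketch below. `ProfileInfo`, `ProfileCallback`, and `timedRead` are hypothetical shapes chosen for illustration; the real callback type in ClickHouse may carry different fields.

```cpp
#include <chrono>
#include <cstddef>
#include <functional>

// Hypothetical payload reported after each chunk read.
struct ProfileInfo
{
    std::size_t bytes_read;
    std::chrono::nanoseconds elapsed;
};

using ProfileCallback = std::function<void(const ProfileInfo &)>;

// Times a single chunk read and reports the served bytes to the callback,
// if one is installed.
template <typename ReadChunk>
std::size_t timedRead(ReadChunk && read_chunk, const ProfileCallback & profile_callback)
{
    auto start = std::chrono::steady_clock::now();
    std::size_t bytes = read_chunk();
    auto elapsed = std::chrono::duration_cast<std::chrono::nanoseconds>(
        std::chrono::steady_clock::now() - start);

    if (profile_callback)
        profile_callback(ProfileInfo{bytes, elapsed});

    return bytes;
}
```

A monotonic clock is used so that wall-clock adjustments cannot produce negative or inflated read times.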
`StorageLog`/`StorageStripeLog` bound reads to the size snapshotted under the read lock so concurrent appends are not read. `PipelineReadBuffer` inherited the no-op `setReadUntilPosition`, letting those engines read past the bound under `use_reader_executor`. Forward the bound into `ReaderExecutor`, which clamps each chunk and reports EOF at the bound. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
`setReadUntilPosition` only bounded future executor chunks; bytes already exposed through `working_buffer` past a newly-narrowed bound were still returned. Shrink the working buffer to the bound so a caller that narrows it after the buffer was filled cannot read beyond it. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
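The two halves of the bound, clamping future chunks and shrinking bytes already exposed, can be sketched as two small helpers. `WorkingWindow`, `clampChunk`, and `shrinkToBound` are hypothetical names and assume C++17 for `std::optional`; they are not the `ReaderExecutor` or `PipelineReadBuffer` code.

```cpp
#include <algorithm>
#include <cstddef>
#include <optional>

// Hypothetical view of the exposed buffer: it covers file offsets
// [begin_offset, begin_offset + size).
struct WorkingWindow
{
    std::size_t begin_offset = 0;
    std::size_t size = 0;
};

// Size of the next chunk: at most block_size and never past the bound.
// A result of 0 means EOF at the bound.
std::size_t clampChunk(std::size_t position, std::size_t block_size, std::optional<std::size_t> read_until)
{
    if (!read_until)
        return block_size;
    if (position >= *read_until)
        return 0;
    return std::min(block_size, *read_until - position);
}

// Drops already-exposed bytes that lie past a newly narrowed bound.
void shrinkToBound(WorkingWindow & window, std::size_t read_until)
{
    if (read_until <= window.begin_offset)
        window.size = 0;
    else
        window.size = std::min(window.size, read_until - window.begin_offset);
}
```

For example, with a window covering offsets 100 to 150, narrowing the bound to 130 leaves 30 readable bytes, and a later chunk starting at 130 clamps to zero.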
Add a ReadPipeline gtest over LocalObjectStorage: a known-size object routes through the executor (PipelineReadBuffer) with matching bytes, and an unknown/zero-size object falls back to the legacy path. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
`setReadUntilPosition` trimmed the working buffer but left `ReaderExecutor` at the end of the already-read chunk, so widening the bound (or `setReadUntilEnd`) and continuing skipped the bytes between the old bound and the chunk end. Reset the buffer and seek the executor back to the exposed position instead, and cover the extend-after-trim case in the gtest. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
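The reset-and-seek-back behavior can be illustrated with a miniature reader over an in-memory string. `BoundedReader` is a hypothetical toy, not the `PipelineReadBuffer` API; it only demonstrates that seeking the source back to the first unconsumed byte makes narrowing and then widening the bound lossless.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <string>
#include <utility>

// Hypothetical block-at-a-time reader with a movable upper bound.
class BoundedReader
{
public:
    BoundedReader(std::string data_, std::size_t block_size_)
        : data(std::move(data_)), block_size(block_size_), read_until(data.size())
    {
        assert(block_size > 0);
    }

    // Changing the bound discards the buffered chunk and seeks the source
    // back to the first unconsumed byte, so nothing is skipped or leaked.
    void setReadUntilPosition(std::size_t bound)
    {
        source_position = consumedPosition();
        chunk.clear();
        chunk_pos = 0;
        read_until = std::min(bound, data.size());
    }

    // Returns the next byte, or -1 once the bound is reached.
    int next()
    {
        if (chunk_pos == chunk.size() && !fill())
            return -1;
        return static_cast<unsigned char>(chunk[chunk_pos++]);
    }

private:
    std::size_t consumedPosition() const { return source_position - (chunk.size() - chunk_pos); }

    bool fill()
    {
        if (source_position >= read_until)
            return false;
        std::size_t want = std::min(block_size, read_until - source_position);
        chunk = data.substr(source_position, want);
        chunk_pos = 0;
        source_position += want;
        return true;
    }

    std::string data;
    std::size_t block_size;
    std::size_t read_until;
    std::size_t source_position = 0; // end of the chunk last read from the source
    std::string chunk;
    std::size_t chunk_pos = 0;
};
```

With data `"abcdefghij"` and a block size of 8, reading two bytes, narrowing the bound to 4, draining, and then widening to 10 yields `c`, `d`, end of data, and then `efghij`. Trimming alone would have left the source at offset 8 and lost `efgh`.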
The test asserts reads went through the executor via system.text_log, which only holds on local disk. On object-storage storage policies DiskObjectStorage reads use the threadpool async-prefetch stage, the executor falls back, and the activation check returns 0 — a deterministic failure on s3/azure CI configs. Tag the test no-object-storage; object-storage routing is covered by the ReadPipelineExecutorTest gtest. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Instead of tagging the test no-object-storage, disable the stages the executor falls back on — async prefetch (via the threadpool remote read method) and the filesystem cache — so the executor actually engages on object-storage storage policies and the system.text_log activation check holds there. Verified the object-storage "using ReaderExecutor" path fires on an s3 storage policy. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
The minimal executor resolves reads through OffsetMap::findObjectAt; the range-splitting OffsetMap::map (and its ByteRange / PhysicalRange) were carried over from the full PR but are not used here. Remove them and point the OffsetMap gtests at findObjectAt. They return when the executor needs multi-object windows. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
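A single-position lookup of this kind can be sketched with cumulative end offsets and a binary search. `Object`, `ObjectPosition`, and `SimpleOffsetMap` are hypothetical stand-ins (C++17), and the signature of the real `OffsetMap::findObjectAt` is an assumption here.

```cpp
#include <algorithm>
#include <cstddef>
#include <optional>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for StoredObject: only a path and a size.
struct Object
{
    std::string path;
    std::size_t bytes_size;
};

struct ObjectPosition
{
    std::size_t object_index;
    std::size_t object_offset; // offset inside that object
};

class SimpleOffsetMap
{
public:
    explicit SimpleOffsetMap(std::vector<Object> objects_) : objects(std::move(objects_))
    {
        std::size_t end = 0;
        for (const auto & object : objects)
        {
            end += object.bytes_size;
            ends.push_back(end); // logical offset one past the object's last byte
        }
    }

    // Finds the object containing the logical position; nullopt past the end.
    std::optional<ObjectPosition> findObjectAt(std::size_t position) const
    {
        // First object whose end offset is strictly greater than the position.
        auto it = std::upper_bound(ends.begin(), ends.end(), position);
        if (it == ends.end())
            return std::nullopt;

        std::size_t index = static_cast<std::size_t>(it - ends.begin());
        std::size_t begin = index == 0 ? 0 : ends[index - 1];
        return ObjectPosition{index, position - begin};
    }

private:
    std::vector<Object> objects;
    std::vector<std::size_t> ends;
};
```

For objects of 10 and 5 bytes, position 9 resolves to offset 9 of the first object, position 10 to offset 0 of the second, and position 15 is past the end.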
The executor falls back when a distributed-cache or decryption stage is present (PR implements neither), so the activation check can't hold there — and unlike async prefetch and the filesystem cache, those stages can't be turned off from the test. Tag no-distributed-cache, no-encrypted-storage; the test still runs on local disk and plain object storage, where the executor engages. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
LLVM Coverage Report
Changed lines: Changed C/C++ lines covered by tests: 522/551 (94.74%) | Lost baseline coverage (was covered on master, now uncovered in this PR): 1 line(s) · Uncovered code |
Related: #103706
Related: #103234
Related: #102282
First, deliberately minimal step of splitting #103706 (the full `ReaderExecutor`) into small, independently-reviewable PRs.

It adds a pipeline read path gated by the experimental `use_reader_executor` setting (default off). `ReadPipeline::build` routes local-file and object-storage reads that have no cache, decryption, or distributed cache through a new `ReaderExecutor`: it maps a logical read position to a `StoredObject` via `OffsetMap` and serves bytes one block at a time straight from an `ISourceReader` into an owned buffer, exposed to callers through `PipelineReadBuffer`. Any configuration the executor does not yet handle falls back to the existing read-buffer chain, so behavior is unchanged with the setting off.

No `Rope`, caches, prefetch, connection reuse, memory tracking, or metrics yet; those arrive in later PRs in the series.

Changelog category (leave one):

Changelog entry (a user-readable short description of the changes that goes into CHANGELOG.md):

Add an experimental `use_reader_executor` setting (default off) that routes reads through a new pipeline-based `ReaderExecutor` instead of the legacy chain of read buffers.

Documentation entry for user-facing changes