feat(sentry-streams): Add native Rust batch step for PyAnyMessage (#307)
Conversation
Implement Batch (row accumulation, flush-time list build under GIL) and BatchStep (ProcessingStrategy: watermarks, backpressure, route matching). Pipeline Batch operator uses the Rust path instead of only the Python reduce delegate. Only PyAnyMessage streaming rows are supported; RawMessage is rejected with InvalidMessage for broker-originated inputs. Add split unit tests: batch_tests exercise Batch in isolation; step_tests focus on strategy behavior and forwarding. Made-with: Cursor
Complete the watermark and backpressure tests, drop the broken broker rewrap helper in favor of explicit committable maps, and annotate batch offset maps for type inference. Add a test-only setter to exercise MessageRejected when a batch is held. Refs GH-307 Made-with: Cursor
After a successful batch submit, push buffered and synthetic watermarks into pending_watermarks and drain them in poll (poll next, merge commit, submit; MessageRejected re-queues to the front), matching PythonAdapter's transformed message pattern. Refs GH-307 Made-with: Cursor
Batch now takes committable maps and Py<PyAnyMessage> in from_initial/append so the struct avoids unreachable payload branches. Rename build_stacked_message to flush. Collapse pending_downstream and pending_watermarks into a single outbound deque with one drain_outbound (poll+merge+submit+requeue). BatchStep::submit decomposes the routed message after a RawMessage guard. Refs GH-307 Made-with: Cursor
Queue failed batch flushes on outbound with push_front like other MessageRejected paths. Use stalled_batch to keep submit gating (outbound can hold non-stall work). Remove retry_carried_batch; drain_outbound clears the stall bit after a successful submit. Refs GH-307 Made-with: Cursor
try_emit_batch no longer poll+submits the batched message; it push_backs the batch, sets stalled_batch, and enqueues the watermark tail so a single path handles delivery and backpressure. Refs GH-307 Made-with: Cursor
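The commit messages above describe a single outbound deque drained by one path: pop from the front, submit, and on MessageRejected push the message back to the front so ordering survives backpressure, clearing the stall bit only after a successful submit. A minimal std-only sketch of that pattern; `Outbound`, `Downstream`, and the field names here are illustrative stand-ins, not the PR's actual types:

```rust
use std::collections::VecDeque;

// Illustrative message kinds: a flushed batch or a watermark tail.
#[derive(Debug, Clone, PartialEq)]
enum Outbound {
    Batch(&'static str),
    Watermark(u64),
}

// Downstream stub: rejects everything while `backpressure` is set,
// handing the message back to the caller (MessageRejected).
struct Downstream {
    backpressure: bool,
    delivered: Vec<Outbound>,
}

impl Downstream {
    fn submit(&mut self, msg: Outbound) -> Result<(), Outbound> {
        if self.backpressure {
            Err(msg)
        } else {
            self.delivered.push(msg);
            Ok(())
        }
    }
}

struct BatchStep {
    outbound: VecDeque<Outbound>,
    stalled_batch: bool,
}

impl BatchStep {
    // Single delivery path: on rejection, push_front preserves order
    // and we retry on the next poll; the stall bit clears only after
    // a successful submit, as the commit description says.
    fn drain_outbound(&mut self, downstream: &mut Downstream) {
        while let Some(msg) = self.outbound.pop_front() {
            match downstream.submit(msg) {
                Ok(()) => self.stalled_batch = false,
                Err(rejected) => {
                    self.outbound.push_front(rejected);
                    return;
                }
            }
        }
    }
}
```

Under backpressure the queue is untouched apart from the failed pop/re-push, so a later drain delivers the batch and its watermarks in the original order.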
Cursor Bugbot has reviewed your changes and found 2 potential issues.
Reviewed by Cursor Bugbot for commit 160fef4.
evanh left a comment
Couple comments but overall seems good to me.
    }

    pub fn flush(&self) -> Result<Message<RoutedValue>, StrategyError> {
        if self.elements.is_empty() {
Is this even possible? Could this no-op instead of returning an error?
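One way to make the empty case a no-op rather than an error is to return an `Option`, so the caller simply skips submission. A hypothetical simplified sketch (the real `flush` returns `Result<Message<RoutedValue>, StrategyError>`; `String` rows stand in for the real payloads):

```rust
// Hypothetical simplification: None signals "nothing accumulated,
// nothing to emit" instead of an error the caller must handle.
struct Batch {
    elements: Vec<String>,
}

impl Batch {
    fn flush(&mut self) -> Option<Vec<String>> {
        if self.elements.is_empty() {
            return None; // no-op on an empty batch
        }
        // Hand out the rows and reset the buffer in one move.
        Some(std::mem::take(&mut self.elements))
    }
}
```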
    Ok(()) => {
        if self.pending_batch {
            self.pending_batch = false;
        }
Bug: The pending_batch flag is cleared before all associated watermarks are sent, which can cause message reordering under backpressure.
Severity: HIGH
Suggested Fix
The pending_batch flag should only be cleared after all messages in the outbound queue for a given batch (including watermarks) have been successfully delivered. Move the logic that sets self.pending_batch = false to a point after the drain_outbound loop has confirmed the queue is empty.
Location: sentry_streams/src/batch_step.rs#L210-L213
Potential issue: The `pending_batch` flag is cleared prematurely in the `drain_outbound`
function. When a batch is flushed, it queues a batch message and associated watermarks,
and sets `pending_batch` to `true`. The current logic resets `pending_batch` to `false`
as soon as the main batch message is successfully submitted. If a subsequent watermark
is rejected due to downstream backpressure, `pending_batch` is already `false`. This
allows new messages to be submitted and form a new batch, which can be processed before
the pending watermark from the previous batch, violating message ordering guarantees.
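The suggested fix can be modeled in miniature: clear the flag only once the whole queue, including the watermark tail, has drained. This is an illustrative sketch, not the PR's code; `Step` and the `u32` messages are stand-ins:

```rust
use std::collections::VecDeque;

struct Step {
    outbound: VecDeque<u32>,
    pending_batch: bool,
}

impl Step {
    // `submit` is a downstream stand-in: Err(msg) models MessageRejected.
    fn drain(&mut self, submit: &mut impl FnMut(u32) -> Result<(), u32>) {
        while let Some(msg) = self.outbound.pop_front() {
            if let Err(rejected) = submit(msg) {
                self.outbound.push_front(rejected);
                // pending_batch stays true: a watermark is still queued,
                // so no new batch may overtake it.
                return;
            }
        }
        // Only now has the whole batch (message + watermarks) been
        // delivered, so it is safe to accept a new batch.
        self.pending_batch = false;
    }
}
```

With this shape, a rejected watermark keeps `pending_batch` set across polls, preventing the reordering described above.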

Add a Rust Batch + BatchStep implementation for pipeline batching: accumulate Py<PyAnyMessage> per route, flush on size or time, emit a single PyAnyMessage whose payload is a list of row payloads, then drive buffered and synthetic watermarks and existing backpressure behavior.

Behavior
- RuntimeOperator::Batch builds build_batch_step in Rust (wired from rust_arroyo/reduce() when the batch config is native).
- PyStreamingMessage::PyAnyMessage rows are accepted; RawMessage is rejected with InvalidMessage for broker messages so offsets surface correctly.
- Rows are kept as Py<PyAnyMessage> (after Message::into_payload).

Tests
- tests::batch: Batch in isolation (list output, committable merge, should_flush).
- tests::step: BatchStep as ProcessingStrategy (route forward, flush count, raw rejection).
- Python stubs and adapter glue updated for the Batch operator path where applicable.
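The Batch behavior the tests cover (list output, committable merge, should_flush) can be sketched with simplified types; `String` rows and a `BTreeMap` offset map stand in for `Py<PyAnyMessage>` and the real committable type:

```rust
use std::collections::BTreeMap;

// Illustrative stand-in for the Batch struct described above: rows
// accumulate per batch, committable offset maps merge on append, and
// flush emits the row list alongside the merged offsets.
struct Batch {
    elements: Vec<String>,
    committable: BTreeMap<u16, u64>, // partition -> offset
    max_size: usize,
}

impl Batch {
    fn append(&mut self, row: String, offsets: BTreeMap<u16, u64>) {
        self.elements.push(row);
        for (partition, offset) in offsets {
            // Keep the highest offset seen per partition.
            let entry = self.committable.entry(partition).or_insert(offset);
            *entry = (*entry).max(offset);
        }
    }

    fn should_flush(&self) -> bool {
        self.elements.len() >= self.max_size
    }

    fn flush(&mut self) -> (Vec<String>, BTreeMap<u16, u64>) {
        (
            std::mem::take(&mut self.elements),
            std::mem::take(&mut self.committable),
        )
    }
}
```

Flushing emits one payload carrying the whole row list, mirroring how the Rust step emits a single PyAnyMessage whose payload is a list of row payloads.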
Made with Cursor