chore(storage): generalize virtual stream readers#331
Conversation
Signed-off-by: Yordis Prieto <yordis.prieto@gmail.com>
PR SummaryMedium Risk Overview Updates storage read plumbing ( Reviewed by Cursor Bugbot for commit 6ceee97. Bugbot is set up for automated code reviews on this repo. Configure here. |
WalkthroughThis PR replaces the synchronous ChangesVirtual Stream Reader Infrastructure Refactoring
Sequence Diagram(s)sequenceDiagram
participant Client
participant StorageReaderWorker
participant VirtualStreamReader
participant SingleEventInMemoryStream as NodeState/Gossip<br/>Stream
participant PersistentStorage
Client->>StorageReaderWorker: ReadStreamEventsForward(streamId)
StorageReaderWorker->>VirtualStreamReader: CanReadStream(streamId)
alt Virtual Stream
VirtualStreamReader->>SingleEventInMemoryStream: Match found
StorageReaderWorker->>VirtualStreamReader: ReadForwards(msg, token)
VirtualStreamReader->>SingleEventInMemoryStream: ReadForwards(msg, token)
SingleEventInMemoryStream-->>VirtualStreamReader: ValueTask<ReadStreamEventsForwardCompleted>
VirtualStreamReader-->>StorageReaderWorker: forwarded result
else Persistent Stream
StorageReaderWorker->>PersistentStorage: ReadStreamEventsForward(streamId)
PersistentStorage-->>StorageReaderWorker: result
end
StorageReaderWorker-->>Client: ReadStreamEventsForwardCompleted
Estimated code review effort🎯 4 (Complex) | ⏱️ ~50 minutes Possibly related PRs
Poem
🚥 Pre-merge checks | ✅ 4 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches📝 Generate docstrings
🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (2)
src/EventStore.Core/Services/Storage/StorageReaderService.cs (1)
34-67: 🛠️ Refactor suggestion | 🟠 Major | ⚡ Quick winAdd parameter validation for
virtualStreamReader.The constructor validates most injected dependencies with
Ensure.NotNull(), butvirtualStreamReaderis missing this check. For consistency and to fail fast with a clear error message, add validation at line 52:✨ Proposed fix
Ensure.NotNull(bus, "bus"); Ensure.NotNull(subscriber, "subscriber"); Ensure.NotNull(readIndex, "readIndex"); Ensure.NotNull(systemStreams, nameof(systemStreams)); Ensure.Positive(threadCount, "threadCount"); Ensure.NotNull(writerCheckpoint, "writerCheckpoint"); + Ensure.NotNull(virtualStreamReader, nameof(virtualStreamReader));🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@src/EventStore.Core/Services/Storage/StorageReaderService.cs` around lines 34 - 67, The constructor StorageReaderService is missing validation for the virtualStreamReader parameter; add a null-check using Ensure.NotNull(virtualStreamReader, "virtualStreamReader") (matching the style of other deps) near the top of the StorageReaderService constructor so the injected IVirtualStreamReader is validated and fails fast with a clear message.src/EventStore.Core/Services/Storage/StorageReaderWorker.cs (1)
57-76: 🛠️ Refactor suggestion | 🟠 Major | ⚡ Quick winAdd parameter validation for
virtualStreamReader.The constructor validates several injected dependencies with
Ensure.NotNull(), butvirtualStreamReaderis missing this check. For consistency and to fail fast with a clear error message, add validation after line 68:✨ Proposed fix
Ensure.NotNull(publisher, "publisher"); Ensure.NotNull(readIndex, "readIndex"); Ensure.NotNull(systemStreams, nameof(systemStreams)); Ensure.NotNull(writerCheckpoint, "writerCheckpoint"); + Ensure.NotNull(virtualStreamReader, nameof(virtualStreamReader)); _publisher = publisher; _readIndex = readIndex; _systemStreams = systemStreams; _writerCheckpoint = writerCheckpoint; _queueId = queueId; _virtualStreamReader = virtualStreamReader;🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@src/EventStore.Core/Services/Storage/StorageReaderWorker.cs` around lines 57 - 76, Add a null-check for the virtualStreamReader parameter in the StorageReaderWorker constructor by calling Ensure.NotNull(virtualStreamReader, nameof(virtualStreamReader)) alongside the other Ensure.NotNull checks so the constructor fails fast with a clear error when virtualStreamReader is null.
🧹 Nitpick comments (6)
src/EventStore.Core/Services/Storage/InMemory/SingleEventInMemoryStream.cs (4)
30-32: ⚡ Quick winUnused
CancellationTokenparameter.The
tokenparameter is accepted but never checked. Even for synchronous operations wrapped inValueTask.FromResult, consider addingtoken.ThrowIfCancellationRequested()at the method start to respect caller cancellation requests.♻️ Proposed fix to respect cancellation
public ValueTask<ClientMessage.ReadStreamEventsForwardCompleted> ReadForwards( ClientMessage.ReadStreamEventsForward msg, CancellationToken token) { + token.ThrowIfCancellationRequested(); ReadStreamResult result;🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@src/EventStore.Core/Services/Storage/InMemory/SingleEventInMemoryStream.cs` around lines 30 - 32, The ReadForwards method accepts a CancellationToken parameter (token) but never checks it; add an early cancellation check at the start of ReadForwards by calling token.ThrowIfCancellationRequested() so the method honors caller cancellation requests before constructing/returning the ClientMessage.ReadStreamEventsForwardCompleted ValueTask; reference the ReadForwards method and the ClientMessage.ReadStreamEventsForwardCompleted return type and ensure the token.ThrowIfCancellationRequested() call is placed before any work or returning ValueTask.FromResult.
136-136: ⚡ Quick winMissing
streamIdvalidation inGetLastEventNumber.The method accepts a
streamIdparameter but doesn't validate that it matches_streamName. While theVirtualStreamReadershould only call this afterCanReadStreamreturns true, defensive programming suggests adding a guard to prevent incorrect results if called with an unowned stream ID.🛡️ Proposed fix to add validation
-public long GetLastEventNumber(string streamId) => _lastEvent?.EventNumber ?? ExpectedVersion.NoStream; +public long GetLastEventNumber(string streamId) +{ + if (streamId != _streamName) + throw new ArgumentException($"Stream '{streamId}' is not owned by this reader", nameof(streamId)); + return _lastEvent?.EventNumber ?? ExpectedVersion.NoStream; +}🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@src/EventStore.Core/Services/Storage/InMemory/SingleEventInMemoryStream.cs` at line 136, GetLastEventNumber currently ignores the streamId parameter which can return incorrect data if called for a different stream; add a guard at the start of GetLastEventNumber(string streamId) to validate that streamId equals the instance's _streamName (or throw ArgumentException/ArgumentOutOfRangeException) and only then return _lastEvent?.EventNumber ?? ExpectedVersion.NoStream; this ensures the method refuses or fails fast on unowned stream IDs.
138-138: ⚡ Quick winMissing
streamIdvalidation inGetLastIndexedPosition.The method accepts a
streamIdparameter but doesn't validate that it matches_streamName. Consider adding the same defensive guard as suggested forGetLastEventNumber.🛡️ Proposed fix to add validation
-public long GetLastIndexedPosition(string streamId) => -1; +public long GetLastIndexedPosition(string streamId) +{ + if (streamId != _streamName) + throw new ArgumentException($"Stream '{streamId}' is not owned by this reader", nameof(streamId)); + return -1; +}🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@src/EventStore.Core/Services/Storage/InMemory/SingleEventInMemoryStream.cs` at line 138, GetLastIndexedPosition currently ignores the streamId parameter; add the same defensive validation used in GetLastEventNumber to ensure streamId equals _streamName (and throw the same exception type/message used there) before returning the position, so the method validates input consistency with the in-memory stream implementation.
82-84: ⚡ Quick winUnused
CancellationTokenparameter.The
tokenparameter is accepted but never checked. Even for synchronous operations wrapped inValueTask.FromResult, consider addingtoken.ThrowIfCancellationRequested()at the method start to respect caller cancellation requests.♻️ Proposed fix to respect cancellation
public ValueTask<ClientMessage.ReadStreamEventsBackwardCompleted> ReadBackwards( ClientMessage.ReadStreamEventsBackward msg, CancellationToken token) { + token.ThrowIfCancellationRequested(); ReadStreamResult result;🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@src/EventStore.Core/Services/Storage/InMemory/SingleEventInMemoryStream.cs` around lines 82 - 84, The ReadBackwards method accepts a CancellationToken parameter (token) but never observes it; update the ClientMessage.ReadStreamEventsBackward ReadBackwards method to check for cancellation immediately (e.g., call token.ThrowIfCancellationRequested() at the start of the method) before any synchronous work or returning ValueTask.FromResult so the caller's cancellation request is respected.src/EventStore.Core/Services/Storage/InMemory/VirtualStreamReader.cs (2)
43-66: ⚡ Quick winConsider checking
CancellationTokenin fallback path.When no reader claims the stream, the fallback returns a synchronous result without checking the
token. For consistency with the delegated path, consider addingtoken.ThrowIfCancellationRequested()before the fallback return.♻️ Proposed fix to check cancellation in fallback
public ValueTask<ClientMessage.ReadStreamEventsBackwardCompleted> ReadBackwards( ClientMessage.ReadStreamEventsBackward msg, CancellationToken token) { if (TryGetReader(msg.EventStreamId, out var reader)) { return reader.ReadBackwards(msg, token); } + token.ThrowIfCancellationRequested(); return ValueTask.FromResult(new ClientMessage.ReadStreamEventsBackwardCompleted(🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@src/EventStore.Core/Services/Storage/InMemory/VirtualStreamReader.cs` around lines 43 - 66, The fallback path in VirtualStreamReader.ReadBackwards returns a synchronous ValueTask without honoring the CancellationToken; update ReadBackwards so that if TryGetReader(msg.EventStreamId, out var reader) is false it first calls token.ThrowIfCancellationRequested() (or checks token.IsCancellationRequested and returns a canceled ValueTask) before returning the new ClientMessage.ReadStreamEventsBackwardCompleted result, ensuring behavior matches the delegated reader.ReadBackwards(msg, token) path.
18-41: ⚡ Quick winConsider checking
CancellationTokenin fallback path.When no reader claims the stream, the fallback returns a synchronous result without checking the
token. For consistency with the delegated path, consider addingtoken.ThrowIfCancellationRequested()before the fallback return.♻️ Proposed fix to check cancellation in fallback
public ValueTask<ClientMessage.ReadStreamEventsForwardCompleted> ReadForwards( ClientMessage.ReadStreamEventsForward msg, CancellationToken token) { if (TryGetReader(msg.EventStreamId, out var reader)) { return reader.ReadForwards(msg, token); } + token.ThrowIfCancellationRequested(); return ValueTask.FromResult(new ClientMessage.ReadStreamEventsForwardCompleted(🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@src/EventStore.Core/Services/Storage/InMemory/VirtualStreamReader.cs` around lines 18 - 41, The ReadForwards method currently returns a synchronous fallback ValueTask when TryGetReader fails but doesn't honor the CancellationToken; before returning the fallback ValueTask.FromResult in ReadForwards, call token.ThrowIfCancellationRequested() so cancellation is handled consistently with the delegated reader path (refer to ReadForwards, TryGetReader, CancellationToken token and the fallback ValueTask.FromResult).
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Outside diff comments:
In `@src/EventStore.Core/Services/Storage/StorageReaderService.cs`:
- Around line 34-67: The constructor StorageReaderService is missing validation
for the virtualStreamReader parameter; add a null-check using
Ensure.NotNull(virtualStreamReader, "virtualStreamReader") (matching the style
of other deps) near the top of the StorageReaderService constructor so the
injected IVirtualStreamReader is validated and fails fast with a clear message.
In `@src/EventStore.Core/Services/Storage/StorageReaderWorker.cs`:
- Around line 57-76: Add a null-check for the virtualStreamReader parameter in
the StorageReaderWorker constructor by calling
Ensure.NotNull(virtualStreamReader, nameof(virtualStreamReader)) alongside the
other Ensure.NotNull checks so the constructor fails fast with a clear error
when virtualStreamReader is null.
---
Nitpick comments:
In `@src/EventStore.Core/Services/Storage/InMemory/SingleEventInMemoryStream.cs`:
- Around line 30-32: The ReadForwards method accepts a CancellationToken
parameter (token) but never checks it; add an early cancellation check at the
start of ReadForwards by calling token.ThrowIfCancellationRequested() so the
method honors caller cancellation requests before constructing/returning the
ClientMessage.ReadStreamEventsForwardCompleted ValueTask; reference the
ReadForwards method and the ClientMessage.ReadStreamEventsForwardCompleted
return type and ensure the token.ThrowIfCancellationRequested() call is placed
before any work or returning ValueTask.FromResult.
- Line 136: GetLastEventNumber currently ignores the streamId parameter which
can return incorrect data if called for a different stream; add a guard at the
start of GetLastEventNumber(string streamId) to validate that streamId equals
the instance's _streamName (or throw
ArgumentException/ArgumentOutOfRangeException) and only then return
_lastEvent?.EventNumber ?? ExpectedVersion.NoStream; this ensures the method
refuses or fails fast on unowned stream IDs.
- Line 138: GetLastIndexedPosition currently ignores the streamId parameter; add
the same defensive validation used in GetLastEventNumber to ensure streamId
equals _streamName (and throw the same exception type/message used there) before
returning the position, so the method validates input consistency with the
in-memory stream implementation.
- Around line 82-84: The ReadBackwards method accepts a CancellationToken
parameter (token) but never observes it; update the
ClientMessage.ReadStreamEventsBackward ReadBackwards method to check for
cancellation immediately (e.g., call token.ThrowIfCancellationRequested() at the
start of the method) before any synchronous work or returning
ValueTask.FromResult so the caller's cancellation request is respected.
In `@src/EventStore.Core/Services/Storage/InMemory/VirtualStreamReader.cs`:
- Around line 43-66: The fallback path in VirtualStreamReader.ReadBackwards
returns a synchronous ValueTask without honoring the CancellationToken; update
ReadBackwards so that if TryGetReader(msg.EventStreamId, out var reader) is
false it first calls token.ThrowIfCancellationRequested() (or checks
token.IsCancellationRequested and returns a canceled ValueTask) before returning
the new ClientMessage.ReadStreamEventsBackwardCompleted result, ensuring
behavior matches the delegated reader.ReadBackwards(msg, token) path.
- Around line 18-41: The ReadForwards method currently returns a synchronous
fallback ValueTask when TryGetReader fails but doesn't honor the
CancellationToken; before returning the fallback ValueTask.FromResult in
ReadForwards, call token.ThrowIfCancellationRequested() so cancellation is
handled consistently with the delegated reader path (refer to ReadForwards,
TryGetReader, CancellationToken token and the fallback ValueTask.FromResult).
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: a6ac1d07-52e0-4e2a-81b6-b015272db955
📒 Files selected for processing (14)
src/EventStore.Core.Tests/Services/Storage/when_cancelling_storage_reader_worker.cssrc/EventStore.Core.XUnit.Tests/Services/Storage/InMemory/VirtualStreamReaderTests.cssrc/EventStore.Core/ClusterVNode.cssrc/EventStore.Core/Services/Storage/InMemory/GossipListenerService.cssrc/EventStore.Core/Services/Storage/InMemory/IInMemoryStreamReader.cssrc/EventStore.Core/Services/Storage/InMemory/IVirtualStreamReader.cssrc/EventStore.Core/Services/Storage/InMemory/InMemoryStreamReader.cssrc/EventStore.Core/Services/Storage/InMemory/NodeStateListenerService.cssrc/EventStore.Core/Services/Storage/InMemory/SingleEventInMemoryStream.cssrc/EventStore.Core/Services/Storage/InMemory/VirtualStreamReader.cssrc/EventStore.Core/Services/Storage/StorageReaderService.cssrc/EventStore.Core/Services/Storage/StorageReaderWorker.cssrc/EventStore.Core/Services/SubscriptionsService.cssrc/EventStore.Core/Services/SystemNames.cs
💤 Files with no reviewable changes (3)
- src/EventStore.Core/Services/Storage/InMemory/IInMemoryStreamReader.cs
- src/EventStore.Core/Services/Storage/InMemory/InMemoryStreamReader.cs
- src/EventStore.Core/Services/SystemNames.cs
Uh oh!
There was an error while loading. Please reload this page.