Fix Incorrect Calculation of Checkpoint Interval on Filtered Subcriptions #2608
This works now, but I still have concerns about the fact that the interval ends up meaning 1 = every 32, etc.
@thefringeninja I may be missing something, but the previous calculation seems correct to me:
It doesn't seem we ever need to multiply the checkpoint interval multiplier by the search window size. When we obtain a FilteredReadAllResult with a completed.Events of size 0, it means we have already processed maxSearchWindow events, so we can just increment _checkpointInterval by one and check whether it's divisible by the multiplier (as was done previously).
Assuming the previous calculation was correct, this part could still be improved with a few changes, as proposed in this PR here:
    if (completed.Events.Length != 0) {
        _checkpointInterval = 0;
    } else {
        if (++_checkpointInterval >= _checkpointIntervalMultiplier) {
            _buffer.Enqueue((default, _nextPosition));
            _checkpointInterval = 0;
        }
    }
Note: We can remove the modulo operation and the unchecked block, and avoid any risk of overflow, by resetting the counter. It appears that the ++ operation must also be done before the >= check (previously %); otherwise it will be off by one.
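To make the counting behavior in the snippet above easier to check in isolation, here is a minimal sketch. `EmptyPageCheckpointCounter` and `OnPage` are hypothetical names used only for illustration; the field names mirror the snippet, but this is not the EventStore source and it leaves out the buffer and position handling.

```csharp
using System;

// Hypothetical stand-in for the counter logic discussed above; not the EventStore source.
public sealed class EmptyPageCheckpointCounter {
    private readonly uint _checkpointIntervalMultiplier;
    private uint _checkpointInterval;

    public EmptyPageCheckpointCounter(uint checkpointIntervalMultiplier) {
        if (checkpointIntervalMultiplier == 0)
            throw new ArgumentOutOfRangeException(nameof(checkpointIntervalMultiplier));
        _checkpointIntervalMultiplier = checkpointIntervalMultiplier;
    }

    // Returns true when a checkpoint should be emitted for this page.
    // eventCount plays the role of completed.Events.Length.
    public bool OnPage(int eventCount) {
        if (eventCount != 0) {
            // A page with matching events resets the run of empty pages.
            _checkpointInterval = 0;
            return false;
        }
        // Pre-increment, then compare: the N-th consecutive empty page triggers.
        if (++_checkpointInterval >= _checkpointIntervalMultiplier) {
            // Resetting here keeps the counter bounded, so no unchecked block is needed.
            _checkpointInterval = 0;
            return true;
        }
        return false;
    }
}
```

With a multiplier of 3, three consecutive empty pages yield one checkpoint on the third page, and a non-empty page in between restarts the count.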
6350556
to
a363a97
Compare
a363a97
to
0c14794
Compare
if (_checkpointInterval++ % _checkpointIntervalMultiplier == 0) {
_buffer.Enqueue((default, _nextPosition));
if (completed.Events.Length == 0) {
unchecked {
We can safely remove this as @shaan1337 suggested in his review as well.
0c14794
to
77632d5
Compare
@thefringeninja thanks for making the changes.
I will post some updates if I figure out what's happening for issues 1 and 2.
The last empty page sent to indicate the end of stream was also being checkpointed.
77632d5
to
4876cff
Compare
Given the following scenario:
- 1 event in test
- 10 events in abcd
- Subscribing to all with a stream prefix of test, max search window of 1 and checkpoint interval of 100
I get more checkpoints than I would expect (at least 10), and I'm getting the same event in Event Appeared multiple times.
If the number of events in the $all stream is less than 100, I would only expect one checkpoint call when it switches to live. Otherwise I would expect 1 checkpoint call every 100 events, and 1 when it switches to live. I am guessing that is not what you are expecting. Also, there might be an off-by-one error with the next position coming back from the AllReader if you are getting the same event multiple times.
4876cff
to
2300815
Compare
Got another anomaly this morning.
All the events in the database:
0@$$$scavenges - Position:C:236/P:236
0@test - Position:C:421/P:421
0@abcd - Position:C:518/P:518
1@abcd - Position:C:617/P:617
2@abcd - Position:C:716/P:716
3@abcd - Position:C:815/P:815
4@abcd - Position:C:914/P:914
5@abcd - Position:C:1013/P:1013
6@abcd - Position:C:1112/P:1112
7@abcd - Position:C:1211/P:1211
8@abcd - Position:C:1310/P:1310
9@abcd - Position:C:1409/P:1409
Subscribing to $all filtered with the following options:
maxSearchWindow : 1
checkpointInterval : 10
Stream Prefix : "test"
I got the following on Event Appeared:
event position: C:421/P:421, event stream id: test, event number: 0
and then the following checkpoints
Checkpoint Reached: C:1310/P:0
Checkpoint Reached: C:421/P:421
I noticed that the checkpoints are in the wrong order. I get the last expected checkpoint first.
Side note: The prepare position is 0 in the first checkpoint.
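The ordering problem above can be checked mechanically. The following is a minimal sketch, assuming checkpoints are compared by commit position only; `CheckpointOrder` and `FirstOutOfOrder` are hypothetical helper names, not part of the client or server API.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper for inspecting a recorded sequence of checkpoint commit positions.
public static class CheckpointOrder {
    // Returns the index of the first checkpoint whose commit position is lower
    // than the one before it, or -1 if the sequence never goes backwards.
    public static int FirstOutOfOrder(IReadOnlyList<ulong> commitPositions) {
        for (var i = 1; i < commitPositions.Count; i++) {
            if (commitPositions[i] < commitPositions[i - 1]) {
                return i;
            }
        }
        return -1;
    }
}
```

For the two checkpoints reported above, `CheckpointOrder.FirstOutOfOrder(new ulong[] { 1310, 421 })` returns 1, flagging the second checkpoint as out of order.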
2300815
to
aab754b
Compare
The results look reasonable now, waiting for @hayley-jean to confirm.
The issue with the checkpoints is resolved, however there is still an issue because the checkpoint positions use the next position in the $all stream. Given that subscribing to $all is exclusive from the start position, you would skip an event if you were to re-subscribe from the last checkpoint.
An easy way to test is to write a few events that won't pass the filter, then create the subscription.
Take the checkpoint that's provided when the subscription goes live, stop the subscription, then write an event that passes the filter. Restart the subscription from the previous checkpoint, and the expected event is skipped.
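The skipped event can be illustrated with a small model. This is a sketch under stated assumptions: positions are plain numbers chosen for illustration, a subscription started from a position delivers only events strictly after it, and the next event written lands exactly at the "next position" that was used as the checkpoint. `ExclusiveResubscribe` and `ReadAfter` are hypothetical names.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical model of an exclusive-start subscription; not the EventStore API.
public static class ExclusiveResubscribe {
    // Delivers only the events positioned strictly after `start`.
    public static List<long> ReadAfter(IEnumerable<long> eventPositions, long start) =>
        eventPositions.Where(p => p > start).ToList();
}
```

In this model, suppose two non-matching events sit at positions 100 and 200 and the next position is 300. If the checkpoint is 300 and a matching event is then written at 300, `ReadAfter(new long[] { 100, 200, 300 }, 300)` returns an empty list, so the event is skipped. Checkpointing the last processed position instead (200) returns the event at 300.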
aab754b
to
dd1c8b1
Compare
I am currently seeing
Error while processing message "EventStore.Core.Messages.ClientMessage+FilteredReadAllEventsForward" in queued handler '"StorageReaderQueue #3"'.
System.ArgumentOutOfRangeException: The commit position cannot be less than the prepare position (Parameter 'commitPosition')
at EventStore.Core.Services.Transport.Grpc.Position..ctor(UInt64 commitPosition, UInt64 preparePosition) in C:\source\EventStore\src\EventStore.Core\Services\Transport\Grpc\Position.cs:line 49
at EventStore.Core.Services.Transport.Grpc.Position.FromInt64(Int64 commitPosition, Int64 preparePosition) in C:\source\EventStore\src\EventStore.Core\Services\Transport\Grpc\Position.cs:line 20
at EventStore.Core.Services.Transport.Grpc.Enumerators.AllSubscriptionFiltered.CatchupAllSubscription.<>c__DisplayClass23_0.<MoveNextAsync>g__OnMessage|0(Message message) in C:\source\EventStore\src\EventStore.Core\Services\Transport\Grpc\Enumerators.AllSubscriptionFiltered.cs:line 287
at EventStore.Core.Messaging.CallbackEnvelope.ReplyWith[T](T message) in C:\source\EventStore\src\EventStore.Core\Messaging\CallbackEnvelope.cs:line 15
at EventStore.Core.Services.Storage.StorageReaderWorker.EventStore.Core.Bus.IHandle<EventStore.Core.Messages.ClientMessage.FilteredReadAllEventsForward>.Handle(FilteredReadAllEventsForward msg) in C:\source\EventStore\src\EventStore.Core\Services\Storage\StorageReaderWorker.cs:line 184
at EventStore.Core.Bus.MessageHandler`1.TryHandle(Message message) in C:\source\EventStore\src\EventStore.Core\Bus\MessageHandler.cs:line 30
at EventStore.Core.Bus.InMemoryBus.Publish(Message message) in C:\source\EventStore\src\EventStore.Core\Bus\InMemoryBus.cs:line 285
at EventStore.Core.Bus.InMemoryBus.Handle(Message message) in C:\source\EventStore\src\EventStore.Core\Bus\InMemoryBus.cs:line 279
at EventStore.Core.Bus.QueuedHandlerThreadPool.ReadFromQueue(Object o) in C:\source\EventStore\src\EventStore.Core\Bus\QueuedHandlerThreadPool.cs:line 119
This looks to be related to the recent addition of converting the PrevPosition using Position.FromInt64
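For readers unfamiliar with the check behind this exception, here is a minimal sketch of the invariant named in the message. `SketchPosition` is a hypothetical stand-in; its constructor and `FromInt64` shapes are taken from the stack trace signatures, but the bodies are assumptions and may differ from the real `Position` type, including how `FromInt64` converts its arguments.

```csharp
using System;

// Hypothetical stand-in mirroring the check named in the exception message;
// not the EventStore.Core.Services.Transport.Grpc.Position source.
public readonly struct SketchPosition {
    public ulong CommitPosition { get; }
    public ulong PreparePosition { get; }

    public SketchPosition(ulong commitPosition, ulong preparePosition) {
        // The invariant from the error: commit must not be less than prepare.
        if (commitPosition < preparePosition) {
            throw new ArgumentOutOfRangeException(nameof(commitPosition),
                "The commit position cannot be less than the prepare position");
        }
        CommitPosition = commitPosition;
        PreparePosition = preparePosition;
    }

    // Assumption: a plain cast from signed to unsigned values.
    public static SketchPosition FromInt64(long commitPosition, long preparePosition) =>
        new SketchPosition(unchecked((ulong)commitPosition), unchecked((ulong)preparePosition));
}
```

Under this sketch, `SketchPosition.FromInt64(421, 421)` succeeds, while any pair whose commit value is below its prepare value throws `ArgumentOutOfRangeException`. Which concrete pair the PrevPosition conversion produced in the failing run is not shown in the trace.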
dd1c8b1
to
1014c25
Compare
Fixed: wrong calculation of checkpoint interval for filtered subscriptions