fix: unexpected events when controlling replication with consumer filter #1133
entityRefC.ask(TestEntity.UpdateItem("C", 3, _)).futureValue

(1 to 20).foreach { n =>
  entityRefB.ask(TestEntity.Get).futureValue shouldBe TestEntity.State(Map("A" -> 1, "B" -> 2), Set(DCA.id))
This is the unexpected failure: B receives the additional event from C.
- B receives an event from C because that event was originally scoped for B
- the event is rejected: Rejecting unknown sequence number [4] for pid [items|two|DCC] (might be accepted later)
- the rejection triggers a replay request
- C replays all events, including the last scope change, and thereby B receives all events
I have to think about a solution, but one option might be to put an upper sequence number on the replay requests, at least when filters are defined.
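The sequence in the list above can be sketched with a simplified model of consumer-side sequence number validation. This is an illustration only, not the actual Akka Projection logic: `SeqNrTracker`, `Validation` and its subtypes are hypothetical names, and the rule "anything other than last + 1 is rejected" is an assumption made to keep the example short.

```scala
object GapDetectionSketch {
  // Hypothetical outcome types for this illustration.
  sealed trait Validation
  case object Accepted extends Validation
  case object Duplicate extends Validation
  final case class RejectedTriggerReplay(pid: String, fromSeqNr: Long) extends Validation

  // Tracks the last accepted sequence number per persistence id.
  final class SeqNrTracker {
    private var lastSeen = Map.empty[String, Long]

    def validate(pid: String, seqNr: Long): Validation = {
      val last = lastSeen.getOrElse(pid, 0L)
      if (seqNr <= last) Duplicate
      else if (seqNr == last + 1) {
        lastSeen = lastSeen.updated(pid, seqNr)
        Accepted
      } else {
        // Gap: preceding events are missing, so ask the producer to replay them.
        RejectedTriggerReplay(pid, fromSeqNr = last + 1)
      }
    }
  }

  // B never saw events 1 to 3 from C because the consumer filter excluded them.
  val tracker = new SeqNrTracker
  val result: Validation = tracker.validate("items|two|DCC", 4L)
}
```

In this model the replay would start at sequence number 1, so unless the producer limits or filters the replay, B ends up with every event for that entity.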
Another observation: this seems to be triggered by a backtracking event. There is a delay before it happens, and I have confirmed that the delay lines up with backtracking.behind-current-time.
* filters are not used in replay requests, because the change of consumer filter is typically what triggers a replay request (missing preceding events)
* but that means that a replica may receive unexpected events, so apply the filters after the sequence number that triggered the replay
014d7e5 to 64470a8
@@ -75,7 +75,14 @@ message FilterReq {

 // Replay events for given entities.
 message ReplayReq {
-  repeated PersistenceIdSeqNr persistence_id_offset = 1;
+  repeated ReplayPersistenceId replay_persistence_ids = 1;
This should be a wire-compatible change, but I will verify with a sample to be sure.
Are you sure about that? The original was a string + uint64; now it is a message (length-delimited) + uint64, so old versions can't parse their messages out of data serialized from this.
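To make the concern concrete, here is a small hand-rolled sketch of the protobuf wire encoding for one repeated element before and after the change. The field layouts are assumptions based on the comment (old element: string in field 1, integer in field 2; new element: nested message in field 1, integer in field 2), and `WireSketch` and its helpers are hypothetical names, not part of any library.

```scala
import java.nio.charset.StandardCharsets.UTF_8

object WireSketch {
  // Base-128 varint encoding, as used by the protobuf wire format.
  def varint(value: Long): Array[Byte] = {
    val out = Array.newBuilder[Byte]
    var v = value
    while ((v & ~0x7FL) != 0L) {
      out += ((v & 0x7F) | 0x80).toByte
      v >>>= 7
    }
    out += v.toByte
    out.result()
  }

  // Tag = (field number << 3) | wire type; 0 = varint, 2 = length-delimited.
  def varintField(fieldNr: Int, value: Long): Array[Byte] =
    varint((fieldNr << 3) | 0) ++ varint(value)

  def bytesField(fieldNr: Int, payload: Array[Byte]): Array[Byte] =
    varint((fieldNr << 3) | 2) ++ varint(payload.length) ++ payload

  // Assumed old element layout: string persistence id (1), sequence number (2).
  def oldElement(pid: String, seqNr: Long): Array[Byte] =
    bytesField(1, pid.getBytes(UTF_8)) ++ varintField(2, seqNr)

  // Assumed new element layout: nested old element (1), filter-after sequence number (2).
  def newElement(pid: String, seqNr: Long, filterAfterSeqNr: Long): Array[Byte] =
    bytesField(1, oldElement(pid, seqNr)) ++ varintField(2, filterAfterSeqNr)

  // What an old reader would take as the persistence id: the payload of field 1.
  // A single-byte length is enough for the short payloads in this example.
  def field1AsString(element: Array[Byte]): String = {
    val len = element(1)
    new String(element.slice(2, 2 + len), UTF_8)
  }
}
```

Both layouts start field 1 with the same length-delimited tag, so an old reader would not necessarily fail outright. Under these assumptions it would instead read the encoded nested message as the persistence id, and the filter-after value as the sequence number.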
You are right, it's not compatible (I tried with the sample).
To stay compatible we would have to keep the old persistence_id_offset as it was, add the new replay_persistence_ids, include the data in both, and read from both. Not sure if it's worth the trouble. What do you think?
I changed to the wire compatible solution in e10c881
Not too messy.
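A minimal sketch of the "write both, read from both" approach, using plain case classes as stand-ins for the generated protobuf classes. The field names inside `ReplayPersistenceId`, the `write` and `read` helpers, and the fallback of `Long.MaxValue` for requests from old senders are all assumptions for illustration; the actual change is in e10c881.

```scala
object ReplayCompatSketch {
  final case class PersistenceIdSeqNr(persistenceId: String, seqNr: Long)

  // Hypothetical field names for the new element type.
  final case class ReplayPersistenceId(fromPersistenceIdOffset: PersistenceIdSeqNr, filterAfterSeqNr: Long)

  // In-memory stand-in for the ReplayReq message with both fields present.
  final case class ReplayReq(
      persistenceIdOffset: Seq[PersistenceIdSeqNr], // old field, still understood by old readers
      replayPersistenceIds: Seq[ReplayPersistenceId]) // new field, skipped by old readers

  // New writers populate both fields from the same data.
  def write(replays: Seq[ReplayPersistenceId]): ReplayReq =
    ReplayReq(replays.map(_.fromPersistenceIdOffset), replays)

  // New readers prefer the new field and fall back to the old one.
  def read(req: ReplayReq): Seq[ReplayPersistenceId] =
    if (req.replayPersistenceIds.nonEmpty) req.replayPersistenceIds
    else
      // Assumption: an old sender means "never filter during replay".
      req.persistenceIdOffset.map(offset => ReplayPersistenceId(offset, filterAfterSeqNr = Long.MaxValue))
}
```

The cost is some duplicated data per request; the benefit is that old and new nodes can exchange replay requests during a rolling upgrade.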
/**
 * Explicit request to replay events for given entities.
 */
final case class ReplayWithFilter(streamId: String, replayPersistenceIds: Set[ReplayPersistenceId])
Replay is public, so I added a new class instead of changing it.

// Note: we do not apply the filter before filterAfterSeqNr as that may be what triggered the replay.
// replicatedEventOriginFilter not used for replay.
if (env.sequenceNr < filterAfterSeqNr || producerFilter(env) && filter.matches(env)) {
this is where the new filterAfterSeqNr is used
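The effect of that condition can be tried out in isolation with a small stand-alone sketch. `Envelope`, `shouldEmit` and the tag-based consumer filter are hypothetical stand-ins for the real types; only the shape of the boolean expression is taken from the diff. Since `&&` binds tighter than `||` in Scala, the explicit parentheses below do not change the meaning.

```scala
object ReplayFilterSketch {
  // Hypothetical, simplified envelope.
  final case class Envelope(persistenceId: String, sequenceNr: Long, tags: Set[String])

  // Events before filterAfterSeqNr always pass; from filterAfterSeqNr onwards both filters apply.
  def shouldEmit(
      env: Envelope,
      filterAfterSeqNr: Long,
      producerFilter: Envelope => Boolean,
      consumerFilter: Envelope => Boolean): Boolean =
    env.sequenceNr < filterAfterSeqNr || (producerFilter(env) && consumerFilter(env))

  val pid = "items|two|DCC"
  val events: Seq[Envelope] = Seq(
    Envelope(pid, 1L, Set.empty),
    Envelope(pid, 2L, Set.empty),
    Envelope(pid, 3L, Set.empty),
    Envelope(pid, 4L, Set("B")), // the event that passed the filter and triggered the replay
    Envelope(pid, 5L, Set.empty))

  // The consumer only wants events tagged "B"; the producer allows everything.
  val replayed: Seq[Long] =
    events
      .filter(env => shouldEmit(env, filterAfterSeqNr = 4L, _ => true, _.tags.contains("B")))
      .map(_.sequenceNr)
}
```

In this sketch `replayed` contains sequence numbers 1 to 4: the preceding events fill the gap, event 4 passes the filter, and event 5 is dropped instead of reaching the consumer unexpectedly.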
LGTM