New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: Fill sequence number gaps in EventWriter #32095
Conversation
akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventWriter.scala
Outdated
Show resolved
Hide resolved
sequenceNumber) | ||
state.copy(waitingForSeqNrLookup = state.waitingForSeqNrLookup :+ ((repr, replyTo))) | ||
} else if (fillSequenceNumberGaps && state.latestSeqNr == 0L && sequenceNumber != 1L) { | ||
// we haven't looked up latest yet, and incoming is not the well known 1 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
maybe there could be an optimization to skip this lookup if all prior sequence numbers (from 1 until waitingForWrite) are queued in waitingForReply + waitingForWrite
} | ||
} else if (sequenceNumber < expectedSeqNr) { | ||
context.log.trace2("Duplicate seq nr [{}] for persistence id [{}]", sequenceNumber, persistenceId) | ||
replyTo ! StatusReply.success(WriteAck(persistenceId, sequenceNumber)) // duplicate |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
at least nice that we can make use of it for duplicate detection
akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventWriter.scala
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Lookin' good
} | ||
} else if (sequenceNumber < expectedSeqNr) { | ||
context.log.trace2("Duplicate seq nr [{}] for persistence id [{}]", sequenceNumber, persistenceId) | ||
replyTo ! StatusReply.success(WriteAck(persistenceId, sequenceNumber)) // duplicate |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice side effect possibly avoiding db reads on redelivery.
akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventWriter.scala
Outdated
Show resolved
Hide resolved
}.toVector | ||
if (state.idle) { | ||
sendToJournal(1, fillRepr :+ repr) | ||
state.copy(waitingForReply = Map((repr.sequenceNr, (repr, replyTo))), currentTransactionId = 1) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hmm, any particular reason for resetting transaction id here, could it maybe be useful to keep accumulating instead so that ids aren't re-used? (otoh we only really use it and never log the value, so maybe not that useful for correlating/bughunting)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yeah, I didn't pay attention to the transaction id yet, or I didn't fully understand it. Isn't it reset (before this PR) since the entry in perPidWriteState was removed when all updates for the pid have been completed? Do we want a more global counter? Wasn't the purpose to ignore old replies from the journal after a restart of this actor? but it was something more, since it's per pid.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it was just to know if we should discard old instance replies yeah.
pid, | ||
state | ||
.copy(latestSeqNr = maxSeqNr, seqNrlookupInProgress = false, waitingForSeqNrLookup = Vector.empty)) | ||
waiting.foreach { case (repr, replyTo) => handleWrite(repr, replyTo) } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I thought first that we could perhaps batch those into one write (or a few) if a bunch has been queued up, but I realize there could be gaps between the waitingForSeqNrLookup ones as well, so maybe not worth it.
persistenceId, | ||
sequenceNumber) | ||
state.copy(waitingForSeqNrLookup = state.waitingForSeqNrLookup :+ ((repr, replyTo))) | ||
} else if (fillSequenceNumberGaps && state.latestSeqNr == 0L && state.idle && sequenceNumber != 1L) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wrote some tests of the actual filling, and it works out quite nicely. In many cases we don't have to lookup the max when we have already a confirmed seqNr or we have some pending that is supposed to be successful and after that become a confirmed latest seqNr. I'll have to add some tests of the error scenarios for that though.
currentTransactionId = state.currentTransactionId + 1) | ||
} else { | ||
// write in progress for pid, add write to batch and perform once current write completes | ||
if (state.waitingForWriteExceedingMaxBatchSize(settings.maxBatchSize)) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I relaxed this check in c8d8fbe, so that it's not counting FilteredPayload. Otherwise it could easily be exceeded by a larger gap. Do you think that is alright @johanandren ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Probably fine afaics 👍
I have covered everything I can think of. Ready for final review. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM!
@@ -0,0 +1,2 @@ | |||
# internal | |||
ProblemFilters.exclude[Problem]("akka.persistence.typed.internal.EventWriter*") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Shouldn't this have been 2.9.0-M1.backwards.excludes
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The reason is that we don't care about checking bin compat with the milestones. When 2.9.0 is out it will be added to https://github.com/akka/akka/blob/main/project/MiMa.scala#L21
akka/akka has a more advanced mima setup than many other repos, which only look for latest tag and compare with that singe version. akka/akka compares with many historical versions.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
My bad for telling Levi to do this in a previous Akka PR because I had forgotten the other projects are different. I'll PR a fix for that.
Not covering everything yet but first draft for feedback...