Skip to content

Commit

Permalink
Resolve snapshot check skipped for some events (#30225) (#30226)
Browse files Browse the repository at this point in the history
  • Loading branch information
jneumaier committed Jun 3, 2021
1 parent e43f2be commit 7d1b412
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 7 deletions.
5 changes: 3 additions & 2 deletions akka-cluster-sharding/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -87,15 +87,16 @@ akka.cluster.sharding {
state-store-mode = "ddata"

# The shard saves persistent snapshots after this number of persistent
# events. Snapshots are used to reduce recovery times.
# events. Snapshots are used to reduce recovery times. A snapshot trigger might be delayed if a batch of updates is processed.
# Only used when state-store-mode=persistence
snapshot-after = 1000

# The shard deletes persistent events (messages and snapshots) after doing snapshot
# keeping this number of old persistent batches.
# Batch is of size `snapshot-after`.
# When set to 0 after snapshot is successfully done all events with equal or lower sequence number will be deleted.
# Default value of 2 leaves last maximum 2*`snapshot-after` events and 3 snapshots (2 old ones + latest snapshot)
# Default value of 2 leaves last maximum 2*`snapshot-after` events and 3 snapshots (2 old ones + latest snapshot).
# If larger than 0, one additional batch of journal messages is kept when state-store-mode=persistence to include messages from delayed snapshots.
keep-nr-of-batches = 2

# Settings for LeastShardAllocationStrategy.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,13 +96,17 @@ private[akka] final class EventSourcedRememberEntitiesShardStore(
(if (started.nonEmpty) EntitiesStarted(started) :: Nil else Nil) :::
(if (stopped.nonEmpty) EntitiesStopped(stopped) :: Nil else Nil)
var left = events.size
var saveSnap = false
def persistEventsAndHandleComplete(evts: List[StateChange]): Unit = {
persistAll(evts) { _ =>
left -= 1
saveSnap = saveSnap || isSnapshotNeeded
if (left == 0) {
sender() ! RememberEntitiesShardStore.UpdateDone(started, stopped)
state = state.copy(state.entities.union(started).diff(stopped))
saveSnapshotWhenNeeded()
if (saveSnap) {
saveSnapshot()
}
}
}
}
Expand All @@ -126,7 +130,9 @@ private[akka] final class EventSourcedRememberEntitiesShardStore(

case DeleteMessagesSuccess(toSequenceNr) =>
val deleteTo = toSequenceNr - 1
val deleteFrom = math.max(0, deleteTo - (keepNrOfBatches * snapshotAfter))
// keeping one additional batch of messages in case snapshotAfter has been delayed to the end of a processed batch
val keepNrOfBatchesWithSafetyBatch = if (keepNrOfBatches == 0) 0 else keepNrOfBatches + 1
val deleteFrom = math.max(0, deleteTo - (keepNrOfBatchesWithSafetyBatch * snapshotAfter))
log.debug(
"Messages to [{}] deleted successfully. Deleting snapshots from [{}] to [{}]",
toSequenceNr,
Expand All @@ -151,10 +157,17 @@ private[akka] final class EventSourcedRememberEntitiesShardStore(
}

def saveSnapshotWhenNeeded(): Unit = {
if (lastSequenceNr % snapshotAfter == 0 && lastSequenceNr != 0) {
log.debug("Saving snapshot, sequence number [{}]", snapshotSequenceNr)
saveSnapshot(state)
if (isSnapshotNeeded) {
saveSnapshot()
}
}

private def saveSnapshot(): Unit = {
log.debug("Saving snapshot, sequence number [{}]", snapshotSequenceNr)
saveSnapshot(state)
}

private def isSnapshotNeeded = {
lastSequenceNr % snapshotAfter == 0 && lastSequenceNr != 0
}
}

0 comments on commit 7d1b412

Please sign in to comment.