Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Provide minSequenceNr for snapshot deletion #25590

Merged
merged 2 commits into from
Sep 25, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -14,25 +14,16 @@ import akka.actor.Props
import akka.actor.Terminated
import akka.cluster.sharding.Shard.ShardCommand
import akka.actor.Actor

import akka.util.MessageBufferMap

import scala.concurrent.Future
import scala.concurrent.duration.FiniteDuration
import akka.cluster.Cluster
import akka.cluster.ddata.ORSet
import akka.cluster.ddata.ORSetKey
import akka.cluster.ddata.Replicator._
import akka.actor.Stash
import akka.persistence.PersistentActor
import akka.persistence.SnapshotOffer
import akka.persistence.SaveSnapshotSuccess
import akka.persistence.DeleteSnapshotsFailure
import akka.persistence.DeleteMessagesSuccess
import akka.persistence.SaveSnapshotFailure
import akka.persistence.DeleteMessagesFailure
import akka.persistence.DeleteSnapshotsSuccess
import akka.persistence.SnapshotSelectionCriteria
import akka.persistence.RecoveryCompleted
import akka.persistence._
import akka.actor.NoSerializationVerificationNeeded

/**
Expand Down Expand Up @@ -531,7 +522,7 @@ private[akka] class PersistentShard(
/*
* delete old events but keep the latest around because
*
* it's not safe to delete all events immediate because snapshots are typically stored with a weaker consistency
* it's not safe to delete all events immediately because snapshots are typically stored with a weaker consistency
* level which means that a replay might "see" the deleted events before it sees the stored snapshot,
* i.e. it will use an older snapshot and then not replay the full sequence of events
*
Expand All @@ -543,20 +534,25 @@ private[akka] class PersistentShard(
}

case SaveSnapshotFailure(_, reason) ⇒
log.warning("PersistentShard snapshot failure: {}", reason.getMessage)
log.warning("PersistentShard snapshot failure: [{}]", reason.getMessage)

case DeleteMessagesSuccess(toSequenceNr) ⇒
log.debug("PersistentShard messages to {} deleted successfully", toSequenceNr)
deleteSnapshots(SnapshotSelectionCriteria(maxSequenceNr = toSequenceNr - 1))
val deleteTo = toSequenceNr - 1
val deleteFrom = math.max(0, deleteTo - (keepNrOfBatches * snapshotAfter))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So the low end of the range is a performance opt to not always delete from 0?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

that's right, and it's used in Cassandra plugin when akka/akka-persistence-cassandra#394 has been released

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@patriknw @johanandren what is the consequence if I want to upgade akka-cluster version to the 2.5.17 or newer (let's say the latest 2.5.23) but with akka-persistence-cassandra-0.62? Is it possible or do I have to use minimum of akka-persistence-cassandra-0.90 ?

log.debug("PersistentShard messages to [{}] deleted successfully. Deleting snapshots from [{}] to [{}]", toSequenceNr, deleteFrom, deleteTo)
deleteSnapshots(SnapshotSelectionCriteria(
minSequenceNr = deleteFrom,
maxSequenceNr = deleteTo
))

case DeleteMessagesFailure(reason, toSequenceNr) ⇒
log.warning("PersistentShard messages to {} deletion failure: {}", toSequenceNr, reason.getMessage)
log.warning("PersistentShard messages to [{}] deletion failure: [{}]", toSequenceNr, reason.getMessage)

case DeleteSnapshotsSuccess(m) ⇒
log.debug("PersistentShard snapshots matching {} deleted successfully", m)
log.debug("PersistentShard snapshots matching [{}] deleted successfully", m)

case DeleteSnapshotsFailure(m, reason) ⇒
log.warning("PersistentShard snapshots matching {} deletion failure: {}", m, reason.getMessage)
log.warning("PersistentShard snapshots matching [{}] deletion failure: [{}]", m, reason.getMessage)

}: Receive).orElse(super.receiveCommand)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ object RemoveInternalClusterShardingDataSpec {
akka.persistence.snapshot-store.local.dir = "target/snapshots-RemoveInternalClusterShardingDataSpec"
akka.cluster.sharding.snapshot-after = 5
akka.cluster.sharding.state-store-mode = persistence
|akka.cluster.sharding.keep-nr-of-batches = 0
akka.cluster.sharding.keep-nr-of-batches = 0
"""

val extractEntityId: ShardRegion.ExtractEntityId = {
Expand Down
4 changes: 3 additions & 1 deletion akka-docs/src/main/paradox/persistence.md
Original file line number Diff line number Diff line change
Expand Up @@ -693,7 +693,9 @@ A persistent actor can delete individual snapshots by calling the `deleteSnapsho
when the snapshot was taken.

To bulk-delete a range of snapshots matching `SnapshotSelectionCriteria`,
persistent actors should use the `deleteSnapshots` method.
persistent actors should use the `deleteSnapshots` method. Depending on the journal used this might be inefficient. It is
best practice to do specific deletes with `deleteSnapshot` or to include a `minSequenceNr` as well as a `maxSequenceNr`
for the `SnapshotSelectionCriteria`.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good


### Snapshot status handling

Expand Down