Skip to content

Commit

Permalink
Cleanup snapshots and events from before a configured snapshot (akka#828
Browse files Browse the repository at this point in the history
)
  • Loading branch information
chbatey committed Oct 13, 2020
1 parent 6673fef commit 45b769c
Show file tree
Hide file tree
Showing 15 changed files with 785 additions and 110 deletions.
28 changes: 15 additions & 13 deletions core/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -49,13 +49,13 @@ cassandra-journal {

# Parameter indicating whether the journal keyspace should be auto created.
# Not all Cassandra settings are configurable when using autocreate and for
# full control of the keyspace and table definitions you should create them
# full control of the keyspace and table definitions you should create them
# manually (with a script).
keyspace-autocreate = true

# Parameter indicating whether the journal tables should be auto created
# Not all Cassandra settings are configurable when using autocreate and for
# full control of the keyspace and table definitions you should create them
# full control of the keyspace and table definitions you should create them
# manually (with a script).
tables-autocreate = true

Expand Down Expand Up @@ -151,7 +151,7 @@ cassandra-journal {

# Compaction strategy for the journal table.
# Please refer to the tests for example configurations.
# Refer to http://docs.datastax.com/en/cql/3.1/cql/cql_reference/compactSubprop.html
# Refer to http://docs.datastax.com/en/cql/3.1/cql/cql_reference/compactSubprop.html
# for more information regarding the properties.
table-compaction-strategy {
class = "SizeTieredCompactionStrategy"
Expand Down Expand Up @@ -567,7 +567,7 @@ cassandra-snapshot-store {
protocol-version = ""

# The time to wait before cassandra will remove the tombstones created for deleted entries.
# cfr. gc_grace_seconds table property documentation on
# cfr. gc_grace_seconds table property documentation on
# http://www.datastax.com/documentation/cql/3.1/cql/cql_reference/tabProp.html
gc-grace-seconds = 864000

Expand All @@ -592,7 +592,7 @@ cassandra-snapshot-store {

# Number load attempts when recovering from the latest snapshot fails
# yet older snapshot files are available. Each recovery attempt will try
# to recover using an older than previously failed-on snapshot file
# to recover using an older than previously failed-on snapshot file
# (if any are present). If all attempts fail the recovery will fail and
# the persistent actor will be stopped.
max-load-attempts = 3
Expand All @@ -609,7 +609,7 @@ cassandra-snapshot-store {
# This configures the default settings for all CassandraReadJournal plugin
# instances in the system.
#
# If you use multiple plugin instances you need to create differently named
# If you use multiple plugin instances you need to create differently named
# sections containing only those settings that shall be different from the defaults
# configured here, importing the defaults like so:
#
Expand Down Expand Up @@ -795,13 +795,10 @@ cassandra-plugin-default-dispatcher {

# Configuration of the Cleanup tool.
akka.persistence.cassandra.cleanup {
# Full configuration path of the journal plugin to use. By default it will use
# the default plugin configured with `akka.persistence.journal.plugin`.
journal-plugin = ""

# Full configuration path of the snapshot plugin to use. By default it will use
# the default plugin configured with `akka.persistence.snapshot-store.plugin`.
snapshot-plugin = ""
# Full config path of the journal to use
journal-plugin = "cassandra-journal"
# Full config path of the snapshot store to use
snapshot-plugin = "cassandra-snapshot-store"

# Timeout of individual delete operations. If it takes longer the whole job
# will be aborted with logging of how far it reached.
Expand All @@ -810,4 +807,9 @@ akka.persistence.cassandra.cleanup {
# Log progress after this number of delete operations. Can be set to 1 to log
# progress of each operation.
log-progress-every = 100

# By default no deletes are executed and are instead logged at INFO. Set this to true
# to actually do the deletes
dry-run = true

}
252 changes: 233 additions & 19 deletions core/src/main/scala/akka/persistence/cassandra/cleanup/Cleanup.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,20 +4,26 @@

package akka.persistence.cassandra.cleanup

import java.lang.{ Integer => JInt, Long => JLong }

import scala.collection.immutable
import scala.concurrent.Future
import scala.util.Failure
import scala.util.Success

import akka.Done
import akka.actor.ActorSystem
import akka.{ Done, NotUsed }
import akka.actor.{ ActorRef, ActorSystem }
import akka.annotation.ApiMayChange
import akka.event.Logging
import akka.pattern.ask
import akka.persistence.Persistence
import akka.persistence.JournalProtocol.DeleteMessagesTo
import akka.persistence.{ Persistence, SnapshotMetadata }
import akka.persistence.cassandra.journal.CassandraJournal
import akka.persistence.cassandra.snapshot.CassandraSnapshotStore
import akka.persistence.cassandra.session.scaladsl.CassandraSession
import akka.persistence.cassandra.snapshot.{ CassandraSnapshotStore, CassandraSnapshotStoreConfig, CassandraStatements }
import akka.stream.{ Materializer, SystemMaterializer }
import akka.stream.scaladsl.{ Sink, Source }
import akka.util.Timeout
import com.datastax.driver.core.Row

/**
* Tool for deleting all events and/or snapshots for a given list of `persistenceIds` without using persistent actors.
Expand All @@ -40,15 +46,203 @@ final class Cleanup(system: ActorSystem, settings: CleanupSettings) {

import settings._
import system.dispatcher
private implicit val mat: Materializer = SystemMaterializer(system).materializer

private val log = Logging(system, getClass)
private val journal = Persistence(system).journalFor(journalPlugin)
private val snapshotStore =
if (snapshotPlugin != "" || system.settings.config.getString("akka.persistence.snapshot-store.plugin").nonEmpty)
Some(Persistence(system).snapshotStoreFor(snapshotPlugin))
else None
private val snapshotStore = Persistence(system).snapshotStoreFor(snapshotPlugin)

private implicit val askTimeout: Timeout = operationTimeout

private lazy val snapshotPluginConfig =
new CassandraSnapshotStoreConfig(system, system.settings.config.getConfig(snapshotPlugin))
private lazy val snapshotStoreSession =
new CassandraSession(
system,
snapshotPluginConfig.sessionProvider,
snapshotPluginConfig.sessionSettings,
system.dispatcher,
log,
"Cleanup",
_ => Future.successful(Done))

private lazy val snapshotStoreStatements = {
new CassandraStatements {
override def snapshotConfig: CassandraSnapshotStoreConfig = snapshotPluginConfig
}
}

private lazy val selectLatestSnapshotsPs =
snapshotStoreSession.prepare(snapshotStoreStatements.selectLatestSnapshotMeta)
private lazy val selectAllSnapshotMetaPs = snapshotStoreSession.prepare(snapshotStoreStatements.selectAllSnapshotMeta)

if (dryRun) {
log.info("Cleanup running in dry run mode. No operations will be executed against the database, only logged")
}

private def issueSnapshotDelete(
persistenceId: String,
maxToKeep: Long,
rows: Seq[Row]): Future[Option[SnapshotMetadata]] = {
log.debug("issueSnapshotDelete [{}] [{}] [{}]", persistenceId, maxToKeep, rows.size)
rows match {
case Nil =>
log.debug("persistence id [{}] has 0 snapshots, no deletes issued", persistenceId)
Future.successful(None)
case fewer if fewer.size < maxToKeep =>
// no delete required, return the oldest snapshot
log.debug("Fewer than snapshots than requested for persistence id [{}], no deletes issued", persistenceId)
Future.successful(
Some(SnapshotMetadata(persistenceId, fewer.last.getLong("sequence_nr"), fewer.last.getLong("timestamp"))))
case more =>
if (log.isDebugEnabled) {
log.debug(
"Latest {} snapshots for persistence id [{}] range from {} to {}",
maxToKeep,
persistenceId,
more.head.getLong("sequence_nr"),
more.last.getLong("sequence_nr"))
}
val result =
SnapshotMetadata(persistenceId, more.last.getLong("sequence_nr"), more.last.getLong("timestamp"))
if (dryRun) {
log.info(
"dry run: CQL: [{}] persistence_id: [{}] sequence_nr [{}]",
snapshotStoreStatements.deleteSnapshotsBefore,
persistenceId,
result.sequenceNr)
Future.successful(Some(result))
} else {
snapshotStoreSession
.executeWrite(snapshotStoreStatements.deleteSnapshotsBefore, persistenceId, result.sequenceNr: JLong)
.map(_ => Some(result))
}

}
}

/**
* Keep all snapshots that occurred after `keepAfter`.
* If fewer than `snapshotsToKeep` occurred after `keepAfter` at least that many
* are kept. Setting this to 1 ensures that at least snapshot is kept even if it
* is older than the `keepAfter`
*
* If only N number of snapshot should be kept prefer overload without timestamp
* as it is more efficient.
*
* The returned snapshot metadata can be used to issue deletes for events older than the oldest
* snapshot.
*
* @return the snapshot meta of the oldest remaining snapshot. None if there are no snapshots
*
*/
def deleteBeforeSnapshot(
persistenceId: String,
snapshotsToKeep: Int,
keepAfterUnixTimestamp: Long): Future[Option[SnapshotMetadata]] = {
require(snapshotsToKeep >= 1, "must keep at least one snapshot")
require(keepAfterUnixTimestamp >= 0, "keepAfter must be greater than 0")
selectAllSnapshotMetaPs
.flatMap { ps =>
val allRows: Source[Row, NotUsed] = snapshotStoreSession.select(ps.bind(persistenceId))
allRows.zipWithIndex
.takeWhile {
case (row, index) =>
if (row.getLong("timestamp") > keepAfterUnixTimestamp) {
true
} else if (index < snapshotsToKeep) {
true
} else {
false
}
}
.map(_._1)
.runWith(Sink.seq)
}
.flatMap(rows => issueSnapshotDelete(persistenceId, snapshotsToKeep, rows))
}

/**
* Keep N snapshots and delete all older snapshots along.
*
* This operation is much cheaper than including the timestamp because it can use the primary key and limit.
*
* @return the snapshot meta of the oldest remaining snapshot. None if there are no snapshots. This can be used to delete events from before the snapshot.
*/
def deleteBeforeSnapshot(persistenceId: String, maxSnapshotsToKeep: Int): Future[Option[SnapshotMetadata]] = {
require(maxSnapshotsToKeep >= 1, "Must keep at least one snapshot")
val snapshots: Future[immutable.Seq[Row]] = selectLatestSnapshotsPs.flatMap { ps =>
snapshotStoreSession.select(ps.bind(persistenceId, maxSnapshotsToKeep: JInt)).runWith(Sink.seq)
}
snapshots.flatMap(rows => issueSnapshotDelete(persistenceId, maxSnapshotsToKeep, rows))
}

/**
* Delete all events before a sequenceNr for the given persistence id.
*
* WARNING: deleting events is generally discouraged in event sourced systems.
* once deleted the event by tag view can not be re-built
*
* @param persistenceId the persistence id to delete for
* @param toSequenceNr sequence nr (inclusive) to delete up to
*/
def deleteEventsTo(persistenceId: String, toSequenceNr: Long): Future[Done] = {
sendToJournal((replyTo: ActorRef) => DeleteMessagesTo(persistenceId, toSequenceNr, replyTo))
}

/**
* Deletes all but the last N snapshots and deletes all events before this snapshot
* Does not delete from the tag_views table
*
* WARNING: deleting events is generally discouraged in event sourced systems.
* once deleted the event by tag view can not be re-built
*/
def cleanupBeforeSnapshot(persistenceId: String, nrSnapshotsToKeep: Int): Future[Done] = {
for {
oldestSnapshot <- deleteBeforeSnapshot(persistenceId, nrSnapshotsToKeep)
done <- issueDeleteFromSnapshot(oldestSnapshot)
} yield done
}

/**
* Deletes all events for the given persistence id from before the first after keepAfter.
* If there are not enough snapshots to satisfy nrSnapshotsToKeep then snapshots before
* keepAfter will also be kept.
*
* WARNING: deleting events is generally discouraged in event sourced systems.
* once deleted the event by tag view can not be re-built
*/
def cleanupBeforeSnapshot(persistenceId: String, nrSnapshotsToKeep: Int, keepAfter: Long): Future[Done] = {
for {
oldestSnapshot <- deleteBeforeSnapshot(persistenceId, nrSnapshotsToKeep, keepAfter)
done <- issueDeleteFromSnapshot(oldestSnapshot)
} yield done
}

/**
* See single persistenceId overload for what is done for each persistence id
*/
def cleanupBeforeSnapshot(persistenceIds: immutable.Seq[String], nrSnapshotsToKeep: Int): Future[Done] = {
foreach(persistenceIds, "cleanupBeforeSnapshot", pid => cleanupBeforeSnapshot(pid, nrSnapshotsToKeep))
}

/**
* See single persistenceId overload for what is done for each persistence id
*/
def cleanupBeforeSnapshot(
persistenceIds: immutable.Seq[String],
nrSnapshotsToKeep: Int,
keepAfter: Long): Future[Done] = {
foreach(persistenceIds, "cleanupBeforeSnapshot", pid => cleanupBeforeSnapshot(pid, nrSnapshotsToKeep, keepAfter))
}

private def issueDeleteFromSnapshot(snapshot: Option[SnapshotMetadata]): Future[Done] = {
snapshot match {
case Some(snapshotMeta) => deleteEventsTo(snapshotMeta.persistenceId, snapshotMeta.sequenceNr)
case None => Future.successful(Done)
}
}

/**
* Delete everything related to the given list of `persistenceIds`. All events and snapshots are deleted.
*/
Expand All @@ -74,28 +268,21 @@ final class Cleanup(system: ActorSystem, settings: CleanupSettings) {
* Delete all events related to one single `persistenceId`. Snapshots are not deleted.
*/
def deleteAllEvents(persistenceId: String, neverUsePersistenceIdAgain: Boolean): Future[Done] = {
(journal ? CassandraJournal.DeleteAllEvents(persistenceId, neverUsePersistenceIdAgain)).mapTo[Done]
sendToJournal(CassandraJournal.DeleteAllEvents(persistenceId, neverUsePersistenceIdAgain))
}

/**
* Delete all snapshots related to the given list of `persistenceIds`. Events are not deleted.
*/
def deleteAllSnapshots(persistenceIds: immutable.Seq[String]): Future[Done] = {
if (snapshotStore.isDefined)
foreach(persistenceIds, "deleteAllSnapshots", pid => deleteAllSnapshots(pid))
else
Future.successful(Done)
foreach(persistenceIds, "deleteAllSnapshots", pid => deleteAllSnapshots(pid))
}

/**
* Delete all snapshots related to one single `persistenceId`. Events are not deleted.
*/
def deleteAllSnapshots(persistenceId: String): Future[Done] = {
snapshotStore match {
case Some(snapshotStoreRef) =>
(snapshotStoreRef ? CassandraSnapshotStore.DeleteAllsnapshots(persistenceId)).mapTo[Done]
case None => Future.successful(Done)
}
sendToSnapshotStore(CassandraSnapshotStore.DeleteAllSnapshots(persistenceId))
}

private def foreach(
Expand Down Expand Up @@ -129,4 +316,31 @@ final class Cleanup(system: ActorSystem, settings: CleanupSettings) {
result
}

private def sendToSnapshotStore(msg: Any): Future[Done] = {
if (dryRun) {
log.info("dry run: Operation on snapshot store: {}", msg)
Future.successful(Done)
} else {
(snapshotStore ? msg).map(_ => Done)
}
}

private def sendToJournal(msg: Any): Future[Done] = {
if (dryRun) {
log.info("dry run: Operation on journal: {}", msg)
Future.successful(Done)
} else {
(journal ? msg).map(_ => Done)
}
}

private def sendToJournal(create: ActorRef => Any): Future[Done] = {
if (dryRun) {
log.info("dry run: Operation on journal: {}", create(ActorRef.noSender))
Future.successful(Done)
} else {
import akka.pattern.extended.ask
ask(journal, create).map(_ => Done)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,5 @@ class CleanupSettings(config: Config) {
val snapshotPlugin: String = config.getString("snapshot-plugin")
val operationTimeout: FiniteDuration = config.getDuration("operation-timeout", TimeUnit.MILLISECONDS).millis
val logProgressEvery: Int = config.getInt("log-progress-every")
val dryRun: Boolean = config.getBoolean("dry-run")
}

0 comments on commit 45b769c

Please sign in to comment.