Skip to content

Commit

Permalink
Cleanup snapshots and events from before a configured snapshot (akka#828
Browse files Browse the repository at this point in the history
)
  • Loading branch information
chbatey committed Oct 12, 2020
1 parent d3ea5bb commit 5d7837e
Show file tree
Hide file tree
Showing 15 changed files with 744 additions and 90 deletions.
4 changes: 4 additions & 0 deletions core/src/main/resources/reference.conf
Expand Up @@ -505,6 +505,10 @@ akka.persistence.cassandra {
# Log progress after this number of delete operations. Can be set to 1 to log
# progress of each operation.
log-progress-every = 100

# By default no deletes are executed and are instead logged at INFO. Set this to true
# to actually do the deletes
dry-run = true
}

# Configuration of the Reconciler tool.
Expand Down
240 changes: 229 additions & 11 deletions core/src/main/scala/akka/persistence/cassandra/cleanup/Cleanup.scala
Expand Up @@ -4,22 +4,28 @@

package akka.persistence.cassandra.cleanup

import java.lang.{ Integer => JInt, Long => JLong }

import scala.collection.immutable
import scala.concurrent.Future
import scala.util.Failure
import scala.util.Success

import akka.Done
import akka.actor.ClassicActorSystemProvider
import akka.{ Done, NotUsed }
import akka.actor.{ ActorRef, ClassicActorSystemProvider }
import akka.annotation.ApiMayChange
import akka.event.Logging
import akka.pattern.ask
import akka.persistence.Persistence
import akka.persistence.JournalProtocol.DeleteMessagesTo
import akka.persistence.{ Persistence, SnapshotMetadata }
import akka.persistence.cassandra.PluginSettings
import akka.persistence.cassandra.journal.CassandraJournal
import akka.persistence.cassandra.reconciler.Reconciliation
import akka.persistence.cassandra.reconciler.ReconciliationSettings
import akka.persistence.cassandra.snapshot.CassandraSnapshotStore
import akka.persistence.cassandra.snapshot.{ CassandraSnapshotStatements, CassandraSnapshotStore }
import akka.stream.alpakka.cassandra.scaladsl.{ CassandraSession, CassandraSessionRegistry }
import akka.stream.scaladsl.{ Sink, Source }
import akka.util.Timeout
import com.datastax.oss.driver.api.core.cql.Row

/**
* Tool for deleting all events and/or snapshots for a given list of `persistenceIds` without using persistent actors.
Expand All @@ -42,18 +48,196 @@ final class Cleanup(systemProvider: ClassicActorSystemProvider, settings: Cleanu
systemProvider,
new CleanupSettings(systemProvider.classicSystem.settings.config.getConfig("akka.persistence.cassandra.cleanup")))

private val system = systemProvider.classicSystem
private implicit val system = systemProvider.classicSystem
import settings._
import system.dispatcher

private val log = Logging(system, getClass)
private val journal = Persistence(system).journalFor(pluginLocation + ".journal")
private lazy val snapshotStore = Persistence(system).snapshotStoreFor(pluginLocation + ".snapshot")

// operations on journal, snapshotStore and tagViews should be only be done when dry-run = false
private val journal: ActorRef = Persistence(system).journalFor(pluginLocation + ".journal")
private lazy val snapshotStore: ActorRef = Persistence(system).snapshotStoreFor(pluginLocation + ".snapshot")
private lazy val tagViewsReconciliation = new Reconciliation(
system,
new ReconciliationSettings(system.settings.config.getConfig(pluginLocation + ".reconciler")))

private implicit val askTimeout: Timeout = operationTimeout

private lazy val session: CassandraSession =
CassandraSessionRegistry(system).sessionFor(pluginLocation)

private lazy val pluginSettings = PluginSettings(system, system.settings.config.getConfig(pluginLocation))
private lazy val statements = new CassandraSnapshotStatements(pluginSettings.snapshotSettings)
private lazy val selectLatestSnapshotsPs = session.prepare(statements.selectLatestSnapshotMeta)
private lazy val selectAllSnapshotMetaPs = session.prepare(statements.selectAllSnapshotMeta)

if (dryRun) {
log.info("Cleanup running in dry run mode. No operations will be executed against the database, only logged")
}

private def issueSnapshotDelete(
persistenceId: String,
maxToKeep: Long,
rows: Seq[Row]): Future[Option[SnapshotMetadata]] = {
log.debug("issueSnapshotDelete [{}] [{}] [{}]", persistenceId, maxToKeep, rows.size)
rows match {
case Nil =>
log.debug("persistence id [{}] has 0 snapshots, no deletes issued", persistenceId)
Future.successful(None)
case fewer if fewer.size < maxToKeep =>
// no delete required, return the oldest snapshot
log.debug("Fewer than snapshots than requested for persistence id [{}], no deletes issued", persistenceId)
Future.successful(
Some(SnapshotMetadata(persistenceId, fewer.last.getLong("sequence_nr"), fewer.last.getLong("timestamp"))))
case more =>
if (log.isDebugEnabled) {
log.debug(
"Latest {} snapshots for persistence id [{}] range from {} to {}",
maxToKeep,
persistenceId,
more.head.getLong("sequence_nr"),
more.last.getLong("sequence_nr"))
}
val result =
SnapshotMetadata(persistenceId, more.last.getLong("sequence_nr"), more.last.getLong("timestamp"))
if (dryRun) {
log.info(
"dry run: CQL: [{}] persistence_id: [{}] sequence_nr [{}]",
statements.deleteSnapshotsBefore,
persistenceId,
result.sequenceNr)
Future.successful(Some(result))
} else {
session
.executeWrite(statements.deleteSnapshotsBefore, persistenceId, result.sequenceNr: JLong)
.map(_ => Some(result))
}

}
}

/**
* Keep all snapshots that occurred after `keepAfter`.
* If fewer than `snapshotsToKeep` occurred after `keepAfter` at least that many
* are kept. Setting this to 1 ensures that at least snapshot is kept even if it
* is older than the `keepAfter`
*
* If only N number of snapshot should be kept prefer overload without timestamp
* as it is more efficient.
*
* The returned snapshot metadata can be used to issue deletes for events older than the oldest
* snapshot.
*
* @return the snapshot meta of the oldest remaining snapshot. None if there are no snapshots
*
*/
def deleteBeforeSnapshot(
persistenceId: String,
snapshotsToKeep: Int,
keepAfterUnixTimestamp: Long): Future[Option[SnapshotMetadata]] = {
require(snapshotsToKeep >= 1, "must keep at least one snapshot")
require(keepAfterUnixTimestamp >= 0, "keepAfter must be greater than 0")
selectAllSnapshotMetaPs
.flatMap { ps =>
val allRows: Source[Row, NotUsed] = session.select(ps.bind(persistenceId))
allRows.zipWithIndex
.takeWhile {
case (row, index) =>
if (row.getLong("timestamp") > keepAfterUnixTimestamp) {
true
} else if (index < snapshotsToKeep) {
true
} else {
false
}
}
.map(_._1)
.runWith(Sink.seq)
}
.flatMap(rows => issueSnapshotDelete(persistenceId, snapshotsToKeep, rows))
}

/**
* Keep N snapshots and delete all older snapshots along.
*
* This operation is much cheaper than including the timestamp because it can use the primary key and limit.
*
* @return the snapshot meta of the oldest remaining snapshot. None if there are no snapshots. This can be used to delete events from before the snapshot.
*/
def deleteBeforeSnapshot(persistenceId: String, maxSnapshotsToKeep: Int): Future[Option[SnapshotMetadata]] = {
require(maxSnapshotsToKeep >= 1, "Must keep at least one snapshot")
val snapshots: Future[immutable.Seq[Row]] = selectLatestSnapshotsPs.flatMap { ps =>
session.select(ps.bind(persistenceId, maxSnapshotsToKeep: JInt)).runWith(Sink.seq)
}
snapshots.flatMap(rows => issueSnapshotDelete(persistenceId, maxSnapshotsToKeep, rows))
}

/**
* Delete all events before a sequenceNr for the given persistence id.
*
* WARNING: deleting events is generally discouraged in event sourced systems.
* once deleted the event by tag view can not be re-built
*
* @param persistenceId the persistence id to delete for
* @param toSequenceNr sequence nr (inclusive) to delete up to
*/
def deleteEventsTo(persistenceId: String, toSequenceNr: Long): Future[Done] = {
sendToJournal(replyTo => DeleteMessagesTo(persistenceId, toSequenceNr, replyTo))
}

/**
* Deletes all but the last N snapshots and deletes all events before this snapshot
* Does not delete from the tag_views table
*
* WARNING: deleting events is generally discouraged in event sourced systems.
* once deleted the event by tag view can not be re-built
*/
def cleanupBeforeSnapshot(persistenceId: String, nrSnapshotsToKeep: Int): Future[Done] = {
for {
oldestSnapshot <- deleteBeforeSnapshot(persistenceId, nrSnapshotsToKeep)
done <- issueDeleteFromSnapshot(oldestSnapshot)
} yield done
}

/**
* Deletes all events for the given persistence id from before the first after keepAfter.
* If there are not enough snapshots to satisfy nrSnapshotsToKeep then snapshots before
* keepAfter will also be kept.
*
* WARNING: deleting events is generally discouraged in event sourced systems.
* once deleted the event by tag view can not be re-built
*/
def cleanupBeforeSnapshot(persistenceId: String, nrSnapshotsToKeep: Int, keepAfter: Long): Future[Done] = {
for {
oldestSnapshot <- deleteBeforeSnapshot(persistenceId, nrSnapshotsToKeep, keepAfter)
done <- issueDeleteFromSnapshot(oldestSnapshot)
} yield done
}

/**
* See single persistenceId overload for what is done for each persistence id
*/
def cleanupBeforeSnapshot(persistenceIds: immutable.Seq[String], nrSnapshotsToKeep: Int): Future[Done] = {
foreach(persistenceIds, "cleanupBeforeSnapshot", pid => cleanupBeforeSnapshot(pid, nrSnapshotsToKeep))
}

/**
* See single persistenceId overload for what is done for each persistence id
*/
def cleanupBeforeSnapshot(
persistenceIds: immutable.Seq[String],
nrSnapshotsToKeep: Int,
keepAfter: Long): Future[Done] = {
foreach(persistenceIds, "cleanupBeforeSnapshot", pid => cleanupBeforeSnapshot(pid, nrSnapshotsToKeep, keepAfter))
}

private def issueDeleteFromSnapshot(snapshot: Option[SnapshotMetadata]): Future[Done] = {
snapshot match {
case Some(snapshotMeta) => deleteEventsTo(snapshotMeta.persistenceId, snapshotMeta.sequenceNr)
case None => Future.successful(Done)
}
}

/**
* Delete everything related to the given list of `persistenceIds`. All events, tagged events, and
* snapshots are deleted.
Expand Down Expand Up @@ -85,7 +269,7 @@ final class Cleanup(systemProvider: ClassicActorSystemProvider, settings: Cleanu
* Delete all events related to one single `persistenceId`. Snapshots are not deleted.
*/
def deleteAllEvents(persistenceId: String, neverUsePersistenceIdAgain: Boolean): Future[Done] = {
(journal ? CassandraJournal.DeleteAllEvents(persistenceId, neverUsePersistenceIdAgain)).mapTo[Done]
sendToJournal(CassandraJournal.DeleteAllEvents(persistenceId, neverUsePersistenceIdAgain))
}

/**
Expand All @@ -104,7 +288,14 @@ final class Cleanup(systemProvider: ClassicActorSystemProvider, settings: Cleanu
tagViewsReconciliation
.tagsForPersistenceId(persistenceId)
.flatMap { tags =>
Future.sequence(tags.map(tag => tagViewsReconciliation.deleteTagViewForPersistenceIds(Set(persistenceId), tag)))
Future.sequence(tags.map { tag =>
if (dryRun) {
log.info("dry run. Delete [{}] tag view for persistence id: [{}]", tag, persistenceId)
Future.successful(Done)
} else {
tagViewsReconciliation.deleteTagViewForPersistenceIds(Set(persistenceId), tag)
}
})
}
.map(_ => Done)
}
Expand All @@ -120,7 +311,7 @@ final class Cleanup(systemProvider: ClassicActorSystemProvider, settings: Cleanu
* Delete all snapshots related to one single `persistenceId`. Events are not deleted.
*/
def deleteAllSnapshots(persistenceId: String): Future[Done] = {
(snapshotStore ? CassandraSnapshotStore.DeleteAllsnapshots(persistenceId)).mapTo[Done]
sendToSnapshotStore(CassandraSnapshotStore.DeleteAllSnapshots(persistenceId))
}

private def foreach(
Expand Down Expand Up @@ -154,4 +345,31 @@ final class Cleanup(systemProvider: ClassicActorSystemProvider, settings: Cleanu
result
}

private def sendToSnapshotStore(msg: Any): Future[Done] = {
if (dryRun) {
log.info("dry run: Operation on snapshot store: {}", msg)
Future.successful(Done)
} else {
(snapshotStore ? msg).map(_ => Done)
}
}

private def sendToJournal(msg: Any): Future[Done] = {
if (dryRun) {
log.info("dry run: Operation on journal: {}", msg)
Future.successful(Done)
} else {
(journal ? msg).map(_ => Done)
}
}

private def sendToJournal(create: ActorRef => Any): Future[Done] = {
if (dryRun) {
log.info("dry run: Operation on journal: {}", create(ActorRef.noSender))
Future.successful(Done)
} else {
import akka.pattern.extended.ask
ask(journal, create).map(_ => Done)
}
}
}
Expand Up @@ -17,4 +17,5 @@ class CleanupSettings(config: Config) {
val pluginLocation: String = config.getString("plugin-location")
val operationTimeout: FiniteDuration = config.getDuration("operation-timeout", TimeUnit.MILLISECONDS).millis
val logProgressEvery: Int = config.getInt("log-progress-every")
val dryRun: Boolean = config.getBoolean("dry-run")
}
Expand Up @@ -65,6 +65,13 @@ import akka.persistence.cassandra.FutureDone
AND sequence_nr <= ?
"""

def deleteSnapshotsBefore =
s"""
DELETE FROM ${tableName}
WHERE persistence_id = ?
AND sequence_nr < ?
"""

def selectSnapshot = s"""
SELECT * FROM ${tableName} WHERE
persistence_id = ? AND
Expand All @@ -79,6 +86,19 @@ import akka.persistence.cassandra.FutureDone
${limit.map(l => s"LIMIT ${l}").getOrElse("")}
"""

def selectLatestSnapshotMeta =
s"""SELECT persistence_id, sequence_nr, timestamp FROM ${tableName} WHERE
persistence_id = ?
ORDER BY sequence_nr DESC
LIMIT ?
"""

def selectAllSnapshotMeta =
s"""SELECT sequence_nr, timestamp FROM ${tableName} WHERE
persistence_id = ?
ORDER BY sequence_nr DESC
"""

private def tableName = s"${snapshotSettings.keyspace}.${snapshotSettings.table}"

/**
Expand Down

0 comments on commit 5d7837e

Please sign in to comment.