Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cleaning up of snapshots and events before snapshots based on time and a # of snapshots #828

Merged
merged 1 commit into from
Oct 12, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions core/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -505,6 +505,10 @@ akka.persistence.cassandra {
# Log progress after this number of delete operations. Can be set to 1 to log
# progress of each operation.
log-progress-every = 100

# By default no deletes are executed and are instead logged at INFO. Set this to true
# to actually do the deletes
dry-run = true
chbatey marked this conversation as resolved.
Show resolved Hide resolved
}

# Configuration of the Reconciler tool.
Expand Down
240 changes: 229 additions & 11 deletions core/src/main/scala/akka/persistence/cassandra/cleanup/Cleanup.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,22 +4,28 @@

package akka.persistence.cassandra.cleanup

import java.lang.{ Integer => JInt, Long => JLong }

import scala.collection.immutable
import scala.concurrent.Future
import scala.util.Failure
import scala.util.Success

import akka.Done
import akka.actor.ClassicActorSystemProvider
import akka.{ Done, NotUsed }
import akka.actor.{ ActorRef, ClassicActorSystemProvider }
import akka.annotation.ApiMayChange
import akka.event.Logging
import akka.pattern.ask
import akka.persistence.Persistence
import akka.persistence.JournalProtocol.DeleteMessagesTo
import akka.persistence.{ Persistence, SnapshotMetadata }
import akka.persistence.cassandra.PluginSettings
import akka.persistence.cassandra.journal.CassandraJournal
import akka.persistence.cassandra.reconciler.Reconciliation
import akka.persistence.cassandra.reconciler.ReconciliationSettings
import akka.persistence.cassandra.snapshot.CassandraSnapshotStore
import akka.persistence.cassandra.snapshot.{ CassandraSnapshotStatements, CassandraSnapshotStore }
import akka.stream.alpakka.cassandra.scaladsl.{ CassandraSession, CassandraSessionRegistry }
import akka.stream.scaladsl.{ Sink, Source }
import akka.util.Timeout
import com.datastax.oss.driver.api.core.cql.Row

/**
* Tool for deleting all events and/or snapshots for a given list of `persistenceIds` without using persistent actors.
Expand All @@ -42,18 +48,196 @@ final class Cleanup(systemProvider: ClassicActorSystemProvider, settings: Cleanu
systemProvider,
new CleanupSettings(systemProvider.classicSystem.settings.config.getConfig("akka.persistence.cassandra.cleanup")))

private val system = systemProvider.classicSystem
private implicit val system = systemProvider.classicSystem
import settings._
import system.dispatcher

private val log = Logging(system, getClass)
private val journal = Persistence(system).journalFor(pluginLocation + ".journal")
private lazy val snapshotStore = Persistence(system).snapshotStoreFor(pluginLocation + ".snapshot")

// operations on journal, snapshotStore and tagViews should be only be done when dry-run = false
private val journal: ActorRef = Persistence(system).journalFor(pluginLocation + ".journal")
private lazy val snapshotStore: ActorRef = Persistence(system).snapshotStoreFor(pluginLocation + ".snapshot")
private lazy val tagViewsReconciliation = new Reconciliation(
system,
new ReconciliationSettings(system.settings.config.getConfig(pluginLocation + ".reconciler")))

private implicit val askTimeout: Timeout = operationTimeout

private lazy val session: CassandraSession =
CassandraSessionRegistry(system).sessionFor(pluginLocation)

private lazy val pluginSettings = PluginSettings(system, system.settings.config.getConfig(pluginLocation))
private lazy val statements = new CassandraSnapshotStatements(pluginSettings.snapshotSettings)
private lazy val selectLatestSnapshotsPs = session.prepare(statements.selectLatestSnapshotMeta)
private lazy val selectAllSnapshotMetaPs = session.prepare(statements.selectAllSnapshotMeta)

if (dryRun) {
log.info("Cleanup running in dry run mode. No operations will be executed against the database, only logged")
}

private def issueSnapshotDelete(
persistenceId: String,
maxToKeep: Long,
rows: Seq[Row]): Future[Option[SnapshotMetadata]] = {
log.debug("issueSnapshotDelete [{}] [{}] [{}]", persistenceId, maxToKeep, rows.size)
rows match {
case Nil =>
log.debug("persistence id [{}] has 0 snapshots, no deletes issued", persistenceId)
Future.successful(None)
case fewer if fewer.size < maxToKeep =>
// no delete required, return the oldest snapshot
log.debug("Fewer than snapshots than requested for persistence id [{}], no deletes issued", persistenceId)
Future.successful(
Some(SnapshotMetadata(persistenceId, fewer.last.getLong("sequence_nr"), fewer.last.getLong("timestamp"))))
case more =>
if (log.isDebugEnabled) {
log.debug(
"Latest {} snapshots for persistence id [{}] range from {} to {}",
maxToKeep,
persistenceId,
more.head.getLong("sequence_nr"),
more.last.getLong("sequence_nr"))
}
val result =
SnapshotMetadata(persistenceId, more.last.getLong("sequence_nr"), more.last.getLong("timestamp"))
if (dryRun) {
log.info(
"dry run: CQL: [{}] persistence_id: [{}] sequence_nr [{}]",
statements.deleteSnapshotsBefore,
persistenceId,
result.sequenceNr)
Future.successful(Some(result))
} else {
session
.executeWrite(statements.deleteSnapshotsBefore, persistenceId, result.sequenceNr: JLong)
.map(_ => Some(result))
}

}
}

/**
* Keep all snapshots that occurred after `keepAfter`.
* If fewer than `snapshotsToKeep` occurred after `keepAfter` at least that many
* are kept. Setting this to 1 ensures that at least snapshot is kept even if it
* is older than the `keepAfter`
*
* If only N number of snapshot should be kept prefer overload without timestamp
* as it is more efficient.
*
* The returned snapshot metadata can be used to issue deletes for events older than the oldest
* snapshot.
*
* @return the snapshot meta of the oldest remaining snapshot. None if there are no snapshots
*
*/
def deleteBeforeSnapshot(
persistenceId: String,
snapshotsToKeep: Int,
keepAfterUnixTimestamp: Long): Future[Option[SnapshotMetadata]] = {
require(snapshotsToKeep >= 1, "must keep at least one snapshot")
require(keepAfterUnixTimestamp >= 0, "keepAfter must be greater than 0")
selectAllSnapshotMetaPs
.flatMap { ps =>
val allRows: Source[Row, NotUsed] = session.select(ps.bind(persistenceId))
allRows.zipWithIndex
.takeWhile {
case (row, index) =>
if (row.getLong("timestamp") > keepAfterUnixTimestamp) {
true
} else if (index < snapshotsToKeep) {
true
} else {
false
}
}
chbatey marked this conversation as resolved.
Show resolved Hide resolved
.map(_._1)
.runWith(Sink.seq)
}
.flatMap(rows => issueSnapshotDelete(persistenceId, snapshotsToKeep, rows))
}

/**
* Keep N snapshots and delete all older snapshots along.
*
* This operation is much cheaper than including the timestamp because it can use the primary key and limit.
*
* @return the snapshot meta of the oldest remaining snapshot. None if there are no snapshots. This can be used to delete events from before the snapshot.
*/
def deleteBeforeSnapshot(persistenceId: String, maxSnapshotsToKeep: Int): Future[Option[SnapshotMetadata]] = {
require(maxSnapshotsToKeep >= 1, "Must keep at least one snapshot")
val snapshots: Future[immutable.Seq[Row]] = selectLatestSnapshotsPs.flatMap { ps =>
session.select(ps.bind(persistenceId, maxSnapshotsToKeep: JInt)).runWith(Sink.seq)
}
snapshots.flatMap(rows => issueSnapshotDelete(persistenceId, maxSnapshotsToKeep, rows))
}

/**
* Delete all events before a sequenceNr for the given persistence id.
*
* WARNING: deleting events is generally discouraged in event sourced systems.
* once deleted the event by tag view can not be re-built
*
* @param persistenceId the persistence id to delete for
* @param toSequenceNr sequence nr (inclusive) to delete up to
*/
def deleteEventsTo(persistenceId: String, toSequenceNr: Long): Future[Done] = {
sendToJournal(replyTo => DeleteMessagesTo(persistenceId, toSequenceNr, replyTo))
}
johanandren marked this conversation as resolved.
Show resolved Hide resolved

/**
* Deletes all but the last N snapshots and deletes all events before this snapshot
* Does not delete from the tag_views table
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps additional caveat about consequences for this? Events by tag would fail if you run it and see a tag for a message that is not there?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think it'll fail, it doesn't reconcile that way around. Only messages to tag_views table. I will add warnings tho

*
* WARNING: deleting events is generally discouraged in event sourced systems.
* once deleted the event by tag view can not be re-built
*/
def cleanupBeforeSnapshot(persistenceId: String, nrSnapshotsToKeep: Int): Future[Done] = {
for {
oldestSnapshot <- deleteBeforeSnapshot(persistenceId, nrSnapshotsToKeep)
done <- issueDeleteFromSnapshot(oldestSnapshot)
} yield done
}

/**
* Deletes all events for the given persistence id from before the first after keepAfter.
* If there are not enough snapshots to satisfy nrSnapshotsToKeep then snapshots before
* keepAfter will also be kept.
*
* WARNING: deleting events is generally discouraged in event sourced systems.
* once deleted the event by tag view can not be re-built
*/
def cleanupBeforeSnapshot(persistenceId: String, nrSnapshotsToKeep: Int, keepAfter: Long): Future[Done] = {
for {
oldestSnapshot <- deleteBeforeSnapshot(persistenceId, nrSnapshotsToKeep, keepAfter)
done <- issueDeleteFromSnapshot(oldestSnapshot)
} yield done
}

/**
* See single persistenceId overload for what is done for each persistence id
*/
def cleanupBeforeSnapshot(persistenceIds: immutable.Seq[String], nrSnapshotsToKeep: Int): Future[Done] = {
foreach(persistenceIds, "cleanupBeforeSnapshot", pid => cleanupBeforeSnapshot(pid, nrSnapshotsToKeep))
}

/**
* See single persistenceId overload for what is done for each persistence id
*/
def cleanupBeforeSnapshot(
persistenceIds: immutable.Seq[String],
nrSnapshotsToKeep: Int,
keepAfter: Long): Future[Done] = {
foreach(persistenceIds, "cleanupBeforeSnapshot", pid => cleanupBeforeSnapshot(pid, nrSnapshotsToKeep, keepAfter))
}

private def issueDeleteFromSnapshot(snapshot: Option[SnapshotMetadata]): Future[Done] = {
snapshot match {
case Some(snapshotMeta) => deleteEventsTo(snapshotMeta.persistenceId, snapshotMeta.sequenceNr)
case None => Future.successful(Done)
}
}

/**
* Delete everything related to the given list of `persistenceIds`. All events, tagged events, and
* snapshots are deleted.
Expand Down Expand Up @@ -85,7 +269,7 @@ final class Cleanup(systemProvider: ClassicActorSystemProvider, settings: Cleanu
* Delete all events related to one single `persistenceId`. Snapshots are not deleted.
*/
def deleteAllEvents(persistenceId: String, neverUsePersistenceIdAgain: Boolean): Future[Done] = {
(journal ? CassandraJournal.DeleteAllEvents(persistenceId, neverUsePersistenceIdAgain)).mapTo[Done]
sendToJournal(CassandraJournal.DeleteAllEvents(persistenceId, neverUsePersistenceIdAgain))
}

/**
Expand All @@ -104,7 +288,14 @@ final class Cleanup(systemProvider: ClassicActorSystemProvider, settings: Cleanu
tagViewsReconciliation
.tagsForPersistenceId(persistenceId)
.flatMap { tags =>
Future.sequence(tags.map(tag => tagViewsReconciliation.deleteTagViewForPersistenceIds(Set(persistenceId), tag)))
Future.sequence(tags.map { tag =>
if (dryRun) {
log.info("dry run. Delete [{}] tag view for persistence id: [{}]", tag, persistenceId)
Future.successful(Done)
} else {
tagViewsReconciliation.deleteTagViewForPersistenceIds(Set(persistenceId), tag)
}
})
}
.map(_ => Done)
}
Expand All @@ -120,7 +311,7 @@ final class Cleanup(systemProvider: ClassicActorSystemProvider, settings: Cleanu
* Delete all snapshots related to one single `persistenceId`. Events are not deleted.
*/
def deleteAllSnapshots(persistenceId: String): Future[Done] = {
(snapshotStore ? CassandraSnapshotStore.DeleteAllsnapshots(persistenceId)).mapTo[Done]
sendToSnapshotStore(CassandraSnapshotStore.DeleteAllSnapshots(persistenceId))
}

private def foreach(
Expand Down Expand Up @@ -154,4 +345,31 @@ final class Cleanup(systemProvider: ClassicActorSystemProvider, settings: Cleanu
result
}

private def sendToSnapshotStore(msg: Any): Future[Done] = {
if (dryRun) {
log.info("dry run: Operation on snapshot store: {}", msg)
Future.successful(Done)
} else {
(snapshotStore ? msg).map(_ => Done)
}
}

private def sendToJournal(msg: Any): Future[Done] = {
if (dryRun) {
log.info("dry run: Operation on journal: {}", msg)
Future.successful(Done)
} else {
(journal ? msg).map(_ => Done)
}
}

private def sendToJournal(create: ActorRef => Any): Future[Done] = {
if (dryRun) {
log.info("dry run: Operation on journal: {}", create(ActorRef.noSender))
Future.successful(Done)
} else {
import akka.pattern.extended.ask
ask(journal, create).map(_ => Done)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,5 @@ class CleanupSettings(config: Config) {
val pluginLocation: String = config.getString("plugin-location")
val operationTimeout: FiniteDuration = config.getDuration("operation-timeout", TimeUnit.MILLISECONDS).millis
val logProgressEvery: Int = config.getInt("log-progress-every")
val dryRun: Boolean = config.getBoolean("dry-run")
}
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,13 @@ import akka.persistence.cassandra.FutureDone
AND sequence_nr <= ?
"""

def deleteSnapshotsBefore =
s"""
DELETE FROM ${tableName}
WHERE persistence_id = ?
AND sequence_nr < ?
"""

def selectSnapshot = s"""
SELECT * FROM ${tableName} WHERE
persistence_id = ? AND
Expand All @@ -79,6 +86,19 @@ import akka.persistence.cassandra.FutureDone
${limit.map(l => s"LIMIT ${l}").getOrElse("")}
"""

def selectLatestSnapshotMeta =
s"""SELECT persistence_id, sequence_nr, timestamp FROM ${tableName} WHERE
persistence_id = ?
ORDER BY sequence_nr DESC
LIMIT ?
"""

def selectAllSnapshotMeta =
s"""SELECT sequence_nr, timestamp FROM ${tableName} WHERE
persistence_id = ?
ORDER BY sequence_nr DESC
"""
chbatey marked this conversation as resolved.
Show resolved Hide resolved

private def tableName = s"${snapshotSettings.keyspace}.${snapshotSettings.table}"

/**
Expand Down
Loading