From 45b769cac1d7ba89b9d4ec18ad4bd471bcdf1ee7 Mon Sep 17 00:00:00 2001 From: Christopher Batey Date: Mon, 12 Oct 2020 14:03:37 +0100 Subject: [PATCH] Cleanup snapshots and events from before a configured snapshot (#828) --- core/src/main/resources/reference.conf | 28 +- .../cassandra/cleanup/Cleanup.scala | 252 ++++++++++++++-- .../cassandra/cleanup/CleanupSettings.scala | 1 + .../snapshot/CassandraSnapshotStore.scala | 148 +++++----- .../snapshot/CassandraStatements.scala | 20 ++ .../persistence/cassandra/CassandraSpec.scala | 9 + .../cassandra/cleanup/CleanupSpec.scala | 275 +++++++++++++++++- .../cassandra/query/DirectWriting.scala | 28 +- docs/src/main/paradox/cleanup.md | 31 ++ docs/src/main/paradox/events-by-tag.md | 18 ++ docs/src/main/paradox/index.md | 1 + docs/src/main/paradox/journal.md | 2 +- docs/src/main/paradox/snapshots.md | 2 +- .../java/jdoc/cleanup/CleanupDocExample.java | 44 +++ .../scala/doc/cleanup/CleanupDocExample.scala | 36 +++ 15 files changed, 785 insertions(+), 110 deletions(-) create mode 100644 docs/src/main/paradox/cleanup.md create mode 100644 docs/src/test/java/jdoc/cleanup/CleanupDocExample.java create mode 100644 docs/src/test/scala/doc/cleanup/CleanupDocExample.scala diff --git a/core/src/main/resources/reference.conf b/core/src/main/resources/reference.conf index bc52bfd23..ce7f57a86 100644 --- a/core/src/main/resources/reference.conf +++ b/core/src/main/resources/reference.conf @@ -49,13 +49,13 @@ cassandra-journal { # Parameter indicating whether the journal keyspace should be auto created. # Not all Cassandra settings are configurable when using autocreate and for - # full control of the keyspace and table definitions you should create them + # full control of the keyspace and table definitions you should create them # manually (with a script). keyspace-autocreate = true # Parameter indicating whether the journal tables should be auto created # Not all Cassandra settings are configurable when using autocreate and for - # full control of the keyspace and table definitions you should create them + # full control of the keyspace and table definitions you should create them # manually (with a script). tables-autocreate = true @@ -151,7 +151,7 @@ cassandra-journal { # Compaction strategy for the journal table. # Please refer to the tests for example configurations. - # Refer to http://docs.datastax.com/en/cql/3.1/cql/cql_reference/compactSubprop.html + # Refer to http://docs.datastax.com/en/cql/3.1/cql/cql_reference/compactSubprop.html # for more information regarding the properties. table-compaction-strategy { class = "SizeTieredCompactionStrategy" @@ -567,7 +567,7 @@ cassandra-snapshot-store { protocol-version = "" # The time to wait before cassandra will remove the tombstones created for deleted entries. - # cfr. gc_grace_seconds table property documentation on + # cfr. gc_grace_seconds table property documentation on # http://www.datastax.com/documentation/cql/3.1/cql/cql_reference/tabProp.html gc-grace-seconds = 864000 @@ -592,7 +592,7 @@ cassandra-snapshot-store { # Number load attempts when recovering from the latest snapshot fails # yet older snapshot files are available. Each recovery attempt will try - # to recover using an older than previously failed-on snapshot file + # to recover using an older than previously failed-on snapshot file # (if any are present). If all attempts fail the recovery will fail and # the persistent actor will be stopped. 
max-load-attempts = 3
@@ -609,7 +609,7 @@ cassandra-snapshot-store {
 # This configures the default settings for all CassandraReadJournal plugin
 # instances in the system.
 #
-# If you use multiple plugin instances you need to create differently named
+# If you use multiple plugin instances you need to create differently named
 # sections containing only those settings that shall be different from the defaults
 # configured here, importing the defaults like so:
 #
@@ -795,13 +795,10 @@ cassandra-plugin-default-dispatcher {
 # Configuration of the Cleanup tool.
 akka.persistence.cassandra.cleanup {
-  # Full configuration path of the journal plugin to use. By default it will use
-  # the default plugin configured with `akka.persistence.journal.plugin`.
-  journal-plugin = ""
-
-  # Full configuration path of the snapshot plugin to use. By default it will use
-  # the default plugin configured with `akka.persistence.snapshot-store.plugin`.
-  snapshot-plugin = ""
+  # Full config path of the journal to use
+  journal-plugin = "cassandra-journal"
+  # Full config path of the snapshot store to use
+  snapshot-plugin = "cassandra-snapshot-store"
 
   # Timeout of individual delete operations. If it takes longer the whole job
   # will be aborted with logging of how far it reached.
@@ -810,4 +807,9 @@ akka.persistence.cassandra.cleanup {
   # Log progress after this number of delete operations. Can be set to 1 to log
   # progress of each operation.
   log-progress-every = 100
+
+  # By default no deletes are executed and are instead logged at INFO. Set this to false
+  # to actually do the deletes
+  dry-run = true
+
 }
diff --git a/core/src/main/scala/akka/persistence/cassandra/cleanup/Cleanup.scala b/core/src/main/scala/akka/persistence/cassandra/cleanup/Cleanup.scala
index 5501d50a0..ca0e47b29 100644
--- a/core/src/main/scala/akka/persistence/cassandra/cleanup/Cleanup.scala
+++ b/core/src/main/scala/akka/persistence/cassandra/cleanup/Cleanup.scala
@@ -4,20 +4,26 @@
 package akka.persistence.cassandra.cleanup
 
+import java.lang.{ Integer => JInt, Long => JLong }
+
 import scala.collection.immutable
 import scala.concurrent.Future
 import scala.util.Failure
 import scala.util.Success
-
-import akka.Done
-import akka.actor.ActorSystem
+import akka.{ Done, NotUsed }
+import akka.actor.{ ActorRef, ActorSystem }
 import akka.annotation.ApiMayChange
 import akka.event.Logging
 import akka.pattern.ask
-import akka.persistence.Persistence
+import akka.persistence.JournalProtocol.DeleteMessagesTo
+import akka.persistence.{ Persistence, SnapshotMetadata }
 import akka.persistence.cassandra.journal.CassandraJournal
-import akka.persistence.cassandra.snapshot.CassandraSnapshotStore
+import akka.persistence.cassandra.session.scaladsl.CassandraSession
+import akka.persistence.cassandra.snapshot.{ CassandraSnapshotStore, CassandraSnapshotStoreConfig, CassandraStatements }
+import akka.stream.{ Materializer, SystemMaterializer }
+import akka.stream.scaladsl.{ Sink, Source }
 import akka.util.Timeout
+import com.datastax.driver.core.Row
 
 /**
  * Tool for deleting all events and/or snapshots for a given list of `persistenceIds` without using persistent actors.
@@ -40,15 +46,203 @@ final class Cleanup(system: ActorSystem, settings: CleanupSettings) {
   import settings._
   import system.dispatcher
 
+  private implicit val mat: Materializer = SystemMaterializer(system).materializer
   private val log = Logging(system, getClass)
   private val journal = Persistence(system).journalFor(journalPlugin)
-  private val snapshotStore =
-    if (snapshotPlugin != "" || system.settings.config.getString("akka.persistence.snapshot-store.plugin").nonEmpty)
-      Some(Persistence(system).snapshotStoreFor(snapshotPlugin))
-    else None
+  private val snapshotStore = Persistence(system).snapshotStoreFor(snapshotPlugin)
+
   private implicit val askTimeout: Timeout = operationTimeout
 
+  private lazy val snapshotPluginConfig =
+    new CassandraSnapshotStoreConfig(system, system.settings.config.getConfig(snapshotPlugin))
+  private lazy val snapshotStoreSession =
+    new CassandraSession(
+      system,
+      snapshotPluginConfig.sessionProvider,
+      snapshotPluginConfig.sessionSettings,
+      system.dispatcher,
+      log,
+      "Cleanup",
+      _ => Future.successful(Done))
+
+  private lazy val snapshotStoreStatements = {
+    new CassandraStatements {
+      override def snapshotConfig: CassandraSnapshotStoreConfig = snapshotPluginConfig
+    }
+  }
+
+  private lazy val selectLatestSnapshotsPs =
+    snapshotStoreSession.prepare(snapshotStoreStatements.selectLatestSnapshotMeta)
+  private lazy val selectAllSnapshotMetaPs = snapshotStoreSession.prepare(snapshotStoreStatements.selectAllSnapshotMeta)
+
+  if (dryRun) {
+    log.info("Cleanup running in dry run mode. No operations will be executed against the database, only logged")
+  }
+
+  private def issueSnapshotDelete(
+      persistenceId: String,
+      maxToKeep: Long,
+      rows: Seq[Row]): Future[Option[SnapshotMetadata]] = {
+    log.debug("issueSnapshotDelete [{}] [{}] [{}]", persistenceId, maxToKeep, rows.size)
+    rows match {
+      case Nil =>
+        log.debug("persistence id [{}] has 0 snapshots, no deletes issued", persistenceId)
+        Future.successful(None)
+      case fewer if fewer.size < maxToKeep =>
+        // no delete required, return the oldest snapshot
+        log.debug("Fewer snapshots than requested for persistence id [{}], no deletes issued", persistenceId)
+        Future.successful(
+          Some(SnapshotMetadata(persistenceId, fewer.last.getLong("sequence_nr"), fewer.last.getLong("timestamp"))))
+      case more =>
+        if (log.isDebugEnabled) {
+          log.debug(
+            "Latest {} snapshots for persistence id [{}] range from {} to {}",
+            maxToKeep,
+            persistenceId,
+            more.head.getLong("sequence_nr"),
+            more.last.getLong("sequence_nr"))
+        }
+        val result =
+          SnapshotMetadata(persistenceId, more.last.getLong("sequence_nr"), more.last.getLong("timestamp"))
+        if (dryRun) {
+          log.info(
+            "dry run: CQL: [{}] persistence_id: [{}] sequence_nr [{}]",
+            snapshotStoreStatements.deleteSnapshotsBefore,
+            persistenceId,
+            result.sequenceNr)
+          Future.successful(Some(result))
+        } else {
+          snapshotStoreSession
+            .executeWrite(snapshotStoreStatements.deleteSnapshotsBefore, persistenceId, result.sequenceNr: JLong)
+            .map(_ => Some(result))
+        }
+
+    }
+  }
+
+  /**
+   * Keep all snapshots that occurred after `keepAfter`.
+   * If fewer than `snapshotsToKeep` occurred after `keepAfter`, at least that many
+   * are kept. Setting this to 1 ensures that at least one snapshot is kept even if it
+   * is older than `keepAfter`.
+   *
+   * If only the latest N snapshots should be kept, prefer the overload without a timestamp
+   * as it is more efficient.
+   *
+   * The returned snapshot metadata can be used to issue deletes for events older than the oldest
+   * snapshot.
+   *
+   * @return the snapshot meta of the oldest remaining snapshot. None if there are no snapshots
+   *
+   */
+  def deleteBeforeSnapshot(
+      persistenceId: String,
+      snapshotsToKeep: Int,
+      keepAfterUnixTimestamp: Long): Future[Option[SnapshotMetadata]] = {
+    require(snapshotsToKeep >= 1, "must keep at least one snapshot")
+    require(keepAfterUnixTimestamp >= 0, "keepAfter must be greater than or equal to 0")
+    selectAllSnapshotMetaPs
+      .flatMap { ps =>
+        val allRows: Source[Row, NotUsed] = snapshotStoreSession.select(ps.bind(persistenceId))
+        allRows.zipWithIndex
+          .takeWhile {
+            case (row, index) =>
+              if (row.getLong("timestamp") > keepAfterUnixTimestamp) {
+                true
+              } else if (index < snapshotsToKeep) {
+                true
+              } else {
+                false
+              }
+          }
+          .map(_._1)
+          .runWith(Sink.seq)
+      }
+      .flatMap(rows => issueSnapshotDelete(persistenceId, snapshotsToKeep, rows))
+  }
+
+  /**
+   * Keep N snapshots and delete all older snapshots.
+   *
+   * This operation is much cheaper than including the timestamp because it can use the primary key and limit.
+   *
+   * @return the snapshot meta of the oldest remaining snapshot. None if there are no snapshots. This can be used to delete events from before the snapshot.
+   */
+  def deleteBeforeSnapshot(persistenceId: String, maxSnapshotsToKeep: Int): Future[Option[SnapshotMetadata]] = {
+    require(maxSnapshotsToKeep >= 1, "Must keep at least one snapshot")
+    val snapshots: Future[immutable.Seq[Row]] = selectLatestSnapshotsPs.flatMap { ps =>
+      snapshotStoreSession.select(ps.bind(persistenceId, maxSnapshotsToKeep: JInt)).runWith(Sink.seq)
+    }
+    snapshots.flatMap(rows => issueSnapshotDelete(persistenceId, maxSnapshotsToKeep, rows))
+  }
+
+  /**
+   * Delete all events before a sequenceNr for the given persistence id.
+   *
+   * WARNING: deleting events is generally discouraged in event sourced systems.
+   * Once deleted, the events by tag view can not be re-built.
+   *
+   * @param persistenceId the persistence id to delete for
+   * @param toSequenceNr sequence nr (inclusive) to delete up to
+   */
+  def deleteEventsTo(persistenceId: String, toSequenceNr: Long): Future[Done] = {
+    sendToJournal((replyTo: ActorRef) => DeleteMessagesTo(persistenceId, toSequenceNr, replyTo))
+  }
+
+  /**
+   * Deletes all but the last N snapshots and deletes all events before this snapshot.
+   * Does not delete from the tag_views table.
+   *
+   * WARNING: deleting events is generally discouraged in event sourced systems.
+   * Once deleted, the events by tag view can not be re-built.
+   */
+  def cleanupBeforeSnapshot(persistenceId: String, nrSnapshotsToKeep: Int): Future[Done] = {
+    for {
+      oldestSnapshot <- deleteBeforeSnapshot(persistenceId, nrSnapshotsToKeep)
+      done <- issueDeleteFromSnapshot(oldestSnapshot)
+    } yield done
+  }
+
+  /**
+   * Deletes all events for the given persistence id from before the first snapshot after keepAfter.
+   * If there are not enough snapshots to satisfy nrSnapshotsToKeep then snapshots before
+   * keepAfter will also be kept.
+   *
+   * WARNING: deleting events is generally discouraged in event sourced systems.
+   * Once deleted, the events by tag view can not be re-built.
+   */
+  def cleanupBeforeSnapshot(persistenceId: String, nrSnapshotsToKeep: Int, keepAfter: Long): Future[Done] = {
+    for {
+      oldestSnapshot <- deleteBeforeSnapshot(persistenceId, nrSnapshotsToKeep, keepAfter)
+      done <- issueDeleteFromSnapshot(oldestSnapshot)
+    } yield done
+  }
+
+  /**
+   * See the single persistenceId overload for what is done for each persistence id.
+   */
+  def cleanupBeforeSnapshot(persistenceIds: immutable.Seq[String], nrSnapshotsToKeep: Int): Future[Done] = {
+    foreach(persistenceIds, "cleanupBeforeSnapshot", pid => cleanupBeforeSnapshot(pid, nrSnapshotsToKeep))
+  }
+
+  /**
+   * See the single persistenceId overload for what is done for each persistence id.
+   */
+  def cleanupBeforeSnapshot(
+      persistenceIds: immutable.Seq[String],
+      nrSnapshotsToKeep: Int,
+      keepAfter: Long): Future[Done] = {
+    foreach(persistenceIds, "cleanupBeforeSnapshot", pid => cleanupBeforeSnapshot(pid, nrSnapshotsToKeep, keepAfter))
+  }
+
+  private def issueDeleteFromSnapshot(snapshot: Option[SnapshotMetadata]): Future[Done] = {
+    snapshot match {
+      case Some(snapshotMeta) => deleteEventsTo(snapshotMeta.persistenceId, snapshotMeta.sequenceNr)
+      case None => Future.successful(Done)
+    }
+  }
+
   /**
    * Delete everything related to the given list of `persistenceIds`. All events and snapshots are deleted.
    */
@@ -74,28 +268,21 @@
    * Delete all events related to one single `persistenceId`. Snapshots are not deleted.
    */
   def deleteAllEvents(persistenceId: String, neverUsePersistenceIdAgain: Boolean): Future[Done] = {
-    (journal ? CassandraJournal.DeleteAllEvents(persistenceId, neverUsePersistenceIdAgain)).mapTo[Done]
+    sendToJournal(CassandraJournal.DeleteAllEvents(persistenceId, neverUsePersistenceIdAgain))
   }
 
   /**
    * Delete all snapshots related to the given list of `persistenceIds`. Events are not deleted.
    */
   def deleteAllSnapshots(persistenceIds: immutable.Seq[String]): Future[Done] = {
-    if (snapshotStore.isDefined)
-      foreach(persistenceIds, "deleteAllSnapshots", pid => deleteAllSnapshots(pid))
-    else
-      Future.successful(Done)
+    foreach(persistenceIds, "deleteAllSnapshots", pid => deleteAllSnapshots(pid))
   }
 
   /**
    * Delete all snapshots related to one single `persistenceId`. Events are not deleted.
    */
   def deleteAllSnapshots(persistenceId: String): Future[Done] = {
-    snapshotStore match {
-      case Some(snapshotStoreRef) =>
-        (snapshotStoreRef ? CassandraSnapshotStore.DeleteAllsnapshots(persistenceId)).mapTo[Done]
-      case None => Future.successful(Done)
-    }
+    sendToSnapshotStore(CassandraSnapshotStore.DeleteAllSnapshots(persistenceId))
   }
 
   private def foreach(
@@ -129,4 +316,31 @@ final class Cleanup(system: ActorSystem, settings: CleanupSettings) {
     result
   }
 
+  private def sendToSnapshotStore(msg: Any): Future[Done] = {
+    if (dryRun) {
+      log.info("dry run: Operation on snapshot store: {}", msg)
+      Future.successful(Done)
+    } else {
+      (snapshotStore ? msg).map(_ => Done)
+    }
+  }
+
+  private def sendToJournal(msg: Any): Future[Done] = {
+    if (dryRun) {
+      log.info("dry run: Operation on journal: {}", msg)
+      Future.successful(Done)
+    } else {
+      (journal ?
msg).map(_ => Done) + } + } + + private def sendToJournal(create: ActorRef => Any): Future[Done] = { + if (dryRun) { + log.info("dry run: Operation on journal: {}", create(ActorRef.noSender)) + Future.successful(Done) + } else { + import akka.pattern.extended.ask + ask(journal, create).map(_ => Done) + } + } } diff --git a/core/src/main/scala/akka/persistence/cassandra/cleanup/CleanupSettings.scala b/core/src/main/scala/akka/persistence/cassandra/cleanup/CleanupSettings.scala index 154f59f4f..5c078d179 100644 --- a/core/src/main/scala/akka/persistence/cassandra/cleanup/CleanupSettings.scala +++ b/core/src/main/scala/akka/persistence/cassandra/cleanup/CleanupSettings.scala @@ -18,4 +18,5 @@ class CleanupSettings(config: Config) { val snapshotPlugin: String = config.getString("snapshot-plugin") val operationTimeout: FiniteDuration = config.getDuration("operation-timeout", TimeUnit.MILLISECONDS).millis val logProgressEvery: Int = config.getInt("log-progress-every") + val dryRun: Boolean = config.getBoolean("dry-run") } diff --git a/core/src/main/scala/akka/persistence/cassandra/snapshot/CassandraSnapshotStore.scala b/core/src/main/scala/akka/persistence/cassandra/snapshot/CassandraSnapshotStore.scala index b3f040e2e..44db8ce1a 100644 --- a/core/src/main/scala/akka/persistence/cassandra/snapshot/CassandraSnapshotStore.scala +++ b/core/src/main/scala/akka/persistence/cassandra/snapshot/CassandraSnapshotStore.scala @@ -14,7 +14,6 @@ import scala.concurrent.Future import scala.util.Failure import scala.util.Success import scala.util.control.NonFatal - import akka.Done import akka.actor._ import akka.annotation.InternalApi @@ -31,10 +30,10 @@ import akka.serialization.SerializationExtension import akka.serialization.Serializers import akka.stream.ActorMaterializer import akka.stream.scaladsl.Sink -import akka.util.OptionVal import com.datastax.driver.core._ import com.datastax.driver.core.policies.LoggingRetryPolicy import com.datastax.driver.core.utils.Bytes +import akka.util.OptionVal import com.typesafe.config.Config class CassandraSnapshotStore(cfg: Config) @@ -47,7 +46,7 @@ class CassandraSnapshotStore(cfg: Config) val snapshotConfig = new CassandraSnapshotStoreConfig(context.system, cfg) val serialization = SerializationExtension(context.system) - val snapshotDeserializer = new SnapshotDeserializer(context.system) + val snapshotSerialization = new SnapshotSerialization(context.system) implicit val ec: ExecutionContext = context.dispatcher import snapshotConfig._ @@ -106,7 +105,7 @@ class CassandraSnapshotStore(cfg: Config) preparedSelectSnapshotMetadata preparedSelectSnapshotMetadataWithMaxLoadAttemptsLimit - case DeleteAllsnapshots(persistenceId) => + case DeleteAllSnapshots(persistenceId) => val result: Future[Done] = deleteAsync(persistenceId, SnapshotSelectionCriteria(maxSequenceNr = Long.MaxValue)).map(_ => Done) result.pipeTo(sender()) @@ -178,7 +177,7 @@ class CassandraSnapshotStore(cfg: Config) case Some(row) => row.getBytes("snapshot") match { case null => - snapshotDeserializer.deserializeSnapshot(row).map(Snapshot.apply) + snapshotSerialization.deserializeSnapshot(row).map(Snapshot.apply) case bytes => // for backwards compatibility Future.successful(serialization.deserialize(Bytes.getArray(bytes), classOf[Snapshot]).get) @@ -187,7 +186,7 @@ class CassandraSnapshotStore(cfg: Config) } override def saveAsync(metadata: SnapshotMetadata, snapshot: Any): Future[Unit] = - serialize(snapshot).flatMap { ser => + snapshotSerialization.serialize(snapshot).flatMap { ser => // using two 
separate statements with or without the meta data columns because // then users doesn't have to alter table and add the new columns if they don't use // the meta data feature @@ -196,23 +195,7 @@ class CassandraSnapshotStore(cfg: Config) else preparedWriteSnapshot stmt.flatMap { ps => - val bs = ps.bind() - bs.setString("persistence_id", metadata.persistenceId) - bs.setLong("sequence_nr", metadata.sequenceNr) - bs.setLong("timestamp", metadata.timestamp) - bs.setInt("ser_id", ser.serId) - bs.setString("ser_manifest", ser.serManifest) - bs.setBytes("snapshot_data", ser.serialized) - - // meta data, if any - ser.meta match { - case Some(meta) => - bs.setInt("meta_ser_id", meta.serId) - bs.setString("meta_ser_manifest", meta.serManifest) - bs.setBytes("meta", meta.serialized) - case None => - } - + val bs = CassandraSnapshotStore.prepareSnapshotWrite(ps, metadata, ser) session.executeWrite(bs).map(_ => ()) } } @@ -262,46 +245,6 @@ class CassandraSnapshotStore(cfg: Config) session.underlying().flatMap(_.executeAsync(batch)).map(_ => ()) } - private def serialize(payload: Any): Future[Serialized] = - try { - def serializeMeta(): Option[SerializedMeta] = - // meta data, if any - payload match { - case SnapshotWithMetaData(_, m) => - val m2 = m.asInstanceOf[AnyRef] - val serializer = serialization.findSerializerFor(m2) - val serManifest = Serializers.manifestFor(serializer, m2) - val metaBuf = ByteBuffer.wrap(serialization.serialize(m2).get) - Some(SerializedMeta(metaBuf, serManifest, serializer.identifier)) - case evt => None - } - - val p: AnyRef = (payload match { - case SnapshotWithMetaData(snap, _) => snap // unwrap - case snap => snap - }).asInstanceOf[AnyRef] - val serializer = serialization.findSerializerFor(p) - val serManifest = Serializers.manifestFor(serializer, p) - serializer match { - case asyncSer: AsyncSerializer => - Serialization.withTransportInformation(context.system.asInstanceOf[ExtendedActorSystem]) { () => - asyncSer.toBinaryAsync(p).map { bytes => - val serPayload = ByteBuffer.wrap(bytes) - Serialized(serPayload, serManifest, serializer.identifier, serializeMeta()) - } - } - case _ => - Future { - // Serialization.serialize adds transport info - val serPayload = ByteBuffer.wrap(serialization.serialize(p).get) - Serialized(serPayload, serManifest, serializer.identifier, serializeMeta()) - } - } - - } catch { - case NonFatal(e) => Future.failed(e) - } - private def metadata( snapshotMetaPs: PreparedStatement, persistenceId: String, @@ -326,13 +269,17 @@ class CassandraSnapshotStore(cfg: Config) private case object Init sealed trait CleanupCommand - final case class DeleteAllsnapshots(persistenceId: String) extends CleanupCommand + final case class DeleteAllSnapshots(persistenceId: String) extends CleanupCommand - private case class Serialized(serialized: ByteBuffer, serManifest: String, serId: Int, meta: Option[SerializedMeta]) + final case class Serialized(serialized: ByteBuffer, serManifest: String, serId: Int, meta: Option[SerializedMeta]) - private case class SerializedMeta(serialized: ByteBuffer, serManifest: String, serId: Int) + final case class SerializedMeta(serialized: ByteBuffer, serManifest: String, serId: Int) - class SnapshotDeserializer(system: ActorSystem) { + /** + * INTERNAL API + */ + @InternalApi + private[akka] class SnapshotSerialization(system: ActorSystem) { private val serialization = SerializationExtension(system) @@ -347,6 +294,46 @@ class CassandraSnapshotStore(cfg: Config) b } + def serialize(payload: Any)(implicit ec: ExecutionContext): 
Future[Serialized] = + try { + def serializeMeta(): Option[SerializedMeta] = + // meta data, if any + payload match { + case SnapshotWithMetaData(_, m) => + val m2 = m.asInstanceOf[AnyRef] + val serializer = serialization.findSerializerFor(m2) + val serManifest = Serializers.manifestFor(serializer, m2) + val metaBuf = ByteBuffer.wrap(serialization.serialize(m2).get) + Some(SerializedMeta(metaBuf, serManifest, serializer.identifier)) + case _ => None + } + + val p: AnyRef = (payload match { + case SnapshotWithMetaData(snap, _) => snap // unwrap + case snap => snap + }).asInstanceOf[AnyRef] + val serializer = serialization.findSerializerFor(p) + val serManifest = Serializers.manifestFor(serializer, p) + serializer match { + case asyncSer: AsyncSerializer => + Serialization.withTransportInformation(system.asInstanceOf[ExtendedActorSystem]) { () => + asyncSer.toBinaryAsync(p).map { bytes => + val serPayload = ByteBuffer.wrap(bytes) + Serialized(serPayload, serManifest, serializer.identifier, serializeMeta()) + } + } + case _ => + Future { + // Serialization.serialize adds transport info + val serPayload = ByteBuffer.wrap(serialization.serialize(p).get) + Serialized(serPayload, serManifest, serializer.identifier, serializeMeta()) + } + } + + } catch { + case NonFatal(e) => Future.failed(e) + } + def deserializeSnapshot(row: Row)(implicit ec: ExecutionContext): Future[Any] = try { @@ -403,4 +390,31 @@ class CassandraSnapshotStore(cfg: Config) case NonFatal(e) => Future.failed(e) } } + + /** + * INTERNAL API + */ + private[akka] def prepareSnapshotWrite( + ps: PreparedStatement, + metadata: SnapshotMetadata, + ser: Serialized): BoundStatement = { + val bs = ps + .bind() + .setString("persistence_id", metadata.persistenceId) + .setLong("sequence_nr", metadata.sequenceNr) + .setLong("timestamp", metadata.timestamp) + .setInt("ser_id", ser.serId) + .setString("ser_manifest", ser.serManifest) + .setBytes("snapshot_data", ser.serialized) + + // meta data, if any + ser.meta match { + case Some(meta) => + bs.setInt("meta_ser_id", meta.serId) + .setString("meta_ser_manifest", meta.serManifest) + .setBytes("meta", meta.serialized) + case None => + bs + } + } } diff --git a/core/src/main/scala/akka/persistence/cassandra/snapshot/CassandraStatements.scala b/core/src/main/scala/akka/persistence/cassandra/snapshot/CassandraStatements.scala index 248bb41bc..01f5afc80 100644 --- a/core/src/main/scala/akka/persistence/cassandra/snapshot/CassandraStatements.scala +++ b/core/src/main/scala/akka/persistence/cassandra/snapshot/CassandraStatements.scala @@ -66,6 +66,13 @@ trait CassandraStatements { AND sequence_nr <= ? """ + def deleteSnapshotsBefore = + s""" + DELETE FROM ${tableName} + WHERE persistence_id = ? + AND sequence_nr < ? + """ + def selectSnapshot = s""" SELECT * FROM ${tableName} WHERE persistence_id = ? AND @@ -80,6 +87,19 @@ trait CassandraStatements { ${limit.map(l => s"LIMIT ${l}").getOrElse("")} """ + def selectLatestSnapshotMeta = + s"""SELECT persistence_id, sequence_nr, timestamp FROM ${tableName} WHERE + persistence_id = ? + ORDER BY sequence_nr DESC + LIMIT ? + """ + + def selectAllSnapshotMeta = + s"""SELECT sequence_nr, timestamp FROM ${tableName} WHERE + persistence_id = ? 
+ ORDER BY sequence_nr DESC + """ + private def tableName = s"${snapshotConfig.keyspace}.${snapshotConfig.table}" /** diff --git a/core/src/test/scala/akka/persistence/cassandra/CassandraSpec.scala b/core/src/test/scala/akka/persistence/cassandra/CassandraSpec.scala index 4750b0dfa..e8bedaf00 100644 --- a/core/src/test/scala/akka/persistence/cassandra/CassandraSpec.scala +++ b/core/src/test/scala/akka/persistence/cassandra/CassandraSpec.scala @@ -137,6 +137,15 @@ abstract class CassandraSpec( println(s"""Row:${row.getLong("partition_nr")}, ${row.getString("persistence_id")}, ${row.getLong( "sequence_nr")}""") }) + + println("snapshots") + c.execute(s"select * from ${snapshotName}.snapshots") + .asScala + .foreach(row => { + println( + s"""Row:${row.getString("persistence_id")}, ${row.getLong("sequence_nr")}, ${row.getLong("timestamp")}""") + }) + } keyspaces().foreach { keyspace => c.execute(s"drop keyspace if exists $keyspace") diff --git a/core/src/test/scala/akka/persistence/cassandra/cleanup/CleanupSpec.scala b/core/src/test/scala/akka/persistence/cassandra/cleanup/CleanupSpec.scala index 27b09952f..1770773fd 100644 --- a/core/src/test/scala/akka/persistence/cassandra/cleanup/CleanupSpec.scala +++ b/core/src/test/scala/akka/persistence/cassandra/cleanup/CleanupSpec.scala @@ -6,17 +6,18 @@ package akka.persistence.cassandra.cleanup import akka.actor.ActorRef import akka.actor.Props -import akka.persistence.PersistentActor -import akka.persistence.SaveSnapshotSuccess -import akka.persistence.SnapshotOffer +import akka.persistence.{ PersistentActor, SaveSnapshotSuccess, SnapshotMetadata, SnapshotOffer } import akka.persistence.cassandra.CassandraSpec +import akka.persistence.cassandra.query.DirectWriting +import akka.stream.scaladsl.Sink import com.typesafe.config.ConfigFactory object CleanupSpec { val config = ConfigFactory.parseString(s""" - akka.loglevel = INFO + akka.loglevel = DEBUG akka.persistence.cassandra.cleanup { log-progress-every = 2 + dry-run = false } """) @@ -61,8 +62,7 @@ object CleanupSpec { } } -class CleanupSpec extends CassandraSpec(CleanupSpec.config) { - +class CleanupSpec extends CassandraSpec(CleanupSpec.config) with DirectWriting { import CleanupSpec._ "Cassandra cleanup" must { @@ -156,6 +156,143 @@ class CleanupSpec extends CassandraSpec(CleanupSpec.config) { expectMsg(RecoveredState("", Nil, 0L)) } + "delete some for one persistenceId" in { + val pid = nextPid + val p = system.actorOf(TestActor.props(pid)) + (1 to 8).foreach { i => + p ! PersistEvent + expectMsgType[Ack] + } + system.stop(p) + + val cleanup = new Cleanup(system) + cleanup.deleteEventsTo(pid, 5).futureValue + + val p2 = system.actorOf(TestActor.props(pid)) + p2 ! GetRecoveredState + expectMsg(RecoveredState("", List("evt-6", "evt-7", "evt-8"), 8L)) + } + + "clean up before latest snapshot for one persistence id" in { + val pid = nextPid + val p = system.actorOf(TestActor.props(pid)) + (1 to 3).foreach { i => + p ! PersistEvent + expectMsgType[Ack] + } + p ! Snap + expectMsgType[Ack] + (1 to 3).foreach { i => + p ! PersistEvent + expectMsgType[Ack] + } + p ! Snap + expectMsgType[Ack] + (1 to 3).foreach { i => + p ! PersistEvent + expectMsgType[Ack] + } + system.stop(p) + + val cleanup = new Cleanup(system) + cleanup.cleanupBeforeSnapshot(pid, 1).futureValue + + val p2 = system.actorOf(TestActor.props(pid)) + p2 ! 
GetRecoveredState
+      expectMsg(RecoveredState("snap-6", List("evt-7", "evt-8", "evt-9"), 9L))
+
+      // check old snapshots are gone
+      val snapshots = allSnapshots(pid)
+      snapshots.size shouldEqual 1
+      snapshots.head.sequenceNr shouldEqual 6
+      // check old events are gone
+      queries
+        .currentEventsByPersistenceId(pid, 0, Long.MaxValue)
+        .map(_.event.toString)
+        .runWith(Sink.seq)
+        .futureValue shouldEqual List("evt-7", "evt-8", "evt-9")
+    }
+
+    "clean up before snapshot including timestamp that results in all events kept for one persistence id" in {
+      val pid = nextPid
+      val p = system.actorOf(TestActor.props(pid))
+      (1 to 3).foreach { i =>
+        p ! PersistEvent
+        expectMsgType[Ack]
+      }
+      p ! Snap
+      expectMsgType[Ack]
+      (1 to 3).foreach { i =>
+        p ! PersistEvent
+        expectMsgType[Ack]
+      }
+      p ! Snap
+      expectMsgType[Ack]
+      (1 to 3).foreach { i =>
+        p ! PersistEvent
+        expectMsgType[Ack]
+      }
+      system.stop(p)
+
+      val cleanup = new Cleanup(system)
+      // a long way in the past so keep everything
+      cleanup.cleanupBeforeSnapshot(pid, 1, 100).futureValue
+
+      val p2 = system.actorOf(TestActor.props(pid))
+      p2 ! GetRecoveredState
+      expectMsg(RecoveredState("snap-6", List("evt-7", "evt-8", "evt-9"), 9L))
+
+      // check old snapshots are kept
+      val snapshots = allSnapshots(pid)
+      snapshots.size shouldEqual 2
+      // check old events are kept due to timestamp, all events before the oldest snapshot are still deleted
+      queries
+        .currentEventsByPersistenceId(pid, 0, Long.MaxValue)
+        .map(_.event.toString)
+        .runWith(Sink.seq)
+        .futureValue shouldEqual List("evt-4", "evt-5", "evt-6", "evt-7", "evt-8", "evt-9")
+    }
+
+    "clean up before snapshot including timestamp for one persistence id" in {
+      val pid = nextPid
+      val p = system.actorOf(TestActor.props(pid))
+      (1 to 3).foreach { i =>
+        p ! PersistEvent
+        expectMsgType[Ack]
+      }
+      p ! Snap
+      expectMsgType[Ack]
+      (1 to 3).foreach { i =>
+        p ! PersistEvent
+        expectMsgType[Ack]
+      }
+      p ! Snap
+      expectMsgType[Ack]
+      (1 to 3).foreach { i =>
+        p ! PersistEvent
+        expectMsgType[Ack]
+      }
+      system.stop(p)
+
+      val cleanup = new Cleanup(system)
+      // timestamp shouldn't result in any more events/snapshots kept apart from the last one
+      cleanup.cleanupBeforeSnapshot(pid, 1, System.currentTimeMillis()).futureValue
+
+      val p2 = system.actorOf(TestActor.props(pid))
+      p2 ! GetRecoveredState
+      expectMsg(RecoveredState("snap-6", List("evt-7", "evt-8", "evt-9"), 9L))
+
+      // check old snapshots are gone
+      val snapshots = allSnapshots(pid)
+      snapshots.size shouldEqual 1
+      // check old events are gone, the timestamp doesn't keep any extra events
+      queries
+        .currentEventsByPersistenceId(pid, 0, Long.MaxValue)
+        .map(_.event.toString)
+        .runWith(Sink.seq)
+        .futureValue shouldEqual List("evt-7", "evt-8", "evt-9")
+    }
+
     "delete all for several persistenceId" in {
       val pidA = nextPid
       val pidB = nextPid
@@ -242,4 +379,130 @@
   }
 
+  "Time and snapshot based cleanup" must {
+    "keep the correct number of snapshots" in {
+      val cleanup = new Cleanup(system)
+      val pid = nextPid
+      writeTestSnapshot(SnapshotMetadata(pid, 1, 1000), "snapshot-1").futureValue
+      writeTestSnapshot(SnapshotMetadata(pid, 2, 2000), "snapshot-2").futureValue
+      writeTestSnapshot(SnapshotMetadata(pid, 3, 3000), "snapshot-3").futureValue
+
+      val oldestSnapshot = cleanup.deleteBeforeSnapshot(pid, 2).futureValue
+
+      val p1 = system.actorOf(TestActor.props(pid))
+      p1 !
GetRecoveredState
+      expectMsg(RecoveredState("snapshot-3", Nil, 3L))
+
+      allSnapshots(pid) shouldEqual List(SnapshotMetadata(pid, 2, 2000), SnapshotMetadata(pid, 3, 3000))
+
+      oldestSnapshot shouldEqual Some(SnapshotMetadata(pid, 2, 2000))
+    }
+    "keep all snapshots if fewer than requested without timestamp" in {
+      val cleanup = new Cleanup(system)
+      val pid = nextPid
+      writeTestSnapshot(SnapshotMetadata(pid, 1, 1000), "snapshot-1").futureValue
+      writeTestSnapshot(SnapshotMetadata(pid, 2, 2000), "snapshot-2").futureValue
+      writeTestSnapshot(SnapshotMetadata(pid, 3, 3000), "snapshot-3").futureValue
+
+      val oldestSnapshot = cleanup.deleteBeforeSnapshot(pid, 4).futureValue
+
+      val p1 = system.actorOf(TestActor.props(pid))
+      p1 ! GetRecoveredState
+      expectMsg(RecoveredState("snapshot-3", Nil, 3L))
+
+      allSnapshots(pid) shouldEqual List(
+        SnapshotMetadata(pid, 1, 1000),
+        SnapshotMetadata(pid, 2, 2000),
+        SnapshotMetadata(pid, 3, 3000))
+
+      oldestSnapshot shouldEqual Some(SnapshotMetadata(pid, 1, 1000))
+    }
+    "keep all snapshots if fewer than requested with timestamp" in {
+      val cleanup = new Cleanup(system)
+      val pid = nextPid
+      writeTestSnapshot(SnapshotMetadata(pid, 1, 1000), "snapshot-1").futureValue
+      writeTestSnapshot(SnapshotMetadata(pid, 2, 2000), "snapshot-2").futureValue
+      writeTestSnapshot(SnapshotMetadata(pid, 3, 3000), "snapshot-3").futureValue
+
+      val oldestSnapshot = cleanup.deleteBeforeSnapshot(pid, 4, 1500).futureValue
+
+      val p1 = system.actorOf(TestActor.props(pid))
+      p1 ! GetRecoveredState
+      expectMsg(RecoveredState("snapshot-3", Nil, 3L))
+
+      allSnapshots(pid) shouldEqual List(
+        SnapshotMetadata(pid, 1, 1000),
+        SnapshotMetadata(pid, 2, 2000),
+        SnapshotMetadata(pid, 3, 3000))
+
+      oldestSnapshot shouldEqual Some(SnapshotMetadata(pid, 1, 1000))
+    }
+
+    "work without timestamp when there are no snapshots" in {
+      val cleanup = new Cleanup(system)
+      val pid = nextPid
+      val oldestSnapshot = cleanup.deleteBeforeSnapshot(pid, 2).futureValue
+      oldestSnapshot shouldEqual None
+    }
+
+    "work with timestamp when there are no snapshots" in {
+      val cleanup = new Cleanup(system)
+      val pid = nextPid
+      val oldestSnapshot = cleanup.deleteBeforeSnapshot(pid, 2, 100).futureValue
+      oldestSnapshot shouldEqual None
+    }
+
+    "don't delete snapshots newer than the oldest date" in {
+      val cleanup = new Cleanup(system)
+      val pid = nextPid
+      writeTestSnapshot(SnapshotMetadata(pid, 1, 1000), "snapshot-1").futureValue
+      writeTestSnapshot(SnapshotMetadata(pid, 2, 2000), "snapshot-2").futureValue
+      writeTestSnapshot(SnapshotMetadata(pid, 3, 3000), "snapshot-3").futureValue
+      writeTestSnapshot(SnapshotMetadata(pid, 4, 4000), "snapshot-4").futureValue
+
+      val oldestSnapshot = cleanup.deleteBeforeSnapshot(pid, 2, 1500).futureValue
+
+      val p1 = system.actorOf(TestActor.props(pid))
+      p1 !
GetRecoveredState + expectMsg(RecoveredState("snapshot-4", Nil, 4L)) + + // here 3 snapshots are kept as snapshot 2 is newer than 1500 + allSnapshots(pid) shouldEqual List( + SnapshotMetadata(pid, 2, 2000), + SnapshotMetadata(pid, 3, 3000), + SnapshotMetadata(pid, 4, 4000)) + + oldestSnapshot shouldEqual Some(SnapshotMetadata(pid, 2, 2000)) + } + "keep snapshots older than the oldest date to meet snapshotsToKeep" in { + val cleanup = new Cleanup(system) + val pid = nextPid + writeTestSnapshot(SnapshotMetadata(pid, 1, 1000), "snapshot-1").futureValue + writeTestSnapshot(SnapshotMetadata(pid, 2, 2000), "snapshot-2").futureValue + writeTestSnapshot(SnapshotMetadata(pid, 3, 3000), "snapshot-3").futureValue + writeTestSnapshot(SnapshotMetadata(pid, 4, 4000), "snapshot-4").futureValue + + val oldestSnapshot = cleanup.deleteBeforeSnapshot(pid, 2, 5000).futureValue + + val p1 = system.actorOf(TestActor.props(pid)) + p1 ! GetRecoveredState + expectMsg(RecoveredState("snapshot-4", Nil, 4L)) + + // 2 snapshots are kept that are both older than the keepAfter as 2 should be kept + allSnapshots(pid) shouldEqual List(SnapshotMetadata(pid, 3, 3000), SnapshotMetadata(pid, 4, 4000)) + + oldestSnapshot shouldEqual Some(SnapshotMetadata(pid, 3, 3000)) + } + } + + private def allSnapshots(pid: String): Seq[SnapshotMetadata] = { + import scala.collection.JavaConverters._ + journalSession + .execute(s"select * from ${snapshotName}.snapshots where persistence_id = '${pid}' order by sequence_nr") + .asScala + .map(row => + SnapshotMetadata(row.getString("persistence_id"), row.getLong("sequence_nr"), row.getLong("timestamp"))) + .toList + } + } diff --git a/core/src/test/scala/akka/persistence/cassandra/query/DirectWriting.scala b/core/src/test/scala/akka/persistence/cassandra/query/DirectWriting.scala index 4a4aa69d6..e9b0a7698 100644 --- a/core/src/test/scala/akka/persistence/cassandra/query/DirectWriting.scala +++ b/core/src/test/scala/akka/persistence/cassandra/query/DirectWriting.scala @@ -7,15 +7,18 @@ package akka.persistence.cassandra.query import java.nio.ByteBuffer import akka.actor.ActorSystem -import akka.persistence.PersistentRepr +import akka.persistence.{ PersistentRepr, SnapshotMetadata } import akka.persistence.cassandra.journal.{ CassandraJournalConfig, CassandraStatements, Hour, TimeBucket } +import akka.persistence.cassandra.snapshot +import akka.persistence.cassandra.snapshot.{ CassandraSnapshotStore, CassandraSnapshotStoreConfig } +import akka.persistence.cassandra.snapshot.CassandraSnapshotStore.SnapshotSerialization import akka.serialization.SerializationExtension import com.datastax.driver.core.utils.UUIDs import org.scalatest.{ BeforeAndAfterAll, Suite } -import scala.concurrent.Await + +import scala.concurrent.{ Await, Future } import scala.concurrent.duration._ import scala.util.Try - import akka.serialization.Serializers trait DirectWriting extends BeforeAndAfterAll { @@ -26,6 +29,9 @@ trait DirectWriting extends BeforeAndAfterAll { private lazy val writePluginConfig = new CassandraJournalConfig(system, system.settings.config.getConfig("cassandra-journal")) + private lazy val snapshotPluginConfig = + new CassandraSnapshotStoreConfig(system, system.settings.config.getConfig("cassandra-snapshot-store")) + private lazy val session = { Await.result(writePluginConfig.sessionProvider.connect()(system.dispatcher), 5.seconds) } @@ -44,6 +50,22 @@ trait DirectWriting extends BeforeAndAfterAll { } session.prepare(writeStatements.writeMessage(withMeta = false)) } + private lazy val 
snapshotStatements = new snapshot.CassandraStatements {
+    override def snapshotConfig: CassandraSnapshotStoreConfig = snapshotPluginConfig
+  }
+
+  private lazy val snapshotSerialization = new SnapshotSerialization(system)
+  private lazy val preparedWriteSnapshot = session.prepare(snapshotStatements.writeSnapshot(withMeta = false))
+
+  def writeTestSnapshot(snapshotMeta: SnapshotMetadata, snapshot: AnyRef): Future[Unit] = {
+    snapshotSerialization
+      .serialize(snapshot)(system.dispatcher)
+      .map { ser =>
+        val bound = CassandraSnapshotStore.prepareSnapshotWrite(preparedWriteSnapshot, snapshotMeta, ser)
+        session.execute(bound)
+        ()
+      }(system.dispatcher)
+  }
 
   protected def writeTestEvent(persistent: PersistentRepr): Unit = {
     val event = persistent.payload.asInstanceOf[AnyRef]
diff --git a/docs/src/main/paradox/cleanup.md b/docs/src/main/paradox/cleanup.md
new file mode 100644
index 000000000..ea0892a94
--- /dev/null
+++ b/docs/src/main/paradox/cleanup.md
@@ -0,0 +1,31 @@
+# Database Cleanup
+
+@@@ warning
+
+When running any operation for a persistence id the actor with that persistence id must not be running!
+
+@@@
+
+If possible, it is best to keep all events in an event sourced system. That way new [projections](https://doc.akka.io/docs/akka-projection/current/index.html)
+and the `tag_view` table can be re-built if it is corrupted (e.g. due to two nodes writing events for the same persistence id in a split brain).
+
+In some cases keeping all events is not possible. `EventSourcedBehavior`s can automatically snapshot state and delete events as described in the [Akka docs](https://doc.akka.io/docs/akka/current/typed/persistence-snapshot.html#snapshot-deletion).
+Snapshotting is useful even if events aren't deleted as it speeds up recovery.
+
+The @apidoc[akka.persistence.cassandra.cleanup.Cleanup] tool can retrospectively clean up the journal. Its operations include:
+
+* Delete all events for a persistence id
+* Delete all events and tagged events for the `eventsByTag` query
+* Delete all snapshots for a persistence id
+* Delete all snapshots and events for a persistence id keeping the latest N snapshots and all the events after them.
+
+The cleanup tool can be combined with the @ref[query plugin](./read-journal.md) which has a query to get all persistence ids.
+
+
+Scala
+: @@snip [snapshot-keyspace](/docs/src/test/scala/doc/cleanup/CleanupDocExample.scala) { #cleanup }
+
+Java
+: @@snip [snapshot-keyspace](/docs/src/test/java/jdoc/cleanup/CleanupDocExample.java) { #cleanup }
+
+By default, all operations only print what they would do. Once you're happy with what the cleanup tool is going to do, set `akka.persistence.cassandra.cleanup.dry-run = false`
diff --git a/docs/src/main/paradox/events-by-tag.md b/docs/src/main/paradox/events-by-tag.md
index c8a726b0e..cf04cd2b2 100644
--- a/docs/src/main/paradox/events-by-tag.md
+++ b/docs/src/main/paradox/events-by-tag.md
@@ -139,6 +139,24 @@ be taken not to have batches that will be rejected by Cassandra. Two other cases
 * Periodically: By default 250ms. To prevent eventsByTag queries being too out of date.
 * When a starting a new timebucket, which translates to a new partition in Cassandra, the events for the old timebucket are written.
 
+## Cleanup of tag_views table
+
+By default the tag_views table keeps tagged events indefinitely, even when the original events have been removed.
+Depending on the volume of events this may not be suitable for production.
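For reference, the cleanup settings this patch adds to `reference.conf` can be overridden in one block. A sketch only: the `operation-timeout` value shown here is illustrative, since its default is elided from the hunk above.

```
akka.persistence.cassandra.cleanup {
  # config paths of the plugins to operate on (defaults from this patch)
  journal-plugin = "cassandra-journal"
  snapshot-plugin = "cassandra-snapshot-store"
  # illustrative value; see reference.conf for the actual default
  operation-timeout = 10s
  # log progress after this many delete operations
  log-progress-every = 100
  # defaults to true, which only logs the intended deletes at INFO
  dry-run = false
}
```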
+
+Before going live decide on a time to live (TTL) and, if small enough, consider using the [Time Window Compaction Strategy](http://thelastpickle.com/blog/2016/12/08/TWCS-part1.html).
+See `events-by-tag.time-to-live` in reference.conf for how to set this.
+
+@@@ warning
+
+The TTL must be greater than any expected delay in using the tagged events to build a read side view,
+otherwise they'll be deleted before being read.
+
+@@@
+
+The @apidoc[akka.persistence.cassandra.cleanup.Cleanup] tool can also be used for deleting from the `tag_views`
+table. See @ref[Database Cleanup](./cleanup.md) for more details.
+
 ## How it works
 
 The following section describes more about how the events by tag query work, it is not required knowledge
diff --git a/docs/src/main/paradox/index.md b/docs/src/main/paradox/index.md
index 21b100132..7c1dbf7ec 100644
--- a/docs/src/main/paradox/index.md
+++ b/docs/src/main/paradox/index.md
@@ -14,5 +14,6 @@ The Akka Persistence Cassandra plugin allows for using [Apache Cassandra](https:
 * [Serialization](serialization.md)
 * [scylladb](scylladb.md)
 * [migrations](migrations.md)
+* [Cleanup](cleanup.md)
 
 @@@
diff --git a/docs/src/main/paradox/journal.md b/docs/src/main/paradox/journal.md
index 8eb58bcdb..06f7fbd1a 100644
--- a/docs/src/main/paradox/journal.md
+++ b/docs/src/main/paradox/journal.md
@@ -119,4 +119,4 @@ CREATE TABLE IF NOT EXISTS akka.metadata(
 The tool `akka.persistence.cassandra.cleanup.Cleanup` can be used for deleting all events and/or snapshots
 given list of `persistenceIds` without using persistent actors. It's important that the actors with corresponding
-`persistenceId` are not running at the same time as using the tool.
+`persistenceId` are not running at the same time as using the tool. See @ref[Database Cleanup](./cleanup.md) for more details.
diff --git a/docs/src/main/paradox/snapshots.md b/docs/src/main/paradox/snapshots.md
index b607035cc..6c7ee186a 100644
--- a/docs/src/main/paradox/snapshots.md
+++ b/docs/src/main/paradox/snapshots.md
@@ -49,4 +49,4 @@ CREATE TABLE IF NOT EXISTS akka_snapshot.snapshots (
 The tool `akka.persistence.cassandra.cleanup.Cleanup` can be used for deleting all events and/or snapshots
 given list of `persistenceIds` without using persistent actors. It's important that the actors with corresponding
-`persistenceId` are not running at the same time as using the tool.
+`persistenceId` are not running at the same time as using the tool. See @ref[Database Cleanup](./cleanup.md) for more details.
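A sketch of the TTL configuration recommended in the events-by-tag section above, using the `events-by-tag.time-to-live` setting it references. The 30 day figure is illustrative only; it must comfortably exceed the read side's worst-case lag.

```
cassandra-journal.events-by-tag {
  # tagged events expire after this period; choose a value larger than any
  # delay you could plausibly see in building read side views from tag_views
  time-to-live = 30d
}
```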
diff --git a/docs/src/test/java/jdoc/cleanup/CleanupDocExample.java b/docs/src/test/java/jdoc/cleanup/CleanupDocExample.java
new file mode 100644
index 000000000..097ecba29
--- /dev/null
+++ b/docs/src/test/java/jdoc/cleanup/CleanupDocExample.java
@@ -0,0 +1,44 @@
+package jdoc.cleanup;
+
+import akka.Done;
+import akka.actor.ActorSystem;
+import akka.persistence.cassandra.cleanup.Cleanup;
+import akka.persistence.cassandra.query.javadsl.CassandraReadJournal;
+import akka.persistence.query.PersistenceQuery;
+import scala.compat.java8.FutureConverters;
+
+import java.time.ZonedDateTime;
+import java.time.temporal.ChronoUnit;
+
+public class CleanupDocExample {
+
+
+
+  public static void example() {
+
+    ActorSystem system = null;
+
+    //#cleanup
+    CassandraReadJournal queries = PersistenceQuery.get(system).getReadJournalFor(CassandraReadJournal.class, CassandraReadJournal.Identifier());
+    Cleanup cleanup = new Cleanup(system);
+
+    int persistenceIdParallelism = 10;
+
+
+    // for all persistence ids, keep two snapshots and delete all events before the oldest kept snapshot
+    queries.currentPersistenceIds().mapAsync(persistenceIdParallelism, pid -> FutureConverters.toJava(cleanup.cleanupBeforeSnapshot(pid, 2))).run(system);
+
+    // for all persistence ids, keep everything after the provided unix timestamp, if there aren't enough snapshots after this time
+    // go back before the timestamp to find a snapshot to delete before
+    // this operation is more expensive than the one above
+    ZonedDateTime keepAfter = ZonedDateTime.now().minus(1, ChronoUnit.MONTHS);
+    queries
+        .currentPersistenceIds()
+        .mapAsync(persistenceIdParallelism, pid -> FutureConverters.toJava(cleanup.cleanupBeforeSnapshot(pid, 2, keepAfter.toInstant().toEpochMilli())))
+        .run(system);
+
+    //#cleanup
+
+
+  }
+}
diff --git a/docs/src/test/scala/doc/cleanup/CleanupDocExample.scala b/docs/src/test/scala/doc/cleanup/CleanupDocExample.scala
new file mode 100644
index 000000000..1a68ec92b
--- /dev/null
+++ b/docs/src/test/scala/doc/cleanup/CleanupDocExample.scala
@@ -0,0 +1,36 @@
+package doc.cleanup
+
+import java.time.ZonedDateTime
+import java.time.temporal.ChronoUnit
+
+import akka.actor.ActorSystem
+import akka.persistence.cassandra.cleanup.Cleanup
+import akka.persistence.cassandra.query.scaladsl.CassandraReadJournal
+import akka.persistence.query.PersistenceQuery
+
+object CleanupDocExample {
+
+  implicit val system: ActorSystem = ???
+
+  //#cleanup
+  val queries = PersistenceQuery(system).readJournalFor[CassandraReadJournal](CassandraReadJournal.Identifier)
+  val cleanup = new Cleanup(system)
+
+  // how many persistence ids to operate on in parallel
+  val persistenceIdParallelism = 10
+
+  // for all persistence ids, keep two snapshots and delete all events before the oldest kept snapshot
+  queries.currentPersistenceIds().mapAsync(persistenceIdParallelism)(pid => cleanup.cleanupBeforeSnapshot(pid, 2)).run()
+
+  // for all persistence ids, keep everything after the provided unix timestamp, if there aren't enough snapshots after this time
+  // go back before the timestamp to find a snapshot to delete before
+  // this operation is more expensive than the one above
+  val keepAfter = ZonedDateTime.now().minus(1, ChronoUnit.MONTHS)
+  queries
+    .currentPersistenceIds()
+    .mapAsync(persistenceIdParallelism)(pid => cleanup.cleanupBeforeSnapshot(pid, 2, keepAfter.toInstant.toEpochMilli))
+    .run()
+
+  //#cleanup
+
+}
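A minimal Scala sketch of the bulk-delete entry point mentioned in the journal and snapshots pages above. The persistence ids are hypothetical, and the `deleteAll` signature is assumed from the scaladoc shown in the `Cleanup.scala` hunk ("Delete everything related to the given list of `persistenceIds`"); with the default `dry-run = true` this only logs what would be deleted.

```scala
import akka.Done
import akka.actor.ActorSystem
import akka.persistence.cassandra.cleanup.Cleanup

import scala.concurrent.Future

object DeleteAllExample {
  // the actors for these persistence ids must be stopped before this runs
  def run(system: ActorSystem): Future[Done] = {
    val cleanup = new Cleanup(system)
    // hypothetical ids; pass neverUsePersistenceIdAgain = true only if the
    // ids will never be written to again
    cleanup.deleteAll(List("pid-1", "pid-2"), neverUsePersistenceIdAgain = true)
  }
}
```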