diff --git a/core/src/main/resources/reference.conf b/core/src/main/resources/reference.conf index 079a1ecb1..8f4a919f4 100644 --- a/core/src/main/resources/reference.conf +++ b/core/src/main/resources/reference.conf @@ -505,6 +505,10 @@ akka.persistence.cassandra { # Log progress after this number of delete operations. Can be set to 1 to log # progress of each operation. log-progress-every = 100 + + # By default no deletes are executed and are instead logged at INFO. Set this to false + # to actually do the deletes. + dry-run = true } # Configuration of the Reconciler tool. diff --git a/core/src/main/scala/akka/persistence/cassandra/cleanup/Cleanup.scala b/core/src/main/scala/akka/persistence/cassandra/cleanup/Cleanup.scala index 0f53caa3d..359cd0ea9 100644 --- a/core/src/main/scala/akka/persistence/cassandra/cleanup/Cleanup.scala +++ b/core/src/main/scala/akka/persistence/cassandra/cleanup/Cleanup.scala @@ -4,22 +4,28 @@ package akka.persistence.cassandra.cleanup +import java.lang.{ Integer => JInt, Long => JLong } + import scala.collection.immutable import scala.concurrent.Future import scala.util.Failure import scala.util.Success - -import akka.Done -import akka.actor.ClassicActorSystemProvider +import akka.{ Done, NotUsed } +import akka.actor.{ ActorRef, ClassicActorSystemProvider } import akka.annotation.ApiMayChange import akka.event.Logging import akka.pattern.ask -import akka.persistence.Persistence +import akka.persistence.JournalProtocol.DeleteMessagesTo +import akka.persistence.{ Persistence, SnapshotMetadata } +import akka.persistence.cassandra.PluginSettings import akka.persistence.cassandra.journal.CassandraJournal import akka.persistence.cassandra.reconciler.Reconciliation import akka.persistence.cassandra.reconciler.ReconciliationSettings -import akka.persistence.cassandra.snapshot.CassandraSnapshotStore +import akka.persistence.cassandra.snapshot.{ CassandraSnapshotStatements, CassandraSnapshotStore } +import akka.stream.alpakka.cassandra.scaladsl.{ CassandraSession, CassandraSessionRegistry } +import akka.stream.scaladsl.{ Sink, Source } import akka.util.Timeout +import com.datastax.oss.driver.api.core.cql.Row /** * Tool for deleting all events and/or snapshots for a given list of `persistenceIds` without using persistent actors.
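For orientation while reviewing the patch, here is a minimal usage sketch (editor-added illustration, not part of this diff; the `system` setup and persistence id are assumed) of the tool using the per-persistence-id operations whose signatures appear below. Remember that the corresponding persistent actor must not be running.

```scala
// Sketch only: construct the Cleanup tool and delete data for one persistence id.
import akka.actor.ActorSystem
import akka.persistence.cassandra.cleanup.Cleanup

object CleanupSketch extends App {
  val system = ActorSystem("cleanup-example") // assumed setup
  val cleanup = new Cleanup(system)

  // returns Future[Done]; pass neverUsePersistenceIdAgain = true only if this
  // persistence id will never be written to again
  cleanup.deleteAllEvents("pid-1", neverUsePersistenceIdAgain = false)

  // returns Future[Done]; events are not deleted
  cleanup.deleteAllSnapshots("pid-1")
}
```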
@@ -42,18 +48,196 @@ final class Cleanup(systemProvider: ClassicActorSystemProvider, settings: Cleanu systemProvider, new CleanupSettings(systemProvider.classicSystem.settings.config.getConfig("akka.persistence.cassandra.cleanup"))) - private val system = systemProvider.classicSystem + private implicit val system = systemProvider.classicSystem import settings._ import system.dispatcher private val log = Logging(system, getClass) - private val journal = Persistence(system).journalFor(pluginLocation + ".journal") - private lazy val snapshotStore = Persistence(system).snapshotStoreFor(pluginLocation + ".snapshot") + + // operations on journal, snapshotStore and tagViews should only be done when dry-run = false + private val journal: ActorRef = Persistence(system).journalFor(pluginLocation + ".journal") + private lazy val snapshotStore: ActorRef = Persistence(system).snapshotStoreFor(pluginLocation + ".snapshot") private lazy val tagViewsReconciliation = new Reconciliation( system, new ReconciliationSettings(system.settings.config.getConfig(pluginLocation + ".reconciler"))) + private implicit val askTimeout: Timeout = operationTimeout + private lazy val session: CassandraSession = + CassandraSessionRegistry(system).sessionFor(pluginLocation) + + private lazy val pluginSettings = PluginSettings(system, system.settings.config.getConfig(pluginLocation)) + private lazy val statements = new CassandraSnapshotStatements(pluginSettings.snapshotSettings) + private lazy val selectLatestSnapshotsPs = session.prepare(statements.selectLatestSnapshotMeta) + private lazy val selectAllSnapshotMetaPs = session.prepare(statements.selectAllSnapshotMeta) + + if (dryRun) { + log.info("Cleanup running in dry run mode. No operations will be executed against the database, only logged") + } + + private def issueSnapshotDelete( + persistenceId: String, + maxToKeep: Long, + rows: Seq[Row]): Future[Option[SnapshotMetadata]] = { + log.debug("issueSnapshotDelete [{}] [{}] [{}]", persistenceId, maxToKeep, rows.size) + rows match { + case Nil => + log.debug("persistence id [{}] has 0 snapshots, no deletes issued", persistenceId) + Future.successful(None) + case fewer if fewer.size < maxToKeep => + // no delete required, return the oldest snapshot + log.debug("Fewer snapshots than requested for persistence id [{}], no deletes issued", persistenceId) + Future.successful( + Some(SnapshotMetadata(persistenceId, fewer.last.getLong("sequence_nr"), fewer.last.getLong("timestamp")))) + case more => + if (log.isDebugEnabled) { + log.debug( + "Latest {} snapshots for persistence id [{}] range from {} to {}", + maxToKeep, + persistenceId, + more.head.getLong("sequence_nr"), + more.last.getLong("sequence_nr")) + } + val result = + SnapshotMetadata(persistenceId, more.last.getLong("sequence_nr"), more.last.getLong("timestamp")) + if (dryRun) { + log.info( + "dry run: CQL: [{}] persistence_id: [{}] sequence_nr [{}]", + statements.deleteSnapshotsBefore, + persistenceId, + result.sequenceNr) + Future.successful(Some(result)) + } else { + session + .executeWrite(statements.deleteSnapshotsBefore, persistenceId, result.sequenceNr: JLong) + .map(_ => Some(result)) + } + + } + } + + /** + * Keep all snapshots that occurred after `keepAfter`. + * If fewer than `snapshotsToKeep` occurred after `keepAfter`, at least that many + * are kept.
Setting this to 1 ensures that at least one snapshot is kept even if it + * is older than `keepAfter`. + * + * If only the latest N snapshots should be kept, prefer the overload without a timestamp + * as it is more efficient. + * + * The returned snapshot metadata can be used to issue deletes for events older than the oldest + * snapshot. + * + * @return the snapshot meta of the oldest remaining snapshot. None if there are no snapshots + * + */ + def deleteBeforeSnapshot( + persistenceId: String, + snapshotsToKeep: Int, + keepAfterUnixTimestamp: Long): Future[Option[SnapshotMetadata]] = { + require(snapshotsToKeep >= 1, "must keep at least one snapshot") + require(keepAfterUnixTimestamp >= 0, "keepAfter must not be negative") + selectAllSnapshotMetaPs + .flatMap { ps => + val allRows: Source[Row, NotUsed] = session.select(ps.bind(persistenceId)) + allRows.zipWithIndex + .takeWhile { + case (row, index) => + if (row.getLong("timestamp") > keepAfterUnixTimestamp) { + true + } else if (index < snapshotsToKeep) { + true + } else { + false + } + } + .map(_._1) + .runWith(Sink.seq) + } + .flatMap(rows => issueSnapshotDelete(persistenceId, snapshotsToKeep, rows)) + } + + /** + * Keep N snapshots and delete all older snapshots. + * + * This operation is much cheaper than including the timestamp because it can use the primary key and limit. + * + * @return the snapshot meta of the oldest remaining snapshot. None if there are no snapshots. This can be used to delete events from before the snapshot. + */ + def deleteBeforeSnapshot(persistenceId: String, maxSnapshotsToKeep: Int): Future[Option[SnapshotMetadata]] = { + require(maxSnapshotsToKeep >= 1, "Must keep at least one snapshot") + val snapshots: Future[immutable.Seq[Row]] = selectLatestSnapshotsPs.flatMap { ps => + session.select(ps.bind(persistenceId, maxSnapshotsToKeep: JInt)).runWith(Sink.seq) + } + snapshots.flatMap(rows => issueSnapshotDelete(persistenceId, maxSnapshotsToKeep, rows)) + } + + /** + * Delete all events before a sequenceNr for the given persistence id. + * + * WARNING: deleting events is generally discouraged in event sourced systems. + * Once deleted, the events by tag view cannot be re-built. + * + * @param persistenceId the persistence id to delete for + * @param toSequenceNr sequence nr (inclusive) to delete up to + */ + def deleteEventsTo(persistenceId: String, toSequenceNr: Long): Future[Done] = { + sendToJournal(replyTo => DeleteMessagesTo(persistenceId, toSequenceNr, replyTo)) + } + + /** + * Deletes all but the last N snapshots and deletes all events before this snapshot. + * Does not delete from the tag_views table. + * + * WARNING: deleting events is generally discouraged in event sourced systems. + * Once deleted, the events by tag view cannot be re-built. + */ + def cleanupBeforeSnapshot(persistenceId: String, nrSnapshotsToKeep: Int): Future[Done] = { + for { + oldestSnapshot <- deleteBeforeSnapshot(persistenceId, nrSnapshotsToKeep) + done <- issueDeleteFromSnapshot(oldestSnapshot) + } yield done + } + + /** + * Deletes all events for the given persistence id from before the oldest snapshot kept after `keepAfter`. + * If there are not enough snapshots after `keepAfter` to satisfy `nrSnapshotsToKeep` then snapshots from before + * `keepAfter` will also be kept. + * + * WARNING: deleting events is generally discouraged in event sourced systems.
+ * Once deleted, the events by tag view cannot be re-built. + */ + def cleanupBeforeSnapshot(persistenceId: String, nrSnapshotsToKeep: Int, keepAfter: Long): Future[Done] = { + for { + oldestSnapshot <- deleteBeforeSnapshot(persistenceId, nrSnapshotsToKeep, keepAfter) + done <- issueDeleteFromSnapshot(oldestSnapshot) + } yield done + } + + /** + * See single persistenceId overload for what is done for each persistence id. + */ + def cleanupBeforeSnapshot(persistenceIds: immutable.Seq[String], nrSnapshotsToKeep: Int): Future[Done] = { + foreach(persistenceIds, "cleanupBeforeSnapshot", pid => cleanupBeforeSnapshot(pid, nrSnapshotsToKeep)) + } + + /** + * See single persistenceId overload for what is done for each persistence id. + */ + def cleanupBeforeSnapshot( + persistenceIds: immutable.Seq[String], + nrSnapshotsToKeep: Int, + keepAfter: Long): Future[Done] = { + foreach(persistenceIds, "cleanupBeforeSnapshot", pid => cleanupBeforeSnapshot(pid, nrSnapshotsToKeep, keepAfter)) + } + + private def issueDeleteFromSnapshot(snapshot: Option[SnapshotMetadata]): Future[Done] = { + snapshot match { + case Some(snapshotMeta) => deleteEventsTo(snapshotMeta.persistenceId, snapshotMeta.sequenceNr) + case None => Future.successful(Done) + } + } + /** * Delete everything related to the given list of `persistenceIds`. All events, tagged events, and * snapshots are deleted. @@ -85,7 +269,7 @@ final class Cleanup(systemProvider: ClassicActorSystemProvider, settings: Cleanu * Delete all events related to one single `persistenceId`. Snapshots are not deleted. */ def deleteAllEvents(persistenceId: String, neverUsePersistenceIdAgain: Boolean): Future[Done] = { - (journal ? CassandraJournal.DeleteAllEvents(persistenceId, neverUsePersistenceIdAgain)).mapTo[Done] + sendToJournal(CassandraJournal.DeleteAllEvents(persistenceId, neverUsePersistenceIdAgain)) } /** @@ -104,7 +288,14 @@ final class Cleanup(systemProvider: ClassicActorSystemProvider, settings: Cleanu tagViewsReconciliation .tagsForPersistenceId(persistenceId) .flatMap { tags => - Future.sequence(tags.map(tag => tagViewsReconciliation.deleteTagViewForPersistenceIds(Set(persistenceId), tag))) + Future.sequence(tags.map { tag => + if (dryRun) { + log.info("dry run: delete [{}] tag view for persistence id [{}]", tag, persistenceId) + Future.successful(Done) + } else { + tagViewsReconciliation.deleteTagViewForPersistenceIds(Set(persistenceId), tag) + } + }) } .map(_ => Done) } @@ -120,7 +311,7 @@ final class Cleanup(systemProvider: ClassicActorSystemProvider, settings: Cleanu * Delete all snapshots related to one single `persistenceId`. Events are not deleted. */ def deleteAllSnapshots(persistenceId: String): Future[Done] = { - (snapshotStore ? CassandraSnapshotStore.DeleteAllsnapshots(persistenceId)).mapTo[Done] + sendToSnapshotStore(CassandraSnapshotStore.DeleteAllSnapshots(persistenceId)) } private def foreach( @@ -154,4 +345,31 @@ final class Cleanup(systemProvider: ClassicActorSystemProvider, settings: Cleanu result } + private def sendToSnapshotStore(msg: Any): Future[Done] = { + if (dryRun) { + log.info("dry run: Operation on snapshot store: {}", msg) + Future.successful(Done) + } else { + (snapshotStore ? msg).map(_ => Done) + } + } + + private def sendToJournal(msg: Any): Future[Done] = { + if (dryRun) { + log.info("dry run: Operation on journal: {}", msg) + Future.successful(Done) + } else { + (journal ?
msg).map(_ => Done) + } + } + + private def sendToJournal(create: ActorRef => Any): Future[Done] = { + if (dryRun) { + log.info("dry run: Operation on journal: {}", create(ActorRef.noSender)) + Future.successful(Done) + } else { + import akka.pattern.extended.ask + ask(journal, create).map(_ => Done) + } + } } diff --git a/core/src/main/scala/akka/persistence/cassandra/cleanup/CleanupSettings.scala b/core/src/main/scala/akka/persistence/cassandra/cleanup/CleanupSettings.scala index 053892012..245f03e95 100644 --- a/core/src/main/scala/akka/persistence/cassandra/cleanup/CleanupSettings.scala +++ b/core/src/main/scala/akka/persistence/cassandra/cleanup/CleanupSettings.scala @@ -17,4 +17,5 @@ class CleanupSettings(config: Config) { val pluginLocation: String = config.getString("plugin-location") val operationTimeout: FiniteDuration = config.getDuration("operation-timeout", TimeUnit.MILLISECONDS).millis val logProgressEvery: Int = config.getInt("log-progress-every") + val dryRun: Boolean = config.getBoolean("dry-run") } diff --git a/core/src/main/scala/akka/persistence/cassandra/snapshot/CassandraSnapshotStatements.scala b/core/src/main/scala/akka/persistence/cassandra/snapshot/CassandraSnapshotStatements.scala index ab2d325a5..3aadcdefe 100644 --- a/core/src/main/scala/akka/persistence/cassandra/snapshot/CassandraSnapshotStatements.scala +++ b/core/src/main/scala/akka/persistence/cassandra/snapshot/CassandraSnapshotStatements.scala @@ -65,6 +65,13 @@ import akka.persistence.cassandra.FutureDone AND sequence_nr <= ? """ + def deleteSnapshotsBefore = + s""" + DELETE FROM ${tableName} + WHERE persistence_id = ? + AND sequence_nr < ? + """ + def selectSnapshot = s""" SELECT * FROM ${tableName} WHERE persistence_id = ? AND @@ -79,6 +86,19 @@ import akka.persistence.cassandra.FutureDone ${limit.map(l => s"LIMIT ${l}").getOrElse("")} """ + def selectLatestSnapshotMeta = + s"""SELECT persistence_id, sequence_nr, timestamp FROM ${tableName} WHERE + persistence_id = ? + ORDER BY sequence_nr DESC + LIMIT ? + """ + + def selectAllSnapshotMeta = + s"""SELECT sequence_nr, timestamp FROM ${tableName} WHERE + persistence_id = ? 
+ ORDER BY sequence_nr DESC + """ + private def tableName = s"${snapshotSettings.keyspace}.${snapshotSettings.table}" /** diff --git a/core/src/main/scala/akka/persistence/cassandra/snapshot/CassandraSnapshotStore.scala b/core/src/main/scala/akka/persistence/cassandra/snapshot/CassandraSnapshotStore.scala index 3bd5b45e5..c5a148eac 100644 --- a/core/src/main/scala/akka/persistence/cassandra/snapshot/CassandraSnapshotStore.scala +++ b/core/src/main/scala/akka/persistence/cassandra/snapshot/CassandraSnapshotStore.scala @@ -16,9 +16,7 @@ import scala.concurrent.Future import scala.util.Failure import scala.util.Success import scala.util.control.NonFatal -import akka.Done import akka.actor._ -import akka.annotation.InternalApi import akka.pattern.pipe import akka.persistence._ import akka.persistence.cassandra._ @@ -30,7 +28,7 @@ import akka.serialization.SerializationExtension import akka.serialization.Serializers import akka.stream.scaladsl.Sink import akka.stream.scaladsl.Source -import akka.util.OptionVal +import akka.util.{ unused, OptionVal } import com.datastax.oss.driver.api.core.cql._ import com.datastax.oss.protocol.internal.util.Bytes import com.typesafe.config.Config @@ -43,7 +41,7 @@ import akka.stream.alpakka.cassandra.scaladsl.{ CassandraSession, CassandraSessi /** * INTERNAL API */ -@InternalApi private[akka] class CassandraSnapshotStore(cfg: Config, cfgPath: String) +@InternalApi private[akka] class CassandraSnapshotStore(@unused cfg: Config, cfgPath: String) extends SnapshotStore with ActorLogging { @@ -57,7 +55,7 @@ import akka.stream.alpakka.cassandra.scaladsl.{ CassandraSession, CassandraSessi private val settings = new PluginSettings(context.system, sharedConfig) private val snapshotSettings = settings.snapshotSettings private val serialization = SerializationExtension(context.system) - private val snapshotDeserializer = new SnapshotDeserializer(context.system) + private val snapshotSerialization = new SnapshotSerialization(context.system) private val statements = new CassandraStatements(settings) import statements.snapshotStatements._ @@ -96,7 +94,7 @@ import akka.stream.alpakka.cassandra.scaladsl.{ CassandraSession, CassandraSessi preparedSelectSnapshotMetadataWithMaxLoadAttemptsLimit log.debug("Initialized") - case DeleteAllsnapshots(persistenceId) => + case DeleteAllSnapshots(persistenceId) => val result: Future[Done] = deleteAsync(persistenceId, SnapshotSelectionCriteria(maxSequenceNr = Long.MaxValue)).map(_ => Done) result.pipeTo(sender()) @@ -171,7 +169,7 @@ import akka.stream.alpakka.cassandra.scaladsl.{ CassandraSession, CassandraSessi case Some(row) => row.getByteBuffer("snapshot") match { case null => - snapshotDeserializer.deserializeSnapshot(row) + snapshotSerialization.deserializeSnapshot(row) case bytes => // for backwards compatibility val payload = serialization.deserialize(Bytes.getArray(bytes), classOf[Snapshot]).get.data @@ -181,7 +179,7 @@ import akka.stream.alpakka.cassandra.scaladsl.{ CassandraSession, CassandraSessi } override def saveAsync(metadata: SnapshotMetadata, snapshot: Any): Future[Unit] = - serialize(snapshot, metadata.metadata).flatMap { ser => + snapshotSerialization.serialize(snapshot, metadata.metadata).flatMap { ser => // using two separate statements with or without the meta data columns because // then users doesn't have to alter table and add the new columns if they don't use // the meta data feature @@ -190,26 +188,8 @@ import akka.stream.alpakka.cassandra.scaladsl.{ CassandraSession, CassandraSessi else preparedWriteSnapshot 
stmt.flatMap { ps => - val bs = ps - .bind() - .setString("persistence_id", metadata.persistenceId) - .setLong("sequence_nr", metadata.sequenceNr) - .setLong("timestamp", metadata.timestamp) - .setInt("ser_id", ser.serId) - .setString("ser_manifest", ser.serManifest) - .setByteBuffer("snapshot_data", ser.serialized) - - // meta data, if any - val finished = ser.meta match { - case Some(meta) => - bs.setInt("meta_ser_id", meta.serId) - .setString("meta_ser_manifest", meta.serManifest) - .setByteBuffer("meta", meta.serialized) - case None => - bs - } - - session.executeWrite(finished.setExecutionProfileName(snapshotSettings.writeProfile)).map(_ => ()) + val bound = CassandraSnapshotStore.prepareSnapshotWrite(ps, metadata, ser) + session.executeWrite(bound.setExecutionProfileName(snapshotSettings.writeProfile)).map(_ => ()) } } @@ -267,40 +247,6 @@ import akka.stream.alpakka.cassandra.scaladsl.{ CassandraSession, CassandraSessi session.underlying().flatMap(_.executeAsync(batch.build()).toScala).map(_ => ()) } - private def serialize(payload: Any, meta: Option[Any]): Future[Serialized] = - try { - def serializeMeta(): Option[SerializedMeta] = - meta.map { m => - val m2 = m.asInstanceOf[AnyRef] - val serializer = serialization.findSerializerFor(m2) - val serManifest = Serializers.manifestFor(serializer, m2) - val metaBuf = ByteBuffer.wrap(serialization.serialize(m2).get) - SerializedMeta(metaBuf, serManifest, serializer.identifier) - } - - val p: AnyRef = payload.asInstanceOf[AnyRef] - val serializer = serialization.findSerializerFor(p) - val serManifest = Serializers.manifestFor(serializer, p) - serializer match { - case asyncSer: AsyncSerializer => - Serialization.withTransportInformation(context.system.asInstanceOf[ExtendedActorSystem]) { () => - asyncSer.toBinaryAsync(p).map { bytes => - val serPayload = ByteBuffer.wrap(bytes) - Serialized(serPayload, serManifest, serializer.identifier, serializeMeta()) - } - } - case _ => - Future { - // Serialization.serialize adds transport info - val serPayload = ByteBuffer.wrap(serialization.serialize(p).get) - Serialized(serPayload, serManifest, serializer.identifier, serializeMeta()) - } - } - - } catch { - case NonFatal(e) => Future.failed(e) - } - private def metadata( snapshotMetaPs: PreparedStatement, persistenceId: String, @@ -343,15 +289,19 @@ import akka.stream.alpakka.cassandra.scaladsl.{ CassandraSession, CassandraSessi private case object Init sealed trait CleanupCommand - final case class DeleteAllsnapshots(persistenceId: String) extends CleanupCommand + final case class DeleteAllSnapshots(persistenceId: String) extends CleanupCommand - private case class Serialized(serialized: ByteBuffer, serManifest: String, serId: Int, meta: Option[SerializedMeta]) + final case class Serialized(serialized: ByteBuffer, serManifest: String, serId: Int, meta: Option[SerializedMeta]) - private case class SerializedMeta(serialized: ByteBuffer, serManifest: String, serId: Int) + final case class SerializedMeta(serialized: ByteBuffer, serManifest: String, serId: Int) final case class DeserializedSnapshot(payload: Any, meta: OptionVal[Any]) - class SnapshotDeserializer(system: ActorSystem) { + /** + * INTERNAL API + */ + @InternalApi + private[akka] class SnapshotSerialization(system: ActorSystem)(implicit val ec: ExecutionContext) { private val log = Logging(system, this.getClass) @@ -368,6 +318,40 @@ import akka.stream.alpakka.cassandra.scaladsl.{ CassandraSession, CassandraSessi b } + def serialize(payload: Any, meta: Option[Any]): Future[Serialized] = + try 
{ + def serializeMeta(): Option[SerializedMeta] = + meta.map { m => + val m2 = m.asInstanceOf[AnyRef] + val serializer = serialization.findSerializerFor(m2) + val serManifest = Serializers.manifestFor(serializer, m2) + val metaBuf = ByteBuffer.wrap(serialization.serialize(m2).get) + SerializedMeta(metaBuf, serManifest, serializer.identifier) + } + + val p: AnyRef = payload.asInstanceOf[AnyRef] + val serializer = serialization.findSerializerFor(p) + val serManifest = Serializers.manifestFor(serializer, p) + serializer match { + case asyncSer: AsyncSerializer => + Serialization.withTransportInformation(system.asInstanceOf[ExtendedActorSystem]) { () => + asyncSer.toBinaryAsync(p).map { bytes => + val serPayload = ByteBuffer.wrap(bytes) + Serialized(serPayload, serManifest, serializer.identifier, serializeMeta()) + } + } + case _ => + Future { + // Serialization.serialize adds transport info + val serPayload = ByteBuffer.wrap(serialization.serialize(p).get) + Serialized(serPayload, serManifest, serializer.identifier, serializeMeta()) + } + } + + } catch { + case NonFatal(e) => Future.failed(e) + } + def deserializeSnapshot(row: Row)(implicit ec: ExecutionContext): Future[DeserializedSnapshot] = try { @@ -419,4 +403,31 @@ import akka.stream.alpakka.cassandra.scaladsl.{ CassandraSession, CassandraSessi case NonFatal(e) => Future.failed(e) } } + + /** + * INTERNAL API + */ + private[akka] def prepareSnapshotWrite( + ps: PreparedStatement, + metadata: SnapshotMetadata, + ser: Serialized): BoundStatement = { + val bs = ps + .bind() + .setString("persistence_id", metadata.persistenceId) + .setLong("sequence_nr", metadata.sequenceNr) + .setLong("timestamp", metadata.timestamp) + .setInt("ser_id", ser.serId) + .setString("ser_manifest", ser.serManifest) + .setByteBuffer("snapshot_data", ser.serialized) + + // meta data, if any + ser.meta match { + case Some(meta) => + bs.setInt("meta_ser_id", meta.serId) + .setString("meta_ser_manifest", meta.serManifest) + .setByteBuffer("meta", meta.serialized) + case None => + bs + } + } } diff --git a/core/src/test/scala/akka/persistence/cassandra/CassandraSpec.scala b/core/src/test/scala/akka/persistence/cassandra/CassandraSpec.scala index 1395d1391..2d73734b9 100644 --- a/core/src/test/scala/akka/persistence/cassandra/CassandraSpec.scala +++ b/core/src/test/scala/akka/persistence/cassandra/CassandraSpec.scala @@ -75,7 +75,7 @@ object CassandraSpec { val fallbackConfig = ConfigFactory.parseString(s""" akka.loggers = ["akka.persistence.cassandra.SilenceAllTestEventListener"] - akka.loglevel = INFO + akka.loglevel = DEBUG akka.use-slf4j = off datastax-java-driver { @@ -192,6 +192,16 @@ abstract class CassandraSpec( println(s"""Row:${row.getLong("partition_nr")}, ${row.getString("persistence_id")}, ${row.getLong( "sequence_nr")}""") }) + + println("snapshots") + cluster + .execute(s"select * from ${snapshotName}.snapshots") + .asScala + .foreach(row => { + println( + s"""Row:${row.getString("persistence_id")}, ${row.getLong("sequence_nr")}, ${row.getLong("timestamp")}""") + }) + } keyspaces().foreach { keyspace => cluster.execute(s"drop keyspace if exists $keyspace") diff --git a/core/src/test/scala/akka/persistence/cassandra/cleanup/CleanupSpec.scala b/core/src/test/scala/akka/persistence/cassandra/cleanup/CleanupSpec.scala index c5134dec2..7e0468d3e 100644 --- a/core/src/test/scala/akka/persistence/cassandra/cleanup/CleanupSpec.scala +++ b/core/src/test/scala/akka/persistence/cassandra/cleanup/CleanupSpec.scala @@ -9,11 +9,9 @@ import java.time.ZoneOffset import 
akka.actor.ActorRef import akka.actor.Props -import akka.persistence.PersistentActor -import akka.persistence.SaveSnapshotSuccess -import akka.persistence.SnapshotOffer +import akka.persistence.{ PersistentActor, SaveSnapshotSuccess, SnapshotMetadata, SnapshotOffer } import akka.persistence.cassandra.CassandraSpec -import akka.persistence.cassandra.query.firstBucketFormatter +import akka.persistence.cassandra.query.{ firstBucketFormatter, DirectWriting } import akka.persistence.journal.Tagged import akka.persistence.query.NoOffset import akka.stream.scaladsl.Sink @@ -22,9 +20,10 @@ import com.typesafe.config.ConfigFactory object CleanupSpec { val today = LocalDateTime.now(ZoneOffset.UTC) val config = ConfigFactory.parseString(s""" - akka.loglevel = INFO + akka.loglevel = DEBUG akka.persistence.cassandra.cleanup { log-progress-every = 2 + dry-run = false } akka.persistence.cassandra.events-by-tag { first-time-bucket = "${today.minusDays(5).format(firstBucketFormatter)}" @@ -79,7 +78,7 @@ object CleanupSpec { } } -class CleanupSpec extends CassandraSpec(CleanupSpec.config) { +class CleanupSpec extends CassandraSpec(CleanupSpec.config) with DirectWriting { import CleanupSpec._ "Cassandra cleanup" must { @@ -203,6 +202,143 @@ class CleanupSpec extends CassandraSpec(CleanupSpec.config) { queries.currentEventsByTag(tag = "tag-b", offset = NoOffset).runWith(Sink.seq).futureValue.size should ===(0) } + "delete some for one persistenceId" in { + val pid = nextPid + val p = system.actorOf(TestActor.props(pid)) + (1 to 8).foreach { i => + p ! PersistEvent + expectMsgType[Ack] + } + system.stop(p) + + val cleanup = new Cleanup(system) + cleanup.deleteEventsTo(pid, 5).futureValue + + val p2 = system.actorOf(TestActor.props(pid)) + p2 ! GetRecoveredState + expectMsg(RecoveredState("", List("evt-6", "evt-7", "evt-8"), 8L)) + } + + "clean up before latest snapshot for one persistence id" in { + val pid = nextPid + val p = system.actorOf(TestActor.props(pid)) + (1 to 3).foreach { i => + p ! PersistEvent + expectMsgType[Ack] + } + p ! Snap + expectMsgType[Ack] + (1 to 3).foreach { i => + p ! PersistEvent + expectMsgType[Ack] + } + p ! Snap + expectMsgType[Ack] + (1 to 3).foreach { i => + p ! PersistEvent + expectMsgType[Ack] + } + system.stop(p) + + val cleanup = new Cleanup(system) + cleanup.cleanupBeforeSnapshot(pid, 1).futureValue + + val p2 = system.actorOf(TestActor.props(pid)) + p2 ! GetRecoveredState + expectMsg(RecoveredState("snap-6", List("evt-7", "evt-8", "evt-9"), 9L)) + + // check old snapshots are gone + val snapshots = allSnapshots(pid) + snapshots.size shouldEqual 1 + snapshots.head.sequenceNr shouldEqual 6 + // check old events are gone + queries + .currentEventsByPersistenceId(pid, 0, Long.MaxValue) + .map(_.event.toString) + .runWith(Sink.seq) + .futureValue shouldEqual List("evt-7", "evt-8", "evt-9") + } + + "clean up before snapshot including timestamp that results in all events kept for one persistence id" in { + val pid = nextPid + val p = system.actorOf(TestActor.props(pid)) + (1 to 3).foreach { i => + p ! PersistEvent + expectMsgType[Ack] + } + p ! Snap + expectMsgType[Ack] + (1 to 3).foreach { i => + p ! PersistEvent + expectMsgType[Ack] + } + p ! Snap + expectMsgType[Ack] + (1 to 3).foreach { i => + p ! PersistEvent + expectMsgType[Ack] + } + system.stop(p) + + val cleanup = new Cleanup(system) + // a long way in the past so keep everything + cleanup.cleanupBeforeSnapshot(pid, 1, 100).futureValue + + val p2 = system.actorOf(TestActor.props(pid)) + p2 !
GetRecoveredState + expectMsg(RecoveredState("snap-6", List("evt-7", "evt-8", "evt-9"), 9L)) + + // check old snapshots are gone + val snapshots = allSnapshots(pid) + snapshots.size shouldEqual 2 + // more events are kept due to the timestamp, but all events before the oldest kept snapshot are still deleted + queries + .currentEventsByPersistenceId(pid, 0, Long.MaxValue) + .map(_.event.toString) + .runWith(Sink.seq) + .futureValue shouldEqual List("evt-4", "evt-5", "evt-6", "evt-7", "evt-8", "evt-9") + } + + "clean up before snapshot including timestamp for one persistence id" in { + val pid = nextPid + val p = system.actorOf(TestActor.props(pid)) + (1 to 3).foreach { i => + p ! PersistEvent + expectMsgType[Ack] + } + p ! Snap + expectMsgType[Ack] + (1 to 3).foreach { i => + p ! PersistEvent + expectMsgType[Ack] + } + p ! Snap + expectMsgType[Ack] + (1 to 3).foreach { i => + p ! PersistEvent + expectMsgType[Ack] + } + system.stop(p) + + val cleanup = new Cleanup(system) + // timestamp shouldn't result in any more events/snapshots kept apart from the last one + cleanup.cleanupBeforeSnapshot(pid, 1, System.currentTimeMillis()).futureValue + + val p2 = system.actorOf(TestActor.props(pid)) + p2 ! GetRecoveredState + expectMsg(RecoveredState("snap-6", List("evt-7", "evt-8", "evt-9"), 9L)) + + // check old snapshots are gone + val snapshots = allSnapshots(pid) + snapshots.size shouldEqual 1 + // check old events are gone + queries + .currentEventsByPersistenceId(pid, 0, Long.MaxValue) + .map(_.event.toString) + .runWith(Sink.seq) + .futureValue shouldEqual List("evt-7", "evt-8", "evt-9") + } + "delete all for several persistenceId" in { val pidA = nextPid val pidB = nextPid @@ -303,4 +439,130 @@ class CleanupSpec extends CassandraSpec(CleanupSpec.config) { } + "Time and snapshot based cleanup" must { + "keep the correct number of snapshots" in { + val cleanup = new Cleanup(system) + val pid = nextPid + writeTestSnapshot(SnapshotMetadata(pid, 1, 1000), "snapshot-1").futureValue + writeTestSnapshot(SnapshotMetadata(pid, 2, 2000), "snapshot-2").futureValue + writeTestSnapshot(SnapshotMetadata(pid, 3, 3000), "snapshot-3").futureValue + + val oldestSnapshot = cleanup.deleteBeforeSnapshot(pid, 2).futureValue + + val p1 = system.actorOf(TestActor.props(pid)) + p1 ! GetRecoveredState + expectMsg(RecoveredState("snapshot-3", Nil, 3L)) + + allSnapshots(pid) shouldEqual List(SnapshotMetadata(pid, 2, 2000), SnapshotMetadata(pid, 3, 3000)) + + oldestSnapshot shouldEqual Some(SnapshotMetadata(pid, 2, 2000)) + } + "keep all snapshots if fewer than requested without timestamp" in { + val cleanup = new Cleanup(system) + val pid = nextPid + writeTestSnapshot(SnapshotMetadata(pid, 1, 1000), "snapshot-1").futureValue + writeTestSnapshot(SnapshotMetadata(pid, 2, 2000), "snapshot-2").futureValue + writeTestSnapshot(SnapshotMetadata(pid, 3, 3000), "snapshot-3").futureValue + + val oldestSnapshot = cleanup.deleteBeforeSnapshot(pid, 4).futureValue + + val p1 = system.actorOf(TestActor.props(pid)) + p1 !
GetRecoveredState + expectMsg(RecoveredState("snapshot-3", Nil, 3L)) + + allSnapshots(pid) shouldEqual List( + SnapshotMetadata(pid, 1, 1000), + SnapshotMetadata(pid, 2, 2000), + SnapshotMetadata(pid, 3, 3000)) + + oldestSnapshot shouldEqual Some(SnapshotMetadata(pid, 1, 1000)) + } + "keep all snapshots if fewer than requested with timestamp" in { + val cleanup = new Cleanup(system) + val pid = nextPid + writeTestSnapshot(SnapshotMetadata(pid, 1, 1000), "snapshot-1").futureValue + writeTestSnapshot(SnapshotMetadata(pid, 2, 2000), "snapshot-2").futureValue + writeTestSnapshot(SnapshotMetadata(pid, 3, 3000), "snapshot-3").futureValue + + val oldestSnapshot = cleanup.deleteBeforeSnapshot(pid, 4, 1500).futureValue + + val p1 = system.actorOf(TestActor.props(pid)) + p1 ! GetRecoveredState + expectMsg(RecoveredState("snapshot-3", Nil, 3L)) + + allSnapshots(pid) shouldEqual List( + SnapshotMetadata(pid, 1, 1000), + SnapshotMetadata(pid, 2, 2000), + SnapshotMetadata(pid, 3, 3000)) + + oldestSnapshot shouldEqual Some(SnapshotMetadata(pid, 1, 1000)) + } + + "work without timestamp when there are no snapshots" in { + val cleanup = new Cleanup(system) + val pid = nextPid + val oldestSnapshot = cleanup.deleteBeforeSnapshot(pid, 2).futureValue + oldestSnapshot shouldEqual None + } + + "work with timestamp when there are no snapshots" in { + val cleanup = new Cleanup(system) + val pid = nextPid + val oldestSnapshot = cleanup.deleteBeforeSnapshot(pid, 2, 100).futureValue + oldestSnapshot shouldEqual None + } + + "not delete snapshots newer than the keepAfter timestamp" in { + val cleanup = new Cleanup(system) + val pid = nextPid + writeTestSnapshot(SnapshotMetadata(pid, 1, 1000), "snapshot-1").futureValue + writeTestSnapshot(SnapshotMetadata(pid, 2, 2000), "snapshot-2").futureValue + writeTestSnapshot(SnapshotMetadata(pid, 3, 3000), "snapshot-3").futureValue + writeTestSnapshot(SnapshotMetadata(pid, 4, 4000), "snapshot-4").futureValue + + val oldestSnapshot = cleanup.deleteBeforeSnapshot(pid, 2, 1500).futureValue + + val p1 = system.actorOf(TestActor.props(pid)) + p1 ! GetRecoveredState + expectMsg(RecoveredState("snapshot-4", Nil, 4L)) + + // here 3 snapshots are kept as snapshot 2 is newer than 1500 + allSnapshots(pid) shouldEqual List( + SnapshotMetadata(pid, 2, 2000), + SnapshotMetadata(pid, 3, 3000), + SnapshotMetadata(pid, 4, 4000)) + + oldestSnapshot shouldEqual Some(SnapshotMetadata(pid, 2, 2000)) + } + "keep snapshots older than keepAfter to meet snapshotsToKeep" in { + val cleanup = new Cleanup(system) + val pid = nextPid + writeTestSnapshot(SnapshotMetadata(pid, 1, 1000), "snapshot-1").futureValue + writeTestSnapshot(SnapshotMetadata(pid, 2, 2000), "snapshot-2").futureValue + writeTestSnapshot(SnapshotMetadata(pid, 3, 3000), "snapshot-3").futureValue + writeTestSnapshot(SnapshotMetadata(pid, 4, 4000), "snapshot-4").futureValue + + val oldestSnapshot = cleanup.deleteBeforeSnapshot(pid, 2, 5000).futureValue + + val p1 = system.actorOf(TestActor.props(pid)) + p1 !
GetRecoveredState + expectMsg(RecoveredState("snapshot-4", Nil, 4L)) + + // 2 snapshots are kept even though both are older than keepAfter, because snapshotsToKeep is 2 + allSnapshots(pid) shouldEqual List(SnapshotMetadata(pid, 3, 3000), SnapshotMetadata(pid, 4, 4000)) + + oldestSnapshot shouldEqual Some(SnapshotMetadata(pid, 3, 3000)) + } + } + + private def allSnapshots(pid: String): Seq[SnapshotMetadata] = { + import scala.collection.JavaConverters._ + cluster + .execute(s"select * from ${snapshotName}.snapshots where persistence_id = '${pid}' order by sequence_nr") + .asScala + .map(row => + SnapshotMetadata(row.getString("persistence_id"), row.getLong("sequence_nr"), row.getLong("timestamp"))) + .toList + } + } diff --git a/core/src/test/scala/akka/persistence/cassandra/query/DirectWriting.scala b/core/src/test/scala/akka/persistence/cassandra/query/DirectWriting.scala index 08596d096..e08f9cc97 100644 --- a/core/src/test/scala/akka/persistence/cassandra/query/DirectWriting.scala +++ b/core/src/test/scala/akka/persistence/cassandra/query/DirectWriting.scala @@ -7,11 +7,13 @@ package akka.persistence.cassandra.query import java.nio.ByteBuffer import akka.actor.ActorSystem -import akka.persistence.PersistentRepr +import akka.persistence.{ PersistentRepr, SnapshotMetadata } import akka.persistence.cassandra.Hour import akka.persistence.cassandra.PluginSettings import akka.persistence.cassandra.journal.CassandraJournalStatements import akka.persistence.cassandra.journal.TimeBucket +import akka.persistence.cassandra.snapshot.{ CassandraSnapshotStatements, CassandraSnapshotStore } +import akka.persistence.cassandra.snapshot.CassandraSnapshotStore.SnapshotSerialization import akka.serialization.SerializationExtension import akka.serialization.Serializers import com.datastax.oss.driver.api.core.CqlSession @@ -19,22 +21,36 @@ import com.datastax.oss.driver.api.core.uuid.Uuids import org.scalatest.BeforeAndAfterAll import org.scalatest.Suite +import scala.concurrent.{ ExecutionContext, Future } + trait DirectWriting extends BeforeAndAfterAll { self: Suite => def system: ActorSystem private lazy val serialization = SerializationExtension(system) private lazy val settings = PluginSettings(system) + private lazy implicit val ec: ExecutionContext = system.dispatcher def cluster: CqlSession private lazy val writeStatements: CassandraJournalStatements = new CassandraJournalStatements(settings) + private lazy val snapshotStatements: CassandraSnapshotStatements = new CassandraSnapshotStatements( + settings.snapshotSettings) + private lazy val snapshotSerialization = new SnapshotSerialization(system)(system.dispatcher) private lazy val preparedWriteMessage = cluster.prepare(writeStatements.writeMessage(withMeta = true)) - private lazy val preparedDeleteMessage = cluster.prepare(writeStatements.deleteMessage) + private lazy val preparedWriteSnapshot = cluster.prepare(snapshotStatements.writeSnapshot(withMeta = false)) + + def writeTestSnapshot(snapshotMeta: SnapshotMetadata, snapshot: AnyRef): Future[Unit] = { + snapshotSerialization.serialize(snapshot, None).map { ser => + val bound = CassandraSnapshotStore.prepareSnapshotWrite(preparedWriteSnapshot, snapshotMeta, ser) + cluster.execute(bound) + () + } + } - protected def writeTestEvent(persistent: PersistentRepr, partitionNr: Long = 1L): Unit = { + def writeTestEvent(persistent: PersistentRepr, partitionNr: Long = 1L): Unit = { val event = persistent.payload.asInstanceOf[AnyRef] val serializer = serialization.findSerializerFor(event) val serialized =
ByteBuffer.wrap(serialization.serialize(event).get) diff --git a/docs/src/main/paradox/cleanup.md b/docs/src/main/paradox/cleanup.md new file mode 100644 index 000000000..ea0892a94 --- /dev/null +++ b/docs/src/main/paradox/cleanup.md @@ -0,0 +1,31 @@ +# Database Cleanup + +@@@ warning + +When running any operation for a persistence id, the actor with that persistence id must not be running! + +@@@ + +If possible, it is best to keep all events in an event sourced system. That way new [projections](https://doc.akka.io/docs/akka-projection/current/index.html) +and the `tag_views` table can be re-built if it is corrupted (e.g. due to events for the same persistence id being written from two nodes in a split brain). + +In some cases keeping all events is not possible. `EventSourcedBehavior`s can automatically snapshot state and delete events as described in the [Akka docs](https://doc.akka.io/docs/akka/current/typed/persistence-snapshot.html#snapshot-deletion). +Snapshotting is useful even if events aren't deleted as it speeds up recovery. + +The @apidoc[akka.persistence.cassandra.cleanup.Cleanup] tool can retrospectively clean up the journal. Its operations include: + +* Delete all events for a persistence id +* Delete all events and tagged events for the `eventsByTag` query +* Delete all snapshots for a persistence id +* Delete all snapshots and events for a persistence id keeping the latest N snapshots and all the events after them. + +The cleanup tool can be combined with the @ref[query plugin](./read-journal.md), which has a query to get all persistence ids. + + +Scala +: @@snip [snapshot-keyspace](/docs/src/test/scala/doc/cleanup/CleanupDocExample.scala) { #cleanup } + +Java +: @@snip [snapshot-keyspace](/docs/src/test/java/jdoc/cleanup/CleanupDocExample.java) { #cleanup } + +By default, all operations only log what they would do. Once you're happy with what the cleanup tool is going to do, set `akka.persistence.cassandra.cleanup.dry-run = false`. diff --git a/docs/src/main/paradox/events-by-tag.md b/docs/src/main/paradox/events-by-tag.md index 930407f98..d4b01b556 100644 --- a/docs/src/main/paradox/events-by-tag.md +++ b/docs/src/main/paradox/events-by-tag.md @@ -170,7 +170,7 @@ otherwise they'll be deleted before being read. @@@ The @apidoc[akka.persistence.cassandra.cleanup.Cleanup] tool can also be used for deleting from the `tag_views` -table. +table. See @ref[Database Cleanup](./cleanup.md) for more details. ## How it works diff --git a/docs/src/main/paradox/index.md b/docs/src/main/paradox/index.md index e72fc42da..02728b838 100644 --- a/docs/src/main/paradox/index.md +++ b/docs/src/main/paradox/index.md @@ -21,5 +21,6 @@ The Akka Persistence Cassandra plugin allows for using [Apache Cassandra](https: * [ScyllaDB](scylladb.md) * [Amazon Keyspaces](keyspaces.md) * [Migrations](migrations.md) +* [Cleanup](cleanup.md) @@@ diff --git a/docs/src/main/paradox/journal.md b/docs/src/main/paradox/journal.md index 16d0e2faf..d43892cfa 100644 --- a/docs/src/main/paradox/journal.md +++ b/docs/src/main/paradox/journal.md @@ -129,4 +129,4 @@ datastax-java-driver.profiles { The @apidoc[akka.persistence.cassandra.cleanup.Cleanup] tool can be used for deleting all events and/or snapshots given list of `persistenceIds` without using persistent actors. It's important that the actors with corresponding -`persistenceId` are not running at the same time as using the tool. +`persistenceId` are not running at the same time as using the tool. See @ref[Database Cleanup](./cleanup.md) for more details.
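As a side note on the dry-run workflow described in the new cleanup.md above: besides setting the flag in `application.conf`, the settings can also be overridden programmatically. This is an editor-added sketch (not part of this patch), using only the `Cleanup` and `CleanupSettings` constructors and the config keys that appear in this diff:

```scala
// Sketch only: run Cleanup with dry-run disabled via an explicit settings override.
import akka.actor.ActorSystem
import akka.persistence.cassandra.cleanup.{ Cleanup, CleanupSettings }
import com.typesafe.config.ConfigFactory

object DryRunToggle extends App {
  val system = ActorSystem("cleanup-example") // assumed setup

  // default settings: dry-run = true, operations are only logged at INFO
  val dryRun = new Cleanup(system)

  // explicit settings with dry-run = false: deletes are actually executed
  val settings = new CleanupSettings(
    ConfigFactory
      .parseString("dry-run = false")
      .withFallback(system.settings.config.getConfig("akka.persistence.cassandra.cleanup")))
  val realRun = new Cleanup(system, settings)
}
```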
diff --git a/docs/src/main/paradox/snapshots.md b/docs/src/main/paradox/snapshots.md index 3bc9de6f0..ea8ef0c7a 100644 --- a/docs/src/main/paradox/snapshots.md +++ b/docs/src/main/paradox/snapshots.md @@ -72,4 +72,4 @@ The snapshot is stored in a single row so the maximum size of a serialized snaps The @apidoc[akka.persistence.cassandra.cleanup.Cleanup] tool can be used for deleting all events and/or snapshots given list of `persistenceIds` without using persistent actors. It's important that the actors with corresponding -`persistenceId` are not running at the same time as using the tool. +`persistenceId` are not running at the same time as using the tool. See @ref[Database Cleanup](./cleanup.md) for more details. diff --git a/docs/src/test/java/jdoc/cleanup/CleanupDocExample.java b/docs/src/test/java/jdoc/cleanup/CleanupDocExample.java new file mode 100644 index 000000000..097ecba29 --- /dev/null +++ b/docs/src/test/java/jdoc/cleanup/CleanupDocExample.java @@ -0,0 +1,44 @@ +package jdoc.cleanup; + +import akka.Done; +import akka.actor.ActorSystem; +import akka.persistence.cassandra.cleanup.Cleanup; +import akka.persistence.cassandra.query.javadsl.CassandraReadJournal; +import akka.persistence.query.PersistenceQuery; +import scala.compat.java8.FutureConverters; + +import java.time.ZonedDateTime; +import java.time.temporal.ChronoUnit; + +public class CleanupDocExample { + + + + public static void example() { + + ActorSystem system = null; + + //#cleanup + CassandraReadJournal queries = PersistenceQuery.get(system).getReadJournalFor(CassandraReadJournal.class, CassandraReadJournal.Identifier()); + Cleanup cleanup = new Cleanup(system); + + int persistenceIdParallelism = 10; + + + // for all persistence ids, keep two snapshots and delete all events before the oldest kept snapshot + queries.currentPersistenceIds().mapAsync(persistenceIdParallelism, pid -> FutureConverters.toJava(cleanup.cleanupBeforeSnapshot(pid, 2))).run(system); + + // for all persistence ids, keep everything after the provided unix timestamp; if there aren't enough snapshots after this time + // go back before the timestamp to find a snapshot to delete up to + // this operation is more expensive than the one above + ZonedDateTime keepAfter = ZonedDateTime.now().minus(1, ChronoUnit.MONTHS); + queries + .currentPersistenceIds() + .mapAsync(persistenceIdParallelism, pid -> FutureConverters.toJava(cleanup.cleanupBeforeSnapshot(pid, 2, keepAfter.toInstant().toEpochMilli()))) + .run(system); + + //#cleanup + + + } } diff --git a/docs/src/test/scala/doc/cleanup/CleanupDocExample.scala b/docs/src/test/scala/doc/cleanup/CleanupDocExample.scala new file mode 100644 index 000000000..1a68ec92b --- /dev/null +++ b/docs/src/test/scala/doc/cleanup/CleanupDocExample.scala @@ -0,0 +1,36 @@ +package doc.cleanup + +import java.time.ZonedDateTime +import java.time.temporal.ChronoUnit + +import akka.actor.ActorSystem +import akka.persistence.cassandra.cleanup.Cleanup +import akka.persistence.cassandra.query.scaladsl.CassandraReadJournal +import akka.persistence.query.PersistenceQuery + +object CleanupDocExample { + + implicit val system: ActorSystem = ???
+ + //#cleanup + val queries = PersistenceQuery(system).readJournalFor[CassandraReadJournal](CassandraReadJournal.Identifier) + val cleanup = new Cleanup(system) + + // how many persistence ids to operate on in parallel + val persistenceIdParallelism = 10 + + // for all persistence ids, keep two snapshots and delete all events before the oldest kept snapshot + queries.currentPersistenceIds().mapAsync(persistenceIdParallelism)(pid => cleanup.cleanupBeforeSnapshot(pid, 2)).run() + + // for all persistence ids, keep everything after the provided unix timestamp; if there aren't enough snapshots after this time + // go back before the timestamp to find a snapshot to delete up to + // this operation is more expensive than the one above + val keepAfter = ZonedDateTime.now().minus(1, ChronoUnit.MONTHS) + queries + .currentPersistenceIds() + .mapAsync(persistenceIdParallelism)(pid => cleanup.cleanupBeforeSnapshot(pid, 2, keepAfter.toInstant.toEpochMilli)) + .run() + + //#cleanup + +}
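Closing note on the API shown in these examples: `cleanupBeforeSnapshot` is essentially a composition of the two lower-level operations added in this PR, `deleteBeforeSnapshot` and `deleteEventsTo`. A hand-rolled equivalent (sketch only, using just the signatures present in this diff) looks like:

```scala
// Sketch only: delete all but the latest 2 snapshots, then delete all events
// up to the sequence number of the oldest remaining snapshot.
import scala.concurrent.{ ExecutionContext, Future }
import akka.Done
import akka.persistence.cassandra.cleanup.Cleanup

def cleanupOne(cleanup: Cleanup, pid: String)(implicit ec: ExecutionContext): Future[Done] =
  cleanup.deleteBeforeSnapshot(pid, 2).flatMap {
    // oldestKept is the SnapshotMetadata of the oldest snapshot still in place
    case Some(oldestKept) => cleanup.deleteEventsTo(pid, oldestKept.sequenceNr)
    case None             => Future.successful(Done)
  }
```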