Skip to content

Commit

Permalink
execution profile for snapshot store, akka#794 (akka#809)
Browse files Browse the repository at this point in the history
* a few missing in SnapshotStore
* separate profile for snapshot store and use ONE as default
  • Loading branch information
patriknw committed Sep 8, 2020
1 parent e2e7099 commit 3853e67
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 8 deletions.
13 changes: 9 additions & 4 deletions core/src/main/resources/reference.conf
Expand Up @@ -431,8 +431,8 @@ akka.persistence.cassandra {
# Dispatcher for the plugin actor
plugin-dispatcher = "akka.persistence.cassandra.default-dispatcher"

write-profile = "akka-persistence-cassandra-profile"
read-profile = "akka-persistence-cassandra-profile"
write-profile = "akka-persistence-cassandra-snapshot-profile"
read-profile = "akka-persistence-cassandra-snapshot-profile"

# Parameter indicating whether the journal keyspace should be auto created.
# Not all Cassandra settings are configurable when using autocreate and for
Expand Down Expand Up @@ -562,8 +562,13 @@ datastax-java-driver {
# the journal does not use any counters or collections
default-idempotence = true
}


}
akka-persistence-cassandra-snapshot-profile {
basic.request {
consistency = ONE
# the snapshot store does not use any counters or collections
default-idempotence = true
}
}
}
}
Expand Down
Expand Up @@ -228,7 +228,9 @@ import akka.stream.alpakka.cassandra.scaladsl.{ CassandraSession, CassandraSessi
metadata(snapshotMetaPs, persistenceId, criteria, limit = None).flatMap {
mds: immutable.Seq[SnapshotMetadata] =>
val boundStatementBatches = mds
.map(md => preparedDeleteSnapshot.map(_.bind(md.persistenceId, md.sequenceNr: JLong)))
.map(md =>
preparedDeleteSnapshot.map(_.bind(md.persistenceId, md.sequenceNr: JLong)
.setExecutionProfileName(snapshotSettings.writeProfile)))
.grouped(0xFFFF - 1)
if (boundStatementBatches.nonEmpty) {
Future
Expand All @@ -245,7 +247,8 @@ import akka.stream.alpakka.cassandra.scaladsl.{ CassandraSession, CassandraSessi
}
} else {
val boundDeleteSnapshot = preparedDeleteAllSnapshotsForPidAndSequenceNrBetween.map(
_.bind(persistenceId, criteria.minSequenceNr: JLong, criteria.maxSequenceNr: JLong))
_.bind(persistenceId, criteria.minSequenceNr: JLong, criteria.maxSequenceNr: JLong)
.setExecutionProfileName(snapshotSettings.writeProfile))
boundDeleteSnapshot.flatMap(session.executeWrite(_)).map(_ => ())
}
}
Expand Down Expand Up @@ -304,7 +307,9 @@ import akka.stream.alpakka.cassandra.scaladsl.{ CassandraSession, CassandraSessi
persistenceId: String,
criteria: SnapshotSelectionCriteria,
limit: Option[Int]): Future[immutable.Seq[SnapshotMetadata]] = {
val boundStmt = snapshotMetaPs.bind(persistenceId, criteria.maxSequenceNr: JLong, criteria.minSequenceNr: JLong)
val boundStmt = snapshotMetaPs
.bind(persistenceId, criteria.maxSequenceNr: JLong, criteria.minSequenceNr: JLong)
.setExecutionProfileName(snapshotSettings.readProfile)
log.debug("Executing metadata query")
val source: Source[SnapshotMetadata, NotUsed] = session
.select(boundStmt)
Expand All @@ -325,7 +330,8 @@ import akka.stream.alpakka.cassandra.scaladsl.{ CassandraSession, CassandraSessi
session.prepare(deleteAllSnapshotForPersistenceIdAndSequenceNrBetween)

def deleteAsync(metadata: SnapshotMetadata): Future[Unit] = {
val boundDeleteSnapshot = preparedDeleteSnapshot.map(_.bind(metadata.persistenceId, metadata.sequenceNr: JLong))
val boundDeleteSnapshot = preparedDeleteSnapshot.map(
_.bind(metadata.persistenceId, metadata.sequenceNr: JLong).setExecutionProfileName(snapshotSettings.writeProfile))
boundDeleteSnapshot.flatMap(session.executeWrite(_)).map(_ => ())
}

Expand Down
19 changes: 19 additions & 0 deletions docs/src/main/paradox/snapshots.md
Expand Up @@ -34,6 +34,25 @@ The default table definitions look like this:

@@snip [snapshot-tables](/target/snapshot-tables.txt) { #snapshot-tables}

### Consistency

By default the snapshot store uses `ONE` for all reads and writes, since snapshots
should only be used as an optimization to reduce number of replayed events.
If a recovery doesn't see the latest snapshot it will just start from an older snapshot
and replay events from there. Be careful to not delete events too eagerly after storing
snapshots since the deletes may be visible before the snapshot is visible. Keep a few
snapshots and corresponding events before deleting older events and snapshots.

The consistency level for snapshots can be changed with:

```
datastax-java-driver.profiles {
akka-persistence-cassandra-snapshot-profile {
basic.request.consistency = QUORUM
}
}
```

## Configuration

To activate the snapshot-store plugin, add the following line to your Akka `application.conf`:
Expand Down

0 comments on commit 3853e67

Please sign in to comment.