diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..c9786c2 --- /dev/null +++ b/.gitignore @@ -0,0 +1,4 @@ +.vagrant/ +target/ +.idea +*.iml diff --git a/README.md b/README.md new file mode 100644 index 0000000..a21b894 --- /dev/null +++ b/README.md @@ -0,0 +1,45 @@ +# akka-persistence-inmemory +Akka-persistence-inmemory is a plugin for [akka-persistence](http://doc.akka.io/docs/akka/snapshot/scala/persistence.html) +that writes journal entries to an in-memory store. It supports writing journal messages and snapshots so its very useful +for testing your persistent actors. + +# Installation +To include the plugin into your sbt project, add the following lines to your build.sbt file: + + libraryDependencies += "com.github.dnvriend" %% "akka-persistence-inmemory" % "0.0.1" + +For Maven users, add the following to the pom.xml + + + com.github.dnvriend + akka-persistence-inmemory_2.10 + 0.0.1 + + + + com.github.dnvriend + akka-persistence-inmemory_2.11 + 0.0.1 + + +This version of akka-persistence-inmemory depends on Akka 2.3.4 and is cross-built against Scala 2.10.4 and 2.11.2 +and should be binary compatible with Akka 2.3.5 + +# Configuration +Add the following to the application.conf: + +``` +akka { + persistence { + journal.plugin = "inmemory-journal" + snapshot-store.plugin = "inmemory-snapshot-store" + } +} +``` + +### What's new? + +### 0.0.1 + - Initial Release + +Have fun! \ No newline at end of file diff --git a/build.sbt b/build.sbt new file mode 100644 index 0000000..110e988 --- /dev/null +++ b/build.sbt @@ -0,0 +1,73 @@ +import SonatypeKeys._ + +organization := "com.github.dnvriend" + +name := "akka-persistence-inmemory" + +version := "0.0.1" + +scalaVersion := "2.11.1" + +crossScalaVersions := Seq("2.10.4", "2.11.2") + +scalacOptions := Seq("-unchecked", "-deprecation", "-encoding", "utf8") + +resolvers += "krasserm at bintray" at "http://dl.bintray.com/krasserm/maven" + +profileName := "com.github.dnvriend" + +libraryDependencies ++= { + val akkaVersion = "2.3.4" + Seq( + "com.typesafe.akka" %% "akka-actor" % akkaVersion, + "com.typesafe.akka" %% "akka-slf4j" % akkaVersion, + "com.typesafe.akka" %% "akka-persistence-experimental" % akkaVersion, + "ch.qos.logback" % "logback-classic" % "1.1.2", + "org.slf4j" % "slf4j-nop" % "1.6.4", + "com.typesafe.akka" %% "akka-testkit" % akkaVersion % "test", + "org.scalatest" %% "scalatest" % "2.1.4" % "test", + "com.github.krasserm" %% "akka-persistence-testkit" % "0.3.4" % "test" + ) +} + +testOptions += Tests.Argument(TestFrameworks.JUnit, "-v") + +// publish settings + +publishTo := { + val nexus = "https://oss.sonatype.org/" + if (isSnapshot.value) + Some("snapshots" at nexus + "content/repositories/snapshots") + else + Some("releases" at nexus + "service/local/staging/deploy/maven2") +} + +publishMavenStyle := true + +publishArtifact in Test := false + +pomIncludeRepository := { _ => false } + +pomExtra := ( + https://github.com/dnvriend/akka-persistence-inmemory + + + BSD-style + http://www.opensource.org/licenses/bsd-license.php + repo + + + + https://github.com/dnvriend/akka-persistence-inmemory + scm:git:git@github.com:dnvriend/akka-persistence-inmemory.git + + + + you + Dennis Vriend + https://github.com/dnvriend + + +) + +xerial.sbt.Sonatype.sonatypeSettings diff --git a/project/build.properties b/project/build.properties new file mode 100644 index 0000000..5075289 --- /dev/null +++ b/project/build.properties @@ -0,0 +1 @@ +sbt.version=0.13.5 \ No newline at end of file diff --git a/project/plugins.sbt b/project/plugins.sbt new file mode 100644 index 0000000..3c898c4 --- /dev/null +++ b/project/plugins.sbt @@ -0,0 +1,9 @@ +addSbtPlugin("org.scalastyle" %% "scalastyle-sbt-plugin" % "0.4.0") + +addSbtPlugin("net.virtual-void" % "sbt-dependency-graph" % "0.7.4") + +addSbtPlugin("org.scala-sbt.plugins" % "sbt-onejar" % "0.8") + +addSbtPlugin("org.xerial.sbt" % "sbt-sonatype" % "0.2.1") + +resolvers += "sonatype-releases" at "https://oss.sonatype.org/content/repositories/releases/" \ No newline at end of file diff --git a/src/main/resources/reference.conf b/src/main/resources/reference.conf new file mode 100644 index 0000000..6a753b6 --- /dev/null +++ b/src/main/resources/reference.conf @@ -0,0 +1,12 @@ +akka { + +} + +inmemory-journal { + class = "akka.persistence.inmemory.journal.InMemoryJournal" +} + +inmemory-snapshot-store { + class = "akka.persistence.inmemory.snapshot.InMemorySnapshotStore" +} + diff --git a/src/main/scala/akka/persistence/inmemory/journal/InMemoryJournal.scala b/src/main/scala/akka/persistence/inmemory/journal/InMemoryJournal.scala new file mode 100644 index 0000000..1dad3a3 --- /dev/null +++ b/src/main/scala/akka/persistence/inmemory/journal/InMemoryJournal.scala @@ -0,0 +1,134 @@ +package akka.persistence.inmemory.journal + +import akka.actor.ActorLogging +import akka.persistence.{PersistentConfirmation, PersistentId, PersistentRepr} +import akka.persistence.journal.AsyncWriteJournal + +import java.util.concurrent.ConcurrentHashMap +import scala.collection.JavaConverters._ +import scala.collection.immutable.Seq +import scala.concurrent.Future + +class InMemoryJournal extends AsyncWriteJournal with ActorLogging { + implicit val ec = context.system.dispatcher + val journal: scala.collection.mutable.Map[String, List[PersistentRepr]] = new ConcurrentHashMap[String, List[PersistentRepr]].asScala + + override def asyncWriteMessages(messages: Seq[PersistentRepr]): Future[Unit] = { + Future[Unit] { + val mess = messages + log.debug("writeMessages for {} persistent messages", mess.size) + mess.foreach { repr => + import repr._ + journal.get(persistenceId) match { + case None => journal.put(processorId, List(repr)) + case Some(list) => journal.put(processorId, repr :: list) + } + } + } + } + + override def asyncDeleteMessagesTo(persistenceId: String, toSequenceNr: Long, permanent: Boolean): Future[Unit] = { + Future[Unit] { + val perm = permanent + val pid = persistenceId + val toSeq = toSequenceNr + log.debug("asyncDeleteMessagesTo for processorId: {} to sequenceNr: {}, permanent: {}", pid, toSeq, perm) + perm match { + case true => + journal.get(pid) match { + case None => + case Some(list) => journal.put(pid, list.filterNot(_.sequenceNr <= toSeq)) + } + case false => + journal.get(pid) match { + case None => + case Some(list) => journal.put(pid, list.map { repr => + if(repr.sequenceNr <= toSeq) repr.update(deleted = true) else repr + }) + } + } + } + } + + @scala.deprecated("writeConfirmations will be removed, since Channels will be removed.") + override def asyncWriteConfirmations(confirmations: Seq[PersistentConfirmation]): Future[Unit] = { + Future[Unit] { + val confirms = confirmations + log.debug("writeConfirmations for {} messages", confirms.size) + confirms.foreach { confirmation => + import confirmation._ + journal.get(persistenceId) match { + case None => + case Some(list) => + journal.put(persistenceId, list.map { msg => + if(msg.sequenceNr == sequenceNr) { + val confirmationIds = msg.confirms :+ confirmation.channelId + msg.update(confirms = confirmationIds) + } else msg + }) + } + } + } + } + + @scala.deprecated("asyncDeleteMessages will be removed.") + override def asyncDeleteMessages(messageIds: Seq[PersistentId], permanent: Boolean): Future[Unit] = { + Future[Unit] { + val mids = messageIds + val perm = permanent + log.debug("Async delete {} messages, permanent: {}", mids.size, perm) + + mids.foreach { persistentId => + import persistentId._ + perm match { + case true => + journal.get(processorId) match { + case None => + case Some(list) => journal.put(processorId, list.filterNot(_.sequenceNr == sequenceNr)) + } + case false => + journal.get(processorId) match { + case None => + case Some(list) => journal.put(processorId, list.map { repr => + if(repr.sequenceNr == sequenceNr) repr.update(deleted = true) else repr + }) + } + } + } + } + } + + override def asyncReadHighestSequenceNr(persistenceId: String, fromSequenceNr: Long): Future[Long] = { + Future[Long] { + val pid = persistenceId + val fromSeq = fromSequenceNr + log.debug("Async read for highest sequence number for processorId: {} (hint, seek from nr: {})", pid, fromSeq) + journal.get(pid) match { + case None => 0 + case Some(list) => list.map(_.sequenceNr).max + } + } + } + + override def asyncReplayMessages(persistenceId: String, fromSequenceNr: Long, toSequenceNr: Long, max: Long)(replayCallback: (PersistentRepr) => Unit): Future[Unit] = { + Future[Unit] { + val pid = persistenceId + val fromSeq = fromSequenceNr + val toSeq = toSequenceNr + val limit = max + val replay = replayCallback + + log.debug("Async replay for processorId {}, from sequenceNr: {}, to sequenceNr: {} with max records: {}", pid, fromSeq, toSeq, limit) + + journal.get(pid) match { + case None => + case Some(list) => + val takeMax = if(limit >= java.lang.Integer.MAX_VALUE) java.lang.Integer.MAX_VALUE else limit.toInt + list.filter { repr => + repr.sequenceNr >= fromSeq && repr.sequenceNr <= toSeq + }.sortBy(_.sequenceNr) + .take(takeMax).foreach(replay) + } + } + } +} \ No newline at end of file diff --git a/src/main/scala/akka/persistence/inmemory/snapshot/InMemorySnapshotStore.scala b/src/main/scala/akka/persistence/inmemory/snapshot/InMemorySnapshotStore.scala new file mode 100644 index 0000000..df52688 --- /dev/null +++ b/src/main/scala/akka/persistence/inmemory/snapshot/InMemorySnapshotStore.scala @@ -0,0 +1,63 @@ +package akka.persistence.inmemory.snapshot + +import akka.actor.ActorLogging +import akka.persistence.serialization.Snapshot +import akka.persistence.snapshot.SnapshotStore +import akka.persistence.{SnapshotMetadata, Persistence, SelectedSnapshot, SnapshotSelectionCriteria} +import akka.serialization.SerializationExtension +import java.util.concurrent.ConcurrentHashMap +import scala.collection.JavaConverters._ +import scala.concurrent.Future + +class InMemorySnapshotStore extends SnapshotStore with ActorLogging { + implicit val system = context.system + val extension = Persistence(context.system) + val serialization = SerializationExtension(context.system) + implicit val executionContext = context.system.dispatcher + + val snapshots: scala.collection.mutable.Map[SnapshotMetadata, Any] = new ConcurrentHashMap[SnapshotMetadata, Any].asScala + + override def loadAsync(persistenceId: String, criteria: SnapshotSelectionCriteria): Future[Option[SelectedSnapshot]] = { + Future[Option[SelectedSnapshot]] { + val pid = persistenceId + val crit = criteria + log.debug("loading for persistenceId: {}, criteria: {}", pid, crit) + val snapshotEntries = snapshots.keySet.toList.filter { meta => + meta.persistenceId == persistenceId && meta.sequenceNr <= criteria.maxSequenceNr + }.filterNot(_.timestamp > criteria.maxTimestamp) + .sortBy(_.sequenceNr) + .reverse.headOption + + snapshotEntries match { + case None => None + case Some(meta) => snapshots.get(meta) match { + case None => None + case Some(value) => Some(SelectedSnapshot(meta, value)) + } + } + } + } + + override def saveAsync(metadata: SnapshotMetadata, snapshot: Any): Future[Unit] = { + Future[Unit] { + val meta = metadata + val snap = snapshot + log.debug("Saving metadata: {}, snapshot: {}", meta, snap) + snapshots.put(meta, snap) + } + } + + override def saved(metadata: SnapshotMetadata): Unit = log.debug("Saved: {}", metadata) + + override def delete(metadata: SnapshotMetadata): Unit = { + log.debug("Deleting: {}", metadata) + snapshots.remove(metadata) + } + + override def delete(persistenceId: String, criteria: SnapshotSelectionCriteria): Unit = { + log.debug("Deleting for persistenceId: {} and criteria: {}", persistenceId, criteria) + snapshots.keySet.toList.filter { meta => + meta.persistenceId == persistenceId && meta.sequenceNr <= criteria.maxSequenceNr + }.foreach(snapshots.remove) + } +} diff --git a/src/test/resources/application.conf b/src/test/resources/application.conf new file mode 100644 index 0000000..766ef86 --- /dev/null +++ b/src/test/resources/application.conf @@ -0,0 +1,9 @@ +akka { + loglevel = debug + + persistence { + journal.plugin = "inmemory-journal" + snapshot-store.plugin = "inmemory-snapshot-store" + } +} + diff --git a/src/test/resources/logback.xml b/src/test/resources/logback.xml new file mode 100644 index 0000000..b168db5 --- /dev/null +++ b/src/test/resources/logback.xml @@ -0,0 +1,34 @@ + + + + + info + + + %date{HH:mm:ss} %-5level [%X{akkaSource}] - %msg%n + + + + + ${communication-manager.log-file:-communication-manager.log} + + %date{HH:mm:ss} %-5level [%X{akkaSource}] - %msg%n + + + + + + + + + + + + + + + + + + + diff --git a/src/test/scala/akka/persistence/inmemory/journal/InMemoryJournalSpec.scala b/src/test/scala/akka/persistence/inmemory/journal/InMemoryJournalSpec.scala new file mode 100644 index 0000000..10ed14b --- /dev/null +++ b/src/test/scala/akka/persistence/inmemory/journal/InMemoryJournalSpec.scala @@ -0,0 +1,8 @@ +package akka.persistence.inmemory.journal + +import akka.persistence.journal.LegacyJournalSpec +import com.typesafe.config.ConfigFactory + +class InMemoryJournalSpec extends LegacyJournalSpec { + lazy val config = ConfigFactory.load("application.conf") +} diff --git a/src/test/scala/akka/persistence/inmemory/snapshot/InMemorySnapshotStoreSpec.scala b/src/test/scala/akka/persistence/inmemory/snapshot/InMemorySnapshotStoreSpec.scala new file mode 100644 index 0000000..7260e18 --- /dev/null +++ b/src/test/scala/akka/persistence/inmemory/snapshot/InMemorySnapshotStoreSpec.scala @@ -0,0 +1,8 @@ +package akka.persistence.inmemory.snapshot + +import akka.persistence.snapshot.SnapshotStoreSpec +import com.typesafe.config.ConfigFactory + +class InMemorySnapshotStoreSpec extends SnapshotStoreSpec { + lazy val config = ConfigFactory.load("application.conf") +} \ No newline at end of file