Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 4 additions & 5 deletions .scalafmt.conf
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
version = "2.4.2"
align.tokens = more
align.openParenCallSite = true
align.openParenDefnSite = true
danglingParentheses = false
version = "2.5.1"

preset = defaultWithAlign
danglingParentheses.preset = false
rewrite.rules = [
SortImports
]
Expand Down
2 changes: 1 addition & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ sudo: required

scala:
- 2.12.11
- 2.13.1
- 2.13.2

jdk:
- openjdk8
Expand Down
65 changes: 15 additions & 50 deletions project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,59 +3,24 @@ import sbt.librarymanagement.ModuleID

object Dependencies {

object akka {
private val version = "2.5.30"

val actor = "com.typesafe.akka" %% "akka-actor" % version
val slf4j = "com.typesafe.akka" %% "akka-slf4j" % version
val persistence = "com.typesafe.akka" %% "akka-persistence" % version
val protobuf = "com.typesafe.akka" %% "akka-protobuf" % version
val persistenceQuery = "com.typesafe.akka" %% "akka-persistence-query" % version
val stream = "com.typesafe.akka" %% "akka-stream" % version
val streamTestKit = "com.typesafe.akka" %% "akka-stream-testkit" % version
}

object logback {
val classic = "ch.qos.logback" % "logback-classic" % "1.1.11"
}

object slf4j {
val api = "org.slf4j" % "slf4j-api" % "1.7.30"
}

object typesafe {
val config = "com.typesafe" % "config" % "1.4.0"
}

object scalatest {
val scalatestMain = "org.scalatest" %% "scalatest" % "3.1.1"
val scalatestPlus = "org.scalatestplus" %% "scalacheck-1-14" % "3.1.1.1"
}

object levelDb {
val levelDb = "org.iq80.leveldb" % "leveldb" % "0.12"
}

object scala {
val collectionCompat = "org.scala-lang.modules" %% "scala-collection-compat" % "2.1.6"
}
private val akkaVersion = "2.5.30"

val core: Seq[ModuleID] = Seq(
akka.actor,
akka.stream,
akka.persistence,
akka.persistenceQuery,
akka.protobuf,
logback.classic,
slf4j.api,
typesafe.config,
"com.typesafe.akka" %% "akka-actor" % akkaVersion,
"com.typesafe.akka" %% "akka-persistence" % akkaVersion,
"com.typesafe.akka" %% "akka-protobuf" % akkaVersion,
"com.typesafe.akka" %% "akka-persistence-query" % akkaVersion,
"com.typesafe.akka" %% "akka-stream" % akkaVersion,
"ch.qos.logback" % "logback-classic" % "1.1.8",
"org.slf4j" % "slf4j-api" % "1.7.30",
"com.typesafe" % "config" % "1.4.0",
// -- Testing --
scalatest.scalatestMain % Test,
scalatest.scalatestPlus % Test,
akka.slf4j % Test,
akka.streamTestKit % Test,
levelDb.levelDb % Test,
"org.scalatest" %% "scalatest" % "3.1.1" % Test,
"org.scalatestplus" %% "scalacheck-1-14" % "3.1.1.1" % Test,
"com.typesafe.akka" %% "akka-slf4j" % akkaVersion % Test,
"com.typesafe.akka" %% "akka-stream-testkit" % akkaVersion % Test,
"org.iq80.leveldb" % "leveldb" % "0.12" % Test,
// -- Backwards Compatibility --
scala.collectionCompat
"org.scala-lang.modules" %% "scala-collection-compat" % "2.1.4"
)
}
22 changes: 12 additions & 10 deletions project/Publish.scala
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,12 @@ import sbt.plugins._
object NoPublish extends AutoPlugin {
override def requires: Plugins = JvmPlugin

override def projectSettings: Seq[Setting[_]] = Seq(
publishArtifact := false,
publish := {},
publishLocal := {}
)
override def projectSettings: Seq[Setting[_]] =
Seq(
publishArtifact := false,
publish := {},
publishLocal := {}
)
}

object Publish extends AutoPlugin {
Expand All @@ -19,9 +20,10 @@ object Publish extends AutoPlugin {
override def trigger: PluginTrigger = allRequirements
override def requires: Plugins = BintrayPlugin

override def projectSettings: Seq[Setting[_]] = Seq(
bintrayOrganization := Some("firstbird"),
bintrayPackage := "akka-persistence-query-view",
bintrayPackageLabels := Seq("akka", "akka-persistence", "event-sourcing", "cqrs")
)
override def projectSettings: Seq[Setting[_]] =
Seq(
bintrayOrganization := Some("firstbird"),
bintrayPackage := "akka-persistence-query-view",
bintrayPackageLabels := Seq("akka", "akka-persistence", "event-sourcing", "cqrs")
)
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,11 @@ class QueryViewSnapshotSerializer(val system: ExtendedActorSystem) extends BaseS
override def toBinary(o: AnyRef, buf: ByteBuffer): Unit =
toBinary(o, new ByteBufferOutputStream(buf))

private def toBinary(o: AnyRef, out: OutputStream): Unit = o match {
case qvs: QueryViewSnapshot[_] => serializeQueryViewSnapshot(qvs, out)
case _ => throw new IllegalArgumentException(s"Can't serialize object of type ${o.getClass}")
}
private def toBinary(o: AnyRef, out: OutputStream): Unit =
o match {
case qvs: QueryViewSnapshot[_] => serializeQueryViewSnapshot(qvs, out)
case _ => throw new IllegalArgumentException(s"Can't serialize object of type ${o.getClass}")
}

override def fromBinary(bytes: Array[Byte], manifest: Option[Class[_]]): AnyRef =
fromBinary(new ByteArrayInputStream(bytes))
Expand Down
18 changes: 10 additions & 8 deletions src/main/scala/akka/persistence/QueryView.scala
Original file line number Diff line number Diff line change
Expand Up @@ -447,19 +447,21 @@ abstract class QueryView
()
}

override def saveSnapshot(snapshot: Any): Unit = if (!savingSnapshot) {
// Decorate the snapshot
savingSnapshot = true
super.saveSnapshot(QueryViewSnapshot(snapshot, _lastOffset, _sequenceNrByPersistenceId))
}
override def saveSnapshot(snapshot: Any): Unit =
if (!savingSnapshot) {
// Decorate the snapshot
savingSnapshot = true
super.saveSnapshot(QueryViewSnapshot(snapshot, _lastOffset, _sequenceNrByPersistenceId))
}

private def snapshotSaved(metadata: SnapshotMetadata): Unit = {
savingSnapshot = false
lastSnapshotSequenceNr = metadata.sequenceNr
_noOfEventsSinceLastSnapshot = 0L
log.debug("Snapshot saved successfully snapshotterId={} lastSnapshotSequenceNr={}",
snapshotterId,
lastSnapshotSequenceNr)
log.debug(
"Snapshot saved successfully snapshotterId={} lastSnapshotSequenceNr={}",
snapshotterId,
lastSnapshotSequenceNr)
}

private def snapshotSavingFailed(error: Throwable): Unit = {
Expand Down
9 changes: 5 additions & 4 deletions src/test/scala/com/ovoenergy/akka/AkkaFixture.scala
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,11 @@ trait AkkaFixture extends BeforeAndAfterEach { self: Suite with ConfigFixture =>

implicit def system: ActorSystem =
Option(_system).getOrElse(throw new IllegalStateException("ActorSystem not yet started"))
def extendedActorSystem: ExtendedActorSystem = system match {
case eas: ExtendedActorSystem => eas
case _ => throw new IllegalStateException("ActorSystem not an instance of ExtendedActorSystem")
}
def extendedActorSystem: ExtendedActorSystem =
system match {
case eas: ExtendedActorSystem => eas
case _ => throw new IllegalStateException("ActorSystem not an instance of ExtendedActorSystem")
}

override protected def beforeEach(): Unit = {
super.beforeEach()
Expand Down
15 changes: 8 additions & 7 deletions src/test/scala/com/ovoenergy/akka/AkkaPersistenceFixture.scala
Original file line number Diff line number Diff line change
Expand Up @@ -84,13 +84,14 @@ trait AkkaPersistenceFixture extends ConfigFixture with ScalaFutures with Before
written
}

def deleteFromJournal(persistenceId: String, toSequenceNr: Long): Unit = withJournalWriter(persistenceId) { writer =>
writer
.ask(DeleteFromJournal(toSequenceNr))(10.seconds)
.mapTo[DeleteMessagesSuccess]
.futureValue(timeout(scaled(5.seconds)))
note(s"Events deleted from $persistenceId up to $toSequenceNr")
}
def deleteFromJournal(persistenceId: String, toSequenceNr: Long): Unit =
withJournalWriter(persistenceId) { writer =>
writer
.ask(DeleteFromJournal(toSequenceNr))(10.seconds)
.mapTo[DeleteMessagesSuccess]
.futureValue(timeout(scaled(5.seconds)))
note(s"Events deleted from $persistenceId up to $toSequenceNr")
}

override protected def beforeEach(): Unit = {
super.beforeEach()
Expand Down