Skip to content

Commit

Permalink
Merge pull request #4 from kwark/slick-3.2.0
Browse files Browse the repository at this point in the history
Slick 3.2.0
  • Loading branch information
kwark committed Mar 20, 2017
2 parents 286083f + f39ebf0 commit bacf6a4
Show file tree
Hide file tree
Showing 18 changed files with 155 additions and 45 deletions.
1 change: 1 addition & 0 deletions .travis.yml
@@ -1,6 +1,7 @@
language: scala
scala:
- 2.11.8
- 2.12.1
script: sbt clean +test
sudo: false
jdk:
Expand Down
17 changes: 13 additions & 4 deletions README.adoc
Expand Up @@ -5,7 +5,7 @@ image::https://travis-ci.org/WegenenVerkeer/akka-persistence-postgresql.svg?bran
akka-persistence-postgresql is a journal and snapshot store plugin for http://doc.akka.io/docs/akka/current/scala/persistence.html[akka-persistence] using Postgresql.
It uses Slick to talk to a Postgresql database.

Currently only scala 2.11, Java8, akka 2.4.x and slick 3.1.x are supported.
Scala 2.11 & 2.12, Java8, akka 2.4.x and both slick 3.1.1, 3.2.0 are supported.

The plugin supports the following functionality:

Expand All @@ -17,10 +17,19 @@ The plugin supports the following functionality:
== Usage

Just add the following dependency to your SBT dependencies
Versions: The table below lists the versions and their main dependencies

|===
|Version to use|Scala |Akka |Slick
|0.5.0 |2.11.x |2.4.x |3.1.x
|0.6.0-SNAPSHOT|2.12.x or 2.11.x|2.4.x|3.2.0
|===

So you just need to add the following dependency to your SBT dependencies

libraryDependencies += "be.wegenenverkeer" %% "akka-persistence-pg" % "0.5.0"


== Configuration

Add the following to your `application.conf`:
Expand Down Expand Up @@ -195,7 +204,7 @@ Imaging the following scenario:

* Two persistent actors, A en B, each want to store an event (eventA and eventB) using the journal.
* Two transactions are started almost simulateneously. The first transaction (storing eventA) starts first
and gets an id = 1000 from the sequence. The second transaction (storing eventB) get id = 1001.
and gets an id = 1000 from the sequence. The second transaction (storing eventB) gets id = 1001.
* For some reason however, the second transaction gets committed first.
The journal table now has an entry with 1002 as it's highest entry.
* The persistence query gets notified and reads this event with id = 1002 from the journal.
Expand Down Expand Up @@ -223,7 +232,7 @@ The persistence query plugin will now automatically use the 'rowid' column inste
It is now simply impossible for events with a lower 'rowid' than the maximum 'rowid' present to appear in the journal after.

This strategy has a better throughput than the TableLockingStrategy,
but the latency between storing events and them being available for querying is higher.
but the latency between storing events and them being available for querying is a bit higher.

=== SingleThreadedBatchWriteStrategy

Expand Down
5 changes: 3 additions & 2 deletions modules/akka-persistence-pg/src/main/resources/reference.conf
Expand Up @@ -39,14 +39,15 @@ pg-persistence {
user = "akkapg"
url = "jdbc:postgresql://localhost:5432/akka"
jndiName = "" # if you already have a DB configured somewhere else, you can share it through JNDI,
# when doing so, user, password and url are not needed
# when doing so, user, password and url are not needed but maxConnections is required
maxConnections = 4 # maximum number of JDBC connections of the underlying pool
numThreads = 4
queueSize = 1000
connectionTimeout = 1000
validationTimeout = 1000
connectionPool = "HikariCP" # set to "disabled" to disable connection pooling, useful for tests
dataSourceClass = "org.postgresql.ds.PGSimpleDataSource"
#for other optional hikarCP related properties, check the JdbcBackend.forConfig scaladoc
#for other optional HikarCP related properties, check the JdbcBackend.forConfig scaladoc
properties = {
prepareThreshold = 1 #enable prepared statement caching on the server side, required because HikariCP does not do prepared statement caching
}
Expand Down
Expand Up @@ -8,15 +8,14 @@ import java.time.temporal.ChronoField
import org.postgresql.util.HStoreConverter
import slick.ast.Library.SqlOperator
import slick.ast.{FieldSymbol, TypedType}
import slick.driver.{JdbcTypesComponent, PostgresDriver}
import slick.jdbc.JdbcType
import slick.jdbc.{JdbcType, JdbcTypesComponent, PostgresProfile}
import slick.lifted.ExtensionMethods

import scala.collection.convert.{WrapAsJava, WrapAsScala}
import scala.reflect.ClassTag
import scala.language.implicitConversions

trait AkkaPgJdbcTypes extends JdbcTypesComponent { driver: PostgresDriver =>
trait AkkaPgJdbcTypes extends JdbcTypesComponent { driver: PostgresProfile =>

def pgjson: String

Expand Down
Expand Up @@ -3,6 +3,6 @@ package akka.persistence.pg
trait PgConfig {

def pluginConfig: PluginConfig
lazy val driver = pluginConfig.pgPostgresDriver
lazy val driver = pluginConfig.pgPostgresProfile
lazy val database = pluginConfig.database
}

This file was deleted.

@@ -0,0 +1,12 @@
package akka.persistence.pg

import slick.jdbc.PostgresProfile

trait PgPostgresProfile extends PostgresProfile with AkkaPgJdbcTypes {

override val api = new API with AkkaPgImplicits {}

}

class PgPostgresProfileImpl(override val pgjson: String) extends PgPostgresProfile

Expand Up @@ -48,7 +48,7 @@ class PluginConfig(systemConfig: Config) {

val jsonType = config.getString("pgjson")

val pgPostgresDriver = new PgPostgresDriverImpl(jsonType match {
val pgPostgresProfile = new PgPostgresProfileImpl(jsonType match {
case "jsonb" => "jsonb"
case "json" => "json"
case a: String => sys.error(s"unsupported value for pgjson '$a'. Only 'json' or 'jsonb' supported")
Expand All @@ -65,7 +65,10 @@ class PluginConfig(systemConfig: Config) {

val db = PluginConfig.asOption(dbConfig.getString("jndiName")) match {
case Some(jndiName) =>
JdbcBackend.Database.forName(jndiName, asyncExecutor(jndiName))
JdbcBackend.Database.forName(jndiName,
Some(if (dbConfig.hasPath("maxConnections")) dbConfig.getInt("maxConnections") else dbConfig.getInt("numThreads")),
asyncExecutor(jndiName)
)

case None =>
dbConfig.getString("connectionPool") match {
Expand All @@ -76,11 +79,10 @@ class PluginConfig(systemConfig: Config) {
simpleDataSource.setUser(dbConfig.getString("user"))
simpleDataSource.setPassword(dbConfig.getString("password"))
simpleDataSource.setPrepareThreshold(1)
JdbcBackend.Database.forDataSource(simpleDataSource, asyncExecutor("akkapg-unpooled"))
JdbcBackend.Database.forDataSource(simpleDataSource, None, asyncExecutor("akkapg-unpooled"))

case _ =>
//Slick's Database.forConfig does NOT use the 'url' when also configuring using a JDBC DataSource instead of
// a JDBC Driver class
//Slick's Database.forConfig does NOT use the 'url' when also configuring using a JDBC DataSource instead of a JDBC Driver class
val props = new Properties()
org.postgresql.Driver.parseURL(dbConfig.getString("url"), new Properties()).asScala foreach {
case ("PGDBNAME", v) => props.put("databaseName", v)
Expand Down
Expand Up @@ -93,7 +93,7 @@ class RowIdUpdater(pluginConfig: PluginConfig) extends Actor
notifiers = Queue.empty
}

import pluginConfig.pgPostgresDriver.api._
import pluginConfig.pgPostgresProfile.api._


def findMaxRowId(): Future[Long] = {
Expand Down
Expand Up @@ -26,10 +26,10 @@ private class StoreActor(pluginConfig: PluginConfig)
case object Run

import context.dispatcher
import pluginConfig.pgPostgresDriver.api._
import pluginConfig.pgPostgresProfile.api._

private var senders: List[ActorRef] = List.empty[ActorRef]
private var actions: Seq[pluginConfig.pgPostgresDriver.api.DBIO[_]] = Seq.empty[DBIO[_]]
private var actions: Seq[pluginConfig.pgPostgresProfile.api.DBIO[_]] = Seq.empty[DBIO[_]]

override def receive: Receive = idle

Expand Down
Expand Up @@ -15,7 +15,7 @@ import scala.language.postfixOps
trait WriteStrategy {

def pluginConfig: PluginConfig
lazy val driver = pluginConfig.pgPostgresDriver
lazy val driver = pluginConfig.pgPostgresProfile

import driver.api._

Expand Down Expand Up @@ -70,7 +70,7 @@ class TransactionalWriteStrategy(override val pluginConfig: PluginConfig,
|!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
""".stripMargin)

import pluginConfig.pgPostgresDriver.api._
import pluginConfig.pgPostgresProfile.api._

def store(actions: Seq[DBIO[_]],
notifier: Notifier)
Expand All @@ -86,7 +86,7 @@ class TransactionalWriteStrategy(override val pluginConfig: PluginConfig,
class TableLockingWriteStrategy(override val pluginConfig: PluginConfig,
override val system: ActorSystem) extends WriteStrategy {

import pluginConfig.pgPostgresDriver.api._
import pluginConfig.pgPostgresProfile.api._

def store(actions: Seq[DBIO[_]],
notifier: Notifier)
Expand Down
@@ -0,0 +1,12 @@
include "pg-writestrategy-base.conf"
pg-persistence {
db {
jndiName = "MyDS"
maxConnections = 4
numThreads = 4
queueSize = 1000
}
writestrategy = "akka.persistence.pg.journal.RowIdUpdatingStrategy"
}


@@ -0,0 +1,82 @@
package akka.persistence.pg

import java.util.concurrent.TimeUnit
import javax.naming.{Context, InitialContext}

import akka.actor._
import akka.persistence.pg.TestActor.{Alter, GetState, TheState}
import akka.persistence.pg.journal.JournalTable
import akka.persistence.pg.util.{CreateTables, RecreateSchema}
import akka.testkit.TestProbe
import akka.util.Timeout
import com.typesafe.config.{Config, ConfigFactory}
import org.postgresql.ds.PGSimpleDataSource
import org.scalatest._
import org.scalatest.concurrent.ScalaFutures
import org.scalatest.time.{Milliseconds, Seconds, Span}

import scala.language.postfixOps

class PersistUsingJndiTest extends FunSuite
with Matchers
with BeforeAndAfterAll
with JournalTable
with CreateTables
with RecreateSchema
with PgConfig
with WaitForEvents
with ScalaFutures
{

override implicit val patienceConfig = PatienceConfig(timeout = Span(3, Seconds), interval = Span(100, Milliseconds))

val config: Config = ConfigFactory.load("pg-persist-jndi.conf")
implicit val system = ActorSystem("TestCluster", config)
override lazy val pluginConfig: PluginConfig = PgExtension(system).pluginConfig

import driver.api._

val testProbe = TestProbe()
implicit val timeOut = Timeout(1, TimeUnit.MINUTES)

test("generate events") {
val test = system.actorOf(Props(new TestActor(testProbe.ref)))

testProbe.send(test, Alter("foo"))
testProbe.expectMsg("j")
testProbe.send(test, GetState)
testProbe.expectMsg(TheState(id = "foo"))

testProbe.send(test, Alter("bar"))
testProbe.expectMsg("j")
testProbe.send(test, GetState)
testProbe.expectMsg(TheState(id = "bar"))

database.run(journals.size.result).futureValue shouldBe 2
}

override def beforeAll() {
System.setProperty(Context.INITIAL_CONTEXT_FACTORY, "tyrex.naming.MemoryContextFactory")
System.setProperty(Context.PROVIDER_URL, "/")

val simpleDataSource = new PGSimpleDataSource()
simpleDataSource.setUrl(pluginConfig.dbConfig.getString("url"))
simpleDataSource.setUser(pluginConfig.dbConfig.getString("user"))
simpleDataSource.setPassword(pluginConfig.dbConfig.getString("password"))
simpleDataSource.setPrepareThreshold(1)

new InitialContext().rebind("MyDS", simpleDataSource)

database.run(recreateSchema.andThen(createTables)).futureValue
}

override protected def afterAll(): Unit = {
system.terminate()
system.whenTerminated.futureValue
()
}


}


Expand Up @@ -4,7 +4,7 @@ import java.util.UUID

import akka.actor.{Props, ActorLogging}
import akka.persistence.PersistentActor
import akka.persistence.pg.PgPostgresDriver
import akka.persistence.pg.PgPostgresProfile
import akka.persistence.pg.event.{EventWrapper, ExtraDBIOSupport}
import akka.persistence.pg.perf.Messages.{Altered, Alter}
import slick.jdbc.{GetResult, PositionedResult}
Expand All @@ -13,10 +13,10 @@ import scala.language.postfixOps
import scala.util.Random

object RandomDelayPerfActor {
def props(driver: PgPostgresDriver) = Props(new RandomDelayPerfActor(driver))
def props(driver: PgPostgresProfile) = Props(new RandomDelayPerfActor(driver))
}

class RandomDelayPerfActor(driver: PgPostgresDriver) extends PersistentActor with ActorLogging {
class RandomDelayPerfActor(driver: PgPostgresProfile) extends PersistentActor with ActorLogging {

override val persistenceId: String = "TestActor_"+UUID.randomUUID().toString

Expand Down
Expand Up @@ -5,7 +5,7 @@ import java.util.concurrent.atomic.AtomicInteger
import akka.actor.Props
import akka.persistence.PersistentActor
import akka.persistence.pg.event.{EventWrapper, ExtraDBIOSupport}
import akka.persistence.pg.PgPostgresDriver
import akka.persistence.pg.PgPostgresProfile
import akka.persistence.pg.perf.Messages.{Altered, Alter}
import akka.persistence.pg.perf.ReadModelUpdateActor.TextNotUnique
import org.postgresql.util.PSQLException
Expand All @@ -17,11 +17,11 @@ object ReadModelUpdateActor {
private val id = new AtomicInteger(0)
def reset() = id.set(0)

def props(driver: PgPostgresDriver, fullTableName: String) = Props(new ReadModelUpdateActor(driver, fullTableName, id.incrementAndGet()))
def props(driver: PgPostgresProfile, fullTableName: String) = Props(new ReadModelUpdateActor(driver, fullTableName, id.incrementAndGet()))
}


class ReadModelUpdateActor(driver: PgPostgresDriver, fullTableName: String, id: Int) extends PersistentActor {
class ReadModelUpdateActor(driver: PgPostgresProfile, fullTableName: String, id: Int) extends PersistentActor {

override val persistenceId: String = s"TestActor_$id"

Expand Down
Expand Up @@ -9,8 +9,9 @@ import akka.persistence.pg.event._
import akka.persistence.pg.journal.JournalTable
import akka.persistence.pg.perf.Messages.Alter
import akka.persistence.pg.perf.{PerfActor, RandomDelayPerfActor}
import akka.persistence.pg.snapshot.SnapshotTable
import akka.persistence.pg.util.{CreateTables, RecreateSchema}
import akka.persistence.pg.{PgExtension, WaitForEvents, PgConfig, PluginConfig}
import akka.persistence.pg.{PgConfig, PgExtension, PluginConfig, WaitForEvents}
import akka.util.Timeout
import com.typesafe.config.Config
import org.scalatest._
Expand All @@ -28,6 +29,7 @@ abstract class WriteStrategySuite(config: Config) extends FunSuite
with Matchers
with BeforeAndAfterAll
with JournalTable
with SnapshotTable
with RecreateSchema
with CreateTables
with PgConfig
Expand Down Expand Up @@ -81,7 +83,7 @@ abstract class WriteStrategySuite(config: Config) extends FunSuite

override def beforeAll() {
database.run(
recreateSchema.andThen(journals.schema.create)
recreateSchema.andThen(journals.schema.create).andThen(snapshots.schema.create)
).futureValue
actors = 1 to 10 map { _ => system.actorOf(RandomDelayPerfActor.props(driver)) }
}
Expand Down

0 comments on commit bacf6a4

Please sign in to comment.