Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use system.registerOnTermination instead of Coordinated shutdown #193

Merged
merged 3 commits into from Jul 23, 2018
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Expand Up @@ -316,7 +316,7 @@ For more information please review the two default implementations `akka.persist

## Explicitly shutting down the database connections
The plugin automatically shuts down the HikariCP connection pool when the ActorSystem is terminated.
This is done using [Coordinated Shutdown](https://doc.akka.io/docs/akka/current/actors.html#coordinated-shutdown).
This is done using [ActorSystem.registerOnTermination](https://doc.akka.io/api/akka/current/akka/actor/ActorSystem.html#registerOnTermination[T](code:=>T):Unit).

[slick]: http://slick.lightbend.com/
[slick-jndi]: http://slick.typesafe.com/doc/3.1.1/database.html#using-a-jndi-name
Expand Down
10 changes: 1 addition & 9 deletions src/main/resources/reference.conf
Expand Up @@ -16,12 +16,6 @@ akka-persistence-jdbc {

database-provider-fqcn = "akka.persistence.jdbc.util.DefaultSlickDatabaseProvider"

# (only used when database-provider-fqcn is set to the default "akka.persistence.jdbc.util.DefaultSlickDatabaseProvider")
# Automatically close the database connection using coordinated shutdown
shared-db-add-shutdown-hook = true
# The phase in which the database connection will be closed (used only if shared-db-add-shutdown-hook is enabled)
shared-db-coordinated-shutdown-phase = "before-actor-system-terminate"

shared-databases {
// Shared databases can be defined here.
// This reference config contains a partial example if a shared database which is enabled by configuring "slick" as the shared db
Expand Down Expand Up @@ -354,10 +348,8 @@ jdbc-read-journal {
# are delivered downstreams.
max-buffer-size = "500"

# Automatically close the database connection using Coordinated shutdown
# If enabled, automatically close the database connection when the actor system is terminated
add-shutdown-hook = true
# The phase in which the database connection will be closed (used only if add-shutdown-hook is enabled)
coordinated-shutdown-phase = "before-actor-system-terminate"

# This setting can be used to configure usage of a shared database.
# To disable usage of a shared database, set to null or an empty string.
Expand Down
Expand Up @@ -16,7 +16,6 @@

package akka.persistence.jdbc.config

import akka.actor.CoordinatedShutdown
import akka.persistence.jdbc.util.ConfigOps._
import com.typesafe.config.Config

Expand Down Expand Up @@ -132,6 +131,5 @@ class ReadJournalConfig(config: Config) {
val refreshInterval: FiniteDuration = config.asFiniteDuration("refresh-interval", 1.second)
val maxBufferSize: Int = config.as[String]("max-buffer-size", "500").toInt
val addShutdownHook: Boolean = config.asBoolean("add-shutdown-hook", true)
val coordinatedShutdownPhase: String = config.asString("coordinated-shutdown-phase", CoordinatedShutdown.PhaseBeforeActorSystemTerminate)
override def toString: String = s"ReadJournalConfig($slickConfiguration,$journalTableConfiguration,$pluginConfig,$refreshInterval,$maxBufferSize,$addShutdownHook,$coordinatedShutdownPhase)"
override def toString: String = s"ReadJournalConfig($slickConfiguration,$journalTableConfiguration,$pluginConfig,$refreshInterval,$maxBufferSize,$addShutdownHook)"
}
Expand Up @@ -23,8 +23,8 @@ import akka.actor.{ ActorSystem, ExtendedActorSystem }
import akka.persistence.jdbc.config.JournalConfig
import akka.persistence.jdbc.journal.JdbcAsyncWriteJournal.{ InPlaceUpdateEvent, WriteFinished }
import akka.persistence.jdbc.journal.dao.{ JournalDao, JournalDaoWithUpdates }
import akka.persistence.jdbc.util.SlickExtension
import akka.persistence.journal.{ AsyncWriteJournal, Tagged }
import akka.persistence.jdbc.util.{ SlickDatabase, SlickExtension }
import akka.persistence.journal.AsyncWriteJournal
import akka.persistence.{ AtomicWrite, PersistentRepr }
import akka.serialization.{ Serialization, SerializationExtension }
import akka.stream.{ ActorMaterializer, Materializer }
Expand Down Expand Up @@ -55,12 +55,12 @@ class JdbcAsyncWriteJournal(config: Config) extends AsyncWriteJournal {
implicit val mat: Materializer = ActorMaterializer()
val journalConfig = new JournalConfig(config)

val slickExtension = SlickExtension(system)
val db: Database = slickExtension.database(config)
val slickDb: SlickDatabase = SlickExtension(system).database(config)
def db: Database = slickDb.database

val journalDao: JournalDao = {
val fqcn = journalConfig.pluginConfig.dao
val profile: JdbcProfile = slickExtension.profile(config)
val profile: JdbcProfile = slickDb.profile
val args = Seq(
(classOf[Database], db),
(classOf[JdbcProfile], profile),
Expand Down Expand Up @@ -118,7 +118,7 @@ class JdbcAsyncWriteJournal(config: Config) extends AsyncWriteJournal {
.map(_ => ())

override def postStop(): Unit = {
if (journalConfig.useSharedDb.isEmpty) {
if (slickDb.allowShutdown) {
// Since a (new) db is created when this actor (re)starts, we must close it when the actor stops
db.close()
}
Expand Down
Expand Up @@ -17,8 +17,8 @@
package akka.persistence.jdbc.query
package scaladsl

import akka.{ Done, NotUsed }
import akka.actor.{ CoordinatedShutdown, ExtendedActorSystem }
import akka.NotUsed
import akka.actor.ExtendedActorSystem
import akka.persistence.jdbc.config.ReadJournalConfig
import akka.persistence.jdbc.query.JournalSequenceActor.{ GetMaxOrderingId, MaxOrderingId }
import akka.persistence.jdbc.query.dao.ReadJournalDao
Expand Down Expand Up @@ -73,18 +73,16 @@ class JdbcReadJournal(config: Config, configPath: String)(implicit val system: E
private val eventAdapters = Persistence(system).adaptersFor(writePluginId)

val readJournalDao: ReadJournalDao = {
val slickExtension = SlickExtension(system)
val db = slickExtension.database(config)
if (readJournalConfig.addShutdownHook) {
CoordinatedShutdown(system).addTask(readJournalConfig.coordinatedShutdownPhase, "close-jdbc-read-journal-db") { () =>
Future {
db.close()
Done
}
val slickDb = SlickExtension(system).database(config)
val db = slickDb.database
if (readJournalConfig.addShutdownHook && slickDb.allowShutdown) {
system.registerOnTermination {
db.close()

}
}
val fqcn = readJournalConfig.pluginConfig.dao
val profile: JdbcProfile = slickExtension.profile(config)
val profile: JdbcProfile = slickDb.profile
val args = Seq(
(classOf[Database], db),
(classOf[JdbcProfile], profile),
Expand Down
Expand Up @@ -19,7 +19,7 @@ package akka.persistence.jdbc.snapshot
import akka.actor.{ ActorSystem, ExtendedActorSystem }
import akka.persistence.jdbc.config.SnapshotConfig
import akka.persistence.jdbc.snapshot.dao.SnapshotDao
import akka.persistence.jdbc.util.SlickExtension
import akka.persistence.jdbc.util.{ SlickDatabase, SlickExtension }
import akka.persistence.snapshot.SnapshotStore
import akka.persistence.{ SelectedSnapshot, SnapshotMetadata, SnapshotSelectionCriteria }
import akka.serialization.{ Serialization, SerializationExtension }
Expand Down Expand Up @@ -47,12 +47,12 @@ class JdbcSnapshotStore(config: Config) extends SnapshotStore {
implicit val mat: Materializer = ActorMaterializer()
val snapshotConfig = new SnapshotConfig(config)

val slickExtension = SlickExtension(system)
val db: Database = slickExtension.database(config)
val slickDb: SlickDatabase = SlickExtension(system).database(config)
def db: Database = slickDb.database

val snapshotDao: SnapshotDao = {
val fqcn = snapshotConfig.pluginConfig.dao
val profile: JdbcProfile = slickExtension.profile(config)
val profile: JdbcProfile = slickDb.profile
val args = Seq(
(classOf[Database], db),
(classOf[JdbcProfile], profile),
Expand Down Expand Up @@ -104,7 +104,7 @@ class JdbcSnapshotStore(config: Config) extends SnapshotStore {
}

override def postStop(): Unit = {
if (snapshotConfig.useSharedDb.isEmpty) {
if (slickDb.allowShutdown) {
// Since a (new) db is created when this actor (re)starts, we must close it when the actor stops
db.close()
}
Expand Down
118 changes: 118 additions & 0 deletions src/main/scala/akka/persistence/jdbc/util/SlickDatabase.scala
@@ -0,0 +1,118 @@
/*
* Copyright 2016 Dennis Vriend
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package akka.persistence.jdbc.util

import akka.actor.ActorSystem
import javax.naming.InitialContext
import akka.persistence.jdbc.config.SlickConfiguration
import com.typesafe.config.Config
import slick.basic.DatabaseConfig
import slick.jdbc.JdbcProfile
import slick.jdbc.JdbcBackend._

/**
* INTERNAL API
*/
@deprecated(message = "Internal API, will be removed in 4.0.0", since = "3.4.0")
object SlickDriver {

/**
* INTERNAL API
*/
@deprecated(message = "Internal API, will be removed in 4.0.0", since = "3.4.0")
def forDriverName(config: Config): JdbcProfile =
SlickDatabase.profile(config, "slick")
}

/**
* INTERNAL API
*/
object SlickDatabase {

/**
* INTERNAL API
*/
@deprecated(message = "Internal API, will be removed in 4.0.0", since = "3.4.0")
def forConfig(config: Config, slickConfiguration: SlickConfiguration): Database = {
database(config, slickConfiguration, "slick.db")
}

/**
* INTERNAL API
*/
private[jdbc] def profile(config: Config, path: String): JdbcProfile =
DatabaseConfig.forConfig[JdbcProfile](path, config).profile

/**
* INTERNAL API
*/
private[jdbc] def database(config: Config, slickConfiguration: SlickConfiguration, path: String): Database = {
slickConfiguration.jndiName
.map(Database.forName(_, None))
.orElse {
slickConfiguration.jndiDbName.map(
new InitialContext().lookup(_).asInstanceOf[Database])
}
.getOrElse(Database.forConfig(path, config))
}

/**
* INTERNAL API
*/
private[jdbc] def strict(config: Config, slickConfiguration: SlickConfiguration, path: String): SlickDatabase = {
val dbPath = if (path.isEmpty) "db" else s"$path.db"
StrictSlickDatabase(
database(config, slickConfiguration, dbPath),
profile(config, path))
}
}

trait SlickDatabase {
def database: Database
def profile: JdbcProfile

/**
* If true, the requesting side usualy a (read/write/snapshot journal)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

typo: usually

* should shutdown the database when it closes. If false, it should leave
* the database connection pool open, since it might still be used elsewhere.
*/
def allowShutdown: Boolean
}

case class StrictSlickDatabase(database: Database, profile: JdbcProfile) extends SlickDatabase {
Copy link
Member Author

@WellingR WellingR Jun 19, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not really sure if this is the best name for the "simple" version if the SlickDatabase. Any suggestions?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we should call it EagerSlickDatabase instead as opposed to the lazy one.

override def allowShutdown: Boolean = true
}

/**
* A LazySlickDatabase lazily initializes a database, it also manages the shutdown of the database
* @param config The configuration used to create the database
*/
class LazySlickDatabase(config: Config, system: ActorSystem) extends SlickDatabase {
val profile: JdbcProfile = SlickDatabase.profile(config, path = "")

lazy val database: Database = {
val db = SlickDatabase.database(config, new SlickConfiguration(config), path = "db")
system.registerOnTermination {
db.close()

}
db
}

/** This database shutdown is managed by the db holder, so users of this db do not need to bother shutting it down */
override def allowShutdown: Boolean = false
}
71 changes: 0 additions & 71 deletions src/main/scala/akka/persistence/jdbc/util/SlickDriver.scala

This file was deleted.