Skip to content

Commit

Permalink
Add MySQL implementation (#803)
Browse files Browse the repository at this point in the history
* Add MySQL implementation

* Fix code style
  • Loading branch information
steventwheeler committed Mar 11, 2024
1 parent df901fa commit 98710d8
Show file tree
Hide file tree
Showing 4 changed files with 37 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import akka.persistence.jdbc.config.DurableStateTableConfiguration
lazy val sequenceNextValUpdater = slickProfileToSchemaType(profile) match {
case "H2" => new H2SequenceNextValUpdater(profile, durableStateTableCfg)
case "Postgres" => new PostgresSequenceNextValUpdater(profile, durableStateTableCfg)
case "MySQL" => new MySQLSequenceNextValUpdater(profile, durableStateTableCfg)
case _ => ???
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,3 +51,19 @@ import slick.sql.SqlStreamingAction

def getSequenceNextValueExpr() = sql"""#$nextValFetcher""".as[String]
}

/**
* INTERNAL API
*/
@InternalApi private[jdbc] class MySQLSequenceNextValUpdater(
profile: JdbcProfile,
val durableStateTableCfg: DurableStateTableConfiguration)
extends SequenceNextValUpdater {
import profile.api._
private val schema = durableStateTableCfg.schemaName.map(n => s"'$n'").getOrElse("DATABASE()")
// Note: for actual MySQL servers (i.e. not MariaDB) the variable information_schema_stats_expiry should be set to zero.
final val nextValFetcher =
s"""(SELECT AUTO_INCREMENT FROM information_schema.tables WHERE table_name = '${durableStateTableCfg.tableName}' AND table_schema = ${schema})"""

def getSequenceNextValueExpr() = sql"""#$nextValFetcher""".as[String]
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package akka.persistence.jdbc.integration

import com.typesafe.config.ConfigFactory
import slick.jdbc.MySQLProfile
import akka.persistence.jdbc.state.scaladsl.DurableStateStorePluginSpec

class MySQLDurableStateStorePluginSpec
extends DurableStateStorePluginSpec(ConfigFactory.load("mysql-shared-db-application.conf"), MySQLProfile) {}
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package akka.persistence.jdbc.integration

import com.typesafe.config.ConfigFactory
import akka.actor.ActorSystem
import akka.persistence.jdbc.state.scaladsl.JdbcDurableStateSpec
import akka.persistence.jdbc.testkit.internal.Mysql

class MySQLScalaJdbcDurableStateStoreQueryTest
extends JdbcDurableStateSpec(ConfigFactory.load("mysql-shared-db-application.conf"), MySQL) {
implicit lazy val system: ActorSystem =
ActorSystem("JdbcDurableStateSpec", config.withFallback(customSerializers))
}

0 comments on commit 98710d8

Please sign in to comment.