From 29b1e0e2dd7ce9878882ca424a8d774be4530f18 Mon Sep 17 00:00:00 2001 From: JingZhang Chen Date: Fri, 26 Apr 2024 10:41:33 +0800 Subject: [PATCH 1/3] fix not working JournalDaoStreamMessagesMemoryTest --- .../JournalDaoStreamMessagesMemoryTest.scala | 21 +++++-------------- 1 file changed, 5 insertions(+), 16 deletions(-) diff --git a/core/src/test/scala/akka/persistence/jdbc/query/JournalDaoStreamMessagesMemoryTest.scala b/core/src/test/scala/akka/persistence/jdbc/query/JournalDaoStreamMessagesMemoryTest.scala index 7fe1d76b..1d7b418a 100644 --- a/core/src/test/scala/akka/persistence/jdbc/query/JournalDaoStreamMessagesMemoryTest.scala +++ b/core/src/test/scala/akka/persistence/jdbc/query/JournalDaoStreamMessagesMemoryTest.scala @@ -8,7 +8,6 @@ package akka.persistence.jdbc.query import java.lang.management.ManagementFactory import java.lang.management.MemoryMXBean import java.util.UUID - import akka.actor.ActorSystem import akka.persistence.{ AtomicWrite, PersistentRepr } import akka.persistence.jdbc.journal.dao.legacy.{ ByteArrayJournalDao, JournalTables } @@ -23,34 +22,24 @@ import scala.concurrent.ExecutionContextExecutor import scala.concurrent.duration._ import scala.util.{ Failure, Success } import akka.stream.testkit.scaladsl.TestSink -import org.scalatest.matchers.should.Matchers object JournalDaoStreamMessagesMemoryTest { - val configOverrides: Map[String, ConfigValue] = Map("jdbc-journal.fetch-size" -> ConfigValueFactory.fromAnyRef("100")) + val configOverrides: Map[String, ConfigValue] = Map( + "jdbc-journal.fetch-size" -> ConfigValueFactory.fromAnyRef("100"), + "jdbc-journal.dao" -> ConfigValueFactory.fromAnyRef("akka.persistence.jdbc.journal.dao.legacy.ByteArrayJournalDao")) val MB = 1024 * 1024 } abstract class JournalDaoStreamMessagesMemoryTest(configFile: String) - extends QueryTestSpec(configFile, JournalDaoStreamMessagesMemoryTest.configOverrides) - with JournalTables - with Matchers { + extends QueryTestSpec(configFile, JournalDaoStreamMessagesMemoryTest.configOverrides) { import JournalDaoStreamMessagesMemoryTest.MB private val log = LoggerFactory.getLogger(this.getClass) - val journalSequenceActorConfig = readJournalConfig.journalSequenceRetrievalConfiguration - val journalTableCfg = journalConfig.journalTableConfiguration - - implicit val askTimeout: FiniteDuration = 50.millis - - def generateId: Int = 0 - val memoryMBean: MemoryMXBean = ManagementFactory.getMemoryMXBean - behavior.of("Replaying Persistence Actor") - it should "stream events" in { if (newDao) pending @@ -88,7 +77,7 @@ abstract class JournalDaoStreamMessagesMemoryTest(configFile: String) val atomicWrites = (start to end).map { j => AtomicWrite(immutable.Seq(PersistentRepr(payload, j, persistenceId))) - }.toSeq + } dao.asyncWriteMessages(atomicWrites).map(_ => i) } From 090d196e2e1b70ab5403371204fb50d90dffa635 Mon Sep 17 00:00:00 2001 From: JingZhang Chen Date: Fri, 26 Apr 2024 11:04:07 +0800 Subject: [PATCH 2/3] working on both default and legacy --- .../JournalDaoStreamMessagesMemoryTest.scala | 44 +++++++++++++------ 1 file changed, 30 insertions(+), 14 deletions(-) diff --git a/core/src/test/scala/akka/persistence/jdbc/query/JournalDaoStreamMessagesMemoryTest.scala b/core/src/test/scala/akka/persistence/jdbc/query/JournalDaoStreamMessagesMemoryTest.scala index 1d7b418a..18d145dd 100644 --- a/core/src/test/scala/akka/persistence/jdbc/query/JournalDaoStreamMessagesMemoryTest.scala +++ b/core/src/test/scala/akka/persistence/jdbc/query/JournalDaoStreamMessagesMemoryTest.scala @@ -5,29 +5,30 @@ package akka.persistence.jdbc.query -import java.lang.management.ManagementFactory -import java.lang.management.MemoryMXBean -import java.util.UUID -import akka.actor.ActorSystem +import akka.actor.{ ActorSystem, ExtendedActorSystem } +import akka.persistence.jdbc.config.JournalConfig +import akka.persistence.jdbc.journal.dao.JournalDao import akka.persistence.{ AtomicWrite, PersistentRepr } -import akka.persistence.jdbc.journal.dao.legacy.{ ByteArrayJournalDao, JournalTables } -import akka.serialization.SerializationExtension +import akka.serialization.{ Serialization, SerializationExtension } import akka.stream.scaladsl.{ Sink, Source } +import akka.stream.testkit.scaladsl.TestSink +import akka.stream.{ Materializer, SystemMaterializer } import com.typesafe.config.{ ConfigValue, ConfigValueFactory } import org.scalatest.concurrent.PatienceConfiguration.Timeout import org.slf4j.LoggerFactory +import slick.jdbc.JdbcBackend.Database +import slick.jdbc.JdbcProfile +import java.lang.management.{ ManagementFactory, MemoryMXBean } +import java.util.UUID import scala.collection.immutable -import scala.concurrent.ExecutionContextExecutor import scala.concurrent.duration._ +import scala.concurrent.{ ExecutionContext, ExecutionContextExecutor } import scala.util.{ Failure, Success } -import akka.stream.testkit.scaladsl.TestSink object JournalDaoStreamMessagesMemoryTest { - val configOverrides: Map[String, ConfigValue] = Map( - "jdbc-journal.fetch-size" -> ConfigValueFactory.fromAnyRef("100"), - "jdbc-journal.dao" -> ConfigValueFactory.fromAnyRef("akka.persistence.jdbc.journal.dao.legacy.ByteArrayJournalDao")) + val configOverrides: Map[String, ConfigValue] = Map("jdbc-journal.fetch-size" -> ConfigValueFactory.fromAnyRef("100")) val MB = 1024 * 1024 } @@ -41,14 +42,25 @@ abstract class JournalDaoStreamMessagesMemoryTest(configFile: String) val memoryMBean: MemoryMXBean = ManagementFactory.getMemoryMXBean it should "stream events" in { - if (newDao) - pending withActorSystem { implicit system: ActorSystem => withDatabase { db => implicit val ec: ExecutionContextExecutor = system.dispatcher + implicit val mat: Materializer = SystemMaterializer(system).materializer val persistenceId = UUID.randomUUID().toString - val dao = new ByteArrayJournalDao(db, profile, journalConfig, SerializationExtension(system)) + val fqcn = journalConfig.pluginConfig.dao + val args = Seq( + (classOf[Database], db), + (classOf[JdbcProfile], profile), + (classOf[JournalConfig], journalConfig), + (classOf[Serialization], SerializationExtension(system)), + (classOf[ExecutionContext], ec), + (classOf[Materializer], mat)) + val dao: JournalDao = + system.asInstanceOf[ExtendedActorSystem].dynamicAccess.createInstanceFor[JournalDao](fqcn, args) match { + case Success(dao) => dao + case Failure(cause) => throw cause + } val payloadSize = 5000 // 5000 bytes val eventsPerBatch = 1000 @@ -128,3 +140,7 @@ abstract class JournalDaoStreamMessagesMemoryTest(configFile: String) } } } + +class H2JournalDaoStreamMessagesMemoryTest + extends JournalDaoStreamMessagesMemoryTest("h2-application.conf") + with H2Cleaner From eb55dae3ac851c3bc02543feaf26cda0c250f434 Mon Sep 17 00:00:00 2001 From: JingZhang Chen Date: Fri, 26 Apr 2024 13:36:37 +0800 Subject: [PATCH 3/3] fix oracle writer issue --- .../jdbc/query/JournalDaoStreamMessagesMemoryTest.scala | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/core/src/test/scala/akka/persistence/jdbc/query/JournalDaoStreamMessagesMemoryTest.scala b/core/src/test/scala/akka/persistence/jdbc/query/JournalDaoStreamMessagesMemoryTest.scala index 18d145dd..e1835c52 100644 --- a/core/src/test/scala/akka/persistence/jdbc/query/JournalDaoStreamMessagesMemoryTest.scala +++ b/core/src/test/scala/akka/persistence/jdbc/query/JournalDaoStreamMessagesMemoryTest.scala @@ -48,6 +48,7 @@ abstract class JournalDaoStreamMessagesMemoryTest(configFile: String) implicit val mat: Materializer = SystemMaterializer(system).materializer val persistenceId = UUID.randomUUID().toString + val writerUuid = UUID.randomUUID().toString val fqcn = journalConfig.pluginConfig.dao val args = Seq( (classOf[Database], db), @@ -88,7 +89,7 @@ abstract class JournalDaoStreamMessagesMemoryTest(configFile: String) log.info(s"batch $i - events from $start to $end") val atomicWrites = (start to end).map { j => - AtomicWrite(immutable.Seq(PersistentRepr(payload, j, persistenceId))) + AtomicWrite(immutable.Seq(PersistentRepr(payload, j, persistenceId, writerUuid = writerUuid))) } dao.asyncWriteMessages(atomicWrites).map(_ => i) @@ -140,7 +141,3 @@ abstract class JournalDaoStreamMessagesMemoryTest(configFile: String) } } } - -class H2JournalDaoStreamMessagesMemoryTest - extends JournalDaoStreamMessagesMemoryTest("h2-application.conf") - with H2Cleaner