Skip to content

Commit

Permalink
fix: enable JournalDaoStreamMessagesMemoryTest for new dao (#831)
Browse files Browse the repository at this point in the history
  • Loading branch information
Roiocam committed May 7, 2024
1 parent 9b02b1d commit 646c051
Showing 1 changed file with 29 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,25 +5,26 @@

package akka.persistence.jdbc.query

import java.lang.management.ManagementFactory
import java.lang.management.MemoryMXBean
import java.util.UUID

import akka.actor.ActorSystem
import akka.actor.{ ActorSystem, ExtendedActorSystem }
import akka.persistence.jdbc.config.JournalConfig
import akka.persistence.jdbc.journal.dao.JournalDao
import akka.persistence.{ AtomicWrite, PersistentRepr }
import akka.persistence.jdbc.journal.dao.legacy.{ ByteArrayJournalDao, JournalTables }
import akka.serialization.SerializationExtension
import akka.serialization.{ Serialization, SerializationExtension }
import akka.stream.scaladsl.{ Sink, Source }
import akka.stream.testkit.scaladsl.TestSink
import akka.stream.{ Materializer, SystemMaterializer }
import com.typesafe.config.{ ConfigValue, ConfigValueFactory }
import org.scalatest.concurrent.PatienceConfiguration.Timeout
import org.slf4j.LoggerFactory
import slick.jdbc.JdbcBackend.Database
import slick.jdbc.JdbcProfile

import java.lang.management.{ ManagementFactory, MemoryMXBean }
import java.util.UUID
import scala.collection.immutable
import scala.concurrent.ExecutionContextExecutor
import scala.concurrent.duration._
import scala.concurrent.{ ExecutionContext, ExecutionContextExecutor }
import scala.util.{ Failure, Success }
import akka.stream.testkit.scaladsl.TestSink
import org.scalatest.matchers.should.Matchers

object JournalDaoStreamMessagesMemoryTest {

Expand All @@ -33,33 +34,34 @@ object JournalDaoStreamMessagesMemoryTest {
}

abstract class JournalDaoStreamMessagesMemoryTest(configFile: String)
extends QueryTestSpec(configFile, JournalDaoStreamMessagesMemoryTest.configOverrides)
with JournalTables
with Matchers {
extends QueryTestSpec(configFile, JournalDaoStreamMessagesMemoryTest.configOverrides) {
import JournalDaoStreamMessagesMemoryTest.MB

private val log = LoggerFactory.getLogger(this.getClass)

val journalSequenceActorConfig = readJournalConfig.journalSequenceRetrievalConfiguration
val journalTableCfg = journalConfig.journalTableConfiguration

implicit val askTimeout: FiniteDuration = 50.millis

def generateId: Int = 0

val memoryMBean: MemoryMXBean = ManagementFactory.getMemoryMXBean

behavior.of("Replaying Persistence Actor")

it should "stream events" in {
if (newDao)
pending
withActorSystem { implicit system: ActorSystem =>
withDatabase { db =>
implicit val ec: ExecutionContextExecutor = system.dispatcher
implicit val mat: Materializer = SystemMaterializer(system).materializer

val persistenceId = UUID.randomUUID().toString
val dao = new ByteArrayJournalDao(db, profile, journalConfig, SerializationExtension(system))
val writerUuid = UUID.randomUUID().toString
val fqcn = journalConfig.pluginConfig.dao
val args = Seq(
(classOf[Database], db),
(classOf[JdbcProfile], profile),
(classOf[JournalConfig], journalConfig),
(classOf[Serialization], SerializationExtension(system)),
(classOf[ExecutionContext], ec),
(classOf[Materializer], mat))
val dao: JournalDao =
system.asInstanceOf[ExtendedActorSystem].dynamicAccess.createInstanceFor[JournalDao](fqcn, args) match {
case Success(dao) => dao
case Failure(cause) => throw cause
}

val payloadSize = 5000 // 5000 bytes
val eventsPerBatch = 1000
Expand Down Expand Up @@ -87,8 +89,8 @@ abstract class JournalDaoStreamMessagesMemoryTest(configFile: String)
log.info(s"batch $i - events from $start to $end")
val atomicWrites =
(start to end).map { j =>
AtomicWrite(immutable.Seq(PersistentRepr(payload, j, persistenceId)))
}.toSeq
AtomicWrite(immutable.Seq(PersistentRepr(payload, j, persistenceId, writerUuid = writerUuid)))
}

dao.asyncWriteMessages(atomicWrites).map(_ => i)
}
Expand Down

0 comments on commit 646c051

Please sign in to comment.