Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix not working JournalDaoStreamMessagesMemoryTest #831

Merged
merged 3 commits into from
May 7, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -5,25 +5,26 @@

package akka.persistence.jdbc.query

import java.lang.management.ManagementFactory
import java.lang.management.MemoryMXBean
import java.util.UUID

import akka.actor.ActorSystem
import akka.actor.{ ActorSystem, ExtendedActorSystem }
import akka.persistence.jdbc.config.JournalConfig
import akka.persistence.jdbc.journal.dao.JournalDao
import akka.persistence.{ AtomicWrite, PersistentRepr }
import akka.persistence.jdbc.journal.dao.legacy.{ ByteArrayJournalDao, JournalTables }
import akka.serialization.SerializationExtension
import akka.serialization.{ Serialization, SerializationExtension }
import akka.stream.scaladsl.{ Sink, Source }
import akka.stream.testkit.scaladsl.TestSink
import akka.stream.{ Materializer, SystemMaterializer }
import com.typesafe.config.{ ConfigValue, ConfigValueFactory }
import org.scalatest.concurrent.PatienceConfiguration.Timeout
import org.slf4j.LoggerFactory
import slick.jdbc.JdbcBackend.Database
import slick.jdbc.JdbcProfile

import java.lang.management.{ ManagementFactory, MemoryMXBean }
import java.util.UUID
import scala.collection.immutable
import scala.concurrent.ExecutionContextExecutor
import scala.concurrent.duration._
import scala.concurrent.{ ExecutionContext, ExecutionContextExecutor }
import scala.util.{ Failure, Success }
import akka.stream.testkit.scaladsl.TestSink
import org.scalatest.matchers.should.Matchers

object JournalDaoStreamMessagesMemoryTest {

Expand All @@ -33,33 +34,34 @@ object JournalDaoStreamMessagesMemoryTest {
}

abstract class JournalDaoStreamMessagesMemoryTest(configFile: String)
extends QueryTestSpec(configFile, JournalDaoStreamMessagesMemoryTest.configOverrides)
with JournalTables
with Matchers {
extends QueryTestSpec(configFile, JournalDaoStreamMessagesMemoryTest.configOverrides) {
import JournalDaoStreamMessagesMemoryTest.MB

private val log = LoggerFactory.getLogger(this.getClass)

val journalSequenceActorConfig = readJournalConfig.journalSequenceRetrievalConfiguration
val journalTableCfg = journalConfig.journalTableConfiguration

implicit val askTimeout: FiniteDuration = 50.millis

def generateId: Int = 0

val memoryMBean: MemoryMXBean = ManagementFactory.getMemoryMXBean

behavior.of("Replaying Persistence Actor")

it should "stream events" in {
if (newDao)
pending
withActorSystem { implicit system: ActorSystem =>
withDatabase { db =>
implicit val ec: ExecutionContextExecutor = system.dispatcher
implicit val mat: Materializer = SystemMaterializer(system).materializer

val persistenceId = UUID.randomUUID().toString
val dao = new ByteArrayJournalDao(db, profile, journalConfig, SerializationExtension(system))
val writerUuid = UUID.randomUUID().toString
val fqcn = journalConfig.pluginConfig.dao
val args = Seq(
(classOf[Database], db),
(classOf[JdbcProfile], profile),
(classOf[JournalConfig], journalConfig),
(classOf[Serialization], SerializationExtension(system)),
(classOf[ExecutionContext], ec),
(classOf[Materializer], mat))
val dao: JournalDao =
system.asInstanceOf[ExtendedActorSystem].dynamicAccess.createInstanceFor[JournalDao](fqcn, args) match {
case Success(dao) => dao
case Failure(cause) => throw cause
}

val payloadSize = 5000 // 5000 bytes
val eventsPerBatch = 1000
Expand Down Expand Up @@ -87,8 +89,8 @@ abstract class JournalDaoStreamMessagesMemoryTest(configFile: String)
log.info(s"batch $i - events from $start to $end")
val atomicWrites =
(start to end).map { j =>
AtomicWrite(immutable.Seq(PersistentRepr(payload, j, persistenceId)))
}.toSeq
AtomicWrite(immutable.Seq(PersistentRepr(payload, j, persistenceId, writerUuid = writerUuid)))
}

dao.asyncWriteMessages(atomicWrites).map(_ => i)
}
Expand Down