diff --git a/akka-projection-r2dbc/src/it/scala/akka/projection/r2dbc/ChangeSliceRangesSpec.scala b/akka-projection-r2dbc/src/it/scala/akka/projection/r2dbc/ChangeSliceRangesSpec.scala
new file mode 100644
index 000000000..a55945685
--- /dev/null
+++ b/akka-projection-r2dbc/src/it/scala/akka/projection/r2dbc/ChangeSliceRangesSpec.scala
@@ -0,0 +1,256 @@
+/*
+ * Copyright (C) 2022 - 2023 Lightbend Inc.
+ */
+
+package akka.projection.r2dbc
+
+import java.util.UUID
+
+import scala.annotation.tailrec
+import scala.concurrent.Future
+import scala.concurrent.duration._
+
+import akka.Done
+import akka.actor.testkit.typed.scaladsl.LogCapturing
+import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
+import akka.actor.testkit.typed.scaladsl.TestProbe
+import akka.actor.typed.ActorRef
+import akka.actor.typed.ActorSystem
+import akka.actor.typed.scaladsl.LoggerOps
+import akka.persistence.query.typed.EventEnvelope
+import akka.persistence.r2dbc.query.scaladsl.R2dbcReadJournal
+import akka.persistence.typed.PersistenceId
+import akka.projection.ProjectionBehavior
+import akka.projection.ProjectionId
+import akka.projection.eventsourced.scaladsl.EventSourcedProvider
+import akka.projection.r2dbc.scaladsl.R2dbcHandler
+import akka.projection.r2dbc.scaladsl.R2dbcProjection
+import akka.projection.r2dbc.scaladsl.R2dbcSession
+import com.typesafe.config.Config
+import com.typesafe.config.ConfigFactory
+import org.scalatest.wordspec.AnyWordSpecLike
+import org.slf4j.LoggerFactory
+
+object ChangeSliceRangesSpec {
+
+  val config: Config = ConfigFactory
+    .parseString("""
+      akka.persistence.r2dbc {
+        query {
+          backtracking {
+            window = 5 seconds
+            behind-current-time = 3 seconds
+          }
+        }
+      }
+      """)
+    .withFallback(TestConfig.config)
+
+  final case class Processed(projectionId: ProjectionId, envelope: EventEnvelope[String])
+
+}
+
+class ChangeSliceRangesSpec
+    extends ScalaTestWithActorTestKit(ChangeSliceRangesSpec.config)
+    with AnyWordSpecLike
+    with TestDbLifecycle
+    with TestData
+    with LogCapturing {
+  import ChangeSliceRangesSpec._
+  import EventSourcedEndToEndSpec.Persister
+
+  override def typedSystem: ActorSystem[_] = system
+
+  private val log = LoggerFactory.getLogger(getClass)
+
+  private val projectionSettings = R2dbcProjectionSettings(system)
+
+  private class TestHandler(projectionId: ProjectionId, probe: ActorRef[Processed], delaySlice: Int)
+      extends R2dbcHandler[EventEnvelope[String]] {
+    private val log = LoggerFactory.getLogger(getClass)
+
+    override def process(session: R2dbcSession, envelope: EventEnvelope[String]): Future[Done] = {
+      val slice = persistenceExt.sliceForPersistenceId(envelope.persistenceId)
+      log.debugN("{} Processed {}, pid {}, slice {}", projectionId.key, envelope.event, envelope.persistenceId, slice)
+      probe ! Processed(projectionId, envelope)
+      if (slice == delaySlice)
+        akka.pattern.after(3.seconds)(Future.successful(Done))
+      else
+        Future.successful(Done)
+    }
+  }
+
+  private def startProjections(
+      entityType: String,
+      projectionName: String,
+      nrOfProjections: Int,
+      processedProbe: ActorRef[Processed],
+      delaySlice: Int = -1): Vector[ActorRef[ProjectionBehavior.Command]] = {
+    val sliceRanges = EventSourcedProvider.sliceRanges(system, R2dbcReadJournal.Identifier, nrOfProjections)
+
+    sliceRanges.map { range =>
+      val projectionId = ProjectionId(projectionName, s"${range.min}-${range.max}")
+      val sourceProvider =
+        EventSourcedProvider
+          .eventsBySlices[String](system, R2dbcReadJournal.Identifier, entityType, range.min, range.max)
+      val projection = R2dbcProjection
+        .exactlyOnce(
+          projectionId,
+          Some(projectionSettings),
+          sourceProvider = sourceProvider,
+          handler = () => new TestHandler(projectionId, processedProbe.ref, delaySlice))
+      spawn(ProjectionBehavior(projection))
+    }.toVector
+  }
+
+  def persistenceIdForSlice(entityType: String, slice: Int): PersistenceId = {
+    @tailrec def loop(n: Int): PersistenceId = {
+      val candidate = PersistenceId(entityType, s"p$n")
+      if (persistenceExt.sliceForPersistenceId(candidate.id) == slice)
+        candidate
+      else
+        loop(n + 1)
+    }
+    loop(0)
+  }
+
+  private def mkEvent(n: Int): String = {
+    val template = "0000000"
+    val s = n.toString
+    "e" + (template + s).takeRight(5)
+  }
+
+  private def assertEventsProcessed(
+      expectedEvents: Vector[String],
+      processedProbe: TestProbe[Processed],
+      verifyProjectionId: Boolean): Unit = {
+    val expectedNumberOfEvents = expectedEvents.size
+    var processed = Vector.empty[Processed]
+
+    (1 to expectedNumberOfEvents).foreach { _ =>
+      // not using receiveMessages(expectedEvents) for better logging in case of failure
+      try {
+        processed :+= processedProbe.receiveMessage(15.seconds)
+      } catch {
+        case e: AssertionError =>
+          val missing = expectedEvents.diff(processed.map(_.envelope.event))
+          log.error(
+            s"Processed [${processed.size}] events, but expected [$expectedNumberOfEvents]. " +
+            s"Missing [${missing.mkString(",")}]. " +
+            s"Received [${processed.map(p => s"(${p.envelope.event}, ${p.envelope.persistenceId}, ${p.envelope.sequenceNr})").mkString(", ")}].")
+          throw e
+      }
+    }
+
+    if (verifyProjectionId) {
+      val byPid = processed.groupBy(_.envelope.persistenceId)
+      byPid.foreach {
+        case (_, processedByPid) =>
+          // all events of a pid must be processed by the same projection instance
+          processedByPid.map(_.projectionId).toSet.size shouldBe 1
+          // processed events in the right order
+          processedByPid.map(_.envelope.sequenceNr).toVector shouldBe (1 to processedByPid.size).toVector
+      }
+    }
+  }
+
+  s"Changing projection slice ranges (dialect ${r2dbcSettings.dialectName})" must {
+
+    "support scaling up and down" in {
+      val numberOfEntities = 20
+      val numberOfEvents = numberOfEntities * 10
+      val entityType = nextEntityType()
+
+      val entities = (0 until numberOfEntities).map { n =>
+        val persistenceId = PersistenceId(entityType, s"p$n")
+        spawn(Persister(persistenceId), s"$entityType-p$n")
+      }
+
+      val projectionName = UUID.randomUUID().toString
+      val processedProbe = createTestProbe[Processed]()
+      var projections = startProjections(entityType, projectionName, nrOfProjections = 4, processedProbe.ref)
+
+      (1 to numberOfEvents).foreach { n =>
+        val p = n % numberOfEntities
+        entities(p) ! Persister.Persist(mkEvent(n))
+
+        if (n % 10 == 0)
+          Thread.sleep(50)
+        else if (n % 25 == 0)
+          Thread.sleep(1500)
+
+        // stop projections
+        if (n == numberOfEvents / 4) {
+          val probe = createTestProbe()
+          projections.foreach { ref =>
+            ref ! ProjectionBehavior.Stop
+            probe.expectTerminated(ref)
+          }
+        }
+
+        // resume projections again, but with more nrOfProjections
+        if (n == (numberOfEvents / 4) + 20)
+          projections = startProjections(entityType, projectionName, nrOfProjections = 8, processedProbe.ref)
+
+        // stop projections
+        if (n == numberOfEvents * 3 / 4) {
+          val probe = createTestProbe()
+          projections.foreach { ref =>
+            ref ! ProjectionBehavior.Stop
+            probe.expectTerminated(ref)
+          }
+        }
+
+        // resume projections again, but with fewer nrOfProjections
+        if (n == (numberOfEvents * 3 / 4) + 20)
+          projections = startProjections(entityType, projectionName, nrOfProjections = 2, processedProbe.ref)
+      }
+
+      val expectedEvents = (1 to numberOfEvents).map(mkEvent).toVector
+      assertEventsProcessed(expectedEvents, processedProbe, verifyProjectionId = false)
+
+      projections.foreach(_ ! ProjectionBehavior.Stop)
+    }
+
+    "support scaling down after long idle" in {
+      val numberOfEntities = 32
+      val numberOfEvents = numberOfEntities * 20
+      val entityType = nextEntityType()
+
+      val entities = (0 until numberOfEntities).map { n =>
+        val persistenceId = persistenceIdForSlice(entityType, (1024 / numberOfEntities) * n)
+        spawn(Persister(persistenceId), s"$entityType-p$n")
+      }
+
+      val projectionName = UUID.randomUUID().toString
+      val processedProbe = createTestProbe[Processed]()
+      // slice 0 is slow, so range 0-511 falls behind 512-1023
+      var projections =
+        startProjections(entityType, projectionName, nrOfProjections = 4, processedProbe.ref, delaySlice = 0)
+
+      val expectedEvents = (1 to numberOfEvents).map(mkEvent).toVector
+
+      (1 to numberOfEvents).foreach { n =>
+        val p = n % numberOfEntities
+        entities(p) ! Persister.Persist(mkEvent(n))
+        if (n == numberOfEvents / 2)
+          Thread.sleep(
+            (r2dbcSettings.querySettings.backtrackingWindow + r2dbcSettings.querySettings.backtrackingBehindCurrentTime + 1.second).toMillis)
+      }
+
+      // stop projections
+      val probe = createTestProbe()
+      projections.foreach { ref =>
+        ref ! ProjectionBehavior.Stop
+        probe.expectTerminated(ref, 10.seconds)
+      }
+      // start again, with fewer instances
+      projections = startProjections(entityType, projectionName, nrOfProjections = 2, processedProbe.ref)
+
+      assertEventsProcessed(expectedEvents, processedProbe, verifyProjectionId = false)
+
+      projections.foreach(_ ! ProjectionBehavior.Stop)
+    }
+
+  }
+
+}
diff --git a/akka-projection-r2dbc/src/it/scala/akka/projection/r2dbc/EventSourcedEndToEndSpec.scala b/akka-projection-r2dbc/src/it/scala/akka/projection/r2dbc/EventSourcedEndToEndSpec.scala
index 1f70f76fa..c1ebd6296 100644
--- a/akka-projection-r2dbc/src/it/scala/akka/projection/r2dbc/EventSourcedEndToEndSpec.scala
+++ b/akka-projection-r2dbc/src/it/scala/akka/projection/r2dbc/EventSourcedEndToEndSpec.scala
@@ -288,62 +288,6 @@ class EventSourcedEndToEndSpec
       projections.foreach(_ ! ProjectionBehavior.Stop)
     }
 
-    "support change of slice distribution" in {
-      val numberOfEntities = 20
-      val numberOfEvents = numberOfEntities * 10
-      val entityType = nextEntityType()
-
-      val entities = (0 until numberOfEntities).map { n =>
-        val persistenceId = PersistenceId(entityType, s"p$n")
-        spawn(Persister(persistenceId), s"$entityType-p$n")
-      }
-
-      val projectionName = UUID.randomUUID().toString
-      val processedProbe = createTestProbe[Processed]()
-      var projections = startProjections(entityType, projectionName, nrOfProjections = 4, processedProbe.ref)
-
-      (1 to numberOfEvents).foreach { n =>
-        val p = n % numberOfEntities
-        entities(p) ! Persister.Persist(mkEvent(n))
-
-        if (n % 10 == 0)
-          Thread.sleep(50)
-        else if (n % 25 == 0)
-          Thread.sleep(1500)
-
-        // stop projections
-        if (n == numberOfEvents / 4) {
-          val probe = createTestProbe()
-          projections.foreach { ref =>
-            ref ! ProjectionBehavior.Stop
-            probe.expectTerminated(ref)
-          }
-        }
-
-        // resume projections again but with more nrOfProjections
-        if (n == (numberOfEvents / 4) + 20)
-          projections = startProjections(entityType, projectionName, nrOfProjections = 8, processedProbe.ref)
-
-        // stop projections
-        if (n == numberOfEvents * 3 / 4) {
-          val probe = createTestProbe()
-          projections.foreach { ref =>
-            ref ! ProjectionBehavior.Stop
-            probe.expectTerminated(ref)
-          }
-        }
-
-        // resume projections again but with less nrOfProjections
-        if (n == (numberOfEvents * 3 / 4) + 20)
-          projections = startProjections(entityType, projectionName, nrOfProjections = 2, processedProbe.ref)
-      }
-
-      val expectedEvents = (1 to numberOfEvents).map(mkEvent).toVector
-      assertEventsProcessed(expectedEvents, processedProbe, verifyProjectionId = false)
-
-      projections.foreach(_ ! ProjectionBehavior.Stop)
-    }
-
     "accept unknown sequence number if previous is old" in {
       val entityType = nextEntityType()
       val pid1 = nextPid(entityType)
diff --git a/akka-projection-r2dbc/src/it/scala/akka/projection/r2dbc/R2dbcTimestampOffsetStoreSpec.scala b/akka-projection-r2dbc/src/it/scala/akka/projection/r2dbc/R2dbcTimestampOffsetStoreSpec.scala
index 2265f8ab9..2331c50e0 100644
--- a/akka-projection-r2dbc/src/it/scala/akka/projection/r2dbc/R2dbcTimestampOffsetStoreSpec.scala
+++ b/akka-projection-r2dbc/src/it/scala/akka/projection/r2dbc/R2dbcTimestampOffsetStoreSpec.scala
@@ -1112,5 +1112,58 @@ class R2dbcTimestampOffsetStoreSpec
       offsetStore.savePaused(paused = false).futureValue
       offsetStore.readManagementState().futureValue shouldBe Some(ManagementState(paused = false))
     }
+
+    "start from earliest slice when projection key is changed" in {
+      val projectionId1 = ProjectionId(UUID.randomUUID().toString, "512-767")
+      val projectionId2 = ProjectionId(projectionId1.name, "768-1023")
+      val projectionId3 = ProjectionId(projectionId1.name, "512-1023")
+      val offsetStore1 = new R2dbcOffsetStore(
+        projectionId1,
+        Some(new TestTimestampSourceProvider(512, 767, clock)),
+        system,
+        settings,
+        r2dbcExecutor)
+      val offsetStore2 = new R2dbcOffsetStore(
+        projectionId2,
+        Some(new TestTimestampSourceProvider(768, 1023, clock)),
+        system,
+        settings,
+        r2dbcExecutor)
+
+      val p1 = "p500" // slice 645
+      val p2 = "p863" // slice 645
+      val p3 = "p11" // slice 656
+      val p4 = "p92" // slice 905
+
+      val time1 = TestClock.nowMicros().instant()
+      val time2 = time1.plusSeconds(1)
+      val time3 = time1.plusSeconds(2)
+      val time4 = time1.plusSeconds(3 * 60) // far ahead
+
+      offsetStore1.saveOffset(OffsetPidSeqNr(TimestampOffset(time1, Map(p1 -> 1L)), p1, 1L)).futureValue
+      offsetStore1.saveOffset(OffsetPidSeqNr(TimestampOffset(time2, Map(p2 -> 1L)), p2, 1L)).futureValue
+      offsetStore1.saveOffset(OffsetPidSeqNr(TimestampOffset(time3, Map(p3 -> 1L)), p3, 1L)).futureValue
+      offsetStore2
+        .saveOffset(OffsetPidSeqNr(TimestampOffset(time4, Map(p4 -> 1L)), p4, 1L))
+        .futureValue
+
+      // after downscaling
+      val offsetStore3 = new R2dbcOffsetStore(
+        projectionId3,
+        Some(new TestTimestampSourceProvider(512, 1023, clock)),
+        system,
+        settings,
+        r2dbcExecutor)
+
+      val offset = TimestampOffset.toTimestampOffset(offsetStore3.readOffset().futureValue.get) // this will load from the database
+      offsetStore3.getState().size shouldBe 4
+
+      offset.timestamp shouldBe time2
+      offset.seen shouldBe Map.empty
+
+      // getOffset is used by the management API, and that should not be adjusted
+      TimestampOffset.toTimestampOffset(offsetStore3.getOffset().futureValue.get).timestamp shouldBe time4
+    }
+
   }
 }
diff --git a/akka-projection-r2dbc/src/main/scala/akka/projection/r2dbc/internal/OffsetStoreDao.scala b/akka-projection-r2dbc/src/main/scala/akka/projection/r2dbc/internal/OffsetStoreDao.scala
index 3f948cb4f..94609ec2a 100644
--- a/akka-projection-r2dbc/src/main/scala/akka/projection/r2dbc/internal/OffsetStoreDao.scala
+++ b/akka-projection-r2dbc/src/main/scala/akka/projection/r2dbc/internal/OffsetStoreDao.scala
@@ -20,7 +20,7 @@ import scala.concurrent.Future
 
 @InternalApi
 private[projection] trait OffsetStoreDao {
 
-  def readTimestampOffset(): Future[immutable.IndexedSeq[R2dbcOffsetStore.Record]]
+  def readTimestampOffset(): Future[immutable.IndexedSeq[R2dbcOffsetStore.RecordWithProjectionKey]]
 
   def readPrimitiveOffset(): Future[immutable.IndexedSeq[OffsetSerialization.SingleOffset]]
diff --git a/akka-projection-r2dbc/src/main/scala/akka/projection/r2dbc/internal/PostgresOffsetStoreDao.scala b/akka-projection-r2dbc/src/main/scala/akka/projection/r2dbc/internal/PostgresOffsetStoreDao.scala
index 0c29d1047..bd2e44062 100644
--- a/akka-projection-r2dbc/src/main/scala/akka/projection/r2dbc/internal/PostgresOffsetStoreDao.scala
+++ b/akka-projection-r2dbc/src/main/scala/akka/projection/r2dbc/internal/PostgresOffsetStoreDao.scala
@@ -54,7 +54,7 @@ private[projection] class PostgresOffsetStoreDao(
 
   private val selectTimestampOffsetSql: String =
     sql"""
-    SELECT slice, persistence_id, seq_nr, timestamp_offset
+    SELECT projection_key, slice, persistence_id, seq_nr, timestamp_offset
     FROM $timestampOffsetTable WHERE slice BETWEEN ? AND ? AND projection_name = ?"""
 
   private val insertTimestampOffsetSql: String =
@@ -131,7 +131,7 @@ private[projection] class PostgresOffsetStoreDao(
         s"Expected BySlicesSourceProvider to be defined when TimestampOffset is used.")
   }
 
-  override def readTimestampOffset(): Future[immutable.IndexedSeq[R2dbcOffsetStore.Record]] = {
+  override def readTimestampOffset(): Future[immutable.IndexedSeq[R2dbcOffsetStore.RecordWithProjectionKey]] = {
     val (minSlice, maxSlice) = {
       sourceProvider match {
         case Some(provider) => (provider.minSlice, provider.maxSlice)
@@ -148,11 +148,12 @@ private[projection] class PostgresOffsetStoreDao(
           .bind(2, projectionId.name)
       },
      row => {
+        val projectionKey = row.get("projection_key", classOf[String])
         val slice = row.get("slice", classOf[java.lang.Integer])
         val pid = row.get("persistence_id", classOf[String])
         val seqNr = row.get("seq_nr", classOf[java.lang.Long])
         val timestamp = row.get("timestamp_offset", classOf[Instant])
-        R2dbcOffsetStore.Record(slice, pid, seqNr, timestamp)
+        R2dbcOffsetStore.RecordWithProjectionKey(R2dbcOffsetStore.Record(slice, pid, seqNr, timestamp), projectionKey)
       })
   }
 
diff --git a/akka-projection-r2dbc/src/main/scala/akka/projection/r2dbc/internal/R2dbcOffsetStore.scala b/akka-projection-r2dbc/src/main/scala/akka/projection/r2dbc/internal/R2dbcOffsetStore.scala
index f301ab2f6..b11f51f80 100644
--- a/akka-projection-r2dbc/src/main/scala/akka/projection/r2dbc/internal/R2dbcOffsetStore.scala
+++ b/akka-projection-r2dbc/src/main/scala/akka/projection/r2dbc/internal/R2dbcOffsetStore.scala
@@ -55,6 +55,7 @@ private[projection] object R2dbcOffsetStore {
       strictSeqNr: Boolean,
       fromBacktracking: Boolean,
       fromPubSub: Boolean)
+  final case class RecordWithProjectionKey(record: Record, projectionKey: String)
 
   object State {
     val empty: State = State(Map.empty, Vector.empty, Instant.EPOCH, 0)
@@ -289,8 +290,8 @@ private[projection] class R2dbcOffsetStore(
   private def readTimestampOffset(): Future[Option[TimestampOffset]] = {
     idle.set(false)
     val oldState = state.get()
-    dao.readTimestampOffset().map { records =>
-      val newState = State(records)
+    dao.readTimestampOffset().map { recordsWithKey =>
+      val newState = State(recordsWithKey.map(_.record))
       logger.debugN(
         "readTimestampOffset state with [{}] persistenceIds, oldest [{}], latest [{}]",
         newState.byPid.size,
@@ -301,12 +302,36 @@ private[projection] class R2dbcOffsetStore(
       clearInflight()
       if (newState == State.empty) {
         None
+      } else if (moreThanOneProjectionKey(recordsWithKey)) {
+        // When downscaling projection instances (changing the slice distribution) there is a
+        // possibility that one of the previous projection instances was further behind than the
+        // backtracking window, which would cause missed events if we started from the latest offset.
+        // In that case we use the latest offset of the earliest slice.
+        val latestBySlice = newState.latestBySlice
+        val earliest = latestBySlice.minBy(_.timestamp).timestamp
+        Some(TimestampOffset(earliest, Map.empty))
       } else {
         newState.latestOffset
       }
     }
   }
 
+  private def moreThanOneProjectionKey(recordsWithKey: immutable.IndexedSeq[RecordWithProjectionKey]): Boolean = {
+    @tailrec def loop(key: String, iter: Iterator[RecordWithProjectionKey]): Boolean = {
+      if (iter.hasNext) {
+        val next = iter.next()
+        if (next.projectionKey != key) true
+        else loop(next.projectionKey, iter)
+      } else
+        false
+    }
+
+    if (recordsWithKey.isEmpty)
+      false
+    else
+      loop(recordsWithKey.head.projectionKey, recordsWithKey.iterator)
+  }
+
   private def readPrimitiveOffset[Offset](): Future[Option[Offset]] = {
     if (settings.isOffsetTableDefined) {
       val singleOffsets = dao.readPrimitiveOffset()
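
Note: below is a minimal, self-contained sketch of the fallback rule this change introduces, for illustration only. `Record` is a hypothetical stand-in for `R2dbcOffsetStore.Record`, and `fallbackStartTimestamp` is not part of the real API; the actual implementation derives `latestBySlice` from the offset store's `State`. The idea: when the offset table contains records from more than one projection key (i.e. the slice distribution changed), the safe starting point is the earliest of the per-slice latest timestamps, since no slice can be behind its own latest recorded offset.

import java.time.Instant

object EarliestSliceOffsetSketch {

  // Hypothetical, simplified stand-in for R2dbcOffsetStore.Record.
  final case class Record(slice: Int, pid: String, seqNr: Long, timestamp: Instant)

  // Earliest of the per-slice latest timestamps: a safe starting point after a
  // slice-distribution change, because no slice has progressed past the latest
  // offset recorded for it.
  def fallbackStartTimestamp(records: Seq[Record]): Option[Instant] =
    if (records.isEmpty) None
    else {
      val latestBySlice = records.groupBy(_.slice).values.map(_.maxBy(_.timestamp))
      Some(latestBySlice.minBy(_.timestamp).timestamp)
    }

  def main(args: Array[String]): Unit = {
    // Mirrors the data in the new R2dbcTimestampOffsetStoreSpec test:
    // p500 and p863 share slice 645, p92 on slice 905 is far ahead.
    val time1 = Instant.parse("2023-01-01T00:00:00Z")
    val records = Seq(
      Record(645, "p500", 1L, time1),
      Record(645, "p863", 1L, time1.plusSeconds(1)), // latest for slice 645
      Record(656, "p11", 1L, time1.plusSeconds(2)),
      Record(905, "p92", 1L, time1.plusSeconds(3 * 60)))
    // Per-slice latests: 645 -> time1+1s, 656 -> time1+2s, 905 -> time1+180s;
    // the minimum is time1+1s, matching `offset.timestamp shouldBe time2` in the spec.
    println(fallbackStartTimestamp(records)) // Some(2023-01-01T00:00:01Z)
  }
}

The empty `seen` map in `TimestampOffset(earliest, Map.empty)` presumably ensures that events at exactly the fallback timestamp are not filtered out as already seen; duplicates are instead rejected via the stored sequence numbers.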