diff --git a/core/src/test/scala/com/commercetools/queue/SubscriberSuite.scala b/core/src/test/scala/com/commercetools/queue/SubscriberSuite.scala index d06bf67..13e2d0e 100644 --- a/core/src/test/scala/com/commercetools/queue/SubscriberSuite.scala +++ b/core/src/test/scala/com/commercetools/queue/SubscriberSuite.scala @@ -20,6 +20,7 @@ import cats.collections.Heap import cats.effect.IO import cats.effect.std.AtomicCell import cats.effect.testkit.TestControl +import cats.implicits.catsSyntaxOptionId import cats.syntax.either._ import cats.syntax.traverse._ import com.commercetools.queue.testing._ @@ -98,37 +99,143 @@ class SubscriberSuite extends CatsEffectSuite { } } - queueSub.test("Messages consumed with process must follow the decision") { case (queue, subscriber, publisher) => + queueSub.test("Messages consumed and ok'ed or drop'ed should follow the decision") { + case (queue, subscriber, publisher) => + TestControl + .executeEmbed(for { + // first populate the queue + messages <- List.range(0, 100).traverse { i => + IO.sleep(10.millis) *> IO.realTimeInstant.map(TestMessage(i.toString, _)) + } + _ <- queue.setAvailableMessages(messages) + result <- subscriber + .process(batchSize = 5, waitingTime = 40.millis, publisher)(new MessageHandler[IO, String, Int] { + override def handle(msg: Message[IO, String]): IO[Decision[Int]] = + if (msg.rawPayload.toInt % 2 == 0) IO.pure(Decision.Drop) + else IO.pure(Decision.Ok(1)) + }) + .interruptAfter(3.seconds) + .compile + .foldMonoid + } yield result) + .flatMap { result => + for { + _ <- assertIO(queue.getAvailableMessages, Nil) + _ = assertEquals(result, 50.asRight) + } yield () + } + } + + queueSub.test("Messages consumed and confirmed or dropped should follow the decision") { + case (queue, subscriber, publisher) => + TestControl + .executeEmbed(for { + // first populate the queue + messages <- List.range(0, 100).traverse { i => + IO.sleep(10.millis) *> IO.realTimeInstant.map(TestMessage(i.toString, _)) + } + _ <- queue.setAvailableMessages(messages) + result <- subscriber + .process(batchSize = 5, waitingTime = 40.millis, publisher)(new MessageHandler[IO, String, Int] { + override def handle(msg: Message[IO, String]): IO[Decision[Int]] = + if (msg.rawPayload.toInt % 2 == 0) IO.pure(Decision.Ok(1)) + else IO.pure(Decision.Drop) + }) + .take(50) + .compile + .foldMonoid + } yield result) + .flatMap { result => + for { + _ <- assertIO(queue.getAvailableMessages, Nil) + _ = assertEquals(result, 50.asRight) + } yield () + } + } + + queueSub.test("Messages consumed and requeued should follow the decision") { case (queue, subscriber, publisher) => TestControl .executeEmbed(for { // first populate the queue messages <- List.range(0, 100).traverse { i => - IO.sleep(10.millis) *> IO.realTimeInstant.map(TestMessage(s"message-$i", _)) + IO.sleep(10.millis) *> IO.realTimeInstant.map(TestMessage(i.toString, _)) } _ <- queue.setAvailableMessages(messages) - // then process messages in batches of 5 - // processing is (virtually) instantaneous in this case, - // so messages are immediately acked, from the mocked time PoV - // however, receiving messages waits for the amount of provided `waitingTime` - // in the test queue implementation, event if enough messages are available - // so this step makes time progress in steps of `waitingTime` + opCounter <- AtomicCell[IO].of(0) result <- subscriber .process(batchSize = 5, waitingTime = 40.millis, publisher)(new MessageHandler[IO, String, Int] { override def handle(msg: Message[IO, String]): IO[Decision[Int]] = - IO.pure(Decision.Ok(1)) + opCounter.update(_ + 1) >> { + // let's reenqueue at the first run, and then confirm + if (msg.metadata.contains("reenqueued")) IO.pure(Decision.Ok(1)) + else IO.pure(Decision.Reenqueue(Map("reenqueued" -> "true").some, None)) + } }) - .interruptAfter(3.seconds) + .take(100) .compile .foldMonoid - } yield result) - .flatMap { result => + totOpCount <- opCounter.get + } yield (result, totOpCount)) + .flatMap { case (result, totOpCount) => for { - _ <- assertIO(IO.pure(result), 100.asRight) _ <- assertIO(queue.getAvailableMessages, Nil) - _ <- assertIO(queue.getLockedMessages, Nil) - _ <- assertIO(queue.getDelayedMessages, Nil) + _ = assertEquals(totOpCount, 200) + _ = assertEquals(result, 100.asRight) } yield () } } + queueSub.test("Messages that are marked as failed and acked should follow the decision") { + case (queue, subscriber, publisher) => + TestControl + .executeEmbed(for { + // first populate the queue + messages <- List.range(0, 100).traverse { i => + IO.sleep(10.millis) *> IO.realTimeInstant.map(TestMessage(i.toString, _)) + } + _ <- queue.setAvailableMessages(messages) + result <- subscriber + .process(batchSize = 5, waitingTime = 40.millis, publisher)(new MessageHandler[IO, String, Int] { + override def handle(msg: Message[IO, String]): IO[Decision[Int]] = + IO.pure(Decision.Fail(new Throwable(s"failed ${msg.rawPayload}"), ack = true)) + }) + .take(100) + .collect { case Left(_) => 1 } + .compile + .foldMonoid + } yield result) + .flatMap { result => + for { + _ <- assertIO(queue.getAvailableMessages, Nil) + _ = assertEquals(result, 100) + } yield () + } + } + + queueSub.test("Messages that are marked as failed and not acked should follow the decision") { + case (queue, subscriber, publisher) => + TestControl + .executeEmbed(for { + // first populate the queue + messages <- List.range(0, 100).traverse { i => + IO.sleep(10.millis) *> IO.realTimeInstant.map(TestMessage(i.toString, _)) + } + _ <- queue.setAvailableMessages(messages) + result <- subscriber + .process(batchSize = 5, waitingTime = 40.millis, publisher)(new MessageHandler[IO, String, Int] { + override def handle(msg: Message[IO, String]): IO[Decision[Int]] = + IO.pure(Decision.Fail(new Throwable(s"failed ${msg.rawPayload}"), ack = false)) + }) + .take(100) + .collect { case Left(_) => 1 } + .compile + .foldMonoid + } yield result) + .flatMap { result => + for { + _ <- assertIO(queue.getAvailableMessages.map(_.size), 100) + _ = assertEquals(result, 100) + } yield () + } + } } diff --git a/core/src/test/scala/com/commercetools/queue/testing/LockedTestMessage.scala b/core/src/test/scala/com/commercetools/queue/testing/LockedTestMessage.scala index b9d7b84..80df5ee 100644 --- a/core/src/test/scala/com/commercetools/queue/testing/LockedTestMessage.scala +++ b/core/src/test/scala/com/commercetools/queue/testing/LockedTestMessage.scala @@ -40,7 +40,7 @@ final case class LockedTestMessage[T]( override def enqueuedAt: Instant = msg.enqueuedAt - override val metadata: Map[String, String] = Map.empty + override val metadata: Map[String, String] = msg.metadata override def ack(): IO[Unit] = // done, just delete it diff --git a/core/src/test/scala/com/commercetools/queue/testing/TestMessage.scala b/core/src/test/scala/com/commercetools/queue/testing/TestMessage.scala index 5d86a3d..aae0f6b 100644 --- a/core/src/test/scala/com/commercetools/queue/testing/TestMessage.scala +++ b/core/src/test/scala/com/commercetools/queue/testing/TestMessage.scala @@ -20,7 +20,7 @@ import cats.Order import java.time.Instant -final case class TestMessage[T](payload: T, enqueuedAt: Instant) +final case class TestMessage[T](payload: T, enqueuedAt: Instant, metadata: Map[String, String] = Map.empty) object TestMessage { diff --git a/core/src/test/scala/com/commercetools/queue/testing/TestQueue.scala b/core/src/test/scala/com/commercetools/queue/testing/TestQueue.scala index 26d74d6..b836a98 100644 --- a/core/src/test/scala/com/commercetools/queue/testing/TestQueue.scala +++ b/core/src/test/scala/com/commercetools/queue/testing/TestQueue.scala @@ -126,10 +126,14 @@ class TestQueue[T]( state <- update(state) } yield delay match { case None => - state.copy(available = state.available.addAll(messages.map(x => TestMessage(x._1, now)))) + state.copy(available = state.available.addAll(messages.map { case (payload, metadata) => + TestMessage(payload, now, metadata) + })) case Some(delay) => val delayed = now.plusMillis(delay.toMillis) - state.copy(delayed = messages.map(x => TestMessage(x._1, delayed)) reverse_::: state.delayed) + state.copy(delayed = messages.map { case (payload, metadata) => + TestMessage(payload, delayed, metadata) + } reverse_::: state.delayed) } }