/
ConsumerMainSpec.scala
71 lines (54 loc) · 2.31 KB
/
ConsumerMainSpec.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
package uk.co.odinconsultants.fp.cats.fs2.example
import java.util.concurrent.atomic.AtomicInteger
import cats.effect.IO
import cats.effect.concurrent.Ref
import org.scalatest.{Matchers, WordSpec}
import fs2.{Pipe, Stream}
/**
 * Runs `pipeline` (imported from `ConsumerMain`) against in-memory stand-ins for the Kafka types.
 *
 * Two `Ref[IO, Int]` counters are incremented by the commit functions handed to the pipeline;
 * once the pipeline stream has finished, both counters are expected to equal the number of
 * records that were emitted.
 */
class ConsumerMainSpec extends WordSpec with Matchers {

  import ConsumerMain._

  // Minimal stand-ins for the types passed through the pipeline in this test.
  case class MockKafka()
  case class MockRecord(id: Int)
  case class MockProducerRecords(id: Int)
  case class MockCommittableOffset(id: Int)

  "Kafka pipeline" should {
    "Read, write and commit" in {
      /*
       Why the counters are held in Ref rather than StateT:

       "StateT is not safe to use with effect types, because it's not safe in the face of concurrent access.
       Instead, consider using a Ref (from either fs2 or cats-effect, depending what version)."
       https://stackoverflow.com/questions/51624763/fs2-stream-with-statetio-periodically-dumping-state
       */
      val nToRead = 10

      // A single mock consumer is fed into the pipeline.
      val consumers: Stream[IO, MockKafka] = Stream.emit(MockKafka()).covary[IO]

      val subscribe: MockKafka => IO[Unit] =
        _ => IO(println("subscribed"))

      // Every consumer yields records with ids 1..nToRead.
      val toRecords: MockKafka => Stream[IO, MockRecord] =
        _ => Stream.emits((1 to nToRead).map(MockRecord(_))).covary[IO]

      val producerPipe: Pipe[IO, MockProducerRecords, MockRecord] =
        produced => produced.map(p => MockRecord(p.id))

      val toWriteRecords: MockRecord => MockCommittableOffset =
        record => MockCommittableOffset(record.id)

      import cats.implicits._

      // Allocate the read counter first, then the write counter, both starting at zero.
      val counters: IO[(Ref[IO, Int], Ref[IO, Int])] =
        for {
          reads  <- Ref[IO].of(0)
          writes <- Ref[IO].of(0)
        } yield (reads, writes)

      val scenario = Stream.eval(counters).flatMap { case (readState, writeState) =>
        // Bump the read counter, then hand back a producer record carrying the same id.
        val commitRead: MockRecord => IO[MockProducerRecords] = record =>
          for {
            _        <- readState.update(_ + 1)
            produced <- IO(MockProducerRecords(record.id))
          } yield produced

        // Bump the write counter once per offset and emit the offset's id.
        val commitWrite: Pipe[IO, MockCommittableOffset, Int] =
          offsets => offsets.evalMap(offset => writeState.update(_ + 1).map(_ => offset.id))

        val piped =
          pipeline(consumers, subscribe, toRecords, commitRead, producerPipe, toWriteRecords, commitWrite)

        // The checks are appended so they only run after the pipeline stream has completed.
        piped.append {
          Stream.eval(makeAssertion(nToRead, readState) product makeAssertion(nToRead, writeState))
        }
      }

      scenario.compile.drain.unsafeRunSync()
    }
  }

  /**
   * Builds an effect that reads `state`, asserts the value equals `expected`, then prints it.
   *
   * @param expected the count the counter should hold
   * @param state    the counter to inspect
   */
  private def makeAssertion(expected: Int, state: Ref[IO, Int]): IO[Unit] =
    for {
      x <- state.get
      _ <- IO {
        x shouldBe expected
        println(s"x = $x")
      }
    } yield ()
}