-
Notifications
You must be signed in to change notification settings - Fork 14
/
VulcanCodecTest.scala
30 lines (25 loc) · 1.13 KB
/
VulcanCodecTest.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
package compstak.kafkastreams4s.tests
import cats.effect.{IO, Resource}
import cats.syntax.all._
import cats.effect.unsafe.implicits.global
import compstak.kafkastreams4s.testing.KafkaStreamsTestRunner
import compstak.kafkastreams4s.vulcan.{VulcanCodec, VulcanTable}
import org.apache.kafka.streams.StreamsBuilder
class VulcanCodecTest extends munit.FunSuite {
test("Using `mapCodec` should serialize and deserialize using the VulcanCodec") {
val input = Map("foo" -> 1, "bar" -> 2, "baz" -> 3, "qux" -> 2)
val sb = new StreamsBuilder
val table = VulcanTable[String, Int](sb, "origin")
val result = table.mapCodec[VulcanCodec, VulcanCodec]
Resource
.eval(result.to[IO]("out") >> IO(sb.build))
.flatMap(topo => KafkaStreamsTestRunner.testDriverResource[IO](topo))
.use(driver =>
KafkaStreamsTestRunner.inputTestTable2[IO, VulcanCodec, VulcanCodec](driver, "origin", input.toList: _*) >>
KafkaStreamsTestRunner
.outputTestTable2[IO, VulcanCodec, String, VulcanCodec, Int](driver, "out")
.map(outputData => assertEquals(outputData, input))
)
.unsafeToFuture
}
}