Update to cats-effect-3 (#189)
* Update to ce3

* Run scalaFmt

* Bump package versions

* Bump dependency versions

* Update documentation

* Update documentation

* Optimize imports

* Bump timeout to 5 minutes

* Bump avro4s version

* Add dependency override

* Update comments

* Bump kafka connect http4s version

Co-authored-by: dininski <vasil.dininski@kaluza.com>
dininski committed Aug 17, 2021
1 parent 91918f4 commit 642936f
Showing 9 changed files with 58 additions and 65 deletions.
17 changes: 10 additions & 7 deletions build.sbt
@@ -27,15 +27,16 @@ inThisBuild(

enablePlugins(DockerComposePlugin)

val Avro4sVersion = "3.1.1"
val CatsEffectVersion = "2.2.0"
val CirceVersion = "0.13.0"
val Avro4sVersion = "4.0.10"
val CatsEffectVersion = "3.1.1"
val CirceVersion = "0.14.1"
val CirceDebeziumVersion = "0.16.0"
val DoobieVersion = "0.9.4"
val FS2KafkaVersion = "1.0.0"
val Http4sVersion = "0.21.7"
val DoobieVersion = "1.0.0-M2"
val FS2KafkaVersion = "2.0.0"
val Http4sVersion = "0.23.0-RC1"
val KafkaVersion = "2.7.0"
val KafkaConnectHttp4sVersion = "0.5.0"

val KafkaConnectHttp4sVersion = "0.6.0"
val MunitVersion = "0.7.19"
val ShapelessVersion = "2.3.3"
val VulcanVersion = "1.2.0"
@@ -141,6 +142,8 @@ lazy val tests = (project in file("tests"))
"org.http4s" %% "http4s-async-http-client" % Http4sVersion % IntegrationTest,
"org.tpolecat" %% "doobie-postgres" % DoobieVersion % IntegrationTest
),
// fs2-kafka 2.x brings in 2.8.0 to the classpath, which breaks the tests, hence the override
dependencyOverrides += "org.apache.kafka" % "kafka-clients" % "2.7.0",
Defaults.itSettings,
inConfig(IntegrationTest)(ScalafmtPlugin.scalafmtConfigSettings),
testFrameworks += new TestFramework("munit.Framework")
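The two lines added at the end of this hunk pin `kafka-clients` back to 2.7.0, since fs2-kafka 2.x transitively pulls in 2.8.0 and that breaks the integration tests. A minimal sketch of the same pinning pattern on its own, assuming fs2-kafka's coordinates are `"com.github.fd4s" %% "fs2-kafka"` (everything else mirrors the diff):

```scala
// build.sbt sketch: force a single kafka-clients version onto the classpath.
// Assumption: fs2-kafka 2.0.0 lives at "com.github.fd4s" %% "fs2-kafka".
libraryDependencies += "com.github.fd4s" %% "fs2-kafka" % "2.0.0" // drags in kafka-clients 2.8.0

// dependencyOverrides does not add a dependency; it only fixes the version
// chosen when the artifact is already on the dependency graph.
dependencyOverrides += "org.apache.kafka" % "kafka-clients" % "2.7.0"
```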
18 changes: 8 additions & 10 deletions core/src/main/scala/compstak/kafkastreams4s/Platform.scala
@@ -1,14 +1,12 @@
package compstak.kafkastreams4s

import cats._, cats.implicits._
import cats.effect.{Async, Concurrent, ExitCase, ExitCode, Resource, Sync}
import cats.effect.implicits._
import cats.effect.concurrent.Deferred
import org.apache.kafka.streams.{KafkaStreams, Topology}
import cats.effect._
import cats.syntax.all._
import org.apache.kafka.streams.KafkaStreams.State
import java.util.Properties
import org.apache.kafka.streams.{KafkaStreams, Topology}

import java.time.Duration
import org.apache.kafka.common.protocol.types.Field.Bool
import java.util.Properties

sealed trait ShutdownStatus
final case object ShutdownComplete extends ShutdownStatus
@@ -24,15 +22,15 @@ object ShutdownStatus {
}

object Platform {
def run[F[_]: Concurrent](top: Topology, props: Properties, timeout: Duration): F[ShutdownStatus] =
def run[F[_]: Concurrent: Sync: Async](top: Topology, props: Properties, timeout: Duration): F[ShutdownStatus] =
for {
d <- Deferred[F, Boolean]
r <- Resource
.make(
Sync[F].delay(new KafkaStreams(top, props))
)(s => Sync[F].delay(s.close(timeout)).flatMap(b => d.complete(b)))
)(s => Sync[F].delay(s.close(timeout)).flatMap(b => d.complete(b) >> Sync[F].unit))
.use { streams =>
Async[F].async { (k: Either[Throwable, Unit] => Unit) =>
Async[F].async_ { (k: Either[Throwable, Unit] => Unit) =>
streams.setUncaughtExceptionHandler { (_: Thread, e: Throwable) =>
k(Left(e))
}
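Two cats-effect 3 API changes drive the edits in this hunk: the CE2 `Async[F].async` callback constructor is now spelled `Async[F].async_` (plain `async` gained the ability to register a finalizer), and `Deferred#complete` returns `F[Boolean]` rather than `F[Unit]`, which is why the finalizer appends `>> Sync[F].unit` to discard the result. A minimal sketch of both changes, assuming cats-effect 3.1.x:

```scala
import cats.effect.{Async, Deferred, IO}
import cats.syntax.all._

// CE2: Async[F].async(k)   -- k: (Either[Throwable, A] => Unit) => Unit
// CE3: Async[F].async_(k)  -- same callback shape, new name
def fromCallback[F[_]: Async, A](register: (Either[Throwable, A] => Unit) => Unit): F[A] =
  Async[F].async_(register)

// CE3: complete returns F[Boolean], true only for the first completion.
val firstWins: IO[(Boolean, Boolean)] =
  Deferred[IO, Int].flatMap(d => (d.complete(1), d.complete(2)).tupled) // (true, false)
```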
11 changes: 7 additions & 4 deletions docs/out/Intro.md
@@ -61,8 +61,9 @@ Then we join the movies and purchases topics and lastly we use the `scanWith` op
Now, all that's left is to direct the result into an output topic `example.output` and run the program

```scala
import cats.implicits._
import cats.effect.IO
import scala.concurrent.ExecutionContext
import cats._, cats.implicits._
import cats.effect._, cats.effect.implicits._
import compstak.kafkastreams4s.Platform
import org.apache.kafka.streams.Topology
import java.util.Properties
@@ -73,7 +74,7 @@ val props = new Properties //in real code add the desired configuration to this
val topology: IO[Topology] = result.to[IO]("example.output") >> IO(sb.build())

val main: IO[Unit] =
topology.flatMap(topo => Platform.run[IO](topo, props, Duration.ofSeconds(2)))
topology.flatMap(topo => Platform.run[IO](topo, props, Duration.ofSeconds(2))).void
```

`compstak.kafkastreams4s.Platform` gives us a function `run` to run Kafka Streams programs and takes a topology, java properties and a timeout after which the stream threads will be shut off.
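Under cats-effect 3 nothing else needs wiring to execute `main`: CE2's `ContextShift`/`Timer` implicits are gone, and an `IORuntime` is required only at the very edge of the program. A minimal sketch of that edge, assuming a plain script-style entry point rather than `IOApp`:

```scala
import cats.effect.unsafe.implicits.global // the default IORuntime

// At the edge of the world: run the stream program defined above.
// (In a real application, prefer extending cats.effect.IOApp.)
main.unsafeRunSync()
```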
@@ -93,7 +94,7 @@ import compstak.kafkastreams4s.testing.KafkaStreamsTestRunner
import org.apache.kafka.streams.TopologyTestDriver

val driver: Resource[IO, TopologyTestDriver] =
Resource.liftF(topology).flatMap(KafkaStreamsTestRunner.testDriverResource[IO])
Resource.eval(topology).flatMap(KafkaStreamsTestRunner.testDriverResource[IO])

```
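`Resource.liftF` was renamed to `Resource.eval` in cats-effect 3; the semantics are unchanged (lift an effect into a `Resource` whose finalizer does nothing). A one-line before/after sketch with a hypothetical effect:

```scala
import cats.effect.{IO, Resource}

val acquire: IO[String] = IO.pure("config") // hypothetical effect to lift

// cats-effect 2: Resource.liftF(acquire)
val lifted: Resource[IO, String] = Resource.eval(acquire) // cats-effect 3
```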

@@ -125,6 +126,8 @@ def pipeIn(driver: TopologyTestDriver): IO[Unit] =
Next, we'll get out the values from the output topic using a different function from the `KafkaStreamsTestRunner`.

```scala
import cats.effect.unsafe.implicits.global

def pipeOut(driver: TopologyTestDriver): IO[Map[String, Int]] =
KafkaStreamsTestRunner.outputTestTable[IO, CirceCodec, String, Int](driver, "example.output")

5 changes: 2 additions & 3 deletions docs/src/Intro.md
@@ -69,8 +69,6 @@ import org.apache.kafka.streams.Topology
import java.util.Properties
import java.time.Duration

implicit val cs = IO.contextShift(ExecutionContext.global)

val props = new Properties //in real code add the desired configuration to this object.

val topology: IO[Topology] = result.to[IO]("example.output") >> IO(sb.build())
@@ -96,7 +94,7 @@ import compstak.kafkastreams4s.testing.KafkaStreamsTestRunner
import org.apache.kafka.streams.TopologyTestDriver

val driver: Resource[IO, TopologyTestDriver] =
Resource.liftF(topology).flatMap(KafkaStreamsTestRunner.testDriverResource[IO])
Resource.eval(topology).flatMap(KafkaStreamsTestRunner.testDriverResource[IO])

```

@@ -128,6 +126,7 @@ def pipeIn(driver: TopologyTestDriver): IO[Unit] =
Next, we'll get out the values from the output topic using a different function from the `KafkaStreamsTestRunner`.

```scala mdoc
import cats.effect.unsafe.implicits.global

def pipeOut(driver: TopologyTestDriver): IO[Map[String, Int]] =
KafkaStreamsTestRunner.outputTestTable[IO, CirceCodec, String, Int](driver, "example.output")
@@ -1,13 +1,10 @@
package compstak.kafkastreams4s.testing

import org.apache.kafka.streams.Topology
import cats.effect.ConcurrentEffect
import cats.effect.ContextShift

import scala.util.Random
import cats.effect.Sync
import cats.implicits._
import cats.effect.Timer
import cats.effect.implicits._
import java.time.Duration
import java.util.UUID
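The deleted imports reflect the collapsed cats-effect 3 type class hierarchy: `ConcurrentEffect`, `ContextShift`, and `Timer` no longer exist, and their capabilities now come with `Async`/`Temporal`. A minimal sketch of the constraint rewrite, assuming cats-effect 3.1.x:

```scala
import cats.effect.Temporal
import cats.syntax.all._
import scala.concurrent.duration._

// CE2: def retryAfter[F[_]: Concurrent: Timer: ContextShift, A](fa: F[A]): F[A]
// CE3: a single Temporal constraint covers sleeping plus concurrency:
def retryAfter[F[_]: Temporal, A](fa: F[A]): F[A] =
  Temporal[F].sleep(50.millis) *> fa
```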
@@ -1,39 +1,29 @@
package compstak.kafkastreams4s.tests

import cats.effect.{Blocker, IO, Resource}
import cats.implicits._
import org.http4s.implicits._
import org.http4s.client.asynchttpclient.AsyncHttpClient
import doobie.{ConnectionIO, Transactor}
import doobie.implicits._
import doobie.free.driver.DriverOp.Connect
import doobie.util.ExecutionContexts
import io.circe.Decoder
import io.circe.literal._
import fs2.kafka._
import cats.effect.unsafe.implicits.global
import cats.effect.{IO, Resource}
import cats.syntax.all._
import compstak.circe.debezium.{DebeziumKey, DebeziumValue}
import compstak.http4s.kafka.connect.KafkaConnectMigration
import compstak.kafkastreams4s.circe.CirceSerdes
import compstak.kafkastreams4s.Platform
import compstak.kafkastreams4s.debezium.JoinTables
import org.apache.kafka.streams.StreamsBuilder
import io.circe.Encoder
import org.apache.kafka.streams.StreamsConfig
import org.apache.kafka.streams.kstream.{Consumed, KTable, Produced}
import scala.concurrent.ExecutionContext
import scala.concurrent.duration._
import compstak.kafkastreams4s.circe.CirceSerdes
import doobie.implicits._
import doobie.{ConnectionIO, Transactor}
import fs2.kafka._
import io.circe.literal._
import io.circe.{Decoder, Encoder}
import org.apache.kafka.streams.{StreamsBuilder, StreamsConfig}
import org.http4s.asynchttpclient.client.AsyncHttpClient
import org.http4s.implicits._

import java.time.Duration
import java.{util => ju}
import org.apache.kafka.common.serialization.Serdes
import cats.effect.ExitCode
import scala.concurrent.duration._

class DebeziumEndToEndTests extends munit.FunSuite {

override val munitTimeout = 3.minutes

implicit val ctx = IO.contextShift(ExecutionContext.global)
implicit val timer = IO.timer(ExecutionContext.global)

val kafkaHost = "localhost:9092"
val outputTopic = "output.topic"
val (foo, bar, baz) = ("foo", "bar", "baz")
@@ -46,13 +36,12 @@ class DebeziumEndToEndTests extends munit.FunSuite {
"org.postgresql.Driver",
s"jdbc:postgresql://localhost:54320/postgres",
username,
password,
Blocker.liftExecutionContext(ExecutionContexts.synchronous)
password
)

def make: Resource[IO, Unit] =
for {
_ <- Resource.liftF(ddl.transact(xa))
_ <- Resource.eval(ddl.transact(xa))
client <- AsyncHttpClient.resource[IO]()
_ <- KafkaConnectMigration[IO](
client,
@@ -74,9 +63,9 @@ ),
),
"experiment"
).evalMap(_.migrate)
_ <- Resource.liftF(insertStmt.transact(xa))
_ <- Resource.eval(insertStmt.transact(xa))
// run the kafka streams topology for a minute and then stop it
_ <- Resource.liftF(
_ <- Resource.eval(
(
KafkaStream.run,
IO.sleep(2.minutes)
@@ -172,7 +161,8 @@ class DebeziumEndToEndTests extends munit.FunSuite {
.withGroupId("group")

def consume: IO[(String, String, String)] =
consumerStream(settings)
KafkaConsumer
.stream(settings)
.evalTap(_.subscribeTo(outputTopic))
.flatMap(c => c.stream)
.take(1)
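Two library migrations surface in this test file. First, fs2-kafka 2.x removed the top-level `consumerStream` constructor in favor of `KafkaConsumer.stream`, as the `consume` method above shows. A self-contained sketch of the new shape, assuming fs2-kafka 2.0.x; the topic name and settings here are illustrative:

```scala
import cats.effect.{IO, IOApp}
import fs2.kafka._

object ConsumeOne extends IOApp.Simple {
  val settings: ConsumerSettings[IO, String, String] =
    ConsumerSettings[IO, String, String]
      .withBootstrapServers("localhost:9092")
      .withGroupId("group")
      .withAutoOffsetReset(AutoOffsetReset.Earliest)

  val run: IO[Unit] =
    KafkaConsumer
      .stream(settings)                       // fs2-kafka 1.x: consumerStream(settings)
      .evalTap(_.subscribeTo("output.topic"))
      .flatMap(_.stream)                      // Stream of CommittableConsumerRecord
      .take(1)
      .evalMap(r => IO.println(r.record.value))
      .compile
      .drain
}
```

Second, doobie 1.0.0-M2 targets cats-effect 3, where blocking is handled by the runtime, so `Transactor.fromDriverManager` loses its `Blocker` argument; that is why the constructor in the hunk above got shorter. A sketch with placeholder credentials:

```scala
import cats.effect.IO
import doobie.Transactor

// doobie 0.9.x took a fifth Blocker argument; 1.0.0-M2 does not.
val xa: Transactor[IO] = Transactor.fromDriverManager[IO](
  "org.postgresql.Driver",
  "jdbc:postgresql://localhost:54320/postgres",
  "username",
  "password"
)
```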
@@ -14,6 +14,7 @@ import compstak.circe.debezium._
import io.circe.JsonObject

import io.circe.Json
import cats.effect.unsafe.implicits.global

class DebeziumTableJoinTests extends munit.FunSuite {
test("STable joinOption should work as expected") {
@@ -66,7 +67,7 @@ class DebeziumTableJoinTests extends munit.FunSuite {
)

Resource
.liftF(result.to[IO](out) >> IO(sb.build))
.eval(result.to[IO](out) >> IO(sb.build))
.flatMap(topo => KafkaStreamsTestRunner.testDriverResource[IO](topo))
.use(driver =>
KafkaStreamsTestRunner.inputTestTable[IO, CirceCodec](driver, "a", inputA: _*) >>
@@ -10,6 +10,7 @@ import scala.concurrent.duration._
import compstak.kafkastreams4s.testing.KafkaStreamsTestRunner
import scala.util.Try
import cats.effect.Resource
import cats.effect.unsafe.implicits.global

class STableOpsTest extends munit.FunSuite {

@@ -35,7 +36,7 @@ val result = tableA.join(tableB)(identity)((s, i) => s * i)
val result = tableA.join(tableB)(identity)((s, i) => s * i)

Resource
.liftF(result.to[IO](out) >> IO(sb.build))
.eval(result.to[IO](out) >> IO(sb.build))
.flatMap(topo => KafkaStreamsTestRunner.testDriverResource[IO](topo))
.use(driver =>
KafkaStreamsTestRunner.inputTestTable[IO, CirceCodec](driver, "a", inputA: _*) >>
@@ -61,7 +62,7 @@ val result = tableA.joinOption(tableB)(s => Try(s.toInt: Integer).toOption)((a, b) => s"$a:$b")
val result = tableA.joinOption(tableB)(s => Try(s.toInt: Integer).toOption)((a, b) => s"$a:$b")

Resource
.liftF(result.to[IO](out) >> IO(sb.build))
.eval(result.to[IO](out) >> IO(sb.build))
.flatMap(topo => KafkaStreamsTestRunner.testDriverResource[IO](topo))
.use(driver =>
KafkaStreamsTestRunner.inputTestTable[IO, CirceCodec](driver, "a", inputA: _*) >>
@@ -87,7 +88,7 @@ val result = tableA.leftJoin(tableB)(identity)((s, i) => s * i.getOrElse(1))
val result = tableA.leftJoin(tableB)(identity)((s, i) => s * i.getOrElse(1))

Resource
.liftF(result.to[IO](out) >> IO(sb.build))
.eval(result.to[IO](out) >> IO(sb.build))
.flatMap(topo => KafkaStreamsTestRunner.testDriverResource[IO](topo))
.use(driver =>
KafkaStreamsTestRunner.inputTestTable[IO, CirceCodec](driver, "a", inputA: _*) >>
@@ -113,7 +114,7 @@ val result = tableA.keyJoin(tableB)((a, b) => s"$a:$b")
val result = tableA.keyJoin(tableB)((a, b) => s"$a:$b")

Resource
.liftF(result.to[IO](out) >> IO(sb.build))
.eval(result.to[IO](out) >> IO(sb.build))
.flatMap(topo => KafkaStreamsTestRunner.testDriverResource[IO](topo))
.use(driver =>
KafkaStreamsTestRunner.inputTestTable[IO, CirceCodec](driver, "a", inputA: _*) >>
@@ -139,7 +140,7 @@ val result = tableA.keyOuterJoin(tableB)(ior => ior.merge)
val result = tableA.keyOuterJoin(tableB)(ior => ior.merge)

Resource
.liftF(result.to[IO](out) >> IO(sb.build))
.eval(result.to[IO](out) >> IO(sb.build))
.flatMap(topo => KafkaStreamsTestRunner.testDriverResource[IO](topo))
.use(driver =>
KafkaStreamsTestRunner.inputTestTable[IO, CirceCodec](driver, "a", inputA: _*) >>
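These suites assemble an `IO` with `use(...)` and then execute it, which under cats-effect 3 runs against the `IORuntime` imported above via `cats.effect.unsafe.implicits.global`; CE2 instead needed `ContextShift`/`Timer` implicits built from an `ExecutionContext`. A minimal munit sketch of the pattern, with a hypothetical suite and program:

```scala
import cats.effect.IO
import cats.effect.unsafe.implicits.global // default IORuntime for unsafeRunSync()

class ExampleSuite extends munit.FunSuite {
  test("an IO program can be run at the end of a test") {
    val program: IO[Int] = IO.pure(21).map(_ * 2)
    assertEquals(program.unsafeRunSync(), 42)
  }
}
```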
@@ -1,7 +1,8 @@
package compstak.kafkastreams4s.tests

import cats.effect.{IO, Resource}
import cats.implicits._
import cats.syntax.all._
import cats.effect.unsafe.implicits.global
import compstak.kafkastreams4s.testing.KafkaStreamsTestRunner
import compstak.kafkastreams4s.vulcan.{VulcanCodec, VulcanTable}
import org.apache.kafka.streams.StreamsBuilder
@@ -15,7 +16,7 @@ class VulcanCodecTest extends munit.FunSuite {
val result = table.mapCodec[VulcanCodec, VulcanCodec]

Resource
.liftF(result.to[IO]("out") >> IO(sb.build))
.eval(result.to[IO]("out") >> IO(sb.build))
.flatMap(topo => KafkaStreamsTestRunner.testDriverResource[IO](topo))
.use(driver =>
KafkaStreamsTestRunner.inputTestTable2[IO, VulcanCodec, VulcanCodec](driver, "origin", input.toList: _*) >>
