Add documentation (#29)
* Add documentation

* Scalafmt

* Test scanWith

* Hopefully fix OOM

* More memory
LukaJCB committed Aug 3, 2020
1 parent 6b3f642 commit 0236e61
Showing 13 changed files with 368 additions and 40 deletions.
5 changes: 1 addition & 4 deletions .circleci/config.yml
@@ -5,9 +5,6 @@ _defaults: &defaults
docker_layer_caching: false
working_directory: ~/repo
environment:
# Customize the JVM maximum heap limit
JVM_OPTS: -Xmx3000m
SBT_OPTS: -Xmx2000m
TERM: dumb

workflows:
@@ -54,7 +51,7 @@ jobs:

- run:
name: Compile
command: cat /dev/null | sbt +validate
command: cat /dev/null | sbt -mem 7500 +validate

- save_cache:
paths:
19 changes: 18 additions & 1 deletion README.md
@@ -1,3 +1,20 @@
# KafkaStreams4s

This project is updated constantly. If you would like to add KafkaStreams4s to your project, go to https://search.maven.org/search?q=kafka-streams4s, click the version of one of the artifacts, and use the dependency declaration for your build tool of choice.
[![CircleCI](https://circleci.com/gh/compstak/KafkaStreams4s.svg?style=svg)](https://circleci.com/gh/compstak/KafkaStreams4s) [![Maven Central](https://maven-badges.herokuapp.com/maven-central/com.compstak/kafka-streams4s-core_2.13/badge.svg)](https://maven-badges.herokuapp.com/maven-central/com.compstak/kafka-streams4s-core_2.13)

KafkaStreams4s is a library for writing Kafka Streams programs using [cats-effect](https://github.com/typelevel/cats-effect).
To use KafkaStreams4s in an existing SBT project with Scala 2.12 or a later version, add the following dependencies to your `build.sbt`, depending on your needs:

```scala
libraryDependencies ++= Seq(
"com.compstak" %% "kafka-streams4s-core" % "<version>",
"com.compstak" %% "kafka-streams4s-circe" % "<version>",
"com.compstak" %% "kafka-streams4s-avro4s" % "<version>",
"com.compstak" %% "kafka-streams4s-debezium" % "<version>",
"com.compstak" %% "kafka-streams4s-shapeless" % "<version>",
"com.compstak" %% "kafka-streams4s-testing" % "<version>" % Test
)
```

To learn more, head to the [documentation page](docs/out/Intro.md).

35 changes: 28 additions & 7 deletions build.sbt
@@ -51,7 +51,7 @@ scalacOptions ++= Seq(

addCommandAlias("fmtAll", ";scalafmt; test:scalafmt; scalafmtSbt")
addCommandAlias("fmtCheck", ";scalafmtCheck; test:scalafmtCheck; scalafmtSbtCheck")
addCommandAlias("validate", ";fmtCheck; test; it:compile")
addCommandAlias("validate", ";fmtCheck; test; it:compile; docs/mdoc")

lazy val commonSettings = Seq(
crossScalaVersions := supportedScalaVersions,
@@ -118,12 +118,21 @@ lazy val shapeless = (project in file("shapeless"))
lazy val testing = (project in file("testing"))
.configs(IntegrationTest)
.settings(commonSettings)
.settings(noPublishSettings)
.settings(
name := "kafka-streams4s-testing",
libraryDependencies ++= Seq(
"org.apache.kafka" % "kafka-streams-test-utils" % KafkaVersion
)
)
.dependsOn(core)

lazy val tests = (project in file("tests"))
.configs(IntegrationTest)
.settings(commonSettings)
.settings(
name := "kafka-streams4s-tests",
libraryDependencies ++= Seq(
"org.apache.kafka" % "kafka-streams-test-utils" % KafkaVersion,
"com.github.fd4s" %% "fs2-kafka" % FS2KafkaVersion,
"com.github.fd4s" %% "fs2-kafka" % FS2KafkaVersion % IntegrationTest,
"org.scalameta" %% "munit" % MunitVersion % "test, it",
"com.compstak" %% "kafka-connect-migrate" % KafkaConnectHttp4sVersion % IntegrationTest,
"io.circe" %% "circe-literal" % CirceVersion % IntegrationTest,
@@ -134,11 +143,23 @@ lazy val testing = (project in file("testing"))
inConfig(IntegrationTest)(ScalafmtPlugin.scalafmtConfigSettings),
testFrameworks += new TestFramework("munit.Framework")
)
.dependsOn(core, circe, avro4s, debezium, shapeless)
.dependsOn(core, circe, avro4s, debezium, shapeless, testing)

lazy val docs = (project in file("documentation"))
.settings(commonSettings)
.settings(noPublishSettings)
.settings(
mdocVariables := Map("VERSION" -> version.value),
mdocIn := new java.io.File("docs/src"),
mdocOut := new java.io.File("docs/out"),
crossScalaVersions := List(scala213)
)
.dependsOn(core, circe, debezium, avro4s, shapeless, testing)
.enablePlugins(MdocPlugin)

lazy val kafkaStreams4s = (project in file("."))
.settings(commonSettings)
.settings(noPublishSettings)
.settings(name := "kafka-streams4s")
.dependsOn(core, circe, debezium, avro4s, shapeless, testing)
.aggregate(core, circe, debezium, avro4s, shapeless, testing)
.dependsOn(core, circe, debezium, avro4s, shapeless, testing, tests, docs)
.aggregate(core, circe, debezium, avro4s, shapeless, testing, tests, docs)
3 changes: 3 additions & 0 deletions core/src/main/scala/compstak/kafkastreams4s/Platform.scala
@@ -24,4 +24,7 @@ object Platform {

streams.start()
}

def run[F[_]: Async](topo: Topology, props: Properties, timeout: Duration): F[Unit] =
streamsResource[F](topo, props, timeout).use(runStreams[F])
}
14 changes: 14 additions & 0 deletions core/src/main/scala/compstak/kafkastreams4s/STable.scala
@@ -6,6 +6,7 @@ import org.apache.kafka.streams.{KeyValue, StreamsBuilder}
import org.apache.kafka.common.serialization.Serde
import org.apache.kafka.streams.kstream._
import SerdeHelpers._
import cats.kernel.Monoid

/**
* A Kafka Streams KTable wrapper that abstracts over the codecs used in KTable operations.
@@ -158,6 +159,19 @@ class STable[C[_]: Codec, K: C, V: C](val toKTable: KTable[K, V]) {
.reduce((acc: V, cur: V) => f(acc, cur), (acc: V, old: V) => acc)
)

def scanWith[K2: C, V2: C](f: (K, V) => (K2, V2))(g: (V2, V2) => V2): STable[C, K2, V2] =
fromKTable(
toKTable
.groupBy({ (k: K, v: V) =>
val (k2, v2) = f(k, v)
KeyValue.pair(k2, v2)
}: KeyValueMapper[K, V, KeyValue[K2, V2]], groupedForCodec[C, K2, V2])
.reduce((acc: V2, cur: V2) => g(acc, cur), (acc: V2, old: V2) => acc)
)

def scanMap[V2: C: Monoid](f: V => V2): STable[C, K, V2] =
scan(Monoid[V2].empty)((v2, v) => Monoid[V2].combine(v2, f(v)))

def map[V2: C](f: V => V2): STable[C, K, V2] =
fromKTable(toKTable.mapValues(((v: V) => f(v)): ValueMapper[V, V2], materializedForCodec[C, K, V2]))

133 changes: 133 additions & 0 deletions docs/out/Intro.md
@@ -0,0 +1,133 @@
# Introduction

KafkaStreams4s is a library for writing Kafka Streams programs while leveraging the [cats](https://github.com/typelevel/cats) ecosystem.

## Quick example

This example uses the `kafka-streams4s-circe` module, but the code should look similar even if you do not use circe or even JSON.

For this example, let's consider a use case where we track movies and movie sales, and we want to figure out whether we can find trends for movie genres.
Modeling this with Scala case classes, we write the following:

```scala
import java.util.UUID

case class Movie(title: String, genre: String)
case class Purchase(movieId: UUID, amount: Int, user: String)
```

In order for us to use these case classes with `kafka-streams4s-circe`, we'll need to define `Encoder`s and `Decoder`s for them.

```scala
import io.circe.{Encoder, Decoder}

implicit val movieDecoder: Decoder[Movie] = Decoder.forProduct2("title", "genre")(Movie.apply _)
implicit val movieEncoder: Encoder[Movie] = Encoder.forProduct2("title", "genre")(m => (m.title, m.genre))

implicit val purchaseDecoder: Decoder[Purchase] = Decoder.forProduct3("movieId", "amount", "user")(Purchase.apply _)
implicit val purchaseEncoder: Encoder[Purchase] =
  Encoder.forProduct3("movieId", "amount", "user")(p => (p.movieId, p.amount, p.user))
```

We will assume here that there exist two Kafka topics, `example.movies` and `example.purchases`, which emit JSON records.
Next, we will create a `CirceTable` for each of the two topics our program uses.
A `CirceTable[K, V]` is a wrapper around a `KTable[K, V]` that includes all the necessary codecs, so Kafka Streams can serialize and deserialize our records without us passing any codecs around explicitly. Simply put, we can create a `CirceTable[K, V]` for any `K` and `V` that have a circe `Encoder` and `Decoder` in scope.


```scala
import compstak.kafkastreams4s.circe.CirceTable
import org.apache.kafka.streams.StreamsBuilder

val sb = new StreamsBuilder

val movies = CirceTable[UUID, Movie](sb, "example.movies")
val purchases = CirceTable[UUID, Purchase](sb, "example.purchases")
```

Now we want to join the two topics, filter out some genres, and accumulate the results into a `Map` from genre to number of purchases.

```scala
val withoutOther: CirceTable[UUID, Movie] = movies.filter(_.genre != "Other")
val pairs: CirceTable[UUID, (Purchase, Movie)] =
  purchases.join(withoutOther)(purchase => purchase.movieId)((purchase, movie) => (purchase, movie))

val result: CirceTable[String, Int] =
  pairs.scanWith { case (id, (purchase, movie)) => (movie.genre, purchase.amount) }(_ + _)

```

Here we first filter out all the movies tagged with the `"Other"` genre.
Then we join the movies and purchases topics, and lastly we use the `scanWith` operation to select a new key-value pair and pass a function that aggregates the results.
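
`STable` also provides a `scanMap` combinator (added alongside `scanWith` in this commit, see the `STable.scala` diff above) for aggregating values through any cats `Monoid`. As a minimal sketch that is not used in the rest of this walkthrough, we could sum purchase amounts while keeping the original key, relying on the standard `Monoid[Int]` instance; the `amounts` name is purely illustrative:

```scala
import cats.implicits._ // provides Monoid[Int]

// Illustrative only: map each (Purchase, Movie) pair to its purchase amount;
// the amounts are then combined with Monoid[Int].combine, i.e. addition.
val amounts: CirceTable[UUID, Int] =
  pairs.scanMap { case (purchase, _) => purchase.amount }
```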
Now all that's left is to direct the result into an output topic `example.output` and run the program:

```scala
import cats.implicits._
import cats.effect.IO
import compstak.kafkastreams4s.Platform
import org.apache.kafka.streams.Topology
import java.util.Properties
import java.time.Duration

val props = new Properties // in real code, add the desired configuration to this object (a sketch follows below)

val topology: IO[Topology] = result.to[IO]("example.output") >> IO(sb.build())

val main: IO[Unit] =
  topology.flatMap(topo => Platform.run[IO](topo, props, Duration.ofSeconds(2)))
```

`compstak.kafkastreams4s.Platform` gives us a `run` function for executing Kafka Streams programs; it takes a topology, Java `Properties`, and a timeout after which the stream threads will be shut down.
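
The `props` object above was left empty. At a minimum, a real Kafka Streams application needs an application id and the broker addresses to connect to; here is a minimal sketch using the configuration keys from Kafka's `StreamsConfig`, where `configuredProps`, the application id, and the broker address are all illustrative placeholders:

```scala
import org.apache.kafka.streams.StreamsConfig

val configuredProps: Properties = {
  val p = new Properties
  p.put(StreamsConfig.APPLICATION_ID_CONFIG, "movie-genre-trends") // placeholder application id
  p.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092") // placeholder broker address
  p
}
```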


## Testing your topologies

KafkaStreams4s comes with a testing module that allows us to test our Kafka Streams programs without even spinning up a Kafka cluster.
To start using it, include the `kafka-streams4s-testing` module in your `build.sbt`, as shown below.
It's built upon `kafka-streams-test-utils` and should give us a good amount of confidence in our streams logic.
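
A minimal sketch of that dependency line, with the version placeholder spelled the same way as in the README:

```scala
libraryDependencies += "com.compstak" %% "kafka-streams4s-testing" % "<version>" % Test
```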

First, we will create a test driver using the topology defined earlier:

```scala
import cats.effect.Resource
import compstak.kafkastreams4s.testing.KafkaStreamsTestRunner
import org.apache.kafka.streams.TopologyTestDriver

val driver: Resource[IO, TopologyTestDriver] =
  Resource.liftF(topology).flatMap(KafkaStreamsTestRunner.testDriverResource[IO])

```

Then we'll set up some inputs for our two topics:

```scala
import compstak.kafkastreams4s.circe.CirceCodec

val testMovies = List(
  UUID.fromString("150ac164-a4dd-4809-9e2f-fc092edb9d1d") -> Movie("The Godfather", "Crime"),
  UUID.fromString("1312c871-dd07-43a7-ae7b-3a74e0c9ce6d") -> Movie("Schindler's List", "Drama"),
  UUID.fromString("b6168e9a-a871-4712-a15b-0261edc7c9d2") -> Movie("Being John Malkovich", "Other")
)

val testPurchases = List(
  UUID.fromString("faad4c27-39d6-41df-9d97-102dc5b0bc93") -> Purchase(UUID.fromString("b6168e9a-a871-4712-a15b-0261edc7c9d2"), 2, "JohnDoe42"),
  UUID.fromString("310a790e-7df4-4cbb-91f8-42bcf998b9e5") -> Purchase(UUID.fromString("150ac164-a4dd-4809-9e2f-fc092edb9d1d"), 1, "MarkB98"),
  UUID.fromString("5d00d33e-82c4-418e-b776-46b16bd13508") -> Purchase(UUID.fromString("1312c871-dd07-43a7-ae7b-3a74e0c9ce6d"), 1, "JaneDoe"),
  UUID.fromString("0e36dd2f-e3db-4b1e-af45-52f8bc27d65f") -> Purchase(UUID.fromString("150ac164-a4dd-4809-9e2f-fc092edb9d1d"), 3, "NinaD14")
)

def pipeIn(driver: TopologyTestDriver): IO[Unit] =
  KafkaStreamsTestRunner.inputTestTable[IO, CirceCodec](driver, "example.movies", testMovies: _*) >>
    KafkaStreamsTestRunner.inputTestTable[IO, CirceCodec](driver, "example.purchases", testPurchases: _*)

```


Next, we'll read the values from the output topic using a different function from the `KafkaStreamsTestRunner`:

```scala
def pipeOut(driver: TopologyTestDriver): IO[Map[String, Int]] =
  KafkaStreamsTestRunner.outputTestTable[IO, CirceCodec, String, Int](driver, "example.output")

driver.use(d => pipeIn(d) >> pipeOut(d)).unsafeRunSync
// res0: Map[String, Int] = Map("Drama" -> 1, "Crime" -> 4)
```
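
In a real test suite this round trip would live inside an assertion. Here is a minimal sketch using munit, which the project's tests already depend on; the suite and test names are hypothetical, and the definitions above are assumed to be in scope:

```scala
import munit.FunSuite

class GenreTrendsSpec extends FunSuite { // hypothetical suite name
  test("aggregates purchase amounts per genre") {
    val output = driver.use(d => pipeIn(d) >> pipeOut(d)).unsafeRunSync()
    assertEquals(output, Map("Drama" -> 1, "Crime" -> 4))
  }
}
```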
