Skip to content

Commit

Permalink
Feature/kafka 241 (#107)
Browse files Browse the repository at this point in the history
* replace most scala.reflect usages with java.nio corresponding classes

* bump embeddedKafka version to 2.4.1

Add AvroSerdes, deprecate the package-object methods, and improve the test classes
  • Loading branch information
Francesco Pellegrini committed Mar 18, 2020
1 parent 9e47a32 commit e33d1b6
Show file tree
Hide file tree
Showing 12 changed files with 186 additions and 174 deletions.
12 changes: 6 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ Relies on the [embedded-kafka](https://github.com/embeddedkafka/embedded-kafka)

embedded-kafka-schema-registry is available on Maven Central, compiled for both Scala 2.11 and 2.12.

Currently there's no support for Scala 2.13-Mx as Confluent artifacts are not published for these versions.
Currently there's no support for Scala 2.13 as Confluent artifacts are not published for that version.

Versions match the version of Confluent Schema Registry they're built against.

Expand Down Expand Up @@ -46,11 +46,11 @@ class MySpec extends AnyWordSpecLike with Matchers with EmbeddedKafka {

## Utility methods

The `net.manub.embeddedkafka.avro.schemaregistry` package object provides useful implicit converters for testing with Avro and Schema Registry.
~~The `net.manub.embeddedkafka.avro.schemaregistry` package object provides useful implicit converters for testing with Avro and Schema Registry.~~

The implicit Avro serdes have been deprecated. Please use `AvroSerdes` instead.

## Using streams

* For most of the cases have your class extend the `EmbeddedKafkaStreamsAllInOne` trait (from the `net.manub.embeddedkafka.schemaregistry.streams` package). This offers both streams management and easy creation of consumers for asserting resulting messages in output/sink topics.
* If you only want to use the streams management without the test consumers just have the class extend the `EmbeddedKafkaStreams` trait (from the same package mentioned before).
* Build your own `Topology` and use `runStreams` to test it.
* Have a look at the [example test](src/test/scala/net/manub/embeddedkafka/schemaregistry/streams/ExampleKafkaStreamsSpec.scala).
* For most of the cases have your class extend the `EmbeddedKafkaStreams` trait (from the `net.manub.embeddedkafka.schemaregistry.streams` package). This offers both streams management and easy creation of consumers for asserting resulting messages in output/sink topics.
* Use `EmbeddedKafkaStreams.runStreams` and `EmbeddedKafka.withConsumer` and `EmbeddedKafka.withProducer`. This allows you to create your own consumers of custom types as seen in the [example test](src/test/scala/net/manub/embeddedkafka/schemaregistry/streams/ExampleKafkaStreamsSpec.scala).
2 changes: 1 addition & 1 deletion build.sbt
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import sbtrelease.Version

val embeddedKafkaVersion = "2.4.0"
val embeddedKafkaVersion = "2.4.1"
val confluentVersion = "5.4.1"

lazy val publishSettings = Seq(
Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
package net.manub.embeddedkafka.schemaregistry

import java.nio.file.{Files, Path}

import net.manub.embeddedkafka.ops.{EmbeddedKafkaOps, RunningEmbeddedKafkaOps}
import net.manub.embeddedkafka.schemaregistry.ops.{
RunningSchemaRegistryOps,
SchemaRegistryOps
}
import net.manub.embeddedkafka.{EmbeddedKafkaSupport, EmbeddedServer, EmbeddedZ}

import scala.reflect.io.Directory

trait EmbeddedKafka
extends EmbeddedKafkaSupport[EmbeddedKafkaConfig]
with EmbeddedKafkaOps[EmbeddedKafkaConfig, EmbeddedKWithSR]
Expand All @@ -26,7 +26,7 @@ trait EmbeddedKafka
override private[embeddedkafka] def withRunningServers[T](
config: EmbeddedKafkaConfig,
actualZkPort: Int,
kafkaLogsDir: Directory
kafkaLogsDir: Path
)(body: EmbeddedKafkaConfig => T): T = {
val broker =
startKafka(
Expand Down Expand Up @@ -69,8 +69,8 @@ object EmbeddedKafka
override def start()(
implicit config: EmbeddedKafkaConfig
): EmbeddedKWithSR = {
val zkLogsDir = Directory.makeTemp("zookeeper-logs")
val kafkaLogsDir = Directory.makeTemp("kafka-logs")
val zkLogsDir = Files.createTempDirectory("zookeeper-logs")
val kafkaLogsDir = Files.createTempDirectory("kafka-logs")

val factory =
EmbeddedZ(startZooKeeper(config.zooKeeperPort, zkLogsDir), zkLogsDir)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package net.manub.embeddedkafka.schemaregistry.avro

import io.confluent.kafka.serializers.{
KafkaAvroDeserializer => ConfluentKafkaAvroDeserializer,
KafkaAvroSerializer => ConfluentKafkaAvroSerializer
}
import net.manub.embeddedkafka.schemaregistry.EmbeddedKafka.{
configForSchemaRegistry,
specificAvroReaderConfigForSchemaRegistry
}
import net.manub.embeddedkafka.schemaregistry.EmbeddedKafkaConfig
import org.apache.avro.generic.GenericRecord
import org.apache.avro.specific.SpecificRecord
import org.apache.kafka.common.serialization.{Serde, Serdes}

import scala.collection.JavaConverters._

/** Factory for Avro [[Serde]]s wired against the embedded Schema Registry.
  *
  * Replaces the deprecated implicit serdes from the
  * `net.manub.embeddedkafka.schemaregistry` package object.
  */
object AvroSerdes {

  /** Builds a [[Serde]] for a [[SpecificRecord]] subtype.
    *
    * @param isKey       whether the serde is applied to record keys (`false` = values)
    * @param extraConfig extra serializer/deserializer properties, merged over the
    *                    Schema Registry defaults (later entries win)
    * @param config      the embedded Kafka / Schema Registry configuration in scope
    */
  def specific[T <: SpecificRecord](
      isKey: Boolean = false,
      extraConfig: Map[String, Object] = Map.empty
  )(
      implicit config: EmbeddedKafkaConfig
  ): Serde[T] = {
    val writerProps = configForSchemaRegistry ++ extraConfig
    // The reader side needs the specific-avro-reader flag so the deserializer
    // materializes SpecificRecord instances instead of GenericRecords.
    val readerProps = specificAvroReaderConfigForSchemaRegistry ++ extraConfig
    buildSerde[T](writerProps, readerProps, isKey)
  }

  /** Builds a [[Serde]] for untyped [[GenericRecord]] payloads.
    *
    * @param isKey       whether the serde is applied to record keys (`false` = values)
    * @param extraConfig extra serializer/deserializer properties, merged over the
    *                    Schema Registry defaults (later entries win)
    * @param config      the embedded Kafka / Schema Registry configuration in scope
    */
  def generic(
      isKey: Boolean = false,
      extraConfig: Map[String, Object] = Map.empty
  )(
      implicit config: EmbeddedKafkaConfig
  ): Serde[GenericRecord] = {
    // Generic records use the same configuration on both sides.
    val props = configForSchemaRegistry ++ extraConfig
    buildSerde[GenericRecord](props, props, isKey)
  }

  // Wires a configured Confluent serializer/deserializer pair into one Serde.
  // The cast is unavoidable: the Confluent classes are untyped (Object-based).
  private def buildSerde[T](
      serConfig: Map[String, Object],
      deserConfig: Map[String, Object],
      isKey: Boolean
  ): Serde[T] = {
    val deserializer = new ConfluentKafkaAvroDeserializer
    deserializer.configure(deserConfig.asJava, isKey)
    val serializer = new ConfluentKafkaAvroSerializer
    serializer.configure(serConfig.asJava, isKey)
    Serdes.serdeFrom(serializer, deserializer).asInstanceOf[Serde[T]]
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ object EmbeddedKafkaConfig {
implicit val defaultConfig: EmbeddedKafkaConfig = apply()

def apply(
kafkaPort: Int = 6001,
zooKeeperPort: Int = 6000,
kafkaPort: Int = OriginalEmbeddedKafkaConfig.defaultKafkaPort,
zooKeeperPort: Int = OriginalEmbeddedKafkaConfig.defaultZookeeperPort,
schemaRegistryPort: Int = defaultSchemaRegistryPort,
avroCompatibilityLevel: AvroCompatibilityLevel =
AvroCompatibilityLevel.NONE,
Expand Down
89 changes: 26 additions & 63 deletions src/main/scala/net.manub.embeddedkafka/schemaregistry/package.scala
Original file line number Diff line number Diff line change
@@ -1,84 +1,47 @@
package net.manub.embeddedkafka

import io.confluent.kafka.serializers.{
KafkaAvroDeserializer => ConfluentKafkaAvroDeserializer,
KafkaAvroSerializer => ConfluentKafkaAvroSerializer
}
import net.manub.embeddedkafka.schemaregistry.EmbeddedKafka.{
configForSchemaRegistry,
specificAvroReaderConfigForSchemaRegistry
}
import net.manub.embeddedkafka.schemaregistry.{
EmbeddedKafkaConfig => EmbeddedKafkaSRConfig
}
import net.manub.embeddedkafka.schemaregistry.avro.AvroSerdes
import org.apache.avro.generic.GenericRecord
import org.apache.avro.specific.SpecificRecord
import org.apache.kafka.common.serialization.{
Deserializer,
Serde,
Serdes,
Serializer
}

import scala.collection.JavaConverters._
import org.apache.kafka.common.serialization.{Deserializer, Serde, Serializer}

package object schemaregistry {
// SpecificRecord
@deprecated("Use AvroSerdes.specific instead", since = "2.4.1")
implicit def specificAvroValueSerde[T <: SpecificRecord](
implicit config: EmbeddedKafkaSRConfig
): Serde[T] = {
serdeFrom[T](
configForSchemaRegistry,
specificAvroReaderConfigForSchemaRegistry, //need this to support SpecificRecord
isKey = false
)
}
implicit config: EmbeddedKafkaConfig
): Serde[T] =
AvroSerdes.specific[T]()

@deprecated("Use AvroSerdes.specific instead", since = "2.4.1")
implicit def specificAvroValueSerializer[T <: SpecificRecord](
implicit config: EmbeddedKafkaSRConfig
): Serializer[T] = {
specificAvroValueSerde[T].serializer
}
implicit config: EmbeddedKafkaConfig
): Serializer[T] =
AvroSerdes.specific[T]().serializer

@deprecated("Use AvroSerdes.specific instead", since = "2.4.1")
implicit def specificAvroValueDeserializer[T <: SpecificRecord](
implicit config: EmbeddedKafkaSRConfig
): Deserializer[T] = {
specificAvroValueSerde[T].deserializer
}
implicit config: EmbeddedKafkaConfig
): Deserializer[T] =
AvroSerdes.specific[T]().deserializer

// GenericRecord
@deprecated("Use AvroSerdes.generic instead", since = "2.4.1")
implicit def genericAvroValueSerde(
implicit config: EmbeddedKafkaSRConfig
): Serde[GenericRecord] = {
serdeFrom[GenericRecord](
configForSchemaRegistry,
configForSchemaRegistry,
isKey = false
)
}
implicit config: EmbeddedKafkaConfig
): Serde[GenericRecord] =
AvroSerdes.generic()

@deprecated("Use AvroSerdes.generic instead", since = "2.4.1")
implicit def genericAvroValueSerializer(
implicit config: EmbeddedKafkaSRConfig
): Serializer[GenericRecord] = {
genericAvroValueSerde.serializer
}
implicit config: EmbeddedKafkaConfig
): Serializer[GenericRecord] =
AvroSerdes.generic().serializer

@deprecated("Use AvroSerdes.generic instead", since = "2.4.1")
implicit def genericAvroValueDeserializer(
implicit config: EmbeddedKafkaSRConfig
): Deserializer[GenericRecord] = {
genericAvroValueSerde.deserializer
}

private def serdeFrom[T](
serConfig: Map[String, Object],
deserConfig: Map[String, Object],
isKey: Boolean
) = {
val ser = new ConfluentKafkaAvroSerializer
ser.configure(serConfig.asJava, isKey)
val deser = new ConfluentKafkaAvroDeserializer
deser.configure(deserConfig.asJava, isKey)
implicit config: EmbeddedKafkaConfig
): Deserializer[GenericRecord] =
AvroSerdes.generic().deserializer

Serdes.serdeFrom(ser, deser).asInstanceOf[Serde[T]]
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package net.manub.embeddedkafka.schemaregistry

import java.nio.file.Path

import io.confluent.kafka.schemaregistry.RestApp
import kafka.server.KafkaServer
import net.manub.embeddedkafka.{
Expand Down Expand Up @@ -27,7 +29,7 @@ case class EmbeddedKWithSR(
factory: Option[EmbeddedZ],
broker: KafkaServer,
app: EmbeddedSR,
logsDirs: Directory
logsDirs: Path
)(implicit config: EmbeddedKafkaConfig)
extends EmbeddedServerWithKafka {
override def stop(clearLogs: Boolean): Unit = {
Expand All @@ -38,6 +40,6 @@ case class EmbeddedKWithSR(

factory.foreach(_.stop(clearLogs))

if (clearLogs) logsDirs.deleteRecursively()
if (clearLogs) Directory(logsDirs.toFile).deleteRecursively
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,3 +12,5 @@ trait EmbeddedKafkaStreams
override protected[embeddedkafka] val streamsConfig =
new EmbeddedStreamsConfigImpl
}

object EmbeddedKafkaStreams extends EmbeddedKafkaStreams

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import net.manub.embeddedkafka.schemaregistry.EmbeddedKafkaSpecSupport.{
NotAvailable
}

import scala.reflect.io.Directory
import java.nio.file.Files

class EmbeddedKafkaObjectSpec extends EmbeddedKafkaSpecSupport {
"the EmbeddedKafka object" when {
Expand Down Expand Up @@ -80,8 +80,10 @@ class EmbeddedKafkaObjectSpec extends EmbeddedKafkaSpecSupport {
implicit val config: EmbeddedKafkaConfig =
EmbeddedKafkaConfig()

EmbeddedKafka.startZooKeeper(Directory.makeTemp("zookeeper-test-logs"))
EmbeddedKafka.startKafka(Directory.makeTemp("kafka-test-logs"))
EmbeddedKafka.startZooKeeper(
Files.createTempDirectory("zookeeper-test-logs")
)
EmbeddedKafka.startKafka(Files.createTempDirectory("kafka-test-logs"))
EmbeddedKafka.startSchemaRegistry

EmbeddedKafka.isRunning shouldBe true
Expand All @@ -93,8 +95,10 @@ class EmbeddedKafkaObjectSpec extends EmbeddedKafkaSpecSupport {
implicit val config: EmbeddedKafkaConfig =
EmbeddedKafkaConfig()

EmbeddedKafka.startZooKeeper(Directory.makeTemp("zookeeper-test-logs"))
EmbeddedKafka.startKafka(Directory.makeTemp("kafka-test-logs"))
EmbeddedKafka.startZooKeeper(
Files.createTempDirectory("zookeeper-test-logs")
)
EmbeddedKafka.startKafka(Files.createTempDirectory("kafka-test-logs"))
EmbeddedKafka.isRunning shouldBe false
EmbeddedKafka.stop()
EmbeddedKafka.isRunning shouldBe false
Expand Down

0 comments on commit e33d1b6

Please sign in to comment.