From 5f24dd25ae00ab900a5251332d5a07808d4e1efc Mon Sep 17 00:00:00 2001 From: Robert Lyons Date: Fri, 3 Dec 2021 13:02:19 -0500 Subject: [PATCH] refactor avro registry serdes --- src/main/resources/flink-runner.conf | 3 +- .../flinkrunner/model/FlinkConfig.scala | 4 + .../flinkrunner/model/SinkConfig.scala | 5 + .../flinkrunner/model/SourceConfig.scala | 5 + ...stryKafkaRecordDeserializationSchema.scala | 105 +++++++-------- ...gistryKafkaRecordSerializationSchema.scala | 124 ++++++++++-------- .../epiphanous/flinkrunner/model/MyADT.scala | 28 ---- .../flinkrunner/model/MyAvroADT.scala | 120 +++++++++++++++++ .../flinkrunner/model/NoJobFactory.scala | 10 ++ ...ryKafkaRecordSerializationSchemaTest.scala | 109 +++++++++++++++ ...ConfluentAvroSerializationSchemaTest.scala | 14 -- ...gistryKafkaRecordSerializationSchema.scala | 69 ++++++++++ ...yADTConfluentAvroSerializationSchema.scala | 8 -- 13 files changed, 441 insertions(+), 163 deletions(-) delete mode 100644 src/test/scala/io/epiphanous/flinkrunner/model/MyADT.scala create mode 100644 src/test/scala/io/epiphanous/flinkrunner/model/MyAvroADT.scala create mode 100644 src/test/scala/io/epiphanous/flinkrunner/model/NoJobFactory.scala create mode 100644 src/test/scala/io/epiphanous/flinkrunner/serde/ConfluentAvroRegistryKafkaRecordSerializationSchemaTest.scala delete mode 100644 src/test/scala/io/epiphanous/flinkrunner/serde/ConfluentAvroSerializationSchemaTest.scala create mode 100644 src/test/scala/io/epiphanous/flinkrunner/serde/MyADTConfluentAvroRegistryKafkaRecordSerializationSchema.scala delete mode 100644 src/test/scala/io/epiphanous/flinkrunner/serde/MyADTConfluentAvroSerializationSchema.scala diff --git a/src/main/resources/flink-runner.conf b/src/main/resources/flink-runner.conf index f4441cf..aeb382a 100644 --- a/src/main/resources/flink-runner.conf +++ b/src/main/resources/flink-runner.conf @@ -28,4 +28,5 @@ state { backend = rocksdb } max.lateness = 5m -jobs = {} \ No newline at end of file +watermark.strategy = "bounded lateness" +jobs = {} diff --git a/src/main/scala/io/epiphanous/flinkrunner/model/FlinkConfig.scala b/src/main/scala/io/epiphanous/flinkrunner/model/FlinkConfig.scala index 72d5685..07cc530 100644 --- a/src/main/scala/io/epiphanous/flinkrunner/model/FlinkConfig.scala +++ b/src/main/scala/io/epiphanous/flinkrunner/model/FlinkConfig.scala @@ -180,6 +180,10 @@ class FlinkConfig[ADT <: FlinkEvent]( def getBucketAssigner[E <: ADT](name: String) = factory.getBucketAssigner[E](name, this) + @deprecated( + "Use the ConfluentAvroRegistryKafkaRecordSerialization and ...Deserialization classes instead", + "4.0.0" + ) def getAvroCoder(name: String) = factory.getAvroCoder(name, this) diff --git a/src/main/scala/io/epiphanous/flinkrunner/model/SinkConfig.scala b/src/main/scala/io/epiphanous/flinkrunner/model/SinkConfig.scala index f235cf4..68b0b39 100644 --- a/src/main/scala/io/epiphanous/flinkrunner/model/SinkConfig.scala +++ b/src/main/scala/io/epiphanous/flinkrunner/model/SinkConfig.scala @@ -1,7 +1,9 @@ package io.epiphanous.flinkrunner.model +import com.google.common.collect.Maps import io.epiphanous.flinkrunner.model.FlinkConnectorName._ +import java.util import java.util.Properties sealed trait SinkConfig { @@ -12,6 +14,9 @@ sealed trait SinkConfig { def label: String = s"$connector/$name" def properties: Properties + + def propertiesMap: util.HashMap[String, String] = + Maps.newHashMap(Maps.fromProperties(properties)) } object SinkConfig { diff --git a/src/main/scala/io/epiphanous/flinkrunner/model/SourceConfig.scala 
b/src/main/scala/io/epiphanous/flinkrunner/model/SourceConfig.scala index 85d081b..6c68c0d 100644 --- a/src/main/scala/io/epiphanous/flinkrunner/model/SourceConfig.scala +++ b/src/main/scala/io/epiphanous/flinkrunner/model/SourceConfig.scala @@ -1,7 +1,9 @@ package io.epiphanous.flinkrunner.model +import com.google.common.collect.Maps import io.epiphanous.flinkrunner.model.FlinkConnectorName._ +import java.util import java.util.Properties import scala.concurrent.duration.DurationInt import scala.util.Try @@ -18,6 +20,9 @@ sealed trait SourceConfig { def maxAllowedLateness: Long def properties: Properties + + def propertiesMap: util.HashMap[String, String] = + Maps.newHashMap(Maps.fromProperties(properties)) } object SourceConfig { diff --git a/src/main/scala/io/epiphanous/flinkrunner/serde/ConfluentAvroRegistryKafkaRecordDeserializationSchema.scala b/src/main/scala/io/epiphanous/flinkrunner/serde/ConfluentAvroRegistryKafkaRecordDeserializationSchema.scala index 6e2ac0b..e9b8cf6 100644 --- a/src/main/scala/io/epiphanous/flinkrunner/serde/ConfluentAvroRegistryKafkaRecordDeserializationSchema.scala +++ b/src/main/scala/io/epiphanous/flinkrunner/serde/ConfluentAvroRegistryKafkaRecordDeserializationSchema.scala @@ -1,27 +1,20 @@ package io.epiphanous.flinkrunner.serde import com.typesafe.scalalogging.LazyLogging -import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient -import io.confluent.kafka.serializers.{ - KafkaAvroDeserializer, - KafkaAvroDeserializerConfig -} +import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient +import io.confluent.kafka.serializers.KafkaAvroDeserializer import io.epiphanous.flinkrunner.model.{ FlinkConfig, + FlinkConnectorName, FlinkEvent, KafkaSourceConfig } -import io.epiphanous.flinkrunner.serde.ConfluentAvroRegistryKafkaRecordDeserializationSchema.DEFAULT_CACHE_CAPACITY -import org.apache.avro.specific.SpecificRecord import org.apache.flink.api.common.typeinfo.{TypeHint, TypeInformation} import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema -import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroDeserializationSchema import org.apache.flink.util.Collector import org.apache.kafka.clients.consumer.ConsumerRecord import java.util -import scala.collection.JavaConverters.mapAsJavaMapConverter -import scala.collection.mutable /** * A schema to deserialize bytes from kafka into an ADT event using a @@ -45,68 +38,64 @@ abstract class ConfluentAvroRegistryKafkaRecordDeserializationSchema[ ) extends KafkaRecordDeserializationSchema[E] with LazyLogging { - val sourceConfig: KafkaSourceConfig = - config.getSourceConfig(sourceName).asInstanceOf[KafkaSourceConfig] - - val topic: String = sourceConfig.topic + val sourceConfig: KafkaSourceConfig = { + val sc = config.getSourceConfig(sourceName) + if (sc.connector != FlinkConnectorName.Kafka) + throw new RuntimeException( + s"Requested source $sourceName is not a kafka source" + ) + sc.asInstanceOf[KafkaSourceConfig] + } - val url: String = - sourceConfig.properties.getProperty("schema.registry.url") - val cacheCapacity: Int = sourceConfig.properties - .getProperty("schema.registry.cache.capacity", DEFAULT_CACHE_CAPACITY) - .toInt - val useSpecificAvroReader: Boolean = sourceConfig.properties - .getProperty("use.specific.avro.reader", "true") - .toBoolean - val useLogicalTypes: Boolean = sourceConfig.properties - .getProperty("use.logical.type.converters", "true") - .toBoolean + val schemaRegistryProps: util.HashMap[String, String] 
= + sourceConfig.propertiesMap - /** create deserializer config */ - val deserializerConfig: util.Map[String, Boolean] = Map( - KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG -> useSpecificAvroReader, - KafkaAvroDeserializerConfig.AVRO_USE_LOGICAL_TYPE_CONVERTERS_CONFIG -> useLogicalTypes - ).asJava + val topic: String = sourceConfig.topic - /** our schema registry client */ - val schemaRegistryClient = - new CachedSchemaRegistryClient(url, cacheCapacity) + /** + * Implementing subclasses must provide an instance of a schema registry + * client to use, for instance a CachedSchemaRegistryClient + * or a MockSchemaRegistryClient for testing. + */ + def schemaRegistryClient: SchemaRegistryClient - /** map to store the value, and optionally, key deserializers */ - val deserializers: mutable.Map[String, KafkaAvroDeserializer] = - mutable.Map( - "value" -> new KafkaAvroDeserializer( - schemaRegistryClient, - deserializerConfig - ) - ) + val valueDeserializer = new KafkaAvroDeserializer( + schemaRegistryClient, + schemaRegistryProps + ) - /** add the key deserializer if needed */ - if (sourceConfig.isKeyed) { - val keyDeserializer = new KafkaAvroDeserializer(schemaRegistryClient) - keyDeserializer.configure(deserializerConfig, true) - deserializers += ("key" -> keyDeserializer) - } + val keyDeserializer: Option[KafkaAvroDeserializer] = + if (sourceConfig.isKeyed) { + val ks = new KafkaAvroDeserializer(schemaRegistryClient) + ks.configure(schemaRegistryProps, true) + Some(ks) + } else None /** - * Convert a kafka consumer record instance into an instance of our - * produced event type. Must be defined by implementing classes. - * @param record - * a kafka consumer record + * Convert a deserialized key/value pair of objects into an instance of + * the flink runner ADT. This method must be implemented by subclasses. + * + * The key and value are passed as AnyRefs, so implementing subclasses + * will need to pattern match. 
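+ *
+ * For example (a hypothetical sketch, assuming the MyAvroADT test types
+ * added elsewhere in this patch and a deserializer configured for
+ * specific records), a subclass might dispatch on the concrete record
+ * type:
+ *
+ * {{{
+ * override def fromKeyValue(key: Option[AnyRef], value: AnyRef): E =
+ *   (value match {
+ *     case a: ARecord => AWrapper(a)
+ *     case b: BRecord => BWrapper(b)
+ *   }).asInstanceOf[E]
+ * }}}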
+ * + * @param key + * an optional deserialized key object + * @param value + * a deserialized value object * @return * an instance of the flink runner ADT */ - def fromConsumerRecord( - record: ConsumerRecord[Array[Byte], Array[Byte]]): E + def fromKeyValue(key: Option[AnyRef], value: AnyRef): E override def deserialize( record: ConsumerRecord[Array[Byte], Array[Byte]], - out: Collector[E]): Unit = fromConsumerRecord(record) + out: Collector[E]): Unit = { + val key = + keyDeserializer.map(ds => ds.deserialize(topic, record.key())) + val value = valueDeserializer.deserialize(topic, record.value()) + if (Option(value).nonEmpty) out.collect(fromKeyValue(key, value)) + } override def getProducedType: TypeInformation[E] = TypeInformation.of(new TypeHint[E] {}) } - -object ConfluentAvroRegistryKafkaRecordDeserializationSchema { - val DEFAULT_CACHE_CAPACITY = "1000" -} diff --git a/src/main/scala/io/epiphanous/flinkrunner/serde/ConfluentAvroRegistryKafkaRecordSerializationSchema.scala b/src/main/scala/io/epiphanous/flinkrunner/serde/ConfluentAvroRegistryKafkaRecordSerializationSchema.scala index 120a2c0..f39007a 100644 --- a/src/main/scala/io/epiphanous/flinkrunner/serde/ConfluentAvroRegistryKafkaRecordSerializationSchema.scala +++ b/src/main/scala/io/epiphanous/flinkrunner/serde/ConfluentAvroRegistryKafkaRecordSerializationSchema.scala @@ -1,25 +1,20 @@ package io.epiphanous.flinkrunner.serde import com.typesafe.scalalogging.LazyLogging -import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient -import io.confluent.kafka.serializers.{ - KafkaAvroSerializer, - KafkaAvroSerializerConfig -} +import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient +import io.confluent.kafka.serializers.KafkaAvroSerializer import io.epiphanous.flinkrunner.model.{ FlinkConfig, + FlinkConnectorName, FlinkEvent, KafkaSinkConfig } -import io.epiphanous.flinkrunner.serde.ConfluentAvroRegistryKafkaRecordSerializationSchema.DEFAULT_CACHE_CAPACITY import org.apache.avro.specific.SpecificRecord import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroSerializationSchema import org.apache.kafka.clients.producer.ProducerRecord import java.{lang, util} -import scala.collection.JavaConverters.mapAsJavaMapConverter -import scala.collection.mutable /** * A schema to serialize an ADT event using a confluent avro schema @@ -44,66 +39,87 @@ abstract class ConfluentAvroRegistryKafkaRecordSerializationSchema[ ) extends KafkaRecordSerializationSchema[E] with LazyLogging { - val sinkConfig: KafkaSinkConfig = - config.getSourceConfig(sinkName).asInstanceOf[KafkaSinkConfig] - - val url: String = - sinkConfig.properties.getProperty("schema.registry.url") - val cacheCapacity: Int = sinkConfig.properties - .getProperty("schema.registry.cache.capacity", DEFAULT_CACHE_CAPACITY) - .toInt - val removeJavaProps: Boolean = sinkConfig.properties - .getProperty("serializer.remove.java.props", "true") - .toBoolean - val useLogicalTypes: Boolean = sinkConfig.properties - .getProperty("serializer.use.logical.type.converters", "true") - .toBoolean - - /** create serializer config */ - val serializerConfig: util.Map[String, Boolean] = Map( - KafkaAvroSerializerConfig.AVRO_REMOVE_JAVA_PROPS_CONFIG -> removeJavaProps, - KafkaAvroSerializerConfig.AVRO_USE_LOGICAL_TYPE_CONVERTERS_CONFIG -> useLogicalTypes - ).asJava + val sinkConfig: KafkaSinkConfig = { + val sc = config.getSinkConfig(sinkName) + if (sc.connector != 
FlinkConnectorName.Kafka) + throw new RuntimeException( + s"Requested sink $sinkName is not a kafka sink" + ) + sc.asInstanceOf[KafkaSinkConfig] + } - /** our schema registry client */ - val schemaRegistryClient = - new CachedSchemaRegistryClient(url, cacheCapacity) + val schemaRegistryProps: util.HashMap[String, String] = + sinkConfig.propertiesMap /** map to store the value, and optionally, key serializers */ - val serializers: mutable.Map[String, KafkaAvroSerializer] = - mutable.Map( - "value" -> new KafkaAvroSerializer( - schemaRegistryClient, - serializerConfig - ) - ) + val valueSerializer = new KafkaAvroSerializer( + schemaRegistryClient, + schemaRegistryProps + ) /** add the key serializer if needed */ - if (sinkConfig.isKeyed) { - val keySerializer = new KafkaAvroSerializer(schemaRegistryClient) - keySerializer.configure(serializerConfig, true) - serializers += ("key" -> keySerializer) - } + val keySerializer: Option[KafkaAvroSerializer] = + if (sinkConfig.isKeyed) { + val ks = new KafkaAvroSerializer(schemaRegistryClient) + ks.configure(schemaRegistryProps, true) + Some(ks) + } else None + + val topic: String = sinkConfig.topic + + /** + * A helper method to serialize an arbitary key/value pair. This should + * be used by subclasses that implement the [[toKeyValue()]] method. + * + * @param key + * the key + * @param value + * the value + * @tparam K + * the type of key + * @tparam V + * the type of value + * @return + * a tuple of byte arrays (with the key optional) + */ +// def kvSerialize[K, V](key: K, value: V): (Array[Byte], Array[Byte]) = { +// ( +// keySerializer.map(s => s.serialize(topic, key)).orNull, +// valueSerializer.serialize(topic, value) +// ) +// } /** - * Convert an element into a producer record of byte arrays. Must be - * defined by implementing classes. + * Implementing subclasses must provide an instance of a schema registry + * client to use, for instance a CachedSchemaRegistryClient + * or a MockSchemaRegistryClient for testing. + */ + def schemaRegistryClient: SchemaRegistryClient + + /** + * Convert a flink runner ADT instance into a key/value pair of objects + * to serialize into a kafka message. This must be defined by + * implementing subclasses. + * + * The purpose of this method is to decouple the structure of the flink + * runner ADT from the avro schemas of the underlying kafka messages. 
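+ *
+ * For example, the test subclass added in this patch unwraps the ADT
+ * and keys messages by its $id (sketch):
+ *
+ * {{{
+ * override def toKeyValue(element: E): (Option[AnyRef], AnyRef) =
+ *   element match {
+ *     case a: AWrapper => (Some(a.$id), a.value)
+ *     case b: BWrapper => (Some(b.$id), b.value)
+ *   }
+ * }}}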
+ * * @param element * an instance of the flinkrunner ADT * @return - * ProducerRecord of bytes + * (Option[AnyRef], AnyRef) */ - def toProducerRecord( - element: E): ProducerRecord[Array[Byte], Array[Byte]] + def toKeyValue(element: E): (Option[AnyRef], AnyRef) override def serialize( element: E, context: KafkaRecordSerializationSchema.KafkaSinkContext, - timestamp: lang.Long): ProducerRecord[Array[Byte], Array[Byte]] = - toProducerRecord(element) - -} + timestamp: lang.Long): ProducerRecord[Array[Byte], Array[Byte]] = { + val (k, v) = toKeyValue(element) + val key = + keySerializer.flatMap(ks => k.map(kk => ks.serialize(topic, kk))) + val value = valueSerializer.serialize(topic, v) + new ProducerRecord(topic, null, element.$timestamp, key.orNull, value) + } -object ConfluentAvroRegistryKafkaRecordSerializationSchema { - val DEFAULT_CACHE_CAPACITY = "1000" } diff --git a/src/test/scala/io/epiphanous/flinkrunner/model/MyADT.scala b/src/test/scala/io/epiphanous/flinkrunner/model/MyADT.scala deleted file mode 100644 index ec1383e..0000000 --- a/src/test/scala/io/epiphanous/flinkrunner/model/MyADT.scala +++ /dev/null @@ -1,28 +0,0 @@ -package io.epiphanous.flinkrunner.model - -import java.time.Instant -import java.util.UUID - -sealed trait MyADT extends FlinkEvent - -case class A( - id: String = UUID.randomUUID().toString, - a: String = "A", - value: Int = 0, - modified: Instant) - extends MyADT { - override def $id: String = id - override def $key: String = a - override def $timestamp: Long = modified.toEpochMilli -} - -case class B( - id: String = UUID.randomUUID().toString, - b: String = "B", - value: Double = 0d, - modified: Instant) - extends MyADT { - override def $id: String = id - override def $key: String = b - override def $timestamp: Long = modified.toEpochMilli -} diff --git a/src/test/scala/io/epiphanous/flinkrunner/model/MyAvroADT.scala b/src/test/scala/io/epiphanous/flinkrunner/model/MyAvroADT.scala new file mode 100644 index 0000000..99dce01 --- /dev/null +++ b/src/test/scala/io/epiphanous/flinkrunner/model/MyAvroADT.scala @@ -0,0 +1,120 @@ +package io.epiphanous.flinkrunner.model + +import org.apache.avro.specific.SpecificRecord +import org.apache.avro.{AvroRuntimeException, Schema} + +import java.time.Instant + +sealed trait MyAvroADT extends FlinkEvent + +case class AWrapper(value: ARecord) extends MyAvroADT { + override val $id: String = value.a0 + override val $key: String = $id + override val $timestamp: Long = value.a3.toEpochMilli +} + +case class BWrapper(value: BRecord) extends MyAvroADT { + override val $id: String = value.b0 + override val $key: String = $id + override val $timestamp: Long = value.b3.toEpochMilli +} + +case class ARecord( + var a0: String, + var a1: Int, + var a2: Double, + var a3: Instant) + extends SpecificRecord { + override def put(i: Int, v: Any): Unit = { + (i, v) match { + case (0, x: String) => this.a0 = x + case (1, x: Int) => this.a1 = x + case (2, x: Double) => this.a2 = x + case (3, x: Long) => this.a3 = Instant.ofEpochMilli(x) + case _ => + if (i < 0 || i > 3) new AvroRuntimeException("Bad index") + else new AvroRuntimeException("Bad value") + } + } + + override def get(i: Int): AnyRef = i match { + case 0 => a0.asInstanceOf[AnyRef] + case 1 => a1.asInstanceOf[AnyRef] + case 2 => a2.asInstanceOf[AnyRef] + case 3 => a3.toEpochMilli.asInstanceOf[AnyRef] + case _ => new AvroRuntimeException("Bad index") + } + + override def getSchema: Schema = ARecord.SCHEMA$ +} +object ARecord { + val schemaString: String = + """ + |{ + | "type": 
"record", + | "name": "ARecord", + | "namespace": "io.epiphanous.flinkrunner.model", + | "fields": [ + | { "name": "a0", "type": "string" }, + | { "name": "a1", "type": "int" }, + | { "name": "a2", "type": "double" }, + | { "name": "a3", "type": "long", "logicalType": "time-millis" } + | ] + |}""".stripMargin + val SCHEMA$ : Schema = new Schema.Parser().parse(schemaString) +} + +case class BRecord( + var b0: String, + var b1: Option[Int], + var b2: Option[Double], + var b3: Instant) + extends SpecificRecord { + override def put(i: Int, v: Any): Unit = { + (i, v) match { + case (0, x: String) => this.b0 = x + case (1, x: Int) => this.b1 = Some(x) + case (1, _) => this.b1 = null + case (2, x: Double) => this.b2 = Some(x) + case (2, _) => this.b2 = null + case (3, x: Long) => this.b3 = Instant.ofEpochMilli(x) + case _ => + if (i < 0 || i > 3) new AvroRuntimeException("Bad index") + else new AvroRuntimeException("Bad value") + } + } + + override def get(i: Int): AnyRef = i match { + case 0 => b0.asInstanceOf[AnyRef] + case 1 => + (b1 match { + case Some(x) => x + case None => null + }).asInstanceOf[AnyRef] + case 2 => + (b2 match { + case Some(x) => x + case None => null + }).asInstanceOf[AnyRef] + case 3 => b3.toEpochMilli.asInstanceOf[AnyRef] + case _ => new AvroRuntimeException("Bad index") + } + + override def getSchema: Schema = BRecord.SCHEMA$ +} +object BRecord { + val schemaString: String = + """ + |{ + | "type": "record", + | "name": "BRecord", + | "namespace": "io.epiphanous.flinkrunner.model", + | "fields": [ + | { "name": "b0", "type": "string" }, + | { "name": "b1", "type": ["null", "int"] }, + | { "name": "b2", "type": ["null", "double"] }, + | { "name": "b3", "type": "long", "logicalType": "time-millis" } + | ] + |}""".stripMargin + val SCHEMA$ : Schema = new Schema.Parser().parse(schemaString) +} diff --git a/src/test/scala/io/epiphanous/flinkrunner/model/NoJobFactory.scala b/src/test/scala/io/epiphanous/flinkrunner/model/NoJobFactory.scala new file mode 100644 index 0000000..e9fbac1 --- /dev/null +++ b/src/test/scala/io/epiphanous/flinkrunner/model/NoJobFactory.scala @@ -0,0 +1,10 @@ +package io.epiphanous.flinkrunner.model + +import io.epiphanous.flinkrunner.flink.BaseFlinkJob +import io.epiphanous.flinkrunner.{FlinkRunner, FlinkRunnerFactory} + +class NoJobFactory[ADT <: FlinkEvent] extends FlinkRunnerFactory[ADT] { + override def getJobInstance[DS, OUT <: ADT]( + name: String, + runner: FlinkRunner[ADT]): BaseFlinkJob[DS, OUT, ADT] = ??? 
+} diff --git a/src/test/scala/io/epiphanous/flinkrunner/serde/ConfluentAvroRegistryKafkaRecordSerializationSchemaTest.scala b/src/test/scala/io/epiphanous/flinkrunner/serde/ConfluentAvroRegistryKafkaRecordSerializationSchemaTest.scala new file mode 100644 index 0000000..f597218 --- /dev/null +++ b/src/test/scala/io/epiphanous/flinkrunner/serde/ConfluentAvroRegistryKafkaRecordSerializationSchemaTest.scala @@ -0,0 +1,109 @@ +package io.epiphanous.flinkrunner.serde + +import io.confluent.kafka.schemaregistry.client.SchemaMetadata +import io.epiphanous.flinkrunner.UnitSpec +import io.epiphanous.flinkrunner.model._ +import org.apache.avro.Schema +import org.apache.avro.generic.GenericDatumWriter +import org.apache.avro.io.EncoderFactory + +import java.io.{ByteArrayOutputStream, DataOutputStream} +import java.time.Instant + +class ConfluentAvroRegistryKafkaRecordSerializationSchemaTest + extends UnitSpec { + val factory = new NoJobFactory[MyAvroADT] + val optConfig: String = + s""" + |sinks { + | test { + | connector = kafka + | topic = test + | isKeyed = true + | config { + | schema.registry.url = "mock://test" + | avro.use.logical.type.converters = true + | avro.remove.java.properties = true + | value.subject.name.strategy = io.confluent.kafka.serializers.subject.RecordNameStrategy + | } + | } + |} + |""".stripMargin + val config = new FlinkConfig[MyAvroADT]( + Array.empty[String], + factory, + Map.empty, + Some(optConfig) + ) + val serde = + new MyADTConfluentAvroRegistryKafkaRecordSerializationSchema[ + MyAvroADT]( + "test", // sink name must match this + config + ) + + // helper to return the class name of the object passed in (without a $ at the end) + def className[T](obj: T): String = { + obj.getClass.getName match { + case s if s.endsWith("$") => s.substring(0, s.length - 1) + case s => s + } + } + + // printout a byte array with a prefix + def showBytes(prefix: String, bytes: Array[Byte]): Unit = + println(s"$prefix: ${bytes.mkString("Array(", ", ", ")")}") + + // mimic the binary encoding used for schema registry encoded objects + def binaryEncode[T](obj: T, schemaInfo: SchemaMetadata): Array[Byte] = { + val schema = new Schema.Parser().parse(schemaInfo.getSchema) + val schemaId = schemaInfo.getId + val baos = new ByteArrayOutputStream() + baos.write(0) + val dos = new DataOutputStream(baos) + dos.writeInt(schemaId) + dos.flush() + val encoder = EncoderFactory.get().binaryEncoder(baos, null) + val datumWriter = new GenericDatumWriter[T](schema) + datumWriter.write(obj, encoder) + encoder.flush() + val bytes = baos.toByteArray + baos.close() + bytes + } + + // some test fixtures + val aRecord: ARecord = ARecord("a123", 17, 32.2, Instant.now()) + val aWrapper: AWrapper = AWrapper(aRecord) + val aName: String = className(aRecord) + val keySchemaInfo: SchemaMetadata = + serde.schemaRegistryClient.getLatestSchemaMetadata("test-key") + val aSchemaInfo: SchemaMetadata = + serde.schemaRegistryClient.getLatestSchemaMetadata( + aName + ) + + behavior of "ConfluentAvroSerializationSchema" + + it should "find the right schema for a key" in { + keySchemaInfo.getSchema shouldEqual "\"string\"" + } + + it should "find the right schema for a class" in { + aSchemaInfo.getSchema shouldEqual ARecord.SCHEMA$.toString + } + + it should "serialize to a producer record" in { + val (aKey, aValue) = serde.toKeyValue(aWrapper) + val aWrapperKeyExpectedBytes: Option[Array[Byte]] = + aKey.map(k => binaryEncode(k, keySchemaInfo)) + val aWrapperValueExpectedBytes: Array[Byte] = + binaryEncode(aValue, aSchemaInfo) 
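+    // serde.serialize should reproduce the confluent wire format built
+    // by binaryEncode above: magic byte 0, a 4-byte schema id, then the
+    // avro binary payload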
+ val result = serde.serialize(aWrapper, null, aWrapper.$timestamp) + result.key() shouldEqual aWrapperKeyExpectedBytes.value + result.value() shouldEqual aWrapperValueExpectedBytes + result.timestamp() shouldEqual aWrapper.$timestamp + result.topic() shouldEqual serde.topic + } + +} diff --git a/src/test/scala/io/epiphanous/flinkrunner/serde/ConfluentAvroSerializationSchemaTest.scala b/src/test/scala/io/epiphanous/flinkrunner/serde/ConfluentAvroSerializationSchemaTest.scala deleted file mode 100644 index dbe4e54..0000000 --- a/src/test/scala/io/epiphanous/flinkrunner/serde/ConfluentAvroSerializationSchemaTest.scala +++ /dev/null @@ -1,14 +0,0 @@ -package io.epiphanous.flinkrunner.serde - -import io.epiphanous.flinkrunner.UnitSpec -import io.epiphanous.flinkrunner.model.FlinkEvent - -class ConfluentAvroSerializationSchemaTest extends UnitSpec { - -// val ss = new ConfluentAvroSerializationSchema() - - behavior of "ConfluentAvroSerializationSchema" - - it should "" - -} diff --git a/src/test/scala/io/epiphanous/flinkrunner/serde/MyADTConfluentAvroRegistryKafkaRecordSerializationSchema.scala b/src/test/scala/io/epiphanous/flinkrunner/serde/MyADTConfluentAvroRegistryKafkaRecordSerializationSchema.scala new file mode 100644 index 0000000..9b2acc4 --- /dev/null +++ b/src/test/scala/io/epiphanous/flinkrunner/serde/MyADTConfluentAvroRegistryKafkaRecordSerializationSchema.scala @@ -0,0 +1,69 @@ +package io.epiphanous.flinkrunner.serde + +import io.confluent.kafka.schemaregistry.avro.AvroSchema +import io.confluent.kafka.schemaregistry.client.{ + MockSchemaRegistryClient, + SchemaRegistryClient +} +import io.epiphanous.flinkrunner.model.{ + ARecord, + AWrapper, + BRecord, + BWrapper, + FlinkConfig, + MyAvroADT +} +import org.apache.avro.specific.SpecificRecord + +class MyADTConfluentAvroRegistryKafkaRecordSerializationSchema[ + E <: MyAvroADT](name: String, config: FlinkConfig[MyAvroADT]) + extends ConfluentAvroRegistryKafkaRecordSerializationSchema[ + E, + MyAvroADT]( + name, + config + ) { + + /** + * Implementing subclasses must provide an instance of a schema registry + * client to use, for instance a CachedSchemaRegistryClient + * or a MockSchemaRegistryClient for testing. 
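+ *
+ * Here a MockSchemaRegistryClient keeps the test self-contained; the
+ * key and value schemas it must serve are registered manually below.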
+ */ + override val schemaRegistryClient: SchemaRegistryClient = + new MockSchemaRegistryClient() + + // for testing purposes + val stringSchema: AvroSchema = new AvroSchema("""{"type":"string"}""") + val aRecordName: String = + ARecord.getClass.getCanonicalName.replaceAll("\\$$", "") + val bRecordName: String = + BRecord.getClass.getCanonicalName.replaceAll("\\$$", "") + schemaRegistryClient.register( + s"test-key", + stringSchema + ) + schemaRegistryClient.register( + aRecordName, + new AvroSchema(ARecord.SCHEMA$) + ) + schemaRegistryClient.register( + bRecordName, + new AvroSchema(BRecord.SCHEMA$) + ) + + /** + * Map a flinkrunner ADT instance into a key/value pair to serialize into + * kafka + * @param element + * an instance of the flinkrunner ADT + * @return + * (Option[AnyRef], AnyRef) + */ + override def toKeyValue(element: E): (Option[String], SpecificRecord) = + element match { + case a: AWrapper => + (Some(a.$id), a.value) + case b: BWrapper => + (Some(b.$id), b.value) + } +} diff --git a/src/test/scala/io/epiphanous/flinkrunner/serde/MyADTConfluentAvroSerializationSchema.scala b/src/test/scala/io/epiphanous/flinkrunner/serde/MyADTConfluentAvroSerializationSchema.scala deleted file mode 100644 index 6f0446d..0000000 --- a/src/test/scala/io/epiphanous/flinkrunner/serde/MyADTConfluentAvroSerializationSchema.scala +++ /dev/null @@ -1,8 +0,0 @@ -package io.epiphanous.flinkrunner.serde - -import io.epiphanous.flinkrunner.model.{FlinkConfig, MyADT} - -class MyADTConfluentAvroSerializationSchema( - name: String, - config: FlinkConfig[MyADT]) -// extends ConfluentAvroSerializationSchema {}
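The patch adds a test-side subclass only for the serialization schema. For illustration, a deserialization counterpart might look like the sketch below; it is not part of this patch, and the class name, constructor, and type-parameter shape are assumed to mirror the serializer helper above.

package io.epiphanous.flinkrunner.serde

import io.confluent.kafka.schemaregistry.client.{
  MockSchemaRegistryClient,
  SchemaRegistryClient
}
import io.epiphanous.flinkrunner.model._

// hypothetical test helper (not in this patch), mirroring the serializer
class MyADTConfluentAvroRegistryKafkaRecordDeserializationSchema[
    E <: MyAvroADT](name: String, config: FlinkConfig[MyAvroADT])
    extends ConfluentAvroRegistryKafkaRecordDeserializationSchema[
      E,
      MyAvroADT](name, config) {

  // a mock registry keeps the test self-contained; the relevant schemas
  // would be registered here, as in the serializer helper
  override val schemaRegistryClient: SchemaRegistryClient =
    new MockSchemaRegistryClient()

  // rewrap the deserialized avro record in the flinkrunner ADT
  override def fromKeyValue(key: Option[AnyRef], value: AnyRef): E =
    (value match {
      case a: ARecord => AWrapper(a)
      case b: BRecord => BWrapper(b)
    }).asInstanceOf[E]
}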