refactor avro registry serdes
nextdude committed Dec 3, 2021
1 parent 3ffd924 commit 5f24dd2
Showing 13 changed files with 441 additions and 163 deletions.
3 changes: 2 additions & 1 deletion src/main/resources/flink-runner.conf
@@ -28,4 +28,5 @@ state {
backend = rocksdb
}
max.lateness = 5m
jobs = {}
watermark.strategy = "bounded lateness"
jobs = {}
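
The default config now declares a watermark.strategy ("bounded lateness") alongside max.lateness. As a rough illustration of what such a setting usually maps to (the wiring below is an assumption, not code from this commit), a bounded-lateness strategy corresponds to Flink's bounded out-of-orderness watermarks with max.lateness as the bound:

import java.time.Duration
import org.apache.flink.api.common.eventtime.WatermarkStrategy

// Illustrative sketch only: turn a "bounded lateness" setting plus
// max.lateness into a Flink watermark strategy. The actual flinkrunner
// wiring is not shown in this diff.
def boundedLatenessWatermarks[E](maxLateness: Duration): WatermarkStrategy[E] =
  WatermarkStrategy
    .forBoundedOutOfOrderness[E](maxLateness) // watermarks trail the largest seen timestamp by maxLateness
    .withIdleness(Duration.ofMinutes(1))      // optional: keep watermarks advancing on idle partitions

// e.g. boundedLatenessWatermarks[String](Duration.ofMinutes(5)) for max.lateness = 5m
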
@@ -180,6 +180,10 @@ class FlinkConfig[ADT <: FlinkEvent](
def getBucketAssigner[E <: ADT](name: String) =
factory.getBucketAssigner[E](name, this)

@deprecated(
"Use the ConfluentAvroRegistryKafkaRecordSerialization and ...Deserialization classes instead",
"4.0.0"
)
def getAvroCoder(name: String) =
factory.getAvroCoder(name, this)

@@ -1,7 +1,9 @@
package io.epiphanous.flinkrunner.model

import com.google.common.collect.Maps
import io.epiphanous.flinkrunner.model.FlinkConnectorName._

import java.util
import java.util.Properties

sealed trait SinkConfig {
@@ -12,6 +14,9 @@ sealed trait SinkConfig {
def label: String = s"$connector/$name"

def properties: Properties

def propertiesMap: util.HashMap[String, String] =
Maps.newHashMap(Maps.fromProperties(properties))
}

object SinkConfig {
@@ -1,7 +1,9 @@
package io.epiphanous.flinkrunner.model

import com.google.common.collect.Maps
import io.epiphanous.flinkrunner.model.FlinkConnectorName._

import java.util
import java.util.Properties
import scala.concurrent.duration.DurationInt
import scala.util.Try
@@ -18,6 +20,9 @@ sealed trait SourceConfig {
def maxAllowedLateness: Long

def properties: Properties

def propertiesMap: util.HashMap[String, String] =
Maps.newHashMap(Maps.fromProperties(properties))
}

object SourceConfig {
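
Both SinkConfig and SourceConfig gain the same propertiesMap helper. A standalone sketch of the conversion it performs (the property value below is made up): Guava's Maps.fromProperties yields an ImmutableMap[String, String], and Maps.newHashMap copies that into the mutable java.util.HashMap the Confluent serializer and deserializer constructors accept as their config map.

import java.util.Properties
import com.google.common.collect.Maps

val props = new Properties()
props.setProperty("schema.registry.url", "http://localhost:8081") // example value only

// equivalent of propertiesMap: Properties -> ImmutableMap -> mutable HashMap
val asMap: java.util.HashMap[String, String] =
  Maps.newHashMap(Maps.fromProperties(props))
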
@@ -1,27 +1,20 @@
package io.epiphanous.flinkrunner.serde

import com.typesafe.scalalogging.LazyLogging
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient
import io.confluent.kafka.serializers.{
KafkaAvroDeserializer,
KafkaAvroDeserializerConfig
}
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient
import io.confluent.kafka.serializers.KafkaAvroDeserializer
import io.epiphanous.flinkrunner.model.{
FlinkConfig,
FlinkConnectorName,
FlinkEvent,
KafkaSourceConfig
}
import io.epiphanous.flinkrunner.serde.ConfluentAvroRegistryKafkaRecordDeserializationSchema.DEFAULT_CACHE_CAPACITY
import org.apache.avro.specific.SpecificRecord
import org.apache.flink.api.common.typeinfo.{TypeHint, TypeInformation}
import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema
import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroDeserializationSchema
import org.apache.flink.util.Collector
import org.apache.kafka.clients.consumer.ConsumerRecord

import java.util
import scala.collection.JavaConverters.mapAsJavaMapConverter
import scala.collection.mutable

/**
* A schema to deserialize bytes from kafka into an ADT event using a
@@ -45,68 +38,64 @@ abstract class ConfluentAvroRegistryKafkaRecordDeserializationSchema[
) extends KafkaRecordDeserializationSchema[E]
with LazyLogging {

val sourceConfig: KafkaSourceConfig =
config.getSourceConfig(sourceName).asInstanceOf[KafkaSourceConfig]

val topic: String = sourceConfig.topic
val sourceConfig: KafkaSourceConfig = {
val sc = config.getSourceConfig(sourceName)
if (sc.connector != FlinkConnectorName.Kafka)
throw new RuntimeException(
s"Requested source $sourceName is not a kafka source"
)
sc.asInstanceOf[KafkaSourceConfig]
}

val url: String =
sourceConfig.properties.getProperty("schema.registry.url")
val cacheCapacity: Int = sourceConfig.properties
.getProperty("schema.registry.cache.capacity", DEFAULT_CACHE_CAPACITY)
.toInt
val useSpecificAvroReader: Boolean = sourceConfig.properties
.getProperty("use.specific.avro.reader", "true")
.toBoolean
val useLogicalTypes: Boolean = sourceConfig.properties
.getProperty("use.logical.type.converters", "true")
.toBoolean
val schemaRegistryProps: util.HashMap[String, String] =
sourceConfig.propertiesMap

/** create deserializer config */
val deserializerConfig: util.Map[String, Boolean] = Map(
KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG -> useSpecificAvroReader,
KafkaAvroDeserializerConfig.AVRO_USE_LOGICAL_TYPE_CONVERTERS_CONFIG -> useLogicalTypes
).asJava
val topic: String = sourceConfig.topic

/** our schema registry client */
val schemaRegistryClient =
new CachedSchemaRegistryClient(url, cacheCapacity)
/**
* Implementing subclasses must provide an instance of a schema registry
* client to use, for instance a <code>CachedSchemaRegistryClient</code>
* or a <code>MockSchemaRegistryClient</code> for testing.
*/
def schemaRegistryClient: SchemaRegistryClient

/** map to store the value, and optionally, key deserializers */
val deserializers: mutable.Map[String, KafkaAvroDeserializer] =
mutable.Map(
"value" -> new KafkaAvroDeserializer(
schemaRegistryClient,
deserializerConfig
)
)
val valueDeserializer = new KafkaAvroDeserializer(
schemaRegistryClient,
schemaRegistryProps
)

/** add the key deserializer if needed */
if (sourceConfig.isKeyed) {
val keyDeserializer = new KafkaAvroDeserializer(schemaRegistryClient)
keyDeserializer.configure(deserializerConfig, true)
deserializers += ("key" -> keyDeserializer)
}
val keyDeserializer: Option[KafkaAvroDeserializer] =
if (sourceConfig.isKeyed) {
val ks = new KafkaAvroDeserializer(schemaRegistryClient)
ks.configure(schemaRegistryProps, true)
Some(ks)
} else None

/**
* Convert a kafka consumer record instance into an instance of our
* produced event type. Must be defined by implementing classes.
* @param record
* a kafka consumer record
* Convert a deserialized key/value pair of objects into an instance of
* the flink runner ADT. This method must be implemented by subclasses.
*
* The key and value are passed as AnyRefs, so implementing subclasses
* will need to pattern match.
*
* @param key
* an optional deserialized key object
* @param value
* a deserialized value object
* @return
* an instance of the flink runner ADT
*/
def fromConsumerRecord(
record: ConsumerRecord[Array[Byte], Array[Byte]]): E
def fromKeyValue(key: Option[AnyRef], value: AnyRef): E

override def deserialize(
record: ConsumerRecord[Array[Byte], Array[Byte]],
out: Collector[E]): Unit = fromConsumerRecord(record)
out: Collector[E]): Unit = {
val key =
keyDeserializer.map(ds => ds.deserialize(topic, record.key()))
val value = valueDeserializer.deserialize(topic, record.value())
if (Option(value).nonEmpty) out.collect(fromKeyValue(key, value))
}

override def getProducedType: TypeInformation[E] =
TypeInformation.of(new TypeHint[E] {})
}

object ConfluentAvroRegistryKafkaRecordDeserializationSchema {
val DEFAULT_CACHE_CAPACITY = "1000"
}
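
With the registry client and the record mapping now abstract, a concrete deserialization schema boils down to two members. Below is a rough sketch of what a subclass might look like; ExampleEvent, ExampleADT, ExampleRecord, the source name, and the constructor shape are assumptions for illustration, not types from this commit.

import io.confluent.kafka.schemaregistry.client.{
  CachedSchemaRegistryClient,
  SchemaRegistryClient
}
import io.epiphanous.flinkrunner.model.FlinkConfig
import io.epiphanous.flinkrunner.serde.ConfluentAvroRegistryKafkaRecordDeserializationSchema

// ExampleEvent/ExampleADT/ExampleRecord are hypothetical stand-ins for a job's ADT
class ExampleDeserializationSchema(config: FlinkConfig[ExampleADT])
    extends ConfluentAvroRegistryKafkaRecordDeserializationSchema[
      ExampleEvent,
      ExampleADT
    ]("example-kafka-source", config) {

  // lazy so the client exists when the parent class builds its deserializers;
  // tests could plug in a MockSchemaRegistryClient here instead
  override lazy val schemaRegistryClient: SchemaRegistryClient =
    new CachedSchemaRegistryClient(
      sourceConfig.properties.getProperty("schema.registry.url"),
      1000
    )

  // pattern match the deserialized avro value (and optional key) back into the ADT
  override def fromKeyValue(key: Option[AnyRef], value: AnyRef): ExampleEvent =
    value match {
      case record: ExampleRecord => ExampleEvent(record)
      case other =>
        throw new RuntimeException(s"unexpected value type: ${other.getClass}")
    }
}
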
@@ -1,25 +1,20 @@
package io.epiphanous.flinkrunner.serde

import com.typesafe.scalalogging.LazyLogging
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient
import io.confluent.kafka.serializers.{
KafkaAvroSerializer,
KafkaAvroSerializerConfig
}
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient
import io.confluent.kafka.serializers.KafkaAvroSerializer
import io.epiphanous.flinkrunner.model.{
FlinkConfig,
FlinkConnectorName,
FlinkEvent,
KafkaSinkConfig
}
import io.epiphanous.flinkrunner.serde.ConfluentAvroRegistryKafkaRecordSerializationSchema.DEFAULT_CACHE_CAPACITY
import org.apache.avro.specific.SpecificRecord
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema
import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroSerializationSchema
import org.apache.kafka.clients.producer.ProducerRecord

import java.{lang, util}
import scala.collection.JavaConverters.mapAsJavaMapConverter
import scala.collection.mutable

/**
* A schema to serialize an ADT event using a confluent avro schema
@@ -44,66 +39,87 @@ abstract class ConfluentAvroRegistryKafkaRecordSerializationSchema[
) extends KafkaRecordSerializationSchema[E]
with LazyLogging {

val sinkConfig: KafkaSinkConfig =
config.getSourceConfig(sinkName).asInstanceOf[KafkaSinkConfig]

val url: String =
sinkConfig.properties.getProperty("schema.registry.url")
val cacheCapacity: Int = sinkConfig.properties
.getProperty("schema.registry.cache.capacity", DEFAULT_CACHE_CAPACITY)
.toInt
val removeJavaProps: Boolean = sinkConfig.properties
.getProperty("serializer.remove.java.props", "true")
.toBoolean
val useLogicalTypes: Boolean = sinkConfig.properties
.getProperty("serializer.use.logical.type.converters", "true")
.toBoolean

/** create serializer config */
val serializerConfig: util.Map[String, Boolean] = Map(
KafkaAvroSerializerConfig.AVRO_REMOVE_JAVA_PROPS_CONFIG -> removeJavaProps,
KafkaAvroSerializerConfig.AVRO_USE_LOGICAL_TYPE_CONVERTERS_CONFIG -> useLogicalTypes
).asJava
val sinkConfig: KafkaSinkConfig = {
val sc = config.getSinkConfig(sinkName)
if (sc.connector != FlinkConnectorName.Kafka)
throw new RuntimeException(
s"Requested sink $sinkName is not a kafka sink"
)
sc.asInstanceOf[KafkaSinkConfig]
}

/** our schema registry client */
val schemaRegistryClient =
new CachedSchemaRegistryClient(url, cacheCapacity)
val schemaRegistryProps: util.HashMap[String, String] =
sinkConfig.propertiesMap

/** map to store the value, and optionally, key serializers */
val serializers: mutable.Map[String, KafkaAvroSerializer] =
mutable.Map(
"value" -> new KafkaAvroSerializer(
schemaRegistryClient,
serializerConfig
)
)
val valueSerializer = new KafkaAvroSerializer(
schemaRegistryClient,
schemaRegistryProps
)

/** add the key serializer if needed */
if (sinkConfig.isKeyed) {
val keySerializer = new KafkaAvroSerializer(schemaRegistryClient)
keySerializer.configure(serializerConfig, true)
serializers += ("key" -> keySerializer)
}
val keySerializer: Option[KafkaAvroSerializer] =
if (sinkConfig.isKeyed) {
val ks = new KafkaAvroSerializer(schemaRegistryClient)
ks.configure(schemaRegistryProps, true)
Some(ks)
} else None

val topic: String = sinkConfig.topic

/**
* A helper method to serialize an arbitrary key/value pair. This should
* be used by subclasses that implement the [[toKeyValue()]] method.
*
* @param key
* the key
* @param value
* the value
* @tparam K
* the type of key
* @tparam V
* the type of value
* @return
* a tuple of byte arrays (with the key optional)
*/
// def kvSerialize[K, V](key: K, value: V): (Array[Byte], Array[Byte]) = {
// (
// keySerializer.map(s => s.serialize(topic, key)).orNull,
// valueSerializer.serialize(topic, value)
// )
// }

/**
* Convert an element into a producer record of byte arrays. Must be
* defined by implementing classes.
* Implementing subclasses must provide an instance of a schema registry
* client to use, for instance a <code>CachedSchemaRegistryClient</code>
* or a <code>MockSchemaRegistryClient</code> for testing.
*/
def schemaRegistryClient: SchemaRegistryClient

/**
* Convert a flink runner ADT instance into a key/value pair of objects
* to serialize into a kafka message. This must be defined by
* implementing subclasses.
*
* The purpose of this method is to decouple the structure of the flink
* runner ADT from the avro schemas of the underlying kafka messages.
*
* @param element
* an instance of the flinkrunner ADT
* @return
* ProducerRecord of bytes
* (Option[AnyRef], AnyRef)
*/
def toProducerRecord(
element: E): ProducerRecord[Array[Byte], Array[Byte]]
def toKeyValue(element: E): (Option[AnyRef], AnyRef)

override def serialize(
element: E,
context: KafkaRecordSerializationSchema.KafkaSinkContext,
timestamp: lang.Long): ProducerRecord[Array[Byte], Array[Byte]] =
toProducerRecord(element)

}
timestamp: lang.Long): ProducerRecord[Array[Byte], Array[Byte]] = {
val (k, v) = toKeyValue(element)
val key =
keySerializer.flatMap(ks => k.map(kk => ks.serialize(topic, kk)))
val value = valueSerializer.serialize(topic, v)
new ProducerRecord(topic, null, element.$timestamp, key.orNull, value)
}

object ConfluentAvroRegistryKafkaRecordSerializationSchema {
val DEFAULT_CACHE_CAPACITY = "1000"
}
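
The serialization side mirrors this: subclasses supply the registry client and map each ADT instance to an optional key plus an avro value. A sketch under the same assumptions as the deserializer example above (hypothetical names, assumed constructor shape; id and toRecord are invented members).

import io.confluent.kafka.schemaregistry.client.{
  CachedSchemaRegistryClient,
  SchemaRegistryClient
}
import io.epiphanous.flinkrunner.model.FlinkConfig
import io.epiphanous.flinkrunner.serde.ConfluentAvroRegistryKafkaRecordSerializationSchema

// ExampleEvent/ExampleADT are the same hypothetical stand-ins used above
class ExampleSerializationSchema(config: FlinkConfig[ExampleADT])
    extends ConfluentAvroRegistryKafkaRecordSerializationSchema[
      ExampleEvent,
      ExampleADT
    ]("example-kafka-sink", config) {

  // lazy for the same initialization-order reason as on the deserializer side
  override lazy val schemaRegistryClient: SchemaRegistryClient =
    new CachedSchemaRegistryClient(
      sinkConfig.properties.getProperty("schema.registry.url"),
      1000
    )

  // decouple the ADT from the avro wire format: the optional key and the
  // value record handed to the confluent serializers
  override def toKeyValue(element: ExampleEvent): (Option[AnyRef], AnyRef) =
    (Some(element.id), element.toRecord)
}
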
28 changes: 0 additions & 28 deletions src/test/scala/io/epiphanous/flinkrunner/model/MyADT.scala

This file was deleted.
