Commit

refactor confluent avro serde (#45)
* refactor confluent avro serde

* refine the deserializer implementation

* update tests

* clean up
nextdude committed Oct 14, 2022
1 parent 465faa6 commit e71e44c
Showing 12 changed files with 178 additions and 129 deletions.
14 changes: 7 additions & 7 deletions src/main/scala/io/epiphanous/flinkrunner/flink/StreamJob.scala
@@ -89,11 +89,11 @@ abstract class StreamJob[

/** A specialized connected source that combines a control stream with a
* data stream. The control stream indicates when the data stream should
* be considered active (by the control element's $active method). When
* the control stream indicates the data stream is active, data elements
* are emitted. Otherwise, data elements are ignored. The result is a
* stream of active data elements filtered dynamically by the control
* stream.
* be considered active (by the control element's `\$active` method).
* When the control stream indicates the data stream is active, data
* elements are emitted. Otherwise, data elements are ignored. The
* result is a stream of active data elements filtered dynamically by
* the control stream.
* @param controlName
* name of the configured control stream
* @param dataName
@@ -283,8 +283,8 @@ abstract class StreamJob[
/** A specialized connected avro source that combines an avro control
* stream with an avro data stream. The control stream indicates when
* the data stream should be considered active (by the control element's
* $active method). When the control stream indicates the data stream is
* active, data elements are emitted. Otherwise, data elements are
* `\$active` method). When the control stream indicates the data stream
* is active, data elements are emitted. Otherwise, data elements are
* ignored. The result is a stream of active data elements filtered
* dynamically by the control stream.
* @param controlName
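The connected control/data source documented above can be approximated outside FlinkRunner with Flink's ConnectedStreams API. This is a minimal sketch, not the library's implementation: ControlEvent, DataEvent, and filterByControl are hypothetical names, and a production version would keep the active flag in checkpointed state.

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction
import org.apache.flink.util.Collector

case class ControlEvent(active: Boolean)
case class DataEvent(value: String)

def filterByControl(
    control: DataStream[ControlEvent],
    data: DataStream[DataEvent]): DataStream[DataEvent] =
  control
    .connect(data)
    .flatMap(new CoFlatMapFunction[ControlEvent, DataEvent, DataEvent] {
      // last observed control state; a plain field only for illustration
      private var active = false
      override def flatMap1(c: ControlEvent, out: Collector[DataEvent]): Unit =
        active = c.active
      override def flatMap2(d: DataEvent, out: Collector[DataEvent]): Unit =
        if (active) out.collect(d)
    })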
@@ -2,9 +2,8 @@ package io.epiphanous.flinkrunner.model

import java.time.{Duration, Instant}
import java.util.concurrent.atomic.AtomicLong
import java.util.stream.IntStream
import java.util.{Properties, Random}
import collection.JavaConverters._
import scala.collection.JavaConverters._

/** Configuration for a data generator.
* @param rowsPerSecond
@@ -69,7 +68,7 @@ case class GeneratorConfig(
if (rng.nextDouble() <= probOutOfOrder) -1
else 1
val increment = direction * rng.nextInt(maxTimeStep)
increment
increment.toLong
})

/** Return true if next value generated should be null
@@ -112,7 +111,7 @@ case class GeneratorConfig(
rng
.ints(48, 123)
.filter(i => (i <= 57 || i >= 65) && (i <= 90 || i >= 97))
.limit(maxLength)
.limit(maxLength.toLong)
.iterator()
.asScala
.map(_.toChar)
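The two widening fixes above touch the generator's out-of-order timestamp increments and its random-string generation. As a standalone illustration of the increment logic only (probOutOfOrder and maxTimeStep values here are made up):

import java.util.Random

val rng            = new Random()
val probOutOfOrder = 0.1  // chance the next timestamp steps backwards
val maxTimeStep    = 1000 // maximum step size in milliseconds

def nextIncrement(): Long = {
  val direction = if (rng.nextDouble() <= probOutOfOrder) -1 else 1
  (direction * rng.nextInt(maxTimeStep)).toLong
}

// advance a timestamp cursor, occasionally jumping backwards
val timestamps = Iterator.iterate(0L)(_ + nextIncrement()).take(5).toList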
@@ -1,35 +1,27 @@
package io.epiphanous.flinkrunner.model

import com.typesafe.config.ConfigObject
import io.confluent.kafka.schemaregistry.client.{
CachedSchemaRegistryClient,
SchemaRegistryClient
}
import io.epiphanous.flinkrunner.util.ConfigToProps.RichConfigObject
import io.epiphanous.flinkrunner.util.StreamUtils.RichProps

import java.util
import scala.util.Try

case class SchemaRegistryConfig(
isDeserializing: Boolean = false,
url: String = "http://schema-registry:8082",
cacheCapacity: Int = 1000,
headers: util.HashMap[String, String] = new util.HashMap(),
props: util.HashMap[String, String] = new util.HashMap()) {
val isSerializing: Boolean = !isDeserializing
props.put("schema.registry.url", url)
props.put("specific.avro.reader", "false") // don't make this true!
props.putIfAbsent("use.logical.type.converters", "true")
props.putIfAbsent("specific.avro.reader", "true")
def getClient: SchemaRegistryClient = {
new CachedSchemaRegistryClient(
url,
cacheCapacity,
props,
headers
)
}
}
object SchemaRegistryConfig {
def apply(configOpt: Option[ConfigObject]): SchemaRegistryConfig =
def apply(
isDeserializing: Boolean,
configOpt: Option[ConfigObject]): SchemaRegistryConfig =
configOpt
.map { o =>
val c = o.toConfig
@@ -41,11 +33,12 @@ object SchemaRegistryConfig {
val props =
Try(c.getObject("props")).toOption.asProperties.asJavaMap
SchemaRegistryConfig(
isDeserializing = isDeserializing,
url = url,
cacheCapacity = cacheCapacity,
headers = headers,
props = props
)
}
.getOrElse(SchemaRegistryConfig())
.getOrElse(SchemaRegistryConfig(isDeserializing))
}
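For reference, a hedged usage sketch of the refactored config (values are illustrative): the new isDeserializing flag is supplied by the source or sink config that owns it, and getClient builds a CachedSchemaRegistryClient from the url, cache capacity, props, and headers.

val registryConfig = SchemaRegistryConfig(
  isDeserializing = true,
  url = "http://schema-registry:8082",
  cacheCapacity = 1000
)
// returns a CachedSchemaRegistryClient configured with the props and headers above
val client = registryConfig.getClient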
@@ -7,8 +7,6 @@ import io.epiphanous.flinkrunner.serde.{
JsonKafkaRecordSerializationSchema
}
import io.epiphanous.flinkrunner.util.ConfigToProps
import io.epiphanous.flinkrunner.util.ConfigToProps._
import io.epiphanous.flinkrunner.util.StreamUtils.RichProps
import org.apache.avro.generic.GenericRecord
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.connector.base.DeliveryGuarantee
@@ -19,8 +17,8 @@ import org.apache.flink.connector.kafka.sink.{
import org.apache.flink.streaming.api.datastream.DataStreamSink
import org.apache.flink.streaming.api.scala.DataStream

import java.time.Duration
import java.util.Properties
import scala.util.Try

/** Kafka sink config.
*
@@ -56,11 +54,11 @@ case class KafkaSinkConfig[ADT <: FlinkEvent: TypeInformation](
def deliveryGuarantee: DeliveryGuarantee = config
.getStringOpt(pfx("delivery.guarantee"))
.map(s => s.toLowerCase.replaceAll("[^a-z]+", "-")) match {
case Some("at-least-once") =>
DeliveryGuarantee.AT_LEAST_ONCE
case Some("none") =>
case Some("exactly-once") =>
DeliveryGuarantee.EXACTLY_ONCE
case Some("none") =>
DeliveryGuarantee.NONE
case _ => DeliveryGuarantee.AT_LEAST_ONCE
case _ => DeliveryGuarantee.AT_LEAST_ONCE
}

/** ensure transaction.timeout.ms is set */
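The deliveryGuarantee lookup above lower-cases the configured value and collapses non-letter runs to hyphens before matching, so variants such as EXACTLY_ONCE or "exactly once" resolve to the same case. A standalone sketch of that normalization:

def normalize(s: String): String =
  s.toLowerCase.replaceAll("[^a-z]+", "-")

assert(normalize("EXACTLY_ONCE") == "exactly-once")
assert(normalize("exactly once") == "exactly-once")
assert(normalize("none") == "none")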
@@ -71,10 +69,24 @@ case class KafkaSinkConfig[ADT <: FlinkEvent: TypeInformation](
}

val schemaRegistryConfig: SchemaRegistryConfig = SchemaRegistryConfig(
isDeserializing = false,
config
.getObjectOption(pfx("schema.registry"))
)

val cacheConcurrencyLevel: Int =
config.getIntOpt(pfx("cache.concurrency.level")).getOrElse(4)

val cacheMaxSize: Long =
config.getLongOpt(pfx("cache.max.size")).getOrElse(10000L)

val cacheExpireAfter: Duration = config
.getDurationOpt(pfx("cache.expire.after"))
.getOrElse(Duration.ofHours(1))

val cacheRecordStats: Boolean =
config.getBooleanOpt(pfx("cache.record.stats")).getOrElse(true)

/** Return a confluent avro serialization schema */
def getAvroSerializationSchema[
E <: ADT with EmbeddedAvroRecord[A],
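The new cache.* settings on the sink mirror the knobs of a Guava-style cache. How FlinkRunner wires them up is not shown in this diff; the following is only a hedged sketch of how such settings typically map onto a com.google.common.cache.Cache (the key and value types are placeholders):

import com.google.common.cache.{Cache, CacheBuilder}
import java.time.Duration

def buildCache(
    concurrencyLevel: Int = 4,
    maxSize: Long = 10000L,
    expireAfter: Duration = Duration.ofHours(1),
    recordStats: Boolean = true): Cache[String, Array[Byte]] = {
  val builder = CacheBuilder
    .newBuilder()
    .concurrencyLevel(concurrencyLevel)
    .maximumSize(maxSize)
    .expireAfterWrite(expireAfter)
  // stats collection is optional, matching the cache.record.stats flag above
  (if (recordStats) builder.recordStats() else builder)
    .build[String, Array[Byte]]()
}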
@@ -7,7 +7,6 @@ import io.epiphanous.flinkrunner.serde.{
}
import io.epiphanous.flinkrunner.util.ConfigToProps
import io.epiphanous.flinkrunner.util.ConfigToProps._
import io.epiphanous.flinkrunner.util.StreamUtils.RichProps
import org.apache.avro.generic.GenericRecord
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.connector.source.{Source, SourceSplit}
@@ -18,7 +17,6 @@ import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.kafka.clients.consumer.OffsetResetStrategy

import java.util.Properties
import scala.util.Try

/** A source config for using kafka as a source for a flink job. For
* example, the following config can be used to read from a topic in kafka
@@ -127,10 +125,13 @@ case class KafkaSourceConfig[ADT <: FlinkEvent](
.getOrElse(s"${config.jobName}.$name")

val schemaRegistryConfig: SchemaRegistryConfig = SchemaRegistryConfig(
isDeserializing = false,
config
.getObjectOption(pfx("schema.registry"))
)

val schemaOpt: Option[String] = config.getStringOpt(pfx("avro.schema"))

/** Returns a confluent avro registry aware deserialization schema for
* kafka.
*
@@ -150,7 +151,8 @@ case class KafkaSourceConfig[ADT <: FlinkEvent](
fromKV: EmbeddedAvroRecordInfo[A] => E)
: KafkaRecordDeserializationSchema[E] = {
new ConfluentAvroRegistryKafkaRecordDeserializationSchema[E, A, ADT](
this
this,
schemaOpt
)
}

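The new avro.schema source setting feeds schemaOpt into the deserialization schema; it is only required when the embedded avro type A is a GenericRecord, in which case the schema string is presumably parsed with Avro's standard parser, roughly as follows (the record definition is hypothetical):

import org.apache.avro.Schema

val schemaJson =
  """{"type":"record","name":"MyRecord","fields":[{"name":"id","type":"string"}]}"""
val schema: Schema = new Schema.Parser().parse(schemaJson)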
@@ -36,7 +36,7 @@ import org.apache.flink.configuration.Configuration
* @param identifier
* a function that creates a unique string from the incoming event to
* determine if it exists in the bloom filter (defaults to the event's
* <code>$id</code> member)
* `\$id` member)
* @tparam E
* the event stream type
* @tparam ADT
@@ -1,20 +1,22 @@
package io.epiphanous.flinkrunner.serde

import com.typesafe.scalalogging.LazyLogging
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient
import io.confluent.kafka.serializers.KafkaAvroDeserializer
import io.epiphanous.flinkrunner.model.source.KafkaSourceConfig
import io.epiphanous.flinkrunner.model.{
EmbeddedAvroRecord,
EmbeddedAvroRecordInfo,
FlinkEvent,
SchemaRegistryConfig
FlinkEvent
}
import io.epiphanous.flinkrunner.util.AvroUtils.{
isSpecific,
schemaOf,
toEmbeddedAvroInstance
}
import io.epiphanous.flinkrunner.util.AvroUtils.toEmbeddedAvroInstance
import org.apache.avro.generic.GenericRecord
import org.apache.flink.api.common.serialization.DeserializationSchema
import org.apache.flink.api.common.typeinfo.{TypeHint, TypeInformation}
import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema
import org.apache.flink.formats.avro.RegistryAvroDeserializationSchema
import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroDeserializationSchema
import org.apache.flink.util.Collector
import org.apache.kafka.clients.consumer.ConsumerRecord

@@ -26,63 +28,57 @@ import scala.collection.JavaConverters._
* that also implements the EmbeddedAvroRecord trait.
* @param sourceConfig
* config for the kafka source
* @param schemaRegistryClientOpt
* an optional schema registry client
* @param schemaOpt
* optional avro schema string, which is required if A is GenericRecord
* @tparam E
* event type being deserialized, with an embedded avro record
* @tparam A
* avro record type embedded within E
* @tparam ADT
* flinkrunner algebraic data type
*/
class ConfluentAvroRegistryKafkaRecordDeserializationSchema[
E <: ADT with EmbeddedAvroRecord[A]: TypeInformation,
A <: GenericRecord: TypeInformation,
ADT <: FlinkEvent
](
sourceConfig: KafkaSourceConfig[ADT],
schemaRegistryClientOpt: Option[SchemaRegistryClient] = None
schemaOpt: Option[String] = None
)(implicit fromKV: EmbeddedAvroRecordInfo[A] => E)
extends KafkaRecordDeserializationSchema[E]
with LazyLogging {

val avroClass: Class[A] = implicitly[TypeInformation[A]].getTypeClass

var valueDeserializer: KafkaAvroDeserializer = _
var keyDeserializer: Option[KafkaAvroDeserializer] = _

override def open(
context: DeserializationSchema.InitializationContext): Unit = {

val schemaRegistryConfig: SchemaRegistryConfig =
sourceConfig.schemaRegistryConfig
require(
isSpecific(avroClass) || schemaOpt.nonEmpty,
s"You must provide an avro record schema in the configuration of source `${sourceConfig.name}`" +
" if you want to deserialize into a generic record type"
)

val schemaRegistryClient: SchemaRegistryClient =
schemaRegistryClientOpt.getOrElse(
schemaRegistryConfig.getClient
)

valueDeserializer = new KafkaAvroDeserializer(
schemaRegistryClient,
schemaRegistryConfig.props
@transient lazy val deserializer
: RegistryAvroDeserializationSchema[GenericRecord] =
ConfluentRegistryAvroDeserializationSchema.forGeneric(
schemaOf(avroClass, schemaOpt),
sourceConfig.schemaRegistryConfig.url,
sourceConfig.schemaRegistryConfig.props
)

keyDeserializer = if (sourceConfig.isKeyed) {
val ks = new KafkaAvroDeserializer(schemaRegistryClient)
ks.configure(schemaRegistryConfig.props, true)
Some(ks)
} else None
}

override def deserialize(
record: ConsumerRecord[Array[Byte], Array[Byte]],
out: Collector[E]): Unit = {
val topic = sourceConfig.topic

val headers = Option(record.headers())
.map(_.asScala.map { h =>
(h.key(), new String(h.value(), StandardCharsets.UTF_8))
}.toMap)
.getOrElse(Map.empty[String, String])
val key =
keyDeserializer.map(ds =>
ds.deserialize(topic, record.key()).toString
)
valueDeserializer
.deserialize(topic, record.value()) match {

val key = Option(record.key()).map(keyBytes =>
new String(keyBytes, StandardCharsets.UTF_8)
)

deserializer.deserialize(record.value()) match {
case a: GenericRecord =>
out.collect(
toEmbeddedAvroInstance[E, A, ADT](a, avroClass, key, headers)
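The refactor above drops the hand-rolled KafkaAvroDeserializer wiring in favor of Flink's ConfluentRegistryAvroDeserializationSchema. A standalone sketch of the underlying call, with a placeholder schema and registry URL:

import org.apache.avro.{Schema, SchemaBuilder}
import org.apache.avro.generic.GenericRecord
import org.apache.flink.formats.avro.RegistryAvroDeserializationSchema
import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroDeserializationSchema

import java.util

// placeholder reader schema; the deserializer class derives the real one from
// the avro class or the configured avro.schema string
val schema: Schema = SchemaBuilder
  .record("MyRecord")
  .fields()
  .requiredString("id")
  .endRecord()

val registryProps = new util.HashMap[String, String]()

val deserializer: RegistryAvroDeserializationSchema[GenericRecord] =
  ConfluentRegistryAvroDeserializationSchema.forGeneric(
    schema,
    "http://schema-registry:8082",
    registryProps
  )

// decodes a Confluent-framed payload (magic byte + schema id + avro body)
def decode(bytes: Array[Byte]): GenericRecord = deserializer.deserialize(bytes)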
