
Commit 2728eb8
Merge 26281e8 into 465faa6
nextdude committed Oct 14, 2022
2 parents 465faa6 + 26281e8 commit 2728eb8
Showing 10 changed files with 168 additions and 119 deletions.
@@ -2,9 +2,8 @@ package io.epiphanous.flinkrunner.model

import java.time.{Duration, Instant}
import java.util.concurrent.atomic.AtomicLong
import java.util.stream.IntStream
import java.util.{Properties, Random}
import collection.JavaConverters._
import scala.collection.JavaConverters._

/** Configuration for a data generator.
* @param rowsPerSecond
@@ -1,35 +1,27 @@
package io.epiphanous.flinkrunner.model

import com.typesafe.config.ConfigObject
import io.confluent.kafka.schemaregistry.client.{
CachedSchemaRegistryClient,
SchemaRegistryClient
}
import io.epiphanous.flinkrunner.util.ConfigToProps.RichConfigObject
import io.epiphanous.flinkrunner.util.StreamUtils.RichProps

import java.util
import scala.util.Try

case class SchemaRegistryConfig(
isDeserializing: Boolean = false,
url: String = "http://schema-registry:8082",
cacheCapacity: Int = 1000,
headers: util.HashMap[String, String] = new util.HashMap(),
props: util.HashMap[String, String] = new util.HashMap()) {
val isSerializing: Boolean = !isDeserializing
props.put("schema.registry.url", url)
props.put("specific.avro.reader", "false") // don't make this true!
props.putIfAbsent("use.logical.type.converters", "true")
props.putIfAbsent("specific.avro.reader", "true")
def getClient: SchemaRegistryClient = {
new CachedSchemaRegistryClient(
url,
cacheCapacity,
props,
headers
)
}
}
object SchemaRegistryConfig {
def apply(configOpt: Option[ConfigObject]): SchemaRegistryConfig =
def apply(
isDeserializing: Boolean,
configOpt: Option[ConfigObject]): SchemaRegistryConfig =
configOpt
.map { o =>
val c = o.toConfig
@@ -41,11 +33,12 @@ object SchemaRegistryConfig {
val props =
Try(c.getObject("props")).toOption.asProperties.asJavaMap
SchemaRegistryConfig(
isDeserializing = isDeserializing,
url = url,
cacheCapacity = cacheCapacity,
headers = headers,
props = props
)
}
.getOrElse(SchemaRegistryConfig())
.getOrElse(SchemaRegistryConfig(isDeserializing))
}
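
The new two-argument apply threads the serialization direction from the calling sink or source config into SchemaRegistryConfig. For orientation, a minimal sketch of constructing one directly (values hypothetical; in practice they come from the job's config under schema.registry):

  // a minimal sketch; the url below is just the case class default
  val registryConfig = SchemaRegistryConfig(
    isDeserializing = true,
    url = "http://schema-registry:8082",
    cacheCapacity = 500
  )
  val client = registryConfig.getClient // a CachedSchemaRegistryClient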
@@ -7,8 +7,6 @@ import io.epiphanous.flinkrunner.serde.{
JsonKafkaRecordSerializationSchema
}
import io.epiphanous.flinkrunner.util.ConfigToProps
import io.epiphanous.flinkrunner.util.ConfigToProps._
import io.epiphanous.flinkrunner.util.StreamUtils.RichProps
import org.apache.avro.generic.GenericRecord
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.connector.base.DeliveryGuarantee
@@ -19,8 +17,8 @@ import org.apache.flink.connector.kafka.sink.{
import org.apache.flink.streaming.api.datastream.DataStreamSink
import org.apache.flink.streaming.api.scala.DataStream

import java.time.Duration
import java.util.Properties
import scala.util.Try

/** Kafka sink config.
*
@@ -56,11 +54,11 @@ case class KafkaSinkConfig[ADT <: FlinkEvent: TypeInformation](
def deliveryGuarantee: DeliveryGuarantee = config
.getStringOpt(pfx("delivery.guarantee"))
.map(s => s.toLowerCase.replaceAll("[^a-z]+", "-")) match {
case Some("at-least-once") =>
DeliveryGuarantee.AT_LEAST_ONCE
case Some("none") =>
case Some("exactly-once") =>
DeliveryGuarantee.EXACTLY_ONCE
case Some("none") =>
DeliveryGuarantee.NONE
case _ => DeliveryGuarantee.AT_LEAST_ONCE
}
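
The lowercase-and-hyphenate step above means underscores, spaces, or mixed case in the configured delivery.guarantee value all resolve to the same case branch. A self-contained sketch of that normalization:

  // the same normalization the match above applies to the config value
  def normalize(s: String): String = s.toLowerCase.replaceAll("[^a-z]+", "-")
  assert(normalize("EXACTLY_ONCE") == "exactly-once")
  assert(normalize("at least once") == "at-least-once")
  assert(normalize("None") == "none")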

/** ensure transaction.timeout.ms is set */
@@ -71,10 +69,24 @@ case class KafkaSinkConfig[ADT <: FlinkEvent: TypeInformation](
}

val schemaRegistryConfig: SchemaRegistryConfig = SchemaRegistryConfig(
isDeserializing = false,
config
.getObjectOption(pfx("schema.registry"))
)

val cacheConcurrencyLevel: Int =
config.getIntOpt(pfx("cache.concurrency.level")).getOrElse(4)

val cacheMaxSize: Long =
config.getLongOpt(pfx("cache.max.size")).getOrElse(10000L)

val cacheExpireAfter: Duration = config
.getDurationOpt(pfx("cache.expire.after"))
.getOrElse(Duration.ofHours(1))

val cacheRecordStats: Boolean =
config.getBooleanOpt(pfx("cache.record.stats")).getOrElse(true)
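
These four new cache.* settings map naturally onto a Guava CacheBuilder. The consuming code is not shown in this diff, so the wiring below is an assumption about intent, not the commit's implementation:

  import com.google.common.cache.{Cache, CacheBuilder}

  // hypothetical wiring of the cache.* settings above onto a Guava cache
  val builder = CacheBuilder
    .newBuilder()
    .concurrencyLevel(cacheConcurrencyLevel)
    .maximumSize(cacheMaxSize)
    .expireAfterWrite(cacheExpireAfter) // java.time.Duration overload
  val cache: Cache[String, AnyRef] =
    (if (cacheRecordStats) builder.recordStats() else builder)
      .build[String, AnyRef]()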

/** Return a confluent avro serialization schema */
def getAvroSerializationSchema[
E <: ADT with EmbeddedAvroRecord[A],
@@ -7,7 +7,6 @@ import io.epiphanous.flinkrunner.serde.{
}
import io.epiphanous.flinkrunner.util.ConfigToProps
import io.epiphanous.flinkrunner.util.ConfigToProps._
import io.epiphanous.flinkrunner.util.StreamUtils.RichProps
import org.apache.avro.generic.GenericRecord
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.connector.source.{Source, SourceSplit}
@@ -18,7 +17,6 @@ import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.kafka.clients.consumer.OffsetResetStrategy

import java.util.Properties
import scala.util.Try

/** A source config for using kafka as a source for a flink job. For
* example, the following config can be used to read from a topic in kafka
@@ -127,10 +125,13 @@ case class KafkaSourceConfig[ADT <: FlinkEvent](
.getOrElse(s"${config.jobName}.$name")

val schemaRegistryConfig: SchemaRegistryConfig = SchemaRegistryConfig(
isDeserializing = true,
config
.getObjectOption(pfx("schema.registry"))
)

val schemaOpt: Option[String] = config.getStringOpt(pfx("avro.schema"))

/** Returns a confluent avro registry aware deserialization schema for
* kafka.
*
@@ -150,7 +151,8 @@ case class KafkaSourceConfig[ADT <: FlinkEvent](
fromKV: EmbeddedAvroRecordInfo[A] => E)
: KafkaRecordDeserializationSchema[E] = {
new ConfluentAvroRegistryKafkaRecordDeserializationSchema[E, A, ADT](
this
this,
schemaOpt
)
}

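
The new avro.schema option exists for the generic-record case: when A is GenericRecord there is no compiled class to reflect a schema from, so the deserializer (next file) requires an explicit schema string. A hedged sketch of supplying one (schema content hypothetical):

  // hypothetical schema string for a generic-record source
  val schemaOpt: Option[String] = Some(
    """{"type":"record","name":"MyEvent","fields":[{"name":"id","type":"string"}]}"""
  )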
@@ -1,20 +1,22 @@
package io.epiphanous.flinkrunner.serde

import com.typesafe.scalalogging.LazyLogging
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient
import io.confluent.kafka.serializers.KafkaAvroDeserializer
import io.epiphanous.flinkrunner.model.source.KafkaSourceConfig
import io.epiphanous.flinkrunner.model.{
EmbeddedAvroRecord,
EmbeddedAvroRecordInfo,
FlinkEvent,
SchemaRegistryConfig
FlinkEvent
}
import io.epiphanous.flinkrunner.util.AvroUtils.{
isSpecific,
schemaOf,
toEmbeddedAvroInstance
}
import io.epiphanous.flinkrunner.util.AvroUtils.toEmbeddedAvroInstance
import org.apache.avro.generic.GenericRecord
import org.apache.flink.api.common.serialization.DeserializationSchema
import org.apache.flink.api.common.typeinfo.{TypeHint, TypeInformation}
import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema
import org.apache.flink.formats.avro.RegistryAvroDeserializationSchema
import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroDeserializationSchema
import org.apache.flink.util.Collector
import org.apache.kafka.clients.consumer.ConsumerRecord

@@ -26,63 +28,57 @@ import scala.collection.JavaConverters._
* that also implements the EmbeddedAvroRecord trait.
* @param sourceConfig
* config for the kafka source
* @param schemaRegistryClientOpt
* an optional schema registry client
* @param schemaOpt
* optional avro schema string, which is required if A is GenericRecord
* @tparam E
* event type being deserialized, with an embedded avro record
* @tparam A
* avro record type embedded within E
* @tparam ADT
* flinkrunner algebraic data type
*/
class ConfluentAvroRegistryKafkaRecordDeserializationSchema[
E <: ADT with EmbeddedAvroRecord[A]: TypeInformation,
A <: GenericRecord: TypeInformation,
ADT <: FlinkEvent
](
sourceConfig: KafkaSourceConfig[ADT],
schemaRegistryClientOpt: Option[SchemaRegistryClient] = None
schemaOpt: Option[String] = None
)(implicit fromKV: EmbeddedAvroRecordInfo[A] => E)
extends KafkaRecordDeserializationSchema[E]
with LazyLogging {

val avroClass: Class[A] = implicitly[TypeInformation[A]].getTypeClass

var valueDeserializer: KafkaAvroDeserializer = _
var keyDeserializer: Option[KafkaAvroDeserializer] = _

override def open(
context: DeserializationSchema.InitializationContext): Unit = {

val schemaRegistryConfig: SchemaRegistryConfig =
sourceConfig.schemaRegistryConfig
require(
isSpecific(avroClass) || schemaOpt.nonEmpty,
s"You must provide an avro record schema in the configuration of source `${sourceConfig.name}`" +
" if you want to deserialize into a generic record type"
)

val schemaRegistryClient: SchemaRegistryClient =
schemaRegistryClientOpt.getOrElse(
schemaRegistryConfig.getClient
)

valueDeserializer = new KafkaAvroDeserializer(
schemaRegistryClient,
schemaRegistryConfig.props
@transient lazy val deserializer
: RegistryAvroDeserializationSchema[GenericRecord] =
ConfluentRegistryAvroDeserializationSchema.forGeneric(
schemaOf(avroClass, schemaOpt),
sourceConfig.schemaRegistryConfig.url,
sourceConfig.schemaRegistryConfig.props
)

keyDeserializer = if (sourceConfig.isKeyed) {
val ks = new KafkaAvroDeserializer(schemaRegistryClient)
ks.configure(schemaRegistryConfig.props, true)
Some(ks)
} else None
}

override def deserialize(
record: ConsumerRecord[Array[Byte], Array[Byte]],
out: Collector[E]): Unit = {
val topic = sourceConfig.topic

val headers = Option(record.headers())
.map(_.asScala.map { h =>
(h.key(), new String(h.value(), StandardCharsets.UTF_8))
}.toMap)
.getOrElse(Map.empty[String, String])
val key =
keyDeserializer.map(ds =>
ds.deserialize(topic, record.key()).toString
)
valueDeserializer
.deserialize(topic, record.value()) match {

val key = Option(record.key()).map(keyBytes =>
new String(keyBytes, StandardCharsets.UTF_8)
)

deserializer.deserialize(record.value()) match {
case a: GenericRecord =>
out.collect(
toEmbeddedAvroInstance[E, A, ADT](a, avroClass, key, headers)
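
One behavioral change worth calling out in this rewrite: record keys are no longer run through a schema-registry-aware KafkaAvroDeserializer; they are decoded as plain UTF-8 strings. A small self-contained illustration (topic, key, and empty value are hypothetical):

  import java.nio.charset.StandardCharsets
  import org.apache.kafka.clients.consumer.ConsumerRecord

  // hypothetical record whose key is a plain string
  val record = new ConsumerRecord[Array[Byte], Array[Byte]](
    "my-topic", 0, 0L,
    "user-123".getBytes(StandardCharsets.UTF_8),
    Array.emptyByteArray
  )
  val key = Option(record.key()).map(new String(_, StandardCharsets.UTF_8))
  // key == Some("user-123")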