allow multiple events to be generated from a single kafka key/value record

nextdude committed Dec 4, 2021
1 parent 977b831 commit 988a283
Showing 1 changed file with 18 additions and 11 deletions.
@@ -9,6 +9,7 @@ import io.epiphanous.flinkrunner.model.{
   FlinkEvent,
   KafkaSourceConfig
 }
+import org.apache.avro.specific.SpecificRecord
 import org.apache.flink.api.common.typeinfo.{TypeHint, TypeInformation}
 import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema
 import org.apache.flink.util.Collector
@@ -19,13 +20,19 @@ import java.util
 /**
  * A deserialization schema that uses the provided confluent avro schema
  * registry client and `fromKV` partial function to deserialize a kafka
- * key/value pair into an instance of a flink runner ADT.
+ * key/value pair into instances of a flink runner ADT.
  *
  * In order to decouple the shape of the flink runner ADT types from the
  * types that are serialized in kafka, a user of this class must provide a
  * `fromKV` partial function that maps from the specific key and value pair
- * (key is optional) deserialized from the kafka source into an instance of
- * the flink runner ADT.
+ * (key is optional) deserialized from the kafka source into a sequence of
+ * instances of the flink runner ADT. This means a single kafka record
+ * could generate multiple events.
+ *
+ * Note that the number and size of the produced events should be
+ * relatively small. Depending on the source implementation, records can be
+ * buffered in memory, or collecting records might delay emitting the next
+ * checkpoint barrier.
  *
  * Usually, `fromKV` is as simple as providing a set of cases. Consider the
  * following example, where `A` and `B` are subclasses of the flink runner
@@ -34,11 +41,12 @@ import java.util
  * ignore the key and have defined our ADT types to be wrappers around the
  * deserialized records. However, you can use the deserialized key and
  * value in any way that makes sense for your application.
+ *
  * {{{
  *   {
  *     // (key,value) => ADT
- *     case (_, a:ASpecific) => A(a)
- *     case (_, b:BSpecific) => B(b)
+ *     case (_, a:ASpecific) => Seq(A(a))
+ *     case (_, b:BSpecific) => Seq(B(b))
  *   }
  * }}}
  * @param sourceName
@@ -48,18 +56,17 @@ import java.util
  * @param schemaRegistryClient
  *   the schema registry client
  * @param fromKV
- *   a partial function that should return a flink runner adt instance when
- *   passed a deserialized kafka key/value pair
+ *   a partial function that should return a sequence of zero or more flink
+ *   runner adt instances when passed a deserialized kafka key/value pair
  * @tparam ADT
  *   the flink runner ADT type
  */
 class ConfluentAvroRegistryKafkaRecordDeserializationSchema[
-    ADT <: FlinkEvent
-](
+    ADT <: FlinkEvent](
     sourceName: String,
     config: FlinkConfig[ADT],
     schemaRegistryClient: SchemaRegistryClient,
-    fromKV: PartialFunction[(Option[AnyRef], AnyRef), ADT]
+    fromKV: PartialFunction[(Option[AnyRef], AnyRef), Seq[ADT]]
 ) extends KafkaRecordDeserializationSchema[ADT]
     with LazyLogging {

@@ -95,7 +102,7 @@ class ConfluentAvroRegistryKafkaRecordDeserializationSchema[
     val key =
       keyDeserializer.map(ds => ds.deserialize(topic, record.key()))
     val value = valueDeserializer.deserialize(topic, record.value())
-    if (Option(value).nonEmpty) out.collect(fromKV(key, value))
+    if (Option(value).nonEmpty) fromKV(key, value).map(out.collect)
   }

   override def getProducedType: TypeInformation[ADT] =
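To make the new contract concrete, below is a minimal, self-contained sketch of a `fromKV` that returns a sequence of events. Every type name in it (`ASpecific`, `BatchSpecific`, `MyEvent`, and friends) is a hypothetical stand-in invented for illustration, not part of this commit or of flinkrunner; in the real class the event type must also extend flinkrunner's `FlinkEvent`, a detail elided here.

// A self-contained sketch of the fromKV contract after this commit.
// All type names are hypothetical stand-ins, not flinkrunner API.
object FromKVSketch {

  // stand-ins for avro SpecificRecord payloads deserialized from kafka
  final case class ASpecific(s: String)
  final case class BatchSpecific(items: List[String])

  // stand-in for a flink runner ADT
  sealed trait MyEvent
  final case class AEvent(a: ASpecific) extends MyEvent
  final case class ItemEvent(s: String) extends MyEvent

  // one kafka key/value pair may now yield zero, one, or many events
  val fromKV: PartialFunction[(Option[AnyRef], AnyRef), Seq[MyEvent]] = {
    case (_, batch: BatchSpecific) => batch.items.map(ItemEvent) // fan out
    case (_, a: ASpecific)         => Seq(AEvent(a))             // one-to-one
  }

  def main(args: Array[String]): Unit = {
    println(fromKV((None, BatchSpecific(List("x", "y"))))) // two events
    println(fromKV((None, ASpecific("z"))))                // one event
  }
}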

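The changed line in deserialize hands each event in the returned sequence to Flink's Collector. Here is a sketch of that behavior, reusing the stand-in types above; only org.apache.flink.util.Collector is real Flink API here, and the buffering collector is purely a demo device.

import org.apache.flink.util.Collector
import scala.collection.mutable.ListBuffer
import FromKVSketch._ // the hypothetical stand-ins from the sketch above

object CollectorSketch {
  def main(args: Array[String]): Unit = {
    // demo only: buffer collected events so we can observe the fan-out
    val buffer = new ListBuffer[MyEvent]
    val out = new Collector[MyEvent] {
      override def collect(e: MyEvent): Unit = buffer += e
      override def close(): Unit = ()
    }

    // mirrors the new deserialize body: skip null values, then emit
    // every event produced from a single kafka key/value record
    val (key, value) = (None, BatchSpecific(List("x", "y")))
    if (Option(value).nonEmpty) fromKV((key, value)).foreach(out.collect)

    assert(buffer.toList == List(ItemEvent("x"), ItemEvent("y")))
  }
}

The sketch uses foreach where the commit uses .map(out.collect); both traverse the sequence and collect each event, foreach merely avoids building a result list that is immediately discarded.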