diff --git a/src/main/scala/io/epiphanous/flinkrunner/serde/ConfluentAvroRegistryKafkaRecordDeserializationSchema.scala b/src/main/scala/io/epiphanous/flinkrunner/serde/ConfluentAvroRegistryKafkaRecordDeserializationSchema.scala
index 9fe9c95..57c0d5a 100644
--- a/src/main/scala/io/epiphanous/flinkrunner/serde/ConfluentAvroRegistryKafkaRecordDeserializationSchema.scala
+++ b/src/main/scala/io/epiphanous/flinkrunner/serde/ConfluentAvroRegistryKafkaRecordDeserializationSchema.scala
@@ -9,6 +9,7 @@ import io.epiphanous.flinkrunner.model.{
   FlinkEvent,
   KafkaSourceConfig
 }
+import org.apache.avro.specific.SpecificRecord
 import org.apache.flink.api.common.typeinfo.{TypeHint, TypeInformation}
 import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema
 import org.apache.flink.util.Collector
@@ -19,13 +20,19 @@ import java.util
 /**
  * A deserialization schema that uses the provided confluent avro schema
  * registry client and `fromKV` partial function to deserialize a kafka
- * key/value pair into an instance of a flink runner ADT.
+ * key/value pair into instances of a flink runner ADT.
  *
  * In order to decouple the shape of the flink runner ADT types from the
  * types that are serialized in kafka, a user of this class must provide a
  * `fromKV` partial function that maps from the specific key and value pair
- * (key is optional) deserialized from the kafka source into an instance of
- * the flink runner ADT.
+ * (key is optional) deserialized from the kafka source into a sequence of
+ * instances of the flink runner ADT. This means a single kafka record
+ * could generate multiple events.
+ *
+ * Note that the number and size of the produced events should be
+ * relatively small. Depending on the source implementation, records can
+ * be buffered in memory, or collecting records might delay emitting the
+ * next checkpoint barrier.
 *
 * Usually, `fromKV` is as simple as providing a set of cases. Consider the
 * following example, where `A` and `B` are subclasses of the flink runner
@@ -34,11 +41,12 @@ import java.util
 * ignore the key and have defined our ADT types to be wrappers around the
 * deserialized records. However, you can use the deserialized key and
 * value in any way that makes sense for your application.
+ *
 * {{{
 *   {
 *     // (key,value) => ADT
- *     case (_, a:ASpecific) => A(a)
- *     case (_, b:BSpecific) => B(b)
+ *     case (_, a:ASpecific) => Seq(A(a))
+ *     case (_, b:BSpecific) => Seq(B(b))
 *   }
 * }}}
 * @param sourceName
@@ -48,18 +56,17 @@ import java.util
 * @param schemaRegistryClient
 *   the schema registry client
 * @param fromKV
- *   a partial function that should return a flink runner adt instance when
- *   passed a deserialized kafka key/value pair
+ *   a partial function that should return a sequence of zero or more flink
+ *   runner adt instances when passed a deserialized kafka key/value pair
 * @tparam ADT
 *   the flink runner ADT type
 */
 class ConfluentAvroRegistryKafkaRecordDeserializationSchema[
-    ADT <: FlinkEvent
-](
+    ADT <: FlinkEvent](
     sourceName: String,
     config: FlinkConfig[ADT],
     schemaRegistryClient: SchemaRegistryClient,
-    fromKV: PartialFunction[(Option[AnyRef], AnyRef), ADT]
+    fromKV: PartialFunction[(Option[AnyRef], AnyRef), Seq[ADT]]
 ) extends KafkaRecordDeserializationSchema[ADT]
     with LazyLogging {
 
@@ -95,7 +102,7 @@ class ConfluentAvroRegistryKafkaRecordDeserializationSchema[
     val key   =
       keyDeserializer.map(ds => ds.deserialize(topic, record.key()))
     val value = valueDeserializer.deserialize(topic, record.value())
-    if (Option(value).nonEmpty) out.collect(fromKV(key, value))
+    if (Option(value).nonEmpty) fromKV(key, value).foreach(out.collect)
   }
 
   override def getProducedType: TypeInformation[ADT] =
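For context, a minimal sketch of how a caller might wire up the changed `Seq[ADT]`-returning `fromKV`. Everything named below that is not in this patch is hypothetical: the ADT (`MyADT`, `A`, `B`), the Avro-generated `SpecificRecord` classes (`ASpecific`, `BSpecific`) and their getters, the registry URL, and the assumption that `FlinkEvent` requires `$id`/`$key`/`$timestamp` members.

```scala
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient
import io.epiphanous.flinkrunner.model.{FlinkConfig, FlinkEvent}
import io.epiphanous.flinkrunner.serde.ConfluentAvroRegistryKafkaRecordDeserializationSchema

// Hypothetical ADT wrapping Avro-generated SpecificRecord classes
// ASpecific and BSpecific; the FlinkEvent members shown are assumptions.
sealed trait MyADT extends FlinkEvent
case class A(a: ASpecific) extends MyADT {
  override def $id: String      = a.getId.toString
  override def $key: String     = $id
  override def $timestamp: Long = a.getTs
}
case class B(b: BSpecific) extends MyADT {
  override def $id: String      = b.getId.toString
  override def $key: String     = $id
  override def $timestamp: Long = b.getTs
}

// config is assumed to be an in-scope FlinkConfig[MyADT]
val schema =
  new ConfluentAvroRegistryKafkaRecordDeserializationSchema[MyADT](
    sourceName = "my-kafka-source",
    config = config,
    schemaRegistryClient =
      new CachedSchemaRegistryClient("http://schema-registry:8081", 1000),
    // one kafka record may now yield zero, one, or many events
    fromKV = {
      case (_, a: ASpecific) => Seq(A(a))
      case (_, b: BSpecific) => Seq(B(b))
      case (_, _)            => Seq.empty // drop unexpected record types
    }
  )
```

Returning `Seq.empty` from a case also gives `fromKV` a natural way to filter records at deserialization time, which the previous single-instance signature could not express.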