add kafka info headers to deserialized avro messages
nextdude committed Nov 17, 2022
1 parent 018068e commit 0b66ab9
Showing 2 changed files with 38 additions and 1 deletion.
@@ -0,0 +1,22 @@
package io.epiphanous.flinkrunner.model

import enumeratum._

import scala.collection.immutable

sealed trait KafkaInfoHeader extends EnumEntry

object KafkaInfoHeader extends Enum[KafkaInfoHeader] {

  case object SerializedValueSize extends KafkaInfoHeader
  case object SerializedKeySize extends KafkaInfoHeader
  case object Offset extends KafkaInfoHeader
  case object Partition extends KafkaInfoHeader
  case object Timestamp extends KafkaInfoHeader
  case object TimestampType extends KafkaInfoHeader
  case object Topic extends KafkaInfoHeader

  def headerName(h: KafkaInfoHeader): String = s"Kafka.${h.entryName}"

  override def values: immutable.IndexedSeq[KafkaInfoHeader] = findValues
}
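
For reference, headerName simply prefixes each entry's name with "Kafka.", so with enumeratum's default entryName (the case object's simple name) the resulting header keys look like this:

import io.epiphanous.flinkrunner.model.KafkaInfoHeader._

headerName(Offset)        // "Kafka.Offset"
headerName(TimestampType) // "Kafka.TimestampType"
headerName(Topic)         // "Kafka.Topic"
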
@@ -7,6 +7,7 @@ import io.epiphanous.flinkrunner.model.{
  EmbeddedAvroRecordInfo,
  FlinkEvent
}
import io.epiphanous.flinkrunner.model.KafkaInfoHeader._
import io.epiphanous.flinkrunner.util.AvroUtils.{
  isSpecific,
  schemaOf,
@@ -72,7 +73,21 @@ class ConfluentAvroRegistryKafkaRecordDeserializationSchema[
.map(_.asScala.map { h =>
(h.key(), new String(h.value(), StandardCharsets.UTF_8))
}.toMap)
.getOrElse(Map.empty[String, String])
.getOrElse(Map.empty[String, String]) ++ Map(
  headerName(SerializedValueSize) -> record
    .serializedValueSize()
    .toString,
  headerName(SerializedKeySize) -> record
    .serializedKeySize()
    .toString,
  headerName(Offset) -> record.offset().toString,
  headerName(Partition) -> record.partition().toString,
  headerName(Timestamp) -> record.timestamp().toString,
  headerName(TimestampType) -> record
    .timestampType()
    .name(),
  headerName(Topic) -> record.topic()
)

val key = Option(record.key()).map(keyBytes =>
new String(keyBytes, StandardCharsets.UTF_8)
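
For context, here is a minimal sketch (not part of this commit) of how downstream code might read these headers after deserialization. It assumes only that the map built above is surfaced to consumers as a plain Map[String, String] (for example via EmbeddedAvroRecordInfo); the names KafkaHeaderAccess, kafkaOffset, and kafkaTopicPartition are hypothetical.

import io.epiphanous.flinkrunner.model.KafkaInfoHeader._

object KafkaHeaderAccess {

  // All header values are written as strings, so numeric fields need parsing.
  def kafkaOffset(headers: Map[String, String]): Option[Long] =
    headers.get(headerName(Offset)).map(_.toLong)

  def kafkaTopicPartition(headers: Map[String, String]): Option[(String, Int)] =
    for {
      topic     <- headers.get(headerName(Topic))
      partition <- headers.get(headerName(Partition))
    } yield (topic, partition.toInt)
}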
