add kafka info headers to deserialized avro messages #56

Closed · wants to merge 6 commits
Changes from all commits
@@ -2,11 +2,13 @@ package io.epiphanous.flinkrunner.flink.state
import scala.collection.JavaConverters._

object RichStateUtils {
-implicit class RichListState[T](listState: org.apache.flink.api.common.state.ListState[T]) {
-def _iterator: Iterator[T] = listState.get().iterator().asScala
-def isEmpty: Boolean = _iterator.isEmpty
+implicit class RichListState[T](
+listState: org.apache.flink.api.common.state.ListState[T]) {
+def _iterator: Iterator[T] = listState.get().iterator().asScala
+def isEmpty: Boolean = _iterator.isEmpty
def contains(element: T): Boolean = _iterator.contains(element)
-def find(element: T): Option[T] = _iterator.find(v => v.equals(element))
-def length: Int = _iterator.length
+def find(element: T): Option[T] =
+_iterator.find(v => v.equals(element))
+def length: Int = _iterator.length
}
}
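
For readers unfamiliar with the enrichment pattern, here is a minimal sketch of how RichListState is meant to be used. It is not part of this PR; the in-memory ListState stub and object name are purely illustrative, and it assumes Flink's ListState interface only requires get, add, addAll, update and clear.

import io.epiphanous.flinkrunner.flink.state.RichStateUtils._
import org.apache.flink.api.common.state.ListState
import scala.collection.JavaConverters._

object RichListStateSketch extends App {
  // Trivial in-memory stand-in for Flink-managed list state (illustration only).
  class InMemoryListState[T] extends ListState[T] {
    private var elements = Vector.empty[T]
    override def get(): java.lang.Iterable[T] = elements.asJava
    override def add(value: T): Unit = elements = elements :+ value
    override def addAll(values: java.util.List[T]): Unit =
      elements = elements ++ values.asScala
    override def update(values: java.util.List[T]): Unit =
      elements = values.asScala.toVector
    override def clear(): Unit = elements = Vector.empty
  }

  val state: ListState[Int] = new InMemoryListState[Int]
  state.add(1)
  state.add(2)
  // The implicit RichListState wrapper adds collection-like helpers.
  assert(!state.isEmpty)
  assert(state.contains(2))
  assert(state.find(3).isEmpty)
  assert(state.length == 2)
}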
@@ -0,0 +1,22 @@
package io.epiphanous.flinkrunner.model

import enumeratum._

import scala.collection.immutable

sealed trait KafkaInfoHeader extends EnumEntry

object KafkaInfoHeader extends Enum[KafkaInfoHeader] {

case object SerializedValueSize extends KafkaInfoHeader
case object SerializedKeySize extends KafkaInfoHeader
case object Offset extends KafkaInfoHeader
case object Partition extends KafkaInfoHeader
case object Timestamp extends KafkaInfoHeader
case object TimestampType extends KafkaInfoHeader
case object Topic extends KafkaInfoHeader

def headerName(h: KafkaInfoHeader) = s"Kafka.${h.entryName}"

override def values: immutable.IndexedSeq[KafkaInfoHeader] = findValues
}
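
For illustration, headerName simply prefixes the entry name with "Kafka.", so the keys added to the header map look like this (a small sketch, not part of the diff; the object name is made up):

import io.epiphanous.flinkrunner.model.KafkaInfoHeader
import io.epiphanous.flinkrunner.model.KafkaInfoHeader._

object KafkaInfoHeaderSketch extends App {
  // Prints the header keys in declaration order:
  // Kafka.SerializedValueSize, Kafka.SerializedKeySize, Kafka.Offset,
  // Kafka.Partition, Kafka.Timestamp, Kafka.TimestampType, Kafka.Topic
  KafkaInfoHeader.values.foreach(h => println(headerName(h)))
}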
@@ -6,10 +6,15 @@ import com.fasterxml.jackson.databind.{JsonSerializer, SerializerProvider}
import com.typesafe.scalalogging.LazyLogging
import org.apache.avro.Schema
import org.apache.avro.Schema.Type._
-import org.apache.avro.generic.GenericRecord
+import org.apache.avro.generic.{
+GenericEnumSymbol,
+GenericFixed,
+GenericRecord
+}
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.scala.createTypeInformation

import java.nio.ByteBuffer
import scala.annotation.tailrec
import scala.collection.JavaConverters._

@@ -37,6 +42,101 @@ class AvroJsonSerializer
gen.writeEndObject()
}

/** Based on a value, determine which, if any, of the possible schemas in
* a union applies to the value.
* @param name
* name of the field that has the union type
* @param value
* the value that needs serializing
* @param unionSchemas
* the admissible schemas for the union
* @tparam T
* the type of the value
* @return
* the matching Schema (throws a RuntimeException if no admissible schema matches)
*/
def findSchemaOf[T](
name: String,
value: T,
unionSchemas: List[Schema]): Schema =
(if (value == null) {
unionSchemas
.find(_.getType.name().equalsIgnoreCase("null"))
} else {
val valueClass = value.getClass
val valueClassName = valueClass.getCanonicalName
def clsMatch[S](s: Schema, cls: Class[S]): Option[Schema] =
if (
cls.isAssignableFrom(valueClass) || valueClassName.equals(
s.getFullName
)
) Some(s)
else None

unionSchemas
.flatMap(s =>
s.getType match {
case INT =>
clsMatch(s, classOf[Int]) orElse
clsMatch(
s,
classOf[java.lang.Integer]
)
case DOUBLE =>
clsMatch(s, classOf[Double]) orElse clsMatch(
s,
classOf[java.lang.Double]
)
case FLOAT =>
clsMatch(s, classOf[Float]) orElse clsMatch(
s,
classOf[java.lang.Float]
)
case LONG =>
clsMatch(s, classOf[Long]) orElse clsMatch(
s,
classOf[java.lang.Long]
)
case BOOLEAN =>
clsMatch(s, classOf[Boolean]) orElse clsMatch(
s,
classOf[java.lang.Boolean]
)
case STRING => clsMatch(s, classOf[String])
case ARRAY =>
clsMatch(s, classOf[Seq[_]]) orElse clsMatch(
s,
classOf[java.util.List[_]]
)
case MAP =>
clsMatch(s, classOf[collection.Map[_, _]]) orElse clsMatch(
s,
classOf[java.util.Map[_, _]]
)
case BYTES =>
clsMatch(s, classOf[ByteBuffer]) orElse clsMatch(
s,
classOf[Array[Byte]]
)
case ENUM => clsMatch(s, classOf[GenericEnumSymbol[_]])
case FIXED => clsMatch(s, classOf[GenericFixed])
case RECORD => clsMatch(s, classOf[GenericRecord])
case _ => None
}
)
.headOption
}).getOrElse(
throw new RuntimeException(
s"field $name has value ($value) of an unexpected type; should be in (${unionSchemas
.map(_.getType.name())
.mkString(", ")})"
)
)
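
A small sketch of how findSchemaOf is expected to resolve a union member; it assumes AvroJsonSerializer lives in io.epiphanous.flinkrunner.serde and can be constructed with no arguments, neither of which is shown in this diff:

import io.epiphanous.flinkrunner.serde.AvroJsonSerializer // assumed package
import org.apache.avro.Schema
import scala.collection.JavaConverters._

object FindSchemaOfSketch extends App {
  val serializer = new AvroJsonSerializer() // assumes a no-arg constructor
  val union = new Schema.Parser().parse("""["null", "string", "long"]""")
  val members = union.getTypes.asScala.toList

  // A String value resolves to the "string" member of the union.
  println(serializer.findSchemaOf("myField", "hello", members).getType) // STRING
  // A null value resolves to the "null" member.
  println(serializer.findSchemaOf("myField", null, members).getType) // NULL
  // A value matching none of the members (e.g. a Double here) would throw a RuntimeException.
}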

@tailrec
private def _serializeAvroValue[T: TypeInformation](
name: String,
@@ -45,56 +145,54 @@ class AvroJsonSerializer
gen: JsonGenerator,
provider: SerializerProvider): Unit = {
(schema.getType, value) match {
-case (NULL, _) => gen.writeNullField(name)
-case (_, null | None) => gen.writeNullField(name)
+case (NULL, _) | (_, null | None) => gen.writeNullField(name)
case (_, Some(v)) =>
_serializeAvroValue(name, v, schema, gen, provider)
case (RECORD, record: GenericRecord) =>
gen.writeFieldName(name)
serialize(record, gen, provider)
case (ENUM, ord: Int) =>
gen.writeStringField(name, schema.getEnumSymbols.get(ord))
case (ARRAY, seq: Seq[_]) =>
gen.writeArrayFieldStart(name)
-seq.foreach { e =>
-_serializeElement(e, schema.getElementType, gen, provider)
+seq.zipWithIndex.foreach { case (e, i) =>
+_serializeElement(
+name,
+i.toString,
+e,
+schema.getElementType,
+gen,
+provider
+)
}
gen.writeEndArray()
-case (MAP, map: Map[String, _] @unchecked) =>
+case (MAP, map: collection.Map[String, Any] @unchecked) =>
gen.writeObjectFieldStart(name)
map.foreach { case (k, e) =>
gen.writeFieldName(k)
-_serializeElement(e, schema.getValueType, gen, provider)
+_serializeElement(name, k, e, schema.getValueType, gen, provider)
}
gen.writeEndObject()
case (UNION, _) =>
-// todo: not a very sophisticated way to process unions, but it covers common case of [null, type]
-val nonNullTypes =
-schema.getTypes.asScala.filterNot(s => s.getType == NULL)
-if (nonNullTypes.size > 1) {
-throw new RuntimeException(
-s"field $name of type union has more than one non-null type: $nonNullTypes"
-)
-}
_serializeAvroValue(
name,
value,
-nonNullTypes.head,
+findSchemaOf(name, value, schema.getTypes.asScala.toList),
gen,
provider
)
-case (FIXED | BYTES, bytes: Array[Byte]) =>
+case (FIXED | BYTES, bytes: Array[Byte]) => // TODO: test this
gen.writeBinaryField(name, bytes)
case (STRING, string: String) =>
gen.writeStringField(name, string)
case (INT, int: Int) =>
gen.writeNumberField(name, int)
case (LONG, long: Long) => gen.writeNumberField(name, long)
case (FLOAT, float: Float) => gen.writeNumberField(name, float)
case (DOUBLE, double: Double) => gen.writeNumberField(name, double)
case (BOOLEAN, boolean: Boolean) =>
gen.writeBooleanField(name, boolean)
case _ =>
gen.writeFieldName(name)
provider
.findValueSerializer(
@@ -106,31 +204,54 @@ class AvroJsonSerializer
}

private def _serializeElement(
name: String,
key: String,
value: Any,
schema: Schema,
gen: JsonGenerator,
provider: SerializerProvider): Unit = {
(schema.getType, value) match {
case (_, null | None) => gen.writeNull()
-case (_, Some(v)) => _serializeElement(v, schema, gen, provider)
+case (_, Some(v)) =>
+_serializeElement(name, key, v, schema, gen, provider)
case (RECORD, record: GenericRecord) =>
serialize(record, gen, provider)
case (ENUM, ord: Int) =>
gen.writeString(schema.getEnumSymbols.get(ord))
case (ARRAY, seq: Seq[_]) =>
-seq.foreach { e =>
-_serializeElement(e, schema.getElementType, gen, provider)
+seq.zipWithIndex.foreach { case (e, i) =>
+_serializeElement(
+name,
+s"$key[$i]",
+e,
+schema.getElementType,
+gen,
+provider
+)
}
case (MAP, _) => gen.writeObject(value)
-case (UNION, _) => // todo
+case (UNION, _) =>
+_serializeElement(
+name,
+key,
+value,
+findSchemaOf(
+s"$name[$key]",
+value,
+schema.getTypes.asScala.toList
+),
+gen,
+provider
+)
case (STRING, string: String) => gen.writeString(string)
case (INT, int: Int) => gen.writeNumber(int)
case (LONG, long: Long) => gen.writeNumber(long)
case (DOUBLE, double: Double) => gen.writeNumber(double)
case (BOOLEAN, boolean: Boolean) => gen.writeBoolean(boolean)
case _ =>
logger.error(
s"no serializer for array element type ${schema.getType.name()}"
s"no serializer found for $name[$key] element with type ${schema.getType
.name()} and value $value"
)
// todo
}
@@ -1,6 +1,7 @@
package io.epiphanous.flinkrunner.serde

import com.typesafe.scalalogging.LazyLogging
import io.epiphanous.flinkrunner.model.KafkaInfoHeader._
import io.epiphanous.flinkrunner.model.source.KafkaSourceConfig
import io.epiphanous.flinkrunner.model.{
EmbeddedAvroRecord,
@@ -72,7 +73,21 @@ class ConfluentAvroRegistryKafkaRecordDeserializationSchema[
.map(_.asScala.map { h =>
(h.key(), new String(h.value(), StandardCharsets.UTF_8))
}.toMap)
-.getOrElse(Map.empty[String, String])
+.getOrElse(Map.empty[String, String]) ++ Map(
+headerName(SerializedValueSize) -> record
+.serializedValueSize()
+.toString,
+headerName(SerializedKeySize) -> record
+.serializedKeySize()
+.toString,
+headerName(Offset) -> record.offset().toString,
+headerName(Partition) -> record.partition().toString,
+headerName(Timestamp) -> record.timestamp().toString,
+headerName(TimestampType) -> record
+.timestampType()
+.name(),
+headerName(Topic) -> record.topic()
+)

val key = Option(record.key()).map(keyBytes =>
new String(keyBytes, StandardCharsets.UTF_8)
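
The net effect of the deserializer change is that every consumed record carries its Kafka metadata alongside any user-supplied headers. A sketch of the resulting map for a hypothetical record (topic "users", partition 0, offset 42; all values below are made up, not output from a real run):

object KafkaHeadersShapeSketch extends App {
  // Shape of the metadata merged into the headers map by the deserialization schema.
  val kafkaInfoHeaders: Map[String, String] = Map(
    "Kafka.SerializedValueSize" -> "123",
    "Kafka.SerializedKeySize"   -> "8",
    "Kafka.Offset"              -> "42",
    "Kafka.Partition"           -> "0",
    "Kafka.Timestamp"           -> "1650000000000",
    "Kafka.TimestampType"       -> "CREATE_TIME",
    "Kafka.Topic"               -> "users"
  )
  kafkaInfoHeaders.foreach(println)
}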