Merge 3128a43 into 018068e
rbalaban-mdsol committed Dec 7, 2022
2 parents 018068e + 3128a43 commit f76c921
Showing 8 changed files with 341 additions and 119 deletions.
@@ -2,11 +2,13 @@ package io.epiphanous.flinkrunner.flink.state
 import scala.collection.JavaConverters._

 object RichStateUtils {
-  implicit class RichListState[T](listState: org.apache.flink.api.common.state.ListState[T]) {
-    def _iterator: Iterator[T] = listState.get().iterator().asScala
-    def isEmpty: Boolean = _iterator.isEmpty
+  implicit class RichListState[T](
+      listState: org.apache.flink.api.common.state.ListState[T]) {
+    def _iterator: Iterator[T] = listState.get().iterator().asScala
+    def isEmpty: Boolean = _iterator.isEmpty
     def contains(element: T): Boolean = _iterator.contains(element)
-    def find(element: T) : Option[T]= _iterator.find(v => v.equals(element))
-    def length: Int = _iterator.length
+    def find(element: T): Option[T] =
+      _iterator.find(v => v.equals(element))
+    def length: Int = _iterator.length
   }
 }
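For context, a minimal sketch of how the enriched ListState might be used inside a keyed Flink function. The function, state name, and stream types below are hypothetical and not part of this commit; it assumes the input stream has already been keyed.

import io.epiphanous.flinkrunner.flink.state.RichStateUtils._
import org.apache.flink.api.common.functions.RichFlatMapFunction
import org.apache.flink.api.common.state.{ListState, ListStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.util.Collector

// Hypothetical keyed function that deduplicates events using the enriched ListState.
class DedupeEvents extends RichFlatMapFunction[String, String] {
  @transient private var seen: ListState[String] = _

  override def open(parameters: Configuration): Unit =
    seen = getRuntimeContext.getListState(
      new ListStateDescriptor[String]("seen", classOf[String]))

  override def flatMap(event: String, out: Collector[String]): Unit =
    // contains and isEmpty come from RichListState, not from ListState itself
    if (seen.isEmpty || !seen.contains(event)) {
      seen.add(event)
      out.collect(event)
    }
}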
@@ -0,0 +1,22 @@
+package io.epiphanous.flinkrunner.model
+
+import enumeratum._
+
+import scala.collection.immutable
+
+sealed trait KafkaInfoHeader extends EnumEntry
+
+object KafkaInfoHeader extends Enum[KafkaInfoHeader] {
+
+  case object SerializedValueSize extends KafkaInfoHeader
+  case object SerializedKeySize extends KafkaInfoHeader
+  case object Offset extends KafkaInfoHeader
+  case object Partition extends KafkaInfoHeader
+  case object Timestamp extends KafkaInfoHeader
+  case object TimestampType extends KafkaInfoHeader
+  case object Topic extends KafkaInfoHeader
+
+  def headerName(h: KafkaInfoHeader) = s"Kafka.${h.entryName}"
+
+  override def values: immutable.IndexedSeq[KafkaInfoHeader] = findValues
+}
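A quick sketch of what headerName produces, REPL-style, with results shown in comments:

import io.epiphanous.flinkrunner.model.KafkaInfoHeader
import io.epiphanous.flinkrunner.model.KafkaInfoHeader._

headerName(Offset)        // "Kafka.Offset"
headerName(TimestampType) // "Kafka.TimestampType"

// all header names, in declaration order
KafkaInfoHeader.values.map(headerName)
// Vector(Kafka.SerializedValueSize, Kafka.SerializedKeySize, Kafka.Offset,
//        Kafka.Partition, Kafka.Timestamp, Kafka.TimestampType, Kafka.Topic)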
210 changes: 176 additions & 34 deletions src/main/scala/io/epiphanous/flinkrunner/serde/AvroJsonSerializer.scala
@@ -6,12 +6,14 @@ import com.fasterxml.jackson.databind.{JsonSerializer, SerializerProvider}
 import com.typesafe.scalalogging.LazyLogging
 import org.apache.avro.Schema
 import org.apache.avro.Schema.Type._
-import org.apache.avro.generic.GenericRecord
+import org.apache.avro.generic.{GenericEnumSymbol, GenericFixed, GenericRecord}
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.scala.createTypeInformation

 import java.nio.ByteBuffer
 import scala.annotation.tailrec
+import scala.collection.JavaConverters._
+import scala.collection.convert.Wrappers

 /** A simple custom jackson serializer to handle serializing avro records
   * (Generic or Specific)
@@ -37,6 +39,101 @@ class AvroJsonSerializer
     gen.writeEndObject()
   }

+  /** Based on a value, determine which, if any, of the possible schemas in
+    * a union apply to the value.
+    * @param name
+    *   name of the field which has the union type
+    * @param value
+    *   the value that needs serializing
+    * @param unionSchemas
+    *   the admissible schemas for the union
+    * @tparam T
+    *   the type of value
+    * @return
+    *   the matching schema (throws a RuntimeException if no union schema
+    *   matches the value)
+    */
+  def findSchemaOf[T](
+      name: String,
+      value: T,
+      unionSchemas: List[Schema]): Schema =
+    (if (value == null) {
+
+       unionSchemas
+         .find(_.getType.name().equalsIgnoreCase("null"))
+
+     } else {
+
+       val valueClass = value.getClass
+       val valueClassName = valueClass.getCanonicalName
+
+       def clsMatch[S](s: Schema, cls: Class[S]): Option[Schema] =
+         if (
+           cls.isAssignableFrom(valueClass) || valueClassName.equals(
+             s.getFullName
+           )
+         ) Some(s)
+         else None
+
+       unionSchemas
+         .flatMap(s =>
+           s.getType match {
+             case INT =>
+               clsMatch(s, classOf[Int]) orElse
+                 clsMatch(
+                   s,
+                   classOf[java.lang.Integer]
+                 )
+             case DOUBLE =>
+               clsMatch(s, classOf[Double]) orElse clsMatch(
+                 s,
+                 classOf[java.lang.Double]
+               )
+             case FLOAT =>
+               clsMatch(s, classOf[Float]) orElse clsMatch(
+                 s,
+                 classOf[java.lang.Float]
+               )
+             case LONG =>
+               clsMatch(s, classOf[Long]) orElse clsMatch(
+                 s,
+                 classOf[java.lang.Long]
+               )
+             case BOOLEAN =>
+               clsMatch(s, classOf[Boolean]) orElse clsMatch(
+                 s,
+                 classOf[java.lang.Boolean]
+               )
+             case STRING => clsMatch(s, classOf[String])
+             case ARRAY =>
+               clsMatch(s, classOf[Seq[_]]) orElse clsMatch(
+                 s,
+                 classOf[java.util.List[_]]
+               )
+             case MAP =>
+               clsMatch(s, classOf[collection.Map[_, _]]) orElse clsMatch(
+                 s,
+                 classOf[java.util.Map[_, _]]
+               )
+             case BYTES =>
+               clsMatch(s, classOf[ByteBuffer]) orElse clsMatch(
+                 s,
+                 classOf[Array[Byte]]
+               )
+             case ENUM => clsMatch(s, classOf[GenericEnumSymbol[_]])
+             case FIXED => clsMatch(s, classOf[GenericFixed])
+             case RECORD => clsMatch(s, classOf[GenericRecord])
+             case _ => None
+           }
+         )
+         .headOption
+     }).getOrElse(
+      throw new RuntimeException(
+        s"field $name has value ($value) of an unexpected type; should be in (${unionSchemas
+          .map(_.getType.name())
+          .mkString(", ")})"
+      )
+    )
+
   @tailrec
   private def _serializeAvroValue[T: TypeInformation](
       name: String,
@@ -45,56 +142,67 @@
       gen: JsonGenerator,
       provider: SerializerProvider): Unit = {
     (schema.getType, value) match {
-      case (NULL, _) => gen.writeNullField(name)
-      case (_, null | None) => gen.writeNullField(name)
-      case (_, Some(v)) =>
+      case (NULL, _) | (_, null | None) => gen.writeNullField(name)
+      case (_, Some(v)) =>
         _serializeAvroValue(name, v, schema, gen, provider)
-      case (RECORD, record: GenericRecord) =>
+      case (RECORD, record: GenericRecord) =>
         gen.writeFieldName(name)
         serialize(record, gen, provider)
-      case (ENUM, ord: Int) =>
+      case (ENUM, ord: Int) =>
         gen.writeStringField(name, schema.getEnumSymbols.get(ord))
-      case (ARRAY, seq: Seq[_]) =>
+      case (ARRAY, seq: Seq[_]) =>
         gen.writeArrayFieldStart(name)
-        seq.foreach { e =>
-          _serializeElement(e, schema.getElementType, gen, provider)
+        seq.zipWithIndex.foreach { case (e, i) =>
+          _serializeElement(
+            name,
+            i.toString,
+            e,
+            schema.getElementType,
+            gen,
+            provider
+          )
         }
         gen.writeEndArray()
-      case (MAP, map: Map[String, _] @unchecked) =>
+      case (ARRAY, arr: Wrappers.MutableBufferWrapper[GenericRecord]) =>
+        gen.writeArrayFieldStart(name)
+        arr.asScala.zipWithIndex.foreach { case (e, i) =>
+          _serializeElement(
+            name,
+            i.toString,
+            e,
+            schema.getElementType,
+            gen,
+            provider
+          )
+        }
+        gen.writeEndArray()
+      case (MAP, map: collection.Map[String, Any] @unchecked) =>
         gen.writeObjectFieldStart(name)
         map.foreach { case (k, e) =>
           gen.writeFieldName(k)
-          _serializeElement(e, schema.getValueType, gen, provider)
+          _serializeElement(name, k, e, schema.getValueType, gen, provider)
         }
         gen.writeEndObject()
-      case (UNION, _) =>
-        // todo: not a very sophisticated way to process unions, but it covers common case of [null, type]
-        val nonNullTypes =
-          schema.getTypes.asScala.filterNot(s => s.getType == NULL)
-        if (nonNullTypes.size > 1) {
-          throw new RuntimeException(
-            s"field $name of type union has more than one non-null type: $nonNullTypes"
-          )
-        }
+      case (UNION, _) =>
         _serializeAvroValue(
           name,
           value,
-          nonNullTypes.head,
+          findSchemaOf(name, value, schema.getTypes.asScala.toList),
           gen,
           provider
         )
-      case (FIXED | BYTES, bytes: Array[Byte]) =>
+      case (FIXED | BYTES, bytes: Array[Byte]) => // TODO: test this
        gen.writeBinaryField(name, bytes)
-      case (STRING, string: String) =>
+      case (STRING, string: String) =>
        gen.writeStringField(name, string)
-      case (INT, int: Int) =>
+      case (INT, int: Int) =>
        gen.writeNumberField(name, int)
-      case (LONG, long: Long) => gen.writeNumberField(name, long)
-      case (FLOAT, float: Float) => gen.writeNumberField(name, float)
-      case (DOUBLE, double: Double) => gen.writeNumberField(name, double)
-      case (BOOLEAN, boolean: Boolean) =>
+      case (LONG, long: Long) => gen.writeNumberField(name, long)
+      case (FLOAT, float: Float) => gen.writeNumberField(name, float)
+      case (DOUBLE, double: Double) => gen.writeNumberField(name, double)
+      case (BOOLEAN, boolean: Boolean) =>
         gen.writeBooleanField(name, boolean)
-      case _ =>
+      case _ =>
         gen.writeFieldName(name)
         provider
           .findValueSerializer(
@@ -106,31 +214,65 @@
   }

   private def _serializeElement(
+      name: String,
+      key: String,
       value: Any,
       schema: Schema,
       gen: JsonGenerator,
       provider: SerializerProvider): Unit = {
     (schema.getType, value) match {
       case (_, null | None) => gen.writeNull()
-      case (_, Some(v)) => _serializeElement(v, schema, gen, provider)
+      case (_, Some(v)) =>
+        _serializeElement(name, key, v, schema, gen, provider)
       case (RECORD, record: GenericRecord) =>
         serialize(record, gen, provider)
       case (ENUM, ord: Int) =>
         gen.writeString(schema.getEnumSymbols.get(ord))
       case (ARRAY, seq: Seq[_]) =>
-        seq.foreach { e =>
-          _serializeElement(e, schema.getElementType, gen, provider)
+        seq.zipWithIndex.foreach { case (e, i) =>
+          _serializeElement(
+            name,
+            s"$key[$i]",
+            e,
+            schema.getElementType,
+            gen,
+            provider
+          )
         }
+      case (ARRAY, arr: Wrappers.MutableBufferWrapper[GenericRecord]) =>
+        arr.asScala.zipWithIndex.foreach { case (e, i) =>
+          _serializeElement(
+            name,
+            i.toString,
+            e,
+            schema.getElementType,
+            gen,
+            provider
+          )
+        }
       case (MAP, _) => gen.writeObject(value)
-      case (UNION, _) => // todo
+      case (UNION, _) =>
+        _serializeElement(
+          name,
+          key,
+          value,
+          findSchemaOf(
+            s"$name[$key]",
+            value,
+            schema.getTypes.asScala.toList
+          ),
+          gen,
+          provider
+        )
       case (STRING, string: String) => gen.writeString(string)
       case (INT, int: Int) => gen.writeNumber(int)
       case (LONG, long: Long) => gen.writeNumber(long)
       case (DOUBLE, double: Double) => gen.writeNumber(double)
       case (BOOLEAN, boolean: Boolean) => gen.writeBoolean(boolean)
       case _ =>
         logger.error(
-          s"no serializer for array element type ${schema.getType.name()}"
+          s"no serializer found for $name[$key] element with type ${schema.getType
+            .name()} and value $value"
         )
       // todo
     }
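For orientation, a hedged sketch of wiring AvroJsonSerializer into a Jackson mapper and serializing a GenericRecord whose schema contains a union field, which exercises findSchemaOf. The module wiring, toy schema, and expected output are assumptions for illustration, not taken from this commit; inside flinkrunner the serializer is registered through the library's own Avro module (see the Codec change below).

import com.fasterxml.jackson.databind.json.JsonMapper
import com.fasterxml.jackson.databind.module.SimpleModule
import io.epiphanous.flinkrunner.serde.AvroJsonSerializer
import org.apache.avro.{Schema, SchemaBuilder}
import org.apache.avro.generic.{GenericData, GenericRecord}

// Register the serializer for all GenericRecord values (illustrative wiring).
val avroModule = new SimpleModule()
  .addSerializer(classOf[GenericRecord], new AvroJsonSerializer)
val mapper = JsonMapper.builder().addModule(avroModule).build()

// A toy schema with an optional field, i.e. a union of [null, double].
val schema: Schema = SchemaBuilder
  .record("Sensor")
  .fields()
  .requiredString("id")
  .optionalDouble("reading")
  .endRecord()

val rec: GenericRecord = new GenericData.Record(schema)
rec.put("id", "s-1")
rec.put("reading", 42.0)

// Expected output (illustrative): {"id":"s-1","reading":42.0}
println(mapper.writeValueAsString(rec))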
4 changes: 0 additions & 4 deletions src/main/scala/io/epiphanous/flinkrunner/serde/Codec.scala
@@ -52,10 +52,6 @@ case class Codec[E](
           jsonConfig.sortKeys
         )
         .configure(SerializationFeature.INDENT_OUTPUT, jsonConfig.pretty)
-        .configure(
-          DeserializationFeature.ADJUST_DATES_TO_CONTEXT_TIME_ZONE,
-          false
-        )
     (if (isAvro) mapper.addModule(avroModule) else mapper)
       .build() :: ScalaReflectExtensions
   }
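For reference, a minimal sketch of the Jackson mapper-builder pattern this snippet relies on. DefaultScalaModule stands in for flinkrunner's own module setup, and jsonConfig.pretty is replaced by a literal; both are assumptions for illustration.

import com.fasterxml.jackson.databind.SerializationFeature
import com.fasterxml.jackson.databind.json.JsonMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule

val mapper = JsonMapper
  .builder()
  .addModule(DefaultScalaModule)
  .configure(SerializationFeature.INDENT_OUTPUT, true) // stands in for jsonConfig.pretty
  .build()

// prints a pretty-printed JSON object
println(mapper.writeValueAsString(Map("a" -> 1)))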
@@ -1,6 +1,7 @@
 package io.epiphanous.flinkrunner.serde

 import com.typesafe.scalalogging.LazyLogging
+import io.epiphanous.flinkrunner.model.KafkaInfoHeader._
 import io.epiphanous.flinkrunner.model.source.KafkaSourceConfig
 import io.epiphanous.flinkrunner.model.{
   EmbeddedAvroRecord,
@@ -72,7 +73,21 @@ class ConfluentAvroRegistryKafkaRecordDeserializationSchema[
       .map(_.asScala.map { h =>
         (h.key(), new String(h.value(), StandardCharsets.UTF_8))
       }.toMap)
-      .getOrElse(Map.empty[String, String])
+      .getOrElse(Map.empty[String, String]) ++ Map(
+      headerName(SerializedValueSize) -> record
+        .serializedValueSize()
+        .toString,
+      headerName(SerializedKeySize) -> record
+        .serializedKeySize()
+        .toString,
+      headerName(Offset) -> record.offset().toString,
+      headerName(Partition) -> record.partition().toString,
+      headerName(Timestamp) -> record.timestamp().toString,
+      headerName(TimestampType) -> record
+        .timestampType()
+        .name(),
+      headerName(Topic) -> record.topic()
+    )

     val key = Option(record.key()).map(keyBytes =>
       new String(keyBytes, StandardCharsets.UTF_8)
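To make the effect concrete: a hypothetical ConsumerRecord on topic "orders" (partition 3, offset 42, with a single user-supplied header "trace-id") would now produce a headers map along these lines; all values below are invented.

Map(
  "trace-id"                  -> "abc123", // user-supplied Kafka header
  "Kafka.SerializedValueSize" -> "118",
  "Kafka.SerializedKeySize"   -> "12",
  "Kafka.Offset"              -> "42",
  "Kafka.Partition"           -> "3",
  "Kafka.Timestamp"           -> "1670371200000",
  "Kafka.TimestampType"       -> "CREATE_TIME",
  "Kafka.Topic"               -> "orders"
)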
