Skip to content

Commit

Permalink
add kafka info headers
Browse files Browse the repository at this point in the history
  • Loading branch information
nextdude committed Nov 19, 2022
1 parent 0b66ab9 commit fbe113f
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 23 deletions.
Expand Up @@ -45,56 +45,56 @@ class AvroJsonSerializer
gen: JsonGenerator,
provider: SerializerProvider): Unit = {
(schema.getType, value) match {
case (NULL, _) => gen.writeNullField(name)
case (_, null | None) => gen.writeNullField(name)
case (_, Some(v)) =>
case (NULL, _) => gen.writeNullField(name)
case (_, null | None) => gen.writeNullField(name)
case (_, Some(v)) =>
_serializeAvroValue(name, v, schema, gen, provider)
case (RECORD, record: GenericRecord) =>
case (RECORD, record: GenericRecord) =>
gen.writeFieldName(name)
serialize(record, gen, provider)
case (ENUM, ord: Int) =>
case (ENUM, ord: Int) =>
gen.writeStringField(name, schema.getEnumSymbols.get(ord))
case (ARRAY, seq: Seq[_]) =>
case (ARRAY, seq: Seq[_]) =>
gen.writeArrayFieldStart(name)
seq.foreach { e =>
_serializeElement(e, schema.getElementType, gen, provider)
}
gen.writeEndArray()
case (MAP, map: Map[String, _] @unchecked) =>
case (MAP, map: Map[String, Any] @unchecked) =>
gen.writeObjectFieldStart(name)
map.foreach { case (k, e) =>
gen.writeFieldName(k)
_serializeElement(e, schema.getValueType, gen, provider)
}
gen.writeEndObject()
case (UNION, _) =>
// todo: not a very sophisticated way to process unions, but it covers common case of [null, type]
case (UNION, _) =>
val nonNullTypes =
schema.getTypes.asScala.filterNot(s => s.getType == NULL)
if (nonNullTypes.size > 1) {
throw new RuntimeException(
s"field $name of type union has more than one non-null type: $nonNullTypes"
)
}
// if (nonNullTypes.size > 1) {
// throw new RuntimeException(
// s"field $name of type union has more than one non-null type: $nonNullTypes"
// )
// }

_serializeAvroValue(
name,
value,
nonNullTypes.head,
gen,
provider
)
case (FIXED | BYTES, bytes: Array[Byte]) =>
case (FIXED | BYTES, bytes: Array[Byte]) =>
gen.writeBinaryField(name, bytes)
case (STRING, string: String) =>
case (STRING, string: String) =>
gen.writeStringField(name, string)
case (INT, int: Int) =>
case (INT, int: Int) =>
gen.writeNumberField(name, int)
case (LONG, long: Long) => gen.writeNumberField(name, long)
case (FLOAT, float: Float) => gen.writeNumberField(name, float)
case (DOUBLE, double: Double) => gen.writeNumberField(name, double)
case (BOOLEAN, boolean: Boolean) =>
case (LONG, long: Long) => gen.writeNumberField(name, long)
case (FLOAT, float: Float) => gen.writeNumberField(name, float)
case (DOUBLE, double: Double) => gen.writeNumberField(name, double)
case (BOOLEAN, boolean: Boolean) =>
gen.writeBooleanField(name, boolean)
case _ =>
case _ =>
gen.writeFieldName(name)
provider
.findValueSerializer(
Expand Down
Expand Up @@ -73,7 +73,7 @@ class ConfluentAvroRegistryKafkaRecordDeserializationSchema[
.map(_.asScala.map { h =>
(h.key(), new String(h.value(), StandardCharsets.UTF_8))
}.toMap)
.getOrElse(Map.empty[String, String]) ++ (
.getOrElse(Map.empty[String, String]) ++ Map(
headerName(SerializedValueSize) -> record
.serializedValueSize()
.toString,
Expand Down

0 comments on commit fbe113f

Please sign in to comment.