Skip to content

Commit

Permalink
fix: treat wrapped arrays in serializer
Browse files Browse the repository at this point in the history
  • Loading branch information
rbalaban-mdsol committed Dec 7, 2022
1 parent 89a945c commit 8c43740
Showing 1 changed file with 26 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,14 @@ import com.fasterxml.jackson.databind.{JsonSerializer, SerializerProvider}
import com.typesafe.scalalogging.LazyLogging
import org.apache.avro.Schema
import org.apache.avro.Schema.Type._
import org.apache.avro.generic.{
GenericEnumSymbol,
GenericFixed,
GenericRecord
}
import org.apache.avro.generic.{GenericEnumSymbol, GenericFixed, GenericRecord}
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.scala.createTypeInformation

import java.nio.ByteBuffer
import scala.annotation.tailrec
import scala.collection.JavaConverters._
import scala.collection.convert.Wrappers

/** A simple custom jackson serializer to handle serializing avro records
* (Generic or Specific)
Expand Down Expand Up @@ -166,6 +163,19 @@ class AvroJsonSerializer
)
}
gen.writeEndArray()
case (ARRAY, arr: Wrappers.MutableBufferWrapper[GenericRecord]) =>
gen.writeArrayFieldStart(name)
arr.asScala.zipWithIndex.foreach { case (e, i) =>
_serializeElement(
name,
i.toString,
e,
schema.getElementType,
gen,
provider
)
}
gen.writeEndArray()
case (MAP, map: collection.Map[String, Any] @unchecked) =>
gen.writeObjectFieldStart(name)
map.foreach { case (k, e) =>
Expand Down Expand Up @@ -229,6 +239,17 @@ class AvroJsonSerializer
provider
)
}
case (ARRAY, arr: Wrappers.MutableBufferWrapper[GenericRecord]) =>
arr.asScala.zipWithIndex.foreach { case (e, i) =>
_serializeElement(
name,
i.toString,
e,
schema.getElementType,
gen,
provider
)
}
case (MAP, _) => gen.writeObject(value)
case (UNION, _) =>
_serializeElement(
Expand Down

0 comments on commit 8c43740

Please sign in to comment.