Skip to content

Commit

Permalink
adding support to serialize scala..MapWrapper same way as a regular Map
Browse files Browse the repository at this point in the history
  • Loading branch information
LizaMo committed Nov 21, 2022
1 parent 018068e commit 583f898
Showing 1 changed file with 11 additions and 9 deletions.
@@ -1,8 +1,8 @@
package io.epiphanous.flinkrunner.serde

import com.fasterxml.jackson.core.JsonGenerator
import com.fasterxml.jackson.databind.ser.std.StdSerializer
import com.fasterxml.jackson.databind.{JsonSerializer, SerializerProvider}
import com.fasterxml.jackson.databind.ser.std.StdSerializer
import com.typesafe.scalalogging.LazyLogging
import org.apache.avro.Schema
import org.apache.avro.Schema.Type._
Expand All @@ -12,6 +12,7 @@ import org.apache.flink.api.scala.createTypeInformation

import scala.annotation.tailrec
import scala.collection.JavaConverters._
import scala.collection.convert.ImplicitConversions.`map AsScala`

/** A simple custom jackson serializer to handle serializing avro records
* (Generic or Specific)
Expand Down Expand Up @@ -61,6 +62,7 @@ class AvroJsonSerializer
}
gen.writeEndArray()
case (MAP, map: Map[String, _] @unchecked) =>
case (MAP, map: scala.collection.convert.Wrappers.MapWrapper[String,Any] @unchecked) =>
gen.writeObjectFieldStart(name)
map.foreach { case (k, e) =>
gen.writeFieldName(k)
Expand Down Expand Up @@ -95,13 +97,13 @@ class AvroJsonSerializer
case (BOOLEAN, boolean: Boolean) =>
gen.writeBooleanField(name, boolean)
case _ =>
gen.writeFieldName(name)
provider
.findValueSerializer(
implicitly[TypeInformation[T]].getTypeClass
)
.asInstanceOf[JsonSerializer[T]]
.serialize(value, gen, provider)
gen.writeFieldName(name)
provider
.findValueSerializer(
implicitly[TypeInformation[T]].getTypeClass
)
.asInstanceOf[JsonSerializer[T]]
.serialize(value, gen, provider)
}
}

Expand All @@ -122,7 +124,7 @@ class AvroJsonSerializer
_serializeElement(e, schema.getElementType, gen, provider)
}
case (MAP, _) => gen.writeObject(value)
case (UNION, _) => // todo
case (UNION, _) => gen.writeString(value.toString)
case (STRING, string: String) => gen.writeString(string)
case (INT, int: Int) => gen.writeNumber(int)
case (LONG, long: Long) => gen.writeNumber(long)
Expand Down

0 comments on commit 583f898

Please sign in to comment.