diff --git a/src/main/scala/io/epiphanous/flinkrunner/serde/AvroJsonSerializer.scala b/src/main/scala/io/epiphanous/flinkrunner/serde/AvroJsonSerializer.scala index 35c6c60..8aadbbf 100644 --- a/src/main/scala/io/epiphanous/flinkrunner/serde/AvroJsonSerializer.scala +++ b/src/main/scala/io/epiphanous/flinkrunner/serde/AvroJsonSerializer.scala @@ -1,8 +1,8 @@ package io.epiphanous.flinkrunner.serde import com.fasterxml.jackson.core.JsonGenerator -import com.fasterxml.jackson.databind.ser.std.StdSerializer import com.fasterxml.jackson.databind.{JsonSerializer, SerializerProvider} +import com.fasterxml.jackson.databind.ser.std.StdSerializer import com.typesafe.scalalogging.LazyLogging import org.apache.avro.Schema import org.apache.avro.Schema.Type._ @@ -12,6 +12,7 @@ import org.apache.flink.api.scala.createTypeInformation import scala.annotation.tailrec import scala.collection.JavaConverters._ +import scala.collection.convert.ImplicitConversions.`map AsScala` /** A simple custom jackson serializer to handle serializing avro records * (Generic or Specific) @@ -61,6 +62,7 @@ class AvroJsonSerializer } gen.writeEndArray() case (MAP, map: Map[String, _] @unchecked) => + case (MAP, map: scala.collection.convert.Wrappers.MapWrapper[String,Any] @unchecked) => gen.writeObjectFieldStart(name) map.foreach { case (k, e) => gen.writeFieldName(k) @@ -95,13 +97,13 @@ class AvroJsonSerializer case (BOOLEAN, boolean: Boolean) => gen.writeBooleanField(name, boolean) case _ => - gen.writeFieldName(name) - provider - .findValueSerializer( - implicitly[TypeInformation[T]].getTypeClass - ) - .asInstanceOf[JsonSerializer[T]] - .serialize(value, gen, provider) + gen.writeFieldName(name) + provider + .findValueSerializer( + implicitly[TypeInformation[T]].getTypeClass + ) + .asInstanceOf[JsonSerializer[T]] + .serialize(value, gen, provider) } } @@ -122,7 +124,7 @@ class AvroJsonSerializer _serializeElement(e, schema.getElementType, gen, provider) } case (MAP, _) => gen.writeObject(value) - case (UNION, _) => // todo + case (UNION, _) => gen.writeString(value.toString) case (STRING, string: String) => gen.writeString(string) case (INT, int: Int) => gen.writeNumber(int) case (LONG, long: Long) => gen.writeNumber(long)