From fd7a5f43990319db6ab8d7f936b620b04973f267 Mon Sep 17 00:00:00 2001 From: Robert Lyons Date: Wed, 7 Sep 2022 07:11:57 -0400 Subject: [PATCH] refactor embedded avro json file encoder --- .../serde/EmbeddedAvroJsonFileEncoder.scala | 50 +++++++++---------- .../flinkrunner/model/MyAvroADT.scala | 42 ++++++++++++++-- .../serde/JsonEncoderTestUtils.scala | 9 ++-- 3 files changed, 68 insertions(+), 33 deletions(-) diff --git a/src/main/scala/io/epiphanous/flinkrunner/serde/EmbeddedAvroJsonFileEncoder.scala b/src/main/scala/io/epiphanous/flinkrunner/serde/EmbeddedAvroJsonFileEncoder.scala index 7edddb32..9e4e3509 100644 --- a/src/main/scala/io/epiphanous/flinkrunner/serde/EmbeddedAvroJsonFileEncoder.scala +++ b/src/main/scala/io/epiphanous/flinkrunner/serde/EmbeddedAvroJsonFileEncoder.scala @@ -1,18 +1,15 @@ package io.epiphanous.flinkrunner.serde -import com.fasterxml.jackson.core.{JsonFactory, JsonGenerator} import com.typesafe.scalalogging.LazyLogging import io.epiphanous.flinkrunner.model.{EmbeddedAvroRecord, FlinkEvent} -import org.apache.avro.Schema -import org.apache.avro.generic.{GenericDatumWriter, GenericRecord} -import org.apache.avro.io.{EncoderFactory, JsonEncoder} -import org.apache.avro.specific.SpecificRecord -import org.apache.commons.io.output.ByteArrayOutputStream +import org.apache.avro.generic.GenericRecord import org.apache.flink.api.common.serialization.Encoder import org.apache.flink.api.common.typeinfo.TypeInformation import java.io.OutputStream import java.nio.charset.StandardCharsets +import java.util +import scala.collection.JavaConverters._ import scala.util.Try /** A thin wrapper to emit an embedded avro record from events into an @@ -30,32 +27,31 @@ import scala.util.Try class EmbeddedAvroJsonFileEncoder[ E <: ADT with EmbeddedAvroRecord[A]: TypeInformation, A <: GenericRecord: TypeInformation, - ADT <: FlinkEvent](schemaOpt: Option[Schema] = None) + ADT <: FlinkEvent](pretty: Boolean = false, sortKeys: Boolean = false) extends Encoder[E] + with JsonCodec with LazyLogging { - @transient - lazy val avroClass: Class[A] = - implicitly[TypeInformation[A]].getTypeClass - lazy val lineEndBytes: Array[Byte] = System.lineSeparator().getBytes(StandardCharsets.UTF_8) - override def encode(element: E, stream: OutputStream): Unit = { - val record = element.$record - val schema = record.getSchema - // is this inefficient? - val encoder = EncoderFactory.get().jsonEncoder(schema, stream) - val writer = new GenericDatumWriter[GenericRecord](schema) + def asMap(record: A): util.Map[String, AnyRef] = + record.getSchema.getFields.asScala.toList + .map { f => + val name = f.name() + name -> record.get(name) + } + .toMap + .asJava + + override def encode(element: E, stream: OutputStream): Unit = + Try(getMapper().writeValueAsBytes(asMap(element.$record))) + .fold( + t => logger.error(s"failed to encode json $element", t), + bytes => { + stream.write(bytes) + stream.write(lineEndBytes) + } + ) - Try { - writer.write(record, encoder) - encoder.flush() - stream.write(lineEndBytes) - }.fold( - error => - logger.error(s"Failed to encode avro record $record", error), - _ => () - ) - } } diff --git a/src/test/scala/io/epiphanous/flinkrunner/model/MyAvroADT.scala b/src/test/scala/io/epiphanous/flinkrunner/model/MyAvroADT.scala index 6c2ec27b..c77ac0cd 100644 --- a/src/test/scala/io/epiphanous/flinkrunner/model/MyAvroADT.scala +++ b/src/test/scala/io/epiphanous/flinkrunner/model/MyAvroADT.scala @@ -1,12 +1,47 @@ package io.epiphanous.flinkrunner.model +import io.epiphanous.flinkrunner.serde.DelimitedConfig +import org.apache.avro.generic.GenericRecord + +import scala.collection.JavaConverters._ import scala.language.implicitConversions -sealed trait MyAvroADT extends FlinkEvent +sealed trait MyAvroADT extends FlinkEvent { + def toJson(pretty: Boolean = false, sortKeys: Boolean = false): String +} + +trait TestSerializers[A <: GenericRecord] { + def $record: A + def toJson( + pretty: Boolean = false, + sortKeys: Boolean = false): String = { + val fields = $record.getSchema.getFields.asScala.toList.map(_.name()) + val sfields = if (sortKeys) fields.sorted else fields + sfields + .map { name => + val value = $record.get(name) match { + case None | null => "null" + case s: String => s""""${s.replaceAll("\"", "\\\n")}"""" + case value => value.toString + } + s""""$name":${if (pretty) " " else ""}$value""" + } + .mkString( + if (pretty) "{\n " else "{", + if (pretty) ",\n " else ",", + if (pretty) "\n}" else "}" + ) + } + + def toDelimited( + delimitedConfig: DelimitedConfig = DelimitedConfig.CSV): String = + "" +} case class AWrapper(value: ARecord) extends MyAvroADT - with EmbeddedAvroRecord[ARecord] { + with EmbeddedAvroRecord[ARecord] + with TestSerializers[ARecord] { override val $id: String = value.a0 override val $key: String = $id override val $timestamp: Long = value.a3.toEpochMilli @@ -24,7 +59,8 @@ object AWrapper extends EmbeddedAvroRecordFactory[AWrapper, ARecord] { case class BWrapper(value: BRecord) extends MyAvroADT - with EmbeddedAvroRecord[BRecord] { + with EmbeddedAvroRecord[BRecord] + with TestSerializers[BRecord] { override val $id: String = value.b0 override val $key: String = $id override val $timestamp: Long = value.b3.toEpochMilli diff --git a/src/test/scala/io/epiphanous/flinkrunner/serde/JsonEncoderTestUtils.scala b/src/test/scala/io/epiphanous/flinkrunner/serde/JsonEncoderTestUtils.scala index 590249b6..b141a826 100644 --- a/src/test/scala/io/epiphanous/flinkrunner/serde/JsonEncoderTestUtils.scala +++ b/src/test/scala/io/epiphanous/flinkrunner/serde/JsonEncoderTestUtils.scala @@ -5,18 +5,21 @@ import org.apache.flink.api.scala.createTypeInformation import org.scalacheck.{Arbitrary, Gen} trait JsonEncoderTestUtils extends BasePropGenerators { - case class JsonEncoderTest(a: Int, b: String) { - def serialize: String = s"""{"a":$a,"b":"$b"}""" + case class JsonEncoderTest(a: Int, b: String, c: Option[Double]) { + def serialize: String = s"""{"a":$a,"b":"$b","c":${c.orNull}}""" } val genTest: Gen[JsonEncoderTest] = for { a <- Gen.chooseNum[Int](1, 100) b <- nameGen("test") - } yield JsonEncoderTest(a, b) + c <- Gen.option(Gen.chooseNum(100d, 900d)) + } yield JsonEncoderTest(a, b, c) implicit val arbTest: Arbitrary[JsonEncoderTest] = Arbitrary( genTest ) + /** TODO: improve support for testing pretty/sortkeys + */ def getFileEncoder(pretty: Boolean = false, sortKeys: Boolean = false) = new JsonFileEncoder[JsonEncoderTest](pretty, sortKeys)