Skip to content

Commit

Permalink
refactor embedded avro json file encoder
Browse files Browse the repository at this point in the history
  • Loading branch information
nextdude committed Sep 7, 2022
1 parent 449bd23 commit fd7a5f4
Show file tree
Hide file tree
Showing 3 changed files with 68 additions and 33 deletions.
Original file line number Diff line number Diff line change
@@ -1,18 +1,15 @@
package io.epiphanous.flinkrunner.serde

import com.fasterxml.jackson.core.{JsonFactory, JsonGenerator}
import com.typesafe.scalalogging.LazyLogging
import io.epiphanous.flinkrunner.model.{EmbeddedAvroRecord, FlinkEvent}
import org.apache.avro.Schema
import org.apache.avro.generic.{GenericDatumWriter, GenericRecord}
import org.apache.avro.io.{EncoderFactory, JsonEncoder}
import org.apache.avro.specific.SpecificRecord
import org.apache.commons.io.output.ByteArrayOutputStream
import org.apache.avro.generic.GenericRecord
import org.apache.flink.api.common.serialization.Encoder
import org.apache.flink.api.common.typeinfo.TypeInformation

import java.io.OutputStream
import java.nio.charset.StandardCharsets
import java.util
import scala.collection.JavaConverters._
import scala.util.Try

/** A thin wrapper to emit an embedded avro record from events into an
Expand All @@ -30,32 +27,31 @@ import scala.util.Try
class EmbeddedAvroJsonFileEncoder[
    E <: ADT with EmbeddedAvroRecord[A]: TypeInformation,
    A <: GenericRecord: TypeInformation,
    ADT <: FlinkEvent](pretty: Boolean = false, sortKeys: Boolean = false)
    extends Encoder[E]
    with JsonCodec
    with LazyLogging {

  // runtime class of the embedded avro record type
  @transient
  lazy val avroClass: Class[A] =
    implicitly[TypeInformation[A]].getTypeClass

  // platform line separator bytes, appended after each encoded record
  lazy val lineEndBytes: Array[Byte] =
    System.lineSeparator().getBytes(StandardCharsets.UTF_8)

  /** Convert an avro record into a java map of its field names to field
    * values, suitable for serialization with a jackson mapper.
    *
    * @param record
    *   the avro record to convert
    * @return
    *   a java map of field name -> field value
    */
  def asMap(record: A): util.Map[String, AnyRef] =
    record.getSchema.getFields.asScala.toList
      .map { f =>
        val name = f.name()
        name -> record.get(name)
      }
      .toMap
      .asJava

  /** Encode an event's embedded avro record into the stream as a single
    * JSON value followed by the platform line separator. Encoding
    * failures are logged and swallowed (nothing is written for that
    * element).
    *
    * @param element
    *   the event whose embedded record is encoded
    * @param stream
    *   the output stream to write into
    */
  override def encode(element: E, stream: OutputStream): Unit =
    // NOTE(review): pretty/sortKeys are presumably picked up by the
    // JsonCodec-provided mapper — confirm getMapper() honors them
    Try(getMapper().writeValueAsBytes(asMap(element.$record)))
      .fold(
        t => logger.error(s"failed to encode json $element", t),
        bytes => {
          stream.write(bytes)
          stream.write(lineEndBytes)
        }
      )
}
42 changes: 39 additions & 3 deletions src/test/scala/io/epiphanous/flinkrunner/model/MyAvroADT.scala
Original file line number Diff line number Diff line change
@@ -1,12 +1,47 @@
package io.epiphanous.flinkrunner.model

import io.epiphanous.flinkrunner.serde.DelimitedConfig
import org.apache.avro.generic.GenericRecord

import scala.collection.JavaConverters._
import scala.language.implicitConversions

sealed trait MyAvroADT extends FlinkEvent {

  /** Serialize this event to a JSON string.
    *
    * @param pretty
    *   if true, emit indented, multi-line output
    * @param sortKeys
    *   if true, emit fields in sorted name order
    * @return
    *   the JSON rendering of this event
    */
  def toJson(pretty: Boolean = false, sortKeys: Boolean = false): String
}

trait TestSerializers[A <: GenericRecord] {
  // the embedded avro record this wrapper carries
  def $record: A

  /** Render the embedded avro record as a JSON object string, used in
    * tests as the expected serialization of the record.
    *
    * @param pretty
    *   if true, emit indented, multi-line output
    * @param sortKeys
    *   if true, emit fields in sorted name order
    * @return
    *   the JSON rendering of the embedded record
    */
  def toJson(
      pretty: Boolean = false,
      sortKeys: Boolean = false): String = {
    val fields = $record.getSchema.getFields.asScala.toList.map(_.name())
    val sfields = if (sortKeys) fields.sorted else fields
    sfields
      .map { name =>
        val value = $record.get(name) match {
          case None | null => "null"
          // escape embedded double quotes as \" (was broken: the
          // replacement "\\\n" substituted a backslash+newline,
          // producing invalid JSON)
          case s: String => s""""${s.replaceAll("\"", "\\\\\"")}""""
          case value => value.toString
        }
        s""""$name":${if (pretty) " " else ""}$value"""
      }
      .mkString(
        if (pretty) "{\n " else "{",
        if (pretty) ",\n " else ",",
        if (pretty) "\n}" else "}"
      )
  }

  // placeholder until delimited test serialization is implemented
  def toDelimited(
      delimitedConfig: DelimitedConfig = DelimitedConfig.CSV): String =
    ""
}

case class AWrapper(value: ARecord)
extends MyAvroADT
with EmbeddedAvroRecord[ARecord] {
with EmbeddedAvroRecord[ARecord]
with TestSerializers[ARecord] {
override val $id: String = value.a0
override val $key: String = $id
override val $timestamp: Long = value.a3.toEpochMilli
Expand All @@ -24,7 +59,8 @@ object AWrapper extends EmbeddedAvroRecordFactory[AWrapper, ARecord] {

case class BWrapper(value: BRecord)
extends MyAvroADT
with EmbeddedAvroRecord[BRecord] {
with EmbeddedAvroRecord[BRecord]
with TestSerializers[BRecord] {
override val $id: String = value.b0
override val $key: String = $id
override val $timestamp: Long = value.b3.toEpochMilli
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,21 @@ import org.apache.flink.api.scala.createTypeInformation
import org.scalacheck.{Arbitrary, Gen}

trait JsonEncoderTestUtils extends BasePropGenerators {
// simple stdlib-only payload for json encoder tests; None renders as a
// JSON null via orNull
case class JsonEncoderTest(a: Int, b: String, c: Option[Double]) {
  // the expected single-line JSON serialization of this instance
  def serialize: String = s"""{"a":$a,"b":"$b","c":${c.orNull}}"""
}

// generator for JsonEncoderTest values: small positive ints, generated
// names, and an optional double in [100, 900]
val genTest: Gen[JsonEncoderTest] = for {
  a <- Gen.chooseNum[Int](1, 100)
  b <- nameGen("test")
  c <- Gen.option(Gen.chooseNum(100d, 900d))
} yield JsonEncoderTest(a, b, c)
// make JsonEncoderTest values available to scalacheck property tests
implicit val arbTest: Arbitrary[JsonEncoderTest] = Arbitrary(
  genTest
)

/** TODO: improve support for testing pretty/sortkeys
  */
def getFileEncoder(pretty: Boolean = false, sortKeys: Boolean = false) =
  new JsonFileEncoder[JsonEncoderTest](pretty, sortKeys)

Expand Down

0 comments on commit fd7a5f4

Please sign in to comment.