From 2a22404740e95953d4a733598f71694a60cb6dc7 Mon Sep 17 00:00:00 2001 From: Robert Lyons Date: Sun, 11 Sep 2022 18:44:57 -0400 Subject: [PATCH] refactor/simplify delimited/json encoders/decoders --- build.sbt | 5 +- .../serde/AvroJsonSerializer.scala | 3 +- .../epiphanous/flinkrunner/serde/Codec.scala | 121 ++++++++++++++++++ .../flinkrunner/serde/DelimitedCodec.scala | 75 ----------- .../flinkrunner/serde/DelimitedConfig.scala | 40 +----- .../serde/DelimitedFileEncoder.scala | 35 +++-- .../serde/DelimitedRowDecoder.scala | 20 ++- .../serde/DelimitedRowEncoder.scala | 12 +- .../EmbeddedAvroDelimitedFileEncoder.scala | 59 +-------- .../serde/EmbeddedAvroJsonFileEncoder.scala | 18 +-- .../flinkrunner/serde/JsonCodec.scala | 43 ------- .../flinkrunner/serde/JsonConfig.scala | 14 ++ .../flinkrunner/serde/JsonFileEncoder.scala | 9 +- .../flinkrunner/serde/JsonRowDecoder.scala | 9 +- .../flinkrunner/serde/JsonRowEncoder.scala | 19 +-- .../serde/JsonSerializationSchema.scala | 16 ++- .../flinkrunner/serde/RowEncoder.scala | 6 +- .../flinkrunner/model/MyAvroADT.scala | 43 +++---- .../flinkrunner/model/MySimpleADT.scala | 3 - .../serde/DelimitedConfigTest.scala | 35 ----- .../serde/DelimitedEncoderTestUtils.scala | 35 +++++ .../serde/DelimitedRowDecoderTest.scala | 2 +- .../serde/DelimitedRowEncoderTest.scala | 52 ++++++++ .../EmbeddedAvroJsonFileEncoderTest.scala | 8 +- .../flinkrunner/serde/JsonCodecTest.scala | 20 --- .../serde/JsonEncoderTestUtils.scala | 8 +- .../serde/JsonFileEncoderTest.scala | 5 +- .../serde/JsonRowEncoderTest.scala | 2 +- 28 files changed, 336 insertions(+), 381 deletions(-) create mode 100644 src/main/scala/io/epiphanous/flinkrunner/serde/Codec.scala delete mode 100644 src/main/scala/io/epiphanous/flinkrunner/serde/DelimitedCodec.scala delete mode 100644 src/main/scala/io/epiphanous/flinkrunner/serde/JsonCodec.scala create mode 100644 src/main/scala/io/epiphanous/flinkrunner/serde/JsonConfig.scala delete mode 100644 src/test/scala/io/epiphanous/flinkrunner/serde/DelimitedConfigTest.scala create mode 100644 src/test/scala/io/epiphanous/flinkrunner/serde/DelimitedEncoderTestUtils.scala create mode 100644 src/test/scala/io/epiphanous/flinkrunner/serde/DelimitedRowEncoderTest.scala delete mode 100644 src/test/scala/io/epiphanous/flinkrunner/serde/JsonCodecTest.scala diff --git a/build.sbt b/build.sbt index fb5479b4..53bd8226 100644 --- a/build.sbt +++ b/build.sbt @@ -36,7 +36,7 @@ val V = new { val scalaTestPlus = "3.2.13.0" val scalaCheck = "1.16.0" val testContainersScala = "0.40.10" - val jackson = "2.13.3" + val jackson = "2.13.4" val circe = "0.14.2" val http4s = "0.23.12" val enumeratum = "1.7.0" @@ -45,7 +45,7 @@ val V = new { val squants = "1.8.3" val confluentAvroSerde = "7.1.1" val parquet = "1.12.3" - val awsSdk = "1.12.290" + val awsSdk = "1.12.296" val jdbcMysql = "8.0.30" val jdbcPg = "42.5.0" val jdbcMssql = "11.2.0.jre11" @@ -121,7 +121,6 @@ val otherDeps = Seq( // "com.github.pjfanning" %% "jackson-scala-reflect-extensions" % V.jackson, "com.fasterxml.jackson.dataformat" % "jackson-dataformat-csv" % V.jackson, "com.fasterxml.jackson.datatype" % "jackson-datatype-jsr310" % V.jackson, - "com.fasterxml.jackson.dataformat" % "jackson-dataformat-avro" % V.jackson, "com.dimafeng" %% "testcontainers-scala-scalatest" % V.testContainersScala % Test, "com.dimafeng" %% "testcontainers-scala-mysql" % V.testContainersScala % Test, "mysql" % "mysql-connector-java" % V.jdbcMysql % Provided, diff --git 
a/src/main/scala/io/epiphanous/flinkrunner/serde/AvroJsonSerializer.scala b/src/main/scala/io/epiphanous/flinkrunner/serde/AvroJsonSerializer.scala index 68a7b5dd..35c6c603 100644 --- a/src/main/scala/io/epiphanous/flinkrunner/serde/AvroJsonSerializer.scala +++ b/src/main/scala/io/epiphanous/flinkrunner/serde/AvroJsonSerializer.scala @@ -23,6 +23,7 @@ class AvroJsonSerializer record: GenericRecord, gen: JsonGenerator, provider: SerializerProvider): Unit = { +// logger.debug(s"serializing avro record: $record") gen.writeStartObject() record.getSchema.getFields.asScala.foreach { f => _serializeAvroValue( @@ -72,7 +73,7 @@ class AvroJsonSerializer schema.getTypes.asScala.filterNot(s => s.getType == NULL) if (nonNullTypes.size > 1) { throw new RuntimeException( - s"field $name of type union has more than one non-null types: $nonNullTypes" + s"field $name of type union has more than one non-null type: $nonNullTypes" ) } _serializeAvroValue( diff --git a/src/main/scala/io/epiphanous/flinkrunner/serde/Codec.scala b/src/main/scala/io/epiphanous/flinkrunner/serde/Codec.scala new file mode 100644 index 00000000..a511de16 --- /dev/null +++ b/src/main/scala/io/epiphanous/flinkrunner/serde/Codec.scala @@ -0,0 +1,121 @@ +package io.epiphanous.flinkrunner.serde + +import com.fasterxml.jackson.databind._ +import com.fasterxml.jackson.databind.json.JsonMapper +import com.fasterxml.jackson.databind.module.SimpleModule +import com.fasterxml.jackson.dataformat.csv.{ + CsvGenerator, + CsvMapper, + CsvParser, + CsvSchema +} +import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule +import com.fasterxml.jackson.module.scala.DefaultScalaModule +import org.apache.avro.generic.GenericRecord + +import java.io.OutputStream +import java.nio.charset.StandardCharsets +import scala.collection.JavaConverters._ + +case class Codec[E]( + typeClass: Class[E], + jsonConfig: JsonConfig = JsonConfig(), + delimitedConfig: DelimitedConfig = DelimitedConfig.CSV) { + + lazy val isAvro: Boolean = + classOf[GenericRecord].isAssignableFrom(typeClass) + + lazy val avroModule: SimpleModule = + new SimpleModule().addSerializer(new AvroJsonSerializer) + + lazy val jsonMapper: JsonMapper = { + val mapper = JsonMapper + .builder() + .addModule(DefaultScalaModule) + .addModule(new JavaTimeModule) + .configure( + MapperFeature.SORT_PROPERTIES_ALPHABETICALLY, + jsonConfig.sortKeys + ) + .configure( + SerializationFeature.ORDER_MAP_ENTRIES_BY_KEYS, + jsonConfig.sortKeys + ) + .configure(SerializationFeature.INDENT_OUTPUT, jsonConfig.pretty) + .configure( + DeserializationFeature.ADJUST_DATES_TO_CONTEXT_TIME_ZONE, + false + ) + (if (isAvro) mapper.addModule(avroModule) else mapper).build() + } + + lazy val jsonWriter: ObjectWriter = jsonMapper.writerFor(typeClass) + + lazy val jsonReader: ObjectReader = jsonMapper.readerFor(typeClass) + + lazy val csvMapper: CsvMapper = { + val builder = CsvMapper + .builder() + .addModule(DefaultScalaModule) + .addModule(new JavaTimeModule) + .configure(MapperFeature.SORT_PROPERTIES_ALPHABETICALLY, false) + .configure(CsvGenerator.Feature.ALWAYS_QUOTE_STRINGS, false) + .configure(CsvParser.Feature.TRIM_SPACES, true) + .configure(CsvParser.Feature.SKIP_EMPTY_LINES, true) + .configure(CsvParser.Feature.ALLOW_COMMENTS, true) + .configure(CsvParser.Feature.EMPTY_STRING_AS_NULL, true) + .configure( + DeserializationFeature.ADJUST_DATES_TO_CONTEXT_TIME_ZONE, + false + ) + (if (isAvro) builder.addModule(avroModule) else builder).build() + } + + lazy val csvSchema: CsvSchema = { + val start = 
csvMapper.schemaFor(typeClass) + val updatedWithConfig = (if (isAvro) { + val columns = start + .iterator() + .asScala + .toList + .filterNot(c => + c.hasName("schema") || c.hasName( + "specificData" + ) + ) + .asJava + start + .withoutColumns() + .rebuild() + .addColumns(columns) + .build() + } else start) + .withColumnSeparator(delimitedConfig.columnSeparator) + .withLineSeparator(delimitedConfig.lineSeparator) + .withEscapeChar(delimitedConfig.escapeChar) + .withUseHeader(false) // delimited header use handled in encoder + if (delimitedConfig.useQuotes) + updatedWithConfig.withQuoteChar(delimitedConfig.quoteCharacter) + else updatedWithConfig.withoutQuoteChar() + } + + lazy val csvHeader: Array[Byte] = csvSchema + .iterator() + .asScala + .map(_.getName) + .toList + .mkString( + "", + delimitedConfig.columnSeparator.toString, + delimitedConfig.lineSeparator + ) + .getBytes(StandardCharsets.UTF_8) + + def maybeWriteHeader(stream: OutputStream): Unit = + if (delimitedConfig.useHeader) stream.write(csvHeader) + + lazy val csvWriter: ObjectWriter = csvMapper.writer(csvSchema) + + lazy val csvReader: ObjectReader = + csvMapper.readerFor(typeClass).`with`(csvSchema) +} diff --git a/src/main/scala/io/epiphanous/flinkrunner/serde/DelimitedCodec.scala b/src/main/scala/io/epiphanous/flinkrunner/serde/DelimitedCodec.scala deleted file mode 100644 index 90154209..00000000 --- a/src/main/scala/io/epiphanous/flinkrunner/serde/DelimitedCodec.scala +++ /dev/null @@ -1,75 +0,0 @@ -package io.epiphanous.flinkrunner.serde - -import com.fasterxml.jackson.databind.{ - DeserializationFeature, - MapperFeature, - ObjectReader, - ObjectWriter -} -import com.fasterxml.jackson.dataformat.csv.{ - CsvGenerator, - CsvMapper, - CsvParser -} -import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule -import com.fasterxml.jackson.module.scala.DefaultScalaModule -import collection.JavaConverters._ -import java.nio.charset.StandardCharsets - -trait DelimitedCodec { - - def getMapper: CsvMapper = CsvMapper - .builder() - .addModule(DefaultScalaModule) - .addModule(new JavaTimeModule) - .configure(MapperFeature.SORT_PROPERTIES_ALPHABETICALLY, false) - .configure(CsvGenerator.Feature.ALWAYS_QUOTE_STRINGS, false) - .configure(CsvParser.Feature.TRIM_SPACES, true) - .configure(CsvParser.Feature.SKIP_EMPTY_LINES, true) - .configure(CsvParser.Feature.ALLOW_COMMENTS, true) - .configure(CsvParser.Feature.EMPTY_STRING_AS_NULL, true) - .configure( - DeserializationFeature.ADJUST_DATES_TO_CONTEXT_TIME_ZONE, - false - ) - .build() - - def getWriter[E]( - delimitedConfig: DelimitedConfig, - typeClass: Class[E]): ObjectWriter = { - val mapper = getMapper - mapper.writer( - delimitedConfig - .intoSchema(mapper.schemaFor(typeClass)) - .withUseHeader(false) - ) - } - - def getHeader[E]( - delimitedConfig: DelimitedConfig, - typeClass: Class[E], - filterOut: Seq[String] = Seq.empty): Array[Byte] = getMapper - .schemaFor(typeClass) - .iterator() - .asScala - .map(c => c.getName) - .toList - .diff(filterOut) - .mkString( - "", - delimitedConfig.columnSeparator.toString, - delimitedConfig.lineSeparator - ) - .getBytes(StandardCharsets.UTF_8) - - def getReader[E]( - delimitedConfig: DelimitedConfig, - typeClass: Class[E]): ObjectReader = { - val mapper = getMapper - val schema = delimitedConfig - .intoSchema(mapper.schemaFor(typeClass)) - .withUseHeader(false) // hardcoding this - mapper.readerFor(typeClass).`with`(schema) - } - -} diff --git a/src/main/scala/io/epiphanous/flinkrunner/serde/DelimitedConfig.scala 
b/src/main/scala/io/epiphanous/flinkrunner/serde/DelimitedConfig.scala index 6b21e6e9..6f3ed4f1 100644 --- a/src/main/scala/io/epiphanous/flinkrunner/serde/DelimitedConfig.scala +++ b/src/main/scala/io/epiphanous/flinkrunner/serde/DelimitedConfig.scala @@ -1,6 +1,5 @@ package io.epiphanous.flinkrunner.serde -import com.fasterxml.jackson.dataformat.csv.CsvSchema import io.epiphanous.flinkrunner.model.StreamFormatName import java.util.Properties @@ -28,43 +27,12 @@ case class DelimitedConfig( useHeader: Boolean = false, useQuotes: Boolean = false, columns: List[String] = List.empty -) { - - /** Creates a new CsvSchema object based on the DelimitedConfig settings. - * - * Note: If DelimitedConfig has a non-empty column list, any existing - * columns in the start schema will be replaced. - * - * @param start - * a starting schema that we apply our settings into - * @return - * updated CsvSchema - */ - def intoSchema(start: CsvSchema): CsvSchema = { - val csvSchema = { - val s = start - .withColumnSeparator(columnSeparator) - .withLineSeparator(lineSeparator) - .withEscapeChar(escapeChar) - .withUseHeader(useHeader) - if (useQuotes) s.withQuoteChar(quoteCharacter) - else s.withoutQuoteChar() - } - if (columns.isEmpty) csvSchema - else { - columns - .foldLeft(csvSchema.withoutColumns().rebuild())((b, f) => - b.addColumn(f) - ) - .build() - } - } -} +) object DelimitedConfig { - val CSV = DelimitedConfig() - val TSV = DelimitedConfig('\t') - val PSV = DelimitedConfig('|') + val CSV: DelimitedConfig = DelimitedConfig() + val TSV: DelimitedConfig = DelimitedConfig('\t') + val PSV: DelimitedConfig = DelimitedConfig('|') /** Produces a DelimitedConfig based on the requested StreamFormatName, * properties and columns passed in. diff --git a/src/main/scala/io/epiphanous/flinkrunner/serde/DelimitedFileEncoder.scala b/src/main/scala/io/epiphanous/flinkrunner/serde/DelimitedFileEncoder.scala index b4609de0..49f65d85 100644 --- a/src/main/scala/io/epiphanous/flinkrunner/serde/DelimitedFileEncoder.scala +++ b/src/main/scala/io/epiphanous/flinkrunner/serde/DelimitedFileEncoder.scala @@ -1,5 +1,5 @@ package io.epiphanous.flinkrunner.serde -import com.fasterxml.jackson.databind.ObjectWriter +import java.nio.charset.StandardCharsets import org.apache.flink.api.common.serialization.Encoder import org.apache.flink.api.common.typeinfo.TypeInformation @@ -14,30 +14,29 @@ import java.io.OutputStream */ class DelimitedFileEncoder[E: TypeInformation]( delimitedConfig: DelimitedConfig = DelimitedConfig.CSV) - extends Encoder[E] - with DelimitedCodec { + extends Encoder[E] { @transient - lazy val typeClass: Class[E] = - implicitly[TypeInformation[E]].getTypeClass - - @transient - lazy val header: Array[Byte] = getHeader(delimitedConfig, typeClass) - - @transient - lazy val writer: ObjectWriter = - getWriter(delimitedConfig, typeClass) + lazy val encoder: DelimitedRowEncoder[E] = + new DelimitedRowEncoder[E](delimitedConfig) @transient var out: OutputStream = _ override def encode(element: E, stream: OutputStream): Unit = { - if (delimitedConfig.useHeader) { - if (out != stream) { - out = stream - stream.write(header) - } + if (out != stream) { + out = stream + encoder.codec.maybeWriteHeader(stream) } - stream.write(writer.writeValueAsBytes(element)) + encoder + .encode(element) + .fold( + err => + throw new RuntimeException( + s"failed to delimited-encode $element", + err + ), + line => stream.write(line.getBytes(StandardCharsets.UTF_8)) + ) } } diff --git 
a/src/main/scala/io/epiphanous/flinkrunner/serde/DelimitedRowDecoder.scala b/src/main/scala/io/epiphanous/flinkrunner/serde/DelimitedRowDecoder.scala index 5f61a5a5..c2e5557f 100644 --- a/src/main/scala/io/epiphanous/flinkrunner/serde/DelimitedRowDecoder.scala +++ b/src/main/scala/io/epiphanous/flinkrunner/serde/DelimitedRowDecoder.scala @@ -1,20 +1,28 @@ package io.epiphanous.flinkrunner.serde -import com.fasterxml.jackson.databind.ObjectReader import org.apache.flink.api.common.typeinfo.TypeInformation import scala.util.Try +/** Decode a delimited text line into an instance of the requested class. + * + * @param delimitedConfig + * a delimited codec config (defaults to CSV with useHeader = false) + * @tparam E + * the type to decode into + */ class DelimitedRowDecoder[E: TypeInformation]( delimitedConfig: DelimitedConfig = DelimitedConfig.CSV.copy(useHeader = false)) - extends RowDecoder[E] - with DelimitedCodec { + extends RowDecoder[E] { @transient - lazy val reader: ObjectReader = - getReader(delimitedConfig, implicitly[TypeInformation[E]].getTypeClass) + lazy val codec: Codec[E] = Codec( + implicitly[TypeInformation[E]].getTypeClass, + delimitedConfig = delimitedConfig + ) override def decode(line: String): Try[E] = - Try(reader.readValue[E](line)) + Try(codec.csvReader.readValue[E](line)) + } diff --git a/src/main/scala/io/epiphanous/flinkrunner/serde/DelimitedRowEncoder.scala b/src/main/scala/io/epiphanous/flinkrunner/serde/DelimitedRowEncoder.scala index 98cf852d..48ac588a 100644 --- a/src/main/scala/io/epiphanous/flinkrunner/serde/DelimitedRowEncoder.scala +++ b/src/main/scala/io/epiphanous/flinkrunner/serde/DelimitedRowEncoder.scala @@ -1,6 +1,5 @@ package io.epiphanous.flinkrunner.serde -import com.fasterxml.jackson.databind.ObjectWriter import org.apache.flink.api.common.typeinfo.TypeInformation import scala.util.Try @@ -13,13 +12,14 @@ import scala.util.Try */ class DelimitedRowEncoder[E: TypeInformation]( delimitedConfig: DelimitedConfig = DelimitedConfig.CSV) - extends RowEncoder[E] - with DelimitedCodec { + extends RowEncoder[E] { @transient - lazy val writer: ObjectWriter = - getWriter(delimitedConfig, implicitly[TypeInformation[E]].getTypeClass) + lazy val codec: Codec[E] = Codec( + implicitly[TypeInformation[E]].getTypeClass, + delimitedConfig = delimitedConfig + ) override def encode(element: E): Try[String] = - Try(writer.writeValueAsString(element)) + Try(codec.csvWriter.writeValueAsString(element)) } diff --git a/src/main/scala/io/epiphanous/flinkrunner/serde/EmbeddedAvroDelimitedFileEncoder.scala b/src/main/scala/io/epiphanous/flinkrunner/serde/EmbeddedAvroDelimitedFileEncoder.scala index c9e18702..4a97f13b 100644 --- a/src/main/scala/io/epiphanous/flinkrunner/serde/EmbeddedAvroDelimitedFileEncoder.scala +++ b/src/main/scala/io/epiphanous/flinkrunner/serde/EmbeddedAvroDelimitedFileEncoder.scala @@ -1,13 +1,11 @@ package io.epiphanous.flinkrunner.serde -import com.fasterxml.jackson.databind.{ObjectWriter, SequenceWriter} import io.epiphanous.flinkrunner.model.{EmbeddedAvroRecord, FlinkEvent} import org.apache.avro.generic.GenericRecord import org.apache.flink.api.common.serialization.Encoder import org.apache.flink.api.common.typeinfo.TypeInformation import java.io.OutputStream -import scala.collection.JavaConverters._ /** A thin wrapper to emit an embedded avro record from events into an * output stream in a delimited format. 
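
[Reviewer note] To make the refactored surface above concrete, here is a minimal round-trip sketch against the new row codecs. The Widget case class is hypothetical (invented for this note), and the createTypeInformation import mirrors what the test sources in this patch use:

import io.epiphanous.flinkrunner.serde.{
  DelimitedConfig,
  DelimitedRowDecoder,
  DelimitedRowEncoder
}
import org.apache.flink.api.scala.createTypeInformation

// hypothetical event type, used only for this sketch
case class Widget(id: String, count: Int, weight: Option[Double])

// encoder and decoder each build one lazy Codec[Widget], so the
// CsvSchema is derived from the case class once per instance
val psv     = DelimitedConfig.PSV // pipe-separated values
val encoder = new DelimitedRowEncoder[Widget](psv)
val decoder = new DelimitedRowDecoder[Widget](psv)

// both sides return scala.util.Try, so failures compose cleanly
val roundTrip = for {
  line   <- encoder.encode(Widget("w-1", 3, Some(1.5)))
  widget <- decoder.decode(line)
} yield widget
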
@@ -26,59 +24,10 @@ class EmbeddedAvroDelimitedFileEncoder[ A <: GenericRecord: TypeInformation, ADT <: FlinkEvent]( delimitedConfig: DelimitedConfig = DelimitedConfig.CSV) - extends Encoder[E] - with DelimitedCodec { + extends Encoder[E] { - @transient - lazy val typeClass: Class[A] = - implicitly[TypeInformation[A]].getTypeClass + val encoder = new DelimitedFileEncoder[A](delimitedConfig) - @transient - lazy val header: Array[Byte] = - getHeader(delimitedConfig, typeClass, Seq("schema", "specificData")) - - @transient - lazy val writer: ObjectWriter = - getWriter[A]( - delimitedConfig, - typeClass - ) - - @transient - var sequenceWriter: SequenceWriter = _ - - @transient - var lastStream: OutputStream = _ - - override def encode(element: E, stream: OutputStream): Unit = { - - /** this is here to support writing headers properly */ - if (stream != lastStream) { - if (Option(sequenceWriter).nonEmpty) { - sequenceWriter.close() - sequenceWriter = null - } - lastStream = stream - if (delimitedConfig.useHeader) { - stream.write(header) - } - } - if (Option(sequenceWriter).isEmpty) { - sequenceWriter = writer.writeValues(stream) - } - - val record = element.$record - val schema = record.getSchema - val data = - schema.getFields.asScala - .map { f => - val key = f.name() - (key, record.get(f.name())) - } - .toMap - .asJava - - sequenceWriter.write(data) - () // avoid discarded non-Unit value warning - } + override def encode(element: E, stream: OutputStream): Unit = + encoder.encode(element.$record, stream) } diff --git a/src/main/scala/io/epiphanous/flinkrunner/serde/EmbeddedAvroJsonFileEncoder.scala b/src/main/scala/io/epiphanous/flinkrunner/serde/EmbeddedAvroJsonFileEncoder.scala index a47303a3..2f4aec27 100644 --- a/src/main/scala/io/epiphanous/flinkrunner/serde/EmbeddedAvroJsonFileEncoder.scala +++ b/src/main/scala/io/epiphanous/flinkrunner/serde/EmbeddedAvroJsonFileEncoder.scala @@ -7,14 +7,11 @@ import org.apache.flink.api.common.serialization.Encoder import org.apache.flink.api.common.typeinfo.TypeInformation import java.io.OutputStream -import java.nio.charset.StandardCharsets /** A JSON lines encoder for events with embedded avro records. 
* - * @param pretty - * true if you want to indent the json output (default = false) - * @param sortKeys - * true if you want to sort keys in the json output (default = false) + * @param jsonConfig + * json encoder config * @tparam E * the ADT event type that embeds an avro record of type A * @tparam A @@ -25,19 +22,14 @@ import java.nio.charset.StandardCharsets class EmbeddedAvroJsonFileEncoder[ E <: ADT with EmbeddedAvroRecord[A]: TypeInformation, A <: GenericRecord: TypeInformation, - ADT <: FlinkEvent](pretty: Boolean = false, sortKeys: Boolean = false) + ADT <: FlinkEvent](jsonConfig: JsonConfig = JsonConfig()) extends Encoder[E] with LazyLogging { @transient - lazy val rowEncoder = new JsonRowEncoder[A](pretty, sortKeys) + lazy val encoder = new JsonFileEncoder[A](jsonConfig) override def encode(element: E, stream: OutputStream): Unit = - rowEncoder - .encode(element.$record) - .fold( - t => logger.error(s"failed to json encode $element", t), - s => stream.write(s.getBytes(StandardCharsets.UTF_8)) - ) + encoder.encode(element.$record, stream) } diff --git a/src/main/scala/io/epiphanous/flinkrunner/serde/JsonCodec.scala b/src/main/scala/io/epiphanous/flinkrunner/serde/JsonCodec.scala deleted file mode 100644 index 2c5f1322..00000000 --- a/src/main/scala/io/epiphanous/flinkrunner/serde/JsonCodec.scala +++ /dev/null @@ -1,43 +0,0 @@ -package io.epiphanous.flinkrunner.serde - -import com.fasterxml.jackson.databind._ -import com.fasterxml.jackson.databind.json.JsonMapper -import com.fasterxml.jackson.databind.module.SimpleModule -import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule -import com.fasterxml.jackson.module.scala.DefaultScalaModule -import org.apache.avro.generic.GenericRecord - -trait JsonCodec { - - def getMapper[E]( - typeClass: Class[E], - pretty: Boolean = false, - sortKeys: Boolean = false): JsonMapper = { - val mapper = JsonMapper - .builder() - .addModule(DefaultScalaModule) - .addModule(new JavaTimeModule) - .configure(MapperFeature.SORT_PROPERTIES_ALPHABETICALLY, sortKeys) - .configure(SerializationFeature.ORDER_MAP_ENTRIES_BY_KEYS, sortKeys) - .configure(SerializationFeature.INDENT_OUTPUT, pretty) - .configure( - DeserializationFeature.ADJUST_DATES_TO_CONTEXT_TIME_ZONE, - false - ) - (if (classOf[GenericRecord].isAssignableFrom(typeClass)) { - val avroModule = new SimpleModule() - avroModule.addSerializer(new AvroJsonSerializer) - mapper.addModule(avroModule) - } else mapper).build() - } - - def getWriter[E]( - pretty: Boolean = false, - sortKeys: Boolean = false, - typeClass: Class[E]): ObjectWriter = - getMapper(typeClass, pretty, sortKeys).writerFor(typeClass) - - def getReader[E](typeClass: Class[E]): ObjectReader = - getMapper(typeClass).readerFor(typeClass) - -} diff --git a/src/main/scala/io/epiphanous/flinkrunner/serde/JsonConfig.scala b/src/main/scala/io/epiphanous/flinkrunner/serde/JsonConfig.scala new file mode 100644 index 00000000..26ffc94b --- /dev/null +++ b/src/main/scala/io/epiphanous/flinkrunner/serde/JsonConfig.scala @@ -0,0 +1,14 @@ +package io.epiphanous.flinkrunner.serde + +/** A Json encoder configuration + * @param pretty + * true to indent the output with whitespace (default false) + * @param sortKeys + * true to lexicographically sort json keys (default false) + * @param endOfLine + * an optional end of line string (defaults to system line separator) + */ +case class JsonConfig( + pretty: Boolean = false, + sortKeys: Boolean = false, + endOfLine: Option[String] = Some(System.lineSeparator())) diff --git 
a/src/main/scala/io/epiphanous/flinkrunner/serde/JsonFileEncoder.scala b/src/main/scala/io/epiphanous/flinkrunner/serde/JsonFileEncoder.scala index 069d0d53..0f77f45f 100644 --- a/src/main/scala/io/epiphanous/flinkrunner/serde/JsonFileEncoder.scala +++ b/src/main/scala/io/epiphanous/flinkrunner/serde/JsonFileEncoder.scala @@ -8,23 +8,24 @@ import java.nio.charset.StandardCharsets /** Encoder for writing an element to a json text file output stream * + * @param jsonConfig + * a Json encoder config * @tparam E * the type to encode into the file */ class JsonFileEncoder[E: TypeInformation]( - pretty: Boolean = false, - sortKeys: Boolean = false) + jsonConfig: JsonConfig = JsonConfig()) extends Encoder[E] with LazyLogging { @transient - lazy val rowEncoder = new JsonRowEncoder[E](pretty, sortKeys) + lazy val rowEncoder = new JsonRowEncoder[E](jsonConfig) override def encode(element: E, stream: OutputStream): Unit = { rowEncoder .encode(element) .fold( - t => logger.error(s"failed to json encode $element", t), + t => logger.error(s"failed to json-encode $element", t), s => stream.write(s.getBytes(StandardCharsets.UTF_8)) ) } diff --git a/src/main/scala/io/epiphanous/flinkrunner/serde/JsonRowDecoder.scala b/src/main/scala/io/epiphanous/flinkrunner/serde/JsonRowDecoder.scala index 9e419eba..d7f07f8b 100644 --- a/src/main/scala/io/epiphanous/flinkrunner/serde/JsonRowDecoder.scala +++ b/src/main/scala/io/epiphanous/flinkrunner/serde/JsonRowDecoder.scala @@ -1,19 +1,16 @@ package io.epiphanous.flinkrunner.serde -import com.fasterxml.jackson.databind.ObjectReader import org.apache.flink.api.common.typeinfo.TypeInformation import scala.util.Try -class JsonRowDecoder[E: TypeInformation] - extends RowDecoder[E] - with JsonCodec { +class JsonRowDecoder[E: TypeInformation] extends RowDecoder[E] { @transient - lazy val reader: ObjectReader = getReader( + lazy val codec: Codec[E] = Codec( implicitly[TypeInformation[E]].getTypeClass ) override def decode(line: String): Try[E] = - Try(reader.readValue(line)) + Try(codec.jsonReader.readValue(line)) } diff --git a/src/main/scala/io/epiphanous/flinkrunner/serde/JsonRowEncoder.scala b/src/main/scala/io/epiphanous/flinkrunner/serde/JsonRowEncoder.scala index e331f1da..6a18d1d1 100644 --- a/src/main/scala/io/epiphanous/flinkrunner/serde/JsonRowEncoder.scala +++ b/src/main/scala/io/epiphanous/flinkrunner/serde/JsonRowEncoder.scala @@ -1,28 +1,21 @@ package io.epiphanous.flinkrunner.serde -import com.fasterxml.jackson.databind.ObjectWriter import org.apache.flink.api.common.typeinfo.TypeInformation import scala.util.Try class JsonRowEncoder[E: TypeInformation]( - pretty: Boolean = false, - sortKeys: Boolean = false) - extends RowEncoder[E] - with JsonCodec { + jsonConfig: JsonConfig = JsonConfig()) + extends RowEncoder[E] { @transient - val writer: ObjectWriter = - getWriter( - pretty, - sortKeys, - implicitly[TypeInformation[E]].getTypeClass - ) + lazy val codec: Codec[E] = + Codec(implicitly[TypeInformation[E]].getTypeClass, jsonConfig) override def encode(element: E): Try[String] = { Try( - writer.writeValueAsString(element) + System - .lineSeparator() + codec.jsonWriter.writeValueAsString(element) + jsonConfig.endOfLine + .getOrElse("") ) } } diff --git a/src/main/scala/io/epiphanous/flinkrunner/serde/JsonSerializationSchema.scala b/src/main/scala/io/epiphanous/flinkrunner/serde/JsonSerializationSchema.scala index 902df582..37394593 100644 --- a/src/main/scala/io/epiphanous/flinkrunner/serde/JsonSerializationSchema.scala +++ 
b/src/main/scala/io/epiphanous/flinkrunner/serde/JsonSerializationSchema.scala @@ -23,13 +23,15 @@ class JsonSerializationSchema[ extends SerializationSchema[E] with LazyLogging { - val name: String = sinkConfig.name - val pretty: Boolean = - sinkConfig.properties.getProperty("json.pretty", "false").toBoolean - val sortKeys: Boolean = - sinkConfig.properties.getProperty("json.sort.keys", "false").toBoolean - - val jsonRowEncoder = new JsonRowEncoder[E](pretty, sortKeys) + val name: String = sinkConfig.name + val jsonConfig: JsonConfig = JsonConfig( + sinkConfig.properties.getProperty("json.pretty", "false").toBoolean, + sinkConfig.properties.getProperty("json.sort.keys", "false").toBoolean, + Option( + sinkConfig.properties.getProperty("json.eol", System.lineSeparator()) + ).flatMap(s => if (s.equalsIgnoreCase("none")) None else Some(s)) + ) + val jsonRowEncoder = new JsonRowEncoder[E](jsonConfig) /** Serialize an event into a json-encoded byte array * @param event diff --git a/src/main/scala/io/epiphanous/flinkrunner/serde/RowEncoder.scala b/src/main/scala/io/epiphanous/flinkrunner/serde/RowEncoder.scala index b9259dcf..53e3e412 100644 --- a/src/main/scala/io/epiphanous/flinkrunner/serde/RowEncoder.scala +++ b/src/main/scala/io/epiphanous/flinkrunner/serde/RowEncoder.scala @@ -21,8 +21,10 @@ object RowEncoder { format match { case Json => new JsonRowEncoder( - properties.getProperty("pretty", "false").toBoolean, - properties.getProperty("sort.keys", "false").toBoolean + JsonConfig( + properties.getProperty("pretty", "false").toBoolean, + properties.getProperty("sort.keys", "false").toBoolean + ) ) case _ => new DelimitedRowEncoder(DelimitedConfig.get(format, properties)) diff --git a/src/test/scala/io/epiphanous/flinkrunner/model/MyAvroADT.scala b/src/test/scala/io/epiphanous/flinkrunner/model/MyAvroADT.scala index 083664f6..8fbdd742 100644 --- a/src/test/scala/io/epiphanous/flinkrunner/model/MyAvroADT.scala +++ b/src/test/scala/io/epiphanous/flinkrunner/model/MyAvroADT.scala @@ -1,6 +1,6 @@ package io.epiphanous.flinkrunner.model -import io.epiphanous.flinkrunner.serde.DelimitedConfig +import io.epiphanous.flinkrunner.serde.{DelimitedConfig, JsonConfig} import org.apache.avro.generic.GenericRecord import scala.collection.JavaConverters._ @@ -8,8 +8,7 @@ import scala.language.implicitConversions sealed trait MyAvroADT extends FlinkEvent { def toJson( - pretty: Boolean = false, - sortKeys: Boolean = false, + jsonConfig: JsonConfig = JsonConfig(), record: Option[GenericRecord] = None): String } @@ -20,37 +19,37 @@ trait TestSerializers[A <: GenericRecord] { s""""${s.replaceAll("\"", "\\\n")}"""" def toJson( - pretty: Boolean = false, - sortKeys: Boolean = false, + jsonConfig: JsonConfig = JsonConfig(), record: Option[GenericRecord] = None): String = { val rec = record.getOrElse($record) val fields = rec.getSchema.getFields.asScala.toList .map(_.name()) - val sfields = if (sortKeys) fields.sorted else fields + val sfields = if (jsonConfig.sortKeys) fields.sorted else fields sfields .map { name => val value = rec.get(name) match { - case None | null => "null" - case Some(s: String) => _serializeString(s) - case Some(r: GenericRecord) => toJson(pretty, sortKeys, Some(r)) - case Some(v) => v.toString - case seq: Seq[String] => - seq.map(_serializeString).mkString("[", ",", "]") - case seq: Seq[GenericRecord] => + case None | null => "null" + case Some(s: String) => _serializeString(s) + case Some(r: GenericRecord) => toJson(jsonConfig, Some(r)) + case Some(v) => v.toString + case seq: Seq[_] => seq - 
.map(r => toJson(pretty, sortKeys, Some(r))) + .map { + case s: String => _serializeString(s) + case r: GenericRecord => toJson(jsonConfig, Some(r)) + case s => s.toString + } .mkString("[", ",", "]") - case seq: Seq[_] => seq.map(_.toString).mkString("[", ",", "]") - case s: String => _serializeString(s) - case r: GenericRecord => toJson(pretty, sortKeys, Some(r)) - case v => v.toString + case s: String => _serializeString(s) + case r: GenericRecord => toJson(jsonConfig, Some(r)) + case v => v.toString } - s""""$name":${if (pretty) " " else ""}$value""" + s""""$name":${if (jsonConfig.pretty) " " else ""}$value""" } .mkString( - if (pretty) "{\n " else "{", - if (pretty) ",\n " else ",", - if (pretty) "\n}" else "}" + if (jsonConfig.pretty) "{\n " else "{", + if (jsonConfig.pretty) ",\n " else ",", + if (jsonConfig.pretty) "\n}" else "}" ) } diff --git a/src/test/scala/io/epiphanous/flinkrunner/model/MySimpleADT.scala b/src/test/scala/io/epiphanous/flinkrunner/model/MySimpleADT.scala index f688686c..dc430432 100644 --- a/src/test/scala/io/epiphanous/flinkrunner/model/MySimpleADT.scala +++ b/src/test/scala/io/epiphanous/flinkrunner/model/MySimpleADT.scala @@ -1,7 +1,5 @@ package io.epiphanous.flinkrunner.model -import com.fasterxml.jackson.databind.annotation.JsonDeserialize - import java.time.Instant sealed trait MySimpleADT extends FlinkEvent @@ -38,7 +36,6 @@ case class SimpleB( id: String, b0: String, b1: Double, - @JsonDeserialize(contentAs = classOf[java.lang.Integer]) b2: Option[Int], ts: Instant) extends MySimpleADT { diff --git a/src/test/scala/io/epiphanous/flinkrunner/serde/DelimitedConfigTest.scala b/src/test/scala/io/epiphanous/flinkrunner/serde/DelimitedConfigTest.scala deleted file mode 100644 index 5e0b5c1a..00000000 --- a/src/test/scala/io/epiphanous/flinkrunner/serde/DelimitedConfigTest.scala +++ /dev/null @@ -1,35 +0,0 @@ -package io.epiphanous.flinkrunner.serde - -import com.fasterxml.jackson.databind.MapperFeature -import com.fasterxml.jackson.dataformat.csv.{CsvMapper, CsvSchema} -import com.fasterxml.jackson.module.scala.DefaultScalaModule -import io.epiphanous.flinkrunner.PropSpec -import io.epiphanous.flinkrunner.model.ARecord - -class DelimitedConfigTest extends PropSpec { - - property("intoSchema empty property") { - val delimitedConfig = - DelimitedConfig.PSV.copy(columns = List("a", "b", "c")) - val csvSchema = delimitedConfig.intoSchema(CsvSchema.emptySchema()) - csvSchema.getColumnDesc shouldEqual """["a","b","c"]""" - } - - property("intoSchema from type class property") { - val mapper = CsvMapper - .builder() - .addModule(DefaultScalaModule) - .configure(MapperFeature.SORT_PROPERTIES_ALPHABETICALLY, false) - .build() - - val delimitedConfig = - DelimitedConfig.PSV.copy(columns = List("a", "b", "c", "d")) - - val start = mapper.schemaFor(classOf[ARecord]) - - val csvSchema = delimitedConfig.intoSchema(start) - - csvSchema.getColumnDesc shouldEqual """["a","b","c","d"]""" - } - -} diff --git a/src/test/scala/io/epiphanous/flinkrunner/serde/DelimitedEncoderTestUtils.scala b/src/test/scala/io/epiphanous/flinkrunner/serde/DelimitedEncoderTestUtils.scala new file mode 100644 index 00000000..81d3d8ae --- /dev/null +++ b/src/test/scala/io/epiphanous/flinkrunner/serde/DelimitedEncoderTestUtils.scala @@ -0,0 +1,35 @@ +package io.epiphanous.flinkrunner.serde + +import io.epiphanous.flinkrunner.model.BasePropGenerators +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.scala.createTypeInformation +import org.scalacheck.{Arbitrary, 
Gen} + +trait DelimitedEncoderTestUtils extends BasePropGenerators { + case class DelimitedEncoderTest(a: Int, b: String, c: Option[Double]) { + def serialize: String = s"$a,$b,${c.getOrElse("")}\n" + } + + val genTest: Gen[DelimitedEncoderTest] = for { + a <- Gen.chooseNum[Int](1, 100) + b <- nameGen("test") + c <- Gen.option(Gen.chooseNum(100d, 900d)) + } yield DelimitedEncoderTest(a, b, c) + implicit val arbTest: Arbitrary[DelimitedEncoderTest] = Arbitrary( + genTest + ) + + /** TODO: improve support for testing different delimited configs + */ + def getFileEncoder( + delimitedConfig: DelimitedConfig = DelimitedConfig.CSV) = + new DelimitedFileEncoder[DelimitedEncoderTest](delimitedConfig) + + def getRowEncoder( + delimitedConfig: DelimitedConfig = DelimitedConfig.CSV) = + new DelimitedRowEncoder[DelimitedEncoderTest](delimitedConfig) + + def getTypedRowEncoder[E: TypeInformation]( + delimitedConfig: DelimitedConfig = DelimitedConfig.CSV) = + new DelimitedRowEncoder[E](delimitedConfig) +} diff --git a/src/test/scala/io/epiphanous/flinkrunner/serde/DelimitedRowDecoderTest.scala b/src/test/scala/io/epiphanous/flinkrunner/serde/DelimitedRowDecoderTest.scala index 68a5de24..9d6b505a 100644 --- a/src/test/scala/io/epiphanous/flinkrunner/serde/DelimitedRowDecoderTest.scala +++ b/src/test/scala/io/epiphanous/flinkrunner/serde/DelimitedRowDecoderTest.scala @@ -15,7 +15,7 @@ class DelimitedRowDecoderTest extends PropSpec { case None => "" case x => x.toString } - .mkString(",") + "\n" + .mkString(",") + System.lineSeparator() val decodedSimpleB = decoder.decode(line) decodedSimpleB.success.value.toString shouldEqual simpleB.toString } diff --git a/src/test/scala/io/epiphanous/flinkrunner/serde/DelimitedRowEncoderTest.scala b/src/test/scala/io/epiphanous/flinkrunner/serde/DelimitedRowEncoderTest.scala new file mode 100644 index 00000000..0bb8eeff --- /dev/null +++ b/src/test/scala/io/epiphanous/flinkrunner/serde/DelimitedRowEncoderTest.scala @@ -0,0 +1,52 @@ +package io.epiphanous.flinkrunner.serde + +import io.epiphanous.flinkrunner.PropSpec +import io.epiphanous.flinkrunner.model.{BRecord, CRecord, SimpleB} +import org.apache.flink.api.scala.createTypeInformation + +class DelimitedRowEncoderTest + extends PropSpec + with DelimitedEncoderTestUtils { + + property("encode property") { + val encoder = getRowEncoder() + forAll { test: DelimitedEncoderTest => + encoder + .encode(test) + .fold( + t => fail(t.getMessage), + _ shouldEqual test.serialize + ) + } + } + + property("encode simpleB") { + val encoder = getTypedRowEncoder[SimpleB]() + forAll { b: SimpleB => + val encoded = encoder.encode(b) + encoded should be a 'success + } + } + + property("encode non-nested avro property") { + def csvLine(b: BRecord): String = + s"${b.b0},${b.b1.getOrElse("")},${b.b2.getOrElse("")},${b.b3.toEpochMilli}${System.lineSeparator()}" + val encoder = getTypedRowEncoder[BRecord]() + forAll { test: BRecord => + encoder + .encode(test) + .fold(t => fail(t.getMessage), _ shouldEqual csvLine(test)) + } + } + + property("fail to encode nested avro property") { + val encoder = new DelimitedRowEncoder[CRecord]() + forAll { test: CRecord => + val encoded = encoder.encode(test) + if (test.bRecord.nonEmpty) + encoded should be a 'failure + else encoded should be a 'success + } + } + +} diff --git a/src/test/scala/io/epiphanous/flinkrunner/serde/EmbeddedAvroJsonFileEncoderTest.scala b/src/test/scala/io/epiphanous/flinkrunner/serde/EmbeddedAvroJsonFileEncoderTest.scala index 3dcb4710..5b6e8327 100644 --- 
a/src/test/scala/io/epiphanous/flinkrunner/serde/EmbeddedAvroJsonFileEncoderTest.scala +++ b/src/test/scala/io/epiphanous/flinkrunner/serde/EmbeddedAvroJsonFileEncoderTest.scala @@ -16,16 +16,16 @@ class EmbeddedAvroJsonFileEncoderTest extends PropSpec { def doTest[ E <: MyAvroADT with EmbeddedAvroRecord[A]: TypeInformation, A <: GenericRecord: TypeInformation]( - pretty: Boolean = false, - sortKeys: Boolean = false)(implicit arb: Arbitrary[E]) = { + jsonConfig: JsonConfig = JsonConfig())(implicit + arb: Arbitrary[E]) = { val pop = genPop[E]() val encoder = - new EmbeddedAvroJsonFileEncoder[E, A, MyAvroADT](pretty, sortKeys) + new EmbeddedAvroJsonFileEncoder[E, A, MyAvroADT](jsonConfig) val baos = new ByteArrayOutputStream() val lines = ArrayBuffer.empty[String] pop.foreach { w => encoder.encode(w, baos) - lines += w.toJson(pretty, sortKeys) + lines += w.toJson(jsonConfig) } lines += "" val actual = new String(baos.toByteArray, StandardCharsets.UTF_8) diff --git a/src/test/scala/io/epiphanous/flinkrunner/serde/JsonCodecTest.scala b/src/test/scala/io/epiphanous/flinkrunner/serde/JsonCodecTest.scala deleted file mode 100644 index 979ea2d9..00000000 --- a/src/test/scala/io/epiphanous/flinkrunner/serde/JsonCodecTest.scala +++ /dev/null @@ -1,20 +0,0 @@ -package io.epiphanous.flinkrunner.serde - -import io.epiphanous.flinkrunner.PropSpec -import io.epiphanous.flinkrunner.model.{BRecord, CRecord} - -import java.nio.charset.StandardCharsets -import scala.util.Try - -class JsonCodecTest extends PropSpec with JsonCodec { - - property("getMapper property") { - val record = genOne[CRecord] - logger.debug(record.toString) - val output = - Try(getMapper(classOf[CRecord]).writer().writeValueAsString(record)) - println(output) - output isSuccess - } - -} diff --git a/src/test/scala/io/epiphanous/flinkrunner/serde/JsonEncoderTestUtils.scala b/src/test/scala/io/epiphanous/flinkrunner/serde/JsonEncoderTestUtils.scala index b141a826..bbd8f42b 100644 --- a/src/test/scala/io/epiphanous/flinkrunner/serde/JsonEncoderTestUtils.scala +++ b/src/test/scala/io/epiphanous/flinkrunner/serde/JsonEncoderTestUtils.scala @@ -20,9 +20,9 @@ trait JsonEncoderTestUtils extends BasePropGenerators { /** TODO: improve support for testing pretty/sortkeys */ - def getFileEncoder(pretty: Boolean = false, sortKeys: Boolean = false) = - new JsonFileEncoder[JsonEncoderTest](pretty, sortKeys) + def getFileEncoder(jsonConfig: JsonConfig = JsonConfig()) = + new JsonFileEncoder[JsonEncoderTest](jsonConfig) - def getRowEncoder(pretty: Boolean = false, sortKeys: Boolean = false) = - new JsonRowEncoder[JsonEncoderTest](pretty, sortKeys) + def getRowEncoder(jsonConfig: JsonConfig = JsonConfig()) = + new JsonRowEncoder[JsonEncoderTest](jsonConfig) } diff --git a/src/test/scala/io/epiphanous/flinkrunner/serde/JsonFileEncoderTest.scala b/src/test/scala/io/epiphanous/flinkrunner/serde/JsonFileEncoderTest.scala index fd905e6d..e3ef9a89 100644 --- a/src/test/scala/io/epiphanous/flinkrunner/serde/JsonFileEncoderTest.scala +++ b/src/test/scala/io/epiphanous/flinkrunner/serde/JsonFileEncoderTest.scala @@ -17,9 +17,8 @@ class JsonFileEncoderTest extends PropSpec with JsonEncoderTestUtils { lines += test.serialize } val actual = new String(baos.toByteArray, StandardCharsets.UTF_8) - val expected = lines.mkString("", "\n", "\n") -// logger.debug("actual:\n" + actual) -// logger.debug("expected:\n" + expected) + val expected = + lines.mkString("", System.lineSeparator(), System.lineSeparator()) actual shouldEqual expected } } diff --git 
a/src/test/scala/io/epiphanous/flinkrunner/serde/JsonRowEncoderTest.scala b/src/test/scala/io/epiphanous/flinkrunner/serde/JsonRowEncoderTest.scala index 822f9214..ecd52594 100644 --- a/src/test/scala/io/epiphanous/flinkrunner/serde/JsonRowEncoderTest.scala +++ b/src/test/scala/io/epiphanous/flinkrunner/serde/JsonRowEncoderTest.scala @@ -11,7 +11,7 @@ class JsonRowEncoderTest extends PropSpec with JsonEncoderTestUtils { .encode(test) .fold( t => fail(t.getMessage), - _ shouldEqual (test.serialize + "\n") + _ shouldEqual (test.serialize + System.lineSeparator()) ) } }
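
[Reviewer note] A minimal sketch of the new JsonConfig surface; the Widget type is again hypothetical. It mirrors how JsonSerializationSchema now builds its config from sink properties, where json.eol = "none" yields endOfLine = None so JsonRowEncoder appends nothing after each record:

import io.epiphanous.flinkrunner.serde.{JsonConfig, JsonRowEncoder}
import org.apache.flink.api.scala.createTypeInformation

// hypothetical event type, used only for this sketch
case class Widget(id: String, count: Int)

// pretty-print with sorted keys and suppress the trailing end-of-line
val cfg     = JsonConfig(pretty = true, sortKeys = true, endOfLine = None)
val encoder = new JsonRowEncoder[Widget](cfg)

// encode returns Try[String]; with endOfLine = None no separator is appended
encoder.encode(Widget("w-1", 3)).foreach(println)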
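
[Reviewer note] One behavioral detail worth calling out: DelimitedFileEncoder now defers header writing to Codec.maybeWriteHeader, keyed on stream identity, so the header lands exactly once per output file. A sketch under the same hypothetical Widget type:

import io.epiphanous.flinkrunner.serde.{DelimitedConfig, DelimitedFileEncoder}
import org.apache.flink.api.scala.createTypeInformation
import java.io.ByteArrayOutputStream

// hypothetical event type, used only for this sketch
case class Widget(id: String, count: Int)

// useHeader = true: the encoder writes the schema-derived header the
// first time it sees a given stream, then one delimited row per element
val encoder = new DelimitedFileEncoder[Widget](
  DelimitedConfig.CSV.copy(useHeader = true)
)
val out = new ByteArrayOutputStream()
encoder.encode(Widget("w-1", 3), out)
encoder.encode(Widget("w-2", 5), out)
// out now holds the header line followed by two csv rows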