refactor/simplify delimited/json encoders/decoders
nextdude committed Sep 11, 2022
1 parent cfd3862 commit 2a22404
Showing 28 changed files with 336 additions and 381 deletions.
5 changes: 2 additions & 3 deletions build.sbt
@@ -36,7 +36,7 @@ val V = new {
val scalaTestPlus = "3.2.13.0"
val scalaCheck = "1.16.0"
val testContainersScala = "0.40.10"
val jackson = "2.13.3"
val jackson = "2.13.4"
val circe = "0.14.2"
val http4s = "0.23.12"
val enumeratum = "1.7.0"
@@ -45,7 +45,7 @@ val V = new {
val squants = "1.8.3"
val confluentAvroSerde = "7.1.1"
val parquet = "1.12.3"
val awsSdk = "1.12.290"
val awsSdk = "1.12.296"
val jdbcMysql = "8.0.30"
val jdbcPg = "42.5.0"
val jdbcMssql = "11.2.0.jre11"
@@ -121,7 +121,6 @@ val otherDeps = Seq(
// "com.github.pjfanning" %% "jackson-scala-reflect-extensions" % V.jackson,
"com.fasterxml.jackson.dataformat" % "jackson-dataformat-csv" % V.jackson,
"com.fasterxml.jackson.datatype" % "jackson-datatype-jsr310" % V.jackson,
"com.fasterxml.jackson.dataformat" % "jackson-dataformat-avro" % V.jackson,
"com.dimafeng" %% "testcontainers-scala-scalatest" % V.testContainersScala % Test,
"com.dimafeng" %% "testcontainers-scala-mysql" % V.testContainersScala % Test,
"mysql" % "mysql-connector-java" % V.jdbcMysql % Provided,
@@ -23,6 +23,7 @@ class AvroJsonSerializer
record: GenericRecord,
gen: JsonGenerator,
provider: SerializerProvider): Unit = {
// logger.debug(s"serializing avro record: $record")
gen.writeStartObject()
record.getSchema.getFields.asScala.foreach { f =>
_serializeAvroValue(
@@ -72,7 +73,7 @@ class AvroJsonSerializer
schema.getTypes.asScala.filterNot(s => s.getType == NULL)
if (nonNullTypes.size > 1) {
throw new RuntimeException(
s"field $name of type union has more than one non-null types: $nonNullTypes"
s"field $name of type union has more than one non-null type: $nonNullTypes"
)
}
_serializeAvroValue(
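The union handling above supports only nullable unions, i.e. a union with at most one non-null member. A hedged sketch of Avro schemas that pass and fail this check (the vals are illustrative, not from this commit):

import org.apache.avro.Schema

// Accepted: a nullable union with exactly one non-null branch
val nullableString = new Schema.Parser().parse("""["null", "string"]""")

// Rejected at serialization time: more than one non-null branch
val multiBranch = new Schema.Parser().parse("""["null", "string", "int"]""")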
121 changes: 121 additions & 0 deletions src/main/scala/io/epiphanous/flinkrunner/serde/Codec.scala
@@ -0,0 +1,121 @@
package io.epiphanous.flinkrunner.serde

import com.fasterxml.jackson.databind._
import com.fasterxml.jackson.databind.json.JsonMapper
import com.fasterxml.jackson.databind.module.SimpleModule
import com.fasterxml.jackson.dataformat.csv.{
CsvGenerator,
CsvMapper,
CsvParser,
CsvSchema
}
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import org.apache.avro.generic.GenericRecord

import java.io.OutputStream
import java.nio.charset.StandardCharsets
import scala.collection.JavaConverters._

case class Codec[E](
typeClass: Class[E],
jsonConfig: JsonConfig = JsonConfig(),
delimitedConfig: DelimitedConfig = DelimitedConfig.CSV) {

lazy val isAvro: Boolean =
classOf[GenericRecord].isAssignableFrom(typeClass)

lazy val avroModule: SimpleModule =
new SimpleModule().addSerializer(new AvroJsonSerializer)

lazy val jsonMapper: JsonMapper = {
val mapper = JsonMapper
.builder()
.addModule(DefaultScalaModule)
.addModule(new JavaTimeModule)
.configure(
MapperFeature.SORT_PROPERTIES_ALPHABETICALLY,
jsonConfig.sortKeys
)
.configure(
SerializationFeature.ORDER_MAP_ENTRIES_BY_KEYS,
jsonConfig.sortKeys
)
.configure(SerializationFeature.INDENT_OUTPUT, jsonConfig.pretty)
.configure(
DeserializationFeature.ADJUST_DATES_TO_CONTEXT_TIME_ZONE,
false
)
(if (isAvro) mapper.addModule(avroModule) else mapper).build()
}

lazy val jsonWriter: ObjectWriter = jsonMapper.writerFor(typeClass)

lazy val jsonReader: ObjectReader = jsonMapper.readerFor(typeClass)

lazy val csvMapper: CsvMapper = {
val builder = CsvMapper
.builder()
.addModule(DefaultScalaModule)
.addModule(new JavaTimeModule)
.configure(MapperFeature.SORT_PROPERTIES_ALPHABETICALLY, false)
.configure(CsvGenerator.Feature.ALWAYS_QUOTE_STRINGS, false)
.configure(CsvParser.Feature.TRIM_SPACES, true)
.configure(CsvParser.Feature.SKIP_EMPTY_LINES, true)
.configure(CsvParser.Feature.ALLOW_COMMENTS, true)
.configure(CsvParser.Feature.EMPTY_STRING_AS_NULL, true)
.configure(
DeserializationFeature.ADJUST_DATES_TO_CONTEXT_TIME_ZONE,
false
)
(if (isAvro) builder.addModule(avroModule) else builder).build()
}

lazy val csvSchema: CsvSchema = {
val start = csvMapper.schemaFor(typeClass)
val updatedWithConfig = (if (isAvro) {
val columns = start
.iterator()
.asScala
.toList
.filterNot(c =>
c.hasName("schema") || c.hasName(
"specificData"
)
)
.asJava
start
.withoutColumns()
.rebuild()
.addColumns(columns)
.build()
} else start)
.withColumnSeparator(delimitedConfig.columnSeparator)
.withLineSeparator(delimitedConfig.lineSeparator)
.withEscapeChar(delimitedConfig.escapeChar)
.withUseHeader(false) // delimited header use handled in encoder
if (delimitedConfig.useQuotes)
updatedWithConfig.withQuoteChar(delimitedConfig.quoteCharacter)
else updatedWithConfig.withoutQuoteChar()
}

lazy val csvHeader: Array[Byte] = csvSchema
.iterator()
.asScala
.map(_.getName)
.toList
.mkString(
"",
delimitedConfig.columnSeparator.toString,
delimitedConfig.lineSeparator
)
.getBytes(StandardCharsets.UTF_8)

def maybeWriteHeader(stream: OutputStream): Unit =
if (delimitedConfig.useHeader) stream.write(csvHeader)

lazy val csvWriter: ObjectWriter = csvMapper.writer(csvSchema)

lazy val csvReader: ObjectReader =
csvMapper.readerFor(typeClass).`with`(csvSchema)
}
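
A minimal usage sketch of the new Codec (the Sensor case class and its values are hypothetical, not part of this commit):

import java.io.ByteArrayOutputStream

case class Sensor(id: String, temperature: Double)

val codec = Codec(
  classOf[Sensor],
  delimitedConfig = DelimitedConfig.CSV.copy(useHeader = true)
)

// JSON round trip through the lazily built Jackson mappers
val json = codec.jsonWriter.writeValueAsString(Sensor("s1", 21.5))
val back = codec.jsonReader.readValue[Sensor](json)

// Delimited output: optional header followed by one encoded row
val out = new ByteArrayOutputStream()
codec.maybeWriteHeader(out)
out.write(codec.csvWriter.writeValueAsBytes(Sensor("s1", 21.5)))

Note that all of the Jackson machinery is declared lazy, so constructing a Codec is cheap and nothing is built until a reader or writer is first used.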

This file was deleted.

@@ -1,6 +1,5 @@
package io.epiphanous.flinkrunner.serde

import com.fasterxml.jackson.dataformat.csv.CsvSchema
import io.epiphanous.flinkrunner.model.StreamFormatName

import java.util.Properties
@@ -28,43 +27,12 @@ case class DelimitedConfig(
useHeader: Boolean = false,
useQuotes: Boolean = false,
columns: List[String] = List.empty
) {

/** Creates a new CsvSchema object based on the DelimitedConfig settings.
*
* Note: If DelimitedConfig has a non-empty column list, any existing
* columns in the start schema will be replaced.
*
* @param start
* a starting schema that we apply our settings into
* @return
* updated CsvSchema
*/
def intoSchema(start: CsvSchema): CsvSchema = {
val csvSchema = {
val s = start
.withColumnSeparator(columnSeparator)
.withLineSeparator(lineSeparator)
.withEscapeChar(escapeChar)
.withUseHeader(useHeader)
if (useQuotes) s.withQuoteChar(quoteCharacter)
else s.withoutQuoteChar()
}
if (columns.isEmpty) csvSchema
else {
columns
.foldLeft(csvSchema.withoutColumns().rebuild())((b, f) =>
b.addColumn(f)
)
.build()
}
}
}
)

object DelimitedConfig {
val CSV = DelimitedConfig()
val TSV = DelimitedConfig('\t')
val PSV = DelimitedConfig('|')
val CSV: DelimitedConfig = DelimitedConfig()
val TSV: DelimitedConfig = DelimitedConfig('\t')
val PSV: DelimitedConfig = DelimitedConfig('|')

/** Produces a DelimitedConfig based on the requested StreamFormatName,
* properties, and columns passed in.
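The presets differ only in their column separator; other settings are obtained by copying a preset, as in this hedged sketch of a tab-separated config that also emits a header row and quotes values:

val tsvWithHeader = DelimitedConfig.TSV.copy(useHeader = true, useQuotes = true)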
@@ -1,5 +1,5 @@
package io.epiphanous.flinkrunner.serde
import com.fasterxml.jackson.databind.ObjectWriter
import com.nimbusds.jose.util.StandardCharset
import org.apache.flink.api.common.serialization.Encoder
import org.apache.flink.api.common.typeinfo.TypeInformation

@@ -14,30 +14,29 @@ import java.io.OutputStream
*/
class DelimitedFileEncoder[E: TypeInformation](
delimitedConfig: DelimitedConfig = DelimitedConfig.CSV)
extends Encoder[E]
with DelimitedCodec {
extends Encoder[E] {

@transient
lazy val typeClass: Class[E] =
implicitly[TypeInformation[E]].getTypeClass

@transient
lazy val header: Array[Byte] = getHeader(delimitedConfig, typeClass)

@transient
lazy val writer: ObjectWriter =
getWriter(delimitedConfig, typeClass)
lazy val encoder: DelimitedRowEncoder[E] =
new DelimitedRowEncoder[E](delimitedConfig)

@transient
var out: OutputStream = _

override def encode(element: E, stream: OutputStream): Unit = {
if (delimitedConfig.useHeader) {
if (out != stream) {
out = stream
stream.write(header)
}
if (out != stream) {
out = stream
encoder.codec.maybeWriteHeader(stream)
}
stream.write(writer.writeValueAsBytes(element))
encoder
.encode(element)
.fold(
err =>
throw new RuntimeException(
s"failed to delimited-encode $element",
err
),
line => stream.write(line.getBytes(StandardCharset.UTF_8))
)
}
}
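
For context, a hedged sketch of wiring this encoder into a Flink row-format FileSink (the Event class and output path are illustrative, not part of this commit):

import org.apache.flink.api.scala.createTypeInformation
import org.apache.flink.connector.file.sink.FileSink
import org.apache.flink.core.fs.Path

case class Event(id: String, value: Double)

// The sink calls encode(element, stream) once per record; the encoder
// writes the header (if configured) the first time it sees a new stream.
val sink: FileSink[Event] = FileSink
  .forRowFormat(new Path("/tmp/events"), new DelimitedFileEncoder[Event]())
  .build()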
@@ -1,20 +1,28 @@
package io.epiphanous.flinkrunner.serde

import com.fasterxml.jackson.databind.ObjectReader
import org.apache.flink.api.common.typeinfo.TypeInformation

import scala.util.Try

/** Decode a delimited text line into an instance of the requested class.
*
* @param delimitedConfig
* a delimited codec config (defaults to csv)
* @tparam E
* the type to decode into
*/
class DelimitedRowDecoder[E: TypeInformation](
delimitedConfig: DelimitedConfig =
DelimitedConfig.CSV.copy(useHeader = false))
extends RowDecoder[E]
with DelimitedCodec {
extends RowDecoder[E] {

@transient
lazy val reader: ObjectReader =
getReader(delimitedConfig, implicitly[TypeInformation[E]].getTypeClass)
lazy val codec: Codec[E] = Codec(
implicitly[TypeInformation[E]].getTypeClass,
delimitedConfig = delimitedConfig
)

override def decode(line: String): Try[E] =
Try(reader.readValue[E](line))
Try(codec.csvReader.readValue[E](line))

}
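
And a hedged usage sketch of the decoder (Reading is a hypothetical case class; its field order must match the delimited columns):

import org.apache.flink.api.scala.createTypeInformation
import scala.util.{Failure, Success}

case class Reading(id: String, value: Double)

val decoder = new DelimitedRowDecoder[Reading]()
decoder.decode("abc,1.25") match {
  case Success(reading) => println(reading) // Reading(abc,1.25)
  case Failure(error)   => println(s"bad row: $error")
}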
