Commit

Less code
arkadius committed Jan 28, 2023
1 parent 572745a commit 8f97d6e
Showing 11 changed files with 35 additions and 40 deletions.
@@ -2,17 +2,15 @@ package pl.touk.nussknacker.engine.schemedkafka.encode

import cats.data.ValidatedNel
import org.apache.avro.generic.GenericData.{EnumSymbol, Fixed}
import org.apache.avro.generic.{GenericData, GenericDatumWriter, GenericRecord}
import org.apache.avro.io.{DecoderFactory, EncoderFactory}
import org.apache.avro.generic.{GenericData, GenericRecord}
import org.apache.avro.{AvroRuntimeException, Schema}
import org.scalatest.funspec.AnyFunSpec
import org.scalatest.matchers.should.Matchers
import pl.touk.nussknacker.engine.api.validation.ValidationMode
import pl.touk.nussknacker.engine.schemedkafka.AvroUtils
import pl.touk.nussknacker.engine.schemedkafka.schema.{Address, Company, FullNameV1, StringForcingDatumReaderProvider}
import pl.touk.nussknacker.engine.schemedkafka.schema.{Address, Company, FullNameV1}
import pl.touk.nussknacker.test.EitherValuesDetailedMessage

import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
import java.nio.charset.StandardCharsets
import java.time._
import java.util.UUID
@@ -308,15 +306,9 @@ class BestEffortAvroEncoderSpec extends AnyFunSpec with Matchers with EitherValu
readRecord
}

private def roundTripWriteRead(givenRecord: GenericData.Record) = {
val bos = new ByteArrayOutputStream()
val encoder = EncoderFactory.get().binaryEncoder(bos, null)
val schema = givenRecord.getSchema
new GenericDatumWriter[GenericRecord](schema, AvroUtils.genericData).write(givenRecord, encoder)
encoder.flush()
val decoder = DecoderFactory.get().binaryDecoder(new ByteArrayInputStream(bos.toByteArray), null)
val readRecord = StringForcingDatumReaderProvider.genericDatumReader[GenericRecord](schema, schema, AvroUtils.genericData).read(null, decoder)
readRecord
private def roundTripWriteRead(givenRecord: GenericRecord) = {
val bytes = AvroUtils.serializeContainerToBytesArray(givenRecord)
AvroUtils.deserialize[GenericRecord](bytes, givenRecord.getSchema)
}

}
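
Editor's note, not part of this commit: the refactored roundTripWriteRead above swaps the hand-rolled EncoderFactory/DecoderFactory plumbing for the existing AvroUtils.serializeContainerToBytesArray and AvroUtils.deserialize helpers. A minimal standalone sketch of that round trip, assuming an illustrative schema and field name that are not taken from the repository:

import org.apache.avro.Schema
import org.apache.avro.generic.{GenericRecord, GenericRecordBuilder}
import pl.touk.nussknacker.engine.schemedkafka.AvroUtils

object AvroRoundTripSketch extends App {
  // Illustrative record schema with a single string field.
  val schema = new Schema.Parser().parse(
    """{"type":"record","name":"Person","fields":[{"name":"name","type":"string"}]}""")
  val record: GenericRecord = new GenericRecordBuilder(schema).set("name", "Ann").build()

  // Serialize the container to bytes and read it back against the same schema.
  val bytes = AvroUtils.serializeContainerToBytesArray(record)
  val readBack = AvroUtils.deserialize[GenericRecord](bytes, schema)
  assert(readBack.get("name").toString == "Ann")
}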
@@ -7,7 +7,7 @@ import io.confluent.kafka.serializers.NonRecordContainer
import org.apache.avro.Conversions.{DecimalConversion, UUIDConversion}
import org.apache.avro.Schema
import org.apache.avro.data.TimeConversions
import org.apache.avro.generic.{GenericContainer, GenericData, GenericDatumWriter, GenericRecord}
import org.apache.avro.generic.{GenericContainer, GenericData, GenericDatumReader, GenericDatumWriter, GenericRecord}
import org.apache.avro.io.{DatumReader, DecoderFactory, EncoderFactory}
import org.apache.avro.reflect.ReflectData
import org.apache.avro.specific.{SpecificData, SpecificDatumWriter, SpecificRecord}
@@ -46,12 +46,16 @@ object AvroUtils extends LazyLogging {
}
}

override def createDatumReader(writer: Schema, reader: Schema): DatumReader[_] = StringForcingDatumReaderProvider
.genericDatumReader(writer, reader, this.asInstanceOf[GenericData])
override def createDatumReader(writer: Schema, reader: Schema): DatumReader[_] =
createGenericDatumReader(writer, reader)

override def createDatumReader(schema: Schema): DatumReader[_] = createDatumReader(schema, schema)
})

def createGenericDatumReader[T](writer: Schema, reader: Schema): GenericDatumReader[T] = {
StringForcingDatumReaderProvider.genericDatumReader[T](writer, reader, AvroUtils.genericData)
}

def specificData: SpecificData = addLogicalTypeConversions(new SpecificData(_) {
override def createDatumReader(writer: Schema, reader: Schema): DatumReader[_] = StringForcingDatumReaderProvider
.specificDatumReader(writer, reader, this.asInstanceOf[SpecificData])
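
Editor's note, not part of this commit: a hedged sketch of how the new AvroUtils.createGenericDatumReader helper could be combined with Avro's standard DecoderFactory to read binary-encoded bytes. The helper name and signature come from this commit; the surrounding wiring is an assumption:

import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import org.apache.avro.io.DecoderFactory
import pl.touk.nussknacker.engine.schemedkafka.AvroUtils

// Reads bytes written with writerSchema into a record shaped by readerSchema,
// using the StringForcingDatumReaderProvider-backed reader created by the helper.
def readWithEvolution(bytes: Array[Byte], writerSchema: Schema, readerSchema: Schema): GenericRecord = {
  val reader = AvroUtils.createGenericDatumReader[GenericRecord](writerSchema, readerSchema)
  val decoder = DecoderFactory.get().binaryDecoder(bytes, null)
  reader.read(null, decoder)
}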
@@ -6,7 +6,7 @@ import org.apache.avro.io.{DatumReader, DecoderFactory}

import java.nio.ByteBuffer

class RecordDeserializer(decoderFactory: DecoderFactory) {
class AvroRecordDeserializer(decoderFactory: DecoderFactory) {

def deserializeRecord(readerSchema: Schema, reader: DatumReader[AnyRef], buffer: ByteBuffer, bufferDataStart: Int): AnyRef = {
val length = buffer.limit() - bufferDataStart
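
Editor's note, not part of this commit: a rough usage sketch for the renamed AvroRecordDeserializer, mirroring how DefaultAvroSchemaEvolution calls it further down in this commit. Treating the whole byte array as the Avro payload (offset 0) is an assumption made for the example:

import java.nio.ByteBuffer
import org.apache.avro.Schema
import org.apache.avro.io.DecoderFactory
import pl.touk.nussknacker.engine.schemedkafka.AvroUtils
import pl.touk.nussknacker.engine.schemedkafka.schema.AvroRecordDeserializer

def readPayload(payload: Array[Byte], writerSchema: Schema, readerSchema: Schema): AnyRef = {
  val deserializer = new AvroRecordDeserializer(DecoderFactory.get())
  val reader = AvroUtils.createGenericDatumReader[AnyRef](writerSchema, readerSchema)
  // The whole byte array is the Avro payload, so data starts at offset 0.
  deserializer.deserializeRecord(readerSchema, reader, ByteBuffer.wrap(payload), 0)
}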
@@ -4,7 +4,7 @@ import org.apache.avro.Schema
import org.apache.avro.generic.GenericContainer

trait AvroSchemaEvolution {
def alignRecordToSchema(record: GenericContainer, schema: Schema): Any
def alignRecordToSchema(record: GenericContainer, schema: Schema): GenericContainer
def canBeEvolved(record: GenericContainer, schema: Schema): Boolean
}

@@ -1,8 +1,9 @@
package pl.touk.nussknacker.engine.schemedkafka.schema

import io.confluent.kafka.serializers.NonRecordContainer
import org.apache.avro.Schema
import org.apache.avro.generic._
import org.apache.avro.io.{DatumReader, DecoderFactory}
import org.apache.avro.io.DecoderFactory
import pl.touk.nussknacker.engine.schemedkafka.AvroUtils

import java.io.IOException
@@ -21,7 +22,7 @@ import scala.util.Try
*/
class DefaultAvroSchemaEvolution extends AvroSchemaEvolution with DatumReaderWriterMixin {

private def recordDeserializer = new RecordDeserializer(DecoderFactory.get())
private def recordDeserializer = new AvroRecordDeserializer(DecoderFactory.get())

override def canBeEvolved(record: GenericContainer, schema: Schema): Boolean =
Try(alignRecordToSchema(record, schema)).isSuccess
@@ -40,12 +41,16 @@ class DefaultAvroSchemaEvolution extends AvroSchemaEvolution with DatumReaderWri
* It's copy paste from AbstractKafkaAvroDeserializer#DeserializationContext.read with some modification.
* We pass there record buffer data and schema which will be used to convert record.
*/
protected def deserializePayloadToSchema(payload: Array[Byte], writerSchema: Schema, readerSchema: Schema): GenericContainer = {
private def deserializePayloadToSchema(payload: Array[Byte], writerSchema: Schema, readerSchema: Schema): GenericContainer = {
try {
// We always want to create generic record at the end, because speecific can has other fields than expected
val reader = StringForcingDatumReaderProvider.genericDatumReader[AnyRef](writerSchema, readerSchema, AvroUtils.genericData).asInstanceOf[DatumReader[AnyRef]]
// We always want to create generic record at the end, because specific can have other fields than expected
val reader = AvroUtils.createGenericDatumReader[AnyRef](writerSchema, readerSchema)
val buffer = ByteBuffer.wrap(payload)
recordDeserializer.deserializeRecord(readerSchema, reader, buffer, 0).asInstanceOf[GenericContainer]
val data = recordDeserializer.deserializeRecord(readerSchema, reader, buffer, 0)
data match {
case c: GenericContainer => c
case _ => new NonRecordContainer(readerSchema, data)
}
} catch {
case exc@(_: RuntimeException | _: IOException) =>
// avro deserialization may throw IOException, AvroRuntimeException, NullPointerException, etc
@@ -2,26 +2,20 @@ package pl.touk.nussknacker.engine.schemedkafka.schemaregistry.confluent

import cats.data.Validated
import com.typesafe.scalalogging.LazyLogging
import io.confluent.kafka.schemaregistry.ParsedSchema
import io.confluent.kafka.schemaregistry.avro.{AvroSchema, AvroSchemaProvider, AvroSchemaUtils}
import io.confluent.kafka.schemaregistry.avro.{AvroSchema, AvroSchemaProvider}
import io.confluent.kafka.schemaregistry.client.SchemaMetadata
import io.confluent.kafka.schemaregistry.json.JsonSchema
import io.confluent.kafka.serializers.NonRecordContainer
import org.apache.avro.Schema
import org.apache.avro.generic.{GenericContainer, GenericDatumWriter}
import org.apache.avro.io.{DecoderFactory, EncoderFactory}
import org.apache.avro.specific.{SpecificDatumWriter, SpecificRecord}
import org.apache.avro.generic.GenericContainer
import org.apache.kafka.common.errors.SerializationException
import org.everit.json.schema.{Schema => EveritSchema}
import pl.touk.nussknacker.engine.kafka.SchemaRegistryClientKafkaConfig
import pl.touk.nussknacker.engine.schemedkafka.AvroUtils
import pl.touk.nussknacker.engine.schemedkafka.schema.StringForcingDatumReaderProvider
import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.confluent.client.{AvroSchemaWithJsonPayload, OpenAPIJsonSchema}
import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.confluent.client.OpenAPIJsonSchema
import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.{SchemaId, SchemaWithMetadata}

import java.io.{ByteArrayOutputStream, DataOutputStream, OutputStream}
import java.nio.ByteBuffer
import java.util

object ConfluentUtils extends LazyLogging {

@@ -87,7 +87,7 @@ abstract class AbstractSchemaBasedRecordFormatter[K: ClassTag, V: ClassTag] exte
}
} else {
val keySchemaOpt = record.keySchemaId.map(schemaRegistryClient.getSchemaById).map(_.schema)
keyOpt.map(keyJson => readKeyMessage(keySchemaOpt, topic, keyJson))
keyOpt.map(keyJson => readRecordKeyMessage(keySchemaOpt, topic, keyJson))
.getOrElse(throw new IllegalArgumentException("Error reading key schema: expected valid avro key"))
}
val valueSchemaOpt = record.valueSchemaId.map(schemaRegistryClient.getSchemaById).map(_.schema)
@@ -98,7 +98,7 @@ abstract class AbstractSchemaBasedRecordFormatter[K: ClassTag, V: ClassTag] exte
record.consumerRecord.toKafkaConsumerRecord(topic, serializeKeyValue)
}

protected def readKeyMessage(schemaOpt: Option[ParsedSchema], topic: String, jsonObj: Json): Array[Byte]
protected def readRecordKeyMessage(schemaOpt: Option[ParsedSchema], topic: String, jsonObj: Json): Array[Byte]

protected def readValueMessage(schemaOpt: Option[ParsedSchema], topic: String, jsonObj: Json): Array[Byte]

@@ -30,7 +30,7 @@ private[schemaregistry] class AvroMessageReader(serializer: Serializer[Any]) {
val reader: DatumReader[AnyRef] = GenericData.get().createDatumReader(schema).asInstanceOf[DatumReader[AnyRef]]
val obj = reader.read(null, decoderFactory.jsonDecoder(schema, jsonString))
if (schema.getType == Type.STRING)
obj.asInstanceOf[Utf8].toString
obj.toString
else
obj
} catch {
@@ -46,7 +46,7 @@ class AvroToJsonFormatter[K: ClassTag, V: ClassTag](protected val kafkaConfig: K
override protected def formatMessage(schemaIdOpt: Option[SchemaId], data: Any): Json =
AvroMessageFormatter.asJson(data)

override protected def readKeyMessage(schemaOpt: Option[ParsedSchema], topic: String, jsonObj: Json): Array[Byte] = {
override protected def readRecordKeyMessage(schemaOpt: Option[ParsedSchema], topic: String, jsonObj: Json): Array[Byte] = {
val avroSchema = AvroUtils.extractSchema(schemaOpt.getOrElse(throw new IllegalArgumentException("Error reading key schema: empty schema id")))
keyMessageReader.readJson(jsonObj, avroSchema, topic)
}
@@ -6,7 +6,7 @@ import org.apache.avro.Schema
import org.apache.avro.io.DecoderFactory
import pl.touk.nussknacker.engine.kafka.KafkaConfig
import pl.touk.nussknacker.engine.schemedkafka.RuntimeSchemaData
import pl.touk.nussknacker.engine.schemedkafka.schema.{DatumReaderWriterMixin, RecordDeserializer}
import pl.touk.nussknacker.engine.schemedkafka.schema.{DatumReaderWriterMixin, AvroRecordDeserializer}
import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.confluent.client.OpenAPIJsonSchema
import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.confluent.serialization.jsonpayload.JsonPayloadToAvroConverter
import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.serialization.GenericRecordSchemaIdSerializationSupport
@@ -30,7 +30,7 @@ class AvroPayloadDeserializer(useSchemaReflection: Boolean,
decoderFactory: DecoderFactory)
extends DatumReaderWriterMixin with UniversalSchemaPayloadDeserializer {

private val recordDeserializer = new RecordDeserializer(decoderFactory)
private val recordDeserializer = new AvroRecordDeserializer(decoderFactory)

override def deserialize(expectedSchemaData: Option[RuntimeSchemaData[ParsedSchema]], writerSchemaData: RuntimeSchemaData[ParsedSchema], buffer: ByteBuffer, bufferDataStart: Int): AnyRef = {
val avroExpectedSchemaData = expectedSchemaData.asInstanceOf[Option[RuntimeSchemaData[AvroSchema]]]
@@ -47,7 +47,7 @@ class UniversalToJsonFormatter[K: ClassTag, V: ClassTag](protected val kafkaConf
support.formatMessage(data)
}

protected def readKeyMessage(schemaOpt: Option[ParsedSchema], topic: String, jsonObj: Json): Array[Byte] = {
protected def readRecordKeyMessage(schemaOpt: Option[ParsedSchema], topic: String, jsonObj: Json): Array[Byte] = {
// We do not support reading AVRO messages without schemaId. So when schema is missing we assume it must be JSON payload.
val support = schemaOpt.map(_.schemaType()).map(recordFormatterSupportDispatcher.forSchemaType).getOrElse(JsonPayloadRecordFormatterSupport)
support.readKeyMessage(topic, schemaOpt, jsonObj)
