Kafka universal source/sink unconfluentization
arkadius committed Jan 25, 2023
1 parent 749741a · commit 26875f1
Showing 5 changed files with 25 additions and 36 deletions.
@@ -2,7 +2,7 @@ package pl.touk.nussknacker.engine.schemedkafka
 
 import com.typesafe.scalalogging.LazyLogging
 import io.confluent.kafka.schemaregistry.ParsedSchema
-import io.confluent.kafka.schemaregistry.avro.AvroSchema
+import io.confluent.kafka.schemaregistry.avro.{AvroSchema, AvroSchemaUtils}
 import io.confluent.kafka.serializers.NonRecordContainer
 import org.apache.avro.Conversions.{DecimalConversion, UUIDConversion}
 import org.apache.avro.Schema
@@ -97,6 +97,12 @@ object AvroUtils extends LazyLogging {
   def nonRestrictiveParseSchema(avroSchema: String): Schema =
     parserNotValidatingDefaults.parse(avroSchema)
 
+  // This method uses Confluent's class under the hood, but it has no Confluent-specific logic
+  def getSchema(obj: Any): Schema = {
+    AvroSchemaUtils.getSchema(obj)
+  }
+
   // Copy from LogicalTypesAvroFactory
   def extractAvroSpecificSchema(clazz: Class[_]): Schema = {
     tryExtractAvroSchemaViaInstance(clazz).getOrElse(specificData.getSchema(clazz))
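
The new AvroUtils.getSchema wraps Confluent's AvroSchemaUtils.getSchema, which resolves a schema from the value itself (records carry their own schema; plain values map to primitive schemas), with no registry call involved. A minimal sketch of the behaviour; the Person schema here is illustrative:

import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecordBuilder
import pl.touk.nussknacker.engine.schemedkafka.AvroUtils

val personSchema: Schema = AvroUtils.nonRestrictiveParseSchema(
  """{"type":"record","name":"Person","fields":[{"name":"name","type":"string"}]}""")

val person = new GenericRecordBuilder(personSchema).set("name", "Ada").build()

// A GenericRecord carries its schema, so no registry lookup is needed...
assert(AvroUtils.getSchema(person) == personSchema)
// ...and plain values resolve to the matching primitive schema.
assert(AvroUtils.getSchema("text").getType == Schema.Type.STRING)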
@@ -10,12 +10,12 @@ import pl.touk.nussknacker.engine.util.json.BestEffortJsonEncoder
 
 import java.nio.charset.StandardCharsets
 
 trait RecordFormatterSupport {
-  def formatMessage(client: SchemaRegistryClient, data: Any): Json
+  def formatMessage(data: Any): Json
   def readMessage(client: SchemaRegistryClient, subject: String, schemaOpt: Option[ParsedSchema], jsonObj: Json): Array[Byte]
 }
 
 object JsonPayloadRecordFormatterSupport extends RecordFormatterSupport {
-  override def formatMessage(client: SchemaRegistryClient, data: Any): Json = BestEffortJsonEncoder(failOnUnkown = false, classLoader = getClass.getClassLoader).encode(data)
+  override def formatMessage(data: Any): Json = BestEffortJsonEncoder(failOnUnkown = false, classLoader = getClass.getClassLoader).encode(data)
 
   override def readMessage(client: SchemaRegistryClient, subject: String, schemaOpt: Option[ParsedSchema], jsonObj: Json): Array[Byte] = jsonObj match {
     // we handle strings this way because we want to keep the result value compact and a JString would be formatted in quotes
@@ -26,7 +26,7 @@ object JsonPayloadRecordFormatterSupport extends RecordFormatterSupport {
 
 object AvroPayloadRecordFromatterSupport extends RecordFormatterSupport {
 
-  override def formatMessage(client: SchemaRegistryClient, data: Any): Json = new ConfluentAvroMessageFormatter(client).asJson(data)
+  override def formatMessage(data: Any): Json = ConfluentAvroMessageFormatter.asJson(data)
 
   override def readMessage(client: SchemaRegistryClient, subject: String, schemaOpt: Option[ParsedSchema], jsonObj: Json): Array[Byte] =
     new ConfluentAvroMessageReader(client)
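
After this change formatMessage is a pure function of the payload, so the JSON support can be exercised with no registry in sight. A minimal sketch, assuming JsonPayloadRecordFormatterSupport is imported from its Nussknacker package and that BestEffortJsonEncoder accepts ordinary Scala values, as its name suggests:

import io.circe.Json

// No SchemaRegistryClient argument anymore - only the data to format.
val json: Json = JsonPayloadRecordFormatterSupport.formatMessage(Map("name" -> "Ada", "age" -> 36))
// json is a best-effort JSON rendering: {"name":"Ada","age":36}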
@@ -1,28 +1,21 @@
 package pl.touk.nussknacker.engine.schemedkafka.schemaregistry.confluent.formatter
 
 import io.circe.Json
-import io.confluent.kafka.schemaregistry.avro.AvroSchemaUtils
-import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient
-import io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer
 import org.apache.avro.io.EncoderFactory
 import org.apache.kafka.common.errors.SerializationException
+import pl.touk.nussknacker.engine.schemedkafka.AvroUtils
 import pl.touk.nussknacker.engine.schemedkafka.schema.DatumReaderWriterMixin
 
 import java.io.{ByteArrayOutputStream, PrintStream}
 import java.nio.ByteBuffer
 import java.nio.charset.StandardCharsets
 
-/**
-  * @param schemaRegistryClient schema registry client
-  */
-private[confluent] class ConfluentAvroMessageFormatter(schemaRegistryClient: SchemaRegistryClient) extends AbstractKafkaAvroDeserializer with DatumReaderWriterMixin {
+private[schemaregistry] object ConfluentAvroMessageFormatter extends DatumReaderWriterMixin {
 
   private val encoderFactory = EncoderFactory.get
 
-  schemaRegistry = schemaRegistryClient
-
   def asJson(obj: Any): Json = {
-    val schema = AvroSchemaUtils.getSchema(obj)
+    val schema = AvroUtils.getSchema(obj)
     val bos = new ByteArrayOutputStream()
     val output = new PrintStream(bos, true, StandardCharsets.UTF_8.toString)
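
Turning the formatter from a class wired into AbstractKafkaAvroDeserializer into a stateless, package-private object means call sites no longer construct it around a registry client. A before/after sketch; data stands for any value AvroUtils.getSchema understands (an Avro generic or specific record, or a primitive):

import io.circe.Json

val data: Any = "some value" // illustrative

// Before: new ConfluentAvroMessageFormatter(schemaRegistryClient.client).asJson(data)
// After: a plain function call - the schema comes from the value itself.
val json: Json = ConfluentAvroMessageFormatter.asJson(data)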
@@ -26,10 +26,9 @@ class ConfluentAvroToJsonFormatterFactory(schemaRegistryClientFactory: Confluent
   override def create[K: ClassTag, V: ClassTag](kafkaConfig: KafkaConfig, kafkaSourceDeserializationSchema: serialization.KafkaDeserializationSchema[ConsumerRecord[K, V]]): RecordFormatter = {
 
     val schemaRegistryClient = schemaRegistryClientFactory.create(kafkaConfig)
-    val messageFormatter = new ConfluentAvroMessageFormatter(schemaRegistryClient.client)
     val messageReader = new ConfluentAvroMessageReader(schemaRegistryClient.client)
 
-    new ConfluentAvroToJsonFormatter(kafkaConfig, schemaRegistryClient.client, messageFormatter, messageReader, kafkaSourceDeserializationSchema)
+    new ConfluentAvroToJsonFormatter(kafkaConfig, schemaRegistryClient.client, messageReader, kafkaSourceDeserializationSchema)
   }
 
 }
@@ -43,7 +42,6 @@ class ConfluentAvroToJsonFormatterFactory(schemaRegistryClientFactory: Confluent
   */
 class ConfluentAvroToJsonFormatter[K: ClassTag, V: ClassTag](kafkaConfig: KafkaConfig,
                                                              schemaRegistryClient: SchemaRegistryClient,
-                                                             messageFormatter: ConfluentAvroMessageFormatter,
                                                              messageReader: ConfluentAvroMessageReader,
                                                              deserializationSchema: serialization.KafkaDeserializationSchema[ConsumerRecord[K, V]]
                                                             ) extends RecordFormatter {
@@ -93,28 +91,20 @@ class ConfluentAvroToJsonFormatter[K: ClassTag, V: ClassTag](kafkaConfig: KafkaC
     record.consumerRecord.toKafkaConsumerRecord(topic, serializeKeyValue)
   }
 
-  protected def createKeyEncoder(messageFormatter: ConfluentAvroMessageFormatter): Encoder[K] = {
-    new Encoder[K] {
-      override def apply(key: K): Json = key match {
-        case str: String => Json.fromString(str)
-        case _ => messageFormatter.asJson(key) // generic or specific record
-      }
-    }
-  }
-
-  protected def createValueEncoder(messageFormatter: ConfluentAvroMessageFormatter): Encoder[V] = {
-    new Encoder[V] {
-      override def apply(value: V): Json = {
-        messageFormatter.asJson(value)
-      }
-    }
-  }
-
-  implicit protected val serializableRecordDecoder: Decoder[SerializableConsumerRecord[Json, Json]] = deriveConfiguredDecoder
-  protected val consumerRecordDecoder: Decoder[AvroSerializableConsumerRecord[Json, Json]] = deriveConfiguredDecoder
-
-  implicit protected val keyEncoder: Encoder[K] = createKeyEncoder(messageFormatter)
-  implicit protected val valueEncoder: Encoder[V] = createValueEncoder(messageFormatter)
+  implicit protected val serializableRecordDecoder: Decoder[SerializableConsumerRecord[Json, Json]] = deriveConfiguredDecoder
+  protected val consumerRecordDecoder: Decoder[AvroSerializableConsumerRecord[Json, Json]] = deriveConfiguredDecoder
+
+  implicit protected val keyEncoder: Encoder[K] = new Encoder[K] {
+    override def apply(key: K): Json = key match {
+      case str: String => Json.fromString(str)
+      case _ => ConfluentAvroMessageFormatter.asJson(key) // generic or specific record
+    }
+  }
+
+  implicit protected val valueEncoder: Encoder[V] = new Encoder[V] {
+    override def apply(value: V): Json = {
+      ConfluentAvroMessageFormatter.asJson(value)
+    }
+  }
+
   implicit protected val serializableRecordEncoder: Encoder[SerializableConsumerRecord[K, V]] = deriveConfiguredEncoder
   protected val consumerRecordEncoder: Encoder[AvroSerializableConsumerRecord[K, V]] = deriveConfiguredEncoder
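
The key and value encoders are now inlined and call the stateless ConfluentAvroMessageFormatter object directly instead of a constructor-injected instance; string keys are still special-cased so they serialize as compact JSON strings. A self-contained sketch of that rule, with a stand-in format function in place of ConfluentAvroMessageFormatter.asJson:

import io.circe.{Encoder, Json}

// Stand-in for ConfluentAvroMessageFormatter.asJson so the snippet compiles alone.
def format(value: Any): Json = Json.fromString(value.toString)

val keyEncoder: Encoder[Any] = new Encoder[Any] {
  override def apply(key: Any): Json = key match {
    case str: String => Json.fromString(str) // compact string, no Avro round-trip
    case other       => format(other)        // generic or specific record
  }
}

assert(keyEncoder("user-42") == Json.fromString("user-42"))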
@@ -46,7 +46,7 @@ class UniversalToJsonFormatter[K: ClassTag, V: ClassTag](kafkaConfig: KafkaConfi
   private def formatMessage(schemaOpt: Option[ParsedSchema], data: Any) = {
     // We do not support formatting Avro messages without a schema id as JSON, so when the schema is missing we assume the payload must be JSON.
     val support = schemaOpt.map(_.schemaType()).map(UniversalSchemaSupport.forSchemaType).map(_.recordFormatterSupport).getOrElse(JsonPayloadRecordFormatterSupport)
-    support.formatMessage(schemaRegistryClient.client, data)
+    support.formatMessage(data)
   }
 
   private def readMessage(schemaOpt: Option[ParsedSchema], subject: String, jsonObj: Json) = {
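
The selection logic above is the crux of the change: the chosen support now formats without a registry client. A minimal sketch of the same dispatch as a standalone helper, assuming the Nussknacker types (RecordFormatterSupport, UniversalSchemaSupport, JsonPayloadRecordFormatterSupport) are in scope:

import io.confluent.kafka.schemaregistry.ParsedSchema

// Schema present -> support for its schema type (Avro or JSON payloads).
// Schema absent  -> Avro cannot be formatted without its schema id, so it must be JSON.
def supportFor(schemaOpt: Option[ParsedSchema]): RecordFormatterSupport =
  schemaOpt
    .map(_.schemaType())
    .map(UniversalSchemaSupport.forSchemaType)
    .map(_.recordFormatterSupport)
    .getOrElse(JsonPayloadRecordFormatterSupport)

// The chosen support then formats without any registry client:
// supportFor(schemaOpt).formatMessage(data)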
