Fix for: UniversalSchemaPayloadDeserializer was created for each message
arkadius committed Feb 1, 2023
1 parent b1d9f7d commit fb7beab
Showing 10 changed files with 44 additions and 33 deletions.
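The commit addresses a per-message allocation: resolving schema support through the stateless UniversalSchemaSupport object meant payloadDeserializer(kafkaConfig) constructed a new UniversalSchemaPayloadDeserializer for every incoming message. The change introduces UniversalSchemaSupportDispatcher, built once per component from the KafkaConfig, which precomputes one UniversalSchemaSupport instance per schema type and caches each payload deserializer in a val. A minimal, self-contained sketch of that before/after pattern (simplified, hypothetical names, not the actual Nussknacker API):

// Sketch of the caching pattern this commit introduces. Per-message work is
// reduced to a map lookup plus a cached val, instead of rebuilding the
// deserializer on every call.
trait PayloadDeserializer {
  def deserialize(payload: Array[Byte]): AnyRef
}

trait SchemaSupport {
  // A val in implementations: built once with the support, reused per message.
  def payloadDeserializer: PayloadDeserializer
}

final class AvroSchemaSupportSketch(config: Map[String, String]) extends SchemaSupport {
  // Chosen once from config (mirroring avroAsJsonSerialization), then cached.
  override val payloadDeserializer: PayloadDeserializer =
    if (config.get("avroAsJsonSerialization").contains("true"))
      payload => new String(payload, java.nio.charset.StandardCharsets.UTF_8)
    else
      payload => s"avro(${payload.length} bytes)" // stand-in for real Avro decoding
}

object JsonSchemaSupportSketch extends SchemaSupport {
  override val payloadDeserializer: PayloadDeserializer =
    payload => new String(payload, java.nio.charset.StandardCharsets.UTF_8)
}

final class SupportDispatcherSketch(config: Map[String, String]) {
  // Built once per dispatcher instance, not once per message.
  val supportBySchemaType: Map[String, SchemaSupport] = Map(
    "AVRO" -> new AvroSchemaSupportSketch(config),
    "JSON" -> JsonSchemaSupportSketch
  )

  def forSchemaType(schemaType: String): SchemaSupport =
    supportBySchemaType.getOrElse(
      schemaType, throw new IllegalArgumentException(s"Unsupported schema type: $schemaType"))
}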
@@ -15,7 +15,7 @@ import pl.touk.nussknacker.engine.flink.util.keyed
import pl.touk.nussknacker.engine.flink.util.keyed.KeyedValueMapper
import pl.touk.nussknacker.engine.kafka.serialization.KafkaSerializationSchema
import pl.touk.nussknacker.engine.kafka.{KafkaConfig, PartitionByKeyFlinkKafkaProducer, PreparedKafkaTopic}
-import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.universal.UniversalSchemaSupport
+import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.universal.UniversalSchemaSupportDispatcher
import pl.touk.nussknacker.engine.util.KeyedValue

class FlinkKafkaUniversalSink(preparedTopic: PreparedKafkaTopic,
@@ -30,6 +30,8 @@ class FlinkKafkaUniversalSink(preparedTopic: PreparedKafkaTopic,

type Value = KeyedValue[AnyRef, AnyRef]

+private lazy val schemaSupportDispatcher = UniversalSchemaSupportDispatcher(kafkaConfig)
+
override def registerSink(dataStream: DataStream[ValueWithContext[Value]], flinkNodeContext: FlinkCustomNodeContext): DataStreamSink[_] =
dataStream
.map(new EncodeAvroRecordFunction(flinkNodeContext))
@@ -56,7 +58,7 @@ class FlinkKafkaUniversalSink(preparedTopic: PreparedKafkaTopic,
override def map(ctx: ValueWithContext[KeyedValue[AnyRef, AnyRef]]): KeyedValue[AnyRef, AnyRef] = {
ctx.value.mapValue { data =>
exceptionHandler.handling(Some(NodeComponentInfo(nodeId, "flinkKafkaAvroSink", ComponentType.Sink)), ctx.context) {
-val encode = UniversalSchemaSupport.forSchemaType(schema.getParsedSchema.schemaType()).sinkValueEncoder(schema.getParsedSchema, validationMode)
+val encode = schemaSupportDispatcher.forSchemaType(schema.getParsedSchema.schemaType()).sinkValueEncoder(schema.getParsedSchema, validationMode)
encode(data)
}.orNull
}
@@ -9,15 +9,15 @@ import pl.touk.nussknacker.engine.kafka.serialization.KafkaSerializationSchema
import pl.touk.nussknacker.engine.kafka.{KafkaConfig, PreparedKafkaTopic}
import pl.touk.nussknacker.engine.lite.api.utils.sinks.LazyParamSink
import pl.touk.nussknacker.engine.schemedkafka.RuntimeSchemaData
-import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.universal.UniversalSchemaSupport
+import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.universal.{UniversalSchemaSupport, UniversalSchemaSupportDispatcher}
import pl.touk.nussknacker.engine.schemedkafka.sink.UniversalKafkaSinkImplFactory
import pl.touk.nussknacker.engine.util.{KeyedValue, ThreadUtils}

object LiteKafkaUniversalSinkImplFactory extends UniversalKafkaSinkImplFactory {
override def createSink(preparedTopic: PreparedKafkaTopic, keyParam: LazyParameter[AnyRef], valueParam: LazyParameter[AnyRef],
kafkaConfig: KafkaConfig, serializationSchema: KafkaSerializationSchema[KeyedValue[AnyRef, AnyRef]], clientId: String,
schema: RuntimeSchemaData[ParsedSchema], validationMode: ValidationMode): Sink = {
-lazy val encode = UniversalSchemaSupport.forSchemaType(schema.schema.schemaType()).sinkValueEncoder(schema.schema,validationMode)
+lazy val encode = UniversalSchemaSupportDispatcher(kafkaConfig).forSchemaType(schema.schema.schemaType()).sinkValueEncoder(schema.schema,validationMode)

new LazyParamSink[ProducerRecord[Array[Byte], Array[Byte]]] {
override def prepareResponse(implicit evaluateLazyParameter: LazyParameterInterpreter): LazyParameter[ProducerRecord[Array[Byte], Array[Byte]]] = {
@@ -13,6 +13,7 @@ import pl.touk.nussknacker.engine.api.NodeId
import pl.touk.nussknacker.engine.api.validation.ValidationMode
import pl.touk.nussknacker.engine.kafka.validator.WithCachedTopicsExistenceValidator
import pl.touk.nussknacker.engine.kafka.{KafkaComponentsUtils, KafkaConfig, PreparedKafkaTopic}
+import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.universal.UniversalSchemaSupportDispatcher

object KafkaUniversalComponentTransformer {
final val SchemaVersionParamName = "Schema version"
@@ -45,6 +46,8 @@ trait KafkaUniversalComponentTransformer[T] extends SingleInputGenericNodeTransf

protected val kafkaConfig: KafkaConfig = prepareKafkaConfig

+protected val schemaSupportDispatcher: UniversalSchemaSupportDispatcher = UniversalSchemaSupportDispatcher(kafkaConfig)
+
protected def prepareKafkaConfig: KafkaConfig = {
KafkaConfig.parseConfig(processObjectDependencies.config)
}
@@ -32,16 +32,16 @@ sealed trait ParsedSchemaSupport[+S <: ParsedSchema] extends UniversalSchemaSupp
}
}

-object AvroSchemaSupport extends ParsedSchemaSupport[AvroSchema] {
-override def payloadDeserializer(kafkaConfig: KafkaConfig): UniversalSchemaPayloadDeserializer = {
+class AvroSchemaSupport(kafkaConfig: KafkaConfig) extends ParsedSchemaSupport[AvroSchema] {
+override val payloadDeserializer: UniversalSchemaPayloadDeserializer = {
if (kafkaConfig.avroAsJsonSerialization.contains(true)) {
JsonPayloadDeserializer
} else {
AvroPayloadDeserializer(kafkaConfig)
}
}

-override def serializer(schemaOpt: Option[ParsedSchema], client: SchemaRegistryClient, kafkaConfig: KafkaConfig, isKey: Boolean): Serializer[Any] = {
+override def serializer(schemaOpt: Option[ParsedSchema], client: SchemaRegistryClient, isKey: Boolean): Serializer[Any] = {
client match {
case confluentClient: ConfluentSchemaRegistryClient if kafkaConfig.avroAsJsonSerialization.contains(true) =>
new ConfluentJsonPayloadKafkaSerializer(kafkaConfig, confluentClient, new DefaultAvroSchemaEvolution, schemaOpt.map(_.cast()), isKey = isKey)
@@ -66,13 +66,13 @@ object AvroSchemaSupport extends ParsedSchemaSupport[AvroSchema] {
override def validateRawOutput(schema: ParsedSchema, t: TypingResult, mode: ValidationMode): ValidatedNel[OutputValidatorError, Unit] =
new AvroSchemaOutputValidator(mode).validateTypingResultAgainstSchema(t, schema.cast().rawSchema())

-override def recordFormatterSupport(kafkaConfig: KafkaConfig, schemaRegistryClient: SchemaRegistryClient): RecordFormatterSupport = {
+override def recordFormatterSupport(schemaRegistryClient: SchemaRegistryClient): RecordFormatterSupport = {
if (kafkaConfig.avroAsJsonSerialization.contains(true)) {
JsonPayloadRecordFormatterSupport
} else {
// We pass None to schema, because message readers should not do schema evolution.
// It is done this way because we want to keep messages in the original format as they were serialized on Kafka
-val createSerializer = serializer(None, schemaRegistryClient, kafkaConfig, _)
+val createSerializer = serializer(None, schemaRegistryClient, _)
val avroKeySerializer = createSerializer(true)
val avroValueSerializer = createSerializer(false)
new AvroPayloadRecordFormatterSupport(new AvroMessageReader(avroKeySerializer), new AvroMessageReader(avroValueSerializer))
@@ -82,9 +82,9 @@ object AvroSchemaSupport extends ParsedSchemaSupport[AvroSchema] {


object JsonSchemaSupport extends ParsedSchemaSupport[OpenAPIJsonSchema] {
-override def payloadDeserializer(k: KafkaConfig): UniversalSchemaPayloadDeserializer = JsonSchemaPayloadDeserializer
+override val payloadDeserializer: UniversalSchemaPayloadDeserializer = JsonSchemaPayloadDeserializer

-override def serializer(schemaOpt: Option[ParsedSchema], c: SchemaRegistryClient, k: KafkaConfig, isKey: Boolean): Serializer[Any] =
+override def serializer(schemaOpt: Option[ParsedSchema], c: SchemaRegistryClient, isKey: Boolean): Serializer[Any] =
(topic: String, data: Any) => data match {
case j: Json => j.noSpaces.getBytes()
case _ => throw new SerializationException(s"Expecting json but got: $data")
@@ -102,6 +102,6 @@ object JsonSchemaSupport extends ParsedSchemaSupport[OpenAPIJsonSchema] {
override def validateRawOutput(schema: ParsedSchema, t: TypingResult, mode: ValidationMode): ValidatedNel[OutputValidatorError, Unit] =
new JsonSchemaOutputValidator(mode).validateTypingResultAgainstSchema(t, schema.cast().rawSchema())

-override def recordFormatterSupport(kafkaConfig: KafkaConfig, schemaRegistryClient: SchemaRegistryClient): RecordFormatterSupport =
+override def recordFormatterSupport(schemaRegistryClient: SchemaRegistryClient): RecordFormatterSupport =
JsonPayloadRecordFormatterSupport
}
@@ -7,14 +7,15 @@ import pl.touk.nussknacker.engine.kafka.KafkaConfig
import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.SchemaRegistryClient
import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.formatter.{AvroMessageFormatter, AvroMessageReader}
import pl.touk.nussknacker.engine.util.json.BestEffortJsonEncoder
+import pl.touk.nussknacker.engine.util.Implicits._

import java.nio.charset.StandardCharsets

class RecordFormatterSupportDispatcher(kafkaConfig: KafkaConfig, schemaRegistryClient: SchemaRegistryClient) {

-private val supportBySchemaType = UniversalSchemaSupport.supportedSchemaTypes.toList.map { schemaType =>
-schemaType -> UniversalSchemaSupport.forSchemaType(schemaType).recordFormatterSupport(kafkaConfig, schemaRegistryClient)
-}.toMap
+private val supportBySchemaType =
+UniversalSchemaSupportDispatcher(kafkaConfig).supportBySchemaType
+.mapValuesNow(_.recordFormatterSupport(schemaRegistryClient))

def forSchemaType(schemaType: String): RecordFormatterSupport =
supportBySchemaType.getOrElse(schemaType, throw new UnsupportedSchemaType(schemaType))
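The rewritten supportBySchemaType relies on mapValuesNow from the newly imported pl.touk.nussknacker.engine.util.Implicits._ — presumably a strict counterpart to Scala 2.12's Map#mapValues, whose lazy view would re-run recordFormatterSupport on every lookup and defeat the caching this commit introduces. A sketch of such a helper, under that assumption:

// Sketch of a strict mapValues helper (assumption: Nussknacker's
// Implicits.mapValuesNow behaves like this). Scala 2.12's Map#mapValues is a
// lazy view that re-applies f on each access; mapping eagerly materializes
// the values exactly once.
object ImplicitsSketch {
  implicit class RichMap[K, V](private val m: Map[K, V]) extends AnyVal {
    def mapValuesNow[W](f: V => W): Map[K, W] =
      m.map { case (k, v) => k -> f(v) }
  }
}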
@@ -17,6 +17,8 @@ class UniversalKafkaDeserializer[T](schemaRegistryClient: SchemaRegistryClient,
readerSchemaDataOpt: Option[RuntimeSchemaData[ParsedSchema]],
isKey: Boolean) extends Deserializer[T] {

+private val schemaSupportDispatcher = UniversalSchemaSupportDispatcher(kafkaConfig)
+
override def deserialize(topic: String, data: Array[Byte]): T = {
throw new IllegalAccessException(s"Operation not supported. ${this.getClass.getSimpleName} requires kafka headers to perform deserialization.")
}
@@ -34,8 +36,8 @@ class UniversalKafkaDeserializer[T](schemaRegistryClient: SchemaRegistryClient,

val writerSchemaData = new RuntimeSchemaData(new NkSerializableParsedSchema[ParsedSchema](writerSchema), Some(writerSchemaId.value))

-UniversalSchemaSupport.forSchemaType(writerSchema.schemaType())
-.payloadDeserializer(kafkaConfig)
+schemaSupportDispatcher.forSchemaType(writerSchema.schemaType())
+.payloadDeserializer
.deserialize(readerSchemaDataOpt, writerSchemaData, writerSchemaId.buffer)
.asInstanceOf[T]
}
@@ -14,25 +14,30 @@ import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.SchemaRegistryClie
import pl.touk.nussknacker.engine.util.output.OutputValidatorError
import pl.touk.nussknacker.engine.util.sinkvalue.SinkValueData.SinkValueParameter

-object UniversalSchemaSupport {
-val supportedSchemaTypes: Set[String] = Set(AvroSchema.TYPE, JsonSchema.TYPE)
+class UniversalSchemaSupportDispatcher private(kafkaConfig: KafkaConfig) {

-def forSchemaType(schemaType: String): UniversalSchemaSupport = schemaType match {
-case AvroSchema.TYPE => AvroSchemaSupport
-case JsonSchema.TYPE => JsonSchemaSupport
-case _ => throw new UnsupportedSchemaType(schemaType)
-}
+val supportBySchemaType: Map[String, UniversalSchemaSupport] =
+Map(
+AvroSchema.TYPE -> new AvroSchemaSupport(kafkaConfig),
+JsonSchema.TYPE -> JsonSchemaSupport)
+
+def forSchemaType(schemaType: String): UniversalSchemaSupport =
+supportBySchemaType.getOrElse(schemaType, throw new UnsupportedSchemaType(schemaType))

}
+
+object UniversalSchemaSupportDispatcher {
+def apply(kafkaConfig: KafkaConfig): UniversalSchemaSupportDispatcher = new UniversalSchemaSupportDispatcher(kafkaConfig)
+}

trait UniversalSchemaSupport {
-def payloadDeserializer(k: KafkaConfig): UniversalSchemaPayloadDeserializer
-def serializer(schemaOpt: Option[ParsedSchema], c: SchemaRegistryClient, k: KafkaConfig, isKey: Boolean): Serializer[Any]
+def payloadDeserializer: UniversalSchemaPayloadDeserializer
+def serializer(schemaOpt: Option[ParsedSchema], c: SchemaRegistryClient, isKey: Boolean): Serializer[Any]
def typeDefinition(schema: ParsedSchema): TypingResult
def extractSinkValueParameter(schema: ParsedSchema)(implicit nodeId: NodeId): ValidatedNel[ProcessCompilationError, SinkValueParameter]
def sinkValueEncoder(schema: ParsedSchema, mode: ValidationMode): Any => AnyRef
def validateRawOutput(schema: ParsedSchema, t: TypingResult, mode: ValidationMode): ValidatedNel[OutputValidatorError, Unit]
-def recordFormatterSupport(kafkaConfig: KafkaConfig, schemaRegistryClient: SchemaRegistryClient): RecordFormatterSupport
+def recordFormatterSupport(schemaRegistryClient: SchemaRegistryClient): RecordFormatterSupport
}

class UnsupportedSchemaType(schemaType: String) extends IllegalArgumentException(s"Unsupported schema type: $schemaType")
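Taken together with the diff above, the intended call pattern is: build the dispatcher once when a component is created, and keep the per-message path down to a map lookup plus a cached val. A usage sketch, reusing the hypothetical types from the sketch at the top of this commit:

// Usage sketch with the hypothetical SupportDispatcherSketch defined earlier:
// the dispatcher lives for the component's lifetime; only lookups happen per message.
class ExampleMessageHandler(config: Map[String, String]) {
  private val dispatcher = new SupportDispatcherSketch(config) // created once

  def handle(schemaType: String, payload: Array[Byte]): AnyRef =
    dispatcher.forSchemaType(schemaType) // Map lookup, no allocation
      .payloadDeserializer               // cached val, not rebuilt per message
      .deserialize(payload)
}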
@@ -14,7 +14,7 @@ object UniversalSerializerFactory extends SchemaRegistryBasedSerializerFactory {
schemaDataOpt: Option[RuntimeSchemaData[ParsedSchema]],
isKey: Boolean): Serializer[Any] = {
val schema: ParsedSchema = schemaDataOpt.map(_.schema).getOrElse(throw new IllegalArgumentException("SchemaData should be defined for universal serializer"))
-UniversalSchemaSupport.forSchemaType(schema.schemaType()).serializer(Some(schema), schemaRegistryClient, kafkaConfig, isKey = false)
+UniversalSchemaSupportDispatcher(kafkaConfig).forSchemaType(schema.schemaType()).serializer(Some(schema), schemaRegistryClient, isKey = false)
}

}
@@ -10,7 +10,6 @@ import pl.touk.nussknacker.engine.api.process.{ProcessObjectDependencies, Sink,
import pl.touk.nussknacker.engine.api.validation.ValidationMode
import pl.touk.nussknacker.engine.api.{LazyParameter, MetaData, NodeId}
import pl.touk.nussknacker.engine.schemedkafka.KafkaUniversalComponentTransformer._
-import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.universal.UniversalSchemaSupport
import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.{SchemaBasedSerdeProvider, SchemaRegistryClientFactory}
import pl.touk.nussknacker.engine.schemedkafka.sink.UniversalKafkaSinkFactory.TransformationState
import pl.touk.nussknacker.engine.schemedkafka.{KafkaUniversalComponentTransformer, RuntimeSchemaData, SchemaDeterminerErrorHandler}
@@ -67,7 +66,7 @@ class UniversalKafkaSinkFactory(val schemaRegistryClientFactory: SchemaRegistryC
.leftMap(_.map(e => CustomNodeError(nodeId.id, e.getMessage, None)))
.map(_ => schema)
}.andThen { schema =>
-UniversalSchemaSupport.forSchemaType(schema.schemaType())
+schemaSupportDispatcher.forSchemaType(schema.schemaType())
.validateRawOutput(schema, value.returnType, extractValidationMode(mode))
.leftMap(outputValidatorErrorsConverter.convertValidationErrors)
.leftMap(NonEmptyList.one)
@@ -94,7 +93,7 @@ class UniversalKafkaSinkFactory(val schemaRegistryClientFactory: SchemaRegistryC
.leftMap(_.map(e => CustomNodeError(nodeId.id, e.getMessage, None)))
}
validatedSchema.andThen { schemaData =>
-UniversalSchemaSupport.forSchemaType(schemaData.schema.schemaType())
+schemaSupportDispatcher.forSchemaType(schemaData.schema.schemaType())
.extractSinkValueParameter(schemaData.schema)
.map { valueParam =>
val state = TransformationState(schemaData, valueParam)
@@ -16,7 +16,6 @@ import pl.touk.nussknacker.engine.kafka.source.KafkaContextInitializer
import pl.touk.nussknacker.engine.kafka.source.KafkaSourceFactory.KafkaSourceImplFactory
import pl.touk.nussknacker.engine.schemedkafka.KafkaUniversalComponentTransformer.SchemaVersionParamName
import pl.touk.nussknacker.engine.schemedkafka.schemaregistry._
-import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.universal.UniversalSchemaSupport
import pl.touk.nussknacker.engine.schemedkafka.source.UniversalKafkaSourceFactory.UniversalKafkaSourceFactoryState
import pl.touk.nussknacker.engine.schemedkafka.{KafkaUniversalComponentTransformer, RuntimeSchemaData}

@@ -61,7 +60,7 @@ class UniversalKafkaSourceFactory[K: ClassTag, V: ClassTag](val schemaRegistryCl
Validated[ProcessCompilationError, (Option[RuntimeSchemaData[ParsedSchema]], TypingResult)] = {
schemaDeterminer.determineSchemaUsedInTyping.map { schemaData =>
val schema = schemaData.schema
-(Some(schemaData), UniversalSchemaSupport.forSchemaType(schema.schemaType()).typeDefinition(schema))
+(Some(schemaData), schemaSupportDispatcher.forSchemaType(schema.schemaType()).typeDefinition(schema))
}.leftMap(error => CustomNodeError(error.getMessage, paramName))
}

