
Kafka component utils unconfluentization:
- Confluent abstractions are now used only when needed
- Non-Confluent classes renamed and moved to appropriate packages
- Removed some code duplication in the universal record formatter implementation and AvroSchemaEvolution
- Extracted a new class, SchemaIdFromMessageExtractor, to make the Confluent logic explicit, and moved it to the top level
- Extracted SchemaValidator to make the Confluent logic explicit and composable
- Some renames: ConsumerRecordUtils -> KafkaRecordUtils
- RecordDeserializer -> AvroRecordDeserializer (inheritance also replaced by composition)
- (De)SerializerFactory - simpler abstractions
- Performance fix: UniversalKafkaSource now produces GenericRecordWithSchemaId
- AvroSerializersRegistrar fixes: it wasn't checked whether a component is a kafka component, and a bad config path was used (config.kafka instead of kafka)
- ConfluentSchemaRegistryFactory is no longer necessary - removed
- ConfluentSchemaBasedSerdeProvider.universal replaced by UniversalSchemaBasedSerdeProvider.create (see the sketch below)
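
For reviewers, a minimal before/after sketch of the headline API change. The imports and names are taken from this commit's diff; the surrounding value definitions are illustrative only.

```scala
import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.SchemaRegistryClientFactory
import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.confluent.client.CachedConfluentSchemaRegistryClientFactory
import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.universal.UniversalSchemaBasedSerdeProvider

// A Confluent-backed client factory still works - it now satisfies the generic
// SchemaRegistryClientFactory abstraction instead of a Confluent-specific type.
val schemaRegistryClientFactory: SchemaRegistryClientFactory = CachedConfluentSchemaRegistryClientFactory

// before: ConfluentSchemaBasedSerdeProvider.universal(schemaRegistryClientFactory)
val universalSerdeProvider = UniversalSchemaBasedSerdeProvider.create(schemaRegistryClientFactory)
```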
arkadius committed Jan 29, 2023
1 parent eef797f commit 8b191e5
Showing 97 changed files with 1,309 additions and 1,164 deletions.
1 change: 1 addition & 0 deletions build.sbt
@@ -732,6 +732,7 @@ lazy val schemedKafkaComponentsUtils = (project in utils("schemed-kafka-componen
ExclusionRule("org.slf4j", "slf4j-log4j12")
),
"tech.allegro.schema.json2avro" % "converter" % "0.2.15",
"org.scala-lang.modules" %% "scala-collection-compat" % scalaCollectionsCompatV,
"org.scalatest" %% "scalatest" % scalaTestV % "test"
)
}
4 changes: 3 additions & 1 deletion docs/Changelog.md
@@ -15,7 +15,9 @@
* [#3916](https://github.com/TouK/nussknacker/pull/3916) `environmentAlert.cssClass` setting renamed to `environmentAlert.color`
* [#3922](https://github.com/TouK/nussknacker/pull/3922) Bumps: jwks 0.19.0 -> 0.21.3, jackson: 2.11.3 -> 2.13.4
* [#3958](https://github.com/TouK/nussknacker/pull/3958) OpenAPI: specify Content-Type header based on schema

* [#3948](https://github.com/TouK/nussknacker/pull/3948)
* Performance fix: the `kafka` source on the Flink engine no longer serializes the schema during record serialization
* Configuration handling fixes: the `avroKryoGenericRecordSchemaIdSerialization` setting wasn't checked properly

1.7.0 (19 Dec 2022)
------------------------
12 changes: 12 additions & 0 deletions docs/MigrationGuide.md
@@ -22,6 +22,18 @@ To see the biggest differences please consult the [changelog](Changelog.md).
* [#3929](https://github.com/TouK/nussknacker/pull/3929) From now on, the `SchemaId` value class is used in every place
where a schema id was previously represented as an `Int`. For conversion between `SchemaId` and `Int` use `SchemaId.fromInt` and `SchemaId.asInt`.
Use `ConfluentUtils.toSchemaWithMetadata` instead of `SchemaWithMetadata.apply` for conversion between Confluent's `SchemaMetadata` and our `SchemaWithMetadata` (see the sketch after this list).
* [#3948](https://github.com/TouK/nussknacker/pull/3948) We are now less dependent on the Confluent schema registry.
To make this possible, some kafka universal/avro components were refactored. The most important change in the public API:
* `ConfluentSchemaBasedSerdeProvider.universal` was replaced by `UniversalSchemaBasedSerdeProvider.create`

Some other internal changes:
* Non-Confluent classes renamed and moved to appropriate packages
* Extracted a new class, `SchemaIdFromMessageExtractor`, to make the Confluent logic explicit, and moved it to the top level
* Extracted `SchemaValidator` to make the Confluent logic explicit and composable
* Some renames: `ConsumerRecordUtils` -> `KafkaRecordUtils`
* `RecordDeserializer` -> `AvroRecordDeserializer` (inheritance also replaced by composition)
* `(De)SerializerFactory` - simpler abstractions
* `ConfluentSchemaRegistryFactory` is no longer necessary - removed
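
A minimal sketch of the `SchemaId` conversions mentioned at the top of this section; the import path and the exact `ConfluentUtils.toSchemaWithMetadata` parameter list are assumptions, not taken from this diff.

```scala
// Hypothetical illustration of the SchemaId migration (package path assumed):
import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.SchemaId

val schemaId: SchemaId = SchemaId.fromInt(42) // previously a plain Int was passed around
val rawId: Int = schemaId.asInt               // convert back wherever an Int is still expected

// previously: SchemaWithMetadata(confluentSchemaMetadata)
// now:        ConfluentUtils.toSchemaWithMetadata(confluentSchemaMetadata) // exact signature not shown in this diff
```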

## In version 1.7.0

@@ -5,20 +5,22 @@ import com.typesafe.config.ConfigValueFactory.fromAnyRef
import net.ceedubs.ficus.Ficus.{booleanValueReader, optionValueReader, toFicusConfig}
import pl.touk.nussknacker.engine.api.component.{ComponentDefinition, ComponentProvider, NussknackerVersion}
import pl.touk.nussknacker.engine.api.process.ProcessObjectDependencies
import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.confluent.ConfluentSchemaBasedSerdeProvider
import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.confluent.client.{CachedConfluentSchemaRegistryClientFactory, ConfluentSchemaRegistryClientFactory}
import pl.touk.nussknacker.engine.schemedkafka.sink.flink.{FlinkKafkaAvroSinkImplFactory, FlinkKafkaUniversalSinkImplFactory}
import pl.touk.nussknacker.engine.schemedkafka.sink.{KafkaAvroSinkFactory, KafkaAvroSinkFactoryWithEditor, UniversalKafkaSinkFactory}
import pl.touk.nussknacker.engine.schemedkafka.source.{KafkaAvroSourceFactory, UniversalKafkaSourceFactory}
import pl.touk.nussknacker.engine.kafka.KafkaConfig
import pl.touk.nussknacker.engine.kafka.generic.sinks.GenericKafkaJsonSinkFactory
import pl.touk.nussknacker.engine.kafka.generic.sources.{GenericJsonSourceFactory, GenericTypedJsonSourceFactory}
import pl.touk.nussknacker.engine.kafka.source.flink.FlinkKafkaSourceImplFactory
import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.SchemaRegistryClientFactory
import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.confluent.ConfluentSchemaBasedSerdeProvider
import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.confluent.client.CachedConfluentSchemaRegistryClientFactory
import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.universal.UniversalSchemaBasedSerdeProvider
import pl.touk.nussknacker.engine.schemedkafka.sink.flink.{FlinkKafkaAvroSinkImplFactory, FlinkKafkaUniversalSinkImplFactory}
import pl.touk.nussknacker.engine.schemedkafka.sink.{KafkaAvroSinkFactory, KafkaAvroSinkFactoryWithEditor, UniversalKafkaSinkFactory}
import pl.touk.nussknacker.engine.schemedkafka.source.{KafkaAvroSourceFactory, UniversalKafkaSourceFactory}
import pl.touk.nussknacker.engine.util.config.DocsConfig

class FlinkKafkaComponentProvider extends ComponentProvider {

protected def schemaRegistryClientFactory: ConfluentSchemaRegistryClientFactory = CachedConfluentSchemaRegistryClientFactory
protected def schemaRegistryClientFactory: SchemaRegistryClientFactory = CachedConfluentSchemaRegistryClientFactory

override def providerName: String = "kafka"

@@ -35,7 +37,7 @@ class FlinkKafkaComponentProvider extends ComponentProvider {

val avroPayloadSerdeProvider = ConfluentSchemaBasedSerdeProvider.avroPayload(schemaRegistryClientFactory)
val jsonPayloadSerdeProvider = ConfluentSchemaBasedSerdeProvider.jsonPayload(schemaRegistryClientFactory)
val universalSerdeProvider = ConfluentSchemaBasedSerdeProvider.universal(schemaRegistryClientFactory)
val universalSerdeProvider = UniversalSchemaBasedSerdeProvider.create(schemaRegistryClientFactory)

lazy val lowLevelKafkaComponents = List(
ComponentDefinition("kafka-json", new GenericKafkaJsonSinkFactory(overriddenDependencies)).withRelativeDocs(noTypeInfo),
@@ -18,7 +18,7 @@ import pl.touk.nussknacker.engine.kafka.consumerrecord.{ConsumerRecordDeserializ
import pl.touk.nussknacker.engine.kafka.serialization.schemas.BaseSimpleSerializationSchema
import pl.touk.nussknacker.engine.kafka.source.KafkaSourceFactory
import pl.touk.nussknacker.engine.kafka.source.flink.KafkaSourceFactoryMixin._
import pl.touk.nussknacker.engine.kafka.{ConsumerRecordUtils, KafkaConfig, KafkaSpec, serialization}
import pl.touk.nussknacker.engine.kafka.{KafkaRecordUtils, KafkaConfig, KafkaSpec, serialization}
import pl.touk.nussknacker.engine.util.namespaces.ObjectNamingProvider
import pl.touk.nussknacker.test.PatientScalaFutures

@@ -31,7 +31,7 @@ trait KafkaSourceFactoryMixin extends AnyFunSuite with Matchers with KafkaSpec w
val sampleValue: SampleValue = SampleValue("first", "last")
val sampleKey: SampleKey = SampleKey("one", 2L)
val sampleHeadersMap: Map[String, String] = Map("headerOne" -> "valueOfHeaderOne", "headerTwo" -> null)
val sampleHeaders: Headers = ConsumerRecordUtils.toHeaders(sampleHeadersMap)
val sampleHeaders: Headers = KafkaRecordUtils.toHeaders(sampleHeadersMap)

val sampleTopic = "topic"
val constTimestamp: Long = 123L
@@ -43,7 +43,7 @@ trait KafkaSourceFactoryMixin extends AnyFunSuite with Matchers with KafkaSpec w
topic,
(obj: ObjToSerialize) => Option(obj.value).map(v => implicitly[Encoder[SampleValue]].apply(v).noSpaces).orNull,
(obj: ObjToSerialize) => Option(obj.key).map(k => implicitly[Encoder[SampleKey]].apply(k).noSpaces).orNull,
(obj: ObjToSerialize) => ConsumerRecordUtils.toHeaders(obj.headers)
(obj: ObjToSerialize) => KafkaRecordUtils.toHeaders(obj.headers)
).asInstanceOf[serialization.KafkaSerializationSchema[Any]]

protected def createTopic(name: String, partitions: Int = 1): String = {
@@ -18,7 +18,7 @@ import pl.touk.nussknacker.engine.kafka.serialization.schemas.{JsonSerialization
import pl.touk.nussknacker.engine.kafka.source.KafkaSourceFactory.KafkaSourceFactoryState
import pl.touk.nussknacker.engine.kafka.source.flink.KafkaSourceFactoryMixin._
import pl.touk.nussknacker.engine.kafka.source.{KafkaContextInitializer, KafkaSourceFactory}
import pl.touk.nussknacker.engine.kafka.{ConsumerRecordUtils, KafkaSpec, serialization}
import pl.touk.nussknacker.engine.kafka.{KafkaRecordUtils, KafkaSpec, serialization}
import pl.touk.nussknacker.test.PatientScalaFutures

import java.util.Collections.singletonMap
@@ -58,7 +58,7 @@ class KafkaSourceFactorySpec extends AnyFunSuite with Matchers with KafkaSpec wi
ConsumerRecord.NULL_SIZE,
null,
givenObj,
ConsumerRecordUtils.emptyHeaders,
KafkaRecordUtils.emptyHeaders,
Optional.of(0: Integer)
)
pushMessage(new SimpleSerializationSchema[Any](topic, String.valueOf), givenObj, topic, timestamp = constTimestamp)
@@ -77,7 +77,7 @@ class KafkaSourceFactorySpec extends AnyFunSuite with Matchers with KafkaSpec wi
TimestampType.CREATE_TIME,
null,
givenObj,
ConsumerRecordUtils.emptyHeaders,
KafkaRecordUtils.emptyHeaders,
Optional.of(0)
)
pushMessage(new JsonSerializationSchema[SampleValue](topic).asInstanceOf[serialization.KafkaSerializationSchema[Any]], givenObj, topic, timestamp = constTimestamp)
@@ -96,7 +96,7 @@ class KafkaSourceFactorySpec extends AnyFunSuite with Matchers with KafkaSpec wi
TimestampType.CREATE_TIME,
null,
givenObj,
ConsumerRecordUtils.emptyHeaders,
KafkaRecordUtils.emptyHeaders,
Optional.of(0)
)
pushMessage(new JsonSerializationSchema[SampleValue](topic).asInstanceOf[serialization.KafkaSerializationSchema[Any]], givenObj, topic, timestamp = constTimestamp)
@@ -115,7 +115,7 @@ class KafkaSourceFactorySpec extends AnyFunSuite with Matchers with KafkaSpec wi
TimestampType.CREATE_TIME,
sampleKey,
sampleValue,
ConsumerRecordUtils.toHeaders(sampleHeadersMap),
KafkaRecordUtils.toHeaders(sampleHeadersMap),
Optional.of(0)
)
pushMessage(objToSerializeSerializationSchema(topic), givenObj, topic, timestamp = constTimestamp)
@@ -121,8 +121,6 @@ kafka {
"bootstrap.servers": ${kafkaAddress}
"schema.registry.url": ${schemaRegistryUrl}
}
consumerGroupNamingStrategy: processId-nodeId
avroKryoGenericRecordSchemaIdSerialization: true
}

asyncExecutionConfig {
@@ -3,7 +3,6 @@ package pl.touk.nussknacker.engine.management.sample
import com.cronutils.model.CronType
import com.cronutils.model.definition.CronDefinitionBuilder
import com.cronutils.parser.CronParser
import com.typesafe.config.Config
import io.circe.parser.decode
import io.circe.{Decoder, Encoder}
import net.ceedubs.ficus.Ficus._
@@ -14,14 +13,8 @@ import pl.touk.nussknacker.engine.api._
import pl.touk.nussknacker.engine.api.component.{ComponentGroupName, ParameterConfig, SingleComponentConfig}
import pl.touk.nussknacker.engine.api.definition.{FixedExpressionValue, FixedValuesParameterEditor, MandatoryParameterValidator, StringParameterEditor}
import pl.touk.nussknacker.engine.api.process._
import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.confluent.ConfluentSchemaBasedSerdeProvider
import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.confluent.client.{CachedConfluentSchemaRegistryClientFactory, ConfluentSchemaRegistryClientFactory, MockConfluentSchemaRegistryClientFactory, MockSchemaRegistryClient}
import pl.touk.nussknacker.engine.schemedkafka.sink.flink.{FlinkKafkaAvroSinkImplFactory, FlinkKafkaUniversalSinkImplFactory}
import pl.touk.nussknacker.engine.schemedkafka.sink.{KafkaAvroSinkFactoryWithEditor, UniversalKafkaSinkFactory}
import pl.touk.nussknacker.engine.schemedkafka.source.{KafkaAvroSourceFactory, UniversalKafkaSourceFactory}
import pl.touk.nussknacker.engine.flink.util.sink.{EmptySink, SingleValueSinkFactory}
import pl.touk.nussknacker.engine.flink.util.source.{EspDeserializationSchema, ReturningClassInstanceSource, ReturningTestCaseClass}
import pl.touk.nussknacker.engine.kafka.{KafkaConfig, SchemaRegistryCacheConfig}
import pl.touk.nussknacker.engine.kafka.consumerrecord.{ConsumerRecordToJsonFormatterFactory, FixedValueDeserializationSchemaFactory}
import pl.touk.nussknacker.engine.kafka.generic.sinks.FlinkKafkaSinkImplFactory
import pl.touk.nussknacker.engine.kafka.serialization.schemas.SimpleSerializationSchema
@@ -35,6 +28,13 @@ import pl.touk.nussknacker.engine.management.sample.helper.DateProcessHelper
import pl.touk.nussknacker.engine.management.sample.service._
import pl.touk.nussknacker.engine.management.sample.source._
import pl.touk.nussknacker.engine.management.sample.transformer._
import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.SchemaRegistryClientFactory
import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.confluent.ConfluentSchemaBasedSerdeProvider
import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.confluent.client.{CachedConfluentSchemaRegistryClientFactory, MockConfluentSchemaRegistryClientFactory, MockSchemaRegistryClient}
import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.universal.UniversalSchemaBasedSerdeProvider
import pl.touk.nussknacker.engine.schemedkafka.sink.flink.{FlinkKafkaAvroSinkImplFactory, FlinkKafkaUniversalSinkImplFactory}
import pl.touk.nussknacker.engine.schemedkafka.sink.{KafkaAvroSinkFactoryWithEditor, UniversalKafkaSinkFactory}
import pl.touk.nussknacker.engine.schemedkafka.source.{KafkaAvroSourceFactory, UniversalKafkaSourceFactory}
import pl.touk.nussknacker.engine.util.LoggingListener

import java.nio.charset.StandardCharsets
@@ -61,12 +61,10 @@ class DevProcessConfigCreator extends ProcessConfigCreator {

private def all[T](value: T): WithCategories[T] = WithCategories(value, "Category1", "Category2", "DemoFeatures", "TESTCAT" , "DevelopmentTests")

private def kafkaConfig(config: Config) = KafkaConfig.parseConfig(config)

override def sinkFactories(processObjectDependencies: ProcessObjectDependencies): Map[String, WithCategories[SinkFactory]] = {
val confluentFactory = createSchemaRegistryClientFactory(processObjectDependencies)
val avroPayloadSerdeProvider = ConfluentSchemaBasedSerdeProvider.avroPayload(confluentFactory)
val universalPayloadSerdeProvider = ConfluentSchemaBasedSerdeProvider.universal(confluentFactory)
val universalPayloadSerdeProvider = UniversalSchemaBasedSerdeProvider.create(confluentFactory)
Map(
"sendSms" -> all(new SingleValueSinkFactory(new DiscardingSink)),
"monitor" -> categories(SinkFactory.noParam(EmptySink)),
@@ -82,7 +80,7 @@ class DevProcessConfigCreator extends ProcessConfigCreator {
override def sourceFactories(processObjectDependencies: ProcessObjectDependencies): Map[String, WithCategories[SourceFactory]] = {
val confluentFactory = createSchemaRegistryClientFactory(processObjectDependencies)
val schemaBasedMessagesSerdeProvider = ConfluentSchemaBasedSerdeProvider.avroPayload(confluentFactory)
val universalMessagesSerdeProvider = ConfluentSchemaBasedSerdeProvider.universal(confluentFactory)
val universalMessagesSerdeProvider = UniversalSchemaBasedSerdeProvider.create(confluentFactory)
val avroSourceFactory = new KafkaAvroSourceFactory[Any, Any](confluentFactory, schemaBasedMessagesSerdeProvider, processObjectDependencies, new FlinkKafkaSourceImplFactory(None))
val universalSourceFactory = new UniversalKafkaSourceFactory[Any, Any](confluentFactory, universalMessagesSerdeProvider, processObjectDependencies, new FlinkKafkaSourceImplFactory(None))
Map(
@@ -107,7 +105,7 @@
)
}

private def createSchemaRegistryClientFactory(processObjectDependencies: ProcessObjectDependencies): ConfluentSchemaRegistryClientFactory = {
private def createSchemaRegistryClientFactory(processObjectDependencies: ProcessObjectDependencies): SchemaRegistryClientFactory = {
val mockConfluent = processObjectDependencies.config.getAs[Boolean](DevProcessConfigCreator.emptyMockedSchemaRegistryProperty).contains(true)
if (mockConfluent) {
new MockConfluentSchemaRegistryClientFactory(new MockSchemaRegistryClient)
@@ -7,33 +7,47 @@ import org.apache.avro.generic.GenericData
import org.apache.flink.api.common.ExecutionConfig
import org.apache.flink.api.java.typeutils.AvroUtils
import pl.touk.nussknacker.engine.api.component.ComponentProviderConfig
import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.confluent.client.{CachedConfluentSchemaRegistryClientFactory, ConfluentSchemaRegistryClientFactory}
import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.confluent.kryo.SchemaIdBasedAvroGenericRecordSerializer
import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.confluent.serialization.GenericRecordSchemaIdSerializationSupport
import pl.touk.nussknacker.engine.flink.api.serialization.SerializersRegistrar
import pl.touk.nussknacker.engine.kafka.KafkaConfig
import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.SchemaRegistryClientFactory
import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.confluent.client.CachedConfluentSchemaRegistryClientFactory
import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.confluent.kryo.SchemaIdBasedAvroGenericRecordSerializer
import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.serialization.GenericRecordSchemaIdSerializationSupport

// We need it because we use avro records inside our Context class
class AvroSerializersRegistrar extends SerializersRegistrar with LazyLogging {

override def register(modelConfig: Config, executionConfig: ExecutionConfig): Unit = {
logger.debug("Registering default avro serializers")
val resolvedKafkaConfig = resolveConfig(modelConfig)
AvroUtils.getAvroUtils.addAvroSerializersIfRequired(executionConfig, classOf[GenericData.Record])
val resolvedKafkaConfig = resolveConfig(modelConfig)
registerGenericRecordSchemaIdSerializationForGlobalKafkaConfigIfNeed(resolvedKafkaConfig, executionConfig)
}

private def resolveConfig(modelConfig: Config): Option[KafkaConfig] = {
val componentsConfig = modelConfig.getAs[Map[String, ComponentProviderConfig]]("components").getOrElse(Map.empty)
val componentsKafkaConfigs = componentsConfig
.toList
.filter {
case (name, config) =>
val providerType = config.providerType.getOrElse(name)
providerType == "kafka"
}
.filterNot(_._2.disabled)
.map(_._2.config)
.flatMap(KafkaConfig.parseConfigOpt(_, "config.kafka"))
val theOnlyOneEnabledKafkaConfigOpt = componentsKafkaConfigs match {
case firstEnabledKafkaConfig :: Nil => Some(firstEnabledKafkaConfig)
.map {
case (name, config) =>
name -> KafkaConfig.parseConfig(config.config, "config")
}
componentsKafkaConfigs match {
case (componentName, kafkaConfig) :: Nil =>
logger.debug(s"Found one enabled kafka component: $componentName")
Some(kafkaConfig)
case Nil =>
val configOpt = KafkaConfig.parseConfigOpt(modelConfig)
configOpt.foreach(_ => logger.debug(s"No kafka components found, but model root kafka config found"))
configOpt
case _ => None // mechanism would be disabled in case if there is more than one kafka component enabled
}
theOnlyOneEnabledKafkaConfigOpt.orElse(KafkaConfig.parseConfigOpt(modelConfig))
}

// It registers GenericRecordSchemaIdSerialization only for first kafka component config
Expand Down Expand Up @@ -62,7 +76,7 @@ object AvroSerializersRegistrar extends LazyLogging {
registerGenericRecordSchemaIdSerializationIfNeed(config, CachedConfluentSchemaRegistryClientFactory, kafkaConfig)
}

def registerGenericRecordSchemaIdSerializationIfNeed(config: ExecutionConfig, schemaRegistryClientFactory: ConfluentSchemaRegistryClientFactory, kafkaConfig: KafkaConfig): Unit = {
def registerGenericRecordSchemaIdSerializationIfNeed(config: ExecutionConfig, schemaRegistryClientFactory: SchemaRegistryClientFactory, kafkaConfig: KafkaConfig): Unit = {
if (GenericRecordSchemaIdSerializationSupport.schemaIdSerializationEnabled(kafkaConfig)) {
logger.debug("Registering SchemaIdBasedAvroGenericRecordSerializer")
SchemaIdBasedAvroGenericRecordSerializer(schemaRegistryClientFactory, kafkaConfig).registerIn(config)
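
The registrar above only enables schema-id based Kryo serialization when `avroKryoGenericRecordSchemaIdSerialization` is set in the resolved kafka config. A hedged usage sketch follows; the method names come from this commit's diff, while the HOCON layout is an assumption based on the sample configuration removed in this commit.

```scala
import com.typesafe.config.ConfigFactory
import pl.touk.nussknacker.engine.kafka.KafkaConfig
import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.serialization.GenericRecordSchemaIdSerializationSupport

// Root-level `kafka` section (not `config.kafka`) - the path the fixed registrar falls back to
// when no enabled kafka component provides its own configuration. The layout below is assumed.
val modelConfig = ConfigFactory.parseString(
  """kafka {
    |  kafkaProperties { "bootstrap.servers": "localhost:9092" }
    |  avroKryoGenericRecordSchemaIdSerialization: true
    |}""".stripMargin)

KafkaConfig.parseConfigOpt(modelConfig).foreach { kafkaConfig =>
  // The same check the registrar performs before registering SchemaIdBasedAvroGenericRecordSerializer.
  val enabled = GenericRecordSchemaIdSerializationSupport.schemaIdSerializationEnabled(kafkaConfig)
  println(s"GenericRecord schema id serialization enabled: $enabled")
}
```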
