Kafka universal unconfluentization #3948

Merged
9 commits merged on Jan 31, 2023
AvroBenchmark.scala
@@ -1,17 +1,17 @@
 package pl.touk.nussknacker.engine.benchmarks.serialization.avro

 import com.typesafe.config.ConfigFactory
-import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient
 import org.apache.avro.generic.GenericData
 import org.apache.flink.api.common.ExecutionConfig
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.openjdk.jmh.annotations._
-import pl.touk.nussknacker.engine.benchmarks.serialization.SerializationBenchmarkSetup
-import pl.touk.nussknacker.engine.kafka.KafkaConfig
 import pl.touk.nussknacker.engine.schemedkafka.kryo.AvroSerializersRegistrar
 import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.confluent.ConfluentUtils
-import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.confluent.client.{CachedConfluentSchemaRegistryClientFactory, MockSchemaRegistryClient}
+import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.confluent.client.MockSchemaRegistryClient
 import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.confluent.kryo.SchemaIdBasedAvroGenericRecordSerializer
+import pl.touk.nussknacker.engine.benchmarks.serialization.SerializationBenchmarkSetup
+import pl.touk.nussknacker.engine.kafka.{KafkaConfig, SchemaRegistryClientKafkaConfig}
+import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.universal.MockSchemaRegistryClientFactory

 import java.util.concurrent.TimeUnit

@@ -28,11 +28,7 @@ class AvroBenchmark {
   val schemaRegistryMockClient = new MockSchemaRegistryClient
   val parsedSchema = ConfluentUtils.convertToAvroSchema(AvroSamples.sampleSchema, Some(1))
   schemaRegistryMockClient.register("foo-value", parsedSchema, 1, AvroSamples.sampleSchemaId.asInt)
-  val factory: CachedConfluentSchemaRegistryClientFactory =
-    new CachedConfluentSchemaRegistryClientFactory {
-      override protected def confluentClient(kafkaConfig: SchemaRegistryClientKafkaConfig): SchemaRegistryClient =
-        schemaRegistryMockClient
-    }
+  val factory = MockSchemaRegistryClientFactory.confluentBased(schemaRegistryMockClient)
   val serializer = SchemaIdBasedAvroGenericRecordSerializer(factory, KafkaConfig(Some(Map("bootstrap.servers" -> "fooKafkaAddress")), None))
   config.getRegisteredTypesWithKryoSerializers.put(serializer.clazz, new ExecutionConfig.SerializableSerializer(serializer))
   config.getDefaultKryoSerializers.put(serializer.clazz, new ExecutionConfig.SerializableSerializer(serializer))
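The change above replaces a hand-rolled `CachedConfluentSchemaRegistryClientFactory` subclass with a single helper call. A minimal sketch of the new wiring, using only names that appear in this diff (the topic name and Kafka address are the benchmark's own placeholder values):

```scala
import pl.touk.nussknacker.engine.kafka.KafkaConfig
import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.confluent.client.MockSchemaRegistryClient
import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.confluent.kryo.SchemaIdBasedAvroGenericRecordSerializer
import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.universal.MockSchemaRegistryClientFactory

val schemaRegistryMockClient = new MockSchemaRegistryClient

// Before: an anonymous CachedConfluentSchemaRegistryClientFactory overriding confluentClient.
// After: one helper call wraps the mock client in a schema registry client factory.
val factory = MockSchemaRegistryClientFactory.confluentBased(schemaRegistryMockClient)

// The factory then feeds the schema-id-based Kryo serializer, exactly as in the benchmark:
val serializer = SchemaIdBasedAvroGenericRecordSerializer(
  factory,
  KafkaConfig(Some(Map("bootstrap.servers" -> "fooKafkaAddress")), None)
)
```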
build.sbt (1 change: 1 addition & 0 deletions)
@@ -732,6 +732,7 @@ lazy val schemedKafkaComponentsUtils = (project in utils("schemed-kafka-componen
         ExclusionRule("org.slf4j", "slf4j-log4j12")
       ),
       "tech.allegro.schema.json2avro" % "converter" % "0.2.15",
+      "org.scala-lang.modules" %% "scala-collection-compat" % scalaCollectionsCompatV,
       "org.scalatest" %% "scalatest" % scalaTestV % "test"
     )
   }
docs/Changelog.md (5 changes: 4 additions & 1 deletion)
@@ -16,7 +16,10 @@
 * [#3916](https://github.com/TouK/nussknacker/pull/3916) `environmentAlert.cssClass` setting renamed to `environmentAlert.color`
 * [#3922](https://github.com/TouK/nussknacker/pull/3922) Bumps: jwks 0.19.0 -> 0.21.3, jackson: 2.11.3 -> 2.13.4
 * [#3958](https://github.com/TouK/nussknacker/pull/3958) OpenAPI: specify Content-Type header based on schema
-
+* [#3948](https://github.com/TouK/nussknacker/pull/3948)
+  * Performance fix: `kafka` source on Flink engine doesn't serialize schema during record serialization
+  * Configuration handling fixes: `avroKryoGenericRecordSchemaIdSerialization` wasn't checked properly
+  * Avro: added support for top-level array schemas

 1.7.0 (19 Dec 2022)
 ------------------------
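The last changelog bullet refers to schemas whose root type is an Avro array rather than a record. As an illustration only — using the standard Avro `SchemaBuilder` API, not code from this PR — such a schema can be built like this:

```scala
import org.apache.avro.SchemaBuilder

// Illustration only: a schema whose top level is an Avro array of records.
val itemSchema = SchemaBuilder.record("Item").fields()
  .requiredString("name")
  .endRecord()

// A subject registered with this schema is what the changelog entry now supports.
val topLevelArraySchema = SchemaBuilder.array().items(itemSchema)
```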
docs/MigrationGuide.md (12 changes: 12 additions & 0 deletions)
@@ -35,6 +35,18 @@ To see the biggest differences please consult the [changelog](Changelog.md).
 * [#3929](https://github.com/TouK/nussknacker/pull/3929) From now, `SchemaId` value class is used in every place
   where schema id was represented as an Int. For conversion between `SchemaId` and `Int` use `SchemaId.fromInt` and `SchemaId.asInt`.
   Use `ConfluentUtils.toSchemaWithMetadata` instead of `SchemaWithMetadata.apply` for conversion between Confluent's `SchemaMetadata` and ours `SchemaWithMetadata`.
+* [#3948](https://github.com/TouK/nussknacker/pull/3948) We are now less dependent on the Confluent Schema Registry.
+  To make this possible, some Kafka universal/avro components were refactored. The most important change in the public API:
+  * `ConfluentSchemaBasedSerdeProvider.universal` was replaced by `UniversalSchemaBasedSerdeProvider.create`
+
+  Some other, internal changes:
+  * Non-Confluent classes renamed and moved to more suitable packages
+  * Extracted a new class, `SchemaIdFromMessageExtractor`, to make the Confluent logic explicit, and moved it to the top level
+  * Extracted `SchemaValidator` to make the Confluent logic explicit and composable
+  * Some renames: `ConsumerRecordUtils` -> `KafkaRecordUtils`
+  * `RecordDeserializer` -> `AvroRecordDeserializer` (inheritance also replaced by composition)
+  * `(De)SerializerFactory` - simpler abstractions
+  * `ConfluentSchemaRegistryFactory` is no longer necessary - removed

 ## In version 1.7.0

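For code that uses the public API, the migration described above is a one-line substitution. A minimal sketch, assuming `factory` is whatever `SchemaRegistryClientFactory` the code already passes in (the default `UniversalSchemaRegistryClientFactory` is used here for illustration):

```scala
import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.SchemaRegistryClientFactory
import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.universal.{UniversalSchemaBasedSerdeProvider, UniversalSchemaRegistryClientFactory}

val factory: SchemaRegistryClientFactory = UniversalSchemaRegistryClientFactory

// Before (removed in this PR):
//   val serdeProvider = ConfluentSchemaBasedSerdeProvider.universal(factory)
// After:
val serdeProvider = UniversalSchemaBasedSerdeProvider.create(factory)
```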
FlinkKafkaComponentProvider.scala
@@ -5,20 +5,21 @@ import com.typesafe.config.ConfigValueFactory.fromAnyRef
 import net.ceedubs.ficus.Ficus.{booleanValueReader, optionValueReader, toFicusConfig}
 import pl.touk.nussknacker.engine.api.component.{ComponentDefinition, ComponentProvider, NussknackerVersion}
 import pl.touk.nussknacker.engine.api.process.ProcessObjectDependencies
-import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.confluent.ConfluentSchemaBasedSerdeProvider
-import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.confluent.client.{CachedConfluentSchemaRegistryClientFactory, ConfluentSchemaRegistryClientFactory}
-import pl.touk.nussknacker.engine.schemedkafka.sink.flink.{FlinkKafkaAvroSinkImplFactory, FlinkKafkaUniversalSinkImplFactory}
-import pl.touk.nussknacker.engine.schemedkafka.sink.{KafkaAvroSinkFactory, KafkaAvroSinkFactoryWithEditor, UniversalKafkaSinkFactory}
-import pl.touk.nussknacker.engine.schemedkafka.source.{KafkaAvroSourceFactory, UniversalKafkaSourceFactory}
 import pl.touk.nussknacker.engine.kafka.KafkaConfig
 import pl.touk.nussknacker.engine.kafka.generic.sinks.GenericKafkaJsonSinkFactory
 import pl.touk.nussknacker.engine.kafka.generic.sources.{GenericJsonSourceFactory, GenericTypedJsonSourceFactory}
 import pl.touk.nussknacker.engine.kafka.source.flink.FlinkKafkaSourceImplFactory
+import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.SchemaRegistryClientFactory
+import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.confluent.ConfluentSchemaBasedSerdeProvider
+import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.universal.{UniversalSchemaBasedSerdeProvider, UniversalSchemaRegistryClientFactory}
+import pl.touk.nussknacker.engine.schemedkafka.sink.flink.{FlinkKafkaAvroSinkImplFactory, FlinkKafkaUniversalSinkImplFactory}
+import pl.touk.nussknacker.engine.schemedkafka.sink.{KafkaAvroSinkFactory, KafkaAvroSinkFactoryWithEditor, UniversalKafkaSinkFactory}
+import pl.touk.nussknacker.engine.schemedkafka.source.{KafkaAvroSourceFactory, UniversalKafkaSourceFactory}
 import pl.touk.nussknacker.engine.util.config.DocsConfig

 class FlinkKafkaComponentProvider extends ComponentProvider {

-  protected def schemaRegistryClientFactory: ConfluentSchemaRegistryClientFactory = CachedConfluentSchemaRegistryClientFactory
+  protected def schemaRegistryClientFactory: SchemaRegistryClientFactory = UniversalSchemaRegistryClientFactory

   override def providerName: String = "kafka"

@@ -35,7 +36,7 @@ class FlinkKafkaComponentProvider {

   val avroPayloadSerdeProvider = ConfluentSchemaBasedSerdeProvider.avroPayload(schemaRegistryClientFactory)
   val jsonPayloadSerdeProvider = ConfluentSchemaBasedSerdeProvider.jsonPayload(schemaRegistryClientFactory)
-  val universalSerdeProvider = ConfluentSchemaBasedSerdeProvider.universal(schemaRegistryClientFactory)
+  val universalSerdeProvider = UniversalSchemaBasedSerdeProvider.create(schemaRegistryClientFactory)

   lazy val lowLevelKafkaComponents = List(
     ComponentDefinition("kafka-json", new GenericKafkaJsonSinkFactory(overriddenDependencies)).withRelativeDocs(noTypeInfo),
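Since `schemaRegistryClientFactory` stays a protected hook but now returns the registry-agnostic `SchemaRegistryClientFactory`, a subclass can still swap in another client. A sketch of a hypothetical test-only subclass (the class name is illustrative, not part of this PR), reusing `MockSchemaRegistryClientFactory.confluentBased` from this changeset:

```scala
import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.SchemaRegistryClientFactory
import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.confluent.client.MockSchemaRegistryClient
import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.universal.MockSchemaRegistryClientFactory

// Hypothetical test-only provider; only the override below differs from the default wiring.
class MockedKafkaComponentProvider extends FlinkKafkaComponentProvider {
  override protected def schemaRegistryClientFactory: SchemaRegistryClientFactory =
    MockSchemaRegistryClientFactory.confluentBased(new MockSchemaRegistryClient)
}
```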
KafkaSourceFactoryMixin.scala
@@ -18,7 +18,7 @@ import pl.touk.nussknacker.engine.kafka.consumerrecord.{ConsumerRecordDeserializ
 import pl.touk.nussknacker.engine.kafka.serialization.schemas.BaseSimpleSerializationSchema
 import pl.touk.nussknacker.engine.kafka.source.KafkaSourceFactory
 import pl.touk.nussknacker.engine.kafka.source.flink.KafkaSourceFactoryMixin._
-import pl.touk.nussknacker.engine.kafka.{ConsumerRecordUtils, KafkaConfig, KafkaSpec, serialization}
+import pl.touk.nussknacker.engine.kafka.{KafkaRecordUtils, KafkaConfig, KafkaSpec, serialization}
 import pl.touk.nussknacker.engine.util.namespaces.ObjectNamingProvider
 import pl.touk.nussknacker.test.PatientScalaFutures

@@ -31,7 +31,7 @@ trait KafkaSourceFactoryMixin extends AnyFunSuite with Matchers with KafkaSpec w
   val sampleValue: SampleValue = SampleValue("first", "last")
   val sampleKey: SampleKey = SampleKey("one", 2L)
   val sampleHeadersMap: Map[String, String] = Map("headerOne" -> "valueOfHeaderOne", "headerTwo" -> null)
-  val sampleHeaders: Headers = ConsumerRecordUtils.toHeaders(sampleHeadersMap)
+  val sampleHeaders: Headers = KafkaRecordUtils.toHeaders(sampleHeadersMap)

   val sampleTopic = "topic"
   val constTimestamp: Long = 123L
@@ -43,7 +43,7 @@ trait KafkaSourceFactoryMixin extends AnyFunSuite with Matchers with KafkaSpec w
     topic,
     (obj: ObjToSerialize) => Option(obj.value).map(v => implicitly[Encoder[SampleValue]].apply(v).noSpaces).orNull,
     (obj: ObjToSerialize) => Option(obj.key).map(k => implicitly[Encoder[SampleKey]].apply(k).noSpaces).orNull,
-    (obj: ObjToSerialize) => ConsumerRecordUtils.toHeaders(obj.headers)
+    (obj: ObjToSerialize) => KafkaRecordUtils.toHeaders(obj.headers)
   ).asInstanceOf[serialization.KafkaSerializationSchema[Any]]

   protected def createTopic(name: String, partitions: Int = 1): String = {
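The `ConsumerRecordUtils` -> `KafkaRecordUtils` rename is mechanical; the helpers keep their shape. A minimal sketch of the two calls these tests exercise, assuming the signatures visible in the diff (`toHeaders` takes a `Map[String, String]`, both return Kafka `Headers`):

```scala
import org.apache.kafka.common.header.Headers
import pl.touk.nussknacker.engine.kafka.KafkaRecordUtils

// Converts a plain Scala map into Kafka record headers; null values are allowed,
// as the "headerTwo" -> null entry in the mixin above shows.
val headers: Headers = KafkaRecordUtils.toHeaders(Map("headerOne" -> "valueOfHeaderOne"))

// Stands in where a record carries no headers at all.
val noHeaders: Headers = KafkaRecordUtils.emptyHeaders
```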
KafkaSourceFactorySpec.scala
@@ -18,7 +18,7 @@ import pl.touk.nussknacker.engine.kafka.serialization.schemas.{JsonSerialization
 import pl.touk.nussknacker.engine.kafka.source.KafkaSourceFactory.KafkaSourceFactoryState
 import pl.touk.nussknacker.engine.kafka.source.flink.KafkaSourceFactoryMixin._
 import pl.touk.nussknacker.engine.kafka.source.{KafkaContextInitializer, KafkaSourceFactory}
-import pl.touk.nussknacker.engine.kafka.{ConsumerRecordUtils, KafkaSpec, serialization}
+import pl.touk.nussknacker.engine.kafka.{KafkaRecordUtils, KafkaSpec, serialization}
 import pl.touk.nussknacker.test.PatientScalaFutures

 import java.util.Collections.singletonMap
@@ -58,7 +58,7 @@ class KafkaSourceFactorySpec extends AnyFunSuite with Matchers with KafkaSpec wi
       ConsumerRecord.NULL_SIZE,
       null,
       givenObj,
-      ConsumerRecordUtils.emptyHeaders,
+      KafkaRecordUtils.emptyHeaders,
       Optional.of(0: Integer)
     )
     pushMessage(new SimpleSerializationSchema[Any](topic, String.valueOf), givenObj, topic, timestamp = constTimestamp)
@@ -77,7 +77,7 @@ class KafkaSourceFactorySpec extends AnyFunSuite with Matchers with KafkaSpec wi
       TimestampType.CREATE_TIME,
       null,
       givenObj,
-      ConsumerRecordUtils.emptyHeaders,
+      KafkaRecordUtils.emptyHeaders,
       Optional.of(0)
     )
     pushMessage(new JsonSerializationSchema[SampleValue](topic).asInstanceOf[serialization.KafkaSerializationSchema[Any]], givenObj, topic, timestamp = constTimestamp)
@@ -96,7 +96,7 @@ class KafkaSourceFactorySpec extends AnyFunSuite with Matchers with KafkaSpec wi
       TimestampType.CREATE_TIME,
       null,
       givenObj,
-      ConsumerRecordUtils.emptyHeaders,
+      KafkaRecordUtils.emptyHeaders,
       Optional.of(0)
     )
     pushMessage(new JsonSerializationSchema[SampleValue](topic).asInstanceOf[serialization.KafkaSerializationSchema[Any]], givenObj, topic, timestamp = constTimestamp)
@@ -115,7 +115,7 @@ class KafkaSourceFactorySpec extends AnyFunSuite with Matchers with KafkaSpec wi
       TimestampType.CREATE_TIME,
       sampleKey,
       sampleValue,
-      ConsumerRecordUtils.toHeaders(sampleHeadersMap),
+      KafkaRecordUtils.toHeaders(sampleHeadersMap),
       Optional.of(0)
     )
     pushMessage(objToSerializeSerializationSchema(topic), givenObj, topic, timestamp = constTimestamp)
@@ -121,8 +121,6 @@ kafka {
     "bootstrap.servers": ${kafkaAddress}
     "schema.registry.url": ${schemaRegistryUrl}
   }
-  consumerGroupNamingStrategy: processId-nodeId
-  avroKryoGenericRecordSchemaIdSerialization: true
 }

 asyncExecutionConfig {
DevProcessConfigCreator.scala
@@ -13,11 +13,6 @@ import pl.touk.nussknacker.engine.api._
 import pl.touk.nussknacker.engine.api.component.{ComponentGroupName, ParameterConfig, SingleComponentConfig}
 import pl.touk.nussknacker.engine.api.definition.{FixedExpressionValue, FixedValuesParameterEditor, MandatoryParameterValidator, StringParameterEditor}
 import pl.touk.nussknacker.engine.api.process._
-import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.confluent.ConfluentSchemaBasedSerdeProvider
-import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.confluent.client.{CachedConfluentSchemaRegistryClientFactory, ConfluentSchemaRegistryClientFactory, MockConfluentSchemaRegistryClientFactory, MockSchemaRegistryClient}
-import pl.touk.nussknacker.engine.schemedkafka.sink.flink.{FlinkKafkaAvroSinkImplFactory, FlinkKafkaUniversalSinkImplFactory}
-import pl.touk.nussknacker.engine.schemedkafka.sink.{KafkaAvroSinkFactoryWithEditor, UniversalKafkaSinkFactory}
-import pl.touk.nussknacker.engine.schemedkafka.source.{KafkaAvroSourceFactory, UniversalKafkaSourceFactory}
 import pl.touk.nussknacker.engine.flink.util.sink.{EmptySink, SingleValueSinkFactory}
 import pl.touk.nussknacker.engine.flink.util.source.{EspDeserializationSchema, ReturningClassInstanceSource, ReturningTestCaseClass}
 import pl.touk.nussknacker.engine.kafka.consumerrecord.{ConsumerRecordToJsonFormatterFactory, FixedValueDeserializationSchemaFactory}
@@ -33,6 +28,13 @@ import pl.touk.nussknacker.engine.management.sample.helper.DateProcessHelper
 import pl.touk.nussknacker.engine.management.sample.service._
 import pl.touk.nussknacker.engine.management.sample.source._
 import pl.touk.nussknacker.engine.management.sample.transformer._
+import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.SchemaRegistryClientFactory
+import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.confluent.ConfluentSchemaBasedSerdeProvider
+import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.confluent.client.MockSchemaRegistryClient
+import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.universal.{MockSchemaRegistryClientFactory, UniversalSchemaBasedSerdeProvider, UniversalSchemaRegistryClientFactory}
+import pl.touk.nussknacker.engine.schemedkafka.sink.flink.{FlinkKafkaAvroSinkImplFactory, FlinkKafkaUniversalSinkImplFactory}
+import pl.touk.nussknacker.engine.schemedkafka.sink.{KafkaAvroSinkFactoryWithEditor, UniversalKafkaSinkFactory}
+import pl.touk.nussknacker.engine.schemedkafka.source.{KafkaAvroSourceFactory, UniversalKafkaSourceFactory}
 import pl.touk.nussknacker.engine.util.LoggingListener

 import java.nio.charset.StandardCharsets
@@ -62,27 +64,27 @@ class DevProcessConfigCreator extends ProcessConfigCreator {
   private def all[T](value: T): WithCategories[T] = WithCategories(value, "Category1", "Category2", "DemoFeatures", "TESTCAT" , "DevelopmentTests")

   override def sinkFactories(processObjectDependencies: ProcessObjectDependencies): Map[String, WithCategories[SinkFactory]] = {
-    val confluentFactory = createSchemaRegistryClientFactory(processObjectDependencies)
-    val avroPayloadSerdeProvider = ConfluentSchemaBasedSerdeProvider.avroPayload(confluentFactory)
-    val universalPayloadSerdeProvider = ConfluentSchemaBasedSerdeProvider.universal(confluentFactory)
+    val schemaRegistryFactory = createSchemaRegistryClientFactory(processObjectDependencies)
+    val avroPayloadSerdeProvider = ConfluentSchemaBasedSerdeProvider.avroPayload(schemaRegistryFactory)
+    val universalPayloadSerdeProvider = UniversalSchemaBasedSerdeProvider.create(schemaRegistryFactory)
     Map(
       "sendSms" -> all(new SingleValueSinkFactory(new DiscardingSink)),
       "monitor" -> categories(SinkFactory.noParam(EmptySink)),
       "communicationSink" -> categories(DynamicParametersSink),
       "kafka-string" -> all(new KafkaSinkFactory(new SimpleSerializationSchema[AnyRef](_, String.valueOf), processObjectDependencies, FlinkKafkaSinkImplFactory)),
-      "kafka-avro" -> all(new KafkaAvroSinkFactoryWithEditor(confluentFactory, avroPayloadSerdeProvider, processObjectDependencies, FlinkKafkaAvroSinkImplFactory)),
-      "kafka" -> all(new UniversalKafkaSinkFactory(confluentFactory, universalPayloadSerdeProvider, processObjectDependencies, FlinkKafkaUniversalSinkImplFactory))
+      "kafka-avro" -> all(new KafkaAvroSinkFactoryWithEditor(schemaRegistryFactory, avroPayloadSerdeProvider, processObjectDependencies, FlinkKafkaAvroSinkImplFactory)),
+      "kafka" -> all(new UniversalKafkaSinkFactory(schemaRegistryFactory, universalPayloadSerdeProvider, processObjectDependencies, FlinkKafkaUniversalSinkImplFactory))
     )
   }

   override def listeners(processObjectDependencies: ProcessObjectDependencies) = List(LoggingListener)

   override def sourceFactories(processObjectDependencies: ProcessObjectDependencies): Map[String, WithCategories[SourceFactory]] = {
-    val confluentFactory = createSchemaRegistryClientFactory(processObjectDependencies)
-    val schemaBasedMessagesSerdeProvider = ConfluentSchemaBasedSerdeProvider.avroPayload(confluentFactory)
-    val universalMessagesSerdeProvider = ConfluentSchemaBasedSerdeProvider.universal(confluentFactory)
-    val avroSourceFactory = new KafkaAvroSourceFactory[Any, Any](confluentFactory, schemaBasedMessagesSerdeProvider, processObjectDependencies, new FlinkKafkaSourceImplFactory(None))
-    val universalSourceFactory = new UniversalKafkaSourceFactory[Any, Any](confluentFactory, universalMessagesSerdeProvider, processObjectDependencies, new FlinkKafkaSourceImplFactory(None))
+    val schemaRegistryFactory = createSchemaRegistryClientFactory(processObjectDependencies)
+    val schemaBasedMessagesSerdeProvider = ConfluentSchemaBasedSerdeProvider.avroPayload(schemaRegistryFactory)
+    val universalMessagesSerdeProvider = UniversalSchemaBasedSerdeProvider.create(schemaRegistryFactory)
+    val avroSourceFactory = new KafkaAvroSourceFactory[Any, Any](schemaRegistryFactory, schemaBasedMessagesSerdeProvider, processObjectDependencies, new FlinkKafkaSourceImplFactory(None))
+    val universalSourceFactory = new UniversalKafkaSourceFactory[Any, Any](schemaRegistryFactory, universalMessagesSerdeProvider, processObjectDependencies, new FlinkKafkaSourceImplFactory(None))
     Map(
       "real-kafka" -> all(fixedValueKafkaSource[String](
         processObjectDependencies,
@@ -105,12 +107,12 @@
     )
   }

-  private def createSchemaRegistryClientFactory(processObjectDependencies: ProcessObjectDependencies): ConfluentSchemaRegistryClientFactory = {
+  private def createSchemaRegistryClientFactory(processObjectDependencies: ProcessObjectDependencies): SchemaRegistryClientFactory = {
     val mockConfluent = processObjectDependencies.config.getAs[Boolean](DevProcessConfigCreator.emptyMockedSchemaRegistryProperty).contains(true)
     if (mockConfluent) {
-      new MockConfluentSchemaRegistryClientFactory(new MockSchemaRegistryClient)
+      MockSchemaRegistryClientFactory.confluentBased(new MockSchemaRegistryClient)
     } else {
-      CachedConfluentSchemaRegistryClientFactory
+      UniversalSchemaRegistryClientFactory
     }
   }
