From c7a5471f4af554bbff210bb01a83c747c1916556 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mateusz=20S=C5=82abek?= Date: Tue, 13 Feb 2024 10:18:31 +0100 Subject: [PATCH] add feature flag --- docs/MigrationGuide.md | 4 +++- .../engine/kafka/source/flink/FlinkKafkaSource.scala | 6 +++++- .../engine/lite/kafka/KafkaSingleScenarioTaskRun.scala | 6 +++++- .../pl/touk/nussknacker/engine/kafka/KafkaConfig.scala | 4 +++- 4 files changed, 16 insertions(+), 4 deletions(-) diff --git a/docs/MigrationGuide.md b/docs/MigrationGuide.md index 2475b38b25e..b1f03ec6d07 100644 --- a/docs/MigrationGuide.md +++ b/docs/MigrationGuide.md @@ -77,7 +77,9 @@ To see the biggest differences please consult the [changelog](Changelog.md). `ModelConfig` or returns the original name if the value is not configured * [#5526](https://github.com/TouK/nussknacker/pull/5526) Added namespacing of Kafka consumer group id in both engines. If you have namespaces configured, the consumer group id will be prefixed with `namespace` key from model config - - in that case a consumer group migration may be necessary for example to retain consumer offsets. + in that case a consumer group migration may be necessary for example to retain consumer offsets. For gradual + migration, this behaviour can be disabled by setting `useNamingStrategyInConsumerGroups = false` in `KafkaConfig`. + Note that the `useNamingStrategyInConsumerGroups` flag is intended to be removed in the future. ### REST API changes * [#5280](https://github.com/TouK/nussknacker/pull/5280)[#5368](https://github.com/TouK/nussknacker/pull/5368) Changes in the definition API: diff --git a/engine/flink/kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/kafka/source/flink/FlinkKafkaSource.scala b/engine/flink/kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/kafka/source/flink/FlinkKafkaSource.scala index 55f61e469a4..0e7f88e306f 100644 --- a/engine/flink/kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/kafka/source/flink/FlinkKafkaSource.scala +++ b/engine/flink/kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/kafka/source/flink/FlinkKafkaSource.scala @@ -130,7 +130,11 @@ class FlinkKafkaSource[T]( private def prepareConsumerGroupId(nodeContext: FlinkCustomNodeContext): String = { val baseName = overriddenConsumerGroup.getOrElse(ConsumerGroupDeterminer(kafkaConfig).consumerGroup(nodeContext)) - namingStrategy.prepareName(baseName) + if (kafkaConfig.useNamingStrategyForConsumerGroupId) { + namingStrategy.prepareName(baseName) + } else { + baseName + } } } diff --git a/engine/lite/kafka/runtime/src/main/scala/pl/touk/nussknacker/engine/lite/kafka/KafkaSingleScenarioTaskRun.scala b/engine/lite/kafka/runtime/src/main/scala/pl/touk/nussknacker/engine/lite/kafka/KafkaSingleScenarioTaskRun.scala index ca3f133ae2c..869f9f33dcb 100644 --- a/engine/lite/kafka/runtime/src/main/scala/pl/touk/nussknacker/engine/lite/kafka/KafkaSingleScenarioTaskRun.scala +++ b/engine/lite/kafka/runtime/src/main/scala/pl/touk/nussknacker/engine/lite/kafka/KafkaSingleScenarioTaskRun.scala @@ -55,7 +55,11 @@ class KafkaSingleScenarioTaskRun( extends Task with LazyLogging { - private val groupId = namingStrategy.prepareName(metaData.name.value) + private val groupId = if (engineConfig.kafka.useNamingStrategyForConsumerGroupId) { + namingStrategy.prepareName(metaData.name.value) + } else { + metaData.name.value + } private var consumer: KafkaConsumer[Array[Byte], Array[Byte]] = _ private var producer: KafkaProducerRecordsHandler = _ diff --git a/utils/kafka-utils/src/main/scala/pl/touk/nussknacker/engine/kafka/KafkaConfig.scala b/utils/kafka-utils/src/main/scala/pl/touk/nussknacker/engine/kafka/KafkaConfig.scala index 9a70d6edc13..1a6298e60cf 100644 --- a/utils/kafka-utils/src/main/scala/pl/touk/nussknacker/engine/kafka/KafkaConfig.scala +++ b/utils/kafka-utils/src/main/scala/pl/touk/nussknacker/engine/kafka/KafkaConfig.scala @@ -26,7 +26,9 @@ case class KafkaConfig( useStringForKey: Boolean = true, schemaRegistryCacheConfig: SchemaRegistryCacheConfig = SchemaRegistryCacheConfig(), avroAsJsonSerialization: Option[Boolean] = None, - kafkaAddress: Option[String] = None + kafkaAddress: Option[String] = None, + // TODO: remove this feature flag in future release + useNamingStrategyForConsumerGroupId: Boolean = true ) { def schemaRegistryClientKafkaConfig = SchemaRegistryClientKafkaConfig(