Skip to content

Commit

Permalink
add feature flag
Browse files Browse the repository at this point in the history
  • Loading branch information
mslabek committed Feb 13, 2024
1 parent 768df3e commit c7a5471
Show file tree
Hide file tree
Showing 4 changed files with 16 additions and 4 deletions.
4 changes: 3 additions & 1 deletion docs/MigrationGuide.md
Expand Up @@ -77,7 +77,9 @@ To see the biggest differences please consult the [changelog](Changelog.md).
`ModelConfig` or returns the original name if the value is not configured
* [#5526](https://github.com/TouK/nussknacker/pull/5526) Added namespacing of Kafka consumer group id in both engines.
If you have namespaces configured, the consumer group id will be prefixed with `namespace` key from model config -
in that case a consumer group migration may be necessary for example to retain consumer offsets.
in that case a consumer group migration may be necessary for example to retain consumer offsets. For gradual
migration, this behaviour can be disabled by setting `useNamingStrategyInConsumerGroups = false` in `KafkaConfig`.
Note that the `useNamingStrategyInConsumerGroups` flag is intended to be removed in the future.

### REST API changes
* [#5280](https://github.com/TouK/nussknacker/pull/5280)[#5368](https://github.com/TouK/nussknacker/pull/5368) Changes in the definition API:
Expand Down
Expand Up @@ -130,7 +130,11 @@ class FlinkKafkaSource[T](

private def prepareConsumerGroupId(nodeContext: FlinkCustomNodeContext): String = {
val baseName = overriddenConsumerGroup.getOrElse(ConsumerGroupDeterminer(kafkaConfig).consumerGroup(nodeContext))
namingStrategy.prepareName(baseName)
if (kafkaConfig.useNamingStrategyForConsumerGroupId) {
namingStrategy.prepareName(baseName)
} else {
baseName
}
}

}
Expand Down
Expand Up @@ -55,7 +55,11 @@ class KafkaSingleScenarioTaskRun(
extends Task
with LazyLogging {

private val groupId = namingStrategy.prepareName(metaData.name.value)
private val groupId = if (engineConfig.kafka.useNamingStrategyForConsumerGroupId) {
namingStrategy.prepareName(metaData.name.value)
} else {
metaData.name.value
}

private var consumer: KafkaConsumer[Array[Byte], Array[Byte]] = _
private var producer: KafkaProducerRecordsHandler = _
Expand Down
Expand Up @@ -26,7 +26,9 @@ case class KafkaConfig(
useStringForKey: Boolean = true,
schemaRegistryCacheConfig: SchemaRegistryCacheConfig = SchemaRegistryCacheConfig(),
avroAsJsonSerialization: Option[Boolean] = None,
kafkaAddress: Option[String] = None
kafkaAddress: Option[String] = None,
// TODO: remove this feature flag in future release
useNamingStrategyForConsumerGroupId: Boolean = true
) {

def schemaRegistryClientKafkaConfig = SchemaRegistryClientKafkaConfig(
Expand Down

0 comments on commit c7a5471

Please sign in to comment.