/
FlinkKafkaComponentProvider.scala
80 lines (65 loc) · 5.9 KB
/
FlinkKafkaComponentProvider.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
package pl.touk.nussknacker.engine.flink.util.transformer
import com.typesafe.config.Config
import com.typesafe.config.ConfigValueFactory.fromAnyRef
import net.ceedubs.ficus.Ficus.{booleanValueReader, optionValueReader, toFicusConfig}
import pl.touk.nussknacker.engine.api.component.{ComponentDefinition, ComponentProvider, NussknackerVersion}
import pl.touk.nussknacker.engine.api.process.ProcessObjectDependencies
import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.confluent.ConfluentSchemaBasedSerdeProvider
import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.confluent.client.{CachedConfluentSchemaRegistryClientFactory, ConfluentSchemaRegistryClientFactory}
import pl.touk.nussknacker.engine.schemedkafka.sink.flink.{FlinkKafkaAvroSinkImplFactory, FlinkKafkaUniversalSinkImplFactory}
import pl.touk.nussknacker.engine.schemedkafka.sink.{KafkaAvroSinkFactory, KafkaAvroSinkFactoryWithEditor, UniversalKafkaSinkFactory}
import pl.touk.nussknacker.engine.schemedkafka.source.{KafkaAvroSourceFactory, UniversalKafkaSourceFactory}
import pl.touk.nussknacker.engine.kafka.KafkaConfig
import pl.touk.nussknacker.engine.kafka.generic.sinks.GenericKafkaJsonSinkFactory
import pl.touk.nussknacker.engine.kafka.generic.sources.{GenericJsonSourceFactory, GenericTypedJsonSourceFactory}
import pl.touk.nussknacker.engine.kafka.source.flink.FlinkKafkaSourceImplFactory
import pl.touk.nussknacker.engine.util.config.DocsConfig
class FlinkKafkaComponentProvider extends ComponentProvider {
protected def schemaRegistryClientFactory: ConfluentSchemaRegistryClientFactory = CachedConfluentSchemaRegistryClientFactory
override def providerName: String = "kafka"
override def resolveConfigForExecution(config: Config): Config = config
override def create(config: Config, dependencies: ProcessObjectDependencies): List[ComponentDefinition] = {
val overriddenDependencies = TemporaryKafkaConfigMapping.prepareDependencies(config, dependencies)
val docsConfig: DocsConfig = new DocsConfig(config)
import docsConfig._
val avro = "DataSourcesAndSinks#schema-registry--avro-serialization"
val schemaRegistryTypedJson = "DataSourcesAndSinks#schema-registry--json-serialization"
val noTypeInfo = "DataSourcesAndSinks#no-type-information--json-serialization"
val avroPayloadSerdeProvider = ConfluentSchemaBasedSerdeProvider.avroPayload(schemaRegistryClientFactory)
val jsonPayloadSerdeProvider = ConfluentSchemaBasedSerdeProvider.jsonPayload(schemaRegistryClientFactory)
val universalSerdeProvider = ConfluentSchemaBasedSerdeProvider.universal(schemaRegistryClientFactory)
lazy val lowLevelKafkaComponents = List(
ComponentDefinition("kafka-json", new GenericKafkaJsonSinkFactory(overriddenDependencies)).withRelativeDocs(noTypeInfo),
ComponentDefinition("kafka-json", new GenericJsonSourceFactory(overriddenDependencies)).withRelativeDocs(noTypeInfo),
ComponentDefinition("kafka-typed-json", new GenericTypedJsonSourceFactory(overriddenDependencies)).withRelativeDocs("DataSourcesAndSinks#manually-typed--json-serialization"),
ComponentDefinition("kafka-avro", new KafkaAvroSourceFactory(schemaRegistryClientFactory, avroPayloadSerdeProvider, overriddenDependencies, new FlinkKafkaSourceImplFactory(None))).withRelativeDocs(avro),
ComponentDefinition("kafka-avro", new KafkaAvroSinkFactoryWithEditor(schemaRegistryClientFactory, avroPayloadSerdeProvider, overriddenDependencies, FlinkKafkaAvroSinkImplFactory)).withRelativeDocs(avro),
ComponentDefinition("kafka-registry-typed-json", new KafkaAvroSourceFactory(schemaRegistryClientFactory, jsonPayloadSerdeProvider, overriddenDependencies, new FlinkKafkaSourceImplFactory(None))).withRelativeDocs(schemaRegistryTypedJson),
ComponentDefinition("kafka-registry-typed-json", new KafkaAvroSinkFactoryWithEditor(schemaRegistryClientFactory, jsonPayloadSerdeProvider, overriddenDependencies, FlinkKafkaAvroSinkImplFactory)).withRelativeDocs(schemaRegistryTypedJson),
ComponentDefinition("kafka-registry-typed-json-raw", new KafkaAvroSinkFactory(schemaRegistryClientFactory, jsonPayloadSerdeProvider, overriddenDependencies, FlinkKafkaAvroSinkImplFactory)).withRelativeDocs(schemaRegistryTypedJson),
ComponentDefinition("kafka-avro-raw", new KafkaAvroSinkFactory(schemaRegistryClientFactory, avroPayloadSerdeProvider, overriddenDependencies, FlinkKafkaAvroSinkImplFactory)).withRelativeDocs(avro)
)
// TODO: change link to the documentation when json schema handling will be available
val universalKafkaComponents = List(
ComponentDefinition("kafka", new UniversalKafkaSourceFactory(schemaRegistryClientFactory, universalSerdeProvider, overriddenDependencies, new FlinkKafkaSourceImplFactory(None))).withRelativeDocs(avro),
ComponentDefinition("kafka", new UniversalKafkaSinkFactory(schemaRegistryClientFactory, universalSerdeProvider, overriddenDependencies, FlinkKafkaUniversalSinkImplFactory)).withRelativeDocs(avro)
)
val lowLevelComponentsEnabled = config.getAs[Boolean]("config.lowLevelComponentsEnabled").getOrElse(KafkaConfig.lowLevelComponentsEnabled)
if (lowLevelComponentsEnabled) {
lowLevelKafkaComponents ::: universalKafkaComponents
} else {
universalKafkaComponents
}
}
override def isCompatible(version: NussknackerVersion): Boolean = true
override def isAutoLoaded: Boolean = false
}
//FIXME: Kafka components should not depend directly on ProcessObjectDependencies, only on
//appropriate config, this class is temporary solution, where we pass modified dependencies
private[transformer] object TemporaryKafkaConfigMapping {
def prepareDependencies(config: Config, dependencies: ProcessObjectDependencies): ProcessObjectDependencies = {
val kafkaConfig = config.getConfig("config")
val kafkaConfigMergedWithGlobalConfig = dependencies.config.withValue(KafkaConfig.defaultGlobalKafkaConfigPath, fromAnyRef(kafkaConfig.root()))
dependencies.copy(config = kafkaConfigMergedWithGlobalConfig)
}
}