/
KafkaSourceFactory.scala
231 lines (202 loc) · 10.4 KB
/
KafkaSourceFactory.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
package pl.touk.nussknacker.engine.kafka.source
import cats.Id
import io.circe.Json
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.record.TimestampType
import pl.touk.nussknacker.engine.api.component.UnboundedStreamComponent
import pl.touk.nussknacker.engine.api.context.transformation._
import pl.touk.nussknacker.engine.api.context.{ProcessCompilationError, ValidationContext}
import pl.touk.nussknacker.engine.api.definition._
import pl.touk.nussknacker.engine.api.namespaces.NamingStrategy
import pl.touk.nussknacker.engine.api.parameter.ParameterName
import pl.touk.nussknacker.engine.api.process._
import pl.touk.nussknacker.engine.api.test.TestRecord
import pl.touk.nussknacker.engine.api.typed.typing.{Typed, TypingResult, Unknown}
import pl.touk.nussknacker.engine.api.{MetaData, NodeId, Params}
import pl.touk.nussknacker.engine.kafka.KafkaFactory.TopicParamName
import pl.touk.nussknacker.engine.kafka._
import pl.touk.nussknacker.engine.kafka.serialization.{KafkaDeserializationSchema, KafkaDeserializationSchemaFactory}
import pl.touk.nussknacker.engine.kafka.source.KafkaSourceFactory._
import pl.touk.nussknacker.engine.kafka.validator.WithCachedTopicsExistenceValidator
import scala.reflect.ClassTag
/**
* Base factory for Kafka sources with additional metadata variable.
* It is based on [[pl.touk.nussknacker.engine.api.context.transformation.SingleInputDynamicComponent]]
* that allows custom ValidationContext and Context transformations, which are provided by [[KafkaContextInitializer]]
* Can be used for single- or multi-topic sources (topics provided as a comma-separated list; see topicNameSeparator and extractTopics).
*
* Features:
* - fetch latest N records which can be later used to test process in UI
* Fetching data is defined in the source, which may
* extend [[pl.touk.nussknacker.engine.api.process.TestDataGenerator]]. See [[pl.touk.nussknacker.engine.kafka.KafkaUtils#readLastMessages]]
* - reset Kafka's offset to latest value - `forceLatestRead` property, see [[pl.touk.nussknacker.engine.kafka.KafkaUtils#setOffsetToLatest]]
*
* @tparam K - type of key of kafka event that is generated by raw source (SourceFunction).
* @tparam V - type of value of kafka event that is generated by raw source (SourceFunction).
* */
class KafkaSourceFactory[K: ClassTag, V: ClassTag](
    protected val deserializationSchemaFactory: KafkaDeserializationSchemaFactory[ConsumerRecord[K, V]],
    protected val formatterFactory: RecordFormatterFactory,
    protected val modelDependencies: ProcessObjectDependencies,
    protected val implProvider: KafkaSourceImplFactory[K, V]
) extends SourceFactory
    with SingleInputDynamicComponent[Source]
    with WithCachedTopicsExistenceValidator
    with WithExplicitTypesToExtract
    with UnboundedStreamComponent {

  // Separator used when a single topic parameter value lists several topics (CSV-style).
  protected val topicNameSeparator = ","

  // Static typing of the record key/value, derived from the ClassTag evidence.
  // Lazy: the runtime classes are only needed once validation/compilation starts.
  protected lazy val keyTypingResult: TypingResult = Typed(implicitly[ClassTag[K]].runtimeClass)

  protected lazy val valueTypingResult: TypingResult = Typed(implicitly[ClassTag[V]].runtimeClass)

  // Node validation and compilation refers to ValidationContext, that returns TypingResult's of all variables returned by the source.
  // Variable suggestion uses DefinitionExtractor that requires proper type definitions for DynamicComponent (which in general does not have a specified "returnType"):
  // - for TypeClass (which is a default scenario) - it is necessary to provide all explicit TypeClass definitions as possibleVariableClasses
  // - for TypedObjectTypingResult - suggested variables are defined as explicit "fields"
  // Example:
  // - validation context indicates that #input is TypedClass(classOf(SampleProduct)), that is used by node compilation and validation
  // - definition extractor provides detailed definition of "pl.touk.nussknacker.engine.management.sample.dto.SampleProduct"
  override def typesToExtract: List[TypingResult] =
    List(keyTypingResult, valueTypingResult, Typed.typedClass[TimestampType])

  override type State = KafkaSourceFactoryState[K, V]

  /** First transformation step: no parameters resolved yet, so ask for the initial (topic) parameters. */
  private def initialStep(context: ValidationContext, dependencies: List[NodeDependencyValue])(
      implicit nodeId: NodeId
  ): ContextTransformationDefinition = { case TransformationStep(Nil, _) =>
    NextParameters(prepareInitialParameters)
  }

  /**
   * Splits the (possibly comma-separated) topic parameter value, resolves each topic against
   * model dependencies and validates topic existence, mapping any failure to a node-scoped
   * error attached to the topic parameter.
   */
  protected def topicsValidationErrors(
      topic: String
  )(implicit nodeId: NodeId): List[ProcessCompilationError.CustomNodeError] = {
    val topics = topic.split(topicNameSeparator).map(_.trim).toList
    val preparedTopics =
      topics.map(KafkaComponentsUtils.prepareKafkaTopic(_, modelDependencies)).map(_.prepared)
    // validateTopics yields Valid/Invalid; swap.toList keeps only the error side.
    validateTopics(preparedTopics).swap.toList.map(_.toCustomNodeError(nodeId.id, Some(TopicParamName)))
  }

  /** Second transformation step: the topic parameter has been supplied (or failed to resolve). */
  protected def nextSteps(context: ValidationContext, dependencies: List[NodeDependencyValue])(
      implicit nodeId: NodeId
  ): ContextTransformationDefinition = {
    case step @ TransformationStep((TopicParamName, DefinedEagerParameter(topic: String, _)) :: _, None) =>
      prepareSourceFinalResults(
        context,
        dependencies,
        step.parameters,
        keyTypingResult,
        valueTypingResult,
        topicsValidationErrors(topic)
      )
    case step @ TransformationStep((TopicParamName, _) :: _, None) =>
      // Edge case - for some reason Topic is not defined, e.g. when topic does not match DefinedEagerParameter(String, _):
      // 1. FailedToDefineParameter
      // 2. not resolved as a valid String
      // Those errors are identified by parameter validation and handled elsewhere, hence empty list of errors.
      prepareSourceFinalErrors(context, dependencies, step.parameters, errors = Nil)
  }

  /**
   * Builds the FinalResults for a successfully resolved topic parameter: the prepared
   * context initializer is stored in the transformation State so that `implementation`
   * can reuse exactly the initializer that validation saw.
   */
  protected def prepareSourceFinalResults(
      context: ValidationContext,
      dependencies: List[NodeDependencyValue],
      parameters: List[(ParameterName, DefinedParameter)],
      keyTypingResult: TypingResult,
      valueTypingResult: TypingResult,
      errors: List[ProcessCompilationError]
  )(implicit nodeId: NodeId): FinalResults = {
    val kafkaContextInitializer =
      prepareContextInitializer(dependencies, parameters, keyTypingResult, valueTypingResult)
    FinalResults.forValidation(context, errors, Some(KafkaSourceFactoryState(kafkaContextInitializer)))(
      kafkaContextInitializer.validationContext
    )
  }

  // Source specific FinalResults with errors: key/value fall back to Unknown because the
  // topic (and hence the concrete types) could not be determined.
  protected def prepareSourceFinalErrors(
      context: ValidationContext,
      dependencies: List[NodeDependencyValue],
      parameters: List[(ParameterName, DefinedParameter)],
      errors: List[ProcessCompilationError]
  )(implicit nodeId: NodeId): FinalResults = {
    val initializerWithUnknown = prepareContextInitializer(dependencies, parameters, Unknown, Unknown)
    FinalResults.forValidation(context, errors)(initializerWithUnknown.validationContext)
  }

  // Overwrite this for dynamic type definitions.
  protected def prepareContextInitializer(
      dependencies: List[NodeDependencyValue],
      parameters: List[(ParameterName, DefinedParameter)],
      keyTypingResult: TypingResult,
      valueTypingResult: TypingResult
  ): ContextInitializer[ConsumerRecord[K, V]] =
    new KafkaContextInitializer[K, V](
      OutputVariableNameDependency.extract(dependencies),
      keyTypingResult,
      valueTypingResult
    )

  /**
   * contextTransformation should handle exceptions raised by prepareInitialParameters
   */
  override def contextTransformation(context: ValidationContext, dependencies: List[NodeDependencyValue])(
      implicit nodeId: NodeId
  ): ContextTransformationDefinition =
    initialStep(context, dependencies) orElse
      nextSteps(context, dependencies)

  /**
   * Common set of operations required to create basic KafkaSource.
   *
   * The final State is produced by [[contextTransformation]] (see [[prepareSourceFinalResults]])
   * and must be present by the time `implementation` is invoked; fail with a descriptive error
   * instead of a bare NoSuchElementException from Option.get.
   */
  override def implementation(
      params: Params,
      dependencies: List[NodeDependencyValue],
      finalState: Option[State]
  ): Source = {
    val state = finalState.getOrElse(
      throw new IllegalStateException(
        "KafkaSourceFactory.implementation called without final state - contextTransformation did not produce it"
      )
    )
    val topics                = extractTopics(params)
    val preparedTopics        = topics.map(KafkaComponentsUtils.prepareKafkaTopic(_, modelDependencies))
    val deserializationSchema = deserializationSchemaFactory.create(topics, kafkaConfig)
    val formatter             = formatterFactory.create(kafkaConfig, deserializationSchema)
    implProvider.createSource(
      params,
      dependencies,
      state,
      preparedTopics,
      kafkaConfig,
      deserializationSchema,
      formatter,
      state.contextInitializer,
      KafkaTestParametersInfo.empty,
      modelDependencies.namingStrategy
    )
  }

  /**
   * Basic implementation of definition of single topic parameter.
   * In case of fetching topics from external repository: return list of topics or raise exception.
   */
  protected def prepareInitialParameters: List[Parameter] = topicParameterDeclaration.createParameter() :: Nil

  // Mandatory, non-blank String parameter holding the (possibly comma-separated) topic name(s).
  protected val topicParameterDeclaration: ParameterCreatorWithNoDependency with ParameterExtractor[String] =
    ParameterDeclaration
      .mandatory[String](TopicParamName)
      .withCreator(modify = _.copy(validators = List(MandatoryParameterValidator, NotBlankParameterValidator)))

  /**
   * Extracts topics from default topic parameter.
   */
  protected def extractTopics(params: Params): List[String] = {
    val paramValue = topicParameterDeclaration.extractValueUnsafe(params)
    paramValue.split(topicNameSeparator).map(_.trim).toList
  }

  override def nodeDependencies: List[NodeDependency] =
    List(TypedNodeDependency[MetaData], TypedNodeDependency[NodeId], OutputVariableNameDependency)

  override protected val kafkaConfig: KafkaConfig = KafkaConfig.parseConfig(modelDependencies.config)

}
object KafkaSourceFactory {

  /**
   * Transformation state handed from validation to `implementation`: the context initializer
   * selected for the concrete key/value types of the source.
   */
  case class KafkaSourceFactoryState[K, V](contextInitializer: ContextInitializer[ConsumerRecord[K, V]])

  /**
   * Describes additional parameters used to build test records for this source, together with
   * the function that turns a supplied value into a [[TestRecord]].
   */
  case class KafkaTestParametersInfo(parametersDefinition: List[Parameter], createTestRecord: Any => TestRecord)

  object KafkaTestParametersInfo {

    /** No test parameters; every input is mapped to a null-JSON test record. */
    def empty: KafkaTestParametersInfo =
      KafkaTestParametersInfo(
        parametersDefinition = List.empty,
        createTestRecord = (_: Any) => TestRecord(Json.Null)
      )

  }

  /** SPI for constructing the engine-specific [[Source]] from everything the factory prepared. */
  trait KafkaSourceImplFactory[K, V] {

    def createSource(
        params: Params,
        dependencies: List[NodeDependencyValue],
        finalState: Any,
        preparedTopics: List[PreparedKafkaTopic],
        kafkaConfig: KafkaConfig,
        deserializationSchema: KafkaDeserializationSchema[ConsumerRecord[K, V]],
        formatter: RecordFormatter,
        contextInitializer: ContextInitializer[ConsumerRecord[K, V]],
        testParametersInfo: KafkaTestParametersInfo,
        namingStrategy: NamingStrategy
    ): Source

  }

}