-
Notifications
You must be signed in to change notification settings - Fork 171
/
__classNamePrefix__Controller.java
57 lines (47 loc) · 2.3 KB
/
__classNamePrefix__Controller.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
#set( $symbol_pound = '#' )
#set( $symbol_dollar = '$' )
#set( $symbol_escape = '\' )
package ${package}.pe.processor.${packageName};
import ${package}.config.Config;
import org.apache.streampipes.model.DataProcessorType;
import org.apache.streampipes.model.graph.DataProcessorDescription;
import org.apache.streampipes.model.graph.DataProcessorInvocation;
import org.apache.streampipes.sdk.builder.ProcessingElementBuilder;
import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
import org.apache.streampipes.sdk.extractor.ProcessingElementParameterExtractor;
import org.apache.streampipes.sdk.helpers.EpRequirements;
import org.apache.streampipes.sdk.helpers.Labels;
import org.apache.streampipes.sdk.helpers.OutputStrategies;
import org.apache.streampipes.sdk.helpers.SupportedFormats;
import org.apache.streampipes.sdk.helpers.SupportedProtocols;
import org.apache.streampipes.sdk.helpers.*;
import org.apache.streampipes.sdk.utils.Assets;
import org.apache.streampipes.wrapper.flink.FlinkDataProcessorDeclarer;
import org.apache.streampipes.wrapper.flink.FlinkDataProcessorRuntime;
public class ${classNamePrefix}Controller extends
FlinkDataProcessorDeclarer<${classNamePrefix}Parameters> {
private static final String EXAMPLE_KEY = "example-key";
@Override
public DataProcessorDescription declareModel() {
return ProcessingElementBuilder.create("${package}.pe.processor.${packageName}")
.withAssets(Assets.DOCUMENTATION, Assets.ICON)
.withLocales(Locales.EN)
.category(DataProcessorType.ENRICH)
.requiredStream(StreamRequirementsBuilder
.create()
.requiredProperty(EpRequirements.anyProperty())
.build())
.supportedFormats(SupportedFormats.jsonFormat())
.supportedProtocols(SupportedProtocols.kafka())
.outputStrategy(OutputStrategies.keep())
.requiredTextParameter(Labels.withId(EXAMPLE_KEY))
.build();
}
@Override
public FlinkDataProcessorRuntime<${classNamePrefix}Parameters> getRuntime(DataProcessorInvocation
graph, ProcessingElementParameterExtractor extractor) {
String exampleString = extractor.singleValueParameter(EXAMPLE_KEY, String.class);
${classNamePrefix}Parameters params = new ${classNamePrefix}Parameters(graph, exampleString);
return new ${classNamePrefix}Program(params, Config.INSTANCE.getDebug());
}
}