Skip to content

Commit

Permalink
Have the ability to provide source/sink config in cmdline for sources…
Browse files Browse the repository at this point in the history
…/sinks (#1757)
  • Loading branch information
srkukarni authored and merlimat committed May 10, 2018
1 parent 114be44 commit 8b2dfd0
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 1 deletion.
Expand Up @@ -133,6 +133,8 @@ class CreateSink extends BaseCommand {
@Parameter(names = "--sinkConfigFile", description = "The path to a YAML config file specifying the "
+ "sink's configuration")
protected String sinkConfigFile;
@Parameter(names = "--sinkConfig", description = "Sink config key/values")
protected String sinkConfigString;

protected SinkConfig sinkConfig;

Expand Down Expand Up @@ -199,6 +201,12 @@ public void accept(String s) {
if (null == jarFile) {
throw new IllegalArgumentException("Connector JAR not specfied");
}

if (null != sinkConfigString) {
Type type = new TypeToken<Map<String, String>>(){}.getType();
Map<String, Object> sinkConfigMap = new Gson().fromJson(sinkConfigString, type);
sinkConfig.setConfigs(sinkConfigMap);
}
}

@Override
Expand Down
Expand Up @@ -133,6 +133,8 @@ class CreateSource extends BaseCommand {
@Parameter(names = "--sourceConfigFile", description = "The path to a YAML config file specifying the "
+ "source's configuration")
protected String sourceConfigFile;
@Parameter(names = "--sourceConfig", description = "Source config key/values")
protected String sourceConfigString;

protected SourceConfig sourceConfig;

Expand Down Expand Up @@ -184,6 +186,12 @@ void processArguments() throws Exception {
if (null == jarFile) {
throw new IllegalArgumentException("Connector JAR not specfied");
}

if (null != sourceConfigString) {
Type type = new TypeToken<Map<String, String>>(){}.getType();
Map<String, Object> sourceConfigMap = new Gson().fromJson(sourceConfigString, type);
sourceConfig.setConfigs(sourceConfigMap);
}
}

@Override
Expand Down
Expand Up @@ -38,7 +38,7 @@ public class SinkConfig {
private String name;
private String className;
private Map<String, String> topicToSerdeClassName;
private Map<String, String> configs = new HashMap<>();
private Map<String, Object> configs = new HashMap<>();
private int parallelism = 1;
private FunctionConfig.ProcessingGuarantees processingGuarantees;
}

0 comments on commit 8b2dfd0

Please sign in to comment.