Skip to content

Commit

Permalink
Reduce the number of calls on FlowSpec initialization where possible,…
Browse files Browse the repository at this point in the history
… and configToProperties/vice versa
  • Loading branch information
Will-Lo committed Jul 26, 2021
1 parent 48af4c6 commit 7181b29
Show file tree
Hide file tree
Showing 4 changed files with 18 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -171,15 +171,18 @@ public String apply(String input) {
}

private static Properties resolveTemplate(Properties jobProps, JobSpecResolver resolver) throws IOException {
// If there is no job template, do not spend resources creating a new JobSpec
if (!jobProps.containsKey(ConfigurationKeys.JOB_TEMPLATE_PATH)) {
return jobProps;
}

try {
JobSpec.Builder jobSpecBuilder = JobSpec.builder().withConfig(ConfigUtils.propertiesToConfig(jobProps));
if (jobProps.containsKey(ConfigurationKeys.JOB_TEMPLATE_PATH)) {
JobTemplate jobTemplate = ResourceBasedJobTemplate
.forResourcePath(jobProps.getProperty(ConfigurationKeys.JOB_TEMPLATE_PATH),
new PackagedTemplatesJobCatalogDecorator());
jobSpecBuilder.withTemplate(jobTemplate);
}
return ConfigUtils.configToProperties(resolver.resolveJobSpec(jobSpecBuilder.build()).getConfig());
return resolver.resolveJobSpec(jobSpecBuilder.build()).getConfigAsProperties();
} catch (JobTemplate.TemplateException | SpecNotFoundException | URISyntaxException exc) {
throw new IOException(exc);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ public static FlowId deserializeFlowId(String serialized) throws IOException {
}

public static String serializeFlowConfig(FlowConfig flowConfig) throws IOException {
Properties properties = ConfigUtils.configToProperties(ConfigFactory.parseMap(flowConfig.getProperties()));
Properties properties = PropertiesUtils.stringMapToProperties(flowConfig.getProperties());
properties.setProperty(FLOWCONFIG_ID_NAME, flowConfig.getId().getFlowName());
properties.setProperty(FLOWCONFIG_ID_GROUP, flowConfig.getId().getFlowGroup());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -202,8 +202,9 @@ private void scheduleSpecsFromCatalog() {
while (specUris.hasNext()) {
Spec spec = this.flowCatalog.get().getSpecWrapper(specUris.next());
try {
// Disable FLOW_RUN_IMMEDIATELY on service startup or leadership change
if (spec instanceof FlowSpec) {
// Disable FLOW_RUN_IMMEDIATELY on service startup or leadership change if the property is set to true
if (spec instanceof FlowSpec && ((FlowSpec) spec).getConfigAsProperties()
.getProperty(ConfigurationKeys.FLOW_RUN_IMMEDIATELY, "false").equals("true")) {
Spec modifiedSpec = disableFlowRunImmediatelyOnStart((FlowSpec) spec);
onAddSpec(modifiedSpec);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,4 +152,12 @@ public static String prettyPrintProperties(Properties properties) {
.map(entry -> "\"" + entry.getKey() + "\"" + " : " + "\"" + entry.getValue() + "\"")
.collect(Collectors.joining(",\n"));
}

public static Properties stringMapToProperties(Map<String, String> values) {
Properties properties = new Properties();
for (Map.Entry<String, String> entry : values.entrySet()) {
properties.setProperty(entry.getKey(), entry.getValue());
}
return properties;
}
}

0 comments on commit 7181b29

Please sign in to comment.