Skip to content

Commit

Permalink
[GOBBLIN-1497] Reduce the number of calls on FlowSpec initialization …
Browse files Browse the repository at this point in the history
…where possible,… (#3340)

* Reduce the number of calls on FlowSpec initialization where possible, and configToProperties/vice versa

* address review

* Address comments
  • Loading branch information
Will-Lo committed Jul 28, 2021
1 parent 9a9e239 commit 1367614
Show file tree
Hide file tree
Showing 3 changed files with 11 additions and 8 deletions.
Expand Up @@ -171,15 +171,18 @@ public String apply(String input) {
}

private static Properties resolveTemplate(Properties jobProps, JobSpecResolver resolver) throws IOException {
// If there is no job template, do not spend resources creating a new JobSpec
if (!jobProps.containsKey(ConfigurationKeys.JOB_TEMPLATE_PATH)) {
return jobProps;
}

try {
JobSpec.Builder jobSpecBuilder = JobSpec.builder().withConfig(ConfigUtils.propertiesToConfig(jobProps));
if (jobProps.containsKey(ConfigurationKeys.JOB_TEMPLATE_PATH)) {
JobTemplate jobTemplate = ResourceBasedJobTemplate
.forResourcePath(jobProps.getProperty(ConfigurationKeys.JOB_TEMPLATE_PATH),
new PackagedTemplatesJobCatalogDecorator());
jobSpecBuilder.withTemplate(jobTemplate);
}
return ConfigUtils.configToProperties(resolver.resolveJobSpec(jobSpecBuilder.build()).getConfig());
return resolver.resolveJobSpec(jobSpecBuilder.build()).getConfigAsProperties();
} catch (JobTemplate.TemplateException | SpecNotFoundException | URISyntaxException exc) {
throw new IOException(exc);
}
Expand Down
Expand Up @@ -22,12 +22,10 @@

import com.google.common.collect.Maps;
import com.linkedin.data.template.StringMap;
import com.typesafe.config.ConfigFactory;

import org.apache.gobblin.service.FlowConfig;
import org.apache.gobblin.service.FlowId;
import org.apache.gobblin.service.Schedule;
import org.apache.gobblin.util.ConfigUtils;
import org.apache.gobblin.util.PropertiesUtils;


Expand Down Expand Up @@ -60,7 +58,8 @@ public static FlowId deserializeFlowId(String serialized) throws IOException {
}

public static String serializeFlowConfig(FlowConfig flowConfig) throws IOException {
Properties properties = ConfigUtils.configToProperties(ConfigFactory.parseMap(flowConfig.getProperties()));
Properties properties = new Properties();
properties.putAll(flowConfig.getProperties());
properties.setProperty(FLOWCONFIG_ID_NAME, flowConfig.getId().getFlowName());
properties.setProperty(FLOWCONFIG_ID_GROUP, flowConfig.getId().getFlowGroup());

Expand Down
Expand Up @@ -202,8 +202,9 @@ private void scheduleSpecsFromCatalog() {
while (specUris.hasNext()) {
Spec spec = this.flowCatalog.get().getSpecWrapper(specUris.next());
try {
// Disable FLOW_RUN_IMMEDIATELY on service startup or leadership change
if (spec instanceof FlowSpec) {
// Disable FLOW_RUN_IMMEDIATELY on service startup or leadership change if the property is set to true
if (spec instanceof FlowSpec && PropertiesUtils.getPropAsBoolean((
(FlowSpec) spec).getConfigAsProperties(), ConfigurationKeys.FLOW_RUN_IMMEDIATELY, "false")) {
Spec modifiedSpec = disableFlowRunImmediatelyOnStart((FlowSpec) spec);
onAddSpec(modifiedSpec);
} else {
Expand Down

0 comments on commit 1367614

Please sign in to comment.