diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/util/SchedulerUtils.java b/gobblin-runtime/src/main/java/org/apache/gobblin/util/SchedulerUtils.java index 3fa9c76c4b7..1dd1b3b533d 100644 --- a/gobblin-runtime/src/main/java/org/apache/gobblin/util/SchedulerUtils.java +++ b/gobblin-runtime/src/main/java/org/apache/gobblin/util/SchedulerUtils.java @@ -171,15 +171,18 @@ public String apply(String input) { } private static Properties resolveTemplate(Properties jobProps, JobSpecResolver resolver) throws IOException { + // If there is no job template, do not spend resources creating a new JobSpec + if (!jobProps.containsKey(ConfigurationKeys.JOB_TEMPLATE_PATH)) { + return jobProps; + } + try { JobSpec.Builder jobSpecBuilder = JobSpec.builder().withConfig(ConfigUtils.propertiesToConfig(jobProps)); - if (jobProps.containsKey(ConfigurationKeys.JOB_TEMPLATE_PATH)) { JobTemplate jobTemplate = ResourceBasedJobTemplate .forResourcePath(jobProps.getProperty(ConfigurationKeys.JOB_TEMPLATE_PATH), new PackagedTemplatesJobCatalogDecorator()); jobSpecBuilder.withTemplate(jobTemplate); - } - return ConfigUtils.configToProperties(resolver.resolveJobSpec(jobSpecBuilder.build()).getConfig()); + return resolver.resolveJobSpec(jobSpecBuilder.build()).getConfigAsProperties(); } catch (JobTemplate.TemplateException | SpecNotFoundException | URISyntaxException exc) { throw new IOException(exc); } diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/FlowConfigUtils.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/FlowConfigUtils.java index a59fc8546cf..3af1cf25eaa 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/FlowConfigUtils.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/FlowConfigUtils.java @@ -22,12 +22,10 @@ import com.google.common.collect.Maps; import com.linkedin.data.template.StringMap; -import com.typesafe.config.ConfigFactory; import org.apache.gobblin.service.FlowConfig; import org.apache.gobblin.service.FlowId; import org.apache.gobblin.service.Schedule; -import org.apache.gobblin.util.ConfigUtils; import org.apache.gobblin.util.PropertiesUtils; @@ -60,7 +58,8 @@ public static FlowId deserializeFlowId(String serialized) throws IOException { } public static String serializeFlowConfig(FlowConfig flowConfig) throws IOException { - Properties properties = ConfigUtils.configToProperties(ConfigFactory.parseMap(flowConfig.getProperties())); + Properties properties = new Properties(); + properties.putAll(flowConfig.getProperties()); properties.setProperty(FLOWCONFIG_ID_NAME, flowConfig.getId().getFlowName()); properties.setProperty(FLOWCONFIG_ID_GROUP, flowConfig.getId().getFlowGroup()); diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java index 1a257f2ac3c..f83bf7d6063 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java @@ -202,8 +202,9 @@ private void scheduleSpecsFromCatalog() { while (specUris.hasNext()) { Spec spec = this.flowCatalog.get().getSpecWrapper(specUris.next()); try { - // Disable FLOW_RUN_IMMEDIATELY on service startup or leadership change - if (spec instanceof FlowSpec) { + // Disable FLOW_RUN_IMMEDIATELY on service startup or leadership change if the property is set to true + if (spec instanceof FlowSpec && PropertiesUtils.getPropAsBoolean(( + (FlowSpec) spec).getConfigAsProperties(), ConfigurationKeys.FLOW_RUN_IMMEDIATELY, "false")) { Spec modifiedSpec = disableFlowRunImmediatelyOnStart((FlowSpec) spec); onAddSpec(modifiedSpec); } else {