diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java b/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java index e155b58b09..0cb80b20fc 100644 --- a/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java +++ b/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java @@ -262,13 +262,15 @@ public void run() { MetadataResourceUtil metadataResourceUtil = new MetadataResourceUtil(jobModel, this.metrics, config); metadataResourceUtil.createResources(); - // fan out the startpoints - StartpointManager startpointManager = createStartpointManager(); - startpointManager.start(); - try { - startpointManager.fanOut(JobModelUtil.getTaskToSystemStreamPartitions(jobModel)); - } finally { - startpointManager.stop(); + // fan out the startpoints if startpoints is enabled + if (new JobConfig(config).getStartpointEnabled()) { + StartpointManager startpointManager = createStartpointManager(); + startpointManager.start(); + try { + startpointManager.fanOut(JobModelUtil.getTaskToSystemStreamPartitions(jobModel)); + } finally { + startpointManager.stop(); + } } // Remap changelog partitions to tasks diff --git a/samza-core/src/main/java/org/apache/samza/config/JobConfig.java b/samza-core/src/main/java/org/apache/samza/config/JobConfig.java index 37bfe37118..dff2991da2 100644 --- a/samza-core/src/main/java/org/apache/samza/config/JobConfig.java +++ b/samza-core/src/main/java/org/apache/samza/config/JobConfig.java @@ -146,6 +146,8 @@ public class JobConfig extends MapConfig { public static final String CLUSTER_BASED_JOB_COORDINATOR_DEPENDENCY_ISOLATION_ENABLED = "samza.cluster.based.job.coordinator.dependency.isolation.enabled"; + private static final String JOB_STARTPOINT_ENABLED = "job.startpoint.enabled"; + public JobConfig(Config config) { super(config); } @@ -404,4 +406,8 @@ public String getCoordinatorStreamFactory() { public Optional getConfigLoaderFactory() { return Optional.ofNullable(get(CONFIG_LOADER_FACTORY)); } + + public boolean getStartpointEnabled() { + return getBoolean(JOB_STARTPOINT_ENABLED, true); + } } \ No newline at end of file diff --git a/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java b/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java index 5a395207d0..ed0c875295 100644 --- a/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java +++ b/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java @@ -396,8 +396,10 @@ SamzaContainer createSamzaContainer(String processorId, JobModel jobModel) { // Metadata store lifecycle managed outside of the SamzaContainer. // All manager lifecycles are managed in the SamzaContainer including startpointManager StartpointManager startpointManager = null; - if (metadataStore != null) { + if (metadataStore != null && new JobConfig(config).getStartpointEnabled()) { startpointManager = new StartpointManager(metadataStore); + } else if (!new JobConfig(config).getStartpointEnabled()) { + LOGGER.warn("StartpointManager not instantiated because startpoints is not enabled"); } else { LOGGER.warn("StartpointManager cannot be instantiated because no metadata store defined for this stream processor"); } diff --git a/samza-core/src/main/java/org/apache/samza/runtime/ContainerLaunchUtil.java b/samza-core/src/main/java/org/apache/samza/runtime/ContainerLaunchUtil.java index c68ec03b2d..40d3d5c2f6 100644 --- a/samza-core/src/main/java/org/apache/samza/runtime/ContainerLaunchUtil.java +++ b/samza-core/src/main/java/org/apache/samza/runtime/ContainerLaunchUtil.java @@ -102,7 +102,10 @@ private static void run( LocalityManager localityManager = new LocalityManager(new NamespaceAwareCoordinatorStreamStore(coordinatorStreamStore, SetContainerHostMapping.TYPE)); // StartpointManager wraps the coordinatorStreamStore in the namespaces internally - StartpointManager startpointManager = new StartpointManager(coordinatorStreamStore); + StartpointManager startpointManager = null; + if (new JobConfig(config).getStartpointEnabled()) { + startpointManager = new StartpointManager(coordinatorStreamStore); + } Map metricsReporters = loadMetricsReporters(appDesc, containerId, config); diff --git a/samza-core/src/main/java/org/apache/samza/startpoint/StartpointManager.java b/samza-core/src/main/java/org/apache/samza/startpoint/StartpointManager.java index 9a196c3da5..f0a4cc3083 100644 --- a/samza-core/src/main/java/org/apache/samza/startpoint/StartpointManager.java +++ b/samza-core/src/main/java/org/apache/samza/startpoint/StartpointManager.java @@ -158,8 +158,24 @@ public void writeStartpoint(SystemStreamPartition ssp, TaskName taskName, Startp * @return {@link Optional} of {@link Startpoint} for the {@link SystemStreamPartition}. * It is empty if it does not exist or if it is too stale. */ + @VisibleForTesting public Optional readStartpoint(SystemStreamPartition ssp) { - return readStartpoint(ssp, null); + Map startpointBytes = readWriteStore.all(); + // there is no task-name to use as key for the startpoint in this case (only the ssp), so we use a null task-name + return readStartpoint(startpointBytes, ssp, null); + } + + /** + * Returns the last {@link Startpoint} that defines the start position for a {@link SystemStreamPartition} and {@link TaskName}. + * @param ssp The {@link SystemStreamPartition} to fetch the {@link Startpoint} for. + * @param taskName the {@link TaskName} to fetch the {@link Startpoint} for. + * @return {@link Optional} of {@link Startpoint} for the {@link SystemStreamPartition}. + * It is empty if it does not exist or if it is too stale. + */ + @VisibleForTesting + public Optional readStartpoint(SystemStreamPartition ssp, TaskName taskName) { + Map startpointBytes = readWriteStore.all(); + return readStartpoint(startpointBytes, ssp, taskName); } /** @@ -169,11 +185,11 @@ public Optional readStartpoint(SystemStreamPartition ssp) { * @return {@link Optional} of {@link Startpoint} for the {@link SystemStreamPartition} and {@link TaskName}. * It is empty if it does not exist or if it is too stale. */ - public Optional readStartpoint(SystemStreamPartition ssp, TaskName taskName) { + public Optional readStartpoint(Map startpointMap, SystemStreamPartition ssp, TaskName taskName) { Preconditions.checkState(!stopped, "Underlying metadata store not available"); Preconditions.checkNotNull(ssp, "SystemStreamPartition cannot be null"); - byte[] startpointBytes = readWriteStore.get(toReadWriteStoreKey(ssp, taskName)); + byte[] startpointBytes = startpointMap.get(toReadWriteStoreKey(ssp, taskName)); if (ArrayUtils.isNotEmpty(startpointBytes)) { try { @@ -191,6 +207,7 @@ public Optional readStartpoint(SystemStreamPartition ssp, TaskName t return Optional.empty(); } + /** * Deletes the {@link Startpoint} for a {@link SystemStreamPartition} * @param ssp The {@link SystemStreamPartition} to delete the {@link Startpoint} for. @@ -240,6 +257,8 @@ public Map> fanOut(Map deleteKeys = HashMultimap.create(); HashMap fanOuts = new HashMap<>(); + Map startpointMap = readWriteStore.all(); + for (TaskName taskName : taskToSSPs.keySet()) { Set ssps = taskToSSPs.get(taskName); if (CollectionUtils.isEmpty(ssps)) { @@ -247,10 +266,10 @@ public Map> fanOut(Map startpoint = readStartpoint(ssp); // Read SSP-only key + Optional startpoint = readStartpoint(startpointMap, ssp, null); // Read SSP-only key startpoint.ifPresent(sp -> deleteKeys.put(ssp, null)); - Optional startpointForTask = readStartpoint(ssp, taskName); // Read SSP+taskName key + Optional startpointForTask = readStartpoint(startpointMap, ssp, taskName); // Read SSP+taskName key startpointForTask.ifPresent(sp -> deleteKeys.put(ssp, taskName)); Optional startpointWithPrecedence = resolveStartpointPrecendence(startpoint, startpointForTask); diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java index feaabba34b..86c1f06345 100644 --- a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java +++ b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java @@ -326,13 +326,15 @@ void loadMetadataResources(JobModel jobModel) { } configStore.flush(); - // fan out the startpoints - StartpointManager startpointManager = createStartpointManager(); - startpointManager.start(); - try { - startpointManager.fanOut(JobModelUtil.getTaskToSystemStreamPartitions(jobModel)); - } finally { - startpointManager.stop(); + if (new JobConfig(config).getStartpointEnabled()) { + // fan out the startpoints + StartpointManager startpointManager = createStartpointManager(); + startpointManager.start(); + try { + startpointManager.fanOut(JobModelUtil.getTaskToSystemStreamPartitions(jobModel)); + } finally { + startpointManager.stop(); + } } } else { LOG.warn("No metadata store registered to this job coordinator. Config not written to the metadata store and no Startpoints fan out."); diff --git a/samza-core/src/main/scala/org/apache/samza/job/local/ProcessJobFactory.scala b/samza-core/src/main/scala/org/apache/samza/job/local/ProcessJobFactory.scala index fab8c6ee69..32649329b2 100644 --- a/samza-core/src/main/scala/org/apache/samza/job/local/ProcessJobFactory.scala +++ b/samza-core/src/main/scala/org/apache/samza/job/local/ProcessJobFactory.scala @@ -91,13 +91,15 @@ class ProcessJobFactory extends StreamJobFactory with Logging { val metadataResourceUtil = new MetadataResourceUtil(jobModel, metricsRegistry, config) metadataResourceUtil.createResources() - // fan out the startpoints - val startpointManager = new StartpointManager(coordinatorStreamStore) - startpointManager.start() - try { - startpointManager.fanOut(JobModelUtil.getTaskToSystemStreamPartitions(jobModel)) - } finally { - startpointManager.stop() + if (new JobConfig(config).getStartpointEnabled()) { + // fan out the startpoints + val startpointManager = new StartpointManager(coordinatorStreamStore) + startpointManager.start() + try { + startpointManager.fanOut(JobModelUtil.getTaskToSystemStreamPartitions(jobModel)) + } finally { + startpointManager.stop() + } } val taskConfig = new TaskConfig(config) diff --git a/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala b/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala index c1a36837f5..79bd1818f3 100644 --- a/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala +++ b/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala @@ -91,13 +91,15 @@ class ThreadJobFactory extends StreamJobFactory with Logging { val metadataResourceUtil = new MetadataResourceUtil(jobModel, metricsRegistry, config) metadataResourceUtil.createResources() - // fan out the startpoints - val startpointManager = new StartpointManager(coordinatorStreamStore) - startpointManager.start() - try { - startpointManager.fanOut(JobModelUtil.getTaskToSystemStreamPartitions(jobModel)) - } finally { - startpointManager.stop() + if (new JobConfig(config).getStartpointEnabled()) { + // fan out the startpoints + val startpointManager = new StartpointManager(coordinatorStreamStore) + startpointManager.start() + try { + startpointManager.fanOut(JobModelUtil.getTaskToSystemStreamPartitions(jobModel)) + } finally { + startpointManager.stop() + } } val containerId = "0"