From 817987b01b50ed5b0748cf2cb7040b891ad9d08d Mon Sep 17 00:00:00 2001 From: Ray Manpreet Singh Matharu Date: Tue, 31 Mar 2020 15:18:41 -0700 Subject: [PATCH 1/8] Optimizing startpoint manager --- .../samza/startpoint/StartpointManager.java | 91 ++++++++++++------- 1 file changed, 58 insertions(+), 33 deletions(-) diff --git a/samza-core/src/main/java/org/apache/samza/startpoint/StartpointManager.java b/samza-core/src/main/java/org/apache/samza/startpoint/StartpointManager.java index 9a196c3da5..04bb966711 100644 --- a/samza-core/src/main/java/org/apache/samza/startpoint/StartpointManager.java +++ b/samza-core/src/main/java/org/apache/samza/startpoint/StartpointManager.java @@ -25,6 +25,7 @@ import java.io.IOException; import java.time.Duration; import java.time.Instant; +import java.util.Collections; import java.util.HashMap; import java.util.Map; import java.util.Optional; @@ -159,38 +160,63 @@ public void writeStartpoint(SystemStreamPartition ssp, TaskName taskName, Startp * It is empty if it does not exist or if it is too stale. */ public Optional readStartpoint(SystemStreamPartition ssp) { - return readStartpoint(ssp, null); + return readStartpointMap(Collections.singletonMap(null, Collections.singleton(ssp)), false).get(null) + .get(ssp); + } + + @VisibleForTesting + public Optional readStartpoint(SystemStreamPartition ssp, TaskName taskName) { + return readStartpointMap(Collections.singletonMap(taskName, Collections.singleton(ssp)), false).get(taskName) + .get(ssp); } /** - * Returns the {@link Startpoint} for a {@link SystemStreamPartition} and {@link TaskName}. - * @param ssp The {@link SystemStreamPartition} to fetch the {@link Startpoint} for. - * @param taskName The {@link TaskName} to fetch the {@link Startpoint} for. - * @return {@link Optional} of {@link Startpoint} for the {@link SystemStreamPartition} and {@link TaskName}. - * It is empty if it does not exist or if it is too stale. + * Returns the {@link Startpoint} for all {@link SystemStreamPartition} and {@link TaskName}s. + * @param taskToSSPs Map of {@link TaskName} to {@link SystemStreamPartition} to fetch startpoints for. + * @param readSSPKeyOnly true if the key to be used comprises of SSP only or SSP+Taskname. + * @return Map of {@link Optional} of {@link Startpoint} for the {@link SystemStreamPartition} and {@link TaskName}. + * It is empty if it does not exist or if it is too stale. */ - public Optional readStartpoint(SystemStreamPartition ssp, TaskName taskName) { + public Map>> readStartpointMap( + Map> taskToSSPs, boolean readSSPKeyOnly) { Preconditions.checkState(!stopped, "Underlying metadata store not available"); - Preconditions.checkNotNull(ssp, "SystemStreamPartition cannot be null"); + Map>> retVal = new HashMap<>(); - byte[] startpointBytes = readWriteStore.get(toReadWriteStoreKey(ssp, taskName)); + Map startpointBytes = readWriteStore.all(); - if (ArrayUtils.isNotEmpty(startpointBytes)) { - try { - Startpoint startpoint = objectMapper.readValue(startpointBytes, Startpoint.class); - if (Instant.now().minus(DEFAULT_EXPIRATION_DURATION).isBefore(Instant.ofEpochMilli(startpoint.getCreationTimestamp()))) { - return Optional.of(startpoint); // return if deserializable and if not stale + for (TaskName taskName : taskToSSPs.keySet()) { + for (SystemStreamPartition ssp : taskToSSPs.get(taskName)) { + Preconditions.checkNotNull(ssp, "SystemStreamPartition cannot be null"); + + String readWriteStoreKey = readSSPKeyOnly ? toReadWriteStoreKey(ssp, null) : toReadWriteStoreKey(ssp, taskName); + byte[] startpointBytesForTask = startpointBytes.get(readWriteStoreKey); + + // pre-populate the empty value for the taskname and ssp + retVal.putIfAbsent(taskName, new HashMap<>()); + retVal.get(taskName).put(ssp, Optional.empty()); + + if (ArrayUtils.isNotEmpty(startpointBytesForTask)) { + try { + Startpoint startpoint = objectMapper.readValue(startpointBytesForTask, Startpoint.class); + if (Instant.now() + .minus(DEFAULT_EXPIRATION_DURATION) + .isBefore(Instant.ofEpochMilli(startpoint.getCreationTimestamp()))) { + retVal.get(taskName).put(ssp, Optional.of(startpoint)); // return if deserializable and if not stale + } else { + LOG.warn("Creation timestamp: {} of startpoint: {} has crossed the expiration duration: {}. Ignoring it", + startpoint.getCreationTimestamp(), startpoint, DEFAULT_EXPIRATION_DURATION); + } + } catch (IOException ex) { + throw new SamzaException(ex); + } } - LOG.warn("Creation timestamp: {} of startpoint: {} has crossed the expiration duration: {}. Ignoring it", - startpoint.getCreationTimestamp(), startpoint, DEFAULT_EXPIRATION_DURATION); - } catch (IOException ex) { - throw new SamzaException(ex); } } - return Optional.empty(); + return retVal; } + /** * Deletes the {@link Startpoint} for a {@link SystemStreamPartition} * @param ssp The {@link SystemStreamPartition} to delete the {@link Startpoint} for. @@ -240,27 +266,26 @@ public Map> fanOut(Map deleteKeys = HashMultimap.create(); HashMap fanOuts = new HashMap<>(); + + Map>> startpointMap = readStartpointMap(taskToSSPs, true); // Read SSP-only key + Map>> startpointForTaskMap = readStartpointMap(taskToSSPs, false); // Read SSP+taskname key + for (TaskName taskName : taskToSSPs.keySet()) { Set ssps = taskToSSPs.get(taskName); if (CollectionUtils.isEmpty(ssps)) { LOG.warn("No SSPs are mapped to taskName: {}", taskName.getTaskName()); continue; } - for (SystemStreamPartition ssp : ssps) { - Optional startpoint = readStartpoint(ssp); // Read SSP-only key - startpoint.ifPresent(sp -> deleteKeys.put(ssp, null)); - Optional startpointForTask = readStartpoint(ssp, taskName); // Read SSP+taskName key - startpointForTask.ifPresent(sp -> deleteKeys.put(ssp, taskName)); - - Optional startpointWithPrecedence = resolveStartpointPrecendence(startpoint, startpointForTask); - if (!startpointWithPrecedence.isPresent()) { - continue; - } - - fanOuts.putIfAbsent(taskName, new StartpointFanOutPerTask(now)); - fanOuts.get(taskName).getFanOuts().put(ssp, startpointWithPrecedence.get()); - } + Map> startpoint = startpointMap.get(taskName); + Map> startpointForTask = startpointForTaskMap.get(taskName); + Map> startpointWithPrecedence = ssps.stream() + .collect(Collectors.toMap(ssp -> ssp, + ssp -> resolveStartpointPrecendence(startpoint.get(ssp), startpointForTask.get(ssp)))); + startpointWithPrecedence.entrySet().stream().filter(x -> !x.getValue().isPresent()).forEach(x -> { + fanOuts.putIfAbsent(taskName, new StartpointFanOutPerTask(now)); + fanOuts.get(taskName).getFanOuts().put(x.getKey(), x.getValue().get()); + }); } if (fanOuts.isEmpty()) { From fbe8dc71509e2b06aac6508f30149c91e2f6936f Mon Sep 17 00:00:00 2001 From: Ray Manpreet Singh Matharu Date: Tue, 31 Mar 2020 15:40:06 -0700 Subject: [PATCH 2/8] Minor --- .../java/org/apache/samza/startpoint/StartpointManager.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/samza-core/src/main/java/org/apache/samza/startpoint/StartpointManager.java b/samza-core/src/main/java/org/apache/samza/startpoint/StartpointManager.java index 04bb966711..e63a0e4aa2 100644 --- a/samza-core/src/main/java/org/apache/samza/startpoint/StartpointManager.java +++ b/samza-core/src/main/java/org/apache/samza/startpoint/StartpointManager.java @@ -282,7 +282,7 @@ public Map> fanOut(Map> startpointWithPrecedence = ssps.stream() .collect(Collectors.toMap(ssp -> ssp, ssp -> resolveStartpointPrecendence(startpoint.get(ssp), startpointForTask.get(ssp)))); - startpointWithPrecedence.entrySet().stream().filter(x -> !x.getValue().isPresent()).forEach(x -> { + startpointWithPrecedence.entrySet().stream().filter(x -> x.getValue().isPresent()).forEach(x -> { fanOuts.putIfAbsent(taskName, new StartpointFanOutPerTask(now)); fanOuts.get(taskName).getFanOuts().put(x.getKey(), x.getValue().get()); }); From d318de7bca18592ac00b5efe614e2b1aaaf21f71 Mon Sep 17 00:00:00 2001 From: Ray Manpreet Singh Matharu Date: Tue, 31 Mar 2020 15:53:49 -0700 Subject: [PATCH 3/8] Updating deleteKeys after fanout behavior --- .../java/org/apache/samza/startpoint/StartpointManager.java | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/samza-core/src/main/java/org/apache/samza/startpoint/StartpointManager.java b/samza-core/src/main/java/org/apache/samza/startpoint/StartpointManager.java index e63a0e4aa2..687f5f7453 100644 --- a/samza-core/src/main/java/org/apache/samza/startpoint/StartpointManager.java +++ b/samza-core/src/main/java/org/apache/samza/startpoint/StartpointManager.java @@ -278,7 +278,11 @@ public Map> fanOut(Map> startpoint = startpointMap.get(taskName); + startpoint.entrySet().stream().filter(x -> x.getValue().isPresent()).forEach(x -> deleteKeys.put(x.getKey(), null)); + Map> startpointForTask = startpointForTaskMap.get(taskName); + startpointForTask.entrySet().stream().filter(x -> x.getValue().isPresent()).forEach(x -> deleteKeys.put(x.getKey(), taskName)); + Map> startpointWithPrecedence = ssps.stream() .collect(Collectors.toMap(ssp -> ssp, ssp -> resolveStartpointPrecendence(startpoint.get(ssp), startpointForTask.get(ssp)))); From 9497014fd36d7e02d45f429e146e530d8be9f22f Mon Sep 17 00:00:00 2001 From: Ray Manpreet Singh Matharu Date: Wed, 1 Apr 2020 10:31:46 -0700 Subject: [PATCH 4/8] Making read once --- .../ClusterBasedJobCoordinator.java | 4 +++- .../samza/startpoint/StartpointManager.java | 16 +++++++++------- 2 files changed, 12 insertions(+), 8 deletions(-) diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java b/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java index e155b58b09..08192d295f 100644 --- a/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java +++ b/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java @@ -266,7 +266,9 @@ public void run() { StartpointManager startpointManager = createStartpointManager(); startpointManager.start(); try { - startpointManager.fanOut(JobModelUtil.getTaskToSystemStreamPartitions(jobModel)); + if () { + startpointManager.fanOut(JobModelUtil.getTaskToSystemStreamPartitions(jobModel)); + } } finally { startpointManager.stop(); } diff --git a/samza-core/src/main/java/org/apache/samza/startpoint/StartpointManager.java b/samza-core/src/main/java/org/apache/samza/startpoint/StartpointManager.java index 687f5f7453..1b40fba07a 100644 --- a/samza-core/src/main/java/org/apache/samza/startpoint/StartpointManager.java +++ b/samza-core/src/main/java/org/apache/samza/startpoint/StartpointManager.java @@ -160,13 +160,16 @@ public void writeStartpoint(SystemStreamPartition ssp, TaskName taskName, Startp * It is empty if it does not exist or if it is too stale. */ public Optional readStartpoint(SystemStreamPartition ssp) { - return readStartpointMap(Collections.singletonMap(null, Collections.singleton(ssp)), false).get(null) + Map startpointBytes = readWriteStore.all(); + // there is no task-name to use as key for the startpoint in this case (only the ssp), so we use a null task-name + return readStartpointMap(startpointBytes, Collections.singletonMap(null, Collections.singleton(ssp)), false).get(null) .get(ssp); } @VisibleForTesting public Optional readStartpoint(SystemStreamPartition ssp, TaskName taskName) { - return readStartpointMap(Collections.singletonMap(taskName, Collections.singleton(ssp)), false).get(taskName) + Map startpointBytes = readWriteStore.all(); + return readStartpointMap(startpointBytes, Collections.singletonMap(taskName, Collections.singleton(ssp)), false).get(taskName) .get(ssp); } @@ -178,12 +181,10 @@ public Optional readStartpoint(SystemStreamPartition ssp, TaskName t * It is empty if it does not exist or if it is too stale. */ public Map>> readStartpointMap( - Map> taskToSSPs, boolean readSSPKeyOnly) { + Map startpointBytes, Map> taskToSSPs, boolean readSSPKeyOnly) { Preconditions.checkState(!stopped, "Underlying metadata store not available"); Map>> retVal = new HashMap<>(); - Map startpointBytes = readWriteStore.all(); - for (TaskName taskName : taskToSSPs.keySet()) { for (SystemStreamPartition ssp : taskToSSPs.get(taskName)) { Preconditions.checkNotNull(ssp, "SystemStreamPartition cannot be null"); @@ -267,8 +268,9 @@ public Map> fanOut(Map deleteKeys = HashMultimap.create(); HashMap fanOuts = new HashMap<>(); - Map>> startpointMap = readStartpointMap(taskToSSPs, true); // Read SSP-only key - Map>> startpointForTaskMap = readStartpointMap(taskToSSPs, false); // Read SSP+taskname key + Map startpointBytes = readWriteStore.all(); + Map>> startpointMap = readStartpointMap(startpointBytes, taskToSSPs, true); // Read SSP-only key + Map>> startpointForTaskMap = readStartpointMap(startpointBytes, taskToSSPs, false); // Read SSP+taskname key for (TaskName taskName : taskToSSPs.keySet()) { Set ssps = taskToSSPs.get(taskName); From e36107973554517d6f58ad3822a0047359ea542f Mon Sep 17 00:00:00 2001 From: Ray Manpreet Singh Matharu Date: Wed, 1 Apr 2020 10:54:23 -0700 Subject: [PATCH 5/8] Major refactor adding config --- .../ClusterBasedJobCoordinator.java | 2 +- .../samza/config/ClusterManagerConfig.java | 5 + .../samza/startpoint/StartpointManager.java | 97 ++++++++----------- 3 files changed, 49 insertions(+), 55 deletions(-) diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java b/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java index 08192d295f..b54efca680 100644 --- a/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java +++ b/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java @@ -266,7 +266,7 @@ public void run() { StartpointManager startpointManager = createStartpointManager(); startpointManager.start(); try { - if () { + if (!new ClusterManagerConfig(config).getStartpointFanoutDisabled()) { startpointManager.fanOut(JobModelUtil.getTaskToSystemStreamPartitions(jobModel)); } } finally { diff --git a/samza-core/src/main/java/org/apache/samza/config/ClusterManagerConfig.java b/samza-core/src/main/java/org/apache/samza/config/ClusterManagerConfig.java index 231bdda58c..df2915a512 100644 --- a/samza-core/src/main/java/org/apache/samza/config/ClusterManagerConfig.java +++ b/samza-core/src/main/java/org/apache/samza/config/ClusterManagerConfig.java @@ -127,6 +127,7 @@ public class ClusterManagerConfig extends MapConfig { */ private static final String AM_JMX_ENABLED = "yarn.am.jmx.enabled"; private static final String CLUSTER_MANAGER_JMX_ENABLED = "cluster-manager.jobcoordinator.jmx.enabled"; + private static final String CLUSTER_MANAGER_STARTPOINT_FANOUT_DISABLED = "job.startpoint.fanout.disabled"; public ClusterManagerConfig(Config config) { super(config); @@ -263,4 +264,8 @@ public boolean getJmxEnabledOnJobCoordinator() { return true; } } + + public boolean getStartpointFanoutDisabled() { + return getBoolean(CLUSTER_MANAGER_STARTPOINT_FANOUT_DISABLED, false); + } } diff --git a/samza-core/src/main/java/org/apache/samza/startpoint/StartpointManager.java b/samza-core/src/main/java/org/apache/samza/startpoint/StartpointManager.java index 1b40fba07a..841b46df12 100644 --- a/samza-core/src/main/java/org/apache/samza/startpoint/StartpointManager.java +++ b/samza-core/src/main/java/org/apache/samza/startpoint/StartpointManager.java @@ -25,7 +25,6 @@ import java.io.IOException; import java.time.Duration; import java.time.Instant; -import java.util.Collections; import java.util.HashMap; import java.util.Map; import java.util.Optional; @@ -159,62 +158,54 @@ public void writeStartpoint(SystemStreamPartition ssp, TaskName taskName, Startp * @return {@link Optional} of {@link Startpoint} for the {@link SystemStreamPartition}. * It is empty if it does not exist or if it is too stale. */ + @VisibleForTesting public Optional readStartpoint(SystemStreamPartition ssp) { Map startpointBytes = readWriteStore.all(); // there is no task-name to use as key for the startpoint in this case (only the ssp), so we use a null task-name - return readStartpointMap(startpointBytes, Collections.singletonMap(null, Collections.singleton(ssp)), false).get(null) - .get(ssp); + return readStartpoint(startpointBytes, ssp, null); } + /** + * Returns the last {@link Startpoint} that defines the start position for a {@link SystemStreamPartition} and {@link TaskName}. + * @param ssp The {@link SystemStreamPartition} to fetch the {@link Startpoint} for. + * @param taskName the {@link TaskName} to fetch the {@link Startpoint} for. + * @return {@link Optional} of {@link Startpoint} for the {@link SystemStreamPartition}. + * It is empty if it does not exist or if it is too stale. + */ @VisibleForTesting public Optional readStartpoint(SystemStreamPartition ssp, TaskName taskName) { Map startpointBytes = readWriteStore.all(); - return readStartpointMap(startpointBytes, Collections.singletonMap(taskName, Collections.singleton(ssp)), false).get(taskName) - .get(ssp); + // there is no task-name to use as key for the startpoint in this case (only the ssp), so we use a null task-name + return readStartpoint(startpointBytes, ssp, taskName); } /** - * Returns the {@link Startpoint} for all {@link SystemStreamPartition} and {@link TaskName}s. - * @param taskToSSPs Map of {@link TaskName} to {@link SystemStreamPartition} to fetch startpoints for. - * @param readSSPKeyOnly true if the key to be used comprises of SSP only or SSP+Taskname. - * @return Map of {@link Optional} of {@link Startpoint} for the {@link SystemStreamPartition} and {@link TaskName}. - * It is empty if it does not exist or if it is too stale. + * Returns the {@link Startpoint} for a {@link SystemStreamPartition} and {@link TaskName}. + * @param ssp The {@link SystemStreamPartition} to fetch the {@link Startpoint} for. + * @param taskName The {@link TaskName} to fetch the {@link Startpoint} for. + * @return {@link Optional} of {@link Startpoint} for the {@link SystemStreamPartition} and {@link TaskName}. + * It is empty if it does not exist or if it is too stale. */ - public Map>> readStartpointMap( - Map startpointBytes, Map> taskToSSPs, boolean readSSPKeyOnly) { + public Optional readStartpoint(Map startpointMap, SystemStreamPartition ssp, TaskName taskName) { Preconditions.checkState(!stopped, "Underlying metadata store not available"); - Map>> retVal = new HashMap<>(); + Preconditions.checkNotNull(ssp, "SystemStreamPartition cannot be null"); - for (TaskName taskName : taskToSSPs.keySet()) { - for (SystemStreamPartition ssp : taskToSSPs.get(taskName)) { - Preconditions.checkNotNull(ssp, "SystemStreamPartition cannot be null"); - - String readWriteStoreKey = readSSPKeyOnly ? toReadWriteStoreKey(ssp, null) : toReadWriteStoreKey(ssp, taskName); - byte[] startpointBytesForTask = startpointBytes.get(readWriteStoreKey); - - // pre-populate the empty value for the taskname and ssp - retVal.putIfAbsent(taskName, new HashMap<>()); - retVal.get(taskName).put(ssp, Optional.empty()); - - if (ArrayUtils.isNotEmpty(startpointBytesForTask)) { - try { - Startpoint startpoint = objectMapper.readValue(startpointBytesForTask, Startpoint.class); - if (Instant.now() - .minus(DEFAULT_EXPIRATION_DURATION) - .isBefore(Instant.ofEpochMilli(startpoint.getCreationTimestamp()))) { - retVal.get(taskName).put(ssp, Optional.of(startpoint)); // return if deserializable and if not stale - } else { - LOG.warn("Creation timestamp: {} of startpoint: {} has crossed the expiration duration: {}. Ignoring it", - startpoint.getCreationTimestamp(), startpoint, DEFAULT_EXPIRATION_DURATION); - } - } catch (IOException ex) { - throw new SamzaException(ex); - } + byte[] startpointBytes = startpointMap.get(toReadWriteStoreKey(ssp, taskName)); + + if (ArrayUtils.isNotEmpty(startpointBytes)) { + try { + Startpoint startpoint = objectMapper.readValue(startpointBytes, Startpoint.class); + if (Instant.now().minus(DEFAULT_EXPIRATION_DURATION).isBefore(Instant.ofEpochMilli(startpoint.getCreationTimestamp()))) { + return Optional.of(startpoint); // return if deserializable and if not stale } + LOG.warn("Creation timestamp: {} of startpoint: {} has crossed the expiration duration: {}. Ignoring it", + startpoint.getCreationTimestamp(), startpoint, DEFAULT_EXPIRATION_DURATION); + } catch (IOException ex) { + throw new SamzaException(ex); } } - return retVal; + return Optional.empty(); } @@ -267,10 +258,7 @@ public Map> fanOut(Map deleteKeys = HashMultimap.create(); HashMap fanOuts = new HashMap<>(); - - Map startpointBytes = readWriteStore.all(); - Map>> startpointMap = readStartpointMap(startpointBytes, taskToSSPs, true); // Read SSP-only key - Map>> startpointForTaskMap = readStartpointMap(startpointBytes, taskToSSPs, false); // Read SSP+taskname key + Map startpointMap = readWriteStore.all(); for (TaskName taskName : taskToSSPs.keySet()) { Set ssps = taskToSSPs.get(taskName); @@ -278,20 +266,21 @@ public Map> fanOut(Map startpoint = readStartpoint(startpointMap, ssp, null); // Read SSP-only key + startpoint.ifPresent(sp -> deleteKeys.put(ssp, null)); - Map> startpoint = startpointMap.get(taskName); - startpoint.entrySet().stream().filter(x -> x.getValue().isPresent()).forEach(x -> deleteKeys.put(x.getKey(), null)); + Optional startpointForTask = readStartpoint(startpointMap, ssp, taskName); // Read SSP+taskName key + startpointForTask.ifPresent(sp -> deleteKeys.put(ssp, taskName)); - Map> startpointForTask = startpointForTaskMap.get(taskName); - startpointForTask.entrySet().stream().filter(x -> x.getValue().isPresent()).forEach(x -> deleteKeys.put(x.getKey(), taskName)); + Optional startpointWithPrecedence = resolveStartpointPrecendence(startpoint, startpointForTask); + if (!startpointWithPrecedence.isPresent()) { + continue; + } - Map> startpointWithPrecedence = ssps.stream() - .collect(Collectors.toMap(ssp -> ssp, - ssp -> resolveStartpointPrecendence(startpoint.get(ssp), startpointForTask.get(ssp)))); - startpointWithPrecedence.entrySet().stream().filter(x -> x.getValue().isPresent()).forEach(x -> { - fanOuts.putIfAbsent(taskName, new StartpointFanOutPerTask(now)); - fanOuts.get(taskName).getFanOuts().put(x.getKey(), x.getValue().get()); - }); + fanOuts.putIfAbsent(taskName, new StartpointFanOutPerTask(now)); + fanOuts.get(taskName).getFanOuts().put(ssp, startpointWithPrecedence.get()); + } } if (fanOuts.isEmpty()) { From 23290222031cd2d9f73f1eef7c4432fda43e02b9 Mon Sep 17 00:00:00 2001 From: Ray Manpreet Singh Matharu Date: Wed, 1 Apr 2020 10:56:00 -0700 Subject: [PATCH 6/8] Making config as "enabled" --- .../samza/clustermanager/ClusterBasedJobCoordinator.java | 2 +- .../java/org/apache/samza/config/ClusterManagerConfig.java | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java b/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java index b54efca680..16b2b4fb55 100644 --- a/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java +++ b/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java @@ -266,7 +266,7 @@ public void run() { StartpointManager startpointManager = createStartpointManager(); startpointManager.start(); try { - if (!new ClusterManagerConfig(config).getStartpointFanoutDisabled()) { + if (new ClusterManagerConfig(config).getStartpointFanoutEnabled()) { startpointManager.fanOut(JobModelUtil.getTaskToSystemStreamPartitions(jobModel)); } } finally { diff --git a/samza-core/src/main/java/org/apache/samza/config/ClusterManagerConfig.java b/samza-core/src/main/java/org/apache/samza/config/ClusterManagerConfig.java index df2915a512..6cbb0a5469 100644 --- a/samza-core/src/main/java/org/apache/samza/config/ClusterManagerConfig.java +++ b/samza-core/src/main/java/org/apache/samza/config/ClusterManagerConfig.java @@ -127,7 +127,7 @@ public class ClusterManagerConfig extends MapConfig { */ private static final String AM_JMX_ENABLED = "yarn.am.jmx.enabled"; private static final String CLUSTER_MANAGER_JMX_ENABLED = "cluster-manager.jobcoordinator.jmx.enabled"; - private static final String CLUSTER_MANAGER_STARTPOINT_FANOUT_DISABLED = "job.startpoint.fanout.disabled"; + private static final String CLUSTER_MANAGER_STARTPOINT_FANOUT_ENABLED = "job.startpoint.fanout.enabled"; public ClusterManagerConfig(Config config) { super(config); @@ -265,7 +265,7 @@ public boolean getJmxEnabledOnJobCoordinator() { } } - public boolean getStartpointFanoutDisabled() { - return getBoolean(CLUSTER_MANAGER_STARTPOINT_FANOUT_DISABLED, false); + public boolean getStartpointFanoutEnabled() { + return getBoolean(CLUSTER_MANAGER_STARTPOINT_FANOUT_ENABLED, true); } } From 099730963a7f42732e3d3002806c50388733e6bc Mon Sep 17 00:00:00 2001 From: Ray Manpreet Singh Matharu Date: Wed, 1 Apr 2020 12:22:38 -0700 Subject: [PATCH 7/8] some minor comment changes --- .../main/java/org/apache/samza/startpoint/StartpointManager.java | 1 - 1 file changed, 1 deletion(-) diff --git a/samza-core/src/main/java/org/apache/samza/startpoint/StartpointManager.java b/samza-core/src/main/java/org/apache/samza/startpoint/StartpointManager.java index 841b46df12..f0a4cc3083 100644 --- a/samza-core/src/main/java/org/apache/samza/startpoint/StartpointManager.java +++ b/samza-core/src/main/java/org/apache/samza/startpoint/StartpointManager.java @@ -175,7 +175,6 @@ public Optional readStartpoint(SystemStreamPartition ssp) { @VisibleForTesting public Optional readStartpoint(SystemStreamPartition ssp, TaskName taskName) { Map startpointBytes = readWriteStore.all(); - // there is no task-name to use as key for the startpoint in this case (only the ssp), so we use a null task-name return readStartpoint(startpointBytes, ssp, taskName); } From 7965d51c2e7879efd54825ced4733d5c4c5b508e Mon Sep 17 00:00:00 2001 From: Ray Manpreet Singh Matharu Date: Wed, 1 Apr 2020 15:57:43 -0700 Subject: [PATCH 8/8] Adding jobconfig for startpoints --- .../ClusterBasedJobCoordinator.java | 14 +++++++------- .../samza/config/ClusterManagerConfig.java | 5 ----- .../java/org/apache/samza/config/JobConfig.java | 6 ++++++ .../apache/samza/processor/StreamProcessor.java | 4 +++- .../samza/runtime/ContainerLaunchUtil.java | 5 ++++- .../org/apache/samza/zk/ZkJobCoordinator.java | 16 +++++++++------- .../samza/job/local/ProcessJobFactory.scala | 16 +++++++++------- .../samza/job/local/ThreadJobFactory.scala | 16 +++++++++------- 8 files changed, 47 insertions(+), 35 deletions(-) diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java b/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java index 16b2b4fb55..0cb80b20fc 100644 --- a/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java +++ b/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java @@ -262,15 +262,15 @@ public void run() { MetadataResourceUtil metadataResourceUtil = new MetadataResourceUtil(jobModel, this.metrics, config); metadataResourceUtil.createResources(); - // fan out the startpoints - StartpointManager startpointManager = createStartpointManager(); - startpointManager.start(); - try { - if (new ClusterManagerConfig(config).getStartpointFanoutEnabled()) { + // fan out the startpoints if startpoints is enabled + if (new JobConfig(config).getStartpointEnabled()) { + StartpointManager startpointManager = createStartpointManager(); + startpointManager.start(); + try { startpointManager.fanOut(JobModelUtil.getTaskToSystemStreamPartitions(jobModel)); + } finally { + startpointManager.stop(); } - } finally { - startpointManager.stop(); } // Remap changelog partitions to tasks diff --git a/samza-core/src/main/java/org/apache/samza/config/ClusterManagerConfig.java b/samza-core/src/main/java/org/apache/samza/config/ClusterManagerConfig.java index 6cbb0a5469..231bdda58c 100644 --- a/samza-core/src/main/java/org/apache/samza/config/ClusterManagerConfig.java +++ b/samza-core/src/main/java/org/apache/samza/config/ClusterManagerConfig.java @@ -127,7 +127,6 @@ public class ClusterManagerConfig extends MapConfig { */ private static final String AM_JMX_ENABLED = "yarn.am.jmx.enabled"; private static final String CLUSTER_MANAGER_JMX_ENABLED = "cluster-manager.jobcoordinator.jmx.enabled"; - private static final String CLUSTER_MANAGER_STARTPOINT_FANOUT_ENABLED = "job.startpoint.fanout.enabled"; public ClusterManagerConfig(Config config) { super(config); @@ -264,8 +263,4 @@ public boolean getJmxEnabledOnJobCoordinator() { return true; } } - - public boolean getStartpointFanoutEnabled() { - return getBoolean(CLUSTER_MANAGER_STARTPOINT_FANOUT_ENABLED, true); - } } diff --git a/samza-core/src/main/java/org/apache/samza/config/JobConfig.java b/samza-core/src/main/java/org/apache/samza/config/JobConfig.java index 37bfe37118..dff2991da2 100644 --- a/samza-core/src/main/java/org/apache/samza/config/JobConfig.java +++ b/samza-core/src/main/java/org/apache/samza/config/JobConfig.java @@ -146,6 +146,8 @@ public class JobConfig extends MapConfig { public static final String CLUSTER_BASED_JOB_COORDINATOR_DEPENDENCY_ISOLATION_ENABLED = "samza.cluster.based.job.coordinator.dependency.isolation.enabled"; + private static final String JOB_STARTPOINT_ENABLED = "job.startpoint.enabled"; + public JobConfig(Config config) { super(config); } @@ -404,4 +406,8 @@ public String getCoordinatorStreamFactory() { public Optional getConfigLoaderFactory() { return Optional.ofNullable(get(CONFIG_LOADER_FACTORY)); } + + public boolean getStartpointEnabled() { + return getBoolean(JOB_STARTPOINT_ENABLED, true); + } } \ No newline at end of file diff --git a/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java b/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java index 5a395207d0..ed0c875295 100644 --- a/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java +++ b/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java @@ -396,8 +396,10 @@ SamzaContainer createSamzaContainer(String processorId, JobModel jobModel) { // Metadata store lifecycle managed outside of the SamzaContainer. // All manager lifecycles are managed in the SamzaContainer including startpointManager StartpointManager startpointManager = null; - if (metadataStore != null) { + if (metadataStore != null && new JobConfig(config).getStartpointEnabled()) { startpointManager = new StartpointManager(metadataStore); + } else if (!new JobConfig(config).getStartpointEnabled()) { + LOGGER.warn("StartpointManager not instantiated because startpoints is not enabled"); } else { LOGGER.warn("StartpointManager cannot be instantiated because no metadata store defined for this stream processor"); } diff --git a/samza-core/src/main/java/org/apache/samza/runtime/ContainerLaunchUtil.java b/samza-core/src/main/java/org/apache/samza/runtime/ContainerLaunchUtil.java index c68ec03b2d..40d3d5c2f6 100644 --- a/samza-core/src/main/java/org/apache/samza/runtime/ContainerLaunchUtil.java +++ b/samza-core/src/main/java/org/apache/samza/runtime/ContainerLaunchUtil.java @@ -102,7 +102,10 @@ private static void run( LocalityManager localityManager = new LocalityManager(new NamespaceAwareCoordinatorStreamStore(coordinatorStreamStore, SetContainerHostMapping.TYPE)); // StartpointManager wraps the coordinatorStreamStore in the namespaces internally - StartpointManager startpointManager = new StartpointManager(coordinatorStreamStore); + StartpointManager startpointManager = null; + if (new JobConfig(config).getStartpointEnabled()) { + startpointManager = new StartpointManager(coordinatorStreamStore); + } Map metricsReporters = loadMetricsReporters(appDesc, containerId, config); diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java index feaabba34b..86c1f06345 100644 --- a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java +++ b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java @@ -326,13 +326,15 @@ void loadMetadataResources(JobModel jobModel) { } configStore.flush(); - // fan out the startpoints - StartpointManager startpointManager = createStartpointManager(); - startpointManager.start(); - try { - startpointManager.fanOut(JobModelUtil.getTaskToSystemStreamPartitions(jobModel)); - } finally { - startpointManager.stop(); + if (new JobConfig(config).getStartpointEnabled()) { + // fan out the startpoints + StartpointManager startpointManager = createStartpointManager(); + startpointManager.start(); + try { + startpointManager.fanOut(JobModelUtil.getTaskToSystemStreamPartitions(jobModel)); + } finally { + startpointManager.stop(); + } } } else { LOG.warn("No metadata store registered to this job coordinator. Config not written to the metadata store and no Startpoints fan out."); diff --git a/samza-core/src/main/scala/org/apache/samza/job/local/ProcessJobFactory.scala b/samza-core/src/main/scala/org/apache/samza/job/local/ProcessJobFactory.scala index fab8c6ee69..32649329b2 100644 --- a/samza-core/src/main/scala/org/apache/samza/job/local/ProcessJobFactory.scala +++ b/samza-core/src/main/scala/org/apache/samza/job/local/ProcessJobFactory.scala @@ -91,13 +91,15 @@ class ProcessJobFactory extends StreamJobFactory with Logging { val metadataResourceUtil = new MetadataResourceUtil(jobModel, metricsRegistry, config) metadataResourceUtil.createResources() - // fan out the startpoints - val startpointManager = new StartpointManager(coordinatorStreamStore) - startpointManager.start() - try { - startpointManager.fanOut(JobModelUtil.getTaskToSystemStreamPartitions(jobModel)) - } finally { - startpointManager.stop() + if (new JobConfig(config).getStartpointEnabled()) { + // fan out the startpoints + val startpointManager = new StartpointManager(coordinatorStreamStore) + startpointManager.start() + try { + startpointManager.fanOut(JobModelUtil.getTaskToSystemStreamPartitions(jobModel)) + } finally { + startpointManager.stop() + } } val taskConfig = new TaskConfig(config) diff --git a/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala b/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala index c1a36837f5..79bd1818f3 100644 --- a/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala +++ b/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala @@ -91,13 +91,15 @@ class ThreadJobFactory extends StreamJobFactory with Logging { val metadataResourceUtil = new MetadataResourceUtil(jobModel, metricsRegistry, config) metadataResourceUtil.createResources() - // fan out the startpoints - val startpointManager = new StartpointManager(coordinatorStreamStore) - startpointManager.start() - try { - startpointManager.fanOut(JobModelUtil.getTaskToSystemStreamPartitions(jobModel)) - } finally { - startpointManager.stop() + if (new JobConfig(config).getStartpointEnabled()) { + // fan out the startpoints + val startpointManager = new StartpointManager(coordinatorStreamStore) + startpointManager.start() + try { + startpointManager.fanOut(JobModelUtil.getTaskToSystemStreamPartitions(jobModel)) + } finally { + startpointManager.stop() + } } val containerId = "0"