diff --git a/samza-api/src/main/java/org/apache/samza/coordinator/lifecycle/JobRestartSignal.java b/samza-api/src/main/java/org/apache/samza/coordinator/lifecycle/JobRestartSignal.java new file mode 100644 index 0000000000..8838595fc1 --- /dev/null +++ b/samza-api/src/main/java/org/apache/samza/coordinator/lifecycle/JobRestartSignal.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.coordinator.lifecycle; + +/** + * Interface for defining how to trigger a restart of a Samza job. This will be called when the Samza + * framework determines that a job restart is needed. For example, if the partition count of an input stream + * changes, then that means the job model needs to change, and restarting the job will update the job model. + */ +public interface JobRestartSignal { + /** + * Trigger a restart of the Samza job. This method should trigger the restart asynchronously, because the + * caller of this method is part of the Samza job which is going to be restarted. It is not necessary that + * the restart needs to actually happen immediately, as the job will continue to run until the restart + * actually happens. + */ + void restartJob(); +} diff --git a/samza-api/src/main/java/org/apache/samza/coordinator/lifecycle/JobRestartSignalFactory.java b/samza-api/src/main/java/org/apache/samza/coordinator/lifecycle/JobRestartSignalFactory.java new file mode 100644 index 0000000000..1ce48a545a --- /dev/null +++ b/samza-api/src/main/java/org/apache/samza/coordinator/lifecycle/JobRestartSignalFactory.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.coordinator.lifecycle; + +/** + * See {@link JobRestartSignal}. + */ +public interface JobRestartSignalFactory { + JobRestartSignal build(JobRestartSignalFactoryContext context); +} \ No newline at end of file diff --git a/samza-api/src/main/java/org/apache/samza/coordinator/lifecycle/JobRestartSignalFactoryContext.java b/samza-api/src/main/java/org/apache/samza/coordinator/lifecycle/JobRestartSignalFactoryContext.java new file mode 100644 index 0000000000..80bd16fb57 --- /dev/null +++ b/samza-api/src/main/java/org/apache/samza/coordinator/lifecycle/JobRestartSignalFactoryContext.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.coordinator.lifecycle; + +import org.apache.samza.config.Config; + + +/** + * Contains objects that are needed to build a {@link JobRestartSignal}. + * Having this class allows {@link JobRestartSignalFactory#build} to remain unchanged if additional components + * are needed in the future. Update this class if additional components are needed building {@link JobRestartSignal}. + */ +public class JobRestartSignalFactoryContext { + private final Config config; + + public JobRestartSignalFactoryContext(Config config) { + this.config = config; + } + + /** + * {@link Config} used to build a {@link JobRestartSignal}. + */ + public Config getConfig() { + return config; + } +} diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/JobCoordinatorLaunchUtil.java b/samza-core/src/main/java/org/apache/samza/clustermanager/JobCoordinatorLaunchUtil.java index cb849566a8..053b913e9a 100644 --- a/samza-core/src/main/java/org/apache/samza/clustermanager/JobCoordinatorLaunchUtil.java +++ b/samza-core/src/main/java/org/apache/samza/clustermanager/JobCoordinatorLaunchUtil.java @@ -114,7 +114,6 @@ private static void runJobCoordinator(String jobCoordinatorClassName, MetricsReg JobCoordinator jobCoordinator = jobCoordinatorFactory.getJobCoordinator(JOB_COORDINATOR_PROCESSOR_ID_PLACEHOLDER, finalConfig, metrics, metadataStore); - addShutdownHook(jobCoordinator); Map metricsReporters = MetricsReporterLoader.getMetricsReporters(new MetricsConfig(finalConfig), JOB_COORDINATOR_SOURCE_NAME); metricsReporters.values() @@ -123,6 +122,7 @@ private static void runJobCoordinator(String jobCoordinatorClassName, MetricsReg CountDownLatch waitForShutdownLatch = new CountDownLatch(1); jobCoordinator.setListener(new NoProcessorJobCoordinatorListener(waitForShutdownLatch)); jobCoordinator.start(); + addShutdownHook(jobCoordinator); try { waitForShutdownLatch.await(); } catch (InterruptedException e) { diff --git a/samza-core/src/main/java/org/apache/samza/config/JobCoordinatorConfig.java b/samza-core/src/main/java/org/apache/samza/config/JobCoordinatorConfig.java index c97628c036..eed704c3f6 100644 --- a/samza-core/src/main/java/org/apache/samza/config/JobCoordinatorConfig.java +++ b/samza-core/src/main/java/org/apache/samza/config/JobCoordinatorConfig.java @@ -16,13 +16,13 @@ * specific language governing permissions and limitations * under the License. */ - package org.apache.samza.config; import java.util.Optional; import com.google.common.base.Strings; import org.apache.samza.SamzaException; import org.apache.samza.coordinator.CoordinationUtilsFactory; +import org.apache.samza.coordinator.lifecycle.NoOpJobRestartSignalFactory; import org.apache.samza.standalone.PassthroughCoordinationUtilsFactory; import org.apache.samza.standalone.PassthroughJobCoordinatorFactory; import org.apache.samza.util.ReflectionUtil; @@ -32,8 +32,11 @@ public class JobCoordinatorConfig extends MapConfig { public static final String JOB_COORDINATOR_FACTORY = "job.coordinator.factory"; public final static String DEFAULT_COORDINATOR_FACTORY = ZkJobCoordinatorFactory.class.getName(); + public static final String JOB_RESTART_SIGNAL_FACTORY = "job.coordinator.restart.signal.factory"; + private static final String AZURE_COORDINATION_UTILS_FACTORY = "org.apache.samza.coordinator.AzureCoordinationUtilsFactory"; private static final String AZURE_COORDINATOR_FACTORY = "org.apache.samza.coordinator.AzureJobCoordinatorFactory"; + private static final String DEFAULT_JOB_RESTART_SIGNAL_FACTORY = NoOpJobRestartSignalFactory.class.getName(); public JobCoordinatorConfig(Config config) { super(config); @@ -79,4 +82,8 @@ public String getJobCoordinatorFactoryClassName() { public Optional getOptionalJobCoordinatorFactoryClassName() { return Optional.ofNullable(get(JOB_COORDINATOR_FACTORY)); } + + public String getJobRestartSignalFactory() { + return get(JOB_RESTART_SIGNAL_FACTORY, DEFAULT_JOB_RESTART_SIGNAL_FACTORY); + } } diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/JobModelMonitors.java b/samza-core/src/main/java/org/apache/samza/coordinator/JobModelMonitors.java new file mode 100644 index 0000000000..2eb7c506a8 --- /dev/null +++ b/samza-core/src/main/java/org/apache/samza/coordinator/JobModelMonitors.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.coordinator; + +import java.util.Optional; +import java.util.concurrent.atomic.AtomicBoolean; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * Utility class for managing multiple monitors. + */ +public class JobModelMonitors { + private static final Logger LOG = LoggerFactory.getLogger(JobModelMonitors.class); + + private final Optional streamPartitionCountMonitor; + private final Optional streamRegexMonitor; + + private final AtomicBoolean started = new AtomicBoolean(false); + + public JobModelMonitors(StreamPartitionCountMonitor streamPartitionCountMonitor, + StreamRegexMonitor streamRegexMonitor) { + this.streamPartitionCountMonitor = Optional.ofNullable(streamPartitionCountMonitor); + this.streamRegexMonitor = Optional.ofNullable(streamRegexMonitor); + } + + public void start() { + if (this.started.compareAndSet(false, true)) { + this.streamPartitionCountMonitor.ifPresent(StreamPartitionCountMonitor::start); + this.streamRegexMonitor.ifPresent(StreamRegexMonitor::start); + } else { + LOG.warn("Monitors already started"); + } + } + + public void stop() { + if (this.started.compareAndSet(true, false)) { + this.streamPartitionCountMonitor.ifPresent(StreamPartitionCountMonitor::stop); + this.streamRegexMonitor.ifPresent(StreamRegexMonitor::stop); + } else { + LOG.warn("Monitors already stopped"); + } + } +} diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/communication/CoordinatorCommunication.java b/samza-core/src/main/java/org/apache/samza/coordinator/communication/CoordinatorCommunication.java index daf0cc789f..46866c8767 100644 --- a/samza-core/src/main/java/org/apache/samza/coordinator/communication/CoordinatorCommunication.java +++ b/samza-core/src/main/java/org/apache/samza/coordinator/communication/CoordinatorCommunication.java @@ -32,7 +32,7 @@ public interface CoordinatorCommunication { void start(); /** - * Stop the communication components. This may be called even if {@link #start()} has not yet been called. + * Stop the communication components. */ void stop(); } diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/lifecycle/NoOpJobRestartSignal.java b/samza-core/src/main/java/org/apache/samza/coordinator/lifecycle/NoOpJobRestartSignal.java new file mode 100644 index 0000000000..23c54b8253 --- /dev/null +++ b/samza-core/src/main/java/org/apache/samza/coordinator/lifecycle/NoOpJobRestartSignal.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.coordinator.lifecycle; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * Placeholder implementation for {@link JobRestartSignal}. + * If a use case requires job restarts, then a real implementation should be used. + */ +public class NoOpJobRestartSignal implements JobRestartSignal { + private static final Logger LOG = LoggerFactory.getLogger(NoOpJobRestartSignal.class); + + @Override + public void restartJob() { + LOG.info("Job restart signalled, but job restart is no-op for this class"); + } +} diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/lifecycle/NoOpJobRestartSignalFactory.java b/samza-core/src/main/java/org/apache/samza/coordinator/lifecycle/NoOpJobRestartSignalFactory.java new file mode 100644 index 0000000000..61804c2aa5 --- /dev/null +++ b/samza-core/src/main/java/org/apache/samza/coordinator/lifecycle/NoOpJobRestartSignalFactory.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.coordinator.lifecycle; + +public class NoOpJobRestartSignalFactory implements JobRestartSignalFactory { + @Override + public JobRestartSignal build(JobRestartSignalFactoryContext context) { + return new NoOpJobRestartSignal(); + } +} diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/staticresource/StaticResourceJobCoordinator.java b/samza-core/src/main/java/org/apache/samza/coordinator/staticresource/StaticResourceJobCoordinator.java index 9595f90844..9f4810e6d5 100644 --- a/samza-core/src/main/java/org/apache/samza/coordinator/staticresource/StaticResourceJobCoordinator.java +++ b/samza-core/src/main/java/org/apache/samza/coordinator/staticresource/StaticResourceJobCoordinator.java @@ -21,15 +21,22 @@ import java.io.IOException; import java.util.Optional; import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; import com.google.common.annotations.VisibleForTesting; import org.apache.samza.SamzaException; import org.apache.samza.config.Config; import org.apache.samza.coordinator.JobCoordinator; import org.apache.samza.coordinator.JobCoordinatorListener; import org.apache.samza.coordinator.JobModelHelper; +import org.apache.samza.coordinator.JobModelMonitors; import org.apache.samza.coordinator.MetadataResourceUtil; +import org.apache.samza.coordinator.StreamPartitionCountMonitor; +import org.apache.samza.coordinator.StreamPartitionCountMonitorFactory; +import org.apache.samza.coordinator.StreamRegexMonitor; +import org.apache.samza.coordinator.StreamRegexMonitorFactory; import org.apache.samza.coordinator.communication.CoordinatorCommunication; import org.apache.samza.coordinator.communication.JobInfoServingContext; +import org.apache.samza.coordinator.lifecycle.JobRestartSignal; import org.apache.samza.job.JobCoordinatorMetadata; import org.apache.samza.job.JobMetadataChange; import org.apache.samza.job.metadata.JobCoordinatorMetadataManager; @@ -58,26 +65,48 @@ public class StaticResourceJobCoordinator implements JobCoordinator { private final JobInfoServingContext jobModelServingContext; private final CoordinatorCommunication coordinatorCommunication; private final JobCoordinatorMetadataManager jobCoordinatorMetadataManager; + private final StreamPartitionCountMonitorFactory streamPartitionCountMonitorFactory; + private final StreamRegexMonitorFactory streamRegexMonitorFactory; private final Optional startpointManager; private final ChangelogStreamManager changelogStreamManager; + private final JobRestartSignal jobRestartSignal; private final MetricsRegistry metrics; private final SystemAdmins systemAdmins; private final String processorId; private final Config config; - private Optional currentJobModel = Optional.empty(); private Optional jobCoordinatorListener = Optional.empty(); - StaticResourceJobCoordinator(String processorId, JobModelHelper jobModelHelper, JobInfoServingContext jobModelServingContext, - CoordinatorCommunication coordinatorCommunication, JobCoordinatorMetadataManager jobCoordinatorMetadataManager, - StartpointManager startpointManager, ChangelogStreamManager changelogStreamManager, MetricsRegistry metrics, + /** + * Job model is calculated during {@link #start()}, so it is not immediately available. + */ + private Optional currentJobModel = Optional.empty(); + /** + * {@link JobModelMonitors} depend on job model, so they are only available after {@link #start()}. + */ + private Optional currentJobModelMonitors = Optional.empty(); + /** + * Keeps track of if the job coordinator has completed all preparation for running the job, including + * publishing a new job model and starting the job model monitors. + */ + private AtomicBoolean jobPreparationComplete = new AtomicBoolean(false); + + StaticResourceJobCoordinator(String processorId, JobModelHelper jobModelHelper, + JobInfoServingContext jobModelServingContext, CoordinatorCommunication coordinatorCommunication, + JobCoordinatorMetadataManager jobCoordinatorMetadataManager, + StreamPartitionCountMonitorFactory streamPartitionCountMonitorFactory, + StreamRegexMonitorFactory streamRegexMonitorFactory, StartpointManager startpointManager, + ChangelogStreamManager changelogStreamManager, JobRestartSignal jobRestartSignal, MetricsRegistry metrics, SystemAdmins systemAdmins, Config config) { this.jobModelHelper = jobModelHelper; this.jobModelServingContext = jobModelServingContext; this.coordinatorCommunication = coordinatorCommunication; this.jobCoordinatorMetadataManager = jobCoordinatorMetadataManager; + this.streamPartitionCountMonitorFactory = streamPartitionCountMonitorFactory; + this.streamRegexMonitorFactory = streamRegexMonitorFactory; this.startpointManager = Optional.ofNullable(startpointManager); this.changelogStreamManager = changelogStreamManager; + this.jobRestartSignal = jobRestartSignal; this.metrics = metrics; this.systemAdmins = systemAdmins; this.processorId = processorId; @@ -92,13 +121,31 @@ public void start() { try { JobModel jobModel = newJobModel(); doSetLoggingContextConfig(jobModel.getConfig()); + // monitors should be created right after job model is calculated (see jobModelMonitors() for more details) + JobModelMonitors jobModelMonitors = jobModelMonitors(jobModel); JobCoordinatorMetadata newMetadata = this.jobCoordinatorMetadataManager.generateJobCoordinatorMetadata(jobModel, jobModel.getConfig()); Set jobMetadataChanges = checkForMetadataChanges(newMetadata); - prepareWorkerExecution(jobModel, newMetadata, jobMetadataChanges); - this.coordinatorCommunication.start(); - this.currentJobModel = Optional.of(jobModel); - this.jobCoordinatorListener.ifPresent(listener -> listener.onNewJobModel(this.processorId, jobModel)); + if (!jobMetadataChanges.isEmpty() && !jobMetadataChanges.contains(JobMetadataChange.NEW_DEPLOYMENT)) { + /* + * If the job coordinator comes up, but not due to a new deployment, and the metadata changed, then trigger a + * restart. This case applies if the job coordinator died and the job model needed to change while it was down. + * If there were no metadata changes, then just let the current workers continue to run. + * If there was a new deployment (which includes the case where the coordinator requested a restart), then we + * rely on the external resource manager to make sure the previous workers restarted, so we don't need to + * restart again. + */ + LOG.info("Triggering job restart"); + this.jobRestartSignal.restartJob(); + } else { + prepareWorkerExecution(jobModel, newMetadata, jobMetadataChanges); + this.coordinatorCommunication.start(); + this.currentJobModel = Optional.of(jobModel); + this.jobCoordinatorListener.ifPresent(listener -> listener.onNewJobModel(this.processorId, jobModel)); + this.currentJobModelMonitors = Optional.of(jobModelMonitors); + jobModelMonitors.start(); + this.jobPreparationComplete.set(true); + } } catch (Exception e) { LOG.error("Error while running job coordinator; exiting", e); throw new SamzaException("Error while running job coordinator", e); @@ -109,7 +156,10 @@ public void start() { public void stop() { try { this.jobCoordinatorListener.ifPresent(JobCoordinatorListener::onJobModelExpired); - this.coordinatorCommunication.stop(); + if (this.jobPreparationComplete.get()) { + this.currentJobModelMonitors.ifPresent(JobModelMonitors::stop); + this.coordinatorCommunication.stop(); + } this.startpointManager.ifPresent(StartpointManager::stop); this.systemAdmins.stop(); } finally { @@ -136,6 +186,24 @@ private JobModel newJobModel() { return this.jobModelHelper.newJobModel(this.config, this.changelogStreamManager.readPartitionMapping()); } + /* + * Possible race condition: The partition count monitor queries for stream metadata when it is created, so if the + * partition counts changed between the job model calculation and the creation of the partition count monitor, then + * the monitor will not trigger an update to the job model. This method should be called right after calculating the + * job model, in order to reduce the possible time in which a partition count change is missed. This issue also + * exists in the older ClusterBasedJobCoordinator. + * TODO This wouldn't be a problem if the partition count monitor used the job model to calculate initial metadata + */ + private JobModelMonitors jobModelMonitors(JobModel jobModel) { + StreamPartitionCountMonitor streamPartitionCountMonitor = + this.streamPartitionCountMonitorFactory.build(jobModel.getConfig(), + streamsChanged -> this.jobRestartSignal.restartJob()); + Optional streamRegexMonitor = + this.streamRegexMonitorFactory.build(jobModel, jobModel.getConfig(), + (initialInputSet, newInputStreams, regexesMonitored) -> this.jobRestartSignal.restartJob()); + return new JobModelMonitors(streamPartitionCountMonitor, streamRegexMonitor.orElse(null)); + } + /** * This is a helper method so that we can verify it is called in testing. */ diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/staticresource/StaticResourceJobCoordinatorFactory.java b/samza-core/src/main/java/org/apache/samza/coordinator/staticresource/StaticResourceJobCoordinatorFactory.java index 23ee54cc24..718778d374 100644 --- a/samza-core/src/main/java/org/apache/samza/coordinator/staticresource/StaticResourceJobCoordinatorFactory.java +++ b/samza-core/src/main/java/org/apache/samza/coordinator/staticresource/StaticResourceJobCoordinatorFactory.java @@ -20,6 +20,7 @@ import org.apache.samza.config.Config; import org.apache.samza.config.JobConfig; +import org.apache.samza.config.JobCoordinatorConfig; import org.apache.samza.container.LocalityManager; import org.apache.samza.container.grouper.task.TaskAssignmentManager; import org.apache.samza.container.grouper.task.TaskPartitionAssignmentManager; @@ -27,10 +28,15 @@ import org.apache.samza.coordinator.JobCoordinatorFactory; import org.apache.samza.coordinator.JobModelCalculator; import org.apache.samza.coordinator.JobModelHelper; +import org.apache.samza.coordinator.StreamPartitionCountMonitorFactory; +import org.apache.samza.coordinator.StreamRegexMonitorFactory; import org.apache.samza.coordinator.communication.CoordinatorCommunication; import org.apache.samza.coordinator.communication.CoordinatorCommunicationContext; import org.apache.samza.coordinator.communication.HttpCoordinatorToWorkerCommunicationFactory; import org.apache.samza.coordinator.communication.JobInfoServingContext; +import org.apache.samza.coordinator.lifecycle.JobRestartSignal; +import org.apache.samza.coordinator.lifecycle.JobRestartSignalFactory; +import org.apache.samza.coordinator.lifecycle.JobRestartSignalFactoryContext; import org.apache.samza.coordinator.metadatastore.NamespaceAwareCoordinatorStreamStore; import org.apache.samza.coordinator.stream.messages.SetChangelogMapping; import org.apache.samza.coordinator.stream.messages.SetContainerHostMapping; @@ -45,6 +51,7 @@ import org.apache.samza.storage.ChangelogStreamManager; import org.apache.samza.system.StreamMetadataCache; import org.apache.samza.system.SystemAdmins; +import org.apache.samza.util.ReflectionUtil; import org.apache.samza.util.SystemClock; @@ -63,14 +70,22 @@ public JobCoordinator getJobCoordinator(String processorId, Config config, Metri JobCoordinatorMetadataManager.ClusterType.NON_YARN, metricsRegistry); ChangelogStreamManager changelogStreamManager = new ChangelogStreamManager(new NamespaceAwareCoordinatorStreamStore(metadataStore, SetChangelogMapping.TYPE)); + JobRestartSignal jobRestartSignal = + ReflectionUtil.getObj(new JobCoordinatorConfig(config).getJobRestartSignalFactory(), + JobRestartSignalFactory.class).build(new JobRestartSignalFactoryContext(config)); StartpointManager startpointManager = jobConfig.getStartpointEnabled() ? new StartpointManager(metadataStore) : null; SystemAdmins systemAdmins = new SystemAdmins(config, StaticResourceJobCoordinator.class.getSimpleName()); StreamMetadataCache streamMetadataCache = new StreamMetadataCache(systemAdmins, 0, SystemClock.instance()); JobModelHelper jobModelHelper = buildJobModelHelper(metadataStore, streamMetadataCache); + StreamPartitionCountMonitorFactory streamPartitionCountMonitorFactory = + new StreamPartitionCountMonitorFactory(streamMetadataCache, metricsRegistry); + StreamRegexMonitorFactory streamRegexMonitorFactory = + new StreamRegexMonitorFactory(streamMetadataCache, metricsRegistry); return new StaticResourceJobCoordinator(processorId, jobModelHelper, jobModelServingContext, - coordinatorCommunication, jobCoordinatorMetadataManager, startpointManager, changelogStreamManager, - metricsRegistry, systemAdmins, config); + coordinatorCommunication, jobCoordinatorMetadataManager, streamPartitionCountMonitorFactory, + streamRegexMonitorFactory, startpointManager, changelogStreamManager, jobRestartSignal, metricsRegistry, + systemAdmins, config); } private static JobModelHelper buildJobModelHelper(MetadataStore metadataStore, diff --git a/samza-core/src/test/java/org/apache/samza/clustermanager/TestJobCoordinatorLaunchUtil.java b/samza-core/src/test/java/org/apache/samza/clustermanager/TestJobCoordinatorLaunchUtil.java index 9a95d33275..4c1ba7f04c 100644 --- a/samza-core/src/test/java/org/apache/samza/clustermanager/TestJobCoordinatorLaunchUtil.java +++ b/samza-core/src/test/java/org/apache/samza/clustermanager/TestJobCoordinatorLaunchUtil.java @@ -53,7 +53,6 @@ import static org.junit.Assert.assertTrue; import static org.mockito.Matchers.any; import static org.mockito.Matchers.eq; -import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -116,12 +115,6 @@ public void testRunJobCoordinator() throws Exception { CoordinatorStreamStore coordinatorStreamStore = mock(CoordinatorStreamStore.class); JobCoordinatorFactory jobCoordinatorFactory = mock(JobCoordinatorFactory.class); JobCoordinator jobCoordinator = mock(JobCoordinator.class); - // use a latch to keep track of when start has been called - CountDownLatch jobCoordinatorStartedLatch = new CountDownLatch(1); - doAnswer(invocation -> { - jobCoordinatorStartedLatch.countDown(); - return null; - }).when(jobCoordinator).start(); PowerMockito.mockStatic(CoordinatorStreamUtil.class); PowerMockito.doNothing().when(CoordinatorStreamUtil.class, "createCoordinatorStream", any()); @@ -136,8 +129,13 @@ public void testRunJobCoordinator() throws Exception { .when(ReflectionUtil.class, "getObj", jobCoordinatorFactoryClass, JobCoordinatorFactory.class); when(jobCoordinatorFactory.getJobCoordinator(eq("samza-job-coordinator"), eq(finalConfig), any(), eq(coordinatorStreamStore))).thenReturn(jobCoordinator); + // use a latch to keep track of when shutdown hook was added to know when we should start verifications + CountDownLatch addShutdownHookLatch = new CountDownLatch(1); PowerMockito.spy(JobCoordinatorLaunchUtil.class); - PowerMockito.doNothing().when(JobCoordinatorLaunchUtil.class, "addShutdownHook", any()); + PowerMockito.doAnswer(invocation -> { + addShutdownHookLatch.countDown(); + return null; + }).when(JobCoordinatorLaunchUtil.class, "addShutdownHook", any()); MetricsReporter metricsReporter = mock(MetricsReporter.class); Map metricsReporterMap = ImmutableMap.of("reporter", metricsReporter); PowerMockito.mockStatic(MetricsReporterLoader.class); @@ -148,8 +146,8 @@ public void testRunJobCoordinator() throws Exception { Thread runThread = new Thread(() -> JobCoordinatorLaunchUtil.run(new MockStreamApplication(), originalConfig)); runThread.start(); - // wait for job coordinator to be started before doing verifications - jobCoordinatorStartedLatch.await(); + // last thing before waiting for shutdown is to add shutdown hook, so do verifications once hook is added + addShutdownHookLatch.await(); verifyStatic(); CoordinatorStreamUtil.createCoordinatorStream(fullConfig); diff --git a/samza-core/src/test/java/org/apache/samza/config/TestJobCoordinatorConfig.java b/samza-core/src/test/java/org/apache/samza/config/TestJobCoordinatorConfig.java index ac7425fb51..ffb80f9342 100644 --- a/samza-core/src/test/java/org/apache/samza/config/TestJobCoordinatorConfig.java +++ b/samza-core/src/test/java/org/apache/samza/config/TestJobCoordinatorConfig.java @@ -19,6 +19,7 @@ package org.apache.samza.config; import com.google.common.collect.ImmutableMap; +import org.apache.samza.coordinator.lifecycle.NoOpJobRestartSignalFactory; import org.apache.samza.zk.ZkJobCoordinatorFactory; import org.junit.Test; @@ -50,4 +51,15 @@ public void getOptionalJobCoordinatorFactoryClassName() { ImmutableMap.of(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY, "org.custom.MyJobCoordinatorFactory"))); assertTrue(jobCoordinatorConfig.getOptionalJobCoordinatorFactoryClassName().isPresent()); } + + @Test + public void testGetJobRestartSignalFactory() { + assertEquals(NoOpJobRestartSignalFactory.class.getName(), + new JobCoordinatorConfig(new MapConfig()).getJobRestartSignalFactory()); + + JobCoordinatorConfig jobCoordinatorConfig = new JobCoordinatorConfig(new MapConfig( + ImmutableMap.of(JobCoordinatorConfig.JOB_RESTART_SIGNAL_FACTORY, + "org.apache.samza.MyJobRestartSignalFactory"))); + assertEquals("org.apache.samza.MyJobRestartSignalFactory", jobCoordinatorConfig.getJobRestartSignalFactory()); + } } \ No newline at end of file diff --git a/samza-core/src/test/java/org/apache/samza/coordinator/TestJobModelMonitors.java b/samza-core/src/test/java/org/apache/samza/coordinator/TestJobModelMonitors.java new file mode 100644 index 0000000000..8a09297e38 --- /dev/null +++ b/samza-core/src/test/java/org/apache/samza/coordinator/TestJobModelMonitors.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.coordinator; + +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; + +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyZeroInteractions; + + +public class TestJobModelMonitors { + @Mock + private StreamPartitionCountMonitor streamPartitionCountMonitor; + @Mock + private StreamRegexMonitor streamRegexMonitor; + + @Before + public void setup() { + MockitoAnnotations.initMocks(this); + } + + @Test + public void testMonitorsExist() { + JobModelMonitors jobModelMonitors = new JobModelMonitors(this.streamPartitionCountMonitor, this.streamRegexMonitor); + + jobModelMonitors.start(); + verify(this.streamPartitionCountMonitor).start(); + verify(this.streamRegexMonitor).start(); + + jobModelMonitors.stop(); + verify(this.streamPartitionCountMonitor).stop(); + verify(this.streamRegexMonitor).stop(); + } + + @Test + public void testMissingMonitors() { + JobModelMonitors jobModelMonitors = new JobModelMonitors(null, null); + // expect no failures + jobModelMonitors.start(); + jobModelMonitors.stop(); + } + + @Test + public void testStopBeforeStart() { + JobModelMonitors jobModelMonitors = new JobModelMonitors(this.streamPartitionCountMonitor, this.streamRegexMonitor); + jobModelMonitors.stop(); + verifyZeroInteractions(this.streamPartitionCountMonitor, this.streamRegexMonitor); + } +} \ No newline at end of file diff --git a/samza-core/src/test/java/org/apache/samza/coordinator/staticresource/TestStaticResourceJobCoordinator.java b/samza-core/src/test/java/org/apache/samza/coordinator/staticresource/TestStaticResourceJobCoordinator.java index 30517d28fa..03d57769fb 100644 --- a/samza-core/src/test/java/org/apache/samza/coordinator/staticresource/TestStaticResourceJobCoordinator.java +++ b/samza-core/src/test/java/org/apache/samza/coordinator/staticresource/TestStaticResourceJobCoordinator.java @@ -21,7 +21,9 @@ import java.io.IOException; import java.util.Arrays; import java.util.Map; +import java.util.Optional; import java.util.Set; +import java.util.regex.Pattern; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import org.apache.samza.Partition; @@ -30,8 +32,13 @@ import org.apache.samza.coordinator.JobCoordinatorListener; import org.apache.samza.coordinator.JobModelHelper; import org.apache.samza.coordinator.MetadataResourceUtil; +import org.apache.samza.coordinator.StreamPartitionCountMonitor; +import org.apache.samza.coordinator.StreamPartitionCountMonitorFactory; +import org.apache.samza.coordinator.StreamRegexMonitor; +import org.apache.samza.coordinator.StreamRegexMonitorFactory; import org.apache.samza.coordinator.communication.CoordinatorCommunication; import org.apache.samza.coordinator.communication.JobInfoServingContext; +import org.apache.samza.coordinator.lifecycle.JobRestartSignal; import org.apache.samza.job.JobCoordinatorMetadata; import org.apache.samza.job.JobMetadataChange; import org.apache.samza.job.metadata.JobCoordinatorMetadataManager; @@ -46,11 +53,13 @@ import org.apache.samza.system.SystemStreamPartition; import org.junit.Before; import org.junit.Test; +import org.mockito.ArgumentCaptor; import org.mockito.InOrder; import org.mockito.Mock; import org.mockito.MockitoAnnotations; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; import static org.mockito.Matchers.any; import static org.mockito.Matchers.eq; import static org.mockito.Matchers.isNull; @@ -61,6 +70,7 @@ import static org.mockito.Mockito.never; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyZeroInteractions; import static org.mockito.Mockito.when; @@ -88,10 +98,16 @@ public class TestStaticResourceJobCoordinator { @Mock private JobCoordinatorMetadataManager jobCoordinatorMetadataManager; @Mock + private StreamPartitionCountMonitorFactory streamPartitionCountMonitorFactory; + @Mock + private StreamRegexMonitorFactory streamRegexMonitorFactory; + @Mock private StartpointManager startpointManager; @Mock private ChangelogStreamManager changelogStreamManager; @Mock + private JobRestartSignal jobRestartSignal; + @Mock private Map changelogPartitionMapping; @Mock private MetricsRegistryMap metrics; @@ -110,8 +126,9 @@ public void setup() { when(this.changelogStreamManager.readPartitionMapping()).thenReturn(this.changelogPartitionMapping); this.staticResourceJobCoordinator = spy(new StaticResourceJobCoordinator(PROCESSOR_ID, this.jobModelHelper, this.jobModelServingContext, - this.coordinatorCommunication, this.jobCoordinatorMetadataManager, this.startpointManager, - this.changelogStreamManager, this.metrics, this.systemAdmins, this.config)); + this.coordinatorCommunication, this.jobCoordinatorMetadataManager, this.streamPartitionCountMonitorFactory, + this.streamRegexMonitorFactory, this.startpointManager, this.changelogStreamManager, this.jobRestartSignal, + this.metrics, this.systemAdmins, this.config)); this.staticResourceJobCoordinator.setListener(this.jobCoordinatorListener); doNothing().when(this.staticResourceJobCoordinator).doSetLoggingContextConfig(any()); } @@ -120,15 +137,17 @@ public void setup() { public void testNoExistingJobModel() throws IOException { Config jobModelConfig = mock(Config.class); JobModel jobModel = setupJobModel(jobModelConfig); + StreamPartitionCountMonitor streamPartitionCountMonitor = setupStreamPartitionCountMonitor(jobModelConfig); + StreamRegexMonitor streamRegexMonitor = setupStreamRegexMonitor(jobModel, jobModelConfig); JobCoordinatorMetadata newMetadata = setupJobCoordinatorMetadata(jobModel, jobModelConfig, ImmutableSet.copyOf(Arrays.asList(JobMetadataChange.values())), false); MetadataResourceUtil metadataResourceUtil = metadataResourceUtil(jobModel); - this.staticResourceJobCoordinator.start(); assertEquals(jobModel, this.staticResourceJobCoordinator.getJobModel()); verifyStartLifecycle(); verify(this.staticResourceJobCoordinator).doSetLoggingContextConfig(jobModelConfig); - verifyPrepareWorkerExecution(jobModel, metadataResourceUtil, newMetadata, SINGLE_SSP_FANOUT); + verifyPrepareWorkerExecutionAndMonitor(jobModel, metadataResourceUtil, streamPartitionCountMonitor, + streamRegexMonitor, newMetadata, SINGLE_SSP_FANOUT); verify(this.jobCoordinatorListener).onNewJobModel(PROCESSOR_ID, jobModel); } @@ -136,78 +155,204 @@ public void testNoExistingJobModel() throws IOException { public void testSameJobModelAsPrevious() throws IOException { Config jobModelConfig = mock(Config.class); JobModel jobModel = setupJobModel(jobModelConfig); + StreamPartitionCountMonitor streamPartitionCountMonitor = setupStreamPartitionCountMonitor(jobModelConfig); + StreamRegexMonitor streamRegexMonitor = setupStreamRegexMonitor(jobModel, jobModelConfig); setupJobCoordinatorMetadata(jobModel, jobModelConfig, ImmutableSet.of(), true); MetadataResourceUtil metadataResourceUtil = metadataResourceUtil(jobModel); - this.staticResourceJobCoordinator.start(); assertEquals(jobModel, this.staticResourceJobCoordinator.getJobModel()); verifyStartLifecycle(); verify(this.staticResourceJobCoordinator).doSetLoggingContextConfig(jobModelConfig); - verifyPrepareWorkerExecution(jobModel, metadataResourceUtil, null, null); + verifyPrepareWorkerExecutionAndMonitor(jobModel, metadataResourceUtil, streamPartitionCountMonitor, + streamRegexMonitor, null, null); verify(this.jobCoordinatorListener).onNewJobModel(PROCESSOR_ID, jobModel); } + @Test + public void testSameDeploymentWithNewJobModel() throws IOException { + Config jobModelConfig = mock(Config.class); + JobModel jobModel = setupJobModel(jobModelConfig); + StreamPartitionCountMonitor streamPartitionCountMonitor = setupStreamPartitionCountMonitor(jobModelConfig); + StreamRegexMonitor streamRegexMonitor = setupStreamRegexMonitor(jobModel, jobModelConfig); + setupJobCoordinatorMetadata(jobModel, jobModelConfig, ImmutableSet.of(JobMetadataChange.JOB_MODEL), true); + this.staticResourceJobCoordinator.start(); + verifyStartLifecycle(); + verify(this.jobRestartSignal).restartJob(); + assertNull(this.staticResourceJobCoordinator.getJobModel()); + verifyNoSideEffects(streamPartitionCountMonitor, streamRegexMonitor); + } + @Test public void testNewDeploymentNewJobModel() throws IOException { Config jobModelConfig = mock(Config.class); JobModel jobModel = setupJobModel(jobModelConfig); + StreamPartitionCountMonitor streamPartitionCountMonitor = setupStreamPartitionCountMonitor(jobModelConfig); + StreamRegexMonitor streamRegexMonitor = setupStreamRegexMonitor(jobModel, jobModelConfig); JobCoordinatorMetadata newMetadata = setupJobCoordinatorMetadata(jobModel, jobModelConfig, ImmutableSet.of(JobMetadataChange.NEW_DEPLOYMENT, JobMetadataChange.JOB_MODEL), true); MetadataResourceUtil metadataResourceUtil = metadataResourceUtil(jobModel); - this.staticResourceJobCoordinator.start(); assertEquals(jobModel, this.staticResourceJobCoordinator.getJobModel()); verifyStartLifecycle(); verify(this.staticResourceJobCoordinator).doSetLoggingContextConfig(jobModelConfig); - verifyPrepareWorkerExecution(jobModel, metadataResourceUtil, newMetadata, SINGLE_SSP_FANOUT); + verifyPrepareWorkerExecutionAndMonitor(jobModel, metadataResourceUtil, streamPartitionCountMonitor, + streamRegexMonitor, newMetadata, SINGLE_SSP_FANOUT); verify(this.jobCoordinatorListener).onNewJobModel(PROCESSOR_ID, jobModel); } - @Test - public void testStop() { - this.staticResourceJobCoordinator.stop(); - - verify(this.jobCoordinatorListener).onJobModelExpired(); - verify(this.coordinatorCommunication).stop(); - verify(this.startpointManager).stop(); - verify(this.systemAdmins).stop(); - verify(this.jobCoordinatorListener).onCoordinatorStop(); - } - /** - * Missing {@link StartpointManager} and {@link JobCoordinatorListener}. + * Missing {@link StartpointManager}, {@link JobCoordinatorListener}, {@link StreamRegexMonitor} */ @Test public void testStartMissingOptionalComponents() throws IOException { this.staticResourceJobCoordinator = spy(new StaticResourceJobCoordinator(PROCESSOR_ID, this.jobModelHelper, this.jobModelServingContext, - this.coordinatorCommunication, this.jobCoordinatorMetadataManager, null, this.changelogStreamManager, - this.metrics, this.systemAdmins, this.config)); - + this.coordinatorCommunication, this.jobCoordinatorMetadataManager, this.streamPartitionCountMonitorFactory, + this.streamRegexMonitorFactory, null, this.changelogStreamManager, this.jobRestartSignal, this.metrics, + this.systemAdmins, this.config)); Config jobModelConfig = mock(Config.class); JobModel jobModel = setupJobModel(jobModelConfig); + StreamPartitionCountMonitor streamPartitionCountMonitor = setupStreamPartitionCountMonitor(jobModelConfig); + when(this.streamRegexMonitorFactory.build(any(), any(), any())).thenReturn(Optional.empty()); JobCoordinatorMetadata newMetadata = setupJobCoordinatorMetadata(jobModel, jobModelConfig, ImmutableSet.copyOf(Arrays.asList(JobMetadataChange.values())), false); MetadataResourceUtil metadataResourceUtil = metadataResourceUtil(jobModel); - this.staticResourceJobCoordinator.start(); assertEquals(jobModel, this.staticResourceJobCoordinator.getJobModel()); verify(this.systemAdmins).start(); verify(this.staticResourceJobCoordinator).doSetLoggingContextConfig(jobModelConfig); - verifyPrepareWorkerExecution(jobModel, metadataResourceUtil, newMetadata, null); + verifyPrepareWorkerExecutionAndMonitor(jobModel, metadataResourceUtil, streamPartitionCountMonitor, null, + newMetadata, null); + verifyZeroInteractions(this.jobCoordinatorListener, this.startpointManager); + } + + @Test + public void testStopAfterStart() { + Config jobModelConfig = mock(Config.class); + JobModel jobModel = setupJobModel(jobModelConfig); + StreamPartitionCountMonitor streamPartitionCountMonitor = setupStreamPartitionCountMonitor(jobModelConfig); + StreamRegexMonitor streamRegexMonitor = setupStreamRegexMonitor(jobModel, jobModelConfig); + setupJobCoordinatorMetadata(jobModel, jobModelConfig, + ImmutableSet.copyOf(Arrays.asList(JobMetadataChange.values())), false); + metadataResourceUtil(jobModel); + // call start in order to set up monitors + this.staticResourceJobCoordinator.start(); + // call stop to check that the expected components get shut down + this.staticResourceJobCoordinator.stop(); + + verify(this.jobCoordinatorListener).onJobModelExpired(); + verify(streamPartitionCountMonitor).stop(); + verify(streamRegexMonitor).stop(); + verify(this.coordinatorCommunication).stop(); + verify(this.startpointManager).stop(); + verify(this.systemAdmins).stop(); + verify(this.jobCoordinatorListener).onCoordinatorStop(); } @Test public void testStopMissingOptionalComponents() { this.staticResourceJobCoordinator = spy(new StaticResourceJobCoordinator(PROCESSOR_ID, this.jobModelHelper, this.jobModelServingContext, - this.coordinatorCommunication, this.jobCoordinatorMetadataManager, null, this.changelogStreamManager, - this.metrics, this.systemAdmins, this.config)); + this.coordinatorCommunication, this.jobCoordinatorMetadataManager, this.streamPartitionCountMonitorFactory, + this.streamRegexMonitorFactory, null, this.changelogStreamManager, this.jobRestartSignal, this.metrics, + this.systemAdmins, this.config)); + Config jobModelConfig = mock(Config.class); + JobModel jobModel = setupJobModel(jobModelConfig); + StreamPartitionCountMonitor streamPartitionCountMonitor = setupStreamPartitionCountMonitor(jobModelConfig); + when(this.streamRegexMonitorFactory.build(any(), any(), any())).thenReturn(Optional.empty()); + setupJobCoordinatorMetadata(jobModel, jobModelConfig, + ImmutableSet.copyOf(Arrays.asList(JobMetadataChange.values())), false); + metadataResourceUtil(jobModel); + // call start in order to set up monitors + this.staticResourceJobCoordinator.start(); this.staticResourceJobCoordinator.stop(); + verify(streamPartitionCountMonitor).stop(); verify(this.coordinatorCommunication).stop(); verify(this.systemAdmins).stop(); + verifyZeroInteractions(this.jobCoordinatorListener); + } + + @Test + public void testStopWithoutStart() { + this.staticResourceJobCoordinator.stop(); + + verify(this.jobCoordinatorListener).onJobModelExpired(); + verify(this.startpointManager).stop(); + verify(this.systemAdmins).stop(); + verify(this.jobCoordinatorListener).onCoordinatorStop(); + verifyZeroInteractions(this.coordinatorCommunication, this.streamPartitionCountMonitorFactory, + this.streamRegexMonitorFactory); + } + + @Test + public void testPartitionCountChange() throws IOException { + Config jobModelConfig = mock(Config.class); + JobModel jobModel = setupJobModel(jobModelConfig); + StreamPartitionCountMonitor streamPartitionCountMonitor = mock(StreamPartitionCountMonitor.class); + ArgumentCaptor callbackArgumentCaptor = + ArgumentCaptor.forClass(StreamPartitionCountMonitor.Callback.class); + when( + this.streamPartitionCountMonitorFactory.build(eq(jobModelConfig), callbackArgumentCaptor.capture())).thenReturn( + streamPartitionCountMonitor); + StreamRegexMonitor streamRegexMonitor = setupStreamRegexMonitor(jobModel, jobModelConfig); + JobCoordinatorMetadata newMetadata = setupJobCoordinatorMetadata(jobModel, jobModelConfig, + ImmutableSet.of(JobMetadataChange.NEW_DEPLOYMENT, JobMetadataChange.JOB_MODEL), true); + MetadataResourceUtil metadataResourceUtil = metadataResourceUtil(jobModel); + this.staticResourceJobCoordinator.start(); + verifyStartLifecycle(); + verify(this.staticResourceJobCoordinator).doSetLoggingContextConfig(jobModelConfig); + verifyPrepareWorkerExecutionAndMonitor(jobModel, metadataResourceUtil, streamPartitionCountMonitor, + streamRegexMonitor, newMetadata, SINGLE_SSP_FANOUT); + // call the callback from the monitor + callbackArgumentCaptor.getValue().onSystemStreamPartitionChange(ImmutableSet.of(SYSTEM_STREAM)); + verify(this.jobRestartSignal).restartJob(); + } + + @Test + public void testStreamRegexChange() throws IOException { + Config jobModelConfig = mock(Config.class); + JobModel jobModel = setupJobModel(jobModelConfig); + StreamPartitionCountMonitor streamPartitionCountMonitor = setupStreamPartitionCountMonitor(jobModelConfig); + StreamRegexMonitor streamRegexMonitor = mock(StreamRegexMonitor.class); + ArgumentCaptor callbackArgumentCaptor = + ArgumentCaptor.forClass(StreamRegexMonitor.Callback.class); + when(this.streamRegexMonitorFactory.build(eq(jobModel), eq(jobModelConfig), + callbackArgumentCaptor.capture())).thenReturn(Optional.of(streamRegexMonitor)); + JobCoordinatorMetadata newMetadata = setupJobCoordinatorMetadata(jobModel, jobModelConfig, + ImmutableSet.of(JobMetadataChange.NEW_DEPLOYMENT, JobMetadataChange.JOB_MODEL), true); + MetadataResourceUtil metadataResourceUtil = metadataResourceUtil(jobModel); + this.staticResourceJobCoordinator.start(); + verifyStartLifecycle(); + verify(this.staticResourceJobCoordinator).doSetLoggingContextConfig(jobModelConfig); + verifyPrepareWorkerExecutionAndMonitor(jobModel, metadataResourceUtil, streamPartitionCountMonitor, + streamRegexMonitor, newMetadata, SINGLE_SSP_FANOUT); + // call the callback from the monitor + callbackArgumentCaptor.getValue() + .onInputStreamsChanged(ImmutableSet.of(SYSTEM_STREAM), + ImmutableSet.of(SYSTEM_STREAM, new SystemStream("system", "stream1")), + ImmutableMap.of("system", Pattern.compile("stream.*"))); + verify(this.jobRestartSignal).restartJob(); + } + + /** + * Set up {@link StreamPartitionCountMonitorFactory} to return a mock {@link StreamPartitionCountMonitor}. + */ + private StreamPartitionCountMonitor setupStreamPartitionCountMonitor(Config config) { + StreamPartitionCountMonitor streamPartitionCountMonitor = mock(StreamPartitionCountMonitor.class); + when(this.streamPartitionCountMonitorFactory.build(eq(config), any())).thenReturn(streamPartitionCountMonitor); + return streamPartitionCountMonitor; + } + + /** + * Set up {@link StreamRegexMonitorFactory} to return a mock {@link StreamRegexMonitor}. + */ + private StreamRegexMonitor setupStreamRegexMonitor(JobModel jobModel, Config jobModelConfig) { + StreamRegexMonitor streamRegexMonitor = mock(StreamRegexMonitor.class); + when(this.streamRegexMonitorFactory.build(eq(jobModel), eq(jobModelConfig), any())).thenReturn( + Optional.of(streamRegexMonitor)); + return streamRegexMonitor; } /** @@ -257,10 +402,13 @@ private void verifyStartLifecycle() { * Common steps to verify when preparing workers for processing. * @param jobModel job model to be served for workers * @param metadataResourceUtil expected to be used for creating resources + * @param streamPartitionCountMonitor expected to be started + * @param streamRegexMonitor if not null, expected to be started * @param newMetadata if not null, expected to be written to {@link JobCoordinatorMetadataManager} * @param expectedFanOut if not null, expected to be passed to {@link StartpointManager} for fan out */ - private void verifyPrepareWorkerExecution(JobModel jobModel, MetadataResourceUtil metadataResourceUtil, + private void verifyPrepareWorkerExecutionAndMonitor(JobModel jobModel, MetadataResourceUtil metadataResourceUtil, + StreamPartitionCountMonitor streamPartitionCountMonitor, StreamRegexMonitor streamRegexMonitor, JobCoordinatorMetadata newMetadata, Map> expectedFanOut) throws IOException { InOrder inOrder = inOrder(this.jobCoordinatorMetadataManager, this.jobModelServingContext, metadataResourceUtil, this.startpointManager, this.coordinatorCommunication); @@ -277,5 +425,18 @@ private void verifyPrepareWorkerExecution(JobModel jobModel, MetadataResourceUti verify(this.startpointManager, never()).fanOut(any()); } inOrder.verify(this.coordinatorCommunication).start(); + verify(streamPartitionCountMonitor).start(); + if (streamRegexMonitor != null) { + verify(streamRegexMonitor).start(); + } + } + + private void verifyNoSideEffects(StreamPartitionCountMonitor streamPartitionCountMonitor, + StreamRegexMonitor streamRegexMonitor) throws IOException { + verify(this.jobCoordinatorMetadataManager, never()).writeJobCoordinatorMetadata(any()); + verify(this.staticResourceJobCoordinator, never()).metadataResourceUtil(any()); + verify(this.startpointManager, never()).fanOut(any()); + verifyZeroInteractions(this.jobModelServingContext, this.coordinatorCommunication, streamPartitionCountMonitor, + streamRegexMonitor, this.jobCoordinatorListener); } } \ No newline at end of file