Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.samza.coordinator.lifecycle;

/**
* Interface for defining how to trigger a restart of a Samza job. This will be called when the Samza
* framework determines that a job restart is needed. For example, if the partition count of an input stream
* changes, then that means the job model needs to change, and restarting the job will update the job model.
*/
public interface JobRestartSignal {
/**
* Trigger a restart of the Samza job. This method should trigger the restart asynchronously, because the
* caller of this method is part of the Samza job which is going to be restarted. It is not necessary that
* the restart needs to actually happen immediately, as the job will continue to run until the restart
* actually happens.
*/
void restartJob();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.samza.coordinator.lifecycle;

/**
* See {@link JobRestartSignal}.
*/
public interface JobRestartSignalFactory {
JobRestartSignal build(JobRestartSignalFactoryContext context);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.samza.coordinator.lifecycle;

import org.apache.samza.config.Config;


/**
* Contains objects that are needed to build a {@link JobRestartSignal}.
* Having this class allows {@link JobRestartSignalFactory#build} to remain unchanged if additional components
* are needed in the future. Update this class if additional components are needed building {@link JobRestartSignal}.
*/
public class JobRestartSignalFactoryContext {
private final Config config;

public JobRestartSignalFactoryContext(Config config) {
this.config = config;
}

/**
* {@link Config} used to build a {@link JobRestartSignal}.
*/
public Config getConfig() {
return config;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,6 @@ private static void runJobCoordinator(String jobCoordinatorClassName, MetricsReg
JobCoordinator jobCoordinator =
jobCoordinatorFactory.getJobCoordinator(JOB_COORDINATOR_PROCESSOR_ID_PLACEHOLDER, finalConfig, metrics,
metadataStore);
addShutdownHook(jobCoordinator);
Map<String, MetricsReporter> metricsReporters =
MetricsReporterLoader.getMetricsReporters(new MetricsConfig(finalConfig), JOB_COORDINATOR_SOURCE_NAME);
metricsReporters.values()
Expand All @@ -123,6 +122,7 @@ private static void runJobCoordinator(String jobCoordinatorClassName, MetricsReg
CountDownLatch waitForShutdownLatch = new CountDownLatch(1);
jobCoordinator.setListener(new NoProcessorJobCoordinatorListener(waitForShutdownLatch));
jobCoordinator.start();
addShutdownHook(jobCoordinator);
try {
waitForShutdownLatch.await();
} catch (InterruptedException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,13 @@
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.samza.config;

import java.util.Optional;
import com.google.common.base.Strings;
import org.apache.samza.SamzaException;
import org.apache.samza.coordinator.CoordinationUtilsFactory;
import org.apache.samza.coordinator.lifecycle.NoOpJobRestartSignalFactory;
import org.apache.samza.standalone.PassthroughCoordinationUtilsFactory;
import org.apache.samza.standalone.PassthroughJobCoordinatorFactory;
import org.apache.samza.util.ReflectionUtil;
Expand All @@ -32,8 +32,11 @@
public class JobCoordinatorConfig extends MapConfig {
public static final String JOB_COORDINATOR_FACTORY = "job.coordinator.factory";
public final static String DEFAULT_COORDINATOR_FACTORY = ZkJobCoordinatorFactory.class.getName();
public static final String JOB_RESTART_SIGNAL_FACTORY = "job.coordinator.restart.signal.factory";

private static final String AZURE_COORDINATION_UTILS_FACTORY = "org.apache.samza.coordinator.AzureCoordinationUtilsFactory";
private static final String AZURE_COORDINATOR_FACTORY = "org.apache.samza.coordinator.AzureJobCoordinatorFactory";
private static final String DEFAULT_JOB_RESTART_SIGNAL_FACTORY = NoOpJobRestartSignalFactory.class.getName();

public JobCoordinatorConfig(Config config) {
super(config);
Expand Down Expand Up @@ -79,4 +82,8 @@ public String getJobCoordinatorFactoryClassName() {
public Optional<String> getOptionalJobCoordinatorFactoryClassName() {
return Optional.ofNullable(get(JOB_COORDINATOR_FACTORY));
}

public String getJobRestartSignalFactory() {
return get(JOB_RESTART_SIGNAL_FACTORY, DEFAULT_JOB_RESTART_SIGNAL_FACTORY);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.samza.coordinator;

import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


/**
* Utility class for managing multiple monitors.
*/
public class JobModelMonitors {
private static final Logger LOG = LoggerFactory.getLogger(JobModelMonitors.class);

private final Optional<StreamPartitionCountMonitor> streamPartitionCountMonitor;
private final Optional<StreamRegexMonitor> streamRegexMonitor;

private final AtomicBoolean started = new AtomicBoolean(false);

public JobModelMonitors(StreamPartitionCountMonitor streamPartitionCountMonitor,
StreamRegexMonitor streamRegexMonitor) {
this.streamPartitionCountMonitor = Optional.ofNullable(streamPartitionCountMonitor);
this.streamRegexMonitor = Optional.ofNullable(streamRegexMonitor);
}

public void start() {
if (this.started.compareAndSet(false, true)) {
this.streamPartitionCountMonitor.ifPresent(StreamPartitionCountMonitor::start);
this.streamRegexMonitor.ifPresent(StreamRegexMonitor::start);
} else {
LOG.warn("Monitors already started");
}
}

public void stop() {
if (this.started.compareAndSet(true, false)) {
this.streamPartitionCountMonitor.ifPresent(StreamPartitionCountMonitor::stop);
this.streamRegexMonitor.ifPresent(StreamRegexMonitor::stop);
} else {
LOG.warn("Monitors already stopped");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ public interface CoordinatorCommunication {
void start();

/**
* Stop the communication components. This may be called even if {@link #start()} has not yet been called.
* Stop the communication components.
*/
void stop();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.samza.coordinator.lifecycle;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


/**
* Placeholder implementation for {@link JobRestartSignal}.
* If a use case requires job restarts, then a real implementation should be used.
*/
public class NoOpJobRestartSignal implements JobRestartSignal {
private static final Logger LOG = LoggerFactory.getLogger(NoOpJobRestartSignal.class);

@Override
public void restartJob() {
LOG.info("Job restart signalled, but job restart is no-op for this class");
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.samza.coordinator.lifecycle;

public class NoOpJobRestartSignalFactory implements JobRestartSignalFactory {
@Override
public JobRestartSignal build(JobRestartSignalFactoryContext context) {
return new NoOpJobRestartSignal();
}
}
Loading