+ * In case of fail-back, any event that restarts this broker as a backup (eg an unavailable quorum service
+ * or some replication failure) will cause {@code wasLive} to be {@code false}, because the HA policy
+ * in place is no longer a primary one.
+ */
+ private boolean isFirstFailbackAttempt() {
+ return wasLive && policy.isTryFailback();
+ }
+
+ private DistributedLock tryAcquireLiveLock() throws InterruptedException {
+ // disable quorum service unavailability handling and just treat this imperatively
+ if (!stopping.compareAndSet(false, true)) {
+ // quorum service already unavailable: fail fast
+ return null;
+ }
+ distributedManager.removeUnavailableManagerListener(this);
+ assert activeMQServer.getNodeManager().getNodeId() != null;
+ final String liveID = activeMQServer.getNodeManager().getNodeId().toString();
+ final int voteRetries = policy.getVoteRetries();
+ final long maxAttempts = voteRetries >= 0 ? (voteRetries + 1) : -1;
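+ // a negative vote-retries means unlimited attempts: maxAttempts stays -1 and the loop below is unbounded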
+ if (maxAttempts == -1) {
+ LOGGER.error("It's not safe to retry an infinite amount of time to acquire a live lock: please consider setting a vote-retries value");
+ }
+ final long voteRetryWait = policy.getVoteRetryWait();
+ final DistributedLock liveLock = getLock(distributedManager, liveID);
+ if (liveLock == null) {
+ return null;
+ }
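+ // with unbounded attempts this loop exits only by acquiring the lock, by quorum unavailability or by interruption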
+ for (long attempt = 0; maxAttempts < 0 || attempt < maxAttempts; attempt++) {
+ try {
+ if (liveLock.tryLock(voteRetryWait, TimeUnit.MILLISECONDS)) {
+ LOGGER.debugf("%s live lock acquired after %d attempts.", liveID, (attempt + 1));
+ return liveLock;
+ }
+ } catch (UnavailableStateException e) {
+ LOGGER.warnf(e, "Failed to acquire live lock %s because of unavailable quorum service: stop trying", liveID);
+ distributedManager.stop();
+ return null;
+ }
+ }
+ LOGGER.warnf("Failed to acquire live lock %s after %d tries", liveID, maxAttempts);
+ distributedManager.stop();
+ return null;
+ }
+
+ private DistributedLock getLock(final DistributedPrimitiveManager manager,
+ final String lockId) throws InterruptedException {
+ if (!manager.isStarted()) {
+ return null;
+ }
+ try {
+ return manager.getDistributedLock(lockId);
+ } catch (ExecutionException e) {
+ LOGGER.warnf(e, "Errored while getting lock %s", lockId);
+ return null;
+ } catch (TimeoutException te) {
+ LOGGER.warnf(te, "Timeout while getting lock %s", lockId);
+ return null;
+ }
+ }
+
+ private ReplicationObserver replicationObserver() {
+ if (policy.isTryFailback()) {
+ return ReplicationObserver.failbackObserver(activeMQServer.getNodeManager(), activeMQServer.getBackupManager(), activeMQServer.getScheduledPool(), expectedNodeID);
+ }
+ return ReplicationObserver.failoverObserver(activeMQServer.getNodeManager(), activeMQServer.getBackupManager(), activeMQServer.getScheduledPool());
+ }
+
+ private ReplicationFailure replicateLive(final ClusterController clusterController,
+ final LiveNodeLocator liveLocator,
+ final RegistrationFailureForwarder registrationFailureForwarder) throws ActiveMQException {
+ try (ReplicationObserver replicationObserver = replicationObserver();
+ RegistrationFailureForwarder ignored = registrationFailureForwarder.to(replicationObserver)) {
+ this.replicationObserver = replicationObserver;
+ clusterController.addClusterTopologyListener(replicationObserver);
+ // ReplicationError notifies backup registration failures to live locator -> forwarder -> observer
+ final ReplicationError replicationError = new ReplicationError(liveLocator);
+ clusterController.addIncomingInterceptorForReplication(replicationError);
+ try {
+ final ClusterControl liveControl = tryLocateAndConnectToLive(liveLocator, clusterController);
+ if (liveControl == null) {
+ return null;
+ }
+ try {
+ final ReplicationEndpoint replicationEndpoint = tryAuthorizeAndAsyncRegisterAsBackupToLive(liveControl, replicationObserver);
+ if (replicationEndpoint == null) {
+ return ReplicationFailure.RegistrationError;
+ }
+ this.replicationEndpoint = replicationEndpoint;
+ assert replicationEndpoint != null;
+ try {
+ return replicationObserver.awaitReplicationFailure();
+ } finally {
+ this.replicationEndpoint = null;
+ ActiveMQServerImpl.stopComponent(replicationEndpoint);
+ closeChannelOf(replicationEndpoint);
+ }
+ } finally {
+ silentExecution("Errored on live control close", liveControl::close);
+ }
+ } finally {
+ silentExecution("Errored on cluster topology listener cleanup", () -> clusterController.removeClusterTopologyListener(replicationObserver));
+ silentExecution("Errored while removing incoming interceptor for replication", () -> clusterController.removeIncomingInterceptorForReplication(replicationError));
+ }
+ } finally {
+ this.replicationObserver = null;
+ }
+ }
+
+ private static void silentExecution(String debugErrorMessage, Runnable task) {
+ try {
+ task.run();
+ } catch (Throwable ignore) {
+ LOGGER.debug(debugErrorMessage, ignore);
+ }
+ }
+
+ private static void closeChannelOf(final ReplicationEndpoint replicationEndpoint) {
+ if (replicationEndpoint == null) {
+ return;
+ }
+ if (replicationEndpoint.getChannel() != null) {
+ silentExecution("Errored while closing replication endpoint channel", () -> replicationEndpoint.getChannel().close());
+ replicationEndpoint.setChannel(null);
+ }
+ }
+
+ private boolean asyncRestartServer(final ActiveMQServer server, boolean restart) {
+ return asyncRestartServer(server, restart, true);
+ }
+
+ private boolean asyncRestartServer(final ActiveMQServer server, boolean restart, boolean checkStopping) {
+ if (checkStopping) {
+ if (!stopping.compareAndSet(false, true)) {
+ return false;
+ }
+ }
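+ // stop (and optionally restart) the broker on a dedicated thread so the caller isn't blocked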
+ new Thread(() -> {
+ if (server.getState() != ActiveMQServer.SERVER_STATE.STOPPED && server.getState() != ActiveMQServer.SERVER_STATE.STOPPING) {
+ try {
+ server.stop(!restart);
+ if (restart) {
+ server.start();
+ }
+ } catch (Exception e) {
+ if (restart) {
+ ActiveMQServerLogger.LOGGER.errorRestartingBackupServer(e, server);
+ } else {
+ ActiveMQServerLogger.LOGGER.errorStoppingServer(e);
+ }
+ }
+ }
+ }).start();
+ return true;
+ }
+
+ private ClusterControl tryLocateAndConnectToLive(final LiveNodeLocator liveLocator,
+ final ClusterController clusterController) throws ActiveMQException {
+ liveLocator.locateNode();
+ final Pair<TransportConfiguration, TransportConfiguration> possibleLive = liveLocator.getLiveConfiguration();
+ * If the second message doesn't come within this deadline, we fail over anyway.
+ */
+ public static final int WAIT_TIME_AFTER_FIRST_LIVE_STOPPING_MSG = 60;
+
+ private ReplicationObserver(final NodeManager nodeManager,
+ final BackupManager backupManager,
+ final ScheduledExecutorService scheduledPool,
+ final boolean failback,
+ final String expectedNodeID) {
+ this.nodeManager = nodeManager;
+ this.backupManager = backupManager;
+ this.scheduledPool = scheduledPool;
+ this.failback = failback;
+ this.expectedNodeID = expectedNodeID;
+ this.replicationFailure = new CompletableFuture<>();
+
+ this.sessionFactory = null;
+ this.connection = null;
+ this.forcedFailover = null;
+
+ this.liveID = null;
+ this.backupUpToDate = false;
+ this.closed = false;
+ }
+
+ public static ReplicationObserver failbackObserver(final NodeManager nodeManager,
+ final BackupManager backupManager,
+ final ScheduledExecutorService scheduledPool,
+ final String expectedNodeID) {
+ Objects.requireNonNull(expectedNodeID);
+ return new ReplicationObserver(nodeManager, backupManager, scheduledPool, true, expectedNodeID);
+ }
+
+ public static ReplicationObserver failoverObserver(final NodeManager nodeManager,
+ final BackupManager backupManager,
+ final ScheduledExecutorService scheduledPool) {
+ return new ReplicationObserver(nodeManager, backupManager, scheduledPool, false, null);
+ }
+
+ private void onLiveDown(boolean voluntaryFailover) {
+ if (closed || replicationFailure.isDone()) {
+ return;
+ }
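+ // re-check under the monitor: a racing caller may have already completed replicationFailure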
+ synchronized (this) {
+ if (closed || replicationFailure.isDone()) {
+ return;
+ }
+ stopForcedFailoverAfterDelay();
+ unlistenConnectionFailures();
+ if (!isRemoteBackupUpToDate()) {
+ replicationFailure.complete(ReplicationFailure.BackupNotInSync);
+ } else if (voluntaryFailover) {
+ replicationFailure.complete(ReplicationFailure.VoluntaryFailOver);
+ } else {
+ replicationFailure.complete(ReplicationFailure.NonVoluntaryFailover);
+ }
+ }
+ }
+
+ @Override
+ public void nodeDown(long eventUID, String nodeID) {
+ // ignore it during a failback:
+ // a failing slave closes all connections but the one used for replication,
+ // triggering a nodeDown before the restarted master receives a STOP_CALLED from it.
+ // This can make the master fire a useless quorum vote during a normal failback.
+ if (failback) {
+ return;
+ }
+ if (nodeID.equals(liveID)) {
+ onLiveDown(false);
+ }
+ }
+
+ @Override
+ public void nodeUP(TopologyMember member, boolean last) {
+ }
+
+ /**
+ * If the connection to our replicated live goes down, decide on an action.
+ */
+ @Override
+ public void connectionFailed(ActiveMQException exception, boolean failedOver) {
+ onLiveDown(false);
+ }
+
+ @Override
+ public void connectionFailed(final ActiveMQException me, boolean failedOver, String scaleDownTargetNodeID) {
+ connectionFailed(me, failedOver);
+ }
+
+ @Override
+ public void beforeReconnect(ActiveMQException exception) {
+ //noop
+ }
+
+ @Override
+ public void close() {
+ if (closed) {
+ return;
+ }
+ synchronized (this) {
+ if (closed) {
+ return;
+ }
+ unlistenConnectionFailures();
+ closed = true;
+ replicationFailure.complete(ReplicationFailure.ClosedObserver);
+ }
+ }
+
+ /**
+ * @param liveSessionFactory the session factory used to connect to the live server
+ */
+ public synchronized void listenConnectionFailuresOf(final ClientSessionFactoryInternal liveSessionFactory) {
+ if (closed) {
+ throw new IllegalStateException("the observer is closed: cannot listen to any failures");
+ }
+ if (sessionFactory != null || connection != null) {
+ throw new IllegalStateException("this observer is already listening to other session factory failures");
+ }
+ this.sessionFactory = liveSessionFactory;
+ // belt and braces: there are circumstances where the connection listener doesn't get called but the session factory's does.
+ this.sessionFactory.addFailureListener(this);
+ connection = (CoreRemotingConnection) liveSessionFactory.getConnection();
+ connection.addFailureListener(this);
+ }
+
+ public synchronized void unlistenConnectionFailures() {
+ if (connection != null) {
+ connection.removeFailureListener(this);
+ connection = null;
+ }
+ if (sessionFactory != null) {
+ sessionFactory.removeFailureListener(this);
+ sessionFactory = null;
+ }
+ }
+
+ @Override
+ public void onBackupRegistrationFailed(boolean alreadyReplicating) {
+ if (closed || replicationFailure.isDone()) {
+ return;
+ }
+ synchronized (this) {
+ if (closed || replicationFailure.isDone()) {
+ return;
+ }
+ stopForcedFailoverAfterDelay();
+ unlistenConnectionFailures();
+ replicationFailure.complete(alreadyReplicating ? ReplicationFailure.AlreadyReplicating : ReplicationFailure.RegistrationError);
+ }
+ }
+
+ public ReplicationFailure awaitReplicationFailure() {
+ try {
+ return replicationFailure.get();
+ } catch (Throwable e) {
+ return ReplicationFailure.ClosedObserver;
+ }
+ }
+
+ private synchronized void scheduleForcedFailoverAfterDelay() {
+ if (forcedFailover != null) {
+ return;
+ }
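+ // safety net: force a non-voluntary failover if the live's FAIL_OVER message never arrives in time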
+ forcedFailover = scheduledPool.schedule(() -> onLiveDown(false), WAIT_TIME_AFTER_FIRST_LIVE_STOPPING_MSG, TimeUnit.SECONDS);
+ }
+
+ private synchronized void stopForcedFailoverAfterDelay() {
+ if (forcedFailover == null) {
+ return;
+ }
+ forcedFailover.cancel(false);
+ forcedFailover = null;
+ }
+
+ @Override
+ public void onRemoteBackupUpToDate() {
+ if (backupUpToDate || closed || replicationFailure.isDone()) {
+ return;
+ }
+ synchronized (this) {
+ if (backupUpToDate || closed || replicationFailure.isDone()) {
+ return;
+ }
+ assert liveID != null;
+ backupManager.announceBackup();
+ backupUpToDate = true;
+ }
+ }
+
+ public boolean isBackupUpToDate() {
+ return backupUpToDate;
+ }
+
+ public String getLiveID() {
+ return liveID;
+ }
+
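+ /**
+ * A failback accepts only the expected node ID, a first-time failover accepts any node ID,
+ * and once {@code liveID} is known only that same ID validates.
+ */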
+ private boolean validateNodeId(String nodeID) {
+ if (nodeID == null) {
+ return false;
+ }
+ final String existingNodeId = this.liveID;
+ if (existingNodeId == null) {
+ if (!failback) {
+ return true;
+ }
+ return nodeID.equals(expectedNodeID);
+ }
+ return existingNodeId.equals(nodeID);
+ }
+
+ @Override
+ public void onLiveNodeId(String nodeId) {
+ if (closed || replicationFailure.isDone()) {
+ return;
+ }
+ final String existingNodeId = this.liveID;
+ if (existingNodeId != null && existingNodeId.equals(nodeId)) {
+ return;
+ }
+ synchronized (this) {
+ if (closed || replicationFailure.isDone()) {
+ return;
+ }
+ if (!validateNodeId(nodeId)) {
+ stopForcedFailoverAfterDelay();
+ unlistenConnectionFailures();
+ replicationFailure.complete(ReplicationFailure.WrongNodeId);
+ } else if (liveID == null) {
+ liveID = nodeId;
+ nodeManager.setNodeID(nodeId);
+ }
+ }
+ }
+
+ public boolean isRemoteBackupUpToDate() {
+ return backupUpToDate;
+ }
+
+ @Override
+ public void onLiveStopping(ReplicationLiveIsStoppingMessage.LiveStopping finalMessage) {
+ if (closed || replicationFailure.isDone()) {
+ return;
+ }
+ synchronized (this) {
+ if (closed || replicationFailure.isDone()) {
+ return;
+ }
+ switch (finalMessage) {
+ case STOP_CALLED:
+ scheduleForcedFailoverAfterDelay();
+ break;
+ case FAIL_OVER:
+ onLiveDown(true);
+ break;
+ default:
+ LOGGER.errorf("unsupported LiveStopping type: %s", finalMessage);
+ }
+ }
+ }
+}
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ReplicationPrimaryActivation.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ReplicationPrimaryActivation.java
new file mode 100644
index 00000000000..aa4d0e7e5d2
--- /dev/null
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ReplicationPrimaryActivation.java
@@ -0,0 +1,434 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.core.server.impl;
+
+import javax.annotation.concurrent.GuardedBy;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.activemq.artemis.api.core.ActiveMQAlreadyReplicatingException;
+import org.apache.activemq.artemis.api.core.ActiveMQException;
+import org.apache.activemq.artemis.api.core.ActiveMQIllegalStateException;
+import org.apache.activemq.artemis.api.core.Pair;
+import org.apache.activemq.artemis.api.core.TransportConfiguration;
+import org.apache.activemq.artemis.core.protocol.core.Channel;
+import org.apache.activemq.artemis.core.protocol.core.ChannelHandler;
+import org.apache.activemq.artemis.core.protocol.core.CoreRemotingConnection;
+import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
+import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.BackupRegistrationMessage;
+import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.BackupReplicationStartFailedMessage;
+import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReplicationLiveIsStoppingMessage;
+import org.apache.activemq.artemis.core.remoting.CloseListener;
+import org.apache.activemq.artemis.core.remoting.FailureListener;
+import org.apache.activemq.artemis.core.remoting.server.RemotingService;
+import org.apache.activemq.artemis.core.replication.ReplicationManager;
+import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
+import org.apache.activemq.artemis.core.server.NodeManager;
+import org.apache.activemq.artemis.core.server.cluster.ClusterConnection;
+import org.apache.activemq.artemis.core.server.cluster.ha.ReplicationPrimaryPolicy;
+import org.apache.activemq.artemis.quorum.DistributedLock;
+import org.apache.activemq.artemis.quorum.DistributedPrimitiveManager;
+import org.apache.activemq.artemis.quorum.UnavailableStateException;
+import org.apache.activemq.artemis.spi.core.remoting.Acceptor;
+import org.jboss.logging.Logger;
+
+import static org.apache.activemq.artemis.core.server.impl.ClusterTopologySearch.searchActiveLiveNodeId;
+
+/**
+ * This activation is going to be {@link #run()} only by a natural-born primary, at its first start.
+ * During a failover or a failback {@link #run()} isn't used: only {@link #getActivationChannelHandler(Channel, Acceptor)} is.
+ */
+public class ReplicationPrimaryActivation extends LiveActivation implements DistributedLock.UnavailableLockListener {
+
+ private static final Logger LOGGER = Logger.getLogger(ReplicationPrimaryActivation.class);
+ private static final long DISTRIBUTED_MANAGER_START_TIMEOUT_MILLIS = 20_000;
+ private static final long BLOCKING_CALLS_TIMEOUT_MILLIS = 5_000;
+
+ private final ReplicationPrimaryPolicy policy;
+
+ private final ActiveMQServerImpl activeMQServer;
+
+ @GuardedBy("replicationLock")
+ private ReplicationManager replicationManager;
+
+ private final Object replicationLock;
+
+ private final DistributedPrimitiveManager distributedManager;
+
+ private volatile boolean stoppingServer;
+
+ public ReplicationPrimaryActivation(final ActiveMQServerImpl activeMQServer,
+ final DistributedPrimitiveManager distributedManager,
+ final ReplicationPrimaryPolicy policy) {
+ this.activeMQServer = activeMQServer;
+ this.policy = policy;
+ this.replicationLock = new Object();
+ this.distributedManager = distributedManager;
+ }
+
+ /**
+ * used for testing purposes.
+ */
+ public DistributedPrimitiveManager getDistributedManager() {
+ return distributedManager;
+ }
+
+ @Override
+ public void freezeConnections(RemotingService remotingService) {
+ final ReplicationManager replicationManager = getReplicationManager();
+
+ if (remotingService != null && replicationManager != null) {
+ remotingService.freeze(null, replicationManager.getBackupTransportConnection());
+ } else if (remotingService != null) {
+ remotingService.freeze(null, null);
+ }
+ }
+
+ @Override
+ public void run() {
+ try {
+ final String nodeId = activeMQServer.getNodeManager().readNodeId().toString();
+
+ final DistributedLock liveLock = searchLiveOrAcquireLiveLock(nodeId, BLOCKING_CALLS_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
+
+ if (liveLock == null) {
+ return;
+ }
+
+ activeMQServer.initialisePart1(false);
+
+ activeMQServer.initialisePart2(false);
+
+ // must be registered before checking whether the lock is still held by the caller
+ liveLock.addListener(this);
+
+ // This check is placed here because initialisePart2 loads the journal, which could pause
+ // the JVM long enough to lose lock ownership
+ if (!liveLock.isHeldByCaller()) {
+ throw new IllegalStateException("This broker isn't live anymore, probably due to application pauses eg GC, OS etc: failing now");
+ }
+
+ activeMQServer.completeActivation(true);
+
+ if (activeMQServer.getIdentity() != null) {
+ ActiveMQServerLogger.LOGGER.serverIsLive(activeMQServer.getIdentity());
+ } else {
+ ActiveMQServerLogger.LOGGER.serverIsLive();
+ }
+ } catch (Exception e) {
+ // async stop it, we don't need to await this to complete
+ distributedManager.stop();
+ ActiveMQServerLogger.LOGGER.initializationError(e);
+ activeMQServer.callActivationFailureListeners(e);
+ }
+ }
+
+ private DistributedLock searchLiveOrAcquireLiveLock(final String nodeId,
+ final long blockingCallTimeout,
+ final TimeUnit unit) throws ActiveMQException, InterruptedException {
+ if (policy.isCheckForLiveServer()) {
+ LOGGER.infof("Searching a live server with NodeID = %s", nodeId);
+ if (searchActiveLiveNodeId(policy.getClusterName(), nodeId, blockingCallTimeout, unit, activeMQServer.getConfiguration())) {
+ LOGGER.infof("Found a live server with NodeID = %s: restarting as backup", nodeId);
+ activeMQServer.setHAPolicy(policy.getBackupPolicy());
+ return null;
+ }
+ }
+ startDistributedPrimitiveManager();
+ return acquireDistributeLock(getDistributeLock(nodeId), blockingCallTimeout, unit);
+ }
+
+ private void startDistributedPrimitiveManager() throws InterruptedException, ActiveMQException {
+ LOGGER.infof("Trying to reach the majority of quorum nodes in %d ms.", DISTRIBUTED_MANAGER_START_TIMEOUT_MILLIS);
+ try {
+ if (distributedManager.start(DISTRIBUTED_MANAGER_START_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
+ return;
+ }
+ } catch (InterruptedException ie) {
+ throw ie;
+ } catch (Throwable t) {
+ LOGGER.debug(t);
+ }
+ assert !distributedManager.isStarted();
+ throw new ActiveMQException("Cannot reach the majority of quorum nodes");
+ }
+
+ private DistributedLock getDistributeLock(final String nodeId) throws InterruptedException, ActiveMQException {
+ try {
+ return distributedManager.getDistributedLock(nodeId);
+ } catch (Throwable t) {
+ try {
+ distributedManager.stop();
+ } catch (Throwable ignore) {
+ // don't care
+ }
+ if (t instanceof InterruptedException) {
+ throw (InterruptedException) t;
+ }
+ throw new ActiveMQException("Cannot obtain a live lock instance");
+ }
+ }
+
+ private DistributedLock acquireDistributeLock(final DistributedLock liveLock,
+ final long acquireLockTimeout,
+ final TimeUnit unit) throws InterruptedException, ActiveMQException {
+ try {
+ if (liveLock.tryLock(acquireLockTimeout, unit)) {
+ return liveLock;
+ }
+ } catch (UnavailableStateException e) {
+ LOGGER.debug(e);
+ }
+ try {
+ distributedManager.stop();
+ } catch (Throwable ignore) {
+ // don't care
+ }
+ throw new ActiveMQException("Failed to become live");
+ }
+
+ @Override
+ public ChannelHandler getActivationChannelHandler(final Channel channel, final Acceptor acceptorUsed) {
+ if (stoppingServer) {
+ return null;
+ }
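+ // handle only backup registrations; any other packet type is ignored by this activation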
+ return packet -> {
+ if (packet.getType() == PacketImpl.BACKUP_REGISTRATION) {
+ onBackupRegistration(channel, acceptorUsed, (BackupRegistrationMessage) packet);
+ }
+ };
+ }
+
+ private void onBackupRegistration(final Channel channel,
+ final Acceptor acceptorUsed,
+ final BackupRegistrationMessage msg) {
+ try {
+ startAsyncReplication(channel.getConnection(), acceptorUsed.getClusterConnection(), msg.getConnector(), msg.isFailBackRequest());
+ } catch (ActiveMQAlreadyReplicatingException are) {
+ channel.send(new BackupReplicationStartFailedMessage(BackupReplicationStartFailedMessage.BackupRegistrationProblem.ALREADY_REPLICATING));
+ } catch (ActiveMQException e) {
+ LOGGER.debug("Failed to process backup registration packet", e);
+ channel.send(new BackupReplicationStartFailedMessage(BackupReplicationStartFailedMessage.BackupRegistrationProblem.EXCEPTION));
+ }
+ }
+
+ private void startAsyncReplication(final CoreRemotingConnection remotingConnection,
+ final ClusterConnection clusterConnection,
+ final TransportConfiguration backupTransport,
+ final boolean isFailBackRequest) throws ActiveMQException {
+ synchronized (replicationLock) {
+ if (replicationManager != null) {
+ throw new ActiveMQAlreadyReplicatingException();
+ }
+ if (!activeMQServer.isStarted()) {
+ throw new ActiveMQIllegalStateException();
+ }
+ final ReplicationFailureListener listener = new ReplicationFailureListener();
+ remotingConnection.addCloseListener(listener);
+ remotingConnection.addFailureListener(listener);
+ final ReplicationManager replicationManager = new ReplicationManager(activeMQServer, remotingConnection, clusterConnection.getCallTimeout(), policy.getInitialReplicationSyncTimeout(), activeMQServer.getIOExecutorFactory());
+ this.replicationManager = replicationManager;
+ replicationManager.start();
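+ // the initial synchronization runs on a dedicated thread: it can take long and must not block the registration packet handler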
+ final Thread replicatingThread = new Thread(() -> replicate(replicationManager, clusterConnection, isFailBackRequest, backupTransport));
+ replicatingThread.setName("async-replication-thread");
+ replicatingThread.start();
+ }
+ }
+
+ private void replicate(final ReplicationManager replicationManager,
+ final ClusterConnection clusterConnection,
+ final boolean isFailBackRequest,
+ final TransportConfiguration backupTransport) {
+ try {
+ final String nodeID = activeMQServer.getNodeID().toString();
+ activeMQServer.getStorageManager().startReplication(replicationManager, activeMQServer.getPagingManager(), nodeID, isFailBackRequest && policy.isAllowAutoFailBack(), policy.getInitialReplicationSyncTimeout());
+
+ clusterConnection.nodeAnnounced(System.currentTimeMillis(), nodeID, policy.getGroupName(), policy.getScaleDownGroupName(), new Pair<>(null, backupTransport), true);
+
+ if (isFailBackRequest && policy.isAllowAutoFailBack()) {
+ awaitBackupAnnouncementOnFailbackRequest(clusterConnection);
+ }
+ } catch (Exception e) {
+ if (activeMQServer.getState() == ActiveMQServerImpl.SERVER_STATE.STARTED) {
+ /*
+ * The reasoning here is that the exception was either caused by (1) the
+ * (interaction with) the backup, or (2) by an IO Error at the storage. If (1), we
+ * can swallow the exception and ignore the replication request. If (2) the live
+ * will crash shortly.
+ */
+ ActiveMQServerLogger.LOGGER.errorStartingReplication(e);
+ }
+ try {
+ ActiveMQServerImpl.stopComponent(replicationManager);
+ } catch (Exception amqe) {
+ ActiveMQServerLogger.LOGGER.errorStoppingReplication(amqe);
+ } finally {
+ synchronized (replicationLock) {
+ this.replicationManager = null;
+ }
+ }
+ }
+ }
+
+ /**
+ * Awaits the backup announcement before trying to fail back.
+ * This broker is a backup broker, currently acting as live and ready to restart as a backup.
+ */
+ private void awaitBackupAnnouncementOnFailbackRequest(ClusterConnection clusterConnection) throws Exception {
+ final String nodeID = activeMQServer.getNodeID().toString();
+ final BackupTopologyListener topologyListener = new BackupTopologyListener(nodeID, clusterConnection.getConnector());
+ clusterConnection.addClusterTopologyListener(topologyListener);
+ try {
+ if (topologyListener.waitForBackup()) {
+ restartAsBackupAfterFailback();
+ } else {
+ ActiveMQServerLogger.LOGGER.failbackMissedBackupAnnouncement();
+ }
+ } finally {
+ clusterConnection.removeClusterTopologyListener(topologyListener);
+ }
+ }
+
+ /**
+ * If {@link #asyncStopServer()} happens before this call, the restart just won't happen.
+ * If {@link #asyncStopServer()} happens after this call, it will make the server stop right after being restarted.
+ */
+ private void restartAsBackupAfterFailback() throws Exception {
+ if (stoppingServer) {
+ return;
+ }
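+ // re-check under the monitor to serialize against asyncStopServer()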
+ synchronized (this) {
+ if (stoppingServer) {
+ return;
+ }
+ distributedManager.stop();
+ activeMQServer.fail(true);
+ ActiveMQServerLogger.LOGGER.restartingReplicatedBackupAfterFailback();
+ activeMQServer.setHAPolicy(policy.getBackupPolicy());
+ activeMQServer.start();
+ }
+ }
+
+ private void asyncStopServer() {
+ if (stoppingServer) {
+ return;
+ }
+ synchronized (this) {
+ if (stoppingServer) {
+ return;
+ }
+ stoppingServer = true;
+ new Thread(() -> {
+ try {
+ activeMQServer.stop();
+ } catch (Exception e) {
+ ActiveMQServerLogger.LOGGER.errorRestartingBackupServer(e, activeMQServer);
+ }
+ }).start();
+ }
+ }
+
+ @Override
+ public void onUnavailableLockEvent() {
+ LOGGER.error("Quorum UNAVAILABLE: async stopping broker.");
+ asyncStopServer();
+ }
+
+ private final class ReplicationFailureListener implements FailureListener, CloseListener {
+
+ @Override
+ public void connectionFailed(ActiveMQException exception, boolean failedOver) {
+ onReplicationConnectionClose();
+ }
+
+ @Override
+ public void connectionFailed(final ActiveMQException me, boolean failedOver, String scaleDownTargetNodeID) {
+ connectionFailed(me, failedOver);
+ }
+
+ @Override
+ public void connectionClosed() {
+ onReplicationConnectionClose();
+ }
+ }
+
+ private void onReplicationConnectionClose() {
+ ExecutorService executorService = activeMQServer.getThreadPool();
+ if (executorService != null) {
+ synchronized (replicationLock) {
+ if (replicationManager == null) {
+ return;
+ }
+ }
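+ // the null check is repeated inside the executor: a concurrent close() can clear replicationManager in between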
+ executorService.execute(() -> {
+ synchronized (replicationLock) {
+ if (replicationManager == null) {
+ return;
+ }
+ // this is going to stop the replication manager
+ activeMQServer.getStorageManager().stopReplication();
+ assert !replicationManager.isStarted();
+ replicationManager = null;
+ }
+ });
+ }
+ }
+
+ @Override
+ public void close(boolean permanently, boolean restarting) throws Exception {
+ synchronized (replicationLock) {
+ replicationManager = null;
+ }
+ distributedManager.stop();
+ // To avoid an NPE caused by the stop
+ final NodeManager nodeManager = activeMQServer.getNodeManager();
+ if (nodeManager != null) {
+ if (permanently) {
+ nodeManager.crashLiveServer();
+ } else {
+ nodeManager.pauseLiveServer();
+ }
+ }
+ }
+
+ @Override
+ public void sendLiveIsStopping() {
+ final ReplicationManager replicationManager = getReplicationManager();
+ if (replicationManager == null) {
+ return;
+ }
+ replicationManager.sendLiveIsStopping(ReplicationLiveIsStoppingMessage.LiveStopping.STOP_CALLED);
+ // this pool gets a 'hard' shutdown, no need to manage the Future of this Runnable.
+ activeMQServer.getScheduledPool().schedule(replicationManager::clearReplicationTokens, 30, TimeUnit.SECONDS);
+ }
+
+ @Override
+ public ReplicationManager getReplicationManager() {
+ synchronized (replicationLock) {
+ return replicationManager;
+ }
+ }
+
+ @Override
+ public boolean isReplicaSync() {
+ final ReplicationManager replicationManager = getReplicationManager();
+ if (replicationManager == null) {
+ return false;
+ }
+ return !replicationManager.isSynchronizing();
+ }
+}
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedNothingBackupActivation.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedNothingBackupActivation.java
index 0249cdfe53e..3876185803b 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedNothingBackupActivation.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedNothingBackupActivation.java
@@ -32,6 +32,7 @@
import org.apache.activemq.artemis.core.protocol.core.Channel;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReplicationLiveIsStoppingMessage;
import org.apache.activemq.artemis.core.replication.ReplicationEndpoint;
+import org.apache.activemq.artemis.core.replication.ReplicationEndpoint.ReplicationEndpointEventListener;
import org.apache.activemq.artemis.core.server.ActivationParams;
import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
@@ -54,7 +55,7 @@
import static org.apache.activemq.artemis.core.server.cluster.qourum.SharedNothingBackupQuorum.BACKUP_ACTIVATION.FAIL_OVER;
import static org.apache.activemq.artemis.core.server.cluster.qourum.SharedNothingBackupQuorum.BACKUP_ACTIVATION.STOP;
-public final class SharedNothingBackupActivation extends Activation {
+public final class SharedNothingBackupActivation extends Activation implements ReplicationEndpointEventListener {
private static final Logger logger = Logger.getLogger(SharedNothingBackupActivation.class);
@@ -96,7 +97,7 @@ public void init() throws Exception {
assert replicationEndpoint == null;
activeMQServer.resetNodeManager();
backupUpToDate = false;
- replicationEndpoint = new ReplicationEndpoint(activeMQServer, ioCriticalErrorListener, attemptFailBack, this);
+ replicationEndpoint = new ReplicationEndpoint(activeMQServer, attemptFailBack, this);
}
@Override
@@ -156,9 +157,6 @@ public void run() {
logger.debug("Starting backup manager");
activeMQServer.getBackupManager().start();
- logger.debug("Set backup Quorum");
- replicationEndpoint.setBackupQuorum(backupQuorum);
-
replicationEndpoint.setExecutor(activeMQServer.getExecutorFactory().getExecutor());
EndpointConnector endpointConnector = new EndpointConnector();
@@ -461,7 +459,13 @@ public boolean isRemoteBackupUpToDate() {
return backupUpToDate;
}
- public void setRemoteBackupUpToDate() {
+ @Override
+ public void onLiveNodeId(String nodeId) {
+ backupQuorum.liveIDSet(nodeId);
+ }
+
+ @Override
+ public void onRemoteBackupUpToDate() {
activeMQServer.getBackupManager().announceBackup();
backupUpToDate = true;
backupSyncLatch.countDown();
@@ -470,7 +474,8 @@ public void setRemoteBackupUpToDate() {
/**
* @throws ActiveMQException
*/
- public void remoteFailOver(ReplicationLiveIsStoppingMessage.LiveStopping finalMessage) throws ActiveMQException {
+ @Override
+ public void onLiveStopping(ReplicationLiveIsStoppingMessage.LiveStopping finalMessage) throws ActiveMQException {
if (logger.isTraceEnabled()) {
logger.trace("Remote fail-over, got message=" + finalMessage + ", backupUpToDate=" +
backupUpToDate);
@@ -526,4 +531,9 @@ private synchronized ReplicationEndpoint connectToReplicationEndpoint(final Clus
return replicationEndpoint;
}
}
+
+ @Override
+ public boolean isReplicaSync() {
+ return isRemoteBackupUpToDate();
+ }
}
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedNothingLiveActivation.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedNothingLiveActivation.java
index 9de4be057ba..f876a7604b8 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedNothingLiveActivation.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedNothingLiveActivation.java
@@ -462,4 +462,13 @@ public void nodeDown(long eventUID, String nodeID) {
private TransportConfiguration[] connectorNameListToArray(final List<String> connectorNames) {
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.tests.smoke.quorum;
+
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import org.apache.curator.test.InstanceSpec;
+import org.apache.curator.test.TestingCluster;
+import org.apache.curator.test.TestingZooKeeperServer;
+import org.jboss.logging.Logger;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.rules.TemporaryFolder;
+
+public class ZookeeperPluggableQuorumSinglePairTest extends PluggableQuorumSinglePairTest {
+
+ private static final Logger LOGGER = Logger.getLogger(ZookeeperPluggableQuorumSinglePairTest.class);
+ private static final int BASE_SERVER_PORT = 6666;
+ // Beware: the server tick must be small enough to let sessions expire correctly
+ private static final int SERVER_TICK_MS = 100;
+
+ @Rule
+ public TemporaryFolder tmpFolder = new TemporaryFolder();
+ private TestingCluster testingServer;
+ private InstanceSpec[] clusterSpecs;
+ private int nodes;
+
+ @Before
+ @Override
+ public void setup() throws Exception {
+ super.setup();
+ nodes = 3;
+ clusterSpecs = new InstanceSpec[nodes];
+ for (int i = 0; i < nodes; i++) {
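+ // Curator's InstanceSpec args: dataDir, port, electionPort, quorumPort, deleteDataDirectoryOnClose, serverId, tickTime, maxClientCnxns (-1 picks a default)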
+ clusterSpecs[i] = new InstanceSpec(tmpFolder.newFolder(), BASE_SERVER_PORT + i, -1, -1, true, -1, SERVER_TICK_MS, -1);
+ }
+ testingServer = new TestingCluster(clusterSpecs);
+ testingServer.start();
+ Assert.assertEquals("127.0.0.1:6666,127.0.0.1:6667,127.0.0.1:6668", testingServer.getConnectString());
+ LOGGER.infof("Cluster of %d nodes on: %s", 3, testingServer.getConnectString());
+ }
+
+ @Override
+ @After
+ public void after() throws Exception {
+ try {
+ super.after();
+ } finally {
+ testingServer.close();
+ }
+ }
+
+ public ZookeeperPluggableQuorumSinglePairTest() {
+ super("zk");
+ }
+
+ @Override
+ protected boolean awaitAsyncSetupCompleted(long timeout, TimeUnit unit) {
+ return true;
+ }
+
+ @Override
+ protected void stopMajority() throws Exception {
+ List<TestingZooKeeperServer> followers = testingServer.getServers();