From f744904fdb97a2f5380f37e9b8e904885cc95050 Mon Sep 17 00:00:00 2001 From: Clebert Suconic Date: Fri, 30 Jun 2017 20:19:43 -0400 Subject: [PATCH] ARTEMIS-1269 replication won't finish synchronization (cherry picked from commit 6b2798a0fe02bcb5ca2f13b1431fd8dfe327d20e) --- .../core/client/ActiveMQClientLogger.java | 2 +- .../core/protocol/core/impl/ChannelImpl.java | 10 +++ .../core/impl/RemotingConnectionImpl.java | 2 +- .../ReplicationResponseMessageV2.java | 3 +- .../core/replication/ReplicationEndpoint.java | 74 +++++++++++++++++-- .../core/replication/ReplicationManager.java | 7 +- .../core/server/impl/ReplicationError.java | 12 ++- .../impl/SharedNothingBackupActivation.java | 34 ++++++--- .../artemis/tests/util/ActiveMQTestBase.java | 6 +- .../failover/LargeMessageFailoverTest.java | 5 +- .../ReplicatedMultipleServerFailoverTest.java | 2 +- .../util/TransportConfigurationUtils.java | 4 +- 12 files changed, 127 insertions(+), 34 deletions(-) diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/ActiveMQClientLogger.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/ActiveMQClientLogger.java index 748e508bc26..88c817fe082 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/ActiveMQClientLogger.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/ActiveMQClientLogger.java @@ -369,7 +369,7 @@ public interface ActiveMQClientLogger extends BasicLogger { @LogMessage(level = Logger.Level.ERROR) @Message(id = 214013, value = "Failed to decode packet", format = Message.Format.MESSAGE_FORMAT) - void errorDecodingPacket(@Cause Exception e); + void errorDecodingPacket(@Cause Throwable e); @LogMessage(level = Logger.Level.ERROR) @Message(id = 214014, value = "Failed to execute failure listener", format = Message.Format.MESSAGE_FORMAT) diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java index 6e5f027fad8..e620f27d480 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java @@ -457,6 +457,10 @@ public void setCommandConfirmationHandler(final CommandConfirmationHandler handl @Override public void setHandler(final ChannelHandler handler) { + if (logger.isTraceEnabled()) { + logger.trace("Setting handler on " + this + " as " + handler); + } + this.handler = handler; } @@ -516,6 +520,9 @@ public void replayCommands(final int otherLastConfirmedCommandID) { @Override public void lock() { + if (logger.isTraceEnabled()) { + logger.trace("lock channel " + this); + } lock.lock(); reconnectID.incrementAndGet(); @@ -527,6 +534,9 @@ public void lock() { @Override public void unlock() { + if (logger.isTraceEnabled()) { + logger.trace("unlock channel " + this); + } lock.lock(); failingOver = false; diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/RemotingConnectionImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/RemotingConnectionImpl.java index 506c60231b0..3679576aab2 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/RemotingConnectionImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/RemotingConnectionImpl.java @@ -362,7 +362,7 @@ public void bufferReceived(final Object connectionID, final ActiveMQBuffer buffe doBufferReceived(packet); super.bufferReceived(connectionID, buffer); - } catch (Exception e) { + } catch (Throwable e) { ActiveMQClientLogger.LOGGER.errorDecodingPacket(e); } } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationResponseMessageV2.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationResponseMessageV2.java index b26084bb4c5..c01dd4fff77 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationResponseMessageV2.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationResponseMessageV2.java @@ -38,8 +38,9 @@ public boolean isSynchronizationIsFinishedAcknowledgement() { return synchronizationIsFinishedAcknowledgement; } - public void setSynchronizationIsFinishedAcknowledgement(boolean synchronizationIsFinishedAcknowledgement) { + public ReplicationResponseMessageV2 setSynchronizationIsFinishedAcknowledgement(boolean synchronizationIsFinishedAcknowledgement) { this.synchronizationIsFinishedAcknowledgement = synchronizationIsFinishedAcknowledgement; + return this; } @Override diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationEndpoint.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationEndpoint.java index e1879daaa37..04488cdecae 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationEndpoint.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationEndpoint.java @@ -209,7 +209,16 @@ public void handlePacket(final Packet packet) { ActiveMQServerLogger.LOGGER.errorHandlingReplicationPacket(e, packet); response = new ActiveMQExceptionMessage(ActiveMQMessageBundle.BUNDLE.replicationUnhandledError(e)); } - channel.send(response); + + if (response != null) { + if (logger.isTraceEnabled()) { + logger.trace("Returning " + response); + } + + channel.send(response); + } else { + logger.trace("Response is null, ignoring response"); + } } /** @@ -331,34 +340,68 @@ public void setChannel(final Channel channel) { private void finishSynchronization(String liveID) throws Exception { if (logger.isTraceEnabled()) { - logger.trace("finishSynchronization::" + liveID); + logger.trace("BACKUP-SYNC-START: finishSynchronization::" + liveID); } for (JournalContent jc : EnumSet.allOf(JournalContent.class)) { Journal journal = journalsHolder.remove(jc); + if (logger.isTraceEnabled()) { + logger.trace("getting lock on " + jc + ", journal = " + journal); + } + registerJournal(jc.typeByte, journal); journal.synchronizationLock(); try { + if (logger.isTraceEnabled()) { + logger.trace("lock acquired on " + jc); + } // files should be already in place. filesReservedForSync.remove(jc); - registerJournal(jc.typeByte, journal); + if (logger.isTraceEnabled()) { + logger.trace("stopping journal for " + jc); + } journal.stop(); + if (logger.isTraceEnabled()) { + logger.trace("starting journal for " + jc); + } journal.start(); + if (logger.isTraceEnabled()) { + logger.trace("loadAndSync " + jc); + } journal.loadSyncOnly(JournalState.SYNCING_UP_TO_DATE); } finally { + if (logger.isTraceEnabled()) { + logger.trace("unlocking " + jc); + } journal.synchronizationUnlock(); } } + + if (logger.isTraceEnabled()) { + logger.trace("Sync on large messages..."); + } ByteBuffer buffer = ByteBuffer.allocate(4 * 1024); for (Entry entry : largeMessages.entrySet()) { ReplicatedLargeMessage lm = entry.getValue(); if (lm instanceof LargeServerMessageInSync) { LargeServerMessageInSync lmSync = (LargeServerMessageInSync) lm; + if (logger.isTraceEnabled()) { + logger.trace("lmSync on " + lmSync.toString()); + } lmSync.joinSyncedData(buffer); } } + if (logger.isTraceEnabled()) { + logger.trace("setRemoteBackupUpToDate and liveIDSet for " + liveID); + } + journalsHolder = null; backupQuorum.liveIDSet(liveID); activation.setRemoteBackupUpToDate(); + + if (logger.isTraceEnabled()) { + logger.trace("Backup is synchronized / BACKUP-SYNC-DONE"); + } + ActiveMQServerLogger.LOGGER.backupServerSynched(server); return; } @@ -427,13 +470,28 @@ private ReplicationResponseMessageV2 handleStartReplicationSynchronization(final if (logger.isTraceEnabled()) { logger.trace("handleStartReplicationSynchronization:: nodeID = " + packet); } - ReplicationResponseMessageV2 replicationResponseMessage = new ReplicationResponseMessageV2(); - if (!started) - return replicationResponseMessage; if (packet.isSynchronizationFinished()) { - finishSynchronization(packet.getNodeID()); - replicationResponseMessage.setSynchronizationIsFinishedAcknowledgement(true); + executor.execute(() -> { + try { + // this is a long running process, we cannot block the reading thread from netty + finishSynchronization(packet.getNodeID()); + if (logger.isTraceEnabled()) { + logger.trace("returning completion on synchronization catchup"); + } + channel.send(new ReplicationResponseMessageV2().setSynchronizationIsFinishedAcknowledgement(true)); + } catch (Exception e) { + logger.warn(e.getMessage()); + channel.send(new ActiveMQExceptionMessage(ActiveMQMessageBundle.BUNDLE.replicationUnhandledError(e))); + } + + }); + // the write will happen through an executor + return null; + } + + ReplicationResponseMessageV2 replicationResponseMessage = new ReplicationResponseMessageV2(); + if (!started) { return replicationResponseMessage; } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java index 5a6c102f6c9..d8d70f0f0e4 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java @@ -350,15 +350,16 @@ private OperationContext sendReplicatePacket(final Packet packet, boolean lineUp } if (enabled) { - pendingTokens.add(repliToken); if (useExecutor) { replicationStream.execute(() -> { if (enabled) { + pendingTokens.add(repliToken); flowControl(packet.expectedEncodeSize()); replicatingChannel.send(packet); } }); } else { + pendingTokens.add(repliToken); flowControl(packet.expectedEncodeSize()); replicatingChannel.send(packet); } @@ -405,9 +406,9 @@ private void replicated() { OperationContext ctx = pendingTokens.poll(); if (ctx == null) { - throw new IllegalStateException("Missing replication token on the queue."); + logger.warn("Missing replication token on queue"); + return; } - ctx.replicationDone(); } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ReplicationError.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ReplicationError.java index 7c333a5a639..83b49c90ff1 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ReplicationError.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ReplicationError.java @@ -22,10 +22,10 @@ import org.apache.activemq.artemis.core.protocol.core.Packet; import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.BackupReplicationStartFailedMessage; -import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; import org.apache.activemq.artemis.core.server.LiveNodeLocator; import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; +import org.jboss.logging.Logger; /** * Stops the backup in case of an error at the start of Replication. @@ -36,11 +36,11 @@ */ final class ReplicationError implements Interceptor { - private final ActiveMQServer server; + private static final Logger logger = Logger.getLogger(ReplicationError.class); + private LiveNodeLocator nodeLocator; - ReplicationError(ActiveMQServer server, LiveNodeLocator nodeLocator) { - this.server = server; + ReplicationError(LiveNodeLocator nodeLocator) { this.nodeLocator = nodeLocator; } @@ -48,6 +48,10 @@ final class ReplicationError implements Interceptor { public boolean intercept(Packet packet, RemotingConnection connection) throws ActiveMQException { if (packet.getType() != PacketImpl.BACKUP_REGISTRATION_FAILED) return true; + + if (logger.isTraceEnabled()) { + logger.trace("Received ReplicationError::" + packet); + } BackupReplicationStartFailedMessage message = (BackupReplicationStartFailedMessage) packet; switch (message.getRegistrationProblem()) { case ALREADY_REPLICATING: diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedNothingBackupActivation.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedNothingBackupActivation.java index 112af0afee0..92be7f87c24 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedNothingBackupActivation.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedNothingBackupActivation.java @@ -101,6 +101,8 @@ public void init() throws Exception { @Override public void run() { try { + + logger.trace("SharedNothingBackupActivation..start"); synchronized (activeMQServer) { activeMQServer.setState(ActiveMQServerImpl.SERVER_STATE.STARTED); } @@ -109,16 +111,24 @@ public void run() { activeMQServer.moveServerData(replicaPolicy.getMaxSavedReplicatedJournalsSize()); activeMQServer.getNodeManager().start(); synchronized (this) { - if (closed) + if (closed) { + logger.trace("SharedNothingBackupActivation is closed, ignoring activation!"); return; + } } boolean scalingDown = replicaPolicy.getScaleDownPolicy() != null && replicaPolicy.getScaleDownPolicy().isEnabled(); - if (!activeMQServer.initialisePart1(scalingDown)) + if (!activeMQServer.initialisePart1(scalingDown)) { + if (logger.isTraceEnabled()) { + logger.trace("could not initialize part1 " + scalingDown); + } return; + } + logger.trace("Waiting for a synchronize now..."); synchronized (this) { + logger.trace("Entered a synchronized"); if (closed) return; backupQuorum = new SharedNothingBackupQuorum(activeMQServer.getStorageManager(), activeMQServer.getNodeManager(), activeMQServer.getScheduledPool(), networkHealthCheck); @@ -136,16 +146,12 @@ public void run() { ClusterController clusterController = activeMQServer.getClusterManager().getClusterController(); clusterController.addClusterTopologyListenerForReplication(nodeLocator); - if (logger.isTraceEnabled()) { - logger.trace("Waiting on cluster connection"); - } - //todo do we actually need to wait? + logger.trace("Waiting on cluster connection"); clusterController.awaitConnectionToReplicationCluster(); - if (logger.isTraceEnabled()) { - logger.trace("Cluster Connected"); - } - clusterController.addIncomingInterceptorForReplication(new ReplicationError(activeMQServer, nodeLocator)); + logger.trace("Cluster Connected"); + + clusterController.addIncomingInterceptorForReplication(new ReplicationError(nodeLocator)); // nodeManager.startBackup(); if (logger.isTraceEnabled()) { @@ -319,13 +325,19 @@ public void run() { return; } ActiveMQServerLogger.LOGGER.becomingLive(activeMQServer); + logger.trace("stop backup"); activeMQServer.getNodeManager().stopBackup(); + logger.trace("start store manager"); activeMQServer.getStorageManager().start(); + logger.trace("activated"); activeMQServer.getBackupManager().activated(); if (scalingDown) { + logger.trace("Scalling down..."); activeMQServer.initialisePart2(true); } else { + logger.trace("Setting up new activation"); activeMQServer.setActivation(new SharedNothingLiveActivation(activeMQServer, replicaPolicy.getReplicatedPolicy())); + logger.trace("initialize part 2"); activeMQServer.initialisePart2(false); if (activeMQServer.getIdentity() != null) { @@ -336,6 +348,8 @@ public void run() { } + logger.trace("completeActivation at the end"); + activeMQServer.completeActivation(); } } catch (Exception e) { diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java b/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java index 47c916aca21..b4ea62fc135 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java @@ -531,7 +531,11 @@ protected static final ClusterConnectionConfiguration basicClusterConnectionConf for (String c : connectors) { connectors0.add(c); } - ClusterConnectionConfiguration clusterConnectionConfiguration = new ClusterConnectionConfiguration().setName("cluster1").setAddress("jms").setConnectorName(connectorName).setRetryInterval(1000).setDuplicateDetection(false).setMaxHops(1).setConfirmationWindowSize(1).setMessageLoadBalancingType(MessageLoadBalancingType.STRICT).setStaticConnectors(connectors0); + ClusterConnectionConfiguration clusterConnectionConfiguration = new ClusterConnectionConfiguration(). + setName("cluster1").setAddress("jms").setConnectorName(connectorName). + setRetryInterval(1000).setDuplicateDetection(false).setMaxHops(1). + setConfirmationWindowSize(1).setMessageLoadBalancingType(MessageLoadBalancingType.STRICT). + setStaticConnectors(connectors0); return clusterConnectionConfiguration; } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/LargeMessageFailoverTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/LargeMessageFailoverTest.java index f192506463d..8889ec5f31e 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/LargeMessageFailoverTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/LargeMessageFailoverTest.java @@ -18,22 +18,23 @@ import org.apache.activemq.artemis.api.core.client.ClientMessage; import org.apache.activemq.artemis.core.client.impl.ServerLocatorInternal; +import org.junit.Ignore; import org.junit.Test; public class LargeMessageFailoverTest extends FailoverTest { @Override @Test + @Ignore public void testLiveAndBackupLiveComesBackNewFactory() throws Exception { // skip test because it triggers OutOfMemoryError. - Thread.sleep(1000); } @Override @Test + @Ignore public void testLiveAndBackupBackupComesBackNewFactory() throws Exception { // skip test because it triggers OutOfMemoryError. - Thread.sleep(1000); } /** diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/ReplicatedMultipleServerFailoverTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/ReplicatedMultipleServerFailoverTest.java index 392af317a2d..494f0986470 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/ReplicatedMultipleServerFailoverTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/ReplicatedMultipleServerFailoverTest.java @@ -135,7 +135,7 @@ public int getBackupServerCount() { @Override public boolean isNetty() { - return false; + return true; } @Override diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/util/TransportConfigurationUtils.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/util/TransportConfigurationUtils.java index 472d32795ec..abd08b82f99 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/util/TransportConfigurationUtils.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/util/TransportConfigurationUtils.java @@ -86,7 +86,7 @@ private static TransportConfiguration transportConfiguration(String classname, b private static TransportConfiguration transportConfiguration(String classname, boolean live, int server) { if (classname.contains("netty")) { Map serverParams = new HashMap<>(); - Integer port = live ? 61616 : 5545; + Integer port = live ? 61616 + server : 5545 + server; serverParams.put(org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants.PORT_PROP_NAME, port); return new TransportConfiguration(classname, serverParams); } @@ -102,7 +102,7 @@ private static TransportConfiguration transportConfiguration(String classname, String name) { if (classname.contains("netty")) { Map serverParams = new HashMap<>(); - Integer port = live ? 61616 : 5545; + Integer port = live ? 61616 + server : 5545 + server; serverParams.put(org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants.PORT_PROP_NAME, port); return new TransportConfiguration(classname, serverParams, name); }