diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/ActiveMQClientLogger.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/ActiveMQClientLogger.java index bdb4bd163125..405ed07a9344 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/ActiveMQClientLogger.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/ActiveMQClientLogger.java @@ -374,7 +374,7 @@ public interface ActiveMQClientLogger extends BasicLogger { @LogMessage(level = Logger.Level.ERROR) @Message(id = 214013, value = "Failed to decode packet", format = Message.Format.MESSAGE_FORMAT) - void errorDecodingPacket(@Cause Exception e); + void errorDecodingPacket(@Cause Throwable e); @LogMessage(level = Logger.Level.ERROR) @Message(id = 214014, value = "Failed to execute failure listener", format = Message.Format.MESSAGE_FORMAT) diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java index 75c23de8bd19..39bddf5d29c8 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java @@ -462,6 +462,10 @@ public void setCommandConfirmationHandler(final CommandConfirmationHandler handl @Override public void setHandler(final ChannelHandler handler) { + if (logger.isTraceEnabled()) { + logger.trace("Setting handler on " + this + " as " + handler); + } + this.handler = handler; } @@ -521,6 +525,9 @@ public void replayCommands(final int otherLastConfirmedCommandID) { @Override public void lock() { + if (logger.isTraceEnabled()) { + logger.trace("lock channel " + this); + } lock.lock(); reconnectID.incrementAndGet(); @@ -532,6 +539,9 @@ public void lock() { @Override public void unlock() { + if (logger.isTraceEnabled()) { + logger.trace("unlock channel " + this); + } lock.lock(); failingOver = false; diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/RemotingConnectionImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/RemotingConnectionImpl.java index cc1d6852b1f5..e0837e9ef8ed 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/RemotingConnectionImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/RemotingConnectionImpl.java @@ -363,7 +363,7 @@ public void bufferReceived(final Object connectionID, final ActiveMQBuffer buffe doBufferReceived(packet); super.bufferReceived(connectionID, buffer); - } catch (Exception e) { + } catch (Throwable e) { ActiveMQClientLogger.LOGGER.errorDecodingPacket(e); throw new IllegalStateException(e); } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationResponseMessageV2.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationResponseMessageV2.java index b26084bb4c58..c01dd4fff779 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationResponseMessageV2.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationResponseMessageV2.java @@ -38,8 +38,9 @@ public boolean isSynchronizationIsFinishedAcknowledgement() { return synchronizationIsFinishedAcknowledgement; } - public void setSynchronizationIsFinishedAcknowledgement(boolean synchronizationIsFinishedAcknowledgement) { + public ReplicationResponseMessageV2 setSynchronizationIsFinishedAcknowledgement(boolean synchronizationIsFinishedAcknowledgement) { this.synchronizationIsFinishedAcknowledgement = synchronizationIsFinishedAcknowledgement; + return this; } @Override diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationEndpoint.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationEndpoint.java index f879aeb4347a..bd140d2cbe3a 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationEndpoint.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationEndpoint.java @@ -210,7 +210,18 @@ public void handlePacket(final Packet packet) { ActiveMQServerLogger.LOGGER.errorHandlingReplicationPacket(e, packet); response = new ActiveMQExceptionMessage(ActiveMQMessageBundle.BUNDLE.replicationUnhandledError(e)); } - channel.send(response); + + if (response != null) { + if (logger.isTraceEnabled()) { + logger.trace("Returning " + response); + } + + if (response != null) { + channel.send(response); + } + } else { + logger.trace("Response is null, ignoring response"); + } } /** @@ -332,34 +343,68 @@ public void setChannel(final Channel channel) { private void finishSynchronization(String liveID) throws Exception { if (logger.isTraceEnabled()) { - logger.trace("finishSynchronization::" + liveID); + logger.trace("BACKUP-SYNC-START: finishSynchronization::" + liveID); } for (JournalContent jc : EnumSet.allOf(JournalContent.class)) { Journal journal = journalsHolder.remove(jc); + if (logger.isTraceEnabled()) { + logger.trace("getting lock on " + jc + ", journal = " + journal); + } journal.synchronizationLock(); try { + if (logger.isTraceEnabled()) { + logger.trace("lock acquired on " + jc); + } // files should be already in place. filesReservedForSync.remove(jc); registerJournal(jc.typeByte, journal); + if (logger.isTraceEnabled()) { + logger.trace("stopping journal for " + jc); + } journal.stop(); + if (logger.isTraceEnabled()) { + logger.trace("starting journal for " + jc); + } journal.start(); + if (logger.isTraceEnabled()) { + logger.trace("loadAndSync " + jc); + } journal.loadSyncOnly(JournalState.SYNCING_UP_TO_DATE); } finally { + if (logger.isTraceEnabled()) { + logger.trace("unlocking " + jc); + } journal.synchronizationUnlock(); } } + + if (logger.isTraceEnabled()) { + logger.trace("Sync on large messages..."); + } ByteBuffer buffer = ByteBuffer.allocate(4 * 1024); for (Entry entry : largeMessages.entrySet()) { ReplicatedLargeMessage lm = entry.getValue(); if (lm instanceof LargeServerMessageInSync) { LargeServerMessageInSync lmSync = (LargeServerMessageInSync) lm; + if (logger.isTraceEnabled()) { + logger.trace("lmSync on " + lmSync.toString()); + } lmSync.joinSyncedData(buffer); } } + if (logger.isTraceEnabled()) { + logger.trace("setRemoteBackupUpToDate and liveIDSet for " + liveID); + } + journalsHolder = null; backupQuorum.liveIDSet(liveID); activation.setRemoteBackupUpToDate(); + + if (logger.isTraceEnabled()) { + logger.trace("Backup is synchronized / BACKUP-SYNC-DONE"); + } + ActiveMQServerLogger.LOGGER.backupServerSynched(server); return; } @@ -428,13 +473,28 @@ private ReplicationResponseMessageV2 handleStartReplicationSynchronization(final if (logger.isTraceEnabled()) { logger.trace("handleStartReplicationSynchronization:: nodeID = " + packet); } - ReplicationResponseMessageV2 replicationResponseMessage = new ReplicationResponseMessageV2(); - if (!started) - return replicationResponseMessage; if (packet.isSynchronizationFinished()) { - finishSynchronization(packet.getNodeID()); - replicationResponseMessage.setSynchronizationIsFinishedAcknowledgement(true); + executor.execute(() -> { + try { + // this is a long running process, we cannot block the reading thread from netty + finishSynchronization(packet.getNodeID()); + if (logger.isTraceEnabled()) { + logger.trace("returning completion on synchronization catchup"); + } + channel.send(new ReplicationResponseMessageV2().setSynchronizationIsFinishedAcknowledgement(true)); + } catch (Exception e) { + logger.warn(e.getMessage()); + channel.send(new ActiveMQExceptionMessage(ActiveMQMessageBundle.BUNDLE.replicationUnhandledError(e))); + } + + }); + // the write will happen through an executor + return null; + } + + ReplicationResponseMessageV2 replicationResponseMessage = new ReplicationResponseMessageV2(); + if (!started) { return replicationResponseMessage; } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java index 398f4527f567..3b6f9d6253e9 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java @@ -356,15 +356,16 @@ private OperationContext sendReplicatePacket(final Packet packet, boolean lineUp } if (enabled) { - pendingTokens.add(repliToken); if (useExecutor) { replicationStream.execute(() -> { if (enabled) { + pendingTokens.add(repliToken); flowControl(packet.expectedEncodeSize()); replicatingChannel.send(packet); } }); } else { + pendingTokens.add(repliToken); flowControl(packet.expectedEncodeSize()); replicatingChannel.send(packet); } @@ -411,9 +412,9 @@ private void replicated() { OperationContext ctx = pendingTokens.poll(); if (ctx == null) { - throw new IllegalStateException("Missing replication token on the queue."); + logger.warn("Missing replication token on queue"); + return; } - ctx.replicationDone(); } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ReplicationError.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ReplicationError.java index 7c333a5a6390..83b49c90ff12 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ReplicationError.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ReplicationError.java @@ -22,10 +22,10 @@ import org.apache.activemq.artemis.core.protocol.core.Packet; import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.BackupReplicationStartFailedMessage; -import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; import org.apache.activemq.artemis.core.server.LiveNodeLocator; import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; +import org.jboss.logging.Logger; /** * Stops the backup in case of an error at the start of Replication. @@ -36,11 +36,11 @@ */ final class ReplicationError implements Interceptor { - private final ActiveMQServer server; + private static final Logger logger = Logger.getLogger(ReplicationError.class); + private LiveNodeLocator nodeLocator; - ReplicationError(ActiveMQServer server, LiveNodeLocator nodeLocator) { - this.server = server; + ReplicationError(LiveNodeLocator nodeLocator) { this.nodeLocator = nodeLocator; } @@ -48,6 +48,10 @@ final class ReplicationError implements Interceptor { public boolean intercept(Packet packet, RemotingConnection connection) throws ActiveMQException { if (packet.getType() != PacketImpl.BACKUP_REGISTRATION_FAILED) return true; + + if (logger.isTraceEnabled()) { + logger.trace("Received ReplicationError::" + packet); + } BackupReplicationStartFailedMessage message = (BackupReplicationStartFailedMessage) packet; switch (message.getRegistrationProblem()) { case ALREADY_REPLICATING: diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedNothingBackupActivation.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedNothingBackupActivation.java index d45abe3a8041..fcba00c8d964 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedNothingBackupActivation.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedNothingBackupActivation.java @@ -101,6 +101,8 @@ public void init() throws Exception { @Override public void run() { try { + + logger.trace("SharedNothingBackupActivation..start"); synchronized (activeMQServer) { activeMQServer.setState(ActiveMQServerImpl.SERVER_STATE.STARTED); } @@ -109,16 +111,24 @@ public void run() { activeMQServer.moveServerData(replicaPolicy.getMaxSavedReplicatedJournalsSize()); activeMQServer.getNodeManager().start(); synchronized (this) { - if (closed) + if (closed) { + logger.trace("SharedNothingBackupActivation is closed, ignoring activation!"); return; + } } boolean scalingDown = replicaPolicy.getScaleDownPolicy() != null && replicaPolicy.getScaleDownPolicy().isEnabled(); - if (!activeMQServer.initialisePart1(scalingDown)) + if (!activeMQServer.initialisePart1(scalingDown)) { + if (logger.isTraceEnabled()) { + logger.trace("could not initialize part1 " + scalingDown); + } return; + } + logger.trace("Waiting for a synchronize now..."); synchronized (this) { + logger.trace("Entered a synchronized"); if (closed) return; backupQuorum = new SharedNothingBackupQuorum(activeMQServer.getStorageManager(), activeMQServer.getNodeManager(), activeMQServer.getScheduledPool(), networkHealthCheck, replicaPolicy.getQuorumSize()); @@ -136,16 +146,12 @@ public void run() { ClusterController clusterController = activeMQServer.getClusterManager().getClusterController(); clusterController.addClusterTopologyListenerForReplication(nodeLocator); - if (logger.isTraceEnabled()) { - logger.trace("Waiting on cluster connection"); - } - //todo do we actually need to wait? + logger.trace("Waiting on cluster connection"); clusterController.awaitConnectionToReplicationCluster(); - if (logger.isTraceEnabled()) { - logger.trace("Cluster Connected"); - } - clusterController.addIncomingInterceptorForReplication(new ReplicationError(activeMQServer, nodeLocator)); + logger.trace("Cluster Connected"); + + clusterController.addIncomingInterceptorForReplication(new ReplicationError(nodeLocator)); // nodeManager.startBackup(); if (logger.isTraceEnabled()) { @@ -320,13 +326,19 @@ public void run() { return; } ActiveMQServerLogger.LOGGER.becomingLive(activeMQServer); + logger.trace("stop backup"); activeMQServer.getNodeManager().stopBackup(); + logger.trace("start store manager"); activeMQServer.getStorageManager().start(); + logger.trace("activated"); activeMQServer.getBackupManager().activated(); if (scalingDown) { + logger.trace("Scalling down..."); activeMQServer.initialisePart2(true); } else { + logger.trace("Setting up new activation"); activeMQServer.setActivation(new SharedNothingLiveActivation(activeMQServer, replicaPolicy.getReplicatedPolicy())); + logger.trace("initialize part 2"); activeMQServer.initialisePart2(false); if (activeMQServer.getIdentity() != null) { @@ -337,6 +349,8 @@ public void run() { } + logger.trace("completeActivation at the end"); + activeMQServer.completeActivation(); } } catch (Exception e) { diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java b/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java index 1b35393eabcb..a95f77a25627 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java @@ -534,7 +534,11 @@ protected static final ClusterConnectionConfiguration basicClusterConnectionConf for (String c : connectors) { connectors0.add(c); } - ClusterConnectionConfiguration clusterConnectionConfiguration = new ClusterConnectionConfiguration().setName("cluster1").setAddress("jms").setConnectorName(connectorName).setRetryInterval(1000).setDuplicateDetection(false).setMaxHops(1).setConfirmationWindowSize(1).setMessageLoadBalancingType(MessageLoadBalancingType.STRICT).setStaticConnectors(connectors0); + ClusterConnectionConfiguration clusterConnectionConfiguration = new ClusterConnectionConfiguration(). + setName("cluster1").setAddress("jms").setConnectorName(connectorName). + setRetryInterval(1000).setDuplicateDetection(false).setMaxHops(1). + setConfirmationWindowSize(1).setMessageLoadBalancingType(MessageLoadBalancingType.STRICT). + setStaticConnectors(connectors0); return clusterConnectionConfiguration; } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/ReplicatedMultipleServerFailoverTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/ReplicatedMultipleServerFailoverTest.java index 38bf424358a7..383b371ca210 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/ReplicatedMultipleServerFailoverTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/ReplicatedMultipleServerFailoverTest.java @@ -136,7 +136,7 @@ public int getBackupServerCount() { @Override public boolean isNetty() { - return false; + return true; } @Override diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/util/TransportConfigurationUtils.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/util/TransportConfigurationUtils.java index 472d32795ec2..7ae1b90a94ca 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/util/TransportConfigurationUtils.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/util/TransportConfigurationUtils.java @@ -102,7 +102,7 @@ private static TransportConfiguration transportConfiguration(String classname, String name) { if (classname.contains("netty")) { Map serverParams = new HashMap<>(); - Integer port = live ? 61616 : 5545; + Integer port = live ? 61616 + server : 5545 + server; serverParams.put(org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants.PORT_PROP_NAME, port); return new TransportConfiguration(classname, serverParams, name); }