From a9f3dcfb10314763d5feeff6d56de9694cc0ede1 Mon Sep 17 00:00:00 2001 From: Justin Bertram Date: Wed, 16 Aug 2017 12:08:31 -0500 Subject: [PATCH] ARTEMIS-1353 ensure replication packet order Incorrect ordering of replication packets may happen because of useExecutor parameter in the sendReplicatePacket method. ReplicationStartSyncMessage packets are sent as first, but they are sent with useExecutor=true. Although ReplicationSyncFileMessage packets are sent after ReplicationStartSyncMessage packets, they are sent with useExecutor=false. So sending of ReplicationStartSyncMessage packets is scheduled to executor and there is no guarantee when the task will be executed, whereas ReplicationStartSyncMessage packets are sent immediately. The solution is to wait for an ack for ReplicationStartSyncMessages. --- .../impl/journal/JournalStorageManager.java | 33 ++++++++++-- .../ReplicationResponseMessageV2.java | 22 +++++--- .../ReplicationStartSyncMessage.java | 34 ++---------- .../core/replication/ReplicationEndpoint.java | 5 +- .../core/replication/ReplicationManager.java | 53 ++++++++++--------- .../cluster/util/BackupSyncDelay.java | 2 +- 6 files changed, 80 insertions(+), 69 deletions(-) diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java index 2341a66223c..3c906121f46 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java @@ -56,6 +56,7 @@ import org.apache.activemq.artemis.core.persistence.impl.journal.codec.LargeMessagePersister; import org.apache.activemq.artemis.core.persistence.impl.journal.codec.PendingLargeMessageEncoding; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReplicationLiveIsStoppingMessage; +import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReplicationStartSyncMessage; import org.apache.activemq.artemis.core.replication.ReplicatedJournal; import org.apache.activemq.artemis.core.replication.ReplicationManager; import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle; @@ -515,10 +516,27 @@ private void sendJournalFile(JournalFile[] journalFiles, JournalContent type) th private JournalFile[] prepareJournalForCopy(Journal journal, JournalContent contentType, String nodeID, - boolean autoFailBack) throws Exception { + boolean autoFailBack, + long initialReplicationSyncTimeout) throws Exception { journal.forceMoveNextFile(); JournalFile[] datafiles = journal.getDataFiles(); - replicator.sendStartSyncMessage(datafiles, contentType, nodeID, autoFailBack); + long[] ids = new long[datafiles.length]; + for (int i = 0; i < datafiles.length; i++) { + ids[i] = datafiles[i].getFileID(); + } + ReplicationStartSyncMessage.SyncDataType dataType; + switch (contentType) { + case MESSAGES: + dataType = ReplicationStartSyncMessage.SyncDataType.JournalMessages; + break; + case BINDINGS: + dataType = ReplicationStartSyncMessage.SyncDataType.JournalBindings; + break; + default: + throw new IllegalArgumentException(); + } + + replicator.sendStartSyncMessage(ids, dataType, nodeID, autoFailBack, initialReplicationSyncTimeout); return datafiles; } @@ -568,8 +586,8 @@ public void startReplication(ReplicationManager replicationManager, pagingManager.lock(); try { pagingManager.disableCleanup(); - messageFiles = prepareJournalForCopy(originalMessageJournal, JournalContent.MESSAGES, nodeID, autoFailBack); - bindingsFiles = prepareJournalForCopy(originalBindingsJournal, JournalContent.BINDINGS, nodeID, autoFailBack); + messageFiles = prepareJournalForCopy(originalMessageJournal, JournalContent.MESSAGES, nodeID, autoFailBack, initialReplicationSyncTimeout); + bindingsFiles = prepareJournalForCopy(originalBindingsJournal, JournalContent.BINDINGS, nodeID, autoFailBack, initialReplicationSyncTimeout); pageFilesToSync = getPageInformationForSync(pagingManager); pendingLargeMessages = recoverPendingLargeMessages(); } finally { @@ -584,7 +602,12 @@ public void startReplication(ReplicationManager replicationManager, // We need to send the list while locking otherwise part of the body might get sent too soon // it will send a list of IDs that we are allocating - replicator.sendLargeMessageIdListMessage(pendingLargeMessages); + ArrayList idsToSend = new ArrayList<>(pendingLargeMessages.keySet()); + long[] ids = new long[idsToSend.size()]; + for (int i = 0; i < idsToSend.size(); i++) { + ids[i] = idsToSend.get(i); + } + replicator.sendStartSyncMessage(ids, ReplicationStartSyncMessage.SyncDataType.LargeMessages, "", false, initialReplicationSyncTimeout); } finally { storageManagerLock.writeLock().unlock(); } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationResponseMessageV2.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationResponseMessageV2.java index c01dd4fff77..35c3f660b71 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationResponseMessageV2.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationResponseMessageV2.java @@ -23,12 +23,7 @@ public final class ReplicationResponseMessageV2 extends ReplicationResponseMessage { boolean synchronizationIsFinishedAcknowledgement = false; - - public ReplicationResponseMessageV2(final boolean synchronizationIsFinishedAcknowledgement) { - super(REPLICATION_RESPONSE_V2); - - this.synchronizationIsFinishedAcknowledgement = synchronizationIsFinishedAcknowledgement; - } + boolean synchronizationIsStartedAcknowledgement = false; public ReplicationResponseMessageV2() { super(PacketImpl.REPLICATION_RESPONSE_V2); @@ -43,28 +38,41 @@ public ReplicationResponseMessageV2 setSynchronizationIsFinishedAcknowledgement( return this; } + public boolean isSynchronizationIsStartedAcknowledgement() { + return synchronizationIsStartedAcknowledgement; + } + + public ReplicationResponseMessageV2 setSynchronizationIsStartedAcknowledgement(boolean synchronizationIsStartedAcknowledgement) { + this.synchronizationIsStartedAcknowledgement = synchronizationIsStartedAcknowledgement; + return this; + } + @Override public int expectedEncodeSize() { return PACKET_HEADERS_SIZE + - DataConstants.SIZE_BOOLEAN; // buffer.writeBoolean(synchronizationIsFinishedAcknowledgement); + DataConstants.SIZE_BOOLEAN + // buffer.writeBoolean(synchronizationIsFinishedAcknowledgement); + DataConstants.SIZE_BOOLEAN; // buffer.writeBoolean(synchronizationIsStartedAcknowledgement); } @Override public void encodeRest(final ActiveMQBuffer buffer) { super.encodeRest(buffer); buffer.writeBoolean(synchronizationIsFinishedAcknowledgement); + buffer.writeBoolean(synchronizationIsStartedAcknowledgement); } @Override public void decodeRest(final ActiveMQBuffer buffer) { super.decodeRest(buffer); synchronizationIsFinishedAcknowledgement = buffer.readBoolean(); + synchronizationIsStartedAcknowledgement = buffer.readBoolean(); } @Override public String toString() { StringBuffer buf = new StringBuffer(getParentString()); buf.append(", synchronizationIsFinishedAcknowledgement=" + synchronizationIsFinishedAcknowledgement); + buf.append(", synchronizationIsStartedAcknowledgement=" + synchronizationIsStartedAcknowledgement); buf.append("]"); return buf.toString(); } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationStartSyncMessage.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationStartSyncMessage.java index 018535f426b..864db5ec5b5 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationStartSyncMessage.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationStartSyncMessage.java @@ -18,10 +18,8 @@ import java.security.InvalidParameterException; import java.util.Arrays; -import java.util.List; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; -import org.apache.activemq.artemis.core.journal.impl.JournalFile; import org.apache.activemq.artemis.core.persistence.impl.journal.AbstractJournalStorageManager; import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl; import org.apache.activemq.artemis.utils.DataConstants; @@ -70,44 +68,22 @@ public ReplicationStartSyncMessage() { super(REPLICATION_START_FINISH_SYNC); } - public ReplicationStartSyncMessage(List filenames) { - this(); - ids = new long[filenames.size()]; - for (int i = 0; i < filenames.size(); i++) { - ids[i] = filenames.get(i); - } - dataType = SyncDataType.LargeMessages; - nodeID = ""; // this value will be ignored - } - public ReplicationStartSyncMessage(String nodeID) { this(); synchronizationIsFinished = true; this.nodeID = nodeID; } - public ReplicationStartSyncMessage(JournalFile[] datafiles, - AbstractJournalStorageManager.JournalContent contentType, + public ReplicationStartSyncMessage(long[] ids, + SyncDataType dataType, String nodeID, boolean allowsAutoFailBack) { this(); this.nodeID = nodeID; this.allowsAutoFailBack = allowsAutoFailBack; - synchronizationIsFinished = false; - ids = new long[datafiles.length]; - for (int i = 0; i < datafiles.length; i++) { - ids[i] = datafiles[i].getFileID(); - } - switch (contentType) { - case MESSAGES: - dataType = SyncDataType.JournalMessages; - break; - case BINDINGS: - dataType = SyncDataType.JournalBindings; - break; - default: - throw new IllegalArgumentException(); - } + this.synchronizationIsFinished = false; + this.ids = ids; + this.dataType = dataType; } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationEndpoint.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationEndpoint.java index 4d8161316f4..5725239cf7a 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationEndpoint.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationEndpoint.java @@ -475,8 +475,9 @@ private ReplicationResponseMessageV2 handleStartReplicationSynchronization(final if (packet.isSynchronizationFinished()) { finishSynchronization(packet.getNodeID()); - replicationResponseMessage.setSynchronizationIsFinishedAcknowledgement(true); - return replicationResponseMessage; + return replicationResponseMessage.setSynchronizationIsFinishedAcknowledgement(true); + } else { + replicationResponseMessage.setSynchronizationIsStartedAcknowledgement(true); } switch (packet.getDataType()) { diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java index 3b6f9d6253e..cde15ed6155 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java @@ -19,9 +19,8 @@ import java.io.FileInputStream; import java.nio.ByteBuffer; import java.nio.channels.FileChannel; -import java.util.ArrayList; +import java.util.Arrays; import java.util.LinkedHashSet; -import java.util.Map; import java.util.Queue; import java.util.Set; import java.util.concurrent.ConcurrentLinkedQueue; @@ -33,7 +32,6 @@ import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.ActiveMQExceptionType; -import org.apache.activemq.artemis.api.core.Pair; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.client.SessionFailureListener; import org.apache.activemq.artemis.core.io.SequentialFile; @@ -134,6 +132,8 @@ public static ADD_OPERATION_TYPE toOperation(boolean isUpdate) { private final ReusableLatch synchronizationIsFinishedAcknowledgement = new ReusableLatch(0); + private final ReusableLatch synchronizationIsStartedAcknowledgement = new ReusableLatch(0); + /** * @param remotingConnection */ @@ -459,6 +459,9 @@ public void handlePacket(final Packet packet) { if (replicationResponseMessage.isSynchronizationIsFinishedAcknowledgement()) { synchronizationIsFinishedAcknowledgement.countDown(); } + if (replicationResponseMessage.isSynchronizationIsStartedAcknowledgement()) { + synchronizationIsStartedAcknowledgement.countDown(); + } } } } @@ -576,16 +579,32 @@ private void sendLargeFile(AbstractJournalStorageManager.JournalContent content, /** * Reserve the following fileIDs in the backup server. * - * @param datafiles + * @param ids * @param contentType * @throws ActiveMQException */ - public void sendStartSyncMessage(JournalFile[] datafiles, - AbstractJournalStorageManager.JournalContent contentType, + public void sendStartSyncMessage(long[] ids, + ReplicationStartSyncMessage.SyncDataType contentType, String nodeID, - boolean allowsAutoFailBack) throws ActiveMQException { - if (enabled) - sendReplicatePacket(new ReplicationStartSyncMessage(datafiles, contentType, nodeID, allowsAutoFailBack)); + boolean allowsAutoFailBack, + long initialReplicationSyncTimeout) throws ActiveMQException { + if (enabled) { + + if (logger.isTraceEnabled()) { + logger.trace("sendStartSyncMessage ::" + Arrays.toString(ids) + ", " + contentType + ", " + nodeID + ", " + allowsAutoFailBack + ", " + initialReplicationSyncTimeout); + } + + synchronizationIsStartedAcknowledgement.countUp(); + sendReplicatePacket(new ReplicationStartSyncMessage(ids, contentType, nodeID, allowsAutoFailBack)); + try { + if (!synchronizationIsStartedAcknowledgement.await(initialReplicationSyncTimeout)) { + logger.trace("sendStartSyncMessage wasn't finished in time"); + throw ActiveMQMessageBundle.BUNDLE.replicationSynchronizationTimeout(initialReplicationSyncTimeout); + } + } catch (InterruptedException e) { + logger.debug(e); + } + } } /** @@ -619,22 +638,6 @@ public void sendSynchronizationDone(String nodeID, long initialReplicationSyncTi } } - /** - * Reserves several LargeMessage IDs in the backup. - *

- * Doing this before hand removes the need of synchronizing large-message deletes with the - * largeMessageSyncList. - * - * @param largeMessages - */ - public void sendLargeMessageIdListMessage(Map> largeMessages) { - ArrayList idsToSend; - idsToSend = new ArrayList<>(largeMessages.keySet()); - - if (enabled) - sendReplicatePacket(new ReplicationStartSyncMessage(idsToSend)); - } - /** * Notifies the backup that the live server is stopping. *

diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/util/BackupSyncDelay.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/util/BackupSyncDelay.java index e4afb5be51c..e57d5c5809d 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/util/BackupSyncDelay.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/util/BackupSyncDelay.java @@ -170,7 +170,7 @@ public synchronized void handlePacket(Packet packet) { receivedUpToDate = true; assert onHold == null; onHold = packet; - PacketImpl response = new ReplicationResponseMessageV2(true); + PacketImpl response = new ReplicationResponseMessageV2().setSynchronizationIsFinishedAcknowledgement(true); channel.send(response); return; }