Skip to content

Commit

Permalink
ARTEMIS-3767 Incompatibility on replication between 2.17 and current …
Browse files Browse the repository at this point in the history
…version
  • Loading branch information
clebertsuconic committed Jul 18, 2022
1 parent 7bc3b02 commit 8e54a65
Show file tree
Hide file tree
Showing 11 changed files with 196 additions and 50 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,11 @@ default boolean isVersionUsingLongOnPageReplication() {
return version >= PacketImpl.ARTEMIS_2_24_0_VERSION;
}

default boolean isBeforeTwoEighteen() {
int version = getChannelVersion();
return version < PacketImpl.ARTEMIS_2_18_0_VERSION;
}

/**
* Sets the client protocol used on the communication. This will determine if the client has
* support for certain packet types
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,11 +162,11 @@ private Packet slowPathDecode(ActiveMQBuffer in, byte packetType, CoreRemotingCo
break;
}
case REPLICATION_APPEND: {
packet = new ReplicationAddMessage();
packet = new ReplicationAddMessage(connection.isBeforeTwoEighteen());
break;
}
case REPLICATION_APPEND_TX: {
packet = new ReplicationAddTXMessage();
packet = new ReplicationAddTXMessage(connection.isBeforeTwoEighteen());
break;
}
case REPLICATION_DELETE: {
Expand Down Expand Up @@ -222,7 +222,7 @@ private Packet slowPathDecode(ActiveMQBuffer in, byte packetType, CoreRemotingCo
break;
}
case PacketImpl.REPLICATION_START_FINISH_SYNC: {
packet = new ReplicationStartSyncMessage();
packet = new ReplicationStartSyncMessage(connection.isBeforeTwoEighteen());
break;
}
case PacketImpl.REPLICATION_SYNC_FILE: {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,17 +43,22 @@ public final class ReplicationAddMessage extends PacketImpl {

private byte[] recordData;

public ReplicationAddMessage() {
// this is for version compatibility
private final boolean beforeTwoEighteen;

public ReplicationAddMessage(final boolean beforeTwoEighteen) {
super(PacketImpl.REPLICATION_APPEND);
this.beforeTwoEighteen = beforeTwoEighteen;
}

public ReplicationAddMessage(final byte journalID,
public ReplicationAddMessage(final boolean beforeTwoEighteen,
final byte journalID,
final ADD_OPERATION_TYPE operation,
final long id,
final byte journalRecordType,
final Persister persister,
final Object encodingData) {
this();
this(beforeTwoEighteen);
this.journalID = journalID;
this.operation = operation;
this.id = id;
Expand All @@ -77,7 +82,11 @@ public int expectedEncodeSize() {
@Override
public void encodeRest(final ActiveMQBuffer buffer) {
buffer.writeByte(journalID);
buffer.writeByte(operation.toRecord());
if (beforeTwoEighteen) {
buffer.writeBoolean(operation == ADD_OPERATION_TYPE.UPDATE);
} else {
buffer.writeByte(operation.toRecord());
}
buffer.writeLong(id);
buffer.writeByte(journalRecordType);
buffer.writeInt(persister.getEncodeSize(encodingData));
Expand All @@ -87,7 +96,16 @@ public void encodeRest(final ActiveMQBuffer buffer) {
@Override
public void decodeRest(final ActiveMQBuffer buffer) {
journalID = buffer.readByte();
operation = ADD_OPERATION_TYPE.toOperation(buffer.readByte());
if (beforeTwoEighteen) {
boolean isUpdate = buffer.readBoolean();
if (isUpdate) {
operation = ADD_OPERATION_TYPE.UPDATE;
} else {
operation = ADD_OPERATION_TYPE.ADD;
}
} else {
operation = ADD_OPERATION_TYPE.toOperation(buffer.readByte());
}
id = buffer.readLong();
journalRecordType = buffer.readByte();
final int recordDataSize = buffer.readInt();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,18 +45,23 @@ public class ReplicationAddTXMessage extends PacketImpl {

private ADD_OPERATION_TYPE operation;

public ReplicationAddTXMessage() {
// this is for version compatibility
private final boolean beforeTwoEighteen;

public ReplicationAddTXMessage(final boolean beforeTwoEighteen) {
super(PacketImpl.REPLICATION_APPEND_TX);
this.beforeTwoEighteen = beforeTwoEighteen;
}

public ReplicationAddTXMessage(final byte journalID,
public ReplicationAddTXMessage(final boolean beforeTwoEighteen,
final byte journalID,
final ADD_OPERATION_TYPE operation,
final long txId,
final long id,
final byte recordType,
final Persister persister,
final Object encodingData) {
this();
this(beforeTwoEighteen);
this.journalID = journalID;
this.operation = operation;
this.txId = txId;
Expand All @@ -82,7 +87,11 @@ public int expectedEncodeSize() {
@Override
public void encodeRest(final ActiveMQBuffer buffer) {
buffer.writeByte(journalID);
buffer.writeByte(operation.toRecord());
if (beforeTwoEighteen) {
buffer.writeBoolean(operation == ADD_OPERATION_TYPE.UPDATE);
} else {
buffer.writeByte(operation.toRecord());
}
buffer.writeLong(txId);
buffer.writeLong(id);
buffer.writeByte(recordType);
Expand All @@ -93,7 +102,16 @@ public void encodeRest(final ActiveMQBuffer buffer) {
@Override
public void decodeRest(final ActiveMQBuffer buffer) {
journalID = buffer.readByte();
operation = ADD_OPERATION_TYPE.toOperation(buffer.readByte());
if (beforeTwoEighteen) {
boolean isUpdate = buffer.readBoolean();
if (isUpdate) {
operation = ADD_OPERATION_TYPE.UPDATE;
} else {
operation = ADD_OPERATION_TYPE.ADD;
}
} else {
operation = ADD_OPERATION_TYPE.toOperation(buffer.readByte());
}
txId = buffer.readLong();
id = buffer.readLong();
recordType = buffer.readByte();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,16 @@
*/
package org.apache.activemq.artemis.core.protocol.core.impl.wireformat;

import java.security.InvalidParameterException;
import java.util.Arrays;
import java.util.List;

import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.core.journal.impl.JournalFile;
import org.apache.activemq.artemis.core.persistence.impl.journal.AbstractJournalStorageManager;
import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
import org.apache.activemq.artemis.utils.DataConstants;

import java.security.InvalidParameterException;
import java.util.Arrays;
import java.util.List;

/**
* This message may signal start or end of the replication synchronization.
* <p>
Expand All @@ -40,6 +40,10 @@ public class ReplicationStartSyncMessage extends PacketImpl {
private String nodeID;
private boolean allowsAutoFailBack;

// this is for version compatibility
// certain versions will need to interrupt encoding and decoding after synchronizationIsFinished on the encoding depending on its value
private final boolean beforeTwoEighteen;

public enum SyncDataType {
JournalBindings(AbstractJournalStorageManager.JournalContent.BINDINGS.typeByte),
JournalMessages(AbstractJournalStorageManager.JournalContent.MESSAGES.typeByte),
Expand Down Expand Up @@ -70,12 +74,13 @@ public static SyncDataType getDataType(byte code) {
}
}

public ReplicationStartSyncMessage() {
public ReplicationStartSyncMessage(boolean beforeTwoEighteen) {
super(REPLICATION_START_FINISH_SYNC);
this.beforeTwoEighteen = synchronizationIsFinished;
}

public ReplicationStartSyncMessage(List<Long> filenames) {
this();
public ReplicationStartSyncMessage(boolean beforeTwoEighteen, List<Long> filenames) {
this(beforeTwoEighteen);
ids = new long[filenames.size()];
for (int i = 0; i < filenames.size(); i++) {
ids[i] = filenames.get(i);
Expand All @@ -85,24 +90,24 @@ public ReplicationStartSyncMessage(List<Long> filenames) {
}


public ReplicationStartSyncMessage(String nodeID, long nodeDataVersion) {
this(nodeID);
public ReplicationStartSyncMessage(boolean beforeTwoEighteen, String nodeID, long nodeDataVersion) {
this(beforeTwoEighteen, nodeID);
ids = new long[1];
ids[0] = nodeDataVersion;
dataType = SyncDataType.ActivationSequence;
}

public ReplicationStartSyncMessage(String nodeID) {
this();
public ReplicationStartSyncMessage(boolean beforeTwoEighteen, String nodeID) {
this(beforeTwoEighteen);
synchronizationIsFinished = true;
this.nodeID = nodeID;
}

public ReplicationStartSyncMessage(JournalFile[] datafiles,
public ReplicationStartSyncMessage(boolean beforeTwoEighteen, JournalFile[] datafiles,
AbstractJournalStorageManager.JournalContent contentType,
String nodeID,
boolean allowsAutoFailBack) {
this();
this(beforeTwoEighteen);
this.nodeID = nodeID;
this.allowsAutoFailBack = allowsAutoFailBack;
synchronizationIsFinished = false;
Expand Down Expand Up @@ -143,6 +148,10 @@ public void encodeRest(final ActiveMQBuffer buffer) {
buffer.writeBoolean(synchronizationIsFinished);
buffer.writeBoolean(allowsAutoFailBack);
buffer.writeString(nodeID);
if (beforeTwoEighteen && synchronizationIsFinished) {
// At this point, pre 2.18.0 servers don't expect any more data to come.
return;
}
buffer.writeByte(dataType.code);
buffer.writeInt(ids.length);
for (long id : ids) {
Expand All @@ -155,6 +164,10 @@ public void decodeRest(final ActiveMQBuffer buffer) {
synchronizationIsFinished = buffer.readBoolean();
allowsAutoFailBack = buffer.readBoolean();
nodeID = buffer.readString();
if (buffer.readableBytes() == 0) {
// Pre-2.18.0 server wouldn't send anything more than this.
return;
}
dataType = SyncDataType.getDataType(buffer.readByte());
int length = buffer.readInt();
ids = new long[length];
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ public void appendUpdateRecord(final byte journalID,
final Persister persister,
final Object record) throws Exception {
if (enabled) {
sendReplicatePacket(new ReplicationAddMessage(journalID, operation, id, recordType, persister, record));
sendReplicatePacket(new ReplicationAddMessage(remotingConnection.isBeforeTwoEighteen(), journalID, operation, id, recordType, persister, record));
}
}

Expand All @@ -242,7 +242,7 @@ public void appendAddRecordTransactional(final byte journalID,
final Persister persister,
final Object record) throws Exception {
if (enabled) {
sendReplicatePacket(new ReplicationAddTXMessage(journalID, operation, txID, id, recordType, persister, record));
sendReplicatePacket(new ReplicationAddTXMessage(remotingConnection.isBeforeTwoEighteen(), journalID, operation, txID, id, recordType, persister, record));
}
}

Expand Down Expand Up @@ -801,7 +801,7 @@ public void sendStartSyncMessage(JournalFile[] datafiles,
String nodeID,
boolean allowsAutoFailBack) throws ActiveMQException {
if (enabled)
sendReplicatePacket(new ReplicationStartSyncMessage(datafiles, contentType, nodeID, allowsAutoFailBack));
sendReplicatePacket(new ReplicationStartSyncMessage(remotingConnection.isBeforeTwoEighteen(), datafiles, contentType, nodeID, allowsAutoFailBack));
}

/**
Expand All @@ -820,7 +820,7 @@ public void sendSynchronizationDone(String nodeID, long initialReplicationSyncTi
}

synchronizationIsFinishedAcknowledgement.countUp();
sendReplicatePacket(new ReplicationStartSyncMessage(nodeID, server.getNodeManager().getNodeActivationSequence()));
sendReplicatePacket(new ReplicationStartSyncMessage(remotingConnection.isBeforeTwoEighteen(), nodeID, server.getNodeManager().getNodeActivationSequence()));
try {
if (!synchronizationIsFinishedAcknowledgement.await(initialReplicationSyncTimeout)) {
ActiveMQReplicationTimeooutException exception = ActiveMQMessageBundle.BUNDLE.replicationSynchronizationTimeout(initialReplicationSyncTimeout);
Expand Down Expand Up @@ -865,7 +865,7 @@ public void sendLargeMessageIdListMessage(Map<Long, Pair<String, Long>> largeMes
idsToSend = new ArrayList<>(largeMessages.keySet());

if (enabled)
sendReplicatePacket(new ReplicationStartSyncMessage(idsToSend));
sendReplicatePacket(new ReplicationStartSyncMessage(remotingConnection.isBeforeTwoEighteen(), idsToSend));
}

/**
Expand Down
60 changes: 60 additions & 0 deletions tests/compatibility-tests/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -412,6 +412,58 @@
<variableName>ARTEMIS-2_10_0</variableName>
</configuration>
</execution>
<execution>
<phase>compile</phase>
<goals>
<goal>dependency-scan</goal>
</goals>
<id>2_17_0-check</id>
<configuration>
<optional>true</optional>
<libListWithDeps>
<arg>org.apache.activemq:artemis-jms-server:2.17.0</arg>
<arg>org.apache.activemq:artemis-jms-client:2.17.0</arg>
<arg>org.apache.activemq:artemis-cli:2.17.0</arg>
<arg>org.apache.activemq:artemis-hornetq-protocol:2.17.0</arg>
<arg>org.apache.activemq:artemis-amqp-protocol:2.17.0</arg>
<arg>org.apache.activemq:artemis-hornetq-protocol:2.17.0</arg>
<arg>org.apache.groovy:groovy-all:pom:${groovy.version}</arg>
<arg>org.jboss.marshalling:jboss-marshalling-river:2.0.9.Final</arg>
</libListWithDeps>
<libList>
<arg>org.apache.activemq.tests:compatibility-tests:${project.version}</arg>
</libList>
<!-- for future maintainers, notice that if you add new variables you also need to add the system property
otherwise this is not captured, search for the word @@@@ on this pom where I left anothr comment -->
<variableName>ARTEMIS-2_17_0</variableName>
</configuration>
</execution> <execution>
<phase>compile</phase>
<goals>
<goal>dependency-scan</goal>
</goals>
<id>2_18_0-check</id>
<configuration>
<optional>true</optional>
<libListWithDeps>
<arg>org.apache.activemq:artemis-jms-server:2.18.0</arg>
<arg>org.apache.activemq:artemis-jms-client:2.18.0</arg>
<arg>org.apache.activemq:artemis-cli:2.18.0</arg>
<arg>org.apache.activemq:artemis-hornetq-protocol:2.18.0</arg>
<arg>org.apache.activemq:artemis-amqp-protocol:2.18.0</arg>
<arg>org.apache.activemq:artemis-hornetq-protocol:2.18.0</arg>
<arg>org.apache.groovy:groovy-all:pom:${groovy.version}</arg>
<arg>org.jboss.marshalling:jboss-marshalling-river:2.0.9.Final</arg>
</libListWithDeps>
<libList>
<arg>org.apache.activemq.tests:compatibility-tests:${project.version}</arg>
</libList>
<!-- for future maintainers, notice that if you add new variables you also need to add the system property
otherwise this is not captured, search for the word @@@@ on this pom where I left anothr comment -->
<variableName>ARTEMIS-2_18_0</variableName>
</configuration>
</execution>

<execution>
<phase>compile</phase>
<goals>
Expand Down Expand Up @@ -661,6 +713,14 @@
<name>ARTEMIS-2_10_0</name> <!-- 2.10.0 -->
<value>${ARTEMIS-2_10_0}</value>
</property>
<property>
<name>ARTEMIS-2_17_0</name>
<value>${ARTEMIS-2_17_0}</value>
</property>
<property>
<name>ARTEMIS-2_18_0</name>
<value>${ARTEMIS-2_18_0}</value>
</property>
<property>
<name>ARTEMIS-2_22_0</name>
<value>${ARTEMIS-2_22_0}</value>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ public class GroovyRun {
public static final String TWO_SIX_THREE = "ARTEMIS-263";
public static final String TWO_SEVEN_ZERO = "ARTEMIS-270";
public static final String TWO_TEN_ZERO = "ARTEMIS-2_10_0";
public static final String TWO_SEVENTEEN_ZERO = "ARTEMIS-2_17_0";
public static final String TWO_EIGHTEEN_ZERO = "ARTEMIS-2_18_0";
public static final String TWO_TWENTYTWO_ZERO = "ARTEMIS-2_22_0";
public static final String HORNETQ_235 = "HORNETQ-235";
public static final String HORNETQ_247 = "HORNETQ-247";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,12 @@ configuration.addAcceptorConfiguration("artemis", "tcp://localhost:" + port);
configuration.addConnectorConfiguration("local", "tcp://localhost:" + port);
configuration.setSecurityEnabled(false);
configuration.setPersistenceEnabled(true);
configuration.setGlobalMaxMessages(100);

if (configuration.metaClass.hasMetaProperty("globalMaxMessages")) {
configuration.globalMaxMessages = 10
} else {
configuration.globalMaxSize = 10 * 1024
}
configuration.setHAPolicyConfiguration(new ReplicaPolicyConfiguration().setClusterName("main"))
configuration.addAddressesSetting("#", new AddressSettings().setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE));

Expand Down
Loading

0 comments on commit 8e54a65

Please sign in to comment.