Skip to content

Commit

Permalink
ARTEMIS-2011 Fixing incompatibility of AddressSettings encode between…
Browse files Browse the repository at this point in the history
… versions

To fix this I added a retry on AddressSettings using code that's closer to the original version

(cherry picked from commit b710df7)
  • Loading branch information
clebertsuconic committed Aug 6, 2018
1 parent 58add60 commit 1eebe46
Show file tree
Hide file tree
Showing 6 changed files with 117 additions and 16 deletions.
Expand Up @@ -20,9 +20,9 @@

import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.journal.EncodingSupport;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.core.settings.Mergeable;
import org.apache.activemq.artemis.utils.BufferHelper;
import org.apache.activemq.artemis.utils.DataConstants;
Expand Down Expand Up @@ -671,6 +671,17 @@ public void merge(final AddressSettings merged) {

@Override
public void decode(ActiveMQBuffer buffer) {
int original = buffer.readerIndex();
try {
decode(buffer, false);
} catch (Throwable e) {
buffer.readerIndex(original);
// Try a compatible version where the wire was broken
decode(buffer, true);
}
}

public void decode(ActiveMQBuffer buffer, boolean tryCompatible) {
SimpleString policyStr = buffer.readNullableSimpleString();

if (policyStr != null) {
Expand Down Expand Up @@ -733,7 +744,7 @@ public void decode(ActiveMQBuffer buffer) {

autoDeleteQueues = BufferHelper.readNullableBoolean(buffer);

policyStr = buffer.readNullableSimpleString();
policyStr = tryCompatible ? null : buffer.readNullableSimpleString();

if (policyStr != null) {
configDeleteQueues = DeletionPolicy.valueOf(policyStr.toString());
Expand All @@ -745,7 +756,7 @@ public void decode(ActiveMQBuffer buffer) {

autoDeleteAddresses = BufferHelper.readNullableBoolean(buffer);

policyStr = buffer.readNullableSimpleString();
policyStr = tryCompatible ? null : buffer.readNullableSimpleString();

if (policyStr != null) {
configDeleteAddresses = DeletionPolicy.valueOf(policyStr.toString());
Expand Down
52 changes: 52 additions & 0 deletions tests/compatibility-tests/pom.xml
Expand Up @@ -350,6 +350,50 @@
<variableName>ARTEMIS-240</variableName>
</configuration>
</execution>
<execution>
<phase>compile</phase>
<goals>
<goal>dependency-scan</goal>
</goals>
<id>210-check</id>
<configuration>
<libListWithDeps>
<arg>org.apache.activemq:artemis-jms-server:2.1.0</arg>
<arg>org.apache.activemq:artemis-jms-client:2.1.0</arg>
<arg>org.apache.activemq:artemis-cli:2.1.0</arg>
<arg>org.apache.activemq:artemis-hornetq-protocol:2.1.0</arg>
<arg>org.apache.activemq:artemis-amqp-protocol:2.1.0</arg>
<arg>org.apache.activemq:artemis-hornetq-protocol:2.1.0</arg>
<arg>org.codehaus.groovy:groovy-all:${groovy.version}</arg>
</libListWithDeps>
<libList>
<arg>org.apache.activemq.tests:compatibility-tests:${project.version}</arg>
</libList>
<variableName>ARTEMIS-210</variableName>
</configuration>
</execution>
<execution>
<phase>compile</phase>
<goals>
<goal>dependency-scan</goal>
</goals>
<id>200-check</id>
<configuration>
<libListWithDeps>
<arg>org.apache.activemq:artemis-jms-server:2.0.0</arg>
<arg>org.apache.activemq:artemis-jms-client:2.0.0</arg>
<arg>org.apache.activemq:artemis-cli:2.0.0</arg>
<arg>org.apache.activemq:artemis-hornetq-protocol:2.0.0</arg>
<arg>org.apache.activemq:artemis-amqp-protocol:2.0.0</arg>
<arg>org.apache.activemq:artemis-hornetq-protocol:2.0.0</arg>
<arg>org.codehaus.groovy:groovy-all:${groovy.version}</arg>
</libListWithDeps>
<libList>
<arg>org.apache.activemq.tests:compatibility-tests:${project.version}</arg>
</libList>
<variableName>ARTEMIS-200</variableName>
</configuration>
</execution>
<execution>
<id>140-check</id>
<phase>compile</phase>
Expand Down Expand Up @@ -453,6 +497,14 @@
<name>ARTEMIS-SNAPSHOT</name>
<value>${ARTEMIS-SNAPSHOT}</value>
</property>
<property>
<name>ARTEMIS-200</name>
<value>${ARTEMIS-200}</value>
</property>
<property>
<name>ARTEMIS-210</name>
<value>${ARTEMIS-210}</value>
</property>
<property>
<name>ARTEMIS-240</name>
<value>${ARTEMIS-240}</value>
Expand Down
Expand Up @@ -30,6 +30,8 @@ public class GroovyRun {
public static final String SNAPSHOT = "ARTEMIS-SNAPSHOT";
public static final String ONE_FIVE = "ARTEMIS-155";
public static final String ONE_FOUR = "ARTEMIS-140";
public static final String TWO_ZERO = "ARTEMIS-200";
public static final String TWO_ONE = "ARTEMIS-210";
public static final String TWO_FOUR = "ARTEMIS-240";
public static final String HORNETQ_235 = "HORNETQ-235";
public static final String HORNETQ_247 = "HORNETQ-247";
Expand Down
@@ -1,4 +1,7 @@
package servers

import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl

/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
Expand All @@ -17,15 +20,11 @@ package servers
*/

// starts an artemis server

import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl;
import org.apache.activemq.artemis.core.server.JournalType
import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.jms.server.config.impl.JMSConfigurationImpl;
import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy
import org.apache.activemq.artemis.core.settings.impl.AddressSettings
import org.apache.activemq.artemis.jms.server.config.impl.JMSConfigurationImpl
import org.apache.activemq.artemis.jms.server.embedded.EmbeddedJMS
import org.apache.activemq.artemis.tests.compatibility.GroovyRun;


String folder = arg[0];
String id = arg[1];
Expand Down Expand Up @@ -62,3 +61,31 @@ server.start();

server.getJMSServerManager().createTopic(true, "topic");
server.getJMSServerManager().createQueue(true, "queue", null, true);

if (setAddressSettings) {

// this is to force records that will have pittfals between versions
server.getJMSServerManager().getActiveMQServer().getActiveMQServerControl().
addAddressSettings("ad1", //@Parameter(desc = "an address match", name = "addressMatch") String addressMatch,
"dla", // @Parameter(desc = "the dead letter address setting", name = "DLA") String DLA,
"exp", //@Parameter(desc = "the expiry address setting", name = "expiryAddress") String expiryAddress,
0l, //@Parameter(desc = "the expiry delay setting", name = "expiryDelay") long expiryDelay,
false, //@Parameter(desc = "are any queues created for this address a last value queue", name = "lastValueQueue") boolean lastValueQueue,
1, //@Parameter(desc = "the delivery attempts", name = "deliveryAttempts") int deliveryAttempts,
10 * 1024 * 1024, //@Parameter(desc = "the max size in bytes", name = "maxSizeBytes") long maxSizeBytes,
1024 * 1024, //@Parameter(desc = "the page size in bytes", name = "pageSizeBytes") int pageSizeBytes,
3, //@Parameter(desc = "the max number of pages in the soft memory cache", name = "pageMaxCacheSize") int pageMaxCacheSize,
0l, //@Parameter(desc = "the redelivery delay", name = "redeliveryDelay") long redeliveryDelay,
0, //@Parameter(desc = "the redelivery delay multiplier", name = "redeliveryMultiplier") double redeliveryMultiplier,
0, //@Parameter(desc = "the maximum redelivery delay", name = "maxRedeliveryDelay") long maxRedeliveryDelay,
0, //@Parameter(desc = "the redistribution delay", name = "redistributionDelay") long redistributionDelay,
false, //@Parameter(desc = "do we send to the DLA when there is no where to route the message", name = "sendToDLAOnNoRoute") boolean sendToDLAOnNoRoute,
"BLOCK", //@Parameter(desc = "the policy to use when the address is full", name = "addressFullMessagePolicy") String addressFullMessagePolicy,
1000, //@Parameter(desc = "when a consumer falls below this threshold in terms of messages consumed per second it will be considered 'slow'", name = "slowConsumerThreshold") long slowConsumerThreshold,
1000, //@Parameter(desc = "how often (in seconds) to check for slow consumers", name = "slowConsumerCheckPeriod") long slowConsumerCheckPeriod,
"NOTIFY", //@Parameter(desc = "the policy to use when a slow consumer is detected", name = "slowConsumerPolicy") String slowConsumerPolicy,
true, //@Parameter(desc = "allow queues to be created automatically", name = "autoCreateJmsQueues") boolean autoCreateJmsQueues,
true, // @Parameter(desc = "allow auto-created queues to be deleted automatically", name = "autoDeleteJmsQueues") boolean autoDeleteJmsQueues,
true, //@Parameter(desc = "allow topics to be created automatically", name = "autoCreateJmsTopics") boolean autoCreateJmsTopics,
true) //@Parameter(desc = "allow auto-created topics to be deleted automatically", name = "autoDeleteJmsTopics") boolean autoDeleteJmsTopics) throws Exception;
}
Expand Up @@ -19,6 +19,8 @@

import static org.apache.activemq.artemis.tests.compatibility.GroovyRun.SNAPSHOT;
import static org.apache.activemq.artemis.tests.compatibility.GroovyRun.TWO_FOUR;
import static org.apache.activemq.artemis.tests.compatibility.GroovyRun.TWO_ONE;
import static org.apache.activemq.artemis.tests.compatibility.GroovyRun.TWO_ZERO;

import java.util.ArrayList;
import java.util.Collection;
Expand Down Expand Up @@ -62,6 +64,8 @@ public static Collection getParameters() {
// combinations.add(new Object[]{SNAPSHOT, ONE_FIVE, ONE_FIVE});
// combinations.add(new Object[]{ONE_FIVE, ONE_FIVE, ONE_FIVE});

combinations.add(new Object[]{null, TWO_ZERO, SNAPSHOT});
combinations.add(new Object[]{null, TWO_ONE, SNAPSHOT});
combinations.add(new Object[]{null, TWO_FOUR, SNAPSHOT});
// the purpose on this one is just to validate the test itself.
/// if it can't run against itself it won't work at all
Expand Down Expand Up @@ -94,12 +98,12 @@ public void tearDown() {
@Test
public void testSendReceive() throws Throwable {
setVariable(senderClassloader, "persistent", true);
startServer(serverFolder.getRoot(), senderClassloader, "journalTest");
startServer(serverFolder.getRoot(), senderClassloader, "journalTest", null, true);
evaluate(senderClassloader, "meshTest/sendMessages.groovy", server, sender, "sendAckMessages");
stopServer(senderClassloader);

setVariable(receiverClassloader, "persistent", true);
startServer(serverFolder.getRoot(), receiverClassloader, "journalTest");
startServer(serverFolder.getRoot(), receiverClassloader, "journalTest", null, false);

setVariable(receiverClassloader, "latch", null);
evaluate(receiverClassloader, "meshTest/sendMessages.groovy", server, receiver, "receiveMessages");
Expand All @@ -112,12 +116,12 @@ public void testSendReceive() throws Throwable {
@Test
public void testSendReceiveQueueMetrics() throws Throwable {
setVariable(senderClassloader, "persistent", true);
startServer(serverFolder.getRoot(), senderClassloader, "journalTest");
startServer(serverFolder.getRoot(), senderClassloader, "journalTest", null, true);
evaluate(senderClassloader, "meshTest/sendMessages.groovy", server, sender, "sendAckMessages");
stopServer(senderClassloader);

setVariable(receiverClassloader, "persistent", true);
startServer(serverFolder.getRoot(), receiverClassloader, "journalTest");
startServer(serverFolder.getRoot(), receiverClassloader, "journalTest", null, false);

setVariable(receiverClassloader, "latch", null);
evaluate(receiverClassloader, "metrics/queueMetrics.groovy", server, receiver, "receiveMessages");
Expand All @@ -132,14 +136,14 @@ public void testSendReceiveQueueMetrics() throws Throwable {
public void testSendReceiveSizeQueueMetricsPaging() throws Throwable {
setVariable(senderClassloader, "persistent", true);
//Set max size to 1 to cause messages to immediately go to the paging store
startServer(serverFolder.getRoot(), senderClassloader, "journalTest", Long.toString(1));
startServer(serverFolder.getRoot(), senderClassloader, "journalTest", Long.toString(1), true);
evaluate(senderClassloader, "journalcompatibility/forcepaging.groovy");
evaluate(senderClassloader, "meshTest/sendMessages.groovy", server, sender, "sendAckMessages");
evaluate(senderClassloader, "journalcompatibility/ispaging.groovy");
stopServer(senderClassloader);

setVariable(receiverClassloader, "persistent", true);
startServer(serverFolder.getRoot(), receiverClassloader, "journalTest", Long.toString(1));
startServer(serverFolder.getRoot(), receiverClassloader, "journalTest", Long.toString(1), false);
evaluate(receiverClassloader, "journalcompatibility/ispaging.groovy");


Expand Down
Expand Up @@ -190,6 +190,10 @@ public void startServer(File folder, ClassLoader loader, String serverName) thro
}

public void startServer(File folder, ClassLoader loader, String serverName, String globalMaxSize) throws Throwable {
startServer(folder, loader, serverName, globalMaxSize, false);

}
public void startServer(File folder, ClassLoader loader, String serverName, String globalMaxSize, boolean setAddressSettings) throws Throwable {
folder.mkdirs();

String scriptToUse;
Expand All @@ -201,6 +205,7 @@ public void startServer(File folder, ClassLoader loader, String serverName, Stri
scriptToUse = "servers/hornetqServer.groovy";
}

setVariable(loader, "setAddressSettings", setAddressSettings);
evaluate(loader, scriptToUse, folder.getAbsolutePath(), serverName, server, sender, receiver, globalMaxSize);
}
public void stopServer(ClassLoader loader) throws Throwable {
Expand Down

0 comments on commit 1eebe46

Please sign in to comment.