Skip to content
Permalink
Browse files
ARTEMIS-3776 Avoid Integer.MAX_VALUE overflow on openwire clients
Older versions of Openwire clients wil be affected by AMQ-6431.
As a result of the issue if the ID of the message>Integer.MAX_VALUE
a consumer configured with Failover and doing duplicate detection on the client
will not be able to process duplicate detection accordingly and miss messages.
  • Loading branch information
clebertsuconic committed Apr 13, 2022
1 parent 316fe8a commit bc17acd6da345cce3f648d7afd03c0bfe60031c3
Showing 8 changed files with 267 additions and 2 deletions.
@@ -624,8 +624,10 @@ private static ActiveMQMessage toAMQMessage(MessageReference reference,
mid = new MessageId(midString.toString());
} else {
//JMSMessageID should be started with "ID:" and needs to be globally unique (node + journal id)
String midd = "ID:" + serverNodeUUID + ":-1:-1:-1";
mid = new MessageId(midd, coreMessage.getMessageID());
// ARTEMIS-3776 due to AMQ-6431 some older clients will not be able to receive messages
// if using a failover schema due to the messageID overFlowing Integer.MAX_VALUE
String midd = "ID:" + serverNodeUUID + ":-1:-1:" + (coreMessage.getMessageID() / Integer.MAX_VALUE);
mid = new MessageId(midd, coreMessage.getMessageID() % Integer.MAX_VALUE);
}

amqMsg.setMessageId(mid);
@@ -87,6 +87,10 @@ public ArtemisCloseable closeableReadLock() {
return dummy;
}

protected void setNextId(long id) {
idSequence.set(id);
}

public NullStorageManager() {
this(new IOCriticalErrorListener() {
@Override
@@ -0,0 +1,28 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.activemq.artemis.core.persistence.impl.nullpm;

import org.apache.activemq.artemis.core.persistence.StorageManager;

public class NullStorageAccessor {

public static void setNextID(StorageManager sm, long id) {
((NullStorageManager)sm).setNextId(id);
}

}
@@ -631,6 +631,27 @@
<variableName>HORNETQ-247</variableName>
</configuration>
</execution>

<execution>
<id>openwire-5.11</id>
<phase>compile</phase>
<goals>
<goal>dependency-scan</goal>
</goals>
<configuration>
<optional>true</optional>
<libListWithDeps>
<arg>org.apache.activemq:activemq-client:5.11.0</arg>
<arg>org.apache.groovy:groovy-all:pom:${groovy.version}</arg>
</libListWithDeps>
<libList>
<arg>org.apache.activemq.tests:compatibility-tests:${project.version}</arg>
</libList>
<!-- for future maintainers, notice that if you add new variables you also need to add the system property
otherwise this is not captured, search for the word @@@@ on this pom where I left anothr comment -->
<variableName>AMQ_5_11</variableName>
</configuration>
</execution>
</executions>
</plugin>

@@ -691,6 +712,11 @@
<name>ARTEMIS-JAKARTAEE</name>
<value>${ARTEMIS-JAKARTAEE}</value>
</property>
<property>
<name>AMQ_5_11</name>
<value>${AMQ_5_11}</value>
</property>
<variableName>AMQ_5_11</variableName>
</systemProperties>
<skipTests>${skipCompatibilityTests}</skipTests>
<argLine>${modular.jdk.surefire.arg} -Djgroups.bind_addr=::1 ${activemq-surefire-argline}</argLine>
@@ -39,6 +39,7 @@ public class GroovyRun {
public static final String TWO_TEN_ZERO = "ARTEMIS-2_10_0";
public static final String HORNETQ_235 = "HORNETQ-235";
public static final String HORNETQ_247 = "HORNETQ-247";
public static final String AMQ_5_11 = "AMQ_5_11";

public static Binding binding = new Binding();
public static GroovyShell shell = new GroovyShell(binding);
@@ -0,0 +1,46 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/


import org.apache.activemq.artemis.tests.compatibility.GroovyRun

import javax.jms.Connection
import javax.jms.MessageConsumer
import java.util.Date;
import java.util.UUID;
import javax.jms.ConnectionFactory;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;


final ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(tcp://localhost:61616)");
connection = cf.createConnection();
final int numberOfMessages = Integer.parseInt(arg[0])
final Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
final Queue queue = session.createQueue("Test");
final MessageConsumer consumer = session.createConsumer(queue)
connection.start();

for (int i = 0; i < numberOfMessages; i++) {
final TextMessage tm = (TextMessage)consumer.receive(1000);
GroovyRun.assertNotNull(tm)
GroovyRun.assertEquals("m" + i, tm.getText());
}
connection.close();
@@ -0,0 +1,53 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/


import javax.jms.Connection
import javax.jms.ConnectionFactory;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.artemis.jms.client.ActiveMQJMSConnectionFactory;

{
final String url = "(tcp://localhost:61616)?ha=true&initialConnectAttempts=-1&reconnectAttempts=-1&retryInterval=1000&retryIntervalMultiplier=1.0"
final String queueName = "Test"
final int startMessages = Integer.parseInt(arg[0]);
final int numberOfMessages = Integer.parseInt(arg[1]);

Connection c;

try {
final ConnectionFactory cf = new ActiveMQJMSConnectionFactory(url);

c = cf.createConnection();

final Session s = c.createSession(false, Session.AUTO_ACKNOWLEDGE);
final Queue q = s.createQueue(queueName);

final MessageProducer p = s.createProducer(q);

for (int i = startMessages; i < numberOfMessages; i++) {
final TextMessage m = s.createTextMessage("m" + i);
p.send(m);
}
}
finally {
if (c != null) c.close();
}
}
@@ -0,0 +1,105 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.activemq.artemis.tests.compatibility;

import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl;
import org.apache.activemq.artemis.core.persistence.impl.nullpm.NullStorageAccessor;
import org.apache.activemq.artemis.core.server.JournalType;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.embedded.EmbeddedActiveMQ;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.tests.compatibility.base.ClasspathBase;
import org.apache.activemq.artemis.utils.Wait;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

import static org.apache.activemq.artemis.tests.compatibility.GroovyRun.AMQ_5_11;
import static org.apache.activemq.artemis.tests.compatibility.GroovyRun.SNAPSHOT;

/**
* To run this test on the IDE and debug it, run the compatibility-tests through a command line once:
*
* cd /compatibility-tests
* mvn install -Ptests | tee output.log
*
* on the output.log you will see the output generated by {@link #getClasspath(String)}
*
* On your IDE, edit the Run Configuration to your test and add those -D as parameters to your test.
* On Idea you would do the following:
*
* Run->Edit Configuration->Add ArtemisMeshTest and add your properties.
*/
public class OldOpenWireTest extends ClasspathBase {

EmbeddedActiveMQ server;

@Before
public void setServer() throws Throwable {

ConfigurationImpl configuration = new ConfigurationImpl();
configuration.setJournalType(JournalType.NIO);
configuration.addAcceptorConfiguration("artemis", "tcp://0.0.0.0:61616");
configuration.setSecurityEnabled(false);
configuration.setPersistenceEnabled(false);

server = new EmbeddedActiveMQ();
server.setConfiguration(configuration);
server.start();
server.getActiveMQServer().addAddressInfo(new AddressInfo("Test").addRoutingType(RoutingType.ANYCAST));
server.getActiveMQServer().createQueue(new QueueConfiguration("Test").setDurable(true).setRoutingType(RoutingType.ANYCAST));

server.getActiveMQServer().addAddressInfo(new AddressInfo("DLQ").addRoutingType(RoutingType.ANYCAST));
server.getActiveMQServer().createQueue(new QueueConfiguration("DLQ").setDurable(true).setRoutingType(RoutingType.ANYCAST));

server.getActiveMQServer().getAddressSettingsRepository().addMatch("#", new AddressSettings().setDeadLetterAddress(SimpleString.toSimpleString("DLQ")));
}

@After
public void shutdownServer() throws Throwable {
if (server != null) {
server.stop();
}
}

@Test
public void testIDOverflow() throws Throwable {
Queue queue = server.getActiveMQServer().locateQueue("Test");
Queue dlq = server.getActiveMQServer().locateQueue("DLQ");

NullStorageAccessor.setNextID(server.getActiveMQServer().getStorageManager(), Integer.MAX_VALUE);
evaluate(getClasspath(SNAPSHOT), "oldOpenWire/sendCore.groovy", "0", "10");
Wait.assertEquals(10L, queue::getMessageCount, 1000, 10);

NullStorageAccessor.setNextID(server.getActiveMQServer().getStorageManager(), Integer.MAX_VALUE * 2L);
evaluate(getClasspath(SNAPSHOT), "oldOpenWire/sendCore.groovy", "10", "20");

Wait.assertEquals(20L, queue::getMessageCount, 1000, 10);

evaluate(getClasspath(AMQ_5_11), "oldOpenWire/receiveOW.groovy", "20");

Wait.assertEquals(0L, queue::getMessageCount, 1000, 100);
Assert.assertEquals(0L, dlq.getMessageCount());
}

}

0 comments on commit bc17acd

Please sign in to comment.