Skip to content

Commit

Permalink
ARTEMIS-2135 Race condition on getProtonMessage / getHeader causing NPE
Browse files Browse the repository at this point in the history
Test was added at commit 48d8a54

I did not use cherry-pick from master as this is no longer an issue in master after the refactoring
done at ARTEMIS-2096.
  • Loading branch information
clebertsuconic committed Oct 17, 2018
1 parent e7d26d8 commit 48e0fc8
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 1 deletion.
Expand Up @@ -141,7 +141,7 @@ public AMQPMessage(Message message) {
this(0, message);
}

public MessageImpl getProtonMessage() {
public synchronized MessageImpl getProtonMessage() {
if (protonMessage == null) {
protonMessage = (MessageImpl) Message.Factory.create();

Expand Down
Expand Up @@ -26,6 +26,9 @@
import java.nio.charset.StandardCharsets;
import java.util.Date;
import java.util.HashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
Expand Down Expand Up @@ -75,6 +78,64 @@ public void testVerySimple() {
assertEquals("someNiceLocal", decoded.getAddress());
}

@Test
public void testDecodeMultiThreaded() throws Exception {
MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
protonMessage.setHeader( new Header());
Properties properties = new Properties();
properties.setTo("someNiceLocal");
protonMessage.setProperties(properties);
protonMessage.getHeader().setDeliveryCount(new UnsignedInteger(7));
protonMessage.getHeader().setDurable(Boolean.TRUE);
protonMessage.setApplicationProperties(new ApplicationProperties(new HashMap<>()));

final AtomicInteger failures = new AtomicInteger(0);


for (int testTry = 0; testTry < 100; testTry++) {
AMQPMessage decoded = encodeAndDecodeMessage(protonMessage);
Thread[] threads = new Thread[100];

CountDownLatch latchAlign = new CountDownLatch(threads.length);
CountDownLatch go = new CountDownLatch(1);

Runnable run = new Runnable() {
@Override
public void run() {
try {

latchAlign.countDown();
go.await();

Assert.assertNotNull(decoded.getHeader());
// this is a method used by Core Converter
decoded.getProtonMessage();
Assert.assertNotNull(decoded.getHeader());

} catch (Throwable e) {
e.printStackTrace();
failures.incrementAndGet();
}
}
};

for (int i = 0; i < threads.length; i++) {
threads[i] = new Thread(run);
threads[i].start();
}

Assert.assertTrue(latchAlign.await(10, TimeUnit.SECONDS));
go.countDown();

for (Thread thread : threads) {
thread.join(5000);
Assert.assertFalse(thread.isAlive());
}

Assert.assertEquals(0, failures.get());
}
}

@Test
public void testApplicationPropertiesReencode() {
MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
Expand Down

0 comments on commit 48e0fc8

Please sign in to comment.