Skip to content

Commit

Permalink
ARTEMIS-2743 Synchronize JMS connection methods
Browse files Browse the repository at this point in the history
  • Loading branch information
jbertram authored and clebertsuconic committed Apr 29, 2020
1 parent d0bc946 commit 803ccf7
Show file tree
Hide file tree
Showing 3 changed files with 68 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ public ActiveMQConnection(final ConnectionFactoryOptions options,
* For that reason we have this method to force that nonXASession, since the JMS Javadoc
* mandates createSession to return a XASession.
*/
public Session createNonXASession(final boolean transacted, final int acknowledgeMode) throws JMSException {
public synchronized Session createNonXASession(final boolean transacted, final int acknowledgeMode) throws JMSException {
checkClosed();

return createSessionInternal(false, transacted, acknowledgeMode, ActiveMQConnection.TYPE_GENERIC_CONNECTION);
Expand All @@ -206,7 +206,7 @@ public Session createNonXASession(final boolean transacted, final int acknowledg
* For that reason we have this method to force that nonXASession, since the JMS Javadoc
* mandates createSession to return a XASession.
*/
public Session createNonXATopicSession(final boolean transacted, final int acknowledgeMode) throws JMSException {
public synchronized Session createNonXATopicSession(final boolean transacted, final int acknowledgeMode) throws JMSException {
checkClosed();

return createSessionInternal(false, transacted, acknowledgeMode, ActiveMQConnection.TYPE_TOPIC_CONNECTION);
Expand All @@ -220,7 +220,7 @@ public Session createNonXATopicSession(final boolean transacted, final int ackno
* For that reason we have this method to force that nonXASession, since the JMS Javadoc
* mandates createSession to return a XASession.
*/
public Session createNonXAQueueSession(final boolean transacted, final int acknowledgeMode) throws JMSException {
public synchronized Session createNonXAQueueSession(final boolean transacted, final int acknowledgeMode) throws JMSException {
checkClosed();

return createSessionInternal(false, transacted, acknowledgeMode, ActiveMQConnection.TYPE_QUEUE_CONNECTION);
Expand Down Expand Up @@ -432,22 +432,22 @@ public ConnectionConsumer createDurableConnectionConsumer(final Topic topic,
}

@Override
public Session createSession(int sessionMode) throws JMSException {
public synchronized Session createSession(int sessionMode) throws JMSException {
checkClosed();
return createSessionInternal(false, sessionMode == Session.SESSION_TRANSACTED, sessionMode, ActiveMQSession.TYPE_GENERIC_SESSION);

}

@Override
public Session createSession() throws JMSException {
public synchronized Session createSession() throws JMSException {
checkClosed();
return createSessionInternal(false, false, Session.AUTO_ACKNOWLEDGE, ActiveMQSession.TYPE_GENERIC_SESSION);
}

// QueueConnection implementation ---------------------------------------------------------------

@Override
public QueueSession createQueueSession(final boolean transacted, int acknowledgeMode) throws JMSException {
public synchronized QueueSession createQueueSession(final boolean transacted, int acknowledgeMode) throws JMSException {
checkClosed();
return createSessionInternal(false, transacted, checkAck(transacted, acknowledgeMode), ActiveMQSession.TYPE_QUEUE_SESSION);
}
Expand Down Expand Up @@ -477,7 +477,7 @@ public ConnectionConsumer createConnectionConsumer(final Queue queue,
// TopicConnection implementation ---------------------------------------------------------------

@Override
public TopicSession createTopicSession(final boolean transacted, final int acknowledgeMode) throws JMSException {
public synchronized TopicSession createTopicSession(final boolean transacted, final int acknowledgeMode) throws JMSException {
checkClosed();
return createSessionInternal(false, transacted, checkAck(transacted, acknowledgeMode), ActiveMQSession.TYPE_TOPIC_SESSION);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,20 +48,20 @@ public ActiveMQXAConnection(final ConnectionFactoryOptions options,
}

@Override
public XASession createXASession() throws JMSException {
public synchronized XASession createXASession() throws JMSException {
checkClosed();
return (XASession) createSessionInternal(isXA(), true, Session.SESSION_TRANSACTED, ActiveMQSession.TYPE_GENERIC_SESSION);
}

@Override
public XAQueueSession createXAQueueSession() throws JMSException {
public synchronized XAQueueSession createXAQueueSession() throws JMSException {
checkClosed();
return (XAQueueSession) createSessionInternal(isXA(), true, Session.SESSION_TRANSACTED, ActiveMQSession.TYPE_QUEUE_SESSION);

}

@Override
public XATopicSession createXATopicSession() throws JMSException {
public synchronized XATopicSession createXATopicSession() throws JMSException {
checkClosed();
return (XATopicSession) createSessionInternal(isXA(), true, Session.SESSION_TRANSACTED, ActiveMQSession.TYPE_TOPIC_SESSION);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import javax.jms.ConnectionFactory;
import javax.jms.InvalidClientIDException;
import javax.jms.JMSContext;
import javax.jms.JMSException;
import javax.jms.QueueConnection;
import javax.jms.QueueSession;
import javax.jms.Session;
Expand All @@ -31,15 +32,23 @@
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
import org.apache.activemq.artemis.tests.util.JMSTestBase;
import org.jboss.logging.Logger;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;

public class ConnectionTest extends JMSTestBase {

private static final Logger log = Logger.getLogger(ConnectionTest.class);

private Connection conn2;

@Test
Expand Down Expand Up @@ -248,6 +257,55 @@ private void testCreateConnection(ConnectionFactory fact) throws Exception {
}
}

@Test
public void testCreateSessionAndCloseConnectionConcurrently() throws Exception {
final int ATTEMPTS = 10;
final int THREAD_COUNT = 50;
final int SESSION_COUNT = 10;
final ExecutorService executor = Executors.newFixedThreadPool(THREAD_COUNT);
try {
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("tcp://127.0.0.1:61616");

for (int i = 0; i < ATTEMPTS; i++) {
final CountDownLatch lineUp = new CountDownLatch(THREAD_COUNT);
final AtomicBoolean error = new AtomicBoolean(false);
final Connection connection = cf.createConnection();

for (int j = 0; j < THREAD_COUNT; ++j) {
executor.execute(() -> {
for (int k = 0; k < SESSION_COUNT; k++) {
try {
connection.createSession().close();
if (k == 0) {
lineUp.countDown();
}
} catch (javax.jms.IllegalStateException e) {
// ignore
break;
} catch (JMSException e) {
// ignore
break;
} catch (Throwable t) {
log.warn(t.getMessage(), t);
error.set(true);
break;
}
}
});
}

// wait until all the threads have created & closed at least 1 session
assertTrue(lineUp.await(10, TimeUnit.SECONDS));
connection.close();
if (error.get()) {
assertFalse(error.get());
}
}
} finally {
executor.shutdownNow();
}
}

@Override
@After
public void tearDown() throws Exception {
Expand Down

0 comments on commit 803ccf7

Please sign in to comment.