Skip to content
Permalink
Browse files
ARTEMIS-3791 Openwire failover leaving sessions leaked
  • Loading branch information
clebertsuconic committed Apr 21, 2022
1 parent 857f9f1 commit d1e1faacc4b0b9c1abf8b495739e7ccb75f0b10a
Show file tree
Hide file tree
Showing 4 changed files with 49 additions and 14 deletions.
@@ -846,7 +846,6 @@ public void reconnect(AMQConnectionContext existingContext, ConnectionInfo info)
context.setClientMaster(info.isClientMaster());
context.setFaultTolerant(info.isFaultTolerant());
context.setReconnect(true);
context.incRefCount();
}

/**
@@ -229,12 +229,11 @@ public void removeConnection(ConnectionInfo info, Throwable error) throws Invali
synchronized (clientIdSet) {
String clientId = info.getClientId();
if (clientId != null) {
AMQConnectionContext context = this.clientIdSet.get(clientId);
if (context != null && context.decRefCount() == 0) {
AMQConnectionContext context = this.clientIdSet.remove(clientId);
if (context != null) {
//connection is still there and need to close
context.getConnection().disconnect(error != null);
this.connections.remove(context.getConnection());
this.clientIdSet.remove(clientId);
}
} else {
throw new InvalidClientIDException("No clientID specified for connection disconnect request");
@@ -17,7 +17,6 @@
package org.apache.activemq.artemis.core.protocol.openwire.amq;

import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.activemq.artemis.core.protocol.openwire.OpenWireConnection;
import org.apache.activemq.artemis.core.protocol.openwire.OpenWireProtocolManager;
@@ -49,7 +48,6 @@ public class AMQConnectionContext {
private boolean clientMaster = true;
private ConnectionState connectionState;
private XATransactionId xid;
private AtomicInteger refCount = new AtomicInteger(1);
private Command lastCommand;

public AMQConnectionContext() {
@@ -252,14 +250,6 @@ public boolean isAllowLinkStealing() {
return false;
}

public void incRefCount() {
refCount.incrementAndGet();
}

public int decRefCount() {
return refCount.decrementAndGet();
}

public void setLastCommand(Command lastCommand) {
this.lastCommand = lastCommand;
}
@@ -24,8 +24,13 @@
import javax.jms.Session;
import javax.jms.TextMessage;

import io.netty.channel.ChannelFuture;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQSession;
import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnection;
import org.apache.activemq.artemis.tests.integration.openwire.BasicOpenWireTest;
import org.apache.activemq.artemis.tests.util.Wait;
import org.junit.Assert;
import org.junit.Test;

@@ -54,6 +59,48 @@ public void testReconnectOnFailoverWithClientID() throws Exception {
} finally {
connection.close();
}
Wait.assertEquals(0, () -> server.getSessions().size());
}

// I was trying to reproduce ARTEMIS-3791 where sessions leaked after openwire reconnects.
// even though I was not able to reproduce the issue after many tries
// I am still keeping the test to make sure I am not breaking anything
@Test
public void testReconnectPacket() throws Exception {
ConnectionFactory failoverFactory = new ActiveMQConnectionFactory("failover:(tcp://localhost:61616)");
ActiveMQConnection connection = (ActiveMQConnection)failoverFactory.createConnection();


try {
ActiveMQSession session = (ActiveMQSession)connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue tempQueue = session.createTemporaryQueue();
MessageProducer producer = session.createProducer(tempQueue);
server.getRemotingService().getConnections().forEach(r -> {
NettyConnection nettyConnection = (NettyConnection) r.getTransportConnection();
ChannelFuture future = nettyConnection.getChannel().close();
try {
while (!future.isDone()) {
Thread.sleep(10);
}
} catch (Exception e) {
e.printStackTrace();
}

});
for (int i = 0; i < 10; i++) {
producer.send(session.createTextMessage("hello"));
}
connection.start();
MessageConsumer consumer = session.createConsumer(tempQueue);
for (int i = 0; i < 10; i++) {
TextMessage message = (TextMessage) consumer.receive(1000);
Assert.assertNotNull(message);
Assert.assertEquals("hello", message.getText());
}
} finally {
connection.close();
}
Wait.assertEquals(0, () -> server.getSessions().size());
}

}

0 comments on commit d1e1faa

Please sign in to comment.