From 1cdf64da0449195c47ab252311da5998bceec2aa Mon Sep 17 00:00:00 2001 From: Benjamin Graf Date: Thu, 14 Jun 2018 20:34:04 +0200 Subject: [PATCH] ARTEMIS-1935: Close of connection closes all open sessions --- .../protocol/openwire/OpenWireConnection.java | 14 +++++--------- .../openwire/SessionHandlingOpenWireTest.java | 15 +++++++++++++++ 2 files changed, 20 insertions(+), 9 deletions(-) diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java index 225aac4aeea..bafce92cb9b 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java @@ -183,12 +183,6 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se */ private ServerSession internalSession; - /** - * Used for proper closing of internal sessions like OpenWire advisory - * session at disconnect. - */ - private final Set internalSessionIds = new ConcurrentHashSet<>(); - private final OperationContext operationContext; private static final AtomicLongFieldUpdater LAST_SENT_UPDATER = AtomicLongFieldUpdater.newUpdater(OpenWireConnection.class, "lastSent"); @@ -616,8 +610,11 @@ private void disconnect(ActiveMQException me, String reason, boolean fail) { state.shutdown(); try { - for (SessionId sessionId : internalSessionIds) { - sessions.get(sessionId).close(); + for (SessionId sessionId : sessionIdMap.values()) { + AMQSession session = sessions.get(sessionId); + if (session != null) { + session.close(); + } } internalSession.close(false); } catch (Exception e) { @@ -993,7 +990,6 @@ public void onSlowConsumer(ServerConsumer consumer) { public void addSessions(Set sessionSet) { for (SessionId sid : sessionSet) { addSession(getState().getSessionState(sid).getInfo(), true); - internalSessionIds.add(sid); } } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/SessionHandlingOpenWireTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/SessionHandlingOpenWireTest.java index 9c476a9f885..989b62d5f04 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/SessionHandlingOpenWireTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/SessionHandlingOpenWireTest.java @@ -56,4 +56,19 @@ public void testInternalSessionHandling() throws Exception { assertFalse(AssertionLoggerHandler.findText("Client connection failed, clearing up resources for session")); assertFalse(AssertionLoggerHandler.findText("Cleared up resources for session")); } + + @Test + public void testInternalSessionHandlingNoSessionClose() throws Exception { + try (Connection conn = factory.createConnection()) { + conn.start(); + Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); + Destination dest = createDestination(session,ActiveMQDestination.QUEUE_TYPE); + sendMessages(session, dest, 1); + MessageConsumer consumer = session.createConsumer(dest); + Message m = consumer.receive(2000); + assertNotNull(m); + } + assertFalse(AssertionLoggerHandler.findText("Client connection failed, clearing up resources for session")); + assertFalse(AssertionLoggerHandler.findText("Cleared up resources for session")); + } }