From b3cf6badab193c79078054b4829804a273afc11a Mon Sep 17 00:00:00 2001 From: Stanislav Knot Date: Mon, 16 Apr 2018 15:46:22 +0200 Subject: [PATCH] ARTEMIS-1812 fix for missing (core) messages after master kill (HA) --- .../core/ServerSessionPacketHandler.java | 3 ++ .../cluster/failover/FailoverTest.java | 50 +++++++++++++++++++ 2 files changed, 53 insertions(+) diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java index edfd566c15b..35ebbf6355a 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java @@ -658,6 +658,9 @@ private void onSessionSend(Packet packet) { try { final SessionSendMessage message = (SessionSendMessage) packet; requiresResponse = message.isRequiresResponse(); + if (this.session.getMatchingQueue(message.getMessage().getAddressSimpleString(), RoutingType.ANYCAST) == null || this.session.getMatchingQueue(message.getMessage().getAddressSimpleString(), RoutingType.MULTICAST) == null) { + this.session.createQueue(message.getMessage().getAddressSimpleString(), message.getMessage().getAddressSimpleString(), RoutingType.ANYCAST, new SimpleString(""), false, true); + } this.session.send(EmbedMessageUtil.extractEmbedded(message.getMessage()), this.direct); if (requiresResponse) { response = new NullResponseMessage(); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/FailoverTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/FailoverTest.java index c6ec6dd8776..a52e7f2a2b4 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/FailoverTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/FailoverTest.java @@ -55,6 +55,7 @@ import org.apache.activemq.artemis.core.server.cluster.ha.SharedStoreSlavePolicy; import org.apache.activemq.artemis.core.server.files.FileMoveManager; import org.apache.activemq.artemis.core.server.impl.InVMNodeManager; +import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.apache.activemq.artemis.core.transaction.impl.XidImpl; import org.apache.activemq.artemis.jms.client.ActiveMQTextMessage; import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger; @@ -1641,6 +1642,55 @@ public void testFailThenReceiveMoreMessagesAfterFailover() throws Exception { receiveDurableMessages(consumer); } + @Test(timeout = 120000) + public void testLiveKilledBackupReceivesCoreMessage() throws Exception { + createSessionFactory(); + SimpleString lalaQ = new SimpleString("lalaQ"); + + ClientSession session = createSession(sf, true, true); + + AddressSettings addressSettings = new AddressSettings(); + addressSettings.setAutoDeleteQueues(true).setAutoCreateQueues(true); + backupServer.getServer().getConfiguration().addAddressesSetting(lalaQ.toString(), addressSettings); + + session.createQueue(lalaQ, RoutingType.MULTICAST, lalaQ, null, true); + + ClientProducer producer = session.createProducer(lalaQ); + + producer.send(createMessage(session, 0, true)); + producer.send(createMessage(session, 1, true)); + + ClientConsumer consumer = session.createConsumer(lalaQ, true); + + session.start(); + + ClientMessage message = consumer.receive(1000); + Assert.assertNotNull(message); + + crash(session); + + message = consumer.receive(1000); + Assert.assertNotNull(message); + message.acknowledge(); + + message = consumer.receive(1000); + Assert.assertNotNull(message); + message.acknowledge(); + + message = consumer.receive(1000); + Assert.assertNull(message); + + + //queue should be destroyed after consuming all messages + backupServer.getServer().getActiveMQServerControl().destroyQueue(lalaQ.toString(), true); + + assertNull(backupServer.getServer().locateQueue(lalaQ)); + + producer.send(createMessage(session, 0, true)); + + assertNotNull(backupServer.getServer().locateQueue(lalaQ)); + } + protected void receiveDurableMessages(ClientConsumer consumer) throws ActiveMQException { // During failover non-persistent messages may disappear but in certain cases they may survive. // For that reason the test is validating all the messages but being permissive with non-persistent messages