From 86a81d315d9794e491221a258561abcddae38ffb Mon Sep 17 00:00:00 2001 From: Clebert Suconic Date: Mon, 4 Apr 2016 18:36:36 -0400 Subject: [PATCH 1/5] ARTEMIS-465 Changing Byteman race on test --- ...nSyncLargeMessageOverReplication2Test.java | 52 +++++++------------ 1 file changed, 20 insertions(+), 32 deletions(-) diff --git a/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/RaceOnSyncLargeMessageOverReplication2Test.java b/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/RaceOnSyncLargeMessageOverReplication2Test.java index 95bd0284ee6..417f98c5151 100644 --- a/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/RaceOnSyncLargeMessageOverReplication2Test.java +++ b/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/RaceOnSyncLargeMessageOverReplication2Test.java @@ -31,6 +31,7 @@ import org.apache.activemq.artemis.core.config.Configuration; import org.apache.activemq.artemis.core.config.CoreQueueConfiguration; import org.apache.activemq.artemis.core.server.ActiveMQServer; +import org.apache.activemq.artemis.core.server.impl.SharedNothingBackupActivation; import org.apache.activemq.artemis.jms.client.ActiveMQConnection; import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory; import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; @@ -53,7 +54,6 @@ public class RaceOnSyncLargeMessageOverReplication2Test extends ActiveMQTestBase public static int messageChunkCount = 0; - private static final ReusableLatch ruleFired = new ReusableLatch(1); private static ActiveMQServer backupServer; private static ActiveMQServer liveServer; @@ -68,16 +68,12 @@ public class RaceOnSyncLargeMessageOverReplication2Test extends ActiveMQTestBase Configuration liveConfig; // To inform the main thread the condition is met - static final ReusableLatch flagArrived = new ReusableLatch(1); + static final ReusableLatch flagChunkEntered = new 
ReusableLatch(1); // To wait while the condition is worked out - static final ReusableLatch flagWait = new ReusableLatch(1); - - static final ReusableLatch flag15Arrived = new ReusableLatch(1); - // To wait while the condition is worked out - static final ReusableLatch flag15Wait = new ReusableLatch(1); + static final ReusableLatch flagChunkWait = new ReusableLatch(1); // To inform the main thread the condition is met - static final ReusableLatch flagSyncArrived = new ReusableLatch(1); + static final ReusableLatch flagSyncEntered = new ReusableLatch(1); // To wait while the condition is worked out static final ReusableLatch flagSyncWait = new ReusableLatch(1); @@ -88,13 +84,12 @@ public void setUp() throws Exception { System.out.println("Tmp::" + getTemporaryDir()); - flagArrived.setCount(1); - flagWait.setCount(1); + flagChunkEntered.setCount(1); + flagChunkWait.setCount(1); - flag15Arrived.setCount(1); - flag15Wait.setCount(1); + flagSyncEntered.setCount(1); + flagSyncWait.setCount(1); - ruleFired.setCount(1); messageChunkCount = 0; TransportConfiguration liveConnector = TransportConfigurationUtils.getNettyConnector(true, 0); @@ -188,7 +183,6 @@ public void failoverEvent(FailoverEventType eventType) { final MapMessage message = createLargeMessage(); t = new Thread() { - @Override public void run() { try { producer.send(message); @@ -206,26 +200,24 @@ public void run() { // I'm trying to simulate the following race here: // The message is syncing while the client is already sending the body of the message - Assert.assertTrue(flagArrived.await(10, TimeUnit.SECONDS)); + Assert.assertTrue(flagChunkEntered.await(10, TimeUnit.SECONDS)); startBackup(); - Assert.assertTrue(flagSyncArrived.await(10, TimeUnit.SECONDS)); - - flagWait.countDown(); + Assert.assertTrue(flagSyncEntered.await(10, TimeUnit.SECONDS)); - Assert.assertTrue(flag15Arrived.await(10, TimeUnit.SECONDS)); - - flag15Wait.countDown(); + flagChunkWait.countDown(); t.join(5000); - flagSyncWait.countDown(); - 
System.out.println("Thread joined"); Assert.assertFalse(t.isAlive()); + flagSyncWait.countDown(); + + Assert.assertTrue(((SharedNothingBackupActivation)backupServer.getActivation()).waitForBackupSync(10, TimeUnit.SECONDS)); + waitForRemoteBackup(connection.getSessionFactory(), 30); @@ -253,8 +245,8 @@ public void run() { public static void syncLargeMessage() { try { - flagSyncArrived.countDown(); - flagSyncWait.await(10, TimeUnit.SECONDS); + flagSyncEntered.countDown(); + flagSyncWait.await(100, TimeUnit.SECONDS); } catch (Exception e) { e.printStackTrace(); @@ -266,13 +258,9 @@ public static void messageChunkSent() { messageChunkCount++; try { - if (messageChunkCount == 10) { - flagArrived.countDown(); - flagWait.await(10, TimeUnit.SECONDS); - } - if (messageChunkCount == 15) { - flag15Arrived.countDown(); - flag15Wait.await(10, TimeUnit.SECONDS); + if (messageChunkCount == 1) { + flagChunkEntered.countDown(); + flagChunkWait.await(10, TimeUnit.SECONDS); } } catch (Exception e) { From 2e894554ca8d0e518f254b22ff3f4cccc9a59475 Mon Sep 17 00:00:00 2001 From: Clebert Suconic Date: Thu, 7 Apr 2016 15:53:52 -0400 Subject: [PATCH 2/5] ARTEMIS-474 fixing page.close() deadlock with replica --- .../paging/cursor/impl/PageCursorProviderImpl.java | 6 +++--- .../apache/activemq/artemis/core/paging/impl/Page.java | 10 ++++++++-- .../artemis/core/paging/impl/PagingStoreImpl.java | 8 ++++---- .../artemis/core/replication/ReplicationEndpoint.java | 4 ++-- 4 files changed, 17 insertions(+), 11 deletions(-) diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCursorProviderImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCursorProviderImpl.java index 5f5e1b32e74..7dad12be829 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCursorProviderImpl.java +++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCursorProviderImpl.java @@ -174,7 +174,7 @@ private void readPage(int pageId, PageCache cache) throws Exception { finally { try { if (page != null) { - page.close(); + page.close(false); } } catch (Throwable ignored) { @@ -448,14 +448,14 @@ public void cleanup() { } finally { try { - depagedPage.close(); + depagedPage.close(false); } catch (Exception e) { } storageManager.afterPageRead(); } - depagedPage.close(); + depagedPage.close(false); pgdMessages = pgdMessagesList.toArray(new PagedMessage[pgdMessagesList.size()]); } else { diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/Page.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/Page.java index b2373c92ada..07579a29d77 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/Page.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/Page.java @@ -216,8 +216,14 @@ public void open() throws Exception { file.position(0); } - public synchronized void close() throws Exception { - if (storageManager != null) { + public void close() throws Exception { + close(false); + } + + /** sendEvent means it's a close happening from a major event such moveNext. 
+ * While reading the cache we don't need to (and shouldn't) inform the backup. */ + public synchronized void close(boolean sendEvent) throws Exception { + if (sendEvent && storageManager != null) { storageManager.pageClosed(storeName, pageId); } if (pageCache != null) { diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java index 9a0af15d9bf..85a2dc2b2b1 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java @@ -348,7 +348,7 @@ public synchronized void stop() throws Exception { flushExecutors(); if (currentPage != null) { - currentPage.close(); + currentPage.close(false); currentPage = null; } } @@ -390,7 +390,7 @@ public void start() throws Exception { currentPageId = 0; if (currentPage != null) { - currentPage.close(); + currentPage.close(false); } currentPage = null; @@ -589,7 +589,7 @@ public Page depage() throws Exception { } returnPage = currentPage; - returnPage.close(); + returnPage.close(false); currentPage = null; // The current page is empty... 
which means we reached the end of the pages @@ -1021,7 +1021,7 @@ private void openNewPage() throws Exception { int tmpCurrentPageId = currentPageId + 1; if (currentPage != null) { - currentPage.close(); + currentPage.close(true); } currentPage = createPage(tmpCurrentPageId); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationEndpoint.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationEndpoint.java index dabe9fd7009..7cf54501305 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationEndpoint.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationEndpoint.java @@ -306,7 +306,7 @@ public synchronized void stop() throws Exception { for (Page page : map.values()) { try { page.sync(); - page.close(); + page.close(false); } catch (Exception e) { ActiveMQServerLogger.LOGGER.errorClosingPageOnReplication(e); @@ -698,7 +698,7 @@ private void handlePageEvent(final ReplicationPageEventMessage packet) throws Ex } } else { - page.close(); + page.close(false); } } From 3ecd8b7c44934838ad1176b7ac240b7b3ef8f957 Mon Sep 17 00:00:00 2001 From: Clebert Suconic Date: Wed, 6 Apr 2016 12:58:50 -0400 Subject: [PATCH 3/5] ARTEMIS-474 Avoiding one lock around the readyListener call tree and fixing ReplicationManager / NettyConnection deadlock --- .../remoting/impl/netty/NettyConnection.java | 42 +++++++++++++------ 1 file changed, 29 insertions(+), 13 deletions(-) diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnection.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnection.java index 3f102279630..69478832a5a 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnection.java +++ 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnection.java @@ -106,25 +106,41 @@ public void setAutoRead(boolean autoRead) { } @Override - public synchronized boolean isWritable(ReadyListener callback) { - if (!ready) { - readyListeners.push(callback); - } + public boolean isWritable(ReadyListener callback) { + synchronized (readyListeners) { + if (!ready) { + readyListeners.push(callback); + } - return ready; + return ready; + } } @Override - public synchronized void fireReady(final boolean ready) { - this.ready = ready; - - if (ready) { - for (;;) { - ReadyListener readyListener = readyListeners.poll(); - if (readyListener == null) { - return; + public void fireReady(final boolean ready) { + LinkedList readyToCall = null; + synchronized (readyListeners) { + this.ready = ready; + + if (ready) { + for (;;) { + ReadyListener readyListener = readyListeners.poll(); + if (readyListener == null) { + break; + } + + + if (readyToCall == null) { + readyToCall = new LinkedList<>(); + } + + readyToCall.add(readyListener); } + } + } + if (readyToCall != null) { + for (ReadyListener readyListener : readyToCall) { try { readyListener.readyForWriting(); } From d6c7e30594cd6620ad05cbbd247421e07793ee77 Mon Sep 17 00:00:00 2001 From: Clebert Suconic Date: Thu, 14 Apr 2016 13:24:46 -0400 Subject: [PATCH 4/5] ARTEMIS-484 Large Message Loss on Initial replication https://issues.apache.org/jira/browse/ARTEMIS-484 The File copy after the initial synchronization on large messages was broken. On this commit we fix how the buffer is cleaned up before each read since a previously unfinished body read would make the buffer dirty. I'm keeping also lots of Traces I have added to debug this issue, so they will be useful if anything like this happens again. 
--- .../core/client/impl/ClientMessageImpl.java | 2 +- .../jms/client/ActiveMQJMSClientLogger.java | 8 +- .../jms/client/ActiveMQMessageConsumer.java | 14 +- .../jms/client/ActiveMQQueueBrowser.java | 2 +- .../jms/client/JMSMessageListenerWrapper.java | 2 +- .../core/io/aio/AIOSequentialFile.java | 10 +- .../artemis/core/io/util/FileIOUtil.java | 84 +++++++++ .../artemis/core/io/aio/FileIOUtilTest.java | 87 +++++++++ .../journal/LargeServerMessageInSync.java | 51 ++++-- .../ReplicationLargeMessageBeginMessage.java | 7 + .../ReplicationLargeMessageEndMessage.java | 7 + .../ReplicationLargeMessageWriteMessage.java | 8 + .../core/replication/ReplicationEndpoint.java | 172 ++++++++---------- .../core/replication/ReplicationManager.java | 12 +- 14 files changed, 344 insertions(+), 122 deletions(-) create mode 100644 artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/util/FileIOUtil.java create mode 100644 artemis-journal/src/test/java/org/apache/activemq/artemis/core/io/aio/FileIOUtilTest.java diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientMessageImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientMessageImpl.java index 31d9aad3893..926ac1bd3be 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientMessageImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientMessageImpl.java @@ -141,7 +141,7 @@ public int getBodySize() { @Override public String toString() { - return "ClientMessage[messageID=" + messageID + ", durable=" + durable + ", address=" + getAddress() + ",userID=" + (getUserID() != null ? getUserID() : "null") + ",properties=" + properties.toString() + "]"; + return getClass().getSimpleName() + "[messageID=" + messageID + ", durable=" + durable + ", address=" + getAddress() + ",userID=" + (getUserID() != null ? 
getUserID() : "null") + ",properties=" + properties.toString() + "]"; } @Override diff --git a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQJMSClientLogger.java b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQJMSClientLogger.java index 7eb56cd1dd2..3f116969c26 100644 --- a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQJMSClientLogger.java +++ b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQJMSClientLogger.java @@ -58,12 +58,12 @@ public interface ActiveMQJMSClientLogger extends BasicLogger { void errorCallingExcListener(@Cause Exception e); @LogMessage(level = Logger.Level.ERROR) - @Message(id = 124002, value = "Queue Browser failed to create message", format = Message.Format.MESSAGE_FORMAT) - void errorCreatingMessage(@Cause Throwable e); + @Message(id = 124002, value = "Queue Browser failed to create message {0}", format = Message.Format.MESSAGE_FORMAT) + void errorCreatingMessage(String messageToString, @Cause Throwable e); @LogMessage(level = Logger.Level.ERROR) - @Message(id = 124003, value = "Message Listener failed to prepare message for receipt", format = Message.Format.MESSAGE_FORMAT) - void errorPreparingMessageForReceipt(@Cause Throwable e); + @Message(id = 124003, value = "Message Listener failed to prepare message for receipt, message={0}", format = Message.Format.MESSAGE_FORMAT) + void errorPreparingMessageForReceipt(String messagetoString, @Cause Throwable e); @LogMessage(level = Logger.Level.ERROR) @Message(id = 124004, value = "Message Listener failed to process message", format = Message.Format.MESSAGE_FORMAT) diff --git a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessageConsumer.java b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessageConsumer.java index 3f47209910e..04e4f41c079 100644 --- 
a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessageConsumer.java +++ b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessageConsumer.java @@ -32,6 +32,7 @@ import org.apache.activemq.artemis.api.core.client.ClientMessage; import org.apache.activemq.artemis.api.core.client.MessageHandler; import org.apache.activemq.artemis.api.jms.ActiveMQJMSConstants; +import org.apache.activemq.artemis.core.client.ActiveMQClientLogger; /** * ActiveMQ Artemis implementation of a JMS MessageConsumer. @@ -211,7 +212,18 @@ private ActiveMQMessage getMessage(final long timeout, final boolean noWait) thr boolean needSession = ackMode == Session.CLIENT_ACKNOWLEDGE || ackMode == ActiveMQJMSConstants.INDIVIDUAL_ACKNOWLEDGE; jmsMsg = ActiveMQMessage.createMessage(coreMessage, needSession ? session.getCoreSession() : null); - jmsMsg.doBeforeReceive(); + try { + jmsMsg.doBeforeReceive(); + } + catch (IndexOutOfBoundsException ioob) { + // In case this exception happen you will need to know where it happened. + // it has been a bug here in the past, and this was used to debug it. 
+ // so it is kept here for future investigations in case it happens again + IndexOutOfBoundsException newIOOB = new IndexOutOfBoundsException(ioob.getMessage() + "@" + jmsMsg.getCoreMessage()); + newIOOB.initCause(ioob); + ActiveMQClientLogger.LOGGER.warn(newIOOB.getMessage(), newIOOB); + throw ioob; + } // We Do the ack after doBeforeRecive, as in the case of large messages, this may fail so we don't want messages redelivered // https://issues.jboss.org/browse/JBPAPP-6110 diff --git a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQQueueBrowser.java b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQQueueBrowser.java index 5022fcd90ad..4cf34ea3b4c 100644 --- a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQQueueBrowser.java +++ b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQQueueBrowser.java @@ -141,7 +141,7 @@ public ActiveMQMessage nextElement() { msg.doBeforeReceive(); } catch (Exception e) { - ActiveMQJMSClientLogger.LOGGER.errorCreatingMessage(e); + ActiveMQJMSClientLogger.LOGGER.errorCreatingMessage(msg.getCoreMessage().toString(), e); return null; } diff --git a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/JMSMessageListenerWrapper.java b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/JMSMessageListenerWrapper.java index 0d831f0f1a7..ab62dbc87b2 100644 --- a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/JMSMessageListenerWrapper.java +++ b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/JMSMessageListenerWrapper.java @@ -73,7 +73,7 @@ public void onMessage(final ClientMessage message) { msg.doBeforeReceive(); } catch (Exception e) { - ActiveMQJMSClientLogger.LOGGER.errorPreparingMessageForReceipt(e); + ActiveMQJMSClientLogger.LOGGER.errorPreparingMessageForReceipt(msg.getCoreMessage().toString(), e); return; } diff 
--git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/aio/AIOSequentialFile.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/aio/AIOSequentialFile.java index efeeb2e28d2..1a109cb4dfd 100644 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/aio/AIOSequentialFile.java +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/aio/AIOSequentialFile.java @@ -32,6 +32,7 @@ import org.apache.activemq.artemis.core.io.SequentialFile; import org.apache.activemq.artemis.core.journal.impl.SimpleWaitIOCallback; import org.apache.activemq.artemis.jlibaio.LibaioFile; +import org.apache.activemq.artemis.journal.ActiveMQJournalLogger; import org.apache.activemq.artemis.utils.ReusableLatch; public class AIOSequentialFile extends AbstractSequentialFile { @@ -202,7 +203,14 @@ public void writeDirect(final ByteBuffer bytes, final boolean sync) throws Excep */ @Override public void writeDirect(final ByteBuffer bytes, final boolean sync, final IOCallback callback) { - checkOpened(); + try { + checkOpened(); + } + catch (Exception e) { + ActiveMQJournalLogger.LOGGER.warn(e.getMessage(), e); + callback.onError(-1, e.getMessage()); + return; + } final int bytesToWrite = factory.calculateBlockSize(bytes.limit()); diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/util/FileIOUtil.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/util/FileIOUtil.java new file mode 100644 index 00000000000..f9d3ab7b6de --- /dev/null +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/util/FileIOUtil.java @@ -0,0 +1,84 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.core.io.util; + +import java.nio.ByteBuffer; + +import org.apache.activemq.artemis.core.io.SequentialFile; +import org.jboss.logging.Logger; + +public class FileIOUtil { + + private static final Logger logger = Logger.getLogger(Logger.class); + private static final boolean isTrace = logger.isTraceEnabled(); + + public static void copyData(SequentialFile from, SequentialFile to, ByteBuffer buffer) throws Exception { + + boolean fromIsOpen = from.isOpen(); + boolean toIsOpen = to.isOpen(); + + from.close(); + from.open(); + + if (!toIsOpen) { + to.open(); + } + + to.position(to.size()); + + from.position(0); + + try { + for (;;) { + // The buffer is reused... 
+ // We need to make sure we clear the limits and the buffer before reusing it + buffer.clear(); + int bytesRead = from.read(buffer); + + if (isTrace) { + logger.trace("appending " + bytesRead + " bytes on " + to.getFileName()); + } + + if (bytesRead > 0) { + to.writeDirect(buffer, false); + } + + if (bytesRead < buffer.capacity()) { + logger.trace("Interrupting reading as the whole thing was sent on " + to.getFileName()); + break; + } + } + } + finally { + if (!fromIsOpen) { + from.close(); + } + else { + from.position(from.size()); + } + if (!toIsOpen) { + to.close(); + } + else { + to.position(to.size()); + } + } + + } + +} diff --git a/artemis-journal/src/test/java/org/apache/activemq/artemis/core/io/aio/FileIOUtilTest.java b/artemis-journal/src/test/java/org/apache/activemq/artemis/core/io/aio/FileIOUtilTest.java new file mode 100644 index 00000000000..0e3d7d7a2e7 --- /dev/null +++ b/artemis-journal/src/test/java/org/apache/activemq/artemis/core/io/aio/FileIOUtilTest.java @@ -0,0 +1,87 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.activemq.artemis.core.io.aio; + +import java.io.File; +import java.nio.ByteBuffer; + +import org.apache.activemq.artemis.core.io.SequentialFile; +import org.apache.activemq.artemis.core.io.SequentialFileFactory; +import org.apache.activemq.artemis.core.io.nio.NIOSequentialFileFactory; +import org.apache.activemq.artemis.core.io.util.FileIOUtil; +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +public class FileIOUtilTest { + + @Rule + public TemporaryFolder temporaryFolder; + + public FileIOUtilTest() { + File parent = new File("./target"); + parent.mkdirs(); + temporaryFolder = new TemporaryFolder(parent); + } + + @Test + public void testCopy() throws Exception { + System.out.println("Data at " + temporaryFolder.getRoot()); + SequentialFileFactory factory = new NIOSequentialFileFactory(temporaryFolder.getRoot(), 100); + SequentialFile file = factory.createSequentialFile("file1.bin"); + file.open(); + + ByteBuffer buffer = ByteBuffer.allocate(204800); + buffer.put(new byte[204800]); + buffer.rewind(); + file.writeDirect(buffer, true); + + buffer = ByteBuffer.allocate(409605); + buffer.put(new byte[409605]); + buffer.rewind(); + + SequentialFile file2 = factory.createSequentialFile("file2.bin"); + + file2.open(); + file2.writeDirect(buffer, true); + + + // This is allocating a reusable buffer to perform the copy, just like it's used within LargeMessageInSync + buffer = ByteBuffer.allocate(4 * 1024); + + SequentialFile newFile = factory.createSequentialFile("file1.cop"); + FileIOUtil.copyData(file, newFile, buffer); + + SequentialFile newFile2 = factory.createSequentialFile("file2.cop"); + FileIOUtil.copyData(file2, newFile2, buffer); + + Assert.assertEquals(file.size(), newFile.size()); + Assert.assertEquals(file2.size(), newFile2.size()); + + newFile.close(); + newFile2.close(); + file.close(); + file2.close(); + + System.out.println("Test result::"); + + } + + +} diff 
--git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageInSync.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageInSync.java index f2644150520..274abeb994e 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageInSync.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageInSync.java @@ -21,14 +21,19 @@ import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.core.io.SequentialFile; +import org.apache.activemq.artemis.core.io.util.FileIOUtil; import org.apache.activemq.artemis.core.persistence.StorageManager; import org.apache.activemq.artemis.core.persistence.StorageManager.LargeMessageExtension; import org.apache.activemq.artemis.core.replication.ReplicatedLargeMessage; import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; import org.apache.activemq.artemis.core.server.LargeServerMessage; +import org.jboss.logging.Logger; public final class LargeServerMessageInSync implements ReplicatedLargeMessage { + private static final Logger logger = Logger.getLogger(LargeServerMessageInSync.class); + private static final boolean isTrace = logger.isTraceEnabled(); + private final LargeServerMessage mainLM; private final StorageManager storageManager; private SequentialFile appendFile; @@ -50,20 +55,33 @@ public synchronized void joinSyncedData(ByteBuffer buffer) throws Exception { if (!mainSeqFile.isOpen()) { mainSeqFile.open(); } - if (appendFile != null) { - appendFile.close(); - appendFile.open(); - for (;;) { - buffer.rewind(); - int bytesRead = appendFile.read(buffer); - if (bytesRead > 0) - mainSeqFile.writeDirect(buffer, false); - if (bytesRead < buffer.capacity()) { - break; + + try { + if (appendFile != null) { + if 
(isTrace) { + logger.trace("joinSyncedData on " + mainLM + ", currentSize on mainMessage=" + mainSeqFile.size() + ", appendFile size = " + appendFile.size()); + } + + FileIOUtil.copyData(appendFile, mainSeqFile, buffer); + deleteAppendFile(); + } + else { + if (isTrace) { + logger.trace("joinSyncedData, appendFile is null, ignoring joinSyncedData on " + mainLM); } } - deleteAppendFile(); } + catch (Throwable e) { + logger.warn("Error while sincing data on largeMessageInSync::" + mainLM); + } + + + if (isTrace) { + logger.trace("joinedSyncData on " + mainLM + " finished with " + mainSeqFile.size()); + } + + + syncDone = true; } @@ -85,6 +103,9 @@ public synchronized Message setMessageID(long id) { @Override public synchronized void releaseResources() { + if (isTrace) { + logger.warn("release resources called on " + mainLM, new Exception("trace")); + } mainLM.releaseResources(); if (appendFile != null && appendFile.isOpen()) { try { @@ -122,11 +143,19 @@ private void deleteAppendFile() throws Exception { public synchronized void addBytes(byte[] bytes) throws Exception { if (deleted) return; + if (syncDone) { + if (isTrace) { + logger.trace("Adding " + bytes.length + " towards sync message::" + mainLM); + } mainLM.addBytes(bytes); return; } + if (isTrace) { + logger.trace("addBytes(bytes.length=" + bytes.length + ") on message=" + mainLM); + } + if (appendFile == null) { appendFile = storageManager.createFileForLargeMessage(mainLM.getMessageID(), LargeMessageExtension.SYNC); } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationLargeMessageBeginMessage.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationLargeMessageBeginMessage.java index 0a36a564713..20af68c82ad 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationLargeMessageBeginMessage.java +++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationLargeMessageBeginMessage.java @@ -57,6 +57,13 @@ public int hashCode() { return result; } + @Override + public String toString() { + return "ReplicationLargeMessageBeginMessage{" + + "messageId=" + messageId + + '}'; + } + @Override public boolean equals(Object obj) { if (this == obj) diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationLargeMessageEndMessage.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationLargeMessageEndMessage.java index eea788a0d33..bb779292cb5 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationLargeMessageEndMessage.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationLargeMessageEndMessage.java @@ -57,6 +57,13 @@ public int hashCode() { return result; } + @Override + public String toString() { + return "ReplicationLargeMessageEndMessage{" + + "messageId=" + messageId + + '}'; + } + @Override public boolean equals(Object obj) { if (this == obj) diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationLargeMessageWriteMessage.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationLargeMessageWriteMessage.java index 0970f0594ae..f60c62994b6 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationLargeMessageWriteMessage.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationLargeMessageWriteMessage.java @@ -80,6 +80,14 @@ public int hashCode() { return result; } + @Override + public String toString() { + return "ReplicationLargeMessageWriteMessage{" + + "messageId=" + 
messageId + + ", body.size=" + body.length + + '}'; + } + @Override public boolean equals(Object obj) { if (this == obj) diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationEndpoint.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationEndpoint.java index 7cf54501305..3cd5bfd02f6 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationEndpoint.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationEndpoint.java @@ -80,6 +80,7 @@ import org.apache.activemq.artemis.core.server.cluster.qourum.SharedNothingBackupQuorum; import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl; import org.apache.activemq.artemis.core.server.impl.SharedNothingBackupActivation; +import org.jboss.logging.Logger; /** * Handles all the synchronization necessary for replication on the backup side (that is the @@ -87,7 +88,8 @@ */ public final class ReplicationEndpoint implements ChannelHandler, ActiveMQComponent { - private static final boolean trace = ActiveMQServerLogger.LOGGER.isTraceEnabled(); + private static final Logger logger = Logger.getLogger(ReplicationEndpoint.class); + private static final boolean isTrace = logger.isTraceEnabled(); private final IOCriticalErrorListener criticalErrorListener; private final ActiveMQServerImpl server; @@ -153,11 +155,18 @@ public synchronized void registerJournal(final byte id, final Journal journal) { @Override public void handlePacket(final Packet packet) { + if (isTrace) { + logger.trace("handlePacket::handling " + packet); + } PacketImpl response = new ReplicationResponseMessage(); final byte type = packet.getType(); try { if (!started) { + if (isTrace) { + logger.trace("handlePacket::ignoring " + packet); + } + return; } @@ -340,56 +349,10 @@ public void setChannel(final Channel channel) { this.channel = channel; } - public void compareJournalInformation(final 
JournalLoadInformation[] journalInformation) throws ActiveMQException { - if (!activation.isRemoteBackupUpToDate()) { - throw ActiveMQMessageBundle.BUNDLE.journalsNotInSync(); - } - - if (journalLoadInformation == null || journalLoadInformation.length != journalInformation.length) { - throw ActiveMQMessageBundle.BUNDLE.replicationTooManyJournals(); - } - - for (int i = 0; i < journalInformation.length; i++) { - if (!journalInformation[i].equals(journalLoadInformation[i])) { - ActiveMQServerLogger.LOGGER.journalcomparisonMismatch(journalParametersToString(journalInformation)); - throw ActiveMQMessageBundle.BUNDLE.replicationTooManyJournals(); - } - } - - } - - /** - * Used on tests only. To simulate missing page deletes - */ - public void setDeletePages(final boolean deletePages) { - this.deletePages = deletePages; - } - - /** - * @param journalInformation - */ - private String journalParametersToString(final JournalLoadInformation[] journalInformation) { - return "**********************************************************\n" + "parameters:\n" + - "BindingsImpl = " + - journalInformation[0] + - "\n" + - "Messaging = " + - journalInformation[1] + - "\n" + - "**********************************************************" + - "\n" + - "Expected:" + - "\n" + - "BindingsImpl = " + - journalLoadInformation[0] + - "\n" + - "Messaging = " + - journalLoadInformation[1] + - "\n" + - "**********************************************************"; - } - private void finishSynchronization(String liveID) throws Exception { + if (isTrace) { + logger.trace("finishSynchronization::" + liveID); + } for (JournalContent jc : EnumSet.allOf(JournalContent.class)) { Journal journal = journalsHolder.remove(jc); journal.synchronizationLock(); @@ -427,7 +390,7 @@ private void finishSynchronization(String liveID) throws Exception { * @param msg * @throws Exception */ - private synchronized void handleReplicationSynchronization(ReplicationSyncFileMessage msg) throws Exception { + private void 
handleReplicationSynchronization(ReplicationSyncFileMessage msg) throws Exception { Long id = Long.valueOf(msg.getId()); byte[] data = msg.getData(); SequentialFile channel1; @@ -462,7 +425,6 @@ private synchronized void handleReplicationSynchronization(ReplicationSyncFileMe } if (data == null) { - channel1.close(); return; } @@ -477,69 +439,73 @@ private synchronized void handleReplicationSynchronization(ReplicationSyncFileMe * {@link FileWrapperJournal} in place to store messages while synchronization is going on. * * @param packet - * @throws Exception * @return if the incoming packet indicates the synchronization is finished then return an acknowledgement otherwise - * return an empty response + * return an empty response + * @throws Exception */ private ReplicationResponseMessageV2 handleStartReplicationSynchronization(final ReplicationStartSyncMessage packet) throws Exception { - ReplicationResponseMessageV2 replicationResponseMessage = new ReplicationResponseMessageV2(); - if (activation.isRemoteBackupUpToDate()) { - throw ActiveMQMessageBundle.BUNDLE.replicationBackupUpToDate(); - } - synchronized (this) { - if (!started) - return replicationResponseMessage; + if (isTrace) { + logger.trace("handleStartReplicationSynchronization:: nodeID = " + packet); + } + ReplicationResponseMessageV2 replicationResponseMessage = new ReplicationResponseMessageV2(); + if (!started) + return replicationResponseMessage; - if (packet.isSynchronizationFinished()) { - finishSynchronization(packet.getNodeID()); - replicationResponseMessage.setSynchronizationIsFinishedAcknowledgement(true); - return replicationResponseMessage; - } + if (packet.isSynchronizationFinished()) { + finishSynchronization(packet.getNodeID()); + replicationResponseMessage.setSynchronizationIsFinishedAcknowledgement(true); + return replicationResponseMessage; + } - switch (packet.getDataType()) { - case LargeMessages: - for (long msgID : packet.getFileIds()) { - createLargeMessage(msgID, true); - } - break; 
- case JournalBindings: - case JournalMessages: - if (wantedFailBack && !packet.isServerToFailBack()) { - ActiveMQServerLogger.LOGGER.autoFailBackDenied(); - } + switch (packet.getDataType()) { + case LargeMessages: + for (long msgID : packet.getFileIds()) { + createLargeMessage(msgID, true); + } + break; + case JournalBindings: + case JournalMessages: + if (wantedFailBack && !packet.isServerToFailBack()) { + ActiveMQServerLogger.LOGGER.autoFailBackDenied(); + } - final JournalContent journalContent = SyncDataType.getJournalContentType(packet.getDataType()); - final Journal journal = journalsHolder.get(journalContent); + final JournalContent journalContent = SyncDataType.getJournalContentType(packet.getDataType()); + final Journal journal = journalsHolder.get(journalContent); - if (packet.getNodeID() != null) { - // At the start of replication, we still do not know which is the nodeID that the live uses. - // This is the point where the backup gets this information. - backupQuorum.liveIDSet(packet.getNodeID()); - } - Map mapToFill = filesReservedForSync.get(journalContent); + if (packet.getNodeID() != null) { + // At the start of replication, we still do not know which is the nodeID that the live uses. + // This is the point where the backup gets this information. 
+ backupQuorum.liveIDSet(packet.getNodeID()); + } + Map mapToFill = filesReservedForSync.get(journalContent); - for (Entry entry : journal.createFilesForBackupSync(packet.getFileIds()).entrySet()) { - mapToFill.put(entry.getKey(), new JournalSyncFile(entry.getValue())); - } - FileWrapperJournal syncJournal = new FileWrapperJournal(journal); - registerJournal(journalContent.typeByte, syncJournal); - break; - default: - throw ActiveMQMessageBundle.BUNDLE.replicationUnhandledDataType(); - } + for (Entry entry : journal.createFilesForBackupSync(packet.getFileIds()).entrySet()) { + mapToFill.put(entry.getKey(), new JournalSyncFile(entry.getValue())); + } + FileWrapperJournal syncJournal = new FileWrapperJournal(journal); + registerJournal(journalContent.typeByte, syncJournal); + break; + default: + throw ActiveMQMessageBundle.BUNDLE.replicationUnhandledDataType(); } return replicationResponseMessage; } private void handleLargeMessageEnd(final ReplicationLargeMessageEndMessage packet) { + if (isTrace) { + logger.trace("handleLargeMessageEnd on " + packet.getMessageId()); + } final ReplicatedLargeMessage message = lookupLargeMessage(packet.getMessageId(), true, false); if (message != null) { executor.execute(new Runnable() { @Override public void run() { try { + if (isTrace) { + logger.trace("Deleting LargeMessage " + packet.getMessageId() + " on the executor @ handleLargeMessageEnd"); + } message.deleteFile(); } catch (Exception e) { @@ -560,7 +526,9 @@ private void handleLargeMessageWrite(final ReplicationLargeMessageWriteMessage p } } - private ReplicatedLargeMessage lookupLargeMessage(final long messageId, final boolean delete, final boolean createIfNotExists) { + private ReplicatedLargeMessage lookupLargeMessage(final long messageId, + final boolean delete, + final boolean createIfNotExists) { ReplicatedLargeMessage message; if (delete) { @@ -590,7 +558,9 @@ private ReplicatedLargeMessage lookupLargeMessage(final long messageId, final bo private void 
handleLargeMessageBegin(final ReplicationLargeMessageBeginMessage packet) { final long id = packet.getMessageId(); createLargeMessage(id, false); - ActiveMQServerLogger.LOGGER.trace("Receiving Large Message " + id + " on backup"); + if (isTrace) { + logger.trace("Receiving Large Message Begin " + id + " on backup"); + } } private void createLargeMessage(final long id, boolean liveToBackupSync) { @@ -666,14 +636,14 @@ private void handleAppendAddTXRecord(final ReplicationAddTXMessage packet) throw private void handleAppendAddRecord(final ReplicationAddMessage packet) throws Exception { Journal journalToUse = getJournal(packet.getJournalID()); if (packet.getRecord() == ADD_OPERATION_TYPE.UPDATE) { - if (ReplicationEndpoint.trace) { - ActiveMQServerLogger.LOGGER.trace("Endpoint appendUpdate id = " + packet.getId()); + if (isTrace) { + logger.trace("Endpoint appendUpdate id = " + packet.getId()); } journalToUse.appendUpdateRecord(packet.getId(), packet.getJournalRecordType(), packet.getRecordData(), noSync); } else { - if (ReplicationEndpoint.trace) { - ActiveMQServerLogger.LOGGER.trace("Endpoint append id = " + packet.getId()); + if (isTrace) { + logger.trace("Endpoint append id = " + packet.getId()); } journalToUse.appendAddRecord(packet.getId(), packet.getJournalRecordType(), packet.getRecordData(), noSync); } @@ -807,7 +777,7 @@ public String toString() { * * @param backupQuorum */ - public synchronized void setBackupQuorum(SharedNothingBackupQuorum backupQuorum) { + public void setBackupQuorum(SharedNothingBackupQuorum backupQuorum) { this.backupQuorum = backupQuorum; } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java index 4aabbeae071..4081dd99dc5 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java +++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java @@ -70,6 +70,7 @@ import org.apache.activemq.artemis.spi.core.remoting.ReadyListener; import org.apache.activemq.artemis.utils.ExecutorFactory; import org.apache.activemq.artemis.utils.ReusableLatch; +import org.jboss.logging.Logger; /** * Manages replication tasks on the live server (that is the live server side of a "remote backup" @@ -81,6 +82,10 @@ */ public final class ReplicationManager implements ActiveMQComponent, ReadyListener { + + Logger logger = Logger.getLogger(ReplicationManager.class); + final boolean isTrace = logger.isTraceEnabled(); + public enum ADD_OPERATION_TYPE { UPDATE { @Override @@ -330,7 +335,7 @@ private OperationContext sendReplicatePacket(final Packet packet) { return sendReplicatePacket(packet, true); } - private synchronized OperationContext sendReplicatePacket(final Packet packet, boolean lineUp) { + private OperationContext sendReplicatePacket(final Packet packet, boolean lineUp) { if (!enabled) return null; boolean runItNow = false; @@ -578,6 +583,11 @@ public void sendStartSyncMessage(JournalFile[] datafiles, */ public void sendSynchronizationDone(String nodeID, long initialReplicationSyncTimeout) { if (enabled) { + + if (isTrace) { + logger.trace("sendSynchronizationDone ::" + nodeID + ", " + initialReplicationSyncTimeout); + } + synchronizationIsFinishedAcknowledgement.countUp(); sendReplicatePacket(new ReplicationStartSyncMessage(nodeID)); try { From 630db2d69c1de2a497169cb601390b55cd1cdd19 Mon Sep 17 00:00:00 2001 From: Clebert Suconic Date: Thu, 14 Apr 2016 17:51:18 -0400 Subject: [PATCH 5/5] ARTEMIS-474 Clustering fails on certain topologies Communication between nodes will fail under certain topologies. JGroups has something called JForkChannel that could be used on container systems and be injected into Artemis. For some reason that channel cannot be reused for more than one channel per VM, and it cannot ever be closed.
I am keeping the trace logs I used to debug this issue in case anything similar to this happens again. --- .../core/ChannelBroadcastEndpointFactory.java | 54 +++++- .../api/core/JGroupsBroadcastEndpoint.java | 167 ++---------------- .../core/JGroupsChannelBroadcastEndpoint.java | 6 +- .../core/JGroupsFileBroadcastEndpoint.java | 5 +- .../JGroupsFileBroadcastEndpointFactory.java | 7 +- .../JGroupsPropertiesBroadcastEndpoint.java | 8 +- ...upsPropertiesBroadcastEndpointFactory.java | 6 +- .../api/core/jgroups/JChannelManager.java | 62 +++++++ .../api/core/jgroups/JChannelWrapper.java | 145 +++++++++++++++ .../api/core/jgroups/JGroupsReceiver.java | 72 ++++++++ .../core/client/impl/ServerLocatorImpl.java | 4 +- .../server/cluster/ClusterController.java | 3 +- .../impl/SharedNothingBackupActivation.java | 108 ++++++++++- 13 files changed, 474 insertions(+), 173 deletions(-) create mode 100644 artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/jgroups/JChannelManager.java create mode 100644 artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/jgroups/JChannelWrapper.java create mode 100644 artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/jgroups/JGroupsReceiver.java diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/ChannelBroadcastEndpointFactory.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/ChannelBroadcastEndpointFactory.java index be5e04c4bef..d7086a52ec4 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/ChannelBroadcastEndpointFactory.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/ChannelBroadcastEndpointFactory.java @@ -16,6 +16,11 @@ */ package org.apache.activemq.artemis.api.core; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +import org.apache.activemq.artemis.api.core.jgroups.JChannelManager; +import org.jboss.logging.Logger; import 
org.jgroups.JChannel; /** @@ -25,11 +30,49 @@ */ public class ChannelBroadcastEndpointFactory implements BroadcastEndpointFactory { + private static final Logger logger = Logger.getLogger(ChannelBroadcastEndpointFactory.class); + private static final boolean isTrace = logger.isTraceEnabled(); + private final JChannel channel; private final String channelName; + private final JChannelManager manager; + + private static final Map managers = new ConcurrentHashMap<>(); + + private static final JChannelManager singletonManager = new JChannelManager(); +// TODO: To implement this when JForkChannel from JGroups supports multiple channels properly +// +// private static JChannelManager recoverManager(JChannel channel) { +// JChannelManager manager = managers.get(channel); +// if (manager == null) { +// if (isTrace) { +// logger.trace("Creating a new JChannelManager for " + channel, new Exception("trace")); +// } +// manager = new JChannelManager(); +// managers.put(channel, manager); +// } +// else { +// if (isTrace) { +// logger.trace("Recover an already existent channelManager for " + channel, new Exception("trace")); +// } +// +// } +// +// return manager; +// } +// public ChannelBroadcastEndpointFactory(JChannel channel, String channelName) { + // TODO: use recoverManager(channel) + this(singletonManager, channel, channelName); + } + + private ChannelBroadcastEndpointFactory(JChannelManager manager, JChannel channel, String channelName) { + if (isTrace) { + logger.trace("new ChannelBroadcastEndpointFactory(" + manager + ", " + channel + ", " + channelName, new Exception("trace")); + } + this.manager = manager; this.channel = channel; this.channelName = channelName; } @@ -42,8 +85,17 @@ public String getChannelName() { return channelName; } + @Override + public String toString() { + return "ChannelBroadcastEndpointFactory{" + + "channel=" + channel + + ", channelName='" + channelName + '\'' + + ", manager=" + manager + + '}'; + } + @Override public BroadcastEndpoint 
createBroadcastEndpoint() throws Exception { - return new JGroupsChannelBroadcastEndpoint(channel, channelName).initChannel(); + return new JGroupsChannelBroadcastEndpoint(manager, channel, channelName).initChannel(); } } diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/JGroupsBroadcastEndpoint.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/JGroupsBroadcastEndpoint.java index 5bcddbc5482..7657b0bfcb6 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/JGroupsBroadcastEndpoint.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/JGroupsBroadcastEndpoint.java @@ -16,15 +16,11 @@ */ package org.apache.activemq.artemis.api.core; +import org.apache.activemq.artemis.api.core.jgroups.JChannelManager; +import org.apache.activemq.artemis.api.core.jgroups.JChannelWrapper; +import org.apache.activemq.artemis.api.core.jgroups.JGroupsReceiver; +import org.jboss.logging.Logger; import org.jgroups.JChannel; -import org.jgroups.ReceiverAdapter; - -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.TimeUnit; /** @@ -32,6 +28,9 @@ */ public abstract class JGroupsBroadcastEndpoint implements BroadcastEndpoint { + private static final Logger logger = Logger.getLogger(JGroupsBroadcastEndpoint.class); + + private static final boolean isTrace = logger.isTraceEnabled(); private final String channelName; private boolean clientOpened; @@ -42,12 +41,16 @@ public abstract class JGroupsBroadcastEndpoint implements BroadcastEndpoint { private JGroupsReceiver receiver; - public JGroupsBroadcastEndpoint(String channelName) { + private JChannelManager manager; + + public JGroupsBroadcastEndpoint(JChannelManager manager, String channelName) { + this.manager = manager; this.channelName = channelName; } 
@Override public void broadcast(final byte[] data) throws Exception { + if (isTrace) logger.trace("Broadcasting: BroadCastOpened=" + broadcastOpened + ", channelOPen=" + channel.getChannel().isOpen()); if (broadcastOpened) { org.jgroups.Message msg = new org.jgroups.Message(); @@ -59,6 +62,7 @@ public void broadcast(final byte[] data) throws Exception { @Override public byte[] receiveBroadcast() throws Exception { + if (isTrace) logger.trace("Receiving Broadcast: clientOpened=" + clientOpened + ", channelOPen=" + channel.getChannel().isOpen()); if (clientOpened) { return receiver.receiveBroadcast(); } @@ -69,6 +73,7 @@ public byte[] receiveBroadcast() throws Exception { @Override public byte[] receiveBroadcast(long time, TimeUnit unit) throws Exception { + if (isTrace) logger.trace("Receiving Broadcast2: clientOpened=" + clientOpened + ", channelOPen=" + channel.getChannel().isOpen()); if (clientOpened) { return receiver.receiveBroadcast(time, unit); } @@ -99,7 +104,7 @@ public synchronized void openBroadcaster() throws Exception { public abstract JChannel createChannel() throws Exception; public JGroupsBroadcastEndpoint initChannel() throws Exception { - this.channel = JChannelManager.getJChannel(channelName, this); + this.channel = manager.getJChannel(channelName, this); return this; } @@ -128,146 +133,4 @@ protected synchronized void internalCloseChannel(JChannelWrapper channel) { channel.close(true); } - /** - * This class is used to receive messages from a JGroups channel. - * Incoming messages are put into a queue. 
- */ - private static final class JGroupsReceiver extends ReceiverAdapter { - - private final BlockingQueue dequeue = new LinkedBlockingDeque<>(); - - @Override - public void receive(org.jgroups.Message msg) { - dequeue.add(msg.getBuffer()); - } - - public byte[] receiveBroadcast() throws Exception { - return dequeue.take(); - } - - public byte[] receiveBroadcast(long time, TimeUnit unit) throws Exception { - return dequeue.poll(time, unit); - } - } - - /** - * This class wraps a JChannel with a reference counter. The reference counter - * controls the life of the JChannel. When reference count is zero, the channel - * will be disconnected. - */ - protected static class JChannelWrapper { - - int refCount = 1; - JChannel channel; - String channelName; - final List receivers = new ArrayList<>(); - - public JChannelWrapper(String channelName, JChannel channel) throws Exception { - this.refCount = 1; - this.channelName = channelName; - this.channel = channel; - - //we always add this for the first ref count - channel.setReceiver(new ReceiverAdapter() { - - @Override - public void receive(org.jgroups.Message msg) { - synchronized (receivers) { - for (JGroupsReceiver r : receivers) { - r.receive(msg); - } - } - } - }); - } - - public synchronized void close(boolean closeWrappedChannel) { - refCount--; - if (refCount == 0) { - if (closeWrappedChannel) { - JChannelManager.closeChannel(this.channelName, channel); - } - else { - JChannelManager.removeChannel(this.channelName); - } - //we always remove the receiver as its no longer needed - channel.setReceiver(null); - } - } - - public void removeReceiver(JGroupsReceiver receiver) { - synchronized (receivers) { - receivers.remove(receiver); - } - } - - public synchronized void connect() throws Exception { - if (channel.isConnected()) - return; - channel.connect(channelName); - } - - public void addReceiver(JGroupsReceiver jGroupsReceiver) { - synchronized (receivers) { - receivers.add(jGroupsReceiver); - } - } - - public void 
send(org.jgroups.Message msg) throws Exception { - channel.send(msg); - } - - public JChannelWrapper addRef() { - this.refCount++; - return this; - } - - @Override - public String toString() { - return "JChannelWrapper of [" + channel + "] " + refCount + " " + channelName; - } - } - - /** - * This class maintain a global Map of JChannels wrapped in JChannelWrapper for - * the purpose of reference counting. - * - * Wherever a JChannel is needed it should only get it by calling the getChannel() - * method of this class. The real disconnect of channels are also done here only. - */ - protected static class JChannelManager { - - private static Map channels; - - public static synchronized JChannelWrapper getJChannel(String channelName, - JGroupsBroadcastEndpoint endpoint) throws Exception { - if (channels == null) { - channels = new HashMap<>(); - } - JChannelWrapper wrapper = channels.get(channelName); - if (wrapper == null) { - wrapper = new JChannelWrapper(channelName, endpoint.createChannel()); - channels.put(channelName, wrapper); - return wrapper; - } - return wrapper.addRef(); - } - - public static synchronized void closeChannel(String channelName, JChannel channel) { - channel.setReceiver(null); - channel.disconnect(); - channel.close(); - JChannelWrapper wrapper = channels.remove(channelName); - if (wrapper == null) { - throw new IllegalStateException("Did not find channel " + channelName); - } - } - - public static void removeChannel(String channelName) { - JChannelWrapper wrapper = channels.remove(channelName); - if (wrapper == null) { - throw new IllegalStateException("Did not find channel " + channelName); - } - } - } } diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/JGroupsChannelBroadcastEndpoint.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/JGroupsChannelBroadcastEndpoint.java index 4fbb24c2ad6..96cfee64c78 100644 --- 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/JGroupsChannelBroadcastEndpoint.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/JGroupsChannelBroadcastEndpoint.java @@ -16,6 +16,8 @@ */ package org.apache.activemq.artemis.api.core; +import org.apache.activemq.artemis.api.core.jgroups.JChannelManager; +import org.apache.activemq.artemis.api.core.jgroups.JChannelWrapper; import org.jgroups.JChannel; /** @@ -27,8 +29,8 @@ public class JGroupsChannelBroadcastEndpoint extends JGroupsBroadcastEndpoint { private final JChannel jChannel; - public JGroupsChannelBroadcastEndpoint(JChannel jChannel, final String channelName) throws Exception { - super(channelName); + public JGroupsChannelBroadcastEndpoint(JChannelManager manager, JChannel jChannel, final String channelName) { + super(manager, channelName); this.jChannel = jChannel; } diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/JGroupsFileBroadcastEndpoint.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/JGroupsFileBroadcastEndpoint.java index 702cb5abab9..be903d37dfc 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/JGroupsFileBroadcastEndpoint.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/JGroupsFileBroadcastEndpoint.java @@ -16,6 +16,7 @@ */ package org.apache.activemq.artemis.api.core; +import org.apache.activemq.artemis.api.core.jgroups.JChannelManager; import org.jgroups.JChannel; import java.net.URL; @@ -27,8 +28,8 @@ public final class JGroupsFileBroadcastEndpoint extends JGroupsBroadcastEndpoint private String file; - public JGroupsFileBroadcastEndpoint(final String file, final String channelName) throws Exception { - super(channelName); + public JGroupsFileBroadcastEndpoint(final JChannelManager manager, final String file, final String channelName) throws Exception { + super(manager, channelName); this.file = file; } diff 
--git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/JGroupsFileBroadcastEndpointFactory.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/JGroupsFileBroadcastEndpointFactory.java index 132ac3ca9cd..9f783e78133 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/JGroupsFileBroadcastEndpointFactory.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/JGroupsFileBroadcastEndpointFactory.java @@ -16,15 +16,20 @@ */ package org.apache.activemq.artemis.api.core; + +import org.apache.activemq.artemis.api.core.jgroups.JChannelManager; + public class JGroupsFileBroadcastEndpointFactory implements BroadcastEndpointFactory { private String file; private String channelName; + private final JChannelManager manager = new JChannelManager(); + @Override public BroadcastEndpoint createBroadcastEndpoint() throws Exception { - return new JGroupsFileBroadcastEndpoint(file, channelName).initChannel(); + return new JGroupsFileBroadcastEndpoint(manager, file, channelName).initChannel(); } public String getFile() { diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/JGroupsPropertiesBroadcastEndpoint.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/JGroupsPropertiesBroadcastEndpoint.java index 25cefc37893..d10400a476a 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/JGroupsPropertiesBroadcastEndpoint.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/JGroupsPropertiesBroadcastEndpoint.java @@ -16,18 +16,19 @@ */ package org.apache.activemq.artemis.api.core; +import org.apache.activemq.artemis.api.core.jgroups.JChannelManager; import org.jgroups.JChannel; import org.jgroups.conf.PlainConfigurator; /** * This class is the implementation of ActiveMQ Artemis members discovery that will use JGroups. 
*/ -public final class JGroupsPropertiesBroadcastEndpoint extends JGroupsBroadcastEndpoint { +public final class JGroupsPropertiesBroadcastEndpoint extends JGroupsBroadcastEndpoint { private String properties; - public JGroupsPropertiesBroadcastEndpoint(final String properties, final String channelName) throws Exception { - super(channelName); + public JGroupsPropertiesBroadcastEndpoint(final JChannelManager manager, final String properties, final String channelName) throws Exception { + super(manager, channelName); this.properties = properties; } @@ -37,3 +38,4 @@ public JChannel createChannel() throws Exception { return new JChannel(configurator); } } + diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/JGroupsPropertiesBroadcastEndpointFactory.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/JGroupsPropertiesBroadcastEndpointFactory.java index 4d804354fae..8ed03ab7d0c 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/JGroupsPropertiesBroadcastEndpointFactory.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/JGroupsPropertiesBroadcastEndpointFactory.java @@ -16,15 +16,19 @@ */ package org.apache.activemq.artemis.api.core; +import org.apache.activemq.artemis.api.core.jgroups.JChannelManager; + public class JGroupsPropertiesBroadcastEndpointFactory implements BroadcastEndpointFactory { private String properties; private String channelName; + private final JChannelManager manager = new JChannelManager(); + @Override public BroadcastEndpoint createBroadcastEndpoint() throws Exception { - return new JGroupsPropertiesBroadcastEndpoint(properties, channelName).initChannel(); + return new JGroupsPropertiesBroadcastEndpoint(manager, properties, channelName).initChannel(); } public String getProperties() { diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/jgroups/JChannelManager.java 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/jgroups/JChannelManager.java new file mode 100644 index 00000000000..296dc8a934a --- /dev/null +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/jgroups/JChannelManager.java @@ -0,0 +1,62 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.api.core.jgroups; + +import java.util.HashMap; +import java.util.Map; + +import org.apache.activemq.artemis.api.core.JGroupsBroadcastEndpoint; +import org.jboss.logging.Logger; + +/** + * This class maintains a Map of JChannels wrapped in JChannelWrapper for + * the purpose of reference counting. + * + * Wherever a JChannel is needed it should only get it by calling the getJChannel() + * method of this class. The real disconnect of channels is also done here only.
+ */ +public class JChannelManager { + + private static final Logger logger = Logger.getLogger(JChannelManager.class); + private static final boolean isTrace = logger.isTraceEnabled(); + + private Map channels; + + public synchronized JChannelWrapper getJChannel(String channelName, + JGroupsBroadcastEndpoint endpoint) throws Exception { + if (channels == null) { + channels = new HashMap<>(); + } + JChannelWrapper wrapper = channels.get(channelName); + if (wrapper == null) { + wrapper = new JChannelWrapper(this, channelName, endpoint.createChannel()); + channels.put(channelName, wrapper); + if (isTrace) + logger.trace("Put Channel " + channelName); + return wrapper; + } + if (isTrace) + logger.trace("Add Ref Count " + channelName); + return wrapper.addRef(); + } + + public synchronized void removeChannel(String channelName) { + channels.remove(channelName); + } + +} diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/jgroups/JChannelWrapper.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/jgroups/JChannelWrapper.java new file mode 100644 index 00000000000..08a8ff8377e --- /dev/null +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/jgroups/JChannelWrapper.java @@ -0,0 +1,145 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.api.core.jgroups; + +import java.util.ArrayList; +import java.util.List; + +import org.jboss.logging.Logger; +import org.jgroups.JChannel; +import org.jgroups.ReceiverAdapter; + +/** + * This class wraps a JChannel with a reference counter. The reference counter + * controls the life of the JChannel. When reference count is zero, the channel + * will be disconnected. + */ +public class JChannelWrapper { + private static final Logger logger = Logger.getLogger(JChannelWrapper.class); + private static final boolean isTrace = logger.isTraceEnabled(); + + private boolean connected = false; + int refCount = 1; + final JChannel channel; + final String channelName; + final List receivers = new ArrayList<>(); + private final JChannelManager manager; + + public JChannelWrapper(JChannelManager manager, final String channelName, JChannel channel) throws Exception { + this.refCount = 1; + this.channelName = channelName; + this.channel = channel; + this.manager = manager; + + + if (channel.getReceiver() != null) { + logger.warn("The channel already had a receiver previously!!!!", new Exception("trace")); + } + + //we always add this for the first ref count + channel.setReceiver(new ReceiverAdapter() { + + @Override + public void receive(org.jgroups.Message msg) { + if (isTrace) { + logger.trace(this + ":: Wrapper received " + msg + " on channel " + channelName); + } + synchronized (receivers) { + for (JGroupsReceiver r : receivers) { + r.receive(msg); + } + } + } + }); + } + + public JChannel getChannel() { + return channel; + } + + public String getChannelName() { + return channelName; + } + + public synchronized void close(boolean closeWrappedChannel) { + refCount--; + if (isTrace) logger.trace(this + "::RefCount-- " + refCount + " on channel " + channelName, new Exception("Trace")); + if (refCount == 0) { + if 
(closeWrappedChannel) { + connected = false; + channel.setReceiver(null); + logger.trace(this + "::Closing Channel: " + channelName, new Exception("Trace")); + channel.close(); + } + manager.removeChannel(channelName); + } + } + + public void removeReceiver(JGroupsReceiver receiver) { + if (isTrace) logger.trace(this + "::removeReceiver: " + receiver + " on " + channelName, new Exception("Trace")); + synchronized (receivers) { + receivers.remove(receiver); + } + } + + public synchronized void connect() throws Exception { + if (isTrace) { + logger.trace(this + ":: Connecting " + channelName, new Exception("Trace")); + } + + // It is important to check this otherwise we could reconnect an already connected channel + if (connected) { + return; + } + + connected = true; + + if (!channel.isConnected()) { + channel.connect(channelName); + } + } + + public void addReceiver(JGroupsReceiver jGroupsReceiver) { + synchronized (receivers) { + if (isTrace) logger.trace(this + "::Add Receiver: " + jGroupsReceiver + " on " + channelName); + receivers.add(jGroupsReceiver); + } + } + + public void send(org.jgroups.Message msg) throws Exception { + if (isTrace) logger.trace(this + "::Sending JGroups Message: Open=" + channel.isOpen() + " on channel " + channelName + " msg=" + msg); + channel.send(msg); + } + + public JChannelWrapper addRef() { + this.refCount++; + if (isTrace) logger.trace(this + "::RefCount++ = " + refCount + " on channel " + channelName); + return this; + } + + @Override + public String toString() { + return super.toString() + + "{refCount=" + refCount + + ", channel=" + channel + + ", channelName='" + channelName + '\'' + + ", connected=" + connected + + '}'; + } +} + diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/jgroups/JGroupsReceiver.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/jgroups/JGroupsReceiver.java new file mode 100644 index 00000000000..c9316615573 --- /dev/null +++ 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/jgroups/JGroupsReceiver.java @@ -0,0 +1,72 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.api.core.jgroups; + +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingDeque; +import java.util.concurrent.TimeUnit; + +import org.jboss.logging.Logger; +import org.jgroups.ReceiverAdapter; + +/** + * This class is used to receive messages from a JGroups channel. + * Incoming messages are put into a queue. 
+ */ +public class JGroupsReceiver extends ReceiverAdapter { + + private static final Logger logger = Logger.getLogger(JGroupsReceiver.class); + private static final boolean isTrace = logger.isTraceEnabled(); + + private final BlockingQueue dequeue = new LinkedBlockingDeque<>(); + + @Override + public void receive(org.jgroups.Message msg) { + if (isTrace) logger.trace("sending message " + msg); + dequeue.add(msg.getBuffer()); + } + + public byte[] receiveBroadcast() throws Exception { + byte[] bytes = dequeue.take(); + if (isTrace) { + logBytes("receiveBroadcast()", bytes); + } + + return bytes; + } + + private void logBytes(String methodName, byte[] bytes) { + if (bytes != null) { + logger.trace(methodName + "::" + bytes.length + " bytes"); + } + else { + logger.trace(methodName + ":: no bytes"); + } + } + + public byte[] receiveBroadcast(long time, TimeUnit unit) throws Exception { + byte[] bytes = dequeue.poll(time, unit); + + if (isTrace) { + logBytes("receiveBroadcast(long time, TimeUnit unit)", bytes); + } + + return bytes; + } +} + diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorImpl.java index e7cc55ad57c..53ba9df862e 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorImpl.java @@ -809,7 +809,9 @@ public ClientSessionFactory createSessionFactory() throws ActiveMQException { // how the sendSubscription happens. // in case this ever changes. 
if (topology != null && !factory.waitForTopology(callTimeout, TimeUnit.MILLISECONDS)) { - factory.cleanup(); + if (factory != null) { + factory.cleanup(); + } throw ActiveMQClientMessageBundle.BUNDLE.connectionTimedOutOnReceiveTopology(discoveryGroup); } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterController.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterController.java index b84164f8fbf..175ca9961ca 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterController.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterController.java @@ -408,8 +408,9 @@ public void run() { } } catch (ActiveMQException e) { - if (!started) + if (!started) { return; + } server.getScheduledPool().schedule(this, serverLocator.getRetryInterval(), TimeUnit.MILLISECONDS); } } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedNothingBackupActivation.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedNothingBackupActivation.java index 12e92980d59..d9a5c78c7b8 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedNothingBackupActivation.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedNothingBackupActivation.java @@ -43,6 +43,7 @@ import org.apache.activemq.artemis.core.server.group.GroupingHandler; import org.apache.activemq.artemis.core.server.management.ManagementService; import org.apache.activemq.artemis.utils.ReusableLatch; +import org.jboss.logging.Logger; import java.util.Map; import java.util.concurrent.TimeUnit; @@ -53,6 +54,10 @@ public final class SharedNothingBackupActivation extends Activation { + + private static final Logger logger = Logger.getLogger(SharedNothingBackupActivation.class); + private static final boolean isTrace = 
logger.isTraceEnabled(); + //this is how we act when we start as a backup private ReplicaPolicy replicaPolicy; @@ -129,43 +134,86 @@ public void run() { } ClusterController clusterController = activeMQServer.getClusterManager().getClusterController(); clusterController.addClusterTopologyListenerForReplication(nodeLocator); + + if (isTrace) { + logger.trace("Waiting on cluster connection"); + } //todo do we actually need to wait? clusterController.awaitConnectionToReplicationCluster(); + if (isTrace) { + logger.trace("Cluster Connected"); + } clusterController.addIncomingInterceptorForReplication(new ReplicationError(activeMQServer, nodeLocator)); // nodeManager.startBackup(); - + if (isTrace) { + logger.trace("Starting backup manager"); + } activeMQServer.getBackupManager().start(); + if (isTrace) { + logger.trace("Set backup Quorum"); + } replicationEndpoint.setBackupQuorum(backupQuorum); + replicationEndpoint.setExecutor(activeMQServer.getExecutorFactory().getExecutor()); EndpointConnector endpointConnector = new EndpointConnector(); + if (isTrace) { + logger.trace("Starting Backup Server"); + } + ActiveMQServerLogger.LOGGER.backupServerStarted(activeMQServer.getVersion().getFullVersion(), activeMQServer.getNodeManager().getNodeId()); activeMQServer.setState(ActiveMQServerImpl.SERVER_STATE.STARTED); + if (isTrace) logger.trace("Setting server state as started"); + SharedNothingBackupQuorum.BACKUP_ACTIVATION signal; do { - //locate the first live server to try to replicate - nodeLocator.locateNode(); + + if (closed) { + if (isTrace) { + logger.trace("Activation is closed, so giving up"); + } return; } + + + if (isTrace) { + logger.trace("looking up the node through nodeLocator.locateNode()"); + } + //locate the first live server to try to replicate + nodeLocator.locateNode(); Pair possibleLive = nodeLocator.getLiveConfiguration(); nodeID = nodeLocator.getNodeID(); + + if (isTrace) { + logger.trace("nodeID = " + nodeID); + } //in a normal (non failback) scenario if 
we couldn't find our live server we should fail if (!attemptFailBack) { + if (isTrace) { + logger.trace("attemptFailback=false, nodeID=" + nodeID); + } + //this shouldn't happen - if (nodeID == null) + if (nodeID == null) { + logger.debug("Throwing a RuntimeException as nodeID==null ant attemptFailback=false"); throw new RuntimeException("Could not establish the connection"); + } activeMQServer.getNodeManager().setNodeID(nodeID); } try { + if (isTrace) { + logger.trace("Calling clusterController.connectToNodeInReplicatedCluster(" + possibleLive.getA() + ")"); + } clusterControl = clusterController.connectToNodeInReplicatedCluster(possibleLive.getA()); } catch (Exception e) { + logger.debug(e.getMessage(), e); if (possibleLive.getB() != null) { try { clusterControl = clusterController.connectToNodeInReplicatedCluster(possibleLive.getB()); @@ -176,6 +224,10 @@ public void run() { } } if (clusterControl == null) { + + if (isTrace) { + logger.trace("sleeping " + clusterController.getRetryIntervalForReplicatedCluster() + " it should retry"); + } //its ok to retry here since we haven't started replication yet //it may just be the server has gone since discovery Thread.sleep(clusterController.getRetryIntervalForReplicatedCluster()); @@ -190,23 +242,43 @@ public void run() { * process again on the next live server. All the action happens inside {@link BackupQuorum} */ signal = backupQuorum.waitForStatusChange(); + + if (isTrace) { + logger.trace("Got a signal " + signal + " through backupQuorum.waitForStatusChange()"); + } + /** * replicationEndpoint will be holding lots of open files. Make sure they get * closed/sync'ed. 
*/ ActiveMQServerImpl.stopComponent(replicationEndpoint); // time to give up - if (!activeMQServer.isStarted() || signal == STOP) + if (!activeMQServer.isStarted() || signal == STOP) { + if (isTrace) { + logger.trace("giving up on the activation:: activemqServer.isStarted=" + activeMQServer.isStarted() + " while signal = " + signal); + } return; + } // time to fail over - else if (signal == FAIL_OVER) + else if (signal == FAIL_OVER) { + if (isTrace) { + logger.trace("signal == FAIL_OVER, breaking the loop"); + } break; + } // something has gone badly run restart from scratch else if (signal == SharedNothingBackupQuorum.BACKUP_ACTIVATION.FAILURE_REPLICATING) { + if (isTrace) { + logger.trace("Starting a new thread to stop the server!"); + } + Thread startThread = new Thread(new Runnable() { @Override public void run() { try { + if (isTrace) { + logger.trace("Calling activeMQServer.stop()"); + } activeMQServer.stop(); } catch (Exception e) { @@ -227,17 +299,30 @@ public void run() { } } while (signal == SharedNothingBackupQuorum.BACKUP_ACTIVATION.ALREADY_REPLICATING); + if (isTrace) { + logger.trace("Activation loop finished, current signal = " + signal); + } + activeMQServer.getClusterManager().getQuorumManager().unRegisterQuorum(backupQuorum); if (!isRemoteBackupUpToDate()) { + logger.debug("throwing exception for !isRemoteBackupUptoDate"); throw ActiveMQMessageBundle.BUNDLE.backupServerNotInSync(); } + + if (isTrace) { + logger.trace("setReplicaPolicy::" + replicaPolicy); + } + replicaPolicy.getReplicatedPolicy().setReplicaPolicy(replicaPolicy); activeMQServer.setHAPolicy(replicaPolicy.getReplicatedPolicy()); + synchronized (activeMQServer) { - if (!activeMQServer.isStarted()) + if (!activeMQServer.isStarted()) { + logger.trace("Server is stopped, giving up right before becomingLive"); return; + } ActiveMQServerLogger.LOGGER.becomingLive(activeMQServer); activeMQServer.getNodeManager().stopBackup(); activeMQServer.getStorageManager().start(); @@ -262,6 +347,9 @@ 
public void run() { } } catch (Exception e) { + if (isTrace) { + logger.trace(e.getMessage() + ", serverStarted=" + activeMQServer.isStarted(), e); + } if ((e instanceof InterruptedException || e instanceof IllegalStateException) && !activeMQServer.isStarted()) // do not log these errors if the server is being stopped. return; @@ -374,8 +462,10 @@ public void setRemoteBackupUpToDate() { * @throws ActiveMQException */ public void remoteFailOver(ReplicationLiveIsStoppingMessage.LiveStopping finalMessage) throws ActiveMQException { - ActiveMQServerLogger.LOGGER.trace("Remote fail-over, got message=" + finalMessage + ", backupUpToDate=" + - backupUpToDate); + if (isTrace) { + logger.trace("Remote fail-over, got message=" + finalMessage + ", backupUpToDate=" + + backupUpToDate); + } if (!activeMQServer.getHAPolicy().isBackup() || activeMQServer.getHAPolicy().isSharedStore()) { throw new ActiveMQInternalErrorException(); }