From d9f2a3eeb39c9cec412790c961e232cf295def54 Mon Sep 17 00:00:00 2001 From: Andy Taylor Date: Tue, 19 Jan 2016 10:11:17 +0000 Subject: [PATCH] ARTEMIS-350 - possible OOM in replication manager https://issues.apache.org/jira/browse/ARTEMIS-350 --- .../core/replication/ReplicationManager.java | 22 ++++++++++++++++++- 1 file changed, 21 insertions(+), 1 deletion(-) diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java index a8984fc89f3..74f9906b217 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java @@ -29,6 +29,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.ActiveMQExceptionType; +import org.apache.activemq.artemis.api.core.ActiveMQInterruptedException; import org.apache.activemq.artemis.api.core.Pair; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.client.SessionFailureListener; @@ -65,6 +66,7 @@ import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle; import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; +import org.apache.activemq.artemis.spi.core.remoting.ReadyListener; import org.apache.activemq.artemis.utils.ExecutorFactory; import org.apache.activemq.artemis.utils.ReusableLatch; @@ -76,7 +78,7 @@ * * @see ReplicationEndpoint */ -public final class ReplicationManager implements ActiveMQComponent { +public final class ReplicationManager implements ActiveMQComponent, ReadyListener { public enum ADD_OPERATION_TYPE { UPDATE { @@ -109,6 +111,7 @@ public static ADD_OPERATION_TYPE toOperation(boolean isUpdate) { private final Object replicationLock = new Object(); + private final ReusableLatch latch = new ReusableLatch(); private final Queue pendingTokens = new ConcurrentLinkedQueue<>(); private final ExecutorFactory executorFactory; @@ -260,6 +263,8 @@ public synchronized void stop() throws Exception { if (!started) { return; } + replicatingChannel.getConnection().getTransportConnection().fireReady(true); + latch.setCount(0); synchronized (replicationLock) { enabled = false; @@ -273,6 +278,7 @@ public synchronized void stop() throws Exception { if (toStop != null) { toStop.removeFailureListener(failureListener); } + remotingConnection = null; started = false; } @@ -332,6 +338,15 @@ private OperationContext sendReplicatePacket(final Packet packet, boolean lineUp synchronized (replicationLock) { if (enabled) { pendingTokens.add(repliToken); + if (!replicatingChannel.getConnection().isWritable(this)) { + latch.countUp(); + try { + latch.await(); + } + catch (InterruptedException e) { + throw new ActiveMQInterruptedException(e); + } + } replicatingChannel.send(packet); } else { @@ -349,6 +364,11 @@ private OperationContext sendReplicatePacket(final Packet packet, boolean lineUp return repliToken; } + @Override + public void readyForWriting() { + latch.countDown(); + } + /** * @throws IllegalStateException By default, all replicated packets generate a replicated * response. If your packets are triggering this exception, it may be because the