Skip to content

Commit

Permalink
This closes #327
Browse files Browse the repository at this point in the history
  • Loading branch information
clebertsuconic committed Jan 19, 2016
2 parents 8e4aca1 + 68faa1d commit 54222d8
Showing 1 changed file with 27 additions and 5 deletions.
Expand Up @@ -29,6 +29,7 @@
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
import org.apache.activemq.artemis.api.core.ActiveMQInterruptedException;
import org.apache.activemq.artemis.api.core.Pair;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.SessionFailureListener;
Expand Down Expand Up @@ -65,6 +66,7 @@
import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.spi.core.remoting.ReadyListener;
import org.apache.activemq.artemis.utils.ExecutorFactory;
import org.apache.activemq.artemis.utils.ReusableLatch;

Expand All @@ -76,7 +78,7 @@
*
* @see ReplicationEndpoint
*/
public final class ReplicationManager implements ActiveMQComponent {
public final class ReplicationManager implements ActiveMQComponent, ReadyListener {

public enum ADD_OPERATION_TYPE {
UPDATE {
Expand Down Expand Up @@ -109,6 +111,7 @@ public static ADD_OPERATION_TYPE toOperation(boolean isUpdate) {

private final Object replicationLock = new Object();

private final ReusableLatch latch = new ReusableLatch();
private final Queue<OperationContext> pendingTokens = new ConcurrentLinkedQueue<>();

private final ExecutorFactory executorFactory;
Expand Down Expand Up @@ -261,11 +264,16 @@ public synchronized void stop() throws Exception {
return;
}

enabled = false;

// This is to avoid the write holding a lock while we are trying to close it
if (replicatingChannel != null) {
replicatingChannel.close();
replicatingChannel.getConnection().getTransportConnection().fireReady(true);
latch.setCount(0);
}

synchronized (replicationLock) {
enabled = false;
if (replicatingChannel != null) {
replicatingChannel.close();
}
clearReplicationTokens();
}

Expand Down Expand Up @@ -332,6 +340,15 @@ private OperationContext sendReplicatePacket(final Packet packet, boolean lineUp
synchronized (replicationLock) {
if (enabled) {
pendingTokens.add(repliToken);
if (!replicatingChannel.getConnection().isWritable(this)) {
latch.countUp();
try {
latch.await();
}
catch (InterruptedException e) {
throw new ActiveMQInterruptedException(e);
}
}
replicatingChannel.send(packet);
}
else {
Expand All @@ -349,6 +366,11 @@ private OperationContext sendReplicatePacket(final Packet packet, boolean lineUp
return repliToken;
}

@Override
public void readyForWriting() {
latch.countDown();
}

/**
* @throws IllegalStateException By default, all replicated packets generate a replicated
* response. If your packets are triggering this exception, it may be because the
Expand Down

0 comments on commit 54222d8

Please sign in to comment.