Skip to content

Commit

Permalink
Merge remote-tracking branch 'apache/master' into ZOOKEEPER-3188
Browse files Browse the repository at this point in the history
  • Loading branch information
symat authored and Mate Szalay-Beko committed Nov 13, 2019
2 parents 31805e7 + cd46594 commit f875f5c
Show file tree
Hide file tree
Showing 3 changed files with 396 additions and 87 deletions.
Expand Up @@ -41,9 +41,8 @@
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
Expand All @@ -66,6 +65,7 @@
import org.apache.zookeeper.server.quorum.auth.QuorumAuthServer;
import org.apache.zookeeper.server.quorum.flexible.QuorumVerifier;
import org.apache.zookeeper.server.util.ConfigUtils;
import org.apache.zookeeper.util.CircularBlockingQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -149,17 +149,13 @@ public class QuorumCnxManager {
* Mapping from Peer to Thread number
*/
final ConcurrentHashMap<Long, SendWorker> senderWorkerMap;
final ConcurrentHashMap<Long, ArrayBlockingQueue<ByteBuffer>> queueSendMap;
final ConcurrentHashMap<Long, BlockingQueue<ByteBuffer>> queueSendMap;
final ConcurrentHashMap<Long, ByteBuffer> lastMessageSent;

/*
* Reception queue
*/
public final ArrayBlockingQueue<Message> recvQueue;
/*
* Object to synchronize access to recvQueue
*/
private final Object recvQLock = new Object();
public final BlockingQueue<Message> recvQueue;

/*
* Shutdown flag
Expand Down Expand Up @@ -273,7 +269,7 @@ public QuorumCnxManager(QuorumPeer self, final long mySid, Map<Long, QuorumPeer.
QuorumAuthServer authServer, QuorumAuthLearner authLearner, int socketTimeout, boolean listenOnAllIPs,
int quorumCnxnThreadsSize, boolean quorumSaslAuthEnabled) {

this.recvQueue = new ArrayBlockingQueue<>(RECV_CAPACITY);
this.recvQueue = new CircularBlockingQueue<>(RECV_CAPACITY);
this.queueSendMap = new ConcurrentHashMap<>();
this.senderWorkerMap = new ConcurrentHashMap<>();
this.lastMessageSent = new ConcurrentHashMap<>();
Expand Down Expand Up @@ -462,7 +458,8 @@ private boolean startConnection(Socket sock, Long sid) throws IOException {
}

senderWorkerMap.put(sid, sw);
queueSendMap.putIfAbsent(sid, new ArrayBlockingQueue<>(SEND_CAPACITY));

queueSendMap.putIfAbsent(sid, new CircularBlockingQueue<>(SEND_CAPACITY));

sw.start();
rw.start();
Expand Down Expand Up @@ -597,7 +594,7 @@ private void handleConnection(Socket sock, DataInputStream din) throws IOExcepti

senderWorkerMap.put(sid, sw);

queueSendMap.putIfAbsent(sid, new ArrayBlockingQueue<>(SEND_CAPACITY));
queueSendMap.putIfAbsent(sid, new CircularBlockingQueue<>(SEND_CAPACITY));

sw.start();
rw.start();
Expand All @@ -622,10 +619,9 @@ public void toSend(Long sid, ByteBuffer b) {
/*
* Start a new connection if doesn't have one already.
*/
ArrayBlockingQueue<ByteBuffer> bq = queueSendMap.computeIfAbsent(sid, serverId -> new ArrayBlockingQueue<>(SEND_CAPACITY));
BlockingQueue<ByteBuffer> bq = queueSendMap.computeIfAbsent(sid, serverId -> new CircularBlockingQueue<>(SEND_CAPACITY));
addToSendQueue(bq, b);
connectOne(sid);

}
}

Expand Down Expand Up @@ -761,9 +757,10 @@ public void connectAll() {
* Check if all queues are empty, indicating that all messages have been delivered.
*/
boolean haveDelivered() {
for (ArrayBlockingQueue<ByteBuffer> queue : queueSendMap.values()) {
LOG.debug("Queue size: {}", queue.size());
if (queue.size() == 0) {
for (BlockingQueue<ByteBuffer> queue : queueSendMap.values()) {
final int queueSize = queue.size();
LOG.debug("Queue size: {}", queueSize);
if (queueSize == 0) {
return true;
}
}
Expand Down Expand Up @@ -1207,7 +1204,7 @@ public void run() {
* message than that stored in lastMessage. To avoid sending
* stale message, we should send the message in the send queue.
*/
ArrayBlockingQueue<ByteBuffer> bq = queueSendMap.get(sid);
BlockingQueue<ByteBuffer> bq = queueSendMap.get(sid);
if (bq == null || isSendQueueEmpty(bq)) {
ByteBuffer b = lastMessageSent.get(sid);
if (b != null) {
Expand All @@ -1225,7 +1222,7 @@ public void run() {

ByteBuffer b = null;
try {
ArrayBlockingQueue<ByteBuffer> bq = queueSendMap.get(sid);
BlockingQueue<ByteBuffer> bq = queueSendMap.get(sid);
if (bq != null) {
b = pollSendQueue(bq, 1000, TimeUnit.MILLISECONDS);
} else {
Expand Down Expand Up @@ -1365,37 +1362,19 @@ public void run() {
}

/**
* Inserts an element in the specified queue. If the Queue is full, this
* method removes an element from the head of the Queue and then inserts
* the element at the tail. It can happen that an element is removed
* by another thread in {@link SendWorker#run() }
* method before this method attempts to remove an element from the queue.
* This will cause {@link ArrayBlockingQueue#remove() remove} to throw an
* exception, which is safe to ignore.
*
* Unlike {@link #addToRecvQueue(Message) addToRecvQueue} this method does
* not need to be synchronized since there is only one thread that inserts
* an element in the queue and another thread that reads from the queue.
* Inserts an element in the provided {@link BlockingQueue}. This method
* assumes that if the Queue is full, an element from the head of the Queue is
* removed and the new item is inserted at the tail of the queue. This is done
* to prevent a thread from blocking while inserting an element in the queue.
*
* @param queue
* Reference to the Queue
* @param buffer
* Reference to the buffer to be inserted in the queue
* @param queue Reference to the Queue
* @param buffer Reference to the buffer to be inserted in the queue
*/
private void addToSendQueue(ArrayBlockingQueue<ByteBuffer> queue, ByteBuffer buffer) {
if (queue.remainingCapacity() == 0) {
try {
queue.remove();
} catch (NoSuchElementException ne) {
// element could be removed by poll()
LOG.debug("Trying to remove from an empty Queue. Ignoring exception.", ne);
}
}
try {
queue.add(buffer);
} catch (IllegalStateException ie) {
// This should never happen
LOG.error("Unable to insert an element in the queue ", ie);
private void addToSendQueue(final BlockingQueue<ByteBuffer> queue,
final ByteBuffer buffer) {
final boolean success = queue.offer(buffer);
if (!success) {
throw new RuntimeException("Could not insert into receive queue");
}
}

Expand All @@ -1406,7 +1385,7 @@ private void addToSendQueue(ArrayBlockingQueue<ByteBuffer> queue, ByteBuffer buf
* @return
* true if the specified queue is empty
*/
private boolean isSendQueueEmpty(ArrayBlockingQueue<ByteBuffer> queue) {
private boolean isSendQueueEmpty(final BlockingQueue<ByteBuffer> queue) {
return queue.isEmpty();
}

Expand All @@ -1415,60 +1394,37 @@ private boolean isSendQueueEmpty(ArrayBlockingQueue<ByteBuffer> queue) {
* waiting up to the specified wait time if necessary for an element to
* become available.
*
* {@link ArrayBlockingQueue#poll(long, java.util.concurrent.TimeUnit)}
* {@link BlockingQueue#poll(long, java.util.concurrent.TimeUnit)}
*/
private ByteBuffer pollSendQueue(ArrayBlockingQueue<ByteBuffer> queue, long timeout, TimeUnit unit) throws InterruptedException {
return queue.poll(timeout, unit);
private ByteBuffer pollSendQueue(final BlockingQueue<ByteBuffer> queue,
final long timeout, final TimeUnit unit) throws InterruptedException {
return queue.poll(timeout, unit);
}

/**
* Inserts an element in the {@link #recvQueue}. If the Queue is full, this
* methods removes an element from the head of the Queue and then inserts
* the element at the tail of the queue.
*
* This method is synchronized to achieve fairness between two threads that
* are trying to insert an element in the queue. Each thread checks if the
* queue is full, then removes the element at the head of the queue, and
* then inserts an element at the tail. This three-step process is done to
* prevent a thread from blocking while inserting an element in the queue.
* If we do not synchronize the call to this method, then a thread can grab
* a slot in the queue created by the second thread. This can cause the call
* to insert by the second thread to fail.
* Note that synchronizing this method does not block another thread
* from polling the queue since that synchronization is provided by the
* queue itself.
* methods removes an element from the head of the Queue and then inserts the
* element at the tail of the queue.
*
* @param msg
* Reference to the message to be inserted in the queue
* @param msg Reference to the message to be inserted in the queue
*/
public void addToRecvQueue(Message msg) {
synchronized (recvQLock) {
if (recvQueue.remainingCapacity() == 0) {
try {
recvQueue.remove();
} catch (NoSuchElementException ne) {
// element could be removed by poll()
LOG.debug("Trying to remove from an empty recvQueue. Ignoring exception.", ne);
}
}
try {
recvQueue.add(msg);
} catch (IllegalStateException ie) {
// This should never happen
LOG.error("Unable to insert element in the recvQueue ", ie);
}
}
public void addToRecvQueue(final Message msg) {
final boolean success = this.recvQueue.offer(msg);
if (!success) {
throw new RuntimeException("Could not insert into receive queue");
}
}

/**
* Retrieves and removes a message at the head of this queue,
* waiting up to the specified wait time if necessary for an element to
* become available.
*
* {@link ArrayBlockingQueue#poll(long, java.util.concurrent.TimeUnit)}
* {@link BlockingQueue#poll(long, java.util.concurrent.TimeUnit)}
*/
public Message pollRecvQueue(long timeout, TimeUnit unit) throws InterruptedException {
return recvQueue.poll(timeout, unit);
public Message pollRecvQueue(final long timeout, final TimeUnit unit)
throws InterruptedException {
return this.recvQueue.poll(timeout, unit);
}

public boolean connectedToPeer(long peerSid) {
Expand Down

0 comments on commit f875f5c

Please sign in to comment.