Skip to content

Commit

Permalink
ZOOKEEPER-3652: Synchronize ClientCnxn outgoing queue flush on a stab…
Browse files Browse the repository at this point in the history
…le internal value

When packets are added to ClientCnxn's outgoing packet queue we ensure there's no conflict with an ongoing flush of that queue because of connection loss.

Synchronization used to be on the state field's value. This value is both not stable (its value changes over time), possibly causing improper synchronization, and global, which can cause contention in applications that run several ZooKeeper clients.

We now synchronize on outgoingQueue which is both local to a ClientCnxn's instance and stable.

Author: Sylvain Wallez <sylvain@bluxte.net>

Reviewers: maoling <maoling@apache.org>, Mohammad Arshad <arshad@apache.org>

Closes #1257 from swallez/ZOOKEEPER-3652 and squashes the following commits:

82e2cad [Sylvain Wallez] Instruct SpotBugs that we know what we're doing when synchronizing on outgoingQueue
b0bc03d [Sylvain Wallez] ZOOKEEPER-3652: Synchronize ClientCnxn outgoing queue flush on a stable internal value

(cherry picked from commit 91e0520)
Signed-off-by: Mohammad Arshad <arshad@apache.org>
  • Loading branch information
swallez authored and arshadmohammad committed Mar 30, 2022
1 parent 16bf376 commit 9310338
Showing 1 changed file with 4 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1175,6 +1175,7 @@ private void logStartConnect(InetSocketAddress addr) {
}

@Override
@SuppressFBWarnings("JLM_JSR166_UTILCONCURRENT_MONITORENTER")
public void run() {
clientCnxnSocket.introduce(this, sessionId, outgoingQueue);
clientCnxnSocket.updateNow();
Expand Down Expand Up @@ -1303,7 +1304,7 @@ public void run() {
}
}

synchronized (state) {
synchronized (outgoingQueue) {
// When it comes to this point, it guarantees that later queued
// packet to outgoingQueue will be notified of death.
cleanup();
Expand Down Expand Up @@ -1645,6 +1646,7 @@ public Packet queuePacket(
return queuePacket(h, r, request, response, cb, clientPath, serverPath, ctx, watchRegistration, null);
}

@SuppressFBWarnings("JLM_JSR166_UTILCONCURRENT_MONITORENTER")
public Packet queuePacket(
RequestHeader h,
ReplyHeader r,
Expand All @@ -1671,7 +1673,7 @@ public Packet queuePacket(
// 1. synchronize with the final cleanup() in SendThread.run() to avoid race
// 2. synchronized against each packet. So if a closeSession packet is added,
// later packet will be notified.
synchronized (state) {
synchronized (outgoingQueue) {
if (!state.isAlive() || closing) {
conLossPacket(packet);
} else {
Expand Down

0 comments on commit 9310338

Please sign in to comment.