Skip to content

Commit

Permalink
ZOOKEEPER-3652: Synchronize ClientCnxn outgoing queue flush on a stab…
Browse files Browse the repository at this point in the history
…le internal value

When packets are added to ClientCnxn's outgoing packet queue we ensure there's no conflict with an ongoing flush of that queue because of connection loss.

Synchronization used to be on the state field's value. This value is both not stable (its value changes over time), possibly causing improper synchronization, and global, which can cause contention in applications that run several ZooKeeper clients.

We now synchronize on outgoingQueue which is both local to a ClientCnxn's instance and stable.

Author: Sylvain Wallez <sylvain@bluxte.net>

Reviewers: maoling <maoling@apache.org>, Mohammad Arshad <arshad@apache.org>

Closes apache#1257 from swallez/ZOOKEEPER-3652 and squashes the following commits:

82e2cad [Sylvain Wallez] Instruct SpotBugs that we know what we're doing when synchronizing on outgoingQueue
b0bc03d [Sylvain Wallez] ZOOKEEPER-3652: Synchronize ClientCnxn outgoing queue flush on a stable internal value
  • Loading branch information
swallez authored and anurag-harness committed Jan 13, 2023
1 parent 21542ee commit b03d3c4
Showing 1 changed file with 4 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1175,6 +1175,7 @@ private void logStartConnect(InetSocketAddress addr) {
}

@Override
@SuppressFBWarnings("JLM_JSR166_UTILCONCURRENT_MONITORENTER")
public void run() {
clientCnxnSocket.introduce(this, sessionId, outgoingQueue);
clientCnxnSocket.updateNow();
Expand Down Expand Up @@ -1303,7 +1304,7 @@ public void run() {
}
}

synchronized (state) {
synchronized (outgoingQueue) {
// When it comes to this point, it guarantees that later queued
// packet to outgoingQueue will be notified of death.
cleanup();
Expand Down Expand Up @@ -1645,6 +1646,7 @@ public Packet queuePacket(
return queuePacket(h, r, request, response, cb, clientPath, serverPath, ctx, watchRegistration, null);
}

@SuppressFBWarnings("JLM_JSR166_UTILCONCURRENT_MONITORENTER")
public Packet queuePacket(
RequestHeader h,
ReplyHeader r,
Expand All @@ -1671,7 +1673,7 @@ public Packet queuePacket(
// 1. synchronize with the final cleanup() in SendThread.run() to avoid race
// 2. synchronized against each packet. So if a closeSession packet is added,
// later packet will be notified.
synchronized (state) {
synchronized (outgoingQueue) {
if (!state.isAlive() || closing) {
conLossPacket(packet);
} else {
Expand Down

0 comments on commit b03d3c4

Please sign in to comment.