Skip to content

Commit

Permalink
ZOOKEEPER-3652: Synchronize ClientCnxn outgoing queue flush on a stable internal value
Browse files Browse the repository at this point in the history

When packets are added to ClientCnxn's outgoing packet queue, we must ensure that there is no conflict with a flush of that queue that may be in progress because of a connection loss.

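Synchronization used to be on the value of the state field. That value is not stable (the field is reassigned over time, so two threads can end up locking different objects), which can cause improper synchronization. It is also global (shared across clients rather than owned by a single ClientCnxn instance), which can cause contention in applications that run several ZooKeeper clients.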

We now synchronize on outgoingQueue, which is both local to a ClientCnxn instance and stable.

Author: Sylvain Wallez <sylvain@bluxte.net>

Reviewers: maoling <maoling@apache.org>, Mohammad Arshad <arshad@apache.org>

Closes #1257 from swallez/ZOOKEEPER-3652 and squashes the following commits:

82e2cad [Sylvain Wallez] Instruct SpotBugs that we know what we're doing when synchronizing on outgoingQueue
b0bc03d [Sylvain Wallez] ZOOKEEPER-3652: Synchronize ClientCnxn outgoing queue flush on a stable internal value

(cherry picked from commit 91e0520)
  • Loading branch information
swallez authored and symat committed Apr 5, 2022
1 parent 73ea6aa commit e925265
Showing 1 changed file with 28 additions and 18 deletions.
46 changes: 28 additions & 18 deletions zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxn.java
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ static class AuthData {
* operation)
*/
private volatile boolean closing = false;

/**
* A set of ZooKeeper hosts this client could connect to.
*/
Expand Down Expand Up @@ -649,7 +649,7 @@ private void processEvent(Object event) {
.substring(chrootPath.length())), rsp.getStat());
} else {
cb.processResult(rc, clientPath, p.ctx, null, null);
}
}
} else if (p.response instanceof MultiResponse) {
MultiCallback cb = (MultiCallback) p.cb;
MultiResponse rsp = (MultiResponse) p.response;
Expand Down Expand Up @@ -764,7 +764,7 @@ static class EndOfStreamException extends IOException {
public EndOfStreamException(String msg) {
super(msg);
}

@Override
public String toString() {
return "EndOfStreamException: " + getMessage();
Expand All @@ -778,7 +778,7 @@ public SessionTimeoutException(String msg) {
super(msg);
}
}

private static class SessionExpiredException extends IOException {
private static final long serialVersionUID = -1388816932076193249L;

Expand Down Expand Up @@ -826,10 +826,10 @@ void readResponse(ByteBuffer incomingBuffer) throws IOException {
return;
}
if (replyHdr.getXid() == -4) {
// -4 is the xid for AuthPacket
// -4 is the xid for AuthPacket
if(replyHdr.getErr() == KeeperException.Code.AUTHFAILED.intValue()) {
state = States.AUTH_FAILED;
eventThread.queueEvent( new WatchedEvent(Watcher.Event.EventType.None,
state = States.AUTH_FAILED;
eventThread.queueEvent( new WatchedEvent(Watcher.Event.EventType.None,
Watcher.Event.KeeperState.AuthFailed, null) );
eventThread.queueEventOfDeath();
}
Expand Down Expand Up @@ -940,7 +940,7 @@ else if (serverPath.length() > chrootPath.length())
// Runnable
/**
* Used by ClientCnxnSocket
*
*
* @return
*/
ZooKeeper.States getZkState() {
Expand Down Expand Up @@ -1114,6 +1114,7 @@ private void logStartConnect(InetSocketAddress addr) {
private static final String RETRY_CONN_MSG =
", closing socket connection and attempting reconnect";
@Override
@SuppressFBWarnings("JLM_JSR166_UTILCONCURRENT_MONITORENTER")
public void run() {
clientCnxnSocket.introduce(this, sessionId, outgoingQueue);
clientCnxnSocket.updateNow();
Expand Down Expand Up @@ -1180,7 +1181,7 @@ public void run() {
} else {
to = connectTimeout - clientCnxnSocket.getIdleRecv();
}

if (to <= 0) {
String warnInfo;
warnInfo = "Client session timed out, have not heard from server in "
Expand All @@ -1193,8 +1194,8 @@ public void run() {
}
if (state.isConnected()) {
//1000(1 second) is to prevent race condition missing to send the second ping
//also make sure not to send too many pings when readTimeout is small
int timeToNextPing = readTimeout / 2 - clientCnxnSocket.getIdleSend() -
//also make sure not to send too many pings when readTimeout is small
int timeToNextPing = readTimeout / 2 - clientCnxnSocket.getIdleSend() -
((clientCnxnSocket.getIdleSend() > 1000) ? 1000 : 0);
//send a ping request either time is due or no packet sent out within MAX_SEND_PING_INTERVAL
if (timeToNextPing <= 0 || clientCnxnSocket.getIdleSend() > MAX_SEND_PING_INTERVAL) {
Expand Down Expand Up @@ -1256,7 +1257,8 @@ public void run() {
}
}
}
synchronized (state) {

synchronized (outgoingQueue) {
// When it comes to this point, it guarantees that later queued
// packet to outgoingQueue will be notified of death.
cleanup();
Expand Down Expand Up @@ -1362,7 +1364,7 @@ private void cleanup() {
/**
* Callback invoked by the ClientCnxnSocket once a connection has been
* established.
*
*
* @param _negotiatedSessionTimeout
* @param _sessionId
* @param _sessionPasswd
Expand Down Expand Up @@ -1589,10 +1591,18 @@ public Packet queuePacket(RequestHeader h, ReplyHeader r, Record request,
ctx, watchRegistration, null);
}

public Packet queuePacket(RequestHeader h, ReplyHeader r, Record request,
Record response, AsyncCallback cb, String clientPath,
String serverPath, Object ctx, WatchRegistration watchRegistration,
WatchDeregistration watchDeregistration) {
@SuppressFBWarnings("JLM_JSR166_UTILCONCURRENT_MONITORENTER")
public Packet queuePacket(
RequestHeader h,
ReplyHeader r,
Record request,
Record response,
AsyncCallback cb,
String clientPath,
String serverPath,
Object ctx,
WatchRegistration watchRegistration,
WatchDeregistration watchDeregistration) {
Packet packet = null;

// Note that we do not generate the Xid for the packet yet. It is
Expand All @@ -1608,7 +1618,7 @@ public Packet queuePacket(RequestHeader h, ReplyHeader r, Record request,
// 1. synchronize with the final cleanup() in SendThread.run() to avoid race
// 2. synchronized against each packet. So if a closeSession packet is added,
// later packet will be notified.
synchronized (state) {
synchronized (outgoingQueue) {
if (!state.isAlive() || closing) {
conLossPacket(packet);
} else {
Expand Down

0 comments on commit e925265

Please sign in to comment.