Skip to content

Commit

Permalink
ZOOKEEPER-3439: Observability improvements on client / server connect…
Browse files Browse the repository at this point in the history
…ion close.

Currently, when the server closes a client connection, not enough information is recorded (apart from a few exception logs), which makes it hard to do postmortems. In addition, having a complete view of the aggregated connection-closing reasons will provide more signals with which we can better operate the clusters (e.g. predict that an incident might happen based on the trend of the connection-closing reasons).

Server metrics were not added in this PR because we internally use a different metrics system, so some work is needed to migrate to ServerMetrics. I want to submit this first to get community feedback.

Author: Michael Han <lhan@twitter.com>

Reviewers: Enrico Olivelli <eolivelli@gmail.com>, maoling <maoling199210191@sina.com>, enixon

Closes apache#997 from hanm/twitter/cc6d87505d9745caace2c86acf58d6ffa085281e
  • Loading branch information
hanm committed Jul 2, 2019
1 parent 96eefaf commit 9bee98b
Show file tree
Hide file tree
Showing 14 changed files with 123 additions and 53 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ public void process(WatchedEvent event) { }
int getSessionTimeout() { return 0; }

@Override
public void close() { }
public void close(DisconnectReason reason) { }

@Override
public void sendResponse(ReplyHeader h, Record r, String tag, String cacheKey, Stat stat) throws IOException { }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -622,7 +622,7 @@ private boolean closeSession(ServerCnxnFactory serverCnxnFactory, long sessionId
if (serverCnxnFactory == null) {
return false;
}
return serverCnxnFactory.closeSession(sessionId);
return serverCnxnFactory.closeSession(sessionId, ServerCnxn.DisconnectReason.CLIENT_CLOSED_SESSION);
}

private boolean connClosedByClient(Request request) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,8 @@ private void readPayload() throws IOException, InterruptedException, ClientCnxnL
throw new EndOfStreamException(
"Unable to read additional data from client sessionid 0x"
+ Long.toHexString(sessionId)
+ ", likely client has closed socket");
+ ", likely client has closed socket",
DisconnectReason.UNABLE_TO_READ_FROM_CLIENT);
}
}

Expand Down Expand Up @@ -224,7 +225,8 @@ void handleWrite(SelectionKey k) throws IOException, CloseRequestException {
ByteBuffer bb;
while ((bb = outgoingBuffers.peek()) != null) {
if (bb == ServerCnxnFactory.closeConn) {
throw new CloseRequestException("close requested");
throw new CloseRequestException("close requested",
DisconnectReason.CLIENT_CLOSED_CONNECTION);
}
if (bb == packetSentinel) {
packetSent();
Expand Down Expand Up @@ -274,7 +276,8 @@ void handleWrite(SelectionKey k) throws IOException, CloseRequestException {
// Remove the buffers that we have sent
while ((bb = outgoingBuffers.peek()) != null) {
if (bb == ServerCnxnFactory.closeConn) {
throw new CloseRequestException("close requested");
throw new CloseRequestException("close requested",
DisconnectReason.CLIENT_CLOSED_CONNECTION);
}
if (bb == packetSentinel) {
packetSent();
Expand Down Expand Up @@ -319,7 +322,8 @@ void doIO(SelectionKey k) throws InterruptedException {
throw new EndOfStreamException(
"Unable to read additional data from client sessionid 0x"
+ Long.toHexString(sessionId)
+ ", likely client has closed socket");
+ ", likely client has closed socket",
DisconnectReason.UNABLE_TO_READ_FROM_CLIENT);
}
if (incomingBuffer.remaining() == 0) {
boolean isPayload;
Expand All @@ -345,7 +349,8 @@ void doIO(SelectionKey k) throws InterruptedException {
handleWrite(k);

if (!initialized && !getReadInterest() && !getWriteInterest()) {
throw new CloseRequestException("responded to info probe");
throw new CloseRequestException("responded to info probe",
DisconnectReason.INFO_PROBE);
}
}
} catch (CancelledKeyException e) {
Expand All @@ -354,29 +359,29 @@ void doIO(SelectionKey k) throws InterruptedException {
if (LOG.isDebugEnabled()) {
LOG.debug("CancelledKeyException stack trace", e);
}
close();
close(DisconnectReason.CANCELLED_KEY_EXCEPTION);
} catch (CloseRequestException e) {
// expecting close to log session closure
close();
} catch (EndOfStreamException e) {
LOG.warn(e.getMessage());
// expecting close to log session closure
close();
close(e.getReason());
} catch (ClientCnxnLimitException e) {
// Common case exception, print at debug level
ServerMetrics.getMetrics().CONNECTION_REJECTED.add(1);
if (LOG.isDebugEnabled()) {
LOG.debug("Exception causing close of session 0x"
+ Long.toHexString(sessionId) + ": " + e.getMessage());
}
close();
close(DisconnectReason.CLIENT_CNX_LIMIT);
} catch (IOException e) {
LOG.warn("Exception causing close of session 0x"
+ Long.toHexString(sessionId) + ": " + e.getMessage());
if (LOG.isDebugEnabled()) {
LOG.debug("IOException stack trace", e);
}
close();
close(DisconnectReason.IO_EXCEPTION);
}
}

Expand Down Expand Up @@ -576,11 +581,17 @@ public String toString() {
" sessionId: 0x" + Long.toHexString(sessionId);
}


/**
* Close the cnxn and remove it from the factory cnxns list.
*/
@Override
public void close() {
public void close(DisconnectReason reason) {
disconnectReason = reason;
close();
}

private void close() {
setStale();
if (!factory.removeCnxn(this)) {
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -404,7 +404,7 @@ public void run() {
for (SelectionKey key : selector.keys()) {
NIOServerCnxn cnxn = (NIOServerCnxn) key.attachment();
if (cnxn.isSelectable()) {
cnxn.close();
cnxn.close(ServerCnxn.DisconnectReason.SERVER_SHUTDOWN);
}
cleanupSelectionKey(key);
}
Expand Down Expand Up @@ -532,7 +532,7 @@ public void doWork() throws InterruptedException {

// Check if we shutdown or doIO() closed this connection
if (stopped) {
cnxn.close();
cnxn.close(ServerCnxn.DisconnectReason.SERVER_SHUTDOWN);
return;
}
if (!key.isValid()) {
Expand All @@ -548,13 +548,13 @@ public void doWork() throws InterruptedException {
// on the current set of interest ops, which may have changed
// as a result of the I/O operations we just performed.
if (!selectorThread.addInterestOpsUpdateRequest(key)) {
cnxn.close();
cnxn.close(ServerCnxn.DisconnectReason.CONNECTION_MODE_CHANGED);
}
}

@Override
public void cleanup() {
cnxn.close();
cnxn.close(ServerCnxn.DisconnectReason.CLEAN_UP);
}
}

Expand All @@ -577,7 +577,7 @@ public void run() {
}
for (NIOServerCnxn conn : cnxnExpiryQueue.poll()) {
ServerMetrics.getMetrics().SESSIONLESS_CONNECTIONS_EXPIRED.add(1);
conn.close();
conn.close(ServerCnxn.DisconnectReason.CONNECTION_EXPIRED);
}
}

Expand Down Expand Up @@ -867,12 +867,12 @@ private int getClientCnxnCount(InetAddress cl) {
*/
@Override
@SuppressWarnings("unchecked")
public void closeAll() {
public void closeAll(ServerCnxn.DisconnectReason reason) {
// clear all the connections on which we are selecting
for (ServerCnxn cnxn : cnxns) {
try {
// This will remove the cnxn from cnxns
cnxn.close();
cnxn.close(reason);
} catch (Exception e) {
LOG.warn("Ignoring exception closing cnxn sessionid 0x"
+ Long.toHexString(cnxn.getSessionId()), e);
Expand Down Expand Up @@ -921,7 +921,7 @@ public void shutdown() {
join();

// close all open connections
closeAll();
closeAll(ServerCnxn.DisconnectReason.SERVER_SHUTDOWN);

if (login != null) {
login.shutdown();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,15 @@ public class NettyServerCnxn extends ServerCnxn {
addAuthInfo(new Id("ip", addr.getHostAddress()));
}

/**
* Close the cnxn and remove it from the factory cnxns list.
*/
@Override
public void close(DisconnectReason reason) {
disconnectReason = reason;
close();
}

public void close() {
closingChannel = true;

Expand Down Expand Up @@ -194,7 +202,7 @@ public void setSessionId(long sessionId) {
@Override
public void sendBuffer(ByteBuffer... buffers) {
if (buffers.length == 1 && buffers[0] == ServerCnxnFactory.closeConn) {
close();
close(DisconnectReason.CLIENT_CLOSED_CONNECTION);
return;
}
channel.writeAndFlush(Unpooled.wrappedBuffer(buffers)).addListener(onSendBufferDoneListener);
Expand Down Expand Up @@ -533,14 +541,14 @@ private void receiveMessage(ByteBuf message) {
}
} catch(IOException e) {
LOG.warn("Closing connection to " + getRemoteSocketAddress(), e);
close();
close(DisconnectReason.IO_EXCEPTION);
} catch(ClientCnxnLimitException e) {
// Common case exception, print at debug level
ServerMetrics.getMetrics().CONNECTION_REJECTED.add(1);
if (LOG.isDebugEnabled()) {
LOG.debug("Closing connection to " + getRemoteSocketAddress(), e);
}
close();
close(DisconnectReason.CLIENT_RATE_LIMIT);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@

import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLEngine;
import javax.net.ssl.SSLException;
import javax.net.ssl.SSLPeerUnverifiedException;
import javax.net.ssl.SSLSession;
import javax.net.ssl.X509KeyManager;
Expand All @@ -58,7 +57,6 @@
import io.netty.channel.group.ChannelGroupFuture;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.ssl.OpenSsl;
import io.netty.handler.ssl.OptionalSslHandler;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslHandler;
Expand Down Expand Up @@ -222,7 +220,7 @@ public void channelInactive(ChannelHandlerContext ctx) throws Exception {
if (LOG.isTraceEnabled()) {
LOG.trace("Channel inactive caused close {}", cnxn);
}
cnxn.close();
cnxn.close(ServerCnxn.DisconnectReason.CHANNEL_DISCONNECTED);
}
}

Expand All @@ -234,7 +232,7 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws E
if (LOG.isDebugEnabled()) {
LOG.debug("Closing {}", cnxn);
}
cnxn.close();
cnxn.close(ServerCnxn.DisconnectReason.CHANNEL_CLOSED_EXCEPTION);
}
}

Expand Down Expand Up @@ -354,15 +352,15 @@ public void operationComplete(Future<Channel> future) {

if (authProvider == null) {
LOG.error("X509 Auth provider not found: {}", authProviderProp);
cnxn.close();
cnxn.close(ServerCnxn.DisconnectReason.AUTH_PROVIDER_NOT_FOUND);
return;
}

if (KeeperException.Code.OK !=
authProvider.handleAuthentication(cnxn, null)) {
LOG.error("Authentication failed for session 0x{}",
Long.toHexString(cnxn.getSessionId()));
cnxn.close();
cnxn.close(ServerCnxn.DisconnectReason.SASL_AUTH_FAILURE);
return;
}
}
Expand All @@ -373,7 +371,7 @@ public void operationComplete(Future<Channel> future) {
} else {
LOG.error("Unsuccessful handshake with session 0x{}",
Long.toHexString(cnxn.getSessionId()));
cnxn.close();
cnxn.close(ServerCnxn.DisconnectReason.FAILED_HANDSHAKE);
}
}
}
Expand Down Expand Up @@ -471,7 +469,7 @@ private synchronized void initSSL(ChannelPipeline p, boolean supportPlaintext)
}

@Override
public void closeAll() {
public void closeAll(ServerCnxn.DisconnectReason reason) {
if (LOG.isDebugEnabled()) {
LOG.debug("closeAll()");
}
Expand All @@ -480,7 +478,7 @@ public void closeAll() {
for (ServerCnxn cnxn : cnxns) {
try {
// This will remove the cnxn from cnxns
cnxn.close();
cnxn.close(reason);
} catch (Exception e) {
LOG.warn("Ignoring exception closing cnxn sessionid 0x"
+ Long.toHexString(cnxn.getSessionId()), e);
Expand Down Expand Up @@ -559,7 +557,7 @@ public void shutdown() {
bossGroup.shutdownGracefully();
});
}
closeAll();
closeAll(ServerCnxn.DisconnectReason.SERVER_SHUTDOWN);
ChannelGroupFuture allChannelsCloseFuture = allChannels.close();
if (workerGroup != null) {
allChannelsCloseFuture.addListener(future -> {
Expand Down
Loading

0 comments on commit 9bee98b

Please sign in to comment.