Skip to content

Commit

Permalink
ZOOKEEPER-3131: Remove watcher when session closed in NettyServerCnxn
Browse files Browse the repository at this point in the history
Currently, it doesn't remove itself from ZK server when the cnxn is closed, which
will leak watchers, close it to make it align with NIO implementation.

Author: Fangmin Lyu <allenlyu@fb.com>

Reviewers: hanm, anmolnar, nkalmar

Closes apache#612 from lvfangmin/ZOOKEEPER-3131
  • Loading branch information
Fangmin Lyu authored and hanm committed Sep 7, 2018
1 parent 0b65e3d commit 95557a3
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 9 deletions.
18 changes: 11 additions & 7 deletions src/java/main/org/apache/zookeeper/server/NettyServerCnxn.java
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ public class NettyServerCnxn extends ServerCnxn {

NettyServerCnxnFactory factory;
boolean initialized;

NettyServerCnxn(Channel channel, ZooKeeperServer zks, NettyServerCnxnFactory factory) {
this.channel = channel;
this.closingChannel = false;
Expand All @@ -83,11 +83,11 @@ public class NettyServerCnxn extends ServerCnxn {
this.zooKeeperSaslServer = new ZooKeeperSaslServer(factory.login);
}
}

@Override
public void close() {
closingChannel = true;

if (LOG.isDebugEnabled()) {
LOG.debug("close called for sessionid:0x"
+ Long.toHexString(sessionId));
Expand Down Expand Up @@ -119,6 +119,10 @@ public void close() {
}
}

if (zkServer != null) {
zkServer.removeCnxn(this);
}

if (channel.isOpen()) {
// Since we don't check on the futures created by write calls to the channel complete we need to make sure
// that all writes have been completed before closing the channel or we risk data loss
Expand Down Expand Up @@ -174,7 +178,7 @@ static class ResumeMessageEvent implements MessageEvent {
@Override
public ChannelFuture getFuture() {return null;}
};

@Override
public void sendResponse(ReplyHeader h, Record r, String tag)
throws IOException {
Expand Down Expand Up @@ -226,7 +230,7 @@ public void sendBuffer(ByteBuffer sendBuffer) {
*/
private class SendBufferWriter extends Writer {
private StringBuffer sb = new StringBuffer();

/**
* Check if we are ready to send another chunk.
* @param force force sending, even if not a full chunk
Expand Down Expand Up @@ -415,15 +419,15 @@ public void receiveMessage(ChannelBuffer message) {
public void disableRecv() {
disableRecvNoWait().awaitUninterruptibly();
}

private ChannelFuture disableRecvNoWait() {
throttled = true;
if (LOG.isDebugEnabled()) {
LOG.debug("Throttling - disabling recv " + this);
}
return channel.setReadable(false);
}

@Override
public long getOutstandingRequests() {
return outstandingCount.longValue();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ public void setUp() throws Exception {
* servercnxnfactory should remove all channel references to avoid
* duplicate channel closure. Duplicate closure may result in indefinite
* hanging due to netty open issue.
*
*
* @see <a href="https://issues.jboss.org/browse/NETTY-412">NETTY-412</a>
*/
@Test(timeout = 40000)
Expand All @@ -66,13 +66,16 @@ public void testSendCloseSession() throws Exception {
serverFactory instanceof NettyServerCnxnFactory);

final ZooKeeper zk = createClient();
final ZooKeeperServer zkServer = getServer(serverFactory);
final String path = "/a";
try {
// make sure zkclient works
zk.create(path, "test".getBytes(), Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);
// set on watch
Assert.assertNotNull("Didn't create znode:" + path,
zk.exists(path, false));
zk.exists(path, true));
Assert.assertEquals(1, zkServer.getZKDatabase().getDataTree().getWatchCount());
Iterable<ServerCnxn> connections = serverFactory.getConnections();
Assert.assertEquals("Mismatch in number of live connections!", 1,
serverFactory.getNumAliveConnections());
Expand All @@ -88,6 +91,8 @@ public void testSendCloseSession() throws Exception {
Assert.fail("The number of live connections should be 0");
}
}
// make sure the watch is removed when the connection closed
Assert.assertEquals(0, zkServer.getZKDatabase().getDataTree().getWatchCount());
} finally {
zk.close();
}
Expand Down

0 comments on commit 95557a3

Please sign in to comment.