Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

ZOOKEEPER-909. Extract NIO specific code from ClientCnxn

git-svn-id: https://svn.apache.org/repos/asf/hadoop/zookeeper/trunk@1033770 13f79535-47bb-0310-9956-ffa450edef68
  • Loading branch information...
commit 426778e9d645beec699fc79b2f0fdf8c23053ecd 1 parent 42736ad
@phunt phunt authored
View
9 CHANGES.txt
@@ -187,9 +187,14 @@ IMPROVEMENTS:
ZOOKEEPER-864. Hedwig C++ client improvements (Ivan Kelly via breed)
- ZOOKEEPER-862. Hedwig created ledgers with hardcoded Bookkeeper ensemble and quorum size. Make these a server config parameter instead. (Erwin T via breed)
+ ZOOKEEPER-862. Hedwig created ledgers with hardcoded Bookkeeper ensemble and
+ quorum size. Make these a server config parameter instead. (Erwin T via breed)
- ZOOKEEPER-926. Fork Hadoop common's test-patch.sh and modify for Zookeeper. (nigel)
+ ZOOKEEPER-926. Fork Hadoop common's test-patch.sh and modify for Zookeeper.
+ (nigel)
+
+ ZOOKEEPER-909. Extract NIO specific code from ClientCnxn
+ (Thomas Koch via phunt)
NEW FEATURES:
ZOOKEEPER-729. Java client API to recursively delete a subtree.
View
2  ivy.xml
@@ -41,7 +41,7 @@
<dependency org="log4j" name="log4j" rev="1.2.15" transitive="false" conf="default"/>
<dependency org="jline" name="jline" rev="0.9.94" transitive="false" conf="default"/>
- <dependency org="org.jboss.netty" name="netty" conf="default" rev="3.1.5.GA">
+ <dependency org="org.jboss.netty" name="netty" conf="default" rev="3.2.2.Final">
<artifact name="netty" type="jar" conf="default"/>
</dependency>
View
2  ivysettings.xml
@@ -20,7 +20,7 @@
<property name="repo.maven.org"
value="http://repo1.maven.org/maven2/" override="false"/>
<property name="repo.jboss.org"
- value="http://repository.jboss.com/maven2/" override="false"/>
+ value="http://repository.jboss.org/nexus/content/groups/public/" override="false"/>
<property name="repo.sun.org"
value="http://download.java.net/maven/2/" override="false"/>
<property name="maven2.pattern"
View
571 src/java/main/org/apache/zookeeper/ClientCnxn.java
@@ -25,14 +25,12 @@
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
-import java.nio.channels.SelectionKey;
-import java.nio.channels.Selector;
-import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedList;
import java.util.Random;
import java.util.Set;
+import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.jute.BinaryInputArchive;
@@ -55,7 +53,6 @@
import org.apache.zookeeper.common.PathUtils;
import org.apache.zookeeper.proto.AuthPacket;
import org.apache.zookeeper.proto.ConnectRequest;
-import org.apache.zookeeper.proto.ConnectResponse;
import org.apache.zookeeper.proto.CreateResponse;
import org.apache.zookeeper.proto.ExistsResponse;
import org.apache.zookeeper.proto.GetACLResponse;
@@ -85,8 +82,6 @@
* option allows the client to turn off this behavior by setting
* the environment variable "zookeeper.disableAutoWatchReset" to "true" */
private static boolean disableAutoWatchReset;
-
- public static final int packetLen;
static {
// this var should not be public, but otw there is no easy way
// to test
@@ -96,7 +91,6 @@
LOG.debug("zookeeper.disableAutoWatchReset is "
+ disableAutoWatchReset);
}
- packetLen = Integer.getInteger("jute.maxbuffer", 4096 * 1024);
}
private final ArrayList<InetSocketAddress> serverAddrs =
@@ -113,7 +107,7 @@
byte data[];
}
- private final ArrayList<AuthData> authInfo = new ArrayList<AuthData>();
+ private final CopyOnWriteArraySet<AuthData> authInfo = new CopyOnWriteArraySet<AuthData>();
/**
* These are the packets that have been sent and are waiting for a response.
@@ -129,10 +123,11 @@
private int connectTimeout;
- /** The timeout in ms the client negotiated with the server. This is the
- * "real" timeout, not the timeout request by the client (which may
- * have been increased/decreased by the server which applies bounds
- * to this value.
+ /**
+ * The timeout in ms the client negotiated with the server. This is the
+ * "real" timeout, not the timeout request by the client (which may have
+ * been increased/decreased by the server which applies bounds to this
+ * value.
*/
private volatile int negotiatedSessionTimeout;
@@ -154,15 +149,13 @@
final EventThread eventThread;
- final Selector selector = Selector.open();
-
/**
* Set to true when close is called. Latches the connection such that we
* don't attempt to re-connect to the server if in the middle of closing the
* connection (client sends session disconnect to server as part of close
* operation)
*/
- volatile boolean closing = false;
+ private volatile boolean closing = false;
public long getSessionId() {
return sessionId;
@@ -180,56 +173,22 @@ public int getSessionTimeout() {
public String toString() {
StringBuilder sb = new StringBuilder();
- SocketAddress local = getLocalSocketAddress();
- SocketAddress remote = getRemoteSocketAddress();
+ SocketAddress local = sendThread.getClientCnxnSocket().getLocalSocketAddress();
+ SocketAddress remote = sendThread.getClientCnxnSocket().getRemoteSocketAddress();
sb
.append("sessionid:0x").append(Long.toHexString(getSessionId()))
.append(" local:").append(local)
.append(" remoteserver:").append(remote)
.append(" lastZxid:").append(lastZxid)
.append(" xid:").append(xid)
- .append(" sent:").append(sendThread.sentCount)
- .append(" recv:").append(sendThread.recvCount)
+ .append(" sent:").append(sendThread.getClientCnxnSocket().getSentCount())
+ .append(" recv:").append(sendThread.getClientCnxnSocket().getRecvCount())
.append(" queuedpkts:").append(outgoingQueue.size())
.append(" pendingresp:").append(pendingQueue.size())
.append(" queuedevents:").append(eventThread.waitingEvents.size());
return sb.toString();
}
-
- /**
- * Returns the address to which the socket is connected.
- * @return ip address of the remote side of the connection or null if
- * not connected
- */
- SocketAddress getRemoteSocketAddress() {
- // a lot could go wrong here, so rather than put in a bunch of code
- // to check for nulls all down the chain let's do it the simple
- // yet bulletproof way
- try {
- return ((SocketChannel)sendThread.sockKey.channel())
- .socket().getRemoteSocketAddress();
- } catch (NullPointerException e) {
- return null;
- }
- }
-
- /**
- * Returns the local address to which the socket is bound.
- * @return ip address of the remote side of the connection or null if
- * not connected
- */
- SocketAddress getLocalSocketAddress() {
- // a lot could go wrong here, so rather than put in a bunch of code
- // to check for nulls all down the chain let's do it the simple
- // yet bulletproof way
- try {
- return ((SocketChannel)sendThread.sockKey.channel())
- .socket().getLocalSocketAddress();
- } catch (NullPointerException e) {
- return null;
- }
- }
/**
* This class allows us to pass the headers and the relevant records around.
@@ -319,13 +278,14 @@ public String toString() {
* @param zooKeeper
* the zookeeper object that this connection is related to.
* @param watcher watcher for this connection
+ * @param clientCnxnSocket
+ * the socket implementation used (e.g. NIO/Netty)
* @throws IOException
*/
public ClientCnxn(String hosts, int sessionTimeout, ZooKeeper zooKeeper,
- ClientWatchManager watcher)
- throws IOException
- {
- this(hosts, sessionTimeout, zooKeeper, watcher, 0, new byte[16]);
+ ClientWatchManager watcher, ClientCnxnSocket clientCnxnSocket)
+ throws IOException {
+ this(hosts, sessionTimeout, zooKeeper, watcher, clientCnxnSocket, 0, new byte[16]);
}
/**
@@ -340,14 +300,15 @@ public ClientCnxn(String hosts, int sessionTimeout, ZooKeeper zooKeeper,
* @param zooKeeper
* the zookeeper object that this connection is related to.
* @param watcher watcher for this connection
+ * @param clientCnxnSocket
+ * the socket implementation used (e.g. NIO/Netty)
* @param sessionId session id if re-establishing session
* @param sessionPasswd session passwd if re-establishing session
* @throws IOException
*/
public ClientCnxn(String hosts, int sessionTimeout, ZooKeeper zooKeeper,
- ClientWatchManager watcher, long sessionId, byte[] sessionPasswd)
- throws IOException
- {
+ ClientWatchManager watcher, ClientCnxnSocket clientCnxnSocket,
+ long sessionId, byte[] sessionPasswd) throws IOException {
this.zooKeeper = zooKeeper;
this.watcher = watcher;
this.sessionId = sessionId;
@@ -389,7 +350,7 @@ public ClientCnxn(String hosts, int sessionTimeout, ZooKeeper zooKeeper,
connectTimeout = sessionTimeout / hostsList.length;
readTimeout = sessionTimeout * 2 / 3;
Collections.shuffle(serverAddrs);
- sendThread = new SendThread();
+ sendThread = new SendThread(clientCnxnSocket);
eventThread = new EventThread();
}
@@ -412,9 +373,10 @@ public void start() {
eventThread.start();
}
- Object eventOfDeath = new Object();
+ private Object eventOfDeath = new Object();
- final static UncaughtExceptionHandler uncaughtExceptionHandler = new UncaughtExceptionHandler() {
+ private final static UncaughtExceptionHandler uncaughtExceptionHandler = new UncaughtExceptionHandler() {
+ @Override
public void uncaughtException(Thread t, Throwable e) {
LOG.error("from " + t.getName(), e);
}
@@ -640,7 +602,7 @@ private void conLossPacket(Packet p) {
if (p.replyHeader == null) {
return;
}
- switch(zooKeeper.state) {
+ switch (state) {
case AUTH_FAILED:
p.replyHeader.setErr(KeeperException.Code.AUTHFAILED.intValue());
break;
@@ -653,15 +615,16 @@ private void conLossPacket(Packet p) {
finishPacket(p);
}
- volatile long lastZxid;
+ private volatile long lastZxid;
- private static class EndOfStreamException extends IOException {
+ static class EndOfStreamException extends IOException {
private static final long serialVersionUID = -5438877188796231422L;
public EndOfStreamException(String msg) {
super(msg);
}
+ @Override
public String toString() {
return "EndOfStreamException: " + getMessage();
}
@@ -683,75 +646,21 @@ public SessionExpiredException(String msg) {
}
}
+ public static final int packetLen = Integer.getInteger("jute.maxbuffer",
+ 4096 * 1024);
+
/**
* This class services the outgoing request queue and generates the heart
* beats. It also spawns the ReadThread.
*/
class SendThread extends Thread {
- SelectionKey sockKey;
-
- final ByteBuffer lenBuffer = ByteBuffer.allocateDirect(4);
-
- ByteBuffer incomingBuffer = lenBuffer;
-
- boolean initialized;
-
private long lastPingSentNs;
+ private final ClientCnxnSocket clientCnxnSocket;
+ private int lastConnectIndex = -1;
+ private int currentConnectIndex;
+ private Random r = new Random(System.nanoTime());
- long sentCount = 0;
- long recvCount = 0;
-
- void readLength() throws IOException {
- int len = incomingBuffer.getInt();
- if (len < 0 || len >= packetLen) {
- throw new IOException("Packet len" + len + " is out of range!");
- }
- incomingBuffer = ByteBuffer.allocate(len);
- }
-
- void readConnectResult() throws IOException {
- if (LOG.isTraceEnabled()) {
- StringBuffer buf = new StringBuffer("0x[");
- for (byte b : incomingBuffer.array()) {
- buf.append(Integer.toHexString(b) + ",");
- }
- buf.append("]");
- LOG.trace("readConnectRestult " + incomingBuffer.remaining()
- + " " + buf.toString());
- }
- ByteBufferInputStream bbis = new ByteBufferInputStream(
- incomingBuffer);
- BinaryInputArchive bbia = BinaryInputArchive.getArchive(bbis);
- ConnectResponse conRsp = new ConnectResponse();
- conRsp.deserialize(bbia, "connect");
- negotiatedSessionTimeout = conRsp.getTimeOut();
- if (negotiatedSessionTimeout <= 0) {
- zooKeeper.state = States.CLOSED;
-
- eventThread.queueEvent(new WatchedEvent(
- Watcher.Event.EventType.None,
- Watcher.Event.KeeperState.Expired, null));
- eventThread.queueEventOfDeath();
- throw new SessionExpiredException(
- "Unable to reconnect to ZooKeeper service, session 0x"
- + Long.toHexString(sessionId) + " has expired");
- }
- readTimeout = negotiatedSessionTimeout * 2 / 3;
- connectTimeout = negotiatedSessionTimeout / serverAddrs.size();
- sessionId = conRsp.getSessionId();
- sessionPasswd = conRsp.getPasswd();
- zooKeeper.state = States.CONNECTED;
- LOG.info("Session establishment complete on server "
- + ((SocketChannel)sockKey.channel())
- .socket().getRemoteSocketAddress()
- + ", sessionid = 0x"
- + Long.toHexString(sessionId)
- + ", negotiated timeout = " + negotiatedSessionTimeout);
- eventThread.queueEvent(new WatchedEvent(Watcher.Event.EventType.None,
- Watcher.Event.KeeperState.SyncConnected, null));
- }
-
- void readResponse() throws IOException {
+ void readResponse(ByteBuffer incomingBuffer) throws IOException {
ByteBufferInputStream bbis = new ByteBufferInputStream(
incomingBuffer);
BinaryInputArchive bbia = BinaryInputArchive.getArchive(bbis);
@@ -772,7 +681,7 @@ void readResponse() throws IOException {
if (replyHdr.getXid() == -4) {
// -4 is the xid for AuthPacket
if(replyHdr.getErr() == KeeperException.Code.AUTHFAILED.intValue()) {
- zooKeeper.state = States.AUTH_FAILED;
+ state = States.AUTH_FAILED;
eventThread.queueEvent( new WatchedEvent(Watcher.Event.EventType.None,
Watcher.Event.KeeperState.AuthFailed, null) );
}
@@ -809,12 +718,12 @@ void readResponse() throws IOException {
eventThread.queueEvent( we );
return;
}
- if (pendingQueue.size() == 0) {
- throw new IOException("Nothing in the queue, but got "
- + replyHdr.getXid());
- }
- Packet packet = null;
+ Packet packet;
synchronized (pendingQueue) {
+ if (pendingQueue.size() == 0) {
+ throw new IOException("Nothing in the queue, but got "
+ + replyHdr.getXid());
+ }
packet = pendingQueue.remove();
}
/*
@@ -825,9 +734,13 @@ void readResponse() throws IOException {
if (packet.header.getXid() != replyHdr.getXid()) {
packet.replyHeader.setErr(
KeeperException.Code.CONNECTIONLOSS.intValue());
- throw new IOException("Xid out of order. Got "
- + replyHdr.getXid() + " expected "
- + packet.header.getXid());
+ throw new IOException("Xid out of order. Got Xid "
+ + replyHdr.getXid() + " with err " +
+ + replyHdr.getErr() +
+ " expected Xid "
+ + packet.header.getXid()
+ + " for a packet with details: "
+ + packet );
}
packet.replyHeader.setXid(replyHdr.getXid());
@@ -849,113 +762,34 @@ void readResponse() throws IOException {
}
}
- /**
- * @return true if a packet was received
- * @throws InterruptedException
- * @throws IOException
- */
- boolean doIO() throws InterruptedException, IOException {
- boolean packetReceived = false;
- SocketChannel sock = (SocketChannel) sockKey.channel();
- if (sock == null) {
- throw new IOException("Socket is null!");
- }
- if (sockKey.isReadable()) {
- int rc = sock.read(incomingBuffer);
- if (rc < 0) {
- throw new EndOfStreamException(
- "Unable to read additional data from server sessionid 0x"
- + Long.toHexString(sessionId)
- + ", likely server has closed socket");
- }
- if (!incomingBuffer.hasRemaining()) {
- incomingBuffer.flip();
- if (incomingBuffer == lenBuffer) {
- recvCount++;
- readLength();
- } else if (!initialized) {
- readConnectResult();
- enableRead();
- if (!outgoingQueue.isEmpty()) {
- enableWrite();
- }
- lenBuffer.clear();
- incomingBuffer = lenBuffer;
- packetReceived = true;
- initialized = true;
- } else {
- readResponse();
- lenBuffer.clear();
- incomingBuffer = lenBuffer;
- packetReceived = true;
- }
- }
- }
- if (sockKey.isWritable()) {
- synchronized (outgoingQueue) {
- if (!outgoingQueue.isEmpty()) {
- ByteBuffer pbb = outgoingQueue.getFirst().bb;
- sock.write(pbb);
- if (!pbb.hasRemaining()) {
- sentCount++;
- Packet p = outgoingQueue.removeFirst();
- if (p.header != null
- && p.header.getType() != OpCode.ping
- && p.header.getType() != OpCode.auth) {
- pendingQueue.add(p);
- }
- }
- }
- }
- }
- if (outgoingQueue.isEmpty()) {
- disableWrite();
- } else {
- enableWrite();
- }
- return packetReceived;
- }
-
- synchronized private void enableWrite() {
- int i = sockKey.interestOps();
- if ((i & SelectionKey.OP_WRITE) == 0) {
- sockKey.interestOps(i | SelectionKey.OP_WRITE);
- }
- }
-
- synchronized private void disableWrite() {
- int i = sockKey.interestOps();
- if ((i & SelectionKey.OP_WRITE) != 0) {
- sockKey.interestOps(i & (~SelectionKey.OP_WRITE));
- }
- }
-
- synchronized private void enableRead() {
- int i = sockKey.interestOps();
- if ((i & SelectionKey.OP_READ) == 0) {
- sockKey.interestOps(i | SelectionKey.OP_READ);
- }
+ SendThread(ClientCnxnSocket clientCnxnSocket) {
+ super(makeThreadName("-SendThread()"));
+ state = States.CONNECTING;
+ this.clientCnxnSocket = clientCnxnSocket;
+ setUncaughtExceptionHandler(uncaughtExceptionHandler);
+ setDaemon(true);
}
- synchronized private void disableRead() {
- int i = sockKey.interestOps();
- if ((i & SelectionKey.OP_READ) != 0) {
- sockKey.interestOps(i & (~SelectionKey.OP_READ));
- }
+ // TODO: can not name this method getState since Thread.getState()
+ // already exists
+ // It would be cleaner to make class SendThread an implementation of
+ // Runnable
+ /**
+ * Used by ClientCnxnSocket
+ *
+ * @return
+ */
+ ZooKeeper.States getZkState() {
+ return state;
}
- SendThread() {
- super(makeThreadName("-SendThread()"));
- zooKeeper.state = States.CONNECTING;
- setUncaughtExceptionHandler(uncaughtExceptionHandler);
- setDaemon(true);
+ ClientCnxnSocket getClientCnxnSocket() {
+ return clientCnxnSocket;
}
- private void primeConnection(SelectionKey k) throws IOException {
+ void primeConnection() throws IOException {
LOG.info("Socket connection established to "
- + ((SocketChannel)sockKey.channel())
- .socket().getRemoteSocketAddress()
- + ", initiating session");
+ + clientCnxnSocket.getRemoteSocketAddress() + ", initiating session");
lastConnectIndex = currentConnectIndex;
ConnectRequest conReq = new ConnectRequest(0, lastZxid,
sessionTimeout, sessionId, sessionPasswd);
@@ -970,11 +804,12 @@ private void primeConnection(SelectionKey k) throws IOException {
synchronized (outgoingQueue) {
// We add backwards since we are pushing into the front
// Only send if there's a pending watch
- if (!disableAutoWatchReset &&
- (!zooKeeper.getDataWatches().isEmpty()
- || !zooKeeper.getExistWatches().isEmpty()
- || !zooKeeper.getChildWatches().isEmpty()))
- {
+ // TODO: here we have the only remaining use of zooKeeper in
+ // this class. It's to be eliminated!
+ if (!disableAutoWatchReset
+ && (!zooKeeper.getDataWatches().isEmpty()
+ || !zooKeeper.getExistWatches().isEmpty() || !zooKeeper
+ .getChildWatches().isEmpty())) {
SetWatches sw = new SetWatches(lastZxid,
zooKeeper.getDataWatches(),
zooKeeper.getExistWatches(),
@@ -995,13 +830,10 @@ private void primeConnection(SelectionKey k) throws IOException {
outgoingQueue.addFirst((new Packet(null, null, null, null, bb,
null)));
}
- synchronized (this) {
- k.interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE);
- }
+ clientCnxnSocket.enableReadWriteOnly();
if (LOG.isDebugEnabled()) {
LOG.debug("Session establishment request sent on "
- + ((SocketChannel)sockKey.channel())
- .socket().getRemoteSocketAddress());
+ + clientCnxnSocket.getRemoteSocketAddress());
}
}
@@ -1011,12 +843,6 @@ private void sendPing() {
queuePacket(h, null, null, null, null, null, null, null, null);
}
- int lastConnectIndex = -1;
-
- int currentConnectIndex;
-
- Random r = new Random(System.nanoTime());
-
private void startConnect() throws IOException {
if (lastConnectIndex == -1) {
// We don't want to delay the first try at a connect, so we
@@ -1037,7 +863,7 @@ private void startConnect() throws IOException {
}
}
}
- zooKeeper.state = States.CONNECTING;
+ state = States.CONNECTING;
currentConnectIndex = nextAddrToTry;
InetSocketAddress addr = serverAddrs.get(nextAddrToTry);
nextAddrToTry++;
@@ -1045,24 +871,11 @@ private void startConnect() throws IOException {
nextAddrToTry = 0;
}
LOG.info("Opening socket connection to server " + addr);
- SocketChannel sock;
- sock = SocketChannel.open();
- sock.configureBlocking(false);
- sock.socket().setSoLinger(false, -1);
- sock.socket().setTcpNoDelay(true);
+
setName(getName().replaceAll("\\(.*\\)",
"(" + addr.getHostName() + ":" + addr.getPort() + ")"));
- sockKey = sock.register(selector, SelectionKey.OP_CONNECT);
- if (sock.connect(addr)) {
- primeConnection(sockKey);
- }
- initialized = false;
- /*
- * Reset incomingBuffer
- */
- lenBuffer.clear();
- incomingBuffer = lenBuffer;
+ clientCnxnSocket.connect(addr);
}
private static final String RETRY_CONN_MSG =
@@ -1070,39 +883,38 @@ private void startConnect() throws IOException {
@Override
public void run() {
- long now = System.currentTimeMillis();
- long lastHeard = now;
- long lastSend = now;
- while (zooKeeper.state.isAlive()) {
+ clientCnxnSocket.introduce(this,sessionId);
+ clientCnxnSocket.updateNow();
+ clientCnxnSocket.updateLastSendAndHeard();
+ while (state.isAlive()) {
try {
- if (sockKey == null) {
+ if (!clientCnxnSocket.isConnected()) {
// don't re-establish connection if we are closing
if (closing) {
break;
}
startConnect();
- lastSend = now;
- lastHeard = now;
+ clientCnxnSocket.updateLastSendAndHeard();
}
- int idleRecv = (int) (now - lastHeard);
- int idleSend = (int) (now - lastSend);
- int to = readTimeout - idleRecv;
- if (zooKeeper.state != States.CONNECTED) {
- to = connectTimeout - idleRecv;
+
+ int to = readTimeout - clientCnxnSocket.getIdleRecv();
+ if (state != States.CONNECTED) {
+ to = connectTimeout - clientCnxnSocket.getIdleRecv();
}
if (to <= 0) {
throw new SessionTimeoutException(
"Client session timed out, have not heard from server in "
- + idleRecv + "ms"
- + " for sessionid 0x"
- + Long.toHexString(sessionId));
+ + clientCnxnSocket.getIdleRecv() + "ms"
+ + " for sessionid 0x"
+ + Long.toHexString(sessionId));
}
- if (zooKeeper.state == States.CONNECTED) {
- int timeToNextPing = readTimeout/2 - idleSend;
+ if (state == States.CONNECTED) {
+ int timeToNextPing = readTimeout / 2
+ - clientCnxnSocket.getIdleSend();
if (timeToNextPing <= 0) {
sendPing();
- lastSend = now;
- enableWrite();
+ clientCnxnSocket.updateLastSend();
+ clientCnxnSocket.enableWrite();
} else {
if (timeToNextPing < to) {
to = timeToNextPing;
@@ -1110,42 +922,8 @@ public void run() {
}
}
- selector.select(to);
- Set<SelectionKey> selected;
- synchronized (this) {
- selected = selector.selectedKeys();
- }
- // Everything below and until we get back to the select is
- // non blocking, so time is effectively a constant. That is
- // Why we just have to do this once, here
- now = System.currentTimeMillis();
- for (SelectionKey k : selected) {
- SocketChannel sc = ((SocketChannel) k.channel());
- if ((k.readyOps() & SelectionKey.OP_CONNECT) != 0) {
- if (sc.finishConnect()) {
- lastHeard = now;
- lastSend = now;
- primeConnection(k);
- }
- } else if ((k.readyOps() & (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0) {
- if (outgoingQueue.size() > 0) {
- // We have something to send so it's the same
- // as if we do the send now.
- lastSend = now;
- }
- if (doIO()) {
- lastHeard = now;
- }
- }
- }
- if (zooKeeper.state == States.CONNECTED) {
- if (outgoingQueue.size() > 0) {
- enableWrite();
- } else {
- disableWrite();
- }
- }
- selected.clear();
+ clientCnxnSocket.doTransport(to, pendingQueue, outgoingQueue);
+
} catch (Exception e) {
if (closing) {
if (LOG.isDebugEnabled()) {
@@ -1164,92 +942,38 @@ public void run() {
} else if (e instanceof EndOfStreamException) {
LOG.info(e.getMessage() + RETRY_CONN_MSG);
} else {
- LOG.warn("Session 0x"
- + Long.toHexString(getSessionId())
- + " for server "
- + ((SocketChannel)sockKey.channel())
- .socket().getRemoteSocketAddress()
- + ", unexpected error"
- + RETRY_CONN_MSG,
- e);
+ LOG.warn(
+ "Session 0x"
+ + Long.toHexString(getSessionId())
+ + " for server "
+ + clientCnxnSocket.getRemoteSocketAddress()
+ + ", unexpected error"
+ + RETRY_CONN_MSG, e);
}
cleanup();
- if (zooKeeper.state.isAlive()) {
+ if (state.isAlive()) {
eventThread.queueEvent(new WatchedEvent(
Event.EventType.None,
Event.KeeperState.Disconnected,
null));
}
-
- now = System.currentTimeMillis();
- lastHeard = now;
- lastSend = now;
+ clientCnxnSocket.updateNow();
+ clientCnxnSocket.updateLastSendAndHeard();
}
}
}
cleanup();
- try {
- if (LOG.isTraceEnabled()) {
- LOG.trace("Doing client selector close");
- }
- selector.close();
- if (LOG.isTraceEnabled()) {
- LOG.trace("Closed client selector");
- }
- } catch (IOException e) {
- LOG.warn("Ignoring exception during selector close", e);
- }
- if (zooKeeper.state.isAlive()) {
- eventThread.queueEvent(new WatchedEvent(
- Event.EventType.None,
- Event.KeeperState.Disconnected,
- null));
+ clientCnxnSocket.close();
+ if (state.isAlive()) {
+ eventThread.queueEvent(new WatchedEvent(Event.EventType.None,
+ Event.KeeperState.Disconnected, null));
}
ZooTrace.logTraceMessage(LOG, ZooTrace.getTextTraceLevel(),
"SendThread exitedloop.");
}
private void cleanup() {
- if (sockKey != null) {
- SocketChannel sock = (SocketChannel) sockKey.channel();
- sockKey.cancel();
- try {
- sock.socket().shutdownInput();
- } catch (IOException e) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Ignoring exception during shutdown input", e);
- }
- }
- try {
- sock.socket().shutdownOutput();
- } catch (IOException e) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Ignoring exception during shutdown output", e);
- }
- }
- try {
- sock.socket().close();
- } catch (IOException e) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Ignoring exception during socket close", e);
- }
- }
- try {
- sock.close();
- } catch (IOException e) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Ignoring exception during channel close", e);
- }
- }
- }
- try {
- Thread.sleep(100);
- } catch (InterruptedException e) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("SendThread interrupted during sleep, ignoring");
- }
- }
- sockKey = null;
+ clientCnxnSocket.cleanup();
synchronized (pendingQueue) {
for (Packet p : pendingQueue) {
conLossPacket(p);
@@ -1264,11 +988,50 @@ private void cleanup() {
}
}
- public void close() {
- synchronized (this) {
- zooKeeper.state = States.CLOSED;
- selector.wakeup();
+ /**
+ * Callback invoked by the ClientCnxnSocket once a connection has been
+ * established.
+ *
+ * @param _negotiatedSessionTimeout
+ * @param _sessionId
+ * @param _sessionPasswd
+ * @throws IOException
+ */
+ void onConnected(int _negotiatedSessionTimeout, long _sessionId,
+ byte[] _sessionPasswd) throws IOException {
+ negotiatedSessionTimeout = _negotiatedSessionTimeout;
+ if (negotiatedSessionTimeout <= 0) {
+ state = States.CLOSED;
+
+ eventThread.queueEvent(new WatchedEvent(
+ Watcher.Event.EventType.None,
+ Watcher.Event.KeeperState.Expired, null));
+ eventThread.queueEventOfDeath();
+ throw new SessionExpiredException(
+ "Unable to reconnect to ZooKeeper service, session 0x"
+ + Long.toHexString(sessionId) + " has expired");
}
+ readTimeout = negotiatedSessionTimeout * 2 / 3;
+ connectTimeout = negotiatedSessionTimeout / serverAddrs.size();
+ sessionId = _sessionId;
+ sessionPasswd = _sessionPasswd;
+ state = States.CONNECTED;
+ LOG.info("Session establishment complete on server "
+ + clientCnxnSocket.getRemoteSocketAddress() + ", sessionid = 0x"
+ + Long.toHexString(sessionId) + ", negotiated timeout = "
+ + negotiatedSessionTimeout);
+ eventThread.queueEvent(new WatchedEvent(
+ Watcher.Event.EventType.None,
+ Watcher.Event.KeeperState.SyncConnected, null));
+ }
+
+ void close() {
+ state = States.CLOSED;
+ clientCnxnSocket.wakeupCnxn();
+ }
+
+ void testableCloseSocket() throws IOException {
+ clientCnxnSocket.testableCloseSocket();
}
}
@@ -1314,6 +1077,8 @@ public void close() throws IOException {
private int xid = 1;
+ private volatile States state;
+
synchronized private int getXid() {
return xid++;
}
@@ -1347,7 +1112,7 @@ Packet queuePacket(RequestHeader h, ReplyHeader r, Record request,
packet.ctx = ctx;
packet.clientPath = clientPath;
packet.serverPath = serverPath;
- if (!zooKeeper.state.isAlive() || closing) {
+ if (!state.isAlive() || closing) {
conLossPacket(packet);
} else {
// If the client is asking to close the session then
@@ -1358,14 +1123,12 @@ Packet queuePacket(RequestHeader h, ReplyHeader r, Record request,
outgoingQueue.add(packet);
}
}
- synchronized (sendThread) {
- selector.wakeup();
- }
+ sendThread.getClientCnxnSocket().wakeupCnxn();
return packet;
}
public void addAuthInfo(String scheme, byte auth[]) {
- if (!zooKeeper.state.isAlive()) {
+ if (!state.isAlive()) {
return;
}
authInfo.add(new AuthData(scheme, auth));
@@ -1373,4 +1136,8 @@ public void addAuthInfo(String scheme, byte auth[]) {
new AuthPacket(0, scheme, auth), null, null, null, null,
null, null);
}
+
+ States getState() {
+ return state;
+ }
}
View
154 src/java/main/org/apache/zookeeper/ClientCnxnSocket.java
@@ -0,0 +1,154 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.nio.ByteBuffer;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.jute.BinaryInputArchive;
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.ClientCnxn.Packet;
+import org.apache.zookeeper.proto.ConnectResponse;
+import org.apache.zookeeper.server.ByteBufferInputStream;
+
+/**
+ * A ClientCnxnSocket does the lower level communication with a socket
+ * implementation.
+ *
+ * This code has been moved out of ClientCnxn so that a Netty implementation can
+ * be provided as an alternative to the NIO socket code.
+ *
+ */
+abstract class ClientCnxnSocket {
+ private static final Logger LOG = Logger.getLogger(ClientCnxnSocket.class);
+
+ protected boolean initialized;
+
+ /**
+ * This buffer is only used to read the length of the incoming message.
+ */
+ protected final ByteBuffer lenBuffer = ByteBuffer.allocateDirect(4);
+
+ /**
+ * After the length is read, a new incomingBuffer is allocated in
+ * readLength() to receive the full message.
+ */
+ protected ByteBuffer incomingBuffer = lenBuffer;
+ protected long sentCount = 0;
+ protected long recvCount = 0;
+ protected long lastHeard;
+ protected long lastSend;
+ protected long now;
+ protected ClientCnxn.SendThread sendThread;
+
+ protected long sessionId;
+
+ void introduce(ClientCnxn.SendThread sendThread, long sessionId) {
+ this.sendThread = sendThread;
+ this.sessionId = sessionId;
+ }
+
+ void updateNow() {
+ now = System.currentTimeMillis();
+ }
+
+ int getIdleRecv() {
+ return (int) (now - lastHeard);
+ }
+
+ int getIdleSend() {
+ return (int) (now - lastSend);
+ }
+
+ long getSentCount() {
+ return sentCount;
+ }
+
+ long getRecvCount() {
+ return recvCount;
+ }
+
+ void updateLastHeard() {
+ this.lastHeard = now;
+ }
+
+ void updateLastSend() {
+ this.lastSend = now;
+ }
+
+ void updateLastSendAndHeard() {
+ this.lastSend = now;
+ this.lastHeard = now;
+ }
+
+ protected void readLength() throws IOException {
+ int len = incomingBuffer.getInt();
+ if (len < 0 || len >= ClientCnxn.packetLen) {
+ throw new IOException("Packet len" + len + " is out of range!");
+ }
+ incomingBuffer = ByteBuffer.allocate(len);
+ }
+
+ void readConnectResult() throws IOException {
+ if (LOG.isTraceEnabled()) {
+ StringBuffer buf = new StringBuffer("0x[");
+ for (byte b : incomingBuffer.array()) {
+ buf.append(Integer.toHexString(b) + ",");
+ }
+ buf.append("]");
+ LOG.trace("readConnectRestult " + incomingBuffer.remaining() + " "
+ + buf.toString());
+ }
+ ByteBufferInputStream bbis = new ByteBufferInputStream(incomingBuffer);
+ BinaryInputArchive bbia = BinaryInputArchive.getArchive(bbis);
+ ConnectResponse conRsp = new ConnectResponse();
+ conRsp.deserialize(bbia, "connect");
+ this.sessionId = conRsp.getSessionId();
+ sendThread.onConnected(conRsp.getTimeOut(), this.sessionId,
+ conRsp.getPasswd());
+ }
+
+ abstract boolean isConnected();
+
+ abstract void connect(InetSocketAddress addr) throws IOException;
+
+ abstract SocketAddress getRemoteSocketAddress();
+
+ abstract SocketAddress getLocalSocketAddress();
+
+ abstract void cleanup();
+
+ abstract void close();
+
+ abstract void wakeupCnxn();
+
+ abstract void enableWrite();
+
+ abstract void enableReadWriteOnly();
+
+ abstract void doTransport(int waitTimeOut, List<Packet> pendingQueue,
+ LinkedList<Packet> outgoingQueue) throws IOException,
+ InterruptedException;
+
+ abstract void testableCloseSocket() throws IOException;
+}
View
318 src/java/main/org/apache/zookeeper/ClientCnxnSocketNIO.java
@@ -0,0 +1,318 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.nio.ByteBuffer;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.nio.channels.SocketChannel;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.ClientCnxn.EndOfStreamException;
+import org.apache.zookeeper.ClientCnxn.Packet;
+import org.apache.zookeeper.ZooDefs.OpCode;
+import org.apache.zookeeper.ZooKeeper.States;
+
+public class ClientCnxnSocketNIO extends ClientCnxnSocket {
+ private static final Logger LOG = Logger
+ .getLogger(ClientCnxnSocketNIO.class);
+
+ private final Selector selector = Selector.open();
+
+ private SelectionKey sockKey;
+
+ ClientCnxnSocketNIO() throws IOException {
+ super();
+ }
+
+ @Override
+ boolean isConnected() {
+ return sockKey != null;
+ }
+
+ /**
+ * @return true if a packet was received
+ * @throws InterruptedException
+ * @throws IOException
+ */
+ boolean doIO(List<Packet> pendingQueue, LinkedList<Packet> outgoingQueue) throws InterruptedException, IOException {
+ boolean packetReceived = false;
+ SocketChannel sock = (SocketChannel) sockKey.channel();
+ if (sock == null) {
+ throw new IOException("Socket is null!");
+ }
+ if (sockKey.isReadable()) {
+ int rc = sock.read(incomingBuffer);
+ if (rc < 0) {
+ throw new EndOfStreamException(
+ "Unable to read additional data from server sessionid 0x"
+ + Long.toHexString(sessionId)
+ + ", likely server has closed socket");
+ }
+ if (!incomingBuffer.hasRemaining()) {
+ incomingBuffer.flip();
+ if (incomingBuffer == lenBuffer) {
+ recvCount++;
+ readLength();
+ } else if (!initialized) {
+ readConnectResult();
+ enableRead();
+ if (!outgoingQueue.isEmpty()) {
+ enableWrite();
+ }
+ lenBuffer.clear();
+ incomingBuffer = lenBuffer;
+ packetReceived = true;
+ initialized = true;
+ } else {
+ sendThread.readResponse(incomingBuffer);
+ lenBuffer.clear();
+ incomingBuffer = lenBuffer;
+ packetReceived = true;
+ }
+ }
+ }
+ if (sockKey.isWritable()) {
+ synchronized (outgoingQueue) {
+ if (!outgoingQueue.isEmpty()) {
+ ByteBuffer pbb = outgoingQueue.getFirst().bb;
+ sock.write(pbb);
+ if (!pbb.hasRemaining()) {
+ sentCount++;
+ Packet p = outgoingQueue.removeFirst();
+ if (p.header != null
+ && p.header.getType() != OpCode.ping
+ && p.header.getType() != OpCode.auth) {
+ pendingQueue.add(p);
+ }
+ }
+ }
+ }
+ }
+ if (outgoingQueue.isEmpty()) {
+ disableWrite();
+ } else {
+ enableWrite();
+ }
+ return packetReceived;
+ }
+
+ @Override
+ void cleanup() {
+ if (sockKey != null) {
+ SocketChannel sock = (SocketChannel) sockKey.channel();
+ sockKey.cancel();
+ try {
+ sock.socket().shutdownInput();
+ } catch (IOException e) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Ignoring exception during shutdown input", e);
+ }
+ }
+ try {
+ sock.socket().shutdownOutput();
+ } catch (IOException e) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Ignoring exception during shutdown output",
+ e);
+ }
+ }
+ try {
+ sock.socket().close();
+ } catch (IOException e) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Ignoring exception during socket close", e);
+ }
+ }
+ try {
+ sock.close();
+ } catch (IOException e) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Ignoring exception during channel close", e);
+ }
+ }
+ }
+ try {
+ Thread.sleep(100);
+ } catch (InterruptedException e) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("SendThread interrupted during sleep, ignoring");
+ }
+ }
+ sockKey = null;
+ }
+
+ @Override
+ void close() {
+ try {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Doing client selector close");
+ }
+ selector.close();
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Closed client selector");
+ }
+ } catch (IOException e) {
+ LOG.warn("Ignoring exception during selector close", e);
+ }
+ }
+
+ @Override
+ void connect(InetSocketAddress addr) throws IOException {
+ SocketChannel sock;
+ sock = SocketChannel.open();
+ sock.configureBlocking(false);
+ sock.socket().setSoLinger(false, -1);
+ sock.socket().setTcpNoDelay(true);
+ sockKey = sock.register(selector, SelectionKey.OP_CONNECT);
+ if (sock.connect(addr)) {
+ sendThread.primeConnection();
+ }
+ initialized = false;
+
+ /*
+ * Reset incomingBuffer
+ */
+ lenBuffer.clear();
+ incomingBuffer = lenBuffer;
+ }
+
+ /**
+ * Returns the address to which the socket is connected.
+ *
+ * @return ip address of the remote side of the connection or null if not
+ * connected
+ */
+ @Override
+ SocketAddress getRemoteSocketAddress() {
+ // a lot could go wrong here, so rather than put in a bunch of code
+ // to check for nulls all down the chain let's do it the simple
+ // yet bulletproof way
+ try {
+ return ((SocketChannel) sockKey.channel()).socket()
+ .getRemoteSocketAddress();
+ } catch (NullPointerException e) {
+ return null;
+ }
+ }
+
+ /**
+ * Returns the local address to which the socket is bound.
+ *
+ * @return ip address of the remote side of the connection or null if not
+ * connected
+ */
+ @Override
+ SocketAddress getLocalSocketAddress() {
+ // a lot could go wrong here, so rather than put in a bunch of code
+ // to check for nulls all down the chain let's do it the simple
+ // yet bulletproof way
+ try {
+ return ((SocketChannel) sockKey.channel()).socket()
+ .getLocalSocketAddress();
+ } catch (NullPointerException e) {
+ return null;
+ }
+ }
+
+ @Override
+ synchronized void wakeupCnxn() {
+ selector.wakeup();
+ }
+
+ @Override
+ void doTransport(int waitTimeOut, List<Packet> pendingQueue, LinkedList<Packet> outgoingQueue )
+ throws IOException, InterruptedException {
+ selector.select(waitTimeOut);
+ Set<SelectionKey> selected;
+ synchronized (this) {
+ selected = selector.selectedKeys();
+ }
+ // Everything below and until we get back to the select is
+ // non blocking, so time is effectively a constant. That is
+ // Why we just have to do this once, here
+ updateNow();
+ for (SelectionKey k : selected) {
+ SocketChannel sc = ((SocketChannel) k.channel());
+ if ((k.readyOps() & SelectionKey.OP_CONNECT) != 0) {
+ if (sc.finishConnect()) {
+ updateLastSendAndHeard();
+ sendThread.primeConnection();
+ }
+ } else if ((k.readyOps() & (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0) {
+ if (outgoingQueue.size() > 0) {
+ // We have something to send so it's the same
+ // as if we do the send now.
+ updateLastSend();
+ }
+ if (doIO(pendingQueue, outgoingQueue)) {
+ updateLastHeard();
+ }
+ }
+ }
+ if (sendThread.getZkState() == States.CONNECTED) {
+ if (outgoingQueue.size() > 0) {
+ enableWrite();
+ } else {
+ disableWrite();
+ }
+ }
+ selected.clear();
+ }
+
+ //TODO should this be synchronized?
+ @Override
+ void testableCloseSocket() throws IOException {
+ LOG.info("testableCloseSocket() called");
+ ((SocketChannel) sockKey.channel()).socket().close();
+ }
+
+ @Override
+ synchronized void enableWrite() {
+ int i = sockKey.interestOps();
+ if ((i & SelectionKey.OP_WRITE) == 0) {
+ sockKey.interestOps(i | SelectionKey.OP_WRITE);
+ }
+ }
+
+ private synchronized void disableWrite() {
+ int i = sockKey.interestOps();
+ if ((i & SelectionKey.OP_WRITE) != 0) {
+ sockKey.interestOps(i & (~SelectionKey.OP_WRITE));
+ }
+ }
+
+ synchronized private void enableRead() {
+ int i = sockKey.interestOps();
+ if ((i & SelectionKey.OP_READ) == 0) {
+ sockKey.interestOps(i | SelectionKey.OP_READ);
+ }
+ }
+
+ @Override
+ synchronized void enableReadWriteOnly() {
+ sockKey.interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE);
+ }
+}
View
36 src/java/main/org/apache/zookeeper/ZooKeeper.java
@@ -105,6 +105,7 @@
*/
public class ZooKeeper {
private static final Logger LOG;
+ public static final String ZOOKEEPER_CLIENT_CNXN_SOCKET = "zookeeper.clientCnxnSocket";
static {
LOG = Logger.getLogger(ZooKeeper.class);
@@ -154,6 +155,7 @@ final private void addTo(Set<Watcher> from, Set<Watcher> to) {
/* (non-Javadoc)
* @see org.apache.zookeeper.ClientWatchManager#materialize(Event.KeeperState, Event.EventType, java.lang.String)
*/
+ @Override
public Set<Watcher> materialize(Watcher.Event.KeeperState state,
Watcher.Event.EventType type,
String clientPath)
@@ -322,8 +324,6 @@ public boolean isAlive() {
}
}
- volatile States state;
-
protected final ClientCnxn cnxn;
/**
@@ -376,7 +376,8 @@ public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher)
+ " sessionTimeout=" + sessionTimeout + " watcher=" + watcher);
watchManager.defaultWatcher = watcher;
- cnxn = new ClientCnxn(connectString, sessionTimeout, this, watchManager);
+ cnxn = new ClientCnxn(connectString, sessionTimeout, this,
+ watchManager, getClientCnxnSocket());
cnxn.start();
}
@@ -443,8 +444,8 @@ public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher,
+ (sessionPasswd == null ? "<null>" : "<hidden>"));
watchManager.defaultWatcher = watcher;
- cnxn = new ClientCnxn(connectString, sessionTimeout, this, watchManager,
- sessionId, sessionPasswd);
+ cnxn = new ClientCnxn(connectString, sessionTimeout, this,
+ watchManager, getClientCnxnSocket(), sessionId, sessionPasswd);
cnxn.start();
}
@@ -518,7 +519,7 @@ public synchronized void register(Watcher watcher) {
* @throws InterruptedException
*/
public synchronized void close() throws InterruptedException {
- if (!state.isAlive()) {
+ if (!cnxn.getState().isAlive()) {
if (LOG.isDebugEnabled()) {
LOG.debug("Close called on already closed client");
}
@@ -1557,7 +1558,7 @@ public void sync(final String path, VoidCallback cb, Object ctx){
}
public States getState() {
- return state;
+ return cnxn.getState();
}
/**
@@ -1617,7 +1618,7 @@ protected boolean testableWaitForShutdown(int wait)
* not connected
*/
protected SocketAddress testableRemoteSocketAddress() {
- return cnxn.getRemoteSocketAddress();
+ return cnxn.sendThread.getClientCnxnSocket().getRemoteSocketAddress();
}
/**
@@ -1630,6 +1631,23 @@ protected SocketAddress testableRemoteSocketAddress() {
* not connected
*/
protected SocketAddress testableLocalSocketAddress() {
- return cnxn.getLocalSocketAddress();
+ return cnxn.sendThread.getClientCnxnSocket().getLocalSocketAddress();
+ }
+
+ private static ClientCnxnSocket getClientCnxnSocket() throws IOException {
+ String clientCnxnSocketName = System
+ .getProperty(ZOOKEEPER_CLIENT_CNXN_SOCKET);
+ if (clientCnxnSocketName == null) {
+ clientCnxnSocketName = ClientCnxnSocketNIO.class.getName();
+ }
+ try {
+ return (ClientCnxnSocket) Class.forName(clientCnxnSocketName)
+ .newInstance();
+ } catch (Exception e) {
+ IOException ioe = new IOException("Couldn't instantiate "
+ + clientCnxnSocketName);
+ ioe.initCause(e);
+ throw ioe;
+ }
}
}
View
2  src/java/main/org/apache/zookeeper/ZooKeeperMain.java
@@ -653,7 +653,7 @@ protected boolean processZKCmd(MyCommandOptions co)
}
// Below commands all need a live connection
- if (zk == null || !zk.state.isAlive()) {
+ if (zk == null || !zk.getState().isAlive()) {
System.out.println("Not connected");
return false;
}
View
4 src/java/main/org/apache/zookeeper/server/NettyServerCnxnFactory.java
@@ -33,8 +33,8 @@
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelHandler.Sharable;
import org.jboss.netty.channel.ChannelHandlerContext;
-import org.jboss.netty.channel.ChannelPipelineCoverage;
import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.channel.ExceptionEvent;
import org.jboss.netty.channel.MessageEvent;
@@ -61,7 +61,7 @@
* NettyServerCnxnFactory already extends ServerCnxnFactory. By making it inner
* this class gets access to the member variables and methods.
*/
- @ChannelPipelineCoverage("all")
+ @Sharable
class CnxnChannelHandler extends SimpleChannelHandler {
@Override
View
2  src/java/test/org/apache/zookeeper/TestableZooKeeper.java
@@ -59,7 +59,7 @@ public void run() {
synchronized(cnxn) {
try {
try {
- ((SocketChannel)cnxn.sendThread.sockKey.channel()).socket().close();
+ cnxn.sendThread.testableCloseSocket();
} catch (IOException e) {
e.printStackTrace();
}
Please sign in to comment.
Something went wrong with that request. Please try again.