diff --git a/zookeeper-jute/src/main/resources/zookeeper.jute b/zookeeper-jute/src/main/resources/zookeeper.jute index 796ea396755..d52b4afefe2 100644 --- a/zookeeper-jute/src/main/resources/zookeeper.jute +++ b/zookeeper-jute/src/main/resources/zookeeper.jute @@ -65,12 +65,14 @@ module org.apache.zookeeper.proto { int timeOut; long sessionId; buffer passwd; + boolean readOnly; } class ConnectResponse { int protocolVersion; int timeOut; long sessionId; buffer passwd; + boolean readOnly; } class SetWatches { long relativeZxid; diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxn.java b/zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxn.java index 0c9799c7345..837c12d5a08 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxn.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxn.java @@ -285,8 +285,6 @@ static class Packet { WatchRegistration watchRegistration; - public boolean readOnly; - WatchDeregistration watchDeregistration; /** Convenience ctor */ @@ -295,23 +293,12 @@ static class Packet { ReplyHeader replyHeader, Record request, Record response, - WatchRegistration watchRegistration) { - this(requestHeader, replyHeader, request, response, watchRegistration, false); - } - - Packet( - RequestHeader requestHeader, - ReplyHeader replyHeader, - Record request, - Record response, - WatchRegistration watchRegistration, - boolean readOnly) { - + WatchRegistration watchRegistration + ) { this.requestHeader = requestHeader; this.replyHeader = replyHeader; this.request = request; this.response = response; - this.readOnly = readOnly; this.watchRegistration = watchRegistration; } @@ -325,8 +312,6 @@ public void createBB() { } if (request instanceof ConnectRequest) { request.serialize(boa, "connect"); - // append "am-I-allowed-to-be-readonly" flag - boa.writeBool(readOnly, "readOnly"); } else if (request != null) { request.serialize(boa, "request"); } @@ -1008,7 +993,7 @@ void primeConnection() throws IOException { clientCnxnSocket.getRemoteSocketAddress()); isFirstConnect = false; long sessId = (seenRwServerBefore) ? sessionId : 0; - ConnectRequest conReq = new ConnectRequest(0, lastZxid, sessionTimeout, sessId, sessionPasswd); + ConnectRequest conReq = new ConnectRequest(0, lastZxid, sessionTimeout, sessId, sessionPasswd, readOnly); // We add backwards since we are pushing into the front // Only send if there's a pending watch if (!clientConfig.getBoolean(ZKClientConfig.DISABLE_AUTO_WATCH_RESET)) { @@ -1088,7 +1073,7 @@ record = new SetWatches2(setWatchesLastZxid, dataWatchesBatch, existWatchesBatch null, null)); } - outgoingQueue.addFirst(new Packet(null, null, conReq, null, null, readOnly)); + outgoingQueue.addFirst(new Packet(null, null, conReq, null, null)); clientCnxnSocket.connectionPrimed(); LOG.debug("Session establishment request sent on {}", clientCnxnSocket.getRemoteSocketAddress()); } @@ -1406,12 +1391,6 @@ private void cleanup() { /** * Callback invoked by the ClientCnxnSocket once a connection has been * established. - * - * @param _negotiatedSessionTimeout - * @param _sessionId - * @param _sessionPasswd - * @param isRO - * @throws IOException */ void onConnected( int _negotiatedSessionTimeout, @@ -1629,7 +1608,7 @@ public void sendPacket(Record request, Record response, AsyncCallback cb, int op ReplyHeader r = new ReplyHeader(); r.setXid(xid); - Packet p = new Packet(h, r, request, response, null, false); + Packet p = new Packet(h, r, request, response, null); p.cb = cb; sendThread.sendPacket(p); } diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxnSocket.java b/zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxnSocket.java index 9b53107e3d7..35af4a2f121 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxnSocket.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxnSocket.java @@ -31,6 +31,7 @@ import org.apache.zookeeper.client.ZKClientConfig; import org.apache.zookeeper.common.Time; import org.apache.zookeeper.common.ZKConfig; +import org.apache.zookeeper.compat.ProtocolManager; import org.apache.zookeeper.proto.ConnectResponse; import org.apache.zookeeper.server.ByteBufferInputStream; import org.slf4j.Logger; @@ -48,6 +49,8 @@ abstract class ClientCnxnSocket { private static final Logger LOG = LoggerFactory.getLogger(ClientCnxnSocket.class); + private final ProtocolManager protocolManager = new ProtocolManager(); + protected boolean initialized; /** @@ -131,27 +134,18 @@ void readConnectResult() throws IOException { } buf.append("]"); if (LOG.isTraceEnabled()) { - LOG.trace("readConnectResult {} {}", incomingBuffer.remaining(), buf.toString()); + LOG.trace("readConnectResult {} {}", incomingBuffer.remaining(), buf); } } ByteBufferInputStream bbis = new ByteBufferInputStream(incomingBuffer); BinaryInputArchive bbia = BinaryInputArchive.getArchive(bbis); - ConnectResponse conRsp = new ConnectResponse(); - conRsp.deserialize(bbia, "connect"); - - // read "is read-only" flag - boolean isRO = false; - try { - isRO = bbia.readBool("readOnly"); - } catch (IOException e) { - // this is ok -- just a packet from an old server which - // doesn't contain readOnly field + ConnectResponse conRsp = protocolManager.deserializeConnectResponse(bbia); + if (protocolManager.isReadonlyAvailable()) { LOG.warn("Connected to an old server; r-o mode will be unavailable"); } - this.sessionId = conRsp.getSessionId(); - sendThread.onConnected(conRsp.getTimeOut(), this.sessionId, conRsp.getPasswd(), isRO); + sendThread.onConnected(conRsp.getTimeOut(), this.sessionId, conRsp.getPasswd(), conRsp.getReadOnly()); } abstract boolean isConnected(); diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/compat/ProtocolManager.java b/zookeeper-server/src/main/java/org/apache/zookeeper/compat/ProtocolManager.java new file mode 100644 index 00000000000..5633b81c17d --- /dev/null +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/compat/ProtocolManager.java @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zookeeper.compat; + +import java.io.IOException; +import org.apache.jute.InputArchive; +import org.apache.zookeeper.proto.ConnectRequest; +import org.apache.zookeeper.proto.ConnectResponse; + +/** + * A manager for switching behaviours between difference wire protocol. + *
+ * Basically, wire protocol should be backward and forward compatible between minor versions.
+ * However, there are several cases that it's different due to Jute's limitations.
+ */
+public final class ProtocolManager {
+ private volatile Boolean isReadonlyAvailable = null;
+
+ public boolean isReadonlyAvailable() {
+ return isReadonlyAvailable != null && isReadonlyAvailable;
+ }
+
+ /**
+ * Deserializing {@link ConnectRequest} should be specially handled for request from client
+ * version before and including ZooKeeper 3.3 which doesn't understand readOnly field.
+ */
+ public ConnectRequest deserializeConnectRequest(InputArchive inputArchive) throws IOException {
+ if (isReadonlyAvailable != null) {
+ if (isReadonlyAvailable) {
+ return deserializeConnectRequestWithReadonly(inputArchive);
+ } else {
+ return deserializeConnectRequestWithoutReadonly(inputArchive);
+ }
+ }
+
+ final ConnectRequest request = deserializeConnectRequestWithoutReadonly(inputArchive);
+ try {
+ request.setReadOnly(inputArchive.readBool("readOnly"));
+ this.isReadonlyAvailable = true;
+ } catch (Exception e) {
+ request.setReadOnly(false); // old version doesn't have readonly concept
+ this.isReadonlyAvailable = false;
+ }
+ return request;
+ }
+
+ private ConnectRequest deserializeConnectRequestWithReadonly(InputArchive inputArchive) throws IOException {
+ final ConnectRequest request = new ConnectRequest();
+ request.deserialize(inputArchive, "connect");
+ return request;
+ }
+
+ private ConnectRequest deserializeConnectRequestWithoutReadonly(InputArchive inputArchive) throws IOException {
+ final ConnectRequest request = new ConnectRequest();
+ inputArchive.startRecord("connect");
+ request.setProtocolVersion(inputArchive.readInt("protocolVersion"));
+ request.setLastZxidSeen(inputArchive.readLong("lastZxidSeen"));
+ request.setTimeOut(inputArchive.readInt("timeOut"));
+ request.setSessionId(inputArchive.readLong("sessionId"));
+ request.setPasswd(inputArchive.readBuffer("passwd"));
+ inputArchive.endRecord("connect");
+ return request;
+ }
+
+ /**
+ * Deserializing {@link ConnectResponse} should be specially handled for response from server
+ * version before and including ZooKeeper 3.3 which doesn't understand readOnly field.
+ */
+ public ConnectResponse deserializeConnectResponse(InputArchive inputArchive) throws IOException {
+ if (isReadonlyAvailable != null) {
+ if (isReadonlyAvailable) {
+ return deserializeConnectResponseWithReadonly(inputArchive);
+ } else {
+ return deserializeConnectResponseWithoutReadonly(inputArchive);
+ }
+ }
+
+ final ConnectResponse response = deserializeConnectResponseWithoutReadonly(inputArchive);
+ try {
+ response.setReadOnly(inputArchive.readBool("readOnly"));
+ this.isReadonlyAvailable = true;
+ } catch (Exception e) {
+ response.setReadOnly(false); // old version doesn't have readonly concept
+ this.isReadonlyAvailable = false;
+ }
+ return response;
+ }
+
+ private ConnectResponse deserializeConnectResponseWithReadonly(InputArchive inputArchive) throws IOException {
+ final ConnectResponse response = new ConnectResponse();
+ response.deserialize(inputArchive, "connect");
+ return response;
+ }
+
+ private ConnectResponse deserializeConnectResponseWithoutReadonly(InputArchive inputArchive) throws IOException {
+ final ConnectResponse response = new ConnectResponse();
+ inputArchive.startRecord("connect");
+ response.setProtocolVersion(inputArchive.readInt("protocolVersion"));
+ response.setTimeOut(inputArchive.readInt("timeOut"));
+ response.setSessionId(inputArchive.readLong("sessionId"));
+ response.setPasswd(inputArchive.readBuffer("passwd"));
+ inputArchive.endRecord("connect");
+ return response;
+ }
+}
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ServerCnxn.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ServerCnxn.java
index b5b2645838d..661c2aa2f09 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ServerCnxn.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ServerCnxn.java
@@ -41,6 +41,7 @@
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs.OpCode;
+import org.apache.zookeeper.compat.ProtocolManager;
import org.apache.zookeeper.data.Id;
import org.apache.zookeeper.data.Stat;
import org.apache.zookeeper.metrics.Counter;
@@ -60,16 +61,9 @@ public abstract class ServerCnxn implements Stats, Watcher {
public static final Object me = new Object();
private static final Logger LOG = LoggerFactory.getLogger(ServerCnxn.class);
- private Set