Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ZOOKEEPER-XXXX: Merge readOnly field into Connect{Request|Response} #1832

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 2 additions & 0 deletions zookeeper-jute/src/main/resources/zookeeper.jute
Original file line number Diff line number Diff line change
Expand Up @@ -65,12 +65,14 @@ module org.apache.zookeeper.proto {
int timeOut;
long sessionId;
buffer passwd;
boolean readOnly;
}
class ConnectResponse {
int protocolVersion;
int timeOut;
long sessionId;
buffer passwd;
boolean readOnly;
}
class SetWatches {
long relativeZxid;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ static class AuthData {
* server on the other side of the wire is partitioned it'll accept
* read-only clients only.
*/
private boolean readOnly;
private final boolean readOnly;

final String chrootPath;

Expand Down Expand Up @@ -285,33 +285,20 @@ static class Packet {

WatchRegistration watchRegistration;

public boolean readOnly;

WatchDeregistration watchDeregistration;

/** Convenience ctor */
Packet(
RequestHeader requestHeader,
ReplyHeader replyHeader,
Record request,
Record response,
WatchRegistration watchRegistration) {
this(requestHeader, replyHeader, request, response, watchRegistration, false);
}

Packet(
RequestHeader requestHeader,
ReplyHeader replyHeader,
Record request,
Record response,
WatchRegistration watchRegistration,
boolean readOnly) {
WatchRegistration watchRegistration
) {

this.requestHeader = requestHeader;
this.replyHeader = replyHeader;
this.request = request;
this.response = response;
this.readOnly = readOnly;
this.watchRegistration = watchRegistration;
}

Expand All @@ -325,8 +312,6 @@ public void createBB() {
}
if (request instanceof ConnectRequest) {
request.serialize(boa, "connect");
// append "am-I-allowed-to-be-readonly" flag
boa.writeBool(readOnly, "readOnly");
} else if (request != null) {
request.serialize(boa, "request");
}
Expand Down Expand Up @@ -1001,14 +986,14 @@ ClientCnxnSocket getClientCnxnSocket() {
/**
* Setup session, previous watches, authentication.
*/
void primeConnection() throws IOException {
void primeConnection() {
LOG.info(
"Socket connection established, initiating session, client: {}, server: {}",
clientCnxnSocket.getLocalSocketAddress(),
clientCnxnSocket.getRemoteSocketAddress());
isFirstConnect = false;
long sessId = (seenRwServerBefore) ? sessionId : 0;
ConnectRequest conReq = new ConnectRequest(0, lastZxid, sessionTimeout, sessId, sessionPasswd);
ConnectRequest conReq = new ConnectRequest(0, lastZxid, sessionTimeout, sessId, sessionPasswd, readOnly);
// We add backwards since we are pushing into the front
// Only send if there's a pending watch
if (!clientConfig.getBoolean(ZKClientConfig.DISABLE_AUTO_WATCH_RESET)) {
Expand Down Expand Up @@ -1088,7 +1073,7 @@ record = new SetWatches2(setWatchesLastZxid, dataWatchesBatch, existWatchesBatch
null,
null));
}
outgoingQueue.addFirst(new Packet(null, null, conReq, null, null, readOnly));
outgoingQueue.addFirst(new Packet(null, null, conReq, null, null));
clientCnxnSocket.connectionPrimed();
LOG.debug("Session establishment request sent on {}", clientCnxnSocket.getRemoteSocketAddress());
}
Expand Down Expand Up @@ -1404,12 +1389,6 @@ private void cleanup() {
/**
* Callback invoked by the ClientCnxnSocket once a connection has been
* established.
*
* @param _negotiatedSessionTimeout
* @param _sessionId
* @param _sessionPasswd
* @param isRO
* @throws IOException
*/
void onConnected(
int _negotiatedSessionTimeout,
Expand Down Expand Up @@ -1627,7 +1606,7 @@ public void sendPacket(Record request, Record response, AsyncCallback cb, int op
ReplyHeader r = new ReplyHeader();
r.setXid(xid);

Packet p = new Packet(h, r, request, response, null, false);
Packet p = new Packet(h, r, request, response, null);
p.cb = cb;
sendThread.sendPacket(p);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,19 +139,8 @@ void readConnectResult() throws IOException {
BinaryInputArchive bbia = BinaryInputArchive.getArchive(bbis);
ConnectResponse conRsp = new ConnectResponse();
conRsp.deserialize(bbia, "connect");

// read "is read-only" flag
boolean isRO = false;
try {
isRO = bbia.readBool("readOnly");
} catch (IOException e) {
// this is ok -- just a packet from an old server which
// doesn't contain readOnly field
LOG.warn("Connected to an old server; r-o mode will be unavailable");
}

this.sessionId = conRsp.getSessionId();
sendThread.onConnected(conRsp.getTimeOut(), this.sessionId, conRsp.getPasswd(), isRO);
sendThread.onConnected(conRsp.getTimeOut(), this.sessionId, conRsp.getPasswd(), conRsp.getReadOnly());
}

abstract boolean isConnected();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,13 +62,6 @@ public abstract class ServerCnxn implements Stats, Watcher {

private Set<Id> authInfo = Collections.newSetFromMap(new ConcurrentHashMap<Id, Boolean>());

/**
* If the client is of old version, we don't send r-o mode info to it.
* The reason is that if we would, old C client doesn't read it, which
* results in TCP RST packet, i.e. "connection reset by peer".
*/
boolean isOldClient = true;
Copy link
Member Author

@tisonkun tisonkun Mar 1, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This workaround is introduced at ZOOKEEPER-784. And after 3.4.0 both the server & the client handle readOnly field.

However, as jute's serialize & deserialize implemented atomically (no optional field I think?), this PR should logically break client version less than 3.4 as documented here. I'll give it a test and wonder what the version policy and compatibility promise we should keep.

cc @eolivelli @phunt

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Confirm work start from 3.4.0, not for 3.3.6 and before.


AtomicLong outstandingCount = new AtomicLong();

/** The ZooKeeperServer for this connection. May be null if the server
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1062,14 +1062,12 @@ public void finishSessionInit(ServerCnxn cnxn, boolean valid) {
valid ? cnxn.getSessionTimeout() : 0,
valid ? cnxn.getSessionId() : 0, // send 0 if session is no
// longer valid
valid ? generatePasswd(cnxn.getSessionId()) : new byte[16]);
valid ? generatePasswd(cnxn.getSessionId()) : new byte[16],
this instanceof ReadOnlyZooKeeperServer);
ByteArrayOutputStream baos = new ByteArrayOutputStream();
BinaryOutputArchive bos = BinaryOutputArchive.getArchive(baos);
bos.writeInt(-1, "len");
rsp.serialize(bos, "connect");
if (!cnxn.isOldClient) {
bos.writeBool(this instanceof ReadOnlyZooKeeperServer, "readOnly");
}
baos.close();
ByteBuffer bb = ByteBuffer.wrap(baos.toByteArray());
bb.putInt(bb.remaining() - 4).rewind();
Expand Down Expand Up @@ -1398,18 +1396,7 @@ public void processConnectRequest(ServerCnxn cnxn, ByteBuffer incomingBuffer)

ServerMetrics.getMetrics().CONNECTION_REQUEST_COUNT.add(1);

boolean readOnly = false;
try {
readOnly = bia.readBool("readOnly");
cnxn.isOldClient = false;
} catch (IOException e) {
// this is ok -- just a packet from an old client which
// doesn't contain readOnly field
LOG.warn(
"Connection request from old client {}; will be dropped if server is in r-o mode",
cnxn.getRemoteSocketAddress());
}
if (!readOnly && this instanceof ReadOnlyZooKeeperServer) {
if (!connReq.getReadOnly() && this instanceof ReadOnlyZooKeeperServer) {
String msg = "Refusing session request for not-read-only client " + cnxn.getRemoteSocketAddress();
LOG.info(msg);
throw new CloseRequestException(msg, ServerCnxn.DisconnectReason.NOT_READ_ONLY_CLIENT);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,16 +35,6 @@ public MockPacket(
super(requestHeader, replyHeader, request, response, watchRegistration);
}

public MockPacket(
RequestHeader requestHeader,
ReplyHeader replyHeader,
Record request,
Record response,
WatchRegistration watchRegistration,
boolean readOnly) {
super(requestHeader, replyHeader, request, response, watchRegistration, readOnly);
}

public ByteBuffer createAndReturnBB() {
createBB();
return this.bb;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -255,8 +255,8 @@ private ByteBuffer createConnRequest() {
Random r = new Random(SESSION_ID ^ superSecret);
byte[] p = new byte[16];
r.nextBytes(p);
ConnectRequest conReq = new ConnectRequest(0, 1L, 30000, SESSION_ID, p);
MockPacket packet = new MockPacket(null, null, conReq, null, null, false);
ConnectRequest conReq = new ConnectRequest(0, 1L, 30000, SESSION_ID, p, false);
MockPacket packet = new MockPacket(null, null, conReq, null, null);
return packet.createAndReturnBB();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,18 +51,16 @@ public CnxnThread(int i) {
}

public void run() {
SocketChannel sChannel = null;
try {
try (SocketChannel sChannel = SocketChannel.open()) {
/*
* For future unwary socket programmers: although connect 'blocks' it
* does not require an accept on the server side to return. Therefore
* you can not assume that all the sockets are connected at the end of
* this for loop.
*/
sChannel = SocketChannel.open();
sChannel.connect(new InetSocketAddress(host, port));
// Construct a connection request
ConnectRequest conReq = new ConnectRequest(0, 0, 10000, 0, "password".getBytes());
ConnectRequest conReq = new ConnectRequest(0, 0, 10000, 0, "password".getBytes(), false);
ByteArrayOutputStream baos = new ByteArrayOutputStream();
BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos);
boa.writeInt(-1, "len");
Expand Down Expand Up @@ -95,14 +93,6 @@ public void run() {
}
} catch (IOException io) {
// "Connection reset by peer"
} finally {
if (sChannel != null) {
try {
sChannel.close();
} catch (Exception e) {
// Do nothing
}
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,8 @@
public class ReadOnlyModeTest extends ZKTestCase {

private static final org.slf4j.Logger LOG = LoggerFactory.getLogger(ReadOnlyModeTest.class);
private static int CONNECTION_TIMEOUT = QuorumBase.CONNECTION_TIMEOUT;
private QuorumUtil qu = new QuorumUtil(1);
private static final int CONNECTION_TIMEOUT = QuorumBase.CONNECTION_TIMEOUT;
private final QuorumUtil qu = new QuorumUtil(1);

@BeforeEach
public void setUp() throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ public void testCreateAfterCloseShouldFail() throws Exception {

// open a connection
boa.writeInt(44, "len");
ConnectRequest conReq = new ConnectRequest(0, 0, 30000, 0, new byte[16]);
ConnectRequest conReq = new ConnectRequest(0, 0, 30000, 0, new byte[16], false);
conReq.serialize(boa, "connect");

// close connection
Expand Down