Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ZOOKEEPER-2251:Add Client side packet response timeout to avoid infinite wait. #119

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
78 changes: 66 additions & 12 deletions src/java/main/org/apache/zookeeper/ClientCnxn.java
Expand Up @@ -201,6 +201,11 @@ static class AuthData {
public ZooKeeperSaslClient zooKeeperSaslClient;

private final ZKClientConfig clientConfig;
/**
* If any request's response in not received in configured requestTimeout
* then it is assumed that the response packet is lost.
*/
private long requestTimeout;

public long getSessionId() {
return sessionId;
Expand Down Expand Up @@ -395,6 +400,7 @@ public ClientCnxn(String chrootPath, HostProvider hostProvider, int sessionTimeo
sendThread = new SendThread(clientCnxnSocket);
eventThread = new EventThread();
this.clientConfig=zooKeeper.getClientConfig();
initRequestTimeout();
}

public void start() {
Expand Down Expand Up @@ -671,7 +677,8 @@ private void processEvent(Object event) {
}
}

private void finishPacket(Packet p) {
// @VisibleForTesting
protected void finishPacket(Packet p) {
int err = p.replyHeader.getErr();
if (p.watchRegistration != null) {
p.watchRegistration.register(err);
Expand Down Expand Up @@ -1246,15 +1253,7 @@ public void run() {
}
// At this point, there might still be new packets appended to outgoingQueue.
// they will be handled in next connection or cleared up if closed.
cleanup();
if (state.isAlive()) {
eventThread.queueEvent(new WatchedEvent(
Event.EventType.None,
Event.KeeperState.Disconnected,
null));
}
clientCnxnSocket.updateNow();
clientCnxnSocket.updateLastSendAndHeard();
cleanAndNotifyState();
}
}
}
Expand All @@ -1275,6 +1274,16 @@ public void run() {
+ Long.toHexString(getSessionId()));
}

private void cleanAndNotifyState() {
cleanup();
if (state.isAlive()) {
eventThread.queueEvent(new WatchedEvent(Event.EventType.None,
Event.KeeperState.Disconnected, null));
}
clientCnxnSocket.updateNow();
clientCnxnSocket.updateLastSendAndHeard();
}

private void pingRwServer() throws RWServerFoundException {
String result = null;
InetSocketAddress addr = hostProvider.next(0);
Expand Down Expand Up @@ -1506,13 +1515,40 @@ public ReplyHeader submitRequest(RequestHeader h, Record request,
Packet packet = queuePacket(h, r, request, response, null, null, null,
null, watchRegistration, watchDeregistration);
synchronized (packet) {
while (!packet.finished) {
packet.wait();
if (requestTimeout > 0) {
// Wait for request completion with timeout
waitForPacketFinish(r, packet);
} else {
// Wait for request completion infinitely
while (!packet.finished) {
packet.wait();
}
}
}
if (r.getErr() == Code.REQUESTTIMEOUT.intValue()) {
sendThread.cleanAndNotifyState();
}
return r;
}

/**
* Wait for request completion with timeout.
*/
private void waitForPacketFinish(ReplyHeader r, Packet packet)
throws InterruptedException {
long waitStartTime = Time.currentElapsedTime();
while (!packet.finished) {
packet.wait(requestTimeout);
if (!packet.finished && ((Time.currentElapsedTime()
- waitStartTime) >= requestTimeout)) {
LOG.error("Timeout error occurred for the packet '{}'.",
packet);
r.setErr(Code.REQUESTTIMEOUT.intValue());
break;
}
}
}

public void saslCompleted() {
sendThread.getClientCnxnSocket().saslCompleted();
}
Expand Down Expand Up @@ -1603,4 +1639,22 @@ public LocalCallback(AsyncCallback cb, int rc, String path, Object ctx) {
this.ctx = ctx;
}
}

private void initRequestTimeout() {
try {
requestTimeout = clientConfig.getLong(
ZKClientConfig.ZOOKEEPER_REQUEST_TIMEOUT,
ZKClientConfig.ZOOKEEPER_REQUEST_TIMEOUT_DEFAULT);
LOG.info("{} value is {}. feature enabled=",
ZKClientConfig.ZOOKEEPER_REQUEST_TIMEOUT,
requestTimeout, requestTimeout > 0);
} catch (NumberFormatException e) {
LOG.error(
"Configured value {} for property {} can not be parsed to long.",
clientConfig.getProperty(
ZKClientConfig.ZOOKEEPER_REQUEST_TIMEOUT),
ZKClientConfig.ZOOKEEPER_REQUEST_TIMEOUT);
throw e;
}
}
}
13 changes: 13 additions & 0 deletions src/java/main/org/apache/zookeeper/KeeperException.java
Expand Up @@ -144,6 +144,8 @@ public static KeeperException create(Code code) {
return new NoWatcherException();
case RECONFIGDISABLED:
return new ReconfigDisabledException();
case REQUESTTIMEOUT:
return new RequestTimeoutException();
case OK:
default:
throw new IllegalArgumentException("Invalid exception code");
Expand Down Expand Up @@ -392,6 +394,8 @@ public static enum Code implements CodeDeprecated {
EPHEMERALONLOCALSESSION (EphemeralOnLocalSession),
/** Attempts to remove a non-existing watcher */
NOWATCHER (-121),
/** Request not completed within max allowed time.*/
REQUESTTIMEOUT (-122),
/** Attempts to perform a reconfiguration operation when reconfiguration feature is disabled. */
RECONFIGDISABLED(-123);

Expand Down Expand Up @@ -843,4 +847,13 @@ public ReconfigDisabledException(String path) {
super(Code.RECONFIGDISABLED, path);
}
}

/**
* @see Code#REQUESTTIMEOUT
*/
public static class RequestTimeoutException extends KeeperException {
public RequestTimeoutException() {
super(Code.REQUESTTIMEOUT);
}
}
}
11 changes: 10 additions & 1 deletion src/java/main/org/apache/zookeeper/ZooKeeper.java
Expand Up @@ -876,12 +876,21 @@ public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher,
connectString);
hostProvider = aHostProvider;

cnxn = new ClientCnxn(connectStringParser.getChrootPath(),
cnxn = createConnection(connectStringParser.getChrootPath(),
hostProvider, sessionTimeout, this, watchManager,
getClientCnxnSocket(), canBeReadOnly);
cnxn.start();
}

// @VisibleForTesting
protected ClientCnxn createConnection(String chrootPath,
HostProvider hostProvider, int sessionTimeout, ZooKeeper zooKeeper,
ClientWatchManager watcher, ClientCnxnSocket clientCnxnSocket,
boolean canBeReadOnly) throws IOException {
return new ClientCnxn(chrootPath, hostProvider, sessionTimeout, this,
watchManager, clientCnxnSocket, canBeReadOnly);
}

/**
* To create a ZooKeeper client object, the application needs to pass a
* connection string containing a comma separated list of host:port pairs,
Expand Down
37 changes: 37 additions & 0 deletions src/java/main/org/apache/zookeeper/client/ZKClientConfig.java
Expand Up @@ -56,9 +56,15 @@ public class ZKClientConfig extends ZKConfig {
@SuppressWarnings("deprecation")
public static final String SECURE_CLIENT = ZooKeeper.SECURE_CLIENT;
public static final int CLIENT_MAX_PACKET_LENGTH_DEFAULT = 4096 * 1024; /* 4 MB */
public static final String ZOOKEEPER_REQUEST_TIMEOUT = "zookeeper.request.timeout";
/**
* Feature is disabled by default.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What about enabling it by default with a default computed from desired session expire timeout?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Session timeout and this client request timeout are completely different. I think we should not mix these two.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Totally agree, they are very different things. I mentioned it because configured session timeout is like a client side hint about how much time you can tolerate to be disconnected/not responsive, but it is better to keep it separate from this low level knob.
Anyway I wonder if it would be better to set a default timeout.
If there is a chance of infinite wait maybe it can be worth.
We can use a very large timeout, in the order of minutes.
Tipical session timeout is in the order of seconds

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I tend to learn towards the conservative side to have this feature turned off by default, because I feel we don't have deep insight regarding what's the value we could use as default value (if we turn the feature on by default), and a premature setting of that value might impact existing users. For few of users who do have the problem they can tune the parameter based on their prod env.

Copy link
Contributor

@eolivelli eolivelli Jul 25, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay, no default

*/
public static final long ZOOKEEPER_REQUEST_TIMEOUT_DEFAULT = 0;

public ZKClientConfig() {
super();
initFromJavaSystemProperties();
}

public ZKClientConfig(File configFile) throws ConfigException {
Expand All @@ -69,6 +75,15 @@ public ZKClientConfig(String configPath) throws ConfigException {
super(configPath);
}

/**
* Initialize all the ZooKeeper client properties which are configurable as
* java system property
*/
private void initFromJavaSystemProperties() {
setProperty(ZOOKEEPER_REQUEST_TIMEOUT,
System.getProperty(ZOOKEEPER_REQUEST_TIMEOUT));
}

@Override
protected void handleBackwardCompatibility() {
/**
Expand Down Expand Up @@ -100,4 +115,26 @@ protected void handleBackwardCompatibility() {
public boolean isSaslClientEnabled() {
return Boolean.valueOf(getProperty(ENABLE_CLIENT_SASL_KEY, ENABLE_CLIENT_SASL_DEFAULT));
}

/**
* Get the value of the <code>key</code> property as an <code>long</code>.
* If property is not set, the provided <code>defaultValue</code> is
* returned
*
* @param key
* property key.
* @param defaultValue
* default value.
* @throws NumberFormatException
* when the value is invalid
* @return return property value as an <code>long</code>, or
* <code>defaultValue</code>
*/
public long getLong(String key, long defaultValue) {
String value = getProperty(key);
if (value != null) {
return Long.parseLong(value.trim());
}
return defaultValue;
}
}