Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,10 @@
import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.apache.helix.zookeeper.exception.ZkClientException;
import org.apache.helix.zookeeper.zkclient.annotation.PreFetch;
import org.apache.helix.zookeeper.zkclient.callback.ZkAsyncCallMonitorContext;
import org.apache.helix.zookeeper.zkclient.callback.ZkAsyncCallbacks;
import org.apache.helix.zookeeper.zkclient.callback.ZkAsyncRetryCallContext;
import org.apache.helix.zookeeper.zkclient.callback.ZkAsyncRetryThread;
import org.apache.helix.zookeeper.zkclient.exception.ZkBadVersionException;
import org.apache.helix.zookeeper.zkclient.exception.ZkException;
import org.apache.helix.zookeeper.zkclient.exception.ZkInterruptedException;
Expand Down Expand Up @@ -96,6 +99,10 @@ public class ZkClient implements Watcher {
private PathBasedZkSerializer _pathBasedZkSerializer;
private ZkClientMonitor _monitor;

// To automatically retry the async operation, we need a separate thread other than the
// ZkEventThread. Otherwise the retry request might block the normal event processing.
protected final ZkAsyncRetryThread _asyncCallRetryThread;

private class IZkDataListenerEntry {
final IZkDataListener _dataListener;
final boolean _prefetchData;
Expand Down Expand Up @@ -183,6 +190,9 @@ protected ZkClient(IZkConnection zkConnection, int connectionTimeout, long opera
_operationRetryTimeoutInMillis = operationRetryTimeout;
_isNewSessionEventFired = false;

_asyncCallRetryThread = new ZkAsyncRetryThread(zkConnection.getServers());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should give name of this thread that can be tied to the ZkEvent thread name. This way, when we debug it, we know the relation. Otherwise it would be very hard to correlate and reason.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point, let me do it in a separate PR.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry I was confused by myself. Name already given in this PR.
"ZkClient-AsyncCallback-Retry-" + getId() + "-" + name.

_asyncCallRetryThread.start();

connect(connectionTimeout, this);

// initiate monitor
Expand Down Expand Up @@ -1736,15 +1746,23 @@ public void asyncCreate(final String path, Object datat, final CreateMode mode,
data = (datat == null ? null : serialize(datat, path));
} catch (ZkMarshallingError e) {
cb.processResult(KeeperException.Code.MARSHALLINGERROR.intValue(), path,
new ZkAsyncCallbacks.ZkAsyncCallContext(_monitor, startT, 0, false), null);
new ZkAsyncCallMonitorContext(_monitor, startT, 0, false), null);
return;
}
doAsyncCreate(path, data, mode, startT, cb);
}

private void doAsyncCreate(final String path, final byte[] data, final CreateMode mode,
final long startT, final ZkAsyncCallbacks.CreateCallbackHandler cb) {
retryUntilConnected(() -> {
((ZkConnection) getConnection()).getZookeeper()
.create(path, data, ZooDefs.Ids.OPEN_ACL_UNSAFE,
// Arrays.asList(DEFAULT_ACL),
mode, cb, new ZkAsyncCallbacks.ZkAsyncCallContext(_monitor, startT,
data == null ? 0 : data.length, false));
.create(path, data, ZooDefs.Ids.OPEN_ACL_UNSAFE, mode, cb,
new ZkAsyncRetryCallContext(_asyncCallRetryThread, cb, _monitor, startT, 0, false) {
@Override
protected void doRetry() {
doAsyncCreate(path, data, mode, System.currentTimeMillis(), cb);
}
});
return null;
});
}
Expand All @@ -1758,50 +1776,66 @@ public void asyncSetData(final String path, Object datat, final int version,
data = serialize(datat, path);
} catch (ZkMarshallingError e) {
cb.processResult(KeeperException.Code.MARSHALLINGERROR.intValue(), path,
new ZkAsyncCallbacks.ZkAsyncCallContext(_monitor, startT, 0, false), null);
new ZkAsyncCallMonitorContext(_monitor, startT, 0, false), null);
return;
}
doAsyncSetData(path, data, version, startT, cb);
}

private void doAsyncSetData(final String path, byte[] data, final int version, final long startT,
final ZkAsyncCallbacks.SetDataCallbackHandler cb) {
retryUntilConnected(() -> {
((ZkConnection) getConnection()).getZookeeper().setData(path, data, version, cb,
new ZkAsyncCallbacks.ZkAsyncCallContext(_monitor, startT,
data == null ? 0 : data.length, false));
new ZkAsyncRetryCallContext(_asyncCallRetryThread, cb, _monitor, startT,
data == null ? 0 : data.length, false) {
@Override
protected void doRetry() {
doAsyncSetData(path, data, version, System.currentTimeMillis(), cb);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this recursively self calling OK?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the design. If no connectivity issue, it won't be triggered. The assumption here is that the connectivity issue is transient and won't happen continuously.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not quite familiar with the way you use. I just wonder would that cause infinite call then stack overflow?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If I say it won't, I guess you won't believe it so easily. Please take a look at the code carefully. In general, it is not a recursive call, it is a callback triggered in a different thread after this method is done. Even we keep retrying, only one call exists in the stack.

}
});
return null;
});
}

public void asyncGetData(final String path, final ZkAsyncCallbacks.GetDataCallbackHandler cb) {
final long startT = System.currentTimeMillis();
retryUntilConnected(new Callable<Object>() {
@Override
public Object call() throws Exception {
((ZkConnection) getConnection()).getZookeeper().getData(path, null, cb,
new ZkAsyncCallbacks.ZkAsyncCallContext(_monitor, startT, 0, true));
return null;
}
retryUntilConnected(() -> {
((ZkConnection) getConnection()).getZookeeper().getData(path, null, cb,
new ZkAsyncRetryCallContext(_asyncCallRetryThread, cb, _monitor, startT, 0, true) {
@Override
protected void doRetry() {
asyncGetData(path, cb);
}
});
return null;
});
}

public void asyncExists(final String path, final ZkAsyncCallbacks.ExistsCallbackHandler cb) {
final long startT = System.currentTimeMillis();
retryUntilConnected(new Callable<Object>() {
@Override
public Object call() throws Exception {
((ZkConnection) getConnection()).getZookeeper().exists(path, null, cb,
new ZkAsyncCallbacks.ZkAsyncCallContext(_monitor, startT, 0, true));
return null;
}
retryUntilConnected(() -> {
((ZkConnection) getConnection()).getZookeeper().exists(path, null, cb,
new ZkAsyncRetryCallContext(_asyncCallRetryThread, cb, _monitor, startT, 0, true) {
@Override
protected void doRetry() {
asyncExists(path, cb);
}
});
return null;
});
}

public void asyncDelete(final String path, final ZkAsyncCallbacks.DeleteCallbackHandler cb) {
final long startT = System.currentTimeMillis();
retryUntilConnected(new Callable<Object>() {
@Override
public Object call() throws Exception {
((ZkConnection) getConnection()).getZookeeper().delete(path, -1, cb,
new ZkAsyncCallbacks.ZkAsyncCallContext(_monitor, startT, 0, false));
return null;
}
retryUntilConnected(() -> {
((ZkConnection) getConnection()).getZookeeper().delete(path, -1, cb,
new ZkAsyncRetryCallContext(_asyncCallRetryThread, cb, _monitor, startT, 0, false) {
@Override
protected void doRetry() {
asyncDelete(path, cb);
}
});
return null;
});
}

Expand Down Expand Up @@ -1955,6 +1989,10 @@ public void close() throws ZkInterruptedException {
return;
}
setShutdownTrigger(true);
if (_asyncCallRetryThread != null) {
_asyncCallRetryThread.interrupt();
_asyncCallRetryThread.join(2000);
}
_eventThread.interrupt();
_eventThread.join(2000);
if (isManagingZkConnection()) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package org.apache.helix.zookeeper.zkclient.callback;

public interface CancellableZkAsyncCallback {
/**
* Notify all the callers that are waiting for the callback to cancel the wait.
*/
void notifyCallers();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package org.apache.helix.zookeeper.zkclient.callback;

import org.apache.helix.zookeeper.zkclient.metric.ZkClientMonitor;

public class ZkAsyncCallMonitorContext {
private final long _startTimeMilliSec;
private final ZkClientMonitor _monitor;
private final boolean _isRead;
private int _bytes;

/**
* @param monitor ZkClient monitor for update the operation result.
* @param startTimeMilliSec Operation initialization time.
* @param bytes The data size in bytes that is involved in the operation.
* @param isRead True if the operation is readonly.
*/
public ZkAsyncCallMonitorContext(final ZkClientMonitor monitor, long startTimeMilliSec, int bytes,
boolean isRead) {
_monitor = monitor;
_startTimeMilliSec = startTimeMilliSec;
_bytes = bytes;
_isRead = isRead;
}

/**
* Update the operated data size in bytes.
* @param bytes
*/
void setBytes(int bytes) {
_bytes = bytes;
}

/**
* Record the operation result into the specified ZkClient monitor.
* @param path
*/
void recordAccess(String path) {
if (_monitor != null) {
if (_isRead) {
_monitor.record(path, _bytes, _startTimeMilliSec, ZkClientMonitor.AccessType.READ);
} else {
_monitor.record(path, _bytes, _startTimeMilliSec, ZkClientMonitor.AccessType.WRITE);
}
}
}
}
Loading