Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ public enum MapKey {
private String _pathChanged;
private String _eventName;
private long _creationTime;
private boolean _isChildChange;

/**
* Get the name associated with the event
Expand Down Expand Up @@ -227,4 +228,12 @@ public HelixConstants.ChangeType getChangeType() {
public void setChangeType(HelixConstants.ChangeType changeType) {
this._changeType = changeType;
}

public boolean getIsChildChange() {
return _isChildChange;
}

public void setIsChildChange(boolean isChildChange) {
this._isChildChange = isChildChange;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -118,8 +118,6 @@ public class CallbackHandler implements IZkChildListener, IZkDataListener {
nextNotificationType.put(Type.FINALIZE, Arrays.asList(Type.INIT));
}

// processor to handle async zk event resubscription.
private static DedupEventProcessor SubscribeChangeEventProcessor;

private final String _path;
private final Object _listener;
Expand All @@ -142,50 +140,7 @@ public class CallbackHandler implements IZkChildListener, IZkDataListener {
// indicated whether this CallbackHandler is ready to serve event callback from ZkClient.
private boolean _ready = false;

static {
SubscribeChangeEventProcessor = new DedupEventProcessor<CallbackHandler, SubscribeChangeEvent>(
"Singleton", "CallbackHandler-AsycSubscribe") {
@Override
protected void handleEvent(SubscribeChangeEvent event) {
logger.info("CallbackHandler {}, resubscribe change listener to path: {}, for listener: {}, watchChild: {}",
event.handler._uid, event.path, event.listener, event.watchChild);
try {
if (event.handler.isReady()) {
event.handler.subscribeForChanges(event.callbackType, event.path, event.watchChild);
} else {
logger.info("CallbackHandler is not ready, stop subscribing changes listener to "
+ "path: {} for listener: {} watchChild: {}", event.path, event.listener,
event.listener);
}
} catch (Exception e) {
logger.error("Failed to resubscribe change to path: {} for listener: {}", event.path,
event.listener, e);
}
}
};

SubscribeChangeEventProcessor.start();
}

class SubscribeChangeEvent {
final CallbackHandler handler;
final String path;
final NotificationContext.Type callbackType;
final Object listener;
final boolean watchChild;

SubscribeChangeEvent(CallbackHandler handler, NotificationContext.Type callbackType,
String path, boolean watchChild, Object listener) {
this.handler = handler;
this.path = path;
this.callbackType = callbackType;
this.listener = listener;
this.watchChild = watchChild;
}
}

class CallbackProcessor
extends DedupEventProcessor<NotificationContext.Type, NotificationContext> {
class CallbackProcessor extends DedupEventProcessor<NotificationContext.Type, NotificationContext> {
private CallbackHandler _handler;

public CallbackProcessor(CallbackHandler handler) {
Expand Down Expand Up @@ -402,13 +357,9 @@ public void invoke(NotificationContext changeContext) throws Exception {
}
_expectTypes = nextNotificationType.get(type);

if (type == Type.INIT || type == Type.FINALIZE) {
if (type == Type.INIT || type == Type.FINALIZE || changeContext.getIsChildChange()) {
subscribeForChanges(changeContext.getType(), _path, _watchChild);
} else {
// put SubscribeForChange run in async thread to reduce the latency of zk callback handling.
subscribeForChangesAsyn(changeContext.getType(), _path, _watchChild);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did we try to get rid of async subscribe in this PR and do all subscription synchronously? I thought we only remove the extra one.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TFTR. The duplicated subscribeForChanges in HandleChildChange and do resubscribe for child change in line 361. For data changes, this async subscribe is also duplicated since the path is already resubscribed in zkClient.

}

if (_changeType == IDEAL_STATE) {
IdealStateChangeListener idealStateChangeListener = (IdealStateChangeListener) _listener;
List<IdealState> idealStates = preFetch(_propertyKey);
Expand Down Expand Up @@ -598,14 +549,6 @@ private void subscribeDataChange(String path, NotificationContext.Type callbackT
}
}

/** Subscribe Changes in asynchronously */
private void subscribeForChangesAsyn(NotificationContext.Type callbackType, String path,
boolean watchChild) {
SubscribeChangeEvent subscribeEvent =
new SubscribeChangeEvent(this, callbackType, path, watchChild, _listener);
SubscribeChangeEventProcessor.queueEvent(subscribeEvent.handler, subscribeEvent);
}

private void subscribeForChanges(NotificationContext.Type callbackType, String path,
boolean watchChild) {
logger.info("CallbackHandler {} subscribing changes listener to path: {}, callback type: {}, "
Expand Down Expand Up @@ -734,6 +677,7 @@ public void handleDataChange(String dataPath, Object data) {
changeContext.setType(NotificationContext.Type.CALLBACK);
changeContext.setPathChanged(dataPath);
changeContext.setChangeType(_changeType);
changeContext.setIsChildChange(false);
enqueueTask(changeContext);
}
} catch (Exception e) {
Expand Down Expand Up @@ -796,7 +740,7 @@ public void handleChildChange(String parentPath, List<String> currentChilds) {
changeContext.setType(NotificationContext.Type.CALLBACK);
changeContext.setPathChanged(parentPath);
changeContext.setChangeType(_changeType);
subscribeForChanges(changeContext.getType(), _path, _watchChild);
changeContext.setIsChildChange(true);
enqueueTask(changeContext);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,6 @@

import org.apache.helix.CurrentStateChangeListener;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixManager;
import org.apache.helix.HelixManagerFactory;
import org.apache.helix.InstanceType;
import org.apache.helix.NotificationContext;
import org.apache.helix.PropertyKey;
import org.apache.helix.PropertyType;
Expand All @@ -40,12 +37,12 @@
import org.apache.helix.integration.manager.MockParticipantManager;
import org.apache.helix.integration.manager.ZkTestManager;
import org.apache.helix.manager.zk.CallbackHandler;
import org.apache.helix.spectator.RoutingTableProvider;
import org.apache.helix.zookeeper.api.client.HelixZkClient;
import org.apache.helix.model.CurrentState;
import org.apache.helix.spectator.RoutingTableProvider;
import org.apache.helix.tools.ClusterStateVerifier;
import org.apache.helix.tools.ClusterVerifiers.BestPossibleExternalViewVerifier;
import org.apache.helix.tools.ClusterVerifiers.ZkHelixClusterVerifier;
import org.apache.helix.zookeeper.api.client.HelixZkClient;
import org.apache.helix.zookeeper.zkclient.IZkChildListener;
import org.apache.helix.zookeeper.zkclient.IZkDataListener;
import org.slf4j.Logger;
Expand Down Expand Up @@ -467,23 +464,32 @@ public void testCurrentStatePathLeakingByAsycRemoval() throws Exception {
cs.setSessionId(jobSessionId);
cs.setStateModelDefRef(db0.getStateModelDefRef());

Map<String, List<String>> rpWatchPaths = ZkTestHelper.getZkWatch(rpManager.getZkClient());
Assert.assertFalse(rpWatchPaths.get("dataWatches").contains(jobKey.getPath()));

LOG.info("add job");
boolean rtJob = false;
for (int i = 0; i < mJobUpdateCnt; i++) {
rtJob = jobAccesor.setProperty(jobKey, cs);
jobAccesor.setProperty(jobKey, cs);
}

// verify new watcher is installed on the new node
boolean result = TestHelper.verify(() -> {
return ZkTestHelper.getListenersByZkPath(ZK_ADDR).keySet().contains(jobKey.getPath());
}, TestHelper.WAIT_DURATION);
Assert.assertTrue(result, "Should get initial clusterConfig callback invoked");
rpWatchPaths = ZkTestHelper.getZkWatch(rpManager.getZkClient());
Assert.assertTrue(rpWatchPaths.get("dataWatches").contains(jobKey.getPath()));

LOG.info("remove job");
rtJob = jobParticipant.getZkClient().delete(jobKey.getPath());
jobParticipant.getZkClient().delete(jobKey.getPath());

// validate the job watch is not leaked.
Thread.sleep(5000);

Map<String, Set<String>> listenersByZkPath = ZkTestHelper.getListenersByZkPath(ZK_ADDR);
boolean jobKeyExists = listenersByZkPath.keySet().contains(jobKey.getPath());
Assert.assertFalse(jobKeyExists);
Assert.assertFalse(listenersByZkPath.keySet().contains(jobKey.getPath()));

Map<String, List<String>> rpWatchPaths = ZkTestHelper.getZkWatch(rpManager.getZkClient());
rpWatchPaths = ZkTestHelper.getZkWatch(rpManager.getZkClient());
List<String> existWatches = rpWatchPaths.get("existWatches");
Assert.assertTrue(existWatches.isEmpty());

Expand Down