diff --git a/helix-core/src/main/java/org/apache/helix/NotificationContext.java b/helix-core/src/main/java/org/apache/helix/NotificationContext.java index 4195d946c5..b35968b94f 100644 --- a/helix-core/src/main/java/org/apache/helix/NotificationContext.java +++ b/helix-core/src/main/java/org/apache/helix/NotificationContext.java @@ -43,6 +43,7 @@ public enum MapKey { private String _pathChanged; private String _eventName; private long _creationTime; + private boolean _isChildChange; /** * Get the name associated with the event @@ -227,4 +228,12 @@ public HelixConstants.ChangeType getChangeType() { public void setChangeType(HelixConstants.ChangeType changeType) { this._changeType = changeType; } + + public boolean getIsChildChange() { + return _isChildChange; + } + + public void setIsChildChange(boolean isChildChange) { + this._isChildChange = isChildChange; + } } diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java b/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java index dece3de0f4..a57a678182 100644 --- a/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java +++ b/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java @@ -118,8 +118,6 @@ public class CallbackHandler implements IZkChildListener, IZkDataListener { nextNotificationType.put(Type.FINALIZE, Arrays.asList(Type.INIT)); } - // processor to handle async zk event resubscription. - private static DedupEventProcessor SubscribeChangeEventProcessor; private final String _path; private final Object _listener; @@ -142,50 +140,7 @@ public class CallbackHandler implements IZkChildListener, IZkDataListener { // indicated whether this CallbackHandler is ready to serve event callback from ZkClient. private boolean _ready = false; - static { - SubscribeChangeEventProcessor = new DedupEventProcessor( - "Singleton", "CallbackHandler-AsycSubscribe") { - @Override - protected void handleEvent(SubscribeChangeEvent event) { - logger.info("CallbackHandler {}, resubscribe change listener to path: {}, for listener: {}, watchChild: {}", - event.handler._uid, event.path, event.listener, event.watchChild); - try { - if (event.handler.isReady()) { - event.handler.subscribeForChanges(event.callbackType, event.path, event.watchChild); - } else { - logger.info("CallbackHandler is not ready, stop subscribing changes listener to " - + "path: {} for listener: {} watchChild: {}", event.path, event.listener, - event.listener); - } - } catch (Exception e) { - logger.error("Failed to resubscribe change to path: {} for listener: {}", event.path, - event.listener, e); - } - } - }; - - SubscribeChangeEventProcessor.start(); - } - - class SubscribeChangeEvent { - final CallbackHandler handler; - final String path; - final NotificationContext.Type callbackType; - final Object listener; - final boolean watchChild; - - SubscribeChangeEvent(CallbackHandler handler, NotificationContext.Type callbackType, - String path, boolean watchChild, Object listener) { - this.handler = handler; - this.path = path; - this.callbackType = callbackType; - this.listener = listener; - this.watchChild = watchChild; - } - } - - class CallbackProcessor - extends DedupEventProcessor { + class CallbackProcessor extends DedupEventProcessor { private CallbackHandler _handler; public CallbackProcessor(CallbackHandler handler) { @@ -402,13 +357,9 @@ public void invoke(NotificationContext changeContext) throws Exception { } _expectTypes = nextNotificationType.get(type); - if (type == Type.INIT || type == Type.FINALIZE) { + if (type == Type.INIT || type == Type.FINALIZE || changeContext.getIsChildChange()) { subscribeForChanges(changeContext.getType(), _path, _watchChild); - } else { - // put SubscribeForChange run in async thread to reduce the latency of zk callback handling. - subscribeForChangesAsyn(changeContext.getType(), _path, _watchChild); } - if (_changeType == IDEAL_STATE) { IdealStateChangeListener idealStateChangeListener = (IdealStateChangeListener) _listener; List idealStates = preFetch(_propertyKey); @@ -598,14 +549,6 @@ private void subscribeDataChange(String path, NotificationContext.Type callbackT } } - /** Subscribe Changes in asynchronously */ - private void subscribeForChangesAsyn(NotificationContext.Type callbackType, String path, - boolean watchChild) { - SubscribeChangeEvent subscribeEvent = - new SubscribeChangeEvent(this, callbackType, path, watchChild, _listener); - SubscribeChangeEventProcessor.queueEvent(subscribeEvent.handler, subscribeEvent); - } - private void subscribeForChanges(NotificationContext.Type callbackType, String path, boolean watchChild) { logger.info("CallbackHandler {} subscribing changes listener to path: {}, callback type: {}, " @@ -734,6 +677,7 @@ public void handleDataChange(String dataPath, Object data) { changeContext.setType(NotificationContext.Type.CALLBACK); changeContext.setPathChanged(dataPath); changeContext.setChangeType(_changeType); + changeContext.setIsChildChange(false); enqueueTask(changeContext); } } catch (Exception e) { @@ -796,7 +740,7 @@ public void handleChildChange(String parentPath, List currentChilds) { changeContext.setType(NotificationContext.Type.CALLBACK); changeContext.setPathChanged(parentPath); changeContext.setChangeType(_changeType); - subscribeForChanges(changeContext.getType(), _path, _watchChild); + changeContext.setIsChildChange(true); enqueueTask(changeContext); } } diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestZkCallbackHandlerLeak.java b/helix-core/src/test/java/org/apache/helix/integration/TestZkCallbackHandlerLeak.java index 60dbf914af..2f27d4b360 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/TestZkCallbackHandlerLeak.java +++ b/helix-core/src/test/java/org/apache/helix/integration/TestZkCallbackHandlerLeak.java @@ -26,9 +26,6 @@ import org.apache.helix.CurrentStateChangeListener; import org.apache.helix.HelixDataAccessor; -import org.apache.helix.HelixManager; -import org.apache.helix.HelixManagerFactory; -import org.apache.helix.InstanceType; import org.apache.helix.NotificationContext; import org.apache.helix.PropertyKey; import org.apache.helix.PropertyType; @@ -40,12 +37,12 @@ import org.apache.helix.integration.manager.MockParticipantManager; import org.apache.helix.integration.manager.ZkTestManager; import org.apache.helix.manager.zk.CallbackHandler; -import org.apache.helix.spectator.RoutingTableProvider; -import org.apache.helix.zookeeper.api.client.HelixZkClient; import org.apache.helix.model.CurrentState; +import org.apache.helix.spectator.RoutingTableProvider; import org.apache.helix.tools.ClusterStateVerifier; import org.apache.helix.tools.ClusterVerifiers.BestPossibleExternalViewVerifier; import org.apache.helix.tools.ClusterVerifiers.ZkHelixClusterVerifier; +import org.apache.helix.zookeeper.api.client.HelixZkClient; import org.apache.helix.zookeeper.zkclient.IZkChildListener; import org.apache.helix.zookeeper.zkclient.IZkDataListener; import org.slf4j.Logger; @@ -467,23 +464,32 @@ public void testCurrentStatePathLeakingByAsycRemoval() throws Exception { cs.setSessionId(jobSessionId); cs.setStateModelDefRef(db0.getStateModelDefRef()); + Map> rpWatchPaths = ZkTestHelper.getZkWatch(rpManager.getZkClient()); + Assert.assertFalse(rpWatchPaths.get("dataWatches").contains(jobKey.getPath())); + LOG.info("add job"); - boolean rtJob = false; for (int i = 0; i < mJobUpdateCnt; i++) { - rtJob = jobAccesor.setProperty(jobKey, cs); + jobAccesor.setProperty(jobKey, cs); } + // verify new watcher is installed on the new node + boolean result = TestHelper.verify(() -> { + return ZkTestHelper.getListenersByZkPath(ZK_ADDR).keySet().contains(jobKey.getPath()); + }, TestHelper.WAIT_DURATION); + Assert.assertTrue(result, "Should get initial clusterConfig callback invoked"); + rpWatchPaths = ZkTestHelper.getZkWatch(rpManager.getZkClient()); + Assert.assertTrue(rpWatchPaths.get("dataWatches").contains(jobKey.getPath())); + LOG.info("remove job"); - rtJob = jobParticipant.getZkClient().delete(jobKey.getPath()); + jobParticipant.getZkClient().delete(jobKey.getPath()); // validate the job watch is not leaked. Thread.sleep(5000); Map> listenersByZkPath = ZkTestHelper.getListenersByZkPath(ZK_ADDR); - boolean jobKeyExists = listenersByZkPath.keySet().contains(jobKey.getPath()); - Assert.assertFalse(jobKeyExists); + Assert.assertFalse(listenersByZkPath.keySet().contains(jobKey.getPath())); - Map> rpWatchPaths = ZkTestHelper.getZkWatch(rpManager.getZkClient()); + rpWatchPaths = ZkTestHelper.getZkWatch(rpManager.getZkClient()); List existWatches = rpWatchPaths.get("existWatches"); Assert.assertTrue(existWatches.isEmpty());