Permalink
Browse files

Merge pull request #115 from dybdahl/master

Make TrackedCoordinate close after a node is gone, without needing to wait for external stop.
  • Loading branch information...
2 parents 52eb888 + c004898 commit 5c4b475784d3c38cf7e4e861641bc81e772d9bab @bushwakko bushwakko committed Dec 6, 2012
@@ -208,6 +208,22 @@ public void process(WatchedEvent watchedEvent) {
* Implements interface TrackedCoordinate.ExpressionResolverNotify
*/
@Override
+ public void nodeDead(final String path) {
+ synchronized (instanceLock) {
+ TrackedCoordinate trackedCoordinate = coordinateByPath.remove(path);
+ if (trackedCoordinate == null) {
+ return;
+ }
+ trackedCoordinate.stop();
+ // Triggers a new scan, and potential client updates.
+ scheduleRefresh("" /** scan for all nodes */, 50 /* ms*/);
+ }
+ }
+
+ /**
+ * Implements interface TrackedCoordinate.ExpressionResolverNotify
+ */
+ @Override
public void stateChanged(final String path) {
// Something happened to a path, schedule a refetch.
scheduleRefresh(path, 50);
@@ -23,7 +23,17 @@
* The client can implement this to get notified on changes.
*/
public interface ExpressionResolverNotify {
+ /**
+ * This is called when the state has changed, it might have become unavailable.
+ * @param statusPath path of the coordinate in zookeeper.
+ */
void stateChanged(final String statusPath);
+
+ /**
+ * This is called when node is deleted, or it can not be read.
+ * @param statusPath path of the coordinate in zookeeper.
+ */
+ void nodeDead(final String statusPath);
}
private ZkCoordinateData.Snapshot coordinateData = null;
@@ -155,7 +165,7 @@ public String toString() {
synchronized (coordinateDataMonitor) {
coordinateData = new ZkCoordinateData().snapshot();
}
- client.stateChanged(path);
+ client.nodeDead(path);
return;
case NodeDataChanged:
isSynchronizedWithZookeeper.set(false);
@@ -185,8 +195,14 @@ private void refreshCoordinateData() throws CloudnameException {
}
synchronized (coordinateDataMonitor) {
String oldDataSerialized = (null == coordinateData) ? "" : coordinateData.serialize();
- coordinateData = ZkCoordinateData.loadCoordinateData(
- path, zkClient.getZookeeper(), this).snapshot();
+ try {
+ coordinateData = ZkCoordinateData.loadCoordinateData(
+ path, zkClient.getZookeeper(), this).snapshot();
+ } catch (CloudnameException e) {
+ client.nodeDead(path);
+ LOG.log(Level.FINER, "Exception while reading path " + path, e);
+ return;
+ }
isSynchronizedWithZookeeper.set(true);
if (! oldDataSerialized.equals(coordinateData.toString())) {
client.stateChanged(path);

0 comments on commit 5c4b475

Please sign in to comment.