Skip to content

Commit

Permalink
switch some more watchers to reset-first
Browse files Browse the repository at this point in the history
  • Loading branch information
bryanduxbury committed Apr 21, 2011
1 parent 2b050b2 commit ae756ac
Showing 1 changed file with 31 additions and 22 deletions.
53 changes: 31 additions & 22 deletions src/java/com/rapleaf/hank/coordinator/zk/ZkHostConfig.java
Expand Up @@ -53,22 +53,20 @@ public class ZkHostConfig extends BaseZkConsumer implements HostConfig {
private static final String CURRENT_COMMAND_PATH_SEGMENT = "/current_command";

private class CommandQueueWatcher implements Watcher {
private final HostCommandQueueChangeListener listener;
private boolean cancelled = false;

public CommandQueueWatcher(HostCommandQueueChangeListener listener) throws KeeperException, InterruptedException {
this.listener = listener;
public CommandQueueWatcher() throws KeeperException, InterruptedException {
setWatch();
}

public void process(WatchedEvent event) {
LOG.trace(event);
synchronized (this) {
// connect/disconnect message
if (event.getType() == EventType.None) {
if (cancelled || event.getType() == EventType.None) {
return;
}
// reset watch immediately
try {
// reset callback
setWatch();
} catch (Exception e) {
LOG.error("Failed to reset watch!", e);
Expand All @@ -78,39 +76,49 @@ public void process(WatchedEvent event) {
case NodeDeleted:
case NodeDataChanged:
case NodeChildrenChanged:
listener.onCommandQueueChange(ZkHostConfig.this);
for (HostCommandQueueChangeListener listener : commandQueueListeners) {
listener.onCommandQueueChange(ZkHostConfig.this);
}
}
}
}

private void setWatch() throws KeeperException, InterruptedException {
zk.getChildren(hostPath + COMMAND_QUEUE_PATH_SEGMENT, this);
}

public void cancel() {
cancelled = true;
}
}

private class StateChangeWatcher implements Watcher {
private boolean cancelled = false;

public StateChangeWatcher() throws KeeperException, InterruptedException {
setWatch();
}

public void process(WatchedEvent event) {
if (event.getState() != KeeperState.SyncConnected) {
LOG.trace("Apparent disconnection! Not triggering listeners and not reregistering watch.");
return;
}
if (!cancelled && zk.getState() == States.CONNECTED) {
// reset callback
try {
setWatch();
} catch (Exception e) {
LOG.error("Failed to reset watch!", e);
}
}
switch (event.getType()) {
case NodeCreated:
case NodeDeleted:
case NodeDataChanged:
for (HostStateChangeListener listener : stateListeners) {
listener.onHostStateChange(ZkHostConfig.this);
}
if (!cancelled && zk.getState() == States.CONNECTED) {
// reset callback
try {
setWatch();
} catch (Exception e) {
LOG.error("Failed to reset watch!", e);
}
}
}
}

Expand All @@ -130,8 +138,10 @@ public void cancel() {
private final PartDaemonAddress address;

private final Set<HostStateChangeListener> stateListeners = new HashSet<HostStateChangeListener>();
private final StateChangeWatcher stateChangeWatcher;

private StateChangeWatcher stateChangeWatcher;
private final Set<HostCommandQueueChangeListener> commandQueueListeners = new HashSet<HostCommandQueueChangeListener>();
private final CommandQueueWatcher commandQueueWatcher;

public ZkHostConfig(ZooKeeper zk, String hostPath) throws KeeperException, InterruptedException {
super(zk);
Expand All @@ -143,6 +153,7 @@ public ZkHostConfig(ZooKeeper zk, String hostPath) throws KeeperException, Inter

stateChangeWatcher = new StateChangeWatcher();
stateChangeWatcher.setWatch();
commandQueueWatcher = new CommandQueueWatcher();
}

@Override
Expand Down Expand Up @@ -347,12 +358,9 @@ public HostCommand processNextCommand() throws IOException {
}

@Override
public void setCommandQueueChangeListener(HostCommandQueueChangeListener listener)
throws IOException {
try {
new CommandQueueWatcher(listener);
} catch (Exception e) {
throw new IOException(e);
public void setCommandQueueChangeListener(HostCommandQueueChangeListener listener) {
synchronized (commandQueueListeners) {
commandQueueListeners.add(listener);
}
}

Expand Down Expand Up @@ -381,5 +389,6 @@ public void cancelStateChangeListener(HostStateChangeListener listener) {

public void close() {
stateChangeWatcher.cancel();
commandQueueWatcher.cancel();
}
}

0 comments on commit ae756ac

Please sign in to comment.