Skip to content

Commit

Permalink
ZOOKEEPER-442. need a way to remove watches that are no longer of int…
Browse files Browse the repository at this point in the history
…erest (Rakesh R, Daniel Gómez Ferro via phunt)

git-svn-id: https://svn.apache.org/repos/asf/zookeeper/trunk@1560904 13f79535-47bb-0310-9956-ffa450edef68
  • Loading branch information
phunt committed Jan 24, 2014
1 parent da950d6 commit 98789ea
Show file tree
Hide file tree
Showing 15 changed files with 1,356 additions and 16 deletions.
3 changes: 3 additions & 0 deletions CHANGES.txt
Expand Up @@ -20,6 +20,9 @@ NEW FEATURES:

ZOOKEEPER-1691. Add a flag to disable standalone mode (Helen Hastings via michim)

ZOOKEEPER-442. need a way to remove watches that are no longer of
interest (Rakesh R, Daniel Gómez Ferro via phunt)

BUGFIXES:

ZOOKEEPER-786. Exception in ZooKeeper.toString
Expand Down
137 changes: 126 additions & 11 deletions src/java/main/org/apache/zookeeper/ClientCnxn.java
Expand Up @@ -28,10 +28,13 @@
import java.net.Socket;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.Map.Entry;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.LinkedBlockingQueue;

Expand All @@ -50,6 +53,7 @@
import org.apache.zookeeper.AsyncCallback.StatCallback;
import org.apache.zookeeper.AsyncCallback.StringCallback;
import org.apache.zookeeper.AsyncCallback.VoidCallback;
import org.apache.zookeeper.KeeperException.Code;
import org.apache.zookeeper.OpResult.ErrorResult;
import org.apache.zookeeper.Watcher.Event;
import org.apache.zookeeper.Watcher.Event.EventType;
Expand Down Expand Up @@ -257,6 +261,8 @@ static class Packet {

public boolean readOnly;

WatchDeregistration watchDeregistration;

/** Convenience ctor */
Packet(RequestHeader requestHeader, ReplyHeader replyHeader,
Record request, Record response,
Expand Down Expand Up @@ -457,21 +463,35 @@ class EventThread extends Thread {
}

public void queueEvent(WatchedEvent event) {
queueEvent(event, null);
}

private void queueEvent(WatchedEvent event,
Set<Watcher> materializedWatchers) {
if (event.getType() == EventType.None
&& sessionState == event.getState()) {
return;
}
sessionState = event.getState();

// materialize the watchers based on the event
WatcherSetEventPair pair = new WatcherSetEventPair(
watcher.materialize(event.getState(), event.getType(),
event.getPath()),
event);
final Set<Watcher> watchers;
if (materializedWatchers == null) {
// materialize the watchers based on the event
watchers = watcher.materialize(event.getState(),
event.getType(), event.getPath());
} else {
watchers = new HashSet<Watcher>();
watchers.addAll(materializedWatchers);
}
WatcherSetEventPair pair = new WatcherSetEventPair(watchers, event);
// queue the pair (watch set & event) for later processing
waitingEvents.add(pair);
}

public void queueCallback(AsyncCallback cb, int rc, String path,
Object ctx) {
waitingEvents.add(new LocalCallback(cb, rc, path, ctx));
}

public void queuePacket(Packet packet) {
if (wasKilled) {
synchronized (waitingEvents) {
Expand Down Expand Up @@ -525,7 +545,31 @@ private void processEvent(Object event) {
LOG.error("Error while calling watcher ", t);
}
}
} else {
} else if (event instanceof LocalCallback) {
LocalCallback lcb = (LocalCallback) event;
if (lcb.cb instanceof StatCallback) {
((StatCallback) lcb.cb).processResult(lcb.rc, lcb.path,
lcb.ctx, null);
} else if (lcb.cb instanceof DataCallback) {
((DataCallback) lcb.cb).processResult(lcb.rc, lcb.path,
lcb.ctx, null, null);
} else if (lcb.cb instanceof ACLCallback) {
((ACLCallback) lcb.cb).processResult(lcb.rc, lcb.path,
lcb.ctx, null, null);
} else if (lcb.cb instanceof ChildrenCallback) {
((ChildrenCallback) lcb.cb).processResult(lcb.rc,
lcb.path, lcb.ctx, null);
} else if (lcb.cb instanceof Children2Callback) {
((Children2Callback) lcb.cb).processResult(lcb.rc,
lcb.path, lcb.ctx, null, null);
} else if (lcb.cb instanceof StringCallback) {
((StringCallback) lcb.cb).processResult(lcb.rc,
lcb.path, lcb.ctx, null);
} else {
((VoidCallback) lcb.cb).processResult(lcb.rc, lcb.path,
lcb.ctx);
}
} else {
Packet p = (Packet) event;
int rc = 0;
String clientPath = p.clientPath;
Expand Down Expand Up @@ -646,8 +690,34 @@ private void processEvent(Object event) {
}

private void finishPacket(Packet p) {
int err = p.replyHeader.getErr();
if (p.watchRegistration != null) {
p.watchRegistration.register(p.replyHeader.getErr());
p.watchRegistration.register(err);
}
// Add all the removed watch events to the event queue, so that the
// clients will be notified with 'Data/Child WatchRemoved' event type.
if (p.watchDeregistration != null) {
Map<EventType, Set<Watcher>> materializedWatchers = null;
try {
materializedWatchers = p.watchDeregistration.unregister(err);
for (Entry<EventType, Set<Watcher>> entry : materializedWatchers
.entrySet()) {
Set<Watcher> watchers = entry.getValue();
if (watchers.size() > 0) {
queueEvent(p.watchDeregistration.getClientPath(), err,
watchers, entry.getKey());
// ignore connectionloss when removing from local
// session
p.replyHeader.setErr(Code.OK.intValue());
}
}
} catch (KeeperException.NoWatcherException nwe) {
LOG.error("Failed to find watcher!", nwe);
p.replyHeader.setErr(nwe.code().intValue());
} catch (KeeperException ke) {
LOG.error("Exception when removing watcher", ke);
p.replyHeader.setErr(ke.code().intValue());
}
}

if (p.cb == null) {
Expand All @@ -661,6 +731,22 @@ private void finishPacket(Packet p) {
}
}

void queueEvent(String clientPath, int err,
Set<Watcher> materializedWatchers, EventType eventType) {
KeeperState sessionState = KeeperState.SyncConnected;
if (KeeperException.Code.SESSIONEXPIRED.intValue() == err
|| KeeperException.Code.CONNECTIONLOSS.intValue() == err) {
sessionState = Event.KeeperState.Disconnected;
}
WatchedEvent event = new WatchedEvent(eventType, sessionState,
clientPath);
eventThread.queueEvent(event, materializedWatchers);
}

void queueCallback(AsyncCallback cb, int rc, String path, Object ctx) {
eventThread.queueCallback(cb, rc, path, ctx);
}

private void conLossPacket(Packet p) {
if (p.replyHeader == null) {
return;
Expand Down Expand Up @@ -1360,9 +1446,16 @@ synchronized public int getXid() {
public ReplyHeader submitRequest(RequestHeader h, Record request,
Record response, WatchRegistration watchRegistration)
throws InterruptedException {
return submitRequest(h, request, response, watchRegistration, null);
}

public ReplyHeader submitRequest(RequestHeader h, Record request,
Record response, WatchRegistration watchRegistration,
WatchDeregistration watchDeregistration)
throws InterruptedException {
ReplyHeader r = new ReplyHeader();
Packet packet = queuePacket(h, r, request, response, null, null, null,
null, watchRegistration);
null, watchRegistration, watchDeregistration);
synchronized (packet) {
while (!packet.finished) {
packet.wait();
Expand Down Expand Up @@ -1394,8 +1487,15 @@ public void sendPacket(Record request, Record response, AsyncCallback cb, int op

Packet queuePacket(RequestHeader h, ReplyHeader r, Record request,
Record response, AsyncCallback cb, String clientPath,
String serverPath, Object ctx, WatchRegistration watchRegistration)
{
String serverPath, Object ctx, WatchRegistration watchRegistration) {
return queuePacket(h, r, request, response, cb, clientPath, serverPath,
ctx, watchRegistration, null);
}

Packet queuePacket(RequestHeader h, ReplyHeader r, Record request,
Record response, AsyncCallback cb, String clientPath,
String serverPath, Object ctx, WatchRegistration watchRegistration,
WatchDeregistration watchDeregistration) {
Packet packet = null;

// Note that we do not generate the Xid for the packet yet. It is
Expand All @@ -1407,6 +1507,7 @@ Packet queuePacket(RequestHeader h, ReplyHeader r, Record request,
packet.ctx = ctx;
packet.clientPath = clientPath;
packet.serverPath = serverPath;
packet.watchDeregistration = watchDeregistration;
if (!state.isAlive() || closing) {
conLossPacket(packet);
} else {
Expand Down Expand Up @@ -1435,4 +1536,18 @@ public void addAuthInfo(String scheme, byte auth[]) {
States getState() {
return state;
}

private static class LocalCallback {
private final AsyncCallback cb;
private final int rc;
private final String path;
private final Object ctx;

public LocalCallback(AsyncCallback cb, int rc, String path, Object ctx) {
this.cb = cb;
this.rc = rc;
this.path = path;
this.ctx = ctx;
}
}
}
21 changes: 20 additions & 1 deletion src/java/main/org/apache/zookeeper/KeeperException.java
Expand Up @@ -137,6 +137,8 @@ public static KeeperException create(Code code) {
return new NotReadOnlyException();
case EPHEMERALONLOCALSESSION:
return new EphemeralOnLocalSessionException();
case NOWATCHER:
return new NoWatcherException();
case OK:
default:
throw new IllegalArgumentException("Invalid exception code");
Expand Down Expand Up @@ -372,7 +374,9 @@ public static enum Code implements CodeDeprecated {
/** State-changing request is passed to read-only server */
NOTREADONLY (-119),
/** Attempt to create ephemeral node on a local session */
EPHEMERALONLOCALSESSION (EphemeralOnLocalSession);
EPHEMERALONLOCALSESSION (EphemeralOnLocalSession),
/** Attempts to remove a non-existing watcher */
NOWATCHER (-123);

private static final Map<Integer,Code> lookup
= new HashMap<Integer,Code>();
Expand Down Expand Up @@ -455,6 +459,8 @@ static String getCodeMessage(Code code) {
return "Not a read-only call";
case EPHEMERALONLOCALSESSION:
return "Ephemeral node on local session";
case NOWATCHER:
return "No such watcher";
default:
return "Unknown error " + code;
}
Expand Down Expand Up @@ -768,4 +774,17 @@ public UnimplementedException() {
super(Code.UNIMPLEMENTED);
}
}

/**
* @see Code#NOWATCHER
*/
public static class NoWatcherException extends KeeperException {
public NoWatcherException() {
super(Code.NOWATCHER);
}

public NoWatcherException(String path) {
super(Code.NOWATCHER, path);
}
}
}
70 changes: 70 additions & 0 deletions src/java/main/org/apache/zookeeper/WatchDeregistration.java
@@ -0,0 +1,70 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.zookeeper;

import java.util.Map;
import java.util.Set;

import org.apache.zookeeper.Watcher.WatcherType;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.ZooKeeper.ZKWatchManager;

/**
* Handles the special case of removing watches which has registered for a
* client path
*/
public class WatchDeregistration {

private final String clientPath;
private final Watcher watcher;
private final WatcherType watcherType;
private final boolean local;
private final ZKWatchManager zkManager;

public WatchDeregistration(String clientPath, Watcher watcher,
WatcherType watcherType, boolean local, ZKWatchManager zkManager) {
this.clientPath = clientPath;
this.watcher = watcher;
this.watcherType = watcherType;
this.local = local;
this.zkManager = zkManager;
}

/**
* Unregistering watcher that was added on path.
*
* @param rc
* the result code of the operation that attempted to remove
* watch on the path.
*/
public Map<EventType, Set<Watcher>> unregister(int rc)
throws KeeperException {
return zkManager.removeWatcher(clientPath, watcher, watcherType, local,
rc);
}

/**
* Returns client path which has specified for unregistering its watcher
*
* @return client path
*/
public String getClientPath() {
return clientPath;
}
}

0 comments on commit 98789ea

Please sign in to comment.