Skip to content

Commit

Permalink
ZOOKEEPER-4799: Refactor ACL check in 'addWatch' command
Browse files Browse the repository at this point in the history
As of today, it is impossible to diagnose which watch events are dropped
because of ACLs.  Let's centralize, systematize, and log the checks at
the 'process()' site in the Netty and NIO connections.

(These 'process()' methods contain some duplicated code, and should also
be refactored at some point.  This series does not change them.)

This patch also adds a substantial number of tests in order to avoid
unexpected regressions.

Co-authored-by: Patrick Hunt <phunt@apache.org>
Co-authored-by: Damien Diederen <ddiederen@apache.org>
  • Loading branch information
3 people committed Feb 12, 2024
1 parent 4a49d45 commit daf7cfd
Show file tree
Hide file tree
Showing 14 changed files with 763 additions and 35 deletions.
Expand Up @@ -191,7 +191,7 @@ void prepare() {
@Measurement(iterations = 3, time = 10, timeUnit = TimeUnit.SECONDS)
public void testTriggerConcentrateWatch(InvocationState state) throws Exception {
for (String path : state.paths) {
state.watchManager.triggerWatch(path, event, WatchedEvent.NO_ZXID);
state.watchManager.triggerWatch(path, event, WatchedEvent.NO_ZXID, null);
}
}

Expand Down Expand Up @@ -225,7 +225,7 @@ public void tearDown() {

// clear all the watches
for (String path : paths) {
watchManager.triggerWatch(path, event, WatchedEvent.NO_ZXID);
watchManager.triggerWatch(path, event, WatchedEvent.NO_ZXID, null);
}
}
}
Expand Down Expand Up @@ -294,7 +294,7 @@ public void prepare() {
@Measurement(iterations = 3, time = 10, timeUnit = TimeUnit.SECONDS)
public void testTriggerSparseWatch(TriggerSparseWatchState state) throws Exception {
for (String path : state.paths) {
state.watchManager.triggerWatch(path, event, WatchedEvent.NO_ZXID);
state.watchManager.triggerWatch(path, event, WatchedEvent.NO_ZXID, null);
}
}
}
Expand Up @@ -445,7 +445,10 @@ public void createNode(final String path, byte[] data, List<ACL> acl, long ephem
if (parent == null) {
throw new NoNodeException();
}
List<ACL> parentAcl;
synchronized (parent) {
parentAcl = getACL(parent);

// Add the ACL to ACL cache first, to avoid the ACL not being
// created race condition during fuzzy snapshot sync.
//
Expand Down Expand Up @@ -518,8 +521,9 @@ public void createNode(final String path, byte[] data, List<ACL> acl, long ephem
updateQuotaStat(lastPrefix, bytes, 1);
}
updateWriteStat(path, bytes);
dataWatches.triggerWatch(path, Event.EventType.NodeCreated, zxid);
childWatches.triggerWatch(parentName.equals("") ? "/" : parentName, Event.EventType.NodeChildrenChanged, zxid);
dataWatches.triggerWatch(path, Event.EventType.NodeCreated, zxid, acl);
childWatches.triggerWatch(parentName.equals("") ? "/" : parentName,
Event.EventType.NodeChildrenChanged, zxid, parentAcl);
}

/**
Expand Down Expand Up @@ -559,16 +563,20 @@ public void deleteNode(String path, long zxid) throws NoNodeException {
if (node == null) {
throw new NoNodeException();
}
List<ACL> acl;
nodes.remove(path);
synchronized (node) {
acl = getACL(node);
aclCache.removeUsage(node.acl);
nodeDataSize.addAndGet(-getNodeSize(path, node.data));
}

// Synchronized to sync the containers and ttls change, probably
// only need to sync on containers and ttls, will update it in a
// separate patch.
List<ACL> parentAcl;
synchronized (parent) {
parentAcl = getACL(parent);
long owner = node.stat.getEphemeralOwner();
EphemeralType ephemeralType = EphemeralType.get(owner);
if (ephemeralType == EphemeralType.CONTAINER) {
Expand Down Expand Up @@ -615,9 +623,10 @@ public void deleteNode(String path, long zxid) throws NoNodeException {
"childWatches.triggerWatch " + parentName);
}

WatcherOrBitSet processed = dataWatches.triggerWatch(path, EventType.NodeDeleted, zxid);
childWatches.triggerWatch(path, EventType.NodeDeleted, zxid, processed);
childWatches.triggerWatch("".equals(parentName) ? "/" : parentName, EventType.NodeChildrenChanged, zxid);
WatcherOrBitSet processed = dataWatches.triggerWatch(path, EventType.NodeDeleted, zxid, acl);
childWatches.triggerWatch(path, EventType.NodeDeleted, zxid, acl, processed);
childWatches.triggerWatch("".equals(parentName) ? "/" : parentName,
EventType.NodeChildrenChanged, zxid, parentAcl);
}

public Stat setData(String path, byte[] data, int version, long zxid, long time) throws NoNodeException {
Expand All @@ -626,8 +635,10 @@ public Stat setData(String path, byte[] data, int version, long zxid, long time)
if (n == null) {
throw new NoNodeException();
}
List<ACL> acl;
byte[] lastData;
synchronized (n) {
acl = getACL(n);
lastData = n.data;
nodes.preChange(path, n);
n.data = data;
Expand All @@ -649,7 +660,7 @@ public Stat setData(String path, byte[] data, int version, long zxid, long time)
nodeDataSize.addAndGet(getNodeSize(path, data) - getNodeSize(path, lastData));

updateWriteStat(path, dataBytes);
dataWatches.triggerWatch(path, EventType.NodeDataChanged, zxid);
dataWatches.triggerWatch(path, EventType.NodeDataChanged, zxid, acl);
return s;
}

Expand Down
Expand Up @@ -22,8 +22,10 @@
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.security.cert.Certificate;
import java.util.List;
import org.apache.jute.Record;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Stat;
import org.apache.zookeeper.proto.ReplyHeader;

Expand Down Expand Up @@ -51,7 +53,7 @@ void setSessionTimeout(int sessionTimeout) {
}

@Override
public void process(WatchedEvent event) {
public void process(WatchedEvent event, List<ACL> znodeAcl) {
mostRecentEventType = event.getType();
mostRecentZxid = event.getZxid();
mostRecentPath = event.getPath();
Expand Down
Expand Up @@ -30,14 +30,17 @@
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.security.cert.Certificate;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.jute.BinaryInputArchive;
import org.apache.jute.Record;
import org.apache.zookeeper.ClientCnxn;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Id;
import org.apache.zookeeper.data.Stat;
import org.apache.zookeeper.proto.ConnectRequest;
Expand Down Expand Up @@ -704,7 +707,18 @@ public int sendResponse(ReplyHeader h, Record r, String tag, String cacheKey, St
* @see org.apache.zookeeper.server.ServerCnxnIface#process(org.apache.zookeeper.proto.WatcherEvent)
*/
@Override
public void process(WatchedEvent event) {
public void process(WatchedEvent event, List<ACL> znodeAcl) {
try {
zkServer.checkACL(this, znodeAcl, ZooDefs.Perms.READ, getAuthInfo(), event.getPath(), null);
} catch (KeeperException.NoAuthException e) {
if (LOG.isTraceEnabled()) {
ZooTrace.logTraceMessage(
LOG,
ZooTrace.EVENT_DELIVERY_TRACE_MASK,
"Not delivering event " + event + " to 0x" + Long.toHexString(this.sessionId) + " (filtered by ACL)");
}
return;
}
ReplyHeader h = new ReplyHeader(ClientCnxn.NOTIFICATION_XID, event.getZxid(), 0);
if (LOG.isTraceEnabled()) {
ZooTrace.logTraceMessage(
Expand Down
Expand Up @@ -38,11 +38,15 @@
import java.nio.channels.SelectionKey;
import java.security.cert.Certificate;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.jute.BinaryInputArchive;
import org.apache.jute.Record;
import org.apache.zookeeper.ClientCnxn;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Id;
import org.apache.zookeeper.data.Stat;
import org.apache.zookeeper.proto.ConnectRequest;
Expand Down Expand Up @@ -161,7 +165,18 @@ public int getSessionTimeout() {
}

@Override
public void process(WatchedEvent event) {
public void process(WatchedEvent event, List<ACL> znodeAcl) {
try {
zkServer.checkACL(this, znodeAcl, ZooDefs.Perms.READ, getAuthInfo(), event.getPath(), null);
} catch (KeeperException.NoAuthException e) {
if (LOG.isTraceEnabled()) {
ZooTrace.logTraceMessage(
LOG,
ZooTrace.EVENT_DELIVERY_TRACE_MASK,
"Not delivering event " + event + " to 0x" + Long.toHexString(this.sessionId) + " (filtered by ACL)");
}
return;
}
ReplyHeader h = new ReplyHeader(ClientCnxn.NOTIFICATION_XID, event.getZxid(), 0);
if (LOG.isTraceEnabled()) {
ZooTrace.logTraceMessage(
Expand Down
Expand Up @@ -39,9 +39,9 @@
import org.apache.jute.Record;
import org.apache.zookeeper.Quotas;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs.OpCode;
import org.apache.zookeeper.compat.ProtocolManager;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Id;
import org.apache.zookeeper.data.Stat;
import org.apache.zookeeper.metrics.Counter;
Expand All @@ -54,7 +54,7 @@
* Interface to a Server connection - represents a connection from a client
* to the server.
*/
public abstract class ServerCnxn implements Stats, Watcher {
public abstract class ServerCnxn implements Stats, ServerWatcher {

// This is just an arbitrary object to represent requests issued by
// (aka owned by) this class
Expand Down Expand Up @@ -258,7 +258,11 @@ protected ByteBuffer[] serialize(ReplyHeader h, Record r, String tag,
/* notify the client the session is closing and close/cleanup socket */
public abstract void sendCloseSession();

public abstract void process(WatchedEvent event);
public void process(WatchedEvent event) {
process(event, null);
}

public abstract void process(WatchedEvent event, List<ACL> znodeAcl);

public abstract long getSessionId();

Expand Down
@@ -0,0 +1,29 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zookeeper.server;

import java.util.List;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.data.ACL;

public interface ServerWatcher extends Watcher {

void process(WatchedEvent event, List<ACL> znodeAcl);

}
Expand Up @@ -19,9 +19,11 @@
package org.apache.zookeeper.server.watch;

import java.io.PrintWriter;
import java.util.List;
import javax.annotation.Nullable;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.data.ACL;

public interface IWatchManager {

Expand Down Expand Up @@ -114,10 +116,11 @@ default boolean removeWatcher(String path, Watcher watcher, WatcherMode watcherM
* @param path znode path
* @param type the watch event type
* @param zxid the zxid for the corresponding change that triggered this event
* @param acl ACL of the znode in path
*
* @return the watchers have been notified
*/
WatcherOrBitSet triggerWatch(String path, EventType type, long zxid);
WatcherOrBitSet triggerWatch(String path, EventType type, long zxid, List<ACL> acl);

/**
* Distribute the watch event for the given path, but ignore those
Expand All @@ -130,7 +133,7 @@ default boolean removeWatcher(String path, Watcher watcher, WatcherMode watcherM
*
* @return the watchers have been notified
*/
WatcherOrBitSet triggerWatch(String path, EventType type, long zxid, WatcherOrBitSet suppress);
WatcherOrBitSet triggerWatch(String path, EventType type, long zxid, List<ACL> acl, WatcherOrBitSet suppress);

/**
* Get the size of watchers.
Expand Down
Expand Up @@ -23,15 +23,18 @@
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.server.ServerCnxn;
import org.apache.zookeeper.server.ServerMetrics;
import org.apache.zookeeper.server.ServerWatcher;
import org.apache.zookeeper.server.ZooTrace;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -129,12 +132,12 @@ public synchronized void removeWatcher(Watcher watcher) {
}

@Override
public WatcherOrBitSet triggerWatch(String path, EventType type, long zxid) {
return triggerWatch(path, type, zxid, null);
public WatcherOrBitSet triggerWatch(String path, EventType type, long zxid, List<ACL> acl) {
return triggerWatch(path, type, zxid, acl, null);
}

@Override
public WatcherOrBitSet triggerWatch(String path, EventType type, long zxid, WatcherOrBitSet supress) {
public WatcherOrBitSet triggerWatch(String path, EventType type, long zxid, List<ACL> acl, WatcherOrBitSet supress) {
WatchedEvent e = new WatchedEvent(type, KeeperState.SyncConnected, path, zxid);
Set<Watcher> watchers = new HashSet<>();
synchronized (this) {
Expand Down Expand Up @@ -182,7 +185,11 @@ public WatcherOrBitSet triggerWatch(String path, EventType type, long zxid, Watc
if (supress != null && supress.contains(w)) {
continue;
}
w.process(e);
if (w instanceof ServerWatcher) {
((ServerWatcher) w).process(e, acl);
} else {
w.process(e);
}
}

switch (type) {
Expand Down
Expand Up @@ -22,6 +22,7 @@
import java.util.BitSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
Expand All @@ -31,8 +32,10 @@
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.server.ServerCnxn;
import org.apache.zookeeper.server.ServerMetrics;
import org.apache.zookeeper.server.ServerWatcher;
import org.apache.zookeeper.server.util.BitHashSet;
import org.apache.zookeeper.server.util.BitMap;
import org.slf4j.Logger;
Expand Down Expand Up @@ -202,12 +205,12 @@ public void processDeadWatchers(Set<Integer> deadWatchers) {
}

@Override
public WatcherOrBitSet triggerWatch(String path, EventType type, long zxid) {
return triggerWatch(path, type, zxid, null);
public WatcherOrBitSet triggerWatch(String path, EventType type, long zxid, List<ACL> acl) {
return triggerWatch(path, type, zxid, acl, null);
}

@Override
public WatcherOrBitSet triggerWatch(String path, EventType type, long zxid, WatcherOrBitSet suppress) {
public WatcherOrBitSet triggerWatch(String path, EventType type, long zxid, List<ACL> acl, WatcherOrBitSet suppress) {
WatchedEvent e = new WatchedEvent(type, KeeperState.SyncConnected, path, zxid);

BitHashSet watchers = remove(path);
Expand All @@ -232,7 +235,11 @@ public WatcherOrBitSet triggerWatch(String path, EventType type, long zxid, Watc
continue;
}

w.process(e);
if (w instanceof ServerWatcher) {
((ServerWatcher) w).process(e, acl);
} else {
w.process(e);
}
triggeredWatches++;
}
}
Expand Down

0 comments on commit daf7cfd

Please sign in to comment.