Skip to content

Commit

Permalink
ZOOKEEPER-4799: Refactor ACL check in 'addWatch' command
Browse files Browse the repository at this point in the history
As of today, it is impossible to diagnose which watch events are dropped
because of ACLs.  Let's centralize, systematize, and log the checks at
the 'process()' site in the Netty and NIO connections.

(These 'process()' methods contain some duplicated code, and should also
be refactored at some point.  This series does not change them.)

This patch also adds a substantial number of tests in order to avoid
unexpected regressions.

Co-authored-by: Patrick Hunt <phunt@apache.org>
Co-authored-by: Damien Diederen <ddiederen@apache.org>
  • Loading branch information
3 people committed Feb 12, 2024
1 parent b8eb6a3 commit 65b91d2
Show file tree
Hide file tree
Showing 14 changed files with 763 additions and 35 deletions.
Expand Up @@ -191,7 +191,7 @@ void prepare() {
@Measurement(iterations = 3, time = 10, timeUnit = TimeUnit.SECONDS)
public void testTriggerConcentrateWatch(InvocationState state) throws Exception {
for (String path : state.paths) {
state.watchManager.triggerWatch(path, event);
state.watchManager.triggerWatch(path, event, null);
}
}

Expand Down Expand Up @@ -225,7 +225,7 @@ public void tearDown() {

// clear all the watches
for (String path : paths) {
watchManager.triggerWatch(path, event);
watchManager.triggerWatch(path, event, null);
}
}
}
Expand Down Expand Up @@ -294,7 +294,7 @@ public void prepare() {
@Measurement(iterations = 3, time = 10, timeUnit = TimeUnit.SECONDS)
public void testTriggerSparseWatch(TriggerSparseWatchState state) throws Exception {
for (String path : state.paths) {
state.watchManager.triggerWatch(path, event);
state.watchManager.triggerWatch(path, event, null);
}
}
}
Expand Up @@ -450,7 +450,10 @@ public void createNode(final String path, byte[] data, List<ACL> acl, long ephem
if (parent == null) {
throw new KeeperException.NoNodeException();
}
List<ACL> parentAcl;
synchronized (parent) {
parentAcl = getACL(parent);

// Add the ACL to ACL cache first, to avoid the ACL not being
// created race condition during fuzzy snapshot sync.
//
Expand Down Expand Up @@ -527,8 +530,9 @@ public void createNode(final String path, byte[] data, List<ACL> acl, long ephem
updateQuotaStat(lastPrefix, bytes, 1);
}
updateWriteStat(path, bytes);
dataWatches.triggerWatch(path, Event.EventType.NodeCreated);
childWatches.triggerWatch(parentName.equals("") ? "/" : parentName, Event.EventType.NodeChildrenChanged);
dataWatches.triggerWatch(path, Event.EventType.NodeCreated, acl);
childWatches.triggerWatch(parentName.equals("") ? "/" : parentName,
Event.EventType.NodeChildrenChanged, parentAcl);
}

/**
Expand Down Expand Up @@ -568,16 +572,20 @@ public void deleteNode(String path, long zxid) throws KeeperException.NoNodeExce
if (node == null) {
throw new KeeperException.NoNodeException();
}
List<ACL> acl;
nodes.remove(path);
synchronized (node) {
acl = getACL(node);
aclCache.removeUsage(node.acl);
nodeDataSize.addAndGet(-getNodeSize(path, node.data));
}

// Synchronized to sync the containers and ttls change, probably
// only need to sync on containers and ttls, will update it in a
// separate patch.
List<ACL> parentAcl;
synchronized (parent) {
parentAcl = getACL(parent);
long eowner = node.stat.getEphemeralOwner();
EphemeralType ephemeralType = EphemeralType.get(eowner);
if (ephemeralType == EphemeralType.CONTAINER) {
Expand Down Expand Up @@ -624,9 +632,10 @@ public void deleteNode(String path, long zxid) throws KeeperException.NoNodeExce
"childWatches.triggerWatch " + parentName);
}

WatcherOrBitSet processed = dataWatches.triggerWatch(path, EventType.NodeDeleted);
childWatches.triggerWatch(path, EventType.NodeDeleted, processed);
childWatches.triggerWatch("".equals(parentName) ? "/" : parentName, EventType.NodeChildrenChanged);
WatcherOrBitSet processed = dataWatches.triggerWatch(path, EventType.NodeDeleted, acl);
childWatches.triggerWatch(path, EventType.NodeDeleted, acl, processed);
childWatches.triggerWatch("".equals(parentName) ? "/" : parentName,
EventType.NodeChildrenChanged, parentAcl);
}

public Stat setData(String path, byte[] data, int version, long zxid, long time) throws KeeperException.NoNodeException {
Expand All @@ -635,8 +644,10 @@ public Stat setData(String path, byte[] data, int version, long zxid, long time)
if (n == null) {
throw new KeeperException.NoNodeException();
}
List<ACL> acl;
byte[] lastdata = null;
synchronized (n) {
acl = getACL(n);
lastdata = n.data;
nodes.preChange(path, n);
n.data = data;
Expand All @@ -658,7 +669,7 @@ public Stat setData(String path, byte[] data, int version, long zxid, long time)
nodeDataSize.addAndGet(getNodeSize(path, data) - getNodeSize(path, lastdata));

updateWriteStat(path, dataBytes);
dataWatches.triggerWatch(path, EventType.NodeDataChanged);
dataWatches.triggerWatch(path, EventType.NodeDataChanged, acl);
return s;
}

Expand Down
Expand Up @@ -22,8 +22,10 @@
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.security.cert.Certificate;
import java.util.List;
import org.apache.jute.Record;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Stat;
import org.apache.zookeeper.proto.ReplyHeader;

Expand All @@ -48,7 +50,7 @@ void setSessionTimeout(int sessionTimeout) {
}

@Override
public void process(WatchedEvent event) {
public void process(WatchedEvent event, List<ACL> znodeAcl) {
}

@Override
Expand Down
Expand Up @@ -30,14 +30,17 @@
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.security.cert.Certificate;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.jute.BinaryInputArchive;
import org.apache.jute.Record;
import org.apache.zookeeper.ClientCnxn;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Id;
import org.apache.zookeeper.data.Stat;
import org.apache.zookeeper.proto.ReplyHeader;
Expand Down Expand Up @@ -697,7 +700,18 @@ public int sendResponse(ReplyHeader h, Record r, String tag, String cacheKey, St
* @see org.apache.zookeeper.server.ServerCnxnIface#process(org.apache.zookeeper.proto.WatcherEvent)
*/
@Override
public void process(WatchedEvent event) {
public void process(WatchedEvent event, List<ACL> znodeAcl) {
try {
zkServer.checkACL(this, znodeAcl, ZooDefs.Perms.READ, getAuthInfo(), event.getPath(), null);
} catch (KeeperException.NoAuthException e) {
if (LOG.isTraceEnabled()) {
ZooTrace.logTraceMessage(
LOG,
ZooTrace.EVENT_DELIVERY_TRACE_MASK,
"Not delivering event " + event + " to 0x" + Long.toHexString(this.sessionId) + " (filtered by ACL)");
}
return;
}
ReplyHeader h = new ReplyHeader(ClientCnxn.NOTIFICATION_XID, -1L, 0);
if (LOG.isTraceEnabled()) {
ZooTrace.logTraceMessage(
Expand Down
Expand Up @@ -38,11 +38,15 @@
import java.nio.channels.SelectionKey;
import java.security.cert.Certificate;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.jute.BinaryInputArchive;
import org.apache.jute.Record;
import org.apache.zookeeper.ClientCnxn;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Id;
import org.apache.zookeeper.data.Stat;
import org.apache.zookeeper.proto.ReplyHeader;
Expand Down Expand Up @@ -159,7 +163,18 @@ public int getSessionTimeout() {
}

@Override
public void process(WatchedEvent event) {
public void process(WatchedEvent event, List<ACL> znodeAcl) {
try {
zkServer.checkACL(this, znodeAcl, ZooDefs.Perms.READ, getAuthInfo(), event.getPath(), null);
} catch (KeeperException.NoAuthException e) {
if (LOG.isTraceEnabled()) {
ZooTrace.logTraceMessage(
LOG,
ZooTrace.EVENT_DELIVERY_TRACE_MASK,
"Not delivering event " + event + " to 0x" + Long.toHexString(this.sessionId) + " (filtered by ACL)");
}
return;
}
ReplyHeader h = new ReplyHeader(ClientCnxn.NOTIFICATION_XID, -1L, 0);
if (LOG.isTraceEnabled()) {
ZooTrace.logTraceMessage(
Expand Down
Expand Up @@ -39,8 +39,8 @@
import org.apache.jute.Record;
import org.apache.zookeeper.Quotas;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs.OpCode;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Id;
import org.apache.zookeeper.data.Stat;
import org.apache.zookeeper.metrics.Counter;
Expand All @@ -53,7 +53,7 @@
* Interface to a Server connection - represents a connection from a client
* to the server.
*/
public abstract class ServerCnxn implements Stats, Watcher {
public abstract class ServerCnxn implements Stats, ServerWatcher {

// This is just an arbitrary object to represent requests issued by
// (aka owned by) this class
Expand Down Expand Up @@ -264,7 +264,11 @@ protected ByteBuffer[] serialize(ReplyHeader h, Record r, String tag,
/* notify the client the session is closing and close/cleanup socket */
public abstract void sendCloseSession();

public abstract void process(WatchedEvent event);
public void process(WatchedEvent event) {
process(event, null);
}

public abstract void process(WatchedEvent event, List<ACL> znodeAcl);

public abstract long getSessionId();

Expand Down
@@ -0,0 +1,29 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zookeeper.server;

import java.util.List;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.data.ACL;

public interface ServerWatcher extends Watcher {

void process(WatchedEvent event, List<ACL> znodeAcl);

}
Expand Up @@ -19,8 +19,10 @@
package org.apache.zookeeper.server.watch;

import java.io.PrintWriter;
import java.util.List;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.data.ACL;

public interface IWatchManager {

Expand Down Expand Up @@ -82,10 +84,11 @@ default boolean addWatch(String path, Watcher watcher, WatcherMode watcherMode)
*
* @param path znode path
* @param type the watch event type
* @param acl ACL of the znode in path
*
* @return the watchers have been notified
*/
WatcherOrBitSet triggerWatch(String path, EventType type);
WatcherOrBitSet triggerWatch(String path, EventType type, List<ACL> acl);

/**
* Distribute the watch event for the given path, but ignore those
Expand All @@ -97,7 +100,7 @@ default boolean addWatch(String path, Watcher watcher, WatcherMode watcherMode)
*
* @return the watchers have been notified
*/
WatcherOrBitSet triggerWatch(String path, EventType type, WatcherOrBitSet suppress);
WatcherOrBitSet triggerWatch(String path, EventType type, List<ACL> acl, WatcherOrBitSet suppress);

/**
* Get the size of watchers.
Expand Down
Expand Up @@ -22,15 +22,18 @@
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.server.ServerCnxn;
import org.apache.zookeeper.server.ServerMetrics;
import org.apache.zookeeper.server.ServerWatcher;
import org.apache.zookeeper.server.ZooTrace;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -115,12 +118,12 @@ public synchronized void removeWatcher(Watcher watcher) {
}

@Override
public WatcherOrBitSet triggerWatch(String path, EventType type) {
return triggerWatch(path, type, null);
public WatcherOrBitSet triggerWatch(String path, EventType type, List<ACL> acl) {
return triggerWatch(path, type, acl, null);
}

@Override
public WatcherOrBitSet triggerWatch(String path, EventType type, WatcherOrBitSet supress) {
public WatcherOrBitSet triggerWatch(String path, EventType type, List<ACL> acl, WatcherOrBitSet supress) {
WatchedEvent e = new WatchedEvent(type, KeeperState.SyncConnected, path);
Set<Watcher> watchers = new HashSet<>();
PathParentIterator pathParentIterator = getPathParentIterator(path);
Expand Down Expand Up @@ -165,7 +168,11 @@ public WatcherOrBitSet triggerWatch(String path, EventType type, WatcherOrBitSet
if (supress != null && supress.contains(w)) {
continue;
}
w.process(e);
if (w instanceof ServerWatcher) {
((ServerWatcher) w).process(e, acl);
} else {
w.process(e);
}
}

switch (type) {
Expand Down
Expand Up @@ -22,6 +22,7 @@
import java.util.BitSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
Expand All @@ -31,8 +32,10 @@
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.server.ServerCnxn;
import org.apache.zookeeper.server.ServerMetrics;
import org.apache.zookeeper.server.ServerWatcher;
import org.apache.zookeeper.server.util.BitHashSet;
import org.apache.zookeeper.server.util.BitMap;
import org.slf4j.Logger;
Expand Down Expand Up @@ -202,12 +205,12 @@ public void processDeadWatchers(Set<Integer> deadWatchers) {
}

@Override
public WatcherOrBitSet triggerWatch(String path, EventType type) {
return triggerWatch(path, type, null);
public WatcherOrBitSet triggerWatch(String path, EventType type, List<ACL> acl) {
return triggerWatch(path, type, acl, null);
}

@Override
public WatcherOrBitSet triggerWatch(String path, EventType type, WatcherOrBitSet suppress) {
public WatcherOrBitSet triggerWatch(String path, EventType type, List<ACL> acl, WatcherOrBitSet suppress) {
WatchedEvent e = new WatchedEvent(type, KeeperState.SyncConnected, path);

BitHashSet watchers = remove(path);
Expand All @@ -232,7 +235,11 @@ public WatcherOrBitSet triggerWatch(String path, EventType type, WatcherOrBitSet
continue;
}

w.process(e);
if (w instanceof ServerWatcher) {
((ServerWatcher) w).process(e, acl);
} else {
w.process(e);
}
triggeredWatches++;
}
}
Expand Down

0 comments on commit 65b91d2

Please sign in to comment.