diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java index cb5cd2715e1b..d797d74370b1 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java @@ -256,6 +256,15 @@ public enum OperationStatusCode { /** Configuration key for ZooKeeper session timeout */ public static final String ZK_SESSION_TIMEOUT = "zookeeper.session.timeout"; + /** Configuration key for the timeout (in milliseconds) of the blocking ZK sync() call */ + public static final String ZK_SYNC_BLOCKING_TIMEOUT_MS = "hbase.zookeeper.sync.timeout.millis"; + // Choice of the default value is based on the following ZK recommendation (from docs). Keeping it + // lower lets the callers fail fast in case of any issues. + // "The clients view of the system is guaranteed to be up-to-date within a certain time bound. + // (On the order of tens of seconds.) Either system changes will be seen by a client within this + // bound, or the client will detect a service outage." 
+ public static final long ZK_SYNC_BLOCKING_TIMEOUT_DEFAULT_MS = 30 * 1000; + /** Default value for ZooKeeper session timeout */ public static final int DEFAULT_ZK_SESSION_TIMEOUT = 90 * 1000; diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/Threads.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/Threads.java index c72231ac08e6..516075c462ec 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/Threads.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/Threads.java @@ -209,7 +209,7 @@ public static ThreadPoolExecutor getBoundedCachedThreadPool(int maxCachedThread, * @param prefix The prefix of every created Thread's name * @return a {@link java.util.concurrent.ThreadFactory} that names threads */ - private static ThreadFactory getNamedThreadFactory(final String prefix) { + public static ThreadFactory getNamedThreadFactory(final String prefix) { SecurityManager s = System.getSecurityManager(); final ThreadGroup threadGroup = (s != null) ? 
s.getThreadGroup() : Thread.currentThread() .getThreadGroup(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/example/HFileArchiveManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/example/HFileArchiveManager.java index f93c3c0ff0ea..9978f4a67d80 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/example/HFileArchiveManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/example/HFileArchiveManager.java @@ -124,7 +124,7 @@ private void enable(ZKWatcher zooKeeper, byte[] table) throws KeeperException { */ private void disable(ZKWatcher zooKeeper, byte[] table) throws KeeperException { // ensure the latest state of the archive node is found - zooKeeper.sync(archiveZnode); + zooKeeper.syncOrTimeout(archiveZnode); // if the top-level archive node is gone, then we are done if (ZKUtil.checkExists(zooKeeper, archiveZnode) < 0) { @@ -133,7 +133,7 @@ private void disable(ZKWatcher zooKeeper, byte[] table) throws KeeperException { // delete the table node, from the archive String tableNode = this.getTableNode(table); // make sure the table is the latest version so the delete takes - zooKeeper.sync(tableNode); + zooKeeper.syncOrTimeout(tableNode); LOG.debug("Attempting to delete table node:" + tableNode); ZKUtil.deleteNodeRecursively(zooKeeper, tableNode); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/ZKVisibilityLabelWatcher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/ZKVisibilityLabelWatcher.java index 0bbc8d3a35c4..bcb3b8ba4fbe 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/ZKVisibilityLabelWatcher.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/ZKVisibilityLabelWatcher.java @@ -111,7 +111,7 @@ public void nodeDeleted(String path) { public void nodeDataChanged(String path) { if (path.equals(labelZnode) || path.equals(userAuthsZnode)) { try { - 
watcher.sync(path); + watcher.syncOrTimeout(path); byte[] data = ZKUtil.getDataAndWatch(watcher, path); if (path.equals(labelZnode)) { refreshVisibilityLabelsCache(data); diff --git a/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/RecoverableZooKeeper.java b/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/RecoverableZooKeeper.java index f89c8fd0ef74..9cc1ab21bdee 100644 --- a/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/RecoverableZooKeeper.java +++ b/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/RecoverableZooKeeper.java @@ -667,7 +667,7 @@ public synchronized ZooKeeper getZooKeeper() { } public void sync(String path, AsyncCallback.VoidCallback cb, Object ctx) throws KeeperException { - checkZk().sync(path, cb, null); + checkZk().sync(path, cb, ctx); } /** diff --git a/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKWatcher.java b/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKWatcher.java index 9774a51b88df..fca8738778a4 100644 --- a/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKWatcher.java +++ b/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKWatcher.java @@ -23,13 +23,20 @@ import java.util.ArrayList; import java.util.List; import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; import java.util.regex.Matcher; import java.util.regex.Pattern; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Abortable; import org.apache.hadoop.hbase.AuthUtil; +import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.ZooKeeperConnectionException; import org.apache.hadoop.hbase.security.Superusers; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.hadoop.hbase.util.Threads; import 
org.apache.hadoop.security.UserGroupInformation; import org.apache.yetus.audience.InterfaceAudience; import org.apache.zookeeper.KeeperException; @@ -79,8 +86,22 @@ public class ZKWatcher implements Watcher, Abortable, Closeable { // listeners to be notified private final List listeners = new CopyOnWriteArrayList<>(); + // Single threaded executor pool that processes event notifications from Zookeeper. Events are + // processed in the order in which they arrive (pool backed by an unbounded fifo queue). We do + // this to decouple the event processing from Zookeeper's ClientCnxn's EventThread context. + // EventThread internally runs a single while loop to serially process all the events. When events + // are processed by the listeners in the same thread, that blocks the EventThread from processing + // subsequent events. Processing events in a separate thread frees up the event thread to continue + // and further prevents deadlocks if the process method itself makes other zookeeper calls. + // It is ok to do it in a single thread because the Zookeeper ClientCnxn already serializes the + // requests using a single while loop and hence there is no performance degradation. 
+ private final ExecutorService zkEventProcessor = + Executors.newSingleThreadExecutor(Threads.getNamedThreadFactory("zk-event-processor")); + private final Configuration conf; + private final long zkSyncTimeout; + /* A pattern that matches a Kerberos name, borrowed from Hadoop's KerberosName */ private static final Pattern NAME_PATTERN = Pattern.compile("([^/@]*)(/([^/@]*))?@([^/@]*)"); @@ -169,6 +190,8 @@ public ZKWatcher(Configuration conf, String identifier, Abortable abortable, throw zce; } } + this.zkSyncTimeout = conf.getLong(HConstants.ZK_SYNC_BLOCKING_TIMEOUT_MS, + HConstants.ZK_SYNC_BLOCKING_TIMEOUT_DEFAULT_MS); } private void createBaseZNodes() throws ZooKeeperConnectionException { @@ -479,21 +502,8 @@ public ZNodePaths getZNodePaths() { return znodePaths; } - /** - * Method called from ZooKeeper for events and connection status. - *

- * Valid events are passed along to listeners. Connection status changes - * are dealt with locally. - */ - @Override - public void process(WatchedEvent event) { - LOG.debug(prefix("Received ZooKeeper Event, " + - "type=" + event.getType() + ", " + - "state=" + event.getState() + ", " + - "path=" + event.getPath())); - + private void processEvent(WatchedEvent event) { switch(event.getType()) { - // If event type is NONE, this is a connection status change case None: { connectionEvent(event); @@ -501,7 +511,6 @@ public void process(WatchedEvent event) { } // Otherwise pass along to the listeners - case NodeCreated: { for(ZKListener listener : listeners) { listener.nodeCreated(event.getPath()); @@ -530,10 +539,26 @@ public void process(WatchedEvent event) { break; } default: - throw new IllegalStateException("Received event is not valid: " + event.getState()); + LOG.error("Invalid event of type {} received for path {}. Ignoring.", + event.getType(), event.getPath()); } } + /** + * Method called from ZooKeeper for events and connection status. + *

+ * Valid events are passed along to listeners. Connection status changes + * are dealt with locally. + */ + @Override + public void process(WatchedEvent event) { + LOG.debug(prefix("Received ZooKeeper Event, " + + "type=" + event.getType() + ", " + + "state=" + event.getState() + ", " + + "path=" + event.getPath())); + zkEventProcessor.execute(() -> processEvent(event)); + } + // Connection management /** @@ -585,7 +610,8 @@ private void connectionEvent(WatchedEvent event) { } /** - * Forces a synchronization of this ZooKeeper client connection. + * Forces a synchronization of this ZooKeeper client connection within a timeout. Enforcing a + * timeout lets the callers fail-fast rather than wait forever for the sync to finish. *

* Executing this method before running other methods will ensure that the * subsequent operations are up-to-date and consistent as of the time that @@ -595,9 +621,28 @@ private void connectionEvent(WatchedEvent event) { * data of an existing node and delete or transition that node, utilizing the * previously read version and data. We want to ensure that the version read * is up-to-date from when we begin the operation. + *

*/ - public void sync(String path) throws KeeperException { - this.recoverableZooKeeper.sync(path, null, null); + public void syncOrTimeout(String path) throws KeeperException { + final CountDownLatch latch = new CountDownLatch(1); + long startTime = EnvironmentEdgeManager.currentTime(); + this.recoverableZooKeeper.sync(path, (i, s, o) -> latch.countDown(), null); + try { + if (!latch.await(zkSyncTimeout, TimeUnit.MILLISECONDS)) { + LOG.warn("sync() operation to ZK timed out. Configured timeout: {}ms. This usually points " + + "to a ZK side issue. Check ZK server logs and metrics.", zkSyncTimeout); + throw new KeeperException.RequestTimeoutException(); + } + } catch (InterruptedException e) { + LOG.warn("Interrupted waiting for ZK sync() to finish.", e); + Thread.currentThread().interrupt(); + return; + } + if (LOG.isDebugEnabled()) { + // TODO: Switch to a metric once server side ZK watcher metrics are implemented. This is a + // useful metric to have since the latency of sync() impacts the callers. + LOG.debug("ZK sync() operation took {}ms", EnvironmentEdgeManager.currentTime() - startTime); + } } /** @@ -647,6 +692,7 @@ public void interruptedExceptionNoThrow(InterruptedException ie, boolean throwLa */ @Override public void close() { + zkEventProcessor.shutdownNow(); try { recoverableZooKeeper.close(); } catch (InterruptedException e) {