Skip to content

Commit

Permalink
HBASE-24603: Make Zookeeper sync() call synchronous (#1945) (#1976)
Browse files Browse the repository at this point in the history
Writing a test for this is tricky. There is enough coverage for
functional tests. Only concern is performance, but there is enough
logging for it to detect timed out/badly performing sync calls.

Additionally, this patch decouples the ZK event processing into it's
own thread rather than doing it in the EventThread's context. That
avoids deadlocks and stalls of the event thread.

Signed-off-by: Andrew Purtell <apurtell@apache.org>
Signed-off-by: Viraj Jasani <vjasani@apache.org>
(cherry picked from commit 84e246f)
(cherry picked from commit 2379a25)
  • Loading branch information
bharathv committed Jun 26, 2020
1 parent 54c38c8 commit 32690e1
Show file tree
Hide file tree
Showing 8 changed files with 99 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -875,7 +875,7 @@ public synchronized byte[] getSessionPasswd() {
}

public void sync(String path, AsyncCallback.VoidCallback cb, Object ctx) throws KeeperException {
checkZk().sync(path, cb, null);
checkZk().sync(path, cb, ctx);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ public static int createOrForceNodeOffline(ZooKeeperWatcher zkw,
region.getRegionName(), serverName, HConstants.EMPTY_BYTE_ARRAY);
byte [] data = rt.toByteArray();
String node = getNodeName(zkw, region.getEncodedName());
zkw.sync(node);
zkw.syncOrTimeout(node);
int version = ZKUtil.checkExists(zkw, node);
if (version == -1) {
return ZKUtil.createAndWatch(zkw, node, data);
Expand Down Expand Up @@ -444,7 +444,7 @@ public static boolean deleteNode(ZooKeeperWatcher zkw, String encodedRegionName,
"node " + encodedRegionName + " in expected state " + expectedState));
}
String node = getNodeName(zkw, encodedRegionName);
zkw.sync(node);
zkw.syncOrTimeout(node);
Stat stat = new Stat();
byte [] bytes = ZKUtil.getDataNoWatch(zkw, node, stat);
if (bytes == null) {
Expand Down Expand Up @@ -645,7 +645,7 @@ public static int confirmNodeOpening(ZooKeeperWatcher zkw,
}

String node = getNodeName(zkw, encoded);
zkw.sync(node);
zkw.syncOrTimeout(node);

// Read existing data of the node
Stat stat = new Stat();
Expand Down Expand Up @@ -727,7 +727,7 @@ public static boolean checkClosingState(ZooKeeperWatcher zkw, HRegionInfo region
int expectedVersion) throws KeeperException {

final String encoded = getNodeName(zkw, region.getEncodedName());
zkw.sync(encoded);
zkw.syncOrTimeout(encoded);

// Read existing data of the node
Stat stat = new Stat();
Expand Down Expand Up @@ -807,7 +807,7 @@ public static int transitionNode(ZooKeeperWatcher zkw, HRegionInfo region,
}

String node = getNodeName(zkw, encoded);
zkw.sync(node);
zkw.syncOrTimeout(node);

// Read existing data of the node
Stat stat = new Stat();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

Expand All @@ -38,9 +41,11 @@
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.security.Superusers;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.zookeeper.AsyncCallback;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
Expand Down Expand Up @@ -86,6 +91,18 @@ public class ZooKeeperWatcher implements Watcher, Abortable, Closeable {
private final List<ZooKeeperListener> listeners =
new CopyOnWriteArrayList<ZooKeeperListener>();

// Single threaded executor pool that processes event notifications from Zookeeper. Events are
// processed in the order in which they arrive (pool backed by an unbounded fifo queue). We do
// this to decouple the event processing from Zookeeper's ClientCnxn's EventThread context.
// EventThread internally runs a single while loop to serially process all the events. When events
// are processed by the listeners in the same thread, that blocks the EventThread from processing
// subsequent events. Processing events in a separate thread frees up the event thread to continue
// and further prevents deadlocks if the process method itself makes other zookeeper calls.
// It is ok to do it in a single thread because the Zookeeper ClientCnxn already serializes the
// requests using a single while loop and hence there is no performance degradation.
private final ExecutorService zkEventProcessor =
Executors.newSingleThreadExecutor(Threads.getNamedThreadFactory("zk-event-processor"));

// Used by ZKUtil:waitForZKConnectionIfAuthenticating to wait for SASL
// negotiation to complete
public CountDownLatch saslLatch = new CountDownLatch(1);
Expand Down Expand Up @@ -143,6 +160,8 @@ public class ZooKeeperWatcher implements Watcher, Abortable, Closeable {

private final Configuration conf;

private final long zkSyncTimeout;

/* A pattern that matches a Kerberos name, borrowed from Hadoop's KerberosName */
private static final Pattern NAME_PATTERN = Pattern.compile("([^/@]*)(/([^/@]*))?@([^/@]*)");

Expand Down Expand Up @@ -196,6 +215,8 @@ public ZooKeeperWatcher(Configuration conf, String identifier,
throw zce;
}
}
this.zkSyncTimeout = conf.getLong(HConstants.ZK_SYNC_BLOCKING_TIMEOUT_MS,
HConstants.ZK_SYNC_BLOCKING_TIMEOUT_DEFAULT_MS);
}

private void createBaseZNodes() throws ZooKeeperConnectionException {
Expand Down Expand Up @@ -609,59 +630,66 @@ public String getBaseZNode() {
return baseZNode;
}

/**
* Method called from ZooKeeper for events and connection status.
* <p>
* Valid events are passed along to listeners. Connection status changes
* are dealt with locally.
*/
@Override
public void process(WatchedEvent event) {
LOG.debug(prefix("Received ZooKeeper Event, " +
"type=" + event.getType() + ", " +
"state=" + event.getState() + ", " +
"path=" + event.getPath()));

private void processEvent(WatchedEvent event) {
switch(event.getType()) {

// If event type is NONE, this is a connection status change
case None: {
connectionEvent(event);
break;
}

// Otherwise pass along to the listeners

case NodeCreated: {
for(ZooKeeperListener listener : listeners) {
listener.nodeCreated(event.getPath());
}
break;
}

case NodeDeleted: {
for(ZooKeeperListener listener : listeners) {
listener.nodeDeleted(event.getPath());
}
break;
}

case NodeDataChanged: {
for(ZooKeeperListener listener : listeners) {
listener.nodeDataChanged(event.getPath());
}
break;
}

case NodeChildrenChanged: {
for(ZooKeeperListener listener : listeners) {
listener.nodeChildrenChanged(event.getPath());
}
break;
}
default: {
LOG.error(String.format("Invalid event of type %s received for path %s. Ignoring",
event.getType(), event.getPath()));
break;
}
}
}

/**
* Method called from ZooKeeper for events and connection status.
* <p>
* Valid events are passed along to listeners. Connection status changes
* are dealt with locally.
*/
@Override
public void process(final WatchedEvent event) {
LOG.debug(prefix("Received ZooKeeper Event, " +
"type=" + event.getType() + ", " +
"state=" + event.getState() + ", " +
"path=" + event.getPath()));
zkEventProcessor.submit(new Runnable() {
@Override
public void run() {
processEvent(event);
}
});
}

// Connection management

/**
Expand Down Expand Up @@ -709,7 +737,8 @@ private void connectionEvent(WatchedEvent event) {
}

/**
* Forces a synchronization of this ZooKeeper client connection.
* Forces a synchronization of this ZooKeeper client connection within a timeout. Enforcing a
* timeout lets the callers fail-fast rather than wait forever for the sync to finish.
* <p>
* Executing this method before running other methods will ensure that the
* subsequent operations are up-to-date and consistent as of the time that
Expand All @@ -720,8 +749,33 @@ private void connectionEvent(WatchedEvent event) {
* previously read version and data. We want to ensure that the version read
* is up-to-date from when we begin the operation.
*/
public void sync(String path) throws KeeperException {
this.recoverableZooKeeper.sync(path, null, null);
public void syncOrTimeout(String path) throws KeeperException {
final CountDownLatch latch = new CountDownLatch(1);
long startTime = EnvironmentEdgeManager.currentTime();
this.recoverableZooKeeper.sync(path, new AsyncCallback.VoidCallback() {
@Override
public void processResult(int i, String s, Object o) {
latch.countDown();
}
}, null);
try {
if (!latch.await(zkSyncTimeout, TimeUnit.MILLISECONDS)) {
LOG.warn(String.format("sync() operation to ZK timed out. Configured timeout: %s ms. " +
"This usually points to a ZK side issue. Check ZK server logs and metrics.",
zkSyncTimeout));
throw new KeeperException.OperationTimeoutException();
}
} catch (InterruptedException e) {
LOG.warn("Interrupted waiting for ZK sync() to finish.", e);
Thread.currentThread().interrupt();
return;
}
if (LOG.isDebugEnabled()) {
// TODO: Switch to a metric once server side ZK watcher metrics are implemented. This is a
// useful metric to have since the latency of sync() impacts the callers.
LOG.debug(String.format("ZK sync() operation took %d ms",
EnvironmentEdgeManager.currentTime() - startTime));
}
}

/**
Expand Down Expand Up @@ -770,6 +824,7 @@ public void interruptedExceptionNoThrow(InterruptedException ie, boolean throwLa
*/
@Override
public void close() {
zkEventProcessor.shutdownNow();
try {
recoverableZooKeeper.close();
} catch (InterruptedException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,15 @@ public enum OperationStatusCode {
/** Configuration key for ZooKeeper session timeout */
public static final String ZK_SESSION_TIMEOUT = "zookeeper.session.timeout";

/** Timeout for the ZK sync() call */
public static final String ZK_SYNC_BLOCKING_TIMEOUT_MS = "hbase.zookeeper.sync.timeout.millis";
// Choice of the default value is based on the following ZK recommendation (from docs). Keeping it
// lower lets the callers fail fast in case of any issues.
// "The clients view of the system is guaranteed to be up-to-date within a certain time bound.
// (On the order of tens of seconds.) Either system changes will be seen by a client within this
// bound, or the client will detect a service outage."
public static final long ZK_SYNC_BLOCKING_TIMEOUT_DEFAULT_MS = 30 * 1000;

/** Default value for ZooKeeper session timeout */
public static final int DEFAULT_ZK_SESSION_TIMEOUT = 180 * 1000;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ private void enable(ZooKeeperWatcher zooKeeper, byte[] table)
*/
private void disable(ZooKeeperWatcher zooKeeper, byte[] table) throws KeeperException {
// ensure the latest state of the archive node is found
zooKeeper.sync(archiveZnode);
zooKeeper.syncOrTimeout(archiveZnode);

// if the top-level archive node is gone, then we are done
if (ZKUtil.checkExists(zooKeeper, archiveZnode) < 0) {
Expand All @@ -132,7 +132,7 @@ private void disable(ZooKeeperWatcher zooKeeper, byte[] table) throws KeeperExce
// delete the table node, from the archive
String tableNode = this.getTableNode(table);
// make sure the table is the latest version so the delete takes
zooKeeper.sync(tableNode);
zooKeeper.syncOrTimeout(tableNode);

LOG.debug("Attempting to delete table node:" + tableNode);
ZKUtil.deleteNodeRecursively(zooKeeper, tableNode);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ public void nodeDeleted(String path) {
public void nodeDataChanged(String path) {
if (path.equals(labelZnode) || path.equals(userAuthsZnode)) {
try {
watcher.sync(path);
watcher.syncOrTimeout(path);
byte[] data = ZKUtil.getDataAndWatch(watcher, path);
if (path.equals(labelZnode)) {
refreshVisibilityLabelsCache(data);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ static boolean verifyRegionState(ZooKeeperWatcher zkw, HRegionInfo region, Event
String encoded = region.getEncodedName();

String node = ZKAssign.getNodeName(zkw, encoded);
zkw.sync(node);
zkw.syncOrTimeout(node);

// Read existing data of the node
byte [] existingBytes = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1087,7 +1087,7 @@ public void testSplitWithRegionReplicas() throws Exception {
assertTrue("not able to find a splittable region", region != null);
String node = ZKAssign.getNodeName(regionServer.getZooKeeper(),
region.getRegionInfo().getEncodedName());
regionServer.getZooKeeper().sync(node);
regionServer.getZooKeeper().syncOrTimeout(node);
SplitTransactionImpl st = new SplitTransactionImpl(region, Bytes.toBytes("row2"));
try {
st.prepare();
Expand Down Expand Up @@ -1318,7 +1318,7 @@ public PairOfSameType<Region> stepsBeforePONR(final Server server,
};
String node = ZKAssign.getNodeName(regionServer.getZooKeeper(),
region.getRegionInfo().getEncodedName());
regionServer.getZooKeeper().sync(node);
regionServer.getZooKeeper().syncOrTimeout(node);
for (int i = 0; i < 100; i++) {
// We expect the znode to be deleted by this time. Here the
// znode could be in OPENED state and the
Expand Down

0 comments on commit 32690e1

Please sign in to comment.