HBASE-26437 Clean up the znodes for the src after a rename.
HBOSS was orphaning the znodes for the src path of a rename. Over time,
this results in very large ZooKeeper usage by HBOSS.

Add some logging to dump the contents of ZK

Signed-off-by: Wellington Chevreuil <wchevreuil@apache.org>

Closes #29
joshelser committed Nov 18, 2021
1 parent 0aaacf6 commit 21e1e920b7f15dbb1bfb8b261d39d303ea97f3e2
Showing 7 changed files with 246 additions and 14 deletions.
@@ -24,6 +24,8 @@
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.ContentSummary;
@@ -452,13 +454,21 @@ public boolean rename(Path src, Path dst) throws IOException {
long startTime = System.currentTimeMillis();
long lockAcquiredTime = startTime;
long doneTime = startTime;
try (AutoLock l = sync.lockRename(src, dst)) {
// Future to pass into the AutoLock so it knows if it should clean up.
final CompletableFuture<Boolean> renameResult = new CompletableFuture<>();
try (AutoLock l = sync.lockRename(src, dst, renameResult)) {
lockAcquiredTime = System.currentTimeMillis();
metrics.updateAcquireRenameLockHisto(lockAcquiredTime - startTime);
boolean result = fs.rename(src, dst);
doneTime = System.currentTimeMillis();
metrics.updateRenameFsOperationHisto(doneTime - lockAcquiredTime);
return result;
// Defaulting to false in the case that fs.rename throws an exception
boolean result = false;
try {
result = fs.rename(src, dst);
return result;
} finally {
renameResult.complete(result);
doneTime = System.currentTimeMillis();
metrics.updateRenameFsOperationHisto(doneTime - lockAcquiredTime);
}
}
finally {
long releasedLocksTime = System.currentTimeMillis();
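
The key pattern in the hunk above: the rename result is published through a CompletableFuture that is completed in a finally block, so the AutoLock's close() can condition its znode cleanup on the outcome even when fs.rename throws. A minimal, self-contained sketch of that pattern (lockWithCleanup and doRename are hypothetical stand-ins, not HBOSS API):

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class ConditionalCleanupSketch {
  // Stand-in for AutoLock: close() consults the future before cleaning up.
  static AutoCloseable lockWithCleanup(CompletableFuture<Boolean> result) {
    return () -> {
      boolean success;
      try {
        // Completed by the caller's finally block before close() runs,
        // so this never blocks.
        success = result.get();
      } catch (InterruptedException | ExecutionException e) {
        success = false; // cannot tell, so assume the operation failed
      }
      System.out.println(success ? "cleaning up src znodes" : "skipping cleanup");
      System.out.println("releasing locks");
    };
  }

  // Stand-in for fs.rename(src, dst).
  static boolean doRename() {
    return true;
  }

  public static void main(String[] args) throws Exception {
    CompletableFuture<Boolean> renameResult = new CompletableFuture<>();
    try (AutoCloseable lock = lockWithCleanup(renameResult)) {
      boolean result = false; // default covers the case where doRename throws
      try {
        result = doRename();
      } finally {
        renameResult.complete(result); // make the outcome visible to close()
      }
    }
  }
}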
@@ -21,7 +21,10 @@
import java.io.IOException;
import java.net.URI;
import java.util.Arrays;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

import org.apache.hadoop.io.retry.RetryPolicies;
import org.apache.hadoop.io.retry.RetryPolicy;
import org.apache.hadoop.io.retry.RetryPolicy.RetryAction;
@@ -471,7 +474,7 @@ public void close() throws IOException {
* @return AutoCloseable to release both paths
* @throws IOException at any possible IO failure.
*/
public AutoLock lockRename(Path rawSrc, Path rawDst) throws IOException {
public AutoLock lockRename(Path rawSrc, Path rawDst, Future<Boolean> successFuture) throws IOException {
Path src = norm(rawSrc);
Path dst = norm(rawDst);
LOG.debug("About to lock for rename: from {} to {}", src, dst);
@@ -484,8 +487,29 @@ public AutoLock lockRename(Path rawSrc, Path rawDst) throws IOException {
}
return new AutoLock() {
public void close() throws IOException {
// We have to clean up the src znodes:
// 1. If the rename was successful
// 2. While we still hold the write lock
LOG.debug("About to unlock after rename: from {} to {}", src, dst);
try {
Boolean renameSuccess;
try {
renameSuccess = successFuture.get();
} catch (InterruptedException | ExecutionException e) {
LOG.warn("Unable to determine if filesystem rename was successful. Assuming it failed.", e);
renameSuccess = false;
}
if (renameSuccess != null && renameSuccess.booleanValue()) {
// Tricky... HBossContract tests tough things like
// `rename("/", "/somethingelse")`
// This means we grabbed write locks on
// / (src)
// /somethingelse (dst)
// Thus, we can't safely delete the znodes for src as it may
// then also affect the (held) lock on the dst. This is why
// we only delete the znodes on success.
recursiveDelete(src);
}
writeUnlock(src);
} finally {
writeUnlock(dst);
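
To make the corner case in the comment above concrete: for rename("/", "/somethingelse") the two write locks nest, so a recursive delete of the src subtree would destroy the znode backing the still-held dst lock. A sketch of the layout, following the .hboss-lock-znode naming used in this patch:

// Write locks held during rename("/", "/somethingelse"):
//   /                  <- src lock (the root)
//   /somethingelse     <- dst lock, which lives beneath src
//
// Recursively deleting src's znodes would also remove
//   /somethingelse/.hboss-lock-znode
// out from under the held dst lock. Deleting only on a successful rename
// avoids this, since the contract tests expect a rename of "/" to fail.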
@@ -68,7 +68,7 @@ private void setRoot() {
root = "/hboss";
}

private static final String lockSubZnode = ".hboss-lock-znode";
public static final String LOCK_SUB_ZNODE = ".hboss-lock-znode";

private Map<Path,InterProcessReadWriteLock> lockCache = new HashMap<>();

@@ -161,7 +161,7 @@ protected void writeUnlock(Path p) throws IOException {
get(p).writeLock().release();
} catch(IllegalMonitorStateException e) {
// Reentrant locks might be acquired multiple times
LOG.error("Tried to release unacquired write lock: {}", p);
LOG.error("Tried to release unacquired write lock: {}", p, e);
throw e;
} catch (Exception e) {
throw new IOException("Exception during write unlocking of path " + p, e);
@@ -226,10 +226,11 @@ public boolean readLockBelow(Path p, Depth depth) throws IOException {

@Override
protected void recursiveDelete(Path p) throws IOException {
LOG.debug("Removing all mutex and znodes for paths beneath {}", p);
try {
ZKPaths.deleteChildren(curator.getZookeeperClient().getZooKeeper(),
p.toString(), !p.isRoot());
// Before this method is called, we have a guarantee that
// 1. There are no write locks above or below us
// 2. There are no read locks below us
// As such, we can just remove locks beneath us as we find them.
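
ZKPaths.deleteChildren does the heavy lifting here: it removes the subtree in one call, and the third argument controls whether the node itself is deleted as well (here !p.isRoot(), so the root node is preserved). A small sketch of those semantics against a throwaway znode; the connect string is a hypothetical local test ensemble:

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.RetryOneTime;
import org.apache.curator.utils.ZKPaths;

public class DeleteChildrenSketch {
  public static void main(String[] args) throws Exception {
    CuratorFramework curator =
        CuratorFrameworkFactory.newClient("localhost:2181", new RetryOneTime(100));
    curator.start();
    try {
      curator.blockUntilConnected();
      curator.create().creatingParentsIfNeeded().forPath("/demo/a/b");

      // deleteSelf = false: the children go away, /demo itself survives.
      ZKPaths.deleteChildren(
          curator.getZookeeperClient().getZooKeeper(), "/demo", false);

      System.out.println(curator.checkExists().forPath("/demo") != null);   // true
      System.out.println(curator.checkExists().forPath("/demo/a") != null); // false
    } finally {
      curator.close();
    }
  }
}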
@@ -257,7 +258,7 @@ synchronized void removeInMemoryLocks(Path p) {
*
* Specifically, this method will return true if the given path is a sub-directory
* of the parent or a file in the directory represented by the parent. This method
* returns false if the parent and the given path are the same.
*/
boolean isBeneath(Path parent, Path given) {
if (parent.equals(given)) {
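
The contract in that javadoc is easy to render standalone. A plausible sketch for illustration only; the real implementation in the lock manager may differ in detail:

import org.apache.hadoop.fs.Path;

public class IsBeneathSketch {
  // Strict containment per the javadoc: equal paths are not "beneath".
  static boolean isBeneath(Path parent, Path given) {
    if (parent.equals(given)) {
      return false;
    }
    // Walk up from `given`; reaching `parent` means it is strictly beneath.
    for (Path p = given.getParent(); p != null; p = p.getParent()) {
      if (p.equals(parent)) {
        return true;
      }
    }
    return false;
  }

  public static void main(String[] args) {
    System.out.println(isBeneath(new Path("/a"), new Path("/a/b")));   // true
    System.out.println(isBeneath(new Path("/a"), new Path("/a/b/c"))); // true
    System.out.println(isBeneath(new Path("/a"), new Path("/a")));     // false
  }
}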
@@ -287,7 +288,7 @@ private boolean writeLockBelow(Path p, int level, int maxLevel) throws IOExcepti
if (level < maxLevel) {
List<String> children = curator.getChildren().forPath(p.toString());
for (String child : children) {
if (child.equals(lockSubZnode)) {
if (child.equals(LOCK_SUB_ZNODE)) {
continue;
}
if (writeLockBelow(new Path(p, child), level+1, maxLevel)) {
@@ -312,7 +313,7 @@ private boolean readLockBelow(Path p, int level, int maxLevel) throws IOExceptio
if (level < maxLevel) {
List<String> children = curator.getChildren().forPath(p.toString());
for (String child : children) {
if (child.equals(lockSubZnode)) {
if (child.equals(LOCK_SUB_ZNODE)) {
continue;
}
if (readLockBelow(new Path(p, child), level+1, maxLevel)) {
@@ -357,6 +358,9 @@ public String summarizeLocks() {
StringBuilder sb = new StringBuilder();
Map<Path,InterProcessReadWriteLock> cache = getUnmodifiableCache();
for (Entry<Path,InterProcessReadWriteLock> entry : cache.entrySet()) {
if (sb.length() > 0) {
sb.append("\n");
}
sb.append(entry.getKey()).append("=").append(describeLock(entry.getValue()));
}
return sb.toString();
@@ -382,7 +386,7 @@ public synchronized Map<Path,InterProcessReadWriteLock> getUnmodifiableCache() {

private synchronized InterProcessReadWriteLock get(Path path) throws IOException {
if (!lockCache.containsKey(path)) {
String zkPath = new Path(path, lockSubZnode).toString();
String zkPath = new Path(path, LOCK_SUB_ZNODE).toString();
try {
ZKPaths.mkdirs(curator.getZookeeperClient().getZooKeeper(), zkPath, true);
} catch (KeeperException.NodeExistsException e) {
@@ -43,6 +43,10 @@ public Path testPath(String path) {
return TestUtils.testPath(hboss, path);
}

public TreeLockManager getLockManager() {
return sync;
}

@Before
public void setup() throws Exception {
Configuration conf = new Configuration();
@@ -0,0 +1,188 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.oss;

import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assume.assumeTrue;

import java.util.List;
import java.util.concurrent.CountDownLatch;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.oss.sync.ZKTreeLockManager;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooKeeper;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Validates that when an entry in the object store is deleted or renamed away,
* the corresponding znodes in ZooKeeper are also cleaned up.
*/
public class TestZNodeCleanup extends HBaseObjectStoreSemanticsTest {
private static final Logger LOG = LoggerFactory.getLogger(TestZNodeCleanup.class);

private ZKTreeLockManager lockManager;
private ZooKeeper zk;

@Before
public void setup() throws Exception {
super.setup();
assumeTrue("Lock manager is a " + getLockManager().getClass(),
getLockManager() instanceof ZKTreeLockManager);
lockManager = (ZKTreeLockManager) getLockManager();
Configuration conf = hboss.getConf();
LOG.info("Waiting for ZK client to connect");
// Wait for the ZK client to connect before proceeding
final CountDownLatch latch = new CountDownLatch(1);
// Root the ZK connection beneath /hboss
zk = new ZooKeeper(conf.get(Constants.ZK_CONN_STRING) + "/hboss", 60000, new Watcher() {
@Override
public void process(WatchedEvent event) {
LOG.info("Caught event " + event);
if (event.getState() == KeeperState.SyncConnected) {
latch.countDown();
}
}
});
latch.await();
LOG.info("ZooKeeper client is connected");
}

@After
public void teardown() throws Exception {
String zkRoot = lockManager.norm(TestUtils.testPathRoot(hboss)).toString();
LOG.info("Dumping contents of ZooKeeper after test from {}", zkRoot);
printZkBFS(zkRoot);
if (zk != null) {
zk.close();
zk = null;
}
}

void printZkBFS(String path) throws Exception {
LOG.info(path);
List<String> children = zk.getChildren(path, false);
for (String child : children) {
printZkBFS(path + "/" + child);
}
}

String getZNodeFromPath(Path p) {
return Path.getPathWithoutSchemeAndAuthority(p).toString().substring(1);
}

void validatePathInZk(Path zkPath) throws Exception {
assertNotNull(zkPath + " did not exist in ZK", zk.exists(zkPath.toString(), false));
String hbossLock = new Path(zkPath, ZKTreeLockManager.LOCK_SUB_ZNODE).toString();
assertNotNull(hbossLock + " did not exist in ZK", zk.exists(hbossLock, false));
}

void validatePathNotInZk(Path zkPath) throws Exception {
assertNull(zkPath + " incorrectly exists in ZK.", zk.exists(zkPath.toString(), false));
}

@Test
public void testRename() throws Exception {
// Rename src to dest and validate that the znode for src is cleaned up.
final Path src = TestUtils.testPath(hboss, "src");
final Path dest = TestUtils.testPath(hboss, "dest");
assertTrue(hboss.mkdirs(src));
// The src znode should exist after creating the dir in S3
validatePathInZk(lockManager.norm(src));
// `mv src dest`
assertTrue(hboss.rename(src, dest));
// We should have a znode for dest (we just locked it)
validatePathInZk(lockManager.norm(dest));
// We should no longer have a znode for src (we effectively deleted it from S3)
validatePathNotInZk(lockManager.norm(src));
}

@Test
public void testFailedRename() throws Exception {
// Attempt a rename that should fail and validate that the znode for src is NOT cleaned up.
final Path src = TestUtils.testPathRoot(hboss);
final Path dest = TestUtils.testPath(hboss, "dest");
assertTrue(hboss.mkdirs(src));
// The src znode should exist after creating the dir in S3
validatePathInZk(lockManager.norm(src));
// The move should fail
assertFalse(hboss.rename(src, dest));
// The src znode should still exist since the rename failed
validatePathInZk(lockManager.norm(src));
// The dest znode also exists because we locked it while attempting the rename
validatePathInZk(lockManager.norm(dest));
}

@Test
public void testRenameDeeperToHigher() throws Exception {
// Rename src to dest and validate that the znode for src is cleaned up.
final Path src = TestUtils.testPath(hboss, "/a/b/1");
final Path dest = TestUtils.testPath(hboss, "/a/1");
assertTrue(hboss.mkdirs(src));
// The src znode should exist after creating the dir in S3
validatePathInZk(lockManager.norm(src));
// mv /a/b/1 /a/1
assertTrue(hboss.rename(src, dest));
// We should not have a znode for the src
validatePathNotInZk(lockManager.norm(src));
// We should have a lock for the dest
validatePathInZk(lockManager.norm(dest));
}

@Test
public void testRenameHigherToDeeper() throws Exception {
// Rename src to dest and validate that the znode for src is cleaned up.
final Path src = TestUtils.testPath(hboss, "/a/1");
final Path dest = TestUtils.testPath(hboss, "/a/b/1");
// `mkdir /a/1`
assertTrue(hboss.mkdirs(src));
// The src znode should exist after creating the dir in S3
validatePathInZk(lockManager.norm(src));
// `mkdir /a/b`
assertTrue(hboss.mkdirs(dest.getParent()));
// `mv /a/1 /a/b/1`
assertTrue(hboss.rename(src, dest));
// We should no longer have a znode for src (we effectively deleted it from S3)
validatePathNotInZk(lockManager.norm(src));
// We should have a znode for dest (we just locked it)
validatePathInZk(lockManager.norm(dest));
}

@Test
public void testDelete() throws Exception {
// Delete src and validate that the znode for src is cleaned up.
final Path src = TestUtils.testPath(hboss, "src");
assertTrue(hboss.mkdirs(src));
// The src znode should exist after creating the dir in S3
validatePathInZk(lockManager.norm(src));
// `rm -r src`
assertTrue(hboss.delete(src, true));
// We should no longer have a znode for src since we deleted it from S3
validatePathNotInZk(lockManager.norm(src));
}
}
@@ -36,7 +36,7 @@ public class TestTreeLockManager extends HBaseObjectStoreSemanticsTest {

@Test
public void testLockBelowChecks() throws Exception {
Assume.assumeFalse(sync instanceof NullTreeLockManager);
Assume.assumeFalse(getLockManager() instanceof NullTreeLockManager);

Path parent = testPath("testListingLevels");
Path child = new Path(parent, "child");
@@ -17,3 +17,5 @@ log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} [%t] %-5p %c{2} (%F:%M(%L)) - %m%n
log4j.logger.org.apache.hadoop=DEBUG
log4j.logger.org.apache.hadoop.metrics2=WARN
log4j.logger.org.apache.hadoop.fs=WARN
