Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HBASE-27780 FileChangeWatcher improvements #5164

Merged
merged 2 commits into from
Apr 8, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import java.nio.file.WatchService;
import java.util.function.Consumer;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.server.ZooKeeperThread;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -72,7 +71,8 @@ public enum State {
* relative to <code>dirPath</code>.
* @throws IOException if there is an error creating the WatchService.
*/
public FileChangeWatcher(Path dirPath, Consumer<WatchEvent<?>> callback) throws IOException {
public FileChangeWatcher(Path dirPath, String threadNameSuffix, Consumer<WatchEvent<?>> callback)
throws IOException {
FileSystem fs = dirPath.getFileSystem();
WatchService watchService = fs.newWatchService();

Expand All @@ -83,7 +83,7 @@ public FileChangeWatcher(Path dirPath, Consumer<WatchEvent<?>> callback) throws
StandardWatchEventKinds.ENTRY_DELETE, StandardWatchEventKinds.ENTRY_MODIFY,
StandardWatchEventKinds.OVERFLOW });
state = State.NEW;
this.watcherThread = new WatcherThread(watchService, callback);
this.watcherThread = new WatcherThread(threadNameSuffix, watchService, callback);
this.watcherThread.setDaemon(true);
}

Expand Down Expand Up @@ -172,20 +172,30 @@ public void stop() {
}
}

String getWatcherThreadName() {
return watcherThread.getName();
}

private static void handleException(Thread thread, Throwable e) {
LOG.warn("Exception occurred from thread {}", thread.getName(), e);
}

/**
* Inner class that implements the watcher thread logic.
*/
private class WatcherThread extends ZooKeeperThread {
private class WatcherThread extends Thread {

private static final String THREAD_NAME = "FileChangeWatcher";
private static final String THREAD_NAME_PREFIX = "FileChangeWatcher-";

final WatchService watchService;
final Consumer<WatchEvent<?>> callback;

WatcherThread(WatchService watchService, Consumer<WatchEvent<?>> callback) {
super(THREAD_NAME);
WatcherThread(String threadNameSuffix, WatchService watchService,
Consumer<WatchEvent<?>> callback) {
super(THREAD_NAME_PREFIX + threadNameSuffix);
this.watchService = watchService;
this.callback = callback;
setUncaughtExceptionHandler(FileChangeWatcher::handleException);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,11 +74,11 @@ public final class X509Util {
static final String CONFIG_PREFIX = "hbase.rpc.tls.";
public static final String TLS_CONFIG_PROTOCOL = CONFIG_PREFIX + "protocol";
public static final String TLS_CONFIG_KEYSTORE_LOCATION = CONFIG_PREFIX + "keystore.location";
static final String TLS_CONFIG_KEYSTORE_TYPE = CONFIG_PREFIX + "keystore.type";
static final String TLS_CONFIG_KEYSTORE_PASSWORD = CONFIG_PREFIX + "keystore.password";
static final String TLS_CONFIG_TRUSTSTORE_LOCATION = CONFIG_PREFIX + "truststore.location";
static final String TLS_CONFIG_TRUSTSTORE_TYPE = CONFIG_PREFIX + "truststore.type";
static final String TLS_CONFIG_TRUSTSTORE_PASSWORD = CONFIG_PREFIX + "truststore.password";
public static final String TLS_CONFIG_KEYSTORE_TYPE = CONFIG_PREFIX + "keystore.type";
public static final String TLS_CONFIG_KEYSTORE_PASSWORD = CONFIG_PREFIX + "keystore.password";
public static final String TLS_CONFIG_TRUSTSTORE_LOCATION = CONFIG_PREFIX + "truststore.location";
public static final String TLS_CONFIG_TRUSTSTORE_TYPE = CONFIG_PREFIX + "truststore.type";
public static final String TLS_CONFIG_TRUSTSTORE_PASSWORD = CONFIG_PREFIX + "truststore.password";
public static final String TLS_CONFIG_CLR = CONFIG_PREFIX + "clr";
public static final String TLS_CONFIG_OCSP = CONFIG_PREFIX + "ocsp";
public static final String TLS_CONFIG_REVERSE_DNS_LOOKUP_ENABLED =
Expand Down Expand Up @@ -417,7 +417,11 @@ public static void enableCertFileReloading(Configuration config,
String keyStoreLocation = config.get(TLS_CONFIG_KEYSTORE_LOCATION, "");
keystoreWatcher.set(newFileChangeWatcher(keyStoreLocation, resetContext));
String trustStoreLocation = config.get(TLS_CONFIG_TRUSTSTORE_LOCATION, "");
trustStoreWatcher.set(newFileChangeWatcher(trustStoreLocation, resetContext));
// we are using the same callback for both. there's no reason to kick off two
// threads if keystore/truststore are both at the same location
if (!keyStoreLocation.equals(trustStoreLocation)) {
trustStoreWatcher.set(newFileChangeWatcher(trustStoreLocation, resetContext));
}
}

private static FileChangeWatcher newFileChangeWatcher(String fileLocation, Runnable resetContext)
Expand All @@ -430,9 +434,10 @@ private static FileChangeWatcher newFileChangeWatcher(String fileLocation, Runna
if (parentPath == null) {
throw new IOException("Key/trust store path does not have a parent: " + filePath);
}
FileChangeWatcher fileChangeWatcher = new FileChangeWatcher(parentPath, watchEvent -> {
handleWatchEvent(filePath, watchEvent, resetContext);
});
FileChangeWatcher fileChangeWatcher =
new FileChangeWatcher(parentPath, Objects.toString(filePath.getFileName()), watchEvent -> {
handleWatchEvent(filePath, watchEvent, resetContext);
});
fileChangeWatcher.start();
return fileChangeWatcher;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,11 @@
*/
package org.apache.hadoop.hbase.io;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;

import java.io.File;
Expand All @@ -29,11 +32,15 @@
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseCommonTestingUtil;
import org.apache.hadoop.hbase.io.crypto.tls.X509Util;
import org.apache.hadoop.hbase.testclassification.IOTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.hamcrest.Matchers;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
Expand Down Expand Up @@ -76,12 +83,43 @@ public static void cleanupTempDir() {
UTIL.cleanupTestDir();
}

@Test
public void testEnableCertFileReloading() throws IOException {
Configuration myConf = new Configuration();
String sharedPath = "/tmp/foo.jks";
myConf.set(X509Util.TLS_CONFIG_KEYSTORE_LOCATION, sharedPath);
myConf.set(X509Util.TLS_CONFIG_TRUSTSTORE_LOCATION, sharedPath);
AtomicReference<FileChangeWatcher> keystoreWatcher = new AtomicReference<>();
AtomicReference<FileChangeWatcher> truststoreWatcher = new AtomicReference<>();
X509Util.enableCertFileReloading(myConf, keystoreWatcher, truststoreWatcher, () -> {
});
assertNotNull(keystoreWatcher.get());
assertThat(keystoreWatcher.get().getWatcherThreadName(), Matchers.endsWith("-foo.jks"));
assertNull(truststoreWatcher.get());

keystoreWatcher.getAndSet(null).stop();
truststoreWatcher.set(null);

String truststorePath = "/tmp/bar.jks";
myConf.set(X509Util.TLS_CONFIG_TRUSTSTORE_LOCATION, truststorePath);
X509Util.enableCertFileReloading(myConf, keystoreWatcher, truststoreWatcher, () -> {
});

assertNotNull(keystoreWatcher.get());
assertThat(keystoreWatcher.get().getWatcherThreadName(), Matchers.endsWith("-foo.jks"));
assertNotNull(truststoreWatcher.get());
assertThat(truststoreWatcher.get().getWatcherThreadName(), Matchers.endsWith("-bar.jks"));

keystoreWatcher.getAndSet(null).stop();
truststoreWatcher.getAndSet(null).stop();
}

@Test
public void testCallbackWorksOnFileChanges() throws IOException, InterruptedException {
FileChangeWatcher watcher = null;
try {
final List<WatchEvent<?>> events = new ArrayList<>();
watcher = new FileChangeWatcher(tempDir.toPath(), event -> {
watcher = new FileChangeWatcher(tempDir.toPath(), "test", event -> {
LOG.info("Got an update: {} {}", event.kind(), event.context());
// Filter out the extra ENTRY_CREATE events that are
// sometimes seen at the start. Even though we create the watcher
Expand Down Expand Up @@ -124,7 +162,7 @@ public void testCallbackWorksOnFileTouched() throws IOException, InterruptedExce
FileChangeWatcher watcher = null;
try {
final List<WatchEvent<?>> events = new ArrayList<>();
watcher = new FileChangeWatcher(tempDir.toPath(), event -> {
watcher = new FileChangeWatcher(tempDir.toPath(), "test", event -> {
LOG.info("Got an update: {} {}", event.kind(), event.context());
// Filter out the extra ENTRY_CREATE events that are
// sometimes seen at the start. Even though we create the watcher
Expand Down Expand Up @@ -164,7 +202,7 @@ public void testCallbackWorksOnFileAdded() throws IOException, InterruptedExcept
FileChangeWatcher watcher = null;
try {
final List<WatchEvent<?>> events = new ArrayList<>();
watcher = new FileChangeWatcher(tempDir.toPath(), event -> {
watcher = new FileChangeWatcher(tempDir.toPath(), "test", event -> {
LOG.info("Got an update: {} {}", event.kind(), event.context());
synchronized (events) {
events.add(event);
Expand Down Expand Up @@ -198,7 +236,7 @@ public void testCallbackWorksOnFileDeleted() throws IOException, InterruptedExce
FileChangeWatcher watcher = null;
try {
final List<WatchEvent<?>> events = new ArrayList<>();
watcher = new FileChangeWatcher(tempDir.toPath(), event -> {
watcher = new FileChangeWatcher(tempDir.toPath(), "test", event -> {
LOG.info("Got an update: {} {}", event.kind(), event.context());
// Filter out the extra ENTRY_CREATE events that are
// sometimes seen at the start. Even though we create the watcher
Expand Down Expand Up @@ -238,7 +276,7 @@ public void testCallbackErrorDoesNotCrashWatcherThread()
FileChangeWatcher watcher = null;
try {
final AtomicInteger callCount = new AtomicInteger(0);
watcher = new FileChangeWatcher(tempDir.toPath(), event -> {
watcher = new FileChangeWatcher(tempDir.toPath(), "test", event -> {
LOG.info("Got an update: {} {}", event.kind(), event.context());
int oldValue;
synchronized (callCount) {
Expand Down