Skip to content

Commit

Permalink
HBASE-25167 Normalizer support for hot config reloading (#2523)
Browse files Browse the repository at this point in the history
Wire up the `ConfigurationObserver` chain for
`RegionNormalizerManager`. The following configuration keys support
hot-reloading:
 * hbase.normalizer.throughput.max_bytes_per_sec
 * hbase.normalizer.split.enabled
 * hbase.normalizer.merge.enabled
 * hbase.normalizer.min.region.count
 * hbase.normalizer.merge.min_region_age.days
 * hbase.normalizer.merge.min_region_size.mb

Note that support for `hbase.normalizer.period` is not provided
here. Support would need to be implemented generally for the `Chore`
subsystem.

Signed-off-by: Bharath Vissapragada <bharathv@apache.org>
Signed-off-by: Viraj Jasani <vjasani@apache.org>
Signed-off-by: Aman Poonia <aman.poonia.29@gmail.com>
  • Loading branch information
ndimiduk committed Oct 30, 2020
1 parent e3beccf commit d790bde
Show file tree
Hide file tree
Showing 8 changed files with 319 additions and 60 deletions.
@@ -1,4 +1,4 @@
/**
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
Expand All @@ -21,28 +21,29 @@
import java.util.Set;
import java.util.WeakHashMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;

/**
* Maintains the set of all the classes which would like to get notified
* when the Configuration is reloaded from the disk in the Online Configuration
* Change mechanism, which lets you update certain configuration properties
* on-the-fly, without having to restart the cluster.
*
* <p>
* If a class has configuration properties which you would like to be able to
* change on-the-fly, do the following:
* 1. Implement the {@link ConfigurationObserver} interface. This would require
* <ol>
* <li>Implement the {@link ConfigurationObserver} interface. This would require
* you to implement the
* {@link ConfigurationObserver#onConfigurationChange(Configuration)}
* method. This is a callback that is used to notify your class' instance
* that the configuration has changed. In this method, you need to check
* if the new values for the properties that are of interest to your class
* are different from the cached values. If yes, update them.
*
* <br />
* However, be careful with this. Certain properties might be trivially
* mutable online, but others might not. Two properties might be trivially
* mutable by themselves, but not when changed together. For example, if a
Expand All @@ -51,21 +52,23 @@
* yet updated "b", it might make a decision on the basis of a new value of
* "a", and an old value of "b". This might introduce subtle bugs. This
* needs to be dealt on a case-by-case basis, and this class does not provide
* any protection from such cases.
* any protection from such cases.</li>
*
* 2. Register the appropriate instance of the class with the
* <li>Register the appropriate instance of the class with the
* {@link ConfigurationManager} instance, using the
* {@link ConfigurationManager#registerObserver(ConfigurationObserver)}
* method. Be careful not to do this in the constructor, as you might cause
* the 'this' reference to escape. Use a factory method, or an initialize()
* method which is called after the construction of the object.
* method which is called after the construction of the object.</li>
*
* 3. Deregister the instance using the
* <li>Deregister the instance using the
* {@link ConfigurationManager#deregisterObserver(ConfigurationObserver)}
* method when it is going out of scope. In case you are not able to do that
* for any reason, it is still okay, since entries for dead observers are
* automatically collected during GC. But nonetheless, it is still a good
* practice to deregister your observer, whenever possible.
* practice to deregister your observer, whenever possible.</li>
* </ol>
* </p>
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
Expand Down Expand Up @@ -118,8 +121,8 @@ public void notifyAllObservers(Configuration conf) {
observer.onConfigurationChange(conf);
}
} catch (Throwable t) {
LOG.error("Encountered a throwable while notifying observers: " + " of type : " +
observer.getClass().getCanonicalName() + "(" + observer + ")", t);
LOG.error("Encountered a throwable while notifying observers: of type : {}({})",
observer.getClass().getCanonicalName(), observer, t);
}
}
}
Expand Down
@@ -1,4 +1,4 @@
/**
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
Expand All @@ -24,7 +24,7 @@
/**
* Every class that wants to observe changes in Configuration properties,
* must implement interface (and also, register itself with the
* <code>ConfigurationManager</code> object.
* {@link ConfigurationManager}.
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
Expand Down
@@ -1,4 +1,4 @@
/**
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
Expand All @@ -19,7 +19,6 @@

import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.testclassification.ClientTests;
Expand All @@ -39,9 +38,9 @@ public class TestConfigurationManager {

private static final Logger LOG = LoggerFactory.getLogger(TestConfigurationManager.class);

class DummyConfigurationObserver implements ConfigurationObserver {
static class DummyConfigurationObserver implements ConfigurationObserver {
private boolean notifiedOnChange = false;
private ConfigurationManager cm;
private final ConfigurationManager cm;

public DummyConfigurationObserver(ConfigurationManager cm) {
this.cm = cm;
Expand All @@ -63,11 +62,11 @@ public void resetNotifiedOnChange() {
}

public void register() {
this.cm.registerObserver(this);
cm.registerObserver(this);
}

public void deregister() {
this.cm.deregisterObserver(this);
cm.deregisterObserver(this);
}
}

Expand Down
Expand Up @@ -785,6 +785,7 @@ private void initializeZKBasedSystemTrackers()

this.regionNormalizerManager =
RegionNormalizerFactory.createNormalizerManager(conf, zooKeeper, this);
this.configurationManager.registerObserver(regionNormalizerManager);
this.regionNormalizerManager.start();

this.splitOrMergeTracker = new SplitOrMergeTracker(zooKeeper, conf, this);
Expand Down
Expand Up @@ -22,8 +22,11 @@
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ScheduledChore;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.conf.ConfigurationManager;
import org.apache.hadoop.hbase.conf.PropagatingConfigurationObserver;
import org.apache.hadoop.hbase.zookeeper.RegionNormalizerTracker;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.KeeperException;
Expand All @@ -35,7 +38,7 @@
* This class encapsulates the details of the {@link RegionNormalizer} subsystem.
*/
@InterfaceAudience.Private
public class RegionNormalizerManager {
public class RegionNormalizerManager implements PropagatingConfigurationObserver {
private static final Logger LOG = LoggerFactory.getLogger(RegionNormalizerManager.class);

private final RegionNormalizerTracker regionNormalizerTracker;
Expand All @@ -48,7 +51,7 @@ public class RegionNormalizerManager {
private boolean started = false;
private boolean stopped = false;

public RegionNormalizerManager(
RegionNormalizerManager(
@NonNull final RegionNormalizerTracker regionNormalizerTracker,
@Nullable final RegionNormalizerChore regionNormalizerChore,
@Nullable final RegionNormalizerWorkQueue<TableName> workQueue,
Expand All @@ -67,6 +70,25 @@ public RegionNormalizerManager(
.build());
}

@Override
public void registerChildren(ConfigurationManager manager) {
if (worker != null) {
manager.registerObserver(worker);
}
}

@Override
public void deregisterChildren(ConfigurationManager manager) {
if (worker != null) {
manager.deregisterObserver(worker);
}
}

@Override
public void onConfigurationChange(Configuration conf) {
// no configuration managed here directly.
}

public void start() {
synchronized (startStopLock) {
if (started) {
Expand Down
Expand Up @@ -26,6 +26,9 @@
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.conf.ConfigurationManager;
import org.apache.hadoop.hbase.conf.ConfigurationObserver;
import org.apache.hadoop.hbase.conf.PropagatingConfigurationObserver;
import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
Expand All @@ -39,7 +42,7 @@
* and executes the resulting {@link NormalizationPlan}s.
*/
@InterfaceAudience.Private
class RegionNormalizerWorker implements Runnable {
class RegionNormalizerWorker implements PropagatingConfigurationObserver, Runnable {
private static final Logger LOG = LoggerFactory.getLogger(RegionNormalizerWorker.class);

static final String RATE_LIMIT_BYTES_PER_SEC_KEY =
Expand Down Expand Up @@ -70,7 +73,32 @@ class RegionNormalizerWorker implements Runnable {
this.rateLimiter = loadRateLimiter(configuration);
}

@Override
public void registerChildren(ConfigurationManager manager) {
if (regionNormalizer instanceof ConfigurationObserver) {
final ConfigurationObserver observer = (ConfigurationObserver) regionNormalizer;
manager.registerObserver(observer);
}
}

@Override
public void deregisterChildren(ConfigurationManager manager) {
if (regionNormalizer instanceof ConfigurationObserver) {
final ConfigurationObserver observer = (ConfigurationObserver) regionNormalizer;
manager.deregisterObserver(observer);
}
}

@Override
public void onConfigurationChange(Configuration conf) {
rateLimiter.setRate(loadRateLimit(conf));
}

private static RateLimiter loadRateLimiter(final Configuration configuration) {
return RateLimiter.create(loadRateLimit(configuration));
}

private static long loadRateLimit(final Configuration configuration) {
long rateLimitBytes =
configuration.getLongBytes(RATE_LIMIT_BYTES_PER_SEC_KEY, RATE_UNLIMITED_BYTES);
long rateLimitMbs = rateLimitBytes / 1_000_000L;
Expand All @@ -82,7 +110,7 @@ private static RateLimiter loadRateLimiter(final Configuration configuration) {
}
LOG.info("Normalizer rate limit set to {}",
rateLimitBytes == RATE_UNLIMITED_BYTES ? "unlimited" : rateLimitMbs + " MB/sec");
return RateLimiter.create(rateLimitMbs);
return rateLimitMbs;
}

/**
Expand Down Expand Up @@ -116,6 +144,15 @@ long getMergePlanCount() {
return mergePlanCount;
}

/**
* Used in test only. This field is exposed to the test, as opposed to tracking the current
* configuration value beside the RateLimiter instance and managing synchronization to keep the
* two in sync.
*/
RateLimiter getRateLimiter() {
return rateLimiter;
}

@Override
public void run() {
while (true) {
Expand Down

0 comments on commit d790bde

Please sign in to comment.