From d790bdeddef755751c43f328daaf5aa027bf8cad Mon Sep 17 00:00:00 2001 From: Nick Dimiduk Date: Fri, 30 Oct 2020 10:41:56 -0700 Subject: [PATCH] HBASE-25167 Normalizer support for hot config reloading (#2523) Wire up the `ConfigurationObserver` chain for `RegionNormalizerManager`. The following configuration keys support hot-reloading: * hbase.normalizer.throughput.max_bytes_per_sec * hbase.normalizer.split.enabled * hbase.normalizer.merge.enabled * hbase.normalizer.min.region.count * hbase.normalizer.merge.min_region_age.days * hbase.normalizer.merge.min_region_size.mb Note that support for `hbase.normalizer.period` is not provided here. Support would need to be implemented generally for the `Chore` subsystem. Signed-off-by: Bharath Vissapragada Signed-off-by: Viraj Jasani Signed-off-by: Aman Poonia --- .../hbase/conf/ConfigurationManager.java | 27 +-- .../hbase/conf/ConfigurationObserver.java | 4 +- .../hbase/conf/TestConfigurationManager.java | 11 +- .../apache/hadoop/hbase/master/HMaster.java | 1 + .../normalizer/RegionNormalizerManager.java | 26 ++- .../normalizer/RegionNormalizerWorker.java | 41 ++++- .../normalizer/SimpleRegionNormalizer.java | 159 ++++++++++++++---- ...ormalizerManagerConfigurationObserver.java | 110 ++++++++++++ 8 files changed, 319 insertions(+), 60 deletions(-) create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/master/normalizer/TestRegionNormalizerManagerConfigurationObserver.java diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/conf/ConfigurationManager.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/conf/ConfigurationManager.java index 511679f5b547..2c36c5308fa3 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/conf/ConfigurationManager.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/conf/ConfigurationManager.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -21,28 +21,29 @@ import java.util.Set; import java.util.WeakHashMap; import org.apache.hadoop.conf.Configuration; -import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceStability; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; /** * Maintains the set of all the classes which would like to get notified * when the Configuration is reloaded from the disk in the Online Configuration * Change mechanism, which lets you update certain configuration properties * on-the-fly, without having to restart the cluster. - * + *

* If a class has configuration properties which you would like to be able to * change on-the-fly, do the following: - * 1. Implement the {@link ConfigurationObserver} interface. This would require + *

    + *
  1. Implement the {@link ConfigurationObserver} interface. This would require * you to implement the * {@link ConfigurationObserver#onConfigurationChange(Configuration)} * method. This is a callback that is used to notify your class' instance * that the configuration has changed. In this method, you need to check * if the new values for the properties that are of interest to your class * are different from the cached values. If yes, update them. - * + *
    * However, be careful with this. Certain properties might be trivially * mutable online, but others might not. Two properties might be trivially * mutable by themselves, but not when changed together. For example, if a @@ -51,21 +52,23 @@ * yet updated "b", it might make a decision on the basis of a new value of * "a", and an old value of "b". This might introduce subtle bugs. This * needs to be dealt on a case-by-case basis, and this class does not provide - * any protection from such cases. + * any protection from such cases.
  2. * - * 2. Register the appropriate instance of the class with the + *
  3. Register the appropriate instance of the class with the * {@link ConfigurationManager} instance, using the * {@link ConfigurationManager#registerObserver(ConfigurationObserver)} * method. Be careful not to do this in the constructor, as you might cause * the 'this' reference to escape. Use a factory method, or an initialize() - * method which is called after the construction of the object. + * method which is called after the construction of the object.
  4. * - * 3. Deregister the instance using the + *
  5. Deregister the instance using the * {@link ConfigurationManager#deregisterObserver(ConfigurationObserver)} * method when it is going out of scope. In case you are not able to do that * for any reason, it is still okay, since entries for dead observers are * automatically collected during GC. But nonetheless, it is still a good - * practice to deregister your observer, whenever possible. + * practice to deregister your observer, whenever possible.
  6. + *
+ *

*/ @InterfaceAudience.Private @InterfaceStability.Evolving @@ -118,8 +121,8 @@ public void notifyAllObservers(Configuration conf) { observer.onConfigurationChange(conf); } } catch (Throwable t) { - LOG.error("Encountered a throwable while notifying observers: " + " of type : " + - observer.getClass().getCanonicalName() + "(" + observer + ")", t); + LOG.error("Encountered a throwable while notifying observers: of type : {}({})", + observer.getClass().getCanonicalName(), observer, t); } } } diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/conf/ConfigurationObserver.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/conf/ConfigurationObserver.java index 2370a21af033..0d1d8ce5a783 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/conf/ConfigurationObserver.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/conf/ConfigurationObserver.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -24,7 +24,7 @@ /** * Every class that wants to observe changes in Configuration properties, * must implement interface (and also, register itself with the - * ConfigurationManager object. + * {@link ConfigurationManager}. */ @InterfaceAudience.Private @InterfaceStability.Evolving diff --git a/hbase-common/src/test/java/org/apache/hadoop/hbase/conf/TestConfigurationManager.java b/hbase-common/src/test/java/org/apache/hadoop/hbase/conf/TestConfigurationManager.java index 20dd02442631..21d74806ba04 100644 --- a/hbase-common/src/test/java/org/apache/hadoop/hbase/conf/TestConfigurationManager.java +++ b/hbase-common/src/test/java/org/apache/hadoop/hbase/conf/TestConfigurationManager.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -19,7 +19,6 @@ import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; - import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.testclassification.ClientTests; @@ -39,9 +38,9 @@ public class TestConfigurationManager { private static final Logger LOG = LoggerFactory.getLogger(TestConfigurationManager.class); - class DummyConfigurationObserver implements ConfigurationObserver { + static class DummyConfigurationObserver implements ConfigurationObserver { private boolean notifiedOnChange = false; - private ConfigurationManager cm; + private final ConfigurationManager cm; public DummyConfigurationObserver(ConfigurationManager cm) { this.cm = cm; @@ -63,11 +62,11 @@ public void resetNotifiedOnChange() { } public void register() { - this.cm.registerObserver(this); + cm.registerObserver(this); } public void deregister() { - this.cm.deregisterObserver(this); + cm.deregisterObserver(this); } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java index 58a805334f36..f9123046eef2 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java @@ -785,6 +785,7 @@ private void initializeZKBasedSystemTrackers() this.regionNormalizerManager = RegionNormalizerFactory.createNormalizerManager(conf, zooKeeper, this); + this.configurationManager.registerObserver(regionNormalizerManager); this.regionNormalizerManager.start(); this.splitOrMergeTracker = new SplitOrMergeTracker(zooKeeper, conf, this); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/RegionNormalizerManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/RegionNormalizerManager.java index e818519d6513..b4d16e796731 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/RegionNormalizerManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/RegionNormalizerManager.java @@ -22,8 +22,11 @@ import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.ScheduledChore; import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.conf.ConfigurationManager; +import org.apache.hadoop.hbase.conf.PropagatingConfigurationObserver; import org.apache.hadoop.hbase.zookeeper.RegionNormalizerTracker; import org.apache.yetus.audience.InterfaceAudience; import org.apache.zookeeper.KeeperException; @@ -35,7 +38,7 @@ * This class encapsulates the details of the {@link RegionNormalizer} subsystem. */ @InterfaceAudience.Private -public class RegionNormalizerManager { +public class RegionNormalizerManager implements PropagatingConfigurationObserver { private static final Logger LOG = LoggerFactory.getLogger(RegionNormalizerManager.class); private final RegionNormalizerTracker regionNormalizerTracker; @@ -48,7 +51,7 @@ public class RegionNormalizerManager { private boolean started = false; private boolean stopped = false; - public RegionNormalizerManager( + RegionNormalizerManager( @NonNull final RegionNormalizerTracker regionNormalizerTracker, @Nullable final RegionNormalizerChore regionNormalizerChore, @Nullable final RegionNormalizerWorkQueue workQueue, @@ -67,6 +70,25 @@ public RegionNormalizerManager( .build()); } + @Override + public void registerChildren(ConfigurationManager manager) { + if (worker != null) { + manager.registerObserver(worker); + } + } + + @Override + public void deregisterChildren(ConfigurationManager manager) { + if (worker != null) { + manager.deregisterObserver(worker); + } + } + + @Override + public void onConfigurationChange(Configuration conf) { + // no configuration managed here directly. + } + public void start() { synchronized (startStopLock) { if (started) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/RegionNormalizerWorker.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/RegionNormalizerWorker.java index 30f9fc25364d..408317a31f87 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/RegionNormalizerWorker.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/RegionNormalizerWorker.java @@ -26,6 +26,9 @@ import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.RegionInfo; import org.apache.hadoop.hbase.client.TableDescriptor; +import org.apache.hadoop.hbase.conf.ConfigurationManager; +import org.apache.hadoop.hbase.conf.ConfigurationObserver; +import org.apache.hadoop.hbase.conf.PropagatingConfigurationObserver; import org.apache.hadoop.hbase.master.MasterServices; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; @@ -39,7 +42,7 @@ * and executes the resulting {@link NormalizationPlan}s. */ @InterfaceAudience.Private -class RegionNormalizerWorker implements Runnable { +class RegionNormalizerWorker implements PropagatingConfigurationObserver, Runnable { private static final Logger LOG = LoggerFactory.getLogger(RegionNormalizerWorker.class); static final String RATE_LIMIT_BYTES_PER_SEC_KEY = @@ -70,7 +73,32 @@ class RegionNormalizerWorker implements Runnable { this.rateLimiter = loadRateLimiter(configuration); } + @Override + public void registerChildren(ConfigurationManager manager) { + if (regionNormalizer instanceof ConfigurationObserver) { + final ConfigurationObserver observer = (ConfigurationObserver) regionNormalizer; + manager.registerObserver(observer); + } + } + + @Override + public void deregisterChildren(ConfigurationManager manager) { + if (regionNormalizer instanceof ConfigurationObserver) { + final ConfigurationObserver observer = (ConfigurationObserver) regionNormalizer; + manager.deregisterObserver(observer); + } + } + + @Override + public void onConfigurationChange(Configuration conf) { + rateLimiter.setRate(loadRateLimit(conf)); + } + private static RateLimiter loadRateLimiter(final Configuration configuration) { + return RateLimiter.create(loadRateLimit(configuration)); + } + + private static long loadRateLimit(final Configuration configuration) { long rateLimitBytes = configuration.getLongBytes(RATE_LIMIT_BYTES_PER_SEC_KEY, RATE_UNLIMITED_BYTES); long rateLimitMbs = rateLimitBytes / 1_000_000L; @@ -82,7 +110,7 @@ private static RateLimiter loadRateLimiter(final Configuration configuration) { } LOG.info("Normalizer rate limit set to {}", rateLimitBytes == RATE_UNLIMITED_BYTES ? "unlimited" : rateLimitMbs + " MB/sec"); - return RateLimiter.create(rateLimitMbs); + return rateLimitMbs; } /** @@ -116,6 +144,15 @@ long getMergePlanCount() { return mergePlanCount; } + /** + * Used in test only. This field is exposed to the test, as opposed to tracking the current + * configuration value beside the RateLimiter instance and managing synchronization to keep the + * two in sync. + */ + RateLimiter getRateLimiter() { + return rateLimiter; + } + @Override public void run() { while (true) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/SimpleRegionNormalizer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/SimpleRegionNormalizer.java index 062e401ba812..6d7387b7f11b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/SimpleRegionNormalizer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/SimpleRegionNormalizer.java @@ -36,6 +36,7 @@ import org.apache.hadoop.hbase.client.MasterSwitchType; import org.apache.hadoop.hbase.client.RegionInfo; import org.apache.hadoop.hbase.client.TableDescriptor; +import org.apache.hadoop.hbase.conf.ConfigurationObserver; import org.apache.hadoop.hbase.master.MasterServices; import org.apache.hadoop.hbase.master.RegionState; import org.apache.hadoop.hbase.master.assignment.RegionStates; @@ -56,7 +57,7 @@ * */ @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG) -class SimpleRegionNormalizer implements RegionNormalizer { +class SimpleRegionNormalizer implements RegionNormalizer, ConfigurationObserver { private static final Logger LOG = LoggerFactory.getLogger(SimpleRegionNormalizer.class); static final String SPLIT_ENABLED_KEY = "hbase.normalizer.split.enabled"; @@ -72,25 +73,17 @@ class SimpleRegionNormalizer implements RegionNormalizer { static final String MERGE_MIN_REGION_SIZE_MB_KEY = "hbase.normalizer.merge.min_region_size.mb"; static final int DEFAULT_MERGE_MIN_REGION_SIZE_MB = 1; - private Configuration conf; private MasterServices masterServices; - private boolean splitEnabled; - private boolean mergeEnabled; - private int minRegionCount; - private Period mergeMinRegionAge; - private long mergeMinRegionSizeMb; + private NormalizerConfiguration normalizerConfiguration; public SimpleRegionNormalizer() { - splitEnabled = DEFAULT_SPLIT_ENABLED; - mergeEnabled = DEFAULT_MERGE_ENABLED; - minRegionCount = DEFAULT_MIN_REGION_COUNT; - mergeMinRegionAge = Period.ofDays(DEFAULT_MERGE_MIN_REGION_AGE_DAYS); - mergeMinRegionSizeMb = DEFAULT_MERGE_MIN_REGION_SIZE_MB; + masterServices = null; + normalizerConfiguration = new NormalizerConfiguration(); } @Override public Configuration getConf() { - return conf; + return normalizerConfiguration.getConf(); } @Override @@ -98,12 +91,13 @@ public void setConf(final Configuration conf) { if (conf == null) { return; } - this.conf = conf; - splitEnabled = conf.getBoolean(SPLIT_ENABLED_KEY, DEFAULT_SPLIT_ENABLED); - mergeEnabled = conf.getBoolean(MERGE_ENABLED_KEY, DEFAULT_MERGE_ENABLED); - minRegionCount = parseMinRegionCount(conf); - mergeMinRegionAge = parseMergeMinRegionAge(conf); - mergeMinRegionSizeMb = parseMergeMinRegionSizeMb(conf); + normalizerConfiguration = new NormalizerConfiguration(conf, normalizerConfiguration); + } + + @Override + public void onConfigurationChange(Configuration conf) { + LOG.debug("Updating configuration parameters according to new configuration instance."); + setConf(conf); } private static int parseMinRegionCount(final Configuration conf) { @@ -141,39 +135,46 @@ private static void warnInvalidValue(final String key, final T parsedValue, key, parsedValue, settledValue); } + private static void logConfigurationUpdated(final String key, final T oldValue, + final T newValue) { + if (!Objects.equals(oldValue, newValue)) { + LOG.info("Updated configuration for key '{}' from {} to {}", key, oldValue, newValue); + } + } + /** * Return this instance's configured value for {@value #SPLIT_ENABLED_KEY}. */ public boolean isSplitEnabled() { - return splitEnabled; + return normalizerConfiguration.isSplitEnabled(); } /** * Return this instance's configured value for {@value #MERGE_ENABLED_KEY}. */ public boolean isMergeEnabled() { - return mergeEnabled; + return normalizerConfiguration.isMergeEnabled(); } /** * Return this instance's configured value for {@value #MIN_REGION_COUNT_KEY}. */ public int getMinRegionCount() { - return minRegionCount; + return normalizerConfiguration.getMinRegionCount(); } /** * Return this instance's configured value for {@value #MERGE_MIN_REGION_AGE_DAYS_KEY}. */ public Period getMergeMinRegionAge() { - return mergeMinRegionAge; + return normalizerConfiguration.getMergeMinRegionAge(); } /** * Return this instance's configured value for {@value #MERGE_MIN_REGION_SIZE_MB_KEY}. */ public long getMergeMinRegionSizeMb() { - return mergeMinRegionSizeMb; + return normalizerConfiguration.getMergeMinRegionSizeMb(); } @Override @@ -292,8 +293,15 @@ private double getAverageRegionSizeMb(final List tableRegions) { /** * Determine if a {@link RegionInfo} should be considered for a merge operation. + *

+ * Callers beware: for safe concurrency, be sure to pass in the local instance of + * {@link NormalizerConfiguration}, don't use {@code this}'s instance. */ - private boolean skipForMerge(final RegionStates regionStates, final RegionInfo regionInfo) { + private boolean skipForMerge( + final NormalizerConfiguration normalizerConfiguration, + final RegionStates regionStates, + final RegionInfo regionInfo + ) { final RegionState state = regionStates.getRegionState(regionInfo); final String name = regionInfo.getEncodedName(); return @@ -304,10 +312,10 @@ private boolean skipForMerge(final RegionStates regionStates, final RegionInfo r () -> !Objects.equals(state.getState(), RegionState.State.OPEN), "skipping merge of region {} because it is not open.", name) || logTraceReason( - () -> !isOldEnoughForMerge(regionInfo), + () -> !isOldEnoughForMerge(normalizerConfiguration, regionInfo), "skipping merge of region {} because it is not old enough.", name) || logTraceReason( - () -> !isLargeEnoughForMerge(regionInfo), + () -> !isLargeEnoughForMerge(normalizerConfiguration, regionInfo), "skipping merge region {} because it is not large enough.", name); } @@ -316,15 +324,16 @@ private boolean skipForMerge(final RegionStates regionStates, final RegionInfo r * towards target average or target region count. */ private List computeMergeNormalizationPlans(final NormalizeContext ctx) { - if (isEmpty(ctx.getTableRegions()) || ctx.getTableRegions().size() < minRegionCount) { + final NormalizerConfiguration configuration = normalizerConfiguration; + if (ctx.getTableRegions().size() < configuration.getMinRegionCount()) { LOG.debug("Table {} has {} regions, required min number of regions for normalizer to run" - + " is {}, not computing merge plans.", ctx.getTableName(), ctx.getTableRegions().size(), - minRegionCount); + + " is {}, not computing merge plans.", ctx.getTableName(), + ctx.getTableRegions().size(), configuration.getMinRegionCount()); return Collections.emptyList(); } final long avgRegionSizeMb = (long) ctx.getAverageRegionSizeMb(); - if (avgRegionSizeMb < mergeMinRegionSizeMb) { + if (avgRegionSizeMb < configuration.getMergeMinRegionSizeMb()) { return Collections.emptyList(); } LOG.debug("Computing normalization plan for table {}. average region size: {}, number of" @@ -347,7 +356,7 @@ private List computeMergeNormalizationPlans(final NormalizeCo for (current = rangeStart; current < ctx.getTableRegions().size(); current++) { final RegionInfo regionInfo = ctx.getTableRegions().get(current); final long regionSizeMb = getRegionSizeMB(regionInfo); - if (skipForMerge(ctx.getRegionStates(), regionInfo)) { + if (skipForMerge(configuration, ctx.getRegionStates(), regionInfo)) { // this region cannot participate in a range. resume the outer loop. rangeStart = Math.max(current, rangeStart + 1); break; @@ -419,18 +428,28 @@ private List computeSplitNormalizationPlans(final NormalizeCo * Return {@code true} when {@code regionInfo} has a creation date that is old * enough to be considered for a merge operation, {@code false} otherwise. */ - private boolean isOldEnoughForMerge(final RegionInfo regionInfo) { + private static boolean isOldEnoughForMerge( + final NormalizerConfiguration normalizerConfiguration, + final RegionInfo regionInfo + ) { final Instant currentTime = Instant.ofEpochMilli(EnvironmentEdgeManager.currentTime()); final Instant regionCreateTime = Instant.ofEpochMilli(regionInfo.getRegionId()); - return currentTime.isAfter(regionCreateTime.plus(mergeMinRegionAge)); + return currentTime.isAfter( + regionCreateTime.plus(normalizerConfiguration.getMergeMinRegionAge())); } /** * Return {@code true} when {@code regionInfo} has a size that is sufficient * to be considered for a merge operation, {@code false} otherwise. + *

+ * Callers beware: for safe concurrency, be sure to pass in the local instance of + * {@link NormalizerConfiguration}, don't use {@code this}'s instance. */ - private boolean isLargeEnoughForMerge(final RegionInfo regionInfo) { - return getRegionSizeMB(regionInfo) >= mergeMinRegionSizeMb; + private boolean isLargeEnoughForMerge( + final NormalizerConfiguration normalizerConfiguration, + final RegionInfo regionInfo + ) { + return getRegionSizeMB(regionInfo) >= normalizerConfiguration.getMergeMinRegionSizeMb(); } private static boolean logTraceReason(final BooleanSupplier predicate, final String fmtWhenTrue, @@ -442,6 +461,74 @@ private static boolean logTraceReason(final BooleanSupplier predicate, final Str return value; } + /** + * Holds the configuration values read from {@link Configuration}. Encapsulation in a POJO + * enables atomic hot-reloading of configs without locks. + */ + private static final class NormalizerConfiguration { + private final Configuration conf; + private final boolean splitEnabled; + private final boolean mergeEnabled; + private final int minRegionCount; + private final Period mergeMinRegionAge; + private final long mergeMinRegionSizeMb; + + private NormalizerConfiguration() { + conf = null; + splitEnabled = DEFAULT_SPLIT_ENABLED; + mergeEnabled = DEFAULT_MERGE_ENABLED; + minRegionCount = DEFAULT_MIN_REGION_COUNT; + mergeMinRegionAge = Period.ofDays(DEFAULT_MERGE_MIN_REGION_AGE_DAYS); + mergeMinRegionSizeMb = DEFAULT_MERGE_MIN_REGION_SIZE_MB; + } + + private NormalizerConfiguration( + final Configuration conf, + final NormalizerConfiguration currentConfiguration + ) { + this.conf = conf; + splitEnabled = conf.getBoolean(SPLIT_ENABLED_KEY, DEFAULT_SPLIT_ENABLED); + mergeEnabled = conf.getBoolean(MERGE_ENABLED_KEY, DEFAULT_MERGE_ENABLED); + minRegionCount = parseMinRegionCount(conf); + mergeMinRegionAge = parseMergeMinRegionAge(conf); + mergeMinRegionSizeMb = parseMergeMinRegionSizeMb(conf); + logConfigurationUpdated(SPLIT_ENABLED_KEY, currentConfiguration.isSplitEnabled(), + splitEnabled); + logConfigurationUpdated(MERGE_ENABLED_KEY, currentConfiguration.isMergeEnabled(), + mergeEnabled); + logConfigurationUpdated(MIN_REGION_COUNT_KEY, currentConfiguration.getMinRegionCount(), + minRegionCount); + logConfigurationUpdated(MERGE_MIN_REGION_AGE_DAYS_KEY, + currentConfiguration.getMergeMinRegionAge(), mergeMinRegionAge); + logConfigurationUpdated(MERGE_MIN_REGION_SIZE_MB_KEY, + currentConfiguration.getMergeMinRegionSizeMb(), mergeMinRegionSizeMb); + } + + public Configuration getConf() { + return conf; + } + + public boolean isSplitEnabled() { + return splitEnabled; + } + + public boolean isMergeEnabled() { + return mergeEnabled; + } + + public int getMinRegionCount() { + return minRegionCount; + } + + public Period getMergeMinRegionAge() { + return mergeMinRegionAge; + } + + public long getMergeMinRegionSizeMb() { + return mergeMinRegionSizeMb; + } + } + /** * Inner class caries the state necessary to perform a single invocation of * {@link #computePlansForTable(TableName)}. Grabbing this data from the assignment manager diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/normalizer/TestRegionNormalizerManagerConfigurationObserver.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/normalizer/TestRegionNormalizerManagerConfigurationObserver.java new file mode 100644 index 000000000000..00980233edce --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/normalizer/TestRegionNormalizerManagerConfigurationObserver.java @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.master.normalizer; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.conf.ConfigurationManager; +import org.apache.hadoop.hbase.master.MasterServices; +import org.apache.hadoop.hbase.testclassification.MasterTests; +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.apache.hadoop.hbase.zookeeper.RegionNormalizerTracker; +import org.junit.Before; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; +import org.apache.hbase.thirdparty.com.google.common.util.concurrent.RateLimiter; + +/** + * Test that configuration changes are propagated to all children. + */ +@Category({ MasterTests.class, SmallTests.class}) +public class TestRegionNormalizerManagerConfigurationObserver { + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestRegionNormalizerManagerConfigurationObserver.class); + + private static final HBaseTestingUtility testUtil = new HBaseTestingUtility(); + private static final Pattern rateLimitPattern = + Pattern.compile("RateLimiter\\[stableRate=(?.+)qps]"); + + private Configuration conf; + private SimpleRegionNormalizer normalizer; + @Mock private MasterServices masterServices; + @Mock private RegionNormalizerTracker tracker; + @Mock private RegionNormalizerChore chore; + @Mock private RegionNormalizerWorkQueue queue; + private RegionNormalizerWorker worker; + private ConfigurationManager configurationManager; + + @Before + public void before() { + MockitoAnnotations.initMocks(this); + conf = testUtil.getConfiguration(); + normalizer = new SimpleRegionNormalizer(); + worker = new RegionNormalizerWorker(conf, masterServices, normalizer, queue); + final RegionNormalizerManager normalizerManager = + new RegionNormalizerManager(tracker, chore, queue, worker); + configurationManager = new ConfigurationManager(); + configurationManager.registerObserver(normalizerManager); + } + + @Test + public void test() { + assertTrue(normalizer.isMergeEnabled()); + assertEquals(3, normalizer.getMinRegionCount()); + assertEquals(1_000_000L, parseConfiguredRateLimit(worker.getRateLimiter())); + + final Configuration newConf = new Configuration(conf); + // configs on SimpleRegionNormalizer + newConf.setBoolean("hbase.normalizer.merge.enabled", false); + newConf.setInt("hbase.normalizer.min.region.count", 100); + // config on RegionNormalizerWorker + newConf.set("hbase.normalizer.throughput.max_bytes_per_sec", "12g"); + + configurationManager.notifyAllObservers(newConf); + assertFalse(normalizer.isMergeEnabled()); + assertEquals(100, normalizer.getMinRegionCount()); + assertEquals(12_884L, parseConfiguredRateLimit(worker.getRateLimiter())); + } + + /** + * The {@link RateLimiter} class does not publicly expose its currently configured rate. It does + * offer this information in the {@link RateLimiter#toString()} method. It's fragile, but parse + * this value. The alternative would be to track the value explicitly in the worker, and the + * associated coordination overhead paid at runtime. See the related note on + * {@link RegionNormalizerWorker#getRateLimiter()}. + */ + private static long parseConfiguredRateLimit(final RateLimiter rateLimiter) { + final String val = rateLimiter.toString(); + final Matcher matcher = rateLimitPattern.matcher(val); + assertTrue(matcher.matches()); + final String parsedRate = matcher.group("rate"); + return (long) Double.parseDouble(parsedRate); + } +}