From f42b2aa108ed3e31436f2b7fc1489af659502330 Mon Sep 17 00:00:00 2001 From: Nick Dimiduk Date: Fri, 8 Nov 2019 13:54:02 -0800 Subject: [PATCH] HBASE-23230 Enforce member visibility in HRegionServer (#775) (#806) * Clean up a bunch of private variable leakage into other classes. Reduces visibility as much as possible, providing getters where access remains necessary or making use of getters that already exist. There remains an insidious relationship between `HRegionServer` and `RSRpcServices`. * Rename `fs` to `dataFs`, `rootDir` as `dataRootDir` so as to distinguish from the new `walFs`, `walRootDir` (and make it easier to spot bugs). * Cleanup or delete a bunch of lack-luster javadoc comments. * Delete a handful of methods that are unused according to static analysis. * Reduces the warning count as reported by IntelliJ from 100 to 7. Signed-off-by: stack --- .../tmpl/regionserver/RSStatusTmpl.jamon | 2 +- .../java/org/apache/hadoop/hbase/Server.java | 3 +- .../apache/hadoop/hbase/master/HMaster.java | 6 +- .../hbase/master/MasterRpcServices.java | 7 +- .../hbase/regionserver/HRegionServer.java | 609 ++++++++---------- .../{Leases.java => LeaseManager.java} | 44 +- .../hbase/regionserver/MemStoreFlusher.java | 4 +- .../MetricsRegionServerWrapperImpl.java | 14 +- .../MetricsTableWrapperAggregateImpl.java | 2 +- .../hbase/regionserver/RSDumpServlet.java | 7 +- .../hbase/regionserver/RSRpcServices.java | 187 +++--- .../regionserver/RegionServerServices.java | 2 +- .../hbase/regionserver/SplitRequest.java | 11 +- .../hbase/regionserver/SplitWALCallable.java | 2 +- .../hbase/MockRegionServerServices.java | 4 +- .../hadoop/hbase/master/MockRegionServer.java | 4 +- .../hbase/master/TestMasterStatusServlet.java | 2 +- ...TestCleanupCompactedFileAfterFailover.java | 2 +- .../TestPerColumnFamilyFlush.java | 2 +- .../hbase/regionserver/TestPriorityRpc.java | 39 +- .../regionserver/TestRSStatusServlet.java | 2 +- .../TestRegionServerAbortTimeout.java | 7 +- .../regionserver/TestRegionServerMetrics.java | 4 +- .../TestRegionServerNoMaster.java | 2 +- .../TestSecureBulkLoadManager.java | 4 +- .../regionserver/TestSplitWalDataLoss.java | 6 +- .../SerialReplicationTestBase.java | 7 +- 27 files changed, 441 insertions(+), 544 deletions(-) rename hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/{Leases.java => LeaseManager.java} (89%) diff --git a/hbase-server/src/main/jamon/org/apache/hadoop/hbase/tmpl/regionserver/RSStatusTmpl.jamon b/hbase-server/src/main/jamon/org/apache/hadoop/hbase/tmpl/regionserver/RSStatusTmpl.jamon index 2b07523844d7..f07b8e766126 100644 --- a/hbase-server/src/main/jamon/org/apache/hadoop/hbase/tmpl/regionserver/RSStatusTmpl.jamon +++ b/hbase-server/src/main/jamon/org/apache/hadoop/hbase/tmpl/regionserver/RSStatusTmpl.jamon @@ -133,7 +133,7 @@ org.apache.hadoop.hbase.zookeeper.MasterAddressTracker;

Server Metrics

- <& ServerMetricsTmpl; mWrap = regionServer.getRegionServerMetrics().getRegionServerWrapper(); + <& ServerMetricsTmpl; mWrap = regionServer.getMetrics().getRegionServerWrapper(); mServerWrap = regionServer.getRpcServer().getMetrics().getHBaseServerWrapper(); bbAllocator = regionServer.getRpcServer().getByteBuffAllocator(); &>
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/Server.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/Server.java index e533b7531253..8963b19ee6eb 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/Server.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/Server.java @@ -78,8 +78,7 @@ public interface Server extends Abortable, Stoppable { /** * @return Return the FileSystem object used (can return null!). */ - // TODO: On Master, return Master's. On RegionServer, return RegionServers. The FileSystems - // may differ. TODO. + // TODO: Distinguish between "dataFs" and "walFs". default FileSystem getFileSystem() { // This default is pretty dodgy! Configuration c = getConfiguration(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java index 2a31a384a9f8..52ba32606859 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java @@ -515,8 +515,8 @@ public HMaster(final Configuration conf) this.rsFatals = new MemoryBoundedLogMessageBuffer( conf.getLong("hbase.master.buffer.for.rs.fatals", 1 * 1024 * 1024)); - LOG.info("hbase.rootdir=" + getRootDir() + - ", hbase.cluster.distributed=" + this.conf.getBoolean(HConstants.CLUSTER_DISTRIBUTED, false)); + LOG.info("hbase.rootdir={}, hbase.cluster.distributed={}", getDataRootDir(), + this.conf.getBoolean(HConstants.CLUSTER_DISTRIBUTED, false)); // Disable usage of meta replicas in the master this.conf.setBoolean(HConstants.USE_META_REPLICAS, false); @@ -3441,7 +3441,7 @@ public void reportMobCompactionEnd(TableName tableName) throws IOException { */ public void requestMobCompaction(TableName tableName, List columns, boolean allFiles) throws IOException { - mobCompactThread.requestMobCompaction(conf, fs, tableName, columns, allFiles); + mobCompactThread.requestMobCompaction(conf, getFileSystem(), tableName, columns, allFiles); } /** diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java index 4943d4de6563..4b3ba0e905a8 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java @@ -392,9 +392,10 @@ protected Class getRpcSchedulerFactoryClass() { } @Override - protected RpcServerInterface createRpcServer(Server server, Configuration conf, - RpcSchedulerFactory rpcSchedulerFactory, InetSocketAddress bindAddress, String name) - throws IOException { + protected RpcServerInterface createRpcServer(final Server server, + final RpcSchedulerFactory rpcSchedulerFactory, final InetSocketAddress bindAddress, + final String name) throws IOException { + final Configuration conf = regionServer.getConfiguration(); // RpcServer at HM by default enable ByteBufferPool iff HM having user table region in it boolean reservoirEnabled = conf.getBoolean(ByteBuffAllocator.ALLOCATOR_POOL_ENABLED_KEY, LoadBalancer.isMasterCanHostUserRegions(conf)); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index f22137dad9c0..4567ed667af8 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -36,7 +36,6 @@ import java.util.Collections; import java.util.Comparator; import java.util.HashSet; -import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Map.Entry; @@ -188,7 +187,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import sun.misc.Signal; -import sun.misc.SignalHandler; import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; @@ -241,9 +239,6 @@ @SuppressWarnings({ "deprecation"}) public class HRegionServer extends HasThread implements RegionServerServices, LastSequenceId, ConfigurationObserver { - // Time to pause if master says 'please hold'. Make configurable if needed. - private static final int INIT_PAUSE_TIME_MS = 1000; - private static final Logger LOG = LoggerFactory.getLogger(HRegionServer.class); /** @@ -253,10 +248,12 @@ public class HRegionServer extends HasThread implements @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="MS_SHOULD_BE_FINAL") public static boolean TEST_SKIP_REPORTING_TRANSITION = false; - //RegionName vs current action in progress - //true - if open region action in progress - //false - if close region action in progress - protected final ConcurrentMap regionsInTransitionInRS = + /** + * A map from RegionName to current action in progress. Boolean value indicates: + * true - if open region action in progress + * false - if close region action in progress + */ + private final ConcurrentMap regionsInTransitionInRS = new ConcurrentSkipListMap<>(Bytes.BYTES_COMPARATOR); /** @@ -271,10 +268,9 @@ public class HRegionServer extends HasThread implements private final Cache executedRegionProcedures = CacheBuilder.newBuilder().expireAfterAccess(600, TimeUnit.SECONDS).build(); - // Cache flushing - protected MemStoreFlusher cacheFlusher; + private MemStoreFlusher cacheFlusher; - protected HeapMemoryManager hMemManager; + private HeapMemoryManager hMemManager; /** * Cluster connection to be shared by services. @@ -292,8 +288,8 @@ public class HRegionServer extends HasThread implements protected TableDescriptors tableDescriptors; // Replication services. If no replication, this handler will be null. - protected ReplicationSourceService replicationSourceHandler; - protected ReplicationSinkService replicationSinkHandler; + private ReplicationSourceService replicationSourceHandler; + private ReplicationSinkService replicationSinkHandler; // Compactions public CompactSplit compactSplitThread; @@ -302,7 +298,12 @@ public class HRegionServer extends HasThread implements * Map of regions currently being served by this region server. Key is the * encoded region name. All access should be synchronized. */ - protected final Map onlineRegions = new ConcurrentHashMap<>(); + private final Map onlineRegions = new ConcurrentHashMap<>(); + /** + * Lock for gating access to {@link #onlineRegions}. + * TODO: If this map is gated by a lock, does it need to be a ConcurrentHashMap? + */ + private final ReentrantReadWriteLock onlineRegionsLock = new ReentrantReadWriteLock(); /** * Map of encoded region names to the DataNode locations they should be hosted on @@ -313,19 +314,16 @@ public class HRegionServer extends HasThread implements * it seems a bit weird to store ServerName since ServerName refers to RegionServers * and here we really mean DataNode locations. */ - protected final Map regionFavoredNodesMap = - new ConcurrentHashMap<>(); + private final Map regionFavoredNodesMap = new ConcurrentHashMap<>(); - // Leases - protected Leases leases; + private LeaseManager leaseManager; // Instance of the hbase executor executorService. protected ExecutorService executorService; - // If false, the file system has become unavailable - protected volatile boolean fsOk; - protected HFileSystem fs; - protected HFileSystem walFs; + private volatile boolean dataFsOk; + private HFileSystem dataFs; + private HFileSystem walFs; // Set when a report to the master comes back with a message asking us to // shutdown. Also set by call to stop when debugging or running unit tests @@ -335,45 +333,36 @@ public class HRegionServer extends HasThread implements // Go down hard. Used if file system becomes unavailable and also in // debugging and unit tests. private volatile boolean abortRequested; - public static final String ABORT_TIMEOUT = "hbase.regionserver.abort.timeout"; + static final String ABORT_TIMEOUT = "hbase.regionserver.abort.timeout"; // Default abort timeout is 1200 seconds for safe private static final long DEFAULT_ABORT_TIMEOUT = 1200000; // Will run this task when abort timeout - public static final String ABORT_TIMEOUT_TASK = "hbase.regionserver.abort.timeout.task"; - - ConcurrentMap rowlocks = new ConcurrentHashMap<>(); + static final String ABORT_TIMEOUT_TASK = "hbase.regionserver.abort.timeout.task"; // A state before we go into stopped state. At this stage we're closing user // space regions. private boolean stopping = false; - - volatile boolean killed = false; - + private volatile boolean killed = false; private volatile boolean shutDown = false; protected final Configuration conf; - private Path rootDir; + private Path dataRootDir; private Path walRootDir; - protected final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); - - final int numRetries; - protected final int threadWakeFrequency; - protected final int msgInterval; + private final int threadWakeFrequency; + final int msgInterval; private static final String PERIOD_COMPACTION = "hbase.regionserver.compaction.check.period"; private final int compactionCheckFrequency; private static final String PERIOD_FLUSH = "hbase.regionserver.flush.check.period"; private final int flushCheckFrequency; - protected final int numRegionsToReport; - // Stub to do region server status calls against the master. private volatile RegionServerStatusService.BlockingInterface rssStub; private volatile LockService.BlockingInterface lockStub; // RPC client. Used to make the stub above that does region server status checking. - RpcClient rpcClient; + private RpcClient rpcClient; private RpcRetryingCallerFactory rpcRetryingCallerFactory; private RpcControllerFactory rpcControllerFactory; @@ -389,8 +378,7 @@ public class HRegionServer extends HasThread implements /** region server process name */ public static final String REGIONSERVER = "regionserver"; - MetricsRegionServer metricsRegionServer; - MetricsTable metricsTable; + private MetricsRegionServer metricsRegionServer; private SpanReceiverHost spanReceiverHost; /** @@ -398,21 +386,19 @@ public class HRegionServer extends HasThread implements */ private ChoreService choreService; - /* + /** * Check for compactions requests. */ - ScheduledChore compactionChecker; + private ScheduledChore compactionChecker; - /* + /** * Check for flushes */ - ScheduledChore periodicFlusher; + private ScheduledChore periodicFlusher; - protected volatile WALFactory walFactory; + private volatile WALFactory walFactory; - // WAL roller. log is protected rather than private to avoid - // eclipse warning when accessed by inner classes - protected LogRoller walRoller; + private LogRoller walRoller; // A thread which calls reportProcedureDone private RemoteProcedureResultReporter procedureResultReporter; @@ -456,11 +442,11 @@ public class HRegionServer extends HasThread implements /** * The server name the Master sees us as. Its made from the hostname the * master passes us, port, and server startcode. Gets set after registration - * against Master. + * against Master. */ protected ServerName serverName; - /* + /** * hostname specified by hostname config */ protected String useThisHostnameInstead; @@ -473,8 +459,11 @@ public class HRegionServer extends HasThread implements @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG) protected final static String MASTER_HOSTNAME_KEY = "hbase.master.hostname"; - // HBASE-18226: This config and hbase.regionserver.hostname are mutually exclusive. - // Exception will be thrown if both are used. + /** + * HBASE-18226: This config and hbase.regionserver.hostname are mutually exclusive. + * Exception will be thrown if both are used. + */ + @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG) final static String RS_HOSTNAME_DISABLE_MASTER_REVERSEDNS_KEY = "hbase.regionserver.hostname.disable.master.reversedns"; @@ -527,7 +516,7 @@ public class HRegionServer extends HasThread implements protected final RSRpcServices rpcServices; - protected CoordinatedStateManager csm; + private CoordinatedStateManager csm; /** * Configuration manager is used to register/deregister and notify the configuration observers @@ -540,9 +529,9 @@ public class HRegionServer extends HasThread implements private volatile ThroughputController flushThroughputController; - protected SecureBulkLoadManager secureBulkLoadManager; + private SecureBulkLoadManager secureBulkLoadManager; - protected FileSystemUtilizationChore fsUtilizationChore; + private FileSystemUtilizationChore fsUtilizationChore; private final NettyEventLoopGroupConfig eventLoopGroupConfig; @@ -551,28 +540,30 @@ public class HRegionServer extends HasThread implements * means it needs to just come up and make do without a Master to talk to: e.g. in test or * HRegionServer is doing other than its usual duties: e.g. as an hollowed-out host whose only * purpose is as a Replication-stream sink; see HBASE-18846 for more. + * TODO: can this replace {@link #TEST_SKIP_REPORTING_TRANSITION} ? */ private final boolean masterless; - static final String MASTERLESS_CONFIG_NAME = "hbase.masterless"; + private static final String MASTERLESS_CONFIG_NAME = "hbase.masterless"; /**regionserver codec list **/ - public static final String REGIONSERVER_CODEC = "hbase.regionserver.codecs"; + private static final String REGIONSERVER_CODEC = "hbase.regionserver.codecs"; // A timer to shutdown the process if abort takes too long private Timer abortMonitor; /** - * Starts a HRegionServer at the default location + * Starts a HRegionServer at the default location. + *

+ * Don't start any services or managers in here in the Constructor. + * Defer till after we register with the Master as much as possible. See {@link #startServices}. */ - // Don't start any services or managers in here in the Constructor. - // Defer till after we register with the Master as much as possible. See #startServices. - public HRegionServer(Configuration conf) throws IOException { + public HRegionServer(final Configuration conf) throws IOException { super("RegionServer"); // thread name TraceUtil.initTracer(conf); try { this.startcode = System.currentTimeMillis(); this.conf = conf; - this.fsOk = true; + this.dataFsOk = true; this.masterless = conf.getBoolean(MASTERLESS_CONFIG_NAME, false); this.eventLoopGroupConfig = setupNetty(this.conf); MemorySizeUtil.checkForClusterFreeHeapMemoryLimit(this.conf); @@ -584,8 +575,6 @@ public HRegionServer(Configuration conf) throws IOException { // Disable usage of meta replicas in the regionserver this.conf.setBoolean(HConstants.USE_META_REPLICAS, false); // Config'ed params - this.numRetries = this.conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, - HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER); this.threadWakeFrequency = conf.getInt(HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000); this.compactionCheckFrequency = conf.getInt(PERIOD_COMPACTION, this.threadWakeFrequency); this.flushCheckFrequency = conf.getInt(PERIOD_FLUSH, this.threadWakeFrequency); @@ -596,8 +585,6 @@ public HRegionServer(Configuration conf) throws IOException { boolean isNoncesEnabled = conf.getBoolean(HConstants.HBASE_RS_NONCES_ENABLED, true); this.nonceManager = isNoncesEnabled ? new ServerNonceManager(this.conf) : null; - this.numRegionsToReport = conf.getInt("hbase.regionserver.numregionstoreport", 10); - this.operationTimeout = conf.getInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT); @@ -636,12 +623,8 @@ public HRegionServer(Configuration conf) throws IOException { mobFileCache = new MobFileCache(conf); } - uncaughtExceptionHandler = new UncaughtExceptionHandler() { - @Override - public void uncaughtException(Thread t, Throwable e) { - abort("Uncaught exception in executorService thread " + t.getName(), e); - } - }; + uncaughtExceptionHandler = + (t, e) -> abort("Uncaught exception in executorService thread " + t.getName(), e); initializeFileSystem(); spanReceiverHost = SpanReceiverHost.getInstance(getConfiguration()); @@ -714,12 +697,9 @@ protected String getUseThisHostnameInstead(Configuration conf) throws IOExceptio */ private static void setupWindows(final Configuration conf, ConfigurationManager cm) { if (!SystemUtils.IS_OS_WINDOWS) { - Signal.handle(new Signal("HUP"), new SignalHandler() { - @Override - public void handle(Signal signal) { - conf.reloadConfiguration(); - cm.notifyAllObservers(conf); - } + Signal.handle(new Signal("HUP"), signal -> { + conf.reloadConfiguration(); + cm.notifyAllObservers(conf); }); } } @@ -744,14 +724,14 @@ private void initializeFileSystem() throws IOException { // underlying hadoop hdfs accessors will be going against wrong filesystem // (unless all is set to defaults). FSUtils.setFsDefault(this.conf, FSUtils.getRootDir(this.conf)); - this.fs = new HFileSystem(this.conf, useHBaseChecksum); - this.rootDir = FSUtils.getRootDir(this.conf); + this.dataFs = new HFileSystem(this.conf, useHBaseChecksum); + this.dataRootDir = FSUtils.getRootDir(this.conf); this.tableDescriptors = getFsTableDescriptors(); } - protected TableDescriptors getFsTableDescriptors() throws IOException { + private TableDescriptors getFsTableDescriptors() throws IOException { return new FSTableDescriptors(this.conf, - this.fs, this.rootDir, !canUpdateTableDescriptor(), false, getMetaTableObserver()); + this.dataFs, this.dataRootDir, !canUpdateTableDescriptor(), false, getMetaTableObserver()); } protected Function getMetaTableObserver() { @@ -948,7 +928,7 @@ private void initializeZooKeeper() throws IOException, InterruptedException { * if the region server is shut down * @param tracker znode tracker to use * @throws IOException any IO exception, plus if the RS is stopped - * @throws InterruptedException + * @throws InterruptedException if the waiting thread is interrupted */ private void blockAndCheckIfStopped(ZKNodeTracker tracker) throws IOException, InterruptedException { @@ -982,7 +962,7 @@ public void run() { try { if (!isStopped() && !isAborted()) { - ShutdownHook.install(conf, fs, this, Thread.currentThread()); + ShutdownHook.install(conf, dataFs, this, Thread.currentThread()); // Initialize the RegionServerCoprocessorHost now that our ephemeral // node was created, in case any coprocessors want to use ZooKeeper this.rsHost = new RegionServerCoprocessorHost(this, this.conf); @@ -1028,13 +1008,13 @@ public void run() { // The main run loop. while (!isStopped() && isHealthy()) { if (!isClusterUp()) { - if (isOnlineRegionsEmpty()) { + if (onlineRegions.isEmpty()) { stop("Exiting; cluster shutdown set and not carrying any regions"); } else if (!this.stopping) { this.stopping = true; LOG.info("Closing user regions"); closeUserRegions(this.abortRequested); - } else if (this.stopping) { + } else { boolean allUserRegionsOffline = areAllUserRegionsOffline(); if (allUserRegionsOffline) { // Set stopped if no more write requests tp meta tables @@ -1070,8 +1050,8 @@ public void run() { } } - if (this.leases != null) { - this.leases.closeAfterLeasesExpire(); + if (this.leaseManager != null) { + this.leaseManager.closeAfterLeasesExpire(); } if (this.splitLogWorker != null) { splitLogWorker.stop(); @@ -1101,7 +1081,6 @@ public void run() { if (this.hMemManager != null) this.hMemManager.stop(); if (this.cacheFlusher != null) this.cacheFlusher.interruptIfNecessary(); if (this.compactSplitThread != null) this.compactSplitThread.interruptIfNecessary(); - sendShutdownInterrupt(); // Stop the snapshot and other procedure handlers, forcefully killing all running tasks if (rspmHost != null) { @@ -1111,7 +1090,7 @@ public void run() { if (this.killed) { // Just skip out w/o closing regions. Used when testing. } else if (abortRequested) { - if (this.fsOk) { + if (this.dataFsOk) { closeUserRegions(abortRequested); // Don't leave any open file handles } LOG.info("aborting server " + this.serverName); @@ -1132,7 +1111,7 @@ public void run() { // Closing the compactSplit thread before closing meta regions if (!this.killed && containsMetaTableRegions()) { - if (!abortRequested || this.fsOk) { + if (!abortRequested || this.dataFsOk) { if (this.compactSplitThread != null) { this.compactSplitThread.join(); this.compactSplitThread = null; @@ -1141,7 +1120,7 @@ public void run() { } } - if (!this.killed && this.fsOk) { + if (!this.killed && this.dataFsOk) { waitOnAllRegionsToClose(abortRequested); LOG.info("stopping server " + this.serverName + "; all regions closed."); } @@ -1155,8 +1134,8 @@ public void run() { rsSpaceQuotaManager = null; } - //fsOk flag may be changed when closing regions throws exception. - if (this.fsOk) { + // flag may be changed when closing regions throws exception. + if (this.dataFsOk) { shutdownWAL(!abortRequested); } @@ -1170,8 +1149,8 @@ public void run() { if (this.rpcClient != null) { this.rpcClient.close(); } - if (this.leases != null) { - this.leases.close(); + if (this.leaseManager != null) { + this.leaseManager.close(); } if (this.pauseMonitor != null) { this.pauseMonitor.stop(); @@ -1188,6 +1167,7 @@ public void run() { try { deleteMyEphemeralNode(); } catch (KeeperException.NoNodeException nn) { + // pass } catch (KeeperException e) { LOG.warn("Failed deleting my ephemeral node", e); } @@ -1307,7 +1287,7 @@ public boolean reportRegionSizesForQuotas(RegionSizeStore regionSizeStore) { * @param rss The stub to send to the Master * @param regionSizeStore The store containing region sizes */ - void buildReportAndSend(RegionServerStatusService.BlockingInterface rss, + private void buildReportAndSend(RegionServerStatusService.BlockingInterface rss, RegionSizeStore regionSizeStore) throws ServiceException { RegionSpaceUseReportRequest request = buildRegionSpaceUseReportRequest(Objects.requireNonNull(regionSizeStore)); @@ -1321,7 +1301,7 @@ void buildReportAndSend(RegionServerStatusService.BlockingInterface rss, /** * Builds a {@link RegionSpaceUseReportRequest} protobuf message from the region size map. * - * @param regionSizeStore The size in bytes of regions + * @param regionSizes The size in bytes of regions * @return The corresponding protocol buffer message. */ RegionSpaceUseReportRequest buildRegionSpaceUseReportRequest(RegionSizeStore regionSizes) { @@ -1347,7 +1327,7 @@ RegionSpaceUse convertRegionSize(RegionInfo regionInfo, Long sizeInBytes) { .build(); } - ClusterStatusProtos.ServerLoad buildServerLoad(long reportStartTime, long reportEndTime) + private ClusterStatusProtos.ServerLoad buildServerLoad(long reportStartTime, long reportEndTime) throws IOException { // We're getting the MetricsRegionServerWrapper here because the wrapper computes requests // per second, and other metrics As long as metrics are part of ServerLoad it's best to use @@ -1381,9 +1361,8 @@ ClusterStatusProtos.ServerLoad buildServerLoad(long reportStartTime, long report for (HRegion region : regions) { if (region.getCoprocessorHost() != null) { Set regionCoprocessors = region.getCoprocessorHost().getCoprocessors(); - Iterator iterator = regionCoprocessors.iterator(); - while (iterator.hasNext()) { - serverLoad.addCoprocessors(coprocessorBuilder.setName(iterator.next()).build()); + for (String regionCoprocessor : regionCoprocessors) { + serverLoad.addCoprocessors(coprocessorBuilder.setName(regionCoprocessor).build()); } } serverLoad.addRegionLoads(createRegionLoad(region, regionLoadBldr, regionSpecifier)); @@ -1420,7 +1399,7 @@ ClusterStatusProtos.ServerLoad buildServerLoad(long reportStartTime, long report return serverLoad.build(); } - String getOnlineRegionsAsPrintableString() { + private String getOnlineRegionsAsPrintableString() { StringBuilder sb = new StringBuilder(); for (Region r: this.onlineRegions.values()) { if (sb.length() > 0) sb.append(", "); @@ -1439,7 +1418,7 @@ private void waitOnAllRegionsToClose(final boolean abort) { Set closedRegions = new HashSet<>(); boolean interrupted = false; try { - while (!isOnlineRegionsEmpty()) { + while (!onlineRegions.isEmpty()) { int count = getNumberOfOnlineRegions(); // Only print a message if the count of regions has changed. if (count != lastCount) { @@ -1469,7 +1448,7 @@ private void waitOnAllRegionsToClose(final boolean abort) { } // No regions in RIT, we could stop waiting now. if (this.regionsInTransitionInRS.isEmpty()) { - if (!isOnlineRegionsEmpty()) { + if (!onlineRegions.isEmpty()) { LOG.info("We were exiting though online regions are not empty," + " because some regions failed closing"); } @@ -1486,7 +1465,7 @@ private void waitOnAllRegionsToClose(final boolean abort) { } } - private boolean sleep(long millis) { + private static boolean sleep(long millis) { boolean interrupted = false; try { Thread.sleep(millis); @@ -1513,7 +1492,7 @@ private void shutdownWAL(final boolean close) { } } - /* + /** * Run init. Sets up wal and starts up all server threads. * * @param c Extra configuration. @@ -1577,7 +1556,8 @@ protected void handleReportForDutyResponse(final RegionServerStartupResponse c) // This call sets up an initialized replication and WAL. Later we start it up. setupWALAndReplication(); // Init in here rather than in constructor after thread name has been set - this.metricsTable = new MetricsTable(new MetricsTableWrapperAggregateImpl(this)); + final MetricsTable metricsTable = + new MetricsTable(new MetricsTableWrapperAggregateImpl(this)); this.metricsRegionServer = new MetricsRegionServer( new MetricsRegionServerWrapperImpl(this), conf, metricsTable); // Now that we have a metrics source, start the pause monitor @@ -1640,7 +1620,7 @@ private void startHeapMemoryManager() { } } - private void createMyEphemeralNode() throws KeeperException, IOException { + private void createMyEphemeralNode() throws KeeperException { RegionServerInfo.Builder rsInfo = RegionServerInfo.newBuilder(); rsInfo.setInfoPort(infoServer != null ? infoServer.getPort() : -1); rsInfo.setVersionInfo(ProtobufUtil.getVersionInfo()); @@ -1657,13 +1637,11 @@ public RegionServerAccounting getRegionServerAccounting() { return regionServerAccounting; } - /* + /** * @param r Region to get RegionLoad for. * @param regionLoadBldr the RegionLoad.Builder, can be null * @param regionSpecifier the RegionSpecifier.Builder, can be null * @return RegionLoad instance. - * - * @throws IOException */ RegionLoad createRegionLoad(final HRegion r, RegionLoad.Builder regionLoadBldr, RegionSpecifier.Builder regionSpecifier) throws IOException { @@ -1737,16 +1715,12 @@ RegionLoad createRegionLoad(final HRegion r, RegionLoad.Builder regionLoadBldr, return regionLoadBldr.build(); } - /** - * @param encodedRegionName - * @return An instance of RegionLoad. - */ public RegionLoad createRegionLoad(final String encodedRegionName) throws IOException { HRegion r = onlineRegions.get(encodedRegionName); return r != null ? createRegionLoad(r, null, null) : null; } - /* + /** * Inner class that runs on a long period checking if regions need compaction. */ private static class CompactionChecker extends ScheduledChore { @@ -1811,18 +1785,19 @@ protected void chore() { } } - static class PeriodicMemStoreFlusher extends ScheduledChore { - final HRegionServer server; - final static int RANGE_OF_DELAY = 5 * 60; // 5 min in seconds - final static int MIN_DELAY_TIME = 0; // millisec + private static class PeriodicMemStoreFlusher extends ScheduledChore { + private final HRegionServer server; + private final static int RANGE_OF_DELAY = 5 * 60; // 5 min in seconds + private final static int MIN_DELAY_TIME = 0; // millisec + private final long rangeOfDelayMs; - final int rangeOfDelay; - public PeriodicMemStoreFlusher(int cacheFlushInterval, final HRegionServer server) { + PeriodicMemStoreFlusher(int cacheFlushInterval, final HRegionServer server) { super("MemstoreFlusherChore", server, cacheFlushInterval); this.server = server; - this.rangeOfDelay = this.server.conf.getInt("hbase.regionserver.periodicmemstoreflusher.rangeofdelayseconds", - RANGE_OF_DELAY)*1000; + final long configuredRangeOfDelay = server.getConfiguration().getInt( + "hbase.regionserver.periodicmemstoreflusher.rangeofdelayseconds", RANGE_OF_DELAY); + this.rangeOfDelayMs = TimeUnit.SECONDS.toMillis(configuredRangeOfDelay); } @Override @@ -1833,7 +1808,7 @@ protected void chore() { if (r.shouldFlush(whyFlush)) { FlushRequester requester = server.getFlushRequester(); if (requester != null) { - long randomDelay = (long) RandomUtils.nextInt(0, rangeOfDelay) + MIN_DELAY_TIME; + long randomDelay = RandomUtils.nextLong(0, rangeOfDelayMs) + MIN_DELAY_TIME; //Throttle the flushes by putting a delay. If we don't throttle, and there //is a balanced write-load on the regions in a table, we might end up //overwhelming the filesystem with too many flushes at once. @@ -1889,7 +1864,6 @@ private void setupWALAndReplication() throws IOException { /** * Start up replication source and sink handlers. - * @throws IOException */ private void startReplicationService() throws IOException { if (this.replicationSourceHandler == this.replicationSinkHandler && @@ -1905,11 +1879,6 @@ private void startReplicationService() throws IOException { } } - - public MetricsRegionServer getRegionServerMetrics() { - return this.metricsRegionServer; - } - /** * @return Master address tracker instance. */ @@ -1917,7 +1886,7 @@ public MasterAddressTracker getMasterAddressTracker() { return this.masterAddressTracker; } - /* + /** * Start maintenance Threads, Server, Worker and lease checker threads. * Start all threads we need to run. This is called after we've successfully * registered with the Master. @@ -2007,8 +1976,8 @@ private void startServices() throws IOException { // Leases is not a Thread. Internally it runs a daemon thread. If it gets // an unhandled exception, it will just exit. - Threads.setDaemonThreadRunning(this.leases.getThread(), getName() + ".leaseChecker", - uncaughtExceptionHandler); + Threads.setDaemonThreadRunning(this.leaseManager.getThread(), getName() + ".leaseChecker", + uncaughtExceptionHandler); // Create the log splitting worker and start it // set a smaller retries to fast fail otherwise splitlogworker could be blocked for @@ -2036,7 +2005,7 @@ private void startServices() throws IOException { initializeMemStoreChunkCreator(); } - private void initializeThreads() throws IOException { + private void initializeThreads() { // Cache flushing thread. this.cacheFlusher = new MemStoreFlusher(conf, this); @@ -2047,7 +2016,7 @@ private void initializeThreads() throws IOException { // in a while. It will take care of not checking too frequently on store-by-store basis. this.compactionChecker = new CompactionChecker(this, this.compactionCheckFrequency, this); this.periodicFlusher = new PeriodicMemStoreFlusher(this.flushCheckFrequency, this); - this.leases = new Leases(this.threadWakeFrequency); + this.leaseManager = new LeaseManager(this.threadWakeFrequency); // Create the thread to clean the moved regions list movedRegionsCleaner = MovedRegionsCleaner.create(this); @@ -2092,10 +2061,8 @@ private void registerConfigurationObservers() { /** * Puts up the webui. - * @return Returns final port -- maybe different from what we started with. - * @throws IOException */ - private int putUpWebUI() throws IOException { + private void putUpWebUI() throws IOException { int port = this.conf.getInt(HConstants.REGIONSERVER_INFO_PORT, HConstants.DEFAULT_REGIONSERVER_INFOPORT); String addr = this.conf.get("hbase.regionserver.info.bindAddress", "0.0.0.0"); @@ -2106,7 +2073,9 @@ private int putUpWebUI() throws IOException { addr = this.conf.get("hbase.master.info.bindAddress", "0.0.0.0"); } // -1 is for disabling info server - if (port < 0) return port; + if (port < 0) { + return; + } if (!Addressing.isLocalAddress(InetAddress.getByName(addr))) { String msg = @@ -2117,8 +2086,7 @@ private int putUpWebUI() throws IOException { throw new IOException(msg); } // check if auto port bind enabled - boolean auto = this.conf.getBoolean(HConstants.REGIONSERVER_INFO_PORT_AUTO, - false); + boolean auto = this.conf.getBoolean(HConstants.REGIONSERVER_INFO_PORT_AUTO, false); while (true) { try { this.infoServer = new InfoServer(getProcessName(), addr, port, false, this.conf); @@ -2143,19 +2111,18 @@ private int putUpWebUI() throws IOException { HConstants.DEFAULT_MASTER_INFOPORT); conf.setInt("hbase.master.info.port.orig", masterInfoPort); conf.setInt(HConstants.MASTER_INFO_PORT, port); - return port; } /* * Verify that server is healthy */ private boolean isHealthy() { - if (!fsOk) { + if (!dataFsOk) { // File system problem return false; } // Verify that all threads are alive - boolean healthy = (this.leases == null || this.leases.isAlive()) + boolean healthy = (this.leaseManager == null || this.leaseManager.isAlive()) && (this.cacheFlusher == null || this.cacheFlusher.isAlive()) && (this.walRoller == null || this.walRoller.isAlive()) && (this.compactionChecker == null || this.compactionChecker.isScheduled()) @@ -2167,7 +2134,7 @@ private boolean isHealthy() { } @Override - public List getWALs() throws IOException { + public List getWALs() { return walFactory.getWALs(); } @@ -2190,6 +2157,10 @@ public LogRoller getWalRoller() { return walRoller; } + WALFactory getWalFactory() { + return walFactory; + } + @Override public Connection getConnection() { return getClusterConnection(); @@ -2280,42 +2251,48 @@ public void postOpenDeployTasks(final PostOpenDeployContext context) throws IOEx LOG.debug("Finished post open deploy task for " + r.getRegionInfo().getRegionNameAsString()); } - @Override - public boolean reportRegionStateTransition(final RegionStateTransitionContext context) { - TransitionCode code = context.getCode(); - long openSeqNum = context.getOpenSeqNum(); + /** + * Helper method for use in tests. Skip the region transition report when there's no master + * around to receive it. + */ + private boolean skipReportingTransition(final RegionStateTransitionContext context) { + final TransitionCode code = context.getCode(); + final long openSeqNum = context.getOpenSeqNum(); long masterSystemTime = context.getMasterSystemTime(); - RegionInfo[] hris = context.getHris(); - long[] procIds = context.getProcIds(); + final RegionInfo[] hris = context.getHris(); - if (TEST_SKIP_REPORTING_TRANSITION) { - // This is for testing only in case there is no master - // to handle the region transition report at all. - if (code == TransitionCode.OPENED) { - Preconditions.checkArgument(hris != null && hris.length == 1); - if (hris[0].isMetaRegion()) { - try { - MetaTableLocator.setMetaLocation(getZooKeeper(), serverName, - hris[0].getReplicaId(),State.OPEN); - } catch (KeeperException e) { - LOG.info("Failed to update meta location", e); - return false; - } - } else { - try { - MetaTableAccessor.updateRegionLocation(clusterConnection, + if (code == TransitionCode.OPENED) { + Preconditions.checkArgument(hris != null && hris.length == 1); + if (hris[0].isMetaRegion()) { + try { + MetaTableLocator.setMetaLocation(getZooKeeper(), serverName, + hris[0].getReplicaId(),State.OPEN); + } catch (KeeperException e) { + LOG.info("Failed to update meta location", e); + return false; + } + } else { + try { + MetaTableAccessor.updateRegionLocation(clusterConnection, hris[0], serverName, openSeqNum, masterSystemTime); - } catch (IOException e) { - LOG.info("Failed to update meta", e); - return false; - } + } catch (IOException e) { + LOG.info("Failed to update meta", e); + return false; } } - return true; } + return true; + } + + private ReportRegionStateTransitionRequest createReportRegionStateTransitionRequest( + final RegionStateTransitionContext context) { + final TransitionCode code = context.getCode(); + final long openSeqNum = context.getOpenSeqNum(); + final RegionInfo[] hris = context.getHris(); + final long[] procIds = context.getProcIds(); ReportRegionStateTransitionRequest.Builder builder = - ReportRegionStateTransitionRequest.newBuilder(); + ReportRegionStateTransitionRequest.newBuilder(); builder.setServer(ProtobufUtil.toServerName(serverName)); RegionStateTransition.Builder transition = builder.addTransitionBuilder(); transition.setTransitionCode(code); @@ -2328,9 +2305,22 @@ public boolean reportRegionStateTransition(final RegionStateTransitionContext co for (long procId: procIds) { transition.addProcId(procId); } - ReportRegionStateTransitionRequest request = builder.build(); + + return builder.build(); + } + + @Override + public boolean reportRegionStateTransition(final RegionStateTransitionContext context) { + if (TEST_SKIP_REPORTING_TRANSITION) { + return skipReportingTransition(context); + } + final ReportRegionStateTransitionRequest request = + createReportRegionStateTransitionRequest(context); + + // Time to pause if master says 'please hold'. Make configurable if needed. + final long initPauseTime = 1000; int tries = 0; - long pauseTime = INIT_PAUSE_TIME_MS; + long pauseTime; // Keep looping till we get an error. We want to send reports even though server is going down. // Only go down if clusterConnection is null. It is set to null almost as last thing as the // HRegionServer does down. @@ -2361,9 +2351,9 @@ public boolean reportRegionStateTransition(final RegionStateTransitionContext co || ioe instanceof CallQueueTooBigException; if (pause) { // Do backoff else we flood the Master with requests. - pauseTime = ConnectionUtils.getPauseTime(INIT_PAUSE_TIME_MS, tries); + pauseTime = ConnectionUtils.getPauseTime(initPauseTime, tries); } else { - pauseTime = INIT_PAUSE_TIME_MS; // Reset. + pauseTime = initPauseTime; // Reset. } LOG.info("Failed report transition " + TextFormat.shortDebugString(request) + "; retry (#" + tries + ")" + @@ -2385,7 +2375,7 @@ public boolean reportRegionStateTransition(final RegionStateTransitionContext co * Trigger a flush in the primary region replica if this region is a secondary replica. Does not * block this thread. See RegionReplicaFlushHandler for details. */ - void triggerFlushInPrimaryRegion(final HRegion region) { + private void triggerFlushInPrimaryRegion(final HRegion region) { if (ServerRegionReplicaUtil.isDefaultReplica(region.getRegionInfo())) { return; } @@ -2497,12 +2487,6 @@ protected void kill() { abort("Simulated kill"); } - /** - * Called on stop/abort before closing the cluster connection and meta locator. - */ - protected void sendShutdownInterrupt() { - } - // Limits the time spent in the shutdown process. private void scheduleAbortTimer() { if (this.abortMonitor == null) { @@ -2593,8 +2577,7 @@ ReplicationSinkService getReplicationSinkService() { * * @return master + port, or null if server has been stopped */ - @VisibleForTesting - protected synchronized ServerName createRegionServerStatusStub() { + private synchronized ServerName createRegionServerStatusStub() { // Create RS stub without refreshing the master node from ZK, use cached data return createRegionServerStatusStub(false); } @@ -2758,23 +2741,13 @@ public RegionStoreSequenceIds getLastSequenceId(byte[] encodedRegionName) { } } - /** - * Closes all regions. Called on our way out. - * Assumes that its not possible for new regions to be added to onlineRegions - * while this method runs. - */ - protected void closeAllRegions(final boolean abort) { - closeUserRegions(abort); - closeMetaTableRegions(abort); - } - /** * Close meta region if we carry it * @param abort Whether we're running an abort. */ - void closeMetaTableRegions(final boolean abort) { + private void closeMetaTableRegions(final boolean abort) { HRegion meta = null; - this.lock.writeLock().lock(); + this.onlineRegionsLock.writeLock().lock(); try { for (Map.Entry e: onlineRegions.entrySet()) { RegionInfo hri = e.getValue().getRegionInfo(); @@ -2784,7 +2757,7 @@ void closeMetaTableRegions(final boolean abort) { if (meta != null) break; } } finally { - this.lock.writeLock().unlock(); + this.onlineRegionsLock.writeLock().unlock(); } if (meta != null) closeRegionIgnoreErrors(meta.getRegionInfo(), abort); } @@ -2795,8 +2768,8 @@ void closeMetaTableRegions(final boolean abort) { * that are already closed or that are closing. * @param abort Whether we're running an abort. */ - void closeUserRegions(final boolean abort) { - this.lock.writeLock().lock(); + private void closeUserRegions(final boolean abort) { + this.onlineRegionsLock.writeLock().lock(); try { for (Map.Entry e: this.onlineRegions.entrySet()) { HRegion r = e.getValue(); @@ -2806,7 +2779,7 @@ void closeUserRegions(final boolean abort) { } } } finally { - this.lock.writeLock().unlock(); + this.onlineRegionsLock.writeLock().unlock(); } } @@ -2828,28 +2801,19 @@ public boolean isStopping() { return this.stopping; } - /** - * - * @return the configuration - */ @Override public Configuration getConfiguration() { return conf; } - /** @return the write lock for the server */ - ReentrantReadWriteLock.WriteLock getWriteLock() { - return lock.writeLock(); + protected Map getOnlineRegions() { + return this.onlineRegions; } public int getNumberOfOnlineRegions() { return this.onlineRegions.size(); } - boolean isOnlineRegionsEmpty() { - return this.onlineRegions.isEmpty(); - } - /** * For tests, web ui and metrics. * This method will only work if HRegionServer is in the same JVM as client; @@ -2873,13 +2837,7 @@ public void addRegion(HRegion region) { */ SortedMap getCopyOfOnlineRegionsSortedByOffHeapSize() { // we'll sort the regions in reverse - SortedMap sortedRegions = new TreeMap<>( - new Comparator() { - @Override - public int compare(Long a, Long b) { - return -1 * a.compareTo(b); - } - }); + SortedMap sortedRegions = new TreeMap<>(Comparator.reverseOrder()); // Copy over all regions. Regions are sorted by size with biggest first. for (HRegion region : this.onlineRegions.values()) { sortedRegions.put(region.getMemStoreOffHeapSize(), region); @@ -2894,13 +2852,7 @@ public int compare(Long a, Long b) { */ SortedMap getCopyOfOnlineRegionsSortedByOnHeapSize() { // we'll sort the regions in reverse - SortedMap sortedRegions = new TreeMap<>( - new Comparator() { - @Override - public int compare(Long a, Long b) { - return -1 * a.compareTo(b); - } - }); + SortedMap sortedRegions = new TreeMap<>(Comparator.reverseOrder()); // Copy over all regions. Regions are sorted by size with biggest first. for (HRegion region : this.onlineRegions.values()) { sortedRegions.put(region.getMemStoreHeapSize(), region); @@ -2927,23 +2879,27 @@ public CompactionRequester getCompactionRequestor() { } @Override - public Leases getLeases() { - return leases; + public LeaseManager getLeaseManager() { + return leaseManager; } /** * @return Return the rootDir. */ - protected Path getRootDir() { - return rootDir; + protected Path getDataRootDir() { + return dataRootDir; } - /** - * @return Return the fs. - */ @Override public FileSystem getFileSystem() { - return fs; + return dataFs; + } + + /** + * @return {@code true} when the data file system is available, {@code false} otherwise. + */ + boolean isDataFileSystemOk() { + return this.dataFsOk; } /** @@ -2965,15 +2921,6 @@ public String toString() { return getServerName().toString(); } - /** - * Interval at which threads should run - * - * @return the interval - */ - public int getThreadWakeFrequency() { - return threadWakeFrequency; - } - @Override public ZKWatcher getZooKeeper() { return zooKeeper; @@ -3051,7 +2998,7 @@ private static void createNewReplicationInstance(Configuration conf, HRegionServ private static T newReplicationInstance(String classname, Class xface, Configuration conf, HRegionServer server, FileSystem walFs, Path logDir, Path oldLogDir, WALProvider walProvider) throws IOException { - Class clazz = null; + final Class clazz; try { ClassLoader classLoader = Thread.currentThread().getContextClassLoader(); clazz = Class.forName(classname, true, classLoader).asSubclass(xface); @@ -3079,18 +3026,15 @@ public Map getWalGroupsReplicationStatus(){ /** * Utility for constructing an instance of the passed HRegionServer class. - * - * @param regionServerClass - * @param conf2 - * @return HRegionServer instance. */ - public static HRegionServer constructRegionServer( - Class regionServerClass, - final Configuration conf2) { + static HRegionServer constructRegionServer( + final Class regionServerClass, + final Configuration conf + ) { try { - Constructor c = regionServerClass - .getConstructor(Configuration.class); - return c.newInstance(conf2); + Constructor c = + regionServerClass.getConstructor(Configuration.class); + return c.newInstance(conf); } catch (Exception e) { throw new RuntimeException("Failed construction of " + "Regionserver: " + regionServerClass.toString(), e); @@ -3100,7 +3044,7 @@ public static HRegionServer constructRegionServer( /** * @see org.apache.hadoop.hbase.regionserver.HRegionServerCommandLine */ - public static void main(String[] args) throws Exception { + public static void main(String[] args) { LOG.info("STARTING executorService " + HRegionServer.class.getSimpleName()); VersionInfo.logVersion(); Configuration conf = HBaseConfiguration.create(); @@ -3118,7 +3062,7 @@ public static void main(String[] args) throws Exception { * closed during a disable, etc., it will not be included in the returned list. * So, the returned list may not necessarily be ALL regions in this table, its * all the ONLINE regions in the table. - * @param tableName + * @param tableName table to limit the scope of the query * @return Online regions from tableName */ @Override @@ -3137,10 +3081,10 @@ public List getRegions(TableName tableName) { @Override public List getRegions() { - List allRegions = new ArrayList<>(); + List allRegions; synchronized (this.onlineRegions) { // Return a clone copy of the onlineRegions - allRegions.addAll(onlineRegions.values()); + allRegions = new ArrayList<>(onlineRegions.values()); } return allRegions; } @@ -3160,7 +3104,6 @@ public Set getOnlineTables() { return tables; } - // used by org/apache/hbase/tmpl/regionserver/RSStatusTmpl.jamon (HBASE-4070). public String[] getRegionServerCoprocessors() { TreeSet coprocessors = new TreeSet<>(); try { @@ -3182,7 +3125,7 @@ public String[] getRegionServerCoprocessors() { } } coprocessors.addAll(rsHost.getCoprocessors()); - return coprocessors.toArray(new String[coprocessors.size()]); + return coprocessors.toArray(new String[0]); } /** @@ -3279,43 +3222,6 @@ protected boolean closeRegion(String encodedName, final boolean abort, final Ser return true; } - /** - * Close and offline the region for split or merge - * - * @param regionEncodedName the name of the region(s) to close - * @return true if closed the region successfully. - * @throws IOException - */ - protected boolean closeAndOfflineRegionForSplitOrMerge(final List regionEncodedName) - throws IOException { - for (int i = 0; i < regionEncodedName.size(); ++i) { - HRegion regionToClose = this.getRegion(regionEncodedName.get(i)); - if (regionToClose != null) { - Map> hstoreFiles = null; - Exception exceptionToThrow = null; - try { - hstoreFiles = regionToClose.close(false); - } catch (Exception e) { - exceptionToThrow = e; - } - if (exceptionToThrow == null && hstoreFiles == null) { - // The region was closed by someone else - exceptionToThrow = - new IOException("Failed to close region: already closed by another thread"); - } - if (exceptionToThrow != null) { - if (exceptionToThrow instanceof IOException) { - throw (IOException) exceptionToThrow; - } - throw new IOException(exceptionToThrow); - } - // Offline the region - this.removeRegion(regionToClose, null); - } - } - return true; - } - /** * @return HRegion for the passed binary regionName or null if * named region is not member of the online regions. @@ -3325,10 +3231,6 @@ public HRegion getOnlineRegion(final byte[] regionName) { return this.onlineRegions.get(encodedRegionName); } - public InetSocketAddress[] getRegionBlockLocations(final String encodedRegionName) { - return this.regionFavoredNodesMap.get(encodedRegionName); - } - @Override public HRegion getRegion(final String encodedRegionName) { return this.onlineRegions.get(encodedRegionName); @@ -3354,10 +3256,8 @@ public boolean removeRegion(final HRegion r, ServerName destination) { /** * Protected Utility method for safely obtaining an HRegion handle. * - * @param regionName - * Name of online {@link HRegion} to return + * @param regionName Name of online {@link HRegion} to return * @return {@link HRegion} for regionName - * @throws NotServingRegionException */ protected HRegion getRegion(final byte[] regionName) throws NotServingRegionException { @@ -3370,7 +3270,7 @@ public HRegion getRegionByEncodedName(String encodedRegionName) return getRegionByEncodedName(null, encodedRegionName); } - protected HRegion getRegionByEncodedName(byte[] regionName, String encodedRegionName) + private HRegion getRegionByEncodedName(byte[] regionName, String encodedRegionName) throws NotServingRegionException { HRegion region = this.onlineRegions.get(encodedRegionName); if (region == null) { @@ -3381,7 +3281,7 @@ protected HRegion getRegionByEncodedName(byte[] regionName, String encodedRegion Boolean isOpening = this.regionsInTransitionInRS.get(Bytes.toBytes(encodedRegionName)); String regionNameStr = regionName == null? encodedRegionName: Bytes.toStringBinary(regionName); - if (isOpening != null && isOpening.booleanValue()) { + if (isOpening != null && isOpening) { throw new RegionOpeningException("Region " + regionNameStr + " is opening on " + this.serverName); } @@ -3391,14 +3291,12 @@ protected HRegion getRegionByEncodedName(byte[] regionName, String encodedRegion return region; } - /* + /** * Cleanup after Throwable caught invoking method. Converts t to * IOE if it isn't already. * * @param t Throwable - * * @param msg Message to log in error. Can be null. - * * @return Throwable converted to an IOE; methods can only let out IOEs. */ private Throwable cleanup(final Throwable t, final String msg) { @@ -3419,14 +3317,11 @@ private Throwable cleanup(final Throwable t, final String msg) { return t; } - /* - * @param t - * + /** * @param msg Message to put in new IOE if passed t is not an IOE - * * @return Make t an IOE if it isn't already. */ - protected IOException convertThrowableToIOE(final Throwable t, final String msg) { + private IOException convertThrowableToIOE(final Throwable t, final String msg) { return (t instanceof IOException ? (IOException) t : msg == null || msg.length() == 0 ? new IOException(t) : new IOException(msg, t)); } @@ -3437,16 +3332,16 @@ protected IOException convertThrowableToIOE(final Throwable t, final String msg) * * @return false if file system is not available */ - public boolean checkFileSystem() { - if (this.fsOk && this.fs != null) { + boolean checkFileSystem() { + if (this.dataFsOk && this.dataFs != null) { try { - FSUtils.checkFileSystemAvailable(this.fs); + FSUtils.checkFileSystemAvailable(this.dataFs); } catch (IOException e) { abort("File System not available", e); - this.fsOk = false; + this.dataFsOk = false; } } - return this.fsOk; + return this.dataFsOk; } @Override @@ -3465,7 +3360,7 @@ public void updateRegionFavoredNodesMapping(String encodedRegionName, /** * Return the favored nodes for a region given its encoded name. Look at the * comment around {@link #regionFavoredNodesMap} on why it is InetSocketAddress[] - * @param encodedRegionName + * * @return array of favored locations */ @Override @@ -3481,12 +3376,12 @@ public ServerNonceManager getNonceManager() { private static class MovedRegionInfo { private final ServerName serverName; private final long seqNum; - private final long ts; + private final long moveTime; - public MovedRegionInfo(ServerName serverName, long closeSeqNum) { + MovedRegionInfo(ServerName serverName, long closeSeqNum) { this.serverName = serverName; this.seqNum = closeSeqNum; - ts = EnvironmentEdgeManager.currentTime(); + this.moveTime = EnvironmentEdgeManager.currentTime(); } public ServerName getServerName() { @@ -3497,20 +3392,24 @@ public long getSeqNum() { return seqNum; } - public long getMoveTime() { - return ts; + long getMoveTime() { + return moveTime; } } - // This map will contains all the regions that we closed for a move. - // We add the time it was moved as we don't want to keep too old information - protected Map movedRegions = new ConcurrentHashMap<>(3000); + /** + * This map will contains all the regions that we closed for a move. + * We add the time it was moved as we don't want to keep too old information + */ + private Map movedRegions = new ConcurrentHashMap<>(3000); - // We need a timeout. If not there is a risk of giving a wrong information: this would double - // the number of network calls instead of reducing them. + /** + * We need a timeout. If not there is a risk of giving a wrong information: this would double + * the number of network calls instead of reducing them. + */ private static final int TIMEOUT_REGION_MOVED = (2 * 60 * 1000); - protected void addToMovedRegions(String encodedName, ServerName destination, long closeSeqNum) { + private void addToMovedRegions(String encodedName, ServerName destination, long closeSeqNum) { if (ServerName.isSameAddress(destination, this.getServerName())) { LOG.warn("Not adding moved region record: " + encodedName + " to self."); return; @@ -3544,14 +3443,8 @@ private MovedRegionInfo getMovedRegion(final String encodedRegionName) { */ protected void cleanMovedRegions() { final long cutOff = System.currentTimeMillis() - TIMEOUT_REGION_MOVED; - Iterator> it = movedRegions.entrySet().iterator(); - while (it.hasNext()){ - Map.Entry e = it.next(); - if (e.getValue().getMoveTime() < cutOff) { - it.remove(); - } - } + movedRegions.entrySet().removeIf(e -> e.getValue().getMoveTime() < cutOff); } /* @@ -3619,7 +3512,7 @@ public CompactSplit getCompactSplitThread() { return this.compactSplitThread; } - public CoprocessorServiceResponse execRegionServerService( + CoprocessorServiceResponse execRegionServerService( @SuppressWarnings("UnusedParameters") final RpcController controller, final CoprocessorServiceRequest serviceRequest) throws ServiceException { try { @@ -3646,13 +3539,9 @@ public CoprocessorServiceResponse execRegionServerService( CoprocessorRpcUtils.getRequest(service, methodDesc, call.getRequest()); final com.google.protobuf.Message.Builder responseBuilder = service.getResponsePrototype(methodDesc).newBuilderForType(); - service.callMethod(methodDesc, serviceController, request, - new com.google.protobuf.RpcCallback() { - @Override - public void run(com.google.protobuf.Message message) { - if (message != null) { - responseBuilder.mergeFrom(message); - } + service.callMethod(methodDesc, serviceController, request, message -> { + if (message != null) { + responseBuilder.mergeFrom(message); } }); IOException exception = CoprocessorRpcUtils.getControllerException(serviceController); @@ -3698,7 +3587,8 @@ public ZKPermissionWatcher getZKPermissionWatcher() { /** * @return : Returns the ConfigurationManager object for testing purposes. */ - protected ConfigurationManager getConfigurationManager() { + @VisibleForTesting + ConfigurationManager getConfigurationManager() { return configurationManager; } @@ -3713,14 +3603,14 @@ public TableDescriptors getTableDescriptors() { /** * Reload the configuration from disk. */ - public void updateConfiguration() { + void updateConfiguration() { LOG.info("Reloading the configuration from disk."); // Reload the configuration from disk. conf.reloadConfiguration(); configurationManager.notifyAllObservers(conf); } - public CacheEvictionStats clearRegionBlockCache(Region region) { + CacheEvictionStats clearRegionBlockCache(Region region) { long evictedBlocks = 0; for(Store store : region.getStores()) { @@ -3753,6 +3643,10 @@ public HeapMemoryManager getHeapMemoryManager() { return hMemManager; } + MemStoreFlusher getMemStoreFlusher() { + return cacheFlusher; + } + /** * For testing * @return whether all wal roll request finished for this regionserver @@ -3801,8 +3695,8 @@ public SecureBulkLoadManager getSecureBulkLoadManager() { } @Override - public EntityLock regionLock(List regionInfos, String description, - Abortable abort) throws IOException { + public EntityLock regionLock(final List regionInfos, final String description, + final Abortable abort) { return new LockServiceClient(conf, lockStub, clusterConnection.getNonceGenerator()) .regionLock(regionInfos, description, abort); } @@ -3862,7 +3756,7 @@ public Connection createConnection(Configuration conf) throws IOException { this.rpcServices, this.rpcServices); } - public void executeProcedure(long procId, RSProcedureCallable callable) { + void executeProcedure(long procId, RSProcedureCallable callable) { executorService.submit(new RSProcedureHandler(this, procId, callable)); } @@ -3871,7 +3765,8 @@ public void remoteProcedureComplete(long procId, Throwable error) { } void reportProcedureDone(ReportProcedureDoneRequest request) throws IOException { - RegionServerStatusService.BlockingInterface rss = rssStub; + RegionServerStatusService.BlockingInterface rss; + // TODO: juggling class state with an instance variable, outside of a synchronized block :'( for (;;) { rss = rssStub; if (rss != null) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Leases.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LeaseManager.java similarity index 89% rename from hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Leases.java rename to hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LeaseManager.java index 0afa3813edd6..15bc2afa346d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Leases.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LeaseManager.java @@ -53,29 +53,25 @@ * sleep time which is invariant. */ @InterfaceAudience.Private -public class Leases extends HasThread { - private static final Logger LOG = LoggerFactory.getLogger(Leases.class.getName()); - public static final int MIN_WAIT_TIME = 100; - private final Map leases = new ConcurrentHashMap<>(); +public class LeaseManager extends HasThread { + private static final Logger LOG = LoggerFactory.getLogger(LeaseManager.class.getName()); + private static final int MIN_WAIT_TIME = 100; - protected final int leaseCheckFrequency; - protected volatile boolean stopRequested = false; + private final Map leases = new ConcurrentHashMap<>(); + private final int leaseCheckFrequency; + private volatile boolean stopRequested = false; /** - * Creates a lease monitor + * Creates a lease manager. * - * @param leaseCheckFrequency - how often the lease should be checked - * (milliseconds) + * @param leaseCheckFrequency - how often the lease should be checked (milliseconds) */ - public Leases(final int leaseCheckFrequency) { - super("RegionServerLeases"); // thread name + public LeaseManager(final int leaseCheckFrequency) { + super("RegionServer.LeaseManager"); // thread name this.leaseCheckFrequency = leaseCheckFrequency; setDaemon(true); } - /** - * @see Thread#run() - */ @Override public void run() { long toWait = leaseCheckFrequency; @@ -93,9 +89,7 @@ public void run() { toWait = Math.max(MIN_WAIT_TIME, toWait); Thread.sleep(toWait); - } catch (InterruptedException e) { - continue; - } catch (ConcurrentModificationException e) { + } catch (InterruptedException | ConcurrentModificationException e) { continue; } catch (Throwable e) { LOG.error(HBaseMarkers.FATAL, "Unexpected exception killed leases thread", e); @@ -156,7 +150,6 @@ public void close() { * @param leaseTimeoutPeriod length of the lease in milliseconds * @param listener listener that will process lease expirations * @return The lease created. - * @throws LeaseStillHeldException */ public Lease createLease(String leaseName, int leaseTimeoutPeriod, final LeaseListener listener) throws LeaseStillHeldException { @@ -167,8 +160,6 @@ public Lease createLease(String leaseName, int leaseTimeoutPeriod, final LeaseLi /** * Inserts lease. Resets expiration before insertion. - * @param lease - * @throws LeaseStillHeldException */ public void addLease(final Lease lease) throws LeaseStillHeldException { if (this.stopRequested) { @@ -184,8 +175,7 @@ public void addLease(final Lease lease) throws LeaseStillHeldException { /** * Renew a lease * - * @param leaseName name of lease - * @throws LeaseException + * @param leaseName name of the lease */ public void renewLease(final String leaseName) throws LeaseException { if (this.stopRequested) { @@ -202,20 +192,17 @@ public void renewLease(final String leaseName) throws LeaseException { /** * Client explicitly cancels a lease. + * * @param leaseName name of lease - * @throws org.apache.hadoop.hbase.regionserver.LeaseException */ public void cancelLease(final String leaseName) throws LeaseException { removeLease(leaseName); } /** - * Remove named lease. - * Lease is removed from the map of leases. - * Lease can be reinserted using {@link #addLease(Lease)} + * Remove named lease. Lease is removed from the map of leases. * * @param leaseName name of lease - * @throws org.apache.hadoop.hbase.regionserver.LeaseException * @return Removed lease */ Lease removeLease(final String leaseName) throws LeaseException { @@ -234,9 +221,6 @@ Lease removeLease(final String leaseName) throws LeaseException { public static class LeaseStillHeldException extends IOException { private final String leaseName; - /** - * @param name - */ public LeaseStillHeldException(final String name) { this.leaseName = name; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java index a53008de3d98..feb5f36f4036 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java @@ -706,7 +706,7 @@ public void reclaimMemStoreMemory() { try { flushType = isAboveHighWaterMark(); while (flushType != FlushType.NORMAL && !server.isStopped()) { - server.cacheFlusher.setFlushType(flushType); + server.getMemStoreFlusher().setFlushType(flushType); if (!blocked) { startTime = EnvironmentEdgeManager.currentTime(); if (!server.getRegionServerAccounting().isOffheap()) { @@ -764,7 +764,7 @@ public void reclaimMemStoreMemory() { } else { flushType = isAboveLowWaterMark(); if (flushType != FlushType.NORMAL) { - server.cacheFlusher.setFlushType(flushType); + server.getMemStoreFlusher().setFlushType(flushType); wakeupFlushThread(); } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperImpl.java index a71429ba726e..97474bea3e82 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperImpl.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperImpl.java @@ -128,7 +128,7 @@ public MetricsRegionServerWrapperImpl(final HRegionServer regionServer) { initBlockCache(); initMobFileCache(); - this.period = regionServer.conf.getLong(HConstants.REGIONSERVER_METRICS_PERIOD, + this.period = regionServer.getConfiguration().getLong(HConstants.REGIONSERVER_METRICS_PERIOD, HConstants.DEFAULT_REGIONSERVER_METRICS_PERIOD); this.executor = CompatibilitySingletonFactory.getInstance(MetricsExecutor.class).getExecutor(); @@ -264,10 +264,10 @@ public int getLargeCompactionQueueSize() { @Override public int getFlushQueueSize() { //If there is no flusher there should be no queue. - if (this.regionServer.cacheFlusher == null) { + if (this.regionServer.getMemStoreFlusher() == null) { return 0; } - return this.regionServer.cacheFlusher.getFlushQueueSize(); + return this.regionServer.getMemStoreFlusher().getFlushQueueSize(); } @Override @@ -538,10 +538,10 @@ public double getPercentFileLocalSecondaryRegions() { @Override public long getUpdatesBlockedTime() { - if (this.regionServer.cacheFlusher == null) { + if (this.regionServer.getMemStoreFlusher() == null) { return 0; } - return this.regionServer.cacheFlusher.getUpdatesBlockedMsHighWater().sum(); + return this.regionServer.getMemStoreFlusher().getUpdatesBlockedMsHighWater().sum(); } @Override @@ -805,8 +805,8 @@ synchronized public void run() { } lastRan = currentTime; - WALProvider provider = regionServer.walFactory.getWALProvider(); - WALProvider metaProvider = regionServer.walFactory.getMetaWALProvider(); + final WALProvider provider = regionServer.getWalFactory().getWALProvider(); + final WALProvider metaProvider = regionServer.getWalFactory().getMetaWALProvider(); numWALFiles = (provider == null ? 0 : provider.getNumLogFiles()) + (metaProvider == null ? 0 : metaProvider.getNumLogFiles()); walFileSize = (provider == null ? 0 : provider.getLogFileSize()) + diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsTableWrapperAggregateImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsTableWrapperAggregateImpl.java index 56b759822384..ac74cd89d54e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsTableWrapperAggregateImpl.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsTableWrapperAggregateImpl.java @@ -48,7 +48,7 @@ public class MetricsTableWrapperAggregateImpl implements MetricsTableWrapperAggr public MetricsTableWrapperAggregateImpl(final HRegionServer regionServer) { this.regionServer = regionServer; - this.period = regionServer.conf.getLong(HConstants.REGIONSERVER_METRICS_PERIOD, + this.period = regionServer.getConfiguration().getLong(HConstants.REGIONSERVER_METRICS_PERIOD, HConstants.DEFAULT_REGIONSERVER_METRICS_PERIOD) + 1000; this.executor = CompatibilitySingletonFactory.getInstance(MetricsExecutor.class).getExecutor(); this.runnable = new TableMetricsWrapperRunnable(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSDumpServlet.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSDumpServlet.java index 2852ecf8a337..d83ae47768d6 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSDumpServlet.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSDumpServlet.java @@ -134,11 +134,10 @@ public static void dumpQueue(HRegionServer hrs, PrintWriter out) out.println(hrs.compactSplitThread.dumpQueue()); } - if (hrs.cacheFlusher != null) { + if (hrs.getMemStoreFlusher() != null) { // 2. Print out flush Queue - out.println("\nFlush Queue summary: " - + hrs.cacheFlusher.toString()); - out.println(hrs.cacheFlusher.dumpQueue()); + out.println("\nFlush Queue summary: " + hrs.getMemStoreFlusher().toString()); + out.println(hrs.getMemStoreFlusher().dumpQueue()); } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java index 8730c3326edc..3b2e58118389 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java @@ -116,8 +116,8 @@ import org.apache.hadoop.hbase.quotas.SpaceQuotaSnapshot; import org.apache.hadoop.hbase.quotas.SpaceViolationPolicyEnforcement; import org.apache.hadoop.hbase.regionserver.HRegion.RegionScannerImpl; -import org.apache.hadoop.hbase.regionserver.Leases.Lease; -import org.apache.hadoop.hbase.regionserver.Leases.LeaseStillHeldException; +import org.apache.hadoop.hbase.regionserver.LeaseManager.Lease; +import org.apache.hadoop.hbase.regionserver.LeaseManager.LeaseStillHeldException; import org.apache.hadoop.hbase.regionserver.Region.Operation; import org.apache.hadoop.hbase.regionserver.ScannerContext.LimitScope; import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker; @@ -395,7 +395,9 @@ public void run() throws IOException { // We're done. On way out re-add the above removed lease. The lease was temp removed for this // Rpc call and we are at end of the call now. Time to add it back. if (scanners.containsKey(scannerName)) { - if (lease != null) regionServer.leases.addLease(lease); + if (lease != null) { + regionServer.getLeaseManager().addLease(lease); + } } } } @@ -611,7 +613,7 @@ private boolean checkAndRowMutate(final HRegion region, final List doNonAtomicRegionMutation(final HRegion region, r = region.get(get); } } finally { - if (regionServer.metricsRegionServer != null) { - regionServer.metricsRegionServer.updateGet( + final MetricsRegionServer metricsRegionServer = regionServer.getMetrics(); + if (metricsRegionServer != null) { + metricsRegionServer.updateGet( region.getTableDescriptor().getTableName(), EnvironmentEdgeManager.currentTime() - before); } @@ -1027,7 +1026,7 @@ private void doBatchOp(final RegionActionResult.Builder builder, final HRegion r } if (!region.getRegionInfo().isMetaRegion()) { - regionServer.cacheFlusher.reclaimMemStoreMemory(); + regionServer.getMemStoreFlusher().reclaimMemStoreMemory(); } // HBASE-17924 @@ -1085,15 +1084,16 @@ private void doBatchOp(final RegionActionResult.Builder builder, final HRegion r private void updateMutationMetrics(HRegion region, long starttime, boolean batchContainsPuts, boolean batchContainsDelete) { - if (regionServer.metricsRegionServer != null) { + final MetricsRegionServer metricsRegionServer = regionServer.getMetrics(); + if (metricsRegionServer != null) { long after = EnvironmentEdgeManager.currentTime(); if (batchContainsPuts) { - regionServer.metricsRegionServer - .updatePutBatch(region.getTableDescriptor().getTableName(), after - starttime); + metricsRegionServer.updatePutBatch( + region.getTableDescriptor().getTableName(), after - starttime); } if (batchContainsDelete) { - regionServer.metricsRegionServer - .updateDeleteBatch(region.getTableDescriptor().getTableName(), after - starttime); + metricsRegionServer.updateDeleteBatch( + region.getTableDescriptor().getTableName(), after - starttime); } } } @@ -1157,7 +1157,7 @@ private void updateMutationMetrics(HRegion region, long starttime, boolean batch } requestCount.increment(); if (!region.getRegionInfo().isMetaRegion()) { - regionServer.cacheFlusher.reclaimMemStoreMemory(); + regionServer.getMemStoreFlusher().reclaimMemStoreMemory(); } return region.batchReplay(mutations.toArray( new MutationReplay[mutations.size()]), replaySeqId); @@ -1198,16 +1198,18 @@ public void logBatchWarning(String firstRegionName, int sum, int rowSizeWarnThre private final LogDelegate ld; - public RSRpcServices(HRegionServer rs) throws IOException { + public RSRpcServices(final HRegionServer rs) throws IOException { this(rs, DEFAULT_LOG_DELEGATE); } // Directly invoked only for testing - RSRpcServices(HRegionServer rs, LogDelegate ld) throws IOException { + RSRpcServices(final HRegionServer rs, final LogDelegate ld) throws IOException { + final Configuration conf = rs.getConfiguration(); this.ld = ld; regionServer = rs; - rowSizeWarnThreshold = rs.conf.getInt(BATCH_ROWS_THRESHOLD_NAME, BATCH_ROWS_THRESHOLD_DEFAULT); - RpcSchedulerFactory rpcSchedulerFactory; + rowSizeWarnThreshold = conf.getInt(BATCH_ROWS_THRESHOLD_NAME, BATCH_ROWS_THRESHOLD_DEFAULT); + + final RpcSchedulerFactory rpcSchedulerFactory; try { rpcSchedulerFactory = getRpcSchedulerFactoryClass().asSubclass(RpcSchedulerFactory.class) .getDeclaredConstructor().newInstance(); @@ -1216,22 +1218,22 @@ public RSRpcServices(HRegionServer rs) throws IOException { throw new IllegalArgumentException(e); } // Server to handle client requests. - InetSocketAddress initialIsa; - InetSocketAddress bindAddress; + final InetSocketAddress initialIsa; + final InetSocketAddress bindAddress; if(this instanceof MasterRpcServices) { - String hostname = getHostname(rs.conf, true); - int port = rs.conf.getInt(HConstants.MASTER_PORT, HConstants.DEFAULT_MASTER_PORT); + String hostname = getHostname(conf, true); + int port = conf.getInt(HConstants.MASTER_PORT, HConstants.DEFAULT_MASTER_PORT); // Creation of a HSA will force a resolve. initialIsa = new InetSocketAddress(hostname, port); - bindAddress = new InetSocketAddress(rs.conf.get("hbase.master.ipc.address", hostname), port); + bindAddress = new InetSocketAddress(conf.get("hbase.master.ipc.address", hostname), port); } else { - String hostname = getHostname(rs.conf, false); - int port = rs.conf.getInt(HConstants.REGIONSERVER_PORT, + String hostname = getHostname(conf, false); + int port = conf.getInt(HConstants.REGIONSERVER_PORT, HConstants.DEFAULT_REGIONSERVER_PORT); // Creation of a HSA will force a resolve. initialIsa = new InetSocketAddress(hostname, port); - bindAddress = new InetSocketAddress( - rs.conf.get("hbase.regionserver.ipc.address", hostname), port); + bindAddress = + new InetSocketAddress(conf.get("hbase.regionserver.ipc.address", hostname), port); } if (initialIsa.getAddress() == null) { throw new IllegalArgumentException("Failed resolve of " + initialIsa); @@ -1239,26 +1241,26 @@ public RSRpcServices(HRegionServer rs) throws IOException { priority = createPriority(); // Using Address means we don't get the IP too. Shorten it more even to just the host name // w/o the domain. - String name = rs.getProcessName() + "/" + + final String name = rs.getProcessName() + "/" + Address.fromParts(initialIsa.getHostName(), initialIsa.getPort()).toStringWithoutDomain(); // Set how many times to retry talking to another server over Connection. - ConnectionUtils.setServerSideHConnectionRetriesConfig(rs.conf, name, LOG); - rpcServer = createRpcServer(rs, rs.conf, rpcSchedulerFactory, bindAddress, name); + ConnectionUtils.setServerSideHConnectionRetriesConfig(conf, name, LOG); + rpcServer = createRpcServer(rs, rpcSchedulerFactory, bindAddress, name); rpcServer.setRsRpcServices(this); - scannerLeaseTimeoutPeriod = rs.conf.getInt( + scannerLeaseTimeoutPeriod = conf.getInt( HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, HConstants.DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD); - maxScannerResultSize = rs.conf.getLong( + maxScannerResultSize = conf.getLong( HConstants.HBASE_SERVER_SCANNER_MAX_RESULT_SIZE_KEY, HConstants.DEFAULT_HBASE_SERVER_SCANNER_MAX_RESULT_SIZE); - rpcTimeout = rs.conf.getInt( + rpcTimeout = conf.getInt( HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT); - minimumScanTimeLimitDelta = rs.conf.getLong( + minimumScanTimeLimitDelta = conf.getLong( REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA, DEFAULT_REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA); - InetSocketAddress address = rpcServer.getListenerAddress(); + final InetSocketAddress address = rpcServer.getListenerAddress(); if (address == null) { throw new IOException("Listener channel is closed"); } @@ -1271,9 +1273,13 @@ public RSRpcServices(HRegionServer rs) throws IOException { .expireAfterAccess(scannerLeaseTimeoutPeriod, TimeUnit.MILLISECONDS).build(); } - protected RpcServerInterface createRpcServer(Server server, Configuration conf, - RpcSchedulerFactory rpcSchedulerFactory, InetSocketAddress bindAddress, String name) - throws IOException { + protected RpcServerInterface createRpcServer( + final Server server, + final RpcSchedulerFactory rpcSchedulerFactory, + final InetSocketAddress bindAddress, + final String name + ) throws IOException { + final Configuration conf = server.getConfiguration(); boolean reservoirEnabled = conf.getBoolean(ByteBuffAllocator.ALLOCATOR_POOL_ENABLED_KEY, true); try { return RpcServerFactory.createRpcServer(server, name, getServices(), @@ -1287,7 +1293,8 @@ protected RpcServerInterface createRpcServer(Server server, Configuration conf, } protected Class getRpcSchedulerFactoryClass() { - return this.regionServer.conf.getClass(REGION_SERVER_RPC_SCHEDULER_FACTORY_CLASS, + final Configuration conf = regionServer.getConfiguration(); + return conf.getClass(REGION_SERVER_RPC_SCHEDULER_FACTORY_CLASS, SimpleRpcSchedulerFactory.class); } @@ -1406,8 +1413,8 @@ Object addSize(RpcCallContext context, Result r, Object lastBlock) { private RegionScannerHolder addScanner(String scannerName, RegionScanner s, Shipper shipper, HRegion r, boolean needCursor) throws LeaseStillHeldException { - Lease lease = regionServer.leases.createLease(scannerName, this.scannerLeaseTimeoutPeriod, - new ScannerListener(scannerName)); + Lease lease = regionServer.getLeaseManager().createLease( + scannerName, this.scannerLeaseTimeoutPeriod, new ScannerListener(scannerName)); RpcCallback shippedCallback = new RegionScannerShippedCallBack(scannerName, shipper, lease); RpcCallback closeCallback; if (s instanceof RpcCallback) { @@ -1514,7 +1521,7 @@ protected void checkOpen() throws IOException { if (regionServer.isStopped()) { throw new RegionServerStoppedException("Server " + regionServer.serverName + " stopping"); } - if (!regionServer.fsOk) { + if (!regionServer.isDataFileSystemOk()) { throw new RegionServerStoppedException("File system not available"); } if (!regionServer.isOnline()) { @@ -1753,12 +1760,12 @@ public GetOnlineRegionResponse getOnlineRegion(final RpcController controller, try { checkOpen(); requestCount.increment(); - Map onlineRegions = regionServer.onlineRegions; + Map onlineRegions = regionServer.getOnlineRegions(); List list = new ArrayList<>(onlineRegions.size()); for (HRegion region: onlineRegions.values()) { list.add(region.getRegionInfo()); } - Collections.sort(list, RegionInfo.COMPARATOR); + list.sort(RegionInfo.COMPARATOR); return ResponseConverter.buildGetOnlineRegionResponse(list); } catch (IOException ie) { throw new ServiceException(ie); @@ -2008,7 +2015,7 @@ public OpenRegionResponse openRegion(final RpcController controller, throw new ServiceException(ie); } // We are assigning meta, wait a little for regionserver to finish initialization. - int timeout = regionServer.conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, + int timeout = regionServer.getConfiguration().getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT) >> 2; // Quarter of RPC timeout long endTime = System.currentTimeMillis() + timeout; synchronized (regionServer.online) { @@ -2048,7 +2055,7 @@ public OpenRegionResponse openRegion(final RpcController controller, } LOG.info("Open " + region.getRegionNameAsString()); - final Boolean previous = regionServer.regionsInTransitionInRS.putIfAbsent( + final Boolean previous = regionServer.getRegionsInTransitionInRS().putIfAbsent( encodedNameBytes, Boolean.TRUE); if (Boolean.FALSE.equals(previous)) { @@ -2059,7 +2066,7 @@ public OpenRegionResponse openRegion(final RpcController controller, regionServer.abort(error); throw new IOException(error); } - regionServer.regionsInTransitionInRS.put(encodedNameBytes, Boolean.TRUE); + regionServer.getRegionsInTransitionInRS().put(encodedNameBytes, Boolean.TRUE); } if (Boolean.TRUE.equals(previous)) { @@ -2261,9 +2268,9 @@ public ReplicateWALEntryResponse replay(final RpcController controller, } catch (IOException ie) { throw new ServiceException(ie); } finally { - if (regionServer.metricsRegionServer != null) { - regionServer.metricsRegionServer.updateReplay( - EnvironmentEdgeManager.currentTime() - before); + final MetricsRegionServer metricsRegionServer = regionServer.getMetrics(); + if (metricsRegionServer != null) { + metricsRegionServer.updateReplay(EnvironmentEdgeManager.currentTime() - before); } } } @@ -2273,7 +2280,6 @@ public ReplicateWALEntryResponse replay(final RpcController controller, * * @param controller the RPC controller * @param request the request - * @throws ServiceException */ @Override @QosPriority(priority=HConstants.REPLICATION_QOS) @@ -2281,12 +2287,12 @@ public ReplicateWALEntryResponse replicateWALEntry(final RpcController controlle final ReplicateWALEntryRequest request) throws ServiceException { try { checkOpen(); - if (regionServer.replicationSinkHandler != null) { + if (regionServer.getReplicationSinkService() != null) { requestCount.increment(); List entries = request.getEntryList(); CellScanner cellScanner = ((HBaseRpcController)controller).cellScanner(); regionServer.getRegionServerCoprocessorHost().preReplicateLogEntries(); - regionServer.replicationSinkHandler.replicateLogEntries(entries, cellScanner, + regionServer.getReplicationSinkService().replicateLogEntries(entries, cellScanner, request.getReplicationClusterId(), request.getSourceBaseNamespaceDirPath(), request.getSourceHFileArchiveDirPath()); regionServer.getRegionServerCoprocessorHost().postReplicateLogEntries(); @@ -2312,7 +2318,7 @@ public RollWALWriterResponse rollWALWriter(final RpcController controller, checkOpen(); requestCount.increment(); regionServer.getRegionServerCoprocessorHost().preRollWALWriterRequest(); - regionServer.walRoller.requestRollAll(); + regionServer.getWalRoller().requestRollAll(); regionServer.getRegionServerCoprocessorHost().postRollWALWriterRequest(); RollWALWriterResponse.Builder builder = RollWALWriterResponse.newBuilder(); return builder.build(); @@ -2412,7 +2418,7 @@ public BulkLoadHFileResponse bulkLoadHFile(final RpcController controller, } } else { // secure bulk load - map = regionServer.secureBulkLoadManager.secureBulkLoadHFiles(region, request, clusterIds); + map = regionServer.getSecureBulkLoadManager().secureBulkLoadHFiles(region, request, clusterIds); } BulkLoadHFileResponse.Builder builder = BulkLoadHFileResponse.newBuilder(); builder.setLoaded(map != null); @@ -2433,9 +2439,9 @@ public BulkLoadHFileResponse bulkLoadHFile(final RpcController controller, } catch (IOException ie) { throw new ServiceException(ie); } finally { - if (regionServer.metricsRegionServer != null) { - regionServer.metricsRegionServer.updateBulkLoad( - EnvironmentEdgeManager.currentTime() - start); + final MetricsRegionServer metricsRegionServer = regionServer.getMetrics(); + if (metricsRegionServer != null) { + metricsRegionServer.updateBulkLoad(EnvironmentEdgeManager.currentTime() - start); } } } @@ -2449,7 +2455,7 @@ public PrepareBulkLoadResponse prepareBulkLoad(RpcController controller, HRegion region = getRegion(request.getRegion()); - String bulkToken = regionServer.secureBulkLoadManager.prepareBulkLoad(region, request); + String bulkToken = regionServer.getSecureBulkLoadManager().prepareBulkLoad(region, request); PrepareBulkLoadResponse.Builder builder = PrepareBulkLoadResponse.newBuilder(); builder.setBulkToken(bulkToken); return builder.build(); @@ -2467,9 +2473,8 @@ public CleanupBulkLoadResponse cleanupBulkLoad(RpcController controller, HRegion region = getRegion(request.getRegion()); - regionServer.secureBulkLoadManager.cleanupBulkLoad(region, request); - CleanupBulkLoadResponse response = CleanupBulkLoadResponse.newBuilder().build(); - return response; + regionServer.getSecureBulkLoadManager().cleanupBulkLoad(region, request); + return CleanupBulkLoadResponse.newBuilder().build(); } catch (IOException ie) { throw new ServiceException(ie); } @@ -2582,11 +2587,12 @@ public GetResponse get(final RpcController controller, } catch (IOException ie) { throw new ServiceException(ie); } finally { - MetricsRegionServer mrs = regionServer.metricsRegionServer; - if (mrs != null) { + final MetricsRegionServer metricsRegionServer = regionServer.getMetrics(); + if (metricsRegionServer != null) { TableDescriptor td = region != null? region.getTableDescriptor(): null; if (td != null) { - mrs.updateGet(td.getTableName(), EnvironmentEdgeManager.currentTime() - before); + metricsRegionServer.updateGet( + td.getTableName(), EnvironmentEdgeManager.currentTime() - before); } } if (quota != null) { @@ -2848,7 +2854,7 @@ public MutateResponse mutate(final RpcController rpcc, MutateResponse.Builder builder = MutateResponse.newBuilder(); MutationProto mutation = request.getMutation(); if (!region.getRegionInfo().isMetaRegion()) { - regionServer.cacheFlusher.reclaimMemStoreMemory(); + regionServer.getMemStoreFlusher().reclaimMemStoreMemory(); } long nonceGroup = request.hasNonceGroup() ? request.getNonceGroup() : HConstants.NO_NONCE; Result r = null; @@ -2955,22 +2961,23 @@ public MutateResponse mutate(final RpcController rpcc, quota.close(); } // Update metrics - if (regionServer.metricsRegionServer != null && type != null) { + final MetricsRegionServer metricsRegionServer = regionServer.getMetrics(); + if (metricsRegionServer != null && type != null) { long after = EnvironmentEdgeManager.currentTime(); switch (type) { case DELETE: if (request.hasCondition()) { - regionServer.metricsRegionServer.updateCheckAndDelete(after - before); + metricsRegionServer.updateCheckAndDelete(after - before); } else { - regionServer.metricsRegionServer.updateDelete( + metricsRegionServer.updateDelete( region == null ? null : region.getRegionInfo().getTable(), after - before); } break; case PUT: if (request.hasCondition()) { - regionServer.metricsRegionServer.updateCheckAndPut(after - before); + metricsRegionServer.updateCheckAndPut(after - before); } else { - regionServer.metricsRegionServer.updatePut( + metricsRegionServer.updatePut( region == null ? null : region.getRegionInfo().getTable(),after - before); } break; @@ -3027,7 +3034,7 @@ private RegionScannerHolder getRegionScanner(ScanRequest request) throws IOExcep LOG.warn("Getting exception closing " + scannerName, e); } finally { try { - regionServer.leases.cancelLease(scannerName); + regionServer.getLeaseManager().cancelLease(scannerName); } catch (LeaseException e) { LOG.warn("Getting exception closing " + scannerName, e); } @@ -3088,9 +3095,9 @@ private void checkScanNextCallSeq(ScanRequest request, RegionScannerHolder rsh) } } - private void addScannerLeaseBack(Leases.Lease lease) { + private void addScannerLeaseBack(LeaseManager.Lease lease) { try { - regionServer.leases.addLease(lease); + regionServer.getLeaseManager().addLease(lease); } catch (LeaseStillHeldException e) { // should not happen as the scanner id is unique. throw new AssertionError(e); @@ -3306,10 +3313,11 @@ private void scan(HBaseRpcController controller, ScanRequest request, RegionScan long end = EnvironmentEdgeManager.currentTime(); long responseCellSize = context != null ? context.getResponseCellSize() : 0; region.getMetrics().updateScanTime(end - before); - if (regionServer.metricsRegionServer != null) { - regionServer.metricsRegionServer.updateScanSize( + final MetricsRegionServer metricsRegionServer = regionServer.getMetrics(); + if (metricsRegionServer != null) { + metricsRegionServer.updateScanSize( region.getTableDescriptor().getTableName(), responseCellSize); - regionServer.metricsRegionServer.updateScanTime( + metricsRegionServer.updateScanTime( region.getTableDescriptor().getTableName(), end - before); } } finally { @@ -3348,9 +3356,10 @@ public ScanResponse scan(final RpcController controller, final ScanRequest reque LOG.debug( "Server shutting down and client tried to access missing scanner " + scannerName); } - if (regionServer.leases != null) { + final LeaseManager leaseManager = regionServer.getLeaseManager(); + if (leaseManager != null) { try { - regionServer.leases.cancelLease(scannerName); + leaseManager.cancelLease(scannerName); } catch (LeaseException le) { // No problem, ignore if (LOG.isTraceEnabled()) { @@ -3384,11 +3393,11 @@ public ScanResponse scan(final RpcController controller, final ScanRequest reque } HRegion region = rsh.r; String scannerName = rsh.scannerName; - Leases.Lease lease; + LeaseManager.Lease lease; try { // Remove lease while its being processed in server; protects against case // where processing of request takes > lease expiration time. - lease = regionServer.leases.removeLease(scannerName); + lease = regionServer.getLeaseManager().removeLease(scannerName); } catch (LeaseException e) { throw new ServiceException(e); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java index 9d6fefe6c908..bf531f47a381 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java @@ -195,7 +195,7 @@ public long[] getProcIds() { /** * @return The RegionServer's "Leases" service */ - Leases getLeases(); + LeaseManager getLeaseManager(); /** * @return hbase executor service diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitRequest.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitRequest.java index 9e806bb6117d..d699caed7cd7 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitRequest.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitRequest.java @@ -59,14 +59,11 @@ public String toString() { } private void doSplitting() { - server.metricsRegionServer.incrSplitRequest(); + server.getMetrics().incrSplitRequest(); if (user != null && user.getUGI() != null) { - user.getUGI().doAs (new PrivilegedAction() { - @Override - public Void run() { - requestRegionSplit(); - return null; - } + user.getUGI().doAs((PrivilegedAction) () -> { + requestRegionSplit(); + return null; }); } else { requestRegionSplit(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitWALCallable.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitWALCallable.java index b94df22c82da..4a65763e1c50 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitWALCallable.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitWALCallable.java @@ -101,7 +101,7 @@ public String getWalPath() { private void splitWal() throws IOException { SplitLogWorker.TaskExecutor.Status status = - SplitLogWorker.splitLog(walPath, null, rs.getConfiguration(), rs, rs, rs.walFactory); + SplitLogWorker.splitLog(walPath, null, rs.getConfiguration(), rs, rs, rs.getWalFactory()); if (status != SplitLogWorker.TaskExecutor.Status.DONE) { throw new IOException("Split WAL " + walPath + " failed at server "); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java index 708e8bc97e39..82db21268dde 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java @@ -46,7 +46,7 @@ import org.apache.hadoop.hbase.regionserver.FlushRequester; import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.HeapMemoryManager; -import org.apache.hadoop.hbase.regionserver.Leases; +import org.apache.hadoop.hbase.regionserver.LeaseManager; import org.apache.hadoop.hbase.regionserver.MetricsRegionServer; import org.apache.hadoop.hbase.regionserver.Region; import org.apache.hadoop.hbase.regionserver.RegionServerAccounting; @@ -228,7 +228,7 @@ public void setFileSystem(FileSystem hfs) { } @Override - public Leases getLeases() { + public LeaseManager getLeaseManager() { return null; } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java index 7f8548801c6d..604797a91df2 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java @@ -59,7 +59,7 @@ import org.apache.hadoop.hbase.regionserver.FlushRequester; import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.HeapMemoryManager; -import org.apache.hadoop.hbase.regionserver.Leases; +import org.apache.hadoop.hbase.regionserver.LeaseManager; import org.apache.hadoop.hbase.regionserver.MetricsRegionServer; import org.apache.hadoop.hbase.regionserver.Region; import org.apache.hadoop.hbase.regionserver.RegionServerAccounting; @@ -527,7 +527,7 @@ public List getRegions(TableName tableName) throws IOException { } @Override - public Leases getLeases() { + public LeaseManager getLeaseManager() { return null; } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterStatusServlet.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterStatusServlet.java index cf80d8e355a5..e0260f9f95a9 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterStatusServlet.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterStatusServlet.java @@ -111,7 +111,7 @@ public void setupBasicMocks() { MetricsRegionServer rms = Mockito.mock(MetricsRegionServer.class); Mockito.doReturn(new MetricsRegionServerWrapperStub()).when(rms).getRegionServerWrapper(); - Mockito.doReturn(rms).when(master).getRegionServerMetrics(); + Mockito.doReturn(rms).when(master).getMetrics(); // Mock admin admin = Mockito.mock(Admin.class); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCleanupCompactedFileAfterFailover.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCleanupCompactedFileAfterFailover.java index 5b633c5b12cf..dcc738018782 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCleanupCompactedFileAfterFailover.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCleanupCompactedFileAfterFailover.java @@ -156,7 +156,7 @@ private void testCleanupAfterFailover(int compactNum) throws Exception { int walNum = rsServedTable.getWALs().size(); // Roll WAL - rsServedTable.walRoller.requestRollAll(); + rsServedTable.getWalRoller().requestRollAll(); // Flush again region.flush(true); // The WAL which contains compaction event marker should be archived diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestPerColumnFamilyFlush.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestPerColumnFamilyFlush.java index 9bbce091440e..c5fcf8a1a843 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestPerColumnFamilyFlush.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestPerColumnFamilyFlush.java @@ -501,7 +501,7 @@ public void testFlushingWhenLogRolling() throws Exception { assertTrue(desiredRegion.getStore(FAMILY3).getMemStoreSize().getHeapSize() < cfFlushSizeLowerBound); table.put(createPut(1, 12345678)); // Make numRolledLogFiles greater than maxLogs - desiredRegionAndServer.getSecond().walRoller.requestRollAll(); + desiredRegionAndServer.getSecond().getWalRoller().requestRollAll(); // Wait for some time till the flush caused by log rolling happens. TEST_UTIL.waitFor(30000, new Waiter.ExplainingPredicate() { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestPriorityRpc.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestPriorityRpc.java index 2f4dce88fb61..d75d055e44fb 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestPriorityRpc.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestPriorityRpc.java @@ -18,7 +18,6 @@ package org.apache.hadoop.hbase.regionserver; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; import java.io.IOException; import org.apache.hadoop.conf.Configuration; @@ -60,12 +59,13 @@ public class TestPriorityRpc { public static final HBaseClassTestRule CLASS_RULE = HBaseClassTestRule.forClass(TestPriorityRpc.class); + private Configuration conf; private HRegionServer regionServer = null; private PriorityFunction priority = null; @Before public void setup() { - Configuration conf = HBaseConfiguration.create(); + conf = HBaseConfiguration.create(); conf.setBoolean("hbase.testing.nocluster", true); // No need to do ZK final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(conf); TEST_UTIL.getDataTestDir(this.getClass().getName()); @@ -106,8 +106,8 @@ public void testQosFunctionForMeta() throws IOException { .thenReturn(RegionInfoBuilder.FIRST_META_REGIONINFO.getTable()); // Presume type. ((AnnotationReadingPriorityFunction)priority).setRegionServer(mockRS); - assertEquals(HConstants.SYSTEMTABLE_QOS, priority.getPriority(header, getRequest, - User.createUserForTesting(regionServer.conf, "someuser", new String[]{"somegroup"}))); + assertEquals( + HConstants.SYSTEMTABLE_QOS, priority.getPriority(header, getRequest, createSomeUser())); } @Test @@ -120,8 +120,7 @@ public void testQosFunctionWithoutKnownArgument() throws IOException { headerBuilder.setMethodName("foo"); RequestHeader header = headerBuilder.build(); PriorityFunction qosFunc = regionServer.rpcServices.getPriority(); - assertEquals(HConstants.NORMAL_QOS, qosFunc.getPriority(header, null, - User.createUserForTesting(regionServer.conf, "someuser", new String[]{"somegroup"}))); + assertEquals(HConstants.NORMAL_QOS, qosFunc.getPriority(header, null, createSomeUser())); } @Test @@ -141,12 +140,12 @@ public void testQosFunctionForScanMethod() throws IOException { Mockito.when(mockRpc.getRegion(Mockito.any())).thenReturn(mockRegion); Mockito.when(mockRegion.getRegionInfo()).thenReturn(mockRegionInfo); // make isSystemTable return false - Mockito.when(mockRegionInfo.getTable()).thenReturn(TableName.valueOf("testQosFunctionForScanMethod")); + Mockito.when(mockRegionInfo.getTable()) + .thenReturn(TableName.valueOf("testQosFunctionForScanMethod")); // Presume type. ((AnnotationReadingPriorityFunction)priority).setRegionServer(mockRS); - int qos = priority.getPriority(header, scanRequest, - User.createUserForTesting(regionServer.conf, "someuser", new String[]{"somegroup"})); - assertTrue ("" + qos, qos == HConstants.NORMAL_QOS); + final int qos = priority.getPriority(header, scanRequest, createSomeUser()); + assertEquals(Integer.toString(qos), qos, HConstants.NORMAL_QOS); //build a scan request with scannerID scanBuilder = ScanRequest.newBuilder(); @@ -158,18 +157,26 @@ public void testQosFunctionForScanMethod() throws IOException { Mockito.when(mockRegionScanner.getRegionInfo()).thenReturn(mockRegionInfo); Mockito.when(mockRpc.getRegion((RegionSpecifier)Mockito.any())).thenReturn(mockRegion); Mockito.when(mockRegion.getRegionInfo()).thenReturn(mockRegionInfo); - Mockito.when(mockRegionInfo.getTable()).thenReturn(RegionInfoBuilder.FIRST_META_REGIONINFO.getTable()); + Mockito.when(mockRegionInfo.getTable()) + .thenReturn(RegionInfoBuilder.FIRST_META_REGIONINFO.getTable()); // Presume type. ((AnnotationReadingPriorityFunction)priority).setRegionServer(mockRS); - assertEquals(HConstants.SYSTEMTABLE_QOS, priority.getPriority(header, scanRequest, - User.createUserForTesting(regionServer.conf, "someuser", new String[]{"somegroup"}))); + assertEquals( + HConstants.SYSTEMTABLE_QOS, + priority.getPriority(header, scanRequest, createSomeUser())); //the same as above but with non-meta region // make isSystemTable return false - Mockito.when(mockRegionInfo.getTable()).thenReturn(TableName.valueOf("testQosFunctionForScanMethod")); - assertEquals(HConstants.NORMAL_QOS, priority.getPriority(header, scanRequest, - User.createUserForTesting(regionServer.conf, "someuser", new String[]{"somegroup"}))); + Mockito.when(mockRegionInfo.getTable()) + .thenReturn(TableName.valueOf("testQosFunctionForScanMethod")); + assertEquals( + HConstants.NORMAL_QOS, + priority.getPriority(header, scanRequest, createSomeUser())); + } + + private User createSomeUser() { + return User.createUserForTesting(conf, "someuser", new String[] { "somegroup" }); } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRSStatusServlet.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRSStatusServlet.java index c3cf9d31be79..db281eccc6fa 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRSStatusServlet.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRSStatusServlet.java @@ -110,7 +110,7 @@ public void setupBasicMocks() throws IOException, ServiceException { MetricsRegionServer rms = Mockito.mock(MetricsRegionServer.class); Mockito.doReturn(new MetricsRegionServerWrapperStub()).when(rms).getRegionServerWrapper(); - Mockito.doReturn(rms).when(rs).getRegionServerMetrics(); + Mockito.doReturn(rms).when(rs).getMetrics(); MetricsHBaseServer ms = Mockito.mock(MetricsHBaseServer.class); Mockito.doReturn(new MetricsHBaseServerWrapperStub()).when(ms).getHBaseServerWrapper(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerAbortTimeout.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerAbortTimeout.java index 7a92664db648..544f05f96c49 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerAbortTimeout.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerAbortTimeout.java @@ -91,8 +91,11 @@ public static void setUp() throws Exception { @AfterClass public static void tearDown() throws Exception { // Wait the SCP of abort rs to finish - UTIL.waitFor(30000, () -> UTIL.getMiniHBaseCluster().getMaster().getProcedures().stream() - .filter(p -> p instanceof ServerCrashProcedure && p.isFinished()).count() > 0); + UTIL.waitFor(30000, () -> UTIL.getMiniHBaseCluster() + .getMaster() + .getProcedures() + .stream() + .anyMatch(p -> p instanceof ServerCrashProcedure && p.isFinished())); UTIL.getAdmin().disableTable(TABLE_NAME); UTIL.getAdmin().deleteTable(TABLE_NAME); UTIL.shutdownMiniCluster(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerMetrics.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerMetrics.java index 79ee15dba6d9..d4d41fab0ebc 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerMetrics.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerMetrics.java @@ -123,11 +123,11 @@ public static void startCluster() throws Exception { while (cluster.getLiveRegionServerThreads().isEmpty() && cluster.getRegionServer(0) == null && - rs.getRegionServerMetrics() == null) { + rs.getMetrics() == null) { Threads.sleep(100); } rs = cluster.getRegionServer(0); - metricsRegionServer = rs.getRegionServerMetrics(); + metricsRegionServer = rs.getMetrics(); serverSource = metricsRegionServer.getMetricsSource(); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerNoMaster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerNoMaster.java index b9f89b72dae6..cfcf849df1a8 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerNoMaster.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerNoMaster.java @@ -121,7 +121,7 @@ public static void stopMasterAndAssignMeta(HBaseTestingUtility HTU) while (true) { sn = MetaTableLocator.getMetaRegionLocation(zkw); if (sn != null && sn.equals(hrs.getServerName()) - && hrs.onlineRegions.containsKey( + && hrs.getOnlineRegions().containsKey( HRegionInfo.FIRST_META_REGIONINFO.getEncodedName())) { break; } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSecureBulkLoadManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSecureBulkLoadManager.java index 34da4d892da8..f760770f6264 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSecureBulkLoadManager.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSecureBulkLoadManager.java @@ -122,13 +122,13 @@ public void accept(HRegion hRegion) { } } ; testUtil.getMiniHBaseCluster().getRegionServerThreads().get(0).getRegionServer() - .secureBulkLoadManager.setFsCreatedListener(fsCreatedListener); + .getSecureBulkLoadManager().setFsCreatedListener(fsCreatedListener); /// create table testUtil.createTable(TABLE,FAMILY,Bytes.toByteArrays(SPLIT_ROWKEY)); /// prepare files Path rootdir = testUtil.getMiniHBaseCluster().getRegionServerThreads().get(0) - .getRegionServer().getRootDir(); + .getRegionServer().getDataRootDir(); Path dir1 = new Path(rootdir, "dir1"); prepareHFile(dir1, key1, value1); Path dir2 = new Path(rootdir, "dir2"); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitWalDataLoss.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitWalDataLoss.java index 3e5ab9b99d68..239d203c7350 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitWalDataLoss.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitWalDataLoss.java @@ -125,13 +125,13 @@ Matchers. any(), Matchers. any(), Matchers.> any()); // Find region key; don't pick up key for hbase:meta by mistake. String key = null; - for (Map.Entry entry: rs.onlineRegions.entrySet()) { + for (Map.Entry entry: rs.getOnlineRegions().entrySet()) { if (entry.getValue().getRegionInfo().getTable().equals(this.tableName)) { key = entry.getKey(); break; } } - rs.onlineRegions.put(key, spiedRegion); + rs.getOnlineRegions().put(key, spiedRegion); Connection conn = testUtil.getConnection(); try (Table table = conn.getTable(tableName)) { @@ -141,7 +141,7 @@ Matchers. any(), Matchers. any(), long oldestSeqIdOfStore = region.getOldestSeqIdOfStore(family); LOG.info("CHANGE OLDEST " + oldestSeqIdOfStore); assertTrue(oldestSeqIdOfStore > HConstants.NO_SEQNUM); - rs.cacheFlusher.requestFlush(spiedRegion, false, FlushLifeCycleTracker.DUMMY); + rs.getMemStoreFlusher().requestFlush(spiedRegion, false, FlushLifeCycleTracker.DUMMY); synchronized (flushed) { while (!flushed.booleanValue()) { flushed.wait(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SerialReplicationTestBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SerialReplicationTestBase.java index e1a7ecdf5408..04cf392a7106 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SerialReplicationTestBase.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SerialReplicationTestBase.java @@ -170,8 +170,11 @@ protected static void rollAllWALs() throws Exception { @Override public boolean evaluate() throws Exception { - return UTIL.getMiniHBaseCluster().getLiveRegionServerThreads().stream() - .map(t -> t.getRegionServer()).allMatch(HRegionServer::walRollRequestFinished); + return UTIL.getMiniHBaseCluster() + .getLiveRegionServerThreads() + .stream() + .map(RegionServerThread::getRegionServer) + .allMatch(HRegionServer::walRollRequestFinished); } @Override