diff --git a/hbase-server/src/main/jamon/org/apache/hadoop/hbase/tmpl/regionserver/RSStatusTmpl.jamon b/hbase-server/src/main/jamon/org/apache/hadoop/hbase/tmpl/regionserver/RSStatusTmpl.jamon index 2b07523844d7..f07b8e766126 100644 --- a/hbase-server/src/main/jamon/org/apache/hadoop/hbase/tmpl/regionserver/RSStatusTmpl.jamon +++ b/hbase-server/src/main/jamon/org/apache/hadoop/hbase/tmpl/regionserver/RSStatusTmpl.jamon @@ -133,7 +133,7 @@ org.apache.hadoop.hbase.zookeeper.MasterAddressTracker;

Server Metrics

- <& ServerMetricsTmpl; mWrap = regionServer.getRegionServerMetrics().getRegionServerWrapper(); + <& ServerMetricsTmpl; mWrap = regionServer.getMetrics().getRegionServerWrapper(); mServerWrap = regionServer.getRpcServer().getMetrics().getHBaseServerWrapper(); bbAllocator = regionServer.getRpcServer().getByteBuffAllocator(); &>
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/Server.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/Server.java index e533b7531253..8963b19ee6eb 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/Server.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/Server.java @@ -78,8 +78,7 @@ public interface Server extends Abortable, Stoppable { /** * @return Return the FileSystem object used (can return null!). */ - // TODO: On Master, return Master's. On RegionServer, return RegionServers. The FileSystems - // may differ. TODO. + // TODO: Distinguish between "dataFs" and "walFs". default FileSystem getFileSystem() { // This default is pretty dodgy! Configuration c = getConfiguration(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java index 2a31a384a9f8..52ba32606859 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java @@ -515,8 +515,8 @@ public HMaster(final Configuration conf) this.rsFatals = new MemoryBoundedLogMessageBuffer( conf.getLong("hbase.master.buffer.for.rs.fatals", 1 * 1024 * 1024)); - LOG.info("hbase.rootdir=" + getRootDir() + - ", hbase.cluster.distributed=" + this.conf.getBoolean(HConstants.CLUSTER_DISTRIBUTED, false)); + LOG.info("hbase.rootdir={}, hbase.cluster.distributed={}", getDataRootDir(), + this.conf.getBoolean(HConstants.CLUSTER_DISTRIBUTED, false)); // Disable usage of meta replicas in the master this.conf.setBoolean(HConstants.USE_META_REPLICAS, false); @@ -3441,7 +3441,7 @@ public void reportMobCompactionEnd(TableName tableName) throws IOException { */ public void requestMobCompaction(TableName tableName, List columns, boolean allFiles) throws IOException { - mobCompactThread.requestMobCompaction(conf, fs, tableName, columns, allFiles); + mobCompactThread.requestMobCompaction(conf, getFileSystem(), tableName, columns, allFiles); } /** diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java index 4943d4de6563..4b3ba0e905a8 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java @@ -392,9 +392,10 @@ protected Class getRpcSchedulerFactoryClass() { } @Override - protected RpcServerInterface createRpcServer(Server server, Configuration conf, - RpcSchedulerFactory rpcSchedulerFactory, InetSocketAddress bindAddress, String name) - throws IOException { + protected RpcServerInterface createRpcServer(final Server server, + final RpcSchedulerFactory rpcSchedulerFactory, final InetSocketAddress bindAddress, + final String name) throws IOException { + final Configuration conf = regionServer.getConfiguration(); // RpcServer at HM by default enable ByteBufferPool iff HM having user table region in it boolean reservoirEnabled = conf.getBoolean(ByteBuffAllocator.ALLOCATOR_POOL_ENABLED_KEY, LoadBalancer.isMasterCanHostUserRegions(conf)); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index f22137dad9c0..4567ed667af8 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -36,7 +36,6 @@ import java.util.Collections; import java.util.Comparator; import java.util.HashSet; -import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Map.Entry; @@ -188,7 +187,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import sun.misc.Signal; -import sun.misc.SignalHandler; import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; @@ -241,9 +239,6 @@ @SuppressWarnings({ "deprecation"}) public class HRegionServer extends HasThread implements RegionServerServices, LastSequenceId, ConfigurationObserver { - // Time to pause if master says 'please hold'. Make configurable if needed. - private static final int INIT_PAUSE_TIME_MS = 1000; - private static final Logger LOG = LoggerFactory.getLogger(HRegionServer.class); /** @@ -253,10 +248,12 @@ public class HRegionServer extends HasThread implements @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="MS_SHOULD_BE_FINAL") public static boolean TEST_SKIP_REPORTING_TRANSITION = false; - //RegionName vs current action in progress - //true - if open region action in progress - //false - if close region action in progress - protected final ConcurrentMap regionsInTransitionInRS = + /** + * A map from RegionName to current action in progress. Boolean value indicates: + * true - if open region action in progress + * false - if close region action in progress + */ + private final ConcurrentMap regionsInTransitionInRS = new ConcurrentSkipListMap<>(Bytes.BYTES_COMPARATOR); /** @@ -271,10 +268,9 @@ public class HRegionServer extends HasThread implements private final Cache executedRegionProcedures = CacheBuilder.newBuilder().expireAfterAccess(600, TimeUnit.SECONDS).build(); - // Cache flushing - protected MemStoreFlusher cacheFlusher; + private MemStoreFlusher cacheFlusher; - protected HeapMemoryManager hMemManager; + private HeapMemoryManager hMemManager; /** * Cluster connection to be shared by services. @@ -292,8 +288,8 @@ public class HRegionServer extends HasThread implements protected TableDescriptors tableDescriptors; // Replication services. If no replication, this handler will be null. - protected ReplicationSourceService replicationSourceHandler; - protected ReplicationSinkService replicationSinkHandler; + private ReplicationSourceService replicationSourceHandler; + private ReplicationSinkService replicationSinkHandler; // Compactions public CompactSplit compactSplitThread; @@ -302,7 +298,12 @@ public class HRegionServer extends HasThread implements * Map of regions currently being served by this region server. Key is the * encoded region name. All access should be synchronized. */ - protected final Map onlineRegions = new ConcurrentHashMap<>(); + private final Map onlineRegions = new ConcurrentHashMap<>(); + /** + * Lock for gating access to {@link #onlineRegions}. + * TODO: If this map is gated by a lock, does it need to be a ConcurrentHashMap? + */ + private final ReentrantReadWriteLock onlineRegionsLock = new ReentrantReadWriteLock(); /** * Map of encoded region names to the DataNode locations they should be hosted on @@ -313,19 +314,16 @@ public class HRegionServer extends HasThread implements * it seems a bit weird to store ServerName since ServerName refers to RegionServers * and here we really mean DataNode locations. */ - protected final Map regionFavoredNodesMap = - new ConcurrentHashMap<>(); + private final Map regionFavoredNodesMap = new ConcurrentHashMap<>(); - // Leases - protected Leases leases; + private LeaseManager leaseManager; // Instance of the hbase executor executorService. protected ExecutorService executorService; - // If false, the file system has become unavailable - protected volatile boolean fsOk; - protected HFileSystem fs; - protected HFileSystem walFs; + private volatile boolean dataFsOk; + private HFileSystem dataFs; + private HFileSystem walFs; // Set when a report to the master comes back with a message asking us to // shutdown. Also set by call to stop when debugging or running unit tests @@ -335,45 +333,36 @@ public class HRegionServer extends HasThread implements // Go down hard. Used if file system becomes unavailable and also in // debugging and unit tests. private volatile boolean abortRequested; - public static final String ABORT_TIMEOUT = "hbase.regionserver.abort.timeout"; + static final String ABORT_TIMEOUT = "hbase.regionserver.abort.timeout"; // Default abort timeout is 1200 seconds for safe private static final long DEFAULT_ABORT_TIMEOUT = 1200000; // Will run this task when abort timeout - public static final String ABORT_TIMEOUT_TASK = "hbase.regionserver.abort.timeout.task"; - - ConcurrentMap rowlocks = new ConcurrentHashMap<>(); + static final String ABORT_TIMEOUT_TASK = "hbase.regionserver.abort.timeout.task"; // A state before we go into stopped state. At this stage we're closing user // space regions. private boolean stopping = false; - - volatile boolean killed = false; - + private volatile boolean killed = false; private volatile boolean shutDown = false; protected final Configuration conf; - private Path rootDir; + private Path dataRootDir; private Path walRootDir; - protected final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); - - final int numRetries; - protected final int threadWakeFrequency; - protected final int msgInterval; + private final int threadWakeFrequency; + final int msgInterval; private static final String PERIOD_COMPACTION = "hbase.regionserver.compaction.check.period"; private final int compactionCheckFrequency; private static final String PERIOD_FLUSH = "hbase.regionserver.flush.check.period"; private final int flushCheckFrequency; - protected final int numRegionsToReport; - // Stub to do region server status calls against the master. private volatile RegionServerStatusService.BlockingInterface rssStub; private volatile LockService.BlockingInterface lockStub; // RPC client. Used to make the stub above that does region server status checking. - RpcClient rpcClient; + private RpcClient rpcClient; private RpcRetryingCallerFactory rpcRetryingCallerFactory; private RpcControllerFactory rpcControllerFactory; @@ -389,8 +378,7 @@ public class HRegionServer extends HasThread implements /** region server process name */ public static final String REGIONSERVER = "regionserver"; - MetricsRegionServer metricsRegionServer; - MetricsTable metricsTable; + private MetricsRegionServer metricsRegionServer; private SpanReceiverHost spanReceiverHost; /** @@ -398,21 +386,19 @@ public class HRegionServer extends HasThread implements */ private ChoreService choreService; - /* + /** * Check for compactions requests. */ - ScheduledChore compactionChecker; + private ScheduledChore compactionChecker; - /* + /** * Check for flushes */ - ScheduledChore periodicFlusher; + private ScheduledChore periodicFlusher; - protected volatile WALFactory walFactory; + private volatile WALFactory walFactory; - // WAL roller. log is protected rather than private to avoid - // eclipse warning when accessed by inner classes - protected LogRoller walRoller; + private LogRoller walRoller; // A thread which calls reportProcedureDone private RemoteProcedureResultReporter procedureResultReporter; @@ -456,11 +442,11 @@ public class HRegionServer extends HasThread implements /** * The server name the Master sees us as. Its made from the hostname the * master passes us, port, and server startcode. Gets set after registration - * against Master. + * against Master. */ protected ServerName serverName; - /* + /** * hostname specified by hostname config */ protected String useThisHostnameInstead; @@ -473,8 +459,11 @@ public class HRegionServer extends HasThread implements @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG) protected final static String MASTER_HOSTNAME_KEY = "hbase.master.hostname"; - // HBASE-18226: This config and hbase.regionserver.hostname are mutually exclusive. - // Exception will be thrown if both are used. + /** + * HBASE-18226: This config and hbase.regionserver.hostname are mutually exclusive. + * Exception will be thrown if both are used. + */ + @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG) final static String RS_HOSTNAME_DISABLE_MASTER_REVERSEDNS_KEY = "hbase.regionserver.hostname.disable.master.reversedns"; @@ -527,7 +516,7 @@ public class HRegionServer extends HasThread implements protected final RSRpcServices rpcServices; - protected CoordinatedStateManager csm; + private CoordinatedStateManager csm; /** * Configuration manager is used to register/deregister and notify the configuration observers @@ -540,9 +529,9 @@ public class HRegionServer extends HasThread implements private volatile ThroughputController flushThroughputController; - protected SecureBulkLoadManager secureBulkLoadManager; + private SecureBulkLoadManager secureBulkLoadManager; - protected FileSystemUtilizationChore fsUtilizationChore; + private FileSystemUtilizationChore fsUtilizationChore; private final NettyEventLoopGroupConfig eventLoopGroupConfig; @@ -551,28 +540,30 @@ public class HRegionServer extends HasThread implements * means it needs to just come up and make do without a Master to talk to: e.g. in test or * HRegionServer is doing other than its usual duties: e.g. as an hollowed-out host whose only * purpose is as a Replication-stream sink; see HBASE-18846 for more. + * TODO: can this replace {@link #TEST_SKIP_REPORTING_TRANSITION} ? */ private final boolean masterless; - static final String MASTERLESS_CONFIG_NAME = "hbase.masterless"; + private static final String MASTERLESS_CONFIG_NAME = "hbase.masterless"; /**regionserver codec list **/ - public static final String REGIONSERVER_CODEC = "hbase.regionserver.codecs"; + private static final String REGIONSERVER_CODEC = "hbase.regionserver.codecs"; // A timer to shutdown the process if abort takes too long private Timer abortMonitor; /** - * Starts a HRegionServer at the default location + * Starts a HRegionServer at the default location. + *

+ * Don't start any services or managers in here in the Constructor. + * Defer till after we register with the Master as much as possible. See {@link #startServices}. */ - // Don't start any services or managers in here in the Constructor. - // Defer till after we register with the Master as much as possible. See #startServices. - public HRegionServer(Configuration conf) throws IOException { + public HRegionServer(final Configuration conf) throws IOException { super("RegionServer"); // thread name TraceUtil.initTracer(conf); try { this.startcode = System.currentTimeMillis(); this.conf = conf; - this.fsOk = true; + this.dataFsOk = true; this.masterless = conf.getBoolean(MASTERLESS_CONFIG_NAME, false); this.eventLoopGroupConfig = setupNetty(this.conf); MemorySizeUtil.checkForClusterFreeHeapMemoryLimit(this.conf); @@ -584,8 +575,6 @@ public HRegionServer(Configuration conf) throws IOException { // Disable usage of meta replicas in the regionserver this.conf.setBoolean(HConstants.USE_META_REPLICAS, false); // Config'ed params - this.numRetries = this.conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, - HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER); this.threadWakeFrequency = conf.getInt(HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000); this.compactionCheckFrequency = conf.getInt(PERIOD_COMPACTION, this.threadWakeFrequency); this.flushCheckFrequency = conf.getInt(PERIOD_FLUSH, this.threadWakeFrequency); @@ -596,8 +585,6 @@ public HRegionServer(Configuration conf) throws IOException { boolean isNoncesEnabled = conf.getBoolean(HConstants.HBASE_RS_NONCES_ENABLED, true); this.nonceManager = isNoncesEnabled ? new ServerNonceManager(this.conf) : null; - this.numRegionsToReport = conf.getInt("hbase.regionserver.numregionstoreport", 10); - this.operationTimeout = conf.getInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT); @@ -636,12 +623,8 @@ public HRegionServer(Configuration conf) throws IOException { mobFileCache = new MobFileCache(conf); } - uncaughtExceptionHandler = new UncaughtExceptionHandler() { - @Override - public void uncaughtException(Thread t, Throwable e) { - abort("Uncaught exception in executorService thread " + t.getName(), e); - } - }; + uncaughtExceptionHandler = + (t, e) -> abort("Uncaught exception in executorService thread " + t.getName(), e); initializeFileSystem(); spanReceiverHost = SpanReceiverHost.getInstance(getConfiguration()); @@ -714,12 +697,9 @@ protected String getUseThisHostnameInstead(Configuration conf) throws IOExceptio */ private static void setupWindows(final Configuration conf, ConfigurationManager cm) { if (!SystemUtils.IS_OS_WINDOWS) { - Signal.handle(new Signal("HUP"), new SignalHandler() { - @Override - public void handle(Signal signal) { - conf.reloadConfiguration(); - cm.notifyAllObservers(conf); - } + Signal.handle(new Signal("HUP"), signal -> { + conf.reloadConfiguration(); + cm.notifyAllObservers(conf); }); } } @@ -744,14 +724,14 @@ private void initializeFileSystem() throws IOException { // underlying hadoop hdfs accessors will be going against wrong filesystem // (unless all is set to defaults). FSUtils.setFsDefault(this.conf, FSUtils.getRootDir(this.conf)); - this.fs = new HFileSystem(this.conf, useHBaseChecksum); - this.rootDir = FSUtils.getRootDir(this.conf); + this.dataFs = new HFileSystem(this.conf, useHBaseChecksum); + this.dataRootDir = FSUtils.getRootDir(this.conf); this.tableDescriptors = getFsTableDescriptors(); } - protected TableDescriptors getFsTableDescriptors() throws IOException { + private TableDescriptors getFsTableDescriptors() throws IOException { return new FSTableDescriptors(this.conf, - this.fs, this.rootDir, !canUpdateTableDescriptor(), false, getMetaTableObserver()); + this.dataFs, this.dataRootDir, !canUpdateTableDescriptor(), false, getMetaTableObserver()); } protected Function getMetaTableObserver() { @@ -948,7 +928,7 @@ private void initializeZooKeeper() throws IOException, InterruptedException { * if the region server is shut down * @param tracker znode tracker to use * @throws IOException any IO exception, plus if the RS is stopped - * @throws InterruptedException + * @throws InterruptedException if the waiting thread is interrupted */ private void blockAndCheckIfStopped(ZKNodeTracker tracker) throws IOException, InterruptedException { @@ -982,7 +962,7 @@ public void run() { try { if (!isStopped() && !isAborted()) { - ShutdownHook.install(conf, fs, this, Thread.currentThread()); + ShutdownHook.install(conf, dataFs, this, Thread.currentThread()); // Initialize the RegionServerCoprocessorHost now that our ephemeral // node was created, in case any coprocessors want to use ZooKeeper this.rsHost = new RegionServerCoprocessorHost(this, this.conf); @@ -1028,13 +1008,13 @@ public void run() { // The main run loop. while (!isStopped() && isHealthy()) { if (!isClusterUp()) { - if (isOnlineRegionsEmpty()) { + if (onlineRegions.isEmpty()) { stop("Exiting; cluster shutdown set and not carrying any regions"); } else if (!this.stopping) { this.stopping = true; LOG.info("Closing user regions"); closeUserRegions(this.abortRequested); - } else if (this.stopping) { + } else { boolean allUserRegionsOffline = areAllUserRegionsOffline(); if (allUserRegionsOffline) { // Set stopped if no more write requests tp meta tables @@ -1070,8 +1050,8 @@ public void run() { } } - if (this.leases != null) { - this.leases.closeAfterLeasesExpire(); + if (this.leaseManager != null) { + this.leaseManager.closeAfterLeasesExpire(); } if (this.splitLogWorker != null) { splitLogWorker.stop(); @@ -1101,7 +1081,6 @@ public void run() { if (this.hMemManager != null) this.hMemManager.stop(); if (this.cacheFlusher != null) this.cacheFlusher.interruptIfNecessary(); if (this.compactSplitThread != null) this.compactSplitThread.interruptIfNecessary(); - sendShutdownInterrupt(); // Stop the snapshot and other procedure handlers, forcefully killing all running tasks if (rspmHost != null) { @@ -1111,7 +1090,7 @@ public void run() { if (this.killed) { // Just skip out w/o closing regions. Used when testing. } else if (abortRequested) { - if (this.fsOk) { + if (this.dataFsOk) { closeUserRegions(abortRequested); // Don't leave any open file handles } LOG.info("aborting server " + this.serverName); @@ -1132,7 +1111,7 @@ public void run() { // Closing the compactSplit thread before closing meta regions if (!this.killed && containsMetaTableRegions()) { - if (!abortRequested || this.fsOk) { + if (!abortRequested || this.dataFsOk) { if (this.compactSplitThread != null) { this.compactSplitThread.join(); this.compactSplitThread = null; @@ -1141,7 +1120,7 @@ public void run() { } } - if (!this.killed && this.fsOk) { + if (!this.killed && this.dataFsOk) { waitOnAllRegionsToClose(abortRequested); LOG.info("stopping server " + this.serverName + "; all regions closed."); } @@ -1155,8 +1134,8 @@ public void run() { rsSpaceQuotaManager = null; } - //fsOk flag may be changed when closing regions throws exception. - if (this.fsOk) { + // flag may be changed when closing regions throws exception. + if (this.dataFsOk) { shutdownWAL(!abortRequested); } @@ -1170,8 +1149,8 @@ public void run() { if (this.rpcClient != null) { this.rpcClient.close(); } - if (this.leases != null) { - this.leases.close(); + if (this.leaseManager != null) { + this.leaseManager.close(); } if (this.pauseMonitor != null) { this.pauseMonitor.stop(); @@ -1188,6 +1167,7 @@ public void run() { try { deleteMyEphemeralNode(); } catch (KeeperException.NoNodeException nn) { + // pass } catch (KeeperException e) { LOG.warn("Failed deleting my ephemeral node", e); } @@ -1307,7 +1287,7 @@ public boolean reportRegionSizesForQuotas(RegionSizeStore regionSizeStore) { * @param rss The stub to send to the Master * @param regionSizeStore The store containing region sizes */ - void buildReportAndSend(RegionServerStatusService.BlockingInterface rss, + private void buildReportAndSend(RegionServerStatusService.BlockingInterface rss, RegionSizeStore regionSizeStore) throws ServiceException { RegionSpaceUseReportRequest request = buildRegionSpaceUseReportRequest(Objects.requireNonNull(regionSizeStore)); @@ -1321,7 +1301,7 @@ void buildReportAndSend(RegionServerStatusService.BlockingInterface rss, /** * Builds a {@link RegionSpaceUseReportRequest} protobuf message from the region size map. * - * @param regionSizeStore The size in bytes of regions + * @param regionSizes The size in bytes of regions * @return The corresponding protocol buffer message. */ RegionSpaceUseReportRequest buildRegionSpaceUseReportRequest(RegionSizeStore regionSizes) { @@ -1347,7 +1327,7 @@ RegionSpaceUse convertRegionSize(RegionInfo regionInfo, Long sizeInBytes) { .build(); } - ClusterStatusProtos.ServerLoad buildServerLoad(long reportStartTime, long reportEndTime) + private ClusterStatusProtos.ServerLoad buildServerLoad(long reportStartTime, long reportEndTime) throws IOException { // We're getting the MetricsRegionServerWrapper here because the wrapper computes requests // per second, and other metrics As long as metrics are part of ServerLoad it's best to use @@ -1381,9 +1361,8 @@ ClusterStatusProtos.ServerLoad buildServerLoad(long reportStartTime, long report for (HRegion region : regions) { if (region.getCoprocessorHost() != null) { Set regionCoprocessors = region.getCoprocessorHost().getCoprocessors(); - Iterator iterator = regionCoprocessors.iterator(); - while (iterator.hasNext()) { - serverLoad.addCoprocessors(coprocessorBuilder.setName(iterator.next()).build()); + for (String regionCoprocessor : regionCoprocessors) { + serverLoad.addCoprocessors(coprocessorBuilder.setName(regionCoprocessor).build()); } } serverLoad.addRegionLoads(createRegionLoad(region, regionLoadBldr, regionSpecifier)); @@ -1420,7 +1399,7 @@ ClusterStatusProtos.ServerLoad buildServerLoad(long reportStartTime, long report return serverLoad.build(); } - String getOnlineRegionsAsPrintableString() { + private String getOnlineRegionsAsPrintableString() { StringBuilder sb = new StringBuilder(); for (Region r: this.onlineRegions.values()) { if (sb.length() > 0) sb.append(", "); @@ -1439,7 +1418,7 @@ private void waitOnAllRegionsToClose(final boolean abort) { Set closedRegions = new HashSet<>(); boolean interrupted = false; try { - while (!isOnlineRegionsEmpty()) { + while (!onlineRegions.isEmpty()) { int count = getNumberOfOnlineRegions(); // Only print a message if the count of regions has changed. if (count != lastCount) { @@ -1469,7 +1448,7 @@ private void waitOnAllRegionsToClose(final boolean abort) { } // No regions in RIT, we could stop waiting now. if (this.regionsInTransitionInRS.isEmpty()) { - if (!isOnlineRegionsEmpty()) { + if (!onlineRegions.isEmpty()) { LOG.info("We were exiting though online regions are not empty," + " because some regions failed closing"); } @@ -1486,7 +1465,7 @@ private void waitOnAllRegionsToClose(final boolean abort) { } } - private boolean sleep(long millis) { + private static boolean sleep(long millis) { boolean interrupted = false; try { Thread.sleep(millis); @@ -1513,7 +1492,7 @@ private void shutdownWAL(final boolean close) { } } - /* + /** * Run init. Sets up wal and starts up all server threads. * * @param c Extra configuration. @@ -1577,7 +1556,8 @@ protected void handleReportForDutyResponse(final RegionServerStartupResponse c) // This call sets up an initialized replication and WAL. Later we start it up. setupWALAndReplication(); // Init in here rather than in constructor after thread name has been set - this.metricsTable = new MetricsTable(new MetricsTableWrapperAggregateImpl(this)); + final MetricsTable metricsTable = + new MetricsTable(new MetricsTableWrapperAggregateImpl(this)); this.metricsRegionServer = new MetricsRegionServer( new MetricsRegionServerWrapperImpl(this), conf, metricsTable); // Now that we have a metrics source, start the pause monitor @@ -1640,7 +1620,7 @@ private void startHeapMemoryManager() { } } - private void createMyEphemeralNode() throws KeeperException, IOException { + private void createMyEphemeralNode() throws KeeperException { RegionServerInfo.Builder rsInfo = RegionServerInfo.newBuilder(); rsInfo.setInfoPort(infoServer != null ? infoServer.getPort() : -1); rsInfo.setVersionInfo(ProtobufUtil.getVersionInfo()); @@ -1657,13 +1637,11 @@ public RegionServerAccounting getRegionServerAccounting() { return regionServerAccounting; } - /* + /** * @param r Region to get RegionLoad for. * @param regionLoadBldr the RegionLoad.Builder, can be null * @param regionSpecifier the RegionSpecifier.Builder, can be null * @return RegionLoad instance. - * - * @throws IOException */ RegionLoad createRegionLoad(final HRegion r, RegionLoad.Builder regionLoadBldr, RegionSpecifier.Builder regionSpecifier) throws IOException { @@ -1737,16 +1715,12 @@ RegionLoad createRegionLoad(final HRegion r, RegionLoad.Builder regionLoadBldr, return regionLoadBldr.build(); } - /** - * @param encodedRegionName - * @return An instance of RegionLoad. - */ public RegionLoad createRegionLoad(final String encodedRegionName) throws IOException { HRegion r = onlineRegions.get(encodedRegionName); return r != null ? createRegionLoad(r, null, null) : null; } - /* + /** * Inner class that runs on a long period checking if regions need compaction. */ private static class CompactionChecker extends ScheduledChore { @@ -1811,18 +1785,19 @@ protected void chore() { } } - static class PeriodicMemStoreFlusher extends ScheduledChore { - final HRegionServer server; - final static int RANGE_OF_DELAY = 5 * 60; // 5 min in seconds - final static int MIN_DELAY_TIME = 0; // millisec + private static class PeriodicMemStoreFlusher extends ScheduledChore { + private final HRegionServer server; + private final static int RANGE_OF_DELAY = 5 * 60; // 5 min in seconds + private final static int MIN_DELAY_TIME = 0; // millisec + private final long rangeOfDelayMs; - final int rangeOfDelay; - public PeriodicMemStoreFlusher(int cacheFlushInterval, final HRegionServer server) { + PeriodicMemStoreFlusher(int cacheFlushInterval, final HRegionServer server) { super("MemstoreFlusherChore", server, cacheFlushInterval); this.server = server; - this.rangeOfDelay = this.server.conf.getInt("hbase.regionserver.periodicmemstoreflusher.rangeofdelayseconds", - RANGE_OF_DELAY)*1000; + final long configuredRangeOfDelay = server.getConfiguration().getInt( + "hbase.regionserver.periodicmemstoreflusher.rangeofdelayseconds", RANGE_OF_DELAY); + this.rangeOfDelayMs = TimeUnit.SECONDS.toMillis(configuredRangeOfDelay); } @Override @@ -1833,7 +1808,7 @@ protected void chore() { if (r.shouldFlush(whyFlush)) { FlushRequester requester = server.getFlushRequester(); if (requester != null) { - long randomDelay = (long) RandomUtils.nextInt(0, rangeOfDelay) + MIN_DELAY_TIME; + long randomDelay = RandomUtils.nextLong(0, rangeOfDelayMs) + MIN_DELAY_TIME; //Throttle the flushes by putting a delay. If we don't throttle, and there //is a balanced write-load on the regions in a table, we might end up //overwhelming the filesystem with too many flushes at once. @@ -1889,7 +1864,6 @@ private void setupWALAndReplication() throws IOException { /** * Start up replication source and sink handlers. - * @throws IOException */ private void startReplicationService() throws IOException { if (this.replicationSourceHandler == this.replicationSinkHandler && @@ -1905,11 +1879,6 @@ private void startReplicationService() throws IOException { } } - - public MetricsRegionServer getRegionServerMetrics() { - return this.metricsRegionServer; - } - /** * @return Master address tracker instance. */ @@ -1917,7 +1886,7 @@ public MasterAddressTracker getMasterAddressTracker() { return this.masterAddressTracker; } - /* + /** * Start maintenance Threads, Server, Worker and lease checker threads. * Start all threads we need to run. This is called after we've successfully * registered with the Master. @@ -2007,8 +1976,8 @@ private void startServices() throws IOException { // Leases is not a Thread. Internally it runs a daemon thread. If it gets // an unhandled exception, it will just exit. - Threads.setDaemonThreadRunning(this.leases.getThread(), getName() + ".leaseChecker", - uncaughtExceptionHandler); + Threads.setDaemonThreadRunning(this.leaseManager.getThread(), getName() + ".leaseChecker", + uncaughtExceptionHandler); // Create the log splitting worker and start it // set a smaller retries to fast fail otherwise splitlogworker could be blocked for @@ -2036,7 +2005,7 @@ private void startServices() throws IOException { initializeMemStoreChunkCreator(); } - private void initializeThreads() throws IOException { + private void initializeThreads() { // Cache flushing thread. this.cacheFlusher = new MemStoreFlusher(conf, this); @@ -2047,7 +2016,7 @@ private void initializeThreads() throws IOException { // in a while. It will take care of not checking too frequently on store-by-store basis. this.compactionChecker = new CompactionChecker(this, this.compactionCheckFrequency, this); this.periodicFlusher = new PeriodicMemStoreFlusher(this.flushCheckFrequency, this); - this.leases = new Leases(this.threadWakeFrequency); + this.leaseManager = new LeaseManager(this.threadWakeFrequency); // Create the thread to clean the moved regions list movedRegionsCleaner = MovedRegionsCleaner.create(this); @@ -2092,10 +2061,8 @@ private void registerConfigurationObservers() { /** * Puts up the webui. - * @return Returns final port -- maybe different from what we started with. - * @throws IOException */ - private int putUpWebUI() throws IOException { + private void putUpWebUI() throws IOException { int port = this.conf.getInt(HConstants.REGIONSERVER_INFO_PORT, HConstants.DEFAULT_REGIONSERVER_INFOPORT); String addr = this.conf.get("hbase.regionserver.info.bindAddress", "0.0.0.0"); @@ -2106,7 +2073,9 @@ private int putUpWebUI() throws IOException { addr = this.conf.get("hbase.master.info.bindAddress", "0.0.0.0"); } // -1 is for disabling info server - if (port < 0) return port; + if (port < 0) { + return; + } if (!Addressing.isLocalAddress(InetAddress.getByName(addr))) { String msg = @@ -2117,8 +2086,7 @@ private int putUpWebUI() throws IOException { throw new IOException(msg); } // check if auto port bind enabled - boolean auto = this.conf.getBoolean(HConstants.REGIONSERVER_INFO_PORT_AUTO, - false); + boolean auto = this.conf.getBoolean(HConstants.REGIONSERVER_INFO_PORT_AUTO, false); while (true) { try { this.infoServer = new InfoServer(getProcessName(), addr, port, false, this.conf); @@ -2143,19 +2111,18 @@ private int putUpWebUI() throws IOException { HConstants.DEFAULT_MASTER_INFOPORT); conf.setInt("hbase.master.info.port.orig", masterInfoPort); conf.setInt(HConstants.MASTER_INFO_PORT, port); - return port; } /* * Verify that server is healthy */ private boolean isHealthy() { - if (!fsOk) { + if (!dataFsOk) { // File system problem return false; } // Verify that all threads are alive - boolean healthy = (this.leases == null || this.leases.isAlive()) + boolean healthy = (this.leaseManager == null || this.leaseManager.isAlive()) && (this.cacheFlusher == null || this.cacheFlusher.isAlive()) && (this.walRoller == null || this.walRoller.isAlive()) && (this.compactionChecker == null || this.compactionChecker.isScheduled()) @@ -2167,7 +2134,7 @@ private boolean isHealthy() { } @Override - public List getWALs() throws IOException { + public List getWALs() { return walFactory.getWALs(); } @@ -2190,6 +2157,10 @@ public LogRoller getWalRoller() { return walRoller; } + WALFactory getWalFactory() { + return walFactory; + } + @Override public Connection getConnection() { return getClusterConnection(); @@ -2280,42 +2251,48 @@ public void postOpenDeployTasks(final PostOpenDeployContext context) throws IOEx LOG.debug("Finished post open deploy task for " + r.getRegionInfo().getRegionNameAsString()); } - @Override - public boolean reportRegionStateTransition(final RegionStateTransitionContext context) { - TransitionCode code = context.getCode(); - long openSeqNum = context.getOpenSeqNum(); + /** + * Helper method for use in tests. Skip the region transition report when there's no master + * around to receive it. + */ + private boolean skipReportingTransition(final RegionStateTransitionContext context) { + final TransitionCode code = context.getCode(); + final long openSeqNum = context.getOpenSeqNum(); long masterSystemTime = context.getMasterSystemTime(); - RegionInfo[] hris = context.getHris(); - long[] procIds = context.getProcIds(); + final RegionInfo[] hris = context.getHris(); - if (TEST_SKIP_REPORTING_TRANSITION) { - // This is for testing only in case there is no master - // to handle the region transition report at all. - if (code == TransitionCode.OPENED) { - Preconditions.checkArgument(hris != null && hris.length == 1); - if (hris[0].isMetaRegion()) { - try { - MetaTableLocator.setMetaLocation(getZooKeeper(), serverName, - hris[0].getReplicaId(),State.OPEN); - } catch (KeeperException e) { - LOG.info("Failed to update meta location", e); - return false; - } - } else { - try { - MetaTableAccessor.updateRegionLocation(clusterConnection, + if (code == TransitionCode.OPENED) { + Preconditions.checkArgument(hris != null && hris.length == 1); + if (hris[0].isMetaRegion()) { + try { + MetaTableLocator.setMetaLocation(getZooKeeper(), serverName, + hris[0].getReplicaId(),State.OPEN); + } catch (KeeperException e) { + LOG.info("Failed to update meta location", e); + return false; + } + } else { + try { + MetaTableAccessor.updateRegionLocation(clusterConnection, hris[0], serverName, openSeqNum, masterSystemTime); - } catch (IOException e) { - LOG.info("Failed to update meta", e); - return false; - } + } catch (IOException e) { + LOG.info("Failed to update meta", e); + return false; } } - return true; } + return true; + } + + private ReportRegionStateTransitionRequest createReportRegionStateTransitionRequest( + final RegionStateTransitionContext context) { + final TransitionCode code = context.getCode(); + final long openSeqNum = context.getOpenSeqNum(); + final RegionInfo[] hris = context.getHris(); + final long[] procIds = context.getProcIds(); ReportRegionStateTransitionRequest.Builder builder = - ReportRegionStateTransitionRequest.newBuilder(); + ReportRegionStateTransitionRequest.newBuilder(); builder.setServer(ProtobufUtil.toServerName(serverName)); RegionStateTransition.Builder transition = builder.addTransitionBuilder(); transition.setTransitionCode(code); @@ -2328,9 +2305,22 @@ public boolean reportRegionStateTransition(final RegionStateTransitionContext co for (long procId: procIds) { transition.addProcId(procId); } - ReportRegionStateTransitionRequest request = builder.build(); + + return builder.build(); + } + + @Override + public boolean reportRegionStateTransition(final RegionStateTransitionContext context) { + if (TEST_SKIP_REPORTING_TRANSITION) { + return skipReportingTransition(context); + } + final ReportRegionStateTransitionRequest request = + createReportRegionStateTransitionRequest(context); + + // Time to pause if master says 'please hold'. Make configurable if needed. + final long initPauseTime = 1000; int tries = 0; - long pauseTime = INIT_PAUSE_TIME_MS; + long pauseTime; // Keep looping till we get an error. We want to send reports even though server is going down. // Only go down if clusterConnection is null. It is set to null almost as last thing as the // HRegionServer does down. @@ -2361,9 +2351,9 @@ public boolean reportRegionStateTransition(final RegionStateTransitionContext co || ioe instanceof CallQueueTooBigException; if (pause) { // Do backoff else we flood the Master with requests. - pauseTime = ConnectionUtils.getPauseTime(INIT_PAUSE_TIME_MS, tries); + pauseTime = ConnectionUtils.getPauseTime(initPauseTime, tries); } else { - pauseTime = INIT_PAUSE_TIME_MS; // Reset. + pauseTime = initPauseTime; // Reset. } LOG.info("Failed report transition " + TextFormat.shortDebugString(request) + "; retry (#" + tries + ")" + @@ -2385,7 +2375,7 @@ public boolean reportRegionStateTransition(final RegionStateTransitionContext co * Trigger a flush in the primary region replica if this region is a secondary replica. Does not * block this thread. See RegionReplicaFlushHandler for details. */ - void triggerFlushInPrimaryRegion(final HRegion region) { + private void triggerFlushInPrimaryRegion(final HRegion region) { if (ServerRegionReplicaUtil.isDefaultReplica(region.getRegionInfo())) { return; } @@ -2497,12 +2487,6 @@ protected void kill() { abort("Simulated kill"); } - /** - * Called on stop/abort before closing the cluster connection and meta locator. - */ - protected void sendShutdownInterrupt() { - } - // Limits the time spent in the shutdown process. private void scheduleAbortTimer() { if (this.abortMonitor == null) { @@ -2593,8 +2577,7 @@ ReplicationSinkService getReplicationSinkService() { * * @return master + port, or null if server has been stopped */ - @VisibleForTesting - protected synchronized ServerName createRegionServerStatusStub() { + private synchronized ServerName createRegionServerStatusStub() { // Create RS stub without refreshing the master node from ZK, use cached data return createRegionServerStatusStub(false); } @@ -2758,23 +2741,13 @@ public RegionStoreSequenceIds getLastSequenceId(byte[] encodedRegionName) { } } - /** - * Closes all regions. Called on our way out. - * Assumes that its not possible for new regions to be added to onlineRegions - * while this method runs. - */ - protected void closeAllRegions(final boolean abort) { - closeUserRegions(abort); - closeMetaTableRegions(abort); - } - /** * Close meta region if we carry it * @param abort Whether we're running an abort. */ - void closeMetaTableRegions(final boolean abort) { + private void closeMetaTableRegions(final boolean abort) { HRegion meta = null; - this.lock.writeLock().lock(); + this.onlineRegionsLock.writeLock().lock(); try { for (Map.Entry e: onlineRegions.entrySet()) { RegionInfo hri = e.getValue().getRegionInfo(); @@ -2784,7 +2757,7 @@ void closeMetaTableRegions(final boolean abort) { if (meta != null) break; } } finally { - this.lock.writeLock().unlock(); + this.onlineRegionsLock.writeLock().unlock(); } if (meta != null) closeRegionIgnoreErrors(meta.getRegionInfo(), abort); } @@ -2795,8 +2768,8 @@ void closeMetaTableRegions(final boolean abort) { * that are already closed or that are closing. * @param abort Whether we're running an abort. */ - void closeUserRegions(final boolean abort) { - this.lock.writeLock().lock(); + private void closeUserRegions(final boolean abort) { + this.onlineRegionsLock.writeLock().lock(); try { for (Map.Entry e: this.onlineRegions.entrySet()) { HRegion r = e.getValue(); @@ -2806,7 +2779,7 @@ void closeUserRegions(final boolean abort) { } } } finally { - this.lock.writeLock().unlock(); + this.onlineRegionsLock.writeLock().unlock(); } } @@ -2828,28 +2801,19 @@ public boolean isStopping() { return this.stopping; } - /** - * - * @return the configuration - */ @Override public Configuration getConfiguration() { return conf; } - /** @return the write lock for the server */ - ReentrantReadWriteLock.WriteLock getWriteLock() { - return lock.writeLock(); + protected Map getOnlineRegions() { + return this.onlineRegions; } public int getNumberOfOnlineRegions() { return this.onlineRegions.size(); } - boolean isOnlineRegionsEmpty() { - return this.onlineRegions.isEmpty(); - } - /** * For tests, web ui and metrics. * This method will only work if HRegionServer is in the same JVM as client; @@ -2873,13 +2837,7 @@ public void addRegion(HRegion region) { */ SortedMap getCopyOfOnlineRegionsSortedByOffHeapSize() { // we'll sort the regions in reverse - SortedMap sortedRegions = new TreeMap<>( - new Comparator() { - @Override - public int compare(Long a, Long b) { - return -1 * a.compareTo(b); - } - }); + SortedMap sortedRegions = new TreeMap<>(Comparator.reverseOrder()); // Copy over all regions. Regions are sorted by size with biggest first. for (HRegion region : this.onlineRegions.values()) { sortedRegions.put(region.getMemStoreOffHeapSize(), region); @@ -2894,13 +2852,7 @@ public int compare(Long a, Long b) { */ SortedMap getCopyOfOnlineRegionsSortedByOnHeapSize() { // we'll sort the regions in reverse - SortedMap sortedRegions = new TreeMap<>( - new Comparator() { - @Override - public int compare(Long a, Long b) { - return -1 * a.compareTo(b); - } - }); + SortedMap sortedRegions = new TreeMap<>(Comparator.reverseOrder()); // Copy over all regions. Regions are sorted by size with biggest first. for (HRegion region : this.onlineRegions.values()) { sortedRegions.put(region.getMemStoreHeapSize(), region); @@ -2927,23 +2879,27 @@ public CompactionRequester getCompactionRequestor() { } @Override - public Leases getLeases() { - return leases; + public LeaseManager getLeaseManager() { + return leaseManager; } /** * @return Return the rootDir. */ - protected Path getRootDir() { - return rootDir; + protected Path getDataRootDir() { + return dataRootDir; } - /** - * @return Return the fs. - */ @Override public FileSystem getFileSystem() { - return fs; + return dataFs; + } + + /** + * @return {@code true} when the data file system is available, {@code false} otherwise. + */ + boolean isDataFileSystemOk() { + return this.dataFsOk; } /** @@ -2965,15 +2921,6 @@ public String toString() { return getServerName().toString(); } - /** - * Interval at which threads should run - * - * @return the interval - */ - public int getThreadWakeFrequency() { - return threadWakeFrequency; - } - @Override public ZKWatcher getZooKeeper() { return zooKeeper; @@ -3051,7 +2998,7 @@ private static void createNewReplicationInstance(Configuration conf, HRegionServ private static T newReplicationInstance(String classname, Class xface, Configuration conf, HRegionServer server, FileSystem walFs, Path logDir, Path oldLogDir, WALProvider walProvider) throws IOException { - Class clazz = null; + final Class clazz; try { ClassLoader classLoader = Thread.currentThread().getContextClassLoader(); clazz = Class.forName(classname, true, classLoader).asSubclass(xface); @@ -3079,18 +3026,15 @@ public Map getWalGroupsReplicationStatus(){ /** * Utility for constructing an instance of the passed HRegionServer class. - * - * @param regionServerClass - * @param conf2 - * @return HRegionServer instance. */ - public static HRegionServer constructRegionServer( - Class regionServerClass, - final Configuration conf2) { + static HRegionServer constructRegionServer( + final Class regionServerClass, + final Configuration conf + ) { try { - Constructor c = regionServerClass - .getConstructor(Configuration.class); - return c.newInstance(conf2); + Constructor c = + regionServerClass.getConstructor(Configuration.class); + return c.newInstance(conf); } catch (Exception e) { throw new RuntimeException("Failed construction of " + "Regionserver: " + regionServerClass.toString(), e); @@ -3100,7 +3044,7 @@ public static HRegionServer constructRegionServer( /** * @see org.apache.hadoop.hbase.regionserver.HRegionServerCommandLine */ - public static void main(String[] args) throws Exception { + public static void main(String[] args) { LOG.info("STARTING executorService " + HRegionServer.class.getSimpleName()); VersionInfo.logVersion(); Configuration conf = HBaseConfiguration.create(); @@ -3118,7 +3062,7 @@ public static void main(String[] args) throws Exception { * closed during a disable, etc., it will not be included in the returned list. * So, the returned list may not necessarily be ALL regions in this table, its * all the ONLINE regions in the table. - * @param tableName + * @param tableName table to limit the scope of the query * @return Online regions from tableName */ @Override @@ -3137,10 +3081,10 @@ public List getRegions(TableName tableName) { @Override public List getRegions() { - List allRegions = new ArrayList<>(); + List allRegions; synchronized (this.onlineRegions) { // Return a clone copy of the onlineRegions - allRegions.addAll(onlineRegions.values()); + allRegions = new ArrayList<>(onlineRegions.values()); } return allRegions; } @@ -3160,7 +3104,6 @@ public Set getOnlineTables() { return tables; } - // used by org/apache/hbase/tmpl/regionserver/RSStatusTmpl.jamon (HBASE-4070). public String[] getRegionServerCoprocessors() { TreeSet coprocessors = new TreeSet<>(); try { @@ -3182,7 +3125,7 @@ public String[] getRegionServerCoprocessors() { } } coprocessors.addAll(rsHost.getCoprocessors()); - return coprocessors.toArray(new String[coprocessors.size()]); + return coprocessors.toArray(new String[0]); } /** @@ -3279,43 +3222,6 @@ protected boolean closeRegion(String encodedName, final boolean abort, final Ser return true; } - /** - * Close and offline the region for split or merge - * - * @param regionEncodedName the name of the region(s) to close - * @return true if closed the region successfully. - * @throws IOException - */ - protected boolean closeAndOfflineRegionForSplitOrMerge(final List regionEncodedName) - throws IOException { - for (int i = 0; i < regionEncodedName.size(); ++i) { - HRegion regionToClose = this.getRegion(regionEncodedName.get(i)); - if (regionToClose != null) { - Map> hstoreFiles = null; - Exception exceptionToThrow = null; - try { - hstoreFiles = regionToClose.close(false); - } catch (Exception e) { - exceptionToThrow = e; - } - if (exceptionToThrow == null && hstoreFiles == null) { - // The region was closed by someone else - exceptionToThrow = - new IOException("Failed to close region: already closed by another thread"); - } - if (exceptionToThrow != null) { - if (exceptionToThrow instanceof IOException) { - throw (IOException) exceptionToThrow; - } - throw new IOException(exceptionToThrow); - } - // Offline the region - this.removeRegion(regionToClose, null); - } - } - return true; - } - /** * @return HRegion for the passed binary regionName or null if * named region is not member of the online regions. @@ -3325,10 +3231,6 @@ public HRegion getOnlineRegion(final byte[] regionName) { return this.onlineRegions.get(encodedRegionName); } - public InetSocketAddress[] getRegionBlockLocations(final String encodedRegionName) { - return this.regionFavoredNodesMap.get(encodedRegionName); - } - @Override public HRegion getRegion(final String encodedRegionName) { return this.onlineRegions.get(encodedRegionName); @@ -3354,10 +3256,8 @@ public boolean removeRegion(final HRegion r, ServerName destination) { /** * Protected Utility method for safely obtaining an HRegion handle. * - * @param regionName - * Name of online {@link HRegion} to return + * @param regionName Name of online {@link HRegion} to return * @return {@link HRegion} for regionName - * @throws NotServingRegionException */ protected HRegion getRegion(final byte[] regionName) throws NotServingRegionException { @@ -3370,7 +3270,7 @@ public HRegion getRegionByEncodedName(String encodedRegionName) return getRegionByEncodedName(null, encodedRegionName); } - protected HRegion getRegionByEncodedName(byte[] regionName, String encodedRegionName) + private HRegion getRegionByEncodedName(byte[] regionName, String encodedRegionName) throws NotServingRegionException { HRegion region = this.onlineRegions.get(encodedRegionName); if (region == null) { @@ -3381,7 +3281,7 @@ protected HRegion getRegionByEncodedName(byte[] regionName, String encodedRegion Boolean isOpening = this.regionsInTransitionInRS.get(Bytes.toBytes(encodedRegionName)); String regionNameStr = regionName == null? encodedRegionName: Bytes.toStringBinary(regionName); - if (isOpening != null && isOpening.booleanValue()) { + if (isOpening != null && isOpening) { throw new RegionOpeningException("Region " + regionNameStr + " is opening on " + this.serverName); } @@ -3391,14 +3291,12 @@ protected HRegion getRegionByEncodedName(byte[] regionName, String encodedRegion return region; } - /* + /** * Cleanup after Throwable caught invoking method. Converts t to * IOE if it isn't already. * * @param t Throwable - * * @param msg Message to log in error. Can be null. - * * @return Throwable converted to an IOE; methods can only let out IOEs. */ private Throwable cleanup(final Throwable t, final String msg) { @@ -3419,14 +3317,11 @@ private Throwable cleanup(final Throwable t, final String msg) { return t; } - /* - * @param t - * + /** * @param msg Message to put in new IOE if passed t is not an IOE - * * @return Make t an IOE if it isn't already. */ - protected IOException convertThrowableToIOE(final Throwable t, final String msg) { + private IOException convertThrowableToIOE(final Throwable t, final String msg) { return (t instanceof IOException ? (IOException) t : msg == null || msg.length() == 0 ? new IOException(t) : new IOException(msg, t)); } @@ -3437,16 +3332,16 @@ protected IOException convertThrowableToIOE(final Throwable t, final String msg) * * @return false if file system is not available */ - public boolean checkFileSystem() { - if (this.fsOk && this.fs != null) { + boolean checkFileSystem() { + if (this.dataFsOk && this.dataFs != null) { try { - FSUtils.checkFileSystemAvailable(this.fs); + FSUtils.checkFileSystemAvailable(this.dataFs); } catch (IOException e) { abort("File System not available", e); - this.fsOk = false; + this.dataFsOk = false; } } - return this.fsOk; + return this.dataFsOk; } @Override @@ -3465,7 +3360,7 @@ public void updateRegionFavoredNodesMapping(String encodedRegionName, /** * Return the favored nodes for a region given its encoded name. Look at the * comment around {@link #regionFavoredNodesMap} on why it is InetSocketAddress[] - * @param encodedRegionName + * * @return array of favored locations */ @Override @@ -3481,12 +3376,12 @@ public ServerNonceManager getNonceManager() { private static class MovedRegionInfo { private final ServerName serverName; private final long seqNum; - private final long ts; + private final long moveTime; - public MovedRegionInfo(ServerName serverName, long closeSeqNum) { + MovedRegionInfo(ServerName serverName, long closeSeqNum) { this.serverName = serverName; this.seqNum = closeSeqNum; - ts = EnvironmentEdgeManager.currentTime(); + this.moveTime = EnvironmentEdgeManager.currentTime(); } public ServerName getServerName() { @@ -3497,20 +3392,24 @@ public long getSeqNum() { return seqNum; } - public long getMoveTime() { - return ts; + long getMoveTime() { + return moveTime; } } - // This map will contains all the regions that we closed for a move. - // We add the time it was moved as we don't want to keep too old information - protected Map movedRegions = new ConcurrentHashMap<>(3000); + /** + * This map will contains all the regions that we closed for a move. + * We add the time it was moved as we don't want to keep too old information + */ + private Map movedRegions = new ConcurrentHashMap<>(3000); - // We need a timeout. If not there is a risk of giving a wrong information: this would double - // the number of network calls instead of reducing them. + /** + * We need a timeout. If not there is a risk of giving a wrong information: this would double + * the number of network calls instead of reducing them. + */ private static final int TIMEOUT_REGION_MOVED = (2 * 60 * 1000); - protected void addToMovedRegions(String encodedName, ServerName destination, long closeSeqNum) { + private void addToMovedRegions(String encodedName, ServerName destination, long closeSeqNum) { if (ServerName.isSameAddress(destination, this.getServerName())) { LOG.warn("Not adding moved region record: " + encodedName + " to self."); return; @@ -3544,14 +3443,8 @@ private MovedRegionInfo getMovedRegion(final String encodedRegionName) { */ protected void cleanMovedRegions() { final long cutOff = System.currentTimeMillis() - TIMEOUT_REGION_MOVED; - Iterator> it = movedRegions.entrySet().iterator(); - while (it.hasNext()){ - Map.Entry e = it.next(); - if (e.getValue().getMoveTime() < cutOff) { - it.remove(); - } - } + movedRegions.entrySet().removeIf(e -> e.getValue().getMoveTime() < cutOff); } /* @@ -3619,7 +3512,7 @@ public CompactSplit getCompactSplitThread() { return this.compactSplitThread; } - public CoprocessorServiceResponse execRegionServerService( + CoprocessorServiceResponse execRegionServerService( @SuppressWarnings("UnusedParameters") final RpcController controller, final CoprocessorServiceRequest serviceRequest) throws ServiceException { try { @@ -3646,13 +3539,9 @@ public CoprocessorServiceResponse execRegionServerService( CoprocessorRpcUtils.getRequest(service, methodDesc, call.getRequest()); final com.google.protobuf.Message.Builder responseBuilder = service.getResponsePrototype(methodDesc).newBuilderForType(); - service.callMethod(methodDesc, serviceController, request, - new com.google.protobuf.RpcCallback() { - @Override - public void run(com.google.protobuf.Message message) { - if (message != null) { - responseBuilder.mergeFrom(message); - } + service.callMethod(methodDesc, serviceController, request, message -> { + if (message != null) { + responseBuilder.mergeFrom(message); } }); IOException exception = CoprocessorRpcUtils.getControllerException(serviceController); @@ -3698,7 +3587,8 @@ public ZKPermissionWatcher getZKPermissionWatcher() { /** * @return : Returns the ConfigurationManager object for testing purposes. */ - protected ConfigurationManager getConfigurationManager() { + @VisibleForTesting + ConfigurationManager getConfigurationManager() { return configurationManager; } @@ -3713,14 +3603,14 @@ public TableDescriptors getTableDescriptors() { /** * Reload the configuration from disk. */ - public void updateConfiguration() { + void updateConfiguration() { LOG.info("Reloading the configuration from disk."); // Reload the configuration from disk. conf.reloadConfiguration(); configurationManager.notifyAllObservers(conf); } - public CacheEvictionStats clearRegionBlockCache(Region region) { + CacheEvictionStats clearRegionBlockCache(Region region) { long evictedBlocks = 0; for(Store store : region.getStores()) { @@ -3753,6 +3643,10 @@ public HeapMemoryManager getHeapMemoryManager() { return hMemManager; } + MemStoreFlusher getMemStoreFlusher() { + return cacheFlusher; + } + /** * For testing * @return whether all wal roll request finished for this regionserver @@ -3801,8 +3695,8 @@ public SecureBulkLoadManager getSecureBulkLoadManager() { } @Override - public EntityLock regionLock(List regionInfos, String description, - Abortable abort) throws IOException { + public EntityLock regionLock(final List regionInfos, final String description, + final Abortable abort) { return new LockServiceClient(conf, lockStub, clusterConnection.getNonceGenerator()) .regionLock(regionInfos, description, abort); } @@ -3862,7 +3756,7 @@ public Connection createConnection(Configuration conf) throws IOException { this.rpcServices, this.rpcServices); } - public void executeProcedure(long procId, RSProcedureCallable callable) { + void executeProcedure(long procId, RSProcedureCallable callable) { executorService.submit(new RSProcedureHandler(this, procId, callable)); } @@ -3871,7 +3765,8 @@ public void remoteProcedureComplete(long procId, Throwable error) { } void reportProcedureDone(ReportProcedureDoneRequest request) throws IOException { - RegionServerStatusService.BlockingInterface rss = rssStub; + RegionServerStatusService.BlockingInterface rss; + // TODO: juggling class state with an instance variable, outside of a synchronized block :'( for (;;) { rss = rssStub; if (rss != null) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Leases.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LeaseManager.java similarity index 89% rename from hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Leases.java rename to hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LeaseManager.java index 0afa3813edd6..15bc2afa346d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Leases.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LeaseManager.java @@ -53,29 +53,25 @@ * sleep time which is invariant. */ @InterfaceAudience.Private -public class Leases extends HasThread { - private static final Logger LOG = LoggerFactory.getLogger(Leases.class.getName()); - public static final int MIN_WAIT_TIME = 100; - private final Map leases = new ConcurrentHashMap<>(); +public class LeaseManager extends HasThread { + private static final Logger LOG = LoggerFactory.getLogger(LeaseManager.class.getName()); + private static final int MIN_WAIT_TIME = 100; - protected final int leaseCheckFrequency; - protected volatile boolean stopRequested = false; + private final Map leases = new ConcurrentHashMap<>(); + private final int leaseCheckFrequency; + private volatile boolean stopRequested = false; /** - * Creates a lease monitor + * Creates a lease manager. * - * @param leaseCheckFrequency - how often the lease should be checked - * (milliseconds) + * @param leaseCheckFrequency - how often the lease should be checked (milliseconds) */ - public Leases(final int leaseCheckFrequency) { - super("RegionServerLeases"); // thread name + public LeaseManager(final int leaseCheckFrequency) { + super("RegionServer.LeaseManager"); // thread name this.leaseCheckFrequency = leaseCheckFrequency; setDaemon(true); } - /** - * @see Thread#run() - */ @Override public void run() { long toWait = leaseCheckFrequency; @@ -93,9 +89,7 @@ public void run() { toWait = Math.max(MIN_WAIT_TIME, toWait); Thread.sleep(toWait); - } catch (InterruptedException e) { - continue; - } catch (ConcurrentModificationException e) { + } catch (InterruptedException | ConcurrentModificationException e) { continue; } catch (Throwable e) { LOG.error(HBaseMarkers.FATAL, "Unexpected exception killed leases thread", e); @@ -156,7 +150,6 @@ public void close() { * @param leaseTimeoutPeriod length of the lease in milliseconds * @param listener listener that will process lease expirations * @return The lease created. - * @throws LeaseStillHeldException */ public Lease createLease(String leaseName, int leaseTimeoutPeriod, final LeaseListener listener) throws LeaseStillHeldException { @@ -167,8 +160,6 @@ public Lease createLease(String leaseName, int leaseTimeoutPeriod, final LeaseLi /** * Inserts lease. Resets expiration before insertion. - * @param lease - * @throws LeaseStillHeldException */ public void addLease(final Lease lease) throws LeaseStillHeldException { if (this.stopRequested) { @@ -184,8 +175,7 @@ public void addLease(final Lease lease) throws LeaseStillHeldException { /** * Renew a lease * - * @param leaseName name of lease - * @throws LeaseException + * @param leaseName name of the lease */ public void renewLease(final String leaseName) throws LeaseException { if (this.stopRequested) { @@ -202,20 +192,17 @@ public void renewLease(final String leaseName) throws LeaseException { /** * Client explicitly cancels a lease. + * * @param leaseName name of lease - * @throws org.apache.hadoop.hbase.regionserver.LeaseException */ public void cancelLease(final String leaseName) throws LeaseException { removeLease(leaseName); } /** - * Remove named lease. - * Lease is removed from the map of leases. - * Lease can be reinserted using {@link #addLease(Lease)} + * Remove named lease. Lease is removed from the map of leases. * * @param leaseName name of lease - * @throws org.apache.hadoop.hbase.regionserver.LeaseException * @return Removed lease */ Lease removeLease(final String leaseName) throws LeaseException { @@ -234,9 +221,6 @@ Lease removeLease(final String leaseName) throws LeaseException { public static class LeaseStillHeldException extends IOException { private final String leaseName; - /** - * @param name - */ public LeaseStillHeldException(final String name) { this.leaseName = name; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java index a53008de3d98..feb5f36f4036 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java @@ -706,7 +706,7 @@ public void reclaimMemStoreMemory() { try { flushType = isAboveHighWaterMark(); while (flushType != FlushType.NORMAL && !server.isStopped()) { - server.cacheFlusher.setFlushType(flushType); + server.getMemStoreFlusher().setFlushType(flushType); if (!blocked) { startTime = EnvironmentEdgeManager.currentTime(); if (!server.getRegionServerAccounting().isOffheap()) { @@ -764,7 +764,7 @@ public void reclaimMemStoreMemory() { } else { flushType = isAboveLowWaterMark(); if (flushType != FlushType.NORMAL) { - server.cacheFlusher.setFlushType(flushType); + server.getMemStoreFlusher().setFlushType(flushType); wakeupFlushThread(); } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperImpl.java index a71429ba726e..97474bea3e82 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperImpl.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperImpl.java @@ -128,7 +128,7 @@ public MetricsRegionServerWrapperImpl(final HRegionServer regionServer) { initBlockCache(); initMobFileCache(); - this.period = regionServer.conf.getLong(HConstants.REGIONSERVER_METRICS_PERIOD, + this.period = regionServer.getConfiguration().getLong(HConstants.REGIONSERVER_METRICS_PERIOD, HConstants.DEFAULT_REGIONSERVER_METRICS_PERIOD); this.executor = CompatibilitySingletonFactory.getInstance(MetricsExecutor.class).getExecutor(); @@ -264,10 +264,10 @@ public int getLargeCompactionQueueSize() { @Override public int getFlushQueueSize() { //If there is no flusher there should be no queue. - if (this.regionServer.cacheFlusher == null) { + if (this.regionServer.getMemStoreFlusher() == null) { return 0; } - return this.regionServer.cacheFlusher.getFlushQueueSize(); + return this.regionServer.getMemStoreFlusher().getFlushQueueSize(); } @Override @@ -538,10 +538,10 @@ public double getPercentFileLocalSecondaryRegions() { @Override public long getUpdatesBlockedTime() { - if (this.regionServer.cacheFlusher == null) { + if (this.regionServer.getMemStoreFlusher() == null) { return 0; } - return this.regionServer.cacheFlusher.getUpdatesBlockedMsHighWater().sum(); + return this.regionServer.getMemStoreFlusher().getUpdatesBlockedMsHighWater().sum(); } @Override @@ -805,8 +805,8 @@ synchronized public void run() { } lastRan = currentTime; - WALProvider provider = regionServer.walFactory.getWALProvider(); - WALProvider metaProvider = regionServer.walFactory.getMetaWALProvider(); + final WALProvider provider = regionServer.getWalFactory().getWALProvider(); + final WALProvider metaProvider = regionServer.getWalFactory().getMetaWALProvider(); numWALFiles = (provider == null ? 0 : provider.getNumLogFiles()) + (metaProvider == null ? 0 : metaProvider.getNumLogFiles()); walFileSize = (provider == null ? 0 : provider.getLogFileSize()) + diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsTableWrapperAggregateImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsTableWrapperAggregateImpl.java index 56b759822384..ac74cd89d54e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsTableWrapperAggregateImpl.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsTableWrapperAggregateImpl.java @@ -48,7 +48,7 @@ public class MetricsTableWrapperAggregateImpl implements MetricsTableWrapperAggr public MetricsTableWrapperAggregateImpl(final HRegionServer regionServer) { this.regionServer = regionServer; - this.period = regionServer.conf.getLong(HConstants.REGIONSERVER_METRICS_PERIOD, + this.period = regionServer.getConfiguration().getLong(HConstants.REGIONSERVER_METRICS_PERIOD, HConstants.DEFAULT_REGIONSERVER_METRICS_PERIOD) + 1000; this.executor = CompatibilitySingletonFactory.getInstance(MetricsExecutor.class).getExecutor(); this.runnable = new TableMetricsWrapperRunnable(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSDumpServlet.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSDumpServlet.java index 2852ecf8a337..d83ae47768d6 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSDumpServlet.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSDumpServlet.java @@ -134,11 +134,10 @@ public static void dumpQueue(HRegionServer hrs, PrintWriter out) out.println(hrs.compactSplitThread.dumpQueue()); } - if (hrs.cacheFlusher != null) { + if (hrs.getMemStoreFlusher() != null) { // 2. Print out flush Queue - out.println("\nFlush Queue summary: " - + hrs.cacheFlusher.toString()); - out.println(hrs.cacheFlusher.dumpQueue()); + out.println("\nFlush Queue summary: " + hrs.getMemStoreFlusher().toString()); + out.println(hrs.getMemStoreFlusher().dumpQueue()); } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java index 8730c3326edc..3b2e58118389 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java @@ -116,8 +116,8 @@ import org.apache.hadoop.hbase.quotas.SpaceQuotaSnapshot; import org.apache.hadoop.hbase.quotas.SpaceViolationPolicyEnforcement; import org.apache.hadoop.hbase.regionserver.HRegion.RegionScannerImpl; -import org.apache.hadoop.hbase.regionserver.Leases.Lease; -import org.apache.hadoop.hbase.regionserver.Leases.LeaseStillHeldException; +import org.apache.hadoop.hbase.regionserver.LeaseManager.Lease; +import org.apache.hadoop.hbase.regionserver.LeaseManager.LeaseStillHeldException; import org.apache.hadoop.hbase.regionserver.Region.Operation; import org.apache.hadoop.hbase.regionserver.ScannerContext.LimitScope; import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker; @@ -395,7 +395,9 @@ public void run() throws IOException { // We're done. On way out re-add the above removed lease. The lease was temp removed for this // Rpc call and we are at end of the call now. Time to add it back. if (scanners.containsKey(scannerName)) { - if (lease != null) regionServer.leases.addLease(lease); + if (lease != null) { + regionServer.getLeaseManager().addLease(lease); + } } } } @@ -611,7 +613,7 @@ private boolean checkAndRowMutate(final HRegion region, final List doNonAtomicRegionMutation(final HRegion region, r = region.get(get); } } finally { - if (regionServer.metricsRegionServer != null) { - regionServer.metricsRegionServer.updateGet( + final MetricsRegionServer metricsRegionServer = regionServer.getMetrics(); + if (metricsRegionServer != null) { + metricsRegionServer.updateGet( region.getTableDescriptor().getTableName(), EnvironmentEdgeManager.currentTime() - before); } @@ -1027,7 +1026,7 @@ private void doBatchOp(final RegionActionResult.Builder builder, final HRegion r } if (!region.getRegionInfo().isMetaRegion()) { - regionServer.cacheFlusher.reclaimMemStoreMemory(); + regionServer.getMemStoreFlusher().reclaimMemStoreMemory(); } // HBASE-17924 @@ -1085,15 +1084,16 @@ private void doBatchOp(final RegionActionResult.Builder builder, final HRegion r private void updateMutationMetrics(HRegion region, long starttime, boolean batchContainsPuts, boolean batchContainsDelete) { - if (regionServer.metricsRegionServer != null) { + final MetricsRegionServer metricsRegionServer = regionServer.getMetrics(); + if (metricsRegionServer != null) { long after = EnvironmentEdgeManager.currentTime(); if (batchContainsPuts) { - regionServer.metricsRegionServer - .updatePutBatch(region.getTableDescriptor().getTableName(), after - starttime); + metricsRegionServer.updatePutBatch( + region.getTableDescriptor().getTableName(), after - starttime); } if (batchContainsDelete) { - regionServer.metricsRegionServer - .updateDeleteBatch(region.getTableDescriptor().getTableName(), after - starttime); + metricsRegionServer.updateDeleteBatch( + region.getTableDescriptor().getTableName(), after - starttime); } } } @@ -1157,7 +1157,7 @@ private void updateMutationMetrics(HRegion region, long starttime, boolean batch } requestCount.increment(); if (!region.getRegionInfo().isMetaRegion()) { - regionServer.cacheFlusher.reclaimMemStoreMemory(); + regionServer.getMemStoreFlusher().reclaimMemStoreMemory(); } return region.batchReplay(mutations.toArray( new MutationReplay[mutations.size()]), replaySeqId); @@ -1198,16 +1198,18 @@ public void logBatchWarning(String firstRegionName, int sum, int rowSizeWarnThre private final LogDelegate ld; - public RSRpcServices(HRegionServer rs) throws IOException { + public RSRpcServices(final HRegionServer rs) throws IOException { this(rs, DEFAULT_LOG_DELEGATE); } // Directly invoked only for testing - RSRpcServices(HRegionServer rs, LogDelegate ld) throws IOException { + RSRpcServices(final HRegionServer rs, final LogDelegate ld) throws IOException { + final Configuration conf = rs.getConfiguration(); this.ld = ld; regionServer = rs; - rowSizeWarnThreshold = rs.conf.getInt(BATCH_ROWS_THRESHOLD_NAME, BATCH_ROWS_THRESHOLD_DEFAULT); - RpcSchedulerFactory rpcSchedulerFactory; + rowSizeWarnThreshold = conf.getInt(BATCH_ROWS_THRESHOLD_NAME, BATCH_ROWS_THRESHOLD_DEFAULT); + + final RpcSchedulerFactory rpcSchedulerFactory; try { rpcSchedulerFactory = getRpcSchedulerFactoryClass().asSubclass(RpcSchedulerFactory.class) .getDeclaredConstructor().newInstance(); @@ -1216,22 +1218,22 @@ public RSRpcServices(HRegionServer rs) throws IOException { throw new IllegalArgumentException(e); } // Server to handle client requests. - InetSocketAddress initialIsa; - InetSocketAddress bindAddress; + final InetSocketAddress initialIsa; + final InetSocketAddress bindAddress; if(this instanceof MasterRpcServices) { - String hostname = getHostname(rs.conf, true); - int port = rs.conf.getInt(HConstants.MASTER_PORT, HConstants.DEFAULT_MASTER_PORT); + String hostname = getHostname(conf, true); + int port = conf.getInt(HConstants.MASTER_PORT, HConstants.DEFAULT_MASTER_PORT); // Creation of a HSA will force a resolve. initialIsa = new InetSocketAddress(hostname, port); - bindAddress = new InetSocketAddress(rs.conf.get("hbase.master.ipc.address", hostname), port); + bindAddress = new InetSocketAddress(conf.get("hbase.master.ipc.address", hostname), port); } else { - String hostname = getHostname(rs.conf, false); - int port = rs.conf.getInt(HConstants.REGIONSERVER_PORT, + String hostname = getHostname(conf, false); + int port = conf.getInt(HConstants.REGIONSERVER_PORT, HConstants.DEFAULT_REGIONSERVER_PORT); // Creation of a HSA will force a resolve. initialIsa = new InetSocketAddress(hostname, port); - bindAddress = new InetSocketAddress( - rs.conf.get("hbase.regionserver.ipc.address", hostname), port); + bindAddress = + new InetSocketAddress(conf.get("hbase.regionserver.ipc.address", hostname), port); } if (initialIsa.getAddress() == null) { throw new IllegalArgumentException("Failed resolve of " + initialIsa); @@ -1239,26 +1241,26 @@ public RSRpcServices(HRegionServer rs) throws IOException { priority = createPriority(); // Using Address means we don't get the IP too. Shorten it more even to just the host name // w/o the domain. - String name = rs.getProcessName() + "/" + + final String name = rs.getProcessName() + "/" + Address.fromParts(initialIsa.getHostName(), initialIsa.getPort()).toStringWithoutDomain(); // Set how many times to retry talking to another server over Connection. - ConnectionUtils.setServerSideHConnectionRetriesConfig(rs.conf, name, LOG); - rpcServer = createRpcServer(rs, rs.conf, rpcSchedulerFactory, bindAddress, name); + ConnectionUtils.setServerSideHConnectionRetriesConfig(conf, name, LOG); + rpcServer = createRpcServer(rs, rpcSchedulerFactory, bindAddress, name); rpcServer.setRsRpcServices(this); - scannerLeaseTimeoutPeriod = rs.conf.getInt( + scannerLeaseTimeoutPeriod = conf.getInt( HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, HConstants.DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD); - maxScannerResultSize = rs.conf.getLong( + maxScannerResultSize = conf.getLong( HConstants.HBASE_SERVER_SCANNER_MAX_RESULT_SIZE_KEY, HConstants.DEFAULT_HBASE_SERVER_SCANNER_MAX_RESULT_SIZE); - rpcTimeout = rs.conf.getInt( + rpcTimeout = conf.getInt( HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT); - minimumScanTimeLimitDelta = rs.conf.getLong( + minimumScanTimeLimitDelta = conf.getLong( REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA, DEFAULT_REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA); - InetSocketAddress address = rpcServer.getListenerAddress(); + final InetSocketAddress address = rpcServer.getListenerAddress(); if (address == null) { throw new IOException("Listener channel is closed"); } @@ -1271,9 +1273,13 @@ public RSRpcServices(HRegionServer rs) throws IOException { .expireAfterAccess(scannerLeaseTimeoutPeriod, TimeUnit.MILLISECONDS).build(); } - protected RpcServerInterface createRpcServer(Server server, Configuration conf, - RpcSchedulerFactory rpcSchedulerFactory, InetSocketAddress bindAddress, String name) - throws IOException { + protected RpcServerInterface createRpcServer( + final Server server, + final RpcSchedulerFactory rpcSchedulerFactory, + final InetSocketAddress bindAddress, + final String name + ) throws IOException { + final Configuration conf = server.getConfiguration(); boolean reservoirEnabled = conf.getBoolean(ByteBuffAllocator.ALLOCATOR_POOL_ENABLED_KEY, true); try { return RpcServerFactory.createRpcServer(server, name, getServices(), @@ -1287,7 +1293,8 @@ protected RpcServerInterface createRpcServer(Server server, Configuration conf, } protected Class getRpcSchedulerFactoryClass() { - return this.regionServer.conf.getClass(REGION_SERVER_RPC_SCHEDULER_FACTORY_CLASS, + final Configuration conf = regionServer.getConfiguration(); + return conf.getClass(REGION_SERVER_RPC_SCHEDULER_FACTORY_CLASS, SimpleRpcSchedulerFactory.class); } @@ -1406,8 +1413,8 @@ Object addSize(RpcCallContext context, Result r, Object lastBlock) { private RegionScannerHolder addScanner(String scannerName, RegionScanner s, Shipper shipper, HRegion r, boolean needCursor) throws LeaseStillHeldException { - Lease lease = regionServer.leases.createLease(scannerName, this.scannerLeaseTimeoutPeriod, - new ScannerListener(scannerName)); + Lease lease = regionServer.getLeaseManager().createLease( + scannerName, this.scannerLeaseTimeoutPeriod, new ScannerListener(scannerName)); RpcCallback shippedCallback = new RegionScannerShippedCallBack(scannerName, shipper, lease); RpcCallback closeCallback; if (s instanceof RpcCallback) { @@ -1514,7 +1521,7 @@ protected void checkOpen() throws IOException { if (regionServer.isStopped()) { throw new RegionServerStoppedException("Server " + regionServer.serverName + " stopping"); } - if (!regionServer.fsOk) { + if (!regionServer.isDataFileSystemOk()) { throw new RegionServerStoppedException("File system not available"); } if (!regionServer.isOnline()) { @@ -1753,12 +1760,12 @@ public GetOnlineRegionResponse getOnlineRegion(final RpcController controller, try { checkOpen(); requestCount.increment(); - Map onlineRegions = regionServer.onlineRegions; + Map onlineRegions = regionServer.getOnlineRegions(); List list = new ArrayList<>(onlineRegions.size()); for (HRegion region: onlineRegions.values()) { list.add(region.getRegionInfo()); } - Collections.sort(list, RegionInfo.COMPARATOR); + list.sort(RegionInfo.COMPARATOR); return ResponseConverter.buildGetOnlineRegionResponse(list); } catch (IOException ie) { throw new ServiceException(ie); @@ -2008,7 +2015,7 @@ public OpenRegionResponse openRegion(final RpcController controller, throw new ServiceException(ie); } // We are assigning meta, wait a little for regionserver to finish initialization. - int timeout = regionServer.conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, + int timeout = regionServer.getConfiguration().getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT) >> 2; // Quarter of RPC timeout long endTime = System.currentTimeMillis() + timeout; synchronized (regionServer.online) { @@ -2048,7 +2055,7 @@ public OpenRegionResponse openRegion(final RpcController controller, } LOG.info("Open " + region.getRegionNameAsString()); - final Boolean previous = regionServer.regionsInTransitionInRS.putIfAbsent( + final Boolean previous = regionServer.getRegionsInTransitionInRS().putIfAbsent( encodedNameBytes, Boolean.TRUE); if (Boolean.FALSE.equals(previous)) { @@ -2059,7 +2066,7 @@ public OpenRegionResponse openRegion(final RpcController controller, regionServer.abort(error); throw new IOException(error); } - regionServer.regionsInTransitionInRS.put(encodedNameBytes, Boolean.TRUE); + regionServer.getRegionsInTransitionInRS().put(encodedNameBytes, Boolean.TRUE); } if (Boolean.TRUE.equals(previous)) { @@ -2261,9 +2268,9 @@ public ReplicateWALEntryResponse replay(final RpcController controller, } catch (IOException ie) { throw new ServiceException(ie); } finally { - if (regionServer.metricsRegionServer != null) { - regionServer.metricsRegionServer.updateReplay( - EnvironmentEdgeManager.currentTime() - before); + final MetricsRegionServer metricsRegionServer = regionServer.getMetrics(); + if (metricsRegionServer != null) { + metricsRegionServer.updateReplay(EnvironmentEdgeManager.currentTime() - before); } } } @@ -2273,7 +2280,6 @@ public ReplicateWALEntryResponse replay(final RpcController controller, * * @param controller the RPC controller * @param request the request - * @throws ServiceException */ @Override @QosPriority(priority=HConstants.REPLICATION_QOS) @@ -2281,12 +2287,12 @@ public ReplicateWALEntryResponse replicateWALEntry(final RpcController controlle final ReplicateWALEntryRequest request) throws ServiceException { try { checkOpen(); - if (regionServer.replicationSinkHandler != null) { + if (regionServer.getReplicationSinkService() != null) { requestCount.increment(); List entries = request.getEntryList(); CellScanner cellScanner = ((HBaseRpcController)controller).cellScanner(); regionServer.getRegionServerCoprocessorHost().preReplicateLogEntries(); - regionServer.replicationSinkHandler.replicateLogEntries(entries, cellScanner, + regionServer.getReplicationSinkService().replicateLogEntries(entries, cellScanner, request.getReplicationClusterId(), request.getSourceBaseNamespaceDirPath(), request.getSourceHFileArchiveDirPath()); regionServer.getRegionServerCoprocessorHost().postReplicateLogEntries(); @@ -2312,7 +2318,7 @@ public RollWALWriterResponse rollWALWriter(final RpcController controller, checkOpen(); requestCount.increment(); regionServer.getRegionServerCoprocessorHost().preRollWALWriterRequest(); - regionServer.walRoller.requestRollAll(); + regionServer.getWalRoller().requestRollAll(); regionServer.getRegionServerCoprocessorHost().postRollWALWriterRequest(); RollWALWriterResponse.Builder builder = RollWALWriterResponse.newBuilder(); return builder.build(); @@ -2412,7 +2418,7 @@ public BulkLoadHFileResponse bulkLoadHFile(final RpcController controller, } } else { // secure bulk load - map = regionServer.secureBulkLoadManager.secureBulkLoadHFiles(region, request, clusterIds); + map = regionServer.getSecureBulkLoadManager().secureBulkLoadHFiles(region, request, clusterIds); } BulkLoadHFileResponse.Builder builder = BulkLoadHFileResponse.newBuilder(); builder.setLoaded(map != null); @@ -2433,9 +2439,9 @@ public BulkLoadHFileResponse bulkLoadHFile(final RpcController controller, } catch (IOException ie) { throw new ServiceException(ie); } finally { - if (regionServer.metricsRegionServer != null) { - regionServer.metricsRegionServer.updateBulkLoad( - EnvironmentEdgeManager.currentTime() - start); + final MetricsRegionServer metricsRegionServer = regionServer.getMetrics(); + if (metricsRegionServer != null) { + metricsRegionServer.updateBulkLoad(EnvironmentEdgeManager.currentTime() - start); } } } @@ -2449,7 +2455,7 @@ public PrepareBulkLoadResponse prepareBulkLoad(RpcController controller, HRegion region = getRegion(request.getRegion()); - String bulkToken = regionServer.secureBulkLoadManager.prepareBulkLoad(region, request); + String bulkToken = regionServer.getSecureBulkLoadManager().prepareBulkLoad(region, request); PrepareBulkLoadResponse.Builder builder = PrepareBulkLoadResponse.newBuilder(); builder.setBulkToken(bulkToken); return builder.build(); @@ -2467,9 +2473,8 @@ public CleanupBulkLoadResponse cleanupBulkLoad(RpcController controller, HRegion region = getRegion(request.getRegion()); - regionServer.secureBulkLoadManager.cleanupBulkLoad(region, request); - CleanupBulkLoadResponse response = CleanupBulkLoadResponse.newBuilder().build(); - return response; + regionServer.getSecureBulkLoadManager().cleanupBulkLoad(region, request); + return CleanupBulkLoadResponse.newBuilder().build(); } catch (IOException ie) { throw new ServiceException(ie); } @@ -2582,11 +2587,12 @@ public GetResponse get(final RpcController controller, } catch (IOException ie) { throw new ServiceException(ie); } finally { - MetricsRegionServer mrs = regionServer.metricsRegionServer; - if (mrs != null) { + final MetricsRegionServer metricsRegionServer = regionServer.getMetrics(); + if (metricsRegionServer != null) { TableDescriptor td = region != null? region.getTableDescriptor(): null; if (td != null) { - mrs.updateGet(td.getTableName(), EnvironmentEdgeManager.currentTime() - before); + metricsRegionServer.updateGet( + td.getTableName(), EnvironmentEdgeManager.currentTime() - before); } } if (quota != null) { @@ -2848,7 +2854,7 @@ public MutateResponse mutate(final RpcController rpcc, MutateResponse.Builder builder = MutateResponse.newBuilder(); MutationProto mutation = request.getMutation(); if (!region.getRegionInfo().isMetaRegion()) { - regionServer.cacheFlusher.reclaimMemStoreMemory(); + regionServer.getMemStoreFlusher().reclaimMemStoreMemory(); } long nonceGroup = request.hasNonceGroup() ? request.getNonceGroup() : HConstants.NO_NONCE; Result r = null; @@ -2955,22 +2961,23 @@ public MutateResponse mutate(final RpcController rpcc, quota.close(); } // Update metrics - if (regionServer.metricsRegionServer != null && type != null) { + final MetricsRegionServer metricsRegionServer = regionServer.getMetrics(); + if (metricsRegionServer != null && type != null) { long after = EnvironmentEdgeManager.currentTime(); switch (type) { case DELETE: if (request.hasCondition()) { - regionServer.metricsRegionServer.updateCheckAndDelete(after - before); + metricsRegionServer.updateCheckAndDelete(after - before); } else { - regionServer.metricsRegionServer.updateDelete( + metricsRegionServer.updateDelete( region == null ? null : region.getRegionInfo().getTable(), after - before); } break; case PUT: if (request.hasCondition()) { - regionServer.metricsRegionServer.updateCheckAndPut(after - before); + metricsRegionServer.updateCheckAndPut(after - before); } else { - regionServer.metricsRegionServer.updatePut( + metricsRegionServer.updatePut( region == null ? null : region.getRegionInfo().getTable(),after - before); } break; @@ -3027,7 +3034,7 @@ private RegionScannerHolder getRegionScanner(ScanRequest request) throws IOExcep LOG.warn("Getting exception closing " + scannerName, e); } finally { try { - regionServer.leases.cancelLease(scannerName); + regionServer.getLeaseManager().cancelLease(scannerName); } catch (LeaseException e) { LOG.warn("Getting exception closing " + scannerName, e); } @@ -3088,9 +3095,9 @@ private void checkScanNextCallSeq(ScanRequest request, RegionScannerHolder rsh) } } - private void addScannerLeaseBack(Leases.Lease lease) { + private void addScannerLeaseBack(LeaseManager.Lease lease) { try { - regionServer.leases.addLease(lease); + regionServer.getLeaseManager().addLease(lease); } catch (LeaseStillHeldException e) { // should not happen as the scanner id is unique. throw new AssertionError(e); @@ -3306,10 +3313,11 @@ private void scan(HBaseRpcController controller, ScanRequest request, RegionScan long end = EnvironmentEdgeManager.currentTime(); long responseCellSize = context != null ? context.getResponseCellSize() : 0; region.getMetrics().updateScanTime(end - before); - if (regionServer.metricsRegionServer != null) { - regionServer.metricsRegionServer.updateScanSize( + final MetricsRegionServer metricsRegionServer = regionServer.getMetrics(); + if (metricsRegionServer != null) { + metricsRegionServer.updateScanSize( region.getTableDescriptor().getTableName(), responseCellSize); - regionServer.metricsRegionServer.updateScanTime( + metricsRegionServer.updateScanTime( region.getTableDescriptor().getTableName(), end - before); } } finally { @@ -3348,9 +3356,10 @@ public ScanResponse scan(final RpcController controller, final ScanRequest reque LOG.debug( "Server shutting down and client tried to access missing scanner " + scannerName); } - if (regionServer.leases != null) { + final LeaseManager leaseManager = regionServer.getLeaseManager(); + if (leaseManager != null) { try { - regionServer.leases.cancelLease(scannerName); + leaseManager.cancelLease(scannerName); } catch (LeaseException le) { // No problem, ignore if (LOG.isTraceEnabled()) { @@ -3384,11 +3393,11 @@ public ScanResponse scan(final RpcController controller, final ScanRequest reque } HRegion region = rsh.r; String scannerName = rsh.scannerName; - Leases.Lease lease; + LeaseManager.Lease lease; try { // Remove lease while its being processed in server; protects against case // where processing of request takes > lease expiration time. - lease = regionServer.leases.removeLease(scannerName); + lease = regionServer.getLeaseManager().removeLease(scannerName); } catch (LeaseException e) { throw new ServiceException(e); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java index 9d6fefe6c908..bf531f47a381 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java @@ -195,7 +195,7 @@ public long[] getProcIds() { /** * @return The RegionServer's "Leases" service */ - Leases getLeases(); + LeaseManager getLeaseManager(); /** * @return hbase executor service diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitRequest.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitRequest.java index 9e806bb6117d..d699caed7cd7 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitRequest.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitRequest.java @@ -59,14 +59,11 @@ public String toString() { } private void doSplitting() { - server.metricsRegionServer.incrSplitRequest(); + server.getMetrics().incrSplitRequest(); if (user != null && user.getUGI() != null) { - user.getUGI().doAs (new PrivilegedAction() { - @Override - public Void run() { - requestRegionSplit(); - return null; - } + user.getUGI().doAs((PrivilegedAction) () -> { + requestRegionSplit(); + return null; }); } else { requestRegionSplit(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitWALCallable.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitWALCallable.java index b94df22c82da..4a65763e1c50 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitWALCallable.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitWALCallable.java @@ -101,7 +101,7 @@ public String getWalPath() { private void splitWal() throws IOException { SplitLogWorker.TaskExecutor.Status status = - SplitLogWorker.splitLog(walPath, null, rs.getConfiguration(), rs, rs, rs.walFactory); + SplitLogWorker.splitLog(walPath, null, rs.getConfiguration(), rs, rs, rs.getWalFactory()); if (status != SplitLogWorker.TaskExecutor.Status.DONE) { throw new IOException("Split WAL " + walPath + " failed at server "); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java index 708e8bc97e39..82db21268dde 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java @@ -46,7 +46,7 @@ import org.apache.hadoop.hbase.regionserver.FlushRequester; import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.HeapMemoryManager; -import org.apache.hadoop.hbase.regionserver.Leases; +import org.apache.hadoop.hbase.regionserver.LeaseManager; import org.apache.hadoop.hbase.regionserver.MetricsRegionServer; import org.apache.hadoop.hbase.regionserver.Region; import org.apache.hadoop.hbase.regionserver.RegionServerAccounting; @@ -228,7 +228,7 @@ public void setFileSystem(FileSystem hfs) { } @Override - public Leases getLeases() { + public LeaseManager getLeaseManager() { return null; } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java index 7f8548801c6d..604797a91df2 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java @@ -59,7 +59,7 @@ import org.apache.hadoop.hbase.regionserver.FlushRequester; import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.HeapMemoryManager; -import org.apache.hadoop.hbase.regionserver.Leases; +import org.apache.hadoop.hbase.regionserver.LeaseManager; import org.apache.hadoop.hbase.regionserver.MetricsRegionServer; import org.apache.hadoop.hbase.regionserver.Region; import org.apache.hadoop.hbase.regionserver.RegionServerAccounting; @@ -527,7 +527,7 @@ public List getRegions(TableName tableName) throws IOException { } @Override - public Leases getLeases() { + public LeaseManager getLeaseManager() { return null; } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterStatusServlet.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterStatusServlet.java index cf80d8e355a5..e0260f9f95a9 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterStatusServlet.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterStatusServlet.java @@ -111,7 +111,7 @@ public void setupBasicMocks() { MetricsRegionServer rms = Mockito.mock(MetricsRegionServer.class); Mockito.doReturn(new MetricsRegionServerWrapperStub()).when(rms).getRegionServerWrapper(); - Mockito.doReturn(rms).when(master).getRegionServerMetrics(); + Mockito.doReturn(rms).when(master).getMetrics(); // Mock admin admin = Mockito.mock(Admin.class); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCleanupCompactedFileAfterFailover.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCleanupCompactedFileAfterFailover.java index 5b633c5b12cf..dcc738018782 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCleanupCompactedFileAfterFailover.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCleanupCompactedFileAfterFailover.java @@ -156,7 +156,7 @@ private void testCleanupAfterFailover(int compactNum) throws Exception { int walNum = rsServedTable.getWALs().size(); // Roll WAL - rsServedTable.walRoller.requestRollAll(); + rsServedTable.getWalRoller().requestRollAll(); // Flush again region.flush(true); // The WAL which contains compaction event marker should be archived diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestPerColumnFamilyFlush.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestPerColumnFamilyFlush.java index 9bbce091440e..c5fcf8a1a843 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestPerColumnFamilyFlush.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestPerColumnFamilyFlush.java @@ -501,7 +501,7 @@ public void testFlushingWhenLogRolling() throws Exception { assertTrue(desiredRegion.getStore(FAMILY3).getMemStoreSize().getHeapSize() < cfFlushSizeLowerBound); table.put(createPut(1, 12345678)); // Make numRolledLogFiles greater than maxLogs - desiredRegionAndServer.getSecond().walRoller.requestRollAll(); + desiredRegionAndServer.getSecond().getWalRoller().requestRollAll(); // Wait for some time till the flush caused by log rolling happens. TEST_UTIL.waitFor(30000, new Waiter.ExplainingPredicate() { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestPriorityRpc.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestPriorityRpc.java index 2f4dce88fb61..d75d055e44fb 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestPriorityRpc.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestPriorityRpc.java @@ -18,7 +18,6 @@ package org.apache.hadoop.hbase.regionserver; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; import java.io.IOException; import org.apache.hadoop.conf.Configuration; @@ -60,12 +59,13 @@ public class TestPriorityRpc { public static final HBaseClassTestRule CLASS_RULE = HBaseClassTestRule.forClass(TestPriorityRpc.class); + private Configuration conf; private HRegionServer regionServer = null; private PriorityFunction priority = null; @Before public void setup() { - Configuration conf = HBaseConfiguration.create(); + conf = HBaseConfiguration.create(); conf.setBoolean("hbase.testing.nocluster", true); // No need to do ZK final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(conf); TEST_UTIL.getDataTestDir(this.getClass().getName()); @@ -106,8 +106,8 @@ public void testQosFunctionForMeta() throws IOException { .thenReturn(RegionInfoBuilder.FIRST_META_REGIONINFO.getTable()); // Presume type. ((AnnotationReadingPriorityFunction)priority).setRegionServer(mockRS); - assertEquals(HConstants.SYSTEMTABLE_QOS, priority.getPriority(header, getRequest, - User.createUserForTesting(regionServer.conf, "someuser", new String[]{"somegroup"}))); + assertEquals( + HConstants.SYSTEMTABLE_QOS, priority.getPriority(header, getRequest, createSomeUser())); } @Test @@ -120,8 +120,7 @@ public void testQosFunctionWithoutKnownArgument() throws IOException { headerBuilder.setMethodName("foo"); RequestHeader header = headerBuilder.build(); PriorityFunction qosFunc = regionServer.rpcServices.getPriority(); - assertEquals(HConstants.NORMAL_QOS, qosFunc.getPriority(header, null, - User.createUserForTesting(regionServer.conf, "someuser", new String[]{"somegroup"}))); + assertEquals(HConstants.NORMAL_QOS, qosFunc.getPriority(header, null, createSomeUser())); } @Test @@ -141,12 +140,12 @@ public void testQosFunctionForScanMethod() throws IOException { Mockito.when(mockRpc.getRegion(Mockito.any())).thenReturn(mockRegion); Mockito.when(mockRegion.getRegionInfo()).thenReturn(mockRegionInfo); // make isSystemTable return false - Mockito.when(mockRegionInfo.getTable()).thenReturn(TableName.valueOf("testQosFunctionForScanMethod")); + Mockito.when(mockRegionInfo.getTable()) + .thenReturn(TableName.valueOf("testQosFunctionForScanMethod")); // Presume type. ((AnnotationReadingPriorityFunction)priority).setRegionServer(mockRS); - int qos = priority.getPriority(header, scanRequest, - User.createUserForTesting(regionServer.conf, "someuser", new String[]{"somegroup"})); - assertTrue ("" + qos, qos == HConstants.NORMAL_QOS); + final int qos = priority.getPriority(header, scanRequest, createSomeUser()); + assertEquals(Integer.toString(qos), qos, HConstants.NORMAL_QOS); //build a scan request with scannerID scanBuilder = ScanRequest.newBuilder(); @@ -158,18 +157,26 @@ public void testQosFunctionForScanMethod() throws IOException { Mockito.when(mockRegionScanner.getRegionInfo()).thenReturn(mockRegionInfo); Mockito.when(mockRpc.getRegion((RegionSpecifier)Mockito.any())).thenReturn(mockRegion); Mockito.when(mockRegion.getRegionInfo()).thenReturn(mockRegionInfo); - Mockito.when(mockRegionInfo.getTable()).thenReturn(RegionInfoBuilder.FIRST_META_REGIONINFO.getTable()); + Mockito.when(mockRegionInfo.getTable()) + .thenReturn(RegionInfoBuilder.FIRST_META_REGIONINFO.getTable()); // Presume type. ((AnnotationReadingPriorityFunction)priority).setRegionServer(mockRS); - assertEquals(HConstants.SYSTEMTABLE_QOS, priority.getPriority(header, scanRequest, - User.createUserForTesting(regionServer.conf, "someuser", new String[]{"somegroup"}))); + assertEquals( + HConstants.SYSTEMTABLE_QOS, + priority.getPriority(header, scanRequest, createSomeUser())); //the same as above but with non-meta region // make isSystemTable return false - Mockito.when(mockRegionInfo.getTable()).thenReturn(TableName.valueOf("testQosFunctionForScanMethod")); - assertEquals(HConstants.NORMAL_QOS, priority.getPriority(header, scanRequest, - User.createUserForTesting(regionServer.conf, "someuser", new String[]{"somegroup"}))); + Mockito.when(mockRegionInfo.getTable()) + .thenReturn(TableName.valueOf("testQosFunctionForScanMethod")); + assertEquals( + HConstants.NORMAL_QOS, + priority.getPriority(header, scanRequest, createSomeUser())); + } + + private User createSomeUser() { + return User.createUserForTesting(conf, "someuser", new String[] { "somegroup" }); } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRSStatusServlet.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRSStatusServlet.java index c3cf9d31be79..db281eccc6fa 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRSStatusServlet.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRSStatusServlet.java @@ -110,7 +110,7 @@ public void setupBasicMocks() throws IOException, ServiceException { MetricsRegionServer rms = Mockito.mock(MetricsRegionServer.class); Mockito.doReturn(new MetricsRegionServerWrapperStub()).when(rms).getRegionServerWrapper(); - Mockito.doReturn(rms).when(rs).getRegionServerMetrics(); + Mockito.doReturn(rms).when(rs).getMetrics(); MetricsHBaseServer ms = Mockito.mock(MetricsHBaseServer.class); Mockito.doReturn(new MetricsHBaseServerWrapperStub()).when(ms).getHBaseServerWrapper(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerAbortTimeout.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerAbortTimeout.java index 7a92664db648..544f05f96c49 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerAbortTimeout.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerAbortTimeout.java @@ -91,8 +91,11 @@ public static void setUp() throws Exception { @AfterClass public static void tearDown() throws Exception { // Wait the SCP of abort rs to finish - UTIL.waitFor(30000, () -> UTIL.getMiniHBaseCluster().getMaster().getProcedures().stream() - .filter(p -> p instanceof ServerCrashProcedure && p.isFinished()).count() > 0); + UTIL.waitFor(30000, () -> UTIL.getMiniHBaseCluster() + .getMaster() + .getProcedures() + .stream() + .anyMatch(p -> p instanceof ServerCrashProcedure && p.isFinished())); UTIL.getAdmin().disableTable(TABLE_NAME); UTIL.getAdmin().deleteTable(TABLE_NAME); UTIL.shutdownMiniCluster(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerMetrics.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerMetrics.java index 79ee15dba6d9..d4d41fab0ebc 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerMetrics.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerMetrics.java @@ -123,11 +123,11 @@ public static void startCluster() throws Exception { while (cluster.getLiveRegionServerThreads().isEmpty() && cluster.getRegionServer(0) == null && - rs.getRegionServerMetrics() == null) { + rs.getMetrics() == null) { Threads.sleep(100); } rs = cluster.getRegionServer(0); - metricsRegionServer = rs.getRegionServerMetrics(); + metricsRegionServer = rs.getMetrics(); serverSource = metricsRegionServer.getMetricsSource(); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerNoMaster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerNoMaster.java index b9f89b72dae6..cfcf849df1a8 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerNoMaster.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerNoMaster.java @@ -121,7 +121,7 @@ public static void stopMasterAndAssignMeta(HBaseTestingUtility HTU) while (true) { sn = MetaTableLocator.getMetaRegionLocation(zkw); if (sn != null && sn.equals(hrs.getServerName()) - && hrs.onlineRegions.containsKey( + && hrs.getOnlineRegions().containsKey( HRegionInfo.FIRST_META_REGIONINFO.getEncodedName())) { break; } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSecureBulkLoadManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSecureBulkLoadManager.java index 34da4d892da8..f760770f6264 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSecureBulkLoadManager.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSecureBulkLoadManager.java @@ -122,13 +122,13 @@ public void accept(HRegion hRegion) { } } ; testUtil.getMiniHBaseCluster().getRegionServerThreads().get(0).getRegionServer() - .secureBulkLoadManager.setFsCreatedListener(fsCreatedListener); + .getSecureBulkLoadManager().setFsCreatedListener(fsCreatedListener); /// create table testUtil.createTable(TABLE,FAMILY,Bytes.toByteArrays(SPLIT_ROWKEY)); /// prepare files Path rootdir = testUtil.getMiniHBaseCluster().getRegionServerThreads().get(0) - .getRegionServer().getRootDir(); + .getRegionServer().getDataRootDir(); Path dir1 = new Path(rootdir, "dir1"); prepareHFile(dir1, key1, value1); Path dir2 = new Path(rootdir, "dir2"); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitWalDataLoss.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitWalDataLoss.java index 3e5ab9b99d68..239d203c7350 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitWalDataLoss.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitWalDataLoss.java @@ -125,13 +125,13 @@ Matchers. any(), Matchers. any(), Matchers.> any()); // Find region key; don't pick up key for hbase:meta by mistake. String key = null; - for (Map.Entry entry: rs.onlineRegions.entrySet()) { + for (Map.Entry entry: rs.getOnlineRegions().entrySet()) { if (entry.getValue().getRegionInfo().getTable().equals(this.tableName)) { key = entry.getKey(); break; } } - rs.onlineRegions.put(key, spiedRegion); + rs.getOnlineRegions().put(key, spiedRegion); Connection conn = testUtil.getConnection(); try (Table table = conn.getTable(tableName)) { @@ -141,7 +141,7 @@ Matchers. any(), Matchers. any(), long oldestSeqIdOfStore = region.getOldestSeqIdOfStore(family); LOG.info("CHANGE OLDEST " + oldestSeqIdOfStore); assertTrue(oldestSeqIdOfStore > HConstants.NO_SEQNUM); - rs.cacheFlusher.requestFlush(spiedRegion, false, FlushLifeCycleTracker.DUMMY); + rs.getMemStoreFlusher().requestFlush(spiedRegion, false, FlushLifeCycleTracker.DUMMY); synchronized (flushed) { while (!flushed.booleanValue()) { flushed.wait(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SerialReplicationTestBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SerialReplicationTestBase.java index e1a7ecdf5408..04cf392a7106 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SerialReplicationTestBase.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SerialReplicationTestBase.java @@ -170,8 +170,11 @@ protected static void rollAllWALs() throws Exception { @Override public boolean evaluate() throws Exception { - return UTIL.getMiniHBaseCluster().getLiveRegionServerThreads().stream() - .map(t -> t.getRegionServer()).allMatch(HRegionServer::walRollRequestFinished); + return UTIL.getMiniHBaseCluster() + .getLiveRegionServerThreads() + .stream() + .map(RegionServerThread::getRegionServer) + .allMatch(HRegionServer::walRollRequestFinished); } @Override