From a9d7824f44796e612981e063862eeda73f83153f Mon Sep 17 00:00:00 2001 From: Bharath Vissapragada Date: Thu, 7 Nov 2019 17:09:47 -0800 Subject: [PATCH] HBASE-23259: Ability to start minicluster with pre-determined master ports This patch adds the plumbing needed to start the mini cluster with a pre-determined set of master ports. This is required for HBASE-18095 because the internal client 'Connection's from region servers need to know the master RPC end points in their configuration even before the mini cluster starts. Following patches on HBASE-18095 will use this plumbing for unit tests. --- .../org/apache/hadoop/hbase/HConstants.java | 5 ++ .../hadoop/hbase/LocalHBaseCluster.java | 69 ++++++++++++----- .../hadoop/hbase/HBaseTestingUtility.java | 29 ++++++- .../apache/hadoop/hbase/MiniHBaseCluster.java | 26 ++++--- .../hadoop/hbase/StartMiniClusterOption.java | 22 +++++- .../TestClusterWithFixedMasterPorts.java | 75 +++++++++++++++++++ 6 files changed, 195 insertions(+), 31 deletions(-) create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/TestClusterWithFixedMasterPorts.java diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java index ce482ed022e7..af4796057ce5 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java @@ -171,6 +171,11 @@ public enum OperationStatusCode { /** Configuration key for master web API port */ public static final String MASTER_INFO_PORT = "hbase.master.info.port"; + /** Configuration key for the list of master host:ports **/ + public static final String MASTER_ADDRS_KEY = "hbase.master.addrs"; + + public static final String MASTER_ADDRS_DEFAULT = "localhost:" + DEFAULT_MASTER_PORT; + /** Parameter name for the master type being backup (waits for primary to go inactive). */ public static final String MASTER_TYPE_BACKUP = "hbase.master.backup"; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/LocalHBaseCluster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/LocalHBaseCluster.java index d3e62dc39001..7cd0789a7d9c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/LocalHBaseCluster.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/LocalHBaseCluster.java @@ -23,23 +23,21 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; - -import org.apache.yetus.audience.InterfaceAudience; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import java.util.concurrent.CopyOnWriteArrayList; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.apache.hadoop.hbase.master.HMaster; import org.apache.hadoop.hbase.regionserver.HRegionServer; import org.apache.hadoop.hbase.security.User; +import org.apache.hadoop.hbase.util.JVMClusterUtil; import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread; import org.apache.hadoop.hbase.util.Threads; - -import java.util.concurrent.CopyOnWriteArrayList; -import org.apache.hadoop.hbase.master.HMaster; -import org.apache.hadoop.hbase.util.JVMClusterUtil; - +import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; /** * This class creates a single process HBase cluster. One thread is created for * a master and one per region server. @@ -62,10 +60,6 @@ public class LocalHBaseCluster { private final List masterThreads = new CopyOnWriteArrayList<>(); private final List regionThreads = new CopyOnWriteArrayList<>(); private final static int DEFAULT_NO = 1; - /** local mode */ - public static final String LOCAL = "local"; - /** 'local:' */ - public static final String LOCAL_COLON = LOCAL + ":"; public static final String ASSIGN_RANDOM_PORTS = "hbase.localcluster.assign.random.ports"; private final Configuration conf; @@ -90,7 +84,7 @@ public LocalHBaseCluster(final Configuration conf) * @throws IOException */ public LocalHBaseCluster(final Configuration conf, final int noRegionServers) - throws IOException { + throws IOException { this(conf, 1, noRegionServers, getMasterImplementation(conf), getRegionServerImplementation(conf)); } @@ -104,14 +98,14 @@ public LocalHBaseCluster(final Configuration conf, final int noRegionServers) * @throws IOException */ public LocalHBaseCluster(final Configuration conf, final int noMasters, - final int noRegionServers) - throws IOException { + final int noRegionServers) throws IOException { this(conf, noMasters, noRegionServers, getMasterImplementation(conf), getRegionServerImplementation(conf)); } @SuppressWarnings("unchecked") - private static Class getRegionServerImplementation(final Configuration conf) { + private static Class getRegionServerImplementation( + final Configuration conf) { return (Class)conf.getClass(HConstants.REGION_SERVER_IMPL, HRegionServer.class); } @@ -183,6 +177,47 @@ public LocalHBaseCluster(final Configuration conf, final int noMasters, } } + /** + * Create new LocalHBaseCluster using pre-defined master rpc ports. Every other port is picked + * randomly. This also populates the master addresses in the base and region server configs. + * @param conf Base configuration to use for the cluster. + * @param noMasters Number of masters. + * @param noRegionServers Number of region servers. + * @param masterClass Impl of master class + * @param regionServerClass Impl of RS class + * @param masterPorts Array of ports, 2 per master (RPC/INFO) to use. + */ + public LocalHBaseCluster(final Configuration conf, final int noMasters, + final int noRegionServers, final Class masterClass, + final Class regionServerClass, final List masterPorts) + throws IOException { + this.conf = conf; + Preconditions.checkArgument(masterPorts.size() == noMasters); + Preconditions.checkArgument(conf.getBoolean(LocalHBaseCluster.ASSIGN_RANDOM_PORTS, false)); + this.masterClass = (Class) + conf.getClass(HConstants.MASTER_IMPL, masterClass); + this.regionServerClass = + (Class)conf.getClass(HConstants.REGION_SERVER_IMPL, + regionServerClass); + // Every port except the master ports are random. + conf.set(HConstants.MASTER_INFO_PORT, "0"); + conf.set(HConstants.REGIONSERVER_PORT, "0"); + conf.set(HConstants.REGIONSERVER_INFO_PORT, "0"); + List masterHostPorts = new ArrayList<>(); + for (int i = 0, j = 0; i < noMasters; i++) { + Configuration c = new Configuration(conf); + conf.setInt(HConstants.MASTER_PORT, masterPorts.get(j++)); + HMaster master = addMaster(c, i).getMaster(); + masterHostPorts.add( + master.getServerName().getHostname() + ":" + master.getServerName().getPort()); + } + // Populate the master addresses in the base configuration. + this.conf.set(HConstants.MASTER_ADDRS_KEY, String.join(",", masterHostPorts)); + for (int i = 0; i< noRegionServers; i++) { + addRegionServer(new Configuration(this.conf), i); + } + } + public JVMClusterUtil.RegionServerThread addRegionServer() throws IOException { return addRegionServer(new Configuration(conf), this.regionThreads.size()); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java index a50ac11db6eb..8b280c2b4d68 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java @@ -1049,6 +1049,21 @@ public MiniHBaseCluster startMiniCluster(int numSlaves) throws Exception { return startMiniCluster(option); } + /** + * Start up a minicluster of hbase, dfs and zookeeper clusters with given slave node number and + * a set of master ports. All other options will use default values, defined in + * {@link StartMiniClusterOption.Builder}. + * @param numSlaves slave node number, for both HBase region server and HDFS data node. + * @see #startMiniCluster(StartMiniClusterOption option) + * @see #shutdownMiniDFSCluster() + */ + public MiniHBaseCluster startMiniClusterWithMasterPorts(int numSlaves) throws Exception { + StartMiniClusterOption option = StartMiniClusterOption.builder() + .numRegionServers(numSlaves).numDataNodes(numSlaves) + .masterPorts(randomFreePorts(1)).build(); + return startMiniCluster(option); + } + /** * Start up a minicluster of hbase, dfs and zookeeper all using default options. * Option default value can be found in {@link StartMiniClusterOption.Builder}. @@ -1127,7 +1142,8 @@ public MiniHBaseCluster startMiniHBaseCluster(StartMiniClusterOption option) TraceUtil.initTracer(c); this.hbaseCluster = new MiniHBaseCluster(c, option.getNumMasters(), option.getNumRegionServers(), - option.getRsPorts(), option.getMasterClass(), option.getRsClass()); + option.getMasterPorts(), option.getRsPorts(), option.getMasterClass(), + option.getRsClass()); // Don't leave here till we've done a successful scan of the hbase:meta Table t = getConnection().getTable(TableName.META_TABLE_NAME); ResultScanner s = t.getScanner(new Scan()); @@ -1254,7 +1270,8 @@ public void restartHBaseCluster(StartMiniClusterOption option) } this.hbaseCluster = new MiniHBaseCluster(this.conf, option.getNumMasters(), option.getNumRegionServers(), - option.getRsPorts(), option.getMasterClass(), option.getRsClass()); + option.getMasterPorts(), option.getRsPorts(), option.getMasterClass(), + option.getRsClass()); // Don't leave here till we've done a successful scan of the hbase:meta Connection conn = ConnectionFactory.createConnection(this.conf); Table t = conn.getTable(TableName.META_TABLE_NAME); @@ -3836,6 +3853,14 @@ public static int randomFreePort() { return portAllocator.randomFreePort(); } + public static List randomFreePorts(int numPorts) { + List ports = new ArrayList<>(); + for (int i = 0; i < numPorts; i++) { + ports.add(portAllocator.randomFreePort()); + } + return ports; + } + static class PortAllocator { private static final int MIN_RANDOM_PORT = 0xc000; private static final int MAX_RANDOM_PORT = 0xfffe; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/MiniHBaseCluster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/MiniHBaseCluster.java index 948ba1f2c4e3..61371bbbaae8 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/MiniHBaseCluster.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/MiniHBaseCluster.java @@ -40,7 +40,7 @@ import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - +import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerStartupResponse; /** @@ -88,10 +88,11 @@ public MiniHBaseCluster(Configuration conf, int numMasters, int numRegionServers Class masterClass, Class regionserverClass) throws IOException, InterruptedException { - this(conf, numMasters, numRegionServers, null, masterClass, regionserverClass); + this(conf, numMasters, numRegionServers, null, null, masterClass, regionserverClass); } /** + * @param masterPorts Ports to be used by master. * @param rsPorts Ports that RegionServer should use; pass ports if you want to test cluster * restart where for sure the regionservers come up on same address+port (but * just with different startcode); by default mini hbase clusters choose new @@ -100,7 +101,7 @@ public MiniHBaseCluster(Configuration conf, int numMasters, int numRegionServers * @throws InterruptedException */ public MiniHBaseCluster(Configuration conf, int numMasters, int numRegionServers, - List rsPorts, + List masterPorts, List rsPorts, Class masterClass, Class regionserverClass) throws IOException, InterruptedException { @@ -109,7 +110,7 @@ public MiniHBaseCluster(Configuration conf, int numMasters, int numRegionServers // Hadoop 2 CompatibilityFactory.getInstance(MetricsAssertHelper.class).init(); - init(numMasters, numRegionServers, rsPorts, masterClass, regionserverClass); + init(numMasters, numRegionServers, masterPorts, rsPorts, masterClass, regionserverClass); this.initialClusterStatus = getClusterMetrics(); } @@ -225,9 +226,10 @@ public void run() { } } - private void init(final int nMasterNodes, final int nRegionNodes, List rsPorts, - Class masterClass, - Class regionserverClass) + private void init(final int nMasterNodes, final int nRegionNodes, + final List masterPorts, final List rsPorts, + Class masterClass, + Class regionserverClass) throws IOException, InterruptedException { try { if (masterClass == null){ @@ -238,8 +240,14 @@ private void init(final int nMasterNodes, final int nRegionNodes, List } // start up a LocalHBaseCluster - hbaseCluster = new LocalHBaseCluster(conf, nMasterNodes, 0, - masterClass, regionserverClass); + if (masterPorts == null) { + hbaseCluster = new LocalHBaseCluster(conf, nMasterNodes, 0, + masterClass, regionserverClass); + } else { + Preconditions.checkState(masterPorts.size() == nMasterNodes); + hbaseCluster = new LocalHBaseCluster(conf, nMasterNodes, 0, masterClass, + regionserverClass, masterPorts); + } // manually add the regionservers as other users for (int i = 0; i < nRegionNodes; i++) { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/StartMiniClusterOption.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/StartMiniClusterOption.java index ad70c95b7c8f..db1c05bdd28e 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/StartMiniClusterOption.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/StartMiniClusterOption.java @@ -51,6 +51,11 @@ public final class StartMiniClusterOption { */ private final Class masterClass; + /** + * List of ports (one port per master) for masters to use. + */ + private final List masterPorts; + /** * Number of region servers to start up. * If this value is > 1, then make sure config "hbase.regionserver.info.port" is -1 @@ -100,11 +105,12 @@ public final class StartMiniClusterOption { * Private constructor. Use {@link Builder#build()}. */ private StartMiniClusterOption(int numMasters, Class masterClass, - int numRegionServers, List rsPorts, + int numRegionServers, List masterPorts, List rsPorts, Class rsClass, int numDataNodes, String[] dataNodeHosts, int numZkServers, boolean createRootDir, boolean createWALDir) { this.numMasters = numMasters; this.masterClass = masterClass; + this.masterPorts = masterPorts; this.numRegionServers = numRegionServers; this.rsPorts = rsPorts; this.rsClass = rsClass; @@ -123,6 +129,10 @@ public Class getMasterClass() { return masterClass; } + public List getMasterPorts() { + return masterPorts; + } + public int getNumRegionServers() { return numRegionServers; } @@ -180,6 +190,7 @@ public static Builder builder() { public static final class Builder { private int numMasters = 1; private Class masterClass = null; + private List masterPorts = null; private int numRegionServers = 1; private List rsPorts = null; private Class rsClass = null; @@ -196,8 +207,8 @@ public StartMiniClusterOption build() { if (dataNodeHosts != null && dataNodeHosts.length != 0) { numDataNodes = dataNodeHosts.length; } - return new StartMiniClusterOption(numMasters, masterClass, numRegionServers, rsPorts, rsClass, - numDataNodes, dataNodeHosts, numZkServers, createRootDir, createWALDir); + return new StartMiniClusterOption(numMasters, masterClass, numRegionServers, masterPorts, + rsPorts, rsClass, numDataNodes, dataNodeHosts, numZkServers, createRootDir, createWALDir); } public Builder numMasters(int numMasters) { @@ -210,6 +221,11 @@ public Builder masterClass(Class masterClass) { return this; } + public Builder masterPorts(List masterPorts) { + this.masterPorts = masterPorts; + return this; + } + public Builder numRegionServers(int numRegionServers) { this.numRegionServers = numRegionServers; return this; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestClusterWithFixedMasterPorts.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestClusterWithFixedMasterPorts.java new file mode 100644 index 000000000000..2f471a43be53 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestClusterWithFixedMasterPorts.java @@ -0,0 +1,75 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase; + +import static org.apache.hadoop.hbase.HBaseTestingUtility.randomFreePorts; +import java.net.BindException; +import java.util.ArrayList; +import java.util.List; +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.apache.hadoop.hbase.util.JVMClusterUtil; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category(SmallTests.class) +public class TestClusterWithFixedMasterPorts { + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestClusterWithFixedMasterPorts.class); + + private static final int MAX_INIT_RETRIES = 10; + + private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + + @AfterClass + public static void cleanUp() throws Exception { + TEST_UTIL.shutdownMiniCluster(); + } + + /** + * Test runs a mini cluster with pre-determined master ports and then verifies that the + * configuration is reflected in the cluster conf accordingly. + */ + @Test + public void testValidateMasterAddrConfig() throws Exception { + int retries = 0; + boolean doRetry = false; + StartMiniClusterOption option = StartMiniClusterOption.builder() + .numRegionServers(1).numMasters(3) + .masterPorts(randomFreePorts(3)).build(); + do { + try { + TEST_UTIL.startMiniCluster(option); + } catch (BindException e) { + // Theoretically random ports can still cause port conflicts because they could be picked up + // by another process/thread before we bind to them. + doRetry = true; + } + } while (doRetry && retries++ < MAX_INIT_RETRIES); + List actualMasterAddrs = new ArrayList<>(); + for (JVMClusterUtil.MasterThread master: TEST_UTIL.getMiniHBaseCluster().getMasterThreads()) { + ServerName sname = master.getMaster().getServerName(); + actualMasterAddrs.add(sname.getHostname() + ":" + sname.getPort()); + } + Assert.assertEquals(String.join(",", actualMasterAddrs), + TEST_UTIL.getMiniHBaseCluster().getConfiguration().get(HConstants.MASTER_ADDRS_KEY)); + } +}