Skip to content

Commit

Permalink
HBASE-23259: Ability to start minicluster with pre-determined master …
Browse files Browse the repository at this point in the history
…ports

This patch adds the plumbing needed to start the mini cluster with
a pre-determined set of master ports. This is required for HBASE-18095
because the internal client 'Connection's from region servers need
to know the master RPC end points in their configuration even before
the mini cluster starts.

Following patches on HBASE-18095 will use this plumbing for unit tests.
  • Loading branch information
bharathv committed Nov 11, 2019
1 parent 9494c12 commit a9d7824
Show file tree
Hide file tree
Showing 6 changed files with 195 additions and 31 deletions.
Expand Up @@ -171,6 +171,11 @@ public enum OperationStatusCode {
/** Configuration key for master web API port */
public static final String MASTER_INFO_PORT = "hbase.master.info.port";

/** Configuration key for the list of master host:ports **/
public static final String MASTER_ADDRS_KEY = "hbase.master.addrs";

public static final String MASTER_ADDRS_DEFAULT = "localhost:" + DEFAULT_MASTER_PORT;

/** Parameter name for the master type being backup (waits for primary to go inactive). */
public static final String MASTER_TYPE_BACKUP = "hbase.master.backup";

Expand Down
Expand Up @@ -23,23 +23,21 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.CopyOnWriteArrayList;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.JVMClusterUtil;
import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
import org.apache.hadoop.hbase.util.Threads;

import java.util.concurrent.CopyOnWriteArrayList;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.util.JVMClusterUtil;

import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
/**
* This class creates a single process HBase cluster. One thread is created for
* a master and one per region server.
Expand All @@ -62,10 +60,6 @@ public class LocalHBaseCluster {
private final List<JVMClusterUtil.MasterThread> masterThreads = new CopyOnWriteArrayList<>();
private final List<JVMClusterUtil.RegionServerThread> regionThreads = new CopyOnWriteArrayList<>();
private final static int DEFAULT_NO = 1;
/** local mode */
public static final String LOCAL = "local";
/** 'local:' */
public static final String LOCAL_COLON = LOCAL + ":";
public static final String ASSIGN_RANDOM_PORTS = "hbase.localcluster.assign.random.ports";

private final Configuration conf;
Expand All @@ -90,7 +84,7 @@ public LocalHBaseCluster(final Configuration conf)
* @throws IOException
*/
public LocalHBaseCluster(final Configuration conf, final int noRegionServers)
throws IOException {
throws IOException {
this(conf, 1, noRegionServers, getMasterImplementation(conf),
getRegionServerImplementation(conf));
}
Expand All @@ -104,14 +98,14 @@ public LocalHBaseCluster(final Configuration conf, final int noRegionServers)
* @throws IOException
*/
public LocalHBaseCluster(final Configuration conf, final int noMasters,
final int noRegionServers)
throws IOException {
final int noRegionServers) throws IOException {
this(conf, noMasters, noRegionServers, getMasterImplementation(conf),
getRegionServerImplementation(conf));
}

@SuppressWarnings("unchecked")
private static Class<? extends HRegionServer> getRegionServerImplementation(final Configuration conf) {
private static Class<? extends HRegionServer> getRegionServerImplementation(
final Configuration conf) {
return (Class<? extends HRegionServer>)conf.getClass(HConstants.REGION_SERVER_IMPL,
HRegionServer.class);
}
Expand Down Expand Up @@ -183,6 +177,47 @@ public LocalHBaseCluster(final Configuration conf, final int noMasters,
}
}

/**
* Create new LocalHBaseCluster using pre-defined master rpc ports. Every other port is picked
* randomly. This also populates the master addresses in the base and region server configs.
* @param conf Base configuration to use for the cluster.
* @param noMasters Number of masters.
* @param noRegionServers Number of region servers.
* @param masterClass Impl of master class
* @param regionServerClass Impl of RS class
* @param masterPorts Array of ports, 2 per master (RPC/INFO) to use.
*/
public LocalHBaseCluster(final Configuration conf, final int noMasters,
final int noRegionServers, final Class<? extends HMaster> masterClass,
final Class<? extends HRegionServer> regionServerClass, final List<Integer> masterPorts)
throws IOException {
this.conf = conf;
Preconditions.checkArgument(masterPorts.size() == noMasters);
Preconditions.checkArgument(conf.getBoolean(LocalHBaseCluster.ASSIGN_RANDOM_PORTS, false));
this.masterClass = (Class<? extends HMaster>)
conf.getClass(HConstants.MASTER_IMPL, masterClass);
this.regionServerClass =
(Class<? extends HRegionServer>)conf.getClass(HConstants.REGION_SERVER_IMPL,
regionServerClass);
// Every port except the master ports are random.
conf.set(HConstants.MASTER_INFO_PORT, "0");
conf.set(HConstants.REGIONSERVER_PORT, "0");
conf.set(HConstants.REGIONSERVER_INFO_PORT, "0");
List<String> masterHostPorts = new ArrayList<>();
for (int i = 0, j = 0; i < noMasters; i++) {
Configuration c = new Configuration(conf);
conf.setInt(HConstants.MASTER_PORT, masterPorts.get(j++));
HMaster master = addMaster(c, i).getMaster();
masterHostPorts.add(
master.getServerName().getHostname() + ":" + master.getServerName().getPort());
}
// Populate the master addresses in the base configuration.
this.conf.set(HConstants.MASTER_ADDRS_KEY, String.join(",", masterHostPorts));
for (int i = 0; i< noRegionServers; i++) {
addRegionServer(new Configuration(this.conf), i);
}
}

public JVMClusterUtil.RegionServerThread addRegionServer()
throws IOException {
return addRegionServer(new Configuration(conf), this.regionThreads.size());
Expand Down
Expand Up @@ -1049,6 +1049,21 @@ public MiniHBaseCluster startMiniCluster(int numSlaves) throws Exception {
return startMiniCluster(option);
}

/**
* Start up a minicluster of hbase, dfs and zookeeper clusters with given slave node number and
* a set of master ports. All other options will use default values, defined in
* {@link StartMiniClusterOption.Builder}.
* @param numSlaves slave node number, for both HBase region server and HDFS data node.
* @see #startMiniCluster(StartMiniClusterOption option)
* @see #shutdownMiniDFSCluster()
*/
public MiniHBaseCluster startMiniClusterWithMasterPorts(int numSlaves) throws Exception {
StartMiniClusterOption option = StartMiniClusterOption.builder()
.numRegionServers(numSlaves).numDataNodes(numSlaves)
.masterPorts(randomFreePorts(1)).build();
return startMiniCluster(option);
}

/**
* Start up a minicluster of hbase, dfs and zookeeper all using default options.
* Option default value can be found in {@link StartMiniClusterOption.Builder}.
Expand Down Expand Up @@ -1127,7 +1142,8 @@ public MiniHBaseCluster startMiniHBaseCluster(StartMiniClusterOption option)
TraceUtil.initTracer(c);
this.hbaseCluster =
new MiniHBaseCluster(c, option.getNumMasters(), option.getNumRegionServers(),
option.getRsPorts(), option.getMasterClass(), option.getRsClass());
option.getMasterPorts(), option.getRsPorts(), option.getMasterClass(),
option.getRsClass());
// Don't leave here till we've done a successful scan of the hbase:meta
Table t = getConnection().getTable(TableName.META_TABLE_NAME);
ResultScanner s = t.getScanner(new Scan());
Expand Down Expand Up @@ -1254,7 +1270,8 @@ public void restartHBaseCluster(StartMiniClusterOption option)
}
this.hbaseCluster =
new MiniHBaseCluster(this.conf, option.getNumMasters(), option.getNumRegionServers(),
option.getRsPorts(), option.getMasterClass(), option.getRsClass());
option.getMasterPorts(), option.getRsPorts(), option.getMasterClass(),
option.getRsClass());
// Don't leave here till we've done a successful scan of the hbase:meta
Connection conn = ConnectionFactory.createConnection(this.conf);
Table t = conn.getTable(TableName.META_TABLE_NAME);
Expand Down Expand Up @@ -3836,6 +3853,14 @@ public static int randomFreePort() {
return portAllocator.randomFreePort();
}

public static List<Integer> randomFreePorts(int numPorts) {
List<Integer> ports = new ArrayList<>();
for (int i = 0; i < numPorts; i++) {
ports.add(portAllocator.randomFreePort());
}
return ports;
}

static class PortAllocator {
private static final int MIN_RANDOM_PORT = 0xc000;
private static final int MAX_RANDOM_PORT = 0xfffe;
Expand Down
Expand Up @@ -40,7 +40,7 @@
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerStartupResponse;

/**
Expand Down Expand Up @@ -88,10 +88,11 @@ public MiniHBaseCluster(Configuration conf, int numMasters, int numRegionServers
Class<? extends HMaster> masterClass,
Class<? extends MiniHBaseCluster.MiniHBaseClusterRegionServer> regionserverClass)
throws IOException, InterruptedException {
this(conf, numMasters, numRegionServers, null, masterClass, regionserverClass);
this(conf, numMasters, numRegionServers, null, null, masterClass, regionserverClass);
}

/**
* @param masterPorts Ports to be used by master.
* @param rsPorts Ports that RegionServer should use; pass ports if you want to test cluster
* restart where for sure the regionservers come up on same address+port (but
* just with different startcode); by default mini hbase clusters choose new
Expand All @@ -100,7 +101,7 @@ public MiniHBaseCluster(Configuration conf, int numMasters, int numRegionServers
* @throws InterruptedException
*/
public MiniHBaseCluster(Configuration conf, int numMasters, int numRegionServers,
List<Integer> rsPorts,
List<Integer> masterPorts, List<Integer> rsPorts,
Class<? extends HMaster> masterClass,
Class<? extends MiniHBaseCluster.MiniHBaseClusterRegionServer> regionserverClass)
throws IOException, InterruptedException {
Expand All @@ -109,7 +110,7 @@ public MiniHBaseCluster(Configuration conf, int numMasters, int numRegionServers
// Hadoop 2
CompatibilityFactory.getInstance(MetricsAssertHelper.class).init();

init(numMasters, numRegionServers, rsPorts, masterClass, regionserverClass);
init(numMasters, numRegionServers, masterPorts, rsPorts, masterClass, regionserverClass);
this.initialClusterStatus = getClusterMetrics();
}

Expand Down Expand Up @@ -225,9 +226,10 @@ public void run() {
}
}

private void init(final int nMasterNodes, final int nRegionNodes, List<Integer> rsPorts,
Class<? extends HMaster> masterClass,
Class<? extends MiniHBaseCluster.MiniHBaseClusterRegionServer> regionserverClass)
private void init(final int nMasterNodes, final int nRegionNodes,
final List<Integer> masterPorts, final List<Integer> rsPorts,
Class<? extends HMaster> masterClass,
Class<? extends MiniHBaseCluster.MiniHBaseClusterRegionServer> regionserverClass)
throws IOException, InterruptedException {
try {
if (masterClass == null){
Expand All @@ -238,8 +240,14 @@ private void init(final int nMasterNodes, final int nRegionNodes, List<Integer>
}

// start up a LocalHBaseCluster
hbaseCluster = new LocalHBaseCluster(conf, nMasterNodes, 0,
masterClass, regionserverClass);
if (masterPorts == null) {
hbaseCluster = new LocalHBaseCluster(conf, nMasterNodes, 0,
masterClass, regionserverClass);
} else {
Preconditions.checkState(masterPorts.size() == nMasterNodes);
hbaseCluster = new LocalHBaseCluster(conf, nMasterNodes, 0, masterClass,
regionserverClass, masterPorts);
}

// manually add the regionservers as other users
for (int i = 0; i < nRegionNodes; i++) {
Expand Down
Expand Up @@ -51,6 +51,11 @@ public final class StartMiniClusterOption {
*/
private final Class<? extends HMaster> masterClass;

/**
* List of ports (one port per master) for masters to use.
*/
private final List<Integer> masterPorts;

/**
* Number of region servers to start up.
* If this value is > 1, then make sure config "hbase.regionserver.info.port" is -1
Expand Down Expand Up @@ -100,11 +105,12 @@ public final class StartMiniClusterOption {
* Private constructor. Use {@link Builder#build()}.
*/
private StartMiniClusterOption(int numMasters, Class<? extends HMaster> masterClass,
int numRegionServers, List<Integer> rsPorts,
int numRegionServers, List<Integer> masterPorts, List<Integer> rsPorts,
Class<? extends MiniHBaseCluster.MiniHBaseClusterRegionServer> rsClass, int numDataNodes,
String[] dataNodeHosts, int numZkServers, boolean createRootDir, boolean createWALDir) {
this.numMasters = numMasters;
this.masterClass = masterClass;
this.masterPorts = masterPorts;
this.numRegionServers = numRegionServers;
this.rsPorts = rsPorts;
this.rsClass = rsClass;
Expand All @@ -123,6 +129,10 @@ public Class<? extends HMaster> getMasterClass() {
return masterClass;
}

public List<Integer> getMasterPorts() {
return masterPorts;
}

public int getNumRegionServers() {
return numRegionServers;
}
Expand Down Expand Up @@ -180,6 +190,7 @@ public static Builder builder() {
public static final class Builder {
private int numMasters = 1;
private Class<? extends HMaster> masterClass = null;
private List<Integer> masterPorts = null;
private int numRegionServers = 1;
private List<Integer> rsPorts = null;
private Class<? extends MiniHBaseCluster.MiniHBaseClusterRegionServer> rsClass = null;
Expand All @@ -196,8 +207,8 @@ public StartMiniClusterOption build() {
if (dataNodeHosts != null && dataNodeHosts.length != 0) {
numDataNodes = dataNodeHosts.length;
}
return new StartMiniClusterOption(numMasters, masterClass, numRegionServers, rsPorts, rsClass,
numDataNodes, dataNodeHosts, numZkServers, createRootDir, createWALDir);
return new StartMiniClusterOption(numMasters, masterClass, numRegionServers, masterPorts,
rsPorts, rsClass, numDataNodes, dataNodeHosts, numZkServers, createRootDir, createWALDir);
}

public Builder numMasters(int numMasters) {
Expand All @@ -210,6 +221,11 @@ public Builder masterClass(Class<? extends HMaster> masterClass) {
return this;
}

public Builder masterPorts(List<Integer> masterPorts) {
this.masterPorts = masterPorts;
return this;
}

public Builder numRegionServers(int numRegionServers) {
this.numRegionServers = numRegionServers;
return this;
Expand Down

0 comments on commit a9d7824

Please sign in to comment.