Skip to content

Commit

Permalink
Configure HMaster to reject decommissioned hosts
Browse files Browse the repository at this point in the history
  • Loading branch information
aalhour committed Feb 19, 2024
1 parent 9656006 commit bacf114
Show file tree
Hide file tree
Showing 6 changed files with 253 additions and 4 deletions.
14 changes: 14 additions & 0 deletions hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
Original file line number Diff line number Diff line change
Expand Up @@ -1622,6 +1622,20 @@ public enum OperationStatusCode {
*/
public final static boolean HBASE_SERVER_USEIP_ENABLED_DEFAULT = false;

/**
* Configuration key controlling whether the HMaster rejects RegionServers running on
* decommissioned hosts. The match is made on the hostname only, bypassing the port and startcode
* parts of the ServerName. When true, the HMaster will reject a RegionServer's request to
* `reportForDuty` if its hostname exists in the list of decommissioned RegionServers the HMaster
* maintains internally. Added in HBASE-28342.
*/
public final static String REJECT_DECOMMISSIONED_HOSTS_KEY =
"hbase.master.reject.decommissioned.hosts";

/**
* Default value of {@link #REJECT_DECOMMISSIONED_HOSTS_KEY}: decommissioned hosts are not
* rejected unless the key is explicitly set to true.
*/
public final static boolean REJECT_DECOMMISSIONED_HOSTS_DEFAULT = false;

private HConstants() {
// Can't be instantiated with this ctor.
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -546,7 +546,6 @@ public HMaster(final Configuration conf) throws IOException {
HConstants.DEFAULT_HBASE_MASTER_BALANCER_MAX_RIT_PERCENT);

// Do we publish the status?

boolean shouldPublish =
conf.getBoolean(HConstants.STATUS_PUBLISHED, HConstants.STATUS_PUBLISHED_DEFAULT);
Class<? extends ClusterStatusPublisher.Publisher> publisherClass =
Expand Down Expand Up @@ -997,7 +996,10 @@ private void finishActiveMasterInitialization() throws IOException, InterruptedE
masterRegion = MasterRegionFactory.create(this);
rsListStorage = new MasterRegionServerList(masterRegion, this);

// Initialize the ServerManager and register it as a configuration observer
this.serverManager = createServerManager(this, rsListStorage);
this.configurationManager.registerObserver(this.serverManager);

this.syncReplicationReplayWALManager = new SyncReplicationReplayWALManager(this);
if (
!conf.getBoolean(HBASE_SPLIT_WAL_COORDINATED_BY_ZK, DEFAULT_HBASE_SPLIT_COORDINATED_BY_ZK)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master;

import org.apache.hadoop.hbase.HBaseIOException;
import org.apache.yetus.audience.InterfaceAudience;

/**
 * Signals that a RegionServer was refused because its host is considered decommissioned. The
 * Master raises it while handling a RegionServer startup request when it is configured to reject
 * decommissioned hosts (see HBASE-28342).
 */
@InterfaceAudience.Private
public class HostIsConsideredDecommissionedException extends HBaseIOException {

  /**
   * Creates the exception with a human-readable reason.
   * @param message explanation of why the host was rejected
   */
  public HostIsConsideredDecommissionedException(String message) {
    super(message);
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
Expand All @@ -51,6 +52,7 @@
import org.apache.hadoop.hbase.client.AsyncClusterConnection;
import org.apache.hadoop.hbase.client.AsyncRegionServerAdmin;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.conf.ConfigurationObserver;
import org.apache.hadoop.hbase.ipc.RemoteWithExtrasException;
import org.apache.hadoop.hbase.master.assignment.RegionStates;
import org.apache.hadoop.hbase.master.procedure.ServerCrashProcedure;
Expand Down Expand Up @@ -100,7 +102,7 @@
* only after the handler is fully enabled and has completed the handling.
*/
@InterfaceAudience.Private
public class ServerManager {
public class ServerManager implements ConfigurationObserver {
public static final String WAIT_ON_REGIONSERVERS_MAXTOSTART =
"hbase.master.wait.on.regionservers.maxtostart";

Expand Down Expand Up @@ -172,6 +174,9 @@ public class ServerManager {
/** Listeners that are called on server events. */
private List<ServerListener> listeners = new CopyOnWriteArrayList<>();

/** Configured value of HConstants.REJECT_DECOMMISSIONED_HOSTS_KEY */
private volatile boolean rejectDecommissionedHostsConfig;

/**
* Constructor.
*/
Expand All @@ -183,6 +188,35 @@ public ServerManager(final MasterServices master, RegionServerList storage) {
warningSkew = c.getLong("hbase.master.warningclockskew", 10000);
persistFlushedSequenceId =
c.getBoolean(PERSIST_FLUSHEDSEQUENCEID, PERSIST_FLUSHEDSEQUENCEID_DEFAULT);
rejectDecommissionedHostsConfig = getRejectDecommissionedHostsConfig(c);
}

/**
 * ConfigurationObserver callback invoked when the server configuration is reloaded. The only
 * setting live-reloaded here is HConstants.REJECT_DECOMMISSIONED_HOSTS_KEY: when its value differs
 * from the one currently held, the change is logged and the new value is stored; otherwise this
 * method does nothing.
 * @param conf Server configuration instance
 */
@Override
public void onConfigurationChange(Configuration conf) {
  final boolean reloadedValue = getRejectDecommissionedHostsConfig(conf);

  // Only log and store when the setting actually changed.
  if (reloadedValue != rejectDecommissionedHostsConfig) {
    LOG.info("Config Reload for RejectDecommissionedHosts. previous value: {}, new value: {}",
      rejectDecommissionedHostsConfig, reloadedValue);

    rejectDecommissionedHostsConfig = reloadedValue;
  }
}

/**
 * Looks up HConstants.REJECT_DECOMMISSIONED_HOSTS_KEY in the given configuration, passing
 * HConstants.REJECT_DECOMMISSIONED_HOSTS_DEFAULT as the default.
 * @param conf Configuration instance of the Master
 * @return the boolean value obtained from the configuration for that key
 */
public boolean getRejectDecommissionedHostsConfig(Configuration conf) {
  final boolean rejectDecommissionedHosts = conf.getBoolean(
    HConstants.REJECT_DECOMMISSIONED_HOSTS_KEY, HConstants.REJECT_DECOMMISSIONED_HOSTS_DEFAULT);
  return rejectDecommissionedHosts;
}

/**
Expand Down Expand Up @@ -227,11 +261,14 @@ ServerName regionServerStartup(RegionServerStartupRequest request, int versionNu
final String hostname =
request.hasUseThisHostnameInstead() ? request.getUseThisHostnameInstead() : isaHostName;
ServerName sn = ServerName.valueOf(hostname, request.getPort(), request.getServerStartCode());

// Check if the host should be rejected based on its decommissioned status
checkRejectableDecommissionedStatus(sn);

checkClockSkew(sn, request.getServerCurrentTime());
checkIsDead(sn, "STARTUP");
if (!checkAndRecordNewServer(sn, ServerMetricsBuilder.of(sn, versionNumber, version))) {
LOG.warn(
"THIS SHOULD NOT HAPPEN, RegionServerStartup" + " could not record the server: " + sn);
LOG.warn("THIS SHOULD NOT HAPPEN, RegionServerStartup could not record the server: {}", sn);
}
storage.started(sn);
return sn;
Expand Down Expand Up @@ -293,6 +330,36 @@ public void regionServerReport(ServerName sn, ServerMetrics sl) throws YouAreDea
updateLastFlushedSequenceIds(sn, sl);
}

/**
 * Checks if the Master is configured to reject decommissioned hosts or not. When it's configured
 * to do so, any RegionServer trying to join the cluster will have its host checked against the
 * hosts of the servers returned by {@link #getDrainingServersList()} and, on a match, gets
 * prevented from reporting for duty; otherwise, we do nothing and we let it pass to the next
 * check. Only hostnames are compared; port and startcode are ignored. See HBASE-28342 for
 * details.
 * @param sn The ServerName to check for
 * @throws HostIsConsideredDecommissionedException if the Master is configured to reject
 *                                                 decommissioned hosts and this host exists in
 *                                                 the list of the decommissioned servers
 */
private void checkRejectableDecommissionedStatus(ServerName sn)
  throws HostIsConsideredDecommissionedException {
  // If the Master is not configured to reject decommissioned hosts, return early.
  if (!rejectDecommissionedHostsConfig) {
    return;
  }

  final String hostname = sn.getHostname();

  // Look for a match for the hostname in the list of decommissioned servers
  for (ServerName server : getDrainingServersList()) {
    if (Objects.equals(server.getHostname(), hostname)) {
      // Found a match and master is configured to reject decommissioned hosts, throw exception!
      LOG.warn("Rejecting RegionServer {} from reporting for duty because Master is configured "
        + "to reject decommissioned hosts and this host was marked as such in the past.", sn);
      // Name the offending host in the message: the exception travels back to the RegionServer,
      // whose log would otherwise not say which host matched. The original wording is kept as a
      // prefix so anything matching on it keeps working.
      throw new HostIsConsideredDecommissionedException(
        "Master is configured to reject decommissioned hosts; host " + hostname
          + " exists in the list of decommissioned servers");
    }
  }
}

/**
* Check is a server of same host and port already exists, if not, or the existed one got a
* smaller start code, record it.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@
import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
import org.apache.hadoop.hbase.ipc.ServerRpcController;
import org.apache.hadoop.hbase.log.HBaseMarkers;
import org.apache.hadoop.hbase.master.HostIsConsideredDecommissionedException;
import org.apache.hadoop.hbase.mob.MobFileCache;
import org.apache.hadoop.hbase.mob.RSMobFileCleanerChore;
import org.apache.hadoop.hbase.monitoring.TaskMonitor;
Expand Down Expand Up @@ -2664,6 +2665,11 @@ private RegionServerStartupResponse reportForDuty() throws IOException {
LOG.error(HBaseMarkers.FATAL, "Master rejected startup because clock is out of sync", ioe);
// Re-throw IOE will cause RS to abort
throw ioe;
} else if (ioe instanceof HostIsConsideredDecommissionedException) {
LOG.error(HBaseMarkers.FATAL,
"Master rejected startup because the host is considered decommissioned", ioe);
// Re-throw IOE will cause RS to abort
throw ioe;
} else if (ioe instanceof ServerNotRunningYetException) {
LOG.debug("Master is not running yet");
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@

import java.io.IOException;
import java.io.StringWriter;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.StringUtils;
Expand Down Expand Up @@ -241,6 +243,136 @@ public void run() {
waitForClusterOnline(master);
}

/**
 * Tests that the RegionServer's reportForDuty gets rejected by the master when the master is
 * configured to reject decommissioned hosts and when there is a match for the joining
 * RegionServer in the list of decommissioned servers. Test case for HBASE-28342.
 */
@Test
public void testReportForDutyGetsRejectedByMasterWhenConfiguredToRejectDecommissionedHosts()
  throws Exception {
  // Start a master and wait for it to become the active/primary master.
  // Use a random unique port
  cluster.getConfiguration().setInt(HConstants.MASTER_PORT, HBaseTestingUtil.randomFreePort());
  cluster.getConfiguration().setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, 1);
  cluster.getConfiguration().setInt(ServerManager.WAIT_ON_REGIONSERVERS_MAXTOSTART, 1);

  // Set the cluster to reject decommissioned hosts
  cluster.getConfiguration().setBoolean(HConstants.REJECT_DECOMMISSIONED_HOSTS_KEY, true);

  master = cluster.addMaster();
  rs = cluster.addRegionServer();
  LOG.debug("Starting master: " + master.getMaster().getServerName());
  master.start();
  rs.start();
  waitForClusterOnline(master);

  // Baseline: nothing decommissioned yet, one live region server
  assertEquals(0, master.getMaster().listDecommissionedRegionServers().size());
  assertEquals(0, master.getMaster().getServerManager().getDrainingServersList().size());
  assertEquals(1, master.getMaster().getServerManager().getOnlineServers().size());

  // Decommission the region server
  List<ServerName> serversToDecommission = new ArrayList<>();
  serversToDecommission.add(rs.getRegionServer().getServerName());
  master.getMaster().decommissionRegionServers(serversToDecommission, true);

  // Assert that the server is now decommissioned. The size is checked before the first element
  // is fetched so that a failure shows up as an assertion error, not an index error.
  assertEquals(1, master.getMaster().listDecommissionedRegionServers().size());
  ServerName decommissionedServer = master.getMaster().listDecommissionedRegionServers().get(0);
  assertEquals(1, master.getMaster().getServerManager().getDrainingServersList().size());
  assertEquals(1, master.getMaster().getServerManager().getOnlineServers().size());
  assertEquals(rs.getRegionServer().getServerName().toString(),
    decommissionedServer.getServerName());

  // Create a second region server and let it try to join the cluster
  cluster.getConfiguration().set(HConstants.REGION_SERVER_IMPL, MyRegionServer.class.getName());
  rs2 = cluster.addRegionServer();
  rs2.start();
  waitForSecondRsStarted();

  // Assert that the number of decommissioned and live hosts didn't change and that the hostname
  // of rs2 matches that of the decommissioned server
  String rs2HostName = rs2.getRegionServer().getServerName().getHostname();
  assertEquals(1, master.getMaster().listDecommissionedRegionServers().size());
  assertEquals(1, master.getMaster().getServerManager().getDrainingServersList().size());
  assertEquals(1, master.getMaster().getServerManager().getOnlineServers().size());
  assertEquals(rs2HostName, decommissionedServer.getHostname());
}

/**
 * Tests that the RegionServer's reportForDuty gets accepted by the master when the master is not
 * configured to reject decommissioned hosts, even when there is a match for the joining
 * RegionServer in the list of decommissioned servers. Test case for HBASE-28342.
 */
@Test
public void testReportForDutyGetsAcceptedByMasterWhenNotConfiguredToRejectDecommissionedHosts()
  throws Exception {
  // Start a master and wait for it to become the active/primary master.
  // Use a random unique port
  cluster.getConfiguration().setInt(HConstants.MASTER_PORT, HBaseTestingUtil.randomFreePort());
  cluster.getConfiguration().setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, 1);
  cluster.getConfiguration().setInt(ServerManager.WAIT_ON_REGIONSERVERS_MAXTOSTART, 1);

  // Set the cluster to not reject decommissioned hosts (default behavior)
  cluster.getConfiguration().setBoolean(HConstants.REJECT_DECOMMISSIONED_HOSTS_KEY, false);

  master = cluster.addMaster();
  rs = cluster.addRegionServer();
  LOG.debug("Starting master: " + master.getMaster().getServerName());
  master.start();
  rs.start();
  waitForClusterOnline(master);

  // Baseline: nothing decommissioned yet, one live region server
  assertEquals(0, master.getMaster().listDecommissionedRegionServers().size());
  assertEquals(0, master.getMaster().getServerManager().getDrainingServersList().size());
  assertEquals(1, master.getMaster().getServerManager().getOnlineServers().size());

  // Decommission the region server
  List<ServerName> serversToDecommission = new ArrayList<>();
  serversToDecommission.add(rs.getRegionServer().getServerName());
  master.getMaster().decommissionRegionServers(serversToDecommission, true);

  // Assert that the server is now decommissioned. The size is checked before the first element
  // is fetched so that a failure shows up as an assertion error, not an index error.
  assertEquals(1, master.getMaster().listDecommissionedRegionServers().size());
  ServerName decommissionedServer = master.getMaster().listDecommissionedRegionServers().get(0);
  assertEquals(1, master.getMaster().getServerManager().getDrainingServersList().size());
  assertEquals(1, master.getMaster().getServerManager().getOnlineServers().size());
  assertEquals(rs.getRegionServer().getServerName().toString(),
    decommissionedServer.getServerName());

  // Create a second region server; since rejection is disabled, it should be able to join
  cluster.getConfiguration().set(HConstants.REGION_SERVER_IMPL, MyRegionServer.class.getName());
  rs2 = cluster.addRegionServer();
  rs2.start();
  waitForSecondRsStarted();

  master.getMaster().stop("Stopping master");

  // Start a new master and use another random unique port
  // Also let it wait for exactly 2 region servers to report in
  cluster.getConfiguration().setInt(HConstants.MASTER_PORT, HBaseTestingUtil.randomFreePort());
  cluster.getConfiguration().setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, 2);
  cluster.getConfiguration().setInt(ServerManager.WAIT_ON_REGIONSERVERS_MAXTOSTART, 2);
  backupMaster = cluster.addMaster();
  LOG.debug("Starting new master: " + backupMaster.getMaster().getServerName());
  backupMaster.start();

  waitForClusterOnline(backupMaster);

  // Assert that the backup master has become active.
  assertTrue(backupMaster.getMaster().isActiveMaster());
  assertTrue(backupMaster.getMaster().isInitialized());

  // Assert that the number of decommissioned hosts is the same and that the live and online hosts
  // did in fact change and rs2 is now part of the cluster even though there is a match for its
  // hostname in the list of decommissioned servers
  String rs2HostName = rs2.getRegionServer().getServerName().getHostname();
  assertEquals(1, backupMaster.getMaster().listDecommissionedRegionServers().size());
  assertEquals(1, backupMaster.getMaster().getServerManager().getDrainingServersList().size());
  assertEquals(2, backupMaster.getMaster().getServerManager().getOnlineServers().size());
  assertEquals(rs2HostName, decommissionedServer.getHostname());
}

/**
* Tests region server reportForDuty with a non-default environment edge
*/
Expand Down

0 comments on commit bacf114

Please sign in to comment.