HBASE-23282 HBCKServerCrashProcedure for 'Unknown Servers'
Have the existing scheduleRecoveries launch a new HBCKSCP
instead of SCP. It gets regions to recover from Master
in-memory context AND from a scan of hbase:meta. This
new HBCKSCP is for processing 'Unknown Servers', servers that
are 'dead' and purged but still have references in
hbase:meta. A rare occurrence, but one that needs tooling to address.
Later, have CatalogJanitor take care of these deviations
between Master in-memory and hbase:meta content (usually
caused by an overdriven cluster with failed RPCs to hbase:meta,
etc.).
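
A minimal sketch, in Java, of the region-gathering idea above, assuming HBCKSCP
simply unions the two views; the RegionSource interface and both suppliers are
hypothetical stand-ins for the Master in-memory lookup and the hbase:meta scan,
not HBase APIs:

    import java.util.HashSet;
    import java.util.List;
    import java.util.Set;

    import org.apache.hadoop.hbase.ServerName;
    import org.apache.hadoop.hbase.client.RegionInfo;

    // Sketch only: not the actual HBCKServerCrashProcedure implementation.
    final class UnknownServerRegionsSketch {
      interface RegionSource {
        List<RegionInfo> regionsOn(ServerName serverName);
      }

      static Set<RegionInfo> regionsToRecover(ServerName serverName,
          RegionSource masterInMemory, RegionSource metaScan) {
        Set<RegionInfo> regions = new HashSet<>();
        // What the Master still tracks for this server; typically empty for an
        // 'Unknown Server' that was already purged from Master state.
        regions.addAll(masterInMemory.regionsOn(serverName));
        // What hbase:meta still references for this server; this is the part a
        // plain SCP does not consider.
        regions.addAll(metaScan.regionsOn(serverName));
        return regions;
      }
    }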

Changed expireServer in ServerManager so a caller can pass in
a custom reaction to the expired server. This is how we
run our custom HBCKSCP while keeping all other aspects
of expiring servers (rather than trying to replicate them
externally).
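
A condensed, lambda-style sketch of how that hook gets used, mirroring the
MasterRpcServices hunk below; the wrapper class and method name are hypothetical,
and since the new expireServer(ServerName, Function) overload is package-private,
code like this has to live in org.apache.hadoop.hbase.master:

    package org.apache.hadoop.hbase.master;

    import java.util.function.Function;

    import org.apache.hadoop.hbase.ServerName;
    import org.apache.hadoop.hbase.master.procedure.HBCKServerCrashProcedure;
    import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
    import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;

    // Hypothetical helper, not part of the commit.
    final class HbckScpSchedulingSketch {
      static long scheduleHbckScp(HMaster master, ServerName serverName,
          boolean containsMetaWALs) {
        Function<ServerName, Long> submitHbckScp = sn -> {
          ProcedureExecutor<MasterProcedureEnv> procExec = master.getMasterProcedureExecutor();
          // Queue the HBCKSCP variant instead of the plain ServerCrashProcedure.
          return procExec.submitProcedure(
              new HBCKServerCrashProcedure(procExec.getEnvironment(), sn, true, containsMetaWALs));
        };
        // DeadServer bookkeeping, listener notification, etc. all stay inside
        // ServerManager.expireServer; only the procedure-submission step is swapped.
        return master.getServerManager().expireServer(serverName, submitHbckScp);
      }
    }

The design point is that the SCP-vs-HBCKSCP choice is a single injected step,
so the rest of server expiration does not have to be replicated outside
ServerManager.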
saintstack committed Nov 18, 2019
1 parent ab63bde commit 44c8b58
Showing 10 changed files with 383 additions and 55 deletions.
@@ -248,6 +248,10 @@ public boolean isClosed() {
return state == State.CLOSED;
}

public boolean isClosedOrAbnormallyClosed() {
return isClosed() || this.state == State.ABNORMALLY_CLOSED;
}

public boolean isOpening() {
return state == State.OPENING;
}
@@ -1,5 +1,4 @@
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -80,6 +79,7 @@
// TODO: Only works with single hbase:meta region currently. Fix.
// TODO: Should it start over every time? Could it continue if runs into problem? Only if
// problem does not mess up 'results'.
// TODO: Do more by way of 'repair'; see note on unknownServers below.
@InterfaceAudience.Private
public class CatalogJanitor extends ScheduledChore {
private static final Logger LOG = LoggerFactory.getLogger(CatalogJanitor.class.getName());
@@ -169,17 +169,16 @@ int scan() throws IOException {
LOG.debug("CatalogJanitor already running");
return gcs;
}
Report report = scanForReport();
this.lastReport = report;
if (!report.isEmpty()) {
LOG.warn(report.toString());
this.lastReport = scanForReport();
if (!this.lastReport.isEmpty()) {
LOG.warn(this.lastReport.toString());
}

if (isRIT(this.services.getAssignmentManager())) {
LOG.warn("Playing-it-safe skipping merge/split gc'ing of regions from hbase:meta while " +
"regions-in-transition (RIT)");
}
Map<RegionInfo, Result> mergedRegions = report.mergedRegions;
Map<RegionInfo, Result> mergedRegions = this.lastReport.mergedRegions;
for (Map.Entry<RegionInfo, Result> e : mergedRegions.entrySet()) {
if (this.services.isInMaintenanceMode()) {
// Stop cleaning if the master is in maintenance mode
@@ -192,7 +191,7 @@ int scan() throws IOException {
}
}
// Clean split parents
Map<RegionInfo, Result> splitParents = report.splitParents;
Map<RegionInfo, Result> splitParents = this.lastReport.splitParents;

// Now work on our list of found parents. See if any we can clean up.
HashSet<String> parentNotCleaned = new HashSet<>();
@@ -443,7 +442,14 @@ public static class Report {

private final List<Pair<RegionInfo, RegionInfo>> holes = new ArrayList<>();
private final List<Pair<RegionInfo, RegionInfo>> overlaps = new ArrayList<>();

/**
* TODO: If CatalogJanitor finds an 'Unknown Server', it should 'fix' it by queuing
* a {@link org.apache.hadoop.hbase.master.procedure.HBCKServerCrashProcedure} for
* the found server so it can clean up hbase:meta references.
*/
private final List<Pair<RegionInfo, ServerName>> unknownServers = new ArrayList<>();

private final List<byte []> emptyRegionInfo = new ArrayList<>();

@VisibleForTesting
@@ -703,7 +709,9 @@ private void checkServer(RegionLocations locations) {
if (locations.getRegionLocations() == null) {
return;
}
// Check referenced servers are known/online.
// Check referenced servers are known/online. Here we are looking
// at both the default replica -- the main replica -- and then replica
// locations too.
for (HRegionLocation location: locations.getRegionLocations()) {
if (location == null) {
continue;
@@ -717,19 +725,25 @@ private void checkServer(RegionLocations locations) {
// This should never happen but if it does, will mess up below.
continue;
}
RegionInfo ri = location.getRegion();
// Skip split parent region
if (location.getRegion().isSplitParent()) {
if (ri.isSplitParent()) {
continue;
}
// skip the offline regions which belong to disabled table.
if (isTableDisabled(location.getRegion())) {
if (isTableDisabled(ri)) {
continue;
}
RegionState rs = this.services.getAssignmentManager().getRegionStates().getRegionState(ri);
if (rs.isClosedOrAbnormallyClosed()) {
// If closed against an 'Unknown Server', that should be fine.
continue;
}
ServerManager.ServerLiveState state = this.services.getServerManager().
isServerKnownAndOnline(sn);
switch (state) {
case UNKNOWN:
this.report.unknownServers.add(new Pair<>(location.getRegion(), sn));
this.report.unknownServers.add(new Pair<>(ri, sn));
break;

default:
@@ -61,8 +61,8 @@ public class DeadServer {
private final Set<ServerName> processingServers = new HashSet<ServerName>();

/**
* A dead server that comes back alive has a different start code. The new start code should be
* greater than the old one, but we don't take this into account in this method.
* Handles restart of a server. The new server instance has a different start code.
* The new start code should be greater than the old one. We don't check that here.
*
* @param newServerName Servername as either <code>host:port</code> or
* <code>host,port,startcode</code>.
@@ -78,7 +78,8 @@ public synchronized boolean cleanPreviousInstance(final ServerName newServerName
// remove from processingServers
boolean removed = processingServers.remove(sn);
if (removed) {
LOG.debug("Removed " + sn + " ; numProcessing=" + processingServers.size());
LOG.debug("Removed {}, processing={}, numProcessing={}", sn, removed,
processingServers.size());
}
return true;
}
@@ -122,7 +123,6 @@ public synchronized Set<ServerName> copyServerNames() {

/**
* Adds the server to the dead server list if it's not there already.
* @param sn the server name
*/
public synchronized void add(ServerName sn) {
if (!deadServers.containsKey(sn)){
@@ -32,6 +32,7 @@
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
@@ -70,6 +71,7 @@
import org.apache.hadoop.hbase.ipc.ServerRpcController;
import org.apache.hadoop.hbase.master.assignment.RegionStates;
import org.apache.hadoop.hbase.master.locking.LockProcedure;
import org.apache.hadoop.hbase.master.procedure.HBCKServerCrashProcedure;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureUtil;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureUtil.NonceProcedureRunnable;
@@ -2556,17 +2558,26 @@ public MasterProtos.ScheduleServerCrashProcedureResponse scheduleServerCrashProc
List<HBaseProtos.ServerName> serverNames = request.getServerNameList();
List<Long> pids = new ArrayList<>();
try {
for (HBaseProtos.ServerName serverName : serverNames) {
ServerName server = ProtobufUtil.toServerName(serverName);
for (HBaseProtos.ServerName sn: serverNames) {
ServerName serverName = ProtobufUtil.toServerName(sn);
LOG.info("{} schedule ServerCrashProcedure for {}",
master.getClientIdAuditPrefix(), server);
if (shouldSubmitSCP(server)) {
master.getServerManager().moveFromOnlineToDeadServers(server);
ProcedureExecutor<MasterProcedureEnv> procExec = this.master.getMasterProcedureExecutor();
pids.add(procExec.submitProcedure(new ServerCrashProcedure(procExec.getEnvironment(),
server, true, containMetaWals(server))));
this.master.getClientIdAuditPrefix(), serverName);
if (shouldSubmitSCP(serverName)) {
final boolean containsMetaWALs = containMetaWals(serverName);
long pid = this.master.getServerManager().expireServer(serverName,
new Function<ServerName, Long>() {
@Override
public Long apply(ServerName serverName) {
ProcedureExecutor<MasterProcedureEnv> procExec =
master.getMasterProcedureExecutor();
return procExec.submitProcedure(
new HBCKServerCrashProcedure(procExec.getEnvironment(),
serverName, true, containsMetaWALs));
}
});
pids.add(pid);
} else {
pids.add(-1L);
pids.add(Procedure.NO_PROC_ID);
}
}
return MasterProtos.ScheduleServerCrashProcedureResponse.newBuilder().addAllPid(pids).build();
@@ -33,6 +33,7 @@
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import java.util.function.Predicate;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ClockOutOfSyncException;
@@ -49,6 +50,7 @@
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.procedure2.Procedure;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
@@ -535,17 +537,37 @@ private List<String> getRegionServersInZK(final ZKWatcher zkw)
* have queued an SCP for this server or SCP processing is currently disabled because we
* are in startup phase).
*/
public synchronized boolean expireServer(final ServerName serverName) {
public boolean expireServer(final ServerName serverName) {
return expireServer(serverName, new Function<ServerName, Long>() {
@Override
public Long apply(ServerName serverName) {
return master.getAssignmentManager().submitServerCrash(serverName, true);
}
}) != Procedure.NO_PROC_ID;
}

/**
* Expire the passed server. Add it to list of dead servers and queue a shutdown processing.
* Used when expireServer is externally invoked by hbck2.
* @param function Takes ServerName and returns pid. See default implementation which queues
* an SCP via the AssignmentManager.
* @return The pid of the queued procedure, or Procedure.NO_PROC_ID if we did not queue one
* (could happen for many reasons including the fact that it's this server that is going down
* or we already have queued an SCP for this server or SCP processing is currently disabled
* because we are in startup phase).
*/
synchronized long expireServer(final ServerName serverName,
Function<ServerName, Long> function) {
// THIS server is going down... can't handle our own expiration.
if (serverName.equals(master.getServerName())) {
if (!(master.isAborted() || master.isStopped())) {
master.stop("We lost our znode?");
}
return false;
return Procedure.NO_PROC_ID;
}
if (this.deadservers.isDeadServer(serverName)) {
LOG.warn("Expiration called on {} but crash processing already in progress", serverName);
return false;
LOG.warn("Expiration called on {} but already in DeadServer", serverName);
return Procedure.NO_PROC_ID;
}
moveFromOnlineToDeadServers(serverName);

@@ -557,36 +579,38 @@ public synchronized boolean expireServer(final ServerName serverName) {
if (this.onlineServers.isEmpty()) {
master.stop("Cluster shutdown set; onlineServer=0");
}
return false;
return Procedure.NO_PROC_ID;
}
LOG.info("Processing expiration of " + serverName + " on " + this.master.getServerName());
long pid = master.getAssignmentManager().submitServerCrash(serverName, true);
long pid = function.apply(serverName);
if (pid <= 0) {
return false;
} else {
// Tell our listeners that a server was removed
if (!this.listeners.isEmpty()) {
for (ServerListener listener : this.listeners) {
listener.serverRemoved(serverName);
}
}
return true;
return Procedure.NO_PROC_ID;
}
// Tell our listeners that a server was removed
if (!this.listeners.isEmpty()) {
this.listeners.stream().forEach(l -> l.serverRemoved(serverName));
}
return pid;
}

// Note: this is currently invoked from RPC, not just tests. Locking in this class needs cleanup.
@VisibleForTesting
public synchronized void moveFromOnlineToDeadServers(final ServerName sn) {
synchronized (onlineServers) {
if (!this.onlineServers.containsKey(sn)) {
synchronized (this.onlineServers) {
boolean online = this.onlineServers.containsKey(sn);
if (online) {
// Remove the server from the known servers lists and update load info BUT
// add to deadservers first; do this so it'll show in dead servers list if
// not in online servers list.
this.deadservers.add(sn);
this.onlineServers.remove(sn);
onlineServers.notifyAll();
} else {
// If not online, that is odd but may happen with 'Unknown Servers' -- where meta
// has references to servers neither online nor in the dead servers list. If an
// 'Unknown Server', don't add to DeadServers else it will be there forever.
LOG.trace("Expiration of {} but server not online", sn);
}
// Remove the server from the known servers lists and update load info BUT
// add to deadservers first; do this so it'll show in dead servers list if
// not in online servers list.
this.deadservers.add(sn);
this.onlineServers.remove(sn);
onlineServers.notifyAll();
}
this.rsAdmins.remove(sn);
}
@@ -1488,7 +1488,7 @@ public long submitServerCrash(ServerName serverName, boolean shouldSplitWal) {
boolean carryingMeta;
long pid;
ServerStateNode serverNode = regionStates.getServerNode(serverName);
if(serverNode == null){
if (serverNode == null) {
LOG.info("Skip to add SCP for {} since this server should be OFFLINE already", serverName);
return -1;
}
@@ -1498,7 +1498,7 @@ public long submitServerCrash(ServerName serverName, boolean shouldSplitWal) {
rsReports.remove(serverName);
}

// we hold the write lock here for fencing on reportRegionStateTransition. Once we set the
// We hold the write lock here for fencing on reportRegionStateTransition. Once we set the
// server state to CRASHED, we will no longer accept the reportRegionStateTransition call from
// this server. This is used to simplify the implementation for TRSP and SCP, where we can make
// sure that, the region list fetched by SCP will not be changed any more.
