Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HBASE-23322 [hbck2] Simplification on HBCKSCP scheduling #855

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2555,35 +2555,18 @@ public MasterProtos.BypassProcedureResponse bypassProcedure(RpcController contro
public MasterProtos.ScheduleServerCrashProcedureResponse scheduleServerCrashProcedure(
RpcController controller, MasterProtos.ScheduleServerCrashProcedureRequest request)
throws ServiceException {
List<HBaseProtos.ServerName> serverNames = request.getServerNameList();
List<Long> pids = new ArrayList<>();
try {
for (HBaseProtos.ServerName sn: serverNames) {
ServerName serverName = ProtobufUtil.toServerName(sn);
LOG.info("{} schedule ServerCrashProcedure for {}",
this.master.getClientIdAuditPrefix(), serverName);
if (shouldSubmitSCP(serverName)) {
final boolean containsMetaWALs = containMetaWals(serverName);
long pid = this.master.getServerManager().expireServer(serverName,
new Function<ServerName, Long>() {
@Override
public Long apply(ServerName serverName) {
ProcedureExecutor<MasterProcedureEnv> procExec =
master.getMasterProcedureExecutor();
return procExec.submitProcedure(
new HBCKServerCrashProcedure(procExec.getEnvironment(),
serverName, true, containsMetaWALs));
}
});
pids.add(pid);
} else {
pids.add(Procedure.NO_PROC_ID);
}
for (HBaseProtos.ServerName sn: request.getServerNameList()) {
ServerName serverName = ProtobufUtil.toServerName(sn);
LOG.info("{} schedule ServerCrashProcedure for {}",
this.master.getClientIdAuditPrefix(), serverName);
if (shouldSubmitSCP(serverName)) {
pids.add(this.master.getServerManager().expireServer(serverName, true));
} else {
pids.add(Procedure.NO_PROC_ID);
}
return MasterProtos.ScheduleServerCrashProcedureResponse.newBuilder().addAllPid(pids).build();
} catch (IOException e) {
throw new ServiceException(e);
}
return MasterProtos.ScheduleServerCrashProcedureResponse.newBuilder().addAllPid(pids).build();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import java.util.function.Predicate;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ClockOutOfSyncException;
Expand Down Expand Up @@ -532,32 +531,18 @@ private List<String> getRegionServersInZK(final ZKWatcher zkw)

/**
* Expire the passed server. Add it to list of dead servers and queue a shutdown processing.
* @return True if we queued a ServerCrashProcedure else false if we did not (could happen for
* many reasons including the fact that its this server that is going down or we already
* have queued an SCP for this server or SCP processing is currently disabled because we
* are in startup phase).
*/
public boolean expireServer(final ServerName serverName) {
return expireServer(serverName, new Function<ServerName, Long>() {
@Override
public Long apply(ServerName serverName) {
return master.getAssignmentManager().submitServerCrash(serverName, true);
}
}) != Procedure.NO_PROC_ID;
* @return pid if we queued a ServerCrashProcedure else {@link Procedure#NO_PROC_ID} if we did
* not (could happen for many reasons including the fact that its this server that is
* going down or we already have queued an SCP for this server or SCP processing is
* currently disabled because we are in startup phase).
*/
@VisibleForTesting // Redo test so we can make this protected.
public synchronized long expireServer(final ServerName serverName) {
return expireServer(serverName, false);

}

/**
* Expire the passed server. Add it to list of dead servers and queue a shutdown processing.
* Used when expireServer is externally invoked by hbck2.
* @param function Takes ServerName and returns pid. See default implementation which queues
* an SCP via the AssignmentManager.
* @return True if we queued a ServerCrashProcedure else false if we did not (could happen for
* many reasons including the fact that its this server that is going down or we already
* have queued an SCP for this server or SCP processing is currently disabled because we
* are in startup phase).
*/
synchronized long expireServer(final ServerName serverName,
Function<ServerName, Long> function) {
synchronized long expireServer(final ServerName serverName, boolean force) {
// THIS server is going down... can't handle our own expiration.
if (serverName.equals(master.getServerName())) {
if (!(master.isAborted() || master.isStopped())) {
Expand All @@ -582,10 +567,7 @@ synchronized long expireServer(final ServerName serverName,
return Procedure.NO_PROC_ID;
}
LOG.info("Processing expiration of " + serverName + " on " + this.master.getServerName());
long pid = function.apply(serverName);
if (pid <= 0) {
return Procedure.NO_PROC_ID;
}
long pid = master.getAssignmentManager().submitServerCrash(serverName, true, force);
// Tell our listeners that a server was removed
if (!this.listeners.isEmpty()) {
this.listeners.stream().forEach(l -> l.serverRemoved(serverName));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
import org.apache.hadoop.hbase.master.ServerManager;
import org.apache.hadoop.hbase.master.TableStateManager;
import org.apache.hadoop.hbase.master.balancer.FavoredStochasticBalancer;
import org.apache.hadoop.hbase.master.procedure.HBCKServerCrashProcedure;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureScheduler;
import org.apache.hadoop.hbase.master.procedure.ProcedureSyncWait;
Expand Down Expand Up @@ -1484,15 +1485,21 @@ public int getNumRegionsOpened() {
return 0;
}

public long submitServerCrash(ServerName serverName, boolean shouldSplitWal) {
boolean carryingMeta;
long pid;
/**
* Usually run by the Master in reaction to server crash during normal processing.
* Can also be invoked via external RPC to effect repair; in the latter case,
* the 'force' flag is set so we push through the SCP though context may indicate
* already-running-SCP (An old SCP may have exited abnormally, or damaged cluster
* may still have references in hbase:meta to 'Unknown Servers' -- servers that
* are not online or in dead servers list, etc.)
* @param force Set if the request came in externally over RPC (via hbck2). Force means
* run the SCP even if it seems as though there might be an outstanding
* SCP running.
* @return pid of scheduled SCP or {@link Procedure#NO_PROC_ID} if none scheduled.
*/
public long submitServerCrash(ServerName serverName, boolean shouldSplitWal, boolean force) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't love this force flag. I get that all we have to look for are side-effects, but seems like there should be a way of accounting the actively running procedures and at least wait for the current one to finish before starting the next. Or maybe the procedure implementations can negotiate the mutual exclusion lock between themselves? This could would unconditionally schedule the action and the action itself would refuse to run as long as another one is in flight. And then, of course, the second action might wake up and find that it has no work to do.

// May be an 'Unknown Server' so handle case where serverNode is null.
ServerStateNode serverNode = regionStates.getServerNode(serverName);
if (serverNode == null) {
LOG.info("Skip to add SCP for {} since this server should be OFFLINE already", serverName);
return -1;
}

// Remove the in-memory rsReports result
synchronized (rsReports) {
rsReports.remove(serverName);
Expand All @@ -1502,26 +1509,43 @@ public long submitServerCrash(ServerName serverName, boolean shouldSplitWal) {
// server state to CRASHED, we will no longer accept the reportRegionStateTransition call from
// this server. This is used to simplify the implementation for TRSP and SCP, where we can make
// sure that, the region list fetched by SCP will not be changed any more.
serverNode.writeLock().lock();
if (serverNode != null) {
serverNode.writeLock().lock();
}
boolean carryingMeta;
long pid;
try {
ProcedureExecutor<MasterProcedureEnv> procExec = this.master.getMasterProcedureExecutor();
carryingMeta = isCarryingMeta(serverName);
if (!serverNode.isInState(ServerState.ONLINE)) {
LOG.info(
"Skip to add SCP for {} with meta= {}, " +
"since there should be a SCP is processing or already done for this server node",
serverName, carryingMeta);
return -1;
if (!force && serverNode != null && !serverNode.isInState(ServerState.ONLINE)) {
LOG.info("Skip adding SCP for {} (meta={}) -- running?", serverNode, carryingMeta);
return Procedure.NO_PROC_ID;
} else {
serverNode.setState(ServerState.CRASHED);
pid = procExec.submitProcedure(new ServerCrashProcedure(procExec.getEnvironment(),
serverName, shouldSplitWal, carryingMeta));
LOG.info(
"Added {} to dead servers which carryingMeta={}, submitted ServerCrashProcedure pid={}",
serverName, carryingMeta, pid);
MasterProcedureEnv mpe = procExec.getEnvironment();
// If serverNode == null, then 'Unknown Server'. Schedule HBCKSCP instead.
// HBCKSCP scours Master in-memory state AND hbase;meta for references to
// serverName just-in-case. An SCP that is scheduled when the server is
// 'Unknown' probably originated externally with HBCK2 fix-it tool.
ServerState oldState = null;
if (serverNode != null) {
oldState = serverNode.getState();
serverNode.setState(ServerState.CRASHED);
}

if (force) {
pid = procExec.submitProcedure(
new HBCKServerCrashProcedure(mpe, serverName, shouldSplitWal, carryingMeta));
} else {
pid = procExec.submitProcedure(
new ServerCrashProcedure(mpe, serverName, shouldSplitWal, carryingMeta));
}
LOG.info("Scheduled SCP pid={} for {} (carryingMeta={}){}.", pid, serverName, carryingMeta,
serverNode == null? "": " " + serverNode.toString() + ", oldState=" + oldState);
}
} finally {
serverNode.writeLock().unlock();
if (serverNode != null) {
serverNode.writeLock().unlock();
}
}
return pid;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -727,6 +727,9 @@ public void removeServer(final ServerName serverName) {
serverMap.remove(serverName);
}

/**
* @return Pertinent ServerStateNode or NULL if none found.
*/
@VisibleForTesting
public ServerStateNode getServerNode(final ServerName serverName) {
return serverMap.get(serverName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,9 @@
*/
@InterfaceAudience.Private
public class ServerStateNode implements Comparable<ServerStateNode> {

private final Set<RegionStateNode> regions;
private final ServerName serverName;

private final ReadWriteLock lock = new ReentrantReadWriteLock();

private volatile ServerState state = ServerState.ONLINE;

public ServerStateNode(ServerName serverName) {
Expand Down Expand Up @@ -120,6 +117,7 @@ public boolean equals(final Object other) {

@Override
public String toString() {
return String.format("ServerStateNode(%s)", getServerName());
return getServerName() + "/" + getState() + "/regionCount=" + this.regions.size() +
"/lock=" + this.lock;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -81,12 +81,15 @@ List<RegionInfo> getRegionsOnCrashedServer(MasterProcedureEnv env) {
LOG.warn("Failed get of all regions; continuing", ioe);
}
if (ps == null || ps.isEmpty()) {
LOG.warn("No regions found in hbase:meta");
return ris;
}
List<RegionInfo> aggregate = ris == null || ris.isEmpty()?
new ArrayList<>(): new ArrayList<>(ris);
int before = aggregate.size();
ps.stream().filter(p -> p.getSecond() != null && p.getSecond().equals(getServerName())).
forEach(p -> aggregate.add(p.getFirst()));
LOG.info("Found {} mentions of {} in hbase:meta", aggregate.size() - before, getServerName());
return aggregate;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -109,15 +109,17 @@ public void test() throws Exception {
((ServerCrashProcedure) p).getServerName().equals(SERVER_FOR_TEST)).findAny();
assertTrue("Should have one SCP for " + SERVER_FOR_TEST, procedure.isPresent());
assertFalse("Submit the SCP for the same serverName " + SERVER_FOR_TEST + " which should fail",
UTIL.getHBaseCluster().getMaster().getServerManager().expireServer(SERVER_FOR_TEST));
UTIL.getHBaseCluster().getMaster().getServerManager().expireServer(SERVER_FOR_TEST) ==
Procedure.NO_PROC_ID);

// Wait the SCP to finish
SCP_LATCH.countDown();
UTIL.waitFor(60000, () -> procedure.get().isFinished());

assertFalse("Even when the SCP is finished, the duplicate SCP should not be scheduled for " +
SERVER_FOR_TEST,
UTIL.getHBaseCluster().getMaster().getServerManager().expireServer(SERVER_FOR_TEST));
UTIL.getHBaseCluster().getMaster().getServerManager().expireServer(SERVER_FOR_TEST) ==
Procedure.NO_PROC_ID);
serverNode = UTIL.getHBaseCluster().getMaster().getAssignmentManager().getRegionStates()
.getServerNode(SERVER_FOR_TEST);
assertNull("serverNode should be deleted after SCP finished", serverNode);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -304,7 +304,7 @@ protected void sendTransitionReport(final ServerName serverName,

protected void doCrash(final ServerName serverName) {
this.master.getServerManager().moveFromOnlineToDeadServers(serverName);
this.am.submitServerCrash(serverName, false/* No WALs here */);
this.am.submitServerCrash(serverName, false/* No WALs here */, false);
// add a new server to avoid killing all the region servers which may hang the UTs
ServerName newSn = ServerName.valueOf("localhost", 10000 + newRsAdded, 1);
newRsAdded++;
Expand Down