Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;

/**
* Class to hold dead servers list and utility querying dead server list. Servers are added when
* they expire or when we find them in filesystem on startup. When a server crash procedure is
Expand All @@ -56,12 +54,6 @@ public class DeadServer {
*/
private final Map<ServerName, Long> deadServers = new HashMap<>();

/**
* Set of dead servers currently being processed by a SCP. Added to this list at the start of SCP
* and removed after it is done processing the crash.
*/
private final Set<ServerName> processingServers = new HashSet<>();

/**
* @param serverName server name.
* @return true if this server is on the dead servers list false otherwise
Expand All @@ -70,15 +62,6 @@ public synchronized boolean isDeadServer(final ServerName serverName) {
return deadServers.containsKey(serverName);
}

/**
* Checks if there are currently any dead servers being processed by the master. Returns true if
* at least one region server is currently being processed as dead.
* @return true if any RS are being processed as dead
*/
synchronized boolean areDeadServersInProgress() {
return !processingServers.isEmpty();
}

public synchronized Set<ServerName> copyServerNames() {
Set<ServerName> clone = new HashSet<>(deadServers.size());
clone.addAll(deadServers.keySet());
Expand All @@ -90,29 +73,6 @@ public synchronized Set<ServerName> copyServerNames() {
*/
synchronized void putIfAbsent(ServerName sn) {
this.deadServers.putIfAbsent(sn, EnvironmentEdgeManager.currentTime());
processing(sn);
}

/**
* Add <code>sn<</code> to set of processing deadservers.
* @see #finish(ServerName)
*/
public synchronized void processing(ServerName sn) {
if (processingServers.add(sn)) {
// Only log on add.
LOG.debug("Processing {}; numProcessing={}", sn, processingServers.size());
}
}

/**
* Complete processing for this dead server.
* @param sn ServerName for the dead server.
* @see #processing(ServerName)
*/
public synchronized void finish(ServerName sn) {
if (processingServers.remove(sn)) {
LOG.debug("Removed {} from processing; numProcessing={}", sn, processingServers.size());
}
}

public synchronized int size() {
Expand Down Expand Up @@ -171,17 +131,12 @@ public synchronized String toString() {
// Display unified set of servers from both maps
Set<ServerName> servers = new HashSet<>();
servers.addAll(deadServers.keySet());
servers.addAll(processingServers);
StringBuilder sb = new StringBuilder();
for (ServerName sn : servers) {
if (sb.length() > 0) {
sb.append(", ");
}
sb.append(sn.toString());
// Star entries that are being processed
if (processingServers.contains(sn)) {
sb.append("*");
}
}
return sb.toString();
}
Expand Down Expand Up @@ -220,9 +175,6 @@ public synchronized Date getTimeOfDeath(final ServerName deadServerName) {
* @return true if this server was removed
*/
public synchronized boolean removeDeadServer(final ServerName deadServerName) {
Preconditions.checkState(!processingServers.contains(deadServerName),
"Asked to remove server still in processingServers set " + deadServerName + " (numProcessing="
+ processingServers.size() + ")");
return this.deadServers.remove(deadServerName) != null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2527,11 +2527,18 @@ public ClearDeadServersResponse clearDeadServers(RpcController controller,
LOG.debug("Some dead server is still under processing, won't clear the dead server list");
response.addAllServerName(request.getServerNameList());
} else {
DeadServer deadServer = master.getServerManager().getDeadServers();
for (HBaseProtos.ServerName pbServer : request.getServerNameList()) {
if (
!master.getServerManager().getDeadServers()
.removeDeadServer(ProtobufUtil.toServerName(pbServer))
) {
ServerName server = ProtobufUtil.toServerName(pbServer);
final boolean deadInProcess =
master.getProcedures().stream().anyMatch(p -> (p instanceof ServerCrashProcedure)
&& ((ServerCrashProcedure) p).getServerName().equals(server));
if (deadInProcess) {
throw new ServiceException(
String.format("Dead server '%s' is not 'dead' in fact...", server));
}

if (!deadServer.removeDeadServer(server)) {
response.addServerName(pbServer);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.hadoop.hbase.ipc.RemoteWithExtrasException;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.master.procedure.ServerCrashProcedure;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.procedure2.Procedure;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
Expand Down Expand Up @@ -547,8 +548,8 @@ public DeadServer getDeadServers() {
* Checks if any dead servers are currently in progress.
* @return true if any RS are being processed as dead, false if not
*/
public boolean areDeadServersInProgress() {
return this.deadservers.areDeadServersInProgress();
public boolean areDeadServersInProgress() throws IOException {
return master.getProcedures().stream().anyMatch(p -> p instanceof ServerCrashProcedure);
}

void letRegionServersShutdown() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,6 @@ protected Flow executeFromState(MasterProcedureEnv env, ServerCrashState state)
// This adds server to the DeadServer processing list but not to the DeadServers list.
// Server gets removed from processing list below on procedure successful finish.
if (!notifiedDeadServer) {
services.getServerManager().getDeadServers().processing(serverName);
notifiedDeadServer = true;
}

Expand Down Expand Up @@ -255,7 +254,6 @@ protected Flow executeFromState(MasterProcedureEnv env, ServerCrashState state)
case SERVER_CRASH_FINISH:
LOG.info("removed crashed server {} after splitting done", serverName);
services.getAssignmentManager().getRegionStates().removeServer(serverName);
services.getServerManager().getDeadServers().finish(serverName);
updateProgress(true);
return Flow.NO_MORE_STATE;
default:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ public void testRebalanceOnRegionServerNumberChange() throws IOException, Interr
/**
* Wait on crash processing. Balancer won't run if processing a crashed server.
*/
private void waitOnCrashProcessing() {
private void waitOnCrashProcessing() throws IOException {
while (UTIL.getHBaseCluster().getMaster().getServerManager().areDeadServersInProgress()) {
LOG.info("Waiting on processing of crashed server before proceeding...");
Threads.sleep(1000);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.util.List;
import java.util.Set;
import org.apache.hadoop.hbase.HBaseClassTestRule;
Expand Down Expand Up @@ -69,22 +70,10 @@ public static void tearDownAfterClass() throws Exception {
public void testIsDead() {
DeadServer ds = new DeadServer();
ds.putIfAbsent(hostname123);
ds.processing(hostname123);
assertTrue(ds.areDeadServersInProgress());
ds.finish(hostname123);
assertFalse(ds.areDeadServersInProgress());

ds.putIfAbsent(hostname1234);
ds.processing(hostname1234);
assertTrue(ds.areDeadServersInProgress());
ds.finish(hostname1234);
assertFalse(ds.areDeadServersInProgress());

ds.putIfAbsent(hostname12345);
ds.processing(hostname12345);
assertTrue(ds.areDeadServersInProgress());
ds.finish(hostname12345);
assertFalse(ds.areDeadServersInProgress());

// Already dead = 127.0.0.1,9090,112321
// Coming back alive = 127.0.0.1,9090,223341
Expand All @@ -104,15 +93,15 @@ public void testIsDead() {
}

@Test
public void testCrashProcedureReplay() {
public void testCrashProcedureReplay() throws IOException {
HMaster master = TEST_UTIL.getHBaseCluster().getMaster();
final ProcedureExecutor<MasterProcedureEnv> pExecutor = master.getMasterProcedureExecutor();
ServerCrashProcedure proc =
new ServerCrashProcedure(pExecutor.getEnvironment(), hostname123, false, false);

ProcedureTestingUtility.submitAndWait(pExecutor, proc);

assertFalse(master.getServerManager().getDeadServers().areDeadServersInProgress());
assertTrue(master.getServerManager().areDeadServersInProgress());
}

@Test
Expand Down Expand Up @@ -163,17 +152,14 @@ public void testClearDeadServer() {
d.putIfAbsent(hostname1234);
Assert.assertEquals(2, d.size());

d.finish(hostname123);
d.removeDeadServer(hostname123);
Assert.assertEquals(1, d.size());
d.finish(hostname1234);
d.removeDeadServer(hostname1234);
Assert.assertTrue(d.isEmpty());

d.putIfAbsent(hostname1234);
Assert.assertFalse(d.removeDeadServer(hostname123_2));
Assert.assertEquals(1, d.size());
d.finish(hostname1234);
Assert.assertTrue(d.removeDeadServer(hostname1234));
Assert.assertTrue(d.isEmpty());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,6 @@ public void test() throws Exception {
cluster.killRegionServer(rsServerName);

master.getServerManager().moveFromOnlineToDeadServers(rsServerName);
master.getServerManager().getDeadServers().finish(rsServerName);
master.getServerManager().getDeadServers().removeDeadServer(rsServerName);
master.getAssignmentManager().getRegionStates().removeServer(rsServerName);
// Kill the server. Nothing should happen since an 'Unknown Server' as far
Expand Down