Skip to content

Commit

Permalink
HBASE-12429 Add port to ClusterManager's actions.
Browse files Browse the repository at this point in the history
  • Loading branch information
elliottneilclark committed Dec 29, 2014
1 parent 8dac7f9 commit e5ca773
Show file tree
Hide file tree
Showing 8 changed files with 123 additions and 84 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -61,38 +61,38 @@ public String toString() {
/**
* Start the service on the given host
*/
void start(ServiceType service, String hostname) throws IOException;
void start(ServiceType service, String hostname, int port) throws IOException;

/**
* Stop the service on the given host
*/
void stop(ServiceType service, String hostname) throws IOException;
void stop(ServiceType service, String hostname, int port) throws IOException;

/**
* Restarts the service on the given host
* Restart the service on the given host
*/
void restart(ServiceType service, String hostname) throws IOException;
void restart(ServiceType service, String hostname, int port) throws IOException;

/**
* Kills the service running on the given host
*/
void kill(ServiceType service, String hostname) throws IOException;
void kill(ServiceType service, String hostname, int port) throws IOException;

/**
* Suspends the service running on the given host
*/
void suspend(ServiceType service, String hostname) throws IOException;
void suspend(ServiceType service, String hostname, int port) throws IOException;

/**
* Resumes the services running on the given host
*/
void resume(ServiceType service, String hostname) throws IOException;
void resume(ServiceType service, String hostname, int port) throws IOException;

/**
* Returns whether the service is running on the remote host. This only checks whether the
* service still has a pid.
*/
boolean isRunning(ServiceType service, String hostname) throws IOException;
boolean isRunning(ServiceType service, String hostname, int port) throws IOException;

/* TODO: further API ideas:
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,12 @@

import java.io.IOException;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
Expand Down Expand Up @@ -105,21 +109,25 @@ public ClientProtos.ClientService.BlockingInterface getClientProtocol(ServerName
}

@Override
public void startRegionServer(String hostname) throws IOException {
public void startRegionServer(String hostname, int port) throws IOException {
LOG.info("Starting RS on: " + hostname);
clusterManager.start(ServiceType.HBASE_REGIONSERVER, hostname);
clusterManager.start(ServiceType.HBASE_REGIONSERVER, hostname, port);
}

@Override
public void killRegionServer(ServerName serverName) throws IOException {
LOG.info("Aborting RS: " + serverName.getServerName());
clusterManager.kill(ServiceType.HBASE_REGIONSERVER, serverName.getHostname());
clusterManager.kill(ServiceType.HBASE_REGIONSERVER,
serverName.getHostname(),
serverName.getPort());
}

@Override
public void stopRegionServer(ServerName serverName) throws IOException {
LOG.info("Stopping RS: " + serverName.getServerName());
clusterManager.stop(ServiceType.HBASE_REGIONSERVER, serverName.getHostname());
clusterManager.stop(ServiceType.HBASE_REGIONSERVER,
serverName.getHostname(),
serverName.getPort());
}

@Override
Expand All @@ -133,7 +141,7 @@ private void waitForServiceToStop(ServiceType service, ServerName serverName, lo
long start = System.currentTimeMillis();

while ((System.currentTimeMillis() - start) < timeout) {
if (!clusterManager.isRunning(service, serverName.getHostname())) {
if (!clusterManager.isRunning(service, serverName.getHostname(), serverName.getPort())) {
return;
}
Threads.sleep(1000);
Expand All @@ -148,21 +156,21 @@ public MasterService.BlockingInterface getMasterAdminService()
}

@Override
public void startMaster(String hostname) throws IOException {
LOG.info("Starting Master on: " + hostname);
clusterManager.start(ServiceType.HBASE_MASTER, hostname);
public void startMaster(String hostname, int port) throws IOException {
LOG.info("Starting Master on: " + hostname + ":" + port);
clusterManager.start(ServiceType.HBASE_MASTER, hostname, port);
}

@Override
public void killMaster(ServerName serverName) throws IOException {
LOG.info("Aborting Master: " + serverName.getServerName());
clusterManager.kill(ServiceType.HBASE_MASTER, serverName.getHostname());
clusterManager.kill(ServiceType.HBASE_MASTER, serverName.getHostname(), serverName.getPort());
}

@Override
public void stopMaster(ServerName serverName) throws IOException {
LOG.info("Stopping Master: " + serverName.getServerName());
clusterManager.stop(ServiceType.HBASE_MASTER, serverName.getHostname());
clusterManager.stop(ServiceType.HBASE_MASTER, serverName.getHostname(), serverName.getPort());
}

@Override
Expand Down Expand Up @@ -207,13 +215,13 @@ public ServerName getServerHoldingRegion(TableName tn, byte[] regionName) throws

@Override
public void waitUntilShutDown() {
//Simply wait for a few seconds for now (after issuing serverManager.kill
// Simply wait for a few seconds for now (after issuing serverManager.kill
throw new RuntimeException("Not implemented yet");
}

@Override
public void shutdown() throws IOException {
//not sure we want this
// not sure we want this
throw new RuntimeException("Not implemented yet");
}

Expand Down Expand Up @@ -241,30 +249,35 @@ public boolean restoreClusterStatus(ClusterStatus initial) throws IOException {
protected boolean restoreMasters(ClusterStatus initial, ClusterStatus current) {
List<IOException> deferred = new ArrayList<IOException>();
//check whether current master has changed
if (!ServerName.isSameHostnameAndPort(initial.getMaster(), current.getMaster())) {
LOG.info("Restoring cluster - Initial active master : " + initial.getMaster().getHostname()
+ " has changed to : " + current.getMaster().getHostname());
final ServerName initMaster = initial.getMaster();
if (!ServerName.isSameHostnameAndPort(initMaster, current.getMaster())) {
LOG.info("Restoring cluster - Initial active master : "
+ initMaster.getHostAndPort()
+ " has changed to : "
+ current.getMaster().getHostAndPort());
// If initial master is stopped, start it, before restoring the state.
// It will come up as a backup master, if there is already an active master.
try {
if (!clusterManager.isRunning(ServiceType.HBASE_MASTER, initial.getMaster().getHostname())) {
LOG.info("Restoring cluster - starting initial active master at:" + initial.getMaster().getHostname());
startMaster(initial.getMaster().getHostname());
if (!clusterManager.isRunning(ServiceType.HBASE_MASTER,
initMaster.getHostname(), initMaster.getPort())) {
LOG.info("Restoring cluster - starting initial active master at:"
+ initMaster.getHostAndPort());
startMaster(initMaster.getHostname(), initMaster.getPort());
}

//master has changed, we would like to undo this.
//1. Kill the current backups
//2. Stop current master
//3. Start backup masters
// master has changed, we would like to undo this.
// 1. Kill the current backups
// 2. Stop current master
// 3. Start backup masters
for (ServerName currentBackup : current.getBackupMasters()) {
if (!ServerName.isSameHostnameAndPort(currentBackup, initial.getMaster())) {
if (!ServerName.isSameHostnameAndPort(currentBackup, initMaster)) {
LOG.info("Restoring cluster - stopping backup master: " + currentBackup);
stopMaster(currentBackup);
}
}
LOG.info("Restoring cluster - stopping active master: " + current.getMaster());
stopMaster(current.getMaster());
waitForActiveAndReadyMaster(); //wait so that active master takes over
waitForActiveAndReadyMaster(); // wait so that active master takes over
} catch (IOException ex) {
// if we fail to start the initial active master, we do not want to continue stopping
// backup masters. Just keep what we have now
Expand All @@ -275,50 +288,56 @@ protected boolean restoreMasters(ClusterStatus initial, ClusterStatus current) {
for (ServerName backup : initial.getBackupMasters()) {
try {
//these are not started in backup mode, but we should already have an active master
if(!clusterManager.isRunning(ServiceType.HBASE_MASTER, backup.getHostname())) {
LOG.info("Restoring cluster - starting initial backup master: " + backup.getHostname());
startMaster(backup.getHostname());
if (!clusterManager.isRunning(ServiceType.HBASE_MASTER,
backup.getHostname(),
backup.getPort())) {
LOG.info("Restoring cluster - starting initial backup master: "
+ backup.getHostAndPort());
startMaster(backup.getHostname(), backup.getPort());
}
} catch (IOException ex) {
deferred.add(ex);
}
}
} else {
//current master has not changed, match up backup masters
HashMap<String, ServerName> initialBackups = new HashMap<String, ServerName>();
HashMap<String, ServerName> currentBackups = new HashMap<String, ServerName>();
Set<ServerName> toStart = new TreeSet<ServerName>(new ServerNameIgnoreStartCodeComparator());
Set<ServerName> toKill = new TreeSet<ServerName>(new ServerNameIgnoreStartCodeComparator());
toStart.addAll(initial.getBackupMasters());
toKill.addAll(current.getBackupMasters());

for (ServerName server : initial.getBackupMasters()) {
initialBackups.put(server.getHostname(), server);
}
for (ServerName server : current.getBackupMasters()) {
currentBackups.put(server.getHostname(), server);
toStart.remove(server);
}
for (ServerName server: initial.getBackupMasters()) {
toKill.remove(server);
}

for (String hostname : Sets.difference(initialBackups.keySet(), currentBackups.keySet())) {
for (ServerName sn:toStart) {
try {
if(!clusterManager.isRunning(ServiceType.HBASE_MASTER, hostname)) {
LOG.info("Restoring cluster - starting initial backup master: " + hostname);
startMaster(hostname);
if(!clusterManager.isRunning(ServiceType.HBASE_MASTER, sn.getHostname(), sn.getPort())) {
LOG.info("Restoring cluster - starting initial backup master: " + sn.getHostAndPort());
startMaster(sn.getHostname(), sn.getPort());
}
} catch (IOException ex) {
deferred.add(ex);
}
}

for (String hostname : Sets.difference(currentBackups.keySet(), initialBackups.keySet())) {
for (ServerName sn:toKill) {
try {
if(clusterManager.isRunning(ServiceType.HBASE_MASTER, hostname)) {
LOG.info("Restoring cluster - stopping backup master: " + hostname);
stopMaster(currentBackups.get(hostname));
if(clusterManager.isRunning(ServiceType.HBASE_MASTER, sn.getHostname(), sn.getPort())) {
LOG.info("Restoring cluster - stopping backup master: " + sn.getHostAndPort());
stopMaster(sn);
}
} catch (IOException ex) {
deferred.add(ex);
}
}
}
if (!deferred.isEmpty()) {
LOG.warn("Restoring cluster - restoring region servers reported " + deferred.size() + " errors:");
LOG.warn("Restoring cluster - restoring region servers reported "
+ deferred.size() + " errors:");
for (int i=0; i<deferred.size() && i < 3; i++) {
LOG.warn(deferred.get(i));
}
Expand All @@ -327,41 +346,61 @@ protected boolean restoreMasters(ClusterStatus initial, ClusterStatus current) {
return deferred.isEmpty();
}

protected boolean restoreRegionServers(ClusterStatus initial, ClusterStatus current) {
HashMap<String, ServerName> initialServers = new HashMap<String, ServerName>();
HashMap<String, ServerName> currentServers = new HashMap<String, ServerName>();

for (ServerName server : initial.getServers()) {
initialServers.put(server.getHostname(), server);
private static class ServerNameIgnoreStartCodeComparator implements Comparator<ServerName> {
@Override
public int compare(ServerName o1, ServerName o2) {
int compare = o1.getHostname().compareToIgnoreCase(o2.getHostname());
if (compare != 0) return compare;
compare = o1.getPort() - o2.getPort();
if (compare != 0) return compare;
return 0;
}
}

protected boolean restoreRegionServers(ClusterStatus initial, ClusterStatus current) {
Set<ServerName> toStart = new TreeSet<ServerName>(new ServerNameIgnoreStartCodeComparator());
Set<ServerName> toKill = new TreeSet<ServerName>(new ServerNameIgnoreStartCodeComparator());
toStart.addAll(initial.getBackupMasters());
toKill.addAll(current.getBackupMasters());

for (ServerName server : current.getServers()) {
currentServers.put(server.getHostname(), server);
toStart.remove(server);
}
for (ServerName server: initial.getServers()) {
toKill.remove(server);
}

List<IOException> deferred = new ArrayList<IOException>();
for (String hostname : Sets.difference(initialServers.keySet(), currentServers.keySet())) {

for(ServerName sn:toStart) {
try {
if(!clusterManager.isRunning(ServiceType.HBASE_REGIONSERVER, hostname)) {
LOG.info("Restoring cluster - starting initial region server: " + hostname);
startRegionServer(hostname);
if (!clusterManager.isRunning(ServiceType.HBASE_REGIONSERVER,
sn.getHostname(),
sn.getPort())) {
LOG.info("Restoring cluster - starting initial region server: " + sn.getHostAndPort());
startRegionServer(sn.getHostname(), sn.getPort());
}
} catch (IOException ex) {
deferred.add(ex);
}
}

for (String hostname : Sets.difference(currentServers.keySet(), initialServers.keySet())) {
for(ServerName sn:toKill) {
try {
if(clusterManager.isRunning(ServiceType.HBASE_REGIONSERVER, hostname)) {
LOG.info("Restoring cluster - stopping initial region server: " + hostname);
stopRegionServer(currentServers.get(hostname));
if (clusterManager.isRunning(ServiceType.HBASE_REGIONSERVER,
sn.getHostname(),
sn.getPort())) {
LOG.info("Restoring cluster - stopping initial region server: " + sn.getHostAndPort());
stopRegionServer(sn);
}
} catch (IOException ex) {
deferred.add(ex);
}
}
if (!deferred.isEmpty()) {
LOG.warn("Restoring cluster - restoring region servers reported " + deferred.size() + " errors:");
LOG.warn("Restoring cluster - restoring region servers reported "
+ deferred.size() + " errors:");
for (int i=0; i<deferred.size() && i < 3; i++) {
LOG.warn(deferred.get(i));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -256,17 +256,17 @@ private void exec(String hostname, ServiceType service, Operation op) throws IOE
}

@Override
public void start(ServiceType service, String hostname) throws IOException {
public void start(ServiceType service, String hostname, int port) throws IOException {
exec(hostname, service, Operation.START);
}

@Override
public void stop(ServiceType service, String hostname) throws IOException {
public void stop(ServiceType service, String hostname, int port) throws IOException {
exec(hostname, service, Operation.STOP);
}

@Override
public void restart(ServiceType service, String hostname) throws IOException {
public void restart(ServiceType service, String hostname, int port) throws IOException {
exec(hostname, service, Operation.RESTART);
}

Expand All @@ -275,24 +275,24 @@ public void signal(ServiceType service, String signal, String hostname) throws I
}

@Override
public boolean isRunning(ServiceType service, String hostname) throws IOException {
public boolean isRunning(ServiceType service, String hostname, int port) throws IOException {
String ret = execWithRetries(hostname, getCommandProvider(service).isRunningCommand(service))
.getSecond();
return ret.length() > 0;
}

@Override
public void kill(ServiceType service, String hostname) throws IOException {
public void kill(ServiceType service, String hostname, int port) throws IOException {
signal(service, SIGKILL, hostname);
}

@Override
public void suspend(ServiceType service, String hostname) throws IOException {
public void suspend(ServiceType service, String hostname, int port) throws IOException {
signal(service, SIGSTOP, hostname);
}

@Override
public void resume(ServiceType service, String hostname) throws IOException {
public void resume(ServiceType service, String hostname, int port) throws IOException {
signal(service, SIGCONT, hostname);
}
}
Loading

0 comments on commit e5ca773

Please sign in to comment.