
added remote commands plus minor fixes.

commit 77dcbe76e2ddaaa2efef1cc7a7af73a709a69598 (1 parent: 0dab72c), committed by @carlosdotdanger on Mar 19, 2011
bin/pooper.sh (6 lines changed)
@@ -56,9 +56,7 @@ JAVA_OPTS="$JAVA_OPTS -Dpooper.config=$POOPER_CONFIG"
if [ "$1" == "login" ] ; then
COMMAND=`$JAVA $JAVA_OPTS -jar $POOPER_JAR get-login-command $2 | tail -n1`
else
- COMMAND="$JAVA $JAVA_OPTS -jar $POOPER_JAR $1 $2 $3 $4 $5 $6"
+ COMMAND="$JAVA $JAVA_OPTS -jar $POOPER_JAR $*"
fi
-#echo "COMMAND:"
-#echo "$COMMAND"
-#echo "--------"
+
exec $COMMAND
config/log4j.properties (1 line changed)
@@ -2,4 +2,5 @@ log4j.rootLogger=WARN, A1
log4j.appender.A1=org.apache.log4j.ConsoleAppender
log4j.appender.A1.layout=org.apache.log4j.PatternLayout
log4j.appender.A1.layout.ConversionPattern=%d [%t] %-5p %c - %m%n
+#log4j.logger.com.lunabeat=INFO
src/com/lunabeat/dooper/CmdException.java (2 lines changed)
@@ -17,7 +17,7 @@
package com.lunabeat.dooper;
-class CmdException extends Exception {
+public class CmdException extends Exception {
ClusterInstance _instance;
/**
src/com/lunabeat/dooper/CmdSessionResult.java (10 lines changed)
@@ -24,8 +24,10 @@
private int _code;
private String _stderr;
private String _stdout;
+ private ClusterInstance _instance;
- public CmdSessionResult(int code,String stdout,String stderr){
+ public CmdSessionResult(ClusterInstance instance,int code,String stdout,String stderr){
+ _instance = instance;
_code = code;
_stdout = stdout;
_stderr = stderr;
@@ -48,4 +50,10 @@ public CmdSessionResult(int code,String stdout,String stderr){
*/ public String getStdout() {
return _stdout;
}
+
+ /**
+ * @return instance info
+ */ public ClusterInstance getInstance() {
+ return _instance;
+ }
}
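
The new ClusterInstance field means every result can be traced back to the host that produced it. Below is a minimal consumption sketch (not part of the commit), assuming only the accessors visible in this diff: getInstance(), getCode(), getStdout(), getStderr(), and the ClusterInstance.getInstance().getInstanceId() chain used later in AppCommand.

    import com.lunabeat.dooper.CmdSessionResult;
    import java.util.List;

    class ResultSummary {
        // One line per host; non-zero exit codes are flagged and stderr is echoed.
        static void print(List<CmdSessionResult> results) {
            for (CmdSessionResult result : results) {
                String id = result.getInstance().getInstance().getInstanceId();
                String status = (result.getCode() == 0) ? "OK" : "FAILED (exit " + result.getCode() + ")";
                System.out.println(id + "\t" + status);
                if (result.getCode() != 0 && result.getStderr().length() > 0) {
                    System.out.println("\t" + result.getStderr().trim());
                }
            }
        }
    }
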
src/com/lunabeat/dooper/HadoopCluster.java (14 lines changed)
@@ -417,6 +417,7 @@ public void putFile(ClusterInstance host, String src, String dest) throws SCPExc
String fileMode = _config.get(ClusterConfig.SCP_FILE_MODE_KEY, ClusterConfig.SCP_DEFAULT_FILE_MODE);
LOGGER.info("SCP '" + src + "' '" + outFileName + "' '" + pathAndFile[0] + "' " + fileMode);
scp.put(src, outFileName, pathAndFile[0], fileMode);
+ conn.close();
} catch (IOException e) {
throw new SCPException(e.getMessage(), e.getCause(), host);
}
@@ -432,7 +433,7 @@ public void putFile(ClusterInstance host, String src, String dest) throws SCPExc
public List<CmdSessionResult> remoteCommand(List<ClusterInstance> hosts, String command) throws CmdException{
ArrayList<CmdSessionResult> results = new ArrayList<CmdSessionResult>();
for(ClusterInstance host:hosts)
- remoteCommand(host,command);
+ results.add(remoteCommand(host,command));
return results;
}
@@ -452,7 +453,10 @@ public CmdSessionResult remoteCommand(ClusterInstance host, String command) thro
conn.authenticateWithPublicKey(
_config.get(ClusterConfig.USERNAME_KEY),
keyfile, BLANK);
+ if(!isAuthenticated)
+ throw new CmdException("Could not authenticate.",host);
Session session = conn.openSession();
+ LOGGER.info("EXEC '"+command + "' on instance: " + host.getInstance().getInstanceId());
session.execCommand(command);
InputStream outStrm = new StreamGobbler(session.getStdout());
InputStream errStrm = new StreamGobbler(session.getStderr());
@@ -461,16 +465,18 @@ public CmdSessionResult remoteCommand(ClusterInstance host, String command) thro
StringBuilder sb = new StringBuilder();
String stdout;
while((stdout = stdoutRdr.readLine()) != null){
- sb.append(stdout);
+ sb.append(stdout).append("\n");
}
stdout = sb.toString();
sb = new StringBuilder();
String stderr;
while((stderr = stderrRdr.readLine()) != null){
- sb.append(stderr);
+ sb.append(stderr).append("\n");
}
stderr = sb.toString();
- return new CmdSessionResult(session.getExitStatus(),stdout,stderr);
+ conn.close();
+ conn = null;
+ return new CmdSessionResult(host,session.getExitStatus(),stdout,stderr);
} catch (IOException e) {
throw new CmdException(e.getMessage(), e.getCause(), host);
}
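
One design note on the cleanup added here: conn.close() sits on the success path only, so an IOException thrown by execCommand or while draining the streams propagates as CmdException with the connection still open (the same applies to the new close() in putFile). A hedged sketch of an exception-safe variant follows, assuming the ch.ethz.ssh2 (Ganymed SSH-2) Connection/Session types that StreamGobbler and authenticateWithPublicKey suggest; _config, keyfile, and BLANK are taken from the existing method, and only the try/finally structure differs from the committed code.

    public CmdSessionResult remoteCommand(ClusterInstance host, String command) throws CmdException {
        Connection conn = new Connection(host.getInstance().getPublicDnsName());
        try {
            conn.connect();
            if (!conn.authenticateWithPublicKey(_config.get(ClusterConfig.USERNAME_KEY), keyfile, BLANK)) {
                throw new CmdException("Could not authenticate.", host);
            }
            Session session = conn.openSession();
            try {
                session.execCommand(command);
                BufferedReader stdoutRdr = new BufferedReader(new InputStreamReader(new StreamGobbler(session.getStdout())));
                BufferedReader stderrRdr = new BufferedReader(new InputStreamReader(new StreamGobbler(session.getStderr())));
                StringBuilder outSb = new StringBuilder();
                for (String line; (line = stdoutRdr.readLine()) != null; ) outSb.append(line).append("\n");
                StringBuilder errSb = new StringBuilder();
                for (String line; (line = stderrRdr.readLine()) != null; ) errSb.append(line).append("\n");
                return new CmdSessionResult(host, session.getExitStatus(), outSb.toString(), errSb.toString());
            } finally {
                session.close();
            }
        } catch (IOException e) {
            throw new CmdException(e.getMessage(), e.getCause(), host);
        } finally {
            conn.close();   // reached on success and on every exception path
        }
    }
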
src/com/lunabeat/pooper/Main.java (2 lines changed)
@@ -49,7 +49,7 @@ public static void main(String[] args) {
for(int x = 1; x < args.length;x++){
commandArgs[x-1] = args[x];
}
- appcom.runCommand(command,commandArgs);
+ appcom.runAppCommand(command,commandArgs);
}catch(IOException e){
System.out.println("could not open config file '" + configPath + "'");
System.exit(1);
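
For reference, a small illustration (not part of the commit) of how a CLI invocation flows through the renamed entry point; the argument values are hypothetical, appcom is the AppCommand instance built in main above, and the loop mirrors the code shown in this hunk.

    // Hypothetical shell invocation: pooper.sh command mycluster cluster "df -h"
    String[] args = {"command", "mycluster", "cluster", "df -h"};
    String command = args[0];                                // subcommand name
    String[] commandArgs = new String[args.length - 1];
    for (int x = 1; x < args.length; x++) {
        commandArgs[x - 1] = args[x];                        // {"mycluster", "cluster", "df -h"}
    }
    appcom.runAppCommand(command, commandArgs);              // dispatches to case 12 -> runRemoteCommand

Note that because pooper.sh now expands $* unquoted, a quoted multi-word command actually reaches the JVM as separate arguments (here, "df" and "-h"); runRemoteCommand compensates by re-joining everything after the target with single spaces, so plain multi-word commands still work end to end, though the original quoting and extra whitespace are not preserved.
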
src/com/lunabeat/pooper/commands/AppCommand.java (278 lines changed)
@@ -24,10 +24,13 @@
import com.lunabeat.dooper.ClusterConfig;
import com.lunabeat.dooper.ClusterInstance;
import com.lunabeat.dooper.ClusterList;
+import com.lunabeat.dooper.CmdException;
+import com.lunabeat.dooper.CmdSessionResult;
import com.lunabeat.dooper.HadoopCluster;
import com.lunabeat.dooper.MasterTimeoutException;
import com.lunabeat.dooper.SCPException;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@@ -37,6 +40,8 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import static java.lang.System.out;
+
/**
*
* @author cory
@@ -65,6 +70,7 @@ public AppCommand(ClusterConfig config) {
tmpMap.put("create-groups", new CommandInfo(9, new String[]{"clusterName:string"}, "Create security groups for cluster"));
tmpMap.put("get-login-command", new CommandInfo(10, new String[]{"target:string"}, "Get remote login command for master or specified instance id. This command is for external scripts."));
tmpMap.put("login", new CommandInfo(11, new String[]{"target:string"}, "Launch remote login for master or specific instance."));
+ tmpMap.put("command", new CommandInfo(12, new String[]{"clusterName:string", "instances:instances", "command:string"}, "Run command on cluster machines."));
return Collections.unmodifiableMap(tmpMap);
}
@@ -77,20 +83,20 @@ public static CommandInfo commandInfo(String name) {
return COMMANDS.get(name);
}
- public void runCommand(String commandName, String[] args) {
+ public void runAppCommand(String commandName, String[] args) {
CommandInfo cInfo = COMMANDS.get(commandName);
if (cInfo == null) {
- System.out.println("INVALID COMMAND: '" + commandName + "'.");
+ out.println("INVALID COMMAND: '" + commandName + "'.");
System.exit(1);
}
try {
checkCommandArgs(commandName, cInfo, args);
} catch (ArgumentException e) {
- System.out.println(e.getProblem());
+ out.println(e.getProblem());
System.exit(1);
}
- System.out.println("Running " + commandName);
+ out.println("Running " + commandName);
switch (cInfo.getIndex()) {
case 0:
listClusters();
@@ -127,7 +133,10 @@ public void runCommand(String commandName, String[] args) {
break;
case 11:
LOG.fatal("Direct call to login.");
- System.out.println("COMMAND 'login' needs to be handled by external script for now.\nUse 'get-login-command' to retrieve command and exec in shell.");
+ out.println("COMMAND 'login' needs to be handled by external script for now.\nUse 'get-login-command' to retrieve command and exec in shell.");
+ break;
+ case 12:
+ runRemoteCommand(args);
break;
default:
throw new RuntimeException("Bad AppInfo index for '" + commandName + "' :" + cInfo.getIndex());
@@ -197,20 +206,20 @@ private void checkCommandArgs(String commandName, CommandInfo cInfo, String[] ar
}
private void listClusters() {
- Map<String, Map<String, List<Instance>>> out = ClusterList.getClusterMap(_config);
- System.out.println("found running clusters:");
+ Map<String, Map<String, List<Instance>>> outMap = ClusterList.getClusterMap(_config);
+ out.println("found running clusters:");
HashSet<String> emptyClusters = new HashSet<String>();
- for (String clustername : out.keySet()) {
- //System.out.println(clustername);
+ for (String clustername : outMap.keySet()) {
+ //out.println(clustername);
boolean empty = true;
- for (String group : out.get(clustername).keySet()) {
- if (out.get(clustername).get(group).size() < 1) {
+ for (String group : outMap.get(clustername).keySet()) {
+ if (outMap.get(clustername).get(group).size() < 1) {
emptyClusters.add(group.replace(HadoopCluster.MASTER_SUFFIX, ""));
} else {
- System.out.println(group + " (" + out.get(clustername).get(group).size() + ")");
+ out.println(group + " (" + outMap.get(clustername).get(group).size() + ")");
empty = false;
}
- for (Instance i : out.get(clustername).get(group)) {
+ for (Instance i : outMap.get(clustername).get(group)) {
StringBuilder sb = new StringBuilder("\t");
sb.append(i.getInstanceId());
sb.append("\t");
@@ -219,33 +228,34 @@ private void listClusters() {
sb.append(i.getState().getName());
//sb.append("\t");
//sb.append(i.getPublicDnsName());
- System.out.println(sb.toString());
+ out.println(sb.toString());
}
if (!empty) {
- System.out.println();
+ out.println();
}
}
if (!empty) {
- System.out.println("-----------");
+ out.println("-----------");
}
}
- System.out.println("Found empty cluster security groups:");
+ out.println("Found empty cluster security groups:");
+
for (String s : emptyClusters) {
- System.out.println("\t" + s);
+ out.println("\t" + s);
}
}
private void deleteCluster(String clusterName) {
HadoopCluster cluster = new HadoopCluster(clusterName, _config);
if (!cluster.groupsExist()) {
- System.out.println("Cluster '" + clusterName + "' does not exist.");
+ out.println("Cluster '" + clusterName + "' does not exist.");
return;
}
if (cluster.removeSecurityGroups()) {
- System.out.println("Deleted cluster '" + clusterName + "'.");
+ out.println("Deleted cluster '" + clusterName + "'.");
} else {
- System.out.println("'" + clusterName + "' has instances and was not deleted.");
+ out.println("'" + clusterName + "' has instances and was not deleted.");
}
}
@@ -256,45 +266,45 @@ private void launchCluster(String[] args) {
int nodes = Integer.parseInt(args[2]);
HadoopCluster cluster = new HadoopCluster(clusterName, _config);
try {
- System.out.println("launching master.");
+ out.println("launching master.");
RunInstancesResult mr = cluster.launchMaster(instanceSize);
if (mr == null) {
- System.out.println("Launch master failed!");
+ out.println("Launch master failed!");
System.exit(100);
}
Reservation r = mr.getReservation();
Instance i = r.getInstances().get(0);
if (i == null) {
- System.out.println("Launch master failed! reservation id: " + r.getReservationId());
+ out.println("Launch master failed! reservation id: " + r.getReservationId());
System.exit(100);
}
- System.out.println("reservation id: " + r.getReservationId());
- System.out.println("\tinstance: " + i.getInstanceId() + "\t" + i.getState().getName());
+ out.println("reservation id: " + r.getReservationId());
+ out.println("\tinstance: " + i.getInstanceId() + "\t" + i.getState().getName());
- System.out.println("launching slaves (" + nodes + ").");
- System.out.println("waiting for master to get address.");
+ out.println("launching slaves (" + nodes + ").");
+ out.println("waiting for master to get address.");
RunInstancesResult sr = cluster.launchSlaves(nodes, instanceSize);
if (sr == null) {
- System.out.println("Launch slaves failed!");
+ out.println("Launch slaves failed!");
System.exit(100);
}
Reservation sres = sr.getReservation();
List<Instance> sis = sres.getInstances();
if (sis == null || sis.isEmpty()) {
- System.out.println("Launch slaves failed! reservation id: " + sres.getReservationId());
+ out.println("Launch slaves failed! reservation id: " + sres.getReservationId());
System.exit(100);
}
- System.out.println("reservation id: " + sres.getReservationId());
+ out.println("reservation id: " + sres.getReservationId());
for (Instance si : sis) {
- System.out.println("\tinstance: " + si.getInstanceId() + "\t" + si.getState().getName());
+ out.println("\tinstance: " + si.getInstanceId() + "\t" + si.getState().getName());
}
- System.out.println("Success.");
+ out.println("Success.");
} catch (IOException e) {
- System.out.println("IOException during userdata file encoding.");
- System.out.println(e.getMessage());
+ out.println("IOException during userdata file encoding.");
+ out.println(e.getMessage());
System.exit(100);
} catch (MasterTimeoutException e) {
- System.out.println("Timed out waiting for master to start.\nDon't panic.\nTry: 'pooper launch-slaves "
+ out.println("Timed out waiting for master to start.\nDon't panic.\nTry: 'pooper launch-slaves "
+ e.group() + " " + e.size() + " " + e.howMany() + "' after master is running.\n" + "master id: " + e.masterId());
System.exit(0);
}
@@ -305,24 +315,24 @@ private void launchMaster(String[] args) {
String instanceSize = args[1];
HadoopCluster cluster = new HadoopCluster(clusterName, _config);
try {
- System.out.println("launching master.");
+ out.println("launching master.");
RunInstancesResult mr = cluster.launchMaster(instanceSize);
if (mr == null) {
- System.out.println("Launch master failed!");
+ out.println("Launch master failed!");
System.exit(100);
}
Reservation r = mr.getReservation();
Instance i = r.getInstances().get(0);
if (i == null) {
- System.out.println("Launch master failed! reservation id: " + r.getReservationId());
+ out.println("Launch master failed! reservation id: " + r.getReservationId());
System.exit(100);
}
- System.out.println("reservation id: " + r.getReservationId());
- System.out.println("\tinstance: " + i.getInstanceId() + "\t" + i.getState().getName());
- System.out.println("Success.");
+ out.println("reservation id: " + r.getReservationId());
+ out.println("\tinstance: " + i.getInstanceId() + "\t" + i.getState().getName());
+ out.println("Success.");
} catch (IOException e) {
- System.out.println("IOException during userdata file encoding.");
- System.out.println(e.getMessage());
+ out.println("IOException during userdata file encoding.");
+ out.println(e.getMessage());
System.exit(100);
}
}
@@ -333,29 +343,29 @@ private void launchSlaves(String[] args) {
int nodes = Integer.parseInt(args[2]);
HadoopCluster cluster = new HadoopCluster(clusterName, _config);
try {
- System.out.println("launching slaves (" + nodes + ").");
+ out.println("launching slaves (" + nodes + ").");
RunInstancesResult sr = cluster.launchSlaves(nodes, instanceSize);
if (sr == null) {
- System.out.println("Launch slaves failed!");
+ out.println("Launch slaves failed!");
System.exit(100);
}
Reservation sres = sr.getReservation();
List<Instance> sis = sres.getInstances();
if (sis == null || sis.isEmpty()) {
- System.out.println("Launch slaves failed! reservation id: " + sres.getReservationId());
+ out.println("Launch slaves failed! reservation id: " + sres.getReservationId());
System.exit(100);
}
- System.out.println("reservation id: " + sres.getReservationId());
+ out.println("reservation id: " + sres.getReservationId());
for (Instance si : sis) {
- System.out.println("\tinstance: " + si.getInstanceId() + "\t" + si.getState().getName());
+ out.println("\tinstance: " + si.getInstanceId() + "\t" + si.getState().getName());
}
- System.out.println("Success.");
+ out.println("Success.");
} catch (IOException e) {
- System.out.println("IOException during userdata file encoding.");
- System.out.println(e.getMessage());
+ out.println("IOException during userdata file encoding.");
+ out.println(e.getMessage());
System.exit(100);
} catch (MasterTimeoutException e) {
- System.out.println("Timed out waiting for master to start.\nDon't panic.\nTry: 'pooper launch-slaves "
+ out.println("Timed out waiting for master to start.\nDon't panic.\nTry: 'pooper launch-slaves "
+ e.group() + " " + e.size() + " " + e.howMany() + "' after master is running.\n" + "master id: " + e.masterId());
System.exit(0);
}
@@ -365,16 +375,16 @@ private void terminateCluster(String clusterName) {
HadoopCluster cluster = new HadoopCluster(clusterName, _config);
TerminateInstancesResult tr = cluster.terminateCluster();
if (tr == null || tr.getTerminatingInstances().isEmpty()) {
- System.out.println("no instances terminated.");
+ out.println("no instances terminated.");
System.exit(0);
}
- System.out.println("Terminating " + tr.getTerminatingInstances().size() + " instances.");
+ out.println("Terminating " + tr.getTerminatingInstances().size() + " instances.");
for (InstanceStateChange i : tr.getTerminatingInstances()) {
- System.out.println("\t" + i.getInstanceId() + " "
+ out.println("\t" + i.getInstanceId() + " "
+ i.getPreviousState().getName()
+ " -> " + i.getCurrentState().getName());
}
- System.out.println("Success.");
+ out.println("Success.");
}
private void terminateSlaves(String[] args) {
@@ -383,60 +393,60 @@ private void terminateSlaves(String[] args) {
HadoopCluster cluster = new HadoopCluster(clusterName, _config);
TerminateInstancesResult tr = cluster.terminateSlaves(nodes);
if (tr == null || tr.getTerminatingInstances().isEmpty()) {
- System.out.println("no instances terminated.");
+ out.println("no instances terminated.");
System.exit(0);
}
- System.out.println("Terminating " + tr.getTerminatingInstances().size() + " instances.");
+ out.println("Terminating " + tr.getTerminatingInstances().size() + " instances.");
for (InstanceStateChange i : tr.getTerminatingInstances()) {
- System.out.println("\t" + i.getInstanceId() + " "
+ out.println("\t" + i.getInstanceId() + " "
+ i.getPreviousState().getName()
+ "-> " + i.getCurrentState().getName());
}
- System.out.println("Success.");
+ out.println("Success.");
}
private void describeCluster(String name) {
HadoopCluster cluster = new HadoopCluster(name, _config);
if (!cluster.groupsExist()) {
- System.out.println("Cluster " + name + " not found.");
+ out.println("Cluster " + name + " not found.");
System.exit(0);
}
- System.out.println(name);
- System.out.println("---------------");
- System.out.println("Master:");
- System.out.println("-------");
+ out.println(name);
+ out.println("---------------");
+ out.println("Master:");
+ out.println("-------");
ClusterInstance master = cluster.getMaster();
if (master == null) {
- System.out.println("no master found.");
+ out.println("no master found.");
} else {
- System.out.println("id:\t\t" + master.getInstance().getInstanceId());
- System.out.println("state:\t\t" + master.getInstance().getState().getName());
- System.out.println("size:\t\t" + master.getInstance().getInstanceType());
- System.out.println("ami:\t\t" + master.getInstance().getImageId());
- System.out.println("public:\t\t" + master.getInstance().getPublicDnsName());
- System.out.println("privte:\t\t" + master.getInstance().getPrivateDnsName());
- System.out.println("launch:\t\t" + master.getInstance().getLaunchTime().toString());
- System.out.println("reserv:\t\t" + master.getReservationId());
- System.out.println("groups:\t\t" + master.getSecurityGroups());
+ out.println("id:\t\t" + master.getInstance().getInstanceId());
+ out.println("state:\t\t" + master.getInstance().getState().getName());
+ out.println("size:\t\t" + master.getInstance().getInstanceType());
+ out.println("ami:\t\t" + master.getInstance().getImageId());
+ out.println("public:\t\t" + master.getInstance().getPublicDnsName());
+ out.println("privte:\t\t" + master.getInstance().getPrivateDnsName());
+ out.println("launch:\t\t" + master.getInstance().getLaunchTime().toString());
+ out.println("reserv:\t\t" + master.getReservationId());
+ out.println("groups:\t\t" + master.getSecurityGroups());
}
- System.out.println();
- System.out.println("Slaves:");
- System.out.println("-------");
+ out.println();
+ out.println("Slaves:");
+ out.println("-------");
List<ClusterInstance> slaves = cluster.getSlaves();
if (master == null) {
- System.out.println("no slaves found.");
+ out.println("no slaves found.");
} else {
for (ClusterInstance slave : slaves) {
- System.out.println("id:\t\t" + slave.getInstance().getInstanceId());
- System.out.println("state:\t\t" + slave.getInstance().getState().getName());
- System.out.println("size:\t\t" + slave.getInstance().getInstanceType());
- System.out.println("ami id:\t\t" + slave.getInstance().getImageId());
- System.out.println("public:\t\t" + slave.getInstance().getPublicDnsName());
- System.out.println("privat:\t\t" + slave.getInstance().getPrivateDnsName());
- System.out.println("launch:\t\t" + slave.getInstance().getLaunchTime().toString());
- System.out.println("reserv:\t\t" + slave.getReservationId());
- System.out.println("groups:\t\t" + slave.getSecurityGroups());
- System.out.println("---");
+ out.println("id:\t\t" + slave.getInstance().getInstanceId());
+ out.println("state:\t\t" + slave.getInstance().getState().getName());
+ out.println("size:\t\t" + slave.getInstance().getInstanceType());
+ out.println("ami id:\t\t" + slave.getInstance().getImageId());
+ out.println("public:\t\t" + slave.getInstance().getPublicDnsName());
+ out.println("privat:\t\t" + slave.getInstance().getPrivateDnsName());
+ out.println("launch:\t\t" + slave.getInstance().getLaunchTime().toString());
+ out.println("reserv:\t\t" + slave.getReservationId());
+ out.println("groups:\t\t" + slave.getSecurityGroups());
+ out.println("---");
}
}
@@ -451,34 +461,34 @@ private void pushFile(String[] args) {
HadoopCluster cluster = new HadoopCluster(clusterName, _config);
try {
if ("master".contentEquals(target) || "cluster".contentEquals(target)) {
- System.out.println("Pushing to master.");
+ out.println("Pushing to master.");
if (cluster.getMaster() == null) {
- System.out.println("No master found.");
+ out.println("No master found.");
}
cluster.putFile(cluster.getMaster(), src, dest);
- System.out.println("Copied to " + cluster.getMaster().getInstance().getInstanceId() + ".");
+ out.println("Copied to " + cluster.getMaster().getInstance().getInstanceId() + ".");
}
if ("slaves".contentEquals(target) || "cluster".contentEquals(target)) {
- System.out.println("Pushing to slaves.");
+ out.println("Pushing to slaves.");
List<ClusterInstance> slaves = cluster.getSlaves();
if (slaves == null) {
- System.out.println("No slaves found.");
+ out.println("No slaves found.");
}
- System.out.println("Copying to " + slaves.size() + " slaves.");
+ out.println("Copying to " + slaves.size() + " slaves.");
for (ClusterInstance slave : slaves) {
cluster.putFile(slave, src, dest);
- System.out.println("Copied to " + slave.getInstance().getInstanceId() + ".");
+ out.println("Copied to " + slave.getInstance().getInstanceId() + ".");
}
}
- System.out.println("Copied " + src + " to " + dest + " on " + target + ".");
+ out.println("Copied " + src + " to " + dest + " on " + target + ".");
} catch (SCPException scpe) {
boolean isMaster = scpe.getInstance().getInstance().getInstanceId().contentEquals(cluster.getMaster().getInstance().getInstanceId());
- System.out.println("Error pushing to " + (isMaster ? "master" : "slave"));
- System.out.println("InstanceId: " + scpe.getInstance().getInstance().getInstanceId());
- System.out.println("message: " + scpe.getMessage());
+ out.println("Error pushing to " + (isMaster ? "master" : "slave"));
+ out.println("InstanceId: " + scpe.getInstance().getInstance().getInstanceId());
+ out.println("message: " + scpe.getMessage());
if (scpe.getCause() != null) {
- System.out.println("cause: " + scpe.getCause().getMessage());
+ out.println("cause: " + scpe.getCause().getMessage());
}
System.exit(1);
}
@@ -488,7 +498,7 @@ private void pushFile(String[] args) {
private void createGroups(String clusterName) {
HadoopCluster cluster = new HadoopCluster(clusterName, _config);
cluster.createSecurityGroups();
- System.out.println("Created groups for " + clusterName + ".");
+ out.println("Created groups for " + clusterName + ".");
}
private void getLoginCommand(String target) {
@@ -507,7 +517,7 @@ private void getLoginCommand(String target) {
}
}
if(host == null){
- System.out.println("'" + target + "' is not a valid cluster name or instance id.\nexiting.");
+ out.println("'" + target + "' is not a valid cluster name or instance id.\nexiting.");
System.exit(0);
}
StringBuilder sb = new StringBuilder("ssh -i")
@@ -516,7 +526,65 @@ private void getLoginCommand(String target) {
.append(_config.get(ClusterConfig.USERNAME_KEY))
.append("@")
.append(host);
- System.out.println(sb.toString());
+ out.println(sb.toString());
+ }
+
+ private void runRemoteCommand(String[] args){
+ String clusterName = args[0];
+ String target = args[1];
+ StringBuilder sb = new StringBuilder(args[2]);
+ for(int x =3; x < args.length;x++){
+ sb.append(" ").append(args[x]);
+ }
+ String command = sb.toString();
+ HadoopCluster cluster = new HadoopCluster(clusterName, _config);
+ ArrayList<CmdSessionResult> results = new ArrayList<CmdSessionResult>();
+ try {
+
+ if ("master".contentEquals(target) || "cluster".contentEquals(target)) {
+ out.println("Running command on master.");
+ if (cluster.getMaster() == null) {
+ out.println("No master found.");
+ }
+ results.add(cluster.remoteCommand(cluster.getMaster(), command));
+ }
+ if ("slaves".contentEquals(target) || "cluster".contentEquals(target)) {
+ out.println("Running command on slaves.");
+ List<ClusterInstance> slaves = cluster.getSlaves();
+ if (slaves == null) {
+ out.println("No slaves found.");
+ }
+ out.println("Running on " + slaves.size() + " slaves.");
+ results.addAll(cluster.remoteCommand(slaves, command));
+
+ }
+
+ out.println("Successfully Ran '" + command + "' on " + target + ".");
+ out.println("Report:");
+ for(CmdSessionResult result:results){
+ out.println("----------");
+ out.println("Instance: " + result.getInstance().getInstance().getInstanceId());
+ out.println("Exit Status: " + result.getCode());
+ if(result.getStdout().length() > 0){
+ out.println("STDOUT:");
+ out.println(result.getStdout());
+ }
+ if(result.getStderr().length() > 0){
+ out.println("STDERR:");
+ out.println(result.getStderr());
+ }
+ out.println("----------");
+ }
+ } catch (CmdException cmde) {
+ boolean isMaster = cmde.getInstance().getInstance().getInstanceId().contentEquals(cluster.getMaster().getInstance().getInstanceId());
+ out.println("Error running command on " + (isMaster ? "master" : "slave"));
+ out.println("InstanceId: " + cmde.getInstance().getInstance().getInstanceId());
+ out.println("message: " + cmde.getMessage());
+ if (cmde.getCause() != null) {
+ out.println("cause: " + cmde.getCause().getMessage());
+ }
+ System.exit(1);
+ }
}
public static class CommandInfo {

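Putting the pieces together, a minimal end-to-end sketch of the new subcommand. Only the AppCommand(ClusterConfig) constructor, runAppCommand, and the "command" registration are confirmed by this diff; how ClusterConfig is built from pooper.config is not shown, so the loader below is a labeled placeholder.

    import com.lunabeat.dooper.ClusterConfig;
    import com.lunabeat.pooper.commands.AppCommand;

    public class RemoteCommandExample {
        public static void main(String[] argv) {
            // Hypothetical helper: stands in for whatever Main.java does with pooper.config.
            ClusterConfig config = loadConfig(System.getProperty("pooper.config"));
            AppCommand appcom = new AppCommand(config);
            // Runs "uptime" on the master and every slave of cluster "mycluster",
            // then prints the per-instance report (exit status, stdout, stderr) added in this commit.
            appcom.runAppCommand("command", new String[]{"mycluster", "cluster", "uptime"});
        }

        private static ClusterConfig loadConfig(String path) {
            // Placeholder only; see Main.java for the project's real config loading.
            throw new UnsupportedOperationException("load " + path + " the way the project does");
        }
    }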