Permalink
Browse files

fixed really dumb bug in createGroups() webports.

  • Loading branch information...
1 parent ec2773a commit 2b0448cb309350acb836d0fd385dcf1ef8364dca @carlosdotdanger committed Mar 12, 2011
View
@@ -2,3 +2,4 @@ log4j.rootLogger=WARN, A1
log4j.appender.A1=org.apache.log4j.ConsoleAppender
log4j.appender.A1.layout=org.apache.log4j.PatternLayout
log4j.appender.A1.layout.ConversionPattern=%d [%t] %-5p %c - %m%n
+log4j.com.lunabeat.dooper.HadoopCluster.logger=INFO, A1
@@ -57,8 +57,7 @@
tmpList.add("master");
tmpList.add("slaves");
return Collections.unmodifiableList(tmpList);
- }
-
+ }
public static final String SCP_FILE_MODE = "0644";
@@ -44,6 +44,7 @@
* @author cory
*/
public class HadoopCluster {
+
private static final Log LOGGER = LogFactory.getLog(HadoopCluster.class.getName());
public static final String MASTER_SUFFIX = "-master";
public static final String GROUP_NAME_KEY = "group-name";
@@ -117,7 +118,10 @@ public ClusterInstance getMaster() {
public RunInstancesResult launchMaster(String size) throws IOException {
update();
- if ((_master != null) && ((InstanceStateName.Running == InstanceStateName.fromValue(_master.getInstance().getState().getName())) || (InstanceStateName.Pending == InstanceStateName.fromValue(_master.getInstance().getState().getName())))) {
+ if ((_master != null) && ((InstanceStateName.Running
+ == InstanceStateName.fromValue(_master.getInstance().getState().getName()))
+ || (InstanceStateName.Pending
+ == InstanceStateName.fromValue(_master.getInstance().getState().getName())))) {
Reservation masterReservation =
_ec2.describeInstances(new DescribeInstancesRequest().withInstanceIds(_master.getInstance().getInstanceId())).getReservations().get(0);
return new RunInstancesResult().withReservation(masterReservation);
@@ -150,7 +154,8 @@ public RunInstancesResult launchSlaves(int howMany, String size) throws IOExcept
//wait for master to get internal ip field to pass in userinfo
boolean success = false;
- if (InstanceStateName.Pending == InstanceStateName.fromValue(_master.getInstance().getState().getName())) {
+ if (InstanceStateName.Pending
+ == InstanceStateName.fromValue(_master.getInstance().getState().getName())) {
int attempts = 0;
while ((attempts < WAIT_FOR_MASTER_MAX_TIMES) && !success) {
update();
@@ -167,7 +172,11 @@ public RunInstancesResult launchSlaves(int howMany, String size) throws IOExcept
}
}
if (!success) {
- throw new MasterTimeoutException(_groupName, howMany, size, _master.getInstance().getInstanceId());
+ throw new MasterTimeoutException(
+ _groupName,
+ howMany,
+ size,
+ _master.getInstance().getInstanceId());
}
}
@@ -235,7 +244,8 @@ public TerminateInstancesResult terminateSlaves(int howMany) {
for (ClusterInstance slave : _slaves) {
InstanceStateName state =
InstanceStateName.fromValue(slave.getInstance().getState().getName());
- if (terminated < howMany && (state == InstanceStateName.Running || state == InstanceStateName.Pending)) {
+ if (terminated < howMany && (state == InstanceStateName.Running
+ || state == InstanceStateName.Pending)) {
iids.add(slave.getInstance().getInstanceId());
terminated++;
}
@@ -285,15 +295,16 @@ public void createSecurityGroups() {
return;
}
String portList = _config.get(ClusterConfig.WEB_PORTS_KEY);
- boolean hasWebPorts = false;
+
List<Integer> webPorts = new ArrayList<Integer>();
if (!"0".contentEquals(portList)) {
String[] portParts = portList.split(",");
for (String portString : portParts) {
try {
webPorts.add(Integer.parseInt(portString));
} catch (NumberFormatException e) {
- throw new RuntimeException(ClusterConfig.WEB_PORTS_KEY + " config value must be list of ints or '0'");
+ throw new RuntimeException(ClusterConfig.WEB_PORTS_KEY
+ + " config value must be list of ints or '0'");
}
}
}
@@ -313,13 +324,14 @@ public void createSecurityGroups() {
ipPerms.add(new IpPermission().withUserIdGroupPairs(slaveUserIdGroupPair).withIpProtocol(TCP).withToPort(HI_PORT).withFromPort(LOW_PORT));
ipPerms.add(new IpPermission().withUserIdGroupPairs(slaveUserIdGroupPair).withIpProtocol(UDP).withToPort(HI_PORT).withFromPort(LOW_PORT));
ipPerms.add(new IpPermission().withUserIdGroupPairs(slaveUserIdGroupPair).withIpProtocol(ICMP).withToPort(-1).withFromPort(-1));
- if (hasWebPorts) {
- for (int port : webPorts) {
- ipPerms.add(new IpPermission().withToPort(port).withFromPort(port).withIpProtocol(TCP).withIpRanges(ALL_IPS));
- }
+
+ for (int port : webPorts) {
+ LOGGER.info("Adding port " + port + " to security group.");
+ ipPerms.add(new IpPermission().withToPort(port).withFromPort(port).withIpProtocol(TCP).withIpRanges(ALL_IPS));
}
+
AuthorizeSecurityGroupIngressRequest masterASR = new AuthorizeSecurityGroupIngressRequest().withGroupName(_masterGroupName).withIpPermissions(ipPerms);
_ec2.authorizeSecurityGroupIngress(masterASR);
@@ -408,7 +420,7 @@ public void putFile(ClusterInstance host, String src, String dest) throws SCPExc
if (path == null || path.length() < 1) {
return pathAndFile;
}
- if(path.endsWith("/")){
+ if (path.endsWith("/")) {
pathAndFile[0] = path;
return pathAndFile;
}
@@ -49,7 +49,9 @@ public AppCommand(ClusterConfig config) {
tmpMap.put("terminate-cluster", new CommandInfo(5, new String[]{"name:string"}, "Terminate all instances in a Hadoop cluster."));
tmpMap.put("terminate-slaves", new CommandInfo(6, new String[]{"name:string", "nodes:int"}, "Terminate slaves in a cluster."));
tmpMap.put("describe-cluster", new CommandInfo(7, new String[]{"name:string"}, "Get instance info for a cluster."));
- tmpMap.put("push-file", new CommandInfo(8, new String[]{"clusterName:string","instances:instances","srcPath:string","destPath:string"}, "Copy file to cluster machines."));
+ tmpMap.put("push-file", new CommandInfo(8, new String[]{"clusterName:string", "instances:instances", "srcPath:string", "destPath:string"}, "Copy file to cluster machines."));
+ tmpMap.put("create-groups", new CommandInfo(9, new String[]{"clusterName:string"}, "Copy file to cluster machines."));
+
return Collections.unmodifiableMap(tmpMap);
}
@@ -103,6 +105,9 @@ public void runCommand(String commandName, String[] args) {
case 8:
pushFile(args);
break;
+ case 9:
+ createGroups(args[0]);
+ break;
default:
throw new RuntimeException("Bad AppInfo index for '" + commandName + "' :" + cInfo.getIndex());
}
@@ -147,17 +152,16 @@ private void checkCommandArgs(String commandName, CommandInfo cInfo, String[] ar
}
} else if ("instanceSize".contentEquals(type)) {
if (!ClusterConfig.INSTANCE_TYPES.contains(args[x])) {
- StringBuilder sb = new StringBuilder("Invalid instance size '").
- append(args[x]).append("'.\n").append("\tvalid sizes are:\n");
+ StringBuilder sb = new StringBuilder("Invalid instance size '").append(args[x]).append("'.\n").append("\tvalid sizes are:\n");
for (String s : ClusterConfig.INSTANCE_TYPES) {
sb.append("\t\t").append(s).append("\n");
}
throw new ArgumentException("Command '" + commandName + "'\n" + sb.toString());
}
- }else if ("instances".contentEquals(type)){
+ } else if ("instances".contentEquals(type)) {
- if(!ClusterConfig.INSTANCE_GROUP_TYPES.contains(args[x])){
- StringBuilder sb =
+ if (!ClusterConfig.INSTANCE_GROUP_TYPES.contains(args[x])) {
+ StringBuilder sb =
new StringBuilder("Invalid instance '").append(args[x]).
append("'.\n").append("\tvalid instances are:\n");
for (String s : ClusterConfig.INSTANCE_GROUP_TYPES) {
@@ -217,14 +221,14 @@ private void deleteCluster(String clusterName) {
return;
}
- if(cluster.removeSecurityGroups())
+ if (cluster.removeSecurityGroups()) {
System.out.println("Deleted cluster '" + clusterName + "'.");
- else
+ } else {
System.out.println("'" + clusterName + "' has instances and was not deleted.");
+ }
}
-
private void launchCluster(String[] args) {
String clusterName = args[0];
String instanceSize = args[1];
@@ -268,10 +272,10 @@ private void launchCluster(String[] args) {
System.out.println("IOException during userdata file encoding.");
System.out.println(e.getMessage());
System.exit(100);
- }catch(MasterTimeoutException e){
- System.out.println("Timed out waiting for master to start.\nDon't panic.\nTry: 'pooper launch-slaves "
- + e.group() + " " + e.size() + " " + e.howMany() + "' after master is running.\n"+"master id: " + e.masterId());
- System.exit(0);
+ } catch (MasterTimeoutException e) {
+ System.out.println("Timed out waiting for master to start.\nDon't panic.\nTry: 'pooper launch-slaves "
+ + e.group() + " " + e.size() + " " + e.howMany() + "' after master is running.\n" + "master id: " + e.masterId());
+ System.exit(0);
}
}
@@ -329,10 +333,10 @@ private void launchSlaves(String[] args) {
System.out.println("IOException during userdata file encoding.");
System.out.println(e.getMessage());
System.exit(100);
- }catch(MasterTimeoutException e){
- System.out.println("Timed out waiting for master to start.\nDon't panic.\nTry: 'pooper launch-slaves "
- + e.group() + " " + e.size() + " " + e.howMany() + "' after master is running.\n"+"master id: " + e.masterId());
- System.exit(0);
+ } catch (MasterTimeoutException e) {
+ System.out.println("Timed out waiting for master to start.\nDon't panic.\nTry: 'pooper launch-slaves "
+ + e.group() + " " + e.size() + " " + e.howMany() + "' after master is running.\n" + "master id: " + e.masterId());
+ System.exit(0);
}
}
@@ -423,41 +427,48 @@ private void pushFile(String[] args) {
String target = args[1];
String src = args[2];
String dest = args[3];
- HadoopCluster cluster = new HadoopCluster(clusterName,_config);
- try{
- if("master".contentEquals(target) || "cluster".contentEquals(target)){
- System.out.println("Pushing to master.");
- if(cluster.getMaster() == null)
- System.out.println("No master found.");
- cluster.putFile(cluster.getMaster(), src, dest);
- System.out.println("Copied to " + cluster.getMaster().getInstance().getInstanceId() + ".");
- }
- if("slaves".contentEquals(target) || "cluster".contentEquals(target)){
- System.out.println("Pushing to slaves.");
- List<ClusterInstance> slaves = cluster.getSlaves();
- if(slaves == null)
- System.out.println("No slaves found.");
- System.out.println("Copying to " + slaves.size() + " slaves.");
- for(ClusterInstance slave:slaves){
- cluster.putFile(slave, src, dest);
- System.out.println("Copied to " + slave.getInstance().getInstanceId() + ".");
+ HadoopCluster cluster = new HadoopCluster(clusterName, _config);
+ try {
+ if ("master".contentEquals(target) || "cluster".contentEquals(target)) {
+ System.out.println("Pushing to master.");
+ if (cluster.getMaster() == null) {
+ System.out.println("No master found.");
+ }
+ cluster.putFile(cluster.getMaster(), src, dest);
+ System.out.println("Copied to " + cluster.getMaster().getInstance().getInstanceId() + ".");
+ }
+ if ("slaves".contentEquals(target) || "cluster".contentEquals(target)) {
+ System.out.println("Pushing to slaves.");
+ List<ClusterInstance> slaves = cluster.getSlaves();
+ if (slaves == null) {
+ System.out.println("No slaves found.");
+ }
+ System.out.println("Copying to " + slaves.size() + " slaves.");
+ for (ClusterInstance slave : slaves) {
+ cluster.putFile(slave, src, dest);
+ System.out.println("Copied to " + slave.getInstance().getInstanceId() + ".");
+ }
}
- }
- System.out.println("Copied " + src + " to " + dest + " on " + target + ".");
- }catch(SCPException scpe){
+ System.out.println("Copied " + src + " to " + dest + " on " + target + ".");
+ } catch (SCPException scpe) {
boolean isMaster = scpe.getInstance().getInstance().getInstanceId().contentEquals(cluster.getMaster().getInstance().getInstanceId());
- System.out.println("Error pushing to " + (isMaster? "master" : "slave"));
+ System.out.println("Error pushing to " + (isMaster ? "master" : "slave"));
System.out.println("InstanceId: " + scpe.getInstance().getInstance().getInstanceId());
System.out.println("message: " + scpe.getMessage());
- if(scpe.getCause() != null){
+ if (scpe.getCause() != null) {
System.out.println("cause: " + scpe.getCause().getMessage());
}
System.exit(1);
}
}
+ private void createGroups(String clusterName) {
+ HadoopCluster cluster = new HadoopCluster(clusterName, _config);
+ cluster.createSecurityGroups();
+ System.out.println("Created groups for " + clusterName + ".");
+ }
public static class CommandInfo {
View
@@ -2,3 +2,4 @@ log4j.rootLogger=WARN, A1
log4j.appender.A1=org.apache.log4j.ConsoleAppender
log4j.appender.A1.layout=org.apache.log4j.PatternLayout
log4j.appender.A1.layout.ConversionPattern=%d [%t] %-5p %c - %m%n
+

0 comments on commit 2b0448c

Please sign in to comment.