Skip to content

Commit

Permalink
moved web ports into config, improved slave launch when master times out
Browse files Browse the repository at this point in the history
  • Loading branch information
Cory Cyr committed Mar 5, 2011
1 parent dbb9d90 commit 959bb19
Show file tree
Hide file tree
Showing 9 changed files with 43 additions and 22 deletions.
Empty file.
2 changes: 1 addition & 1 deletion nbproject/private/configs/delete-cluster.properties
@@ -1 +1 @@
application.args=deete-cluster pooper
application.args=delete-cluster pooper
2 changes: 1 addition & 1 deletion nbproject/private/configs/launch-cluster.properties
@@ -1 +1 @@
application.args=launch-cluster poop m1.large 3
application.args=launch-cluster pooper m1.large 1
1 change: 1 addition & 0 deletions nbproject/private/configs/launch-slaves.properties
@@ -0,0 +1 @@
application.args=launch-slaves pooper m1.large 2
3 changes: 2 additions & 1 deletion src/com/lunabeat/dooper/ClusterConfig.java
Expand Up @@ -32,6 +32,7 @@ public class ClusterConfig implements AWSCredentials {
public static final String KEYPAIR_NAME_KEY = "EC2.KeypairName";
public static final String KEYPAIR_FILE_KEY = "EC2.KeypairFile";
public static final String MASTER_HOST_KEY = "Master.Host";
public static final String WEB_PORTS_KEY = "EC2.WebPorts";
public static final List<String> INSTANCE_TYPES = initTypes();

private static List<String> initTypes(){
Expand Down Expand Up @@ -79,7 +80,7 @@ private void testConfig() throws ConfigException {
String type = _requiredFields.getProperty(key);
String confValue = _properties.getProperty(key);
if (confValue == null) {
errs.add(key + " missing " + type + " value.");
errs.add(key + " missing (" + type + ")");
}
}
if (errs.size() > 0) {
Expand Down
48 changes: 32 additions & 16 deletions src/com/lunabeat/dooper/HadoopCluster.java
Expand Up @@ -39,7 +39,6 @@
* @author cory
*/
public class HadoopCluster {
private static final int[] webPorts = {50030,50060,50070,50075};
private static final long DEFAULT_HOLD_TIME = 30000L; //30 seconds
public static final String MASTER_SUFFIX = "-master";
public static final String GROUP_NAME_KEY = "group-name";
Expand Down Expand Up @@ -79,16 +78,10 @@ private void init() {

}

private void update() {
update(false);
}

private void update(boolean force) {
private void update() {
//unless force == true check the holdtime and bail if it's too soon.
Date now = new Date();
if (!force && (now.getTime() - lastUpdate < holdTime)) {
return;
}
init();
//get master + slave info
_slaves.clear();
Expand Down Expand Up @@ -125,14 +118,14 @@ public ClusterInstance getMaster() {
}

public RunInstancesResult launchMaster(String size) throws IOException {
update(true);
update();
if((_master != null) &&( (InstanceStateName.Running == InstanceStateName.fromValue(_master.getInstance().getState().getName()) ) || (InstanceStateName.Pending == InstanceStateName.fromValue(_master.getInstance().getState().getName()) ) )){
Reservation masterReservation =
_ec2.describeInstances(new DescribeInstancesRequest().withInstanceIds(_master.getInstance().getInstanceId())).getReservations().get(0);
return new RunInstancesResult().withReservation(masterReservation);
}
//make the groups
createSecurityGroups(true);
createSecurityGroups();
String AMIImage = (_config.get("AMI."+size+".Image") == null)?
_config.get(ClusterConfig.DEFAULT_AMI_KEY)
:_config.get("AMI."+size+".Image");
Expand All @@ -149,13 +142,17 @@ public RunInstancesResult launchMaster(String size) throws IOException {
}

public RunInstancesResult launchSlaves(int howMany, String size) throws IOException {
update(true);
if(_master == null)
update();
if(_master == null ||
(InstanceStateName.Terminated ==
InstanceStateName.fromValue(_master.getInstance().getState().getName()))||
(InstanceStateName.ShuttingDown ==
InstanceStateName.fromValue(_master.getInstance().getState().getName())))
return null;
//wait for master to get internal ip field to pass in userinfo
//Only a Pending master has to be waited on. A master in any other state that got past the
//null/Terminated/ShuttingDown guard above starts out as "success", so the timed-out exit
//further down is only taken when the wait loop really ran and gave up.
boolean success = InstanceStateName.Pending != InstanceStateName.fromValue(_master.getInstance().getState().getName());
if(InstanceStateName.Pending == InstanceStateName.fromValue(_master.getInstance().getState().getName())){
int attempts = 0;
boolean success = false;
while(attempts < WAIT_FOR_MASTER_MAX_TIMES && !success){
update();
String pDns = _master.getInstance().getPrivateDnsName();
Expand All @@ -171,6 +168,12 @@ public RunInstancesResult launchSlaves(int howMany, String size) throws IOExcept
}
}
}
if(!success){
System.out.println("Timed out waiting for master to start.\nDon't panic.\nTry: 'pooper launch-slaves " +
_groupName + " " + size + " " + howMany + "' after master is running.");
System.exit(0);
}

String AMIImage = (_config.get("AMI."+size+".Image") == null)?
_config.get(ClusterConfig.DEFAULT_AMI_KEY)
:_config.get("AMI."+size+".Image");
Expand Down Expand Up @@ -262,9 +265,22 @@ public boolean groupsExist(){
return false;
}

public void createSecurityGroups(boolean openWebPorts){
public void createSecurityGroups(){
if(groupsExist())
return;
// Read the configured web ports: either the literal "0" (open none) or a comma
// separated list of port numbers. The parsed ports are turned into public TCP
// ingress rules further down in this method.
String portList = _config.get(ClusterConfig.WEB_PORTS_KEY);
if (portList == null) {
    // Fail with the same descriptive config error instead of a bare NullPointerException.
    throw new RuntimeException(ClusterConfig.WEB_PORTS_KEY + " config value must be list of ints or '0'");
}
List<Integer> webPorts = new ArrayList<Integer>();
if (!"0".equals(portList.trim())) {
    for (String portString : portList.split(",")) {
        try {
            // trim so values such as "50030, 50070" are accepted
            webPorts.add(Integer.parseInt(portString.trim()));
        } catch (NumberFormatException e) {
            // keep the original exception as the cause so the offending token is visible
            throw new RuntimeException(ClusterConfig.WEB_PORTS_KEY + " config value must be list of ints or '0'", e);
        }
    }
}
// The flag was previously never set, so configured ports were silently ignored.
boolean hasWebPorts = !webPorts.isEmpty();
UserIdGroupPair slaveUserIdGroupPair = new UserIdGroupPair().withGroupName(_groupName).withUserId(_config.get(ClusterConfig.ACCOUNT_ID_KEY));
UserIdGroupPair masterUserIdGroupPair = new UserIdGroupPair().withGroupName(_masterGroupName).withUserId(_config.get(ClusterConfig.ACCOUNT_ID_KEY));
CreateSecurityGroupRequest masterCsr = new CreateSecurityGroupRequest().withGroupName(_masterGroupName).
Expand All @@ -281,10 +297,10 @@ public void createSecurityGroups(boolean openWebPorts){
ipPerms.add(new IpPermission().withUserIdGroupPairs(slaveUserIdGroupPair).withIpProtocol(TCP).withToPort(HI_PORT).withFromPort(LOW_PORT));
ipPerms.add(new IpPermission().withUserIdGroupPairs(slaveUserIdGroupPair).withIpProtocol(UDP).withToPort(HI_PORT).withFromPort(LOW_PORT));
ipPerms.add(new IpPermission().withUserIdGroupPairs(slaveUserIdGroupPair).withIpProtocol(ICMP).withToPort(-1).withFromPort(-1));
if(openWebPorts){
if(hasWebPorts)
for(int port:webPorts)
ipPerms.add(new IpPermission().withToPort(port).withFromPort(port).withIpProtocol(TCP).withIpRanges(ALL_IPS));
}


AuthorizeSecurityGroupIngressRequest masterASR = new AuthorizeSecurityGroupIngressRequest().withGroupName(_masterGroupName)
.withIpPermissions(ipPerms);
Expand Down
4 changes: 3 additions & 1 deletion src/com/lunabeat/dooper/RequiredConfigFields.properties
Expand Up @@ -3,7 +3,9 @@ AWS.AccessKey=string
AWS.SecretKey=string
EC2.KeypairName=string
EC2.KeypairFile=string
EC2.UserDataFile=string
EC2.WebPorts=comma separated ints or 0 for none
AMI.DefaultImage=string
S3.ApplicationBucket=string
EC2.UserDataFile=string


3 changes: 2 additions & 1 deletion src/com/lunabeat/pooper/commands/AppCommand.java
Expand Up @@ -217,7 +217,9 @@ private void launchCluster(String[] args) {
}
System.out.println("reservation id: "+ r.getReservationId());
System.out.println("\tinstance: " + i.getInstanceId()+"\t"+i.getState().getName());

System.out.println("launching slaves (" + nodes + ")." );
System.out.println("waiting for master to get address." );
RunInstancesResult sr = cluster.launchSlaves(nodes,instanceSize);
if(sr == null){
System.out.println("Launch slaves failed!");
Expand Down Expand Up @@ -278,7 +280,6 @@ private void launchSlaves(String[] args) {
HadoopCluster cluster = new HadoopCluster(clusterName,_config);
try{
System.out.println("launching slaves (" + nodes + ")." );
System.out.println("waiting for master to get address." );
RunInstancesResult sr = cluster.launchSlaves(nodes,instanceSize);
if(sr == null){
System.out.println("Launch slaves failed!");
Expand Down
2 changes: 1 addition & 1 deletion test/com/lunabeat/dooper/HadoopClusterTest.java
Expand Up @@ -58,7 +58,7 @@ public void tearDown() {
*/@Test
public void testCreateSecurityGroups(){
out.println("createSecurityGroups");
cluster.createSecurityGroups(true);
cluster.createSecurityGroups();
assertTrue(cluster.groupsExist());

}
Expand Down

0 comments on commit 959bb19

Please sign in to comment.