Skip to content
Browse files

fix in HadoopCluster.launchSlaves() - moved success check inside

if(state==pending) block.
  • Loading branch information...
1 parent 4237a19 commit b187cb977c6c62e09d5a4586b96120b99d569623 @carlosdotdanger committed Mar 5, 2011
Showing with 83 additions and 85 deletions.
  1. +82 −84 src/com/lunabeat/dooper/HadoopCluster.java
  2. +1 −1 src/com/lunabeat/pooper/commands/AppCommand.java
View
166 src/com/lunabeat/dooper/HadoopCluster.java
@@ -39,7 +39,7 @@
* @author cory
*/
public class HadoopCluster {
- private static final long DEFAULT_HOLD_TIME = 30000L; //30 seconds
+
public static final String MASTER_SUFFIX = "-master";
public static final String GROUP_NAME_KEY = "group-name";
private String _groupName = null;
@@ -48,16 +48,14 @@
private ArrayList<ClusterInstance> _slaves = new ArrayList<ClusterInstance>();
private ClusterConfig _config = null;
private AmazonEC2 _ec2 = null;
- private long lastUpdate = 0L;
- private long holdTime = DEFAULT_HOLD_TIME;
private static final Integer LOW_PORT = 0;
private static final Integer HI_PORT = 65535;
private static final String ALL_IPS = "0.0.0.0/0";
private static final String TCP = "tcp";
private static final String UDP = "udp";
private static final String ICMP = "icmp";
private static final Pattern userDataValue = Pattern.compile("%([a-zA-Z0-9\\._-]+)%");
- private static final int WAIT_FOR_MASTER_MAX_TIMES = 3;
+ private static final int WAIT_FOR_MASTER_MAX_TIMES = 5;
private static final int WAIT_FOR_MASTER_INTERVAL_SECONDS = 30;
/**
@@ -78,7 +76,6 @@ private void init() {
}
-
private void update() {
//unless force == true check the holdtime and bail if it's too soon.
Date now = new Date();
@@ -88,8 +85,7 @@ private void update() {
_master = null;
DescribeInstancesResult dir =
_ec2.describeInstances(new DescribeInstancesRequest().withFilters(
- new Filter().withName(GROUP_NAME_KEY).withValues(getGroupName(), getMasterGroupName())
- ));
+ new Filter().withName(GROUP_NAME_KEY).withValues(getGroupName(), getMasterGroupName())));
for (Reservation r : dir.getReservations()) {
String rid = r.getReservationId();
List<String> gnames = r.getGroupNames();
@@ -101,10 +97,6 @@ private void update() {
}
}
}
-
- //update timestamp
- lastUpdate = new Date().getTime();
-
}
public List<ClusterInstance> getSlaves() {
@@ -119,66 +111,65 @@ public ClusterInstance getMaster() {
public RunInstancesResult launchMaster(String size) throws IOException {
update();
- if((_master != null) &&( (InstanceStateName.Running == InstanceStateName.fromValue(_master.getInstance().getState().getName()) ) || (InstanceStateName.Pending == InstanceStateName.fromValue(_master.getInstance().getState().getName()) ) )){
+ if ((_master != null) && ((InstanceStateName.Running == InstanceStateName.fromValue(_master.getInstance().getState().getName())) || (InstanceStateName.Pending == InstanceStateName.fromValue(_master.getInstance().getState().getName())))) {
Reservation masterReservation =
- _ec2.describeInstances(new DescribeInstancesRequest().withInstanceIds(_master.getInstance().getInstanceId())).getReservations().get(0);
+ _ec2.describeInstances(new DescribeInstancesRequest().withInstanceIds(_master.getInstance().getInstanceId())).getReservations().get(0);
return new RunInstancesResult().withReservation(masterReservation);
}
//make the groups
createSecurityGroups();
- String AMIImage = (_config.get("AMI."+size+".Image") == null)?
- _config.get(ClusterConfig.DEFAULT_AMI_KEY)
- :_config.get("AMI."+size+".Image");
- System.out.println("AMIImage = ["+AMIImage+"]");
- RunInstancesRequest rir = new RunInstancesRequest().
- withImageId(AMIImage).
+ String AMIImage = (_config.get("AMI." + size + ".Image") == null)
+ ? _config.get(ClusterConfig.DEFAULT_AMI_KEY)
+ : _config.get("AMI." + size + ".Image");
+ System.out.println("AMIImage = [" + AMIImage + "]");
+ RunInstancesRequest rir = new RunInstancesRequest().withImageId(AMIImage).
withMinCount(1).
withMaxCount(1).
withInstanceType(size).
withSecurityGroups(_masterGroupName).
- withUserData(Base64.encodeBase64String(getUserData().getBytes()))
- .withKeyName(_config.get(ClusterConfig.KEYPAIR_NAME_KEY));
+ withUserData(Base64.encodeBase64String(getUserData().getBytes())).withKeyName(_config.get(ClusterConfig.KEYPAIR_NAME_KEY));
return _ec2.runInstances(rir);
}
public RunInstancesResult launchSlaves(int howMany, String size) throws IOException {
update();
- if(_master == null ||
- (InstanceStateName.Terminated ==
- InstanceStateName.fromValue(_master.getInstance().getState().getName()))||
- (InstanceStateName.ShuttingDown ==
- InstanceStateName.fromValue(_master.getInstance().getState().getName())))
+ if (_master == null
+ || (InstanceStateName.Terminated
+ == InstanceStateName.fromValue(_master.getInstance().getState().getName()))
+ || (InstanceStateName.ShuttingDown
+ == InstanceStateName.fromValue(_master.getInstance().getState().getName()))) {
return null;
+ }
//wait for master to get internal ip field to pass in userinfo
boolean success = false;
- if(InstanceStateName.Pending == InstanceStateName.fromValue(_master.getInstance().getState().getName())){
+ if (InstanceStateName.Pending == InstanceStateName.fromValue(_master.getInstance().getState().getName())) {
int attempts = 0;
- while(attempts < WAIT_FOR_MASTER_MAX_TIMES && !success){
+ while ((attempts < WAIT_FOR_MASTER_MAX_TIMES) && !success) {
update();
String pDns = _master.getInstance().getPrivateDnsName();
- if(pDns == null || pDns.length() < 6){
- try{
- Thread.sleep(WAIT_FOR_MASTER_INTERVAL_SECONDS*1000);
- }catch(InterruptedException ie){
+ if (pDns == null || pDns.length() < 6) {
+ try {
+ Thread.sleep(WAIT_FOR_MASTER_INTERVAL_SECONDS * 1000);
+ } catch (InterruptedException ie) {
return null;
}
attempts++;
- }else{
+ } else {
success = true;
}
}
+ if (!success) {
+ System.out.println("Timed out waiting for master to start.\nDon't panic.\nTry: 'pooper launch-slaves "
+ + _groupName + " " + size + " " + howMany + "' after master is running.");
+ System.exit(0);
+ }
}
- if(!success){
- System.out.println("Timed out waiting for master to start.\nDon't panic.\nTry: 'pooper launch-slaves " +
- _groupName + " " + size + " " + howMany + "' after master is running.");
- System.exit(0);
- }
- String AMIImage = (_config.get("AMI."+size+".Image") == null)?
- _config.get(ClusterConfig.DEFAULT_AMI_KEY)
- :_config.get("AMI."+size+".Image");
- RunInstancesRequest rir = new RunInstancesRequest().
- withImageId(AMIImage).
+
+ String AMIImage = (_config.get("AMI." + size + ".Image") == null)
+ ? _config.get(ClusterConfig.DEFAULT_AMI_KEY)
+ : _config.get("AMI." + size + ".Image");
+ RunInstancesRequest rir = new RunInstancesRequest().withImageId(AMIImage).
withMinCount(howMany).
withMaxCount(howMany).
withInstanceType(size).
@@ -197,8 +188,9 @@ public TerminateInstancesResult terminateCluster() {
for (ClusterInstance ci : _slaves) {
iids.add(ci.getInstance().getInstanceId());
}
- if(iids.size() < 1)
+ if (iids.size() < 1) {
return null;
+ }
TerminateInstancesRequest tir =
new TerminateInstancesRequest().withInstanceIds(iids);
return _ec2.terminateInstances(tir);
@@ -234,8 +226,9 @@ public TerminateInstancesResult terminateSlaves(int howMany) {
}
}
- if(iids.size() < 1)
+ if (iids.size() < 1) {
return null;
+ }
TerminateInstancesRequest tir =
new TerminateInstancesRequest().withInstanceIds(iids);
return _ec2.terminateInstances(tir);
@@ -255,39 +248,40 @@ public String getMasterGroupName() {
return _masterGroupName;
}
- public boolean groupsExist(){
+ public boolean groupsExist() {
update();
DescribeSecurityGroupsResult dsr =
- _ec2.describeSecurityGroups(new DescribeSecurityGroupsRequest().withFilters(new Filter(GROUP_NAME_KEY).withValues(_groupName,_masterGroupName)));
- if(dsr.getSecurityGroups().size() > 0){
+ _ec2.describeSecurityGroups(new DescribeSecurityGroupsRequest().withFilters(new Filter(GROUP_NAME_KEY).withValues(_groupName, _masterGroupName)));
+ if (dsr.getSecurityGroups().size() > 0) {
return true;
}
return false;
}
- public void createSecurityGroups(){
- if(groupsExist())
+ public void createSecurityGroups() {
+ if (groupsExist()) {
return;
+ }
String portList = _config.get(ClusterConfig.WEB_PORTS_KEY);
boolean hasWebPorts = false;
List<Integer> webPorts = new ArrayList<Integer>();
- if (!"0".contentEquals(portList)){
+ if (!"0".contentEquals(portList)) {
String[] portParts = portList.split(",");
- for(String portString:portParts){
- try{
+ for (String portString : portParts) {
+ try {
webPorts.add(Integer.parseInt(portString));
- }catch(NumberFormatException e){
+ } catch (NumberFormatException e) {
throw new RuntimeException(ClusterConfig.WEB_PORTS_KEY + " config value must be list of ints or '0'");
}
}
}
- UserIdGroupPair slaveUserIdGroupPair = new UserIdGroupPair().withGroupName(_groupName).withUserId(_config.get(ClusterConfig.ACCOUNT_ID_KEY));
- UserIdGroupPair masterUserIdGroupPair = new UserIdGroupPair().withGroupName(_masterGroupName).withUserId(_config.get(ClusterConfig.ACCOUNT_ID_KEY));
+ UserIdGroupPair slaveUserIdGroupPair = new UserIdGroupPair().withGroupName(_groupName).withUserId(_config.get(ClusterConfig.ACCOUNT_ID_KEY));
+ UserIdGroupPair masterUserIdGroupPair = new UserIdGroupPair().withGroupName(_masterGroupName).withUserId(_config.get(ClusterConfig.ACCOUNT_ID_KEY));
CreateSecurityGroupRequest masterCsr = new CreateSecurityGroupRequest().withGroupName(_masterGroupName).
- withDescription("Master group created by hadooper-pooper." );
+ withDescription("Master group created by hadooper-pooper.");
_ec2.createSecurityGroup(masterCsr);
CreateSecurityGroupRequest slaveCsr = new CreateSecurityGroupRequest().withGroupName(_groupName).
- withDescription("Slave group created by hadooper-pooper.");
+ withDescription("Slave group created by hadooper-pooper.");
_ec2.createSecurityGroup(slaveCsr);
ArrayList<IpPermission> ipPerms = new ArrayList<IpPermission>();
ipPerms.add(new IpPermission().withToPort(22).withFromPort(22).withIpProtocol(TCP).withIpRanges(ALL_IPS));
@@ -297,28 +291,30 @@ public void createSecurityGroups(){
ipPerms.add(new IpPermission().withUserIdGroupPairs(slaveUserIdGroupPair).withIpProtocol(TCP).withToPort(HI_PORT).withFromPort(LOW_PORT));
ipPerms.add(new IpPermission().withUserIdGroupPairs(slaveUserIdGroupPair).withIpProtocol(UDP).withToPort(HI_PORT).withFromPort(LOW_PORT));
ipPerms.add(new IpPermission().withUserIdGroupPairs(slaveUserIdGroupPair).withIpProtocol(ICMP).withToPort(-1).withFromPort(-1));
- if(hasWebPorts)
- for(int port:webPorts)
+ if (hasWebPorts) {
+ for (int port : webPorts) {
ipPerms.add(new IpPermission().withToPort(port).withFromPort(port).withIpProtocol(TCP).withIpRanges(ALL_IPS));
+ }
+ }
+
-
- AuthorizeSecurityGroupIngressRequest masterASR = new AuthorizeSecurityGroupIngressRequest().withGroupName(_masterGroupName)
- .withIpPermissions(ipPerms);
+ AuthorizeSecurityGroupIngressRequest masterASR = new AuthorizeSecurityGroupIngressRequest().withGroupName(_masterGroupName).withIpPermissions(ipPerms);
_ec2.authorizeSecurityGroupIngress(masterASR);
-
- AuthorizeSecurityGroupIngressRequest asr = new AuthorizeSecurityGroupIngressRequest().withGroupName(_groupName)
- .withIpPermissions(ipPerms);
+
+ AuthorizeSecurityGroupIngressRequest asr = new AuthorizeSecurityGroupIngressRequest().withGroupName(_groupName).withIpPermissions(ipPerms);
_ec2.authorizeSecurityGroupIngress(asr);
}
- public void removeSecurityGroups(){
- if(!groupsExist())
+ public void removeSecurityGroups() {
+ if (!groupsExist()) {
return;
- if(_master != null || _slaves.size() > 0)
+ }
+ if (_master != null || _slaves.size() > 0) {
return;
- UserIdGroupPair slaveUserIdGroupPair = new UserIdGroupPair().withGroupName(_groupName).withUserId(_config.get(ClusterConfig.ACCOUNT_ID_KEY));
- UserIdGroupPair masterUserIdGroupPair = new UserIdGroupPair().withGroupName(_masterGroupName).withUserId(_config.get(ClusterConfig.ACCOUNT_ID_KEY));
+ }
+ UserIdGroupPair slaveUserIdGroupPair = new UserIdGroupPair().withGroupName(_groupName).withUserId(_config.get(ClusterConfig.ACCOUNT_ID_KEY));
+ UserIdGroupPair masterUserIdGroupPair = new UserIdGroupPair().withGroupName(_masterGroupName).withUserId(_config.get(ClusterConfig.ACCOUNT_ID_KEY));
ArrayList<IpPermission> ipPerms = new ArrayList<IpPermission>();
ipPerms.add(new IpPermission().withToPort(22).withFromPort(22).withIpProtocol(TCP).withIpRanges(ALL_IPS));
ipPerms.add(new IpPermission().withUserIdGroupPairs(masterUserIdGroupPair).withIpProtocol(TCP).withToPort(HI_PORT).withFromPort(LOW_PORT));
@@ -336,38 +332,40 @@ public void removeSecurityGroups(){
}
- public String getUserData() throws IOException{
+ public String getUserData() throws IOException {
update();
StringBuilder userData = new StringBuilder();
FileReader fr = new FileReader(_config.get(ClusterConfig.USER_DATA_PATH_KEY));
- if(fr == null)
- throw new IOException("Could NOT open resource: ["+ClusterConfig.USER_DATA_PATH_KEY+"]");
+ if (fr == null) {
+ throw new IOException("Could NOT open resource: [" + ClusterConfig.USER_DATA_PATH_KEY + "]");
+ }
BufferedReader buf = new BufferedReader(fr);
String line = null;
- while((line = buf.readLine()) != null){
+ while ((line = buf.readLine()) != null) {
Matcher matches = userDataValue.matcher(line);
- if(!matches.find()){
+ if (!matches.find()) {
userData.append(line);
userData.append("\n");
continue;
}
String key = matches.group(1);
String value = "\"\"";
- if(key.contentEquals(ClusterConfig.MASTER_HOST_KEY)){
- if(_master != null )
+ if (key.contentEquals(ClusterConfig.MASTER_HOST_KEY)) {
+ if (_master != null) {
value = _master.getInstance().getPrivateDnsName();
- }else{
- value = _config.get(key,value);
+ }
+ } else {
+ value = _config.get(key, value);
}
- line = line.replaceAll("%"+key+"%", value);
+ line = line.replaceAll("%" + key + "%", value);
userData.append(line);
userData.append("\n");
}
-
- if(userData.length() < 1)
+
+ if (userData.length() < 1) {
return null;
+ }
return userData.toString();
}
-
}
View
2 src/com/lunabeat/pooper/commands/AppCommand.java
@@ -335,7 +335,7 @@ private void terminateCluster(String clusterName) {
for (InstanceStateChange i : tr.getTerminatingInstances()) {
System.out.println("\t" + i.getInstanceId() + " "
+ i.getPreviousState().getName()
- + "-> " + i.getCurrentState().getName());
+ + " -> " + i.getCurrentState().getName());
}
System.out.println("Success.");
}

0 comments on commit b187cb9

Please sign in to comment.
Something went wrong with that request. Please try again.