Permalink
Browse files

added remote commands to HadoopCluster (untested).

  • Loading branch information...
1 parent 009e69c commit 0dab72cd482e08fc2eb31d9d2abd2e442cb6d916 @carlosdotdanger committed Mar 19, 2011
View
51 src/com/lunabeat/dooper/CmdException.java
@@ -0,0 +1,51 @@
+/**
+ Copyright [2011] [carlosdotdanger]
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ **/
+
+package com.lunabeat.dooper;
+
+
+class CmdException extends Exception {
+ ClusterInstance _instance;
+
+ /**
+ *
+ * @param message
+ * @param host
+ */
+ CmdException(String message,ClusterInstance host){
+ super(message);
+ _instance = host;
+ }
+
+ /**
+ *
+ * @param message
+ * @param cause
+ * @param host
+ */
+ CmdException(String message,Throwable cause,ClusterInstance host){
+ super(message,cause);
+ _instance = host;
+ }
+
+ /**
+ *
+ * @return
+ */
+ public ClusterInstance getInstance(){
+ return _instance;
+ }
+}
View
51 src/com/lunabeat/dooper/CmdSessionResult.java
@@ -0,0 +1,51 @@
+/*
+ Copyright [2011] [carlosdotdanger]
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ */
+
+package com.lunabeat.dooper;
+
+/**
+ *
+ *
+ */
+public class CmdSessionResult {
+ private int _code;
+ private String _stderr;
+ private String _stdout;
+
+ public CmdSessionResult(int code,String stdout,String stderr){
+ _code = code;
+ _stdout = stdout;
+ _stderr = stderr;
+ }
+
+ /**
+ * @return result code
+ */ public int getCode() {
+ return _code;
+ }
+
+ /**
+ * @return stderr
+ */ public String getStderr() {
+ return _stderr;
+ }
+
+ /**
+ * @return stdout
+ */ public String getStdout() {
+ return _stdout;
+ }
+}
View
68 src/com/lunabeat/dooper/HadoopCluster.java
@@ -17,6 +17,8 @@
import ch.ethz.ssh2.Connection;
import ch.ethz.ssh2.SCPClient;
+import ch.ethz.ssh2.Session;
+import ch.ethz.ssh2.StreamGobbler;
import com.amazonaws.services.ec2.AmazonEC2;
import com.amazonaws.services.ec2.AmazonEC2Client;
import com.amazonaws.services.ec2.model.AuthorizeSecurityGroupIngressRequest;
@@ -41,6 +43,8 @@
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
@@ -139,10 +143,8 @@ public RunInstancesResult launchMaster(String size) throws IOException {
}
//make the groups
createSecurityGroups();
- String AMIImage = (_config.get("AMI." + size + ".Image") == null)
- ? _config.get(ClusterConfig.DEFAULT_AMI_KEY)
- : _config.get("AMI." + size + ".Image");
- System.out.println("AMIImage = [" + AMIImage + "]");
+ String AMIImage = _config.get( "AMI." + size + ".Image", _config.get(ClusterConfig.DEFAULT_AMI_KEY) );
+ LOGGER.info("AMIImage = [" + AMIImage + "]");
RunInstancesRequest rir = new RunInstancesRequest().withImageId(AMIImage).
withMinCount(1).
withMaxCount(1).
@@ -192,9 +194,7 @@ public RunInstancesResult launchSlaves(int howMany, String size) throws IOExcept
}
- String AMIImage = (_config.get("AMI." + size + ".Image") == null)
- ? _config.get(ClusterConfig.DEFAULT_AMI_KEY)
- : _config.get("AMI." + size + ".Image");
+ String AMIImage = _config.get( "AMI." + size + ".Image", _config.get(ClusterConfig.DEFAULT_AMI_KEY) );
RunInstancesRequest rir = new RunInstancesRequest().withImageId(AMIImage).
withMinCount(howMany).
withMaxCount(howMany).
@@ -422,6 +422,60 @@ public void putFile(ClusterInstance host, String src, String dest) throws SCPExc
}
}
+/**
+ *
+ * @param hosts
+ * @param command
+ * @return
+ * @throws CmdException
+ */
+ public List<CmdSessionResult> remoteCommand(List<ClusterInstance> hosts, String command) throws CmdException{
+ ArrayList<CmdSessionResult> results = new ArrayList<CmdSessionResult>();
+ for(ClusterInstance host:hosts)
+ remoteCommand(host,command);
+ return results;
+ }
+
+/**
+ *
+ * @param host
+ * @param command
+ * @return
+ * @throws CmdException
+ */
+ public CmdSessionResult remoteCommand(ClusterInstance host, String command) throws CmdException{
+ try{
+ Connection conn = new Connection(host.getInstance().getPublicDnsName());
+ conn.connect();
+ File keyfile = new File(_config.get(ClusterConfig.KEYPAIR_FILE_KEY));
+ boolean isAuthenticated =
+ conn.authenticateWithPublicKey(
+ _config.get(ClusterConfig.USERNAME_KEY),
+ keyfile, BLANK);
+ Session session = conn.openSession();
+ session.execCommand(command);
+ InputStream outStrm = new StreamGobbler(session.getStdout());
+ InputStream errStrm = new StreamGobbler(session.getStderr());
+ BufferedReader stdoutRdr = new BufferedReader(new InputStreamReader(outStrm));
+ BufferedReader stderrRdr = new BufferedReader(new InputStreamReader(errStrm));
+ StringBuilder sb = new StringBuilder();
+ String stdout;
+ while((stdout = stdoutRdr.readLine()) != null){
+ sb.append(stdout);
+ }
+ stdout = sb.toString();
+ sb = new StringBuilder();
+ String stderr;
+ while((stderr = stderrRdr.readLine()) != null){
+ sb.append(stderr);
+ }
+ stderr = sb.toString();
+ return new CmdSessionResult(session.getExitStatus(),stdout,stderr);
+ } catch (IOException e) {
+ throw new CmdException(e.getMessage(), e.getCause(), host);
+ }
+ }
+
/**
*
* @param path
View
5 src/com/lunabeat/dooper/SCPException.java
@@ -15,10 +15,7 @@
*/
package com.lunabeat.dooper;
-/**
- *
- * @author cory
- */
+
public class SCPException extends Exception {
ClusterInstance _instance;

0 comments on commit 0dab72c

Please sign in to comment.