Skip to content

Commit

Permalink
updated some SJQ code, refactored a bit of job running code.
Browse files Browse the repository at this point in the history
  • Loading branch information
mbreese committed Apr 14, 2015
1 parent 46e8793 commit eacfd72
Show file tree
Hide file tree
Showing 9 changed files with 57 additions and 39 deletions.
Binary file modified lib/sjq-0.3.jar
Binary file not shown.
5 changes: 5 additions & 0 deletions src/java/io/compgen/cgpipe/CGPipe.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ public class CGPipe {
public static final File CGPIPE_HOME = new File(System.getenv("CGPIPE_HOME") != null ? System.getenv("CGPIPE_HOME") : System.getProperty("user.home"));
public static final File RCFILE = new File(CGPIPE_HOME,".cgpiperc");

// public static final Map<String, VarValue> globalConfig = new HashMap<String, VarValue>();

public static void main(String[] args) {
String fname = null;
String logFilename = null;
Expand Down Expand Up @@ -201,6 +203,9 @@ public static void main(String[] args) {
System.exit(1);
}
}

// Set the global config values
// globalConfig.putAll(root.cloneValues());

// Parse the AST and run it
Parser.exec(fname, root);
Expand Down
6 changes: 3 additions & 3 deletions src/java/io/compgen/cgpipe/parser/TemplateParser.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,16 +29,16 @@ public class TemplateParser {
private boolean processedPre = false;
private boolean inScript = false;

public TemplateParser(List<NumberedLine> pre, List<NumberedLine> post) {
private TemplateParser(List<NumberedLine> pre, List<NumberedLine> post) {
this.pre = pre;
this.post = post;
}

public void addScriptLine(NumberedLine line) throws ASTParseException {
private void addScriptLine(NumberedLine line) throws ASTParseException {
curNode = curNode.parseLine(line);
}

public void addBodyLine(String body, NumberedLine line) throws ASTParseException {
private void addBodyLine(String body, NumberedLine line) throws ASTParseException {
processPre();
curNode = curNode.parseBody(body, line);
}
Expand Down
2 changes: 1 addition & 1 deletion src/java/io/compgen/cgpipe/parser/target/BuildTarget.java
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ public JobDef eval(List<NumberedLine> pre, List<NumberedLine> post) throws ASTPa

TemplateParser.parseTemplate(lines, pre, post, jobRoot);

return new JobDef(jobRoot.getBody(), jobRoot.cloneValues("job."), outputs, inputs);
return new JobDef(jobRoot.getBody(), jobRoot.cloneValues(), outputs, inputs);
}

public boolean isSkippable() {
Expand Down
18 changes: 13 additions & 5 deletions src/java/io/compgen/cgpipe/runner/JobRunner.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,15 @@ public abstract class JobRunner {
abstract public void innerDone() throws RunnerException;
abstract protected void setConfig(String k, String val);

protected String defaultShell = "/bin/sh";
public static String defaultShell = null;
static {
for (String path: new String[] {"/bin/bash", "/usr/bin/bash", "/usr/local/bin/bash", "/bin/sh"}) {
if (new File(path).exists()) {
defaultShell=path;
break;
}
}
}

static protected Log log = LogFactory.getLog(JobRunner.class);

Expand All @@ -54,6 +62,10 @@ public static JobRunner load(RootContext cxt, boolean dryrun) throws RunnerExcep
if (runner == null) {
runner = "shell";
}

if (cxt.contains("cgpipe.shell")) {
defaultShell = cxt.getString("cgpipe.shell");
}

JobRunner.log.info("job-runner: " +runner);
JobRunner obj = null;
Expand All @@ -71,10 +83,6 @@ public static JobRunner load(RootContext cxt, boolean dryrun) throws RunnerExcep
default:
throw new RunnerException("Can't load job runner: "+runner);
}

if (cxt.contains("cgpipe.shell")) {
obj.defaultShell = cxt.getString("cgpipe.shell");
}

String prefix = "cgpipe.runner."+runner;
Map<String, VarValue> cxtvals = cxt.cloneValues(prefix);
Expand Down
4 changes: 2 additions & 2 deletions src/java/io/compgen/cgpipe/runner/SGERunner.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ public class SGERunner extends JobRunner {
private boolean hvmemIsTotal = true;
private String account=null;
private String parallelEnv = "shm";
private String shell = ShellScriptRunner.findDefaultShell();
private String shell = ShellScriptRunner.defaultShell;

private int dryRunJobCount = 0;

Expand Down Expand Up @@ -303,7 +303,7 @@ private String buildScript(JobDef jobdef) {
is to specify it as the amount of memory per-processor
(only used when procs > 1)
shell - a shell to use for the script (default(s): /bin/bash, /usr/bin/bash, /usr/local/bin/bash, /bin/sh)
shell - a shell to use for the script (default(s): /defaultShell/bash, /usr/defaultShell/bash, /usr/local/defaultShell/bash, /defaultShell/sh)
*/
@Override
Expand Down
36 changes: 28 additions & 8 deletions src/java/io/compgen/cgpipe/runner/SJQRunner.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

import java.io.File;
import java.io.IOException;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.List;

Expand All @@ -28,6 +29,7 @@ public class SJQRunner extends JobRunner {

private File socketFile = new File(CGPipe.CGPIPE_HOME, ".sjqserv");
private File passwdFile = new File(CGPipe.CGPIPE_HOME, ".sjqpass");
private File jobLogFile = new File(CGPipe.CGPIPE_HOME, ".sjqjobs");
private int maxProcs = Runtime.getRuntime().availableProcessors();
private String maxMem = null;
private String tempDir = null;
Expand Down Expand Up @@ -62,16 +64,32 @@ public void connect() throws IOException, CommandArgumentException, SJQServerExc
server.setSilent(false);
server.setPort(port);
server.setTimeout(30);
server.setJobLogFile(jobLogFile.getAbsolutePath());
server.start();
} else {
passwd = StringUtils.strip(StringUtils.readFile(passwdFile));
}

String addr = StringUtils.strip(StringUtils.readFile(socketFile));
if (addr != null && !addr.equals("")) {
String ip = addr.split(":")[0];
int port = Integer.parseInt(addr.split(":")[1]);
client = new SJQClient(ip, port, passwd);
try {
String addr = StringUtils.strip(StringUtils.readFile(socketFile));
if (addr != null && !addr.equals("")) {
String ip = addr.split(":")[0];
int port = Integer.parseInt(addr.split(":")[1]);
client = new SJQClient(ip, port, passwd);
}
} catch (IOException e) {
log.debug("Error connecting to server - trying again. "+ e.getMessage());
try {
String addr = StringUtils.strip(StringUtils.readFile(socketFile));
if (addr != null && !addr.equals("")) {
String ip = addr.split(":")[0];
int port = Integer.parseInt(addr.split(":")[1]);
client = new SJQClient(ip, port, passwd);
}
} catch (IOException e1) {
log.debug("Error connecting to server - again - Failing... "+ e.getMessage());
throw e1;
}
}
}

Expand All @@ -87,10 +105,10 @@ public boolean submit(JobDef jobdef) throws RunnerException {
}

try {
String jobId = client.submitJob(jobdef.getName(), jobdef.getBody(),
String jobId = client.submitJob(jobdef.getName(), "#!"+ShellScriptRunner.defaultShell+"\n"+jobdef.getBody(),
((int)jobdef.getSettingInt("job.procs", 1)), jobdef.getSetting("job.mem"),
jobdef.getSetting("job.stderr"), jobdef.getSetting("job.stdout"),
jobdef.getSetting("job.wd"), null,
jobdef.getSetting("job.wd", new File(".").getCanonicalPath()), null,
IterUtils.map(jobdef.getDependencies(), new MapFunc<JobDependency, String>() {
@Override
public String map(JobDependency dep) {
Expand All @@ -103,7 +121,7 @@ public String map(JobDependency dep) {
log.debug(jobId + " " + line);
}
return jobId != null;
} catch (ClientException | AuthException e) {
} catch (ClientException | AuthException | IOException e) {
e.printStackTrace();
throw new RunnerException(e);
}
Expand Down Expand Up @@ -150,6 +168,8 @@ protected void setConfig(String k, String val) {
this.socketFile = new File(val);
} else if (k.equals("cgpipe.runner.sjq.passwdfile")) {
this.passwdFile = new File(val);
} else if (k.equals("cgpipe.runner.sjq.joblog")) {
this.jobLogFile = new File(val);
} else if (k.equals("cgpipe.runner.sjq.maxprocs")) {
this.maxProcs = Integer.parseInt(val);
} else if (k.equals("cgpipe.runner.sjq.port")) {
Expand Down
18 changes: 3 additions & 15 deletions src/java/io/compgen/cgpipe/runner/ShellScriptRunner.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,29 +2,17 @@

import io.compgen.cgpipe.exceptions.RunnerException;

import java.io.File;
import java.util.ArrayList;
import java.util.List;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

public class ShellScriptRunner extends JobRunner {
private static String[] defaultShellPaths = new String[] {"/bin/bash", "/usr/bin/bash", "/usr/local/bin/bash", "/bin/sh"};

public static String findDefaultShell() {
for (String path: defaultShellPaths) {
if (new File(path).exists()) {
return path;
}
}
return null;
}

public String bin = findDefaultShell();
protected Log log = LogFactory.getLog(ShellScriptRunner.class);

private List<JobDef> jobs = new ArrayList<JobDef>();
private String shellPath = defaultShell;

@Override
public boolean submit(JobDef jobdef) {
Expand All @@ -46,7 +34,7 @@ public void innerDone() throws RunnerException {
for (JobDef job: jobs) {
if (!job.getBody().equals("")) {
if (!header) {
out += "#!"+bin+"\n";
out += "#!"+shellPath+"\n";
header = true;
}
out += job.getJobId()+"() {\n";
Expand All @@ -66,7 +54,7 @@ public void innerDone() throws RunnerException {
@Override
protected void setConfig(String k, String val) {
if (k.equals("cgpipe.runner.shell.bin")) {
bin = val;
shellPath = val;
}
}

Expand Down
7 changes: 2 additions & 5 deletions src/test-scripts/misc.mvpt
Original file line number Diff line number Diff line change
Expand Up @@ -58,11 +58,8 @@ output3: output2
touch output3

output1 "output2":
<% if foo == "bar" %>
touch output1.bar
<% else %>
touch output1.notbar
<% endif %>
<% if foo == "bar" %> touch output1.bar
<% else %> touch output1.notbar <% endif %>

<%
for i in 1..5
Expand Down

0 comments on commit eacfd72

Please sign in to comment.