Skip to content
Permalink
Browse files

Added RemoteQ module RunJob

packages/RemoteQ/init.py:
  - RunJob: *new
    Whereas RunCommand waits until it finishes, RunJob runs a job
    over SSH asynchronously. It can be detached, returns stdout/stderr,
    and fails with a ModuleError if return code is not 0.
  • Loading branch information...
rexissimus committed Oct 14, 2014
1 parent 64f7c59 commit 3f9aebd3289d7b27b6bdace29402fc0393685e59
Showing with 82 additions and 2 deletions.
  1. +82 −2 vistrails/packages/RemoteQ/init.py
@@ -43,7 +43,7 @@
from remoteq.pipelines.shell import FileCommander as BQMachine
from remoteq.core.stack import select_machine, end_machine, use_machine, \
current_machine
from remoteq.batch.commandline import PBS, PBSScript
from remoteq.batch.commandline import PBS, PBSScript, Subshell
from remoteq.batch.directories import CreateDirectory
from remoteq.batch.files import TransferFiles

@@ -169,6 +169,10 @@ def get_default_machine(self):
return server, port, username, password

class RunCommand(RQModule):
""" Runs a command over SSH and waits until it finishes
"""

_input_ports = [('machine', Machine),
('command', '(edu.utah.sci.vistrails.basic:String)',True),
]
@@ -198,6 +202,70 @@ def compute(self):
self.set_output("output", result)
self.set_output("machine", machine)

class RunJob(RQModule):
""" Run an asynchronous command that can be detached and polled.
This is preferable over RunCommand for long-running operations
"""

_input_ports = [('machine', Machine),
('command', '(edu.utah.sci.vistrails.basic:String)', True),
('working_directory', '(edu.utah.sci.vistrails.basic:String)'),
]

_output_ports = [('stdout', '(edu.utah.sci.vistrails.basic:String)'),
('stderr', '(edu.utah.sci.vistrails.basic:String)'),
]

job = None
def readInputs(self):
d = {}
if not self.has_input('command'):
raise ModuleError(self, "No command specified")
d['command'] = self.get_input('command').strip()
d['working_directory'] = self.get_input('working_directory') \
if self.has_input('working_directory') else '.'
return d

def startJob(self, params):
work_dir = params['working_directory']
self.machine = self.get_machine()
use_machine(self.machine)
self.job = Subshell("remote", params['command'], work_dir)
self.job.run()
ret = self.job._ret
if ret:
try:
job_id = int(ret.split('\n')[0])
except ValueError:
end_machine()
raise ModuleError(self, "Error submitting job: %s" % ret)
self.set_job_machine(params, self.machine)
return params

def getMonitor(self, params):
if not self.job:
self.startJob(params)
return self.job

def finishJob(self, params):
params['stdout'] = self.job.standard_output()
params['stderr'] = self.job.standard_error()
if self.job.failed():
self.job._pushw()
code = self.job.terminal.cat("%s.failed" %
self.job._identifier_filename)
self.job._popw()
end_machine()
raise ModuleError(self,
"Command failed with error code %s: %s" %
(code.strip(), params['stderr'].strip()))
end_machine()
return params

def setResults(self, params):
self.set_output('stdout', params['stdout'])
self.set_output('stderr', params['stderr'])

class PBSJob(RQModule):
_input_ports = [('machine', Machine),
('command', '(edu.utah.sci.vistrails.basic:String)', True),
@@ -276,6 +344,10 @@ def compute(self):
[f.split(' ')[-1] for f in files.split('\n')[1:]])

class RunPBSScript(RQModule):
""" Run a pbs script by submitting it to a PBS scheduler
"""

_input_ports = [('machine', Machine),
('command', '(edu.utah.sci.vistrails.basic:String)', True),
('working_directory', '(edu.utah.sci.vistrails.basic:String)'),
@@ -354,6 +426,10 @@ def setResults(self, params):
self.set_output('stderr', params['stderr'])

class SyncDirectories(RQModule):
""" Copy all files in a directory to/from a remote server using SFTP
"""

_input_ports = [('machine', Machine),
('local_directory', '(edu.utah.sci.vistrails.basic:String)'),
('remote_directory', '(edu.utah.sci.vistrails.basic:String)'),
@@ -391,6 +467,10 @@ def compute(self):
self.set_output("machine", machine)

class CopyFile(RQModule):
""" Copy a single file to/from a remote server using SFTP
"""

_input_ports = [('machine', Machine),
('local_file', '(edu.utah.sci.vistrails.basic:String)'),
('remote_file', '(edu.utah.sci.vistrails.basic:String)'),
@@ -447,7 +527,7 @@ def handle_module_upgrade_request(controller, module_id, pipeline):

def initialize():
global _modules
_modules = [Machine, RQModule, RunPBSScript, RunCommand,
_modules = [Machine, RQModule, RunPBSScript, RunCommand, RunJob,
SyncDirectories, CopyFile]
import base
import hdfs

0 comments on commit 3f9aebd

Please sign in to comment.
You can’t perform that action at this time.