Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

tej package: documentation and improvements #1105

Merged
merged 8 commits into from
Aug 12, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions doc/usersguide/example.rst
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ Intermediate Concepts and VisTrails Packages
example_webservices
example_itk
persistent_archive
tej
vistrails_server
latex
example_scikit_learn
Binary file added doc/usersguide/figures/tej/pipeline.png
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
72 changes: 72 additions & 0 deletions doc/usersguide/tej.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
.. _chap-tej:

***********************************
Running commands on a remote server
***********************************

The ``tej`` tool provides a way to start job on any remote server through SSH, associate it with an identifier, and monitor its status. When the job is complete, it can download the resulting files through SCP.

In VisTrails, the subpipeline's signature is used as the job identifier. This means that once you have run your pipeline one and the job has been submitted, running it again will match it to the existing job, and will either wait for the job to complete or download the existing results without running it again.

Referencing a queue
===================

The first thing you need to do is setup a ``Queue`` module that indicates which server to connect to (and optionally, where on the filesystem should the jobs be stored).

No setup is required on the server (though VisTrails/tej needs to be able to connect to it via SSH, so you might want to setup public key authentication), the directory will be created on the server with the necessary structure and helpers.

Submitting a job
================

The ``SubmitJob`` module upload a job to a server if it doesn't exist there already (checking for the same subpipeline) and returns a Job object suitable for downloading its results. Regardless of whether the job is created or it already existed, VisTrails will wait for it to complete before carrying on executing your workflow; if you click "cancel" however, it will add the job to the job monitor and keep tabs on the server, alerting you when the job is done so you can resume executing the workflow.

The job is simply a directory that will be uploaded to the server, with a start.sh script that will be executed there (or whatever name is set on the ``script`` input port). Remember to use relative paths in there so that different jobs don't overwrite their files.

A different module, ``SubmitShellJob``, makes it easy to submit a job consisting of a single shell script that you can enter directory in the module configuration window. Its output (stdout, stderr) is downloaded and returned as files on the corresponding output ports.

Downloading output files
========================

You can connect SubmitJob's output to ``DownloadFile`` modules to retrieve generated files from the server and use them in the following steps of your pipeline. The module only needs a ``filename`` parameter, which is relative to the job's directory. The ``DownloadDirectory`` module works in the same way but downloads a whole subdirectory recursively.

Example
=======

In this example, we'll submit a simple Python script to a server via SSH. That script searches for the largest prime factor of a given number and prints it to the console.

.. topic:: Try it Now!

First, create the Python script. You can use the ``String`` module, entering the script in the configuration widgets; connect it to a ``WriteFile`` module to get a file suitable for uploading.

::

#!/usr/bin/env python
with open('input') as fp:
number = int(fp.read().strip())

largest = None
n = 2
while n <= number:
while number % n == 0:
number /= n
if number == 1:
largest = n
break
n += 1

with open('output', 'w') as fp:
if largest is not None:
fp.write("%d" % largest)

As you can see, this script reads the target number from a file, ``input``, and writes the result to another file, ``output``. You can create the ``input`` file from an Integer using the ``StringFormat`` module (setting the `format` to ``{target}`` for example).

Add a ``DownloadFile`` module to download ``output`` and print the file with ``StandardOutput`` for example.

The end result should look like this :vtl:`(or open it from here) <tej-primes.vt>`:

.. figure:: figures/tej/pipeline.png
:align: center

Running it will start the job on the server. The job monitor window will pop up to indicate that it knows about the remote job, and that it is currently running. Clicking the "check" button or re-running the workflow will update the status, and eventually run the rest of the pipeline when the job is done, displaying the result.

Because the job identifier is computed from the signature of the subpipeline consisting of ``SubmitJob`` and its upstream modules, anyone running the same job on the same server will hit the same job, and will reuse your results without triggering a recomputation. But if you change the script, or choose a different target number to factorize, a new job will be submitted, that will not affect the result seen by other users and other workflows.
1 change: 1 addition & 0 deletions doc/usersguide/vtl/tej-primes.vtl
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
<vtlink filename="@examples/tej-primes.vt"/>
Binary file added examples/tej-primes.vt
Binary file not shown.
183 changes: 152 additions & 31 deletions vistrails/packages/tej/init.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@
import io
import logging
import os
import shutil
import urllib
import sys

from vistrails.core import debug
from vistrails.core.bundles import py_import
Expand All @@ -23,6 +25,11 @@


class ServerLogger(tej.ServerLogger):
"""Subclass of tej server logger that can be toggled on and off.

This is used to hide server messages from the VisTrails console when
running tej commands.
"""
def __init__(self):
self.hidden = False
tej.ServerLogger.__init__(self)
Expand All @@ -45,6 +52,8 @@ def message(self, data):


class RemoteQueue(tej.RemoteQueue):
"""Subclass of tej's RemoteQueue that uses our `ServerLogger`.
"""
def server_logger(self):
return ServerLogger

Expand Down Expand Up @@ -94,7 +103,80 @@ def compute(self):
self.set_output('queue', QueueCache.get(destination_str, queue))


class AssembleDirectoryMixin(object):
"""A mixin for assembling a directory from paths passed on port specs.

Modules using this mixin should use the DirectoryConfigurationWidget
configuration widget to setup the input port specs.
:meth:`assemble_directory` returns a PathObject for the temporary directory
with the paths copied into it.
"""
def __init__(self):
self.input_ports_order = []

def transfer_attrs(self, module):
super(AssembleDirectoryMixin, self).transfer_attrs(module)
self.input_ports_order = [p.name for p in module.input_port_specs]

def assemble_directory(self, base=None, do_copy=False):
"""Create the directory using the optional `base` and the port specs.

:type base: PathObject

This creates a directory that contains all the contents of `base` (if
provided, else start empty), and all the additional files and
directories provided as input port specs (configured using the
`DirectoryConfigurationWidget`).

If there are no input port specs and `do_copy` is False, this method
will just return the currect location of `base` without copying it to a
temporary directory.
"""
if self.input_ports_order or do_copy:
directory = self.interpreter.filePool.create_directory(
prefix='vt_tmp_makedirectory_')

# Copy everything in base
if base is not None:
for name in os.listdir(base.name):
src = os.path.join(base.name, name)
dst = os.path.join(directory.name, name)
if os.path.isdir(src):
shutil.copytree(src, dst)
else:
shutil.copy2(src, dst)
# Copy from port specs
for name in self.input_ports_order:
shutil.copy(self.get_input(name).name,
os.path.join(directory.name, name))
return directory
else:
return base


class MakeDirectory(AssembleDirectoryMixin, Module):
"""Creates a temporary directory and puts the given files in it.
"""
_settings = ModuleSettings(configure_widget=(
'%s.widgets' % this_pkg, 'DirectoryConfigurationWidget'))
_output_ports = [('directory', '(basic:Directory)')]

def __init__(self):
AssembleDirectoryMixin.__init__(self)
Module.__init__(self)

def compute(self):
directory = self.assemble_directory()
self.set_output('directory', directory)


class RemoteJob(object):
"""This implements the JobMonitor's JobHandle interface.

These objects are returned to the JobMonitor via the ModuleSuspended
exceptions; they are used by the JobMonitor to figure out the status of the
submitted job.
"""
def __init__(self, queue, job_id):
self.queue = queue
self.job_id = job_id
Expand All @@ -117,7 +199,8 @@ class Job(Module):
the creating module would have failed/suspended.

You probably won't use this module directly since it references a
pre-existing job by name.
pre-existing job by name; just use one of the SubmitJob modules that
compute the upstream's signature.
"""
_input_ports = [('id', '(basic:String)'),
('queue', Queue)]
Expand Down Expand Up @@ -156,7 +239,9 @@ class BaseSubmitJob(JobMixin, Module):
is finished, you can obtain files from it.
"""
_settings = ModuleSettings(abstract=True)
_input_ports = [('queue', Queue)]
_input_ports = [('queue', Queue),
('print_error', '(basic:Boolean)',
{'optional': True, 'defaults': "['False']"})]
_output_ports = [('job', '(org.vistrails.extra.tej:Job)'),
('exitcode', '(basic:Integer)')]

Expand Down Expand Up @@ -196,7 +281,8 @@ def job_finish(self, params):
Gets the exit code from the server.
"""
queue = QueueCache.get(params['destination'], params['queue'])
status, target, arg = queue.status(params['job_id'])
with ServerLogger.hide_output():
status, target, arg = queue.status(params['job_id'])
assert status == tej.RemoteQueue.JOB_DONE
params['exitcode'] = int(arg)
return params
Expand All @@ -208,14 +294,41 @@ def job_set_results(self, params):
self.set_output('exitcode', params['exitcode'])
self.set_output('job', RemoteJob(queue, params['job_id']))

if self.get_input('print_error') and params['exitcode'] != 0:
# If it failed and 'print_error' is set, download and print stderr
# then fail
destination = self.interpreter.filePool.create_file(
prefix='vt_tmp_job_err_').name
with ServerLogger.hide_output():
queue.download(params['job_id'],
'_stderr',
destination=destination,
recursive=False)
with open(destination, 'r') as fin:
chunk = fin.read(4096)
sys.stderr.write(chunk)
while len(chunk) == 4096:
chunk = fin.read(4096)
if chunk:
sys.stderr.write(chunk)
raise ModuleError(self,
"Job failed with status %d" % params['exitcode'])


class SubmitJob(BaseSubmitJob):
class SubmitJob(AssembleDirectoryMixin, BaseSubmitJob):
"""Submits a generic job (a directory).
"""
_input_ports = [('job', '(basic:Directory)'),
_settings = ModuleSettings(configure_widget=(
'%s.widgets' % this_pkg, 'DirectoryConfigurationWidget'))
_input_ports = [('job', '(basic:Directory)',
{'optional': True}),
('script', '(basic:String)',
{'optional': True, 'defaults': "['start.sh']"})]

def __init__(self):
AssembleDirectoryMixin.__init__(self)
Module.__init__(self)

def job_start(self, params):
"""Sends the directory and submits the job.
"""
Expand All @@ -230,14 +343,25 @@ def job_start(self, params):
else:
return params

job_dir = self.get_input('job').name
if not os.path.exists(job_dir):
raise ModuleError(self, "Directory doesn't exist")
if self.has_input('job'):
job_dir = self.get_input('job')
if not os.path.exists(job_dir.name):
raise ModuleError(self, "Directory doesn't exist")
else:
job_dir = None

# Use AssembleDirectoryMixin to get additional files from port specs
job_dir = self.assemble_directory(job_dir, False)

# Check that the script exists
script = self.get_input('script')
if not os.path.exists(os.path.join(job_dir.name, script)):
raise ModuleError(self, "Script does not exist")

# Alright, submit a new job
queue.submit(params['job_id'],
job_dir,
self.get_input('script'))
job_dir.name,
script)
return params


Expand All @@ -250,8 +374,6 @@ class SubmitShellJob(BaseSubmitJob):
_output_ports = [('stderr', '(basic:File)'),
('stdout', '(basic:File)')]

_job_interpreter = '/bin/sh'

def job_start(self, params):
"""Creates a temporary job with the given source, upload and submit it.
"""
Expand All @@ -276,15 +398,9 @@ def job_start(self, params):
kwargs = {'mode': 'wb'}
else:
kwargs = {'mode': 'w', 'newline': '\n'}
with io.open(os.path.join(directory, 'vistrails_source.sh'),
with io.open(os.path.join(directory, 'start.sh'),
**kwargs) as fp:
fp.write(source)
with io.open(os.path.join(directory, 'start.sh'), 'w',
newline='\n') as fp:
fp.write(u'%s '
u'vistrails_source.sh '
u'>_stdout.txt '
u'2>_stderr.txt\n' % self._job_interpreter)

queue.submit(params['job_id'], directory)

Expand All @@ -298,12 +414,12 @@ def job_set_results(self, params):
temp_dir = self.interpreter.filePool.create_directory(
prefix='vt_tmp_shelljobout_').name
queue = QueueCache.get(params['destination'], params['queue'])
queue.download(params['job_id'], ['_stderr.txt', '_stdout.txt'],
queue.download(params['job_id'], ['_stderr', '_stdout'],
directory=temp_dir)
self.set_output('stderr',
PathObject(os.path.join(temp_dir, '_stderr.txt')))
PathObject(os.path.join(temp_dir, '_stderr')))
self.set_output('stdout',
PathObject(os.path.join(temp_dir, '_stdout.txt')))
PathObject(os.path.join(temp_dir, '_stdout')))


class DownloadFile(Module):
Expand All @@ -319,10 +435,11 @@ def compute(self):

destination = self.interpreter.filePool.create_file(
prefix='vt_tmp_shelljobout_')
job.queue.download(job.job_id,
self.get_input('filename'),
destination=destination.name,
recursive=False)
with ServerLogger.hide_output():
job.queue.download(job.job_id,
self.get_input('filename'),
destination=destination.name,
recursive=False)

self.set_output('file', destination)

Expand All @@ -341,10 +458,11 @@ def compute(self):
destination = self.interpreter.filePool.create_directory(
prefix='vt_tmp_shelljobout_').name
target = os.path.join(destination, 'dir')
job.queue.download(job.job_id,
self.get_input('pathname'),
destination=target,
recursive=True)
with ServerLogger.hide_output():
job.queue.download(job.job_id,
self.get_input('pathname'),
destination=target,
recursive=True)

self.set_output('directory', PathObject(target))

Expand All @@ -353,6 +471,8 @@ def compute(self):


class _VisTrailsTejLogHandler(logging.Handler):
"""Custom handler for the 'tej' logger that re-logs to VisTrails.
"""
def emit(self, record):
msg = "tej: %s" % self.format(record)
if record.levelno >= logging.CRITICAL:
Expand Down Expand Up @@ -387,4 +507,5 @@ def finalize():

_modules = [Queue,
Job, BaseSubmitJob, SubmitJob, SubmitShellJob,
DownloadFile, DownloadDirectory]
DownloadFile, DownloadDirectory,
MakeDirectory]