Merge pull request #177 from jmchilton/sync_runner_util
Sync "recent" galaxy runner util changes.
jmchilton committed Apr 26, 2019
2 parents 7d977af + 29a93c8 commit 9ab6683
Showing 19 changed files with 340 additions and 74 deletions.
3 changes: 1 addition & 2 deletions pulsar/managers/util/__init__.py
@@ -4,7 +4,6 @@
functionality shared between Galaxy and the Pulsar.
"""
from galaxy.util.bunch import Bunch

from .kill import kill_pid

__all__ = ['kill_pid', 'Bunch']
__all__ = ('kill_pid', 'Bunch')
34 changes: 17 additions & 17 deletions pulsar/managers/util/cli/__init__.py
@@ -1,11 +1,12 @@
"""
"""
import json
from glob import glob
from inspect import getsourcefile
from os import pardir
from os.path import abspath
from os.path import basename
from os.path import join
from os import getcwd
from os.path import (
basename,
join
)

DEFAULT_SHELL_PLUGIN = 'LocalShell'

@@ -22,13 +23,12 @@ class CliInterface(object):
def __init__(self, code_dir='lib'):
"""
"""
def __load(module_prefix, d, code_dir):
module_pattern = join(join(code_dir, module_prefix), '*.py')
def __load(module_path, d):
module_pattern = join(join(getcwd(), code_dir, *module_path.split('.')), '*.py')
for file in glob(module_pattern):
if basename(file).startswith('_'):
continue
file = file.split(code_dir)[1]
module_name = '%s.%s' % (module_prefix.replace("/", "."), basename(file).rsplit('.py', 1)[0])
module_name = '%s.%s' % (module_path, basename(file).rsplit('.py', 1)[0])
module = __import__(module_name)
for comp in module_name.split(".")[1:]:
module = getattr(module, comp)
@@ -40,13 +40,11 @@ def __load(module_prefix, d, code_dir):

self.cli_shells = {}
self.cli_job_interfaces = {}
self.active_cli_shells = {}

module_prefix = self.__module__
module_prefix = join(*module_prefix.split("."))
module_path = abspath(join(getsourcefile(CliInterface), pardir))
code_dir = module_path.split(module_prefix)[0]
__load('%s/shell' % module_prefix, self.cli_shells, code_dir)
__load('%s/job' % module_prefix, self.cli_job_interfaces, code_dir)
__load('%s.shell' % module_prefix, self.cli_shells)
__load('%s.job' % module_prefix, self.cli_job_interfaces)

def get_plugins(self, shell_params, job_params):
"""
@@ -59,16 +57,18 @@ def get_plugins(self, shell_params, job_params):

def get_shell_plugin(self, shell_params):
shell_plugin = shell_params.get('plugin', DEFAULT_SHELL_PLUGIN)
shell = self.cli_shells[shell_plugin](**shell_params)
return shell
requested_shell_settings = json.dumps(shell_params, sort_keys=True)
if requested_shell_settings not in self.active_cli_shells:
self.active_cli_shells[requested_shell_settings] = self.cli_shells[shell_plugin](**shell_params)
return self.active_cli_shells[requested_shell_settings]

def get_job_interface(self, job_params):
job_plugin = job_params.get('plugin', None)
if not job_plugin:
raise ValueError(ERROR_MESSAGE_NO_JOB_PLUGIN)
job_plugin_class = self.cli_job_interfaces.get(job_plugin, None)
if not job_plugin_class:
raise ValueError(ERROR_MESSAGE_NO_SUCH_JOB_PLUGIN % (job_plugin, self.cli_job_interfaces.keys()))
raise ValueError(ERROR_MESSAGE_NO_SUCH_JOB_PLUGIN % (job_plugin, list(self.cli_job_interfaces.keys())))
job_interface = job_plugin_class(**job_params)

return job_interface
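The caching added to get_shell_plugin means shell objects are now reused across calls made with identical parameters, which matters for shell plugins that hold open connections. A minimal sketch of that memoization pattern, with DummyShell standing in as a hypothetical plugin class (not part of this commit):

import json


class DummyShell(object):
    def __init__(self, **params):
        self.params = params


active_shells = {}


def get_shell(**shell_params):
    # Key by canonical JSON so calls with the same settings hit the cache.
    key = json.dumps(shell_params, sort_keys=True)
    if key not in active_shells:
        active_shells[key] = DummyShell(**shell_params)
    return active_shells[key]


assert get_shell(hostname='example.org') is get_shell(hostname='example.org')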
21 changes: 19 additions & 2 deletions pulsar/managers/util/cli/job/__init__.py
@@ -1,11 +1,16 @@
"""
Abstract base class for cli job plugins.
"""
from abc import ABCMeta, abstractmethod
from abc import (
ABCMeta,
abstractmethod
)

import six


@six.add_metaclass(ABCMeta)
class BaseJobExec(object):
__metaclass__ = ABCMeta

@abstractmethod
def __init__(self, **params):
@@ -56,3 +61,15 @@ def parse_single_status(self, status, job_id):
"""
Parse the status of output from get_single_status command.
"""

def get_failure_reason(self, job_id):
"""
Return the failure reason for the given job_id.
"""
return None

def parse_failure_reason(self, reason, job_id):
"""
Parse the output of the get_failure_reason command for the given
job_id, returning a runner failure state or None.
"""
return None
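The two new hooks give job plugins an optional failure-diagnosis protocol: get_failure_reason returns a command to run (or None), and parse_failure_reason turns that command's output into a runner failure state. A hedged sketch of how a caller might wire the two together, assuming shell is a BaseShellExec and job_interface a BaseJobExec (the helper name diagnose_failure is hypothetical):

def diagnose_failure(shell, job_interface, job_id):
    # Plugins that cannot diagnose failures return None from get_failure_reason.
    cmd = job_interface.get_failure_reason(job_id)
    if cmd is None:
        return None
    output = shell.execute(cmd).stdout
    return job_interface.parse_failure_reason(output, job_id)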
129 changes: 129 additions & 0 deletions pulsar/managers/util/cli/job/lsf.py
@@ -0,0 +1,129 @@
# A simple CLI runner for LSF that can be used when running Galaxy from a
# non-submit host and submitting to an LSF cluster.
from logging import getLogger

try:
from galaxy.model import Job
job_states = Job.states
except ImportError:
# Not in Galaxy, map Galaxy job states to Pulsar ones.
from pulsar.util import enum
job_states = enum(RUNNING='running', OK='complete', QUEUED='queued', ERROR="failed")
from ..job import BaseJobExec

log = getLogger(__name__)

argmap = {
'memory': '-M', # Code in job_script_kwargs relies on this key being named 'memory'
'cores': '-n',
'queue': '-q',
'working_dir': '-cwd',
'project': '-P'
}


class LSF(BaseJobExec):

def __init__(self, **params):
self.params = {}
for k, v in params.items():
self.params[k] = v

def job_script_kwargs(self, ofile, efile, job_name):
scriptargs = {'-o': ofile,
'-e': efile,
'-J': job_name}

# Map arguments using argmap.
for k, v in self.params.items():
if k == 'plugin':
continue
try:
if k == 'memory':
# Memory requires both -M and -R "rusage[mem=v]" requests
scriptargs['-R'] = "\"rusage[mem=%s]\"" % v
if not k.startswith('-'):
k = argmap[k]
scriptargs[k] = v
except Exception:
log.warning('Unrecognized long argument passed to LSF CLI plugin: %s' % k)

# Generated template.
template_scriptargs = ''
for k, v in scriptargs.items():
template_scriptargs += '#BSUB %s %s\n' % (k, v)
return dict(headers=template_scriptargs)

def submit(self, script_file):
# bsub prints e.g. "Job <9147983> is submitted to default queue <research-rh7>.".
# This should really be handled outside with something like parse_external;
# currently the CLI runner expects the job id to be the last token of the
# output, so extract it here.
return "bsub <%s | awk '{ print $2}' | sed 's/[<>]//g'" % script_file

def delete(self, job_id):
return 'bkill %s' % job_id

def get_status(self, job_ids=None):
return "bjobs -a -o \"id stat\" -noheader" # check this

def get_single_status(self, job_id):
return "bjobs -o stat -noheader " + job_id

def parse_status(self, status, job_ids):
# Get status for each job; bjobs was invoked with -noheader, so every line is a record.
rval = {}
for line in status.splitlines():
job_id, state = line.split()
if job_id in job_ids:
# map job states to Galaxy job states.
rval[job_id] = self._get_job_state(state)
return rval

def parse_single_status(self, status, job_id):
if not status:
# Job not found in LSF, most probably finished and forgotten.
# LSF prints "Job <num> is not found" -- but on stderr.
# Note: a very old failed job will not be shown here either, which
# this logic handles badly. It only works well when Galaxy is
# constantly monitoring the jobs, since DONE jobs are forgotten
# faster than failed jobs.
log.warning("Job id '%s' not found in LSF status check" % job_id)
return job_states.OK
return self._get_job_state(status)

def get_failure_reason(self, job_id):
return "bjobs -l " + job_id

def parse_failure_reason(self, reason, job_id):
# LSF will produce the following in the job output file:
# TERM_MEMLIMIT: job killed after reaching LSF memory usage limit.
# Exited with exit code 143.
for line in reason.splitlines():
if "TERM_MEMLIMIT" in line:
from galaxy.jobs import JobState
return JobState.runner_states.MEMORY_LIMIT_REACHED
return None

def _get_job_state(self, state):
# based on:
# https://www.ibm.com/support/knowledgecenter/en/SSETD4_9.1.3/lsf_admin/job_state_lsf.html
# https://www.ibm.com/support/knowledgecenter/en/SSETD4_9.1.2/lsf_command_ref/bjobs.1.html
try:
return {
'EXIT': job_states.ERROR,
'RUN': job_states.RUNNING,
'PEND': job_states.QUEUED,
'DONE': job_states.OK,
'PSUSP': job_states.ERROR,
'USUSP': job_states.ERROR,
'SSUSP': job_states.ERROR,
'UNKWN': job_states.ERROR,
'WAIT': job_states.QUEUED,
'ZOMBI': job_states.ERROR
}[state]
except KeyError:
raise KeyError("Failed to map LSF status code [%s] to job state." % state)


__all__ = ('LSF',)
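To illustrate the plugin's pure-Python pieces, here is a hedged usage sketch; the job ids and queue name are made up, and the mapped states shown assume Pulsar's enum values:

lsf = LSF(plugin='LSF', memory='4096', cores='4', queue='research-rh7')
headers = lsf.job_script_kwargs('/tmp/job.o', '/tmp/job.e', 'g123')['headers']
# headers contains one '#BSUB <flag> <value>' line per argument, e.g.:
#   #BSUB -o /tmp/job.o
#   #BSUB -M 4096
#   #BSUB -R "rusage[mem=4096]"

sample_bjobs = "9147983 RUN\n9147984 PEND\n9147985 EXIT\n"
states = lsf.parse_status(sample_bjobs, ['9147983', '9147985'])
# -> {'9147983': 'running', '9147985': 'failed'}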
5 changes: 2 additions & 3 deletions pulsar/managers/util/cli/job/slurm.py
@@ -1,8 +1,5 @@
# A simple CLI runner for slurm that can be used when running Galaxy from a
# non-submit host and using a Slurm cluster.

from ..job import BaseJobExec

from logging import getLogger

try:
@@ -13,6 +10,8 @@
from pulsar.util import enum
job_states = enum(RUNNING='running', OK='complete', QUEUED='queued', ERROR="failed")

from ..job import BaseJobExec

log = getLogger(__name__)

argmap = {
1 change: 1 addition & 0 deletions pulsar/managers/util/cli/job/slurm_torque.py
@@ -1,4 +1,5 @@
import re

from .torque import Torque

__all__ = ('SlurmTorque',)
1 change: 0 additions & 1 deletion pulsar/managers/util/cli/job/torque.py
@@ -18,7 +18,6 @@

ERROR_MESSAGE_UNRECOGNIZED_ARG = 'Unrecognized long argument passed to Torque CLI plugin: %s'


argmap = {'destination': '-q',
'Execution_Time': '-a',
'Account_Name': '-A',
9 changes: 7 additions & 2 deletions pulsar/managers/util/cli/shell/__init__.py
@@ -1,11 +1,16 @@
"""
Abstract base class for runners which execute commands via a shell.
"""
from abc import ABCMeta, abstractmethod
from abc import (
ABCMeta,
abstractmethod
)

import six


@six.add_metaclass(ABCMeta)
class BaseShellExec(object):
__metaclass__ = ABCMeta

@abstractmethod
def __init__(self, *args, **kwargs):
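The same metaclass change as in the job module: a class-body __metaclass__ attribute is ignored on Python 3, so abstract-method enforcement would silently vanish there, while six.add_metaclass applies ABCMeta on both interpreters. A small self-contained demonstration (not from this commit):

from abc import ABCMeta, abstractmethod

import six


@six.add_metaclass(ABCMeta)
class Base(object):

    @abstractmethod
    def run(self):
        """Subclasses must implement run()."""


try:
    Base()
except TypeError as e:
    print(e)  # "Can't instantiate abstract class Base with abstract methods run"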
24 changes: 18 additions & 6 deletions pulsar/managers/util/cli/shell/local.py
@@ -1,11 +1,19 @@
from logging import getLogger
from subprocess import (
PIPE,
Popen
)
from tempfile import TemporaryFile
from time import sleep
from subprocess import Popen, PIPE

import six

from ..shell import BaseShellExec
from ....util import Bunch, kill_pid
from ....util import (
Bunch,
kill_pid
)

from logging import getLogger
log = getLogger(__name__)

TIMEOUT_ERROR_MESSAGE = u'Execution timed out'
@@ -18,30 +26,34 @@ class LocalShell(BaseShellExec):
"""
>>> shell = LocalShell()
>>> def exec_python(script, **kwds): return shell.execute('python -c "%s"' % script, **kwds)
>>> def exec_python(script, **kwds): return shell.execute(['python', '-c', script], **kwds)
>>> exec_result = exec_python("from __future__ import print_function; print('Hello World')")
>>> exec_result.stderr == u''
True
>>> exec_result.stdout.strip() == u'Hello World'
True
>>> exec_result = exec_python("import time; time.sleep(90)", timeout=.3, timeout_check_interval=.1)
>>> exec_result = exec_python("import time; time.sleep(90)", timeout=1, timeout_check_interval=.1)
>>> exec_result.stdout == u''
True
>>> exec_result.stderr == 'Execution timed out'
True
>>> exec_result.returncode == TIMEOUT_RETURN_CODE
True
>>> shell.execute('echo hi').stdout == "hi\\n"
True
"""

def __init__(self, **kwds):
pass

def execute(self, cmd, persist=False, timeout=DEFAULT_TIMEOUT, timeout_check_interval=DEFAULT_TIMEOUT_CHECK_INTERVAL, **kwds):
is_cmd_string = isinstance(cmd, six.string_types)
outf = TemporaryFile()
p = Popen(cmd, shell=True, stdin=None, stdout=outf, stderr=PIPE)
p = Popen(cmd, stdin=None, stdout=outf, stderr=PIPE, shell=is_cmd_string)
# poll until timeout

for i in range(int(timeout / timeout_check_interval)):
sleep(0.1) # For fast returning commands
r = p.poll()
if r is not None:
break
Expand Down