diff --git a/.travis.yml b/.travis.yml index 88d2670..e8da5f8 100644 --- a/.travis.yml +++ b/.travis.yml @@ -32,7 +32,7 @@ install: - python setup.py build script: - - coverage run --source=omicron --omit="omicron/tests/*,omicron/version.py" -m py.test -v omicron/ + - coverage run --source=omicron --omit="omicron/tests/*,omicron/_version.py" -m py.test -v omicron/ - pip install . after_success: diff --git a/bin/omicron-process b/bin/omicron-process index 4c123eb..6f861c9 100644 --- a/bin/omicron-process +++ b/bin/omicron-process @@ -416,7 +416,7 @@ if not os.path.isdir(condir): os.makedirs(condir) # check dagman lock file -running = condor.dag_is_running(os.path.join(condir, 'omicron.dag'), group) +running = condor.dag_is_running(os.path.join(condir, 'omicron.dag')) if running and args.reattach: logger.info('Detected omicron.dag already running %s, will reattach' % rundir) @@ -922,7 +922,7 @@ else: for i in range(args.submit_rescue_dag + 1): if args.reattach: # find ID of existing DAG - dagid = condor.find_dagman_id(group, classad="OmicronDAGMan") + dagid = condor.find_job(OmicronDAGMan=group) logger.info("Found existing condor ID = %d" % dagid) else: # or submit DAG dagmanargs = set() diff --git a/omicron/condor.py b/omicron/condor.py index 4fbce26..c343f4d 100644 --- a/omicron/condor.py +++ b/omicron/condor.py @@ -205,7 +205,7 @@ def get_dag_status(dagmanid, schedd=None, detailed=True): states = ['total', 'done', 'queued', 'ready', 'unready', 'failed'] classads = ['DAG_Nodes%s' % s.title() for s in states] try: - job = schedd.query('ClusterId == %d' % dagmanid, classads)[0] + job = find_job(ClusterId=dagmanid, schedd=schedd, attr_list=classads) status = dict() for s, c in zip(states, classads): try: @@ -250,7 +250,7 @@ def get_dag_status(dagmanid, schedd=None, detailed=True): status['held'] = 0 status['running'] = 0 status['idle'] = 0 - nodes = schedd.query('DAGManJobId == %d' % dagmanid) + nodes = find_jobs(DAGManJobId=dagmanid, schedd=schedd) for node in nodes: s = get_job_status(node) if s == JOB_STATUS_MAP['held']: @@ -436,10 +436,13 @@ def find_dagman_id(group, classad="OmicronDAGMan", user=getuser(), ---------- group : `str` name of group to find, actually this is just the value of the classad + classad : `str`, optional the defining classad used to identify the correct DAGMan process + user : `str`, optional the name of the submitting user, defaults to the current user + schedd : `htcondor.Schedd`, optional open connection to the scheduler, one will be created if needed @@ -454,31 +457,26 @@ def find_dagman_id(group, classad="OmicronDAGMan", user=getuser(), if not exactly 1 matching condor process is found, or that process is not in a good state """ - if schedd is None: - schedd = htcondor.Schedd() - jobs = schedd.query('%s == "%s" && Owner == "%s"' % (classad, group, user), - ['JobStatus', 'ClusterId']) - if len(jobs) == 0: - raise RuntimeError("No %s jobs found for group %r" % (classad, group)) - elif len(jobs) > 1: - raise RuntimeError("Multiple %s jobs found for group %r" - % (classad, group)) - clusterid = jobs[0]['ClusterId'] - if get_job_status(jobs[0]) >= 3: + constraints = {classad: group} + if user is not None: + constraints['Owner'] = user + job = find_job(schedd=schedd, attr_list=['JobStatus', 'ClusterId'], + **constraints) + clusterid = job['ClusterId'] + if get_job_status(job) >= 3: raise RuntimeError("DAGMan cluster %d found, but in state %r" - % JOB_STATUS[jobs[0]['JobStatus']]) + % JOB_STATUS[job['JobStatus']]) return clusterid -def dag_is_running(dagfile, group=None, classad="OmicronDAGMan", - user=getuser()): +def dag_is_running(dagfile, schedd=None): """Return whether a DAG is running This method will return `True` if any of the following match - {dagfile}.lock file is found - - {dagfile}.condor.sub is found and a matching OmicronDAGMan process - is found in the condor queue + - a job is found in the condor queue with the UserLog classad matching + {dagfile}.dagman.log Otherwise, the return is `False` @@ -487,9 +485,6 @@ def dag_is_running(dagfile, group=None, classad="OmicronDAGMan", dagfile : `str` the path of the DAG you want to analyse - group : `str`, optional - the name of the Omicron channel group being processed by this DAG - Raises ------ RuntimeError @@ -498,15 +493,15 @@ def dag_is_running(dagfile, group=None, classad="OmicronDAGMan", """ if os.path.isfile('%s.lock' % dagfile): return True - if group is not None and os.path.isfile('%s.condor.sub' % dagfile): - try: - find_dagman_id(group, classad=classad, user=user) - except RuntimeError as e: - if str(e).startswith('No %s jobs' % classad): - return False - raise - else: - return True + userlog = '%s.dagman.log' % dagfile + try: + find_job(UserLog=userlog, schedd=schedd) + except RuntimeError as e: + if str(e).startswith('No jobs found'): + return False + raise + else: + return True return False @@ -527,14 +522,72 @@ def get_job_status(job, schedd=None): the integer (`long`) status code for this job """ if not isinstance(job, ClassAd): - # connect to scheduler - if schedd is None: - schedd = htcondor.Schedd() - # get status - job = list(schedd.query('ClusterId == %s' % job))[0] + job = find_job(ClusterId=job, schedd=schedd, attr_list=['JobStatus']) return job['JobStatus'] +def find_jobs(schedd=None, attr_list=None, **constraints): + """Query the condor queue for jobs matching the constraints + + Parameters + ---------- + schedd : `htcondor.Schedd`, optional + open scheduler connection + + attr_list : `list` of `str` + list of attributes to return for each job, defaults to all + + all other keyword arguments should be ClassAd == value constraints to + apply to the scheduler query + + Returns + ------- + jobs : `list` of `classad.ClassAd` + the job listing for each job found + """ + if schedd is None: + schedd = htcondor.Schedd() + qstr = ' && '.join(['%s == %r' % (k, v) for + k, v in constraints.items()]).replace("'", '"') + if not attr_list: + attr_list = [] + return list(schedd.query(qstr, attr_list)) + + +def find_job(schedd=None, attr_list=None, **constraints): + """Query the condor queue for a single job matching the constraints + + Parameters + ---------- + schedd : `htcondor.Schedd`, optional + open scheduler connection + + attr_list : `list` of `str` + list of attributes to return for each job, defaults to all + + all other keyword arguments should be ClassAd == value constraints to + apply to the scheduler query + + Returns + ------- + classad : `classad.ClassAd` + the job listing for the found job + + Raises + ------ + RuntimeError + if not exactly one job is found matching the constraints + """ + jobs = find_jobs(schedd=schedd, attr_list=attr_list, **constraints) + if len(jobs) == 0: + raise RuntimeError("No jobs found matching constraints %r" + % constraints) + elif len(jobs) > 1: + raise RuntimeError("Multiple jobs found matching constraints %r" + % constraints) + return jobs[0] + + # -- custom jobs -------------------------------------------------------------- class OmicronProcessJob(pipeline.CondorDAGJob): diff --git a/omicron/tests/compat.py b/omicron/tests/compat.py index 01df2ff..0f71524 100644 --- a/omicron/tests/compat.py +++ b/omicron/tests/compat.py @@ -25,3 +25,8 @@ import unittest2 as unittest else: import unittest + +try: + from unittest import mock +except ImportError: + import mock diff --git a/omicron/tests/mock_classad.py b/omicron/tests/mock_classad.py new file mode 100644 index 0000000..ffc5b90 --- /dev/null +++ b/omicron/tests/mock_classad.py @@ -0,0 +1,8 @@ +#!/usr/bin/env python + +"""This module is a mock of schedd.py, purely for testing +""" + + +class ClassAd(object): + pass diff --git a/omicron/tests/mock_htcondor.py b/omicron/tests/mock_htcondor.py new file mode 100644 index 0000000..bec4a4b --- /dev/null +++ b/omicron/tests/mock_htcondor.py @@ -0,0 +1,32 @@ +#!/usr/bin/env python + +"""This module is a mock of htcondor.py, purely for testing +""" + + +class Schedd(object): + """Mock of the `htcondor.Schedd` object + """ + + _jobs = [] + + def query(self, constraints, attr_list=[], **kwargs): + x = {} + for con in constraints.split(' && '): + try: + a, b = con.split(' == ') + except ValueError: + break + else: + x[a] = eval(b) + match = [] + def match(job): + print(job, x) + for key in x: + if x[key] != job[key]: + return False + return True + return iter(filter(match, self._jobs)) + + def history(self, *args, **kwargs): + return diff --git a/omicron/tests/test_condor.py b/omicron/tests/test_condor.py new file mode 100644 index 0000000..cd70053 --- /dev/null +++ b/omicron/tests/test_condor.py @@ -0,0 +1,113 @@ +# -*- coding: utf-8 -*- +# Copyright (C) Duncan Macleod (2017) +# +# This file is part of PyOmicron. +# +# PyOmicron is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# PyOmicron is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with PyOmicron. If not, see . + +"""Tests for omicron.io +""" + +# mock up condor modules for testing purposes +# (so we don't need to have htcondor installed) +import sys +import mock_htcondor +import mock_classad +sys.modules['htcondor'] = mock_htcondor +sys.modules['classad'] = mock_classad + +import utils +from compat import (unittest, mock) + +from omicron import condor + + +# -- mock utilities ----------------------------------------------------------- + +def mock_which(exe): + return '/path/to/executable' + + +def mock_shell_factory(output): + def shell(*args, **kwargs): + return output + return shell + + +def mock_schedd_factory(jobs): + Schedd = mock_htcondor.Schedd + Schedd._jobs = jobs + return Schedd + + +# -- tests -------------------------------------------------------------------- + +class CondorTests(unittest.TestCase): + + def test_submit_dag(self): + shell_ = mock_shell_factory('1 job(s) submitted to cluster 12345') + with mock.patch('omicron.condor.which', mock_which): + with mock.patch('omicron.condor.shell', shell_): + dagid = condor.submit_dag('test.dag') + with utils.capture(condor.submit_dag, 'test.dag', '-append', + '+OmicronDAGMan="GW"') as output: + cmd = output.split('\n')[0] + self.assertEqual(cmd, '$ /path/to/executable -append ' + '+OmicronDAGMan="GW" test.dag') + self.assertEqual(dagid, 12345) + shell_ = mock_shell_factory('Error') + with mock.patch('omicron.condor.which', mock_which): + with mock.patch('omicron.condor.shell', shell_): + self.assertRaises(AttributeError, condor.submit_dag, 'test.dag') + + def test_find_jobs(self): + schedd_ = mock_schedd_factory([{'ClusterId': 1}, {'ClusterId': 2}]) + with mock.patch('htcondor.Schedd', schedd_): + jobs = condor.find_jobs() + self.assertListEqual(jobs, [{'ClusterId': 1}, {'ClusterId': 2}]) + jobs = condor.find_jobs(ClusterId=1) + self.assertListEqual(jobs, [{'ClusterId': 1}]) + jobs = condor.find_jobs(ClusterId=3) + self.assertListEqual(jobs, []) + + def test_find_job(self): + schedd_ = mock_schedd_factory([{'ClusterId': 1}, {'ClusterId': 2}]) + with mock.patch('htcondor.Schedd', schedd_): + job = condor.find_job(ClusterId=1) + self.assertDictEqual(job, {'ClusterId': 1}) + # check 0 jobs returned throws the right error + with self.assertRaises(RuntimeError) as e: + condor.find_job(ClusterId=3) + self.assertTrue(str(e.exception).startswith('No jobs found')) + # check multiple jobs returned throws the right error + with self.assertRaises(RuntimeError) as e: + condor.find_job() + self.assertTrue(str(e.exception).startswith('Multiple jobs found')) + + def test_get_job_status(self): + schedd_ = mock_schedd_factory([{'ClusterId': 1, 'JobStatus': 4}]) + with mock.patch('htcondor.Schedd', schedd_): + status = condor.get_job_status(1) + self.assertEqual(status, 4) + + def test_dag_is_running(self): + self.assertFalse(condor.dag_is_running('test.dag')) + with mock.patch('os.path.isfile') as isfile: + isfile.return_value = True + self.assertTrue(condor.dag_is_running('test.dag')) + schedd_ = mock_schedd_factory( + [{'UserLog': 'test.dag.dagman.log'}]) + with mock.patch('htcondor.Schedd', schedd_): + self.assertTrue(condor.dag_is_running('test.dag')) + self.assertFalse(condor.dag_is_running('test2.dag')) diff --git a/omicron/tests/utils.py b/omicron/tests/utils.py new file mode 100644 index 0000000..fad9235 --- /dev/null +++ b/omicron/tests/utils.py @@ -0,0 +1,35 @@ +# -*- coding: utf-8 -*- +# Copyright (C) Duncan Macleod (2017) +# +# This file is part of PyOmicron. +# +# PyOmicron is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# PyOmicron is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with PyOmicron. If not, see . + +"""Python-version compatibility utils for Omicron tests +""" + +import sys +from six.moves import StringIO +from contextlib import contextmanager + + +@contextmanager +def capture(command, *args, **kwargs): + out, sys.stdout = sys.stdout, StringIO() + try: + command(*args, **kwargs) + sys.stdout.seek(0) + yield sys.stdout.read() + finally: + sys.stdout = out