Skip to content

Commit

Permalink
Merge 83ccb84 into 97e0f0b
Browse files Browse the repository at this point in the history
  • Loading branch information
Duncan Macleod committed Jan 12, 2017
2 parents 97e0f0b + 83ccb84 commit b5036c2
Show file tree
Hide file tree
Showing 8 changed files with 284 additions and 38 deletions.
2 changes: 1 addition & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ install:
- python setup.py build

script:
- coverage run --source=omicron --omit="omicron/tests/*,omicron/version.py" -m py.test -v omicron/
- coverage run --source=omicron --omit="omicron/tests/*,omicron/_version.py" -m py.test -v omicron/
- pip install .

after_success:
Expand Down
4 changes: 2 additions & 2 deletions bin/omicron-process
Original file line number Diff line number Diff line change
Expand Up @@ -416,7 +416,7 @@ if not os.path.isdir(condir):
os.makedirs(condir)

# check dagman lock file
running = condor.dag_is_running(os.path.join(condir, 'omicron.dag'), group)
running = condor.dag_is_running(os.path.join(condir, 'omicron.dag'))
if running and args.reattach:
logger.info('Detected omicron.dag already running %s, will reattach'
% rundir)
Expand Down Expand Up @@ -922,7 +922,7 @@ else:

for i in range(args.submit_rescue_dag + 1):
if args.reattach: # find ID of existing DAG
dagid = condor.find_dagman_id(group, classad="OmicronDAGMan")
dagid = condor.find_job(OmicronDAGMan=group)
logger.info("Found existing condor ID = %d" % dagid)
else: # or submit DAG
dagmanargs = set()
Expand Down
123 changes: 88 additions & 35 deletions omicron/condor.py
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ def get_dag_status(dagmanid, schedd=None, detailed=True):
states = ['total', 'done', 'queued', 'ready', 'unready', 'failed']
classads = ['DAG_Nodes%s' % s.title() for s in states]
try:
job = schedd.query('ClusterId == %d' % dagmanid, classads)[0]
job = find_job(ClusterId=dagmanid, schedd=schedd, attr_list=classads)
status = dict()
for s, c in zip(states, classads):
try:
Expand Down Expand Up @@ -250,7 +250,7 @@ def get_dag_status(dagmanid, schedd=None, detailed=True):
status['held'] = 0
status['running'] = 0
status['idle'] = 0
nodes = schedd.query('DAGManJobId == %d' % dagmanid)
nodes = find_jobs(DAGManJobId=dagmanid, schedd=schedd)
for node in nodes:
s = get_job_status(node)
if s == JOB_STATUS_MAP['held']:
Expand Down Expand Up @@ -436,10 +436,13 @@ def find_dagman_id(group, classad="OmicronDAGMan", user=getuser(),
----------
group : `str`
name of group to find, actually this is just the value of the classad
classad : `str`, optional
the defining classad used to identify the correct DAGMan process
user : `str`, optional
the name of the submitting user, defaults to the current user
schedd : `htcondor.Schedd`, optional
open connection to the scheduler, one will be created if needed
Expand All @@ -454,31 +457,26 @@ def find_dagman_id(group, classad="OmicronDAGMan", user=getuser(),
if not exactly 1 matching condor process is found, or that process
is not in a good state
"""
if schedd is None:
schedd = htcondor.Schedd()
jobs = schedd.query('%s == "%s" && Owner == "%s"' % (classad, group, user),
['JobStatus', 'ClusterId'])
if len(jobs) == 0:
raise RuntimeError("No %s jobs found for group %r" % (classad, group))
elif len(jobs) > 1:
raise RuntimeError("Multiple %s jobs found for group %r"
% (classad, group))
clusterid = jobs[0]['ClusterId']
if get_job_status(jobs[0]) >= 3:
constraints = {classad: group}
if user is not None:
constraints['Owner'] = user
job = find_job(schedd=schedd, attr_list=['JobStatus', 'ClusterId'],
**constraints)
clusterid = job['ClusterId']
if get_job_status(job) >= 3:
raise RuntimeError("DAGMan cluster %d found, but in state %r"
% JOB_STATUS[jobs[0]['JobStatus']])
% JOB_STATUS[job['JobStatus']])
return clusterid


def dag_is_running(dagfile, group=None, classad="OmicronDAGMan",
user=getuser()):
def dag_is_running(dagfile, schedd=None):
"""Return whether a DAG is running
This method will return `True` if any of the following match
- {dagfile}.lock file is found
- {dagfile}.condor.sub is found and a matching OmicronDAGMan process
is found in the condor queue
- a job is found in the condor queue with the UserLog classad matching
{dagfile}.dagman.log
Otherwise, the return is `False`
Expand All @@ -487,9 +485,6 @@ def dag_is_running(dagfile, group=None, classad="OmicronDAGMan",
dagfile : `str`
the path of the DAG you want to analyse
group : `str`, optional
the name of the Omicron channel group being processed by this DAG
Raises
------
RuntimeError
Expand All @@ -498,15 +493,15 @@ def dag_is_running(dagfile, group=None, classad="OmicronDAGMan",
"""
if os.path.isfile('%s.lock' % dagfile):
return True
if group is not None and os.path.isfile('%s.condor.sub' % dagfile):
try:
find_dagman_id(group, classad=classad, user=user)
except RuntimeError as e:
if str(e).startswith('No %s jobs' % classad):
return False
raise
else:
return True
userlog = '%s.dagman.log' % dagfile
try:
find_job(UserLog=userlog, schedd=schedd)
except RuntimeError as e:
if str(e).startswith('No jobs found'):
return False
raise
else:
return True
return False


Expand All @@ -527,14 +522,72 @@ def get_job_status(job, schedd=None):
the integer (`long`) status code for this job
"""
if not isinstance(job, ClassAd):
# connect to scheduler
if schedd is None:
schedd = htcondor.Schedd()
# get status
job = list(schedd.query('ClusterId == %s' % job))[0]
job = find_job(ClusterId=job, schedd=schedd, attr_list=['JobStatus'])
return job['JobStatus']


def find_jobs(schedd=None, attr_list=None, **constraints):
"""Query the condor queue for jobs matching the constraints
Parameters
----------
schedd : `htcondor.Schedd`, optional
open scheduler connection
attr_list : `list` of `str`
list of attributes to return for each job, defaults to all
all other keyword arguments should be ClassAd == value constraints to
apply to the scheduler query
Returns
-------
jobs : `list` of `classad.ClassAd`
the job listing for each job found
"""
if schedd is None:
schedd = htcondor.Schedd()
qstr = ' && '.join(['%s == %r' % (k, v) for
k, v in constraints.items()]).replace("'", '"')
if not attr_list:
attr_list = []
return list(schedd.query(qstr, attr_list))


def find_job(schedd=None, attr_list=None, **constraints):
"""Query the condor queue for a single job matching the constraints
Parameters
----------
schedd : `htcondor.Schedd`, optional
open scheduler connection
attr_list : `list` of `str`
list of attributes to return for each job, defaults to all
all other keyword arguments should be ClassAd == value constraints to
apply to the scheduler query
Returns
-------
classad : `classad.ClassAd`
the job listing for the found job
Raises
------
RuntimeError
if not exactly one job is found matching the constraints
"""
jobs = find_jobs(schedd=schedd, attr_list=attr_list, **constraints)
if len(jobs) == 0:
raise RuntimeError("No jobs found matching constraints %r"
% constraints)
elif len(jobs) > 1:
raise RuntimeError("Multiple jobs found matching constraints %r"
% constraints)
return jobs[0]


# -- custom jobs --------------------------------------------------------------

class OmicronProcessJob(pipeline.CondorDAGJob):
Expand Down
5 changes: 5 additions & 0 deletions omicron/tests/compat.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,3 +25,8 @@
import unittest2 as unittest
else:
import unittest

try:
from unittest import mock
except ImportError:
import mock
8 changes: 8 additions & 0 deletions omicron/tests/mock_classad.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
#!/usr/bin/env python

"""This module is a mock of schedd.py, purely for testing
"""


class ClassAd(object):
pass
32 changes: 32 additions & 0 deletions omicron/tests/mock_htcondor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
#!/usr/bin/env python

"""This module is a mock of htcondor.py, purely for testing
"""


class Schedd(object):
"""Mock of the `htcondor.Schedd` object
"""

_jobs = []

def query(self, constraints, attr_list=[], **kwargs):
x = {}
for con in constraints.split(' && '):
try:
a, b = con.split(' == ')
except ValueError:
break
else:
x[a] = eval(b)
match = []
def match(job):
print(job, x)
for key in x:
if x[key] != job[key]:
return False
return True
return iter(filter(match, self._jobs))

def history(self, *args, **kwargs):
return
113 changes: 113 additions & 0 deletions omicron/tests/test_condor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
# -*- coding: utf-8 -*-
# Copyright (C) Duncan Macleod (2017)
#
# This file is part of PyOmicron.
#
# PyOmicron is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# PyOmicron is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with PyOmicron. If not, see <http://www.gnu.org/licenses/>.

"""Tests for omicron.io
"""

# mock up condor modules for testing purposes
# (so we don't need to have htcondor installed)
import sys
import mock_htcondor
import mock_classad
sys.modules['htcondor'] = mock_htcondor
sys.modules['classad'] = mock_classad

import utils
from compat import (unittest, mock)

from omicron import condor


# -- mock utilities -----------------------------------------------------------

def mock_which(exe):
return '/path/to/executable'


def mock_shell_factory(output):
def shell(*args, **kwargs):
return output
return shell


def mock_schedd_factory(jobs):
Schedd = mock_htcondor.Schedd
Schedd._jobs = jobs
return Schedd


# -- tests --------------------------------------------------------------------

class CondorTests(unittest.TestCase):

def test_submit_dag(self):
shell_ = mock_shell_factory('1 job(s) submitted to cluster 12345')
with mock.patch('omicron.condor.which', mock_which):
with mock.patch('omicron.condor.shell', shell_):
dagid = condor.submit_dag('test.dag')
with utils.capture(condor.submit_dag, 'test.dag', '-append',
'+OmicronDAGMan="GW"') as output:
cmd = output.split('\n')[0]
self.assertEqual(cmd, '$ /path/to/executable -append '
'+OmicronDAGMan="GW" test.dag')
self.assertEqual(dagid, 12345)
shell_ = mock_shell_factory('Error')
with mock.patch('omicron.condor.which', mock_which):
with mock.patch('omicron.condor.shell', shell_):
self.assertRaises(AttributeError, condor.submit_dag, 'test.dag')

def test_find_jobs(self):
schedd_ = mock_schedd_factory([{'ClusterId': 1}, {'ClusterId': 2}])
with mock.patch('htcondor.Schedd', schedd_):
jobs = condor.find_jobs()
self.assertListEqual(jobs, [{'ClusterId': 1}, {'ClusterId': 2}])
jobs = condor.find_jobs(ClusterId=1)
self.assertListEqual(jobs, [{'ClusterId': 1}])
jobs = condor.find_jobs(ClusterId=3)
self.assertListEqual(jobs, [])

def test_find_job(self):
schedd_ = mock_schedd_factory([{'ClusterId': 1}, {'ClusterId': 2}])
with mock.patch('htcondor.Schedd', schedd_):
job = condor.find_job(ClusterId=1)
self.assertDictEqual(job, {'ClusterId': 1})
# check 0 jobs returned throws the right error
with self.assertRaises(RuntimeError) as e:
condor.find_job(ClusterId=3)
self.assertTrue(str(e.exception).startswith('No jobs found'))
# check multiple jobs returned throws the right error
with self.assertRaises(RuntimeError) as e:
condor.find_job()
self.assertTrue(str(e.exception).startswith('Multiple jobs found'))

def test_get_job_status(self):
schedd_ = mock_schedd_factory([{'ClusterId': 1, 'JobStatus': 4}])
with mock.patch('htcondor.Schedd', schedd_):
status = condor.get_job_status(1)
self.assertEqual(status, 4)

def test_dag_is_running(self):
self.assertFalse(condor.dag_is_running('test.dag'))
with mock.patch('os.path.isfile') as isfile:
isfile.return_value = True
self.assertTrue(condor.dag_is_running('test.dag'))
schedd_ = mock_schedd_factory(
[{'UserLog': 'test.dag.dagman.log'}])
with mock.patch('htcondor.Schedd', schedd_):
self.assertTrue(condor.dag_is_running('test.dag'))
self.assertFalse(condor.dag_is_running('test2.dag'))
Loading

0 comments on commit b5036c2

Please sign in to comment.