Skip to content

Commit

Permalink
Merge pull request #95 from marcmengel/more_dag_fixups
Browse files Browse the repository at this point in the history
Fixes for issue #94...
  • Loading branch information
marcmengel committed Aug 30, 2022
2 parents 5e24c66 + cee616b commit 6d152fb
Show file tree
Hide file tree
Showing 18 changed files with 85 additions and 14 deletions.
9 changes: 6 additions & 3 deletions bin/jobsub_submit
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,6 @@ def main():
d1 = os.path.join(PREFIX, "templates", "simple")
d2 = os.path.join(PREFIX, "templates", "dag")
parse_dagnabbit(d1, varg, submitdir, schedd_name)
varg["N"] = 1
render_files(d2, varg, submitdir, dlist=[d2, submitdir])
if not varg.get("no_submit", False):
os.chdir(varg["submitdir"])
Expand All @@ -210,8 +209,13 @@ def main():
do_dataset_defaults(varg)
d1 = os.path.join(PREFIX, "templates", "dataset_dag")
d2 = f"{PREFIX}/templates/simple"
# so we render the simple area (d2) with -N 1 because
# we are making a loop of 1..N in th dataset_dag area
# otherwise we get N submissions of N jobs -> N^2 jobs...
saveN = varg["N"]
varg["N"] = "1"
render_files(d2, varg, submitdir, dlist=[d1, d2])
varg["N"] = 1
varg["N"] = saveN
render_files(d1, varg, submitdir, dlist=[d1, d2, submitdir])
if not varg.get("no_submit", False):
os.chdir(varg["submitdir"])
Expand All @@ -220,7 +224,6 @@ def main():
d1 = os.path.join(PREFIX, "templates", "maxconcurrent_dag")
d2 = os.path.join(PREFIX, "templates", "simple")
render_files(d2, varg, submitdir, dlist=[d1, d2])
varg["N"] = 1
render_files(d1, varg, submitdir, dlist=[d1, d2, varg["dest"]])
if not varg.get("no_submit", False):
os.chdir(varg["submitdir"])
Expand Down
2 changes: 2 additions & 0 deletions lib/dagnabbit.py
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,8 @@ def parse_dagnabbit(
) as csf:
csf.write(jinja_env.get_template("simple.sh").render(**thesevalues))
of.write(f"JOB {name} {name}.cmd\n")
of.write(f'VARS {name} JOBSUBJOBSECTION="{count}" nodename="$(JOB)"')

if in_serial:
if last_serial:
of.write(f"PARENT {last_serial} CHILD {name}\n")
Expand Down
1 change: 1 addition & 0 deletions lib/get_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ def get_parser() -> argparse.ArgumentParser:
parser.add_argument(
"--dataset-definition",
"--dataset_definition",
"--dataset",
help="SAM dataset definition used in a Directed Acyclic Graph (DAG)",
)
parser.add_argument("--debug", type=int, default=0, help="Turn on debugging")
Expand Down
5 changes: 1 addition & 4 deletions templates/dataset_dag/dagbegin.cmd
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,7 @@ transfer_error = True
transfer_executable= True
when_to_transfer_output = ON_EXIT_OR_EVICT
transfer_output_files = .empty_file
{%if cpu is defined and cpu %}request_cpus = {{cpu}}{%endif%}
{%if memory is defined and memory %}request_memory = {{memory}}{%endif%}
{%if disk is defined and disk %}request_disk = {{disk}}KB{%endif%}
{%if OS is defined and OS %}+DesiredOS={{OS}}{%endif%}
request_memory = 100mb
+JobsubClientDN="{{clientdn}}"
+JobsubClientIpAddress="{{ipaddr}}"
+Owner="{{user}}"
Expand Down
5 changes: 1 addition & 4 deletions templates/dataset_dag/dagend.cmd
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,7 @@ transfer_error = True
transfer_executable= True
transfer_output_files=.empty_file
when_to_transfer_output = ON_EXIT_OR_EVICT
{%if cpu is defined and cpu %}request_cpus = {{cpu}}{%endif%}
{%if memory is defined and memory %}request_memory = {{memory}}{%endif%}
{%if disk is defined and disk %}request_disk = {{disk}}KB{%endif%}
{%if OS is defined and OS %}+DesiredOS={{OS}}{%endif%}
request_memory = 100mb
+JobsubClientDN="{{clientdn}}"
+JobsubClientIpAddress="{{ipaddr}}"
+Owner="{{user}}"
Expand Down
2 changes: 2 additions & 0 deletions templates/dataset_dag/dataset.dag
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ JOB SAM_START dagbegin.cmd

{%for i in range(N) %}
JOB WORKER_{{i}} simple.cmd
VARS WORKER_{{i}} JOBSUBJOBSECTION="{{i}}" nodename="$(JOB)"

PARENT SAM_START CHILD WORKER_{{i}}
PARENT WORKER_{{i}} CHILD SAM_END
{%endfor%}
Expand Down
7 changes: 6 additions & 1 deletion templates/simple/simple.cmd
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,12 @@ arguments = {{exe_arguments|join(" ")}}
output = {{filebase}}.out
error = {{filebase}}.err
log = {{filebase}}.log
environment = CLUSTER=$(Cluster);PROCESS=$(Process);CONDOR_TMP={{outdir}};BEARER_TOKEN_FILE=.condor_creds/{{group}}.use;CONDOR_EXEC=/tmp;DAGMANJOBID=$(DAGManJobId);GRID_USER={{user}};JOBSUBJOBID=$(CLUSTER).$(PROCESS)@{{schedd}};EXPERIMENT={{group}};{{environment|join(';')}}

{%if not (( dag is defined and dag ) or (dataset_dag is defined and dataset_dag)) %}
JOBSUBJOBSECTION=$(Process)
{%endif%}

environment = CLUSTER=$(Cluster);PROCESS=$(Process);JOBSUBJOBSECTION=$(JOBSUBJOBSECTION);CONDOR_TMP={{outdir}};BEARER_TOKEN_FILE=.condor_creds/{{group}}.use;CONDOR_EXEC=/tmp;DAGMANJOBID=$(DAGManJobId);GRID_USER={{user}};JOBSUBJOBID=$(CLUSTER).$(PROCESS)@{{schedd}};EXPERIMENT={{group}};{{environment|join(';')}}
rank = Mips / 2 + Memory
job_lease_duration = 3600
notification = Never
Expand Down
4 changes: 4 additions & 0 deletions tests/pytest.ini
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
[pytest]
markers=
unit
integration
4 changes: 4 additions & 0 deletions tests/test_condor_unit.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,25 +115,29 @@ class TestCondorUnit:

# lib/condor.py routines...

@pytest.mark.unit
def test_get_schedd_1(self):
"""make sure we get our test schedd back with test_vargs"""
schedd = condor.get_schedd(TestUnit.test_vargs)
print("got schedd: {0}".format(schedd))
print("schedd name: {0}".format(schedd["Name"]))
assert schedd["Name"] == TestUnit.test_schedd

@pytest.mark.unit
def test_load_submit_file_1(self, get_submit_file):
"""make sure load_submit_file result has bits of the submit file"""
res = condor.load_submit_file(get_submit_file)
assert str(res[0]).find("universe = vanilla") >= 0
assert str(res[0]).find("executable = /bin/true") >= 0

@pytest.mark.unit
def test_submit_1(self, get_submit_file, needs_credentials):
"""actually submit a job with condor_submit"""
res = condor.submit(get_submit_file, TestUnit.test_vargs, TestUnit.test_schedd)
print("got: ", res)
assert res

@pytest.mark.unit
def test_submit_dag_1(self, get_dag_file, needs_credentials):
"""actually submit a dag with condor_submit_dag"""
# XXX fix me
Expand Down
1 change: 1 addition & 0 deletions tests/test_creds_unit.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ class TestCredUnit:

# lib/creds.py routines...

@pytest.mark.unit
def test_get_creds_1(self):
"""get credentials, make sure the credentials files returned
exist"""
Expand Down
2 changes: 2 additions & 0 deletions tests/test_dagnabbit_unit.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import os
import sys
import pytest

#
# we assume everwhere our current directory is in the package
Expand All @@ -26,6 +27,7 @@ class TestDagnabbitUnit:
# lib/dagnabbit.py tests
#

@pytest.mark.unit
def test_parse_dagnabbit_dagTest(self):
"""test dagnabbit parser on old jobsub dagTest example"""
self.do_one_dagnabbit(
Expand Down
12 changes: 12 additions & 0 deletions tests/test_fake_ifdh_unit.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,30 +17,35 @@
import fake_ifdh


@pytest.mark.unit
def test_getTmp():
if os.environ.get("TMPDIR", None):
del os.environ["TMPDIR"]
res = fake_ifdh.getTmp()
assert res == "/tmp"


@pytest.mark.unit
def test_getTmp_override():
os.environ["TMPDIR"] = "/var/tmp"
res = fake_ifdh.getTmp()
assert res == "/var/tmp"


@pytest.mark.unit
def test_getExp_GROUP():
os.environ["GROUP"] = "samdev"
res = fake_ifdh.getExp()
assert res == "samdev"


@pytest.mark.unit
def test_getRole():
res = fake_ifdh.getRole()
assert res == fake_ifdh.DEFAULT_ROLE


@pytest.mark.unit
def test_getRole_override():
override_role = "Hamburgler"
res = fake_ifdh.getRole(override_role)
Expand All @@ -59,34 +64,40 @@ def fermilab_token(clear_token):
return fake_ifdh.getToken("Analysis")


@pytest.mark.unit
def test_checkToken_fail():
tokenfile = "/dev/null"
with pytest.raises(ValueError):
res = fake_ifdh.checkToken(tokenfile)


@pytest.mark.unit
def test_checkToken_success(fermilab_token):
res = fake_ifdh.checkToken(fermilab_token)
assert res


@pytest.mark.unit
def test_getToken_good(clear_token, fermilab_token):
assert os.path.exists(fermilab_token)


@pytest.mark.unit
def test_getToken_fail(clear_token):
with pytest.raises(PermissionError):
os.environ["GROUP"] = "bozo"
fake_ifdh.getToken("Analysis")


@pytest.mark.unit
def test_getProxy_good(clear_token):

os.environ["GROUP"] = "fermilab"
proxy = fake_ifdh.getProxy("Analysis")
assert os.path.exists(proxy)


@pytest.mark.unit
def test_getProxy_fail(clear_token):
try:
os.environ["GROUP"] = "bozo"
Expand All @@ -97,6 +108,7 @@ def test_getProxy_fail(clear_token):
assert False


@pytest.mark.unit
def test_cp():
dest = __file__ + ".copy"
fake_ifdh.cp(__file__, dest)
Expand Down
3 changes: 3 additions & 0 deletions tests/test_get_parser_unit.py
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,7 @@ class TestGetParserUnit:

# lib/get_parser.py routines...

@pytest.mark.unit
def test_get_parser_small(self):
"""
Try a few common arguments on a get_parser() generated parser
Expand All @@ -185,6 +186,7 @@ def test_get_parser_small(self):
assert "SAM_EXPERIMENT" in res.environment
assert res.group == TestUnit.test_group

@pytest.mark.unit
def test_check_all_test_args(self, find_all_arguments, all_test_args):
# make sure we have a test argument for all the arguments in
# the source, and that we find all the arguments in the source
Expand All @@ -202,6 +204,7 @@ def test_check_all_test_args(self, find_all_arguments, all_test_args):
arg = arg.lstrip("-")
assert arg in allargs

@pytest.mark.unit
def test_get_parser_all(self, find_all_arguments, all_test_args):
"""
Validate an all arguments list
Expand Down
5 changes: 5 additions & 0 deletions tests/test_jobsub_submit_unit.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,15 @@ class TestJobsubSubmitUnit:

# jobsub_submit functions

@pytest.mark.unit
def test_get_basefiles_1(self):
"""test the get_basefiles routine on our source directory,
we should be in it"""
dlist = [os.path.dirname(__file__)]
fl = jobsub_submit.get_basefiles(dlist)
assert os.path.basename(__file__) in fl

@pytest.mark.unit
def test_render_files_1(self):
"""test render files on the dataset_dag directory"""
srcdir = os.path.dirname(os.path.dirname(__file__)) + "/templates/dataset_dag"
Expand All @@ -47,6 +49,7 @@ def test_render_files_1(self):
jobsub_submit.render_files(srcdir, args, dest)
assert os.path.exists("%s/dagbegin.cmd" % dest)

@pytest.mark.unit
def test_render_files_undefined_vars(self, tmp_path):
"""Test rendering files when a template variable is undefined.
Should raise jinja2.exceptions.UndefinedError
Expand All @@ -58,11 +61,13 @@ def test_render_files_undefined_vars(self, tmp_path):
with pytest.raises(exceptions.UndefinedError, match="is undefined"):
jobsub_submit.render_files(srcdir, args, dest)

@pytest.mark.unit
def test_cleanup_1(self):
# cleanup doesn't actually do anything right now...
jobsub_submit.cleanup("")
assert True

@pytest.mark.unit
def test_do_dataset_defaults_1(self):
"""make sure do_dataset_defaults sets arguments its supposed to"""
varg = TestUnit.test_vargs.copy()
Expand Down
3 changes: 3 additions & 0 deletions tests/test_packages_unit.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import os
import sys
import pytest

#
# we assume everwhere our current directory is in the package
Expand All @@ -22,6 +23,7 @@ class TestPackagesUnit:

# lib/packages.py routines

@pytest.mark.unit
def test_pkg_find_1(self):
"""make sure we can find the poms_client ups package"""
sp1 = sys.path.copy()
Expand All @@ -32,6 +34,7 @@ def test_pkg_find_1(self):
assert os.path.exists(os.environ["POMS_CLIENT_DIR"])
__import__("poms_client")

@pytest.mark.unit
def test_pkg_orig_env_1(self):
"""make sure orig_env puts the environment back"""
packages.pkg_find("poms_client", "-g poms41")
Expand Down
Loading

0 comments on commit 6d152fb

Please sign in to comment.