Merge pull request #51 from tfesenbecker/master
Extension of job attributes and HTCondor job readout
eileen-kuehn committed Nov 2, 2019
2 parents 05ee74b + 9b56e5b commit 9e7173b
Showing 4 changed files with 182 additions and 13 deletions.
4 changes: 4 additions & 0 deletions lapis/job.py
@@ -17,6 +17,8 @@ class Job(object):
         "walltime",
         "requested_walltime",
         "queue_date",
+        "requested_inputfiles",
+        "used_inputfiles",
         "in_queue_since",
         "in_queue_until",
         "_name",
@@ -59,6 +61,8 @@ def __init__(
                 self.resources[key] = self.used_resources[key]
         self.walltime = used_resources.pop("walltime")
         self.requested_walltime = resources.pop("walltime", None)
+        self.requested_inputfiles = resources.pop("inputfiles", None)
+        self.used_inputfiles = used_resources.pop("inputfiles", None)
         self.queue_date = queue_date
         assert in_queue_since >= 0, "Queue time cannot be negative"
         self.in_queue_since = in_queue_since
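With the two new slots, input-file metadata travels with a job alongside the other requested and used resources. A minimal construction sketch, assuming the keyword arguments visible in this diff (resources, used_resources, in_queue_since) and purely illustrative values:

from lapis.job import Job

# Sketch only: __init__ pops "inputfiles" out of both dicts and stores
# the values on the two new slots.
job = Job(
    resources={
        "cores": 1,
        "walltime": 60,
        "inputfiles": {"a.root": {"filesize": 25000}},
    },
    used_resources={
        "cores": 0.5,
        "walltime": 100.0,
        "inputfiles": {"a.root": {"usedsize": 20000}},
    },
    in_queue_since=0,
)

assert job.requested_inputfiles == {"a.root": {"filesize": 25000}}
assert job.used_inputfiles == {"a.root": {"usedsize": 20000}}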
50 changes: 38 additions & 12 deletions lapis/job_io/htcondor.py
@@ -1,7 +1,9 @@
 import csv
+import json
 import logging
 
 from lapis.job import Job
+from copy import deepcopy
 
 
 def htcondor_job_reader(
@@ -31,36 +33,60 @@ def htcondor_job_reader(
         "DiskUsage_RAW": 1.024 / 1024 / 1024,
     },
 ):
-    htcondor_reader = csv.DictReader(iterable, delimiter=" ", quotechar="'")
-
-    for row in htcondor_reader:
-        if float(row[used_resource_name_mapping["walltime"]]) <= 0:
+    input_file_type = iterable.name.split(".")[-1].lower()
+    if input_file_type == "json":
+        htcondor_reader = json.load(iterable)
+    elif input_file_type == "csv":
+        htcondor_reader = csv.DictReader(iterable, delimiter=" ", quotechar="'")
+    else:
+        logging.getLogger("implementation").error(
+            "Invalid input file %s. Job input file can not be read." % iterable.name
+        )
+    for entry in htcondor_reader:
+        if float(entry[used_resource_name_mapping["walltime"]]) <= 0:
             logging.getLogger("implementation").warning(
-                "removed job from htcondor import (%s)", row
+                "removed job from htcondor import (%s)", entry
             )
             continue
         resources = {}
         for key, original_key in resource_name_mapping.items():
             try:
-                resources[key] = float(row[original_key]) * unit_conversion_mapping.get(
-                    original_key, 1
-                )
+                resources[key] = float(
+                    entry[original_key]
+                ) * unit_conversion_mapping.get(original_key, 1)
             except ValueError:
                 pass
+
         used_resources = {
             "cores": (
-                (float(row["RemoteSysCpu"]) + float(row["RemoteUserCpu"]))
-                / float(row[used_resource_name_mapping["walltime"]])
+                (float(entry["RemoteSysCpu"]) + float(entry["RemoteUserCpu"]))
+                / float(entry[used_resource_name_mapping["walltime"]])
             )
             * unit_conversion_mapping.get(used_resource_name_mapping["cores"], 1)
         }
         for key in ["memory", "walltime", "disk"]:
             original_key = used_resource_name_mapping[key]
             used_resources[key] = float(
-                row[original_key]
+                entry[original_key]
             ) * unit_conversion_mapping.get(original_key, 1)
+
+        try:
+            resources["inputfiles"] = deepcopy(entry["Inputfiles"])
+            used_resources["inputfiles"] = deepcopy(entry["Inputfiles"])
+            for filename, filespecs in entry["Inputfiles"].items():
+                if "usedsize" in filespecs:
+                    del resources["inputfiles"][filename]["usedsize"]
+                if "filesize" in filespecs:
+                    if "usedsize" not in filespecs:
+                        used_resources["inputfiles"][filename]["usedsize"] = filespecs[
+                            "filesize"
+                        ]
+                    del used_resources["inputfiles"][filename]["filesize"]
+
+        except KeyError:
+            pass
         yield Job(
             resources=resources,
             used_resources=used_resources,
-            queue_date=float(row[used_resource_name_mapping["queuetime"]]),
+            queue_date=float(entry[used_resource_name_mapping["queuetime"]]),
         )
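The Inputfiles handling duplicates the dictionary and prunes it in opposite directions: the requested copy keeps only filesize, while the used copy keeps only usedsize, falling back to filesize when no usedsize was recorded. A standalone recreation of that split, with sizes taken from the fixture below:

from copy import deepcopy

# One entry's Inputfiles block, as in the fixture added below.
entry_inputfiles = {"a.root": {"filesize": 25000, "usedsize": 20000}}

requested = deepcopy(entry_inputfiles)
used = deepcopy(entry_inputfiles)
for filename, filespecs in entry_inputfiles.items():
    if "usedsize" in filespecs:
        del requested[filename]["usedsize"]  # requests carry only filesize
    if "filesize" in filespecs:
        if "usedsize" not in filespecs:
            used[filename]["usedsize"] = filespecs["filesize"]  # fallback
        del used[filename]["filesize"]  # usage carries only usedsize

assert requested == {"a.root": {"filesize": 25000}}
assert used == {"a.root": {"usedsize": 20000}}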
113 changes: 113 additions & 0 deletions lapis_tests/data/job_list_minimal.json
@@ -0,0 +1,113 @@
[
{
"QDate": 1567169672,
"RequestCpus": 1,
"RequestWalltime": 60,
"RequestMemory": 2000,
"RequestDisk": 6000000,
"RemoteWallClockTime": 100.0,
"'Number of Allocated Processors'": 1,
"MemoryUsage": 2867,
"DiskUsage_RAW": 41898,
"RemoteSysCpu": 10.0,
"RemoteUserCpu": 40.0,
"CPUEfficiency": 0.7,
"Inputfiles": {
"a.root": {
"filesize": 25000,
"usedsize": 20000
},
"b.root": {
"filesize": 25000,
"usedsize": 20000
},
"c.root": {
"filesize": 25000,
"usedsize": 20000
}
}
},
{
"QDate": 1567155456,
"RequestCpus": 1,
"RequestWalltime": 60,
"RequestMemory": 2000,
"RequestDisk": 6000000,
"RemoteWallClockTime": 77.0,
"'Number of Allocated Processors'": 1,
"MemoryUsage": 1207,
"DiskUsage_RAW": 45562,
"RemoteSysCpu": 7.0,
"RemoteUserCpu": 50.0,
"CPUEfficiency": 0.7,
"Inputfiles": {
"a.root": {
"filesize": 25000,
"usedsize": 20000
},
"b.root": {
"filesize": 25000,
"usedsize": 20000
},
"c.root": {
"filesize": 25000,
"usedsize": 20000
}
}
},
{
"QDate": 1567155456,
"RequestCpus": 1,
"RequestWalltime": 60,
"RequestMemory": 2000,
"RequestDisk": 6000000,
"RemoteWallClockTime": 63.0,
"'Number of Allocated Processors'": 1,
"MemoryUsage": 1207,
"DiskUsage_RAW": 45562,
"RemoteSysCpu": 3.0,
"RemoteUserCpu": 40.0,
"Inputfiles": {
"a.root": {
"filesize": 25000,
"usedsize": 20000
},
"b.root": {
"filesize": 25000,
"usedsize": 20000
},
"c.root": {
"filesize": 25000,
"usedsize": 20000
}
}
},
{
"QDate": 1567155456,
"RequestCpus": 1,
"RequestWalltime": 60,
"RequestMemory": 2000,
"RequestDisk": 6000000,
"RemoteWallClockTime": -100.0,
"'Number of Allocated Processors'": 1,
"MemoryUsage": 1207,
"DiskUsage_RAW": 45562,
"RemoteSysCpu": 7.0,
"RemoteUserCpu": 40.3,
"CPUEfficiency": 4.7,
"Inputfiles": {
"a.root": {
"filesize": 25000,
"usedsize": 20000
},
"b.root": {
"filesize": 25000,
"usedsize": 20000
},
"c.root": {
"filesize": 25000,
"usedsize": 20000
}
}
}
]
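The fourth entry's RemoteWallClockTime of -100.0 is deliberate: it exercises the reader's walltime filter, which the updated test below counts on. For the first entry, the reader's average-cores computation works out as a quick check:

# (RemoteSysCpu + RemoteUserCpu) / RemoteWallClockTime, per the reader above
cores = (10.0 + 40.0) / 100.0
assert cores == 0.5  # average cores used over the job's walltime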
28 changes: 27 additions & 1 deletion lapis_tests/job_io/test_htcondor.py
@@ -1,5 +1,5 @@
 import os
-
+import json
 from lapis.job_io.htcondor import htcondor_job_reader
 
 
@@ -19,3 +19,29 @@ def test_simple_read(self):
             # ensure that one job was removed by importer (wrong walltime given)
             lines = sum(1 for _ in input_file)
             assert jobs == (lines - 2)
+
+    def test_read_with_inputfiles(self):
+        with open(
+            os.path.join(
+                os.path.dirname(__file__), "..", "data", "job_list_minimal.json"
+            )
+        ) as input_file:
+            jobs = 0
+            for job in htcondor_job_reader(input_file):
+                assert job is not None
+                jobs += 1
+                if "inputfiles" in job.resources.keys():
+                    assert "filesize" in job.resources["inputfiles"].keys()
+                if "inputfiles" in job.used_resources.keys():
+                    assert "usedsize" in job.used_resources["inputfiles"].keys()
+
+            assert jobs > 0
+
+        with open(
+            os.path.join(
+                os.path.dirname(__file__), "..", "data", "job_list_minimal.json"
+            )
+        ) as input_file:
+            readout = json.load(input_file)
+            lines = sum(1 for _ in readout)
+            assert jobs == (lines - 1)
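Mirroring the new test's accounting: json.load yields the fixture's four entries, the reader drops the one with negative walltime, hence jobs == lines - 1. A standalone sketch, with the fixture path assumed relative to the repository root:

import json

from lapis.job_io.htcondor import htcondor_job_reader

FIXTURE = "lapis_tests/data/job_list_minimal.json"  # assumed run from repo root

with open(FIXTURE) as f:
    total_entries = len(json.load(f))  # four dicts in the fixture above

with open(FIXTURE) as f:
    imported = sum(1 for _ in htcondor_job_reader(f))  # filtered entry dropped

assert total_entries == 4
assert imported == total_entries - 1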
