Skip to content

Commit

Permalink
Normalise code with Black (snakemake default).
Browse files Browse the repository at this point in the history
  • Loading branch information
Michel Wortmann committed Feb 29, 2020
1 parent 5c9271e commit e391425
Show file tree
Hide file tree
Showing 4 changed files with 49 additions and 35 deletions.
1 change: 1 addition & 0 deletions .gitignore
@@ -0,0 +1 @@
__pycache__
30 changes: 18 additions & 12 deletions {{cookiecutter.profile_name}}/slurm-status.py
Expand Up @@ -5,6 +5,7 @@
import sys
import time
import logging

logger = logging.getLogger("__name__")

STATUS_ATTEMPTS = 20
Expand All @@ -14,7 +15,10 @@
for i in range(STATUS_ATTEMPTS):
try:
sacct_res = sp.check_output(shlex.split("sacct -P -b -j {} -n".format(jobid)))
res = {x.split("|")[0]: x.split("|")[1] for x in sacct_res.decode().strip().split("\n")}
res = {
x.split("|")[0]: x.split("|")[1]
for x in sacct_res.decode().strip().split("\n")
}
break
except sp.CalledProcessError as e:
logger.error("sacct process error")
Expand All @@ -23,7 +27,9 @@
pass
# Try getting job with scontrol instead in case sacct is misconfigured
try:
sctrl_res = sp.check_output(shlex.split("scontrol -o show job {}".format(jobid)))
sctrl_res = sp.check_output(
shlex.split("scontrol -o show job {}".format(jobid))
)
m = re.search("JobState=(\w+)", sctrl_res.decode())
res = {jobid: m.group(1)}
break
Expand All @@ -38,26 +44,26 @@

status = res[jobid]

if (status == "BOOT_FAIL"):
if status == "BOOT_FAIL":
print("failed")
elif (status == "OUT_OF_MEMORY"):
elif status == "OUT_OF_MEMORY":
print("failed")
elif (status.startswith("CANCELLED")):
elif status.startswith("CANCELLED"):
print("failed")
elif (status == "COMPLETED"):
elif status == "COMPLETED":
print("success")
elif (status == "DEADLINE"):
elif status == "DEADLINE":
print("failed")
elif (status == "FAILED"):
elif status == "FAILED":
print("failed")
elif (status == "NODE_FAIL"):
elif status == "NODE_FAIL":
print("failed")
elif (status == "PREEMPTED"):
elif status == "PREEMPTED":
print("failed")
elif (status == "TIMEOUT"):
elif status == "TIMEOUT":
print("failed")
# Unclear whether SUSPENDED should be treated as running or failed
elif (status == "SUSPENDED"):
elif status == "SUSPENDED":
print("failed")
else:
print("running")
4 changes: 2 additions & 2 deletions {{cookiecutter.profile_name}}/slurm-submit.py
Expand Up @@ -29,14 +29,14 @@
sbatch_options.update(slurm_utils.adjust_to_partition(**sbatch_options))

# 4) default_cluster_config for particular rule
sbatch_options.update(default_cluster_config.get(job_properties['rule'], {}))
sbatch_options.update(default_cluster_config.get(job_properties["rule"], {}))

# 5) cluster_config options
sbatch_options.update(job_properties.get("cluster", {}))


# ensure sbatch output dirs exist
for o in ('output', "error"):
for o in ("output", "error"):
slurm_utils.ensure_dirs_exist(sbatch_options[o]) if o in sbatch_options else None


Expand Down
49 changes: 28 additions & 21 deletions {{cookiecutter.profile_name}}/slurm_utils.py
Expand Up @@ -13,10 +13,10 @@
ADJUST_TO_PARTIION = bool("{{cookiecutter.adjust_to_partition}}")

RESOURCE_MAPPING = {
"time": ("time", "runtime", "walltime"),
"mem": ("mem", "mem_mb", "ram", "memory"),
"mem_per_cpu": ("mem_per_cpu", "mem_per_thread"),
}
"time": ("time", "runtime", "walltime"),
"mem": ("mem", "mem_mb", "ram", "memory"),
"mem_per_cpu": ("mem_per_cpu", "mem_per_thread"),
}


def parse_jobscript():
Expand All @@ -29,7 +29,7 @@ def parse_jobscript():
def sbatch_defaults():
"""Unpack SBATCH_DEFAULTS."""
d = SBATCH_DEFAULTS.split() if type(SBATCH_DEFAULTS) else SBATCH_DEFAULTS
args = {k.strip().strip('-'): v.strip() for k, v in [a.split("=") for a in d]}
args = {k.strip().strip("-"): v.strip() for k, v in [a.split("=") for a in d]}
return args


Expand Down Expand Up @@ -87,7 +87,7 @@ def adjust_to_partition(**arg_dict):
if not ADJUST_TO_PARTIION:
return adjusted_args

partition = arg_dict.get('partition', None) or _get_default_partition()
partition = arg_dict.get("partition", None) or _get_default_partition()
constraint = arg_dict.get("constraint", None)
ntasks = int(arg_dict.get("ntasks", 1))
nodes = int(arg_dict.get("nodes", 1))
Expand Down Expand Up @@ -138,34 +138,41 @@ def _get_cluster_configuration(partition):
"""Retrieve cluster configuration for a partition."""
# Retrieve partition info; we tacitly assume we only get one response
cmd = " ".join(
["sinfo -e -O \"partition,cpus,memory,time,size,maxcpuspernode\"",
"-h -p {}".format(partition)])
[
'sinfo -e -O "partition,cpus,memory,time,size,maxcpuspernode"',
"-h -p {}".format(partition),
]
)
res = subprocess.run(cmd, check=True, shell=True, stdout=subprocess.PIPE)
m = re.search("(?P<partition>\S+)\s+(?P<cpus>\d+)\s+(?P<memory>\S+)\s+((?P<days>\d+)-)?(?P<hours>\d+):(?P<minutes>\d+):(?P<seconds>\d+)\s+(?P<size>\S+)\s+(?P<maxcpus>\S+)",
res.stdout.decode())
m = re.search(
"(?P<partition>\S+)\s+(?P<cpus>\d+)\s+(?P<memory>\S+)\s+((?P<days>\d+)-)?(?P<hours>\d+):(?P<minutes>\d+):(?P<seconds>\d+)\s+(?P<size>\S+)\s+(?P<maxcpus>\S+)",
res.stdout.decode(),
)
d = m.groupdict()
if not 'days' in d or not d['days']:
d['days'] = 0
d["time"] = int(d['days']) * 24 * 60 + \
int(d['hours']) * 60 + int(d['minutes']) + \
math.ceil(int(d['seconds']) / 60)
if not "days" in d or not d["days"]:
d["days"] = 0
d["time"] = (
int(d["days"]) * 24 * 60
+ int(d["hours"]) * 60
+ int(d["minutes"])
+ math.ceil(int(d["seconds"]) / 60)
)
return d


def _get_features_and_memory(partition):
"""Retrieve features and memory for a partition in the cluster
configuration. """
cmd = " ".join(
["sinfo -e -O \"memory,features_act\"",
"-h -p {}".format(partition)])
cmd = " ".join(['sinfo -e -O "memory,features_act"', "-h -p {}".format(partition)])
res = subprocess.run(cmd, check=True, shell=True, stdout=subprocess.PIPE)
mem_feat = []
for x in res.stdout.decode().split("\n"):
if not re.search("^\d+", x):
continue
m = re.search("^(?P<mem>\d+)\s+(?P<feat>\S+)", x)
mem_feat.append({'mem': m.groupdict()["mem"],
'features': m.groupdict()["feat"].split(",")})
mem_feat.append(
{"mem": m.groupdict()["mem"], "features": m.groupdict()["feat"].split(",")}
)
return mem_feat


Expand All @@ -181,7 +188,7 @@ def _get_available_memory(mem_feat, constraints=None):
"""
if constraints is None:
return min([int(x['mem']) for x in mem_feat])
return min([int(x["mem"]) for x in mem_feat])
try:
constraint_set = set(constraints.split(","))
for x in mem_feat:
Expand Down

0 comments on commit e391425

Please sign in to comment.