Implement HPC execution #192

Merged: 13 commits, Dec 10, 2021
66 changes: 66 additions & 0 deletions examples/docking-protein-protein/docking-protein-protein-hpc.cfg
@@ -0,0 +1,66 @@
# ====================================================================
# Rigid-body docking example

# directory in which the docking run will be done
run_dir = "run1"

# ###
mode = 'hpc'
# concatenate models inside each job; e.g. with concat = 5, each .job file will produce 5 models
concat = 1
# Limit the number of concurrent submissions to the queue
queue_limit = 100
# cns_exec = "path/to/bin/cns" # optional
# ###

# molecules to be docked
molecules = [
"data/e2aP_1F3G.pdb",
"data/hpr_ensemble.pdb"
]

# ====================================================================
# Parameters for each stage are defined below, prefer full paths
#####################################################################
# WARNING: THE PARAMETERS HERE ARE ILLUSTRATIVE
# THE WORKFLOW IS WORK-IN-PROGRESS
#####################################################################
[topoaa]
autohis = false
[topoaa.input.mol1]
nhisd = 0
nhise = 1
hise_1 = 75
[topoaa.input.mol2]
nhisd = 1
hisd_1 = 76
nhise = 1
hise_1 = 15

[rigidbody]
ambig_fname = 'data/e2a-hpr_air.tbl'
sampling = 1000
noecv = true

[caprieval]
reference = 'data/e2a-hpr_1GGR.pdb'

[seletop]
select = 200

[flexref]
ambig_fname = 'data/e2a-hpr_air.tbl'
noecv = true

[caprieval]
reference = 'data/e2a-hpr_1GGR.pdb'

[emref]
ambig_fname = 'data/e2a-hpr_air.tbl'
noecv = true

[caprieval]
reference = 'data/e2a-hpr_1GGR.pdb'

# ====================================================================
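A minimal sketch (editorial, not part of the PR) of how the mode, concat and queue_limit settings above play together, following the chunking logic in HPCScheduler further down: concat sets how many CNS runs are concatenated into a single .job file, and queue_limit caps how many .job files are submitted to SLURM per batch.

```python
# Illustrative arithmetic only; values taken from this example config.
import math

n_models = 1000     # e.g. a step producing 1000 CNS jobs ([rigidbody] sampling here)
concat = 1          # CNS runs concatenated per .job file
queue_limit = 100   # .job files submitted per batch

n_job_files = math.ceil(n_models / concat)        # -> 1000 .job files
n_batches = math.ceil(n_job_files / queue_limit)  # -> 10 submission batches
print(n_job_files, n_batches)
```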

214 changes: 214 additions & 0 deletions src/haddock/libs/libhpc.py
@@ -0,0 +1,214 @@
"""Module in charge of running tasks in HPC."""
import os
import re
import shlex
import subprocess
import time
from pathlib import Path

from haddock import log


STATE_REGEX = r"JobState=(\w*)"


class HPCWorker:
"""Defines the HPC Job."""

def __init__(self, tasks, num):
self.tasks = tasks
log.debug(f"HPCWorker ready with {len(self.tasks)} tasks")
self.job_id = None
self.job_num = num
self.job_status = "unknown"
self.job_fname = ""

def run(self):
"""Execute the tasks."""
job_file = self.prepare_job_file(self.tasks)
cmd = f"sbatch {job_file}"
p = subprocess.run(shlex.split(cmd), capture_output=True)
self.job_id = int(p.stdout.decode("utf-8").split()[-1])
self.job_status = "submitted"

def update_status(self):
"""Retrieve the status of this worker."""
cmd = f"scontrol show jobid -dd {self.job_id}"
p = subprocess.run(shlex.split(cmd), capture_output=True)
out = p.stdout.decode("utf-8")
# err = p.stderr.decode('utf-8')
if out:
# TODO: Maybe a regex here is overkill
# https://regex101.com/r/M2vbAc/1
status = re.findall(STATE_REGEX, out)[0]
if status == "PENDING":
self.job_status = "submitted"
elif status == "RUNNING":
self.job_status = "running"
elif status == "SUSPENDED":
self.job_status = "hold"
elif status == "COMPLETING":
self.job_status = "running"
elif status == "COMPLETED":
self.job_status = "finished"
elif status == "FAILED":
self.job_status = "failed"
else:
self.job_status = "finished"

return self.job_status

def prepare_job_file(self, job_list):
"""Prepare a .job file for SLURM."""
job_name = "haddock3"
queue = "haddock"
moddir = job_list[0].modpath
module = job_list[0].cns_folder
run = job_list[0].config_path
toppar = job_list[0].toppar
module_name = job_list[0].modpath.name.split("_")[-1]
self.job_fname = Path(moddir, f"{module_name}_{self.job_num}.job")
out_fname = Path(moddir, f"{module_name}_{self.job_num}_job.out")
err_fname = Path(moddir, f"{module_name}_{self.job_num}_job.err")

header = f"#!/bin/sh{os.linesep}"
header += f"#SBATCH -J {job_name}{os.linesep}"
header += f"#SBATCH -p {queue}{os.linesep}"
header += f"#SBATCH --nodes=1{os.linesep}"
header += f"#SBATCH --tasks-per-node=1{os.linesep}"
header += f"#SBATCH --output={out_fname}{os.linesep}"
header += f"#SBATCH --error={err_fname}{os.linesep}"
header += f"#SBATCH --workdir={moddir}{os.linesep}"

envs = f"export MODDIR={moddir}{os.linesep}"
envs += f"export MODULE={module}{os.linesep}"
envs += f"export RUN={run}{os.linesep}"
envs += f"export TOPPAR={toppar}{os.linesep}"

body = envs

body += f"cd {moddir}" + os.linesep
for job in job_list:
cmd = (
f"{job.cns_exec} < {job.input_file} > {job.output_file}"
f"{os.linesep}"
)
body += cmd

with open(self.job_fname, "w") as job_fh:
job_fh.write(header)
job_fh.write(body)

return self.job_fname

def cancel(self):
"""Cancel the execution."""
bypass_statuses = ["finished", "failed"]
if self.update_status() not in bypass_statuses:
log.info(f"Canceling {self.job_fname.name} - {self.job_id}")
cmd = f"scancel {self.job_id}"
_ = subprocess.run(shlex.split(cmd), capture_output=True)


class HPCScheduler:
"""Schedules tasks to run in HPC."""

def __init__(self, task_list, queue_limit, concat):
self.num_tasks = len(task_list)
# FIXME: The defaults are hardcoded here
if not concat:
concat = 1
if not queue_limit:
queue_limit = 100
# =======
self.queue_limit = queue_limit
self.concat = concat

# split tasks according to concat level
if concat > 1:
log.info(
f"Concatenating, each .job will produce {concat} "
"(or fewer) models"
)
job_list = [
task_list[i:i + concat] for i in range(0, len(task_list), concat)
]

self.worker_list = [
HPCWorker(t, j) for j, t in enumerate(job_list, start=1)
]

log.debug(f"{self.num_tasks} HPC tasks ready.")

def run(self):
"""Run tasks in the Queue."""
# split by the maximum number of submissions so we run in batches
adaptive_l = []
batch = [
self.worker_list[i:i + self.queue_limit]
for i in range(0, len(self.worker_list), self.queue_limit)
]
try:
for batch_num, job_list in enumerate(batch, start=1):
log.info(f"> Running batch {batch_num}/{len(batch)}")
start = time.time()
for worker in job_list:
worker.run()

# check if those finished
completed = False
while not completed:
for worker in job_list:
worker.update_status()
if worker.job_status != "finished":
log.info(
f">> {worker.job_fname.name}"
f" {worker.job_status}"
)

completed_count = sum(
w.job_status == "finished" for w in job_list
)
failed_count = sum(
w.job_status == "failed" for w in job_list
)

per = (
(completed_count + failed_count) / len(self.worker_list)
) * 100
log.info(f">> {per:.0f}% done")
if completed_count + failed_count == len(job_list):
completed = True
end = time.time()
elapsed = end - start
log.info(f'>> Took {elapsed:.2f}s')
adaptive_l.append(elapsed)
else:
if not adaptive_l:
# This is the first run, use pre-defined waits
if len(job_list) < 10:
sleep_timer = 10
elif len(job_list) < 50:
sleep_timer = 30
else:
sleep_timer = 60
else:
# We already know how long it took, use the average
sleep_timer = round(
sum(adaptive_l) / len(adaptive_l)
)
log.info(f">> Waiting... ({sleep_timer:.2f}s)")
time.sleep(sleep_timer)
log.info(f"> Batch {batch_num}/{len(batch)} done")

except KeyboardInterrupt as err:
self.terminate()
raise err

def terminate(self):
"""Terminate all jobs in the queue in a controlled way."""
log.info("Terminate signal received, removing jobs from the queue...")
for worker in self.worker_list:
worker.cancel()

log.info("The jobs in the queue were terminated in a controlled way")
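A small, self-contained check (editorial sketch, not part of the PR) of the two parsing steps HPCWorker relies on: the job id is taken from the last token of sbatch's stdout, and the job state is pulled from scontrol output with STATE_REGEX. The sample strings below are made up but follow the usual shape of SLURM's replies.

```python
import re

STATE_REGEX = r"JobState=(\w*)"

# Hypothetical sbatch reply of the usual form "Submitted batch job <id>"
sbatch_stdout = "Submitted batch job 123456\n"
job_id = int(sbatch_stdout.split()[-1])           # -> 123456, as in HPCWorker.run()

# Hypothetical fragment of "scontrol show jobid -dd <id>" output
scontrol_out = "JobId=123456 JobName=haddock3 JobState=RUNNING Reason=None"
state = re.findall(STATE_REGEX, scontrol_out)[0]  # -> "RUNNING"
print(job_id, state)
```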
6 changes: 6 additions & 0 deletions src/haddock/libs/libworkflow.py
@@ -35,6 +35,9 @@ def __init__(
run_dir=None,
cns_exec=None,
config_path=None,
mode=None,
concat=None,
queue_limit=None,
**ig):
# Create the list of steps contained in this workflow
self.steps = []
@@ -46,6 +49,9 @@ def __init__
params.setdefault('ncores', ncores)
params.setdefault('cns_exec', cns_exec)
params.setdefault('config_path', config_path)
params.setdefault('mode', mode)
params.setdefault('concat', concat)
params.setdefault('queue_limit', queue_limit)

try:
_ = Step(
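For clarity, a minimal illustration (not additional PR code) of what the new setdefault calls do: a step only inherits the workflow-level mode, concat and queue_limit when it does not define its own values.

```python
# Simplified stand-in for one step's parameter dict (names are illustrative).
step_params = {"queue_limit": 10}                 # the step overrides one value

workflow_defaults = {"mode": "hpc", "concat": 1, "queue_limit": 100}
for key, value in workflow_defaults.items():
    step_params.setdefault(key, value)

print(step_params["mode"])         # 'hpc' -> inherited from the workflow level
print(step_params["queue_limit"])  # 10   -> the step's own value is kept
```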
7 changes: 6 additions & 1 deletion src/haddock/modules/__init__.py
@@ -24,7 +24,9 @@
values are their categories. Categories are the modules parent folders."""


general_parameters_affecting_modules = {'ncores', 'cns_exec'}
general_parameters_affecting_modules = {
'ncores', 'cns_exec', 'mode', 'concat', 'queue_limit'
}
"""These parameters are general parameters that may be applicable to modules
specifically. Therefore, they should be considered as part of the "default"
module's parameters. Usually, this set is used to filter parameters during
@@ -96,6 +98,9 @@ def run(self, **params):
self.update_params(**params)
self.params.setdefault('ncores', None)
self.params.setdefault('cns_exec', None)
self.params.setdefault('mode', None)
self.params.setdefault('concat', None)
self.params.setdefault('queue_limit', None)
self._run()
log.info(f'Module [{self.name}] finished.')

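A hypothetical sketch (not from the codebase) of the kind of filtering the docstring refers to: splitting a parameter dict into the general, workflow-wide keys and the module-specific ones.

```python
general_parameters_affecting_modules = {
    "ncores", "cns_exec", "mode", "concat", "queue_limit"}

params = {"mode": "hpc", "queue_limit": 100, "sampling": 1000}
general = {k: v for k, v in params.items()
           if k in general_parameters_affecting_modules}
specific = {k: v for k, v in params.items()
            if k not in general_parameters_affecting_modules}

print(general)   # {'mode': 'hpc', 'queue_limit': 100}
print(specific)  # {'sampling': 1000}
```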
13 changes: 9 additions & 4 deletions src/haddock/modules/refinement/emref/__init__.py
@@ -3,6 +3,7 @@

from haddock.gear.haddockmodel import HaddockModel
from haddock.libs.libcns import prepare_cns_input, prepare_expected_pdb
from haddock.libs.libhpc import HPCScheduler
from haddock.libs.libontology import ModuleIO
from haddock.libs.libparallel import Scheduler
from haddock.libs.libsubprocess import CNSJob
@@ -84,11 +85,15 @@ def _run(self):

idx += 1

# Run CNS engine
self.log(f"Running CNS engine with {len(jobs)} jobs")
engine = Scheduler(jobs, ncores=self.params["ncores"])
# Run CNS Jobs
self.log(f"Running CNS Jobs n={len(jobs)}")
if self.params['mode'] == 'hpc':
engine = HPCScheduler(jobs, queue_limit=self.params['queue_limit'],
concat=self.params["concat"])
else:
engine = Scheduler(jobs, ncores=self.params['ncores'])
engine.run()
self.log("CNS engine has finished")
self.log("CNS jobs have finished")

# Get the weights needed for the CNS module
_weight_keys = ("w_vdw", "w_elec", "w_desolv", "w_air", "w_bsa")
13 changes: 9 additions & 4 deletions src/haddock/modules/refinement/flexref/__init__.py
@@ -3,6 +3,7 @@

from haddock.gear.haddockmodel import HaddockModel
from haddock.libs.libcns import prepare_cns_input, prepare_expected_pdb
from haddock.libs.libhpc import HPCScheduler
from haddock.libs.libontology import ModuleIO
from haddock.libs.libparallel import Scheduler
from haddock.libs.libsubprocess import CNSJob
@@ -83,11 +84,15 @@ def _run(self):

idx += 1

# Run CNS engine
self.log(f"Running CNS engine with {len(jobs)} jobs")
engine = Scheduler(jobs, ncores=self.params["ncores"])
# Run CNS Jobs
self.log(f"Running CNS Jobs n={len(jobs)}")
if self.params['mode'] == 'hpc':
engine = HPCScheduler(jobs, queue_limit=self.params['queue_limit'],
concat=self.params["concat"])
else:
engine = Scheduler(jobs, ncores=self.params['ncores'])
engine.run()
self.log("CNS engine has finished")
self.log("CNS jobs have finished")

# Get the weights from the defaults
_weight_keys = ("w_vdw", "w_elec", "w_desolv", "w_air", "w_bsa")
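Both refinement modules now select the execution engine the same way; below is a condensed, editorial restatement of that branch (select_engine is an illustrative helper name, not part of the PR).

```python
from haddock.libs.libhpc import HPCScheduler
from haddock.libs.libparallel import Scheduler


def select_engine(jobs, params):
    """Pick the CNS execution engine from the 'mode' parameter."""
    if params.get("mode") == "hpc":
        # Submit the CNS jobs to SLURM, batched by queue_limit and concat.
        return HPCScheduler(jobs,
                            queue_limit=params["queue_limit"],
                            concat=params["concat"])
    # Default: run locally with the multiprocessing-based Scheduler.
    return Scheduler(jobs, ncores=params["ncores"])
```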