From 4a1b231c26146a87e2f5bcf01c2e9381d7123bcb Mon Sep 17 00:00:00 2001 From: Rodrigo V Honorato Date: Thu, 9 Dec 2021 12:12:42 +0100 Subject: [PATCH 01/12] implement HPC execution --- src/haddock/libs/libhpc.py | 196 ++++++++++++++++++ src/haddock/libs/libworkflow.py | 6 + src/haddock/modules/__init__.py | 7 +- .../modules/refinement/emref/__init__.py | 13 +- .../modules/refinement/flexref/__init__.py | 13 +- .../modules/refinement/mdref/__init__.py | 13 +- .../modules/sampling/rigidbody/__init__.py | 14 +- .../modules/topology/topoaa/__init__.py | 13 +- 8 files changed, 254 insertions(+), 21 deletions(-) create mode 100644 src/haddock/libs/libhpc.py diff --git a/src/haddock/libs/libhpc.py b/src/haddock/libs/libhpc.py new file mode 100644 index 000000000..833abbf5d --- /dev/null +++ b/src/haddock/libs/libhpc.py @@ -0,0 +1,196 @@ +"""Module in charge of running tasks in HPC.""" +from haddock import log +from pathlib import Path +import subprocess +import os +import shlex +import re +import time + +STATE_REGEX = r"JobState=(\w*)" + + +class HPCWorker: + """Defines the HPC Job.""" + + def __init__(self, tasks): + self.tasks = tasks + log.debug(f"HPCWorker ready with {len(self.tasks)}") + self.job_id = None + self.job_status = "unknown" + self.job_fname = "" + + def run(self, identifier): + """Execute the tasks.""" + job_file = self.prepare_job_file(self.tasks, identifier) + cmd = f"sbatch {job_file}" + p = subprocess.run(shlex.split(cmd), capture_output=True) + self.job_id = int(p.stdout.decode("utf-8").split()[-1]) + self.job_status = "submitted" + + def update_status(self): + """Retrieve the status of this worker.""" + cmd = f"scontrol show jobid -dd {self.job_id}" + p = subprocess.run(shlex.split(cmd), capture_output=True) + out = p.stdout.decode("utf-8") + # err = p.stderr.decode('utf-8') + if out: + # TODO: Maybe a regex here is overkill + # https://regex101.com/r/M2vbAc/1 + status = re.findall(STATE_REGEX, out)[0] + if status == "PENDING": + self.job_status = "submitted" + elif status == "RUNNING": + self.job_status = "running" + elif status == "SUSPENDED": + self.job_status = "hold" + elif status == "COMPLETING": + self.job_status = "running" + elif status == "COMPLETED": + self.job_status = "finished" + elif status == "FAILED": + self.job_status = "failed" + else: + self.job_status = "finished" + + return self.job_status + + def prepare_job_file(self, job_list, id): + job_name = "haddock3" + queue = "haddock" + moddir = job_list[0].modpath + module = job_list[0].cns_folder + run = job_list[0].config_path + toppar = job_list[0].toppar + module_name = job_list[0].modpath.name.split("_")[-1] + self.job_fname = Path(moddir, f"{module_name}_{id}.job") + out_fname = Path(moddir, f"{module_name}_{id}.out") + err_fname = Path(moddir, f"{module_name}_{id}.err") + + header = f"#!/bin/sh{os.linesep}" + header += f"#SBATCH -J {job_name}{os.linesep}" + header += f"#SBATCH -p {queue}{os.linesep}" + header += f"#SBATCH --nodes=1{os.linesep}" + header += f"#SBATCH --tasks-per-node=1{os.linesep}" + header += f"#SBATCH --output={out_fname}{os.linesep}" + header += f"#SBATCH --error={err_fname}{os.linesep}" + header += f"#SBATCH --workdir={moddir}{os.linesep}" + + envs = f"export MODDIR={moddir}{os.linesep}" + envs += f"export MODULE={module}{os.linesep}" + envs += f"export RUN={run}{os.linesep}" + envs += f"export TOPPAR={toppar}{os.linesep}" + + body = envs + + body += f"cd {moddir}" + os.linesep + for job in job_list: + cmd = ( + f"{job.cns_exec} < {job.input_file} > {job.output_file}" + 
f"{os.linesep}" + ) + body += cmd + + with open(self.job_fname, "w") as job_fh: + job_fh.write(header) + job_fh.write(body) + + return self.job_fname + + def cancel(self): + """Cancel the execution.""" + bypass_statuses = ["finished", "failed"] + if self.update_status() not in bypass_statuses: + log.info(f"Canceling {self.job_fname.name} - {self.job_id}") + cmd = f"scancel {self.job_id}" + _ = subprocess.run(shlex.split(cmd), capture_output=True) + + +class HPCScheduler: + """Schedules tasks to run in HPC.""" + + def __init__(self, task_list, queue_limit, concat): + self.num_tasks = len(task_list) + # FIXME: The defaults are hardcoded here + if not concat: + concat = 1 + if not queue_limit: + queue_limit = 100 + # ======= + self.queue_limit = queue_limit + self.concat = concat + + # split tasks according to concat level + if concat > 1: + log.info(f"Concatenating, each .job will produce {concat} models") + job_list = [ + task_list[i:i + concat] for i in range(0, len(task_list), concat) + ] + + self.worker_list = [HPCWorker(t) for t in job_list] + + log.debug(f"{self.num_tasks} HPC tasks ready.") + + def run(self): + """Run tasks in the Queue.""" + # split by maximum number of submission so we do it in batches + batch = [ + self.worker_list[i:i + self.queue_limit] + for i in range(0, len(self.worker_list), self.queue_limit) + ] + try: + for batch_num, worker_list in enumerate(batch, start=1): + log.info(f"> Running batch {batch_num}/{len(batch)}") + for i, worker in enumerate(worker_list, start=1): + worker.run(i) + + # check if those finished + completed = False + while not completed: + for worker in worker_list: + worker.update_status() + if worker.job_status != "finished": + log.info( + f">> {worker.job_fname.name}" + f" {worker.job_status}" + ) + + completed_count = sum( + w.job_status == "finished" for w in worker_list + ) + failed_count = sum( + w.job_status == "failed" for w in worker_list + ) + + per = ( + (completed_count + failed_count) / len(self.worker_list) + ) * 100 + log.info(f">> {per:.0f}% done") + if completed_count + failed_count == len(worker_list): + completed = True + else: + sleep_timer = 0 + if len(worker_list) < 10: + # this is a small batch, wait just a little + sleep_timer = 10 + elif len(worker_list) < 50: + # this is a bit larger, wait longer + sleep_timer = 30 + else: + # this can be large, wait more! + sleep_timer = 60 + log.info(f">> Waiting... 
({sleep_timer}s)") + time.sleep(sleep_timer) + log.info(f"> Batch {batch_num}/{len(batch)} done") + + except KeyboardInterrupt as err: + self.terminate() + raise err + + def terminate(self): + """Terminate all jobs in the queue in a controlled way.""" + log.info("Terminate signal recieved, removing jobs from the queue...") + for worker in self.worker_list: + worker.cancel() + + log.info("The jobs in the queue were terminated in a controlled way") diff --git a/src/haddock/libs/libworkflow.py b/src/haddock/libs/libworkflow.py index d8c6c6e6e..2aae64ec4 100644 --- a/src/haddock/libs/libworkflow.py +++ b/src/haddock/libs/libworkflow.py @@ -35,6 +35,9 @@ def __init__( run_dir=None, cns_exec=None, config_path=None, + mode=None, + concat=None, + queue_limit=None, **ig): # Create the list of steps contained in this workflow self.steps = [] @@ -46,6 +49,9 @@ def __init__( params.setdefault('ncores', ncores) params.setdefault('cns_exec', cns_exec) params.setdefault('config_path', config_path) + params.setdefault('mode', mode) + params.setdefault('concat', concat) + params.setdefault('queue_limit', queue_limit) try: _ = Step( diff --git a/src/haddock/modules/__init__.py b/src/haddock/modules/__init__.py index 05e4d6f45..efc7b8f93 100644 --- a/src/haddock/modules/__init__.py +++ b/src/haddock/modules/__init__.py @@ -24,7 +24,9 @@ values are their categories. Categories are the modules parent folders.""" -general_parameters_affecting_modules = {'ncores', 'cns_exec'} +general_parameters_affecting_modules = { + 'ncores', 'cns_exec', 'mode', 'concat', 'queue_limit' + } """These parameters are general parameters that may be applicable to modules specifically. Therefore, they should be considered as part of the "default" module's parameters. Usually, this set is used to filter parameters during @@ -96,6 +98,9 @@ def run(self, **params): self.update_params(**params) self.params.setdefault('ncores', None) self.params.setdefault('cns_exec', None) + self.params.setdefault('mode', None) + self.params.setdefault('concat', None) + self.params.setdefault('queue_limit', None) self._run() log.info(f'Module [{self.name}] finished.') diff --git a/src/haddock/modules/refinement/emref/__init__.py b/src/haddock/modules/refinement/emref/__init__.py index 5c97b9724..9a92b31d4 100644 --- a/src/haddock/modules/refinement/emref/__init__.py +++ b/src/haddock/modules/refinement/emref/__init__.py @@ -3,6 +3,7 @@ from haddock.gear.haddockmodel import HaddockModel from haddock.libs.libcns import prepare_cns_input, prepare_expected_pdb +from haddock.libs.libhpc import HPCScheduler from haddock.libs.libontology import ModuleIO from haddock.libs.libparallel import Scheduler from haddock.libs.libsubprocess import CNSJob @@ -84,11 +85,15 @@ def _run(self): idx += 1 - # Run CNS engine - self.log(f"Running CNS engine with {len(jobs)} jobs") - engine = Scheduler(jobs, ncores=self.params["ncores"]) + # Run CNS Jobs + self.log(f"Running CNS Jobs n={len(jobs)}") + if self.params['mode'] == 'hpc': + engine = HPCScheduler(jobs, queue_limit=self.params['queue_limit'], + concat=self.params["concat"]) + else: + engine = Scheduler(jobs, ncores=self.params['ncores']) engine.run() - self.log("CNS engine has finished") + self.log("CNS jobs have finished") # Get the weights needed for the CNS module _weight_keys = ("w_vdw", "w_elec", "w_desolv", "w_air", "w_bsa") diff --git a/src/haddock/modules/refinement/flexref/__init__.py b/src/haddock/modules/refinement/flexref/__init__.py index 929d6e4c6..2524deff9 100644 --- 
a/src/haddock/modules/refinement/flexref/__init__.py +++ b/src/haddock/modules/refinement/flexref/__init__.py @@ -3,6 +3,7 @@ from haddock.gear.haddockmodel import HaddockModel from haddock.libs.libcns import prepare_cns_input, prepare_expected_pdb +from haddock.libs.libhpc import HPCScheduler from haddock.libs.libontology import ModuleIO from haddock.libs.libparallel import Scheduler from haddock.libs.libsubprocess import CNSJob @@ -83,11 +84,15 @@ def _run(self): idx += 1 - # Run CNS engine - self.log(f"Running CNS engine with {len(jobs)} jobs") - engine = Scheduler(jobs, ncores=self.params["ncores"]) + # Run CNS Jobs + self.log(f"Running CNS Jobs n={len(jobs)}") + if self.params['mode'] == 'hpc': + engine = HPCScheduler(jobs, queue_limit=self.params['queue_limit'], + concat=self.params["concat"]) + else: + engine = Scheduler(jobs, ncores=self.params['ncores']) engine.run() - self.log("CNS engine has finished") + self.log("CNS jobs have finished") # Get the weights from the defaults _weight_keys = ("w_vdw", "w_elec", "w_desolv", "w_air", "w_bsa") diff --git a/src/haddock/modules/refinement/mdref/__init__.py b/src/haddock/modules/refinement/mdref/__init__.py index f2823099f..5d677a843 100644 --- a/src/haddock/modules/refinement/mdref/__init__.py +++ b/src/haddock/modules/refinement/mdref/__init__.py @@ -3,6 +3,7 @@ from haddock.gear.haddockmodel import HaddockModel from haddock.libs.libcns import prepare_cns_input, prepare_expected_pdb +from haddock.libs.libhpc import HPCScheduler from haddock.libs.libontology import ModuleIO from haddock.libs.libparallel import Scheduler from haddock.libs.libsubprocess import CNSJob @@ -81,11 +82,15 @@ def _run(self): idx += 1 - # Run CNS engine - self.log(f"Running CNS engine with {len(jobs)} jobs") - engine = Scheduler(jobs, ncores=self.params["ncores"]) + # Run CNS Jobs + self.log(f"Running CNS Jobs n={len(jobs)}") + if self.params['mode'] == 'hpc': + engine = HPCScheduler(jobs, queue_limit=self.params['queue_limit'], + concat=self.params["concat"]) + else: + engine = Scheduler(jobs, ncores=self.params['ncores']) engine.run() - self.log("CNS engine has finished") + self.log("CNS jobs have finished") # Get the weights from the defaults _weight_keys = ("w_vdw", "w_elec", "w_desolv", "w_air", "w_bsa") diff --git a/src/haddock/modules/sampling/rigidbody/__init__.py b/src/haddock/modules/sampling/rigidbody/__init__.py index 4a09c5c77..b9eb932b7 100644 --- a/src/haddock/modules/sampling/rigidbody/__init__.py +++ b/src/haddock/modules/sampling/rigidbody/__init__.py @@ -3,6 +3,7 @@ from haddock.gear.haddockmodel import HaddockModel from haddock.libs.libcns import prepare_cns_input +from haddock.libs.libhpc import HPCScheduler from haddock.libs.libontology import ModuleIO, PDBFile from haddock.libs.libparallel import Scheduler from haddock.libs.libsubprocess import CNSJob @@ -56,6 +57,7 @@ def _run(self): # Prepare the jobs idx = 1 structure_list = [] + self.log("Preparing jobs...") for combination in models_to_dock: for _i in range(sampling_factor): @@ -89,11 +91,15 @@ def _run(self): idx += 1 - # Run CNS engine - self.log(f"Running CNS engine with {len(jobs)} jobs") - engine = Scheduler(jobs, ncores=self.params["ncores"]) + # Run CNS Jobs + self.log(f"Running CNS Jobs n={len(jobs)}") + if self.params['mode'] == 'hpc': + engine = HPCScheduler(jobs, queue_limit=self.params['queue_limit'], + concat=self.params["concat"]) + else: + engine = Scheduler(jobs, ncores=self.params['ncores']) engine.run() - self.log("CNS engine has finished") + self.log("CNS jobs have 
finished") # Get the weights according to CNS parameters _weight_keys = ("w_vdw", "w_elec", "w_desolv", "w_air", "w_bsa") diff --git a/src/haddock/modules/topology/topoaa/__init__.py b/src/haddock/modules/topology/topoaa/__init__.py index 0ce017ec9..d5fd2a738 100644 --- a/src/haddock/modules/topology/topoaa/__init__.py +++ b/src/haddock/modules/topology/topoaa/__init__.py @@ -9,6 +9,7 @@ prepare_output, prepare_single_input, ) +from haddock.libs.libhpc import HPCScheduler from haddock.libs.libontology import Format, ModuleIO, PDBFile, TopologyFile from haddock.libs.libparallel import Scheduler from haddock.libs.libstructure import make_molecules @@ -146,11 +147,15 @@ def _run(self): jobs.append(job) - # Run CNS engine - self.log(f"Running CNS engine with {len(jobs)} jobs") - engine = Scheduler(jobs, ncores=self.params['ncores']) + # Run CNS Jobs + self.log(f"Running CNS Jobs n={len(jobs)}") + if self.params['mode'] == 'hpc': + engine = HPCScheduler(jobs, queue_limit=self.params['queue_limit'], + concat=self.params["concat"]) + else: + engine = Scheduler(jobs, ncores=self.params['ncores']) engine.run() - self.log("CNS engine has finished") + self.log("CNS jobs have finished") # Check for generated output, fail it not all expected files # are found From 25abb36bebccee02ff73ace08b14592dbd0fcd50 Mon Sep 17 00:00:00 2001 From: Rodrigo V Honorato Date: Thu, 9 Dec 2021 12:12:48 +0100 Subject: [PATCH 02/12] Create docking-protein-protein-hpc.cfg --- .../docking-protein-protein-hpc.cfg | 66 +++++++++++++++++++ 1 file changed, 66 insertions(+) create mode 100644 examples/docking-protein-protein/docking-protein-protein-hpc.cfg diff --git a/examples/docking-protein-protein/docking-protein-protein-hpc.cfg b/examples/docking-protein-protein/docking-protein-protein-hpc.cfg new file mode 100644 index 000000000..f2e53204b --- /dev/null +++ b/examples/docking-protein-protein/docking-protein-protein-hpc.cfg @@ -0,0 +1,66 @@ +# ==================================================================== +# Rigid-body docking example + +# directory in which the scoring will be done +run_dir = "run1" + +# ### +mode = 'hpc' +# concatenate models inside each job, concat = 5 each .job will produce 5 models +concat = 1 +# Limit the number of concurrent submissions to the queue +queue_limit = 100 +# cns_exec = "path/to/bin/cns" # optional +# ### + +# molecules to be docked +molecules = [ + "data/e2aP_1F3G.pdb", + "data/hpr_ensemble.pdb" + ] + +# ==================================================================== +# Parameters for each stage are defined below, prefer full paths +##################################################################### +# WARNING: THE PARAMETERS HERE ARE ILLUSTRATIVE +# THE WORKFLOW IS WORK-IN-PROGRESS +##################################################################### +[topoaa] +autohis = false +[topoaa.input.mol1] +nhisd = 0 +nhise = 1 +hise_1 = 75 +[topoaa.input.mol2] +nhisd = 1 +hisd_1 = 76 +nhise = 1 +hise_1 = 15 + +[rigidbody] +ambig_fname = 'data/e2a-hpr_air.tbl' +sampling = 1000 +noecv = true + +[caprieval] +reference = 'data/e2a-hpr_1GGR.pdb' + +[seletop] +select = 200 + +[flexref] +ambig_fname = 'data/e2a-hpr_air.tbl' +noecv = true + +[caprieval] +reference = 'data/e2a-hpr_1GGR.pdb' + +[emref] +ambig_fname = 'data/e2a-hpr_air.tbl' +noecv = true + +[caprieval] +reference = 'data/e2a-hpr_1GGR.pdb' + +# ==================================================================== + From 75ae72bbac9c7f9797fa587c20f1a12d265168a1 Mon Sep 17 00:00:00 2001 From: Rodrigo V Honorato Date: Thu, 9 Dec 
2021 12:57:37 +0100 Subject: [PATCH 03/12] add adaptive timer and fix typo in percentage calculation --- src/haddock/libs/libhpc.py | 59 ++++++++++++++++++++++++-------------- 1 file changed, 37 insertions(+), 22 deletions(-) diff --git a/src/haddock/libs/libhpc.py b/src/haddock/libs/libhpc.py index 833abbf5d..1f4261708 100644 --- a/src/haddock/libs/libhpc.py +++ b/src/haddock/libs/libhpc.py @@ -13,16 +13,17 @@ class HPCWorker: """Defines the HPC Job.""" - def __init__(self, tasks): + def __init__(self, tasks, num): self.tasks = tasks log.debug(f"HPCWorker ready with {len(self.tasks)}") self.job_id = None + self.job_num = num self.job_status = "unknown" self.job_fname = "" - def run(self, identifier): + def run(self): """Execute the tasks.""" - job_file = self.prepare_job_file(self.tasks, identifier) + job_file = self.prepare_job_file(self.tasks) cmd = f"sbatch {job_file}" p = subprocess.run(shlex.split(cmd), capture_output=True) self.job_id = int(p.stdout.decode("utf-8").split()[-1]) @@ -55,7 +56,7 @@ def update_status(self): return self.job_status - def prepare_job_file(self, job_list, id): + def prepare_job_file(self, job_list): job_name = "haddock3" queue = "haddock" moddir = job_list[0].modpath @@ -63,9 +64,9 @@ def prepare_job_file(self, job_list, id): run = job_list[0].config_path toppar = job_list[0].toppar module_name = job_list[0].modpath.name.split("_")[-1] - self.job_fname = Path(moddir, f"{module_name}_{id}.job") - out_fname = Path(moddir, f"{module_name}_{id}.out") - err_fname = Path(moddir, f"{module_name}_{id}.err") + self.job_fname = Path(moddir, f"{module_name}_{self.job_num}.job") + out_fname = Path(moddir, f"{module_name}_{self.job_num}.out") + err_fname = Path(moddir, f"{module_name}_{self.job_num}.err") header = f"#!/bin/sh{os.linesep}" header += f"#SBATCH -J {job_name}{os.linesep}" @@ -122,18 +123,24 @@ def __init__(self, task_list, queue_limit, concat): # split tasks according to concat level if concat > 1: - log.info(f"Concatenating, each .job will produce {concat} models") + log.info( + f"Concatenating, each .job will produce {concat} " + "(or less) models" + ) job_list = [ task_list[i:i + concat] for i in range(0, len(task_list), concat) ] - self.worker_list = [HPCWorker(t) for t in job_list] + self.worker_list = [ + HPCWorker(t, j) for j, t in enumerate(job_list, start=1) + ] log.debug(f"{self.num_tasks} HPC tasks ready.") def run(self): """Run tasks in the Queue.""" # split by maximum number of submission so we do it in batches + adaptive_l = [] batch = [ self.worker_list[i:i + self.queue_limit] for i in range(0, len(self.worker_list), self.queue_limit) @@ -141,8 +148,9 @@ def run(self): try: for batch_num, worker_list in enumerate(batch, start=1): log.info(f"> Running batch {batch_num}/{len(batch)}") - for i, worker in enumerate(worker_list, start=1): - worker.run(i) + start = time.time() + for worker in worker_list: + worker.run() # check if those finished completed = False @@ -163,23 +171,30 @@ def run(self): ) per = ( - (completed_count + failed_count) / len(self.worker_list) + (completed_count + failed_count) / self.num_tasks ) * 100 log.info(f">> {per:.0f}% done") if completed_count + failed_count == len(worker_list): completed = True + end = time.time() + elapsed = end - start + log.info(f">> Took {elapsed:.2f}s") + adaptive_l.append(elapsed) else: - sleep_timer = 0 - if len(worker_list) < 10: - # this is a small batch, wait just a little - sleep_timer = 10 - elif len(worker_list) < 50: - # this is a bit larger, wait longer - sleep_timer = 30 + if not 
adaptive_l: + # This is the first run, use pre-defined waits + if len(worker_list) < 10: + sleep_timer = 10 + elif len(worker_list) < 50: + sleep_timer = 30 + else: + sleep_timer = 60 else: - # this can be large, wait more! - sleep_timer = 60 - log.info(f">> Waiting... ({sleep_timer}s)") + # We already know how long it took, use the average + sleep_timer = round( + sum(adaptive_l) / len(adaptive_l) + ) + log.info(f">> Waiting... ({sleep_timer:.2f}s)") time.sleep(sleep_timer) log.info(f"> Batch {batch_num}/{len(batch)} done") From c1aef92c508ce5906c6de2c3a0a207e06c10ef50 Mon Sep 17 00:00:00 2001 From: Rodrigo V Honorato Date: Thu, 9 Dec 2021 13:31:22 +0100 Subject: [PATCH 04/12] Update libhpc.py --- src/haddock/libs/libhpc.py | 30 ++++++++++++++++-------------- 1 file changed, 16 insertions(+), 14 deletions(-) diff --git a/src/haddock/libs/libhpc.py b/src/haddock/libs/libhpc.py index 1f4261708..6171c698d 100644 --- a/src/haddock/libs/libhpc.py +++ b/src/haddock/libs/libhpc.py @@ -65,8 +65,8 @@ def prepare_job_file(self, job_list): toppar = job_list[0].toppar module_name = job_list[0].modpath.name.split("_")[-1] self.job_fname = Path(moddir, f"{module_name}_{self.job_num}.job") - out_fname = Path(moddir, f"{module_name}_{self.job_num}.out") - err_fname = Path(moddir, f"{module_name}_{self.job_num}.err") + out_fname = Path(moddir, f"{module_name}_{self.job_num}_job.out") + err_fname = Path(moddir, f"{module_name}_{self.job_num}_job.err") header = f"#!/bin/sh{os.linesep}" header += f"#SBATCH -J {job_name}{os.linesep}" @@ -141,21 +141,23 @@ def run(self): """Run tasks in the Queue.""" # split by maximum number of submission so we do it in batches adaptive_l = [] + completed_count = 0 + failed_count = 0 batch = [ self.worker_list[i:i + self.queue_limit] for i in range(0, len(self.worker_list), self.queue_limit) ] try: - for batch_num, worker_list in enumerate(batch, start=1): + for batch_num, job_list in enumerate(batch, start=1): log.info(f"> Running batch {batch_num}/{len(batch)}") start = time.time() - for worker in worker_list: + for worker in job_list: worker.run() # check if those finished completed = False while not completed: - for worker in worker_list: + for worker in job_list: worker.update_status() if worker.job_status != "finished": log.info( @@ -163,29 +165,29 @@ def run(self): f" {worker.job_status}" ) - completed_count = sum( - w.job_status == "finished" for w in worker_list + completed_count += sum( + w.job_status == "finished" for w in job_list ) - failed_count = sum( - w.job_status == "failed" for w in worker_list + failed_count += sum( + w.job_status == "failed" for w in job_list ) per = ( - (completed_count + failed_count) / self.num_tasks + (completed_count + failed_count) / len(self.worker_list) ) * 100 log.info(f">> {per:.0f}% done") - if completed_count + failed_count == len(worker_list): + if completed_count + failed_count == len(job_list): completed = True end = time.time() elapsed = end - start - log.info(f">> Took {elapsed:.2f}s") + log.info(f'>> Took {elapsed:.2f}s') adaptive_l.append(elapsed) else: if not adaptive_l: # This is the first run, use pre-defined waits - if len(worker_list) < 10: + if len(job_list) < 10: sleep_timer = 10 - elif len(worker_list) < 50: + elif len(job_list) < 50: sleep_timer = 30 else: sleep_timer = 60 From 98897e372c0d9e133c39fc276b50b44aaa4a4743 Mon Sep 17 00:00:00 2001 From: Rodrigo V Honorato Date: Thu, 9 Dec 2021 13:37:26 +0100 Subject: [PATCH 05/12] linting --- src/haddock/libs/libhpc.py | 11 +++++++---- 1 file changed, 7 
insertions(+), 4 deletions(-) diff --git a/src/haddock/libs/libhpc.py b/src/haddock/libs/libhpc.py index 6171c698d..eee68d019 100644 --- a/src/haddock/libs/libhpc.py +++ b/src/haddock/libs/libhpc.py @@ -1,11 +1,13 @@ """Module in charge of running tasks in HPC.""" -from haddock import log -from pathlib import Path -import subprocess import os -import shlex import re +import shlex +import subprocess import time +from pathlib import Path + +from haddock import log + STATE_REGEX = r"JobState=(\w*)" @@ -57,6 +59,7 @@ def update_status(self): return self.job_status def prepare_job_file(self, job_list): + """Prepare a .job file for SLURM.""" job_name = "haddock3" queue = "haddock" moddir = job_list[0].modpath From 958ea7c1d71faf8a81713ae921b87a427ec5e818 Mon Sep 17 00:00:00 2001 From: Rodrigo V Honorato Date: Thu, 9 Dec 2021 13:41:57 +0100 Subject: [PATCH 06/12] fix the complete check --- src/haddock/libs/libhpc.py | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/src/haddock/libs/libhpc.py b/src/haddock/libs/libhpc.py index eee68d019..855f80314 100644 --- a/src/haddock/libs/libhpc.py +++ b/src/haddock/libs/libhpc.py @@ -144,8 +144,6 @@ def run(self): """Run tasks in the Queue.""" # split by maximum number of submission so we do it in batches adaptive_l = [] - completed_count = 0 - failed_count = 0 batch = [ self.worker_list[i:i + self.queue_limit] for i in range(0, len(self.worker_list), self.queue_limit) @@ -168,10 +166,10 @@ def run(self): f" {worker.job_status}" ) - completed_count += sum( + completed_count = sum( w.job_status == "finished" for w in job_list ) - failed_count += sum( + failed_count = sum( w.job_status == "failed" for w in job_list ) From 572b66a2adcf35b9975f3861caddce7053cc8ba7 Mon Sep 17 00:00:00 2001 From: Rodrigo V Honorato Date: Thu, 9 Dec 2021 16:13:39 +0100 Subject: [PATCH 07/12] Update libhpc.py --- src/haddock/libs/libhpc.py | 22 ++++++++++------------ 1 file changed, 10 insertions(+), 12 deletions(-) diff --git a/src/haddock/libs/libhpc.py b/src/haddock/libs/libhpc.py index 855f80314..ba08d06e3 100644 --- a/src/haddock/libs/libhpc.py +++ b/src/haddock/libs/libhpc.py @@ -11,6 +11,15 @@ STATE_REGEX = r"JobState=(\w*)" +JOB_STATUS_DIC = { + "PENDING": "submitted", + "RUNNING": "running", + "SUSPENDED": "hold", + "COMPLETING": "running", + "COMPLETED": "finished", + "FAILED": "failed", + } + class HPCWorker: """Defines the HPC Job.""" @@ -41,18 +50,7 @@ def update_status(self): # TODO: Maybe a regex here is overkill # https://regex101.com/r/M2vbAc/1 status = re.findall(STATE_REGEX, out)[0] - if status == "PENDING": - self.job_status = "submitted" - elif status == "RUNNING": - self.job_status = "running" - elif status == "SUSPENDED": - self.job_status = "hold" - elif status == "COMPLETING": - self.job_status = "running" - elif status == "COMPLETED": - self.job_status = "finished" - elif status == "FAILED": - self.job_status = "failed" + self.job_status = JOB_STATUS_DIC[status] else: self.job_status = "finished" From e7cf9e2891a435f972c793d987916ba64662bf5d Mon Sep 17 00:00:00 2001 From: joaomcteixeira Date: Fri, 10 Dec 2021 13:47:13 +0100 Subject: [PATCH 08/12] Merging benchmark functionalities with hpc Also, modularized functions for better testing. 
TODO testing if it reproduces what @rvhonorate implemented --- src/haddock/clis/cli_bm.py | 90 +-------------- src/haddock/libs/libhpc.py | 229 ++++++++++++++++++++++++++++--------- 2 files changed, 174 insertions(+), 145 deletions(-) diff --git a/src/haddock/clis/cli_bm.py b/src/haddock/clis/cli_bm.py index f0fac7e95..331d28013 100644 --- a/src/haddock/clis/cli_bm.py +++ b/src/haddock/clis/cli_bm.py @@ -48,6 +48,7 @@ from pathlib import Path from haddock import log +from haddock.libs.libhpc import create_job_header_funcs # first character allowed for benchmark test cases, we use digits and @@ -312,88 +313,6 @@ def create_job( return job_header + job_body + job_tail -def create_torque_header( - job_name, - work_dir, - stdout_path, - stderr_path, - queue='medium', - ncores=48, - ): - """ - Create HADDOCK3 Alcazar job file. - - Parameters - ---------- - job_name : str - The name of the job. - - work_dir : pathlib.Path - The working dir of the example. That is, the directory where - `input`, `jobs`, and `logs` reside. Injected in `create_job_header`. - - **job_params - According to `job_setup`. - - Return - ------ - str - Torque-based job file for HADDOCK3 benchmarking. - """ - header = \ -f"""#!/usr/bin/env tcsh -#PBS -N {job_name} -#PBS -q {queue} -#PBS -l nodes=1:ppn={str(ncores)} -#PBS -S /bin/tcsh -#PBS -o {stdout_path} -#PBS -e {stderr_path} -#PBS -wd {work_dir} -""" - return header - - -def create_slurm_header( - job_name, - work_dir, - stdout_path, - stderr_path, - queue='medium', - ncores=48, - ): - """ - Create HADDOCK3 Slurm Batch job file. - - Parameters - ---------- - job_name : str - The name of the job. - - work_dir : pathlib.Path - The working dir of the example. That is, the directory where - `input`, `jobs`, and `logs` reside. Injected in `create_job_header`. - - **job_params - According to `job_setup`. - - Return - ------ - str - Slurm-based job file for HADDOCK3 benchmarking. - """ - header = \ -f"""#!/usr/bin/env bash -#SBATCH -J {job_name} -#SBATCH -p {queue} -#SBATCH --nodes=1 -#SBATCH --tasks-per-node={str(ncores)} -#SBATCH --output={stdout_path} -#SBATCH --error={stderr_path} -#SBATCH --workdir={work_dir} -""" - return header - - def setup_haddock3_job(available_flag, running_flag, conf_f): """ Write body for the job script. 
@@ -589,13 +508,6 @@ def make_daemon_job( # helper dictionaries -# the different job submission queues -create_job_header_funcs = { - 'torque': create_torque_header, - 'slurm': create_slurm_header, - } - - # the different scenarios covered benchmark_scenarios = { 'true-interface': create_cfg_scn_1, diff --git a/src/haddock/libs/libhpc.py b/src/haddock/libs/libhpc.py index ba08d06e3..65e4cd940 100644 --- a/src/haddock/libs/libhpc.py +++ b/src/haddock/libs/libhpc.py @@ -20,22 +20,66 @@ "FAILED": "failed", } +HPCScheduler_CONCAT_DEFAULT = 1 +HPCWorker_QUEUE_LIMIT_DEFAULT = 100 + class HPCWorker: """Defines the HPC Job.""" - def __init__(self, tasks, num): + def __init__( + self, + tasks, + num, + job_id=None, + queue_type='slurm', + ): self.tasks = tasks log.debug(f"HPCWorker ready with {len(self.tasks)}") - self.job_id = None self.job_num = num + self.job_id = job_id self.job_status = "unknown" - self.job_fname = "" + + self.moddir = tasks[0].modpath + self.module_name = self.moddir.name.split('_')[-1] + self.config_path = tasks[0].config_path + self.toppar = tasks[0].toppar + self.cns_folder = tasks[0].cns_folder + self.job_fname = Path(self.moddir, f'{self.module_name}_{num}.job') + self.queue_type = queue_type + + def prepare_job_file(self, queue_type='slurm'): + job_file_contents = create_job_header_funcs[queue_type]( + job_name='haddock3', + queue='haddock', + ncores=1, + work_dir=self.moddir, + stdout_path=self.job_fname.with_suffix('.out'), + stderr_path=self.job_fname.with_suffix('.err'), + ) + + job_file_contents += create_CNS_export_envvars( + MODDIR=self.moddir, + MODULE=self.cns_folder, + RUN=self.config_path, + TOPPAR=self.toppar, + ) + + job_file_contents += f"cd {self.moddir}{os.linesep}" + for job in self.tasks: + cmd = ( + f"{job.cns_exec} < {job.input_file} > {job.output_file}" + f"{os.linesep}" + ) + job_file_contents += cmd + + self.job_fname.write_text(job_file_contents) + def run(self): """Execute the tasks.""" - job_file = self.prepare_job_file(self.tasks) - cmd = f"sbatch {job_file}" + self.prepare_job_file(self.queue_type) + cmd = f"sbatch {self.job_fname}" p = subprocess.run(shlex.split(cmd), capture_output=True) self.job_id = int(p.stdout.decode("utf-8").split()[-1]) self.job_status = "submitted" @@ -56,52 +100,9 @@ def update_status(self): return self.job_status - def prepare_job_file(self, job_list): - """Prepare a .job file for SLURM.""" - job_name = "haddock3" - queue = "haddock" - moddir = job_list[0].modpath - module = job_list[0].cns_folder - run = job_list[0].config_path - toppar = job_list[0].toppar - module_name = job_list[0].modpath.name.split("_")[-1] - self.job_fname = Path(moddir, f"{module_name}_{self.job_num}.job") - out_fname = Path(moddir, f"{module_name}_{self.job_num}_job.out") - err_fname = Path(moddir, f"{module_name}_{self.job_num}_job.err") - - header = f"#!/bin/sh{os.linesep}" - header += f"#SBATCH -J {job_name}{os.linesep}" - header += f"#SBATCH -p {queue}{os.linesep}" - header += f"#SBATCH --nodes=1{os.linesep}" - header += f"#SBATCH --tasks-per-node=1{os.linesep}" - header += f"#SBATCH --output={out_fname}{os.linesep}" - header += f"#SBATCH --error={err_fname}{os.linesep}" - header += f"#SBATCH --workdir={moddir}{os.linesep}" - - envs = f"export MODDIR={moddir}{os.linesep}" - envs += f"export MODULE={module}{os.linesep}" - envs += f"export RUN={run}{os.linesep}" - envs += f"export TOPPAR={toppar}{os.linesep}" - - body = envs - - body += f"cd {moddir}" + os.linesep - for job in job_list: - cmd = ( - f"{job.cns_exec} < {job.input_file} > 
{job.output_file}" - f"{os.linesep}" - ) - body += cmd - - with open(self.job_fname, "w") as job_fh: - job_fh.write(header) - job_fh.write(body) - - return self.job_fname - def cancel(self): + def cancel(self, bypass_statuses=("finished", "failed")): """Cancel the execution.""" - bypass_statuses = ["finished", "failed"] if self.update_status() not in bypass_statuses: log.info(f"Canceling {self.job_fname.name} - {self.job_id}") cmd = f"scancel {self.job_id}" @@ -111,14 +112,13 @@ def cancel(self): class HPCScheduler: """Schedules tasks to run in HPC.""" - def __init__(self, task_list, queue_limit, concat): + def __init__( + self, + task_list, + queue_limit=HPCWorker_QUEUE_LIMIT_DEFAULT, + concat=HPCScheduler_CONCAT_DEFAULT, + ): self.num_tasks = len(task_list) - # FIXME: The defaults are hardcoded here - if not concat: - concat = 1 - if not queue_limit: - queue_limit = 100 - # ======= self.queue_limit = queue_limit self.concat = concat @@ -210,3 +210,120 @@ def terminate(self): worker.cancel() log.info("The jobs in the queue were terminated in a controlled way") + + +def create_slurm_header( + job_name='haddock3_slurm_job', + work_dir='.', + stdout_path='haddock3_job.out', + stderr_path='haddock3_job.err', + queue='medium', + ncores=48, + ): + """ + Create HADDOCK3 Slurm Batch job file. + + Parameters + ---------- + job_name : str + The name of the job. + + work_dir : pathlib.Path + The working dir of the example. That is, the directory where + `input`, `jobs`, and `logs` reside. Injected in `create_job_header`. + + **job_params + According to `job_setup`. + + Return + ------ + str + Slurm-based job file for HADDOCK3 benchmarking. + """ + header = \ +f"""#!/usr/bin/env bash +#SBATCH -J {job_name} +#SBATCH -p {queue} +#SBATCH --nodes=1 +#SBATCH --tasks-per-node={str(ncores)} +#SBATCH --output={stdout_path} +#SBATCH --error={stderr_path} +#SBATCH --workdir={work_dir} + +""" # noqa: E128 + return header + + +def create_torque_header( + job_name='haddock3_torque_job', + work_dir='.', + stdout_path='haddock3_job.out', + stderr_path='haddock3_job.err', + queue='medium', + ncores=48, + ): + """ + Create HADDOCK3 Alcazar job file. + + Parameters + ---------- + job_name : str + The name of the job. + + work_dir : pathlib.Path + The working dir of the example. That is, the directory where + `input`, `jobs`, and `logs` reside. Injected in `create_job_header`. + + **job_params + According to `job_setup`. + + Return + ------ + str + Torque-based job file for HADDOCK3 benchmarking. + """ + header = \ +f"""#!/usr/bin/env tcsh +#PBS -N {job_name} +#PBS -q {queue} +#PBS -l nodes=1:ppn={str(ncores)} +#PBS -S /bin/tcsh +#PBS -o {stdout_path} +#PBS -e {stderr_path} +#PBS -wd {work_dir} + +""" # noqa: E128 + return header + + +def create_CNS_export_envvars(**envvars): + """Create a string exporting envvars needed for CNS. + + Parameters + ---------- + envvars : dict + A dictionary containing envvariables where keys are var names + and values are the values. 
+ + Returns + ------- + str + In the form of: + export VAR1=VALUE1 + export VAR2=VALUE2 + export VAR3=VALUE3 + + """ + exports = os.linesep.join( + f'export {key.upper()}={value}' + for key, value in envvars.items() + ) + + return exports + os.linesep + os.linesep + + +# the different job submission queues +create_job_header_funcs = { + 'torque': create_torque_header, + 'slurm': create_slurm_header, + } From 3ac7ce93298def0cb6c216a389b96968946e1956 Mon Sep 17 00:00:00 2001 From: joaomcteixeira Date: Fri, 10 Dec 2021 14:08:19 +0100 Subject: [PATCH 09/12] add get_engine factory --- src/haddock/libs/libhpc.py | 2 +- src/haddock/libs/libworkflow.py | 6 +-- src/haddock/modules/__init__.py | 40 +++++++++++++++++++ .../modules/refinement/emref/__init__.py | 7 +--- .../modules/refinement/flexref/__init__.py | 7 +--- .../modules/refinement/mdref/__init__.py | 7 +--- .../modules/sampling/rigidbody/__init__.py | 7 +--- .../modules/topology/topoaa/__init__.py | 11 ++--- 8 files changed, 55 insertions(+), 32 deletions(-) diff --git a/src/haddock/libs/libhpc.py b/src/haddock/libs/libhpc.py index 65e4cd940..a2d7b8648 100644 --- a/src/haddock/libs/libhpc.py +++ b/src/haddock/libs/libhpc.py @@ -238,7 +238,7 @@ def create_slurm_header( Return ------ str - Slurm-based job file for HADDOCK3 benchmarking. + Slurm-based job file for HADDOCK3. """ header = \ f"""#!/usr/bin/env bash diff --git a/src/haddock/libs/libworkflow.py b/src/haddock/libs/libworkflow.py index 2aae64ec4..d6ee00c73 100644 --- a/src/haddock/libs/libworkflow.py +++ b/src/haddock/libs/libworkflow.py @@ -36,9 +36,9 @@ def __init__( cns_exec=None, config_path=None, mode=None, - concat=None, - queue_limit=None, - **ig): + concat=HPCScheduler_CONCAT_DEFAULT, + queue_limit=HPCWorker_QUEUE_LIMIT_DEFAULT, + **ignore): # Create the list of steps contained in this workflow self.steps = [] for num_stage, (stage_name, params) in enumerate(content.items()): diff --git a/src/haddock/modules/__init__.py b/src/haddock/modules/__init__.py index efc7b8f93..d2ec86ee0 100644 --- a/src/haddock/modules/__init__.py +++ b/src/haddock/modules/__init__.py @@ -2,13 +2,16 @@ import contextlib import os from abc import ABC, abstractmethod +from functools import partial from pathlib import Path from haddock import log as log from haddock.core.defaults import MODULE_IO_FILE from haddock.core.exceptions import StepError from haddock.gear.config_reader import read_config +from haddock.libs.libhpc import HPCScheduler from haddock.libs.libontology import ModuleIO +from haddock.libs.libparallel import Scheduler from haddock.libs.libutil import recursive_dict_update @@ -170,3 +173,40 @@ def working_directory(path): yield finally: os.chdir(prev_cwd) + + +def get_engine(mode, params): + """ + Create an engine to run the jobs. + + Parameters + ---------- + mode : str + The type of engine to create + + params : dict + A dictionary containing parameters for the engine. + `get_engine` will retrieve from `params` only those parameters + needed and ignore the others. + """ + # a bit of a factory pattern here + # this might end up in another module but for now its fine here + if mode == 'hpc': + return partial( + HPCScheduler, + queue_limit=params['queue_limit'], + concat=params['concat'], + ) + + elif mode == 'local': + return partial( + Scheduler, + ncores=params['ncores'], + ) + + else: + available_engines = ('hpc', 'local') + raise ValueError( + f"Scheduler `mode` {mode!r} not recognized. 
" + f"Available options are {', '.join(available_engines)}" + ) diff --git a/src/haddock/modules/refinement/emref/__init__.py b/src/haddock/modules/refinement/emref/__init__.py index 9a92b31d4..b89c997c0 100644 --- a/src/haddock/modules/refinement/emref/__init__.py +++ b/src/haddock/modules/refinement/emref/__init__.py @@ -87,11 +87,8 @@ def _run(self): # Run CNS Jobs self.log(f"Running CNS Jobs n={len(jobs)}") - if self.params['mode'] == 'hpc': - engine = HPCScheduler(jobs, queue_limit=self.params['queue_limit'], - concat=self.params["concat"]) - else: - engine = Scheduler(jobs, ncores=self.params['ncores']) + Engine = get_engine(self.params['mode'], self.params) + engine = Engine(jobs) engine.run() self.log("CNS jobs have finished") diff --git a/src/haddock/modules/refinement/flexref/__init__.py b/src/haddock/modules/refinement/flexref/__init__.py index 2524deff9..4daab90ad 100644 --- a/src/haddock/modules/refinement/flexref/__init__.py +++ b/src/haddock/modules/refinement/flexref/__init__.py @@ -86,11 +86,8 @@ def _run(self): # Run CNS Jobs self.log(f"Running CNS Jobs n={len(jobs)}") - if self.params['mode'] == 'hpc': - engine = HPCScheduler(jobs, queue_limit=self.params['queue_limit'], - concat=self.params["concat"]) - else: - engine = Scheduler(jobs, ncores=self.params['ncores']) + Engine = get_engine(self.params['mode'], self.params) + engine = Engine(jobs) engine.run() self.log("CNS jobs have finished") diff --git a/src/haddock/modules/refinement/mdref/__init__.py b/src/haddock/modules/refinement/mdref/__init__.py index 5d677a843..ec36b8a29 100644 --- a/src/haddock/modules/refinement/mdref/__init__.py +++ b/src/haddock/modules/refinement/mdref/__init__.py @@ -84,11 +84,8 @@ def _run(self): # Run CNS Jobs self.log(f"Running CNS Jobs n={len(jobs)}") - if self.params['mode'] == 'hpc': - engine = HPCScheduler(jobs, queue_limit=self.params['queue_limit'], - concat=self.params["concat"]) - else: - engine = Scheduler(jobs, ncores=self.params['ncores']) + Engine = get_engine(self.params['mode'], self.params) + engine = Engine(jobs) engine.run() self.log("CNS jobs have finished") diff --git a/src/haddock/modules/sampling/rigidbody/__init__.py b/src/haddock/modules/sampling/rigidbody/__init__.py index b9eb932b7..58c512b36 100644 --- a/src/haddock/modules/sampling/rigidbody/__init__.py +++ b/src/haddock/modules/sampling/rigidbody/__init__.py @@ -93,11 +93,8 @@ def _run(self): # Run CNS Jobs self.log(f"Running CNS Jobs n={len(jobs)}") - if self.params['mode'] == 'hpc': - engine = HPCScheduler(jobs, queue_limit=self.params['queue_limit'], - concat=self.params["concat"]) - else: - engine = Scheduler(jobs, ncores=self.params['ncores']) + Engine = get_engine(self.params['mode'], self.params) + engine = Engine(jobs) engine.run() self.log("CNS jobs have finished") diff --git a/src/haddock/modules/topology/topoaa/__init__.py b/src/haddock/modules/topology/topoaa/__init__.py index d5fd2a738..3e0bb6edd 100644 --- a/src/haddock/modules/topology/topoaa/__init__.py +++ b/src/haddock/modules/topology/topoaa/__init__.py @@ -9,12 +9,10 @@ prepare_output, prepare_single_input, ) -from haddock.libs.libhpc import HPCScheduler from haddock.libs.libontology import Format, ModuleIO, PDBFile, TopologyFile -from haddock.libs.libparallel import Scheduler from haddock.libs.libstructure import make_molecules from haddock.libs.libsubprocess import CNSJob -from haddock.modules import BaseHaddockModule +from haddock.modules import BaseHaddockModule, get_engine RECIPE_PATH = Path(__file__).resolve().parent @@ -149,11 +147,8 
@@ def _run(self): # Run CNS Jobs self.log(f"Running CNS Jobs n={len(jobs)}") - if self.params['mode'] == 'hpc': - engine = HPCScheduler(jobs, queue_limit=self.params['queue_limit'], - concat=self.params["concat"]) - else: - engine = Scheduler(jobs, ncores=self.params['ncores']) + Engine = get_engine(self.params['mode'], self.params) + engine = Engine(jobs) engine.run() self.log("CNS jobs have finished") From de97525c1dba1b2630e123a20ab31908f9a94f3f Mon Sep 17 00:00:00 2001 From: joaomcteixeira Date: Fri, 10 Dec 2021 14:14:48 +0100 Subject: [PATCH 10/12] correct imports --- src/haddock/libs/libworkflow.py | 4 ++++ src/haddock/modules/refinement/emref/__init__.py | 4 +--- src/haddock/modules/refinement/flexref/__init__.py | 4 +--- src/haddock/modules/refinement/mdref/__init__.py | 4 +--- src/haddock/modules/sampling/rigidbody/__init__.py | 4 +--- 5 files changed, 8 insertions(+), 12 deletions(-) diff --git a/src/haddock/libs/libworkflow.py b/src/haddock/libs/libworkflow.py index d6ee00c73..f7545de47 100644 --- a/src/haddock/libs/libworkflow.py +++ b/src/haddock/libs/libworkflow.py @@ -8,6 +8,10 @@ from haddock.core.exceptions import HaddockError, StepError from haddock.gear.config_reader import get_module_name from haddock.libs.libutil import zero_fill +from haddock.libs.libhpc import ( + HPCScheduler_CONCAT_DEFAULT, + HPCWorker_QUEUE_LIMIT_DEFAULT, + ) from haddock.modules import modules_category diff --git a/src/haddock/modules/refinement/emref/__init__.py b/src/haddock/modules/refinement/emref/__init__.py index b89c997c0..da5481e1f 100644 --- a/src/haddock/modules/refinement/emref/__init__.py +++ b/src/haddock/modules/refinement/emref/__init__.py @@ -3,11 +3,9 @@ from haddock.gear.haddockmodel import HaddockModel from haddock.libs.libcns import prepare_cns_input, prepare_expected_pdb -from haddock.libs.libhpc import HPCScheduler from haddock.libs.libontology import ModuleIO -from haddock.libs.libparallel import Scheduler from haddock.libs.libsubprocess import CNSJob -from haddock.modules import BaseHaddockModule +from haddock.modules import BaseHaddockModule, get_engine RECIPE_PATH = Path(__file__).resolve().parent diff --git a/src/haddock/modules/refinement/flexref/__init__.py b/src/haddock/modules/refinement/flexref/__init__.py index 4daab90ad..e7e79cbe7 100644 --- a/src/haddock/modules/refinement/flexref/__init__.py +++ b/src/haddock/modules/refinement/flexref/__init__.py @@ -3,11 +3,9 @@ from haddock.gear.haddockmodel import HaddockModel from haddock.libs.libcns import prepare_cns_input, prepare_expected_pdb -from haddock.libs.libhpc import HPCScheduler from haddock.libs.libontology import ModuleIO -from haddock.libs.libparallel import Scheduler from haddock.libs.libsubprocess import CNSJob -from haddock.modules import BaseHaddockModule +from haddock.modules import BaseHaddockModule, get_engine RECIPE_PATH = Path(__file__).resolve().parent diff --git a/src/haddock/modules/refinement/mdref/__init__.py b/src/haddock/modules/refinement/mdref/__init__.py index ec36b8a29..da0a3b6ae 100644 --- a/src/haddock/modules/refinement/mdref/__init__.py +++ b/src/haddock/modules/refinement/mdref/__init__.py @@ -3,11 +3,9 @@ from haddock.gear.haddockmodel import HaddockModel from haddock.libs.libcns import prepare_cns_input, prepare_expected_pdb -from haddock.libs.libhpc import HPCScheduler from haddock.libs.libontology import ModuleIO -from haddock.libs.libparallel import Scheduler from haddock.libs.libsubprocess import CNSJob -from haddock.modules import BaseHaddockModule +from haddock.modules 
import BaseHaddockModule, get_engine RECIPE_PATH = Path(__file__).resolve().parent diff --git a/src/haddock/modules/sampling/rigidbody/__init__.py b/src/haddock/modules/sampling/rigidbody/__init__.py index 58c512b36..e5c6e48d8 100644 --- a/src/haddock/modules/sampling/rigidbody/__init__.py +++ b/src/haddock/modules/sampling/rigidbody/__init__.py @@ -3,11 +3,9 @@ from haddock.gear.haddockmodel import HaddockModel from haddock.libs.libcns import prepare_cns_input -from haddock.libs.libhpc import HPCScheduler from haddock.libs.libontology import ModuleIO, PDBFile -from haddock.libs.libparallel import Scheduler from haddock.libs.libsubprocess import CNSJob -from haddock.modules import BaseHaddockModule +from haddock.modules import BaseHaddockModule, get_engine RECIPE_PATH = Path(__file__).resolve().parent From b1d3f19ae9eb9f010ccfc617b1342b32c9b7ef11 Mon Sep 17 00:00:00 2001 From: joaomcteixeira Date: Fri, 10 Dec 2021 15:58:38 +0100 Subject: [PATCH 11/12] lint --- src/haddock/clis/cli_bm.py | 4 ++-- src/haddock/libs/libhpc.py | 13 +++++++++++-- src/haddock/libs/libworkflow.py | 2 +- 3 files changed, 14 insertions(+), 5 deletions(-) diff --git a/src/haddock/clis/cli_bm.py b/src/haddock/clis/cli_bm.py index 331d28013..29ea2160d 100644 --- a/src/haddock/clis/cli_bm.py +++ b/src/haddock/clis/cli_bm.py @@ -486,8 +486,8 @@ def make_daemon_job( ): """Make a daemon-ready job.""" job_header = create_job_func( - job_name, - workdir, + job_name=job_name, + work_dir=workdir, stdout_path=stdout_path, stderr_path=stderr_path, queue=queue, diff --git a/src/haddock/libs/libhpc.py b/src/haddock/libs/libhpc.py index a2d7b8648..b4891f756 100644 --- a/src/haddock/libs/libhpc.py +++ b/src/haddock/libs/libhpc.py @@ -34,6 +34,16 @@ def __init__( job_id=None, queue_type='slurm', ): + """ + Define the HPC job. + + Parameters + ---------- + tasks : list of libs.libcns.CNSJob objects + + num : int + The number of the worker. 
+ """ self.tasks = tasks log.debug(f"HPCWorker ready with {len(self.tasks)}") self.job_num = num @@ -49,6 +59,7 @@ def __init__( self.queue_type = queue_type def prepare_job_file(self, queue_type='slurm'): + """Prepare the job file for all the jobs in the task list.""" job_file_contents = create_job_header_funcs[queue_type]( job_name='haddock3', queue='haddock', @@ -75,7 +86,6 @@ def prepare_job_file(self, queue_type='slurm'): self.job_fname.write_text(job_file_contents) - def run(self): """Execute the tasks.""" self.prepare_job_file(self.queue_type) @@ -100,7 +110,6 @@ def update_status(self): return self.job_status - def cancel(self, bypass_statuses=("finished", "failed")): """Cancel the execution.""" if self.update_status() not in bypass_statuses: diff --git a/src/haddock/libs/libworkflow.py b/src/haddock/libs/libworkflow.py index f7545de47..f648e938c 100644 --- a/src/haddock/libs/libworkflow.py +++ b/src/haddock/libs/libworkflow.py @@ -7,11 +7,11 @@ from haddock import log from haddock.core.exceptions import HaddockError, StepError from haddock.gear.config_reader import get_module_name -from haddock.libs.libutil import zero_fill from haddock.libs.libhpc import ( HPCScheduler_CONCAT_DEFAULT, HPCWorker_QUEUE_LIMIT_DEFAULT, ) +from haddock.libs.libutil import zero_fill from haddock.modules import modules_category From 17bf453c1d6b64a7a78f01e6522dc1af80ce76a0 Mon Sep 17 00:00:00 2001 From: joaomcteixeira Date: Fri, 10 Dec 2021 16:12:11 +0100 Subject: [PATCH 12/12] add default variable for mode --- src/haddock/libs/libworkflow.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/haddock/libs/libworkflow.py b/src/haddock/libs/libworkflow.py index f648e938c..42d056bfa 100644 --- a/src/haddock/libs/libworkflow.py +++ b/src/haddock/libs/libworkflow.py @@ -39,7 +39,7 @@ def __init__( run_dir=None, cns_exec=None, config_path=None, - mode=None, + mode='local', concat=HPCScheduler_CONCAT_DEFAULT, queue_limit=HPCWorker_QUEUE_LIMIT_DEFAULT, **ignore):
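
The list-slicing idiom in HPCScheduler does double duty: it first bundles `concat` tasks into each .job file, then cuts the resulting worker list into `queue_limit`-sized submission batches. Below is a minimal standalone sketch of that idiom; the task strings are placeholders for CNSJob objects, and the chunking here runs unconditionally, so concat = 1 also yields one-task chunks (in PATCH 01, job_list is only assigned inside the `if concat > 1` branch).

def chunk(items, size):
    """Split items into consecutive chunks of at most `size` elements."""
    return [items[i:i + size] for i in range(0, len(items), size)]


tasks = [f"task_{i}" for i in range(7)]   # stand-ins for CNSJob objects

job_list = chunk(tasks, 3)                # concat=3: up to 3 CNS runs per .job
assert job_list == [
    ["task_0", "task_1", "task_2"],
    ["task_3", "task_4", "task_5"],
    ["task_6"],
    ]

workers = [f"worker_{j}" for j in range(1, len(job_list) + 1)]
batches = chunk(workers, 2)               # queue_limit=2: 2 submissions per batch
assert batches == [["worker_1", "worker_2"], ["worker_3"]]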
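HPCWorker.run submits through sbatch and keeps the last whitespace-separated token of stdout as the job id, relying on sbatch answering with its usual "Submitted batch job <id>" line. A sketch of that parsing which runs without a cluster, substituting echo for the real sbatch call:

import shlex
import subprocess


def parse_job_id(sbatch_stdout):
    # sbatch normally answers "Submitted batch job <id>";
    # the worker keeps the last whitespace-separated token.
    return int(sbatch_stdout.decode("utf-8").split()[-1])


# Simulate the submission with echo instead of a real sbatch call.
p = subprocess.run(
    shlex.split("echo Submitted batch job 123456"),
    capture_output=True,
    )
assert parse_job_id(p.stdout) == 123456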
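PATCH 07 collapses the job-state if/elif ladder into JOB_STATUS_DIC. A sketch of the lookup against a made-up scontrol show jobid -dd fragment; note that the .get fallback below is an assumption that restores the ladder's final else (anything unlisted counted as finished), whereas the patched code indexes the dict directly and would raise KeyError on a state such as CANCELLED.

import re

STATE_REGEX = r"JobState=(\w*)"
JOB_STATUS_DIC = {
    "PENDING": "submitted",
    "RUNNING": "running",
    "SUSPENDED": "hold",
    "COMPLETING": "running",
    "COMPLETED": "finished",
    "FAILED": "failed",
    }

# Made-up fragment of what `scontrol show jobid -dd` prints.
out = "JobId=123456 JobName=haddock3 JobState=RUNNING Reason=None"

status = re.findall(STATE_REGEX, out)[0]
job_status = JOB_STATUS_DIC.get(status, "finished")  # assumed fallback, see above
assert job_status == "running"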
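The adaptive timer from PATCH 03 seeds the polling interval with batch-size defaults and, once at least one batch has completed, waits the mean elapsed batch time instead. The same logic extracted as a pure function for illustration:

def next_sleep(adaptive_l, batch_size):
    """Pick how long to wait before polling the queue again."""
    if not adaptive_l:
        # No batch has finished yet: fall back to size-based defaults.
        if batch_size < 10:
            return 10
        if batch_size < 50:
            return 30
        return 60
    # Otherwise wait roughly as long as previous batches took on average.
    return round(sum(adaptive_l) / len(adaptive_l))


assert next_sleep([], 5) == 10
assert next_sleep([], 100) == 60
assert next_sleep([120.0, 90.0], 100) == 105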
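After PATCH 06 the polling loop recomputes the finished and failed tallies from scratch on every pass and compares them against the current batch only; the cumulative += counters introduced in PATCH 04 kept growing across polls, which is what that fix reverts. The check, reduced to plain status strings:

def batch_done(statuses):
    """True when every job in the batch is either finished or failed."""
    completed = sum(s == "finished" for s in statuses)
    failed = sum(s == "failed" for s in statuses)
    return completed + failed == len(statuses)


assert not batch_done(["running", "finished", "failed"])
assert batch_done(["finished", "finished", "failed"])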
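create_CNS_export_envvars upper-cases the keyword names and emits one export line per variable, which is how the .job file hands MODDIR, MODULE, RUN and TOPPAR to the CNS scripts. The same logic, standalone and with placeholder paths:

import os


def create_CNS_export_envvars(**envvars):
    """Build one `export NAME=value` line per variable (as in PATCH 08)."""
    exports = os.linesep.join(
        f"export {key.upper()}={value}"
        for key, value in envvars.items()
        )
    return exports + os.linesep + os.linesep


block = create_CNS_export_envvars(
    moddir="/tmp/run1/1_rigidbody",   # placeholder paths
    run="/tmp/run1",
    )
assert "export MODDIR=/tmp/run1/1_rigidbody" in block
assert "export RUN=/tmp/run1" in block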
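get_engine returns a partially applied class rather than an instance, which is what lets every module run the same two lines, Engine = get_engine(...) followed by engine = Engine(jobs), regardless of backend. A self-contained sketch with dummy classes standing in for Scheduler and HPCScheduler:

from functools import partial


class LocalEngine:
    """Dummy stand-in for libs.libparallel.Scheduler."""

    def __init__(self, tasks, ncores=1):
        self.tasks, self.ncores = tasks, ncores


class QueueEngine:
    """Dummy stand-in for libs.libhpc.HPCScheduler."""

    def __init__(self, tasks, queue_limit=100, concat=1):
        self.tasks, self.queue_limit, self.concat = tasks, queue_limit, concat


def get_engine(mode, params):
    """Return a pre-configured engine class, mirroring PATCH 09."""
    if mode == "hpc":
        return partial(
            QueueEngine,
            queue_limit=params["queue_limit"],
            concat=params["concat"],
            )
    if mode == "local":
        return partial(LocalEngine, ncores=params["ncores"])
    raise ValueError(f"Scheduler `mode` {mode!r} not recognized.")


params = {"mode": "hpc", "queue_limit": 50, "concat": 5, "ncores": 4}
Engine = get_engine(params["mode"], params)
engine = Engine(["job_1", "job_2"])   # modules stay backend-agnostic
assert (engine.queue_limit, engine.concat) == (50, 5)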