Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
46cb952
feat(hpc): add HPCDaskManager for SLURM cluster orchestration
harryswift01 May 12, 2026
8edd7d8
feat(hpc): include `dask-jobqueue` within `pyproject.toml`
harryswift01 May 12, 2026
0935f93
feat(hpc): include range for `dask-jobqueue` within `pyproject.toml`
harryswift01 May 12, 2026
4052e45
feat(hpc): fix dask importing warning
harryswift01 May 12, 2026
cd9241e
feat(parallel): add parallel helper functions
harryswift01 May 18, 2026
6135273
Merge remote-tracking branch 'origin/main' into 306-feature-dask-base…
harryswift01 May 18, 2026
98d95e1
Merge remote-tracking branch 'origin/main' into 306-feature-dask-base…
harryswift01 May 19, 2026
c2ac3e5
Merge remote-tracking branch 'origin/main' into 306-dask-parallel-imp…
harryswift01 May 22, 2026
e689e00
feat(parallel): wire frame-level Dask execution into workflow
harryswift01 May 27, 2026
1ee825c
tests(unit): tests for dask HPC integration
harryswift01 May 29, 2026
cbd9dca
tests(unit): additional unit tests for parallel frame execution
harryswift01 May 29, 2026
f8223b6
tests(unit): add additional tests for parallel and combine level dag …
harryswift01 May 29, 2026
b7cb020
tests(unit): add additional tests to argsparse for dask introduction
harryswift01 May 29, 2026
ee0b464
feat(parallel): add configurable Dask frame execution
harryswift01 May 29, 2026
89eb097
feat(parallel): add configurable Dask frame execution
harryswift01 May 29, 2026
978a57e
feat(parallel): add configurable Dask frame execution
harryswift01 May 29, 2026
5055b68
feat(parallel): add Dask frame execution and SLURM support
harryswift01 May 29, 2026
b3ef78f
tests(unit): ensure unit tests reference correct CodeEntropy SLURM
harryswift01 May 29, 2026
bf372a3
docs(parallel): document Dask and SLURM arguments
harryswift01 May 29, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
138 changes: 138 additions & 0 deletions CodeEntropy/config/argparse.py
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,96 @@ class ArgSpec:
help="Type of neighbor search to use."
"Default RAD; grid search is also available",
),
"parallel_frames": ArgSpec(
type=bool,
default=False,
help="Execute frame-local covariance calculations in parallel using Dask.",
),
"use_dask": ArgSpec(
type=bool,
default=False,
help="Enable local Dask frame parallelism.",
),
"dask_workers": ArgSpec(
type=int,
default=None,
help="Number of local Dask worker processes.",
),
"dask_threads_per_worker": ArgSpec(
type=int,
default=1,
help="Threads per local Dask worker. Use 1 for MDAnalysis trajectory safety.",
),
"hpc": ArgSpec(
type=bool,
default=False,
help="Use a SLURM-backed Dask cluster for parallel frame execution.",
),
"hpc_account": ArgSpec(
type=str,
default=None,
help="SLURM account/project code.",
),
"hpc_qos": ArgSpec(
type=str,
default=None,
help="Optional SLURM QoS.",
),
"hpc_constraint": ArgSpec(
type=str,
default=None,
help="Optional SLURM node constraint.",
),
"hpc_queue": ArgSpec(
type=str,
default=None,
help="SLURM partition/queue.",
),
"hpc_cores": ArgSpec(
type=int,
default=1,
help="Number of CPU cores per Dask worker job.",
),
"hpc_processes": ArgSpec(
type=int,
default=1,
help="Number of Dask worker processes per SLURM job.",
),
"hpc_memory": ArgSpec(
type=str,
default="4GB",
help="Memory requested per Dask worker job.",
),
"hpc_walltime": ArgSpec(
type=str,
default="01:00:00",
help="Walltime for each Dask worker job, formatted as HH:MM:SS.",
),
"hpc_nodes": ArgSpec(
type=int,
default=1,
help="Number of SLURM Dask worker jobs to launch.",
),
"submit": ArgSpec(
type=bool,
default=False,
help="Submit a master SLURM job instead of running immediately.",
),
"conda_path": ArgSpec(
type=str,
default="conda",
help="Path to conda executable used by SLURM worker prologue.",
),
"conda_exec": ArgSpec(
type=str,
default="conda",
help="Conda-compatible executable to use, usually conda or mamba.",
),
"conda_env": ArgSpec(
type=str,
default=None,
help="Conda environment name to activate on Dask workers.",
),
}


Expand Down Expand Up @@ -385,6 +475,7 @@ def validate_inputs(self, u: Any, args: argparse.Namespace) -> None:
self._check_input_bin_width(args)
self._check_input_temperature(args)
self._check_input_force_partitioning(args)
self._check_parallel_frame_options(args)

@staticmethod
def _check_input_start(u: Any, args: argparse.Namespace) -> None:
Expand Down Expand Up @@ -443,3 +534,50 @@ def _check_input_force_partitioning(self, args: argparse.Namespace) -> None:
args.force_partitioning,
default_value,
)

@staticmethod
def _check_parallel_frame_options(args: argparse.Namespace) -> None:
    """Validate local Dask, HPC Dask, and submit-related options.

    Every option is read with ``getattr`` so a namespace that lacks an
    attribute falls back to a default instead of raising
    ``AttributeError``.

    The local Dask options are always checked. The HPC options are only
    checked when ``hpc`` is truthy; ``submit`` without ``hpc`` is rejected
    before that point.

    Args:
        args: Namespace holding the resolved run options.

    Raises:
        ValueError: If a count option is ``None`` or below 1 (``dask_workers``
            may be ``None``), if ``submit`` is set without ``hpc``, or if a
            required HPC/conda option is missing or empty while ``hpc`` is
            enabled.
    """

    def is_below_one(name: str, default: int) -> bool:
        # An attribute explicitly set to None is reported as invalid here
        # instead of being compared with ``<``, which would raise TypeError
        # rather than the ValueError callers expect from validation.
        value = getattr(args, name, default)
        return value is None or value < 1

    # ``dask_workers`` is the one count where None is a legitimate value.
    dask_workers = getattr(args, "dask_workers", None)
    if dask_workers is not None and dask_workers < 1:
        raise ValueError("'dask_workers' must be at least 1 if provided.")

    if is_below_one("dask_threads_per_worker", 1):
        raise ValueError("'dask_threads_per_worker' must be at least 1.")

    using_hpc = bool(getattr(args, "hpc", False))
    submitting = bool(getattr(args, "submit", False))

    if submitting and not using_hpc:
        raise ValueError("'submit' requires 'hpc' to be enabled.")

    # Past this point ``submitting`` implies ``using_hpc``, so one test
    # is enough to decide whether the HPC options need checking.
    if not using_hpc:
        return

    if not getattr(args, "hpc_queue", None):
        raise ValueError("'hpc_queue' must be provided when using HPC Dask.")

    # Checked in a fixed order so the first reported problem is stable.
    for name in ("hpc_nodes", "hpc_cores", "hpc_processes"):
        if is_below_one(name, 1):
            raise ValueError(f"'{name}' must be at least 1.")

    required = (
        "hpc_memory",
        "hpc_walltime",
        "conda_env",
        "conda_path",
        "conda_exec",
    )
    for name in required:
        if not getattr(args, name, None):
            raise ValueError(f"'{name}' must be provided when using HPC Dask.")
9 changes: 8 additions & 1 deletion CodeEntropy/config/runtime.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
from rich.text import Text

from CodeEntropy.config.argparse import ConfigResolver
from CodeEntropy.core.dask_clusters import HPCDaskManager
from CodeEntropy.core.logging import LoggingConfig
from CodeEntropy.entropy.workflow import EntropyWorkflow
from CodeEntropy.levels.dihedrals import ConformationStateBuilder
Expand Down Expand Up @@ -223,8 +224,9 @@ def run_entropy_workflow(self) -> None:

This method:
- Sets up logging and prints the splash screen
- Loads YAML config from CWD and parses CLI args
- Loads YAML configuration from CWD and parses CLI args
- Merges args with YAML per-run config
- Optionally submits a master SLURM job and exits
- Builds the MDAnalysis Universe (with optional force merging)
- Validates user parameters
- Constructs dependencies and executes EntropyWorkflow
Expand Down Expand Up @@ -266,6 +268,11 @@ def run_entropy_workflow(self) -> None:

self._validate_required_args(args)

if getattr(args, "submit", False):
self._config_manager._check_parallel_frame_options(args)
HPCDaskManager(args).submit_master()
return

self.print_args_table(args)

universe_operations = UniverseOperations()
Expand Down
181 changes: 181 additions & 0 deletions CodeEntropy/core/dask_clusters.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,181 @@
"""
Helpers for setting up Dask clusters on HPC using SLURM.
"""

import os
import shlex
import subprocess
import sys

import psutil
from dask.distributed import Client
from dask_jobqueue.slurm import SLURMCluster


class HPCDaskManager:
    """
    Manage SLURM-backed Dask clusters and submission utilities for HPC environments.

    The manager reads every setting (``hpc_*`` and ``conda_*``) from the
    arguments object given at construction. It can either start a
    ``SLURMCluster`` and return a connected client, or write a batch script
    that re-runs the ``CodeEntropy`` command line and hand it to ``sbatch``.
    """

    def __init__(self, args):
        """
        Initialise HPCDaskManager with runtime arguments.

        Args:
            args: Parsed CLI arguments containing HPC and conda configuration.
        """
        self.args = args

    def check_slurm_env(self) -> None:
        """
        Remove SLURM_CPU_BIND from environment if present.

        Some HPC systems require this variable to be unset for correct CPU binding.
        The variable is removed from this process's environment; nothing
        happens when it is not set.
        """
        os.environ.pop("SLURM_CPU_BIND", None)

    def system_network_interface(self) -> str:
        """
        Select the most appropriate network interface for HPC communication.

        The interfaces reported by ``psutil`` are matched against a fixed
        preference list; the first preferred name that exists is returned.

        Returns:
            str: Name of selected network interface.

        Raises:
            IndexError: If ``psutil`` reports no interfaces at all.
        """
        preferred_nics = ["bond0", "ib0", "hsn0", "eth0"]
        interfaces = list(psutil.net_if_addrs().keys())

        for iface in preferred_nics:
            if iface in interfaces:
                return iface

        # Fall back to the first available interface.
        # NOTE(review): this may be a loopback device on some systems - confirm
        # that is acceptable for multi-node clusters.
        return interfaces[0]

    def slurm_directives(self) -> tuple[list[str], list[str]]:
        """
        Build SLURM job directives and skip list.

        The account, QoS and constraint directives are only emitted when the
        matching argument is set. The skip list always contains ``--mem``.

        Returns:
            Tuple[List[str], List[str]]:
                - Extra SLURM directives
                - Directives to skip
        """
        args = self.args
        extra: list[str] = []
        skip: list[str] = ["--mem"]

        if args.hpc_account:
            extra.append(f'--account="{args.hpc_account}"')
        if args.hpc_qos:
            extra.append(f'--qos="{args.hpc_qos}"')
        if args.hpc_constraint:
            extra.append(f'--constraint="{args.hpc_constraint}"')

        return extra, skip

    def slurm_prologues(self) -> list[str]:
        """
        Build SLURM job prologue commands for environment setup.

        The commands initialise the conda shell hook (plus the mamba hook when
        ``conda_exec`` is ``"mamba"``), activate ``conda_env`` and export
        ``SLURM_CPU_FREQ_REQ``.

        Returns:
            List[str]: Shell commands executed before job start.
        """
        args = self.args
        prologue: list[str] = []

        prologue.append(f'eval "$({args.conda_path} shell.bash hook)"')

        if args.conda_exec == "mamba":
            prologue.append(f'eval "$({args.conda_exec} shell hook --shell bash)"')

        prologue.append(f"{args.conda_exec} activate {args.conda_env}")
        prologue.append("export SLURM_CPU_FREQ_REQ=2250000")

        return prologue

    def configure_cluster(self) -> "Client":
        """
        Configure and launch a SLURM-backed Dask cluster.

        The cluster is scaled to ``hpc_nodes`` jobs, a client is connected to
        it, and the generated worker job script is written to
        ``dask-cluster-submit.sh`` in the current working directory.

        Returns:
            Client: Dask distributed client connected to cluster.
        """
        args = self.args

        extra, skip = self.slurm_directives()
        prologue = self.slurm_prologues()
        iface = self.system_network_interface()

        self.check_slurm_env()

        cluster = SLURMCluster(
            cores=args.hpc_cores,
            processes=args.hpc_processes,
            memory=args.hpc_memory,
            queue=args.hpc_queue,
            job_directives_skip=skip,
            job_extra_directives=extra,
            python="srun python",
            walltime=args.hpc_walltime,
            shebang="#!/bin/bash --login",
            local_directory="$PWD",
            interface=iface,
            job_script_prologue=prologue,
        )

        cluster.scale(jobs=args.hpc_nodes)

        client = Client(cluster)

        # Keep a copy of the worker job script next to the run for inspection.
        with open("dask-cluster-submit.sh", "w", encoding="utf-8") as f:
            f.write(cluster.job_script())

        return client

    @staticmethod
    def _strip_submit_flag(cli: list[str]) -> list[str]:
        """Return ``cli`` without any ``--submit`` flag or its boolean value.

        Handles ``--submit``, ``--submit true|false`` (any letter case) and
        ``--submit=<value>``, for every occurrence. The input list is not
        modified.
        """
        cleaned: list[str] = []
        expect_value = False

        for token in cli:
            text = str(token)

            if expect_value:
                expect_value = False
                # Only swallow the following token when it is the flag's value.
                if text.lower() in {"true", "false"}:
                    continue

            if text == "--submit":
                expect_value = True
                continue
            if text.startswith("--submit="):
                continue

            cleaned.append(text)

        return cleaned

    def _master_script(self, cli: list[str]) -> str:
        """Return the text of the batch script that runs ``CodeEntropy`` with ``cli``."""
        args = self.args

        lines = [
            "#!/bin/bash --login",
            "",
            "#SBATCH --job-name=codeentropy-master",
            "#SBATCH --nodes=1",
            "#SBATCH --ntasks=1",
            "#SBATCH --cpus-per-task=2",
            f"#SBATCH --time={args.hpc_walltime}",
        ]

        if args.hpc_account:
            lines.append(f"#SBATCH --account={args.hpc_account}")

        lines.append(f"#SBATCH --partition={args.hpc_queue}")

        if args.hpc_qos:
            lines.append(f"#SBATCH --qos={args.hpc_qos}")

        lines.append("")
        lines.append(f'eval "$({args.conda_path} shell.bash hook)"')

        if args.conda_exec == "mamba":
            lines.append(f'eval "$({args.conda_exec} shell hook --shell bash)"')

        lines.append(f"{args.conda_exec} activate {args.conda_env}")
        lines.append("")
        # shlex.join quotes each argument so values containing spaces or shell
        # metacharacters reach CodeEntropy unchanged when bash runs the script.
        lines.append(f"srun CodeEntropy {shlex.join(cli)}")

        return "\n".join(lines) + "\n"

    def submit_master(self) -> None:
        """
        Submit a SLURM job that runs a master Dask orchestration process.

        This writes ``CodeEntropy-master-submit.sh`` to the current working
        directory and submits it via `sbatch`. The script re-runs the current
        command line with the ``--submit`` flag removed. The ``sbatch`` output
        is printed; if ``sbatch`` exits with an error, its captured output is
        printed instead and no exception is raised.

        NOTE(review): only the command line is stripped of ``--submit``. If the
        option can also be enabled from a configuration file, the master job
        would see it again - confirm how that case is handled.
        """
        cli = self._strip_submit_flag(sys.argv[1:])
        script_name = "CodeEntropy-master-submit.sh"

        with open(script_name, "w", encoding="utf-8") as f:
            f.write(self._master_script(cli))

        self.check_slurm_env()

        try:
            result = subprocess.check_output(["bash", "-c", f"sbatch {script_name}"])
        except subprocess.CalledProcessError as e:
            # e.output is the captured stdout as bytes; decode it so the user
            # sees readable text instead of a bytes repr.
            print((e.output or b"").decode("utf-8", errors="replace"))
        else:
            print(result.decode("utf-8"))
Loading