Skip to content

Commit

Permalink
feat(service): add sinfo command to retrieve cluster load
Browse files Browse the repository at this point in the history
  • Loading branch information
MartinBelthle committed Nov 17, 2023
1 parent 83065ef commit 3905bcf
Show file tree
Hide file tree
Showing 13 changed files with 326 additions and 107 deletions.
2 changes: 2 additions & 0 deletions antarest/core/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,7 @@ class SlurmConfig:
default_time_limit: int = 0
default_json_db_name: str = ""
slurm_script_path: str = ""
partition: str = ""
max_cores: int = 64
antares_versions_on_remote_server: List[str] = field(default_factory=list)
enable_nb_cores_detection: bool = False
Expand Down Expand Up @@ -288,6 +289,7 @@ def from_dict(cls, data: JSON) -> "SlurmConfig":
default_time_limit=data.get("default_time_limit", defaults.default_time_limit),
default_json_db_name=data.get("default_json_db_name", defaults.default_json_db_name),
slurm_script_path=data.get("slurm_script_path", defaults.slurm_script_path),
partition=data["partition"],
antares_versions_on_remote_server=data.get(
"antares_versions_on_remote_server",
defaults.antares_versions_on_remote_server,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,7 @@ def _init_launcher_parameters(self, local_workspace: Optional[Path] = None) -> M
json_dir=local_workspace or self.slurm_config.local_workspace,
default_json_db_name=self.slurm_config.default_json_db_name,
slurm_script_path=self.slurm_config.slurm_script_path,
partition=self.slurm_config.partition,
antares_versions_on_remote_server=self.slurm_config.antares_versions_on_remote_server,
default_ssh_dict={
"username": self.slurm_config.username,
Expand Down
7 changes: 7 additions & 0 deletions antarest/launcher/model.py
Original file line number Diff line number Diff line change
Expand Up @@ -191,3 +191,10 @@ class JobCreationDTO(BaseModel):

class LauncherEnginesDTO(BaseModel):
engines: List[str]


class LauncherLoadDTO(BaseModel):
    """Cluster load information served by the `GET /launcher/load` endpoint."""

    # Share of the cluster CPUs allocated to jobs.
    # NOTE(review): the SLURM path returns percentages (0-100, see
    # `calculates_slurm_load`) while the local path returns a 0-1 rate —
    # confirm the intended unit of this field.
    allocated_cpu_rate: float
    # Overall CPU load of the cluster (same unit caveat as above).
    cluster_load_rate: float
    # Number of jobs waiting in the queue (always 0 for the local launcher).
    nb_queued_jobs: int
    # "SUCCESS", or "FAILED: <reason>" when the cluster could not be queried.
    launcher_status: str
73 changes: 37 additions & 36 deletions antarest/launcher/service.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import functools
import json
import logging
import os
import shutil
Expand Down Expand Up @@ -33,11 +32,14 @@
JobLogType,
JobResult,
JobStatus,
LauncherLoadDTO,
LauncherParametersDTO,
LogType,
XpansionParametersDTO,
)
from antarest.launcher.repository import JobResultRepository
from antarest.launcher.ssh_client import calculates_slurm_load
from antarest.launcher.ssh_config import SSHConfigDTO
from antarest.study.service import StudyService
from antarest.study.storage.utils import assert_permission, extract_output_name, find_single_output_path

Expand Down Expand Up @@ -606,43 +608,42 @@ def download_output(self, job_id: str, params: RequestParameters) -> FileDownloa
)
raise JobNotFound()

def get_load(self, from_cluster: bool = False) -> Dict[str, float]:
all_running_jobs = self.job_result_repository.get_running()
local_running_jobs = []
slurm_running_jobs = []
for job in all_running_jobs:
if job.launcher == "slurm":
slurm_running_jobs.append(job)
elif job.launcher == "local":
local_running_jobs.append(job)
def get_load(self) -> LauncherLoadDTO:
    """Compute the current load of the configured launcher.

    For the SLURM launcher, the load is fetched live from the cluster
    over SSH (see ``calculates_slurm_load``). For the local launcher, it
    is approximated from the number of CPUs requested by running jobs.

    Returns:
        A :class:`LauncherLoadDTO` describing the current load.

    Raises:
        SlurmError: if the SLURM cluster cannot be queried.
        KeyError: if the default launcher is "slurm" but no SLURM section
            is present in the configuration file.
    """
    if self.config.launcher.default == "slurm":
        if slurm_config := self.config.launcher.slurm:
            ssh_config = SSHConfigDTO(
                config_path=Path(),
                username=slurm_config.username,
                hostname=slurm_config.hostname,
                port=slurm_config.port,
                private_key_file=slurm_config.private_key_file,
                key_password=slurm_config.key_password,
                password=slurm_config.password,
            )
            # Unpack the tuple into named values instead of indexing
            # slurm_load[0..2], which hid what each position meant.
            allocated_cpu_rate, cluster_load_rate, nb_queued_jobs = calculates_slurm_load(
                ssh_config, slurm_config.partition
            )
            return LauncherLoadDTO(
                allocated_cpu_rate=allocated_cpu_rate,
                cluster_load_rate=cluster_load_rate,
                nb_queued_jobs=nb_queued_jobs,
                launcher_status="SUCCESS",
            )
        raise KeyError("Default launcher is slurm but it is not registered in the config file")

    # Local launcher: sum the CPUs requested by running jobs (1 CPU when a
    # job did not specify nb_cpu) relative to this machine's CPU count.
    # NOTE(review): this branch returns 0-1 rates while the SLURM branch
    # returns percentages — confirm the intended unit of the DTO.
    local_used_cpus = sum(
        LauncherParametersDTO.parse_raw(job.launcher_params or "{}").nb_cpu or 1
        for job in self.job_result_repository.get_running()
    )
    cluster_load_approximation = min(1.0, float(local_used_cpus) / (os.cpu_count() or 1))
    return LauncherLoadDTO(
        allocated_cpu_rate=cluster_load_approximation,
        cluster_load_rate=cluster_load_approximation,
        nb_queued_jobs=0,
        launcher_status="SUCCESS",
    )

def get_solver_versions(self, solver: str) -> List[str]:
"""
Expand Down
108 changes: 108 additions & 0 deletions antarest/launcher/ssh_client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
import contextlib
import socket
from typing import Any, Iterator, List, Tuple

import paramiko

from antarest.launcher.ssh_config import SSHConfigDTO


@contextlib.contextmanager
def ssh_client(ssh_config: SSHConfigDTO) -> Iterator[paramiko.SSHClient]:
    """Yield a connected :class:`paramiko.SSHClient`, closed on exit.

    Authenticates with the configured private key when one is provided,
    otherwise falls back to password authentication (`SSHConfigDTO` is
    expected to carry at least one of the two).
    """
    client = paramiko.SSHClient()
    # The SLURM login node may not be in known_hosts; trust it on first use.
    client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    # BUG FIX: the original unconditionally read the key file, crashing with
    # a password-only configuration, and ignored key_password for encrypted
    # keys. Build the key only when a key file is actually configured.
    pkey = (
        paramiko.RSAKey.from_private_key_file(
            filename=str(ssh_config.private_key_file),
            password=ssh_config.key_password or None,
        )
        if ssh_config.private_key_file
        else None
    )
    client.connect(
        hostname=ssh_config.hostname,
        port=ssh_config.port,
        username=ssh_config.username,
        pkey=pkey,
        password=ssh_config.password or None,
        timeout=600,
        allow_agent=False,
    )
    with contextlib.closing(client):
        yield client


class SlurmError(Exception):
    """Raised when SLURM information cannot be retrieved from the remote cluster."""

    pass


def execute_command(ssh_config: SSHConfigDTO, args: List[str]) -> Any:
    """Run a command on the remote host and return its stripped stdout.

    Args:
        ssh_config: connection settings for the SLURM login node.
        args: command tokens, joined with spaces and executed by the
            remote shell (so shell constructs such as pipes are honored).

    Raises:
        SlurmError: if the SSH connection fails or the command writes
            anything to stderr.
    """
    command = " ".join(args)
    try:
        with ssh_client(ssh_config) as client:  # type: ignore
            # `stdin` is intentionally unused: none of the commands read input.
            _stdin, stdout, stderr = client.exec_command(command, timeout=10)
            output = stdout.read().decode("utf-8").strip()
            error = stderr.read().decode("utf-8").strip()
    except (
        paramiko.AuthenticationException,
        paramiko.SSHException,
        socket.timeout,
        socket.error,
    ) as e:
        raise SlurmError("Can't retrieve SLURM information") from e
    if error:
        raise SlurmError(f"Can't retrieve SLURM information: {error}")
    return output


def parse_cpu_used(sinfo_output: str) -> float:
    """Return the percentage of allocated CPUs on the partition.

    Args:
        sinfo_output: the "allocated/idle/other/total" quadruplet printed
            by ``sinfo -O NodeAIOT`` (only the first two fields are used).
    """
    allocated, idle = (int(part) for part in sinfo_output.split("/")[:2])
    total = allocated + idle
    # BUG FIX: an empty or fully drained partition reports "0/0/0/0";
    # guard against the resulting ZeroDivisionError.
    return 100 * allocated / total if total else 0.0


def parse_cpu_load(sinfo_output: str) -> float:
    """Return the average CPU load of the partition, as a percentage in [0, 100].

    Args:
        sinfo_output: one "CPU_LOAD CPUS" pair per node, as printed by
            ``sinfo -N -O CPUsLoad,CPUs``. Nodes whose load is reported
            as "N/A" (down or not responding) are ignored.
    """
    cpus_used = 0.0
    cpus_available = 0.0
    for line in sinfo_output.splitlines():
        values = line.split()
        # BUG FIX: also skip blank or malformed lines, which previously
        # raised IndexError/ValueError; "N/A" nodes were already skipped.
        if len(values) < 2 or "N/A" in values:
            continue
        cpus_used += float(values[0])
        cpus_available += float(values[1])
    # max(..., 1) avoids dividing by zero when no node reported CPUs.
    ratio = cpus_used / max(cpus_available, 1)
    # The measured load can exceed the CPU count; clamp the result to 100 %.
    return 100 * min(1.0, ratio)


def calculates_slurm_load(ssh_config: SSHConfigDTO, partition: str) -> Tuple[float, float, int]:
    """Query the SLURM cluster over SSH and summarize its load.

    Args:
        ssh_config: connection settings for the SLURM login node.
        partition: name of the SLURM partition to inspect.

    Returns:
        A ``(allocated_cpu_percent, cluster_load_percent, nb_pending_jobs)``
        tuple.
    """
    # Percentage of allocated CPUs ("allocated/idle/other/total" counts).
    aiot_output = execute_command(
        ssh_config,
        ["sinfo", "--partition", partition, "-O", "NodeAIOT", "--noheader"],
    )
    allocated_cpus = parse_cpu_used(aiot_output)

    # Per-node CPU load vs CPU count, averaged into a single percentage.
    load_output = execute_command(
        ssh_config,
        ["sinfo", "--partition", partition, "-N", "-O", "CPUsLoad,CPUs", "--noheader"],
    )
    cluster_load = parse_cpu_load(load_output)

    # Count pending jobs; the pipe is interpreted by the remote shell, so
    # "wc -l" counts the queue lines server-side (0 when the queue is empty).
    pending_count_cmd = [
        "squeue",
        "--partition",
        partition,
        "--noheader",
        "-t",
        "pending",
        "|",
        "wc",
        "-l",
    ]
    queued_jobs = int(execute_command(ssh_config, pending_count_cmd))

    return allocated_cpus, cluster_load, queued_jobs
21 changes: 21 additions & 0 deletions antarest/launcher/ssh_config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
import pathlib
from typing import Any, Dict, Optional

import paramiko
from pydantic import BaseModel, root_validator


class SSHConfigDTO(BaseModel):
    """SSH connection settings used to reach the SLURM login node.

    At least one of `private_key_file` or `password` must be provided.
    """

    config_path: pathlib.Path
    username: str
    hostname: str
    port: int = 22
    private_key_file: Optional[pathlib.Path] = None
    key_password: Optional[str] = ""
    password: Optional[str] = ""

    @root_validator()
    def validate_connection_information(cls, values: Dict[str, Any]) -> Dict[str, Any]:
        """Ensure at least one authentication method is configured.

        BUG FIX: because both fields have defaults, their keys are always
        present in ``values``, so the original membership test
        (``"private_key_file" not in values``) could never fire. Test the
        actual values instead.
        """
        if values.get("private_key_file") is None and not values.get("password"):
            raise paramiko.AuthenticationException("SSH config needs at least a private key or a password")
        return values
28 changes: 21 additions & 7 deletions antarest/launcher/web.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,16 @@
from antarest.core.jwt import JWTUser
from antarest.core.requests import RequestParameters
from antarest.core.utils.web import APITag
from antarest.launcher.model import JobCreationDTO, JobResultDTO, LauncherEnginesDTO, LauncherParametersDTO, LogType
from antarest.launcher.model import (
JobCreationDTO,
JobResultDTO,
LauncherEnginesDTO,
LauncherLoadDTO,
LauncherParametersDTO,
LogType,
)
from antarest.launcher.service import LauncherService
from antarest.launcher.ssh_client import SlurmError
from antarest.login.auth import Auth

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -185,14 +193,20 @@ def get_engines() -> Any:
"/launcher/load",
tags=[APITag.launcher],
summary="Get the cluster load in usage percent",
response_model=LauncherLoadDTO,
)
def get_load() -> LauncherLoadDTO:
    """Return the current launcher load, degrading gracefully on SLURM errors."""
    logger.info("Fetching launcher load")
    try:
        return service.get_load()
    except SlurmError as e:
        # Report the failure inside the DTO instead of a 500 response.
        logger.warning(e, exc_info=e)
        # BUG FIX: the keyword arguments must match the LauncherLoadDTO
        # fields (allocated_cpu_rate, cluster_load_rate, nb_queued_jobs);
        # the original names (allocated_cpus, cluster_load, queued_jobs)
        # made this fallback itself raise a pydantic ValidationError.
        return LauncherLoadDTO(
            allocated_cpu_rate=0.0,
            cluster_load_rate=0.0,
            nb_queued_jobs=0,
            launcher_status=f"FAILED: {e}",
        )

@bp.get(
"/launcher/versions",
Expand Down
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ jsonref~=0.2
MarkupSafe~=2.0.1
numpy~=1.22.1
pandas~=1.4.0
paramiko~=2.12.0
plyer~=2.0.0
psycopg2-binary==2.9.4
py7zr~=0.20.6
Expand Down
1 change: 1 addition & 0 deletions resources/application.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ launcher:
# max: 24
# default_json_db_name: launcher_db.json
# slurm_script_path: /path/to/launchantares_v1.1.3.sh
# partition: specific_partition_for_the_resource_allocation
# db_primary_key: name
# antares_versions_on_remote_server :
# - "610"
Expand Down
10 changes: 10 additions & 0 deletions tests/integration/test_integration.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
import io
import os
from http import HTTPStatus
from pathlib import Path
from unittest.mock import ANY

from starlette.testclient import TestClient

from antarest.core.model import PublicMode
from antarest.launcher.model import LauncherLoadDTO
from antarest.study.business.adequacy_patch_management import PriceTakingOrder
from antarest.study.business.area_management import AreaType, LayerInfoDTO
from antarest.study.business.areas.properties_management import AdequacyPatchMode
Expand Down Expand Up @@ -345,6 +347,14 @@ def test_main(client: TestClient, admin_access_token: str, study_id: str) -> Non
headers={"Authorization": f'Bearer {fred_credentials["access_token"]}'},
)
job_id = res.json()["job_id"]

load = client.get("/v1/launcher/load", headers=admin_headers).content
launcher_load = LauncherLoadDTO.parse_raw(load)
assert launcher_load.allocated_cpu_rate == 1 / (os.cpu_count() or 1)
assert launcher_load.cluster_load_rate == 1 / (os.cpu_count() or 1)
assert launcher_load.nb_queued_jobs == 0
assert launcher_load.launcher_status == "SUCCESS"

res = client.get(
f"/v1/launcher/jobs?study_id={study_id}",
headers={"Authorization": f'Bearer {fred_credentials["access_token"]}'},
Expand Down

0 comments on commit 3905bcf

Please sign in to comment.