Merge branch 'RESTAPI-1049-Get-partitions' into 'master'
Restapi 1049 get partitions

See merge request firecrest/firecrest!292
ekouts committed Apr 29, 2024
2 parents 1733f2c + 390226b commit d174ce3
Showing 8 changed files with 320 additions and 15 deletions.
10 changes: 4 additions & 6 deletions CHANGELOG.md
@@ -9,10 +9,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

### Added

- Add the endpoints `/compute/nodes` and `/compute/nodes/{nodeName}` to retrieve information about nodes in the scheduling queue.
- Added the endpoints `/compute/nodes` and `/compute/nodes/{nodeName}` to retrieve information about nodes in the scheduling queue.
- Added endpoints `POST /utilities/compress`, `POST /utilities/extract`, `POST /storage/xfer-internal/compress` and `POST /storage/xfer-internal/extract` for file compression and extraction.
- Added recursive option to the ls utilities command (`&recursive=true`).
- Added the endpoint `/compute/partitions` to retrieve information about partitions in the scheduling queue.
- Added grep support for the tail and head commands (`&grep=pattern`).
- Add `examples` directory for practical use cases of FirecREST.
- Added `examples` directory for practical use cases of FirecREST.

### Changed

@@ -32,10 +34,6 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

- On task response metadata, the `task_url` value is now relative to the `/tasks` endpoint

- Add the endpoints `/compute/nodes` and `/compute/nodes/{nodeName}` to retrieve information about nodes in the scheduling queue.
- Added endpoints `POST /utilities/compress`, `POST /utilities/extract`, `POST /storage/xfer-internal/compress` and `POST /storage/xfer-internal/extract` for file compression and extraction.
- Added recursive option to the ls utilities command (`&recursive=true`).

### Fixed

- Fixed error on pipeline when releasing production version.
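As a quick illustration of the query parameters mentioned in the changelog above (`&recursive=true`, `&grep=pattern`), a client call might look like the sketch below. The base URL, token, machine name and target paths are placeholders, and the `targetPath` parameter name is assumed from the existing utilities API rather than defined in this changelog.

```python
import requests

# Hypothetical deployment details -- not part of this merge request.
FIRECREST_URL = "https://firecrest.example.org"
HEADERS = {"Authorization": "Bearer <access token>", "X-Machine-Name": "cluster"}

# Recursive directory listing (new `recursive` option).
ls = requests.get(
    f"{FIRECREST_URL}/utilities/ls",
    headers=HEADERS,
    params={"targetPath": "/home/user/project", "recursive": "true"},
)

# Tail a file, keeping only lines that match a pattern (new `grep` option).
tail = requests.get(
    f"{FIRECREST_URL}/utilities/tail",
    headers=HEADERS,
    params={"targetPath": "/home/user/project/run.log", "grep": "ERROR"},
)

print(ls.json(), tail.json())
```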
47 changes: 47 additions & 0 deletions doc/openapi/firecrest-api.yaml
@@ -1916,6 +1916,53 @@ paths:
              description: User does not have permissions to access machine
              schema:
                type: integer
  '/compute/partitions':
    parameters:
      - in: header
        name: X-Machine-Name
        description: The system name
        required: true
        schema:
          type: string
    get:
      summary: Retrieves information about all partitions
      description: Information about partitions in the scheduling queue.
      tags:
        - Compute
      parameters:
        - name: partitions
          in: query
          description: Comma-separated list of partitions to retrieve
          schema:
            type: array
            items:
              type: string
      responses:
        '200':
          description: Task created
          content:
            application/json:
              schema:
                $ref: '#/components/schemas/Task-Creation-Success'
        '400':
          description: Task creation error
          content:
            application/json:
              schema:
                $ref: '#/components/schemas/Task-Creation-Error'
          headers:
            X-Machine-Does-Not-Exist:
              description: Machine does not exist
              schema:
                type: integer
            X-Machine-Not-Available:
              description: Machine is not available
              schema:
                type: integer
            X-Permission-Denied:
              description: User does not have permissions to access machine
              schema:
                type: integer
  '/storage/xfer-internal/rsync':
    parameters:
      - in: header
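To make the new spec concrete, the sketch below shows how a client could call `GET /compute/partitions` with the required `X-Machine-Name` header and the optional comma-separated `partitions` filter, then read the task reference from the response. The base URL, token and machine name are placeholders, not values taken from this merge request. The same path definition is added verbatim to `firecrest-developers-api.yaml` below.

```python
import requests

# Hypothetical values -- replace with a real FirecREST deployment,
# access token and system name.
FIRECREST_URL = "https://firecrest.example.org"
ACCESS_TOKEN = "<access token>"
MACHINE = "cluster"

resp = requests.get(
    f"{FIRECREST_URL}/compute/partitions",
    headers={
        "Authorization": f"Bearer {ACCESS_TOKEN}",
        "X-Machine-Name": MACHINE,           # required header per the spec
    },
    params={"partitions": "debug,xfer"},     # optional comma-separated filter
)
resp.raise_for_status()
task = resp.json()                           # Task-Creation-Success payload
print(task["task_id"], task["task_url"])     # poll /tasks/{task_id} for the result
```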
47 changes: 47 additions & 0 deletions doc/openapi/firecrest-developers-api.yaml
@@ -1904,6 +1904,53 @@ paths:
              description: User does not have permissions to access machine
              schema:
                type: integer
  '/compute/partitions':
    parameters:
      - in: header
        name: X-Machine-Name
        description: The system name
        required: true
        schema:
          type: string
    get:
      summary: Retrieves information about all partitions
      description: Information about partitions in the scheduling queue.
      tags:
        - Compute
      parameters:
        - name: partitions
          in: query
          description: Comma-separated list of partitions to retrieve
          schema:
            type: array
            items:
              type: string
      responses:
        '200':
          description: Task created
          content:
            application/json:
              schema:
                $ref: '#/components/schemas/Task-Creation-Success'
        '400':
          description: Task creation error
          content:
            application/json:
              schema:
                $ref: '#/components/schemas/Task-Creation-Error'
          headers:
            X-Machine-Does-Not-Exist:
              description: Machine does not exist
              schema:
                type: integer
            X-Machine-Not-Available:
              description: Machine is not available
              schema:
                type: integer
            X-Permission-Denied:
              description: User does not have permissions to access machine
              schema:
                type: integer
  '/storage/xfer-internal/rsync':
    parameters:
      - in: header
17 changes: 17 additions & 0 deletions src/common/schedulers/__init__.py
@@ -159,6 +159,23 @@ def parse_nodes_output(self, output):
        """
        pass

    @abc.abstractmethod
    def get_partitions(self, partition_names=None):
        """Return the partitions of the system.
        """
        pass

    @abc.abstractmethod
    def parse_partitions_output(self, output, partition_names=None):
        """Parse the output of the partitions command. Should return records with:
        * PartitionName
        * State
        * TotalCPUs
        * TotalNodes
        * Default
        """
        pass

    @abc.abstractmethod
    def check_job_time(self, job_time):
        """Try to parse correctly the HH:MM:SS time format for the passed job_time argument. Accepted formats:
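The two new abstract methods define a simple contract: `get_partitions` returns the command to run on the remote system, and `parse_partitions_output` turns its output into one record per partition with the five fields listed in the docstring. The minimal sketch below illustrates that contract for a hypothetical backend; the class, the `toyctl` command and the line format are invented for illustration (in the real module such a backend would subclass the scheduler ABC defined here, whose exact class name is not shown in this diff).

```python
class ToyScheduler:
    """Hypothetical scheduler backend, only to illustrate the new contract."""

    def get_partitions(self, partition_names=None):
        # Command to be executed remotely on the target system.
        return "toyctl list-partitions"

    def parse_partitions_output(self, output, partition_names=None):
        # One partition per line: "<name> <state> <cpus> <nodes> <default>"
        wanted = set(partition_names) if partition_names else None
        records = []
        for line in output.splitlines():
            name, state, cpus, nodes, default = line.split()
            if wanted is None or name in wanted:
                records.append({
                    "PartitionName": name,
                    "State": state,
                    "TotalCPUs": cpus,
                    "TotalNodes": nodes,
                    "Default": default,
                })
        return records


# Example: parse two fake output lines, filtering to a single partition.
toy = ToyScheduler()
print(toy.parse_partitions_output("debug UP 256 2 YES\nxfer UP 16 1 NO",
                                  partition_names=["xfer"]))
# [{'PartitionName': 'xfer', 'State': 'UP', 'TotalCPUs': '16',
#   'TotalNodes': '1', 'Default': 'NO'}]
```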
34 changes: 34 additions & 0 deletions src/common/schedulers/slurm.py
@@ -262,6 +262,40 @@ def parse_nodes_output(self, output):

        return list(nodes)

    def get_partitions(self, partition_names=None):
        return "scontrol -a show -o partitions"

    def parse_partitions_output(self, output, partition_names=None):
        partitions_set = set(partition_names) if partition_names else None
        partitions_descriptions = output.splitlines()
        partitions = []
        attributes = [
            "PartitionName",
            "State",
            "TotalCPUs",
            "TotalNodes",
            "Default",
        ]
        for part_descr in partitions_descriptions:
            node_info = {}
            for attr_name in attributes:
                attr_match = re.search(rf'{attr_name}=(\S+)', part_descr)
                if attr_match:
                    node_info[attr_name] = attr_match.group(1)
                else:
                    raise ValueError(
                        f"Could not parse attribute '{attr_name}' in "
                        f"'{part_descr}'"
                    )

            if (
                partitions_set is None or
                node_info["PartitionName"] in partitions_set
            ):
                partitions.append(node_info)

        return list(partitions)

    def is_valid_accounting_time(self, sacct_time):
        # HH:MM[:SS] [AM|PM]
        # MMDD[YY] or MM/DD[/YY] or MM.DD[.YY]
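To make the parsing logic above concrete, here is a small, self-contained sketch of what `parse_partitions_output` does with typical `scontrol -a show -o partitions` output (one partition per line); the sample lines are illustrative and heavily truncated, not taken from a real system.

```python
import re

# Illustrative sample of `scontrol -a show -o partitions` output.
sample_output = (
    "PartitionName=debug Default=YES State=UP TotalCPUs=256 TotalNodes=2\n"
    "PartitionName=xfer Default=NO State=UP TotalCPUs=16 TotalNodes=1"
)

attributes = ["PartitionName", "State", "TotalCPUs", "TotalNodes", "Default"]
partitions = []
for line in sample_output.splitlines():
    record = {}
    for attr in attributes:
        match = re.search(rf"{attr}=(\S+)", line)
        if match is None:
            # Same failure mode as the implementation above.
            raise ValueError(f"Could not parse attribute '{attr}' in '{line}'")
        record[attr] = match.group(1)
    partitions.append(record)

print(partitions)
# [{'PartitionName': 'debug', 'State': 'UP', 'TotalCPUs': '256', 'TotalNodes': '2', 'Default': 'YES'},
#  {'PartitionName': 'xfer', 'State': 'UP', 'TotalCPUs': '16', 'TotalNodes': '1', 'Default': 'NO'}]
```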
114 changes: 112 additions & 2 deletions src/compute/compute.py
@@ -38,11 +38,11 @@

# Internal microservices communication
## certificator
CERTIFICATOR_HOST = os.environ.get("F7T_CERTIFICATOR_HOST","127.0.0.1")
CERTIFICATOR_HOST = os.environ.get("F7T_CERTIFICATOR_HOST","127.0.0.1")
CERTIFICATOR_PORT = os.environ.get("F7T_CERTIFICATOR_PORT","5000")
CERTIFICATOR_URL = f"{F7T_SCHEME_PROTOCOL}://{CERTIFICATOR_HOST}:{CERTIFICATOR_PORT}"
## tasks
TASKS_HOST = os.environ.get("F7T_TASKS_HOST","127.0.0.1")
TASKS_HOST = os.environ.get("F7T_TASKS_HOST","127.0.0.1")
TASKS_PORT = os.environ.get("F7T_TASKS_PORT","5003")
TASKS_URL = f"{F7T_SCHEME_PROTOCOL}://{TASKS_HOST}:{TASKS_PORT}"

@@ -1013,6 +1013,34 @@ def nodes_task(headers, system_name, system_addr, action, task_id):
    update_task(task_id, headers, async_task.SUCCESS, jobs, is_json=True)


def partitions_task(headers, system_name, system_addr, action, task_id, partitions_list):
    # execute the remote command
    resp = exec_remote_command(headers, system_name, system_addr, action)

    # in case of error:
    if resp["error"] == -2:
        update_task(task_id, headers, async_task.ERROR, "Machine is not available")
        return

    # in case of error:
    if resp["error"] != 0:
        err_msg = resp["msg"]
        if in_str(err_msg,"OPENSSH"):
            err_msg = "User does not have permissions to access machine"
        update_task(task_id, headers, async_task.ERROR, err_msg)
        return

    try:
        partitions = scheduler.parse_partitions_output(resp["msg"], partitions_list)
        app.logger.info(f"Number of partitions: {len(partitions)}")
    except ValueError as e:
        update_task(task_id, headers, async_task.ERROR, str(e))
        return

    # the data stored in Tasks is JSON, so pass is_json=True
    update_task(task_id, headers, async_task.SUCCESS, partitions, is_json=True)


# Job account information
@app.route("/acct",methods=["GET"])
@check_auth_header
@@ -1247,6 +1275,88 @@ def get_node(nodeName):
        data = jsonify(description="Failed to retrieve node information", error=e)
        return data, 400


@app.route("/partitions", methods=["GET"])
@check_auth_header
def get_partitions():
try:
system_name = request.headers["X-Machine-Name"]
except KeyError:
app.logger.error("No machinename given")
return jsonify(description="No machine name given"), 400

# public endpoints from Kong to users
if system_name not in SYSTEMS_PUBLIC:
header = {"X-Machine-Does-Not-Exists": "Machine does not exists"}
return jsonify(description="Failed to retrieve account information", error="Machine does not exists"), 400, header

# select index in the list corresponding with machine name
system_idx = SYSTEMS_PUBLIC.index(system_name)
system_addr = SYSTEMS_INTERNAL_COMPUTE[system_idx]

[headers, ID] = get_tracing_headers(request)
# check if machine is accessible by user:
resp = exec_remote_command(
headers,
system_name,
system_addr,
f"ID={ID} true"
)

if resp["error"] != 0:
error_str = resp["msg"]
if resp["error"] == -2:
header = {"X-Machine-Not-Available": "Machine is not available"}
return jsonify(description="Failed to retrieve account information"), 400, header
if in_str(error_str, "Permission") or in_str(error_str, "OPENSSH"):
header = {"X-Permission-Denied": "User does not have permissions to access machine or path"}
return jsonify(description="Failed to retrieve account information"), 404, header

partitions = request.args.get("partitions", None)
partitions_list = None
if partitions is not None:
v = validate_input(partitions)
if v != "":
return jsonify(description="Failed to retrieve partitions information", error=f"partition '{partitions}' {v}"), 400

try:
partitions_list = partitions.split(",")
except:
return jsonify(description="Failed to retrieve partitions information", error="Partitions list wrong format"), 400

# In Slurm we are not actually using the partition_names argument
# for the command but it can be used for other schedulers
sched_cmd = scheduler.get_partitions(partitions_list)
action = f"ID={ID} {sched_cmd}"

try:
# obtain new task from Tasks microservice
task_id = create_task(headers, service="compute", system=system_name)

# if error in creating task:
if task_id == -1:
return jsonify(description="Failed to retrieve partitions information", error='Error creating task'), 400

update_task(task_id, headers, async_task.QUEUED)

# asynchronous task creation
aTask = threading.Thread(target=partitions_task, name=ID,
args=(headers, system_name, system_addr, action, task_id, partitions_list))

aTask.start()
task_url = f"/tasks/{task_id}"

data = jsonify(
success="Task created", task_id=task_id, task_url=task_url
)
return data, 200

except Exception as e:
data = jsonify(
description="Failed to retrieve partitions information", error=e
)
return data, 400

@app.route("/status",methods=["GET"])
@check_auth_header
def status():
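Like the other compute endpoints, `GET /compute/partitions` is asynchronous: the route above only creates a task, and `partitions_task` fills it in later from a worker thread. A client therefore polls `/tasks/{task_id}` until the result appears. Below is a rough sketch of such a polling loop; the response layout (a top-level `task` object whose `data` field holds the parsed partition records on success) is an assumption based on the existing tasks API, not something defined in this diff.

```python
import time
import requests

def wait_for_partitions(firecrest_url, headers, task_url, timeout=60, poll_interval=1.0):
    """Poll the task created by GET /compute/partitions until its result appears.

    Assumes the tasks endpoint wraps the result as {"task": {"data": [...], ...}};
    a production client would also inspect the task status for the error states
    set by partitions_task, which is omitted here for brevity.
    """
    deadline = time.time() + timeout
    while time.time() < deadline:
        resp = requests.get(f"{firecrest_url}{task_url}", headers=headers)
        resp.raise_for_status()
        data = resp.json().get("task", {}).get("data")
        if isinstance(data, list):   # partitions_task stores a list of partition records
            return data
        time.sleep(poll_interval)
    raise TimeoutError("partitions task did not complete in time")
```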
40 changes: 35 additions & 5 deletions src/tests/automated_tests/integration/test_compute.py
@@ -23,16 +23,16 @@
    UTILITIES_URL = os.environ.get("FIRECREST_URL") + "/utilities"
else:
    F7T_SCHEME_PROTOCOL = ("https" if SSL_ENABLED else "http")
    TASKS_HOST = os.environ.get("F7T_TASKS_HOST","127.0.0.1")

    TASKS_HOST = os.environ.get("F7T_TASKS_HOST","127.0.0.1")
    TASKS_PORT = os.environ.get("F7T_TASKS_PORT","5003")
    TASKS_URL = f"{F7T_SCHEME_PROTOCOL}://{TASKS_HOST}:{TASKS_PORT}"
    COMPUTE_HOST = os.environ.get("F7T_COMPUTE_HOST","127.0.0.1")

    COMPUTE_HOST = os.environ.get("F7T_COMPUTE_HOST","127.0.0.1")
    COMPUTE_PORT = os.environ.get("F7T_COMPUTE_PORT","5006")
    COMPUTE_URL = f"{F7T_SCHEME_PROTOCOL}://{COMPUTE_HOST}:{COMPUTE_PORT}"

    UTILITIES_HOST = os.environ.get("F7T_UTILITIES_HOST","127.0.0.1")
    UTILITIES_HOST = os.environ.get("F7T_UTILITIES_HOST","127.0.0.1")
    UTILITIES_PORT = os.environ.get("F7T_UTILITIES_PORT","5004")
    UTILITIES_URL = f"{F7T_SCHEME_PROTOCOL}://{UTILITIES_HOST}:{UTILITIES_PORT}"

@@ -205,6 +205,36 @@ def test_nodes(machine, headers):
    check_task_status(task_id, headers)


# Test partitions information
@skipif_not_uses_gateway
@pytest.mark.parametrize("machine", [SERVER_COMPUTE])
def test_partitions(machine, headers):
    url = f"{COMPUTE_URL}/partitions"
    headers.update({"X-Machine-Name": machine})
    resp = requests.get(url, headers=headers, verify=False)
    print(resp.content)
    assert resp.status_code == 200

    # check the status of the task created by the request
    task_id = resp.json()["task_id"]
    check_task_status(task_id, headers)


@skipif_not_uses_gateway
@pytest.mark.parametrize("machine", [SERVER_COMPUTE])
def test_partitions_xfer(machine, headers):
    url = f"{COMPUTE_URL}/partitions"
    headers.update({"X-Machine-Name": machine})
    params = {"partitions": "xfer"}
    resp = requests.get(url, headers=headers, params=params, verify=False)
    print(resp.content)
    assert resp.status_code == 200

    # check the status of the task created by the request
    task_id = resp.json()["task_id"]
    check_task_status(task_id, headers)


if __name__ == '__main__':
    pytest.main()
