Skip to content

Commit

Permalink
Merge branch 'RESTAPI-1049-Get-reservations' into 'master'
Browse files Browse the repository at this point in the history
Restapi 1049 get reservations

See merge request firecrest/firecrest!296
  • Loading branch information
ekouts committed May 2, 2024
2 parents d174ce3 + 283ea33 commit ed52231
Show file tree
Hide file tree
Showing 8 changed files with 294 additions and 7 deletions.
5 changes: 5 additions & 0 deletions deploy/test-build/cluster/ssh/ssh_command_wrapper.sh
Expand Up @@ -23,6 +23,11 @@ if [ "${SSH_EXECUTE:0:3}" == "ID=" -o "${SSH_EXECUTE:0:6}" == "F7TID=" ]; then
actual="${SSH_EXECUTE}"
fi

# Remove the SLURM_TIME_FORMAT=standard prefix if present
if [[ "${actual:0:26}" == "SLURM_TIME_FORMAT=standard" ]]; then
actual="${actual#* }" # remove everything before the first space, including the space
fi

command="${actual%% *}" # remove all after first space

case "$command" in
Expand Down
47 changes: 47 additions & 0 deletions doc/openapi/firecrest-api.yaml
Expand Up @@ -1963,6 +1963,53 @@ paths:
description: User does not have permissions to access machine
schema:
type: integer
'/compute/reservations':
parameters:
- in: header
name: X-Machine-Name
description: The system name
required: true
schema:
type: string
get:
summary: Retrieves information about all reservations
description: Information about reservations in the scheduling queue.
tags:
- Compute
parameters:
- name: reservations
in: query
description: Comma-separated list of reservations to retrieve
schema:
type: array
items:
type: string
responses:
'200':
description: Task created
content:
application/json:
schema:
$ref: '#/components/schemas/Task-Creation-Success'
'400':
description: Task creation error
content:
application/json:
schema:
$ref: '#/components/schemas/Task-Creation-Error'
headers:
X-Machine-Does-Not-Exist:
description: Machine does not exist
schema:
type: integer
X-Machine-Not-Available:
description: Machine is not available
schema:
type: integer
X-Permission-Denied:
description: User does not have permissions to access machine
schema:
type: integer
'/storage/xfer-internal/rsync':
parameters:
- in: header
Expand Down
47 changes: 47 additions & 0 deletions doc/openapi/firecrest-developers-api.yaml
Expand Up @@ -1951,6 +1951,53 @@ paths:
description: User does not have permissions to access machine
schema:
type: integer
'/compute/reservations':
parameters:
- in: header
name: X-Machine-Name
description: The system name
required: true
schema:
type: string
get:
summary: Retrieves information about all reservations
description: Information about reservations in the scheduling queue.
tags:
- Compute
parameters:
- name: reservations
in: query
description: Comma-separated list of reservations to retrieve
schema:
type: array
items:
type: string
responses:
'200':
description: Task created
content:
application/json:
schema:
$ref: '#/components/schemas/Task-Creation-Success'
'400':
description: Task creation error
content:
application/json:
schema:
$ref: '#/components/schemas/Task-Creation-Error'
headers:
X-Machine-Does-Not-Exist:
description: Machine does not exist
schema:
type: integer
X-Machine-Not-Available:
description: Machine is not available
schema:
type: integer
X-Permission-Denied:
description: User does not have permissions to access machine
schema:
type: integer
'/storage/xfer-internal/rsync':
parameters:
- in: header
Expand Down
21 changes: 19 additions & 2 deletions src/common/schedulers/__init__.py
Expand Up @@ -176,12 +176,29 @@ def parse_partitions_output(self, output, partition_names=None):
"""
pass

@abc.abstractmethod
def get_reservations(self, reservation_names=None):
    """Return the scheduler command used to list the reservations.

    ``reservation_names`` is an optional collection of reservation names
    that an implementation may use when building the command.
    """
    pass

@abc.abstractmethod
def parse_reservations_output(self, output, reservation_names=None):
    """Parse the output of the reservations command.

    Implementations should return one record per reservation, each
    providing the following fields:
    * ReservationName
    * State
    * Nodes
    * StartTime
    * EndTime
    * Features
    """
    pass

@abc.abstractmethod
def check_job_time(self, job_time):
    """Try to parse correctly the HH:MM:SS time format for the passed job_time argument.

    Accepted formats:
    * MM MM:SS
    * HH:MM:SS
    * DD-HH DD-HH:MM
    * DD-HH:MM:SS
    """
    pass
46 changes: 41 additions & 5 deletions src/common/schedulers/slurm.py
Expand Up @@ -277,11 +277,11 @@ def parse_partitions_output(self, output, partition_names=None):
"Default",
]
for part_descr in partitions_descriptions:
node_info = {}
part_info = {}
for attr_name in attributes:
attr_match = re.search(rf'{attr_name}=(\S+)', part_descr)
if attr_match:
node_info[attr_name] = attr_match.group(1)
part_info[attr_name] = attr_match.group(1)
else:
raise ValueError(
f"Could not parse attribute '{attr_name}' in "
Expand All @@ -290,11 +290,47 @@ def parse_partitions_output(self, output, partition_names=None):

if (
partitions_set is None or
node_info["PartitionName"] in partitions_set
part_info["PartitionName"] in partitions_set
):
partitions.append(node_info)
partitions.append(part_info)

return partitions

def get_reservations(self, reservation_names=None):
    """Return the Slurm command that lists the reservations.

    ``reservation_names`` is accepted for interface compatibility only;
    it is not used to build the command.
    """
    command = "SLURM_TIME_FORMAT=standard scontrol -a show -o reservations"
    return command

def parse_reservations_output(self, output, reservation_names=None):
    """Parse the output of the reservations command into a list of records.

    Each non-empty line of ``output`` is treated as one reservation
    description made of ``Key=Value`` pairs. Every record contains
    ``ReservationName``, ``State``, ``Nodes``, ``StartTime`` and
    ``EndTime`` as strings and ``Features`` as a list (split on ``&``).

    Args:
        output: Text produced by the reservations command.
        reservation_names: Optional iterable of names; when given (and
            non-empty) only reservations with these names are returned.

    Returns:
        A list of dictionaries, one per selected reservation.

    Raises:
        ValueError: If a required attribute is missing from a line.
    """
    # Sentinel message meaning there is nothing to parse; surrounding
    # whitespace (e.g. a trailing newline) must not defeat the check.
    if output.strip() == "No reservations in the system":
        return []

    reservations_set = set(reservation_names) if reservation_names else None

    # Attribute name -> separator used to split its value into a list
    # (None keeps the raw string).
    attribute_seps = {
        "ReservationName": None,
        "State": None,
        "Nodes": None,
        "StartTime": None,
        "EndTime": None,
        "Features": "&",
    }
    # Compile once instead of once per line and attribute.
    patterns = {
        attr_name: re.compile(rf'{attr_name}=(\S+)')
        for attr_name in attribute_seps
    }

    reservations = []
    for res_descr in output.splitlines():
        # Blank lines carry no reservation; skip them instead of failing.
        if not res_descr.strip():
            continue

        res_info = {}
        for attr_name, sep in attribute_seps.items():
            attr_match = patterns[attr_name].search(res_descr)
            if attr_match is None:
                raise ValueError(
                    f"Could not parse attribute '{attr_name}' in "
                    f"'{res_descr}'"
                )
            attr = attr_match.group(1)
            res_info[attr_name] = attr.split(sep) if sep else attr

        if reservations_set is None or res_info["ReservationName"] in reservations_set:
            reservations.append(res_info)

    return reservations

def is_valid_accounting_time(self, sacct_time):
# HH:MM[:SS] [AM|PM]
Expand Down
110 changes: 110 additions & 0 deletions src/compute/compute.py
Expand Up @@ -1041,6 +1041,34 @@ def partitions_task(headers, system_name, system_addr, action, task_id, partitio
update_task(task_id, headers, async_task.SUCCESS, jobs, is_json=True)


def reservations_task(headers, system_name, system_addr, action, task_id, reservations_list=None):
    """Run the reservations command remotely and store the outcome in the task.

    The task identified by ``task_id`` is updated with an error message
    when the remote command or the parsing fails, and with the parsed
    reservations (as JSON data) otherwise.
    """
    result = exec_remote_command(headers, system_name, system_addr, action)
    error_code = result["error"]

    # -2 is reported to the user as an unavailable machine
    if error_code == -2:
        update_task(task_id, headers, async_task.ERROR, "Machine is not available")
        return

    # any other non-zero code: forward the message, rewording OPENSSH failures
    if error_code != 0:
        failure = result["msg"]
        if in_str(failure,"OPENSSH"):
            failure = "User does not have permissions to access machine"
        update_task(task_id, headers, async_task.ERROR, failure)
        return

    try:
        parsed = scheduler.parse_reservations_output(result["msg"], reservations_list)
        app.logger.info(f"Number of reservations: {len(parsed)}")
    except ValueError as parse_error:
        update_task(task_id, headers, async_task.ERROR, str(parse_error))
        return

    # the result is JSON data to be stored in Tasks, hence is_json=True
    update_task(task_id, headers, async_task.SUCCESS, parsed, is_json=True)


# Job account information
@app.route("/acct",methods=["GET"])
@check_auth_header
Expand Down Expand Up @@ -1357,6 +1385,88 @@ def get_partitions():
)
return data, 400

@app.route("/reservations", methods=["GET"])
@check_auth_header
def get_reservations():
    """Create an asynchronous task that retrieves the scheduler reservations.

    The target system is read from the ``X-Machine-Name`` header and an
    optional comma-separated list of names from the ``reservations`` query
    argument.

    Returns:
        On success, a JSON body with ``task_id`` and ``task_url`` and status
        200. Missing/unknown machine, unavailable machine, invalid input or
        task creation failures return 400; denied access returns 404.
    """
    try:
        system_name = request.headers["X-Machine-Name"]
    except KeyError:
        app.logger.error("No machinename given")
        return jsonify(description="No machine name given"), 400

    failure_description = "Failed to retrieve reservations information"

    # public endpoints from Kong to users
    if system_name not in SYSTEMS_PUBLIC:
        header = {"X-Machine-Does-Not-Exists": "Machine does not exists"}
        return jsonify(description=failure_description, error="Machine does not exists"), 400, header

    # select index in the list corresponding with machine name
    system_idx = SYSTEMS_PUBLIC.index(system_name)
    system_addr = SYSTEMS_INTERNAL_COMPUTE[system_idx]

    [headers, ID] = get_tracing_headers(request)
    # check if machine is accessible by user:
    resp = exec_remote_command(
        headers,
        system_name,
        system_addr,
        f"ID={ID} true"
    )

    if resp["error"] != 0:
        error_str = resp["msg"]
        if resp["error"] == -2:
            header = {"X-Machine-Not-Available": "Machine is not available"}
            return jsonify(description=failure_description), 400, header
        if in_str(error_str, "Permission") or in_str(error_str, "OPENSSH"):
            header = {"X-Permission-Denied": "User does not have permissions to access machine or path"}
            return jsonify(description=failure_description), 404, header
        # NOTE(review): any other error falls through on purpose here; the
        # background task runs the real command and records its own errors.

    reservations = request.args.get("reservations", None)
    reservations_list = None
    if reservations is not None:
        v = validate_input(reservations)
        if v != "":
            return jsonify(description=failure_description, error=f"reservations '{reservations}' {v}"), 400

        # query arguments are strings, so splitting cannot fail
        reservations_list = reservations.split(",")

    # In Slurm we are not actually using the reservations_names argument
    # for the command but it can be used for other schedulers
    sched_cmd = scheduler.get_reservations(reservations_list)
    action = f"ID={ID} {sched_cmd}"

    try:
        # obtain new task from Tasks microservice
        task_id = create_task(headers, service="compute", system=system_name)

        # if error in creating task:
        if task_id == -1:
            return jsonify(description=failure_description, error='Error creating task'), 400

        update_task(task_id, headers, async_task.QUEUED)

        # asynchronous task creation
        aTask = threading.Thread(target=reservations_task, name=ID,
                                 args=(headers, system_name, system_addr, action, task_id, reservations_list))

        aTask.start()
        task_url = f"/tasks/{task_id}"

        data = jsonify(
            success="Task created", task_id=task_id, task_url=task_url
        )
        return data, 200

    except Exception as e:
        # exception objects are not JSON serialisable: report their text
        data = jsonify(
            description=failure_description, error=str(e)
        )
        return data, 400


@app.route("/status",methods=["GET"])
@check_auth_header
def status():
Expand Down
14 changes: 14 additions & 0 deletions src/tests/automated_tests/integration/test_compute.py
Expand Up @@ -235,6 +235,20 @@ def test_partitions_xfer(machine, headers):
check_task_status(task_id, headers)


@skipif_not_uses_gateway
@pytest.mark.parametrize("machine", [SERVER_COMPUTE])
def test_reservations(machine, headers):
    """Request the reservations of ``machine`` and check the created task."""
    endpoint = f"{COMPUTE_URL}/reservations"
    headers.update({"X-Machine-Name": machine})
    response = requests.get(endpoint, headers=headers, verify=False)
    print(response.content)
    assert response.status_code == 200

    # check the status of the task created for the reservations request
    created_task = response.json()["task_id"]
    check_task_status(created_task, headers)


# Allow running this test module directly as a script.
if __name__ == '__main__':
    pytest.main()

Expand Down
11 changes: 11 additions & 0 deletions src/tests/automated_tests/unit/test_unit_compute.py
Expand Up @@ -192,6 +192,17 @@ def test_partition_xfer(machine, expected_response_code, headers):
assert resp.status_code == expected_response_code


# Test get reservations information
@skipif_not_uses_gateway
@pytest.mark.parametrize("machine, expected_response_code", DATA)
def test_reservations(machine, expected_response_code, headers):
    """Check the status code of the reservations endpoint for each machine."""
    endpoint = f"{COMPUTE_URL}/reservations"
    headers.update({"X-Machine-Name": machine})
    response = requests.get(endpoint, headers=headers, verify=False)
    print(response.content)
    assert response.status_code == expected_response_code


# Test get status of Jobs microservice
@skipif_uses_gateway
def test_status(headers):
Expand Down

0 comments on commit ed52231

Please sign in to comment.