From df8025efa86bd5a47436c7ebd4c5a4e282715ad1 Mon Sep 17 00:00:00 2001 From: Eirini Koutsaniti Date: Fri, 12 Apr 2024 15:31:46 +0200 Subject: [PATCH 01/10] Add reservations endpoint --- CHANGELOG.md | 1 + src/common/schedulers/__init__.py | 21 +++- src/common/schedulers/slurm.py | 41 +++++++- src/compute/compute.py | 161 +++++++++++++++++++++++++++++- 4 files changed, 216 insertions(+), 8 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 0f91274..f1e5c9d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,6 +14,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Added recurisive option to ls utilities command `&recursive=true`. - Add the endpoints `/compute/partitions` and `/compute/partitions/{partitionName}` to retrieve information about partitions in the scheduling queue. - Add `examples` directory for practical use cases of FirecREST. +- Add the endpoints `/compute/reservations` and `/compute/reservations/{reservationName}` to retrieve information about reservations in the scheduling queue. ### Changed diff --git a/src/common/schedulers/__init__.py b/src/common/schedulers/__init__.py index 709c794..6687ebf 100644 --- a/src/common/schedulers/__init__.py +++ b/src/common/schedulers/__init__.py @@ -146,7 +146,7 @@ def parse_nodes_output(self, output): @abc.abstractmethod def get_partitions(self, partitions): - """Return the partitions of the system. + """Return the partitions command of the system. """ pass @@ -160,3 +160,22 @@ def parse_partitions_output(self, output): * Default """ pass + + @abc.abstractmethod + def get_reservations(self, reservations): + """Return the reservations command of the system. + """ + pass + + @abc.abstractmethod + def parse_reservations_output(self, output): + """Parses the reservations command. Should return records with: + * ReservationName + * State + * Nodes + * TotalCPUS + * TotalNodes + * StartTime + * EndTime + """ + pass diff --git a/src/common/schedulers/slurm.py b/src/common/schedulers/slurm.py index 9b0c963..bbf5423 100644 --- a/src/common/schedulers/slurm.py +++ b/src/common/schedulers/slurm.py @@ -278,18 +278,49 @@ def parse_partitions_output(self, output): "Default", ] for part_descr in partitions_descriptions: - node_info = {} + part_info = {} for attr_name in attributes: attr_match = re.search(rf'{attr_name}=(\S+)', part_descr) if attr_match: - node_info[attr_name] = attr_match.group(1) + part_info[attr_name] = attr_match.group(1) else: logger.error(f"Could not parse attribute {attr_name} in {part_descr}, will return `None`") - node_info[attr_name] = None + part_info[attr_name] = None + + partitions.append(part_info) + + return partitions + + def get_reservations(self, reservations): + reservations = [] if reservations is None else reservations + quotes_reservations = [f"'{res}'" for res in reservations] + return f"SLURM_TIME_FORMAT=standard scontrol -a show -o reservations {','.join(quotes_reservations)}" + + def parse_reservations_output(self, output): + reservations_descriptions = output.splitlines() + reservations = [] + attributes = [ + "ReservationName", + "State", + "Nodes", + "StartTime", + "EndTime", + "Features", + "PartitionName", + ] + for res_descr in reservations_descriptions: + res_info = {} + for attr_name in attributes: + attr_match = re.search(rf'{attr_name}=(\S+)', res_descr) + if attr_match: + res_info[attr_name] = attr_match.group(1) + else: + logger.error(f"Could not parse attribute {res_descr} in {res_descr}, will return `None`") + res_info[attr_name] = None - partitions.append(node_info) + reservations.append(res_info) - return list(partitions) + return reservations def is_valid_accounting_time(self,sacctTime): # HH:MM[:SS] [AM|PM] diff --git a/src/compute/compute.py b/src/compute/compute.py index f5e16dc..87a802e 100644 --- a/src/compute/compute.py +++ b/src/compute/compute.py @@ -38,11 +38,11 @@ # Internal microservices communication ## certificator -CERTIFICATOR_HOST = os.environ.get("F7T_CERTIFICATOR_HOST","127.0.0.1") +CERTIFICATOR_HOST = os.environ.get("F7T_CERTIFICATOR_HOST","127.0.0.1") CERTIFICATOR_PORT = os.environ.get("F7T_CERTIFICATOR_PORT","5000") CERTIFICATOR_URL = f"{F7T_SCHEME_PROTOCOL}://{CERTIFICATOR_HOST}:{CERTIFICATOR_PORT}" ## tasks -TASKS_HOST = os.environ.get("F7T_TASKS_HOST","127.0.0.1") +TASKS_HOST = os.environ.get("F7T_TASKS_HOST","127.0.0.1") TASKS_PORT = os.environ.get("F7T_TASKS_PORT","5003") TASKS_URL = f"{F7T_SCHEME_PROTOCOL}://{TASKS_HOST}:{TASKS_PORT}" @@ -1033,6 +1033,30 @@ def partitions_task(headers, system_name, system_addr, action, task_id): update_task(task_id, headers, async_task.SUCCESS, jobs, is_json=True) +def reservations_task(headers, system_name, system_addr, action, task_id): + # exec remote command + resp = exec_remote_command(headers, system_name, system_addr, action) + + # in case of error: + if resp["error"] == -2: + update_task(task_id, headers, async_task.ERROR, "Machine is not available") + return + + # in case of error: + if resp["error"] != 0: + err_msg = resp["msg"] + if in_str(err_msg,"OPENSSH"): + err_msg = "User does not have permissions to access machine" + update_task(task_id, headers, async_task.ERROR, err_msg) + return + + reservations = scheduler.parse_reservations_output(resp["msg"]) + app.logger.info(f"Number of reservations: {len(reservations)}") + + # as it is a json data to be stored in Tasks, the is_json=True + update_task(task_id, headers, async_task.SUCCESS, reservations, is_json=True) + + # Job account information @app.route("/acct",methods=["GET"]) @check_auth_header @@ -1394,6 +1418,139 @@ def get_partition(partitionName): data = jsonify(description="Failed to retrieve partition information",error=e) return data, 400 +@app.route("/reservations",methods=["GET"]) +@check_auth_header +def get_reservations(): + try: + system_name = request.headers["X-Machine-Name"] + except KeyError as e: + app.logger.error("No machinename given") + return jsonify(description="No machine name given"), 400 + + # public endpoints from Kong to users + if system_name not in SYSTEMS_PUBLIC: + header = {"X-Machine-Does-Not-Exists": "Machine does not exists"} + return jsonify(description="Failed to retrieve account information", error="Machine does not exists"), 400, header + + # select index in the list corresponding with machine name + system_idx = SYSTEMS_PUBLIC.index(system_name) + system_addr = SYSTEMS_INTERNAL_COMPUTE[system_idx] + + [headers, ID] = get_tracing_headers(request) + # check if machine is accessible by user: + resp = exec_remote_command(headers, system_name, system_addr, f"ID={ID} true") + + if resp["error"] != 0: + error_str = resp["msg"] + if resp["error"] == -2: + header = {"X-Machine-Not-Available": "Machine is not available"} + return jsonify(description="Failed to retrieve account information"), 400, header + if in_str(error_str,"Permission") or in_str(error_str,"OPENSSH"): + header = {"X-Permission-Denied": "User does not have permissions to access machine or path"} + return jsonify(description="Failed to retrieve account information"), 404, header + + reservations = request.args.get("reservations", None) + reservations_list = None + if reservations != None: + v = validate_input(reservations) + if v != "": + return jsonify(description="Failed to retrieve reservations information", error=f"reservations '{reservations}' {v}"), 400 + + try: + reservations_list = reservations.split(",") + except: + return jsonify(error="Jobs list wrong format", description="Failed to retrieve reservations information"), 400 + + sched_cmd = scheduler.get_reservations(reservations_list) + action = f"ID={ID} {sched_cmd}" + + try: + # obtain new task from Tasks microservice + task_id = create_task(headers, service="compute",system=system_name) + + # if error in creating task: + if task_id == -1: + return jsonify(description="Failed to retrieve reservations information", error='Error creating task'), 400 + + update_task(task_id, headers, async_task.QUEUED) + + # asynchronous task creation + aTask = threading.Thread(target=reservations_task, name=ID, + args=(headers, system_name, system_addr, action, task_id)) + + aTask.start() + task_url = f"/tasks/{task_id}" + + data = jsonify(success="Task created", task_id=task_id, task_url=task_url) + return data, 200 + + except Exception as e: + data = jsonify(description="Failed to retrieve reservations information",error=e) + return data, 400 + + +@app.route("/reservations/",methods=["GET"]) +@check_auth_header +def get_reservation(reservationName): + try: + system_name = request.headers["X-Machine-Name"] + except KeyError as e: + app.logger.error("No machinename given") + return jsonify(description="No machine name given"), 400 + + # public endpoints from Kong to users + if system_name not in SYSTEMS_PUBLIC: + header = {"X-Machine-Does-Not-Exists": "Machine does not exists"} + return jsonify(description="Failed to retrieve account information", error="Machine does not exists"), 400, header + + # select index in the list corresponding with machine name + system_idx = SYSTEMS_PUBLIC.index(system_name) + system_addr = SYSTEMS_INTERNAL_COMPUTE[system_idx] + + [headers, ID] = get_tracing_headers(request) + # check if machine is accessible by user: + resp = exec_remote_command(headers, system_name, system_addr, f"ID={ID} true") + + if resp["error"] != 0: + error_str = resp["msg"] + if resp["error"] == -2: + header = {"X-Machine-Not-Available": "Machine is not available"} + return jsonify(description="Failed to retrieve reservation information"), 400, header + if in_str(error_str,"Permission") or in_str(error_str,"OPENSSH"): + header = {"X-Permission-Denied": "User does not have permissions to access machine or path"} + return jsonify(description="Failed to retrieve reservation information"), 404, header + + v = validate_input(reservationName) + if v != "": + return jsonify(description="Failed to retrieve reservation", error=f"reservationName '{reservationName}' {v}"), 400 + + sched_cmd = scheduler.get_reservations([reservationName]) + action = f"ID={ID} {sched_cmd}" + + try: + # obtain new task from Tasks microservice + task_id = create_task(headers, service="compute", system=system_name) + + # if error in creating task: + if task_id == -1: + return jsonify(description="Failed to retrieve reservation information", error="Error creating task"), 400 + + update_task(task_id, headers, async_task.QUEUED) + + # asynchronous task creation + aTask = threading.Thread(target=reservations_task, name=ID, + args=(headers, system_name, system_addr, action, task_id)) + + aTask.start() + task_url = f"/tasks/{task_id}" + + data = jsonify(success="Task created", task_id=task_id, task_url=task_url) + return data, 200 + + except Exception as e: + data = jsonify(description="Failed to retrieve reservation information",error=e) + return data, 400 + @app.route("/status",methods=["GET"]) @check_auth_header def status(): From a4b6a5a53b3955c867b6afd461251f1d95d37f67 Mon Sep 17 00:00:00 2001 From: Eirini Koutsaniti Date: Tue, 16 Apr 2024 07:31:08 +0200 Subject: [PATCH 02/10] Adapt ssh wrapper for the reservation command + handle empty list of reservtions --- deploy/test-build/cluster/ssh/ssh_command_wrapper.sh | 5 +++++ src/common/schedulers/slurm.py | 5 ++++- 2 files changed, 9 insertions(+), 1 deletion(-) diff --git a/deploy/test-build/cluster/ssh/ssh_command_wrapper.sh b/deploy/test-build/cluster/ssh/ssh_command_wrapper.sh index 59601cb..16726b2 100644 --- a/deploy/test-build/cluster/ssh/ssh_command_wrapper.sh +++ b/deploy/test-build/cluster/ssh/ssh_command_wrapper.sh @@ -23,6 +23,11 @@ if [ "${SSH_EXECUTE:0:3}" == "ID=" -o "${SSH_EXECUTE:0:6}" == "F7TID=" ]; then actual="${SSH_EXECUTE}" fi +# Remove the SLURM_TIME_FORMAT=standard prefix if present +if [[ "${actual:0:26}" == "SLURM_TIME_FORMAT=standard" ]]; then + actual="${actual#* }" # remove everything before the first space, including the space +fi + command="${actual%% *}" # remove all after first space case "$command" in diff --git a/src/common/schedulers/slurm.py b/src/common/schedulers/slurm.py index bbf5423..69494ad 100644 --- a/src/common/schedulers/slurm.py +++ b/src/common/schedulers/slurm.py @@ -297,6 +297,9 @@ def get_reservations(self, reservations): return f"SLURM_TIME_FORMAT=standard scontrol -a show -o reservations {','.join(quotes_reservations)}" def parse_reservations_output(self, output): + if output == "No reservations in the system": + return [] + reservations_descriptions = output.splitlines() reservations = [] attributes = [ @@ -315,7 +318,7 @@ def parse_reservations_output(self, output): if attr_match: res_info[attr_name] = attr_match.group(1) else: - logger.error(f"Could not parse attribute {res_descr} in {res_descr}, will return `None`") + logger.error(f"Could not parse attribute {attr_name} in {res_descr}, will return `None`") res_info[attr_name] = None reservations.append(res_info) From 7328360566a87148655a15e319d59d8212539325 Mon Sep 17 00:00:00 2001 From: Eirini Koutsaniti Date: Tue, 16 Apr 2024 07:31:45 +0200 Subject: [PATCH 03/10] Add unittests --- .../integration/test_compute.py | 18 ++++++++++++++++-- .../automated_tests/unit/test_unit_compute.py | 11 +++++++++++ 2 files changed, 27 insertions(+), 2 deletions(-) diff --git a/src/tests/automated_tests/integration/test_compute.py b/src/tests/automated_tests/integration/test_compute.py index 4acb99c..d8aa011 100644 --- a/src/tests/automated_tests/integration/test_compute.py +++ b/src/tests/automated_tests/integration/test_compute.py @@ -208,7 +208,7 @@ def test_nodes(machine, headers): # Test partitions information @skipif_not_uses_gateway @pytest.mark.parametrize("machine", [SERVER_COMPUTE]) -def test_nodes(machine, headers): +def test_partitions(machine, headers): url = f"{COMPUTE_URL}/partitions" headers.update({"X-Machine-Name": machine}) resp = requests.get(url, headers=headers, verify=False) @@ -222,7 +222,7 @@ def test_nodes(machine, headers): @skipif_not_uses_gateway @pytest.mark.parametrize("machine", [SERVER_COMPUTE]) -def test_nodes(machine, headers): +def test_partition_xfer(machine, headers): url = f"{COMPUTE_URL}/partitions/xfer" headers.update({"X-Machine-Name": machine}) resp = requests.get(url, headers=headers, verify=False) @@ -234,6 +234,20 @@ def test_nodes(machine, headers): check_task_status(task_id, headers) +@skipif_not_uses_gateway +@pytest.mark.parametrize("machine", [SERVER_COMPUTE]) +def test_reservations(machine, headers): + url = f"{COMPUTE_URL}/reservations" + headers.update({"X-Machine-Name": machine}) + resp = requests.get(url, headers=headers, verify=False) + print(resp.content) + assert resp.status_code == 200 + + # check scancel status + task_id = resp.json()["task_id"] + check_task_status(task_id, headers) + + if __name__ == '__main__': pytest.main() diff --git a/src/tests/automated_tests/unit/test_unit_compute.py b/src/tests/automated_tests/unit/test_unit_compute.py index b7e19fd..26b783d 100644 --- a/src/tests/automated_tests/unit/test_unit_compute.py +++ b/src/tests/automated_tests/unit/test_unit_compute.py @@ -191,6 +191,17 @@ def test_partition_xfer(machine, expected_response_code, headers): assert resp.status_code == expected_response_code +# Test get reservations information +@skipif_not_uses_gateway +@pytest.mark.parametrize("machine, expected_response_code", DATA) +def test_reservations(machine, expected_response_code, headers): + url = f"{COMPUTE_URL}/reservations" + headers.update({"X-Machine-Name": machine}) + resp = requests.get(url, headers=headers, verify=False) + print(resp.content) + assert resp.status_code == expected_response_code + + # Test get status of Jobs microservice @skipif_uses_gateway def test_status(headers): From b26f386bae9ddcb5e9da1acc8b1d2ef9387f59b9 Mon Sep 17 00:00:00 2001 From: Eirini Koutsaniti Date: Tue, 16 Apr 2024 07:35:08 +0200 Subject: [PATCH 04/10] Add docs --- doc/openapi/firecrest-api.yaml | 92 +++++++++++++++++++++++ doc/openapi/firecrest-developers-api.yaml | 92 +++++++++++++++++++++++ 2 files changed, 184 insertions(+) diff --git a/doc/openapi/firecrest-api.yaml b/doc/openapi/firecrest-api.yaml index caf51d9..1ae6cd4 100644 --- a/doc/openapi/firecrest-api.yaml +++ b/doc/openapi/firecrest-api.yaml @@ -1998,6 +1998,98 @@ paths: description: User does not have permissions to access machine schema: type: integer + '/compute/reservations': + parameters: + - in: header + name: X-Machine-Name + description: The system name + required: true + schema: + type: string + get: + summary: Retrieves information about all reservations + description: Information about reservations in the scheduling queue. + tags: + - Compute + parameters: + - name: partitions + in: query + description: Comma-separated list of reservations to retrieve + schema: + type: array + items: + type: string + responses: + '200': + description: Task created + content: + application/json: + schema: + $ref: '#/components/schemas/Task-Creation-Success' + '400': + description: Task creation error + content: + application/json: + schema: + $ref: '#/components/schemas/Task-Creation-Error' + headers: + X-Machine-Does-Not-Exist: + description: Machine does not exist + schema: + type: integer + X-Machine-Not-Available: + description: Machine is not available + schema: + type: integer + X-Permission-Denied: + description: User does not have permissions to access machine + schema: + type: integer + '/compute/reservations/{reservationName}': + get: + summary: Retrieves information about a specific reservation + description: Information about a specific reservation in the scheduling queue. + tags: + - Compute + parameters: + - name: X-Machine-Name + in: header + description: The system name + required: true + schema: + type: string + - name: reservationName + in: path + description: The ID of the reservation to retrieve. + required: true + schema: + type: string + responses: + '200': + description: Task created + content: + application/json: + schema: + $ref: '#/components/schemas/Task-Creation-Success' + '400': + description: Task creation error + content: + application/json: + schema: + $ref: '#/components/schemas/Task-Creation-Error' + headers: + X-Machine-Does-Not-Exist: + description: Machine does not exist + schema: + type: integer + X-Machine-Not-Available: + description: Machine is not available + schema: + type: integer + X-Permission-Denied: + description: User does not have permissions to access machine + schema: + type: integer '/storage/xfer-internal/rsync': parameters: - in: header diff --git a/doc/openapi/firecrest-developers-api.yaml b/doc/openapi/firecrest-developers-api.yaml index 2e36aa2..44e3407 100644 --- a/doc/openapi/firecrest-developers-api.yaml +++ b/doc/openapi/firecrest-developers-api.yaml @@ -1986,6 +1986,98 @@ paths: description: User does not have permissions to access machine schema: type: integer + '/compute/reservations': + parameters: + - in: header + name: X-Machine-Name + description: The system name + required: true + schema: + type: string + get: + summary: Retrieves information about all reservations + description: Information about reservations in the scheduling queue. + tags: + - Compute + parameters: + - name: partitions + in: query + description: Comma-separated list of reservations to retrieve + schema: + type: array + items: + type: string + responses: + '200': + description: Task created + content: + application/json: + schema: + $ref: '#/components/schemas/Task-Creation-Success' + '400': + description: Task creation error + content: + application/json: + schema: + $ref: '#/components/schemas/Task-Creation-Error' + headers: + X-Machine-Does-Not-Exist: + description: Machine does not exist + schema: + type: integer + X-Machine-Not-Available: + description: Machine is not available + schema: + type: integer + X-Permission-Denied: + description: User does not have permissions to access machine + schema: + type: integer + '/compute/reservations/{reservationName}': + get: + summary: Retrieves information about a specific reservation + description: Information about a specific reservation in the scheduling queue. + tags: + - Compute + parameters: + - name: X-Machine-Name + in: header + description: The system name + required: true + schema: + type: string + - name: reservationName + in: path + description: The ID of the reservation to retrieve. + required: true + schema: + type: string + responses: + '200': + description: Task created + content: + application/json: + schema: + $ref: '#/components/schemas/Task-Creation-Success' + '400': + description: Task creation error + content: + application/json: + schema: + $ref: '#/components/schemas/Task-Creation-Error' + headers: + X-Machine-Does-Not-Exist: + description: Machine does not exist + schema: + type: integer + X-Machine-Not-Available: + description: Machine is not available + schema: + type: integer + X-Permission-Denied: + description: User does not have permissions to access machine + schema: + type: integer '/storage/xfer-internal/rsync': parameters: - in: header From b01eef9cb7b9eb779713c28dc330f9d2794a147d Mon Sep 17 00:00:00 2001 From: Eirini Koutsaniti Date: Thu, 25 Apr 2024 15:10:11 +0200 Subject: [PATCH 05/10] Small fixes --- src/common/schedulers/slurm.py | 16 +++++++++++----- src/compute/compute.py | 20 ++++++++++---------- 2 files changed, 21 insertions(+), 15 deletions(-) diff --git a/src/common/schedulers/slurm.py b/src/common/schedulers/slurm.py index ae98fcc..f5c1f07 100644 --- a/src/common/schedulers/slurm.py +++ b/src/common/schedulers/slurm.py @@ -300,13 +300,19 @@ def parse_partitions_output(self, output, partitions_list=None): def get_reservations(self, reservations): reservations = [] if reservations is None else reservations - quotes_reservations = [f"'{res}'" for res in reservations] - return f"SLURM_TIME_FORMAT=standard scontrol -a show -o reservations {','.join(quotes_reservations)}" + cmd = "SLURM_TIME_FORMAT=standard scontrol -a show -o reservations" + # scontrol show reservations= is supported only + # for one partition. Otherwise we filter the output. + if len(reservations) == 1: + cmd += f"={reservations[0]}" + + return cmd - def parse_reservations_output(self, output): + def parse_reservations_output(self, output, reservations_list=None): if output == "No reservations in the system": return [] + reservations_set = set(reservations_list) if reservations_list else None reservations_descriptions = output.splitlines() reservations = [] attributes = [ @@ -316,7 +322,6 @@ def parse_reservations_output(self, output): "StartTime", "EndTime", "Features", - "PartitionName", ] for res_descr in reservations_descriptions: res_info = {} @@ -328,7 +333,8 @@ def parse_reservations_output(self, output): logger.error(f"Could not parse attribute {attr_name} in {res_descr}, will return `None`") res_info[attr_name] = None - reservations.append(res_info) + if reservations_set is None or res_info["ReservationName"] in reservations_set: + reservations.append(res_info) return reservations diff --git a/src/compute/compute.py b/src/compute/compute.py index aae5bb5..7a22e63 100644 --- a/src/compute/compute.py +++ b/src/compute/compute.py @@ -1032,7 +1032,7 @@ def partitions_task(headers, system_name, system_addr, action, task_id, partitio update_task(task_id, headers, async_task.SUCCESS, jobs, is_json=True) -def reservations_task(headers, system_name, system_addr, action, task_id): +def reservations_task(headers, system_name, system_addr, action, task_id, reservations_list=None): # exec remote command resp = exec_remote_command(headers, system_name, system_addr, action) @@ -1049,7 +1049,7 @@ def reservations_task(headers, system_name, system_addr, action, task_id): update_task(task_id, headers, async_task.ERROR, err_msg) return - reservations = scheduler.parse_reservations_output(resp["msg"]) + reservations = scheduler.parse_reservations_output(resp["msg"], reservations_list) app.logger.info(f"Number of reservations: {len(reservations)}") # as it is a json data to be stored in Tasks, the is_json=True @@ -1422,7 +1422,7 @@ def get_partition(partitionName): def get_reservations(): try: system_name = request.headers["X-Machine-Name"] - except KeyError as e: + except KeyError: app.logger.error("No machinename given") return jsonify(description="No machine name given"), 400 @@ -1444,7 +1444,7 @@ def get_reservations(): if resp["error"] == -2: header = {"X-Machine-Not-Available": "Machine is not available"} return jsonify(description="Failed to retrieve account information"), 400, header - if in_str(error_str,"Permission") or in_str(error_str,"OPENSSH"): + if in_str(error_str, "Permission") or in_str(error_str, "OPENSSH"): header = {"X-Permission-Denied": "User does not have permissions to access machine or path"} return jsonify(description="Failed to retrieve account information"), 404, header @@ -1460,12 +1460,12 @@ def get_reservations(): except: return jsonify(error="Jobs list wrong format", description="Failed to retrieve reservations information"), 400 - sched_cmd = scheduler.get_reservations(reservations_list) + sched_cmd = scheduler.get_reservations() action = f"ID={ID} {sched_cmd}" try: # obtain new task from Tasks microservice - task_id = create_task(headers, service="compute",system=system_name) + task_id = create_task(headers, service="compute", system=system_name) # if error in creating task: if task_id == -1: @@ -1475,7 +1475,7 @@ def get_reservations(): # asynchronous task creation aTask = threading.Thread(target=reservations_task, name=ID, - args=(headers, system_name, system_addr, action, task_id)) + args=(headers, system_name, system_addr, action, task_id, reservations_list)) aTask.start() task_url = f"/tasks/{task_id}" @@ -1488,12 +1488,12 @@ def get_reservations(): return data, 400 -@app.route("/reservations/",methods=["GET"]) +@app.route("/reservations/", methods=["GET"]) @check_auth_header def get_reservation(reservationName): try: system_name = request.headers["X-Machine-Name"] - except KeyError as e: + except KeyError: app.logger.error("No machinename given") return jsonify(description="No machine name given"), 400 @@ -1515,7 +1515,7 @@ def get_reservation(reservationName): if resp["error"] == -2: header = {"X-Machine-Not-Available": "Machine is not available"} return jsonify(description="Failed to retrieve reservation information"), 400, header - if in_str(error_str,"Permission") or in_str(error_str,"OPENSSH"): + if in_str(error_str, "Permission") or in_str(error_str, "OPENSSH"): header = {"X-Permission-Denied": "User does not have permissions to access machine or path"} return jsonify(description="Failed to retrieve reservation information"), 404, header From e104ad6792f94306c25eebd6793d1a8ffebf9ab0 Mon Sep 17 00:00:00 2001 From: Eirini Koutsaniti Date: Thu, 25 Apr 2024 15:11:41 +0200 Subject: [PATCH 06/10] Update docstring --- src/common/schedulers/__init__.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/src/common/schedulers/__init__.py b/src/common/schedulers/__init__.py index 9bbdca8..bcd58cf 100644 --- a/src/common/schedulers/__init__.py +++ b/src/common/schedulers/__init__.py @@ -183,15 +183,14 @@ def get_reservations(self, reservations): pass @abc.abstractmethod - def parse_reservations_output(self, output): + def parse_reservations_output(self, output, reservations_list=None): """Parses the reservations command. Should return records with: * ReservationName * State * Nodes - * TotalCPUS - * TotalNodes * StartTime * EndTime + * Features """ pass From a7710ac8ef91f8fc18e303efb6b39ffd0b617c8e Mon Sep 17 00:00:00 2001 From: Eirini Koutsaniti Date: Mon, 29 Apr 2024 11:56:43 +0200 Subject: [PATCH 07/10] Update reservation endpoint --- src/common/schedulers/__init__.py | 4 +- src/common/schedulers/slurm.py | 42 +++++++------- src/compute/compute.py | 95 ++++++++----------------------- 3 files changed, 45 insertions(+), 96 deletions(-) diff --git a/src/common/schedulers/__init__.py b/src/common/schedulers/__init__.py index c71472f..be91dd2 100644 --- a/src/common/schedulers/__init__.py +++ b/src/common/schedulers/__init__.py @@ -177,13 +177,13 @@ def parse_partitions_output(self, output, partition_names=None): pass @abc.abstractmethod - def get_reservations(self, reservations): + def get_reservations(self, reservation_names=None): """Return the reservations command of the system. """ pass @abc.abstractmethod - def parse_reservations_output(self, output, reservations_list=None): + def parse_reservations_output(self, output, reservation_names=None): """Parses the reservations command. Should return records with: * ReservationName * State diff --git a/src/common/schedulers/slurm.py b/src/common/schedulers/slurm.py index 54ffb97..4e2f214 100644 --- a/src/common/schedulers/slurm.py +++ b/src/common/schedulers/slurm.py @@ -296,40 +296,36 @@ def parse_partitions_output(self, output, partition_names=None): return partitions - def get_reservations(self, reservations): - reservations = [] if reservations is None else reservations - cmd = "SLURM_TIME_FORMAT=standard scontrol -a show -o reservations" - # scontrol show reservations= is supported only - # for one partition. Otherwise we filter the output. - if len(reservations) == 1: - cmd += f"={reservations[0]}" + def get_reservations(self, reservation_names=None): + return "SLURM_TIME_FORMAT=standard scontrol -a show -o reservations" - return cmd - - def parse_reservations_output(self, output, reservations_list=None): + def parse_reservations_output(self, output, reservation_names=None): if output == "No reservations in the system": return [] - reservations_set = set(reservations_list) if reservations_list else None + reservations_set = set(reservation_names) if reservation_names else None reservations_descriptions = output.splitlines() reservations = [] - attributes = [ - "ReservationName", - "State", - "Nodes", - "StartTime", - "EndTime", - "Features", - ] + attribute_seps = { + "ReservationName": None, + "State": None, + "Nodes": None, + "StartTime": None, + "EndTime": None, + "Features": "&", + } for res_descr in reservations_descriptions: res_info = {} - for attr_name in attributes: + for attr_name, sep in attribute_seps.items(): attr_match = re.search(rf'{attr_name}=(\S+)', res_descr) if attr_match: - res_info[attr_name] = attr_match.group(1) + attr = attr_match.group(1) + res_info[attr_name] = attr.split(sep) if sep else attr else: - logger.error(f"Could not parse attribute {attr_name} in {res_descr}, will return `None`") - res_info[attr_name] = None + raise ValueError( + f"Could not parse attribute '{attr_name}' in " + f"'{res_descr}'" + ) if reservations_set is None or res_info["ReservationName"] in reservations_set: reservations.append(res_info) diff --git a/src/compute/compute.py b/src/compute/compute.py index 30b46ed..e797f59 100644 --- a/src/compute/compute.py +++ b/src/compute/compute.py @@ -1053,8 +1053,12 @@ def reservations_task(headers, system_name, system_addr, action, task_id, reserv update_task(task_id, headers, async_task.ERROR, err_msg) return - reservations = scheduler.parse_reservations_output(resp["msg"], reservations_list) - app.logger.info(f"Number of reservations: {len(reservations)}") + try: + reservations = scheduler.parse_reservations_output(resp["msg"], reservations_list) + app.logger.info(f"Number of reservations: {len(reservations)}") + except ValueError as e: + update_task(task_id, headers, async_task.ERROR, str(e)) + return # as it is a json data to be stored in Tasks, the is_json=True update_task(task_id, headers, async_task.SUCCESS, reservations, is_json=True) @@ -1370,7 +1374,7 @@ def get_partitions(): ) return data, 400 -@app.route("/reservations",methods=["GET"]) +@app.route("/reservations", methods=["GET"]) @check_auth_header def get_reservations(): try: @@ -1390,7 +1394,12 @@ def get_reservations(): [headers, ID] = get_tracing_headers(request) # check if machine is accessible by user: - resp = exec_remote_command(headers, system_name, system_addr, f"ID={ID} true") + resp = exec_remote_command( + headers, + system_name, + system_addr, + f"ID={ID} true" + ) if resp["error"] != 0: error_str = resp["msg"] @@ -1403,7 +1412,7 @@ def get_reservations(): reservations = request.args.get("reservations", None) reservations_list = None - if reservations != None: + if reservations is not None: v = validate_input(reservations) if v != "": return jsonify(description="Failed to retrieve reservations information", error=f"reservations '{reservations}' {v}"), 400 @@ -1411,9 +1420,11 @@ def get_reservations(): try: reservations_list = reservations.split(",") except: - return jsonify(error="Jobs list wrong format", description="Failed to retrieve reservations information"), 400 + return jsonify(description="Failed to retrieve reservations information", error="Reservations list wrong format"), 400 - sched_cmd = scheduler.get_reservations() + # In Slurm we are not actually using the reservations_names argument + # for the command but it can be used for other schedulers + sched_cmd = scheduler.get_reservations(reservations_list) action = f"ID={ID} {sched_cmd}" try: @@ -1433,76 +1444,18 @@ def get_reservations(): aTask.start() task_url = f"/tasks/{task_id}" - data = jsonify(success="Task created", task_id=task_id, task_url=task_url) + data = jsonify( + success="Task created", task_id=task_id, task_url=task_url + ) return data, 200 except Exception as e: - data = jsonify(description="Failed to retrieve reservations information",error=e) + data = jsonify( + description="Failed to retrieve reservations information", error=e + ) return data, 400 -@app.route("/reservations/", methods=["GET"]) -@check_auth_header -def get_reservation(reservationName): - try: - system_name = request.headers["X-Machine-Name"] - except KeyError: - app.logger.error("No machinename given") - return jsonify(description="No machine name given"), 400 - - # public endpoints from Kong to users - if system_name not in SYSTEMS_PUBLIC: - header = {"X-Machine-Does-Not-Exists": "Machine does not exists"} - return jsonify(description="Failed to retrieve account information", error="Machine does not exists"), 400, header - - # select index in the list corresponding with machine name - system_idx = SYSTEMS_PUBLIC.index(system_name) - system_addr = SYSTEMS_INTERNAL_COMPUTE[system_idx] - - [headers, ID] = get_tracing_headers(request) - # check if machine is accessible by user: - resp = exec_remote_command(headers, system_name, system_addr, f"ID={ID} true") - - if resp["error"] != 0: - error_str = resp["msg"] - if resp["error"] == -2: - header = {"X-Machine-Not-Available": "Machine is not available"} - return jsonify(description="Failed to retrieve reservation information"), 400, header - if in_str(error_str, "Permission") or in_str(error_str, "OPENSSH"): - header = {"X-Permission-Denied": "User does not have permissions to access machine or path"} - return jsonify(description="Failed to retrieve reservation information"), 404, header - - v = validate_input(reservationName) - if v != "": - return jsonify(description="Failed to retrieve reservation", error=f"reservationName '{reservationName}' {v}"), 400 - - sched_cmd = scheduler.get_reservations([reservationName]) - action = f"ID={ID} {sched_cmd}" - - try: - # obtain new task from Tasks microservice - task_id = create_task(headers, service="compute", system=system_name) - - # if error in creating task: - if task_id == -1: - return jsonify(description="Failed to retrieve reservation information", error="Error creating task"), 400 - - update_task(task_id, headers, async_task.QUEUED) - - # asynchronous task creation - aTask = threading.Thread(target=reservations_task, name=ID, - args=(headers, system_name, system_addr, action, task_id)) - - aTask.start() - task_url = f"/tasks/{task_id}" - - data = jsonify(success="Task created", task_id=task_id, task_url=task_url) - return data, 200 - - except Exception as e: - data = jsonify(description="Failed to retrieve reservation information",error=e) - return data, 400 - @app.route("/status",methods=["GET"]) @check_auth_header def status(): From 8b593553aa6c2b66e3ef40ce66753434dd5310f0 Mon Sep 17 00:00:00 2001 From: Eirini Koutsaniti Date: Mon, 29 Apr 2024 11:58:53 +0200 Subject: [PATCH 08/10] Remove reservations/ from docs --- doc/openapi/firecrest-api.yaml | 45 ----------------------- doc/openapi/firecrest-developers-api.yaml | 45 ----------------------- 2 files changed, 90 deletions(-) diff --git a/doc/openapi/firecrest-api.yaml b/doc/openapi/firecrest-api.yaml index cf7f399..7ef4db2 100644 --- a/doc/openapi/firecrest-api.yaml +++ b/doc/openapi/firecrest-api.yaml @@ -2010,51 +2010,6 @@ paths: description: User does not have permissions to access machine schema: type: integer - '/compute/reservations/{reservationName}': - get: - summary: Retrieves information about a specific reservation - description: Information about a specific reservation in the scheduling queue. - tags: - - Compute - parameters: - - name: X-Machine-Name - in: header - description: The system name - required: true - schema: - type: string - - name: reservationName - in: path - description: The ID of the reservation to retrieve. - required: true - schema: - type: string - responses: - '200': - description: Task created - content: - application/json: - schema: - $ref: '#/components/schemas/Task-Creation-Success' - '400': - description: Task creation error - content: - application/json: - schema: - $ref: '#/components/schemas/Task-Creation-Error' - headers: - X-Machine-Does-Not-Exist: - description: Machine does not exist - schema: - type: integer - X-Machine-Not-Available: - description: Machine is not available - schema: - type: integer - X-Permission-Denied: - description: User does not have permissions to access machine - schema: - type: integer '/storage/xfer-internal/rsync': parameters: - in: header diff --git a/doc/openapi/firecrest-developers-api.yaml b/doc/openapi/firecrest-developers-api.yaml index b47bf59..282b30b 100644 --- a/doc/openapi/firecrest-developers-api.yaml +++ b/doc/openapi/firecrest-developers-api.yaml @@ -1998,51 +1998,6 @@ paths: description: User does not have permissions to access machine schema: type: integer - '/compute/reservations/{reservationName}': - get: - summary: Retrieves information about a specific reservation - description: Information about a specific reservation in the scheduling queue. - tags: - - Compute - parameters: - - name: X-Machine-Name - in: header - description: The system name - required: true - schema: - type: string - - name: reservationName - in: path - description: The ID of the reservation to retrieve. - required: true - schema: - type: string - responses: - '200': - description: Task created - content: - application/json: - schema: - $ref: '#/components/schemas/Task-Creation-Success' - '400': - description: Task creation error - content: - application/json: - schema: - $ref: '#/components/schemas/Task-Creation-Error' - headers: - X-Machine-Does-Not-Exist: - description: Machine does not exist - schema: - type: integer - X-Machine-Not-Available: - description: Machine is not available - schema: - type: integer - X-Permission-Denied: - description: User does not have permissions to access machine - schema: - type: integer '/storage/xfer-internal/rsync': parameters: - in: header From 8f9f6b3bd30bcef03e6147817eeb2f3e87993acc Mon Sep 17 00:00:00 2001 From: Eirini Koutsaniti Date: Thu, 2 May 2024 09:57:08 +0200 Subject: [PATCH 09/10] test --- src/common/schedulers/slurm.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/common/schedulers/slurm.py b/src/common/schedulers/slurm.py index cccd8a4..59d457b 100644 --- a/src/common/schedulers/slurm.py +++ b/src/common/schedulers/slurm.py @@ -297,7 +297,8 @@ def parse_partitions_output(self, output, partition_names=None): return partitions def get_reservations(self, reservation_names=None): - return "SLURM_TIME_FORMAT=standard scontrol -a show -o reservations" + return "scontrol -a show -o reservations" + # return "SLURM_TIME_FORMAT=standard scontrol -a show -o reservations" def parse_reservations_output(self, output, reservation_names=None): if output == "No reservations in the system": From 283ea33cdef020f6ef3c839cc94ee1b0bcc98814 Mon Sep 17 00:00:00 2001 From: Eirini Koutsaniti Date: Thu, 2 May 2024 10:37:02 +0200 Subject: [PATCH 10/10] Fix reservation command --- src/common/schedulers/slurm.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/common/schedulers/slurm.py b/src/common/schedulers/slurm.py index 59d457b..cccd8a4 100644 --- a/src/common/schedulers/slurm.py +++ b/src/common/schedulers/slurm.py @@ -297,8 +297,7 @@ def parse_partitions_output(self, output, partition_names=None): return partitions def get_reservations(self, reservation_names=None): - return "scontrol -a show -o reservations" - # return "SLURM_TIME_FORMAT=standard scontrol -a show -o reservations" + return "SLURM_TIME_FORMAT=standard scontrol -a show -o reservations" def parse_reservations_output(self, output, reservation_names=None): if output == "No reservations in the system":