diff --git a/src/murfey/cli/generate_route_manifest.py b/src/murfey/cli/generate_route_manifest.py index d103abe7d..ba14b5cb8 100644 --- a/src/murfey/cli/generate_route_manifest.py +++ b/src/murfey/cli/generate_route_manifest.py @@ -81,6 +81,18 @@ def get_route_manifest(routers: dict[str, APIRouter]): ), } path_params.append(param_info) + for route_dependency in route.dependant.dependencies: + for param in route_dependency.path_params: + param_type = param.type_ if param.type_ is not None else Any + param_info = { + "name": param.name if hasattr(param, "name") else "", + "type": ( + param_type.__name__ + if hasattr(param_type, "__name__") + else str(param_type) + ), + } + path_params.append(param_info) route_info = { "path": route.path if hasattr(route, "path") else "", "function": route.name if hasattr(route, "name") else "", diff --git a/src/murfey/cli/repost_failed_calls.py b/src/murfey/cli/repost_failed_calls.py index c8a3f46b7..a3fe219c9 100644 --- a/src/murfey/cli/repost_failed_calls.py +++ b/src/murfey/cli/repost_failed_calls.py @@ -1,14 +1,31 @@ import argparse +import asyncio import json from datetime import datetime from functools import partial +from inspect import getfullargspec, iscoroutinefunction from pathlib import Path from queue import Empty, Queue -import requests -from jose import jwt +from sqlmodel import Session from workflows.transport.pika_transport import PikaTransport +import murfey.server.api.auth +import murfey.server.api.bootstrap +import murfey.server.api.clem +import murfey.server.api.display +import murfey.server.api.file_io_frontend +import murfey.server.api.file_io_instrument +import murfey.server.api.hub +import murfey.server.api.instrument +import murfey.server.api.mag_table +import murfey.server.api.processing_parameters +import murfey.server.api.prometheus +import murfey.server.api.session_control +import murfey.server.api.session_info +import murfey.server.api.websocket +import murfey.server.api.workflow +from murfey.server.murfey_db import get_murfey_db_session from murfey.util.config import security_from_file @@ -85,26 +102,56 @@ def handle_dlq_messages(messages_path: list[Path], rabbitmq_credentials: Path): transport.disconnect() -def handle_failed_posts(messages_path: list[Path], token: str): +def handle_failed_posts(messages_path: list[Path], murfey_db: Session): """Deal with any messages that have been sent as failed client posts""" for json_file in messages_path: with open(json_file, "r") as json_data: message = json.load(json_data) + router_name = message.get("message", {}).get("router_name", "") + router_base = router_name.split(".")[0] + function_name = message.get("message", {}).get("function_name", "") + if not router_name or not function_name: + print( + f"Cannot repost {json_file} as it does not have a router or function name" + ) + continue - if not message.get("message") or not message["message"].get("url"): - print(f"{json_file} is not a failed client post") + try: + function_to_call = getattr( + getattr(murfey.server.api, router_base), function_name + ) + except AttributeError: + print(f"Cannot repost {json_file} as {function_name} does not exist") + continue + expected_args = getfullargspec(function_to_call) + + call_kwargs = message.get("message", {}).get("kwargs", {}) + call_data = message.get("message", {}).get("data", {}) + function_call_dict = {} + + try: + for call_arg in expected_args.args: + call_arg_type = expected_args.annotations.get(call_arg, str) + if call_arg in call_kwargs.keys(): + function_call_dict[call_arg] = call_arg_type(call_kwargs[call_arg]) + elif call_arg == "db": + function_call_dict["db"] = murfey_db + else: + print(call_data, call_arg_type, call_arg) + function_call_dict[call_arg] = call_arg_type(**call_data) + except TypeError as e: + print(f"Cannot repost {json_file} due to argument error: {e}") continue - dest = message["message"]["url"] - message_json = message["message"]["json"] - response = requests.post( - dest, json=message_json, headers={"Authorization": f"Bearer {token}"} - ) - if response.status_code != 200: - print(f"Failed to repost {json_file}") - else: + try: + if iscoroutinefunction(function_to_call): + asyncio.run(function_to_call(**function_call_dict)) + else: + function_to_call(**function_call_dict) print(f"Reposted {json_file}") json_file.unlink() + except Exception as e: + print(f"Failed to post {json_file} to {function_name}: {e}") def run(): @@ -123,12 +170,6 @@ def run(): help="Security config file", required=True, ) - parser.add_argument( - "-u", - "--username", - help="Token username", - required=True, - ) parser.add_argument( "-d", "--dir", default="DLQ", help="Directory to export messages to" ) @@ -136,13 +177,7 @@ def run(): # Read the security config file security_config = security_from_file(args.config) - - # Get the token to post to the api with - token = jwt.encode( - {"user": args.username}, - security_config.auth_key, - algorithm=security_config.auth_algorithm, - ) + murfey_db = get_murfey_db_session(security_config) # Purge the queue and repost/reinject any messages found dlq_dump_path = Path(args.dir) @@ -152,7 +187,7 @@ def run(): security_config.feedback_queue, security_config.rabbitmq_credentials, ) - handle_failed_posts(exported_messages, token) + handle_failed_posts(exported_messages, murfey_db) handle_dlq_messages(exported_messages, security_config.rabbitmq_credentials) # Clean up any created directories diff --git a/src/murfey/server/api/workflow.py b/src/murfey/server/api/workflow.py index 71fe71cc0..ff67c4a89 100644 --- a/src/murfey/server/api/workflow.py +++ b/src/murfey/server/api/workflow.py @@ -94,7 +94,10 @@ class DCGroupParameters(BaseModel): "/visits/{visit_name}/sessions/{session_id}/register_data_collection_group" ) def register_dc_group( - visit_name, session_id: MurfeySessionID, dcg_params: DCGroupParameters, db=murfey_db + visit_name: str, + session_id: MurfeySessionID, + dcg_params: DCGroupParameters, + db=murfey_db, ): ispyb_proposal_code = visit_name[:2] ispyb_proposal_number = visit_name.split("-")[0][2:] @@ -201,7 +204,7 @@ class DCParameters(BaseModel): @router.post("/visits/{visit_name}/sessions/{session_id}/start_data_collection") def start_dc( - visit_name, session_id: MurfeySessionID, dc_params: DCParameters, db=murfey_db + visit_name: str, session_id: MurfeySessionID, dc_params: DCParameters, db=murfey_db ): ispyb_proposal_code = visit_name[:2] ispyb_proposal_number = visit_name.split("-")[0][2:] diff --git a/src/murfey/util/route_manifest.yaml b/src/murfey/util/route_manifest.yaml index 72e6e859a..4a7d080f1 100644 --- a/src/murfey/util/route_manifest.yaml +++ b/src/murfey/util/route_manifest.yaml @@ -18,17 +18,23 @@ murfey.instrument_server.api.router: - POST - path: /sessions/{session_id}/check_token function: check_token - path_params: [] + path_params: + - name: session_id + type: int methods: - GET - path: /sessions/{session_id}/multigrid_watcher function: setup_multigrid_watcher - path_params: [] + path_params: + - name: session_id + type: int methods: - POST - path: /sessions/{session_id}/start_multigrid_watcher function: start_multigrid_watcher - path_params: [] + path_params: + - name: session_id + type: int methods: - POST - path: /sessions/{session_id}/multigrid_watcher/{label} @@ -36,66 +42,92 @@ murfey.instrument_server.api.router: path_params: - name: label type: str + - name: session_id + type: int methods: - DELETE - path: /sessions/{session_id}/multigrid_controller/status function: check_multigrid_controller_status - path_params: [] + path_params: + - name: session_id + type: int methods: - GET - path: /sessions/{session_id}/multigrid_controller/visit_end_time function: update_multigrid_controller_visit_end_time - path_params: [] + path_params: + - name: session_id + type: int methods: - POST - path: /sessions/{session_id}/stop_rsyncer function: stop_rsyncer - path_params: [] + path_params: + - name: session_id + type: int methods: - POST - path: /sessions/{session_id}/remove_rsyncer function: remove_rsyncer - path_params: [] + path_params: + - name: session_id + type: int methods: - POST - path: /sessions/{session_id}/abandon_controller function: abandon_controller - path_params: [] + path_params: + - name: session_id + type: int methods: - POST - path: /sessions/{session_id}/finalise_rsyncer function: finalise_rsyncer - path_params: [] + path_params: + - name: session_id + type: int methods: - POST - path: /sessions/{session_id}/finalise_session function: finalise_session - path_params: [] + path_params: + - name: session_id + type: int methods: - POST - path: /sessions/{session_id}/restart_rsyncer function: restart_rsyncer - path_params: [] + path_params: + - name: session_id + type: int methods: - POST - path: /sessions/{session_id}/flush_skipped_rsyncer function: flush_skipped_rsyncer - path_params: [] + path_params: + - name: session_id + type: int methods: - POST - path: /sessions/{session_id}/rsyncer_info function: get_rsyncer_info - path_params: [] + path_params: + - name: session_id + type: int methods: - GET - path: /sessions/{session_id}/analyser_info function: get_analyser_info - path_params: [] + path_params: + - name: session_id + type: int methods: - GET - path: /sessions/{session_id}/processing_parameters function: register_processing_parameters - path_params: [] + path_params: + - name: session_id + type: int methods: - POST - path: /instruments/{instrument_name}/sessions/{session_id}/possible_gain_references @@ -103,6 +135,8 @@ murfey.instrument_server.api.router: path_params: - name: instrument_name type: str + - name: session_id + type: int methods: - GET - path: /instruments/{instrument_name}/sessions/{session_id}/upload_gain_reference @@ -110,6 +144,8 @@ murfey.instrument_server.api.router: path_params: - name: instrument_name type: str + - name: session_id + type: int methods: - POST - path: /visits/{visit_name}/sessions/{session_id}/upstream_tiff_data_request @@ -117,6 +153,8 @@ murfey.instrument_server.api.router: path_params: - name: visit_name type: str + - name: session_id + type: int methods: - POST murfey.server.api.auth.router: @@ -127,7 +165,9 @@ murfey.server.api.auth.router: - POST - path: /auth/sessions/{session_id}/token function: mint_session_token - path_params: [] + path_params: + - name: session_id + type: int methods: - GET - path: /auth/validate_token @@ -432,7 +472,9 @@ murfey.server.api.display.router: murfey.server.api.file_io_frontend.router: - path: /file_io/frontend/sessions/{session_id}/process_gain function: process_gain - path_params: [] + path_params: + - name: session_id + type: int methods: - POST murfey.server.api.file_io_instrument.router: @@ -454,7 +496,9 @@ murfey.server.api.file_io_instrument.router: - POST - path: /file_io/instrument/sessions/{session_id}/process_gain function: process_gain - path_params: [] + path_params: + - name: session_id + type: int methods: - POST - path: /file_io/instrument/visits/{visit_name}/sessions/{session_id}/eer_fractionation_file @@ -483,44 +527,54 @@ murfey.server.api.instrument.router: - path: /instrument_server/instruments/{instrument_name}/sessions/{session_id}/activate_instrument_server function: activate_instrument_server_for_session path_params: - - name: instrument_name - type: str - name: session_id type: int + - name: instrument_name + type: str methods: - POST - path: /instrument_server/instruments/{instrument_name}/sessions/{session_id}/active function: check_if_session_is_active path_params: - - name: instrument_name - type: str - name: session_id type: int + - name: instrument_name + type: str methods: - GET - path: /instrument_server/sessions/{session_id}/multigrid_controller/status function: check_multigrid_controller_status - path_params: [] + path_params: + - name: session_id + type: int methods: - GET - path: /instrument_server/sessions/{session_id}/multigrid_watcher function: setup_multigrid_watcher - path_params: [] + path_params: + - name: session_id + type: int methods: - POST - path: /instrument_server/sessions/{session_id}/start_multigrid_watcher function: start_multigrid_watcher - path_params: [] + path_params: + - name: session_id + type: int methods: - POST - path: /instrument_server/sessions/{session_id}/multigrid_controller/visit_end_time function: update_visit_end_time - path_params: [] + path_params: + - name: session_id + type: int methods: - POST - path: /instrument_server/sessions/{session_id}/provided_processing_parameters function: pass_proc_params_to_instrument_server - path_params: [] + path_params: + - name: session_id + type: int methods: - POST - path: /instrument_server/instruments/{instrument_name}/instrument_server @@ -535,11 +589,15 @@ murfey.server.api.instrument.router: path_params: - name: instrument_name type: str + - name: session_id + type: int methods: - GET - path: /instrument_server/sessions/{session_id}/upload_gain_reference function: request_gain_reference_upload - path_params: [] + path_params: + - name: session_id + type: int methods: - POST - path: /instrument_server/visits/{visit_name}/sessions/{session_id}/upstream_tiff_data_request @@ -547,41 +605,57 @@ murfey.server.api.instrument.router: path_params: - name: visit_name type: str + - name: session_id + type: int methods: - POST - path: /instrument_server/sessions/{session_id}/stop_rsyncer function: stop_rsyncer - path_params: [] + path_params: + - name: session_id + type: int methods: - POST - path: /instrument_server/sessions/{session_id}/finalise_rsyncer function: finalise_rsyncer - path_params: [] + path_params: + - name: session_id + type: int methods: - POST - path: /instrument_server/sessions/{session_id}/finalise_session function: finalise_session - path_params: [] + path_params: + - name: session_id + type: int methods: - POST - path: /instrument_server/sessions/{session_id}/abandon_session function: abandon_session - path_params: [] + path_params: + - name: session_id + type: int methods: - POST - path: /instrument_server/sessions/{session_id}/remove_rsyncer function: remove_rsyncer - path_params: [] + path_params: + - name: session_id + type: int methods: - POST - path: /instrument_server/sessions/{session_id}/restart_rsyncer function: restart_rsyncer - path_params: [] + path_params: + - name: session_id + type: int methods: - POST - path: /instrument_server/sessions/{session_id}/flush_skipped_rsyncer function: flush_skipped_rsyncer - path_params: [] + path_params: + - name: session_id + type: int methods: - POST - path: /instrument_server/instruments/{instrument_name}/sessions/{session_id}/rsyncer_info @@ -589,6 +663,8 @@ murfey.server.api.instrument.router: path_params: - name: instrument_name type: str + - name: session_id + type: int methods: - GET murfey.server.api.mag_table.router: @@ -612,12 +688,16 @@ murfey.server.api.mag_table.router: murfey.server.api.processing_parameters.router: - path: /session_parameters/sessions/{session_id}/session_processing_parameters function: get_session_processing_parameters - path_params: [] + path_params: + - name: session_id + type: int methods: - GET - path: /session_parameters/sessions/{session_id}/session_processing_parameters function: set_session_processing_parameters - path_params: [] + path_params: + - name: session_id + type: int methods: - POST murfey.server.api.prometheus.router: @@ -668,7 +748,9 @@ murfey.server.api.prometheus.router: murfey.server.api.session_control.correlative_router: - path: /session_control/correlative/sessions/{session_id}/upstream_visits function: find_upstream_visits - path_params: [] + path_params: + - name: session_id + type: int methods: - GET - path: /session_control/correlative/visits/{visit_name}/sessions/{session_id}/upstream_tiff_paths @@ -739,12 +821,16 @@ murfey.server.api.session_control.router: - GET - path: /session_control/sessions/{session_id} function: remove_session - path_params: [] + path_params: + - name: session_id + type: int methods: - DELETE - path: /session_control/sessions/{session_id}/successful_processing function: register_processing_success_in_ispyb - path_params: [] + path_params: + - name: session_id + type: int methods: - POST - path: /session_control/num_movies @@ -768,7 +854,9 @@ murfey.server.api.session_control.router: - POST - path: /session_control/sessions/{session_id}/rsyncers function: get_rsyncers_for_session - path_params: [] + path_params: + - name: session_id + type: int methods: - GET - path: /session_control/sessions/{session_id}/rsyncer_stopped @@ -795,7 +883,9 @@ murfey.server.api.session_control.router: murfey.server.api.session_control.spa_router: - path: /session_control/spa/sessions/{session_id}/grid_squares function: get_grid_squares - path_params: [] + path_params: + - name: session_id + type: int methods: - GET - path: /session_control/spa/sessions/{session_id}/data_collection_groups/{dcgid}/grid_squares @@ -803,6 +893,8 @@ murfey.server.api.session_control.spa_router: path_params: - name: dcgid type: int + - name: session_id + type: int methods: - GET - path: /session_control/spa/sessions/{session_id}/data_collection_groups/{dcgid}/grid_squares/{gsid}/foil_holes @@ -812,6 +904,8 @@ murfey.server.api.session_control.spa_router: type: int - name: gsid type: int + - name: session_id + type: int methods: - GET - path: /session_control/spa/sessions/{session_id}/foil_hole/{fh_name} @@ -819,11 +913,15 @@ murfey.server.api.session_control.spa_router: path_params: - name: fh_name type: int + - name: session_id + type: int methods: - GET - path: /session_control/spa/sessions/{session_id}/make_atlas_jpg function: make_atlas_jpg - path_params: [] + path_params: + - name: session_id + type: int methods: - POST - path: /session_control/spa/sessions/{session_id}/grid_square/{gsid} @@ -831,6 +929,8 @@ murfey.server.api.session_control.spa_router: path_params: - name: gsid type: int + - name: session_id + type: int methods: - POST - path: /session_control/spa/sessions/{session_id}/grid_square/{gs_name}/foil_hole @@ -838,6 +938,8 @@ murfey.server.api.session_control.spa_router: path_params: - name: gs_name type: int + - name: session_id + type: int methods: - POST murfey.server.api.session_control.tomo_router: @@ -846,6 +948,8 @@ murfey.server.api.session_control.tomo_router: path_params: - name: sm_name type: str + - name: session_id + type: int methods: - POST - path: /session_control/tomo/sessions/{session_id}/batch_position/{batch_name} @@ -853,12 +957,16 @@ murfey.server.api.session_control.tomo_router: path_params: - name: batch_name type: str + - name: session_id + type: int methods: - POST murfey.server.api.session_info.correlative_router: - path: /session_info/correlative/sessions/{session_id}/upstream_visits function: find_upstream_visits - path_params: [] + path_params: + - name: session_id + type: int methods: - GET - path: /session_info/correlative/visits/{visit_name}/sessions/{session_id}/upstream_tiff_paths @@ -915,12 +1023,16 @@ murfey.server.api.session_info.router: - GET - path: /session_info/sessions/{session_id}/rsyncers function: get_rsyncers_for_client - path_params: [] + path_params: + - name: session_id + type: int methods: - GET - path: /session_info/sessions/{session_id} function: get_session - path_params: [] + path_params: + - name: session_id + type: int methods: - GET - path: /session_info/sessions @@ -931,31 +1043,35 @@ murfey.server.api.session_info.router: - path: /session_info/instruments/{instrument_name}/visits/{visit}/session/{name} function: create_session path_params: - - name: instrument_name - type: str - name: visit type: str - name: name type: str + - name: instrument_name + type: str methods: - POST - path: /session_info/sessions/{session_id} function: update_session - path_params: [] + path_params: + - name: session_id + type: int methods: - POST - path: /session_info/sessions/{session_id} function: remove_session - path_params: [] + path_params: + - name: session_id + type: int methods: - DELETE - path: /session_info/instruments/{instrument_name}/visits/{visit_name}/sessions function: get_sessions_with_visit path_params: - - name: instrument_name - type: str - name: visit_name type: str + - name: instrument_name + type: str methods: - GET - path: /session_info/instruments/{instrument_name}/sessions @@ -967,7 +1083,9 @@ murfey.server.api.session_info.router: - GET - path: /session_info/sessions/{session_id}/data_collection_groups function: get_dc_groups - path_params: [] + path_params: + - name: session_id + type: int methods: - GET - path: /session_info/sessions/{session_id}/data_collection_groups/{dcgid}/data_collections @@ -975,6 +1093,8 @@ murfey.server.api.session_info.router: path_params: - name: dcgid type: int + - name: session_id + type: int methods: - GET - path: /session_info/clients @@ -984,13 +1104,17 @@ murfey.server.api.session_info.router: - GET - path: /session_info/sessions/{session_id}/current_gain_ref function: update_current_gain_ref - path_params: [] + path_params: + - name: session_id + type: int methods: - PUT murfey.server.api.session_info.spa_router: - path: /session_info/spa/sessions/{session_id}/spa_processing_parameters function: get_spa_proc_param_details - path_params: [] + path_params: + - name: session_id + type: int methods: - GET - path: /session_info/spa/sessions/{session_id}/data_collection_groups/{dcgid}/grid_squares/{gsid}/foil_holes/{fhid}/num_movies @@ -1008,7 +1132,9 @@ murfey.server.api.session_info.spa_router: - GET - path: /session_info/spa/sessions/{session_id}/grid_squares function: get_grid_squares - path_params: [] + path_params: + - name: session_id + type: int methods: - GET - path: /session_info/spa/sessions/{session_id}/data_collection_groups/{dcgid}/grid_squares @@ -1016,6 +1142,8 @@ murfey.server.api.session_info.spa_router: path_params: - name: dcgid type: int + - name: session_id + type: int methods: - GET - path: /session_info/spa/sessions/{session_id}/data_collection_groups/{dcgid}/grid_squares/{gsid}/foil_holes @@ -1025,6 +1153,8 @@ murfey.server.api.session_info.spa_router: type: int - name: gsid type: int + - name: session_id + type: int methods: - GET - path: /session_info/spa/sessions/{session_id}/foil_hole/{fh_name} @@ -1032,6 +1162,8 @@ murfey.server.api.session_info.spa_router: path_params: - name: fh_name type: int + - name: session_id + type: int methods: - GET murfey.server.api.session_info.tomo_router: @@ -1040,6 +1172,8 @@ murfey.server.api.session_info.tomo_router: path_params: - name: tilt_series_tag type: str + - name: session_id + type: int methods: - GET murfey.server.api.websocket.ws: @@ -1121,14 +1255,18 @@ murfey.server.api.workflow.router: function: register_dc_group path_params: - name: visit_name - type: typing.Any + type: str + - name: session_id + type: int methods: - POST - path: /workflow/visits/{visit_name}/sessions/{session_id}/start_data_collection function: start_dc path_params: - name: visit_name - type: typing.Any + type: str + - name: session_id + type: int methods: - POST - path: /workflow/visits/{visit_name}/sessions/{session_id}/register_processing_job @@ -1136,12 +1274,16 @@ murfey.server.api.workflow.router: path_params: - name: visit_name type: str + - name: session_id + type: int methods: - POST murfey.server.api.workflow.spa_router: - path: /workflow/spa/sessions/{session_id}/spa_processing_parameters function: register_spa_proc_params - path_params: [] + path_params: + - name: session_id + type: int methods: - POST - path: /workflow/spa/visits/{visit_name}/sessions/{session_id}/flush_spa_processing @@ -1149,6 +1291,8 @@ murfey.server.api.workflow.spa_router: path_params: - name: visit_name type: str + - name: session_id + type: int methods: - POST - path: /workflow/spa/visits/{visit_name}/sessions/{session_id}/spa_preprocess @@ -1156,12 +1300,16 @@ murfey.server.api.workflow.spa_router: path_params: - name: visit_name type: str + - name: session_id + type: int methods: - POST murfey.server.api.workflow.tomo_router: - path: /workflow/tomo/sessions/{session_id}/tomography_processing_parameters function: register_tomo_proc_params - path_params: [] + path_params: + - name: session_id + type: int methods: - POST - path: /workflow/tomo/visits/{visit_name}/sessions/{session_id}/flush_tomography_processing @@ -1169,6 +1317,8 @@ murfey.server.api.workflow.tomo_router: path_params: - name: visit_name type: str + - name: session_id + type: int methods: - POST - path: /workflow/tomo/visits/{visit_name}/tilt_series @@ -1190,6 +1340,8 @@ murfey.server.api.workflow.tomo_router: path_params: - name: visit_name type: str + - name: session_id + type: int methods: - POST - path: /workflow/tomo/visits/{visit_name}/sessions/{session_id}/completed_tilt_series @@ -1197,6 +1349,8 @@ murfey.server.api.workflow.tomo_router: path_params: - name: visit_name type: str + - name: session_id + type: int methods: - POST - path: /workflow/tomo/visits/{visit_name}/rerun_tilt_series @@ -1211,5 +1365,7 @@ murfey.server.api.workflow.tomo_router: path_params: - name: visit_name type: str + - name: session_id + type: int methods: - POST diff --git a/tests/cli/test_repost_failed_calls.py b/tests/cli/test_repost_failed_calls.py index 6ef2c7e81..ea8fc76e5 100644 --- a/tests/cli/test_repost_failed_calls.py +++ b/tests/cli/test_repost_failed_calls.py @@ -1,4 +1,5 @@ import json +import os import subprocess import sys from pathlib import Path @@ -6,6 +7,8 @@ from unittest import mock from murfey.cli import repost_failed_calls +from murfey.util.config import security_from_file +from murfey.util.db import Tilt @mock.patch("murfey.cli.repost_failed_calls.PikaTransport") @@ -104,23 +107,53 @@ def test_handle_dlq_messages(mock_transport, tmp_path): mock_transport().disconnect.assert_called_once() -@mock.patch("murfey.cli.repost_failed_calls.requests") -def test_handle_failed_posts(mock_requests, tmp_path): +def test_handle_failed_posts(tmp_path): """Test that the API is called with any failed client post messages""" # Create some sample messages messages_paths_list: list[Path] = [] messages_dict: dict[str, dict] = { "msg1": { - "message": {"url": "sample/url", "json": {"content": "msg1"}}, - }, + "message": { + "router_name": "workflow.tomo_router", + "function_name": "register_completed_tilt_series", + "kwargs": {"visit_name": "cm12345-1", "session_id": 1}, + "data": { + "tags": ["tag"], + "source": "source", + "tilt_series_lengths": [10], + }, + }, + }, # normal example "msg2": { - "message": {"url": "sample/url", "json": {"content": "msg2"}}, - }, + "message": { + "router_name": "workflow.tomo_router", + "function_name": "register_tilt", + "kwargs": {"visit_name": "cm12345-1", "session_id": 1}, + "data": { + "tilt_series_tag": "tag", + "source": "source", + "movie_path": "path", + }, + }, + }, # async example "msg3": { - "message": {"content": "msg3"}, # not a failed client post + "message": { + "router_name": "workflow.tomo_router", + "function_name": "register_completed_tilt_series", + "data": {"tags": ["tag"]}, + } }, "msg4": { - "header": {"content": "msg3"}, # does not have a message + "message": {"function_name": "dummy"}, # does not have a router + }, + "msg5": { + "message": {"router_name": "workflow"}, # does not have a function + }, + "msg6": { + "message": { + "router_name": "workflow", + "function_name": "dummy", + }, # function does not exist }, } for file_name, message in messages_dict.items(): @@ -128,82 +161,76 @@ def test_handle_failed_posts(mock_requests, tmp_path): with open(tmp_path / file_name, "w") as msg_file: json.dump(message, msg_file) - class Response: - def __init__(self, status_code): - self.status_code = status_code - - mock_requests.post.side_effect = [Response(200), Response(300)] - - repost_failed_calls.handle_failed_posts(messages_paths_list, "dummy_token") + mock_db = mock.Mock() + mock_exec_return = mock.Mock() + mock_exec_return.all.return_value = [] + mock_db.exec.return_value = mock_exec_return + repost_failed_calls.handle_failed_posts(messages_paths_list, mock_db) # Check the failed posts were resent - assert mock_requests.post.call_count == 2 - mock_requests.post.assert_any_call( - "sample/url", - json={"content": "msg1"}, - headers={"Authorization": "Bearer dummy_token"}, - ) - mock_requests.post.assert_any_call( - "sample/url", - json={"content": "msg2"}, - headers={"Authorization": "Bearer dummy_token"}, + assert mock_db.exec.call_count == 3 + assert mock_db.exec().one.call_count == 1 + assert mock_db.exec().all.call_count == 2 + assert mock_exec_return.one.call_count == 1 + assert mock_exec_return.all.call_count == 2 + assert mock_db.commit.call_count == 3 + mock_db.add.assert_called_once_with( + Tilt(movie_path="path", tilt_series_id=mock.ANY, motion_corrected=False) ) # Check only the failed post which was successfully reinjected got deleted assert not (tmp_path / "msg1").is_file() # got resent - assert (tmp_path / "msg2").is_file() # failed reinjection - assert (tmp_path / "msg3").is_file() # not a failed client post - assert (tmp_path / "msg4").is_file() # does not have a message + assert not (tmp_path / "msg2").is_file() # got resent + assert (tmp_path / "msg3").is_file() # failed reinjection + assert (tmp_path / "msg4").is_file() # does not have a router + assert (tmp_path / "msg5").is_file() # does not have a function + assert (tmp_path / "msg6").is_file() # function does not exist @mock.patch("murfey.cli.repost_failed_calls.dlq_purge") @mock.patch("murfey.cli.repost_failed_calls.handle_failed_posts") @mock.patch("murfey.cli.repost_failed_calls.handle_dlq_messages") -@mock.patch("murfey.cli.repost_failed_calls.jwt") +@mock.patch("murfey.cli.repost_failed_calls.get_murfey_db_session") def test_run_repost_failed_calls( - mock_jwt, + mock_db, mock_reinject, mock_repost, mock_purge, mock_security_configuration, ): - mock_jwt.encode.return_value = "dummy_token" + mock_db.return_value = "db" mock_purge.return_value = ["/path/to/msg1"] config_file = mock_security_configuration with open(config_file) as f: - security_config = json.load(f) + security_config_dict = json.load(f) sys.argv = [ "murfey.repost_failed_calls", "--config", str(config_file), - "--username", - "user", "--dir", "DLQ_dir", ] repost_failed_calls.run() - mock_jwt.encode.assert_called_with( - {"user": "user"}, - security_config["auth_key"], - algorithm=security_config["auth_algorithm"], - ) + security_config_class = security_from_file(config_file) + mock_db.assert_called_with(security_config_class) mock_purge.assert_called_once_with( Path("DLQ_dir"), "murfey_feedback", - Path(security_config["rabbitmq_credentials"]), + Path(security_config_dict["rabbitmq_credentials"]), ) - mock_repost.assert_called_once_with(["/path/to/msg1"], "dummy_token") + mock_repost.assert_called_once_with(["/path/to/msg1"], "db") mock_reinject.assert_called_once_with( - ["/path/to/msg1"], Path(security_config["rabbitmq_credentials"]) + ["/path/to/msg1"], Path(security_config_dict["rabbitmq_credentials"]) ) -def test_repost_failed_calls_exists(): +def test_repost_failed_calls_exists(mock_security_configuration): """Test the CLI is made""" + os.environ["MURFEY_SECURITY_CONFIGURATION"] = str(mock_security_configuration) result = subprocess.run( [ "murfey.repost_failed_calls", @@ -218,6 +245,4 @@ def test_repost_failed_calls_exists(): cleaned_help_line = ( stdout_as_string.split("\n\n")[0].replace("\n", "").replace(" ", "") ) - assert cleaned_help_line == ( - "usage:murfey.repost_failed_calls[-h]-cCONFIG-uUSERNAME[-dDIR]" - ) + assert cleaned_help_line == ("usage:murfey.repost_failed_calls[-h]-cCONFIG[-dDIR]") diff --git a/tests/client/test_context.py b/tests/client/test_context.py index 2c71d70fc..d72573560 100644 --- a/tests/client/test_context.py +++ b/tests/client/test_context.py @@ -26,6 +26,7 @@ def test_tomography_context_add_tomo_tilt(mock_post, mock_get, tmp_path): default_destinations={tmp_path: str(tmp_path)}, instrument_name="", visit="test", + murfey_session=1, ) context = TomographyContext("tomo", tmp_path) (tmp_path / "Position_1_001_[30.0]_date_time_fractions.tiff").touch() @@ -82,6 +83,7 @@ def test_tomography_context_add_tomo_tilt_out_of_order(mock_post, mock_get, tmp_ default_destinations={tmp_path: str(tmp_path)}, instrument_name="", visit="test", + murfey_session=1, ) context = TomographyContext("tomo", tmp_path) (tmp_path / "Position_1_001_[30.0]_date_time_fractions.tiff").touch() @@ -166,6 +168,7 @@ def test_tomography_context_add_tomo_tilt_delayed_tilt(mock_post, mock_get, tmp_ default_destinations={tmp_path: str(tmp_path)}, instrument_name="", visit="test", + murfey_session=1, ) context = TomographyContext("tomo", tmp_path) (tmp_path / "Position_1_001_[30.0]_date_time_fractions.tiff").touch() @@ -230,6 +233,7 @@ def test_setting_tilt_series_size_and_completion_from_mdoc_parsing( default_destinations={tmp_path: str(tmp_path)}, instrument_name="", visit="test", + murfey_session=1, ) context = TomographyContext("tomo", tmp_path) assert len(context._tilt_series_sizes) == 0