Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions src/murfey/cli/generate_route_manifest.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,18 @@ def get_route_manifest(routers: dict[str, APIRouter]):
),
}
path_params.append(param_info)
for route_dependency in route.dependant.dependencies:
for param in route_dependency.path_params:
param_type = param.type_ if param.type_ is not None else Any
param_info = {
"name": param.name if hasattr(param, "name") else "",
"type": (
param_type.__name__
if hasattr(param_type, "__name__")
else str(param_type)
),
}
path_params.append(param_info)
route_info = {
"path": route.path if hasattr(route, "path") else "",
"function": route.name if hasattr(route, "name") else "",
Expand Down
89 changes: 62 additions & 27 deletions src/murfey/cli/repost_failed_calls.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,31 @@
import argparse
import asyncio
import json
from datetime import datetime
from functools import partial
from inspect import getfullargspec, iscoroutinefunction
from pathlib import Path
from queue import Empty, Queue

import requests
from jose import jwt
from sqlmodel import Session
from workflows.transport.pika_transport import PikaTransport

import murfey.server.api.auth
import murfey.server.api.bootstrap
import murfey.server.api.clem
import murfey.server.api.display
import murfey.server.api.file_io_frontend
import murfey.server.api.file_io_instrument
import murfey.server.api.hub
import murfey.server.api.instrument
import murfey.server.api.mag_table
import murfey.server.api.processing_parameters
import murfey.server.api.prometheus
import murfey.server.api.session_control
import murfey.server.api.session_info
import murfey.server.api.websocket
import murfey.server.api.workflow
from murfey.server.murfey_db import get_murfey_db_session
from murfey.util.config import security_from_file


Expand Down Expand Up @@ -85,26 +102,56 @@ def handle_dlq_messages(messages_path: list[Path], rabbitmq_credentials: Path):
transport.disconnect()


def handle_failed_posts(messages_path: list[Path], token: str):
def handle_failed_posts(messages_path: list[Path], murfey_db: Session):
"""Deal with any messages that have been sent as failed client posts"""
for json_file in messages_path:
with open(json_file, "r") as json_data:
message = json.load(json_data)
router_name = message.get("message", {}).get("router_name", "")
router_base = router_name.split(".")[0]
function_name = message.get("message", {}).get("function_name", "")
if not router_name or not function_name:
print(
f"Cannot repost {json_file} as it does not have a router or function name"
)
continue

if not message.get("message") or not message["message"].get("url"):
print(f"{json_file} is not a failed client post")
try:
function_to_call = getattr(
getattr(murfey.server.api, router_base), function_name
)
except AttributeError:
print(f"Cannot repost {json_file} as {function_name} does not exist")
continue
expected_args = getfullargspec(function_to_call)

call_kwargs = message.get("message", {}).get("kwargs", {})
call_data = message.get("message", {}).get("data", {})
function_call_dict = {}

try:
for call_arg in expected_args.args:
call_arg_type = expected_args.annotations.get(call_arg, str)
if call_arg in call_kwargs.keys():
function_call_dict[call_arg] = call_arg_type(call_kwargs[call_arg])
elif call_arg == "db":
function_call_dict["db"] = murfey_db
else:
print(call_data, call_arg_type, call_arg)
function_call_dict[call_arg] = call_arg_type(**call_data)
except TypeError as e:
print(f"Cannot repost {json_file} due to argument error: {e}")
continue
dest = message["message"]["url"]
message_json = message["message"]["json"]

response = requests.post(
dest, json=message_json, headers={"Authorization": f"Bearer {token}"}
)
if response.status_code != 200:
print(f"Failed to repost {json_file}")
else:
try:
if iscoroutinefunction(function_to_call):
asyncio.run(function_to_call(**function_call_dict))
else:
function_to_call(**function_call_dict)
print(f"Reposted {json_file}")
json_file.unlink()
except Exception as e:
print(f"Failed to post {json_file} to {function_name}: {e}")


def run():
Expand All @@ -123,26 +170,14 @@ def run():
help="Security config file",
required=True,
)
parser.add_argument(
"-u",
"--username",
help="Token username",
required=True,
)
parser.add_argument(
"-d", "--dir", default="DLQ", help="Directory to export messages to"
)
args = parser.parse_args()

# Read the security config file
security_config = security_from_file(args.config)

# Get the token to post to the api with
token = jwt.encode(
{"user": args.username},
security_config.auth_key,
algorithm=security_config.auth_algorithm,
)
murfey_db = get_murfey_db_session(security_config)

# Purge the queue and repost/reinject any messages found
dlq_dump_path = Path(args.dir)
Expand All @@ -152,7 +187,7 @@ def run():
security_config.feedback_queue,
security_config.rabbitmq_credentials,
)
handle_failed_posts(exported_messages, token)
handle_failed_posts(exported_messages, murfey_db)
handle_dlq_messages(exported_messages, security_config.rabbitmq_credentials)

# Clean up any created directories
Expand Down
7 changes: 5 additions & 2 deletions src/murfey/server/api/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,10 @@ class DCGroupParameters(BaseModel):
"/visits/{visit_name}/sessions/{session_id}/register_data_collection_group"
)
def register_dc_group(
visit_name, session_id: MurfeySessionID, dcg_params: DCGroupParameters, db=murfey_db
visit_name: str,
session_id: MurfeySessionID,
dcg_params: DCGroupParameters,
db=murfey_db,
):
ispyb_proposal_code = visit_name[:2]
ispyb_proposal_number = visit_name.split("-")[0][2:]
Expand Down Expand Up @@ -201,7 +204,7 @@ class DCParameters(BaseModel):

@router.post("/visits/{visit_name}/sessions/{session_id}/start_data_collection")
def start_dc(
visit_name, session_id: MurfeySessionID, dc_params: DCParameters, db=murfey_db
visit_name: str, session_id: MurfeySessionID, dc_params: DCParameters, db=murfey_db
):
ispyb_proposal_code = visit_name[:2]
ispyb_proposal_number = visit_name.split("-")[0][2:]
Expand Down
Loading