From 7da87a355c36518a8d92a35f71b8584607cc73b5 Mon Sep 17 00:00:00 2001 From: yxd92326 Date: Thu, 21 Aug 2025 14:47:42 +0100 Subject: [PATCH 01/12] Rationalise the api --- src/murfey/server/api/file_io_instrument.py | 4 +-- src/murfey/server/api/instrument.py | 2 +- src/murfey/server/api/session_control.py | 6 ++-- src/murfey/server/api/session_info.py | 8 +++-- src/murfey/server/api/workflow.py | 26 +++++++++------ src/murfey/util/route_manifest.yaml | 36 ++++++++++----------- 6 files changed, 46 insertions(+), 36 deletions(-) diff --git a/src/murfey/server/api/file_io_instrument.py b/src/murfey/server/api/file_io_instrument.py index 99d55e0d5..7abb81726 100644 --- a/src/murfey/server/api/file_io_instrument.py +++ b/src/murfey/server/api/file_io_instrument.py @@ -34,7 +34,7 @@ class SuggestedPathParameters(BaseModel): extra_directory: str = "" -@router.post("/visits/{visit_name}/{session_id}/suggested_path") +@router.post("/visits/{visit_name}/sessions/{session_id}/suggested_path") def suggest_path( visit_name: str, session_id: int, params: SuggestedPathParameters, db=murfey_db ): @@ -125,7 +125,7 @@ class FractionationParameters(BaseModel): fractionation_file_name: str = "eer_fractionation.txt" -@router.post("/visits/{visit_name}/{session_id}/eer_fractionation_file") +@router.post("/visits/{visit_name}/sessions/{session_id}/eer_fractionation_file") async def write_eer_fractionation_file( visit_name: str, session_id: int, diff --git a/src/murfey/server/api/instrument.py b/src/murfey/server/api/instrument.py index 7d5ae6d1f..699dc01c6 100644 --- a/src/murfey/server/api/instrument.py +++ b/src/murfey/server/api/instrument.py @@ -353,7 +353,7 @@ async def request_gain_reference_upload( return data -@router.post("/visits/{visit_name}/{session_id}/upstream_tiff_data_request") +@router.post("/visits/{visit_name}/sessions/{session_id}/upstream_tiff_data_request") async def request_upstream_tiff_data_download( visit_name: str, session_id: MurfeySessionID, db=murfey_db ): 
diff --git a/src/murfey/server/api/session_control.py b/src/murfey/server/api/session_control.py index 1a5702904..f5eec9e14 100644 --- a/src/murfey/server/api/session_control.py +++ b/src/murfey/server/api/session_control.py @@ -424,7 +424,9 @@ async def find_upstream_visits(session_id: MurfeySessionID, db=murfey_db): return upstream_visits -@correlative_router.get("/visits/{visit_name}/{session_id}/upstream_tiff_paths") +@correlative_router.get( + "/visits/{visit_name}/sessions/{session_id}/upstream_tiff_paths" +) async def gather_upstream_tiffs(visit_name: str, session_id: int, db=murfey_db): """ Looks for TIFF files associated with the current session in the permitted storage @@ -446,7 +448,7 @@ async def gather_upstream_tiffs(visit_name: str, session_id: int, db=murfey_db): @correlative_router.get( - "/visits/{visit_name}/{session_id}/upstream_tiff/{tiff_path:path}" + "/visits/{visit_name}/sessions/{session_id}/upstream_tiff/{tiff_path:path}" ) async def get_tiff(visit_name: str, session_id: int, tiff_path: str, db=murfey_db): instrument_name = ( diff --git a/src/murfey/server/api/session_info.py b/src/murfey/server/api/session_info.py index d44507ad8..474b8ae89 100644 --- a/src/murfey/server/api/session_info.py +++ b/src/murfey/server/api/session_info.py @@ -135,7 +135,7 @@ class SessionClients(BaseModel): clients: List[ClientEnvironment] -@router.get("/session/{session_id}") +@router.get("/sessions/{session_id}") async def get_session(session_id: MurfeySessionID, db=murfey_db) -> SessionClients: session = db.exec(select(Session).where(Session.id == session_id)).one() clients = db.exec( @@ -419,7 +419,9 @@ async def find_upstream_visits(session_id: MurfeySessionID, db=murfey_db): return upstream_visits -@correlative_router.get("/visits/{visit_name}/{session_id}/upstream_tiff_paths") +@correlative_router.get( + "/visits/{visit_name}/sessions/{session_id}/upstream_tiff_paths" +) async def gather_upstream_tiffs(visit_name: str, session_id: int, db=murfey_db): """ 
Looks for TIFF files associated with the current session in the permitted storage @@ -441,7 +443,7 @@ async def gather_upstream_tiffs(visit_name: str, session_id: int, db=murfey_db): @correlative_router.get( - "/visits/{visit_name}/{session_id}/upstream_tiff/{tiff_path:path}" + "/visits/{visit_name}/sessions/{session_id}/upstream_tiff/{tiff_path:path}" ) async def get_tiff(visit_name: str, session_id: int, tiff_path: str, db=murfey_db): instrument_name = ( diff --git a/src/murfey/server/api/workflow.py b/src/murfey/server/api/workflow.py index af9c314f0..fdaa9f662 100644 --- a/src/murfey/server/api/workflow.py +++ b/src/murfey/server/api/workflow.py @@ -90,7 +90,9 @@ class DCGroupParameters(BaseModel): atlas_pixel_size: float = 0 -@router.post("/visits/{visit_name}/{session_id}/register_data_collection_group") +@router.post( + "/visits/{visit_name}/sessions/{session_id}/register_data_collection_group" +) def register_dc_group( visit_name, session_id: MurfeySessionID, dcg_params: DCGroupParameters, db=murfey_db ): @@ -197,7 +199,7 @@ class DCParameters(BaseModel): data_collection_tag: str = "" -@router.post("/visits/{visit_name}/{session_id}/start_data_collection") +@router.post("/visits/{visit_name}/sessions/{session_id}/start_data_collection") def start_dc( visit_name, session_id: MurfeySessionID, dc_params: DCParameters, db=murfey_db ): @@ -263,7 +265,7 @@ class ProcessingJobParameters(BaseModel): experiment_type: str = "spa" -@router.post("/visits/{visit_name}/{session_id}/register_processing_job") +@router.post("/visits/{visit_name}/sessions/{session_id}/register_processing_job") def register_proc( visit_name: str, session_id: MurfeySessionID, @@ -346,7 +348,7 @@ class Tag(BaseModel): tag: str -@spa_router.post("/visits/{visit_name}/{session_id}/flush_spa_processing") +@spa_router.post("/visits/{visit_name}/sessions/{session_id}/flush_spa_processing") def flush_spa_processing( visit_name: str, session_id: MurfeySessionID, tag: Tag, db=murfey_db ): @@ -378,7 
+380,7 @@ class SPAProcessFile(BaseModel): source: str = "" -@spa_router.post("/visits/{visit_name}/{session_id}/spa_preprocess") +@spa_router.post("/visits/{visit_name}/sessions/{session_id}/spa_preprocess") async def request_spa_preprocessing( visit_name: str, session_id: MurfeySessionID, @@ -553,7 +555,9 @@ class Source(BaseModel): rsync_source: str -@tomo_router.post("/visits/{visit_name}/{session_id}/flush_tomography_processing") +@tomo_router.post( + "/visits/{visit_name}/sessions/{session_id}/flush_tomography_processing" +) def flush_tomography_processing( visit_name: str, session_id: MurfeySessionID, rsync_source: Source, db=murfey_db ): @@ -638,7 +642,7 @@ class TomoProcessFile(BaseModel): group_tag: Optional[str] = None -@tomo_router.post("/visits/{visit_name}/{session_id}/tomography_preprocess") +@tomo_router.post("/visits/{visit_name}/sessions/{session_id}/tomography_preprocess") async def request_tomography_preprocessing( visit_name: str, session_id: MurfeySessionID, @@ -741,7 +745,7 @@ async def request_tomography_preprocessing( return proc_file -@tomo_router.post("/visits/{visit_name}/{session_id}/completed_tilt_series") +@tomo_router.post("/visits/{visit_name}/sessions/{session_id}/completed_tilt_series") def register_completed_tilt_series( visit_name: str, session_id: MurfeySessionID, @@ -872,7 +876,7 @@ class TiltInfo(BaseModel): source: str -@tomo_router.post("/visits/{visit_name}/{session_id}/tilt") +@tomo_router.post("/visits/{visit_name}/sessions/{session_id}/tilt") async def register_tilt( visit_name: str, session_id: MurfeySessionID, tilt_info: TiltInfo, db=murfey_db ): @@ -1006,7 +1010,9 @@ class MillingParameters(BaseModel): raw_directory: str -@correlative_router.post("/visits/{year}/{visit_name}/{session_id}/make_milling_gif") +@correlative_router.post( + "/year/{year}/visits/{visit_name}/sessions/{session_id}/make_milling_gif" +) async def make_gif( year: int, visit_name: str, diff --git a/src/murfey/util/route_manifest.yaml
b/src/murfey/util/route_manifest.yaml index de173cbd7..72e6e859a 100644 --- a/src/murfey/util/route_manifest.yaml +++ b/src/murfey/util/route_manifest.yaml @@ -436,7 +436,7 @@ murfey.server.api.file_io_frontend.router: methods: - POST murfey.server.api.file_io_instrument.router: - - path: /file_io/instrument/visits/{visit_name}/{session_id}/suggested_path + - path: /file_io/instrument/visits/{visit_name}/sessions/{session_id}/suggested_path function: suggest_path path_params: - name: visit_name @@ -457,7 +457,7 @@ murfey.server.api.file_io_instrument.router: path_params: [] methods: - POST - - path: /file_io/instrument/visits/{visit_name}/{session_id}/eer_fractionation_file + - path: /file_io/instrument/visits/{visit_name}/sessions/{session_id}/eer_fractionation_file function: write_eer_fractionation_file path_params: - name: visit_name @@ -542,7 +542,7 @@ murfey.server.api.instrument.router: path_params: [] methods: - POST - - path: /instrument_server/visits/{visit_name}/{session_id}/upstream_tiff_data_request + - path: /instrument_server/visits/{visit_name}/sessions/{session_id}/upstream_tiff_data_request function: request_upstream_tiff_data_download path_params: - name: visit_name @@ -671,7 +671,7 @@ murfey.server.api.session_control.correlative_router: path_params: [] methods: - GET - - path: /session_control/correlative/visits/{visit_name}/{session_id}/upstream_tiff_paths + - path: /session_control/correlative/visits/{visit_name}/sessions/{session_id}/upstream_tiff_paths function: gather_upstream_tiffs path_params: - name: visit_name @@ -680,7 +680,7 @@ murfey.server.api.session_control.correlative_router: type: int methods: - GET - - path: /session_control/correlative/visits/{visit_name}/{session_id}/upstream_tiff/{tiff_path:path} + - path: /session_control/correlative/visits/{visit_name}/sessions/{session_id}/upstream_tiff/{tiff_path:path} function: get_tiff path_params: - name: visit_name @@ -861,7 +861,7 @@ 
murfey.server.api.session_info.correlative_router: path_params: [] methods: - GET - - path: /session_info/correlative/visits/{visit_name}/{session_id}/upstream_tiff_paths + - path: /session_info/correlative/visits/{visit_name}/sessions/{session_id}/upstream_tiff_paths function: gather_upstream_tiffs path_params: - name: visit_name @@ -870,7 +870,7 @@ murfey.server.api.session_info.correlative_router: type: int methods: - GET - - path: /session_info/correlative/visits/{visit_name}/{session_id}/upstream_tiff/{tiff_path:path} + - path: /session_info/correlative/visits/{visit_name}/sessions/{session_id}/upstream_tiff/{tiff_path:path} function: get_tiff path_params: - name: visit_name @@ -918,7 +918,7 @@ murfey.server.api.session_info.router: path_params: [] methods: - GET - - path: /session_info/session/{session_id} + - path: /session_info/sessions/{session_id} function: get_session path_params: [] methods: @@ -1105,7 +1105,7 @@ murfey.server.api.workflow.correlative_router: type: str methods: - POST - - path: /workflow/correlative/visits/{year}/{visit_name}/{session_id}/make_milling_gif + - path: /workflow/correlative/year/{year}/visits/{visit_name}/sessions/{session_id}/make_milling_gif function: make_gif path_params: - name: year @@ -1117,21 +1117,21 @@ murfey.server.api.workflow.correlative_router: methods: - POST murfey.server.api.workflow.router: - - path: /workflow/visits/{visit_name}/{session_id}/register_data_collection_group + - path: /workflow/visits/{visit_name}/sessions/{session_id}/register_data_collection_group function: register_dc_group path_params: - name: visit_name type: typing.Any methods: - POST - - path: /workflow/visits/{visit_name}/{session_id}/start_data_collection + - path: /workflow/visits/{visit_name}/sessions/{session_id}/start_data_collection function: start_dc path_params: - name: visit_name type: typing.Any methods: - POST - - path: /workflow/visits/{visit_name}/{session_id}/register_processing_job + - path: 
/workflow/visits/{visit_name}/sessions/{session_id}/register_processing_job function: register_proc path_params: - name: visit_name @@ -1144,14 +1144,14 @@ murfey.server.api.workflow.spa_router: path_params: [] methods: - POST - - path: /workflow/spa/visits/{visit_name}/{session_id}/flush_spa_processing + - path: /workflow/spa/visits/{visit_name}/sessions/{session_id}/flush_spa_processing function: flush_spa_processing path_params: - name: visit_name type: str methods: - POST - - path: /workflow/spa/visits/{visit_name}/{session_id}/spa_preprocess + - path: /workflow/spa/visits/{visit_name}/sessions/{session_id}/spa_preprocess function: request_spa_preprocessing path_params: - name: visit_name @@ -1164,7 +1164,7 @@ murfey.server.api.workflow.tomo_router: path_params: [] methods: - POST - - path: /workflow/tomo/visits/{visit_name}/{session_id}/flush_tomography_processing + - path: /workflow/tomo/visits/{visit_name}/sessions/{session_id}/flush_tomography_processing function: flush_tomography_processing path_params: - name: visit_name @@ -1185,14 +1185,14 @@ murfey.server.api.workflow.tomo_router: type: int methods: - POST - - path: /workflow/tomo/visits/{visit_name}/{session_id}/tomography_preprocess + - path: /workflow/tomo/visits/{visit_name}/sessions/{session_id}/tomography_preprocess function: request_tomography_preprocessing path_params: - name: visit_name type: str methods: - POST - - path: /workflow/tomo/visits/{visit_name}/{session_id}/completed_tilt_series + - path: /workflow/tomo/visits/{visit_name}/sessions/{session_id}/completed_tilt_series function: register_completed_tilt_series path_params: - name: visit_name @@ -1206,7 +1206,7 @@ murfey.server.api.workflow.tomo_router: type: str methods: - POST - - path: /workflow/tomo/visits/{visit_name}/{session_id}/tilt + - path: /workflow/tomo/visits/{visit_name}/sessions/{session_id}/tilt function: register_tilt path_params: - name: visit_name From 2d1298003c81883f10bc18067674cc9320f10b06 Mon Sep 17 00:00:00 2001 
From: yxd92326 Date: Thu, 21 Aug 2025 15:15:28 +0100 Subject: [PATCH 02/12] All client posts should be captured --- src/murfey/client/contexts/atlas.py | 6 +- src/murfey/client/contexts/fib.py | 9 +- src/murfey/client/contexts/spa.py | 12 +-- src/murfey/client/contexts/spa_metadata.py | 9 +- src/murfey/client/contexts/tomo.py | 17 ++-- src/murfey/client/contexts/tomo_metadata.py | 5 +- src/murfey/client/multigrid_control.py | 49 +++++------ src/murfey/client/tui/app.py | 51 ++++++----- src/murfey/client/tui/main.py | 17 ++-- src/murfey/client/tui/screens.py | 96 ++++++++++----------- src/murfey/util/client.py | 12 +-- 11 files changed, 119 insertions(+), 164 deletions(-) diff --git a/src/murfey/client/contexts/atlas.py b/src/murfey/client/contexts/atlas.py index 0dcac02bb..7f98fe312 100644 --- a/src/murfey/client/contexts/atlas.py +++ b/src/murfey/client/contexts/atlas.py @@ -2,19 +2,15 @@ from pathlib import Path from typing import Optional -import requests - from murfey.client.context import Context from murfey.client.contexts.spa import _get_source from murfey.client.contexts.spa_metadata import _atlas_destination from murfey.client.instance_environment import MurfeyInstanceEnvironment from murfey.util.api import url_path_for -from murfey.util.client import authorised_requests, capture_post +from murfey.util.client import capture_post logger = logging.getLogger("murfey.client.contexts.atlas") -requests.get, requests.post, requests.put, requests.delete = authorised_requests() - class AtlasContext(Context): def __init__(self, acquisition_software: str, basepath: Path): diff --git a/src/murfey/client/contexts/fib.py b/src/murfey/client/contexts/fib.py index 43f1ce7a2..9b83c5e5f 100644 --- a/src/murfey/client/contexts/fib.py +++ b/src/murfey/client/contexts/fib.py @@ -5,18 +5,15 @@ from pathlib import Path from typing import Dict, List, NamedTuple, Optional -import requests import xmltodict from murfey.client.context import Context from 
murfey.client.instance_environment import MurfeyInstanceEnvironment from murfey.util.api import url_path_for -from murfey.util.client import authorised_requests +from murfey.util.client import capture_post logger = logging.getLogger("murfey.client.contexts.fib") -requests.get, requests.post, requests.put, requests.delete = authorised_requests() - class Lamella(NamedTuple): name: str @@ -95,8 +92,8 @@ def post_transfer( environment.default_destinations[self._basepath] ).name # post gif list to gif making API call - requests.post( - f"{environment.url.geturl()}{url_path_for('workflow.correlative_router', 'make_gif', year=datetime.now().year, visit_name=environment.visit, session_id=environment.murfey_session)}", + capture_post( + url=f"{environment.url.geturl()}{url_path_for('workflow.correlative_router', 'make_gif', year=datetime.now().year, visit_name=environment.visit, session_id=environment.murfey_session)}", json={ "lamella_number": lamella_number, "images": gif_list, diff --git a/src/murfey/client/contexts/spa.py b/src/murfey/client/contexts/spa.py index 99a182ea5..928acd423 100644 --- a/src/murfey/client/contexts/spa.py +++ b/src/murfey/client/contexts/spa.py @@ -5,7 +5,6 @@ from pathlib import Path from typing import Any, Dict, List, Optional, OrderedDict, Tuple -import requests import xmltodict from murfey.client.context import Context, ProcessingParameter @@ -15,12 +14,7 @@ MurfeyInstanceEnvironment, ) from murfey.util.api import url_path_for -from murfey.util.client import ( - authorised_requests, - capture_get, - capture_post, - get_machine_config_client, -) +from murfey.util.client import capture_get, capture_post, get_machine_config_client from murfey.util.spa_metadata import ( foil_hole_data, foil_hole_from_file, @@ -31,8 +25,6 @@ logger = logging.getLogger("murfey.client.contexts.spa") -requests.get, requests.post, requests.put, requests.delete = authorised_requests() - def _file_transferred_to( environment: MurfeyInstanceEnvironment, source: Path, 
file_path: Path @@ -304,7 +296,7 @@ def _position_analysis( Optional[float], ] = (None, None, None, None, None, None, None) data_collection_group = ( - requests.get( + capture_get( f"{environment.url.geturl()}{url_path_for('session_info.router', 'get_dc_groups', session_id=environment.murfey_session)}" ) .json() diff --git a/src/murfey/client/contexts/spa_metadata.py b/src/murfey/client/contexts/spa_metadata.py index 4f23448f4..a2062d0f3 100644 --- a/src/murfey/client/contexts/spa_metadata.py +++ b/src/murfey/client/contexts/spa_metadata.py @@ -2,18 +2,13 @@ from pathlib import Path from typing import Dict, Optional -import requests import xmltodict from murfey.client.context import Context from murfey.client.contexts.spa import _file_transferred_to, _get_source from murfey.client.instance_environment import MurfeyInstanceEnvironment, SampleInfo from murfey.util.api import url_path_for -from murfey.util.client import ( - authorised_requests, - capture_post, - get_machine_config_client, -) +from murfey.util.client import capture_post, get_machine_config_client from murfey.util.spa_metadata import ( FoilHoleInfo, get_grid_square_atlas_positions, @@ -22,8 +17,6 @@ logger = logging.getLogger("murfey.client.contexts.spa_metadata") -requests.get, requests.post, requests.put, requests.delete = authorised_requests() - def _foil_hole_positions(xml_path: Path, grid_square: int) -> Dict[str, FoilHoleInfo]: with open(xml_path, "r") as xml: diff --git a/src/murfey/client/contexts/tomo.py b/src/murfey/client/contexts/tomo.py index a9204259e..158498b84 100644 --- a/src/murfey/client/contexts/tomo.py +++ b/src/murfey/client/contexts/tomo.py @@ -5,7 +5,6 @@ from threading import RLock from typing import Callable, Dict, List, OrderedDict -import requests import xmltodict import murfey.util.eer @@ -17,17 +16,11 @@ MurfeyInstanceEnvironment, ) from murfey.util.api import url_path_for -from murfey.util.client import ( - authorised_requests, - capture_post, - get_machine_config_client, 
-) +from murfey.util.client import capture_get, capture_post, get_machine_config_client from murfey.util.mdoc import get_block, get_global_data, get_num_blocks logger = logging.getLogger("murfey.client.contexts.tomo") -requests.get, requests.post, requests.put, requests.delete = authorised_requests() - def _get_tilt_angle_v5_7(p: Path) -> str: return p.name.split("[")[1].split("]")[0] @@ -313,8 +306,8 @@ def _add_tilt( eer_fractionation_file = None if self.data_collection_parameters.get("num_eer_frames"): - response = requests.post( - f"{str(environment.url.geturl())}{url_path_for('file_io_instrument.router', 'write_eer_fractionation_file', visit_name=environment.visit, session_id=environment.murfey_session)}", + response = capture_post( + url=f"{str(environment.url.geturl())}{url_path_for('file_io_instrument.router', 'write_eer_fractionation_file', visit_name=environment.visit, session_id=environment.murfey_session)}", json={ "num_frames": self.data_collection_parameters["num_eer_frames"], "fractionation": self.data_collection_parameters[ @@ -550,8 +543,8 @@ def gather_metadata( superres_binning = int(mdoc_data_block["Binning"]) binning_factor = 1 if environment: - server_config = requests.get( - f"{str(environment.url.geturl())}{url_path_for('session_control.router', 'machine_info_by_instrument', instrument_name=environment.instrument_name)}" + server_config = capture_get( + url=f"{str(environment.url.geturl())}{url_path_for('session_control.router', 'machine_info_by_instrument', instrument_name=environment.instrument_name)}" ).json() if ( server_config.get("superres") diff --git a/src/murfey/client/contexts/tomo_metadata.py b/src/murfey/client/contexts/tomo_metadata.py index 8a23d676d..2ffb6b619 100644 --- a/src/murfey/client/contexts/tomo_metadata.py +++ b/src/murfey/client/contexts/tomo_metadata.py @@ -2,7 +2,6 @@ from pathlib import Path from typing import Optional -import requests import xmltodict from murfey.client.context import Context @@ -10,12 +9,10 @@ 
from murfey.client.contexts.spa_metadata import _atlas_destination from murfey.client.instance_environment import MurfeyInstanceEnvironment, SampleInfo from murfey.util.api import url_path_for -from murfey.util.client import authorised_requests, capture_post +from murfey.util.client import capture_post logger = logging.getLogger("murfey.client.contexts.tomo_metadata") -requests.get, requests.post, requests.put, requests.delete = authorised_requests() - def ensure_dcg_exists(transferred_file: Path, environment: MurfeyInstanceEnvironment): # Make sure we have a data collection group diff --git a/src/murfey/client/multigrid_control.py b/src/murfey/client/multigrid_control.py index 26bf67f35..bd93a1223 100644 --- a/src/murfey/client/multigrid_control.py +++ b/src/murfey/client/multigrid_control.py @@ -10,8 +10,6 @@ from typing import Dict, List, Optional from urllib.parse import quote, urlparse -import requests - import murfey.client.websocket from murfey.client.analyser import Analyser from murfey.client.contexts.spa import SPAModularContext @@ -22,7 +20,12 @@ from murfey.client.watchdir import DirWatcher from murfey.util import posix_path from murfey.util.api import url_path_for -from murfey.util.client import capture_delete, capture_post, get_machine_config_client +from murfey.util.client import ( + capture_delete, + capture_get, + capture_post, + get_machine_config_client, +) log = logging.getLogger("murfey.client.multigrid_control") @@ -53,18 +56,8 @@ class MultigridController: visit_end_time: Optional[datetime] = None def __post_init__(self): - if self.token: - requests.get = partial( - requests.get, headers={"Authorization": f"Bearer {self.token}"} - ) - requests.post = partial( - requests.post, headers={"Authorization": f"Bearer {self.token}"} - ) - requests.delete = partial( - requests.delete, headers={"Authorization": f"Bearer {self.token}"} - ) - machine_data = requests.get( - f"{self.murfey_url}{url_path_for('session_control.router', 
'machine_info_by_instrument', instrument_name=self.instrument_name)}" + machine_data = capture_get( + url=f"{self.murfey_url}{url_path_for('session_control.router', 'machine_info_by_instrument', instrument_name=self.instrument_name)}" ).json() self.rsync_url = machine_data.get("rsync_url", "") self.rsync_module = machine_data.get("rsync_module", "data") @@ -106,8 +99,8 @@ def __post_init__(self): # Calculate the time offset between the client and the server current_time = datetime.now() - server_timestamp = requests.get( - f"{self.murfey_url}{url_path_for('session_control.router', 'get_current_timestamp')}" + server_timestamp = capture_get( + url=f"{self.murfey_url}{url_path_for('session_control.router', 'get_current_timestamp')}" ).json()["timestamp"] self.server_time_offset = current_time - datetime.fromtimestamp( server_timestamp @@ -235,8 +228,8 @@ def _start_rsyncer_multigrid( log.info(f"Starting multigrid rsyncer: {source}") log.debug(f"Analysis of {source} is {('enabled' if analyse else 'disabled')}") destination_overrides = destination_overrides or {} - machine_data = requests.get( - f"{self._environment.url.geturl()}{url_path_for('session_control.router', 'machine_info_by_instrument', instrument_name=self.instrument_name)}" + machine_data = capture_get( + url=f"{self._environment.url.geturl()}{url_path_for('session_control.router', 'machine_info_by_instrument', instrument_name=self.instrument_name)}" ).json() if destination_overrides.get(source): destination = ( @@ -289,7 +282,7 @@ def _start_rsyncer_multigrid( def _rsyncer_stopped(self, source: Path, explicit_stop: bool = False): if explicit_stop: remove_url = f"{self.murfey_url}{url_path_for('session_control.router', 'delete_rsyncer', session_id=self.session_id)}?source={quote(str(source), safe='')}" - requests.delete(remove_url) + capture_delete(url=remove_url) else: stop_url = f"{self.murfey_url}{url_path_for('session_control.router', 'register_stopped_rsyncer', session_id=self.session_id)}" 
capture_post(stop_url, json={"path": str(source)}) @@ -415,7 +408,7 @@ def rsync_result(update: RSyncerUpdate): "transferring": self.do_transfer or self._environment.demo, "tag": tag, } - requests.post(url, json=rsyncer_data) + capture_post(url=url, json=rsyncer_data) self._environment.watchers[source] = DirWatcher(source, settling_time=30) if not self.analysers.get(source) and analyse: @@ -519,8 +512,8 @@ def _start_dc(self, metadata_json, from_form: bool = False): log.info("Registering tomography processing parameters") if context.data_collection_parameters.get("num_eer_frames"): - eer_response = requests.post( - f"{str(self._environment.url.geturl())}{url_path_for('file_io_instrument.router', 'write_eer_fractionation_file', visit_name=self._environment.visit, session_id=self._environment.murfey_session)}", + eer_response = capture_post( + url=f"{str(self._environment.url.geturl())}{url_path_for('file_io_instrument.router', 'write_eer_fractionation_file', visit_name=self._environment.visit, session_id=self._environment.murfey_session)}", json={ "num_frames": context.data_collection_parameters[ "num_eer_frames" @@ -637,7 +630,7 @@ def _increment_file_count( "increment_count": len(observed_files), "increment_data_count": num_data_files, } - requests.post(url, json=data) + capture_post(url=url, json=data) # Prometheus can handle higher traffic so update for every transferred file rather # than batching as we do for the Murfey database updates in _increment_transferred_files @@ -664,7 +657,7 @@ def _increment_transferred_files_prometheus( "increment_data_count": len(data_files), "data_bytes": sum(f.file_size for f in data_files), } - requests.post(url, json=data) + capture_post(url=url, json=data) def _increment_transferred_files( self, @@ -674,8 +667,8 @@ def _increment_transferred_files( destination: str, ): skip_url = f"{str(self._environment.url.geturl())}{url_path_for('prometheus.router', 'increment_rsync_skipped_files_prometheus', 
visit_name=self._environment.visit)}" - requests.post( - skip_url, + capture_post( + url=skip_url, json={ "source": source, "session_id": self.session_id, @@ -706,4 +699,4 @@ def _increment_transferred_files( "increment_data_count": len(data_files), "data_bytes": sum(f.file_size for f in data_files), } - requests.post(url, json=data) + capture_post(url=url, json=data) diff --git a/src/murfey/client/tui/app.py b/src/murfey/client/tui/app.py index 7c947dc54..6dca90c4f 100644 --- a/src/murfey/client/tui/app.py +++ b/src/murfey/client/tui/app.py @@ -9,7 +9,6 @@ from typing import Awaitable, Callable, Dict, List, OrderedDict, TypeVar from urllib.parse import urlparse -import requests from textual.app import App from textual.reactive import reactive from textual.widgets import Button, Input @@ -36,6 +35,8 @@ from murfey.util import posix_path from murfey.util.api import url_path_for from murfey.util.client import ( + capture_delete, + capture_get, capture_post, get_machine_config_client, read_config, @@ -49,10 +50,6 @@ token = read_config()["Murfey"].get("token", "") instrument_name = read_config()["Murfey"].get("instrument_name", "") -requests.get = partial(requests.get, headers={"Authorization": f"Bearer {token}"}) -requests.post = partial(requests.post, headers={"Authorization": f"Bearer {token}"}) -requests.delete = partial(requests.delete, headers={"Authorization": f"Bearer {token}"}) - class MurfeyTUI(App): CSS_PATH = "controller.css" @@ -155,8 +152,8 @@ def _start_rsyncer_multigrid( ): log.info(f"starting multigrid rsyncer: {source}") destination_overrides = destination_overrides or {} - machine_data = requests.get( - f"{self._environment.url.geturl()}{url_path_for('session_control.router', 'machine_info_by_instrument', instrument_name=instrument_name)}" + machine_data = capture_get( + url=f"{self._environment.url.geturl()}{url_path_for('session_control.router', 'machine_info_by_instrument', instrument_name=instrument_name)}" ).json() if 
destination_overrides.get(source): destination = destination_overrides[source] + f"/{extra_directory}" @@ -281,7 +278,7 @@ def rsync_result(update: RSyncerUpdate): "session_id": self._environment.murfey_session, "transferring": self._do_transfer, } - requests.post(url, json=rsyncer_data) + capture_post(url=url, json=rsyncer_data) self._environment.watchers[source] = DirWatcher(source, settling_time=30) @@ -360,7 +357,7 @@ def _increment_file_count( "increment_count": len(observed_files), "increment_data_count": num_data_files, } - requests.post(url, json=data) + capture_post(url=url, json=data) # Prometheus can handle higher traffic so update for every transferred file rather # than batching as we do for the Murfey database updates in _increment_transferred_files @@ -387,7 +384,7 @@ def _increment_transferred_files_prometheus( "increment_data_count": len(data_files), "data_bytes": sum(f.file_size for f in data_files), } - requests.post(url, json=data) + capture_post(url=url, json=data) def _increment_transferred_files( self, updates: List[RSyncerUpdate], source: str, destination: str @@ -415,7 +412,7 @@ def _increment_transferred_files( "increment_data_count": len(data_files), "data_bytes": sum(f.file_size for f in data_files), } - requests.post(url, json=data) + capture_post(url=url, json=data) def _set_register_dc(self, response: str): if response == "y": @@ -490,8 +487,8 @@ def _start_dc(self, metadata_json, from_form: bool = False): log.info("Registering tomography processing parameters") if context.data_collection_parameters.get("num_eer_frames"): - eer_response = requests.post( - f"{str(self.app._environment.url.geturl())}{url_path_for('file_io_instrument.router', 'write_eer_fractionation_file', visit_name=self.app._environment.visit, session_id=self.app._environment.murfey_session)}", + eer_response = capture_post( + url=f"{str(self.app._environment.url.geturl())}{url_path_for('file_io_instrument.router', 'write_eer_fractionation_file', 
visit_name=self.app._environment.visit, session_id=self.app._environment.murfey_session)}", json={ "num_frames": context.data_collection_parameters[ "num_eer_frames" @@ -503,8 +500,8 @@ def _start_dc(self, metadata_json, from_form: bool = False): ) eer_fractionation_file = eer_response.json()["eer_fractionation_file"] metadata_json.update({"eer_fractionation_file": eer_fractionation_file}) - requests.post( - f"{self.app._environment.url.geturl()}{url_path_for('workflow.tomo_router', 'register_tomo_proc_params', session_id=self.app._environment.murfey_session)}", + capture_post( + url=f"{self.app._environment.url.geturl()}{url_path_for('workflow.tomo_router', 'register_tomo_proc_params', session_id=self.app._environment.murfey_session)}", json=metadata_json, ) capture_post( @@ -620,8 +617,8 @@ async def on_button_pressed(self, event: Button.Pressed): await self.reset() async def on_mount(self) -> None: - exisiting_sessions = requests.get( - f"{self._environment.url.geturl()}{url_path_for('session_control.router', 'get_sessions')}" + exisiting_sessions = capture_get( + url=f"{self._environment.url.geturl()}{url_path_for('session_control.router', 'get_sessions')}" ).json() if self.visits: self.install_screen(VisitSelection(self.visits), "visit-select-screen") @@ -666,8 +663,8 @@ async def reset(self): if self.rsync_processes and machine_config.get("allow_removal"): sources = "\n".join(str(k) for k in self.rsync_processes.keys()) prompt = f"Remove files from the following:\n {sources} \n" - rsync_instances = requests.get( - f"{self._environment.url.geturl()}{url_path_for('session_control.router', 'get_rsyncers_for_session', session_id=self._environment.murfey_session)}" + rsync_instances = capture_get( + url=f"{self._environment.url.geturl()}{url_path_for('session_control.router', 'get_rsyncers_for_session', session_id=self._environment.murfey_session)}" ).json() prompt += f"Copied {sum(r['files_counted'] for r in rsync_instances)} / {sum(r['files_transferred'] for r 
in rsync_instances)}" self.install_screen( @@ -690,8 +687,8 @@ async def action_quit(self) -> None: self.exit() async def action_remove_session(self) -> None: - requests.delete( - f"{self._environment.url.geturl()}{url_path_for('session_control.router', 'remove_session', session_id=self._environment.murfey_session)}" + capture_delete( + url=f"{self._environment.url.geturl()}{url_path_for('session_control.router', 'remove_session', session_id=self._environment.murfey_session)}" ) if self.rsync_processes: for rp in self.rsync_processes.values(): @@ -704,8 +701,8 @@ async def action_remove_session(self) -> None: self.exit() def clean_up_quit(self) -> None: - requests.delete( - f"{self._environment.url.geturl()}{url_path_for('session_control.router', 'remove_session', session_id=self._environment.murfey_session)}" + capture_delete( + url=f"{self._environment.url.geturl()}{url_path_for('session_control.router', 'remove_session', session_id=self._environment.murfey_session)}" ) self.exit() @@ -747,11 +744,11 @@ def _remove_data(self, listener: Callable[..., Awaitable[None] | None], **kwargs removal_rp.queue.put(f) removal_rp.stop() log.info(f"rsyncer {rp} rerun with removal") - requests.post( - f"{self._environment.url.geturl()}{url_path_for('session_control.router', 'register_processing_success_in_ispyb', session_id=self._environment.murfey_session)}" + capture_post( + url=f"{self._environment.url.geturl()}{url_path_for('session_control.router', 'register_processing_success_in_ispyb', session_id=self._environment.murfey_session)}" ) - requests.delete( - f"{self._environment.url.geturl()}{url_path_for('session_control.router', 'remove_session', session_id=self._environment.murfey_session)}" + capture_delete( + url=f"{self._environment.url.geturl()}{url_path_for('session_control.router', 'remove_session', session_id=self._environment.murfey_session)}" ) self.exit() diff --git a/src/murfey/client/tui/main.py b/src/murfey/client/tui/main.py index 68ff64890..4cb04809a 100644 
--- a/src/murfey/client/tui/main.py +++ b/src/murfey/client/tui/main.py @@ -15,7 +15,6 @@ from typing import Literal from urllib.parse import ParseResult, urlparse -import requests from rich.prompt import Confirm import murfey.client.update @@ -26,23 +25,21 @@ from murfey.client.tui.app import MurfeyTUI from murfey.client.tui.status_bar import StatusBar from murfey.util.api import url_path_for -from murfey.util.client import authorised_requests, read_config +from murfey.util.client import capture_get, read_config from murfey.util.models import Visit log = logging.getLogger("murfey.client") -requests.get, requests.post, requests.put, requests.delete = authorised_requests() - def _get_visit_list(api_base: ParseResult, instrument_name: str): proxy_path = api_base.path.rstrip("/") get_visits_url = api_base._replace( path=f"{proxy_path}{url_path_for('session_control.router', 'get_current_visits', instrument_name=instrument_name)}" ) - server_reply = requests.get(get_visits_url.geturl()) + server_reply = capture_get(url=get_visits_url.geturl()) if server_reply.status_code != 200: raise ValueError(f"Server unreachable ({server_reply.status_code})") - return [Visit.parse_obj(v) for v in server_reply.json()] + return [Visit.model_validate(v) for v in server_reply.json()] def write_config(config: configparser.ConfigParser): @@ -274,8 +271,8 @@ def run(): rich_handler.setLevel(logging.DEBUG if args.debug else logging.INFO) # Set up websocket app and handler - client_id_response = requests.get( - f"{murfey_url.geturl()}{url_path_for('session_control.router', 'new_client_id')}" + client_id_response = capture_get( + url=f"{murfey_url.geturl()}{url_path_for('session_control.router', 'new_client_id')}" ) if client_id_response.status_code == 401: exit( @@ -303,8 +300,8 @@ def run(): log.info("Starting Websocket connection") # Load machine data for subsequent sections - machine_data = requests.get( - f"{murfey_url.geturl()}{url_path_for('session_control.router', 
'machine_info_by_instrument', instrument_name=instrument_name)}" + machine_data = capture_get( + url=f"{murfey_url.geturl()}{url_path_for('session_control.router', 'machine_info_by_instrument', instrument_name=instrument_name)}" ).json() gain_ref: Path | None = None diff --git a/src/murfey/client/tui/screens.py b/src/murfey/client/tui/screens.py index 2dec96595..23958e48f 100644 --- a/src/murfey/client/tui/screens.py +++ b/src/murfey/client/tui/screens.py @@ -18,7 +18,6 @@ TypeVar, ) -import requests from pydantic import BaseModel, ValidationError from rich.box import SQUARE from rich.panel import Panel @@ -57,7 +56,13 @@ from murfey.client.rsync import RSyncer from murfey.util import posix_path from murfey.util.api import url_path_for -from murfey.util.client import capture_post, get_machine_config_client, read_config +from murfey.util.client import ( + capture_delete, + capture_get, + capture_post, + get_machine_config_client, + read_config, +) from murfey.util.models import ProcessingParametersSPA, ProcessingParametersTomo log = logging.getLogger("murfey.tui.screens") @@ -67,10 +72,6 @@ token = read_config()["Murfey"].get("token", "") instrument_name = read_config()["Murfey"].get("instrument_name", "") -requests.get = partial(requests.get, headers={"Authorization": f"Bearer {token}"}) -requests.post = partial(requests.post, headers={"Authorization": f"Bearer {token}"}) -requests.delete = partial(requests.delete, headers={"Authorization": f"Bearer {token}"}) - def determine_default_destination( visit: str, @@ -83,8 +84,8 @@ def determine_default_destination( include_mid_path: bool = True, use_suggested_path: bool = True, ) -> str: - machine_data = requests.get( - f"{environment.url.geturl()}{url_path_for('session_control.router', 'machine_info_by_instrument', instrument_name=environment.instrument_name)}" + machine_data = capture_get( + url=f"{environment.url.geturl()}{url_path_for('session_control.router', 'machine_info_by_instrument', 
instrument_name=environment.instrument_name)}" ).json() _default = "" if environment.processing_only_mode and environment.sources: @@ -264,8 +265,8 @@ def __init__( def compose(self): - machine_data = requests.get( - f"{self.app._environment.url.geturl()}{url_path_for('session_control.router', 'machine_info_by_instrument', instrument_name=instrument_name)}" + machine_data = capture_get( + url=f"{self.app._environment.url.geturl()}{url_path_for('session_control.router', 'machine_info_by_instrument', instrument_name=instrument_name)}" ).json() self._dir_tree = _DirectoryTree( str(self._selected_dir), @@ -469,17 +470,17 @@ def _write_params( self.app.query_one("#info").write(f"{k.label}: {params.get(k.name)}") self.app._start_dc(params) if model == ProcessingParametersTomo: - requests.post( - f"{self.app._environment.url.geturl()}/{url_path_for('workflow.tomo_router', 'register_tomo_proc_params', session_id=self.app._environment.murfey_session)}", + capture_post( + url=f"{self.app._environment.url.geturl()}/{url_path_for('workflow.tomo_router', 'register_tomo_proc_params', session_id=self.app._environment.murfey_session)}", json=params, ) elif model == ProcessingParametersSPA: - requests.post( - f"{self.app._environment.url.geturl()}{url_path_for('workflow.spa_router', 'register_spa_proc_params', session_id=self.app._environment.murfey_session)}", + capture_post( + url=f"{self.app._environment.url.geturl()}{url_path_for('workflow.spa_router', 'register_spa_proc_params', session_id=self.app._environment.murfey_session)}", json=params, ) - requests.post( - f"{self.app._environment.url.geturl()}{url_path_for('workflow.spa_router', 'flush_spa_processing', visit_name=self.app._environment.visit, session_id=self.app._environment.murfey_session)}", + capture_post( + url=f"{self.app._environment.url.geturl()}{url_path_for('workflow.spa_router', 'flush_spa_processing', visit_name=self.app._environment.visit, session_id=self.app._environment.murfey_session)}", ) def 
on_switch_changed(self, event): @@ -621,17 +622,17 @@ def on_button_pressed(self, event: Button.Pressed): session_id = self.app._environment.murfey_session self.app.pop_screen() session_name = "Client connection" - self.app._environment.murfey_session = requests.post( - f"{self.app._environment.url.geturl()}{url_path_for('session_control.router', 'link_client_to_session', instrument_name=self.app._environment.instrument_name, client_id=self.app._environment.client_id)}", + self.app._environment.murfey_session = capture_post( + url=f"{self.app._environment.url.geturl()}{url_path_for('session_control.router', 'link_client_to_session', instrument_name=self.app._environment.instrument_name, client_id=self.app._environment.client_id)}", json={"session_id": session_id, "session_name": session_name}, ).json() def _remove_session(self, session_id: int, **kwargs): - requests.delete( - f"{self.app._environment.url.geturl()}{url_path_for('session_control.router', 'remove_session', session_id=session_id)}" + capture_delete( + url=f"{self.app._environment.url.geturl()}{url_path_for('session_control.router', 'remove_session', session_id=session_id)}" ) - exisiting_sessions = requests.get( - f"{self.app._environment.url.geturl()}{url_path_for('session_control.router', 'get_sessions')}" + exisiting_sessions = capture_get( + url=f"{self.app._environment.url.geturl()}{url_path_for('session_control.router', 'get_sessions')}" ).json() self.app.uninstall_screen("session-select-screen") if exisiting_sessions: @@ -674,13 +675,13 @@ def on_button_pressed(self, event: Button.Pressed): text = str(event.button.label) self.app._visit = text self.app._environment.visit = text - response = requests.post( - f"{self.app._environment.url.geturl()}{url_path_for('session_control.router', 'register_client_to_visit', visit_name=text)}", + response = capture_post( + url=f"{self.app._environment.url.geturl()}{url_path_for('session_control.router', 'register_client_to_visit', visit_name=text)}", 
json={"id": self.app._environment.client_id}, ) log.info(f"Posted visit registration: {response.status_code}") - machine_data = requests.get( - f"{self.app._environment.url.geturl()}{url_path_for('session_control.router', 'machine_info_by_instrument', instrument_name=instrument_name)}" + machine_data = capture_get( + url=f"{self.app._environment.url.geturl()}{url_path_for('session_control.router', 'machine_info_by_instrument', instrument_name=instrument_name)}" ).json() if self._switch_status: @@ -713,8 +714,8 @@ def on_button_pressed(self, event: Button.Pressed): self.app.push_screen("launcher") if machine_data.get("upstream_data_directories"): - upstream_downloads = requests.get( - f"{self.app._environment.url.geturl()}{url_path_for('session_control.correlative_router', 'find_upstream_visits', session_id=self.app._environment.murfey_session)}" + upstream_downloads = capture_get( + url=f"{self.app._environment.url.geturl()}{url_path_for('session_control.correlative_router', 'find_upstream_visits', session_id=self.app._environment.murfey_session)}" ).json() self.app.install_screen( UpstreamDownloads(upstream_downloads), "upstream-downloads" @@ -741,13 +742,13 @@ def on_button_pressed(self, event: Button.Pressed): text = str(self.visit_name) self.app._visit = text self.app._environment.visit = text - response = requests.post( - f"{self.app._environment.url.geturl()}{url_path_for('session_control.router', 'register_client_to_visit', visit_name=text)}", + response = capture_post( + url=f"{self.app._environment.url.geturl()}{url_path_for('session_control.router', 'register_client_to_visit', visit_name=text)}", json={"id": self.app._environment.client_id}, ) log.info(f"Posted visit registration: {response.status_code}") - machine_data = requests.get( - f"{self.app._environment.url.geturl()}{url_path_for('session_control.router', 'machine_info_by_instrument', instrument_name=instrument_name)}" + machine_data = capture_get( + 
url=f"{self.app._environment.url.geturl()}{url_path_for('session_control.router', 'machine_info_by_instrument', instrument_name=instrument_name)}" ).json() self.app.install_screen( @@ -775,8 +776,8 @@ def on_button_pressed(self, event: Button.Pressed): self.app.push_screen("directory-select") if machine_data.get("upstream_data_directories"): - upstream_downloads = requests.get( - f"{self.app._environment.url.geturl()}{url_path_for('session_control.correlative_router', 'find_upstream_visits', session_id=self.app._environment.murfey_session)}" + upstream_downloads = capture_get( + url=f"{self.app._environment.url.geturl()}{url_path_for('session_control.correlative_router', 'find_upstream_visits', session_id=self.app._environment.murfey_session)}" ).json() self.app.install_screen( UpstreamDownloads(upstream_downloads), "upstream-downloads" @@ -797,8 +798,8 @@ def compose(self): yield Button("Skip", classes="btn-directory") def on_button_pressed(self, event: Button.Pressed): - machine_data = requests.get( - f"{self.app._environment.url.geturl()}{url_path_for('session_control.router', 'machine_info_by_instrument', instrument_name=instrument_name)}" + machine_data = capture_get( + url=f"{self.app._environment.url.geturl()}{url_path_for('session_control.router', 'machine_info_by_instrument', instrument_name=instrument_name)}" ).json() if machine_data.get("upstream_data_download_directory"): # Create the directory locally to save files to @@ -808,8 +809,8 @@ def on_button_pressed(self, event: Button.Pressed): download_dir.mkdir(exist_ok=True) # Get the paths to the TIFF files generated previously under the same session ID - upstream_tiff_paths_response = requests.get( - f"{self.app._environment.url.geturl()}{url_path_for('session_control.correlative_router', 'gather_upstream_tiffs', visit_name=event.button.label, session_id=self.app._environment.murfey_session)}" + upstream_tiff_paths_response = capture_get( + 
url=f"{self.app._environment.url.geturl()}{url_path_for('session_control.correlative_router', 'gather_upstream_tiffs', visit_name=event.button.label, session_id=self.app._environment.murfey_session)}" ) upstream_tiff_paths = upstream_tiff_paths_response.json() or [] @@ -817,9 +818,8 @@ def on_button_pressed(self, event: Button.Pressed): for tp in upstream_tiff_paths: (download_dir / tp).parent.mkdir(exist_ok=True, parents=True) # Write TIFF to the specified file path - stream_response = requests.get( - f"{self.app._environment.url.geturl()}{url_path_for('session_control.correlative_router', 'get_tiff', visit_name=event.button.label, session_id=self.app._environment.murfey_session, tiff_path=tp)}", - stream=True, + stream_response = capture_get( + url=f"{self.app._environment.url.geturl()}{url_path_for('session_control.correlative_router', 'get_tiff', visit_name=event.button.label, session_id=self.app._environment.murfey_session, tiff_path=tp)}", ) # Write the file chunk-by-chunk to avoid hogging memory with open(download_dir / tp, "wb") as utiff: @@ -881,7 +881,7 @@ def on_button_pressed(self, event): log.warning( f"Gain reference file {posix_path(self._dir_tree._gain_reference)!r} was not successfully transferred to {visit_path}/processing" ) - process_gain_response = requests.post( + process_gain_response = capture_post( url=f"{str(self.app._environment.url.geturl())}{url_path_for('file_io_instrument.router', 'process_gain', session_id=self.app._environment.murfey_session)}", json={ "gain_ref": str(self._dir_tree._gain_reference), @@ -1182,11 +1182,11 @@ def on_button_pressed(self, event: Button.Pressed) -> None: def file_copied(self, *args, **kwargs): self.query_one(ProgressBar).advance(1) if self.query_one(ProgressBar).progress == self.query_one(ProgressBar).total: - requests.post( - f"{self.app._environment.url.geturl()}{url_path_for('session_control.router', 'register_processing_success_in_ispyb', session_id=self.app._environment.murfey_session)}" + 
capture_post( + url=f"{self.app._environment.url.geturl()}{url_path_for('session_control.router', 'register_processing_success_in_ispyb', session_id=self.app._environment.murfey_session)}" ) - requests.delete( - f"{self.app._environment.url.geturl()}{url_path_for('session_control.router', 'remove_session', session_id=self.app._environment.murfey_session)}" + capture_delete( + url=f"{self.app._environment.url.geturl()}{url_path_for('session_control.router', 'remove_session', session_id=self.app._environment.murfey_session)}" ) self.app.exit() @@ -1215,6 +1215,6 @@ def compose(self): yield Footer() def on_mount(self, event): - requests.post( - f"{self.app._environment.url.geturl()}{url_path_for('prometheus.router', 'change_monitoring_status', visit_name=self.app._environment.visit, on=1)}" + capture_post( + url=f"{self.app._environment.url.geturl()}{url_path_for('prometheus.router', 'change_monitoring_status', visit_name=self.app._environment.visit, on=1)}" ) diff --git a/src/murfey/util/client.py b/src/murfey/util/client.py index 273536db3..68da35377 100644 --- a/src/murfey/util/client.py +++ b/src/murfey/util/client.py @@ -76,7 +76,7 @@ def authorised_requests() -> tuple[Callable, Callable, Callable, Callable]: requests.get, requests.post, requests.put, requests.delete = authorised_requests() -def capture_post(url: str, json: Union[dict, list] = {}) -> Optional[requests.Response]: +def capture_post(url: str, json: Union[dict, list] = {}) -> requests.Response: try: response = requests.post(url, json=json) except Exception as e: @@ -113,13 +113,13 @@ def capture_post(url: str, json: Union[dict, list] = {}) -> Optional[requests.Re return response -def capture_get(url: str) -> Optional[requests.Response]: +def capture_get(url: str) -> requests.Response: try: response = requests.get(url) except Exception as e: logger.error(f"Exception encountered in get from {url}: {e}") - response = None - if response and response.status_code != 200: + response = requests.Response() + 
if response.status_code != 200: logger.warning( f"Response to get from {url} had status code {response.status_code}. " f"The reason given was {response.reason}" @@ -127,12 +127,12 @@ def capture_get(url: str) -> Optional[requests.Response]: return response -def capture_delete(url: str) -> Optional[requests.Response]: +def capture_delete(url: str) -> requests.Response: try: response = requests.delete(url) except Exception as e: logger.error(f"Exception encountered in delete of {url}: {e}") - response = None + response = requests.Response() if response and response.status_code != 200: logger.warning( f"Response to delete of {url} had status code {response.status_code}. " From b1231c52389d1ac6c97b2c5da595f9712d0cde2a Mon Sep 17 00:00:00 2001 From: yxd92326 Date: Thu, 21 Aug 2025 15:29:10 +0100 Subject: [PATCH 03/12] Change how captures take args --- src/murfey/util/client.py | 46 +++++++++++++++++++++++++-------------- 1 file changed, 30 insertions(+), 16 deletions(-) diff --git a/src/murfey/util/client.py b/src/murfey/util/client.py index 68da35377..2deb2dde5 100644 --- a/src/murfey/util/client.py +++ b/src/murfey/util/client.py @@ -17,7 +17,6 @@ from functools import lru_cache, partial from pathlib import Path from typing import Awaitable, Callable, Optional, Union -from urllib.parse import urlparse, urlunparse import requests @@ -76,9 +75,16 @@ def authorised_requests() -> tuple[Callable, Callable, Callable, Callable]: requests.get, requests.post, requests.put, requests.delete = authorised_requests() -def capture_post(url: str, json: Union[dict, list] = {}) -> requests.Response: +def capture_post( + base_url: str, + router_name: str, + function_name: str, + data: Optional[dict] = None, + **kwargs, +) -> requests.Response: + url = f"{base_url}{url_path_for(router_name, function_name, **kwargs)}" try: - response = requests.post(url, json=json) + response = requests.post(url, json=data) except Exception as e: logger.error(f"Exception encountered in post to {url}: 
{e}") response = requests.Response() @@ -87,20 +93,22 @@ def capture_post(url: str, json: Union[dict, list] = {}) -> requests.Response: f"Response to post to {url} with data {json} had status code " f"{response.status_code}. The reason given was {response.reason}" ) - split_url = urlparse(url) client_config = read_config() - failure_url = urlunparse( - split_url._replace( - path=url_path_for( - "session_control.router", - "failed_client_post", - instrument_name=client_config["Murfey"]["instrument_name"], - ) - ) + failure_address = url_path_for( + "session_control.router", + "failed_client_post", + instrument_name=client_config["Murfey"]["instrument_name"], ) + failure_url = f"{base_url}{failure_address}" try: resend_response = requests.post( - failure_url, json={"url": url, "data": json} + failure_url, + json={ + "router_name": router_name, + "function_name": function_name, + "data": data, + "kwargs": kwargs, + }, ) except Exception as e: logger.error(f"Exception encountered in post to {failure_url}: {e}") @@ -113,7 +121,10 @@ def capture_post(url: str, json: Union[dict, list] = {}) -> requests.Response: return response -def capture_get(url: str) -> requests.Response: +def capture_get( + base_url: str, router_name: str, function_name: str, **kwargs +) -> requests.Response: + url = f"{base_url}{url_path_for(router_name, function_name, **kwargs)}" try: response = requests.get(url) except Exception as e: @@ -127,7 +138,10 @@ def capture_get(url: str) -> requests.Response: return response -def capture_delete(url: str) -> requests.Response: +def capture_delete( + base_url: str, router_name: str, function_name: str, **kwargs +) -> requests.Response: + url = f"{base_url}{url_path_for(router_name, function_name, **kwargs)}" try: response = requests.delete(url) except Exception as e: @@ -135,7 +149,7 @@ def capture_delete(url: str) -> requests.Response: response = requests.Response() if response and response.status_code != 200: logger.warning( - f"Response to delete of 
{url} had status code {response.status_code}. " + f"Response to delete on {url} had status code {response.status_code}. " f"The reason given was {response.reason}" ) return response From 814a286097ad52723bf6e69f89c80b1292d369ca Mon Sep 17 00:00:00 2001 From: yxd92326 Date: Thu, 21 Aug 2025 15:40:07 +0100 Subject: [PATCH 04/12] Update tomo posts --- src/murfey/client/contexts/tomo.py | 98 ++++++++++++++++++++++-------- 1 file changed, 73 insertions(+), 25 deletions(-) diff --git a/src/murfey/client/contexts/tomo.py b/src/murfey/client/contexts/tomo.py index 158498b84..8a7454ceb 100644 --- a/src/murfey/client/contexts/tomo.py +++ b/src/murfey/client/contexts/tomo.py @@ -15,7 +15,6 @@ MurfeyID, MurfeyInstanceEnvironment, ) -from murfey.util.api import url_path_for from murfey.util.client import capture_get, capture_post, get_machine_config_client from murfey.util.mdoc import get_block, get_global_data, get_num_blocks @@ -102,7 +101,6 @@ def register_tomography_data_collections( ) return try: - dcg_url = f"{str(environment.url.geturl())}{url_path_for('workflow.router', 'register_dc_group', visit_name=environment.visit, session_id=environment.murfey_session)}" dcg_data = { "experiment_type": "tomo", "experiment_type_id": 36, @@ -110,11 +108,17 @@ def register_tomography_data_collections( "atlas": "", "sample": None, } - capture_post(dcg_url, json=dcg_data) + capture_post( + base_url=str(environment.url.geturl()), + router_name="workflow.router", + function_name="register_dc_group", + visit_name=environment.visit, + session_id=environment.murfey_session, + data=dcg_data, + ) for tilt_series in self._tilt_series.keys(): if tilt_series not in self._tilt_series_with_pjids: - dc_url = f"{str(environment.url.geturl())}{url_path_for('workflow.router', 'start_dc', visit_name=environment.visit, session_id=environment.murfey_session)}" dc_data = { "experiment_type": "tomography", "file_extension": file_extension, @@ -146,13 +150,23 @@ def register_tomography_data_collections( ], 
} ) - capture_post(dc_url, json=dc_data) + capture_post( + base_url=str(environment.url.geturl()), + router_name="workflow.router", + function_name="start_dc", + visit_name=environment.visit, + session_id=environment.murfey_session, + data=dc_data, + ) - proc_url = f"{str(environment.url.geturl())}{url_path_for('workflow.router', 'register_proc', visit_name=environment.visit, session_id=environment.murfey_session)}" for recipe in ("em-tomo-preprocess", "em-tomo-align"): capture_post( - proc_url, - json={ + base_url=str(environment.url.geturl()), + router_name="workflow.router", + function_name="register_proc", + visit_name=environment.visit, + session_id=environment.murfey_session, + data={ "tag": tilt_series, "source": str(self._basepath), "recipe": recipe, @@ -253,13 +267,18 @@ def _add_tilt( f"Tilt series {tilt_series} was previously thought complete but now {file_path} has been seen" ) self._completed_tilt_series.remove(tilt_series) - rerun_url = f"{str(environment.url.geturl())}{url_path_for('workflow.tomo_router', 'register_tilt_series_for_rerun', visit_name=environment.visit)}" rerun_data = { "session_id": environment.murfey_session, "tag": tilt_series, "source": str(file_path.parent), } - capture_post(rerun_url, json=rerun_data) + capture_post( + base_url=str(environment.url.geturl()), + router_name="workflow.tomo_router", + function_name="register_tilt_series_for_rerun", + visit_name=environment.visit, + data=rerun_data, + ) if tilt_series in self._aligned_tilt_series: with self._lock: self._aligned_tilt_series.remove(tilt_series) @@ -267,13 +286,18 @@ def _add_tilt( if not self._tilt_series.get(tilt_series): logger.info(f"New tilt series found: {tilt_series}") self._tilt_series[tilt_series] = [file_path] - ts_url = f"{str(environment.url.geturl())}{url_path_for('workflow.tomo_router', 'register_tilt_series', visit_name=environment.visit)}" ts_data = { "session_id": environment.murfey_session, "tag": tilt_series, "source": str(file_path.parent), } - 
capture_post(ts_url, json=ts_data) + capture_post( + base_url=str(environment.url.geturl()), + router_name="workflow.tomo_router", + function_name="register_tilt_series", + visit_name=environment.visit, + data=ts_data, + ) if not self._tilt_series_sizes.get(tilt_series): self._tilt_series_sizes[tilt_series] = 0 @@ -296,19 +320,29 @@ def _add_tilt( self._tilt_series[tilt_series].append(file_path) if environment: - tilt_url = f"{str(environment.url.geturl())}{url_path_for('workflow.tomo_router', 'register_tilt', visit_name=environment.visit, session_id=environment.murfey_session)}" tilt_data = { "movie_path": str(file_transferred_to), "tilt_series_tag": tilt_series, "source": str(file_path.parent), } - capture_post(tilt_url, json=tilt_data) + capture_post( + base_url=str(environment.url.geturl()), + router_name="workflow.tomo_router", + function_name="register_tilt", + visit_name=environment.visit, + session_id=environment.murfey_session, + data=tilt_data, + ) eer_fractionation_file = None if self.data_collection_parameters.get("num_eer_frames"): response = capture_post( - url=f"{str(environment.url.geturl())}{url_path_for('file_io_instrument.router', 'write_eer_fractionation_file', visit_name=environment.visit, session_id=environment.murfey_session)}", - json={ + base_url=str(environment.url.geturl()), + router_name="file_io_instrument.router", + function_name="write_eer_fractionation_file", + visit_name=environment.visit, + session_id=environment.murfey_session, + data={ "num_frames": self.data_collection_parameters["num_eer_frames"], "fractionation": self.data_collection_parameters[ "eer_fractionation" @@ -318,7 +352,6 @@ def _add_tilt( }, ) eer_fractionation_file = response.json()["eer_fractionation_file"] - preproc_url = f"{str(environment.url.geturl())}{url_path_for('workflow.tomo_router', 'request_tomography_preprocessing', visit_name=environment.visit, session_id=environment.murfey_session)}" preproc_data = { "path": str(file_transferred_to), "description": 
"", @@ -338,7 +371,14 @@ def _add_tilt( "tag": tilt_series, "group_tag": str(self._basepath), } - capture_post(preproc_url, json=preproc_data) + capture_post( + base_url=str(environment.url.geturl()), + router_name="workflow.tomo_router", + function_name="request_tomography_preprocessing", + visit_name=environment.visit, + session_id=environment.murfey_session, + data=preproc_data, + ) return self._check_tilt_series(tilt_series) @@ -443,10 +483,12 @@ def post_transfer( # Always update the tilt series length in the database after an mdoc if environment.murfey_session is not None: - length_url = f"{str(environment.url.geturl())}{url_path_for('workflow.tomo_router', 'register_tilt_series_length', session_id=environment.murfey_session)}" capture_post( - length_url, - json={ + base_url=str(environment.url.geturl()), + router_name="workflow.tomo_router", + function_name="register_tilt_series_length", + session_id=environment.murfey_session, + data={ "tags": [tilt_series], "source": str(transferred_file.parent), "tilt_series_lengths": [ @@ -460,10 +502,13 @@ def post_transfer( f"The following tilt series are considered complete: {completed_tilts} " f"after {transferred_file}" ) - complete_url = f"{str(environment.url.geturl())}{url_path_for('workflow.tomo_router', 'register_completed_tilt_series', visit_name=environment.visit, session_id=environment.murfey_session)}" capture_post( - complete_url, - json={ + base_url=str(environment.url.geturl()), + router_name="workflow.tomo_router", + function_name="register_completed_tilt_series", + visit_name=environment.visit, + session_id=environment.murfey_session, + data={ "tags": completed_tilts, "source": str(transferred_file.parent), "tilt_series_lengths": [ @@ -544,7 +589,10 @@ def gather_metadata( binning_factor = 1 if environment: server_config = capture_get( - url=f"{str(environment.url.geturl())}{url_path_for('session_control.router', 'machine_info_by_instrument', instrument_name=environment.instrument_name)}" + 
base_url=str(environment.url.geturl()), + router_name="session_control.router", + function_name="machine_info_by_instrument", + instrument_name=environment.instrument_name, ).json() if ( server_config.get("superres") From 47f03b9b83484d4b4035c563f67852958bc195d4 Mon Sep 17 00:00:00 2001 From: yxd92326 Date: Thu, 21 Aug 2025 16:00:04 +0100 Subject: [PATCH 05/12] Adjust remaining contexts --- src/murfey/client/contexts/fib.py | 10 +++- src/murfey/client/contexts/spa.py | 58 ++++++++++++------ src/murfey/client/contexts/spa_metadata.py | 46 ++++++++++---- src/murfey/client/contexts/tomo_metadata.py | 66 ++++++++++++++------- 4 files changed, 128 insertions(+), 52 deletions(-) diff --git a/src/murfey/client/contexts/fib.py b/src/murfey/client/contexts/fib.py index 9b83c5e5f..3e3d4f4a5 100644 --- a/src/murfey/client/contexts/fib.py +++ b/src/murfey/client/contexts/fib.py @@ -9,7 +9,6 @@ from murfey.client.context import Context from murfey.client.instance_environment import MurfeyInstanceEnvironment -from murfey.util.api import url_path_for from murfey.util.client import capture_post logger = logging.getLogger("murfey.client.contexts.fib") @@ -93,8 +92,13 @@ def post_transfer( ).name # post gif list to gif making API call capture_post( - url=f"{environment.url.geturl()}{url_path_for('workflow.correlative_router', 'make_gif', year=datetime.now().year, visit_name=environment.visit, session_id=environment.murfey_session)}", - json={ + base_url=str(environment.url.geturl()), + router_name="workflow.correlative_router", + function_name="make_gif", + year=datetime.now().year, + visit_name=environment.visit, + session_id=environment.murfey_session, + data={ "lamella_number": lamella_number, "images": gif_list, "raw_directory": raw_directory, diff --git a/src/murfey/client/contexts/spa.py b/src/murfey/client/contexts/spa.py index 928acd423..7b5d52ea4 100644 --- a/src/murfey/client/contexts/spa.py +++ b/src/murfey/client/contexts/spa.py @@ -13,7 +13,6 @@ MurfeyID, 
MurfeyInstanceEnvironment, ) -from murfey.util.api import url_path_for from murfey.util.client import capture_get, capture_post, get_machine_config_client from murfey.util.spa_metadata import ( foil_hole_data, @@ -227,7 +226,10 @@ def gather_metadata( binning_factor = 1 if environment: server_config_response = capture_get( - f"{str(environment.url.geturl())}{url_path_for('session_control.router', 'machine_info_by_instrument', instrument_name=environment.instrument_name)}" + base_url=str(environment.url.geturl()), + router_name="session_control.router", + function_name="machine_info_by_instrument", + instrument_name=environment.instrument_name, ) if server_config_response is None: return None @@ -297,7 +299,10 @@ def _position_analysis( ] = (None, None, None, None, None, None, None) data_collection_group = ( capture_get( - f"{environment.url.geturl()}{url_path_for('session_info.router', 'get_dc_groups', session_id=environment.murfey_session)}" + base_url=str(environment.url.geturl()), + router_name="session_info.router", + function_name="get_dc_groups", + session_id=environment.murfey_session, ) .json() .get(str(source), {}) @@ -319,7 +324,6 @@ def _position_analysis( local_atlas_path, grid_square=str(grid_square), )[str(grid_square)] - gs_url = f"{str(environment.url.geturl())}{url_path_for('session_control.spa_router', 'register_grid_square', session_id=environment.murfey_session, gsid=grid_square)}" gs = grid_square_data( grid_square_metadata_file, grid_square, @@ -340,8 +344,12 @@ def _position_analysis( else "" ) capture_post( - gs_url, - json={ + base_url=str(environment.url.geturl()), + router_name="session_control.spa_router", + function_name="register_grid_square", + session_id=environment.murfey_session, + gsid=grid_square, + data={ "tag": str(source), "readout_area_x": gs.readout_area_x, "readout_area_y": gs.readout_area_y, @@ -360,7 +368,6 @@ def _position_analysis( ) foil_hole = foil_hole_from_file(transferred_file) if foil_hole not in 
self._foil_holes[grid_square]: - fh_url = f"{str(environment.url.geturl())}{url_path_for('session_control.spa_router', 'register_foil_hole', session_id=environment.murfey_session, gs_name=grid_square)}" if environment.murfey_session is not None: fh = foil_hole_data( grid_square_metadata_file, @@ -383,8 +390,12 @@ def _position_analysis( else "" ) capture_post( - fh_url, - json={ + base_url=str(environment.url.geturl()), + router_name="session_control.spa_router", + function_name="register_foil_hole", + session_id=environment.murfey_session, + gs_name=grid_square, + data={ "name": foil_hole, "x_location": fh.x_location, "y_location": fh.y_location, @@ -402,8 +413,12 @@ def _position_analysis( ) else: capture_post( - fh_url, - json={ + base_url=str(environment.url.geturl()), + router_name="session_control.spa_router", + function_name="register_foil_hole", + session_id=environment.murfey_session, + gs_name=grid_square, + data={ "name": foil_hole, "tag": str(source), }, @@ -459,7 +474,9 @@ def post_transfer( ) if not environment.movie_counters.get(str(source)): movie_counts_get = capture_get( - f"{environment.url.geturl()}{url_path_for('session_control.router', 'count_number_of_movies')}", + base_url=str(environment.url.geturl()), + router_name="session_control.router", + function_name="count_number_of_movies", ) if movie_counts_get is not None: environment.movie_counters[str(source)] = count( @@ -473,8 +490,12 @@ def post_transfer( eer_fractionation_file = None if file_transferred_to.suffix == ".eer": response = capture_post( - f"{str(environment.url.geturl())}{url_path_for('file_io_instrument.router', 'write_eer_fractionation_file', visit_name=environment.visit, session_id=environment.murfey_session)}", - json={ + base_url=str(environment.url.geturl()), + router_name="file_io_instrument.router", + function_name="write_eer_fractionation_file", + visit_name=environment.visit, + session_id=environment.murfey_session, + data={ "eer_path": str(file_transferred_to), 
"fractionation": self.data_collection_parameters[ "eer_fractionation" @@ -503,7 +524,6 @@ def post_transfer( ) foil_hole = None - preproc_url = f"{str(environment.url.geturl())}{url_path_for('workflow.spa_router', 'request_spa_preprocessing', visit_name=environment.visit, session_id=environment.murfey_session)}" preproc_data = { "path": str(file_transferred_to), "description": "", @@ -529,8 +549,12 @@ def post_transfer( "foil_hole_id": foil_hole, } capture_post( - preproc_url, - json={ + base_url=str(environment.url.geturl()), + router_name="workflow.spa_router", + function_name="request_spa_preprocessing", + visit_name=environment.visit, + session_id=environment.murfey_session, + data={ k: None if v == "None" else v for k, v in preproc_data.items() }, diff --git a/src/murfey/client/contexts/spa_metadata.py b/src/murfey/client/contexts/spa_metadata.py index a2062d0f3..6260bb202 100644 --- a/src/murfey/client/contexts/spa_metadata.py +++ b/src/murfey/client/contexts/spa_metadata.py @@ -7,7 +7,6 @@ from murfey.client.context import Context from murfey.client.contexts.spa import _file_transferred_to, _get_source from murfey.client.instance_environment import MurfeyInstanceEnvironment, SampleInfo -from murfey.util.api import url_path_for from murfey.util.client import capture_post, get_machine_config_client from murfey.util.spa_metadata import ( FoilHoleInfo, @@ -160,7 +159,6 @@ def post_transfer( environment.samples[source] = SampleInfo( atlas=Path(partial_path), sample=sample ) - url = f"{str(environment.url.geturl())}{url_path_for('workflow.router', 'register_dc_group', visit_name=environment.visit, session_id=environment.murfey_session)}" dcg_search_dir = "/".join( p for p in transferred_file.parent.parts if p != environment.visit ) @@ -189,15 +187,26 @@ def post_transfer( "sample": environment.samples[source].sample, "atlas_pixel_size": atlas_pixel_size, } - capture_post(url, json=dcg_data) + capture_post( + base_url=str(environment.url.geturl()), + 
router_name="workflow.router", + function_name="register_dc_group", + visit_name=environment.visit, + session_id=environment.murfey_session, + data=dcg_data, + ) gs_pix_positions = get_grid_square_atlas_positions( source_visit_dir / partial_path ) for gs, pos_data in gs_pix_positions.items(): if pos_data: capture_post( - f"{str(environment.url.geturl())}{url_path_for('session_control.spa_router', 'register_grid_square', session_id=environment.murfey_session, gsid=int(gs))}", - json={ + base_url=str(environment.url.geturl()), + router_name="session_control.spa_router", + function_name="register_grid_square", + session_id=environment.murfey_session, + gsid=int(gs), + data={ "tag": dcg_tag, "x_location": pos_data[0], "y_location": pos_data[1], @@ -215,7 +224,6 @@ def post_transfer( and environment ): # Make sure we have a data collection group before trying to register grid square - url = f"{str(environment.url.geturl())}{url_path_for('workflow.router', 'register_dc_group', visit_name=environment.visit, session_id=environment.murfey_session)}" dcg_search_dir = "/".join( p for p in transferred_file.parent.parent.parts @@ -239,7 +247,14 @@ def post_transfer( "experiment_type_id": 37, "tag": dcg_tag, } - capture_post(url, json=dcg_data) + capture_post( + base_url=str(environment.url.geturl()), + router_name="workflow.router", + function_name="register_dc_group", + visit_name=environment.visit, + session_id=environment.murfey_session, + data=dcg_data, + ) gs_name = int(transferred_file.stem.split("_")[1]) logger.info( @@ -264,7 +279,6 @@ def post_transfer( visitless_source = str(visitless_source_images_dirs[-1]) if fh_positions: - gs_url = f"{str(environment.url.geturl())}{url_path_for('session_control.spa_router', 'register_grid_square', session_id=environment.murfey_session, gsid=gs_name)}" gs_info = grid_square_data( transferred_file, gs_name, @@ -275,8 +289,12 @@ def post_transfer( else "" ) capture_post( - gs_url, - json={ + base_url=str(environment.url.geturl()), + 
router_name="session_control.spa_router", + function_name="register_grid_square", + session_id=environment.murfey_session, + gsid=gs_name, + data={ "tag": visitless_source, "readout_area_x": gs_info.readout_area_x, "readout_area_y": gs_info.readout_area_y, @@ -289,8 +307,12 @@ def post_transfer( for fh, fh_data in fh_positions.items(): capture_post( - f"{str(environment.url.geturl())}{url_path_for('session_control.spa_router', 'register_foil_hole', session_id=environment.murfey_session, gs_name=gs_name)}", - json={ + base_url=str(environment.url.geturl()), + router_name="session_control.spa_router", + function_name="register_foil_hole", + session_id=environment.murfey_session, + gs_name=gs_name, + data={ "name": fh, "x_location": fh_data.x_location, "y_location": fh_data.y_location, diff --git a/src/murfey/client/contexts/tomo_metadata.py b/src/murfey/client/contexts/tomo_metadata.py index 2ffb6b619..91227e528 100644 --- a/src/murfey/client/contexts/tomo_metadata.py +++ b/src/murfey/client/contexts/tomo_metadata.py @@ -8,7 +8,6 @@ from murfey.client.contexts.spa import _file_transferred_to, _get_source from murfey.client.contexts.spa_metadata import _atlas_destination from murfey.client.instance_environment import MurfeyInstanceEnvironment, SampleInfo -from murfey.util.api import url_path_for from murfey.util.client import capture_post logger = logging.getLogger("murfey.client.contexts.tomo_metadata") @@ -20,13 +19,19 @@ def ensure_dcg_exists(transferred_file: Path, environment: MurfeyInstanceEnviron if not source: return None dcg_tag = str(source).replace(f"/{environment.visit}", "") - url = f"{str(environment.url.geturl())}{url_path_for('workflow.router', 'register_dc_group', visit_name=environment.visit, session_id=environment.murfey_session)}" dcg_data = { "experiment_type": "tomo", "experiment_type_id": 36, "tag": dcg_tag, } - capture_post(url, json=dcg_data) + capture_post( + base_url=str(environment.url.geturl()), + router_name="workflow.router", + 
function_name="register_dc_group", + visit_name=environment.visit, + session_id=environment.murfey_session, + data=dcg_data, + ) return dcg_tag @@ -95,7 +100,6 @@ def post_transfer( environment.samples[source] = SampleInfo( atlas=Path(partial_path), sample=sample ) - url = f"{str(environment.url.geturl())}{url_path_for('workflow.router', 'register_dc_group', visit_name=environment.visit, session_id=environment.murfey_session)}" dcg_tag = "/".join( p for p in transferred_file.parent.parts if p != environment.visit ).replace("//", "/") @@ -111,7 +115,14 @@ def post_transfer( "sample": environment.samples[source].sample, "atlas_pixel_size": atlas_pixel_size, } - capture_post(url, json=dcg_data) + capture_post( + base_url=str(environment.url.geturl()), + router_name="workflow.router", + function_name="register_dc_group", + visit_name=environment.visit, + session_id=environment.murfey_session, + data=dcg_data, + ) elif transferred_file.name == "SearchMap.xml" and environment: logger.info("Tomography session search map xml found") @@ -192,10 +203,13 @@ def post_transfer( else "" ) - sm_url = f"{str(environment.url.geturl())}{url_path_for('session_control.tomo_router', 'register_search_map', session_id=environment.murfey_session, sm_name=transferred_file.parent.name)}" capture_post( - sm_url, - json={ + base_url=str(environment.url.geturl()), + router_name="session_control.tomo_router", + function_name="register_search_map", + session_id=environment.murfey_session, + sm_name=transferred_file.parent.name, + data={ "tag": dcg_tag, "x_stage_position": float(stage_position["X"]), "y_stage_position": float(stage_position["Y"]), @@ -240,10 +254,13 @@ def post_transfer( f"Inserting incorrect width {sm_width}, height {sm_height} for SearchMap display" ) - sm_url = f"{str(environment.url.geturl())}{url_path_for('session_control.tomo_router', 'register_search_map', session_id=environment.murfey_session, sm_name=transferred_file.parent.name)}" capture_post( - sm_url, - json={ + 
base_url=str(environment.url.geturl()), + router_name="session_control.tomo_router", + function_name="register_search_map", + session_id=environment.murfey_session, + sm_name=transferred_file.parent.name, + data={ "tag": dcg_tag, "height": sm_height, "width": sm_width, @@ -278,19 +295,25 @@ def post_transfer( ) # Always need search map before batch position - sm_url = f"{str(environment.url.geturl())}{url_path_for('session_control.tomo_router', 'register_search_map', session_id=environment.murfey_session, sm_name=search_map_name)}" capture_post( - sm_url, - json={ + base_url=str(environment.url.geturl()), + router_name="session_control.tomo_router", + function_name="register_search_map", + session_id=environment.murfey_session, + sm_name=search_map_name, + data={ "tag": dcg_tag, }, ) # Then register batch position - bp_url = f"{str(environment.url.geturl())}{url_path_for('session_control.tomo_router', 'register_batch_position', session_id=environment.murfey_session, batch_name=batch_name)}" capture_post( - bp_url, - json={ + base_url=str(environment.url.geturl()), + router_name="session_control.tomo_router", + function_name="register_batch_position", + session_id=environment.murfey_session, + batch_name=batch_name, + data={ "tag": dcg_tag, "x_stage_position": batch_stage_location_x, "y_stage_position": batch_stage_location_y, @@ -313,10 +336,13 @@ def post_transfer( beamshift_position_y = float(beamshift["PositionY"]) # Registration of beamshifted position - bp_url = f"{str(environment.url.geturl())}{url_path_for('session_control.tomo_router', 'register_batch_position', session_id=environment.murfey_session, batch_name=beamshift_name)}" capture_post( - bp_url, - json={ + base_url=str(environment.url.geturl()), + router_name="session_control.tomo_router", + function_name="register_batch_position", + session_id=environment.murfey_session, + batch_name=beamshift_name, + data={ "tag": dcg_tag, "x_stage_position": batch_stage_location_x, "y_stage_position": 
batch_stage_location_y, From 25ac8d012010d68f7ed697cb874f6bc612f7c9d4 Mon Sep 17 00:00:00 2001 From: yxd92326 Date: Thu, 21 Aug 2025 16:08:35 +0100 Subject: [PATCH 06/12] More contexts --- src/murfey/client/contexts/atlas.py | 8 ++-- src/murfey/client/contexts/clem.py | 74 +++++++++++------------------ 2 files changed, 33 insertions(+), 49 deletions(-) diff --git a/src/murfey/client/contexts/atlas.py b/src/murfey/client/contexts/atlas.py index 7f98fe312..e0a85d3c9 100644 --- a/src/murfey/client/contexts/atlas.py +++ b/src/murfey/client/contexts/atlas.py @@ -6,7 +6,6 @@ from murfey.client.contexts.spa import _get_source from murfey.client.contexts.spa_metadata import _atlas_destination from murfey.client.instance_environment import MurfeyInstanceEnvironment -from murfey.util.api import url_path_for from murfey.util.client import capture_post logger = logging.getLogger("murfey.client.contexts.atlas") @@ -40,8 +39,11 @@ def post_transfer( environment, source, transferred_file ) / transferred_file.relative_to(source.parent) capture_post( - f"{str(environment.url.geturl())}{url_path_for('session_control.spa_router', 'make_atlas_jpg', session_id=environment.murfey_session)}", - json={"path": str(transferred_atlas_name)}, + base_url=str(environment.url.geturl()), + router_name="session_control.spa_router", + function_name="make_atlas_jpg", + session_id=environment.murfey_session, + data={"path": str(transferred_atlas_name)}, ) logger.info( f"Submitted request to create JPG image of atlas {str(transferred_atlas_name)!r}" diff --git a/src/murfey/client/contexts/clem.py b/src/murfey/client/contexts/clem.py index fd193eac0..7a634c09e 100644 --- a/src/murfey/client/contexts/clem.py +++ b/src/murfey/client/contexts/clem.py @@ -13,7 +13,6 @@ from murfey.client.context import Context from murfey.client.instance_environment import MurfeyInstanceEnvironment -from murfey.util.api import url_path_for from murfey.util.client import capture_post, get_machine_config_client # Create 
logger object @@ -353,19 +352,14 @@ def register_lif_file( register the LIF file in the database correctly as part of the CLEM workflow. """ try: - # Construct URL to post to post the request to - url = f"{environment.url.geturl()}{url_path_for('clem.router', 'register_lif_file', session_id=environment.murfey_session)}?lif_file={quote(str(lif_file), safe='')}" - # Validate - if not url: - logger.error( - "URL could not be constructed from the environment and file path" - ) - return False - - # Send the message - capture_post(url) + capture_post( + base_url=str(environment.url.geturl()), + router_name="clem.router", + function_name="register_lif_file", + session_id=environment.murfey_session, + data={"lif_file": quote(str(lif_file), safe="")}, + ) return True - except Exception as e: logger.error( f"Error encountered when registering the LIF file in the database: {e}" @@ -383,19 +377,14 @@ def process_lif_file( """ try: - # Construct the URL to post the request to - url = f"{environment.url.geturl()}{url_path_for('clem.router', 'process_raw_lifs', session_id=environment.murfey_session)}?lif_file={quote(str(lif_file), safe='')}" - # Validate - if not url: - logger.error( - "URL could not be constructed from the environment and file path" - ) - return False - - # Send the message - capture_post(url) + capture_post( + base_url=str(environment.url.geturl()), + router_name="clem.router", + function_name="process_raw_lifs", + session_id=environment.murfey_session, + data={"lif_file": quote(str(lif_file), safe="")}, + ) return True - except Exception as e: logger.error(f"Error encountered processing LIF file: {e}") return False @@ -411,17 +400,14 @@ def register_tiff_file( """ try: - url = f"{environment.url.geturl()}{url_path_for('clem.router', 'register_tiff_file', session_id=environment.murfey_session)}?tiff_file={quote(str(tiff_file), safe='')}" - if not url: - logger.error( - "URL could not be constructed from the environment and file path" - ) - return False - - # 
Send the message - capture_post(url) + capture_post( + base_url=str(environment.url.geturl()), + router_name="clem.router", + function_name="register_tiff_file", + session_id=environment.murfey_session, + data={"tiff_file": quote(str(tiff_file), safe="")}, + ) return True - except Exception as e: logger.error( f"Error encountered when registering the TIFF file in the database: {e}" @@ -439,18 +425,14 @@ def process_tiff_series( """ try: - # Construct URL for Murfey server to communicate with - url = f"{environment.url.geturl()}{url_path_for('clem.router', 'process_raw_tiffs', session_id=environment.murfey_session)}" - if not url: - logger.error( - "URL could not be constructed from the environment and file path" - ) - return False - - # Send the message - capture_post(url, json=tiff_dataset) + capture_post( + base_url=str(environment.url.geturl()), + router_name="clem.router", + function_name="process_raw_tiffs", + session_id=environment.murfey_session, + data=tiff_dataset, + ) return True - except Exception as e: logger.error(f"Error encountered processing the TIFF series: {e}") return False From 1cf689e2b3f307faf162af6e43d1e127c0ad7570 Mon Sep 17 00:00:00 2001 From: yxd92326 Date: Thu, 21 Aug 2025 16:36:49 +0100 Subject: [PATCH 07/12] Sort out multigrid control --- src/murfey/client/multigrid_control.py | 175 +++++++++++++++++------ src/murfey/server/api/session_control.py | 8 +- 2 files changed, 136 insertions(+), 47 deletions(-) diff --git a/src/murfey/client/multigrid_control.py b/src/murfey/client/multigrid_control.py index bd93a1223..2b57c26f3 100644 --- a/src/murfey/client/multigrid_control.py +++ b/src/murfey/client/multigrid_control.py @@ -8,7 +8,7 @@ from functools import partial from pathlib import Path from typing import Dict, List, Optional -from urllib.parse import quote, urlparse +from urllib.parse import urlparse import murfey.client.websocket from murfey.client.analyser import Analyser @@ -19,7 +19,6 @@ from murfey.client.tui.screens import 
determine_default_destination from murfey.client.watchdir import DirWatcher from murfey.util import posix_path -from murfey.util.api import url_path_for from murfey.util.client import ( capture_delete, capture_get, @@ -57,7 +56,10 @@ class MultigridController: def __post_init__(self): machine_data = capture_get( - url=f"{self.murfey_url}{url_path_for('session_control.router', 'machine_info_by_instrument', instrument_name=self.instrument_name)}" + base_url=self.murfey_url, + router_name="session_control.router", + function_name="machine_info_by_instrument", + instrument_name=self.instrument_name, ).json() self.rsync_url = machine_data.get("rsync_url", "") self.rsync_module = machine_data.get("rsync_module", "data") @@ -100,7 +102,9 @@ def __post_init__(self): # Calculate the time offset between the client and the server current_time = datetime.now() server_timestamp = capture_get( - url=f"{self.murfey_url}{url_path_for('session_control.router', 'get_current_timestamp')}" + base_url=self.murfey_url, + router_name="session_control.router", + function_name="get_current_timestamp", ).json()["timestamp"] self.server_time_offset = current_time - datetime.fromtimestamp( server_timestamp @@ -157,7 +161,10 @@ def clean_up_once_dormant(self, running_threads: list[threading.Thread]): f"Submitting request to remove session {self.session_id} from database" ) response = capture_delete( - f"{self._environment.url.geturl()}{url_path_for('session_control.router', 'remove_session', session_id=self.session_id)}", + base_url=str(self._environment.url.geturl()), + router_name="session_control.router", + function_name="remove_session", + session_id=self.session_id, ) success = response.status_code == 200 if response else False if not success: @@ -229,7 +236,10 @@ def _start_rsyncer_multigrid( log.debug(f"Analysis of {source} is {('enabled' if analyse else 'disabled')}") destination_overrides = destination_overrides or {} machine_data = capture_get( - 
url=f"{self._environment.url.geturl()}{url_path_for('session_control.router', 'machine_info_by_instrument', instrument_name=self.instrument_name)}" + base_url=str(self._environment.url.geturl()), + router_name="session_control.router", + function_name="machine_info_by_instrument", + instrument_name=self.instrument_name, ).json() if destination_overrides.get(source): destination = ( @@ -281,11 +291,21 @@ def _start_rsyncer_multigrid( def _rsyncer_stopped(self, source: Path, explicit_stop: bool = False): if explicit_stop: - remove_url = f"{self.murfey_url}{url_path_for('session_control.router', 'delete_rsyncer', session_id=self.session_id)}?source={quote(str(source), safe='')}" - capture_delete(url=remove_url) + capture_delete( + base_url=self.murfey_url, + router_name="session_control.router", + function_name="delete_rsyncer", + session_id=self.session_id, + data={"path": str(source)}, + ) else: - stop_url = f"{self.murfey_url}{url_path_for('session_control.router', 'register_stopped_rsyncer', session_id=self.session_id)}" - capture_post(stop_url, json={"path": str(source)}) + capture_post( + base_url=self.murfey_url, + router_name="session_control.router", + function_name="register_stopped_rsyncer", + session_id=self.session_id, + data={"path": str(source)}, + ) def _finalise_rsyncer(self, source: Path): """ @@ -304,8 +324,13 @@ def _finalise_rsyncer(self, source: Path): def _restart_rsyncer(self, source: Path): self.rsync_processes[source].restart() - restarted_url = f"{self.murfey_url}{url_path_for('session_control.router', 'register_restarted_rsyncer', session_id=self.session_id)}" - capture_post(restarted_url, json={"path": str(source)}) + capture_post( + base_url=self.murfey_url, + router_name="session_control.router", + function_name="register_restarted_rsyncer", + session_id=self.session_id, + data={"path": str(source)}, + ) def _request_watcher_stop(self, source: Path): self._environment.watchers[source]._stopping = True @@ -327,8 +352,13 @@ def 
_start_rsyncer( log.info(f"starting rsyncer: {source}") if transfer: # Always make sure the destination directory exists - make_directory_url = f"{self.murfey_url}{url_path_for('file_io_instrument.router', 'make_rsyncer_destination', session_id=self.session_id)}" - capture_post(make_directory_url, json={"destination": destination}) + capture_post( + base_url=self.murfey_url, + router_name="file_io_instrument.router", + function_name="make_rsyncer_destination", + session_id=self.session_id, + data={"destination": destination}, + ) if self._environment: self._environment.default_destinations[source] = destination if self._environment.gain_ref and visit_path: @@ -397,10 +427,14 @@ def rsync_result(update: RSyncerUpdate): secondary=True, ) if restarted: - restarted_url = f"{self.murfey_url}{url_path_for('session_control.router', 'register_restarted_rsyncer', session_id=self.session_id)}" - capture_post(restarted_url, json={"path": str(source)}) + capture_post( + base_url=self.murfey_url, + router_name="session_control.router", + function_name="register_restarted_rsyncer", + session_id=self.session_id, + data={"path": str(source)}, + ) else: - url = f"{str(self._environment.url.geturl())}{url_path_for('session_control.router', 'register_rsyncer', session_id=self._environment.murfey_session)}" rsyncer_data = { "source": str(source), "destination": destination, @@ -408,7 +442,13 @@ def rsync_result(update: RSyncerUpdate): "transferring": self.do_transfer or self._environment.demo, "tag": tag, } - capture_post(url=url, json=rsyncer_data) + capture_post( + base_url=self.murfey_url, + router_name="session_control.router", + function_name="register_rsyncer", + session_id=self._environment.murfey_session, + data=rsyncer_data, + ) self._environment.watchers[source] = DirWatcher(source, settling_time=30) if not self.analysers.get(source) and analyse: @@ -513,8 +553,12 @@ def _start_dc(self, metadata_json, from_form: bool = False): log.info("Registering tomography processing 
parameters") if context.data_collection_parameters.get("num_eer_frames"): eer_response = capture_post( - url=f"{str(self._environment.url.geturl())}{url_path_for('file_io_instrument.router', 'write_eer_fractionation_file', visit_name=self._environment.visit, session_id=self._environment.murfey_session)}", - json={ + base_url=str(self._environment.url.geturl()), + router_name="file_io_instrument.router", + function_name="write_eer_fractionation_file", + visit_name=self._environment.visit, + session_id=self._environment.murfey_session, + data={ "num_frames": context.data_collection_parameters[ "num_eer_frames" ], @@ -526,17 +570,23 @@ def _start_dc(self, metadata_json, from_form: bool = False): eer_fractionation_file = eer_response.json()["eer_fractionation_file"] metadata_json.update({"eer_fractionation_file": eer_fractionation_file}) capture_post( - f"{self._environment.url.geturl()}{url_path_for('workflow.tomo_router', 'register_tomo_proc_params', session_id=self._environment.murfey_session)}", - json=metadata_json, + base_url=str(self._environment.url.geturl()), + router_name="workflow.tomo_router", + function_name="register_tomo_proc_params", + session_id=self._environment.murfey_session, + data=metadata_json, ) capture_post( - f"{self._environment.url.geturl()}{url_path_for('workflow.tomo_router', 'flush_tomography_processing', visit_name=self._environment.visit, session_id=self._environment.murfey_session)}", - json={"rsync_source": str(source)}, + base_url=str(self._environment.url.geturl()), + router_name="workflow.tomo_router", + function_name="flush_tomography_processing", + visit_name=self._environment.visit, + session_id=self._environment.murfey_session, + data={"rsync_source": str(source)}, ) log.info("Tomography processing flushed") elif isinstance(context, SPAModularContext): - url = f"{str(self._environment.url.geturl())}{url_path_for('workflow.router', 'register_dc_group', visit_name=self._environment.visit, session_id=self.session_id)}" dcg_data = 
{ "experiment_type": "single particle", "experiment_type_id": 37, @@ -552,7 +602,14 @@ def _start_dc(self, metadata_json, from_form: bool = False): else None ), } - capture_post(url, json=dcg_data) + capture_post( + base_url=str(self._environment.url.geturl()), + router_name="workflow.router", + function_name="register_dc_group", + visit_name=self._environment.visit, + session_id=self.session_id, + data=dcg_data, + ) if from_form: data = { "voltage": metadata_json["voltage"], @@ -575,8 +632,12 @@ def _start_dc(self, metadata_json, from_form: bool = False): "phase_plate": metadata_json.get("phase_plate", False), } capture_post( - f"{str(self._environment.url.geturl())}{url_path_for('workflow.router', 'start_dc', visit_name=self._environment.visit, session_id=self.session_id)}", - json=data, + base_url=str(self._environment.url.geturl()), + router_name="workflow.router", + function_name="start_dc", + visit_name=self._environment.visit, + session_id=self.session_id, + data=data, ) for recipe in ( "em-spa-preprocess", @@ -586,8 +647,12 @@ def _start_dc(self, metadata_json, from_form: bool = False): "em-spa-refine", ): capture_post( - f"{str(self._environment.url.geturl())}{url_path_for('workflow.router', 'register_proc', visit_name=self._environment.visit, session_id=self.session_id)}", - json={ + base_url=str(self._environment.url.geturl()), + router_name="workflow.router", + function_name="register_proc", + visit_name=self._environment.visit, + session_id=self.session_id, + data={ "tag": str(source), "source": str(source), "recipe": recipe, @@ -595,8 +660,11 @@ def _start_dc(self, metadata_json, from_form: bool = False): ) log.info(f"Posting SPA processing parameters: {metadata_json}") response = capture_post( - f"{self._environment.url.geturl()}{url_path_for('workflow.spa_router', 'register_spa_proc_params', session_id=self.session_id)}", - json={ + base_url=str(self._environment.url.geturl()), + router_name="workflow.spa_router", + 
function_name="register_spa_proc_params", + session_id=self.session_id, + data={ **{ k: None if v == "None" else v for k, v in metadata_json.items() @@ -607,14 +675,17 @@ def _start_dc(self, metadata_json, from_form: bool = False): if response and not str(response.status_code).startswith("2"): log.warning(f"{response.reason}") capture_post( - f"{self._environment.url.geturl()}{url_path_for('workflow.spa_router', 'flush_spa_processing', visit_name=self._environment.visit, session_id=self.session_id)}", - json={"tag": str(source)}, + base_url=str(self._environment.url.geturl()), + router_name="workflow.spa_router", + function_name="flush_spa_processing", + visit_name=self._environment.visit, + session_id=self.session_id, + data={"tag": str(source)}, ) def _increment_file_count( self, observed_files: List[Path], source: str, destination: str ): - url = f"{str(self._environment.url.geturl())}{url_path_for('prometheus.router', 'increment_rsync_file_count', visit_name=self._environment.visit)}" num_data_files = len( [ f @@ -630,7 +701,13 @@ def _increment_file_count( "increment_count": len(observed_files), "increment_data_count": num_data_files, } - capture_post(url=url, json=data) + capture_post( + base_url=str(self._environment.url.geturl()), + router_name="prometheus.router", + function_name="increment_rsync_file_count", + visit_name=self._environment.visit, + data=data, + ) # Prometheus can handle higher traffic so update for every transferred file rather # than batching as we do for the Murfey database updates in _increment_transferred_files @@ -638,7 +715,6 @@ def _increment_transferred_files_prometheus( self, update: RSyncerUpdate, source: str, destination: str ): if update.outcome is TransferResult.SUCCESS: - url = f"{str(self._environment.url.geturl())}{url_path_for('prometheus.router', 'increment_rsync_transferred_files_prometheus', visit_name=self._environment.visit)}" data_files = ( [update] if update.file_path.suffix in self._data_suffixes @@ -657,7 +733,13 
@@ def _increment_transferred_files_prometheus( "increment_data_count": len(data_files), "data_bytes": sum(f.file_size for f in data_files), } - capture_post(url=url, json=data) + capture_post( + base_url=str(self._environment.url.geturl()), + router_name="prometheus.router", + function_name="increment_rsync_transferred_files_prometheus", + visit_name=self._environment.visit, + data=data, + ) def _increment_transferred_files( self, @@ -666,10 +748,12 @@ def _increment_transferred_files( source: str, destination: str, ): - skip_url = f"{str(self._environment.url.geturl())}{url_path_for('prometheus.router', 'increment_rsync_skipped_files_prometheus', visit_name=self._environment.visit)}" capture_post( - url=skip_url, - json={ + base_url=str(self._environment.url.geturl()), + router_name="prometheus.router", + function_name="increment_rsync_skipped_files_prometheus", + visit_name=self._environment.visit, + data={ "source": source, "session_id": self.session_id, "increment_count": num_skipped_files, @@ -681,7 +765,6 @@ def _increment_transferred_files( ] if not checked_updates: return - url = f"{str(self._environment.url.geturl())}{url_path_for('prometheus.router', 'increment_rsync_transferred_files', visit_name=self._environment.visit)}" data_files = [ u for u in updates @@ -699,4 +782,10 @@ def _increment_transferred_files( "increment_data_count": len(data_files), "data_bytes": sum(f.file_size for f in data_files), } - capture_post(url=url, json=data) + capture_post( + base_url=str(self._environment.url.geturl()), + router_name="prometheus.router", + function_name="increment_rsync_transferred_files", + visit_name=self._environment.visit, + data=data, + ) diff --git a/src/murfey/server/api/session_control.py b/src/murfey/server/api/session_control.py index f5eec9e14..d9df164ee 100644 --- a/src/murfey/server/api/session_control.py +++ b/src/murfey/server/api/session_control.py @@ -285,19 +285,19 @@ def register_restarted_rsyncer( 
@router.delete("/sessions/{session_id}/rsyncer") -def delete_rsyncer(session_id: int, source: Path, db=murfey_db): +def delete_rsyncer(session_id: int, rsyncer_source: StringOfPathModel, db=murfey_db): try: rsync_instance = db.exec( select(RsyncInstance) .where(RsyncInstance.session_id == session_id) - .where(RsyncInstance.source == str(source)) + .where(RsyncInstance.source == str(rsyncer_source.path)) ).one() db.delete(rsync_instance) db.commit() except Exception: logger.error( - f"Failed to delete rsyncer for source directory {sanitise(str(source))!r} " - f"in session {session_id}.", + "Failed to delete rsyncer for source directory " + f"{sanitise(str(rsyncer_source.path))!r} in session {session_id}.", exc_info=True, ) From f87fac4623c6d6594f0f37fc74c1e963179c5064 Mon Sep 17 00:00:00 2001 From: yxd92326 Date: Thu, 21 Aug 2025 17:11:21 +0100 Subject: [PATCH 08/12] Post updates for tui code --- src/murfey/client/tui/app.py | 152 +++++++++++++++++++++++-------- src/murfey/client/tui/main.py | 19 ++-- src/murfey/client/tui/screens.py | 139 +++++++++++++++++++++------- 3 files changed, 236 insertions(+), 74 deletions(-) diff --git a/src/murfey/client/tui/app.py b/src/murfey/client/tui/app.py index 6dca90c4f..e06d12c21 100644 --- a/src/murfey/client/tui/app.py +++ b/src/murfey/client/tui/app.py @@ -33,7 +33,6 @@ from murfey.client.watchdir import DirWatcher from murfey.client.watchdir_multigrid import MultigridDirWatcher from murfey.util import posix_path -from murfey.util.api import url_path_for from murfey.util.client import ( capture_delete, capture_get, @@ -153,7 +152,10 @@ def _start_rsyncer_multigrid( log.info(f"starting multigrid rsyncer: {source}") destination_overrides = destination_overrides or {} machine_data = capture_get( - url=f"{self._environment.url.geturl()}{url_path_for('session_control.router', 'machine_info_by_instrument', instrument_name=instrument_name)}" + base_url=str(self._environment.url.geturl()), + router_name="session_control.router", + 
function_name="machine_info_by_instrument", + instrument_name=instrument_name, ).json() if destination_overrides.get(source): destination = destination_overrides[source] + f"/{extra_directory}" @@ -205,8 +207,13 @@ def _start_rsyncer( log.info(f"starting rsyncer: {source}") if transfer: # Always make sure the destination directory exists - make_directory_url = f"{str(self._url.geturl())}{url_path_for('file_io_instrument.router', 'make_rsyncer_destination', session_id=self._environment.murfey_session)}" - capture_post(make_directory_url, json={"destination": destination}) + capture_post( + base_url=str(self._url.geturl()), + router_name="file_io_instrument.router", + function_name="make_rsyncer_destination", + session_id=self._environment.murfey_session, + data={"destination": destination}, + ) if self._environment: self._environment.default_destinations[source] = destination if self._environment.gain_ref and visit_path: @@ -271,14 +278,19 @@ def rsync_result(update: RSyncerUpdate): ), secondary=True, ) - url = f"{str(self._url.geturl())}{url_path_for('session_control.router', 'register_rsyncer', session_id=self._environment.murfey_session)}" rsyncer_data = { "source": str(source), "destination": destination, "session_id": self._environment.murfey_session, "transferring": self._do_transfer, } - capture_post(url=url, json=rsyncer_data) + capture_post( + base_url=str(self._url.geturl()), + router_name="session_control.router", + function_name="register_rsyncer", + session_id=self._environment.murfey_session, + data=rsyncer_data, + ) self._environment.watchers[source] = DirWatcher(source, settling_time=30) @@ -341,7 +353,6 @@ def _increment_file_count( self, observed_files: List[Path], source: str, destination: str ): if len(observed_files): - url = f"{str(self._url.geturl())}{url_path_for('prometheus.router', 'increment_rsync_file_count', visit_name=self._visit)}" num_data_files = len( [ f @@ -357,7 +368,13 @@ def _increment_file_count( "increment_count": 
len(observed_files), "increment_data_count": num_data_files, } - capture_post(url=url, json=data) + capture_post( + base_url=str(self._url.geturl()), + router_name="prometheus.router", + function_name="increment_rsync_file_count", + visit_name=self._visit, + data=data, + ) # Prometheus can handle higher traffic so update for every transferred file rather # than batching as we do for the Murfey database updates in _increment_transferred_files @@ -365,7 +382,6 @@ def _increment_transferred_files_prometheus( self, update: RSyncerUpdate, source: str, destination: str ): if update.outcome is TransferResult.SUCCESS: - url = f"{str(self._url.geturl())}{url_path_for('prometheus.router', 'increment_rsync_transferred_files_prometheus', visit_name=self._visit)}" data_files = ( [update] if update.file_path.suffix in self._data_suffixes @@ -384,7 +400,13 @@ def _increment_transferred_files_prometheus( "increment_data_count": len(data_files), "data_bytes": sum(f.file_size for f in data_files), } - capture_post(url=url, json=data) + capture_post( + base_url=str(self._url.geturl()), + router_name="prometheus.router", + function_name="increment_rsync_transferred_files_prometheus", + visit_name=self._visit, + data=data, + ) def _increment_transferred_files( self, updates: List[RSyncerUpdate], source: str, destination: str @@ -394,7 +416,6 @@ def _increment_transferred_files( ] if not checked_updates: return - url = f"{str(self._url.geturl())}{url_path_for('prometheus.router', 'increment_rsync_transferred_files', visit_name=self._visit)}" data_files = [ u for u in updates @@ -412,7 +433,13 @@ def _increment_transferred_files( "increment_data_count": len(data_files), "data_bytes": sum(f.file_size for f in data_files), } - capture_post(url=url, json=data) + capture_post( + base_url=str(self._url.geturl()), + router_name="prometheus.router", + function_name="increment_rsync_transferred_files", + visit_name=self._visit, + data=data, + ) def _set_register_dc(self, response: str): if 
response == "y": @@ -488,8 +515,12 @@ def _start_dc(self, metadata_json, from_form: bool = False): log.info("Registering tomography processing parameters") if context.data_collection_parameters.get("num_eer_frames"): eer_response = capture_post( - url=f"{str(self.app._environment.url.geturl())}{url_path_for('file_io_instrument.router', 'write_eer_fractionation_file', visit_name=self.app._environment.visit, session_id=self.app._environment.murfey_session)}", - json={ + base_url=str(self.app._environment.url.geturl()), + router_name="file_io_instrument.router", + function_name="write_eer_fractionation_file", + visit_name=self.app._environment.visit, + session_id=self.app._environment.murfey_session, + data={ "num_frames": context.data_collection_parameters[ "num_eer_frames" ], @@ -501,16 +532,22 @@ def _start_dc(self, metadata_json, from_form: bool = False): eer_fractionation_file = eer_response.json()["eer_fractionation_file"] metadata_json.update({"eer_fractionation_file": eer_fractionation_file}) capture_post( - url=f"{self.app._environment.url.geturl()}{url_path_for('workflow.tomo_router', 'register_tomo_proc_params', session_id=self.app._environment.murfey_session)}", - json=metadata_json, + base_url=str(self.app._environment.url.geturl()), + router_name="workflow.tomo_router", + function_name="register_tomo_proc_params", + session_id=self.app._environment.murfey_session, + data=metadata_json, ) capture_post( - f"{self.app._environment.url.geturl()}{url_path_for('workflow.tomo_router', 'flush_tomography_processing', visit_name=self._visit, session_id=self.app._environment.murfey_session)}", - json={"rsync_source": str(source)}, + base_url=str(self.app._environment.url.geturl()), + router_name="workflow.tomo_router", + function_name="flush_tomography_processing", + visit_name=self._visit, + session_id=self.app._environment.murfey_session, + data={"rsync_source": str(source)}, ) log.info("Tomography processing flushed") elif isinstance(context, SPAModularContext): 
- url = f"{str(self._url.geturl())}{url_path_for('workflow.router', 'register_dc_group', visit_name=self._visit, session_id=self._environment.murfey_session)}" dcg_data = { "experiment_type": "single particle", "experiment_type_id": 37, @@ -526,7 +563,14 @@ def _start_dc(self, metadata_json, from_form: bool = False): else None ), } - capture_post(url, json=dcg_data) + capture_post( + base_url=str(self._url.geturl()), + router_name="workflow.router", + function_name="register_dc_group", + visit_name=self._visit, + session_id=self._environment.murfey_session, + data=dcg_data, + ) if from_form: data = { "voltage": metadata_json["voltage"], @@ -549,8 +593,12 @@ def _start_dc(self, metadata_json, from_form: bool = False): "phase_plate": metadata_json.get("phase_plate", False), } capture_post( - f"{str(self._url.geturl())}{url_path_for('workflow.router', 'start_dc', visit_name=self._visit, session_id=self._environment.murfey_session)}", - json=data, + base_url=str(self._url.geturl()), + router_name="workflow.router", + function_name="start_dc", + visit_name=self._visit, + session_id=self._environment.murfey_session, + data=data, ) for recipe in ( "em-spa-preprocess", @@ -560,8 +608,12 @@ def _start_dc(self, metadata_json, from_form: bool = False): "em-spa-refine", ): capture_post( - f"{str(self._url.geturl())}{url_path_for('workflow.router', 'register_proc', visit_name=self._visit, session_id=self._environment.murfey_session)}", - json={ + base_url=str(self._url.geturl()), + router_name="workflow.router", + function_name="register_proc", + visit_name=self._visit, + session_id=self._environment.murfey_session, + data={ "tag": str(source), "source": str(source), "recipe": recipe, @@ -569,8 +621,11 @@ def _start_dc(self, metadata_json, from_form: bool = False): ) log.info(f"Posting SPA processing parameters: {metadata_json}") response = capture_post( - f"{self.app._environment.url.geturl()}{url_path_for('workflow.spa_router', 'register_spa_proc_params', 
session_id=self.app._environment.murfey_session)}", - json={ + base_url=str(self.app._environment.url.geturl()), + router_name="workflow.spa_router", + function_name="register_spa_proc_params", + session_id=self.app._environment.murfey_session, + data={ **{ k: None if v == "None" else v for k, v in metadata_json.items() @@ -586,8 +641,12 @@ def _start_dc(self, metadata_json, from_form: bool = False): if not str(response.status_code).startswith("2"): log.warning(f"{response.reason}") capture_post( - f"{self.app._environment.url.geturl()}{url_path_for('workflow.spa_router', 'flush_spa_processing', visit_name=self.app._environment.visit, session_id=self.app._environment.murfey_session)}", - json={"tag": str(source)}, + base_url=str(self.app._environment.url.geturl()), + router_name="workflow.spa_router", + function_name="flush_spa_processing", + visit_name=self.app._environment.visit, + session_id=self.app._environment.murfey_session, + data={"tag": str(source)}, ) def _set_request_destination(self, response: str): @@ -618,7 +677,9 @@ async def on_button_pressed(self, event: Button.Pressed): async def on_mount(self) -> None: exisiting_sessions = capture_get( - url=f"{self._environment.url.geturl()}{url_path_for('session_control.router', 'get_sessions')}" + base_url=str(self._environment.url.geturl()), + router_name="session_control.router", + function_name="get_sessions", ).json() if self.visits: self.install_screen(VisitSelection(self.visits), "visit-select-screen") @@ -645,8 +706,12 @@ async def on_mount(self) -> None: else: session_name = "Client connection" resp = capture_post( - f"{self._environment.url.geturl()}{url_path_for('session_control.router', 'link_client_to_session', instrument_name=self._environment.instrument_name, client_id=self._environment.client_id)}", - json={"session_id": None, "session_name": session_name}, + base_url=str(self._environment.url.geturl()), + router_name="session_control.router", + function_name="link_client_to_session", + 
instrument_name=self._environment.instrument_name, + client_id=self._environment.client_id, + data={"session_id": None, "session_name": session_name}, ) if resp: self._environment.murfey_session = resp.json() @@ -664,7 +729,10 @@ async def reset(self): sources = "\n".join(str(k) for k in self.rsync_processes.keys()) prompt = f"Remove files from the following:\n {sources} \n" rsync_instances = capture_get( - url=f"{self._environment.url.geturl()}{url_path_for('session_control.router', 'get_rsyncers_for_session', session_id=self._environment.murfey_session)}" + base_url=str(self._environment.url.geturl()), + router_name="session_control.router", + function_name="get_rsyncers_for_session", + session_id=self._environment.murfey_session, ).json() prompt += f"Copied {sum(r['files_counted'] for r in rsync_instances)} / {sum(r['files_transferred'] for r in rsync_instances)}" self.install_screen( @@ -688,7 +756,10 @@ async def action_quit(self) -> None: async def action_remove_session(self) -> None: capture_delete( - url=f"{self._environment.url.geturl()}{url_path_for('session_control.router', 'remove_session', session_id=self._environment.murfey_session)}" + base_url=str(self._environment.url.geturl()), + router_name="session_control.router", + function_name="remove_session", + session_id=self._environment.murfey_session, ) if self.rsync_processes: for rp in self.rsync_processes.values(): @@ -702,7 +773,10 @@ async def action_remove_session(self) -> None: def clean_up_quit(self) -> None: capture_delete( - url=f"{self._environment.url.geturl()}{url_path_for('session_control.router', 'remove_session', session_id=self._environment.murfey_session)}" + base_url=str(self._environment.url.geturl()), + router_name="session_control.router", + function_name="remove_session", + session_id=self._environment.murfey_session, ) self.exit() @@ -745,10 +819,16 @@ def _remove_data(self, listener: Callable[..., Awaitable[None] | None], **kwargs removal_rp.stop() log.info(f"rsyncer {rp} rerun 
with removal") capture_post( - url=f"{self._environment.url.geturl()}{url_path_for('session_control.router', 'register_processing_success_in_ispyb', session_id=self._environment.murfey_session)}" + base_url=str(self._environment.url.geturl()), + router_name="session_control.router", + function_name="register_processing_success_in_ispyb", + session_id=self._environment.murfey_session, ) capture_delete( - url=f"{self._environment.url.geturl()}{url_path_for('session_control.router', 'remove_session', session_id=self._environment.murfey_session)}" + base_url=str(self._environment.url.geturl()), + router_name="session_control.router", + function_name="remove_session", + session_id=self._environment.murfey_session, ) self.exit() diff --git a/src/murfey/client/tui/main.py b/src/murfey/client/tui/main.py index 4cb04809a..3bf59f0c8 100644 --- a/src/murfey/client/tui/main.py +++ b/src/murfey/client/tui/main.py @@ -24,7 +24,6 @@ from murfey.client.instance_environment import MurfeyInstanceEnvironment from murfey.client.tui.app import MurfeyTUI from murfey.client.tui.status_bar import StatusBar -from murfey.util.api import url_path_for from murfey.util.client import capture_get, read_config from murfey.util.models import Visit @@ -33,10 +32,13 @@ def _get_visit_list(api_base: ParseResult, instrument_name: str): proxy_path = api_base.path.rstrip("/") - get_visits_url = api_base._replace( - path=f"{proxy_path}{url_path_for('session_control.router', 'get_current_visits', instrument_name=instrument_name)}" + get_visits_url = api_base._replace(path=f"{proxy_path}") + server_reply = capture_get( + base_url=str(get_visits_url.geturl()), + router_name="session_control.router", + function_name="get_current_visits", + instrument_name=instrument_name, ) - server_reply = capture_get(url=get_visits_url.geturl()) if server_reply.status_code != 200: raise ValueError(f"Server unreachable ({server_reply.status_code})") return [Visit.model_validate(v) for v in server_reply.json()] @@ -272,7 
+274,9 @@ def run(): # Set up websocket app and handler client_id_response = capture_get( - url=f"{murfey_url.geturl()}{url_path_for('session_control.router', 'new_client_id')}" + base_url=str(murfey_url.geturl()), + router_name="session_control.router", + function_name="new_client_id", ) if client_id_response.status_code == 401: exit( @@ -301,7 +305,10 @@ def run(): # Load machine data for subsequent sections machine_data = capture_get( - url=f"{murfey_url.geturl()}{url_path_for('session_control.router', 'machine_info_by_instrument', instrument_name=instrument_name)}" + base_url=str(murfey_url.geturl()), + router_name="session_control.router", + function_name="machine_info_by_instrument", + instrument_name=instrument_name, ).json() gain_ref: Path | None = None diff --git a/src/murfey/client/tui/screens.py b/src/murfey/client/tui/screens.py index 23958e48f..888865ae4 100644 --- a/src/murfey/client/tui/screens.py +++ b/src/murfey/client/tui/screens.py @@ -55,7 +55,6 @@ ) from murfey.client.rsync import RSyncer from murfey.util import posix_path -from murfey.util.api import url_path_for from murfey.util.client import ( capture_delete, capture_get, @@ -85,7 +84,10 @@ def determine_default_destination( use_suggested_path: bool = True, ) -> str: machine_data = capture_get( - url=f"{environment.url.geturl()}{url_path_for('session_control.router', 'machine_info_by_instrument', instrument_name=environment.instrument_name)}" + base_url=str(environment.url.geturl()), + router_name="session_control.router", + function_name="machine_info_by_instrument", + instrument_name=environment.instrument_name, ).json() _default = "" if environment.processing_only_mode and environment.sources: @@ -110,8 +112,12 @@ def determine_default_destination( _default = environment.destination_registry[source_name] else: suggested_path_response = capture_post( - url=f"{str(environment.url.geturl())}{url_path_for('file_io_instrument.router', 'suggest_path', visit_name=visit, 
session_id=environment.murfey_session)}", - json={ + base_url=str(environment.url.geturl()), + router_name="file_io_instrument.router", + function_name="suggest_path", + visit_name=visit, + session_id=environment.murfey_session, + data={ "base_path": f"{destination}/{visit}/{mid_path.parent if include_mid_path else ''}/raw", "touch": touch, "extra_directory": extra_directory, @@ -266,7 +272,10 @@ def __init__( def compose(self): machine_data = capture_get( - url=f"{self.app._environment.url.geturl()}{url_path_for('session_control.router', 'machine_info_by_instrument', instrument_name=instrument_name)}" + base_url=str(self.app._environment.url.geturl()), + router_name="session_control.router", + function_name="machine_info_by_instrument", + instrument_name=instrument_name, ).json() self._dir_tree = _DirectoryTree( str(self._selected_dir), @@ -471,16 +480,26 @@ def _write_params( self.app._start_dc(params) if model == ProcessingParametersTomo: capture_post( - url=f"{self.app._environment.url.geturl()}/{url_path_for('workflow.tomo_router', 'register_tomo_proc_params', session_id=self.app._environment.murfey_session)}", - json=params, + base_url=str(self.app._environment.url.geturl()), + router_name="workflow.tomo_router", + function_name="register_tomo_proc_params", + session_id=self.app._environment.murfey_session, + data=params, ) elif model == ProcessingParametersSPA: capture_post( - url=f"{self.app._environment.url.geturl()}{url_path_for('workflow.spa_router', 'register_spa_proc_params', session_id=self.app._environment.murfey_session)}", - json=params, + base_url=str(self.app._environment.url.geturl()), + router_name="workflow.spa_router", + function_name="register_spa_proc_params", + session_id=self.app._environment.murfey_session, + data=params, ) capture_post( - url=f"{self.app._environment.url.geturl()}{url_path_for('workflow.spa_router', 'flush_spa_processing', visit_name=self.app._environment.visit, session_id=self.app._environment.murfey_session)}", + 
base_url=str(self.app._environment.url.geturl()), + router_name="workflow.spa_router", + function_name="flush_spa_processing", + visit_name=self.app._environment.visit, + session_id=self.app._environment.murfey_session, ) def on_switch_changed(self, event): @@ -623,16 +642,25 @@ def on_button_pressed(self, event: Button.Pressed): self.app.pop_screen() session_name = "Client connection" self.app._environment.murfey_session = capture_post( - url=f"{self.app._environment.url.geturl()}{url_path_for('session_control.router', 'link_client_to_session', instrument_name=self.app._environment.instrument_name, client_id=self.app._environment.client_id)}", - json={"session_id": session_id, "session_name": session_name}, + base_url=str(self.app._environment.url.geturl()), + router_name="session_control.router", + function_name="link_client_to_session", + instrument_name=self.app._environment.instrument_name, + client_id=self.app._environment.client_id, + data={"session_id": session_id, "session_name": session_name}, ).json() def _remove_session(self, session_id: int, **kwargs): capture_delete( - url=f"{self.app._environment.url.geturl()}{url_path_for('session_control.router', 'remove_session', session_id=session_id)}" + base_url=str(self.app._environment.url.geturl()), + router_name="session_control.router", + function_name="remove_session", + session_id=session_id, ) exisiting_sessions = capture_get( - url=f"{self.app._environment.url.geturl()}{url_path_for('session_control.router', 'get_sessions')}" + base_url=str(self.app._environment.url.geturl()), + router_name="session_control.router", + function_name="get_sessions", ).json() self.app.uninstall_screen("session-select-screen") if exisiting_sessions: @@ -654,8 +682,12 @@ def _remove_session(self, session_id: int, **kwargs): else: session_name = "Client connection" resp = capture_post( - f"{self.app._environment.url.geturl()}{url_path_for('session_control.router', 'link_client_to_session', 
instrument_name=self.app._environment.instrument_name, client_id=self.app._environment.client_id)}", - json={"session_id": None, "session_name": session_name}, + base_url=str(self.app._environment.url.geturl()), + router_name="session_control.router", + function_name="link_client_to_session", + instrument_name=self.app._environment.instrument_name, + client_id=self.app._environment.client_id, + data={"session_id": None, "session_name": session_name}, ) if resp: self.app._environment.murfey_session = resp.json() @@ -676,12 +708,18 @@ def on_button_pressed(self, event: Button.Pressed): self.app._visit = text self.app._environment.visit = text response = capture_post( - url=f"{self.app._environment.url.geturl()}{url_path_for('session_control.router', 'register_client_to_visit', visit_name=text)}", - json={"id": self.app._environment.client_id}, + base_url=str(self.app._environment.url.geturl()), + router_name="session_control.router", + function_name="register_client_to_visit", + visit_name=text, + data={"id": self.app._environment.client_id}, ) log.info(f"Posted visit registration: {response.status_code}") machine_data = capture_get( - url=f"{self.app._environment.url.geturl()}{url_path_for('session_control.router', 'machine_info_by_instrument', instrument_name=instrument_name)}" + base_url=str(self.app._environment.url.geturl()), + router_name="session_control.router", + function_name="machine_info_by_instrument", + instrument_name=instrument_name, ).json() if self._switch_status: @@ -715,7 +753,10 @@ def on_button_pressed(self, event: Button.Pressed): if machine_data.get("upstream_data_directories"): upstream_downloads = capture_get( - url=f"{self.app._environment.url.geturl()}{url_path_for('session_control.correlative_router', 'find_upstream_visits', session_id=self.app._environment.murfey_session)}" + base_url=str(self.app._environment.url.geturl()), + router_name="session_control.correlative_router", + function_name="find_upstream_visits", + 
session_id=self.app._environment.murfey_session, ).json() self.app.install_screen( UpstreamDownloads(upstream_downloads), "upstream-downloads" @@ -743,12 +784,18 @@ def on_button_pressed(self, event: Button.Pressed): self.app._visit = text self.app._environment.visit = text response = capture_post( - url=f"{self.app._environment.url.geturl()}{url_path_for('session_control.router', 'register_client_to_visit', visit_name=text)}", - json={"id": self.app._environment.client_id}, + base_url=str(self.app._environment.url.geturl()), + router_name="session_control.router", + function_name="register_client_to_visit", + visit_name=text, + data={"id": self.app._environment.client_id}, ) log.info(f"Posted visit registration: {response.status_code}") machine_data = capture_get( - url=f"{self.app._environment.url.geturl()}{url_path_for('session_control.router', 'machine_info_by_instrument', instrument_name=instrument_name)}" + base_url=str(self.app._environment.url.geturl()), + router_name="session_control.router", + function_name="machine_info_by_instrument", + instrument_name=instrument_name, ).json() self.app.install_screen( @@ -777,7 +824,10 @@ def on_button_pressed(self, event: Button.Pressed): if machine_data.get("upstream_data_directories"): upstream_downloads = capture_get( - url=f"{self.app._environment.url.geturl()}{url_path_for('session_control.correlative_router', 'find_upstream_visits', session_id=self.app._environment.murfey_session)}" + base_url=str(self.app._environment.url.geturl()), + router_name="session_control.correlative_router", + function_name="find_upstream_visits", + session_id=self.app._environment.murfey_session, ).json() self.app.install_screen( UpstreamDownloads(upstream_downloads), "upstream-downloads" @@ -799,7 +849,10 @@ def compose(self): def on_button_pressed(self, event: Button.Pressed): machine_data = capture_get( - url=f"{self.app._environment.url.geturl()}{url_path_for('session_control.router', 'machine_info_by_instrument', 
instrument_name=instrument_name)}" + base_url=str(self.app._environment.url.geturl()), + router_name="session_control.router", + function_name="machine_info_by_instrument", + instrument_name=instrument_name, ).json() if machine_data.get("upstream_data_download_directory"): # Create the directory locally to save files to @@ -810,7 +863,11 @@ def on_button_pressed(self, event: Button.Pressed): # Get the paths to the TIFF files generated previously under the same session ID upstream_tiff_paths_response = capture_get( - url=f"{self.app._environment.url.geturl()}{url_path_for('session_control.correlative_router', 'gather_upstream_tiffs', visit_name=event.button.label, session_id=self.app._environment.murfey_session)}" + base_url=str(self.app._environment.url.geturl()), + router_name="session_control.correlative_router", + function_name="gather_upstream_tiffs", + visit_name=event.button.label, + session_id=self.app._environment.murfey_session, ) upstream_tiff_paths = upstream_tiff_paths_response.json() or [] @@ -819,7 +876,12 @@ def on_button_pressed(self, event: Button.Pressed): (download_dir / tp).parent.mkdir(exist_ok=True, parents=True) # Write TIFF to the specified file path stream_response = capture_get( - url=f"{self.app._environment.url.geturl()}{url_path_for('session_control.correlative_router', 'get_tiff', visit_name=event.button.label, session_id=self.app._environment.murfey_session, tiff_path=tp)}", + base_url=str(self.app._environment.url.geturl()), + router_name="session_control.correlative_router", + function_name="get_tiff", + visit_name=event.button.label, + session_id=self.app._environment.murfey_session, + tiff_path=tp, ) # Write the file chunk-by-chunk to avoid hogging memory with open(download_dir / tp, "wb") as utiff: @@ -882,8 +944,11 @@ def on_button_pressed(self, event): f"Gain reference file {posix_path(self._dir_tree._gain_reference)!r} was not successfully transferred to {visit_path}/processing" ) process_gain_response = capture_post( - 
url=f"{str(self.app._environment.url.geturl())}{url_path_for('file_io_instrument.router', 'process_gain', session_id=self.app._environment.murfey_session)}", - json={ + base_url=str(self.app._environment.url.geturl()), + router_name="file_io_instrument.router", + function_name="process_gain", + session_id=self.app._environment.murfey_session, + data={ "gain_ref": str(self._dir_tree._gain_reference), "eer": bool( self.app._machine_config.get("external_executables_eer") @@ -1183,10 +1248,16 @@ def file_copied(self, *args, **kwargs): self.query_one(ProgressBar).advance(1) if self.query_one(ProgressBar).progress == self.query_one(ProgressBar).total: capture_post( - url=f"{self.app._environment.url.geturl()}{url_path_for('session_control.router', 'register_processing_success_in_ispyb', session_id=self.app._environment.murfey_session)}" + base_url=str(self.app._environment.url.geturl()), + router_name="session_control.router", + function_name="register_processing_success_in_ispyb", + session_id=self.app._environment.murfey_session, ) capture_delete( - url=f"{self.app._environment.url.geturl()}{url_path_for('session_control.router', 'remove_session', session_id=self.app._environment.murfey_session)}" + base_url=str(self.app._environment.url.geturl()), + router_name="session_control.router", + function_name="remove_session", + session_id=self.app._environment.murfey_session, ) self.app.exit() @@ -1216,5 +1287,9 @@ def compose(self): def on_mount(self, event): capture_post( - url=f"{self.app._environment.url.geturl()}{url_path_for('prometheus.router', 'change_monitoring_status', visit_name=self.app._environment.visit, on=1)}" + base_url=str(self.app._environment.url.geturl()), + router_name="prometheus.router", + function_name="change_monitoring_status", + visit_name=self.app._environment.visit, + on=1, ) From a8370009598b59c02322e8c8078493b5caa1359f Mon Sep 17 00:00:00 2001 From: yxd92326 Date: Fri, 22 Aug 2025 08:53:39 +0100 Subject: [PATCH 09/12] Update test --- 
tests/client/tui/test_main.py | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/tests/client/tui/test_main.py b/tests/client/tui/test_main.py index 77608ebd6..e0a5c5561 100644 --- a/tests/client/tui/test_main.py +++ b/tests/client/tui/test_main.py @@ -17,9 +17,9 @@ @pytest.mark.parametrize("test_params", test_get_visit_list_params_matrix) -@mock.patch("murfey.client.tui.main.requests") +@mock.patch("murfey.client.tui.main.capture_get") def test_get_visit_list( - mock_request, + mock_request_get, test_params: tuple[str], mock_client_configuration, ): @@ -49,17 +49,19 @@ def test_get_visit_list( mock_response = Mock() mock_response.status_code = 200 mock_response.json.return_value = example_visits - mock_request.get.return_value = mock_response + mock_request_get.return_value = mock_response # read_config() has to be patched using fixture, so has to be done in function with mock.patch("murfey.util.client.read_config", mock_client_configuration): visits = _get_visit_list(urlparse(server_url), instrument_name) # Check that request was sent with the correct URL - expected_url = ( - f"{server_url}/session_control/instruments/{instrument_name}/visits_raw" + mock_request_get.assert_called_once_with( + base_url=server_url, + router_name="session_control.router", + function_name="get_current_visits", + instrument_name=instrument_name, ) - mock_request.get.assert_called_once_with(expected_url) # Check that expected outputs are correct (order-sensitive) for v, visit in enumerate(visits): From bc72b9313740292b3d4892658e7c9859526eb7bd Mon Sep 17 00:00:00 2001 From: yxd92326 Date: Fri, 22 Aug 2025 09:13:49 +0100 Subject: [PATCH 10/12] Try and fix clem calls --- src/murfey/server/api/clem.py | 64 ++++++++++++++++++++--------------- 1 file changed, 37 insertions(+), 27 deletions(-) diff --git a/src/murfey/server/api/clem.py b/src/murfey/server/api/clem.py index 7f7fb28b2..8e749d032 100644 --- a/src/murfey/server/api/clem.py +++ 
b/src/murfey/server/api/clem.py @@ -196,14 +196,25 @@ def get_db_entry( """ +class LifInfo(BaseModel): + lif_file: Path + master_metadata: Optional[Path] = None + child_metadata: list[Path] = [] + child_series: list[str] = [] + child_stacks: list[Path] = [] + + +class TiffInfo(BaseModel): + tiff_file: Path + associated_metadata: Optional[Path] = None + associated_series: Optional[str] = None + associated_stack: Optional[Path] = None + + @router.post("/sessions/{session_id}/clem/lif_files") def register_lif_file( - lif_file: Path, + lif_file: LifInfo, session_id: int, - master_metadata: Optional[Path] = None, - child_metadata: list[Path] = [], - child_series: list[str] = [], - child_stacks: list[Path] = [], db: Session = murfey_db, ): # Return or register the LIF file entry @@ -212,7 +223,7 @@ def register_lif_file( db=db, table=CLEMLIFFile, session_id=session_id, - file_path=lif_file, + file_path=lif_file.lif_file, ) except Exception: logger.error( @@ -223,9 +234,11 @@ def register_lif_file( return False # Add metadata information if provided - if master_metadata is not None: + if lif_file.master_metadata is not None: try: - master_metadata = validate_and_sanitise(master_metadata, session_id, db) + master_metadata = validate_and_sanitise( + lif_file.master_metadata, session_id, db + ) clem_lif_file.master_metadata = str(master_metadata) except Exception: logger.warning( @@ -235,7 +248,7 @@ def register_lif_file( ) # Register child metadata if provided - for metadata in child_metadata: + for metadata in lif_file.child_metadata: try: metadata_db_entry: CLEMImageMetadata = get_db_entry( db=db, @@ -255,7 +268,7 @@ def register_lif_file( continue # Register child image series if provided - for series in child_series: + for series in lif_file.child_series: try: series_db_entry: CLEMImageSeries = get_db_entry( db=db, @@ -275,7 +288,7 @@ def register_lif_file( continue # Register child image stacks if provided - for stack in child_stacks: + for stack in 
lif_file.child_stacks: try: stack_db_entry: CLEMImageStack = get_db_entry( db=db, @@ -303,11 +316,8 @@ def register_lif_file( @router.post("/sessions/{session_id}/clem/tiff_files") def register_tiff_file( - tiff_file: Path, + tiff_file: TiffInfo, session_id: int, - associated_metadata: Optional[Path] = None, - associated_series: Optional[str] = None, - associated_stack: Optional[Path] = None, db: Session = murfey_db, ): # Get or register the database entry @@ -316,7 +326,7 @@ def register_tiff_file( db=db, table=CLEMTIFFFile, session_id=session_id, - file_path=tiff_file, + file_path=tiff_file.tiff_file, ) except Exception: logger.error( @@ -327,58 +337,58 @@ def register_tiff_file( return False # Add metadata if provided - if associated_metadata is not None: + if tiff_file.associated_metadata is not None: try: metadata_db_entry: CLEMImageMetadata = get_db_entry( db=db, table=CLEMImageMetadata, session_id=session_id, - file_path=associated_metadata, + file_path=tiff_file.associated_metadata, ) # Link database entries clem_tiff_file.associated_metadata = metadata_db_entry except Exception: logger.warning( "Unable to register " - f"metadata file {sanitise(str(associated_metadata))!r} in association with " + f"metadata file {sanitise(str(tiff_file.associated_metadata))!r} in association with " f"TIFF file {sanitise(str(tiff_file))!r}: \n" f"{traceback.format_exc()}" ) # Add series information if provided - if associated_series is not None: + if tiff_file.associated_series is not None: try: series_db_entry: CLEMImageSeries = get_db_entry( db=db, table=CLEMImageSeries, session_id=session_id, - series_name=associated_series, + series_name=tiff_file.associated_series, ) # Link database entries clem_tiff_file.child_series = series_db_entry except Exception: logger.warning( "Unable to register " - f"CLEM series {sanitise(associated_series)!r} in association with " + f"CLEM series {sanitise(tiff_file.associated_series)!r} in association with " f"TIFF file 
{sanitise(str(tiff_file))!r}: \n" f"{traceback.format_exc()}" ) # Add image stack information if provided - if associated_stack is not None: + if tiff_file.associated_stack is not None: try: stack_db_entry: CLEMImageStack = get_db_entry( db=db, table=CLEMImageStack, session_id=session_id, - file_path=associated_stack, + file_path=tiff_file.associated_stack, ) # Link database entries clem_tiff_file.child_stack = stack_db_entry except Exception: logger.warning( "Unable to register " - f"image stack {sanitise(str(associated_stack))!r} in association with " + f"image stack {sanitise(str(tiff_file.associated_stack))!r} in association with " f"{traceback.format_exc()}" ) @@ -737,7 +747,7 @@ def register_image_stack( ) # API posts to this URL def process_raw_lifs( session_id: int, - lif_file: Path, + lif_file: LifInfo, db: Session = murfey_db, ): try: @@ -759,7 +769,7 @@ def process_raw_lifs( # Pass arguments along to the correct workflow workflow.load()( # Match the arguments found in murfey.workflows.clem.process_raw_lifs - file=lif_file, + file=lif_file.lif_file, root_folder="images", session_id=session_id, instrument_name=instrument_name, From 2b91db4f4f35c36faf11793d2bed7d89e4d992cf Mon Sep 17 00:00:00 2001 From: yxd92326 Date: Fri, 22 Aug 2025 09:16:10 +0100 Subject: [PATCH 11/12] Revert this --- src/murfey/server/api/session_control.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/murfey/server/api/session_control.py b/src/murfey/server/api/session_control.py index d9df164ee..3dc27adbb 100644 --- a/src/murfey/server/api/session_control.py +++ b/src/murfey/server/api/session_control.py @@ -296,8 +296,8 @@ def delete_rsyncer(session_id: int, rsyncer_source: StringOfPathModel, db=murfey db.commit() except Exception: logger.error( - "Failed to delete rsyncer for source directory " - f"{sanitise(str(rsyncer_source.path))!r} in session {session_id}.", + f"Failed to delete rsyncer for source directory {sanitise(str(rsyncer_source.path))!r} " + 
f"in session {session_id}.", exc_info=True, ) From 3373fd519a05ab3748fd4e53de79b2c22b65b51d Mon Sep 17 00:00:00 2001 From: yxd92326 Date: Tue, 26 Aug 2025 16:46:47 +0100 Subject: [PATCH 12/12] Couple of typos --- src/murfey/server/api/workflow.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/murfey/server/api/workflow.py b/src/murfey/server/api/workflow.py index fdaa9f662..71fe71cc0 100644 --- a/src/murfey/server/api/workflow.py +++ b/src/murfey/server/api/workflow.py @@ -745,7 +745,7 @@ async def request_tomography_preprocessing( return proc_file -@tomo_router.post("/visits/{visit_name}/sesisons{session_id}/completed_tilt_series") +@tomo_router.post("/visits/{visit_name}/sessions/{session_id}/completed_tilt_series") def register_completed_tilt_series( visit_name: str, session_id: MurfeySessionID, @@ -876,7 +876,7 @@ class TiltInfo(BaseModel): source: str -@tomo_router.post("/visits/{visit_name}/sessions{session_id}/tilt") +@tomo_router.post("/visits/{visit_name}/sessions/{session_id}/tilt") async def register_tilt( visit_name: str, session_id: MurfeySessionID, tilt_info: TiltInfo, db=murfey_db ):