diff --git a/src/murfey/client/multigrid_control.py b/src/murfey/client/multigrid_control.py index de47ddeea..dae0e7fbc 100644 --- a/src/murfey/client/multigrid_control.py +++ b/src/murfey/client/multigrid_control.py @@ -378,6 +378,7 @@ def _start_rsyncer( stop_callback=self._rsyncer_stopped, do_transfer=self.do_transfer, remove_files=remove_files, + chmod=self._machine_config.get("rsync_chmod", "D0750,F0750"), substrings_blacklist=self._machine_config.get( "substrings_blacklist", {"directories": [], "files": []} ), diff --git a/src/murfey/client/rsync.py b/src/murfey/client/rsync.py index 5d27dc0d5..475d31579 100644 --- a/src/murfey/client/rsync.py +++ b/src/murfey/client/rsync.py @@ -61,6 +61,7 @@ def __init__( local: bool = False, do_transfer: bool = True, remove_files: bool = False, + chmod: str = "D0750,F0750", required_substrings_for_removal: List[str] = [], substrings_blacklist: dict[str, list[str]] = {}, notify: bool = True, @@ -75,6 +76,7 @@ def __init__( self._local = local self._do_transfer = do_transfer self._remove_files = remove_files + self._chmod = chmod self._required_substrings_for_removal = required_substrings_for_removal self._substrings_blacklist = substrings_blacklist self._notify = notify @@ -501,7 +503,7 @@ def parse_stderr(line: str): # Needed as a pair to trigger permission modifications # Ref: https://serverfault.com/a/796341 "-p", - "--chmod=D0750,F0750", # Use extended chmod format + f"--chmod={self._chmod}", # Set permissions for transferred files and folders ] # Add file locations rsync_cmd.extend([".", self._remote]) diff --git a/src/murfey/server/api/file_io_instrument.py b/src/murfey/server/api/file_io_instrument.py index bbe12f1b0..59a2e826c 100644 --- a/src/murfey/server/api/file_io_instrument.py +++ b/src/murfey/server/api/file_io_instrument.py @@ -1,3 +1,4 @@ +import os from datetime import datetime from logging import getLogger from pathlib import Path @@ -33,7 +34,9 @@ class SuggestedPathParameters(BaseModel): - base_path: Path + base_path: ( + Path # Partial Path starting from immediately after the rsync destination + ) touch: bool = False extra_directory: str = "" @@ -86,9 +89,12 @@ def suggest_path( count = count + 1 if count else 2 check_path = check_path.parent / f"{check_path_name}{count}" if params.touch: - check_path.mkdir(mode=0o750) + check_path.mkdir() + os.chmod(check_path, mode=machine_config.mkdir_chmod) if params.extra_directory: - (check_path / secure_filename(params.extra_directory)).mkdir(mode=0o750) + extra_dir = check_path / secure_filename(params.extra_directory) + extra_dir.mkdir() + os.chmod(extra_dir, mode=machine_config.mkdir_chmod) return {"suggested_path": check_path.relative_to(rsync_basepath)} @@ -100,19 +106,35 @@ class Dest(BaseModel): def make_rsyncer_destination(session_id: int, destination: Dest, db=murfey_db): secure_path_parts = [secure_filename(p) for p in destination.destination.parts] destination_path = "/".join(secure_path_parts) - instrument_name = ( - db.exec(select(Session).where(Session.id == session_id)).one().instrument_name - ) + session_entry = db.exec(select(Session).where(Session.id == session_id)).one() + instrument_name = session_entry.instrument_name + visit = session_entry.visit machine_config = get_machine_config(instrument_name=instrument_name)[ instrument_name ] if not machine_config: raise ValueError("No machine configuration set when making rsyncer destination") + + # Make the destination directory and all parents full_destination_path = ( machine_config.rsync_basepath or Path("") ).resolve() / destination_path - for parent_path in full_destination_path.parents: - parent_path.mkdir(mode=0o750, exist_ok=True) + full_destination_path.mkdir(parents=True, exist_ok=True) + + # Change permissions for every folder after the visit directory + try: + visit_index = full_destination_path.parts.index(visit) + except ValueError: + logger.error(f"Could not find directory level {visit!r} in destination path") + raise + current_path = full_destination_path.parents[-(visit_index + 1)] + for part in full_destination_path.parts[visit_index + 1 :]: + current_path = current_path / part + try: + os.chmod(current_path, mode=machine_config.mkdir_chmod) + except PermissionError: + logger.warning(f"Unable to change permissions for {current_path}") + continue return destination diff --git a/src/murfey/server/api/file_io_shared.py b/src/murfey/server/api/file_io_shared.py index d510d732a..ceac71388 100644 --- a/src/murfey/server/api/file_io_shared.py +++ b/src/murfey/server/api/file_io_shared.py @@ -78,6 +78,7 @@ async def process_gain( env, rescale=gain_reference_params.rescale, tag=gain_reference_params.tag, + chmod=machine_config.mkdir_chmod, ) if new_gain_ref and new_gain_ref_superres: return { diff --git a/src/murfey/server/api/workflow.py b/src/murfey/server/api/workflow.py index 4d27fe9b5..ce63b6e84 100644 --- a/src/murfey/server/api/workflow.py +++ b/src/murfey/server/api/workflow.py @@ -1,4 +1,5 @@ import asyncio +import os from datetime import datetime from logging import getLogger from pathlib import Path @@ -607,9 +608,9 @@ async def request_spa_preprocessing( db.close() if not mrc_out.parent.exists(): - Path(secure_filename(str(mrc_out))).parent.mkdir( - parents=True, exist_ok=True - ) + mrc_out_dir = Path(secure_filename(str(mrc_out))).parent + mrc_out_dir.mkdir(parents=True, exist_ok=True) + os.chmod(mrc_out_dir, mode=machine_config.mkdir_chmod) recipe_name = machine_config.recipes.get( "em-spa-preprocess", "em-spa-preprocess" ) @@ -836,6 +837,7 @@ async def request_tomography_preprocessing( murfey_ids = _murfey_id(appid, db, number=1, close=False) if not mrc_out.parent.exists(): mrc_out.parent.mkdir(parents=True, exist_ok=True) + os.chmod(mrc_out.parent, mode=machine_config.mkdir_chmod) session_processing_parameters = db.exec( select(SessionProcessingParameters).where( @@ -988,6 +990,7 @@ def register_completed_tilt_series( ) if not stack_file.parent.exists(): stack_file.parent.mkdir(parents=True) + os.chmod(stack_file.parent, mode=machine_config.mkdir_chmod) tilt_offset = midpoint([float(get_angle(t)) for t in tilts]) zocalo_message = { "recipes": ["em-tomo-align"], @@ -1222,8 +1225,10 @@ async def make_gif( / "processed" ) output_dir.mkdir(exist_ok=True) + os.chmod(output_dir, mode=machine_config.mkdir_chmod) output_dir = output_dir / secure_filename(gif_params.raw_directory) output_dir.mkdir(exist_ok=True) + os.chmod(output_dir, mode=machine_config.mkdir_chmod) output_path = output_dir / f"lamella_{gif_params.lamella_number}_milling.gif" if Image is not None: diff --git a/src/murfey/server/feedback.py b/src/murfey/server/feedback.py index 1fc0c635f..f12d3389f 100644 --- a/src/murfey/server/feedback.py +++ b/src/murfey/server/feedback.py @@ -9,6 +9,7 @@ import logging import math +import os import subprocess import time from datetime import datetime @@ -1402,6 +1403,7 @@ def _flush_tomography_preprocessing(message: dict, _db): p = Path(f.mrc_out) if not p.parent.exists(): p.parent.mkdir(parents=True) + os.chmod(p.parent, mode=machine_config.mkdir_chmod) movie = db.Movie( murfey_id=murfey_ids[0], data_collection_id=detached_ids[1], @@ -1876,6 +1878,7 @@ def feedback_callback(header: dict, message: dict, _db=murfey_db) -> None: ) if not stack_file.parent.exists(): stack_file.parent.mkdir(parents=True) + os.chmod(stack_file.parent, machine_config.mkdir_chmod) tilt_offset = midpoint([float(get_angle(t)) for t in tilts]) zocalo_message = { "recipes": ["em-tomo-align"], diff --git a/src/murfey/server/gain.py b/src/murfey/server/gain.py index efdc5eba8..c5f131059 100644 --- a/src/murfey/server/gain.py +++ b/src/murfey/server/gain.py @@ -35,6 +35,7 @@ async def prepare_gain( env: Dict[str, str], rescale: bool = True, tag: str = "", + chmod: int = 0o750, ) -> Tuple[Path | None, Path | None]: if not all(executables.get(s) for s in ("dm2mrc", "clip", "newstack")): logger.error("No executables were provided to prepare the gain reference with") @@ -57,10 +58,10 @@ async def prepare_gain( return gain_out, gain_out_superres if rescale else gain_out for k, v in env.items(): os.environ[k] = v - if tag: - secure_path(gain_path.parent / f"gain_{tag}").mkdir(exist_ok=True) - else: - secure_path(gain_path.parent / "gain").mkdir(exist_ok=True) + gain_tag = f"gain_{tag}" if tag else "gain" + gain_dir = secure_path(gain_path.parent / gain_tag) + gain_dir.mkdir(exist_ok=True) + os.chmod(gain_dir, chmod) gain_path = _sanitise(gain_path, tag) flip = "flipx" if camera == Camera.K3_FLIPX else "flipy" gain_path_mrc = gain_path.with_suffix(".mrc") @@ -109,7 +110,10 @@ async def prepare_gain( async def prepare_eer_gain( - gain_path: Path, executables: Dict[str, str], env: Dict[str, str], tag: str = "" + gain_path: Path, + executables: Dict[str, str], + env: Dict[str, str], + tag: str = "", ) -> Tuple[Path | None, Path | None]: if not executables.get("tif2mrc"): logger.error( diff --git a/src/murfey/util/config.py b/src/murfey/util/config.py index 5e50d7c22..11e8db570 100644 --- a/src/murfey/util/config.py +++ b/src/murfey/util/config.py @@ -58,11 +58,13 @@ class MachineConfig(BaseModel): # type: ignore "directories": [], "files": [], } + mkdir_chmod: int = 0o750 # Rsync setup rsync_url: str = "" rsync_module: str = "" rsync_basepath: Optional[Path] = None + rsync_chmod: str = "D0750,F0750" allow_removal: bool = False # Upstream data download setup @@ -151,6 +153,15 @@ def validate_software_versions(cls, v: dict[str, Any]) -> dict[str, str]: # Let it validate and fail as-is return v + @field_validator("mkdir_chmod", mode="before") + @classmethod + def parse_octal_int(cls, value): + # Attempt to parse the string as an octal int + if isinstance(value, str) and value.startswith("0o") and value[2:].isdigit(): + return int(value, 8) + # Return value as-is otherwise + return value + @lru_cache(maxsize=1) def machine_config_from_file( diff --git a/src/murfey/workflows/spa/atlas.py b/src/murfey/workflows/spa/atlas.py index 873843f4f..9502c33aa 100644 --- a/src/murfey/workflows/spa/atlas.py +++ b/src/murfey/workflows/spa/atlas.py @@ -1,4 +1,5 @@ import logging +import os from pathlib import Path import mrcfile @@ -37,6 +38,7 @@ def atlas_jpg_from_mrc(instrument_name: str, visit_name: str, atlas_mrc: Path): / secure_filename(f"{sample_id}_{atlas_mrc.stem}_fullres.jpg") ) atlas_jpg_file.parent.mkdir(parents=True, exist_ok=True) + os.chmod(atlas_jpg_file.parent, mode=machine_config.mkdir_chmod) data = data - data.min() data = data.astype(float) * 255 / data.max() diff --git a/src/murfey/workflows/spa/flush_spa_preprocess.py b/src/murfey/workflows/spa/flush_spa_preprocess.py index e3d52d614..d52c18c27 100644 --- a/src/murfey/workflows/spa/flush_spa_preprocess.py +++ b/src/murfey/workflows/spa/flush_spa_preprocess.py @@ -1,4 +1,5 @@ import logging +import os from pathlib import Path from typing import Optional @@ -164,11 +165,15 @@ def register_grid_square( ".tiff" ).is_file(): secured_grid_square_image_path_full_res = ( - secured_grid_square_image_path_full_res.with_suffix(".tiff") + secured_grid_square_image_path_full_res.with_suffix( + ".tiff" + ) ) else: secured_grid_square_image_path_full_res = ( - secured_grid_square_image_path_full_res.with_suffix(".mrc") + secured_grid_square_image_path_full_res.with_suffix( + ".mrc" + ) ) smartem_client = SmartEMAPIClient( base_url=machine_config.smartem_api_url, logger=logger @@ -543,6 +548,7 @@ def flush_spa_preprocess(message: dict, murfey_db: Session) -> dict[str, bool]: ppath = Path(f.file_path) if not mrcp.parent.exists(): mrcp.parent.mkdir(parents=True) + os.chmod(mrcp.parent, mode=machine_config.mkdir_chmod) movie = Movie( murfey_id=murfey_ids[2 * i], data_collection_id=collected_ids[1].id, diff --git a/tests/server/api/test_file_io_instrument.py b/tests/server/api/test_file_io_instrument.py new file mode 100644 index 000000000..eaefd909f --- /dev/null +++ b/tests/server/api/test_file_io_instrument.py @@ -0,0 +1,160 @@ +from pathlib import Path +from unittest.mock import MagicMock + +import pytest +from pytest_mock import MockerFixture + +from murfey.server.api.file_io_instrument import ( + Dest, + SuggestedPathParameters, + make_rsyncer_destination, + suggest_path, +) +from murfey.util.config import MachineConfig + + +@pytest.mark.parametrize( + "test_params", + ( # Touch | Extra directory | Has raw + (True, "extra", False), + (False, "extra", False), + (True, "", False), + (False, "", False), + (True, "extra", True), + (False, "extra", True), + (True, "", True), + (False, "", True), + ), +) +def test_suggest_path( + mocker: MockerFixture, + test_params: tuple[bool, str, bool], + tmp_path: Path, +): + # Unpack test params + touch, extra_dir, has_raw = test_params + + # Set other parameters + instrument_name = "test" + year = "2026" + visit_name = "visit" + session_id = 1 + + rsync_basepath = tmp_path / "data" + visit_dir = rsync_basepath / year / visit_name + visit_dir.mkdir(parents=True, exist_ok=True) + if has_raw: + (visit_dir / "raw").mkdir(parents=True, exist_ok=True) + + params = SuggestedPathParameters( + base_path=visit_dir.relative_to(rsync_basepath) / "raw", + touch=touch, + extra_directory=extra_dir, + ) + + # Mock the database call + mock_session = MagicMock() + mock_session.instrument_name = instrument_name + mock_db = MagicMock() + mock_db.exec.return_value.one.return_value = mock_session + + # Mock 'get_machine_config' + machine_config = MachineConfig( + **{ + "rsync_basepath": str(rsync_basepath), + "mkdir_chmod": "0o775", + } + ) + mocker.patch( + "murfey.server.api.file_io_instrument.get_machine_config", + return_value={ + instrument_name: machine_config, + }, + ) + + # Run the function and check outputs + result = suggest_path( + visit_name=visit_name, + session_id=session_id, + params=params, + db=mock_db, + ) + + # Check that the correct suggestion was returned + dir_name = "raw" if not has_raw else "raw2" + assert result["suggested_path"] == visit_dir.relative_to(rsync_basepath) / dir_name + + # Check that folders are made only if 'touch' is set + assert ( + (visit_dir / dir_name).exists() + if touch + else not (visit_dir / dir_name).exists() + ) + if touch and extra_dir: + assert (visit_dir / dir_name / extra_dir).exists() + + +@pytest.mark.parametrize( + "dir_name", + ( + # General + "images", + "screenshots", + # SPA/Tomo-specific + "raw", + "raw2", + "raw3", + "atlas", + # FIB-specific + "autotem", + "maps", + "meteor", + "extras", + ), +) +def test_make_rsyncer_destination( + mocker: MockerFixture, + dir_name: str, + tmp_path: Path, +): + # Set other parameters + instrument_name = "test" + year = "2026" + visit_name = "visit" + session_id = 1 + + rsync_basepath = tmp_path / "data" + visit_dir = rsync_basepath / year / visit_name + destination = visit_dir / dir_name + + dest = Dest(destination=destination.relative_to(rsync_basepath)) + + # Mock the database call + mock_session = MagicMock() + mock_session.instrument_name = instrument_name + mock_session.visit = visit_name + mock_db = MagicMock() + mock_db.exec.return_value.one.return_value = mock_session + + # Mock 'get_machine_config' + machine_config = MachineConfig( + **{ + "rsync_basepath": str(rsync_basepath), + "mkdir_chmod": "0o775", + } + ) + mocker.patch( + "murfey.server.api.file_io_instrument.get_machine_config", + return_value={ + instrument_name: machine_config, + }, + ) + + # Run the function and check expected outputs + result = make_rsyncer_destination( + session_id=session_id, + destination=dest, + db=mock_db, + ) + assert result == dest + assert destination.exists() diff --git a/tests/server/api/test_workflow.py b/tests/server/api/test_workflow.py index a52387ff7..1f6ee0c54 100644 --- a/tests/server/api/test_workflow.py +++ b/tests/server/api/test_workflow.py @@ -487,6 +487,7 @@ async def test_make_gif( # Mock the machine config and 'get_machine_config' mock_machine_config = MagicMock() mock_machine_config.rsync_basepath = rsync_basepath + mock_machine_config.mkdir_chmod = 0o775 mocker.patch( "murfey.server.api.workflow.get_machine_config", return_value={ diff --git a/tests/server/test_gain.py b/tests/server/test_gain.py new file mode 100644 index 000000000..5df67ee4f --- /dev/null +++ b/tests/server/test_gain.py @@ -0,0 +1,184 @@ +import os +from pathlib import Path +from unittest import mock +from unittest.mock import AsyncMock, MagicMock + +import pytest +from pytest_mock import MockerFixture + +from murfey.server.gain import Camera, prepare_gain +from murfey.util import secure_path + + +@pytest.mark.asyncio +@pytest.mark.parametrize( + "test_params", + ( # Has executables | Camera | Suffix | Tag | Return Codes | Exists | Rescale + # Early exit cases on top + (False, 1, ".dm4", "", (0, 0, 0), False, True), # No executables + (True, 3, ".dm4", "", (0, 0, 0), False, True), # Falcon camera + (True, 1, ".mrc", "", (0, 0, 0), False, True), # Not .dm4 file + (True, 1, ".dm4", "", (1, 0, 0), False, True), # 'dm2mrc' failure + (True, 1, ".dm4", "", (0, 1, 0), False, True), # 'clip' failure + (True, 1, ".dm4", "", (0, 0, 1), False, True), # 'newstack' failure + # Normal cases + (True, 1, ".dm4", "20250123T123456", (0, 0, 0), False, True), + (True, 2, ".dm4", "", (0, 0, 0), False, True), + (True, 1, ".dm4", "20250123T123456", (0, 0, 0), False, False), + (True, 2, ".dm4", "", (0, 0, 0), False, False), + (True, 1, ".dm4", "20250123T123456", (0, 0, 0), True, True), + (True, 2, ".dm4", "", (0, 0, 0), True, True), + (True, 1, ".dm4", "20250123T123456", (0, 0, 0), True, False), + (True, 2, ".dm4", "", (0, 0, 0), True, False), + ), +) +async def test_prepare_gain( + mocker: MockerFixture, + test_params: tuple[bool, int, str, str, tuple[int, int, int], bool, bool], + tmp_path: Path, +): + # Unpack test params + has_executables, camera, suffix, tag, return_codes, exists, rescale = test_params + + # Visit directory + visit_dir = tmp_path / "data" / "2026" / "visit" + + # Original gain reference + gain_path = visit_dir / "processing" / f"K3-18480071_Gain_Ref._x1.m1.kv300{suffix}" + gain_path.parent.mkdir(parents=True, exist_ok=True) + gain_path.touch(exist_ok=True) + + # Output files + gain_out = ( + gain_path.parent / f"gain_{tag}.mrc" if tag else gain_path.parent / "gain.mrc" + ) + if exists: + gain_out.touch(exist_ok=True) + gain_out_superres = ( + gain_path.parent / f"gain_{tag}_superres.mrc" + if tag + else gain_path.parent / "gain_superres.mrc" + ) + # Create additional gain paths in a nested directory + gain_dir = f"gain_{tag}" if tag else "gain" + gain_path_new = gain_path.parent / gain_dir / gain_path.name + gain_path_mrc = gain_path_new.with_suffix(".mrc") + gain_path_superres = gain_path_new.parent / (gain_path_new.name + "_superres.mrc") + + # Dummy executables + executables = ( + { + "dm2mrc": mock.ANY, + "clip": mock.ANY, + "newstack": mock.ANY, + } + if has_executables + else {} + ) + + # Dummy environment variables + env = { + "dummy1": "dummy1", + "dummy2": "dummy2", + } + + # Mock the logger to check that expected messages are there + mock_logger = mocker.patch("murfey.server.gain.logger") + + # Create mocks for the different subprocess calls + mock_subprocesses = [] + for returncode in return_codes: + mock_subprocess = MagicMock() + mock_subprocess.communicate = AsyncMock(return_value=(b"dummy", b"dummy")) + mock_subprocess.returncode = returncode + mock_subprocesses.append(mock_subprocess) + + # Patch 'asyncio.create_subprocess_shell' + mock_shell = mocker.patch( + "murfey.server.gain.asyncio.create_subprocess_shell", + new_callable=AsyncMock, + side_effect=mock_subprocesses, + ) + + # Create the commands that the subprocesses are expected to be called with, in order + flip = "flipx" if camera == Camera.K3_FLIPX else "flipy" + commands = ( + [ + f"{executables['dm2mrc']} {gain_path_new} {gain_path_mrc}", + f"{executables['clip']} {flip} {secure_path(gain_path_mrc)} {secure_path(gain_path_superres) if rescale else secure_path(gain_out)}", + f"{executables['newstack']} -bin 2 {secure_path(gain_path_superres)} {secure_path(gain_out)}", + ] + if has_executables + else [] + ) + + result = await prepare_gain( + camera=camera, + gain_path=gain_path, + executables=executables, + env=env, + rescale=rescale, + tag=tag, + chmod=0o750, + ) + + # Check early exit cases + # No executables + if not has_executables: + mock_logger.error.assert_called_with( + "No executables were provided to prepare the gain reference with" + ) + assert result == (None, None) + # Falcon camera + elif camera == Camera.FALCON: + mock_logger.info.assert_called_with( + "Gain reference preparation not needed for Falcon detector" + ) + assert result == (None, None) + # Not a DM file + elif suffix != ".dm4": + assert result == (None, None) + # 'dm2mrc' fails + elif return_codes[0]: + assert mock_shell.call_count == 1 + mock_logger.error.assert_called_with( + "Error encountered while trying to process the gain reference with 'dm2mrc': \n" + "dummy" + ) + assert result == (None, None) + # 'clip' fails + elif return_codes[1]: + assert mock_shell.call_count == 2 + mock_logger.error.assert_called_with( + "Error encountered while trying to process the gain reference with 'clip': \n" + "dummy" + ) + assert result == (None, None) + # 'newstack' fails + elif return_codes[2] and rescale: + assert mock_shell.call_count == 3 + mock_logger.error.assert_called_with( + "Error encountered while trying to process the gain reference with 'newstack': \n" + "dummy" + ) + assert result == (None, None) + # File already exists + elif exists: + assert mock_shell.call_count == 0 + assert result == (gain_out, (gain_out_superres if rescale else gain_out)) + + # Check that the expected calls were made + else: + # Environment variables were set + for k, v in env.items(): + assert os.getenv(k) == v + assert mock_shell.call_count == 3 if rescale else 2 + for i, awaited in enumerate(mock_shell.await_args_list): + args, _ = awaited + assert args[0] == commands[i] + assert result == (gain_out, (gain_out_superres if rescale else gain_out)) + + +@pytest.mark.asyncio +async def test_prepare_eer_gain(): + pass diff --git a/tests/workflows/spa/test_atlas_workflow.py b/tests/workflows/spa/test_atlas_workflow.py index 1b1ea5e0c..0e4b514e4 100644 --- a/tests/workflows/spa/test_atlas_workflow.py +++ b/tests/workflows/spa/test_atlas_workflow.py @@ -39,6 +39,7 @@ def test_atlas_jpg_from_mrc( # Mock the return result of 'get_machine_config()' mock_machine_config = MagicMock() mock_machine_config.processed_directory_name = processed_dir_name + mock_machine_config.mkdir_chmod = 0o775 mocker.patch( "murfey.workflows.spa.atlas.get_machine_config", return_value={"test": mock_machine_config},