Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
102 changes: 11 additions & 91 deletions src/murfey/client/contexts/tomo.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from datetime import datetime
from pathlib import Path
from threading import RLock
from typing import Callable, Dict, List, NamedTuple, Optional, OrderedDict
from typing import Callable, Dict, List, Optional, OrderedDict

import requests
import xmltodict
Expand All @@ -27,95 +27,34 @@
requests.get, requests.post, requests.put, requests.delete = authorised_requests()


class TiltInfoExtraction(NamedTuple):
series: Callable[[Path], str]
angle: Callable[[Path], str]
tag: Callable[[Path], str]


def _get_tilt_series_v5_7(p: Path) -> str:
return p.name.split("_")[1]


def _get_tilt_angle_v5_7(p: Path) -> str:
return p.name.split("[")[1].split("]")[0]


def _get_tilt_tag_v5_7(p: Path) -> str:
return p.name.split("_")[0]


def _get_slice_index_v5_11(tag: str) -> int:
slice_index = 0
for i, ch in enumerate(tag[::-1]):
if not ch.isnumeric():
slice_index = -i
break
if not slice_index:
raise ValueError(
f"The file tag {tag} does not end in numeric characters or is entirely numeric: cannot parse"
)
return slice_index


def _get_tilt_series_v5_11(p: Path) -> str:
tag = p.name.split("_")[0]
slice_index = _get_slice_index_v5_11(tag)
return tag[slice_index:]


def _get_tilt_tag_v5_11(p: Path) -> str:
tag = p.name.split("_")[0]
slice_index = _get_slice_index_v5_11(tag)
return tag[:slice_index]


def _get_tilt_angle_v5_11(p: Path) -> str:
_split = p.name.split("_")[2].split(".")
return ".".join(_split[:-1])


def _find_angle_index(split_name: List[str]) -> int:
for i, part in enumerate(split_name):
if "." in part:
if "." in part and part[0].isnumeric():
return i
return 0


def _get_tilt_series_v5_12(p: Path) -> str:
split_name = p.name.split("_")
angle_idx = _find_angle_index(split_name)
if split_name[angle_idx - 2].isnumeric():
return split_name[angle_idx - 2]
return ""
return -1


def _get_tilt_angle_v5_12(p: Path) -> str:
split_name = p.name.split("_")
split_name = p.stem.split("_")
angle_idx = _find_angle_index(split_name)
if angle_idx == -1:
return ""
return split_name[angle_idx]


def _get_tilt_tag_v5_12(p: Path) -> str:
split_name = p.name.split("_")
angle_idx = _find_angle_index(split_name)
if split_name[angle_idx - 2].isnumeric():
return "_".join(split_name[: angle_idx - 2])
return "_".join(split_name[: angle_idx - 1])


tomo_tilt_info = {
"5.7": TiltInfoExtraction(
_get_tilt_series_v5_7, _get_tilt_angle_v5_7, _get_tilt_tag_v5_7
),
"5.11": TiltInfoExtraction(
_get_tilt_series_v5_11, _get_tilt_angle_v5_11, _get_tilt_tag_v5_11
),
"5.12": TiltInfoExtraction(
_get_tilt_series_v5_12,
_get_tilt_angle_v5_12,
_get_tilt_tag_v5_12,
),
"5.7": _get_tilt_angle_v5_7,
"5.11": _get_tilt_angle_v5_11,
"5.12": _get_tilt_angle_v5_12,
}


Expand Down Expand Up @@ -177,8 +116,6 @@ def __init__(self, acquisition_software: str, basepath: Path):
self._processing_job_stash: dict = {}
self._preprocessing_triggers: dict = {}
self._lock: RLock = RLock()
self._extract_tilt_series: Callable[[Path], str] | None = None
self._extract_tilt_tag: Callable[[Path], str] | None = None

def _flush_data_collections(self):
logger.info(
Expand Down Expand Up @@ -232,14 +169,7 @@ def _check_for_alignment(
manual_tilt_offset: Optional[float],
pixel_size: Optional[float],
):
if self._extract_tilt_series and self._extract_tilt_tag:
tilt_series = (
f"{self._extract_tilt_tag(movie_path)}_{self._extract_tilt_series(movie_path)}"
if self._extract_tilt_tag(movie_path)
else self._extract_tilt_series(movie_path)
)
else:
return
tilt_series = _construct_tilt_series_name(movie_path)

if self._motion_corrected_tilt_series.get(
tilt_series
Expand Down Expand Up @@ -370,9 +300,7 @@ def _get_source(
def _add_tilt(
self,
file_path: Path,
extract_tilt_series: Callable[[Path], str],
extract_tilt_angle: Callable[[Path], str],
extract_tilt_tag: Callable[[Path], str],
environment: MurfeyInstanceEnvironment | None = None,
required_position_files: List[Path] | None = None,
required_strings: List[str] | None = None,
Expand All @@ -390,10 +318,6 @@ def _add_tilt(
)
if required_strings and not any(r in file_path.name for r in required_strings):
return []
if not self._extract_tilt_series:
self._extract_tilt_series = extract_tilt_series
if not self._extract_tilt_tag:
self._extract_tilt_tag = extract_tilt_tag
try:
tilt_angle = extract_tilt_angle(file_path)
try:
Expand Down Expand Up @@ -733,9 +657,7 @@ def _add_tomo_tilt(
tilt_series = _construct_tilt_series_name(file_path)
return self._add_tilt(
file_path,
tilt_info_extraction.series,
tilt_info_extraction.angle,
tilt_info_extraction.tag,
tilt_info_extraction,
environment=environment,
required_position_files=(
required_position_files
Expand Down Expand Up @@ -767,9 +689,7 @@ def _extract_tilt_series(p: Path) -> str:

return self._add_tilt(
file_path,
_extract_tilt_series,
lambda x: ".".join(x.name.split(delimiter)[-1].split(".")[:-1]),
lambda x: "",
environment=environment,
required_strings=[],
)
Expand Down