diff --git a/docs/Camera/GPHOTO2_ADD_CAMERA.md b/docs/Camera/GPHOTO2_ADD_CAMERA.md new file mode 100644 index 0000000..5699186 --- /dev/null +++ b/docs/Camera/GPHOTO2_ADD_CAMERA.md @@ -0,0 +1,94 @@ +# Add a GPhoto2 Camera Profile + +This guide shows how to add support for a new gphoto2-compatible camera using a +Python profile class. + +## 1. Detect your camera + +Connect camera over USB and run: + +```bash +gphoto2 --auto-detect +``` + +Copy the detected model string. You will use parts of this string in +`_MODEL_MARKERS`. + +## 2. Inspect config keys and choices + +List available keys: + +```bash +gphoto2 --list-config +``` + +Inspect important keys: + +```bash +gphoto2 --get-config /main/settings/capturetarget +gphoto2 --get-config /main/capturesettings/shutterspeed +gphoto2 --get-config /main/imgsettings/imageformat +gphoto2 --get-config /main/imgsettings/iso +``` + +## 3. Copy the template profile + +Copy: + +`openscan_firmware/controllers/hardware/cameras/gphoto2/profiles/template_camera.py` + +Create a new file with your camera name, for example: + +`openscan_firmware/controllers/hardware/cameras/gphoto2/profiles/my_camera.py` + +Then update: + +- `profile_id` +- `_MODEL_MARKERS` +- config key lists (`_SHUTTER_KEYS`, `_ISO_KEYS`, `_RAW_FORMAT_KEYS`, ...) +- startup defaults in `apply_startup_config` +- optional RAW behavior in `capture_dng` + +## 4. Register the profile + +No manual registry edit is needed. + +All Python modules in: + +`openscan_firmware/controllers/hardware/cameras/gphoto2/profiles/` + +are auto-discovered by the profile registry at startup. + +Requirements: + +- your class must inherit from `GPhoto2Profile` (or `GenericGPhoto2Profile`) +- `register_in_registry` must be `True` (default) +- `matches(identity)` should return `True` only for your target camera model + +## 5. Run a JPEG test + +Use the firmware API/flow to capture a JPEG and check: + +- image is captured successfully +- expected shutter and quality values are applied +- diagnostics show expected config keys + +## 6. Run a RAW test + +Capture RAW/DNG via the firmware flow and verify: + +- file extension is RAW-like for your camera (`.nef`, `.cr2`, `.raw`, ...) +- profile can switch to RAW mode +- profile restores previous image format after capture + +## 7. Debug setting failures + +`write_first_config(...)` returns explicit result details: + +- attempted keys +- requested value +- success/failure +- failure message + +If a setting fails, inspect these values first, then compare with +`gphoto2 --get-config ` output. diff --git a/openscan_firmware/config/scan.py b/openscan_firmware/config/scan.py index 09c5db0..7ac8e56 100644 --- a/openscan_firmware/config/scan.py +++ b/openscan_firmware/config/scan.py @@ -11,7 +11,7 @@ class ScanSetting(BaseModel): ) points: int = Field(130, ge=1, le=999, description="Number of points in scanning path.") - image_format: Literal['jpeg','dng','rgb_array', 'yuv_array'] = Field( + image_format: Literal['jpeg', 'raw', 'dng', 'rgb_array', 'yuv_array'] = Field( default='jpeg', description='Output image format (JPEG, DNG, RGB array or YUV array).' ) @@ -49,4 +49,4 @@ def focus_positions(self) -> list[float]: return [ min_focus + i * (max_focus - min_focus) / (self.focus_stacks - 1) for i in range(self.focus_stacks) - ] \ No newline at end of file + ] diff --git a/openscan_firmware/controllers/hardware/cameras/camera.py b/openscan_firmware/controllers/hardware/cameras/camera.py index 37bed3a..31b8cbb 100644 --- a/openscan_firmware/controllers/hardware/cameras/camera.py +++ b/openscan_firmware/controllers/hardware/cameras/camera.py @@ -123,6 +123,7 @@ def photo(self, image_format: str = "jpeg") -> PhotoData: """ handler = { "jpeg": self.capture_jpeg, + "raw": self.capture_dng, # legacy implementation hook kept as capture_dng "dng": self.capture_dng, "rgb_array": self.capture_rgb_array, "yuv_array": self.capture_yuv_array, diff --git a/openscan_firmware/controllers/hardware/cameras/gphoto2/controller.py b/openscan_firmware/controllers/hardware/cameras/gphoto2/controller.py index 3b9f248..2b3b30d 100644 --- a/openscan_firmware/controllers/hardware/cameras/gphoto2/controller.py +++ b/openscan_firmware/controllers/hardware/cameras/gphoto2/controller.py @@ -40,6 +40,88 @@ def __init__(self, camera: Camera): def cleanup(self): self._session.close() + def get_diagnostics(self) -> dict: + """Return diagnostics gathered from the active gphoto2 controller session.""" + relevant_keys = [ + "/main/settings/capturetarget", + "/main/settings/capture", + "/main/settings/recordingmedia", + "/main/settings/remotemode", + "/main/capturesettings/shutterspeed", + "/main/capturesettings/aperture", + "/main/capturesettings/autoexposuremode", + "/main/capturesettings/focusmode", + "/main/imgsettings/imageformat", + "/main/imgsettings/imagequality", + "/main/imgsettings/iso", + "/main/status/liveviewstatus", + "/main/status/liveviewselector", + "/main/other/applicationmode", + "capturetarget", + "capture", + "recordingmedia", + "remotemode", + "shutterspeed", + "aperture", + "autoexposuremode", + "focusmode", + "imageformat", + "imagequality", + "iso", + "liveviewstatus", + "liveviewselector", + "applicationmode", + ] + with self._hw_lock: + camera = self._session.ensure_connected() + summary = None + about = None + groups: list[str] = [] + relevant: list[dict] = [] + + try: + summary = str(getattr(camera.get_summary(), "text", "")).strip() or None + except Exception: + summary = None + try: + about = str(getattr(camera.get_about(), "text", "")).strip() or None + except Exception: + about = None + + try: + config = camera.get_config() + child_count = config.count_children() + for child_idx in range(child_count): + child = config.get_child(child_idx) + groups.append(f"{child.get_name()}: {child.get_label()}") + except Exception: + groups = [] + + seen_paths: set[str] = set() + for key in relevant_keys: + read_result = self._session.read_config(key) + if not read_result.success or read_result.details is None: + continue + details = read_result.details + key_path = str(details.get("key", key)) + if key_path in seen_paths: + continue + seen_paths.add(key_path) + relevant.append(details) + + identity = self._session.identity + return { + "model": identity.model or self.camera.name, + "path": identity.port or self.camera.path, + "summary": summary, + "about": about, + "config_groups": groups, + "relevant_config": relevant, + "profile": self._profile.profile_id, + "in_use_by_openscan": True, + "error": None, + } + def _apply_settings_to_hardware(self, settings: CameraSettings): self._set_busy(True) try: @@ -67,7 +149,7 @@ def capture_dng(self) -> PhotoData: self._set_busy(True) try: content, extra = self._profile.capture_dng(self._session) - return self._create_photo_data(io.BytesIO(content), "dng", extra) + return self._create_photo_data(io.BytesIO(content), "raw", extra) finally: self._set_busy(False) diff --git a/openscan_firmware/controllers/hardware/cameras/gphoto2/profile.py b/openscan_firmware/controllers/hardware/cameras/gphoto2/profile.py index dc97fe4..134126d 100644 --- a/openscan_firmware/controllers/hardware/cameras/gphoto2/profile.py +++ b/openscan_firmware/controllers/hardware/cameras/gphoto2/profile.py @@ -8,6 +8,8 @@ from openscan_firmware.config.camera import CameraSettings +from .profile_helpers import select_best_shutter_choice + logger = logging.getLogger(__name__) @@ -18,9 +20,10 @@ class CameraIdentity: class GPhoto2Profile: - """Base profile for model-specific GPhoto2 tuning.""" + """Base profile contract for camera model-specific GPhoto2 behavior.""" profile_id = "generic" + register_in_registry = True def matches(self, identity: CameraIdentity) -> bool: return True @@ -51,3 +54,16 @@ def build_metadata( if extra: metadata.update(extra) return metadata + + # Shared helper methods for profile implementations. + def _set_first(self, session: Any, keys: list[str], value: Any) -> bool: + return session.write_first_config(keys, value).success + + def _get_first_details(self, session: Any, keys: list[str]) -> dict[str, Any] | None: + result = session.read_first_config(keys) + return result.details if result.success else None + + def _pick_best_shutter(self, session: Any, keys: list[str], shutter_ms: float) -> str: + details = self._get_first_details(session, keys) + choices = [] if details is None else list(details.get("choices") or []) + return select_best_shutter_choice(shutter_ms=shutter_ms, available_choices=choices) diff --git a/openscan_firmware/controllers/hardware/cameras/gphoto2/profile_helpers.py b/openscan_firmware/controllers/hardware/cameras/gphoto2/profile_helpers.py new file mode 100644 index 0000000..634466b --- /dev/null +++ b/openscan_firmware/controllers/hardware/cameras/gphoto2/profile_helpers.py @@ -0,0 +1,121 @@ +"""Reusable helpers for GPhoto2 profile implementations.""" + +from __future__ import annotations + +import time +from fractions import Fraction +from typing import Any + + +def format_shutter_value_ms(shutter_ms: float) -> str: + seconds = max(shutter_ms / 1000.0, 0.000125) + if seconds >= 1.0: + return f"{seconds:.1f}".rstrip("0").rstrip(".") + reciprocal = round(1.0 / seconds) + return f"1/{max(reciprocal, 1)}" + + +def parse_shutter_choice_seconds(value: str) -> float | None: + normalized = value.strip().lower() + if not normalized or normalized == "bulb": + return None + if "/" in normalized: + try: + return float(Fraction(normalized)) + except Exception: + return None + try: + return float(normalized) + except Exception: + return None + + +def select_best_shutter_choice(shutter_ms: float, available_choices: list[Any]) -> str: + target_seconds = max(shutter_ms / 1000.0, 0.000125) + if not available_choices: + return format_shutter_value_ms(shutter_ms) + + best_choice: str | None = None + best_error = float("inf") + for choice in available_choices: + parsed_seconds = parse_shutter_choice_seconds(str(choice)) + if parsed_seconds is None: + continue + error = abs(parsed_seconds - target_seconds) + if error < best_error: + best_error = error + best_choice = str(choice) + return best_choice or format_shutter_value_ms(shutter_ms) + + +def map_gain_to_iso_choice(gain: float | None, iso_choices: list[int]) -> str | None: + if gain is None: + return None + target = max(float(gain), 0.0) * 100.0 + nearest = min(iso_choices, key=lambda iso: abs(iso - target)) + return str(nearest) + + +def is_raw_filename(name: str, raw_extensions: tuple[str, ...]) -> bool: + return name.lower().endswith(raw_extensions) + + +def pick_raw_choice_from_details(details: dict[str, Any] | None, markers: tuple[str, ...] = ("raw", "nef")) -> str: + if not details: + return "RAW" + choices = details.get("choices") or [] + for choice in choices: + text = str(choice).strip().lower() + if any(marker in text for marker in markers): + return str(choice) + return "RAW" + + +def restore_previous_config_value(session, keys: list[str], previous_value: Any | None) -> None: + if previous_value is None: + return + session.write_first_config(keys, previous_value) + + +def capture_with_route_fallbacks( + session, + routes: list[dict[str, str]], + capture_route_applier, + raw_filename_checker, + attempts_per_route: int = 3, + retry_delay_step_s: float = 0.15, +) -> tuple[bytes, dict[str, Any], dict[str, Any]]: + """Capture and try fallback routes until a RAW filename is observed.""" + capture_name = "" + last_error: Exception | None = None + + for route_index, route in enumerate(routes): + capture_route_applier(session, route) + for attempt in range(1, attempts_per_route + 1): + try: + content, extra = session.capture_image() + except Exception as exc: + last_error = exc + if attempt < attempts_per_route: + time.sleep(retry_delay_step_s * attempt) + continue + break + + capture_name = str(extra.get("capture_name", "")).lower() + if raw_filename_checker(capture_name): + diagnostics = { + "capture_route_index": route_index, + "capture_route": route, + "capture_attempt": attempt, + } + return content, extra, diagnostics + + if attempt < attempts_per_route: + time.sleep(retry_delay_step_s * attempt) + + if last_error is not None: + raise RuntimeError(f"All RAW capture routes failed: {last_error}") from last_error + raise RuntimeError( + "Camera returned a non-RAW file while RAW was requested " + f"(last capture_name='{capture_name or 'unknown'}')." + ) diff --git a/openscan_firmware/controllers/hardware/cameras/gphoto2/profile_registry.py b/openscan_firmware/controllers/hardware/cameras/gphoto2/profile_registry.py index 83cfd8e..0cdf409 100644 --- a/openscan_firmware/controllers/hardware/cameras/gphoto2/profile_registry.py +++ b/openscan_firmware/controllers/hardware/cameras/gphoto2/profile_registry.py @@ -2,13 +2,63 @@ from __future__ import annotations +import importlib +import inspect +import logging +import pkgutil + from .profile import CameraIdentity, GPhoto2Profile -from .profiles import CanonEOS700DProfile, GenericGPhoto2Profile +from .profiles.generic import GenericGPhoto2Profile + +logger = logging.getLogger(__name__) + + +def _iter_profile_classes() -> list[type[GPhoto2Profile]]: + profile_classes: list[type[GPhoto2Profile]] = [] + + # Import every module in the profiles package so new profile files are + # discovered automatically without manual registry edits. + import openscan_firmware.controllers.hardware.cameras.gphoto2.profiles as profiles_package + + for module_info in pkgutil.iter_modules(profiles_package.__path__, profiles_package.__name__ + "."): + try: + module = importlib.import_module(module_info.name) + except Exception: + logger.exception("Failed to import gphoto2 profile module '%s'.", module_info.name) + continue + + for _, class_obj in inspect.getmembers(module, inspect.isclass): + if not issubclass(class_obj, GPhoto2Profile): + continue + if class_obj is GPhoto2Profile: + continue + if not getattr(class_obj, "register_in_registry", True): + continue + if class_obj in profile_classes: + continue + profile_classes.append(class_obj) + + return _sorted_profile_classes(profile_classes) + + +def _sorted_profile_classes(profile_classes: list[type[GPhoto2Profile]]) -> list[type[GPhoto2Profile]]: + # Keep generic as final fallback regardless of module filename ordering. + generic_classes: list[type[GPhoto2Profile]] = [] + specific_classes: list[type[GPhoto2Profile]] = [] + + for profile_class in profile_classes: + if profile_class is GenericGPhoto2Profile or getattr(profile_class, "profile_id", "") == "generic": + generic_classes.append(profile_class) + else: + specific_classes.append(profile_class) + + specific_classes.sort(key=lambda cls: f"{cls.__module__}.{cls.__name__}") + if not generic_classes: + generic_classes = [GenericGPhoto2Profile] + return specific_classes + generic_classes + -_PROFILE_CLASSES: list[type[GPhoto2Profile]] = [ - CanonEOS700DProfile, - GenericGPhoto2Profile, -] +_PROFILE_CLASSES: list[type[GPhoto2Profile]] = _iter_profile_classes() def get_profile_for_identity(identity: CameraIdentity) -> GPhoto2Profile: diff --git a/openscan_firmware/controllers/hardware/cameras/gphoto2/profiles/__init__.py b/openscan_firmware/controllers/hardware/cameras/gphoto2/profiles/__init__.py index 8a20eb6..4da7bcf 100644 --- a/openscan_firmware/controllers/hardware/cameras/gphoto2/profiles/__init__.py +++ b/openscan_firmware/controllers/hardware/cameras/gphoto2/profiles/__init__.py @@ -2,5 +2,6 @@ from .canon_eos_700d import CanonEOS700DProfile from .generic import GenericGPhoto2Profile +from .nikon_d7100 import NikonD7100Profile -__all__ = ["CanonEOS700DProfile", "GenericGPhoto2Profile"] +__all__ = ["CanonEOS700DProfile", "NikonD7100Profile", "GenericGPhoto2Profile"] diff --git a/openscan_firmware/controllers/hardware/cameras/gphoto2/profiles/canon_eos_700d.py b/openscan_firmware/controllers/hardware/cameras/gphoto2/profiles/canon_eos_700d.py index 72ae186..b666c15 100644 --- a/openscan_firmware/controllers/hardware/cameras/gphoto2/profiles/canon_eos_700d.py +++ b/openscan_firmware/controllers/hardware/cameras/gphoto2/profiles/canon_eos_700d.py @@ -7,6 +7,7 @@ from openscan_firmware.config.camera import CameraSettings from ..profile import CameraIdentity +from ..profile_helpers import is_raw_filename, map_gain_to_iso_choice, restore_previous_config_value from .generic import GenericGPhoto2Profile logger = logging.getLogger(__name__) @@ -32,9 +33,9 @@ def matches(self, identity: CameraIdentity) -> bool: def apply_startup_config(self, session, settings: CameraSettings) -> None: # For tethered capture on EOS 700D we prefer Internal RAM. - session.set_first_config_value(self._CAPTURE_TARGET_KEYS, "Internal RAM") - session.set_first_config_value(self._EXPOSURE_MODE_KEYS, "Manual") - session.set_first_config_value(self._FOCUS_MODE_KEYS, "One Shot") + self._set_first(session, self._CAPTURE_TARGET_KEYS, "Internal RAM") + self._set_first(session, self._EXPOSURE_MODE_KEYS, "Manual") + self._set_first(session, self._FOCUS_MODE_KEYS, "One Shot") self.apply_settings(session, settings) def apply_settings(self, session, settings: CameraSettings) -> None: @@ -42,7 +43,7 @@ def apply_settings(self, session, settings: CameraSettings) -> None: iso_value = _map_gain_to_iso_choice(settings.gain) if iso_value is not None: - applied = session.set_first_config_value(self._ISO_KEYS, iso_value) + applied = self._set_first(session, self._ISO_KEYS, iso_value) if not applied: logger.debug("ISO mapping unsupported on this EOS 700D config tree.") @@ -50,26 +51,36 @@ def supports_dng(self) -> bool: return True def capture_dng(self, session): - previous = session.get_first_config_details(self._DNG_KEYS) + previous = self._get_first_details(session, self._DNG_KEYS) previous_value = None if previous is None else previous.get("value") - session.set_first_config_value(self._DNG_KEYS, "RAW") + write_result = session.write_first_config(self._DNG_KEYS, "RAW") + if not write_result.success: + raise RuntimeError( + "Could not set Canon RAW mode " + f"(requested='RAW', attempted_keys={write_result.attempted_keys}, " + f"error={write_result.error!r})." + ) try: - import gphoto2 as gp - - return session.capture_image(gp_file_type=gp.GP_FILE_TYPE_RAW) - except Exception: - logger.debug("RAW file capture path failed; falling back to normal file type.", exc_info=True) - return session.capture_image() + # EOS 700D is more stable with normal file download after forcing + # imageformat=RAW than with GP_FILE_TYPE_RAW. + content, extra = session.capture_image() + capture_name = str(extra.get("capture_name", "")).lower() + if _is_raw_filename(capture_name): + return content, extra + raise RuntimeError( + "Camera returned a non-RAW file while RAW was requested " + f"(capture_name='{capture_name or 'unknown'}')." + ) + except Exception as exc: + raise RuntimeError(f"RAW capture failed on Canon EOS 700D: {exc}") from exc finally: - if previous_value: - session.set_first_config_value(self._DNG_KEYS, previous_value) + restore_previous_config_value(session, self._DNG_KEYS, previous_value) def _map_gain_to_iso_choice(gain: float | None) -> str | None: - if gain is None: - return None # CameraSettings.gain is generic analogue gain; for DSLR map to nearest ISO stop. - target = max(float(gain), 0.0) * 100.0 - iso_choices = [100, 200, 400, 800, 1600, 3200, 6400, 12800] - nearest = min(iso_choices, key=lambda iso: abs(iso - target)) - return str(nearest) + return map_gain_to_iso_choice(gain, [100, 200, 400, 800, 1600, 3200, 6400, 12800]) + + +def _is_raw_filename(name: str) -> bool: + return is_raw_filename(name, (".cr2", ".cr3", ".crw", ".raw", ".dng")) diff --git a/openscan_firmware/controllers/hardware/cameras/gphoto2/profiles/generic.py b/openscan_firmware/controllers/hardware/cameras/gphoto2/profiles/generic.py index 4cb853c..7aece32 100644 --- a/openscan_firmware/controllers/hardware/cameras/gphoto2/profiles/generic.py +++ b/openscan_firmware/controllers/hardware/cameras/gphoto2/profiles/generic.py @@ -3,21 +3,21 @@ from __future__ import annotations import logging -from fractions import Fraction from openscan_firmware.config.camera import CameraSettings from ..profile import CameraIdentity, GPhoto2Profile +from ..profile_helpers import ( + format_shutter_value_ms, + parse_shutter_choice_seconds, + select_best_shutter_choice, +) logger = logging.getLogger(__name__) def _format_shutter_value_ms(shutter_ms: float) -> str: - seconds = max(shutter_ms / 1000.0, 0.000125) - if seconds >= 1.0: - return f"{seconds:.1f}".rstrip("0").rstrip(".") - reciprocal = round(1.0 / seconds) - return f"1/{max(reciprocal, 1)}" + return format_shutter_value_ms(shutter_ms) class GenericGPhoto2Profile(GPhoto2Profile): @@ -43,49 +43,24 @@ def matches(self, identity: CameraIdentity) -> bool: return True def apply_startup_config(self, session, settings: CameraSettings) -> None: - session.set_first_config_value(self._CAPTURE_TARGET_KEYS, "Memory card") + self._set_first(session, self._CAPTURE_TARGET_KEYS, "Memory card") self.apply_settings(session, settings) def apply_settings(self, session, settings: CameraSettings) -> None: if settings.shutter is not None: shutter_str = self._select_best_shutter_choice(session, settings.shutter) - applied = session.set_first_config_value(self._SHUTTER_KEYS, shutter_str) + applied = self._set_first(session, self._SHUTTER_KEYS, shutter_str) if not applied: logger.debug("No generic shutter config key found on camera.") if settings.jpeg_quality is not None and settings.jpeg_quality >= 85: - session.set_first_config_value(self._JPEG_QUALITY_KEYS, "JPEG Fine") + self._set_first(session, self._JPEG_QUALITY_KEYS, "JPEG Fine") def _select_best_shutter_choice(self, session, shutter_ms: float) -> str: - details = session.get_first_config_details(self._SHUTTER_KEYS) - target_seconds = max(shutter_ms / 1000.0, 0.000125) - if not details or not details.get("choices"): - return _format_shutter_value_ms(shutter_ms) - - best = None - best_err = float("inf") - for choice in details["choices"]: - parsed = _parse_shutter_choice_seconds(str(choice)) - if parsed is None: - continue - err = abs(parsed - target_seconds) - if err < best_err: - best_err = err - best = str(choice) - - return best or _format_shutter_value_ms(shutter_ms) + details = self._get_first_details(session, self._SHUTTER_KEYS) + choices = [] if details is None else list(details.get("choices") or []) + return select_best_shutter_choice(shutter_ms, choices) def _parse_shutter_choice_seconds(value: str) -> float | None: - v = value.strip().lower() - if not v or v == "bulb": - return None - if "/" in v: - try: - return float(Fraction(v)) - except Exception: - return None - try: - return float(v) - except Exception: - return None + return parse_shutter_choice_seconds(value) diff --git a/openscan_firmware/controllers/hardware/cameras/gphoto2/profiles/nikon_d7100.py b/openscan_firmware/controllers/hardware/cameras/gphoto2/profiles/nikon_d7100.py new file mode 100644 index 0000000..f725852 --- /dev/null +++ b/openscan_firmware/controllers/hardware/cameras/gphoto2/profiles/nikon_d7100.py @@ -0,0 +1,130 @@ +"""Nikon D7100 specific GPhoto2 profile.""" + +from __future__ import annotations + +import logging +import time + +from openscan_firmware.config.camera import CameraSettings + +from ..profile import CameraIdentity +from ..profile_helpers import ( + capture_with_route_fallbacks, + is_raw_filename, + map_gain_to_iso_choice, + pick_raw_choice_from_details, + restore_previous_config_value, +) +from .generic import GenericGPhoto2Profile + +logger = logging.getLogger(__name__) + + +class NikonD7100Profile(GenericGPhoto2Profile): + """Nikon D7100 tuning on top of generic DSLR behavior.""" + + profile_id = "nikon_d7100" + + _MODEL_MARKERS = ("nikon dsc d7100", "nikon d7100") + _CAPTURE_TARGET_KEYS = ["/main/settings/capturetarget", "capturetarget"] + _RECORDING_MEDIA_KEYS = ["/main/settings/recordingmedia", "recordingmedia"] + _APPLICATION_MODE_KEYS = ["/main/other/applicationmode", "applicationmode"] + _SHUTTER_KEYS = ["/main/capturesettings/shutterspeed", "/main/settings/shutterspeed", "shutterspeed"] + _JPEG_QUALITY_KEYS = ["/main/imgsettings/imagequality", "/main/imgsettings/imageformat", "imagequality", "imageformat"] + _DNG_KEYS = ["/main/imgsettings/imagequality", "/main/imgsettings/imageformat", "imagequality", "imageformat"] + _ISO_KEYS = ["/main/imgsettings/iso", "/main/capturesettings/iso", "iso"] + + def matches(self, identity: CameraIdentity) -> bool: + model = (identity.model or "").strip().lower() + return any(marker in model for marker in self._MODEL_MARKERS) + + def apply_startup_config(self, session, settings: CameraSettings) -> None: + # Keep startup conservative and prefer the camera's normal card-backed routing. + super().apply_startup_config(session, settings) + self._set_first(session, self._CAPTURE_TARGET_KEYS, "Memory card") + self._set_first(session, self._RECORDING_MEDIA_KEYS, "Card") + + def apply_settings(self, session, settings: CameraSettings) -> None: + super().apply_settings(session, settings) + iso_value = _map_gain_to_iso_choice(settings.gain) + if iso_value is not None: + applied = self._set_first(session, self._ISO_KEYS, iso_value) + if not applied: + logger.debug("ISO mapping unsupported on Nikon D7100 config tree.") + + def supports_dng(self) -> bool: + return True + + def capture_dng(self, session): + previous = self._get_first_details(session, self._DNG_KEYS) + previous_value = None if previous is None else previous.get("value") + + raw_choice = _pick_nikon_raw_choice(previous) + write_result = session.write_first_config(self._DNG_KEYS, raw_choice) + if not write_result.success: + raise RuntimeError( + "Could not set Nikon RAW mode " + f"(requested choice='{raw_choice}', attempted_keys={write_result.attempted_keys}, " + f"error={write_result.error!r})." + ) + + try: + # Nikon bodies can need a short settling delay after mode switch. + time.sleep(0.12) + content, extra, diagnostics = capture_with_route_fallbacks( + session=session, + routes=_capture_routes(), + capture_route_applier=_apply_capture_route, + raw_filename_checker=_is_raw_filename, + ) + extra.update(diagnostics) + return content, extra + except Exception as exc: + logger.exception("RAW capture failed in Nikon D7100 profile.") + raise RuntimeError(f"RAW capture failed on Nikon D7100: {exc}") from exc + finally: + restore_previous_config_value(session, self._DNG_KEYS, previous_value) + + +def _pick_nikon_raw_choice(details: dict | None) -> str: + return pick_raw_choice_from_details(details, markers=("raw", "nef")) + + +def _capture_routes() -> list[dict[str, str]]: + # Try the camera's current routing first, then explicit card-backed capture, + # and only fall back to the older remote/RAM mode last. + return [ + {}, + { + "capturetarget": "Memory card", + "recordingmedia": "Card", + "applicationmode": "Application Mode 0", + }, + { + "capturetarget": "Internal RAM", + "recordingmedia": "SDRAM", + "applicationmode": "Application Mode 1", + }, + ] + + +def _apply_capture_route(session, route: dict[str, str]) -> None: + capturetarget = route.get("capturetarget") + if capturetarget: + session.write_first_config(NikonD7100Profile._CAPTURE_TARGET_KEYS, capturetarget) + + recordingmedia = route.get("recordingmedia") + if recordingmedia: + session.write_first_config(NikonD7100Profile._RECORDING_MEDIA_KEYS, recordingmedia) + + applicationmode = route.get("applicationmode") + if applicationmode: + session.write_first_config(NikonD7100Profile._APPLICATION_MODE_KEYS, applicationmode) + + +def _map_gain_to_iso_choice(gain: float | None) -> str | None: + return map_gain_to_iso_choice(gain, [100, 200, 400, 800, 1600, 3200, 6400]) + + +def _is_raw_filename(name: str) -> bool: + return is_raw_filename(name, (".nef", ".nrw", ".raw", ".dng", ".tif", ".tiff")) diff --git a/openscan_firmware/controllers/hardware/cameras/gphoto2/profiles/template_camera.py b/openscan_firmware/controllers/hardware/cameras/gphoto2/profiles/template_camera.py new file mode 100644 index 0000000..b334cb3 --- /dev/null +++ b/openscan_firmware/controllers/hardware/cameras/gphoto2/profiles/template_camera.py @@ -0,0 +1,60 @@ +"""Template profile for adding a new gphoto2-compatible camera.""" + +from __future__ import annotations + +from openscan_firmware.config.camera import CameraSettings + +from ..profile import CameraIdentity +from ..profile_helpers import map_gain_to_iso_choice, restore_previous_config_value +from .generic import GenericGPhoto2Profile + + +class TemplateCameraProfile(GenericGPhoto2Profile): + """Copy this class and replace values for your own camera model.""" + + profile_id = "template_camera" + register_in_registry = False + + # 1) Model markers: use lowercase fragments from `gphoto2 --auto-detect`. + _MODEL_MARKERS = ("replace with model marker",) + + # 2) Key lists: inspect keys with `gphoto2 --list-config`. + _CAPTURE_TARGET_KEYS = ["/main/settings/capturetarget", "capturetarget"] + _SHUTTER_KEYS = ["/main/capturesettings/shutterspeed", "shutterspeed"] + _JPEG_QUALITY_KEYS = ["/main/imgsettings/imagequality", "imagequality"] + _ISO_KEYS = ["/main/imgsettings/iso", "iso"] + _RAW_FORMAT_KEYS = ["/main/imgsettings/imageformat", "imageformat"] + + def matches(self, identity: CameraIdentity) -> bool: + model = (identity.model or "").strip().lower() + return any(marker in model for marker in self._MODEL_MARKERS) + + def apply_startup_config(self, session, settings: CameraSettings) -> None: + # 3) Startup defaults: configure stable tethered behavior first. + self._set_first(session, self._CAPTURE_TARGET_KEYS, "Memory card") + self.apply_settings(session, settings) + + def apply_settings(self, session, settings: CameraSettings) -> None: + # 4) Runtime settings: keep mapping logic explicit and readable. + super().apply_settings(session, settings) + iso_value = map_gain_to_iso_choice(settings.gain, [100, 200, 400, 800, 1600, 3200]) + if iso_value is not None: + self._set_first(session, self._ISO_KEYS, iso_value) + + def supports_dng(self) -> bool: + return True + + def capture_dng(self, session): + # 5) RAW capture: only override if generic capture is not enough. + previous = self._get_first_details(session, self._RAW_FORMAT_KEYS) + previous_value = None if previous is None else previous.get("value") + write_result = session.write_first_config(self._RAW_FORMAT_KEYS, "RAW") + if not write_result.success: + raise RuntimeError( + f"Could not set RAW mode (attempted_keys={write_result.attempted_keys}, " + f"error={write_result.error!r})." + ) + try: + return session.capture_image() + finally: + restore_previous_config_value(session, self._RAW_FORMAT_KEYS, previous_value) diff --git a/openscan_firmware/controllers/hardware/cameras/gphoto2/session.py b/openscan_firmware/controllers/hardware/cameras/gphoto2/session.py index cfbeea8..bdaa4ca 100644 --- a/openscan_firmware/controllers/hardware/cameras/gphoto2/session.py +++ b/openscan_firmware/controllers/hardware/cameras/gphoto2/session.py @@ -4,6 +4,7 @@ import logging import time +from dataclasses import dataclass, field from typing import Any import gphoto2 as gp @@ -13,6 +14,29 @@ logger = logging.getLogger(__name__) +@dataclass(frozen=True) +class ConfigReadResult: + requested_key: str + matched_key: str | None + success: bool + value: Any | None = None + details: dict[str, Any] | None = None + choices: list[Any] = field(default_factory=list) + error: str | None = None + attempted_keys: list[str] = field(default_factory=list) + + +@dataclass(frozen=True) +class ConfigWriteResult: + requested_key: str + requested_value: Any + matched_key: str | None + actual_value: Any | None + success: bool + error: str | None = None + attempted_keys: list[str] = field(default_factory=list) + + class GPhoto2Session: """Manage a gphoto2 camera session for one physical device.""" @@ -85,7 +109,20 @@ def capture_preview(self) -> bytes: def capture_image(self, gp_file_type: int = gp.GP_FILE_TYPE_NORMAL) -> tuple[bytes, dict[str, Any]]: camera = self.ensure_connected() - file_path = camera.capture(gp.GP_CAPTURE_IMAGE) + try: + file_path = camera.capture(gp.GP_CAPTURE_IMAGE) + except Exception as exc: + message = str(exc) + # Nikon (and some other DSLRs) can fail with "Unspecified error" + # on camera.capture(), but succeed via trigger + event polling. + if "Unspecified error" in message or "[-1]" in message: + logger.debug( + "camera.capture failed with '%s'; trying trigger-capture fallback.", + message, + ) + file_path = self._trigger_capture_and_wait_for_file(camera) + else: + raise camera_file = camera.file_get(file_path.folder, file_path.name, gp_file_type) payload = bytes(camera_file.get_data_and_size()) metadata = { @@ -95,31 +132,82 @@ def capture_image(self, gp_file_type: int = gp.GP_FILE_TYPE_NORMAL) -> tuple[byt } return payload, metadata - def set_config_value(self, key: str, value: Any) -> bool: + def trigger_capture_and_wait_for_file(self, timeout_s: float = 12.0): + camera = self.ensure_connected() + return self._trigger_capture_and_wait_for_file(camera, timeout_s=timeout_s) + + def _trigger_capture_and_wait_for_file(self, camera: Any, timeout_s: float = 12.0): + start = time.monotonic() + + if hasattr(camera, "trigger_capture"): + camera.trigger_capture() + else: + gp.gp_camera_trigger_capture(camera) + + while time.monotonic() - start < timeout_s: + event_type, event_data = camera.wait_for_event(1000) + if event_type == gp.GP_EVENT_FILE_ADDED and event_data is not None: + return event_data + if event_type == gp.GP_EVENT_TIMEOUT: + continue + if event_type == gp.GP_EVENT_UNKNOWN: + continue + + raise RuntimeError("Trigger-capture fallback timed out waiting for GP_EVENT_FILE_ADDED.") + + def write_config(self, key: str, value: Any) -> ConfigWriteResult: camera = self.ensure_connected() - config = self._get_config_with_retry(camera, key_context=key) + config, config_error = self._get_config_with_retry(camera, key_context=key) if config is None: - return False + return ConfigWriteResult( + requested_key=key, + requested_value=value, + matched_key=None, + actual_value=None, + success=False, + error=config_error or f"Failed to read config tree for key '{key}'.", + attempted_keys=[key], + ) + child = self._find_widget(config, key) if child is None: - return False - # Normalize enum-like choices to avoid trivial casing mismatches. - choices = self._extract_choices(child) - if choices: - selected = self._match_choice(choices, value) - else: - selected = value + return ConfigWriteResult( + requested_key=key, + requested_value=value, + matched_key=None, + actual_value=None, + success=False, + error=f"Config key '{key}' was not found.", + attempted_keys=[key], + ) + choices = self._extract_choices(child) + selected = self._match_choice(choices, value) if choices else value current = self._safe_call(child, "get_value") if current is not None and str(current) == str(selected): - return True + return ConfigWriteResult( + requested_key=key, + requested_value=value, + matched_key=key, + actual_value=current, + success=True, + attempted_keys=[key], + ) try: child.set_value(selected) camera.set_config(config) except Exception as exc: logger.debug("Setting config '%s' to '%s' failed: %s", key, selected, exc) - return False + return ConfigWriteResult( + requested_key=key, + requested_value=value, + matched_key=key, + actual_value=current, + success=False, + error=f"Writing key '{key}' failed: {exc}", + attempted_keys=[key], + ) verified = self._safe_call(child, "get_value") if verified is not None and str(verified) != str(selected): @@ -129,26 +217,91 @@ def set_config_value(self, key: str, value: Any) -> bool: selected, verified, ) - return False - return True + return ConfigWriteResult( + requested_key=key, + requested_value=value, + matched_key=key, + actual_value=verified, + success=False, + error=( + f"Config key '{key}' did not persist expected value " + f"(requested={selected} actual={verified})." + ), + attempted_keys=[key], + ) - def set_first_config_value(self, keys: list[str], value: Any) -> bool: + return ConfigWriteResult( + requested_key=key, + requested_value=value, + matched_key=key, + actual_value=verified if verified is not None else selected, + success=True, + attempted_keys=[key], + ) + + def write_first_config(self, keys: list[str], value: Any) -> ConfigWriteResult: + last_result: ConfigWriteResult | None = None for key in keys: try: - if self.set_config_value(key, value): - return True - except Exception: + result = self.write_config(key, value) + except Exception as exc: logger.debug("Setting config '%s' failed.", key, exc_info=True) - return False - - def get_config_details(self, key: str) -> dict[str, Any] | None: + result = ConfigWriteResult( + requested_key=key, + requested_value=value, + matched_key=None, + actual_value=None, + success=False, + error=f"Writing key '{key}' raised an exception: {exc}", + attempted_keys=[key], + ) + if result.success: + return ConfigWriteResult( + requested_key=keys[0] if keys else key, + requested_value=value, + matched_key=result.matched_key, + actual_value=result.actual_value, + success=True, + attempted_keys=list(keys), + ) + last_result = result + + error = ( + last_result.error + if last_result is not None and last_result.error + else "No provided config key accepted the requested value." + ) + return ConfigWriteResult( + requested_key=keys[0] if keys else "", + requested_value=value, + matched_key=None, + actual_value=None, + success=False, + error=error, + attempted_keys=list(keys), + ) + + def read_config(self, key: str) -> ConfigReadResult: camera = self.ensure_connected() - config = self._get_config_with_retry(camera, key_context=key) + config, config_error = self._get_config_with_retry(camera, key_context=key) if config is None: - return None + return ConfigReadResult( + requested_key=key, + matched_key=None, + success=False, + error=config_error or f"Failed to read config tree for key '{key}'.", + attempted_keys=[key], + ) + child = self._find_widget(config, key) if child is None: - return None + return ConfigReadResult( + requested_key=key, + matched_key=None, + success=False, + error=f"Config key '{key}' was not found.", + attempted_keys=[key], + ) details: dict[str, Any] = { "key": key, @@ -159,32 +312,70 @@ def get_config_details(self, key: str) -> dict[str, Any] | None: "value": self._safe_call(child, "get_value"), "choices": self._extract_choices(child), } - return details - - def get_first_config_details(self, keys: list[str]) -> dict[str, Any] | None: + return ConfigReadResult( + requested_key=key, + matched_key=key, + success=True, + value=details.get("value"), + details=details, + choices=list(details.get("choices") or []), + attempted_keys=[key], + ) + + def read_first_config(self, keys: list[str]) -> ConfigReadResult: + last_result: ConfigReadResult | None = None for key in keys: try: - details = self.get_config_details(key) - except Exception: + result = self.read_config(key) + except Exception as exc: logger.debug("Reading config '%s' failed.", key) - continue - if details is not None: - return details - return None - - def _get_config_with_retry(self, camera: Any, key_context: str) -> Any | None: + result = ConfigReadResult( + requested_key=key, + matched_key=None, + success=False, + error=f"Reading key '{key}' raised an exception: {exc}", + attempted_keys=[key], + ) + if result.success: + return ConfigReadResult( + requested_key=keys[0] if keys else key, + matched_key=result.matched_key, + success=True, + value=result.value, + details=result.details, + choices=result.choices, + attempted_keys=list(keys), + ) + last_result = result + + error = ( + last_result.error + if last_result is not None and last_result.error + else "None of the provided config keys were readable." + ) + return ConfigReadResult( + requested_key=keys[0] if keys else "", + matched_key=None, + success=False, + error=error, + attempted_keys=list(keys), + ) + + def _get_config_with_retry(self, camera: Any, key_context: str) -> tuple[Any | None, str | None]: + last_error: str | None = None for attempt in range(self._io_retry_attempts): try: - return camera.get_config() + return camera.get_config(), None except Exception as exc: message = str(exc) is_io_in_progress = "I/O in progress" in message or "[-110]" in message if is_io_in_progress and attempt < self._io_retry_attempts - 1: time.sleep(self._io_retry_delay_s) continue - logger.debug("Reading config '%s' failed: %s", key_context, exc) - return None - return None + last_error = f"Reading config '{key_context}' failed: {exc}" + logger.debug(last_error) + return None, last_error + return None, last_error or f"Reading config '{key_context}' failed." @staticmethod def _find_widget(config_root: Any, key: str) -> Any | None: diff --git a/openscan_firmware/controllers/services/cloud.py b/openscan_firmware/controllers/services/cloud.py index 28230b4..fb3ed48 100644 --- a/openscan_firmware/controllers/services/cloud.py +++ b/openscan_firmware/controllers/services/cloud.py @@ -23,7 +23,7 @@ REQUEST_TIMEOUT = 60 ALLOWED_PHOTO_SUFFIXES = {".jpg", ".jpeg"} -UNSUPPORTED_PHOTO_SUFFIXES = {".png", ".dng", ".npy"} +UNSUPPORTED_PHOTO_SUFFIXES = {".png", ".dng", ".raw", ".cr2", ".cr3", ".crw", ".npy"} class CloudServiceError(RuntimeError): @@ -484,4 +484,4 @@ def _count_project_photos(project: Project) -> int: def _iter_chunks(file_obj: BinaryIO, chunk_size: int) -> Iterator[io.BytesIO]: file_obj.seek(0) while chunk := file_obj.read(chunk_size): - yield io.BytesIO(chunk) \ No newline at end of file + yield io.BytesIO(chunk) diff --git a/openscan_firmware/controllers/services/projects.py b/openscan_firmware/controllers/services/projects.py index a7488ce..6c7937a 100644 --- a/openscan_firmware/controllers/services/projects.py +++ b/openscan_firmware/controllers/services/projects.py @@ -191,6 +191,7 @@ async def _save_photo_async(photo_data: PhotoData, photo_path: str) -> str: handlers = { "jpeg": (_save_photo_jpeg, ".jpg"), "dng": (_save_photo_dng, ".dng"), + "raw": (_save_photo_dng, _raw_extension_from_metadata(photo_data)), "rgb_array": (_save_photo_rgb, ".npy"), "yuv_array": (_save_photo_yuv, ".npy"), } @@ -205,6 +206,15 @@ async def _save_photo_async(photo_data: PhotoData, photo_path: str) -> str: logger.info("Saved %s to %s", photo_data.format, final_path) return final_path + +def _raw_extension_from_metadata(photo_data: PhotoData) -> str: + raw_metadata = photo_data.camera_metadata.raw_metadata if photo_data.camera_metadata else {} + capture_name = str(raw_metadata.get("capture_name", "")).lower() + for ext in (".cr2", ".cr3", ".crw", ".dng", ".raw"): + if capture_name.endswith(ext): + return ext + return ".raw" + async def _save_photo_jpeg(photo_data: PhotoData, file_path: str): """Save a JPEG photo to a file. @@ -879,4 +889,4 @@ def get_project_manager(path: Optional[pathlib.PurePath] = None) -> ProjectManag raise RuntimeError( "ProjectManager is already initialized with a different path. " f"Current: '{current_manager_path}', Requested: '{resolved_path}'" - ) \ No newline at end of file + ) diff --git a/openscan_firmware/models/camera.py b/openscan_firmware/models/camera.py index 77b7d22..a22da4c 100644 --- a/openscan_firmware/models/camera.py +++ b/openscan_firmware/models/camera.py @@ -39,7 +39,7 @@ class PhotoData(BaseModel): ..., description="Image data (JPEG/DNG) or as numpy array" ) - format: Literal['jpeg','dng','rgb_array', 'yuv_array'] + format: Literal['jpeg', 'raw', 'dng', 'rgb_array', 'yuv_array'] camera_metadata: CameraMetadata scan_metadata: Optional[ScanMetadata] = None diff --git a/openscan_firmware/routers/next/cameras.py b/openscan_firmware/routers/next/cameras.py index 51ffebf..e68b59c 100644 --- a/openscan_firmware/routers/next/cameras.py +++ b/openscan_firmware/routers/next/cameras.py @@ -1,9 +1,11 @@ import asyncio import io +import logging import time from dataclasses import dataclass from threading import Lock from typing import Literal, Optional +from urllib.parse import quote, urlsplit, urlunsplit from uuid import uuid4 import numpy as np @@ -28,7 +30,9 @@ responses={404: {"description": "Not found"}}, ) -PhotoFormat = Literal["jpeg", "dng", "rgb_array", "yuv_array"] +logger = logging.getLogger(__name__) + +PhotoFormat = Literal["jpeg", "raw", "dng", "rgb_array", "yuv_array"] _PAYLOAD_TTL_SECONDS = 90 _MAX_PAYLOAD_CACHE_ENTRIES = 8 _MAX_PAYLOAD_CACHE_BYTES = 256 * 1024 * 1024 @@ -90,16 +94,15 @@ def _serialize_photo_payload(photo: PhotoData) -> tuple[bytes, str, str]: if photo.format == "jpeg": media_type = "image/jpeg" filename = "photo.jpg" - elif photo.format == "dng": - media_type = "image/x-adobe-dng" - filename = "photo.dng" + elif photo.format in ("raw", "dng"): + media_type, filename = _infer_raw_file_info(photo) elif photo.format in ("rgb_array", "yuv_array"): media_type = "application/x-npy" filename = f"photo_{photo.format}.npy" else: raise ValueError(f"Unsupported photo format: {photo.format}") - if photo.format in ("jpeg", "dng"): + if photo.format in ("jpeg", "raw", "dng"): if isinstance(photo.data, io.BytesIO): content = photo.data.getvalue() elif isinstance(photo.data, (bytes, bytearray)): @@ -119,6 +122,28 @@ def _serialize_photo_payload(photo: PhotoData) -> tuple[bytes, str, str]: return content, media_type, filename +def _infer_raw_file_info(photo: PhotoData) -> tuple[str, str]: + raw_metadata = photo.camera_metadata.raw_metadata if photo.camera_metadata else {} + capture_name = str(raw_metadata.get("capture_name", "")).lower() + + if capture_name.endswith(".cr2"): + return "image/x-canon-cr2", "photo.cr2" + if capture_name.endswith(".cr3"): + return "image/x-canon-cr3", "photo.cr3" + if capture_name.endswith(".crw"): + return "image/x-canon-crw", "photo.crw" + if capture_name.endswith(".dng"): + return "image/x-adobe-dng", "photo.dng" + if capture_name.endswith(".raw"): + return "application/octet-stream", "photo.raw" + + # Legacy fallback for controllers that still report dng without capture_name. + if photo.format == "dng": + return "image/x-adobe-dng", "photo.dng" + + return "application/octet-stream", "photo.raw" + + def _store_photo_payload( camera_name: str, content: bytes, @@ -142,6 +167,12 @@ def _store_photo_payload( return payload_id, _PAYLOAD_TTL_SECONDS +def _encode_url_path(url: str) -> str: + split = urlsplit(url) + encoded_path = quote(split.path, safe="/") + return urlunsplit((split.scheme, split.netloc, encoded_path, split.query, split.fragment)) + + def _get_cached_photo_payload(camera_name: str, payload_id: str) -> _CachedPhotoPayload: now_monotonic = time.monotonic() with _photo_payload_cache_lock: @@ -280,10 +311,13 @@ async def get_photo( try: photo = await controller.photo_async(image_format=image_format) except ValueError as exc: + logger.warning("Photo request failed for camera '%s' (bad request): %s", camera_name, exc) raise HTTPException(status_code=400, detail=str(exc)) from exc except RuntimeError as exc: + logger.warning("Photo request failed for camera '%s' (runtime): %s", camera_name, exc) raise HTTPException(status_code=503, detail=str(exc)) from exc except Exception as exc: + logger.exception("Photo request failed for camera '%s' (unexpected error).", camera_name) raise HTTPException(status_code=500, detail=str(exc)) from exc try: @@ -300,11 +334,13 @@ async def get_photo( media_type=media_type, filename=filename, ) - payload_url = str( - request.url_for( - "get_photo_payload", - camera_name=camera_name, - payload_id=payload_id, + payload_url = _encode_url_path( + str( + request.url_for( + "get_photo_payload", + camera_name=camera_name, + payload_id=payload_id, + ) ) ) return PhotoMetadataResponse( diff --git a/openscan_firmware/routers/next/develop.py b/openscan_firmware/routers/next/develop.py index 4997038..57d9862 100644 --- a/openscan_firmware/routers/next/develop.py +++ b/openscan_firmware/routers/next/develop.py @@ -14,7 +14,9 @@ from fastapi import APIRouter, HTTPException, status, Response, Query from fastapi.responses import PlainTextResponse +from openscan_firmware.controllers.hardware.cameras.camera import get_all_camera_controllers from openscan_firmware.controllers.services.tasks.task_manager import get_task_manager +from openscan_firmware.models.camera import CameraType from openscan_firmware.models.task import TaskStatus, Task from openscan_firmware.models.paths import PolarPoint3D @@ -145,10 +147,52 @@ def _collect_gphoto2_diagnostics() -> dict: "cameras": [], } + gphoto2_controllers = [] + for controller in get_all_camera_controllers().values(): + camera_model = getattr(controller, "camera", None) + if camera_model is None: + continue + if getattr(camera_model, "type", None) != CameraType.GPHOTO2: + continue + gphoto2_controllers.append(controller) + + def _find_active_controller(model: str | None, path: str | None): + for ctrl in gphoto2_controllers: + cam = getattr(ctrl, "camera", None) + if cam is None: + continue + if path and getattr(cam, "path", None) == path: + return ctrl + if model and getattr(cam, "name", None) == model: + return ctrl + return None + cameras: list[dict] = [] for row in rows: model = row.get("model") path = row.get("path") + active_controller = _find_active_controller(model, path) + if active_controller is not None: + get_diag = getattr(active_controller, "get_diagnostics", None) + if callable(get_diag): + try: + cameras.append(get_diag()) + continue + except Exception as exc: + cameras.append( + { + "model": model, + "path": path, + "summary": None, + "about": None, + "config_groups": [], + "relevant_config": [], + "in_use_by_openscan": True, + "error": f"controller diagnostics failed: {exc}", + } + ) + continue + camera_diag = { "model": model, "path": path, @@ -156,6 +200,7 @@ def _collect_gphoto2_diagnostics() -> dict: "about": None, "config_groups": [], "relevant_config": [], + "in_use_by_openscan": False, "error": None, } camera = None diff --git a/tests/routers/test_next_cameras_router.py b/tests/routers/test_next_cameras_router.py index 9bbfe4c..6516f74 100644 --- a/tests/routers/test_next_cameras_router.py +++ b/tests/routers/test_next_cameras_router.py @@ -261,6 +261,58 @@ async def test_get_photo_with_metadata_returns_payload_url_for_dng( assert payload_response.content == b"dng-bytes" +@pytest.mark.asyncio +async def test_get_photo_with_metadata_returns_payload_url_for_raw_cr2( + monkeypatch, + cameras_client, + cameras_router_path, +): + module_path = cameras_router_path("cameras") + photo = _make_photo_data(io.BytesIO(b"raw-bytes"), "raw") + photo.camera_metadata.raw_metadata["capture_name"] = "IMG_0001.CR2" + controller = _FakeCameraController(photo) + monkeypatch.setattr(f"{module_path}.get_camera_controller", lambda _name: controller) + + response = await cameras_client.get( + "/next/cameras/cam0/photo", + params={"image_format": "raw", "with_metadata": "true"}, + ) + + assert response.status_code == 200 + payload = response.json() + assert payload["format"] == "raw" + assert payload["media_type"] == "image/x-canon-cr2" + assert payload["filename"] == "photo.cr2" + assert controller.requested_formats == ["raw"] + + payload_response = await cameras_client.get(payload["payload_url"]) + assert payload_response.status_code == 200 + assert payload_response.headers["content-type"] == "image/x-canon-cr2" + assert payload_response.content == b"raw-bytes" + + +@pytest.mark.asyncio +async def test_get_photo_with_metadata_encodes_camera_name_in_payload_url( + monkeypatch, + cameras_client, + cameras_router_path, +): + module_path = cameras_router_path("cameras") + photo = _make_photo_data(io.BytesIO(b"jpeg-bytes"), "jpeg") + controller = _FakeCameraController(photo) + monkeypatch.setattr(f"{module_path}.get_camera_controller", lambda _name: controller) + + response = await cameras_client.get( + "/next/cameras/Canon%20EOS%20700D/photo", + params={"image_format": "jpeg", "with_metadata": "true"}, + ) + + assert response.status_code == 200 + payload = response.json() + assert "Canon%20EOS%20700D" in payload["payload_url"] + assert "Canon EOS 700D" not in payload["payload_url"] + + @pytest.mark.asyncio async def test_get_photo_rgb_array_returns_npy_payload(monkeypatch, cameras_client, cameras_router_path): module_path = cameras_router_path("cameras")