diff --git a/docs/source/03_contributing/01_contributing_guidelines/01_contributing_guidelines.rst b/docs/source/03_contributing/01_contributing_guidelines/01_contributing_guidelines.rst index 6163fd334..afb133d7a 100644 --- a/docs/source/03_contributing/01_contributing_guidelines/01_contributing_guidelines.rst +++ b/docs/source/03_contributing/01_contributing_guidelines/01_contributing_guidelines.rst @@ -1,12 +1,32 @@ .. _contributing_guidelines: -======================= +************************ Contributing Guidelines +************************ + +.. contents:: Table of Contents + :depth: 3 + +General Overview ======================= -We welcome contributions in the form of bug reports, bug fixes, new features and documentation. If you are contributing code, please create it in a fork and branch separate from the main ``develop`` branch and then make a pull request to the ``develop`` branch for code review. Some best practices for new code are outlined below. +We welcome contributions in the form of bug reports, bug fixes, new features and documentation. Some best practices for contributing code are outlined below. If you are considering refactoring the code, please reach out to us prior to starting this process. -If you are considering refactoring part of the code, please reach out to us prior to starting this process. We are happy to invite you to our regular software development meeting. +------------------- + +Project Philosophy +================== + +**navigate** is designed with the following principles in mind: + +* Prioritize standard library imports for maximum stability, and minimize external dependencies. +* Abstraction layer to drive different camera types, etc. +* Plugin architecture for extensibility. +* Maximize productivity for biological users through robust graphical user interface-based workflows. +* Performant and responsive. +* Brutally obvious, well-documented, clean code organized in an industry standard Model-View-Controller architecture. + +We ask that all contributions adhere to these principles. ------------------- @@ -17,45 +37,64 @@ General Principles - Please do not create new configuration variables unless absolutely necessary, especially in the ``configuration.yaml`` and ``experiment.yaml`` files. A new variable is necessary only if no variable stores similar information or there is no way to use the most similar variable without disrupting another part of the code base. - We are happy to discuss code refactors for improved clarity and speed. However, please do not modify something that is already working without discussing this with the software team in advance. - All code that modifies microscope control behavior must be reviewed and tested on a live system prior to merging into the ``develop`` branch. +- Scientific Units - Please express quantities in the following units when they are in the standard model/view/controller code. Deviations from this can occur where it is necessary to pass a different unit to a piece of hardware. -------------------- + * Time - Milliseconds + * Distance - Micrometers + * Voltage - Volts + * Rotation - Degrees -Coding Style -============ - -We follow the `PEP8 code style guide `_. All class names are written in ``CamelCase`` and all variable names are ``lowercase_and_separated_by_underscores``. ------------------- -Communicating with Hardware -=========================== +Getting Started +============== -In handling hardware devices, such as Sutter's MP-285A stage, using threads can introduce complexities, especially when simultaneous read and write operations occur over a shared resource like a serial line. An encountered issue demonstrated the challenges when two different threads attempted to write to and read from the same serial port simultaneously. This action led to data corruption due to interleaving of read/write calls that require precise handshaking, characteristic of the MP-285A's communication protocol. The solution involved implementing a blocking mechanism using `threading.Event()` to ensure that operations on the serial port do not overlap, showcasing the difficulties of multithreading sequential processes. To mitigate such issues, a design where each hardware device operates within its own dedicated thread is advisable. This approach simplifies the management of device communications by enforcing sequential execution, eliminating the need to handle complex concurrency issues inherent in multithreading environments. This strategy ensures robust and error-free interaction with hardware devices. +1. **Fork and Clone**: Fork the repository and clone it locally +2. **Set Up Environment**: `pip install -e .[dev]` to install in development mode +3. **Install Pre-commit Hooks**: `pre-commit install` to set up linting hooks +4. **Run Tests**: Ensure `pytest` passes before making changes ------------------- -Documentation -============= +Pull Request Process +=================== + +1. Create a branch from `develop` with a descriptive name +2. Make your changes following the guidelines in this document +3. Add tests for new functionality +4. Ensure all tests pass and linting is clean +5. Update documentation as needed +6. Submit PR to the `develop` branch with clear description of changes -We use `Sphinx `_ to generate documentation from documented methods, attributes, and classes. Please document all new methods, attributes, and classes using a Sphinx compatible version of `Numpydoc `_. ------------------- -Scientific Units -================ +Coding Style +============ + +Naming Conventions +------------------ +We follow the `PEP8 code style guide `_. All class names are written in ``CamelCase`` and all variable names are ``lowercase_and_separated_by_underscores``. -Please express quantities in the following units when they are in the standard model/view/controller code. Deviations from this can occur where it is necessary to pass a different unit to a piece of hardware. +Type Hints +---------------- +Type hints are used throughout the code base. Please add type hints to any new methods or functions you create. If you are unsure how to do this, please see `PEP 484 `_ for more information. -* Time - Milliseconds -* Distance - Micrometers -* Voltage - Volts -* Rotation - Degrees +Numpydoc +---------------- +We use `Numpydoc `_ style docstrings throughout the code base. Please use this style for any new methods or functions you create. -------------------- +Sphinx +---------------- +We use `Sphinx `_ to generate documentation from documented methods, attributes, and classes. Please document all new methods, attributes, and classes using a Sphinx compatible version of `Numpydoc `_. -Pre-Commit Hooks -================ +Linters +---------------- +We use `Ruff `_ to enforce consistent code formatting. Please run Ruff on your code before making a pull request. Ideally, these actions should be integrated as part of a pre-commit hook (see below). +Pre-commit Hooks +---------------- We use `pre-commit hooks `_ to enforce consistent code formatting and automate some of the code review process. In some rare cases, the linter may complain about a line of code that is actually fine. For example, in the example code below, Ruff linter complains that the start_stage class is imported but not used. However, it is actually used in as part of an ``exec`` statement. .. code-block:: python @@ -70,11 +109,12 @@ To avoid this error, add a ``# noqa`` comment to the end of the line to tell Ruf from navigate.model.device_startup_functions import start_stage # noqa -------------------- +Unit Tests +---------------- +Each line of code is unit tested to ensure it behaves appropriately and alert future coders to modifications that break expected functionality. Guidelines for writing good unit tests can be found `here `_ and `over here `_, or in examples of unit tests in this repository's ``test`` folder. We use the `pytest library `_ to evaluate unit tests. Please check that unit tests pass on your machine before making a pull request. Dictionary Parsing -================== - +------------------ The :ref:`configuration file ` is loaded as a large dictionary object, and it is easy to create small errors in the dictionary that can crash the program. To avoid this, when getting properties from the configuration dictionary, it is best to use the ``.get()`` command, which provides you with the opportunity to also have a default value should the key provided not be found. For example, .. code-block:: python @@ -86,35 +126,47 @@ Here, we try to retrieve the ``waveform`` key from a the ``self.device_config`` ------------------- -Unit Tests -========== +Communicating with Hardware +=========================== -Each line of code is unit tested to ensure it behaves appropriately and alert future coders to modifications that break expected functionality. Guidelines for writing good unit tests can be found `here `_ and `over here `_, or in examples of unit tests in this repository's ``test`` folder. We use the `pytest library `_ to evaluate unit tests. Please check that unit tests pass on your machine before making a pull request. +Threads and Blocking +--------------------------- +In handling hardware devices, such as Sutter's MP-285A stage, using threads can introduce complexities, especially when simultaneous read and write operations occur over a shared resource like a serial line. An encountered issue demonstrated the challenges when two different threads attempted to write to and read from the same serial port simultaneously. This action led to data corruption due to interleaving of read/write calls that require precise handshaking, characteristic of the MP-285A's communication protocol. The solution involved implementing a blocking mechanism using `threading.Event()` to ensure that operations on the serial port do not overlap, showcasing the difficulties of multithreading sequential processes. To mitigate such issues, a design where each hardware device operates within its own dedicated thread is advisable. This approach simplifies the management of device communications by enforcing sequential execution, eliminating the need to handle complex concurrency issues inherent in multithreading environments. This strategy ensures robust and error-free interaction with hardware devices. -------------------- -Developing with a Mac -===================== +Dedicated Device Interfaces +--------------------------- -Many of us have Apple products and use them for development. However, there are some issues that you may encounter when developing on a Mac. Below are some of the issues we have encountered and how to resolve them. +**navigate** implements a robust hardware abstraction layer through dedicated device interfaces. When integrating new hardware devices: -------------------- +* Each hardware device type (cameras, stages, etc.) has its own dedicated interface that must be implemented +* All hardware classes inherit from a base class specific to the device type +* Base classes include AbstractMethods that define the required interface for any derived hardware class +* These abstract methods clearly communicate which functions must be implemented for any new hardware +* Failure to override these abstract methods in derived classes will result in runtime errors + +This architecture ensures consistency across different hardware implementations while providing clear guidance for developers adding support for new devices. When adding support for a new hardware device, first identify the appropriate base class and ensure you implement all required abstract methods. + + + + +.. Developing with a Mac +.. ===================== -Shared memory limits -^^^^^^^^^^^^^^^^^^^^ +.. Many of us have Apple products and use them for development. However, there are some issues that you may encounter when developing on a Mac. Below are some of the issues we have encountered and how to resolve them. -.. code-block:: console +.. .. code-block:: console - OSError: You tried to simultaneously open more SharedNDArrays than are allowed by your system! +.. OSError: You tried to simultaneously open more SharedNDArrays than are allowed by your system! -This results from a limitation in the number of shared memory objects that can be created on a Mac. To figure out how many objects can open, open a terminal and run the following command +.. This results from a limitation in the number of shared memory objects that can be created on a Mac. To figure out how many objects can open, open a terminal and run the following command -.. code-block:: console +.. .. code-block:: console - ulimit -n +.. ulimit -n -To increase this number, simply add an integer value after it. In our hands, 1000 typically works: +.. To increase this number, simply add an integer value after it. In our hands, 1000 typically works: -.. code-block:: console +.. .. code-block:: console - ulimit -n 1000 +.. ulimit -n 1000 diff --git a/docs/source/index.rst b/docs/source/index.rst index a69f02563..3004905d5 100644 --- a/docs/source/index.rst +++ b/docs/source/index.rst @@ -5,20 +5,36 @@ **navigate** ############ +.. image:: https://img.shields.io/badge/Published%20in-Nature%20Methods-blue + :target: https://doi.org/10.1038/s41592-024-02413-4 + :alt: Published in Nature Methods -**navigate** is an open-source Python software for light-sheet microscope control. It focuses on smart microscopy applications by providing reusable acquisition and analysis routines, termed :ref:`features `, that can be chained in arbitrary orders to create custom acquisition protocols. **navigate** is designed to accommodate the needs of a diverse user base, from biologists with no programming experience to advanced technology developers. +.. image:: https://img.shields.io/github/stars/TheDeanLab/navigate?style=social + :target: https://github.com/TheDeanLab/navigate + :alt: GitHub Stars -**Project Philosophy** -========================= +.. image:: https://img.shields.io/pypi/v/navigate-micro + :target: https://pypi.org/project/navigate-micro/ + :alt: PyPI Version -* Prioritize standard library imports for maximum stability, and minimize external dependencies. -* Abstraction layer to drive different camera types, etc. -* Plugin architecture for extensibility. -* Maximize productivity for biological users through robust graphical user interface-based workflows. -* Performant and responsive. -* Brutally obvious, well-documented, clean code organized in an industry standard Model-View-Controller architecture. +.. image:: https://img.shields.io/pypi/pyversions/navigate-micro + :target: https://pypi.org/project/navigate-micro/ + :alt: Python Versions -.. note:: + +**navigate** is an open-source Python software for light-sheet microscope control. It focuses on smart microscopy applications by providing reusable acquisition and analysis routines, termed :ref:`features `, that can be chained together arbitrarily to create custom acquisition protocols. **navigate** is accompanied by `Altair `_, our open-source light-sheet microscope designs. **navigate** is designed to accommodate the needs of a diverse user base, from biologists with no programming experience to advanced technology developers. + +**Key Features** +================ + +- Smart microscopy control with customizable acquisition protocols +- Hardware abstraction layer supporting multiple device vendors +- Intuitive GUI for biologists with no programming experience +- Extensible plugin architecture for developers +- Integration with open-source light-sheet microscope designs + + +.. seealso:: This project is under active development. See our `GitHub repository for updates `_. @@ -87,13 +103,15 @@ **Authors** ============ -**navigate** includes key contributions from numerous individuals, both past and present, in `The Dean Lab `_. Please see the accompanying manuscript for a full list of contributors. :ref:`Outside contributors ` are welcome. + +**navigate** includes key contributions from individuals both inside and outside of `The Dean Lab `_. Please see the GtiHub repository for a full list of contributors. We welcome community contributions - see our :ref:`contributing guidelines ` for more information on how to get involved. + **Funding** ============ -**navigate** is supported by the +- Cancer Prevention and Research Institute of Texas (10068451). - NIH National Institute of General Medical Science (RM1GM145399). - NIH National Cancer Institute (1U54CA268072). -- `Simmons Comprehensive Cancer Center `_ Translational Seed Grant. -- `UTSW President's Research Council `_ +- Simmons Comprehensive Cancer Center Translational Seed Grant. +- UTSW President's Research Council diff --git a/src/navigate/model/devices/camera/base.py b/src/navigate/model/devices/camera/base.py index 1f70be377..0b5d01a03 100644 --- a/src/navigate/model/devices/camera/base.py +++ b/src/navigate/model/devices/camera/base.py @@ -33,7 +33,8 @@ # Standard Library Imports import logging import os -from typing import Any, Dict, Optional +from typing import Any, Optional +from abc import ABC, abstractmethod # Third Party Imports import tifffile @@ -48,17 +49,21 @@ @log_initialization -class CameraBase: - """CameraBase - Parent camera class.""" +class CameraBase(ABC): + """Abstract base class for cameras. + + This class provides the interface and common functionality for controlling + cameras with navigate. + """ def __init__( self, microscope_name: str, device_connection: Any, - configuration: Dict[str, Any], + configuration: dict[str, Any], *args: Optional[Any], **kwargs: Optional[Any], - ): + ) -> None: """Initialize CameraBase class. Parameters @@ -120,16 +125,11 @@ def __init__( self.minimum_exposure_time = 0.001 self.camera_parameters["x_pixels"] = 2048 self.camera_parameters["y_pixels"] = 2048 - # TODO: trigger_source, readout_speed, - # trigger_active, trigger_mode and trigger_polarity - # can be removed after updating how we get the - # readout time in model and controller - self.camera_parameters["trigger_source"] = 2.0 # external trigger - self.camera_parameters["readout_speed"] = 1.0 - self.camera_parameters["pixel_size_in_microns"] = 6.5 - self.camera_parameters["trigger_active"] = 1.0 - self.camera_parameters["trigger_mode"] = 1.0 # standard trigger mode - self.camera_parameters["trigger_polarity"] = 2.0 + + if "pixel_size_in_microns" not in self.camera_parameters: + self.camera_parameters["pixel_size_in_microns"] = 6.5 + + # Supported modes, not all cameras support all modes self.camera_parameters["supported_sensor_modes"] = ["Normal", "Light-Sheet"] self.camera_parameters["supported_readout_directions"] = [ "Top-to-Bottom", @@ -145,11 +145,73 @@ def __init__( self._offset, self._variance = None, None self.get_offset_variance_maps() - def __str__(self): + def __str__(self) -> str: """Return string representation of CameraBase.""" return "CameraBase" - def get_offset_variance_maps(self): + @abstractmethod + def get_new_frame(self) -> list[int]: + """Get a new frame from the camera. + + This abstract method must be implemented by all subclasses. + + + Returns + ------- + frame_ids : list[int] + New frame ids from the camera. + """ + return [] + + @abstractmethod + def initialize_image_series( + self, + data_buffer: Optional[list]=None, + number_of_frames: int=100 + ) -> None: + """Initialize image series and attach the given data_buffer, + which serves as the destination for incoming images. + + This abstract method must be implemented by all subclasses. + + Parameters + ---------- + data_buffer : + List of SharedNDArrays of shape=(self.img_height, + self.img_width) and dtype="uint16" + Default is None. + number_of_frames : int + Number of frames. Default is 100. + """ + self.is_acquiring = True + + @abstractmethod + def close_image_series(self) -> None: + """Close image series. + + This abstract method must be implemented by all subclasses. + """ + self.is_acquiring = False + + @abstractmethod + def set_line_interval(self, line_interval_time: float) -> bool: + """Set the camera line interval time. + + This abstract method must be implemented by all subclasses. + + Returns + ------- + result: bool + True if successful, False otherwise. + """ + return True + + @abstractmethod + def set_exposure_time(self, exposure_time: float) -> bool: + """Set the camera exposure time.""" + return True + + def get_offset_variance_maps(self) -> Any: """Get offset and variance maps from file. Returns @@ -179,7 +241,7 @@ def get_offset_variance_maps(self): return self._offset, self._variance @property - def offset(self): + def offset(self) -> Any: """Return offset map. If not present, load from file. Returns @@ -192,7 +254,7 @@ def offset(self): return self._offset @property - def variance(self): + def variance(self) -> Any: """Return variance map. If not present, load from file. Returns @@ -205,7 +267,8 @@ def variance(self): self.get_offset_variance_maps() return self._variance - def set_readout_direction(self, mode) -> None: + @abstractmethod + def set_readout_direction(self, mode: str) -> None: """Set HamamatsuOrca readout direction. Parameters @@ -215,9 +278,10 @@ def set_readout_direction(self, mode) -> None: """ logger.info(f"Camera readout direction set to: {mode}.") + @abstractmethod def calculate_light_sheet_exposure_time( - self, full_chip_exposure_time, shutter_width - ): + self, full_chip_exposure_time: float, shutter_width: int + ) -> tuple[float, float, float]: """Convert normal mode exposure time to light-sheet mode exposure time. Calculate the parameters for an acquisition @@ -251,7 +315,7 @@ def close_camera(self) -> None: """Close camera.""" pass - def get_line_interval(self) -> float: + def get_line_interval(self) -> Optional[float]: """Return stored camera line interval. Returns @@ -261,8 +325,58 @@ def get_line_interval(self) -> float: """ return self.camera_parameters.get("line_interval", None) + @abstractmethod + def set_ROI( + self, + roi_width: int = 2048, + roi_height: int = 2048, + center_x: int = 1024, + center_y: int = 1024, + ) -> bool: + """Change the size of the active region on the camera. - def set_ROI_and_binning(self, roi_width=2048, roi_height=2048, center_x=1024, center_y=1024, binning='1x1') -> bool: + Parameters + ---------- + roi_width : int + Width of active camera region. + roi_height : int + Height of active camera region. + center_x : int + X position of the center of view + center_y : int + Y position of the center of view + + Returns + ------- + result: bool + True if successful, False otherwise. + """ + return True + + @abstractmethod + def set_binning(self, binning: str = "1x1") -> bool: + """Set the camera binning mode. + + Parameters + ---------- + binning : str + Desired binning properties (e.g., '1x1', '2x2', '4x4', '1x2', '2x4') + + Returns + ------- + result: bool + True if successful, False otherwise. + """ + return True + + def set_ROI_and_binning( + self, + roi_width: int = 2048, + roi_height: int = 2048, + center_x: int = 1024, + center_y: int = 1024, + binning: str = "1x1", + ) -> bool: """Change the size of the active region on the camera and set the binning mode. Parameters @@ -278,7 +392,7 @@ def set_ROI_and_binning(self, roi_width=2048, roi_height=2048, center_x=1024, ce binning : str Desired binning properties (e.g., '1x1', '2x2', '4x4', '8x8', '16x16', '1x2', '2x4') - + Returns ------- result: bool @@ -292,13 +406,27 @@ def set_ROI_and_binning(self, roi_width=2048, roi_height=2048, center_x=1024, ce # Set Binning result = self.set_binning(binning) return result - - def set_trigger_mode(self, trigger_source:str="External") -> None: + + @abstractmethod + def set_trigger_mode(self, trigger_source: str = "External") -> None: """Set the camera trigger source to external or internal free run mode. + + This abstract method must be implemented by all subclasses. + Parameters ---------- trigger_source : str Trigger source. Options are 'External' or 'Internal'. """ - # Only supports external triggering by default + pass + + @abstractmethod + def set_sensor_mode(self, mode: str) -> None: + """Set camera sensor mode. + + Parameters + ---------- + mode : str + Sensor mode. Options are 'Normal' or 'Light-Sheet'. + """ pass diff --git a/src/navigate/model/devices/camera/hamamatsu.py b/src/navigate/model/devices/camera/hamamatsu.py index afbb7ead3..a388b7595 100644 --- a/src/navigate/model/devices/camera/hamamatsu.py +++ b/src/navigate/model/devices/camera/hamamatsu.py @@ -32,7 +32,7 @@ # Standard Library Imports import logging -from typing import Any, Dict, Optional +from typing import Any, Optional # Third Party Imports @@ -59,7 +59,7 @@ def __init__( self, microscope_name: str, device_connection: Any, - configuration: Dict[str, Any], + configuration: dict[str, Any], *args: Optional[Any], **kwargs: Optional[Any], ) -> None: @@ -107,10 +107,8 @@ def __init__( self.camera_controller.set_property_value( "readout_speed", int(speed_max) ) - self.camera_parameters["readout_speed"] = int(speed_max) else: self.camera_controller.set_property_value("readout_speed", 1) - self.camera_parameters["readout_speed"] = 1 self.camera_parameters[ "pixel_size_in_microns" @@ -139,7 +137,7 @@ def __del__(self): # self.close_camera() @classmethod - def connect(cls, camera_id): + def connect(cls, camera_id: str) -> DCAM: """Connect to HamamatsuOrca camera. Parameters @@ -156,7 +154,7 @@ def connect(cls, camera_id): return camera_controller @property - def serial_number(self): + def serial_number(self) -> str: """Get Camera Serial Number Returns @@ -166,7 +164,7 @@ def serial_number(self): """ return self.camera_controller._serial_number - def report_settings(self): + def report_settings(self) -> None: """Print Camera Settings. Prints the current camera settings to the console and the log file.""" @@ -198,11 +196,11 @@ def report_settings(self): self.camera_controller.get_property_range("exposure_time"), ) - def close_camera(self): + def close_camera(self) -> None: """Close HamamatsuOrca Camera""" self.camera_controller.dev_close() - def set_trigger_mode(self, trigger_source="External"): + def set_trigger_mode(self, trigger_source:str="External") -> None: """Set Hamamatsu trigger source and trigger mode. Parameters @@ -222,20 +220,20 @@ def set_trigger_mode(self, trigger_source="External"): "defect_correct_mode", self.camera_parameters["defect_correct_mode"] ) self.camera_controller.set_property_value( - "trigger_active", self.camera_parameters["trigger_active"] + "trigger_active", 1.0 ) self.camera_controller.set_property_value( - "trigger_mode", self.camera_parameters["trigger_mode"] + "trigger_mode", 1 ) self.camera_controller.set_property_value( - "trigger_polarity", self.camera_parameters["trigger_polarity"] + "trigger_polarity", 2.0 ) self.camera_controller.set_property_value( - "trigger_source", self.camera_parameters["trigger_source"] + "trigger_source", 2 # External trigger. ) logger.debug("Set camera trigger mode: External Edge Trigger.") - def set_sensor_mode(self, mode): + def set_sensor_mode(self, mode: str) -> None: """Set HamamatsuOrca sensor mode. Parameters @@ -275,7 +273,7 @@ def set_sensor_mode(self, mode): print("Camera mode not supported") logger.debug("Camera mode not supported") - def set_readout_direction(self, mode): + def set_readout_direction(self, mode: str) -> None: """Set HamamatsuOrca readout direction. Parameters @@ -300,7 +298,7 @@ def set_readout_direction(self, mode): print("Camera readout direction not supported") logger.debug("Camera readout direction not supported") - def calculate_readout_time(self): + def calculate_readout_time(self) -> float: """Get the duration of time needed to read out an image. Returns @@ -313,7 +311,7 @@ def calculate_readout_time(self): # with camera internal delay return readout_time # + 4 * self.minimum_exposure_time - def set_exposure_time(self, exposure_time): + def set_exposure_time(self, exposure_time: float) -> bool: """Set HamamatsuOrca exposure time. Note @@ -324,22 +322,32 @@ def set_exposure_time(self, exposure_time): ---------- exposure_time : float Exposure time in seconds. + + Returns + ------- + result: bool + True if successful, False otherwise. """ return self.camera_controller.set_property_value("exposure_time", exposure_time) - def set_line_interval(self, line_interval_time): + def set_line_interval(self, line_interval_time: float) -> bool: """Set HamamatsuOrca line interval. Parameters ---------- line_interval_time : float Line interval duration. + + Returns + ------- + result: bool + True if successful, False otherwise. """ return self.camera_controller.set_property_value( "internal_line_interval", line_interval_time ) - def get_line_interval(self): + def get_line_interval(self) -> float: """Get HamamatsuOrca line interval. Returns @@ -347,11 +355,12 @@ def get_line_interval(self): line_interval_time : float Line interval duration. """ - self.line_interval = self.camera_controller.get_property_value( + line_interval = self.camera_controller.get_property_value( "internal_line_interval" ) + return line_interval - def set_binning(self, binning_string): + def set_binning(self, binning_string: str) -> bool: """Set HamamatsuOrca binning mode. Parameters @@ -383,7 +392,7 @@ def set_binning(self, binning_string): ) return True - def set_ROI(self, roi_width=2048, roi_height=2048, center_x=1024, center_y=1024): + def set_ROI(self, roi_width: int=2048, roi_height: int=2048, center_x: int=1024, center_y: int=1024) -> bool: """Change the size of the active region on the camera. Parameters @@ -451,7 +460,7 @@ def set_ROI(self, roi_width=2048, roi_height=2048, center_x=1024, center_y=1024) return self.x_pixels == roi_width and self.y_pixels == roi_height - def initialize_image_series(self, data_buffer=None, number_of_frames=100): + def initialize_image_series(self, data_buffer: Optional[list]=None, number_of_frames: int=100) -> None: """Initialize HamamatsuOrca image series. Parameters @@ -466,7 +475,7 @@ def initialize_image_series(self, data_buffer=None, number_of_frames=100): self.camera_controller.start_acquisition(data_buffer, number_of_frames) self.is_acquiring = True - def close_image_series(self): + def close_image_series(self) -> None: """Close image series. Stops the acquisition and sets is_acquiring flag to False. @@ -474,12 +483,12 @@ def close_image_series(self): self.camera_controller.stop_acquisition() self.is_acquiring = False - def get_new_frame(self): + def get_new_frame(self) -> list[int]: """Get frame from HamamatsuOrca camera. Returns ------- - frame : numpy.ndarray + frame_ids : list[int] Frame ids from HamamatsuOrca camera. """ return self.camera_controller.get_frames() @@ -493,7 +502,7 @@ def __init__( self, microscope_name: str, device_connection: Any, - configuration: Dict[str, Any], + configuration: dict[str, Any], *args: Optional[Any], **kwargs: Optional[Any], ) -> None: @@ -525,8 +534,8 @@ def __str__(self): return "HamamatsuOrcaLightning" def calculate_light_sheet_exposure_time( - self, full_chip_exposure_time, shutter_width - ): + self, full_chip_exposure_time: float, shutter_width: int + ) -> tuple[float, float, float]: """Calculate light sheet exposure time. Parameters @@ -568,7 +577,7 @@ def __init__( self, microscope_name: str, device_connection: Any, - configuration: Dict[str, Any], + configuration: dict[str, Any], *args: Optional[Any], **kwargs: Optional[Any], ) -> None: @@ -612,8 +621,8 @@ def __str__(self): return "HamamatsuOrcaFire" def calculate_light_sheet_exposure_time( - self, full_chip_exposure_time, shutter_width - ): + self, full_chip_exposure_time: float, shutter_width: int + ) -> tuple[float, float, float]: """Convert normal mode exposure time to light-sheet mode exposure time. Calculate the parameters for an acquisition @@ -658,7 +667,7 @@ def __init__( self, microscope_name: str, device_connection: Any, - configuration: Dict[str, Any], + configuration: dict[str, Any], *args: Optional[Any], **kwargs: Optional[Any], ) -> None: @@ -695,8 +704,8 @@ def __str__(self): return "HamamatsuOrca" def calculate_light_sheet_exposure_time( - self, full_chip_exposure_time, shutter_width - ): + self, full_chip_exposure_time: float, shutter_width: int + ) -> tuple[float, float, float]: """Convert normal mode exposure time to light-sheet mode exposure time. Calculate the parameters for an acquisition @@ -742,7 +751,7 @@ def __init__( self, microscope_name: str, device_connection: Any, - configuration: Dict[str, Any], + configuration: dict[str, Any], *args: Optional[Any], **kwargs: Optional[Any], ) -> None: @@ -775,8 +784,8 @@ def __str__(self): return "HamamatsuOrcaFusion" def calculate_light_sheet_exposure_time( - self, full_chip_exposure_time, shutter_width - ): + self, full_chip_exposure_time: float, shutter_width: int + ) -> tuple[float, float, float]: """Calculate light sheet exposure time. Parameters diff --git a/src/navigate/model/devices/camera/photometrics.py b/src/navigate/model/devices/camera/photometrics.py index 3c41971b6..b373bfcb8 100644 --- a/src/navigate/model/devices/camera/photometrics.py +++ b/src/navigate/model/devices/camera/photometrics.py @@ -32,13 +32,13 @@ # Standard Library Imports import logging -from typing import Any, Dict, Optional +from typing import Any, Optional # Third Party Imports from ctypes import * # noqa import numpy as np from pyvcam import pvc -from pyvcam.camera import Camera +from pyvcam.camera import Camera as PyvcamCamera # Local Imports from navigate.model.devices.camera.base import CameraBase @@ -67,7 +67,7 @@ def __init__( self, microscope_name: str, device_connection: Any, - configuration: Dict[str, Any], + configuration: dict[str, Any], *_: Optional[Any], **__: Optional[Any], ) -> None: @@ -139,13 +139,11 @@ def __init__( self.camera_controller.prog_scan_dir = 0 # Photometrics camera settings from config file - self.camera_controller.readout_port = self.camera_parameters["readout_port"] - self.camera_controller.speed_table_index = self.camera_parameters[ - "speed_table_index" - ] - self.camera_controller.gain = self.camera_parameters["gain"] + self.camera_controller.readout_port = self.camera_parameters.get("readout_port", 0) + self.camera_controller.speed_table_index = self.camera_parameters.get("speed_table_index", 1) + self.camera_controller.gain = self.camera_parameters.get("gain", 1) - def __str__(self): + def __str__(self) -> str: """Return string representation of PhotometricsBase object. Returns @@ -161,7 +159,7 @@ def __del__(self): self.camera_controller.close() @classmethod - def get_connect_params(cls): + def get_connect_params(cls) -> list[str]: """Register the parameters required to connect to the camera. Returns @@ -172,7 +170,7 @@ def get_connect_params(cls): return ["camera_connection"] @classmethod - def connect(cls, camera_connection): + def connect(cls, camera_connection: str) -> PyvcamCamera: """Build Photometrics Stage Serial Port connection Import Photometrics API and Initialize Camera Controller. @@ -189,8 +187,7 @@ def connect(cls, camera_connection): """ try: pvc.init_pvcam() - # camera_names = Camera.get_available_camera_names() - camera_to_open = Camera.select_camera(camera_connection) + camera_to_open = PyvcamCamera.select_camera(camera_connection) camera_to_open.open() return camera_to_open except Exception as e: @@ -200,7 +197,7 @@ def connect(cls, camera_connection): ) @property - def serial_number(self): + def serial_number(self) -> str: """Get Camera Serial Number Returns @@ -210,16 +207,12 @@ def serial_number(self): """ return self.camera_controller.serial_no - def report_settings(self): + def report_settings(self) -> None: """Print Camera Settings.""" # TODO: complete param recording print("sensor_mode: " + str(self.camera_controller.prog_scan_mode)) print("binning: " + str(self.camera_controller.binning)) print("readout_speed" + str(self.camera_controller.readout_time)) - print("trigger_active") - print("trigger_mode") - print("trigger_polarity") - print("trigger_source") print("internal_line_interval") print("sensor size" + str(self.camera_controller.sensor_size)) print("image_height and width" + str(self.x_pixels) + ", " + str(self.y_pixels)) @@ -229,7 +222,7 @@ def close_camera(self): """Close Photometrics Camera""" self.camera_controller.close() - def set_sensor_mode(self, mode): + def set_sensor_mode(self, mode: str) -> None: """Set Photometrics sensor mode Can be normal or programmable scan mode (e.g., ASLM). @@ -247,7 +240,24 @@ def set_sensor_mode(self, mode): print("Camera mode not supported" + str(modes_dict[mode])) logger.debug("Camera mode not supported" + str(modes_dict[mode])) - def set_readout_direction(self, mode): + def set_trigger_mode(self, trigger_source: str = "External") -> None: + """Set the camera trigger source to external or internal free run mode. + + This abstract method must be implemented by all subclasses. + + Parameters + ---------- + trigger_source : str + Trigger source. Options are 'External' or 'Internal'. + """ + if trigger_source == "External": + self.camera_controller.trig_mode = "External" + elif trigger_source == "Internal": + self.camera_controller.trig_mode = "Internal" + else: + logger.debug("Camera trigger mode not supported") + + def set_readout_direction(self, mode: str) -> None: """Set Photometrics readout direction. Parameters @@ -289,7 +299,7 @@ def calculate_readout_time(self): return readout_time_ms / 1000 - def set_exposure_time(self, exposure_time): + def set_exposure_time(self, exposure_time: float) -> bool: """Set Photometrics exposure time. Note: Units of the Photometrics API are in milliseconds @@ -301,15 +311,15 @@ def set_exposure_time(self, exposure_time): Returns ------- - exposure_time : float - Exposure time in milliseconds. + result : bool + True if exposure time was set successfully, False otherwise. """ self._exposure_time = int(exposure_time * 1000) self.camera_controller.exp_time = self._exposure_time self.camera_controller.start_live(self._exposure_time) - return exposure_time + return True - def set_line_interval(self, line_interval_time): + def set_line_interval(self, line_interval_time: float) -> bool: """Set Photometrics line interval. Parameters @@ -320,10 +330,11 @@ def set_line_interval(self, line_interval_time): # todo calculate line delay from scan delay self._scan_delay = line_interval_time self.camera_controller.prog_scan_line_delay = line_interval_time + return True def calculate_light_sheet_exposure_time( - self, full_chip_exposure_time, shutter_width - ): + self, full_chip_exposure_time: float, shutter_width: int + ) -> tuple[float, float, float]: """Convert normal mode exposure time to light-sheet mode exposure time. Calculate the parameters for an ASLM acquisition @@ -376,7 +387,7 @@ def calculate_light_sheet_exposure_time( self._scan_delay = aslm_line_delay return aslm_line_exposure / 1000, aslm_line_delay, aslm_acquisition_time / 1000 - def set_binning(self, binning_string): + def set_binning(self, binning_string: str) -> bool: """Set Photometrics binning mode. Parameters @@ -416,7 +427,13 @@ def set_binning(self, binning_string): self.y_pixels = int(self.y_pixels / self.y_binning) return True - def set_ROI(self, roi_width=3200, roi_height=3200, center_x=1600, center_y=1600): + def set_ROI( + self, + roi_width: int=3200, + roi_height: int=3200, + center_x: int=1600, + center_y: int=1600 + ) -> bool: """Change the size of the active region on the camera. Parameters @@ -465,7 +482,7 @@ def set_ROI(self, roi_width=3200, roi_height=3200, center_x=1600, center_y=1600) self.x_pixels, self.y_pixels = self.camera_controller.shape() return self.x_pixels == roi_width and self.y_pixels == roi_height - def initialize_image_series(self, data_buffer=None, number_of_frames=100): + def initialize_image_series(self, data_buffer: Optional[list]=None, number_of_frames: int=100) -> None: """Initialize Photometrics image series. This is for starting stacks etc. Parameters @@ -513,14 +530,14 @@ def initialize_image_series(self, data_buffer=None, number_of_frames=100): # with the current exposure time. self.camera_controller.start_live() - def _receive_images(self): + def _receive_images(self) -> list[int]: """ Update image in the data buffer if the Photometrics camera acquired a new image and return frame ids. Returns ------- - frame : numpy.ndarray + frame : list Frame ids from Photometrics camera that point to newly acquired data in data buffer """ @@ -547,19 +564,19 @@ def _receive_images(self): return [] - def get_new_frame(self): + def get_new_frame(self) -> list[int]: """ Call update function for data buffer and get frame ids from Photometrics camera. Returns ------- - frame : numpy.ndarray + frame : list[int] Frame ids from Photometrics camera that point to newly acquired data in data buffer """ return self._receive_images() - def close_image_series(self): + def close_image_series(self) -> None: """Close Photometrics image series. Stops the acquisition and sets is_acquiring flag to False. diff --git a/src/navigate/model/devices/camera/synthetic.py b/src/navigate/model/devices/camera/synthetic.py index 231293420..f5381bc8a 100644 --- a/src/navigate/model/devices/camera/synthetic.py +++ b/src/navigate/model/devices/camera/synthetic.py @@ -34,7 +34,7 @@ import logging import time import ctypes -from typing import Optional, Any, Dict, List +from typing import Optional, Any, List # Third Party Imports import numpy as np @@ -97,7 +97,7 @@ def __init__( self, microscope_name: str, device_connection: Any, - configuration: Dict[str, Any], + configuration: dict[str, Any], *args: Optional[Any], **kwargs: Optional[Any], ) -> None: @@ -226,7 +226,7 @@ def set_exposure_time(self, exposure_time: float) -> None: """ self.camera_exposure_time = exposure_time - def set_line_interval(self, line_interval_time: float) -> None: + def set_line_interval(self, line_interval_time: float) -> bool: """Set SyntheticCamera line interval. Parameters @@ -234,7 +234,7 @@ def set_line_interval(self, line_interval_time: float) -> None: line_interval_time : float Line interval duration. """ - pass + super().set_line_interval(line_interval_time) def set_binning(self, binning_string: str) -> bool: """Set SyntheticCamera binning mode. @@ -273,7 +273,7 @@ def initialize_image_series( self, data_buffer: Optional[List[SharedNDArray]] = None, number_of_frames: int = 100, - ): + ) -> None: """Initialize SyntheticCamera image series. Parameters @@ -298,9 +298,22 @@ def close_image_series(self) -> None: self.current_frame_idx = 0 self.is_acquiring = False - def load_images(self, filenames: Optional[str] = None, ds=None) -> None: + def load_images(self, filenames: Optional[list[str]] = None, ds=None) -> None: """Pre-populate the buffer with images. Can either come from TIFF files or - Numpy stacks.""" + Numpy stacks. + + Parameters + ---------- + filenames : list[str], optional + List of TIFF filenames, by default None + ds : list, optional + List of Numpy stacks, by default None + + Raises + ------ + tifffile.TiffFileError + If the TIFF file cannot be opened. + """ self.random_image = False #: int: current image id self.img_id = 0 @@ -333,6 +346,8 @@ def load_images(self, filenames: Optional[str] = None, ds=None) -> None: return else: self.random_image = True + del self.tif_images[:] + self.tif_images = [] def generate_new_frame(self) -> None: """Generate a synthetic image.""" @@ -397,7 +412,7 @@ def set_ROI( X position of the center of view center_y : int Y position of the center of view - + Returns ------- bool @@ -432,3 +447,34 @@ def set_trigger_mode(self, trigger_source: str="External") -> None: Trigger source, either 'External' or 'Internal'. """ logger.debug(f"Set camera trigger mode: {trigger_source}") + + def calculate_light_sheet_exposure_time( + self, full_chip_exposure_time: float, shutter_width: float + ) -> tuple[float, float, float]: + """Calculate the light sheet exposure time. + + Parameters + ---------- + full_chip_exposure_time : float + Full chip exposure time in seconds. + shutter_width : float + Shutter width in pixels. + + Returns + ------- + tuple[float, float, float] + Tuple containing the light sheet exposure time, the line interval time, + and the readout time. + """ + ( + exposure_time, + camera_line_interval, + full_chip_exposure_time, + ) = super().calculate_light_sheet_exposure_time( + full_chip_exposure_time, shutter_width + ) + + return exposure_time, camera_line_interval, full_chip_exposure_time + + def set_readout_direction(self, mode) -> None: + super().set_readout_direction(mode) diff --git a/src/navigate/model/devices/camera/ximea.py b/src/navigate/model/devices/camera/ximea.py index 22354ae30..5dd2914e0 100644 --- a/src/navigate/model/devices/camera/ximea.py +++ b/src/navigate/model/devices/camera/ximea.py @@ -32,7 +32,7 @@ # Standard Library Imports import logging -from typing import Any, Dict, Optional +from typing import Any, Optional # Third Party Imports from ximea import xiapi @@ -55,7 +55,7 @@ def __init__( self, microscope_name: str, device_connection: Any, - configuration: Dict[str, Any], + configuration: dict[str, Any], *args: Optional[Any], **kwargs: Optional[Any], ) -> None: @@ -108,7 +108,7 @@ def __init__( self.camera_parameters["supported_readout_directions"] = ["Top-to-Bottom"] - def __str__(str): + def __str__(self) -> str: """Return string representation of Ximea Base class Returns @@ -123,7 +123,7 @@ def __del__(self): self.cam.close_device() @classmethod - def get_connect_params(cls): + def get_connect_params(cls) -> list[str]: """Register the parameters required to connect to the camera. Returns @@ -134,7 +134,7 @@ def get_connect_params(cls): return ["serial_number"] @classmethod - def connect(cls, serial_number): + def connect(cls, serial_number: str) -> xiapi.Camera: """Build Photometrics Stage Serial Port connection Import Photometrics API and Initialize Camera Controller. @@ -161,7 +161,7 @@ def connect(cls, serial_number): ) @property - def serial_number(self): + def serial_number(self) -> str: """Get Camera Serial Number Returns @@ -171,7 +171,7 @@ def serial_number(self): """ self.cam.get_param("device_sn").decode("utf-8") - def report_settings(self): + def report_settings(self) -> None: """Print Camera Settings. Prints the current camera settings to the console and the log file. @@ -182,7 +182,7 @@ def report_settings(self): print(param, value) logger.info(f"{param}, {value}") - def set_sensor_mode(self, mode): + def set_sensor_mode(self, mode: str) -> None: """Set Ximea sensor mode. On the manual page 72: @@ -200,10 +200,26 @@ def set_sensor_mode(self, mode): """ self.cam.set_param("acq_timing_mode", "XI_ACQ_TIMING_MODE_FREE_RUN") self.cam.set_param("shutter_type", "XI_SHUTTER_ROLLING") - - def set_readout_direction(self, mode): - """Set readout direction""" + + def set_trigger_mode(self, trigger_source = "External") -> None: + """Set Ximea trigger mode. + + Parameters + ---------- + trigger_source : str + 'External' or 'Internal' + """ pass + + def set_readout_direction(self, mode: str) -> None: + """Set readout direction + + Parameters + ---------- + mode : str + 'Top-to-Bottom' + """ + logger.info("Ximea camera only supports Top-to-Bottom readout direction.") def calculate_readout_time(self): """Get the duration of time needed to read out an image. @@ -215,7 +231,30 @@ def calculate_readout_time(self): """ return 0 - def set_exposure_time(self, exposure_time): + def calculate_light_sheet_exposure_time( + self, full_chip_exposure_time: float, shutter_width: int + ) -> tuple[float, float, float]: + """Calculate the exposure time for light-sheet imaging. + + Parameters + ---------- + full_chip_exposure_time : float + Exposure time for full chip acquisition. + shutter_width : int + Width of the light-sheet shutter in pixels. + + Returns + ------- + exposure_time : float + Exposure time for light-sheet imaging. + line_interval : float + Line interval for light-sheet imaging. + readout_time : float + Readout time for light-sheet imaging. + """ + return super().calculate_light_sheet_exposure_time(full_chip_exposure_time, shutter_width) + + def set_exposure_time(self, exposure_time: float) -> bool: """Set Ximea exposure time. Note @@ -226,12 +265,17 @@ def set_exposure_time(self, exposure_time): ---------- exposure_time : float Exposure time in seconds. + + Returns + ------- + result: bool + True if successful, False otherwise. """ #seconds to us. self.cam.set_param("exposure", exposure_time * 1000000) return True - def set_line_interval(self, line_interval_time): + def set_line_interval(self, line_interval_time: float) -> bool: """Set line interval. Parameters @@ -241,17 +285,7 @@ def set_line_interval(self, line_interval_time): """ return False - def get_line_interval(self): - """Get line interval. - - Returns - ------- - line_interval_time : float - Line interval duration. - """ - self.line_interval = 0 - - def set_binning(self, binning_string): + def set_binning(self, binning_string) -> bool: """Set Ximea Camera binning mode. Parameters @@ -288,7 +322,7 @@ def set_binning(self, binning_string): return True - def set_ROI(self, roi_width=2048, roi_height=2048, center_x=1024, center_y=1024): + def set_ROI(self, roi_width=2048, roi_height=2048, center_x=1024, center_y=1024) -> bool: """Change the size of the active region on the camera. Parameters @@ -347,39 +381,8 @@ def set_ROI(self, roi_width=2048, roi_height=2048, center_x=1024, center_y=1024) self.y_pixels = self.cam.get_param("height") * binning_value return self.x_pixels == roi_width * binning_value and self.y_pixels == roi_height * binning_value - - def set_ROI_and_binning(self, roi_width=2048, roi_height=2048, center_x=1024, center_y=1024, binning='1x1'): - """Change the size of the active region on the camera and set the binning mode. - - Parameters - ---------- - roi_width : int - Width of active camera region. - roi_height : int - Height of active camera region. - center_x : int - X position of the center of view - center_y : int - Y position of the center of view - binning : str - Desired binning properties (e.g., '1x1', '2x2', '4x4', '8x8', '16x16', - '1x2', '2x4') - - Returns - ------- - result: bool - True if successful, False otherwise. - """ - # Set Binning - result = self.set_binning(binning) - if not result: - return False - - # Set ROI - result = self.set_ROI(roi_width, roi_height, center_x, center_y) - return result - def initialize_image_series(self, data_buffer=None, number_of_frames=100): + def initialize_image_series(self, data_buffer: Optional[list]=None, number_of_frames=100) -> None: """Initialize Ximea Camera image series. Parameters @@ -406,7 +409,7 @@ def initialize_image_series(self, data_buffer=None, number_of_frames=100): self.cam.start_acquisition() self.is_acquiring = True - def close_image_series(self): + def close_image_series(self) -> None: """Close image series. Stops the acquisition and sets is_acquiring flag to False. @@ -414,7 +417,7 @@ def close_image_series(self): self.cam.stop_acquisition() self.is_acquiring = False - def get_new_frame(self): + def get_new_frame(self) -> list[int]: """Get frame from Ximea camera. Returns @@ -450,7 +453,7 @@ def __init__( self, microscope_name: str, device_connection: Any, - configuration: Dict[str, Any], + configuration: dict[str, Any], *args: Optional[Any], **kwargs: Optional[Any], ) -> None: @@ -472,7 +475,7 @@ def __init__( # need to reset the trigger mode to XI_GPI_TRIGGER, otherwise the trigger mode is XI_GPI_OFF self.cam.set_param("gpi_mode", "XI_GPI_TRIGGER") - def __str__(str): + def __str__(str) -> str: """Return string representation of Ximea MU196XR class Returns diff --git a/src/navigate/model/devices/daq/asi.py b/src/navigate/model/devices/daq/asi.py index 32856edf4..ad89da2aa 100644 --- a/src/navigate/model/devices/daq/asi.py +++ b/src/navigate/model/devices/daq/asi.py @@ -33,7 +33,7 @@ # Standard Imports import logging from threading import Lock -from typing import Dict, Any +from typing import Any # Third Party Imports from multiprocessing.managers import DictProxy, ListProxy @@ -62,7 +62,7 @@ def __init__( self, microscope_name, device_connection, - configuration: Dict[str, Any], + configuration: dict[str, Any], device_id, ) -> None: """Initialize the ASI DAQ. @@ -260,3 +260,7 @@ def stop_acquisition(self) -> None: # if self.wait_to_run_lock.locked(): # self.wait_to_run_lock.release() + + def wait_acquisition_done(self) -> None: + """Wait for acquisition to be done.""" + super().wait_acquisition_done() diff --git a/src/navigate/model/devices/daq/base.py b/src/navigate/model/devices/daq/base.py index e3c934571..b20430717 100644 --- a/src/navigate/model/devices/daq/base.py +++ b/src/navigate/model/devices/daq/base.py @@ -32,7 +32,8 @@ # Standard Imports import logging -from typing import Any, Dict +from typing import Any +from abc import ABC, abstractmethod # Third Party Imports @@ -46,10 +47,16 @@ @log_initialization -class DAQBase: - """DAQBase - Parent class for Data Acquisition (DAQ) classes.""" +class DAQBase(ABC): + """Abstract base class for Data Acquisition (DAQ) devices. - def __init__(self, configuration: Dict[str, Any]) -> None: + This class provides the interface and common functionality for controlling + data acquisition hardware with navigate. It handles waveform generation based on + configuration parameters, exposure times, and sweep times for different imaging + channels. + """ + + def __init__(self, configuration: dict[str, Any]) -> None: """Initializes the DAQBase class. Parameters @@ -100,10 +107,56 @@ def __init__(self, configuration: Dict[str, Any]) -> None: #: int: Number of times to expand the waveform self.waveform_expand_num = 1 + #: str: Trigger mode. Self-trigger or external-trigger. + self.trigger_mode = "self-trigger" + def __str__(self) -> str: """Returns the string representation of the DAQBase class""" return "DAQBase" + @abstractmethod + def stop_acquisition(self) -> None: + """Stops the acquisition. + + This abstract method must be implemented by all subclasses. + """ + pass + + @abstractmethod + def prepare_acquisition(self, channel_key: str) -> None: + """Prepare the acquisition. + + This abstract method must be implemented by all subclasses. + + + Parameters + ---------- + channel_key : str + Channel key for current channel. + """ + pass + + @abstractmethod + def run_acquisition(self, wait_until_done: bool = True) -> None: + """Run acquisition. + + This abstract method must be implemented by all subclasses. + + Parameters + ---------- + wait_until_done : bool, optional + Whether to wait until the acquisition is done, by default True + """ + pass + + @abstractmethod + def wait_acquisition_done(self) -> None: + """Wait acquisition tasks done + + This abstract method must be implemented by all subclasses. + """ + pass + def calculate_all_waveforms(self, microscope_name, exposure_times, sweep_times): """Pre-calculates all waveforms necessary for the acquisition and organizes in a dictionary format. @@ -168,3 +221,28 @@ def enable_microscope(self, microscope_name: str) -> None: self.sample_rate = self.configuration["configuration"]["microscopes"][ microscope_name ]["daq"]["sample_rate"] + + def update_analog_task(self, board_name: str) -> None: + """Update the analog task. + + This abstract method must be implemented by all subclasses. + + Parameters + ---------- + board_name : str + Name of board. + """ + pass + + def set_external_trigger(self, external_trigger: str=None) -> None: + """Set the external trigger. + + Parameters + ---------- + external_trigger : str, optional + Name of external trigger. + """ + + self.trigger_mode = ( + "self-trigger" if external_trigger is None else "external-trigger" + ) diff --git a/src/navigate/model/devices/daq/ni.py b/src/navigate/model/devices/daq/ni.py index de424e76d..93b757e96 100644 --- a/src/navigate/model/devices/daq/ni.py +++ b/src/navigate/model/devices/daq/ni.py @@ -34,7 +34,7 @@ from threading import Lock import traceback import time -from typing import Union, Dict, Any +from typing import Any import gc # Third Party Imports @@ -58,7 +58,7 @@ class NIDAQ(DAQBase): """NIDAQ class for Control of NI Data Acquisition Cards.""" - def __init__(self, configuration: Dict[str, Any]) -> None: + def __init__(self, configuration: dict[str, Any]) -> None: """Initialize NIDAQ class. Parameters @@ -580,7 +580,7 @@ def enable_microscope(self, microscope_name: str) -> None: except KeyError: pass - def update_analog_task(self, board_name: str) -> Union[bool, None]: + def update_analog_task(self, board_name: str) -> bool: """Update analog task. Parameters @@ -590,7 +590,7 @@ def update_analog_task(self, board_name: str) -> Union[bool, None]: Returns ------- - bool, None + bool True if task is updated, False otherwise. """ # if there is no such analog task, @@ -637,6 +637,7 @@ def update_analog_task(self, board_name: str) -> Union[bool, None]: self.is_updating_analog_task = False self.wait_to_run_lock.release() + return True def reset(self, device_name: str = None) -> None: """Reset the DAQ device. diff --git a/src/navigate/model/devices/daq/synthetic.py b/src/navigate/model/devices/daq/synthetic.py index 9d0761599..f91c9e0e4 100644 --- a/src/navigate/model/devices/daq/synthetic.py +++ b/src/navigate/model/devices/daq/synthetic.py @@ -34,7 +34,7 @@ import logging import time from threading import Lock -from typing import Any, Dict +from typing import Any, Optional # Third Party Imports @@ -51,7 +51,7 @@ class SyntheticDAQ(DAQBase): """SyntheticDAQ class for Data Acquisition (DAQ).""" - def __init__(self, configuration: Dict[str, Any]) -> None: + def __init__(self, configuration: dict[str, Any]) -> None: """Initialize the Synthetic DAQ. Parameters @@ -83,34 +83,7 @@ def __str__(self) -> str: """String representation of the class.""" return "SyntheticDAQ" - def create_camera_task(self): - """Set up the camera trigger task.""" - pass - - def create_master_trigger_task(self): - """Set up the DO master trigger task.""" - pass - - def create_galvo_remote_focus_tasks(self): - """Create galvo and remote focus tasks""" - pass - - def start_tasks(self): - """Start the tasks for camera triggering and analog outputs. - - If the tasks are configured to be triggered, they won't start until - run_tasks() is called.""" - pass - - def stop_tasks(self): - """Stop the tasks for triggering, analog and counter outputs.""" - pass - - def close_tasks(self): - """Close the tasks for triggering, analog, and counter outputs.""" - pass - - def prepare_acquisition(self, channel_key: str): + def prepare_acquisition(self, channel_key: str) -> None: """Prepare the acquisition. Parameters @@ -123,13 +96,19 @@ def prepare_acquisition(self, channel_key: str): if self.wait_to_run_lock.locked(): self.wait_to_run_lock.release() - def run_acquisition(self, wait_until_done=True): + def run_acquisition(self, wait_until_done=True) -> None: """Run DAQ Acquisition. Run the tasks for triggering, analog and counter outputs. The master trigger initiates all other tasks via a shared trigger For this to work, all analog output and counter tasks have to be started so that - they are waiting for the trigger signal.""" + they are waiting for the trigger signal. + + Parameters + ---------- + wait_until_done : bool, optional + Wait until acquisition is done, by default True. + """ # wait if writing analog tasks if self.is_updating_analog_task: self.wait_to_run_lock.acquire() @@ -137,22 +116,18 @@ def run_acquisition(self, wait_until_done=True): if wait_until_done: self.wait_acquisition_done() - def wait_acquisition_done(self): + def wait_acquisition_done(self) -> None: """Wait for a short time to generate an image""" time.sleep(0.01) if self.trigger_mode == "self-trigger": for microscope_name in self.camera: self.camera[microscope_name].generate_new_frame() - def stop_acquisition(self): + def stop_acquisition(self) -> None: """Stop Acquisition.""" pass - def write_waveforms_to_tasks(self): - """Write the galvo, remote focus, and laser waveforms to each task.""" - pass - - def add_camera(self, microscope_name: str, camera: object): + def add_camera(self, microscope_name: str, camera: object) -> None: """Connect camera with daq: only in syntheticDAQ. Parameters @@ -164,13 +139,18 @@ def add_camera(self, microscope_name: str, camera: object): """ self.camera[microscope_name] = camera - def update_analog_task(self, board_name): + def update_analog_task(self, board_name: str) -> bool: """Update the analog task. Parameters ---------- board_name : str Name of board. + + Returns + ------- + bool + True if task is updated, False otherwise. """ # can't update an analog task while updating one. if self.is_updating_analog_task: @@ -181,16 +161,4 @@ def update_analog_task(self, board_name): self.is_updating_analog_task = False self.wait_to_run_lock.release() - - def set_external_trigger(self, external_trigger=None): - """Set the external trigger. - - Parameters - ---------- - external_trigger : str, optional - Name of external trigger. - """ - - self.trigger_mode = ( - "self-trigger" if external_trigger is None else "external-trigger" - ) + return True diff --git a/src/navigate/model/devices/filter_wheel/asi.py b/src/navigate/model/devices/filter_wheel/asi.py index 1574057a7..a4b3f5259 100644 --- a/src/navigate/model/devices/filter_wheel/asi.py +++ b/src/navigate/model/devices/filter_wheel/asi.py @@ -33,6 +33,7 @@ # Standard Library Imports import logging import time +from typing import Any # Third Party Imports @@ -57,7 +58,13 @@ class ASIFilterWheel(FilterWheelBase, SerialDevice): https://asiimaging.com/docs/fw_1000#fw-1000_ascii_command_set """ - def __init__(self, microscope_name, device_connection, configuration, device_id=0): + def __init__( + self, + microscope_name: str, + device_connection: Any, + configuration: dict[str, Any], + device_id: int = 0, + ) -> None: """Initialize the ASIFilterWheel class. Parameters @@ -89,12 +96,12 @@ def __init__(self, microscope_name, device_connection, configuration, device_id= #: int: Filter wheel position. self.filter_wheel_position = 0 - def __str__(self): + def __str__(self) -> str: """String representation of the class.""" return "ASIFilterWheel" @classmethod - def connect(cls, port, baudrate=115200, timeout=0.25): + def connect(cls, port: str, baudrate: int=115200, timeout: float=0.25) -> TigerController: """Build ASIFilterWheel Serial Port connection Parameters @@ -119,7 +126,7 @@ def connect(cls, port, baudrate=115200, timeout=0.25): raise Exception("ASI stage connection failed.") return tiger_controller - def filter_change_delay(self, filter_name): + def filter_change_delay(self, filter_name: str) -> None: """Estimate duration of time necessary to move the filter wheel Assumes that it is ~40ms per adjacent position. @@ -136,7 +143,7 @@ def filter_change_delay(self, filter_name): delta_position = int(abs(old_position - new_position)) self.wait_until_done_delay = delta_position * 0.04 - def set_filter(self, filter_name, wait_until_done=True): + def set_filter(self, filter_name: str, wait_until_done=True) -> None: """Change the filter wheel to the filter designated by the filter position argument. @@ -167,7 +174,7 @@ def set_filter(self, filter_name, wait_until_done=True): if wait_until_done: time.sleep(self.wait_until_done_delay) - def close(self): + def close(self) -> None: """Close the ASI Filter Wheel serial port. Sets the filter wheel to the home position and then closes the port. @@ -177,7 +184,7 @@ def close(self): logger.debug("ASI Filter Wheel - Closing Device.") self.filter_wheel.disconnect_from_serial() - def __del__(self): + def __del__(self) -> None: """Destructor for the ASIFilterWheel class.""" self.close() @@ -196,7 +203,13 @@ class ASICubeSliderFilterWheel(FilterWheelBase, SerialDevice): Typical switch time between adjacent positions is < 250 ms. """ - def __init__(self, microscope_name, device_connection, configuration, device_id=0): + def __init__( + self, + microscope_name: str, + device_connection: Any, + configuration: dict[str, Any], + device_id: int = 0, + ) -> None: """Initialize the ASICubeSlider class. Parameters @@ -211,7 +224,7 @@ def __init__(self, microscope_name, device_connection, configuration, device_id= The ID of the device. Default is 0. """ - super().__init__(microscope_name, device_connection, configuration) + super().__init__(microscope_name, device_connection, configuration, device_id) #: obj: ASI Tiger Controller object. self.dichroic = device_connection @@ -228,7 +241,7 @@ def __init__(self, microscope_name, device_connection, configuration, device_id= self.dichroic_position = 0 @classmethod - def connect(cls, port, baudrate=115200, timeout=0.25): + def connect(cls, port: str, baudrate: int=115200, timeout: float=0.25) -> TigerController: """Build ASIFilterWheel Serial Port connection Parameters @@ -253,7 +266,7 @@ def connect(cls, port, baudrate=115200, timeout=0.25): raise Exception("ASI stage connection failed.") return tiger_controller - def filter_change_delay(self, filter_name): + def filter_change_delay(self, filter_name: str) -> None: """Estimate duration of time necessary to move the dichroic Assumes that it is <250 ms per adjacent position. @@ -269,7 +282,7 @@ def filter_change_delay(self, filter_name): delta_position = int(abs(old_position - new_position)) self.wait_until_done_delay = delta_position * 0.25 - def set_filter(self, filter_name, wait_until_done=True): + def set_filter(self, filter_name: str, wait_until_done: bool=True) -> None: """Change the dichroic position. Parameters @@ -300,7 +313,7 @@ def set_filter(self, filter_name, wait_until_done=True): if wait_until_done: time.sleep(self.wait_until_done_delay) - def close(self): + def close(self) -> None: """Close the ASI Filter Wheel serial port. Sets the filter wheel to the home position and then closes the port. diff --git a/src/navigate/model/devices/filter_wheel/base.py b/src/navigate/model/devices/filter_wheel/base.py index 4daf2dadf..0ddc417e0 100644 --- a/src/navigate/model/devices/filter_wheel/base.py +++ b/src/navigate/model/devices/filter_wheel/base.py @@ -32,7 +32,8 @@ # Standard Library Imports import logging -from typing import Any, Dict, Optional +from typing import Any +from abc import ABC, abstractmethod # Third Party Imports @@ -45,10 +46,20 @@ @log_initialization -class FilterWheelBase: - """FilterWheelBase - Parent class for controlling filter wheels.""" - - def __init__(self, microscope_name: str, device_connection: Any, configuration: Dict[str, Any], device_id: int = 0) -> None: +class FilterWheelBase(ABC): + """Abstract base class for filter wheels. + + This class defines the interface for filter wheel devices used in the Navigate software. + Implementations should handle the specifics of communication with particular hardware. + """ + + def __init__( + self, + microscope_name: str, + device_connection: Any, + configuration: dict[str, Any], + device_id: int = 0, + ) -> None: """Initialize the FilterWheelBase class. Parameters @@ -66,7 +77,9 @@ def __init__(self, microscope_name: str, device_connection: Any, configuration: self.device_connection = device_connection #: Dict[str, Any]: Dictionary of device configuration parameters. - device_config = configuration["configuration"]["microscopes"][microscope_name]["filter_wheel"][device_id] + device_config = configuration["configuration"]["microscopes"][microscope_name][ + "filter_wheel" + ][device_id] self.device_config = device_config #: dict: Dictionary of filters available on the filter wheel. @@ -78,6 +91,19 @@ def __init__(self, microscope_name: str, device_connection: Any, configuration: #: int: index of filter wheel self.filter_wheel_number = device_config["hardware"]["wheel_number"] + @abstractmethod + def set_filter(self, position: Any, wait_until_done: bool=True) -> None: + """Set the filter wheel to the specified position. + + Parameters + ---------- + position : Any + The desired filter position. + wait_until_done : bool + Whether to wait until the filter wheel has completed the move. Default is True. + """ + pass + def __str__(self) -> str: """Return the string representation of the FilterWheelBase class.""" return "FilterWheelBase" diff --git a/src/navigate/model/devices/filter_wheel/ludl.py b/src/navigate/model/devices/filter_wheel/ludl.py index 9efc76644..abf7d8dde 100644 --- a/src/navigate/model/devices/filter_wheel/ludl.py +++ b/src/navigate/model/devices/filter_wheel/ludl.py @@ -34,6 +34,7 @@ import logging import time import io +from typing import Any # Third Party Imports import serial @@ -61,7 +62,13 @@ class LUDLFilterWheel(FilterWheelBase, SerialDevice): """ - def __init__(self, microscope_name, device_connection, configuration, device_id=0): + def __init__( + self, + microscope_name: str, + device_connection: Any, + configuration: dict[str, Any], + device_id: int = 0, + ) -> None: """Initialize the LUDLFilterWheel class. Parameters @@ -87,12 +94,12 @@ def __init__(self, microscope_name, device_connection, configuration, device_id= #: float: Delay for filter wheel to change positions. self.wait_until_done_delay = self.device_config["filter_wheel_delay"] - def __str__(self): + def __str__(self) -> str: """String representation of the class.""" return "LUDLFilterWheel" @classmethod - def connect(cls, comport, baudrate, timeout=0.25): + def connect(cls, comport: str, baudrate: int=9600, timeout: float=0.25) -> serial.Serial: """Build LUDLFilterWheel Serial Port connection Attributes @@ -130,7 +137,7 @@ def connect(cls, comport, baudrate, timeout=0.25): "Could not communicate with LUDL MAC6000 via COMPORT", comport ) - def set_filter(self, filter_name, wait_until_done=True): + def set_filter(self, filter_name: str, wait_until_done: bool=True) -> None: """Set the filter wheel to a specific filter position. Parameters @@ -155,7 +162,7 @@ def set_filter(self, filter_name, wait_until_done=True): if wait_until_done: time.sleep(self.wait_until_done_delay) - def close(self): + def close(self) -> None: """Close the LUDLFilterWheel serial port. Sets the filter wheel to the Empty-Alignment position and then closes the port. @@ -164,7 +171,7 @@ def close(self): self.set_filter(list(self.filter_dictionary.keys())[0]) self.serial.close() - def __del__(self): + def __del__(self) -> None: """Destructor for the LUDLFilterWheel class.""" if self.serial.is_open: self.close() diff --git a/src/navigate/model/devices/filter_wheel/ni.py b/src/navigate/model/devices/filter_wheel/ni.py index da990e5d8..0843a6402 100644 --- a/src/navigate/model/devices/filter_wheel/ni.py +++ b/src/navigate/model/devices/filter_wheel/ni.py @@ -34,6 +34,7 @@ import logging import time import traceback +from typing import Any # Third Party Imports import nidaqmx @@ -54,7 +55,13 @@ class NIFilterWheel(FilterWheelBase, NIDevice): """DAQFilterWheel - Class for controlling filter wheels with a DAQ.""" - def __init__(self, microscope_name, device_connection, configuration, device_id): + def __init__( + self, + microscope_name: str, + device_connection: Any, + configuration: dict[str, Any], + device_id: int = 0, + ) -> None: """Initialize the DAQFilterWheel class. Parameters @@ -76,24 +83,31 @@ def __init__(self, microscope_name, device_connection, configuration, device_id) self.filter_wheel_task = None - def __str__(self): + def __str__(self) -> str: """String representation of the class.""" return "DAQFilterWheel" - def __enter__(self): - """Enter the ASI Filter Wheel context manager.""" + def __enter__(self) -> "NIFilterWheel": + """Enter the NI Filter Wheel context manager.""" return self - def __exit__(self): - """Exit the ASI Filter Wheel context manager.""" + def __exit__(self) -> bool: + """Exit the NI Filter Wheel context manager. + + Returns + ------- + bool + True if the context was exited successfully, False otherwise. + """ if self.filter_wheel_task: try: self.filter_wheel_task.stop() self.filter_wheel_task.close() except Exception: pass + return True - def set_filter(self, filter_name, wait_until_done=True): + def set_filter(self, filter_name: str, wait_until_done: bool=True) -> None: """Change the filter wheel to the filter designated by the filter position argument. Requires a digital port on the DAQ. diff --git a/src/navigate/model/devices/filter_wheel/sutter.py b/src/navigate/model/devices/filter_wheel/sutter.py index 27fdbf6e6..a5f341cb6 100644 --- a/src/navigate/model/devices/filter_wheel/sutter.py +++ b/src/navigate/model/devices/filter_wheel/sutter.py @@ -33,6 +33,7 @@ # Standard Library Imports import logging import time +from typing import Any # Third Party Imports import numpy as np @@ -58,7 +59,13 @@ class SutterFilterWheel(FilterWheelBase, SerialDevice): https://www.sutter.com/manuals/LB10-3_OpMan.pdf """ - def __init__(self, microscope_name, device_connection, configuration, device_id): + def __init__( + self, + microscope_name: str, + device_connection: Any, + configuration: dict[str, Any], + device_id: int = 0, + ) -> None: """Initialize the SutterFilterWheel class. Parameters @@ -118,7 +125,7 @@ def __str__(self) -> str: return "SutterFilterWheel" @classmethod - def connect(cls, comport, baudrate, timeout=0.25): + def connect(cls, comport: str, baudrate: int=9600, timeout: float=0.25) -> serial.Serial: """Build SutterFilterWheel Serial Port connection Attributes @@ -185,7 +192,7 @@ def filter_change_delay(self, filter_name: str) -> None: except IndexError: self.wait_until_done_delay = 0.01 - def set_filter(self, filter_name: str, wait_until_done: bool = True): + def set_filter(self, filter_name: str, wait_until_done: bool = True) -> None: """Change the filter wheel to the filter designated by the filter position argument. diff --git a/src/navigate/model/devices/filter_wheel/synthetic.py b/src/navigate/model/devices/filter_wheel/synthetic.py index faeb08915..89dd53f1f 100644 --- a/src/navigate/model/devices/filter_wheel/synthetic.py +++ b/src/navigate/model/devices/filter_wheel/synthetic.py @@ -31,6 +31,7 @@ # Standard Library Imports import logging +from typing import Any # Third Party Imports @@ -47,7 +48,13 @@ class SyntheticFilterWheel(FilterWheelBase): """SyntheticFilterWheel Class""" - def __init__(self, microscope_name, device_connection, configuration, device_id): + def __init__( + self, + microscope_name: str, + device_connection: Any, + configuration: dict[str, Any], + device_id: int = 0, + ) -> None: """Initialize the SyntheticFilterWheel. Parameters @@ -64,21 +71,11 @@ def __init__(self, microscope_name, device_connection, configuration, device_id) super().__init__(microscope_name, device_connection, configuration, device_id) - def __str__(self): + def __str__(self) -> str: """Return string representation of the SyntheticFilterWheel.""" return "SyntheticFilterWheel" - def filter_change_delay(self, filter_name): - """Calculate duration of time necessary to change filter wheel positions - - Parameters - ---------- - filter_name : str - Name of the filter that we want to move to - """ - pass - - def set_filter(self, filter_name, wait_until_done=True): + def set_filter(self, filter_name: str, wait_until_done: bool=True) -> None: """Change the filter wheel to the filter designated by the filter position argument. @@ -91,23 +88,13 @@ def set_filter(self, filter_name, wait_until_done=True): """ pass - def read(self, num_bytes): - """Reads the specified number of bytes from the serial port. - - Parameters - ---------- - num_bytes : int - Number of bytes to read from the serial port. - """ - pass - - def close(self): + def close(self) -> None: """Close the SyntheticFilterWheel. Sets the filter wheel to the Empty-Alignment position and then closes the port. """ pass - def __del__(self): + def __del__(self) -> None: """Delete the SyntheticFilterWheel.""" pass diff --git a/src/navigate/model/devices/galvo/asi.py b/src/navigate/model/devices/galvo/asi.py index a925adfe5..979ea1346 100644 --- a/src/navigate/model/devices/galvo/asi.py +++ b/src/navigate/model/devices/galvo/asi.py @@ -32,7 +32,7 @@ # Standard Library Imports import logging -from typing import Any, Dict +from typing import Any # Local Imports @@ -54,7 +54,7 @@ def __init__( self, microscope_name: str, device_connection: Any, - configuration: Dict[str, Any], + configuration: dict[str, Any], device_id: int = 0, ) -> None: """Initialize the GalvoASI class. @@ -65,7 +65,7 @@ def __init__( Name of the microscope. device_connection : Any Connection to the NI DAQ device. - configuration : Dict[str, Any] + configuration : dict[str, Any] Dictionary of configuration parameters. device_id : int Galvo ID. Default is 0. @@ -84,7 +84,7 @@ def __str__(self) -> str: return "GalvoASI" @classmethod - def connect(cls, port, baudrate=115200, timeout=0.25): + def connect(cls, port: str, baudrate: int=115200, timeout: float=0.25) -> TigerController: """Build ASILaser Serial Port connection Parameters @@ -109,7 +109,7 @@ def connect(cls, port, baudrate=115200, timeout=0.25): raise Exception("ASI stage connection failed.") return tiger_controller - def adjust(self, exposure_times, sweep_times): + def adjust(self, exposure_times: dict[str, float], sweep_times: dict[str, float]) -> dict[str, Any]: """Adjust the galvo waveform to account for the camera readout time. Parameters @@ -224,7 +224,7 @@ def adjust(self, exposure_times, sweep_times): print("Unknown Galvo waveform specified in configuration file.") continue - def sawtooth(self, period=10, amplitude=1, offset=0, duty_cycle=100): + def sawtooth(self, period: float=10, amplitude: float=1, offset: float=0, duty_cycle: float=100) -> None: """ Sends the tiger controller commands to initiate the sawtooth wave. @@ -266,7 +266,7 @@ def sawtooth(self, period=10, amplitude=1, offset=0, duty_cycle=100): # Waveform is free running after TTL input self.galvo.single_axis_mode(self.axis, 4) - def sine_wave(self, period=10.0, amplitude=1.0, offset=0.0): + def sine_wave(self, period: float=10.0, amplitude: float=1.0, offset: float=0.0) -> None: """Sends the tiger controller commands to initiate the sine wave. Parameters @@ -290,11 +290,11 @@ def sine_wave(self, period=10.0, amplitude=1.0, offset=0.0): # Waveform is free running after it is triggered self.galvo.single_axis_mode(self.axis, 4) - def turn_off(self): + def turn_off(self) -> None: """Stops the galvo waveform""" self.galvo.single_axis_mode(self.axis, 0) - def close(self): + def close(self) -> None: """Close the ASI galvo serial port. Stops the remote focus waveform and then closes the port. @@ -304,6 +304,6 @@ def close(self): logger.debug("ASI Remote Focus - Closing Device.") self.galvo.disconnect_from_serial() - def __del__(self): + def __del__(self) -> None: """Destructor for the ASIGalvo class.""" self.close() diff --git a/src/navigate/model/devices/galvo/base.py b/src/navigate/model/devices/galvo/base.py index 9bf4b1fd6..5268642e0 100644 --- a/src/navigate/model/devices/galvo/base.py +++ b/src/navigate/model/devices/galvo/base.py @@ -33,6 +33,7 @@ # Standard Library Imports import logging from typing import Any, Dict +from abc import ABC, abstractmethod # Third Party Imports @@ -46,14 +47,23 @@ @log_initialization -class GalvoBase: - """GalvoBase Class - Parent class for galvanometers.""" +class GalvoBase(ABC): + """Abstract base class for galvanometer devices. + + This class provides the interface and common functionality for controlling + galvanometers with navigate. It handles waveform generation based on + configuration parameters and experimental settings. + + The class generates appropriate control waveforms (sawtooth, sine, halfsaw) + according to camera exposure times and configuration parameters. Child classes + must implement the turn_off method to control hardware-specific behaviors. + """ def __init__( self, microscope_name: str, device_connection: Any, - configuration: Dict[str, Any], + configuration: dict[str, Any], device_id: int = 0, ) -> None: """Initialize the GalvoBase class. @@ -64,7 +74,7 @@ def __init__( Name of the microscope. device_connection : Any Device connection. - configuration : Dict[str, Any] + configuration : dict[str, Any] Dictionary of configuration parameters. device_id : int Galvo ID. Default is 0. @@ -115,7 +125,7 @@ def __init__( #: dict: Dictionary of galvo waveforms. self.waveform_dict = {} - def __str__(self): + def __str__(self) -> str: """Returns the string representation of the GalvoBase class.""" return "GalvoBase" @@ -123,7 +133,8 @@ def __del__(self): """Destructor""" pass - def adjust(self, exposure_times, sweep_times): + @abstractmethod + def adjust(self, exposure_times: dict[str, float], sweep_times: dict[str, float]) -> dict: """Adjust the galvo waveform to account for the camera readout time. Parameters @@ -243,6 +254,7 @@ def adjust(self, exposure_times, sweep_times): return self.waveform_dict - def turn_off(self): + @abstractmethod + def turn_off(self) -> None: """Turn off the galvo.""" pass diff --git a/src/navigate/model/devices/galvo/ni.py b/src/navigate/model/devices/galvo/ni.py index 64df7ac02..632b84e01 100644 --- a/src/navigate/model/devices/galvo/ni.py +++ b/src/navigate/model/devices/galvo/ni.py @@ -33,7 +33,7 @@ # Standard Library Imports import logging import traceback -from typing import Any, Dict +from typing import Any # Third Party Imports import nidaqmx @@ -56,7 +56,7 @@ def __init__( self, microscope_name: str, device_connection: Any, - configuration: Dict[str, Any], + configuration: dict[str, Any], device_id: int = 0, ) -> None: """Initialize the GalvoNI class. @@ -67,7 +67,7 @@ def __init__( Name of the microscope. device_connection : Any Connection to the NI DAQ device. - configuration : Dict[str, Any] + configuration : dict[str, Any] Dictionary of configuration parameters. device_id : int Galvo ID. Default is 0. @@ -95,7 +95,7 @@ def __str__(self) -> str: """Return string representation of the GalvoNI.""" return "GalvoNI" - def adjust(self, exposure_times, sweep_times) -> Dict[str, Any]: + def adjust(self, exposure_times: dict[str, float], sweep_times: dict[str, float]) -> dict[str, Any]: """Adjust the galvo to the readout time Parameters diff --git a/src/navigate/model/devices/galvo/synthetic.py b/src/navigate/model/devices/galvo/synthetic.py index 66084dea1..918f4b2b2 100644 --- a/src/navigate/model/devices/galvo/synthetic.py +++ b/src/navigate/model/devices/galvo/synthetic.py @@ -32,7 +32,7 @@ # Standard Library Imports import logging -from typing import Any, Dict, Optional +from typing import Any, Optional # Third Party Imports @@ -53,7 +53,7 @@ def __init__( self, microscope_name: str, device_connection: Optional[Any], - configuration: Dict[str, Any], + configuration: dict[str, Any], device_id: int = 0, ) -> None: """Initialize the SyntheticGalvo class. @@ -86,3 +86,23 @@ def __init__( def __str__(self) -> str: """Return string representation of the GalvoNI.""" return "SyntheticGalvo" + + def adjust(self, exposure_times: dict[str, float], sweep_times: dict[str, float]) -> dict[str, Any]: + """Adjust the galvo waveform to account for the camera readout time. + Parameters + ---------- + exposure_times : dict[str, float] + Dictionary of exposure times. + sweep_times : dict[str, float] + Dictionary of sweep times. + + Returns + ------- + dict[str, Any] + Adjusted exposure and sweep times. + """ + return super().adjust(exposure_times, sweep_times) + + def turn_off(self): + """Turn off the galvo.""" + pass diff --git a/src/navigate/model/devices/laser/asi.py b/src/navigate/model/devices/laser/asi.py index 36ea2da1d..a83254b79 100644 --- a/src/navigate/model/devices/laser/asi.py +++ b/src/navigate/model/devices/laser/asi.py @@ -32,9 +32,7 @@ # Standard Library Imports import logging -import time -import traceback -from typing import Any, Dict +from typing import Any # Third Party Imports @@ -56,7 +54,13 @@ class ASILaser(LaserBase, SerialDevice): This class is used to control a laser connected to a ASI Device. """ - def __init__(self, microscope_name, device_connection, configuration, device_id: int = 0): + def __init__( + self, + microscope_name: str, + device_connection: Any, + configuration: dict[str, Any], + device_id: int, + ) -> None: """Initialize the ASILaser class. Parameters @@ -121,12 +125,12 @@ def __init__(self, microscope_name, device_connection, configuration, device_id: #: float: Current laser intensity. self._current_intensity = 0 - def __str__(self): + def __str__(self) -> str: """String representation of the class.""" return "ASILaser" @classmethod - def connect(cls, port, baudrate=115200, timeout=0.25): + def connect(cls, port: str, baudrate: int=115200, timeout: float=0.25) -> TigerController: """Build ASILaser Serial Port connection Parameters @@ -151,7 +155,7 @@ def connect(cls, port, baudrate=115200, timeout=0.25): raise Exception("ASI stage connection failed.") return tiger_controller - def set_power(self, laser_intensity: float): + def set_power(self, laser_intensity: float) -> None: """Sets the analog laser power. Parameters @@ -165,7 +169,7 @@ def set_power(self, laser_intensity: float): self.laser.move_axis(self.analog_axis, self.output_voltage) self._current_intensity = laser_intensity - def turn_on(self): + def turn_on(self) -> None: """Turns on the laser.""" if self.modulation_type == "mixed": self.set_power(self._current_intensity) @@ -181,7 +185,7 @@ def turn_on(self): logger.info(f"{str(self)} initialized with digital modulation.") - def turn_off(self): + def turn_off(self) -> None: """Turns off the laser.""" if self.modulation_type == "mixed": tmp = self._current_intensity @@ -201,7 +205,7 @@ def turn_off(self): logger.info(f"{str(self)} initialized with digital modulation.") - def close(self): + def close(self) -> None: """Close the ASI Laser serial port. Turns the laser off and then closes the port. diff --git a/src/navigate/model/devices/laser/base.py b/src/navigate/model/devices/laser/base.py index 70682b6ba..410e94b99 100644 --- a/src/navigate/model/devices/laser/base.py +++ b/src/navigate/model/devices/laser/base.py @@ -32,7 +32,8 @@ # Standard Library Imports import logging -from typing import Any, Dict +from typing import Any +from abc import ABC, abstractmethod # Third Party Imports @@ -45,14 +46,14 @@ @log_initialization -class LaserBase: +class LaserBase(ABC): """Laser Base Class""" def __init__( self, microscope_name: str, device_connection: Any, - configuration: Dict[str, Any], + configuration: dict[str, Any], device_id: int, ) -> None: """Initialize Laser Base Class @@ -86,9 +87,12 @@ def __str__(self) -> str: """Return string representation of the class""" return "LaserBase" + @abstractmethod def set_power(self, laser_intensity: int) -> None: """Set laser power + This abstract method must be implemented by all subclasses. + Parameters ---------- laser_intensity : int @@ -96,12 +100,21 @@ def set_power(self, laser_intensity: int) -> None: """ pass + @abstractmethod def turn_on(self) -> None: - """Turn on the laser""" + """Turn on the laser + + This abstract method must be implemented by all subclasses. + """ pass + @abstractmethod def turn_off(self) -> None: - """Turn off the laser""" + """Turn off the laser + + This abstract method must be implemented by all subclasses. + """ + pass def close(self) -> None: @@ -109,7 +122,3 @@ def close(self) -> None: Close the laser before exit. """ pass - - def initialize_laser(self) -> None: - """Initialize lasers.""" - pass diff --git a/src/navigate/model/devices/laser/ni.py b/src/navigate/model/devices/laser/ni.py index a928c49ac..2fde0e24a 100644 --- a/src/navigate/model/devices/laser/ni.py +++ b/src/navigate/model/devices/laser/ni.py @@ -33,7 +33,7 @@ # Standard Library Imports import logging import traceback -from typing import Any, Dict +from typing import Any # Third Party Imports import nidaqmx @@ -61,7 +61,7 @@ def __init__( self, microscope_name: str, device_connection: Any, - configuration: Dict[str, Any], + configuration: dict[str, Any], device_id: int, ) -> None: """Initialize the LaserNI class. @@ -256,7 +256,7 @@ def close(self) -> None: except DaqError as e: logger.exception(e) - def __del__(self): + def __del__(self) -> None: """Delete the NI Task before exit.""" if self.laser_ao_task: try: diff --git a/src/navigate/model/devices/laser/synthetic.py b/src/navigate/model/devices/laser/synthetic.py index 5b5e05013..2972788ec 100644 --- a/src/navigate/model/devices/laser/synthetic.py +++ b/src/navigate/model/devices/laser/synthetic.py @@ -32,7 +32,7 @@ # Standard Library Imports import logging -from typing import Any, Dict +from typing import Any # Third Party Imports @@ -53,7 +53,7 @@ def __init__( self, microscope_name: str, device_connection: Any, - configuration: Dict[str, Any], + configuration: dict[str, Any], device_id: int, ) -> None: """Initialize the SyntheticLaser class. @@ -75,8 +75,24 @@ def close(self) -> None: """Close the port before exit.""" pass - def initialize_laser(self) -> None: - """ - Initialize lasers. + def set_power(self, laser_intensity: int) -> None: + """Set the laser power. + + Parameters + ---------- + laser_intensity : int + The laser intensity to set. """ + logger.debug( + f"{self.microscope_name} - SyntheticLaser.set_power({laser_intensity})" + ) + self.laser_intensity = laser_intensity + + def turn_on(self) -> None: + """Turn on the laser""" + pass + + def turn_off(self) -> None: + """Turn off the laser""" + pass diff --git a/src/navigate/model/devices/remote_focus/asi.py b/src/navigate/model/devices/remote_focus/asi.py index 70408f067..04c4fb76b 100644 --- a/src/navigate/model/devices/remote_focus/asi.py +++ b/src/navigate/model/devices/remote_focus/asi.py @@ -32,7 +32,7 @@ # Standard Library Imports import logging -from typing import Any, Dict +from typing import Any, Optional # Third Party Imports @@ -55,7 +55,7 @@ def __init__( self, microscope_name: str, device_connection: Any, - configuration: Dict[str, Any], + configuration: dict[str, Any], *args, **kwargs, ) -> None: @@ -81,7 +81,7 @@ def __init__( self.axis = self.device_config["hardware"]["axis"] @classmethod - def connect(cls, port, baudrate=115200, timeout=0.25): + def connect(cls, port: str, baudrate: int=115200, timeout: float=0.25) -> TigerController: """Build ASILaser Serial Port connection Parameters @@ -106,7 +106,12 @@ def connect(cls, port, baudrate=115200, timeout=0.25): raise Exception("ASI stage connection failed.") return tiger_controller - def adjust(self, exposure_times, sweep_times, offset=None): + def adjust( + self, + exposure_times: dict[str, float], + sweep_times: dict[str, float], + offset: Optional[float] = None, + ) -> dict[str, Any]: """Adjust the waveform. This method adjusts the waveform parameters. @@ -197,9 +202,9 @@ def adjust(self, exposure_times, sweep_times, offset=None): def triangle( self, - sweep_time=0.24, - amplitude=1, - offset=0, + sweep_time: float=0.24, + amplitude: float=1, + offset: float=0, ): """Sends the tiger controller commands to initiate the triangle wave. @@ -230,9 +235,9 @@ def triangle( def ramp( self, - exposure_time=0.2, - amplitude=1, - offset=0.5, + exposure_time: float=0.2, + amplitude: float=1, + offset: float=0.5, ): """Sends the tiger controller commands to initiate the ramp wave. @@ -271,7 +276,12 @@ def ramp( # The waveform cycles once and waits for another TTL inputs self.remote_focus.single_axis_mode(self.axis, 2) - def move(self, exposure_times, sweep_times, offset=None): + def move( + self, + exposure_times: dict[str, float], + sweep_times: dict[str, float], + offset: Optional[float] = None, + ) -> None: """Move the remote focus. This method moves the remote focus. diff --git a/src/navigate/model/devices/remote_focus/base.py b/src/navigate/model/devices/remote_focus/base.py index 6e29da5f0..c8341a900 100644 --- a/src/navigate/model/devices/remote_focus/base.py +++ b/src/navigate/model/devices/remote_focus/base.py @@ -32,7 +32,8 @@ # Standard Library Imports import logging -from typing import Any, Dict +from typing import Any, Optional +from abc import ABC, abstractmethod # Third Party Imports @@ -50,14 +51,14 @@ @log_initialization -class RemoteFocusBase: +class RemoteFocusBase(ABC): """RemoteFocusBase Class - Parent class for Remote Focusing Device.""" def __init__( self, microscope_name: str, device_connection: Any, - configuration: Dict[str, Any], + configuration: dict[str, Any], *args, **kwargs, ) -> None: @@ -113,15 +114,42 @@ def __init__( #: dict: Waveform dictionary. self.waveform_dict = {} - def __str__(self): + def __str__(self) -> str: """String representation of the RemoteFocusBase class.""" return "RemoteFocusBase" - def __del__(self): + def __del__(self) -> None: """Destructor""" pass - def adjust(self, exposure_times, sweep_times, offset=None): + @abstractmethod + def move( + self, + exposure_times: dict[str, float], + sweep_times: dict[str, float], + offset: Optional[float] = None, + ) -> None: + """Moves the remote focus device to the specified position. + + This abstract method must be implemented by all subclasses. + + Parameters + ---------- + exposure_times : dict + Dictionary of exposure times for each selected channel + sweep_times : dict + Dictionary of sweep times for each selected channel + offset : float, optional + Offset value for the remote focus waveform, by default None + """ + pass + + def adjust( + self, + exposure_times: dict[str, float], + sweep_times: dict[str, float], + offset: Optional[float] = None, + ) -> dict[str, Any]: """Adjusts the remote focus waveform based on the readout time. Parameters diff --git a/src/navigate/model/devices/remote_focus/equipment_solutions.py b/src/navigate/model/devices/remote_focus/equipment_solutions.py index 125e05fa4..a21d9ad3a 100644 --- a/src/navigate/model/devices/remote_focus/equipment_solutions.py +++ b/src/navigate/model/devices/remote_focus/equipment_solutions.py @@ -66,7 +66,7 @@ def __init__( self, microscope_name: str, device_connection: Any, - configuration: Dict[str, Any], + configuration: dict[str, Any], *args, **kwargs, ) -> None: @@ -204,7 +204,7 @@ def read_bytes(self, num_bytes: int) -> bytes: received_bytes = self.serial.read(num_bytes) return received_bytes - def send_command(self, message: str): + def send_command(self, message: str) -> None: """Send write command to the RemoteFocusEquipmentSolutions device. Parameters diff --git a/src/navigate/model/devices/remote_focus/ni.py b/src/navigate/model/devices/remote_focus/ni.py index ddba5f36c..f170f43b2 100644 --- a/src/navigate/model/devices/remote_focus/ni.py +++ b/src/navigate/model/devices/remote_focus/ni.py @@ -32,7 +32,7 @@ # Standard Library Imports import logging -from typing import Any, Dict +from typing import Any, Optional # Third Party Imports @@ -54,7 +54,7 @@ def __init__( self, microscope_name: str, device_connection: Any, - configuration: Dict[str, Any], + configuration: dict[str, Any], *args, **kwargs, ) -> None: @@ -89,7 +89,12 @@ def __del__(self): Deletion of the NIDAQ task is handled by the NIDAQ object.""" pass - def adjust(self, exposure_times, sweep_times, offset=None): + def adjust( + self, + exposure_times: dict[str, float], + sweep_times: dict[str, float], + offset: Optional[float] = None, + ) -> dict[str, Any]: """Adjust the waveform. This method adjusts the waveform. @@ -117,7 +122,12 @@ def adjust(self, exposure_times, sweep_times, offset=None): return waveform_dict - def move(self, exposure_times, sweep_times, offset=None): + def move( + self, + exposure_times: dict[str, float], + sweep_times: dict[str, float], + offset: Optional[float] = None, + ) -> None: """Move the remote focus. This method moves the remote focus. diff --git a/src/navigate/model/devices/remote_focus/synthetic.py b/src/navigate/model/devices/remote_focus/synthetic.py index 2dd3f7fcb..543003782 100644 --- a/src/navigate/model/devices/remote_focus/synthetic.py +++ b/src/navigate/model/devices/remote_focus/synthetic.py @@ -32,7 +32,7 @@ # Standard Library Imports import logging -from typing import Any, Dict +from typing import Any, Optional # Third Party Imports @@ -53,7 +53,7 @@ def __init__( self, microscope_name: str, device_connection: Any, - configuration: Dict[str, Any], + configuration: dict[str, Any], *args, **kwargs, ) -> None: @@ -71,8 +71,12 @@ def __init__( super().__init__(microscope_name, device_connection, configuration) pass - @staticmethod - def move(readout_time, offset=None): + def move( + self, + exposure_times: dict[str, float], + sweep_times: dict[str, float], + offset: Optional[float] = None, + ) -> None: """Moves the remote focus. This method moves the remote focus. @@ -84,4 +88,4 @@ def move(readout_time, offset=None): offset : float The offset of the signal in volts. """ - logger.debug(f"Remote focus offset and readout time: {offset}, {readout_time}") + logger.debug(f"Remote focus offset: {offset}.") diff --git a/src/navigate/model/devices/shutter/asi.py b/src/navigate/model/devices/shutter/asi.py index c4e51d304..791a9dc21 100644 --- a/src/navigate/model/devices/shutter/asi.py +++ b/src/navigate/model/devices/shutter/asi.py @@ -33,7 +33,7 @@ # Standard Library Imports import logging import traceback -from typing import Any, Dict +from typing import Any # Local Imports from navigate.model.devices.shutter.base import ShutterBase @@ -59,7 +59,7 @@ def __init__( self, microscope_name: str, device_connection: Any, - configuration: Dict[str, Any], + configuration: dict[str, Any], address=None, ) -> None: """Initialize the ASIShutter. @@ -92,7 +92,7 @@ def __init__( ]["hardware"]["port"] @classmethod - def connect(cls, port, baudrate=115200, timeout=0.25): + def connect(cls, port: str, baudrate: int=115200, timeout: float=0.25) -> TigerController: """Build ASILaser Serial Port connection Parameters diff --git a/src/navigate/model/devices/shutter/base.py b/src/navigate/model/devices/shutter/base.py index 87fd274ef..f31a64c38 100644 --- a/src/navigate/model/devices/shutter/base.py +++ b/src/navigate/model/devices/shutter/base.py @@ -32,7 +32,8 @@ # Standard Library Imports import logging -from typing import Any, Dict, Optional +from typing import Any, Optional +from abc import ABC, abstractmethod # Third Party Imports @@ -45,14 +46,14 @@ @log_initialization -class ShutterBase: +class ShutterBase(ABC): """ShutterBase Class - Parent class for the laser shutters.""" def __init__( self, microscope_name: str, device_connection: Any, - configuration: Dict[str, Any], + configuration: dict[str, Any], *args: Optional[Any], **kwargs: Optional[Any], ) -> None: @@ -87,13 +88,21 @@ def __del__(self) -> None: """Close the Shutter at exit.""" pass + @abstractmethod def open_shutter(self) -> None: - """Open the Shutter.""" - self.shutter_state = True + """Open the Shutter. + This abstract method must be implemented by all subclasses. + """ + pass + + @abstractmethod def close_shutter(self) -> None: - """Close the Shutter.""" - self.shutter_state = False + """Close the Shutter. + + This abstract method must be implemented by all subclasses. + """ + pass @property def state(self) -> bool: diff --git a/src/navigate/model/devices/shutter/ni.py b/src/navigate/model/devices/shutter/ni.py index 3077f59a2..b4edd50ca 100644 --- a/src/navigate/model/devices/shutter/ni.py +++ b/src/navigate/model/devices/shutter/ni.py @@ -33,7 +33,7 @@ # Standard Library Imports import logging import traceback -from typing import Any, Dict, Optional +from typing import Any, Optional # Third Party Imports import nidaqmx @@ -61,7 +61,7 @@ def __init__( self, microscope_name: str, device_connection: Any, - configuration: Dict[str, Any], + configuration: dict[str, Any], *args: Optional[Any], **kwargs: Optional[Any], ) -> None: @@ -99,7 +99,7 @@ def __del__(self): logger.exception(f"Error stopping task: {traceback.format_exc()}") - def open_shutter(self): + def open_shutter(self) -> None: """Open the shutter""" #: bool: Shutter state self.shutter_state = True @@ -116,7 +116,7 @@ def open_shutter(self): ) logger.debug(e) - def close_shutter(self): + def close_shutter(self) -> None: """Close the shutter""" self.shutter_state = False try: @@ -133,7 +133,7 @@ def close_shutter(self): logger.debug(e) @property - def state(self): + def state(self) -> bool: """Return the state of both shutters Returns diff --git a/src/navigate/model/devices/shutter/synthetic.py b/src/navigate/model/devices/shutter/synthetic.py index b0f7afbc0..cf9a690d3 100644 --- a/src/navigate/model/devices/shutter/synthetic.py +++ b/src/navigate/model/devices/shutter/synthetic.py @@ -33,7 +33,7 @@ # Standard Library Imports import logging -from typing import Any, Dict, Optional +from typing import Any, Optional # Third Party Imports @@ -54,7 +54,7 @@ def __init__( self, microscope_name: str, device_connection: Any, - configuration: Dict[str, Any], + configuration: dict[str, Any], *args: Optional[Any], **kwargs: Optional[Any], ) -> None: @@ -70,3 +70,11 @@ def __init__( Global configuration of the microscope """ super().__init__(microscope_name, device_connection, configuration) + + def open_shutter(self) -> None: + """Open the Shutter.""" + self.shutter_state = True + + def close_shutter(self) -> None: + """Close the Shutter.""" + self.shutter_state = False diff --git a/src/navigate/model/devices/stage/asi.py b/src/navigate/model/devices/stage/asi.py index 0a1519028..2c4563892 100644 --- a/src/navigate/model/devices/stage/asi.py +++ b/src/navigate/model/devices/stage/asi.py @@ -32,7 +32,7 @@ # Standard Imports import logging import time -from typing import Any, Dict +from typing import Any # Third Party Imports @@ -73,7 +73,7 @@ def __init__( self, microscope_name: str, device_connection: Any, - configuration: Dict[str, Any], + configuration: dict[str, Any], device_id: int = 0, ): """Initialize the ASI Stage connection. @@ -154,7 +154,7 @@ def __init__( # Speed optimizations - Set speed to 90% of maximum on each axis self.set_speed(percent=0.9) - def __del__(self): + def __del__(self) -> None: """Delete the ASI Stage connection.""" try: if self.asi_controller is not None: @@ -165,7 +165,9 @@ def __del__(self): raise @classmethod - def connect(cls, port, baudrate=115200, timeout=0.25): + def connect( + cls, port: str, baudrate: int = 115200, timeout: float = 0.25 + ) -> TigerController: """Connect to the ASI Stage Parameters @@ -192,7 +194,7 @@ def connect(cls, port, baudrate=115200, timeout=0.25): return asi_stage - def get_axis_position(self, axis): + def get_axis_position(self, axis: str) -> float: """Get position of specific axis Parameters @@ -214,7 +216,7 @@ def get_axis_position(self, axis): return float("inf") return pos - def report_position(self): + def report_position(self) -> dict[str, float]: """Reports the position for all axes in microns, and create position dictionary. @@ -237,7 +239,9 @@ def report_position(self): return self.get_position_dict() - def move_axis_absolute(self, axis, abs_pos, wait_until_done=False): + def move_axis_absolute( + self, axis: str, abs_pos: float, wait_until_done: bool = False + ) -> bool: """Move stage along a single axis. Move absolute command for ASI is MOVE [Axis]=[units 1/10 microns] @@ -285,7 +289,7 @@ def move_axis_absolute(self, axis, abs_pos, wait_until_done=False): self.asi_controller.wait_for_device() return True - def verify_move(self, move_dictionary): + def verify_move(self, move_dictionary: dict[str, float]) -> dict[str, float]: """Don't submit a move command for axes that aren't moving. The Tiger controller wait time for each axis is additive. @@ -310,7 +314,9 @@ def verify_move(self, move_dictionary): res_dict[axis] = val return res_dict - def move_absolute(self, move_dictionary, wait_until_done=False): + def move_absolute( + self, move_dictionary: dict[str, float], wait_until_done: bool = False + ) -> bool: """Move Absolute Method. XYZ Values should remain in microns for the ASI API @@ -335,7 +341,7 @@ def move_absolute(self, move_dictionary, wait_until_done=False): return False abs_pos_dict = self.verify_move(abs_pos_dict) if len(abs_pos_dict) == 0: - return + return False # This is to account for the asi 1/10 of a micron units pos_dict = { @@ -356,7 +362,7 @@ def move_absolute(self, move_dictionary, wait_until_done=False): return True - def stop(self): + def stop(self) -> None: """Stop all stage movement abruptly.""" try: self.asi_controller.stop() @@ -364,7 +370,9 @@ def stop(self): print(f"ASI stage halt command failed: {e}") logger.exception("ASI Stage Exception", e) - def set_speed(self, velocity_dict=None, percent=None): + def set_speed( + self, velocity_dict: dict[str, float] = None, percent: float = None + ) -> bool: """Set scan velocity. Parameters @@ -396,7 +404,7 @@ def set_speed(self, velocity_dict=None, percent=None): return False return True - def get_speed(self, axis): + def get_speed(self, axis: str) -> float: """Get scan velocity of the axis. Parameters @@ -418,7 +426,13 @@ def get_speed(self, axis): return 0 return velocity - def scanr(self, start_position_mm, end_position_mm, enc_divide, axis="z"): + def scanr( + self, + start_position_mm: float, + end_position_mm: float, + enc_divide: float, + axis: str = "z", + ) -> bool: """Set scan range Parameters @@ -454,8 +468,13 @@ def scanr(self, start_position_mm, end_position_mm, enc_divide, axis="z"): return True def scanv( - self, start_position_mm, end_position_mm, number_of_lines, overshoot, axis="z" - ): + self, + start_position_mm: float, + end_position_mm: float, + number_of_lines: int, + overshoot: float, + axis: str = "z", + ) -> bool: """Set scan range Parameters @@ -491,7 +510,7 @@ def scanv( return False return True - def start_scan(self, axis): + def start_scan(self, axis: str) -> bool: """Start scan state machine Parameters @@ -516,14 +535,14 @@ def start_scan(self, axis): return False return True - def stop_scan(self): + def stop_scan(self) -> None: """Stop scan""" try: self.asi_controller.stop_scan() except ASIException as e: logger.exception("ASI Stage Exception", e) - def wait_until_complete(self, axis): + def wait_until_complete(self, axis: str) -> bool: try: while self.asi_controller.is_axis_busy(axis): time.sleep(0.1) @@ -551,7 +570,7 @@ def __init__( self, microscope_name: str, device_connection: Any, - configuration: Dict[str, Any], + configuration: dict[str, Any], device_id: int = 0, ): """Initialize the ASI Stage connection. @@ -640,7 +659,9 @@ def __init__( self.set_speed(percent=0.9) @classmethod - def connect(cls, port, baudrate=115200, timeout=0.25): + def connect( + cls, port: str, baudrate: int = 115200, timeout: float = 0.25 + ) -> TigerController: """Connect to the ASI Stage Parameters @@ -670,7 +691,9 @@ def connect(cls, port, baudrate=115200, timeout=0.25): return asi_stage - def move_axis_relative(self, axis, distance, wait_until_done=False): + def move_axis_relative( + self, axis: str, distance: float, wait_until_done: bool = False + ) -> bool: """Move the stage relative to the current position along the specified axis. XYZ Values should remain in microns for the ASI API Theta Values are not accepted. @@ -719,8 +742,12 @@ def move_axis_relative(self, axis, distance, wait_until_done=False): return True def scan_axis_triggered_move( - self, start_position, end_position, axis, ttl_triggered=False - ): + self, + start_position: float, + end_position: float, + axis: str, + ttl_triggered: bool = False, + ) -> bool: """Move the stage along the specified axis from start position to end position, with optional TTL triggering. @@ -776,7 +803,7 @@ def __init__( self, microscope_name: str, device_connection: Any, - configuration: Dict[str, Any], + configuration: dict[str, Any], device_id: int = 0, ): """Initialize the ASI Stage connection. @@ -795,7 +822,9 @@ def __init__( super().__init__(microscope_name, device_connection, configuration, device_id) @classmethod - def connect(cls, port, baudrate=115200, timeout=0.25): + def connect( + cls, port: str, baudrate: int = 115200, timeout: float = 0.25 + ) -> TigerController: """Connect to the ASI Stage Parameters diff --git a/src/navigate/model/devices/stage/base.py b/src/navigate/model/devices/stage/base.py index d4a2410d8..7d9dfeff9 100644 --- a/src/navigate/model/devices/stage/base.py +++ b/src/navigate/model/devices/stage/base.py @@ -31,10 +31,9 @@ # Standard Imports import logging - -# from idlelib.debugger_r import DictProxy from multiprocessing.managers import ListProxy -from typing import Any, Dict +from typing import Any +from abc import ABC, abstractmethod # Third Party Imports @@ -47,14 +46,14 @@ @log_initialization -class StageBase: +class StageBase(ABC): """Stage Parent Class""" def __init__( self, microscope_name: str, device_connection: Any, - configuration: Dict[str, Any], + configuration: dict[str, Any], device_id: int = 0, ) -> None: """Initialize the stage. @@ -115,7 +114,10 @@ def __init__( """ for ax in self.axes: setattr(self, f"{ax}_pos", 0) - if f"{ax}_min" not in stage_configuration or f"{ax}_max" not in stage_configuration: + if ( + f"{ax}_min" not in stage_configuration + or f"{ax}_max" not in stage_configuration + ): logger.warning(f"Stage {ax} limits not set in configuration file.") setattr(self, f"{ax}_min", stage_configuration.get(f"{ax}_min", -10000)) setattr(self, f"{ax}_max", stage_configuration.get(f"{ax}_max", 10000)) @@ -126,15 +128,85 @@ def __init__( #: stage_configuration self.stage_configuration = stage_configuration - def __del__(self): + def __del__(self) -> None: """Destructor for the StageBase class.""" pass - def __str__(self): + def __str__(self) -> str: """Return a string representation of the stage.""" return "StageBase" - def get_position_dict(self): + @abstractmethod + def report_position(self) -> dict[str, float]: + """Reports the position for all axes, and create position dictionary. + + This abstract method must be implemented by all subclasses. + + Returns + ------- + position_dict : dict + Dictionary containing the position of all axes + """ + pass + + @abstractmethod + def move_absolute( + self, move_dictionary: dict[str, float], wait_until_done=False + ) -> bool: + """Move the stage to an absolute position. + + This abstract method must be implemented by all subclasses. + + Parameters + ---------- + move_dictionary : dict + A dictionary of values required for movement. + Includes 'x_abs', 'y_abs', etc. for one or more axes. + Expect values in micrometers, except for theta, which is in degrees. + wait_until_done : bool, optional + If True, wait until the move is complete before returning. Default is False. + + Returns + ------- + bool + True if the move was successful, False otherwise. + """ + pass + + @abstractmethod + def move_axis_absolute( + self, axis: str, position: float, wait_until_done=False + ) -> bool: + """Move a single axis to an absolute position. + + This abstract method must be implemented by all subclasses. + + Parameters + ---------- + axis : str + The axis to move (e.g., 'x', 'y', 'z', 'f', 'theta'). + position : float + The absolute position to move to in micrometers (or degrees for theta). + wait_until_done : bool, optional + If True, wait until the move is complete before returning. Default is False. + + Returns + ------- + bool + True if the move was successful, False otherwise. + """ + pass + + @abstractmethod + def stop(self) -> None: + """Stop all stage movement abruptly. + + This abstract method must be implemented by all subclasses. + + """ + pass + + def get_position_dict(self) -> dict[str, float]: """Return a dictionary with the saved stage positions. Returns @@ -148,7 +220,7 @@ def get_position_dict(self): position_dict[ax_str] = getattr(self, ax_str) return position_dict - def get_abs_position(self, axis, axis_abs): + def get_abs_position(self, axis: str, axis_abs: float) -> float: """Ensure the requested position is within axis bounds and return it. Parameters @@ -192,7 +264,7 @@ def get_abs_position(self, axis, axis_abs): return -1e50 return axis_abs - def verify_abs_position(self, move_dictionary, is_strict=False): + def verify_abs_position(self, move_dictionary: dict, is_strict=False) -> dict: """Ensure the requested moving positions are within axes bounds Parameters @@ -233,15 +305,13 @@ def verify_abs_position(self, move_dictionary, is_strict=False): return {} return abs_pos_dict - def update_limits(self): + def update_limits(self) -> None: for ax in self.axes: - setattr(self, f"{ax}_min", self.stage_configuration.get(f"{ax}_min", -10000)) + setattr( + self, f"{ax}_min", self.stage_configuration.get(f"{ax}_min", -10000) + ) setattr(self, f"{ax}_max", self.stage_configuration.get(f"{ax}_max", 10000)) - def stop(self): - """Stop all stage movement abruptly.""" - pass - - def close(self): + def close(self) -> None: """Close the stage.""" pass diff --git a/src/navigate/model/devices/stage/mcl.py b/src/navigate/model/devices/stage/mcl.py index a00603109..072da17ef 100644 --- a/src/navigate/model/devices/stage/mcl.py +++ b/src/navigate/model/devices/stage/mcl.py @@ -33,6 +33,7 @@ import importlib import logging import time +from typing import Any, Optional # Third Party Imports @@ -46,12 +47,17 @@ logger = logging.getLogger(p) - @log_initialization class MCLStage(StageBase, IntegratedDevice): """Mad City Lab stage class.""" - def __init__(self, microscope_name, device_connection, configuration, device_id=0): + def __init__( + self, + microscope_name: str, + device_connection: Any, + configuration: dict[str, Any], + device_id: int = 0, + ): """Initialize the MCL stage. Parameters @@ -86,7 +92,7 @@ def __init__(self, microscope_name, device_connection, configuration, device_id= axis: axes_mapping[axis] for axis in self.axes if axis in axes_mapping } - def __del__(self): + def __del__(self) -> None: """Close the connection to the stage.""" try: self.mcl_controller.MCL_ReleaseHandle(self.handle) @@ -94,7 +100,7 @@ def __del__(self): logger.exception(f"{e}") @classmethod - def get_connect_params(cls): + def get_connect_params(cls) -> list[str]: """Register the parameters required to connect to the stage. Returns @@ -105,7 +111,7 @@ def get_connect_params(cls): return ["serial_number"] @classmethod - def connect(cls, serial_number: int): + def connect(cls, serial_number: int) -> dict[str, Any]: """Build a connection to the Mad City Lab stage. Parameters @@ -118,7 +124,9 @@ def connect(cls, serial_number: int): stage_connection : dict Dictionary containing the connection information for the stage. """ - mcl_controller = importlib.import_module("navigate.model.devices.APIs.mcl.madlib") + mcl_controller = importlib.import_module( + "navigate.model.devices.APIs.mcl.madlib" + ) # Initialize mcl_controller.MCL_GrabAllHandles() @@ -129,7 +137,7 @@ def connect(cls, serial_number: int): return stage_connection - def report_position(self): + def report_position(self) -> dict[str, float]: """Report the position of the stage. Reports the position of the stage for all axes, and creates the hardware @@ -147,7 +155,9 @@ def report_position(self): return self.get_position_dict() - def move_axis_absolute(self, axis, abs_pos, wait_until_done=False): + def move_axis_absolute( + self, axis: str, abs_pos: float, wait_until_done: bool = False + ) -> bool: """Implement movement logic along a single axis. Example calls: @@ -185,7 +195,9 @@ def move_axis_absolute(self, axis, abs_pos, wait_until_done=False): return False return True - def move_absolute(self, move_dictionary, wait_until_done=False): + def move_absolute( + self, move_dictionary: dict[str, float], wait_until_done: bool = False + ) -> bool: """Move the stage to an absolute position. Parameters @@ -212,3 +224,7 @@ def move_absolute(self, move_dictionary, wait_until_done=False): result = result and success return result + + def stop(self) -> None: + """Stop all motion of the stage.""" + pass diff --git a/src/navigate/model/devices/stage/ni.py b/src/navigate/model/devices/stage/ni.py index e48e11794..1a81af1fc 100644 --- a/src/navigate/model/devices/stage/ni.py +++ b/src/navigate/model/devices/stage/ni.py @@ -35,7 +35,7 @@ import traceback from multiprocessing.managers import ListProxy import time -from typing import Any, Dict +from typing import Any, Optional # Third Party Imports import numpy as np @@ -66,7 +66,7 @@ def __init__( self, microscope_name: str, device_connection: Any, - configuration: Dict[str, Any], + configuration: dict[str, Any], device_id: int = 0, ) -> None: """Initialize the Galvo Stage. @@ -166,7 +166,7 @@ def __init__( self.switch_mode("normal") # for stacking, we could have 2 axis here or not, y is for tiling, not necessary - def report_position(self): + def report_position(self) -> dict[str, float]: """Reports the position for all axes, and create position dictionary. Returns @@ -176,7 +176,7 @@ def report_position(self): """ return self.get_position_dict() - def update_waveform(self, waveform_dict): + def update_waveform(self, waveform_dict: dict[str, np.ndarray]) -> bool: """Update the waveform for the stage. Parameters @@ -218,7 +218,9 @@ def update_waveform(self, waveform_dict): } return True - def move_axis_absolute(self, axis, abs_pos, wait_until_done=False): + def move_axis_absolute( + self, axis: str, abs_pos: float, wait_until_done: bool = False + ) -> bool: """Implement movement logic along a single axis. Parameters @@ -268,7 +270,9 @@ def move_axis_absolute(self, axis, abs_pos, wait_until_done=False): return True - def move_absolute(self, move_dictionary, wait_until_done=True): + def move_absolute( + self, move_dictionary: dict[str, float], wait_until_done: bool = True + ) -> bool: """Move Absolute Method. Parameters @@ -291,11 +295,16 @@ def move_absolute(self, move_dictionary, wait_until_done=True): axis = list(abs_pos_dict.keys())[0] return self.move_axis_absolute(axis, abs_pos_dict[axis], wait_until_done) - def stop(self): + def stop(self) -> None: """Stop all stage movement abruptly.""" pass - def switch_mode(self, mode="normal", exposure_times=None, sweep_times=None): + def switch_mode( + self, + mode: str = "normal", + exposure_times: Optional[dict] = None, + sweep_times: Optional[dict] = None, + ): """Switch Galvo stage working mode. Parameters diff --git a/src/navigate/model/devices/stage/pi.py b/src/navigate/model/devices/stage/pi.py index b99e6cce6..12554c0f0 100644 --- a/src/navigate/model/devices/stage/pi.py +++ b/src/navigate/model/devices/stage/pi.py @@ -33,7 +33,7 @@ # Standard Imports import logging import time -from typing import Any, Dict +from typing import Any, Optional # Third Party Imports from pipython import GCSDevice, pitools, GCSError @@ -56,7 +56,7 @@ def __init__( self, microscope_name: str, device_connection: Any, - configuration: Dict[str, Any], + configuration: dict[str, Any], device_id: int = 0, ): """ @@ -114,9 +114,9 @@ def __del__(self) -> None: print("Error while disconnecting the PI stage") logger.exception(f"Error while disconnecting the PI stage - {e}") raise e - + @classmethod - def get_connect_params(cls): + def get_connect_params(cls) -> list[str]: """Register the parameters required to connect to the stage. Returns @@ -127,7 +127,13 @@ def get_connect_params(cls): return ["controllername", "serial_number", "stages", "refmode"] @classmethod - def connect(cls, controller_name, serial_number, stages, reference_modes): + def connect( + cls, + controller_name: str, + serial_number: str, + stages: list[str], + reference_modes: list[str], + ) -> dict[str, Any]: """Connect to the Physik Instrumente (PI) Stage Parameters @@ -166,7 +172,7 @@ def connect(cls, controller_name, serial_number, stages, reference_modes): stage_connection = {"pi_tools": pi_tools, "pi_device": pi_device} return stage_connection - def report_position(self): + def report_position(self) -> dict[str, float]: """Reports the position for all axes, and create position dictionary. Positions from Physik Instrumente device are in millimeters @@ -194,7 +200,9 @@ def report_position(self): return self.get_position_dict() - def move_axis_absolute(self, axis, abs_pos, wait_until_done=False): + def move_axis_absolute( + self, axis: str, abs_pos: float, wait_until_done: bool = False + ) -> bool: """Move stage along a single axis. Parameters @@ -234,7 +242,9 @@ def move_axis_absolute(self, axis, abs_pos, wait_until_done=False): return True - def move_absolute(self, move_dictionary, wait_until_done=False): + def move_absolute( + self, move_dictionary: dict[str, float], wait_until_done: bool = False + ) -> bool: """Move Absolute Method. XYZF Values are converted to millimeters for PI API. @@ -259,9 +269,9 @@ def move_absolute(self, move_dictionary, wait_until_done=False): return False pos_dict = { - self.axes_mapping[axis]: abs_pos_dict[axis] / 1000 - if axis != "theta" - else abs_pos_dict[axis] + self.axes_mapping[axis]: ( + abs_pos_dict[axis] / 1000 if axis != "theta" else abs_pos_dict[axis] + ) for axis in abs_pos_dict } @@ -275,14 +285,14 @@ def move_absolute(self, move_dictionary, wait_until_done=False): return self.wait_on_target(axes=list(pos_dict.keys())) return True - def stop(self): + def stop(self) -> None: """Stop all stage movement abruptly.""" try: self.pi_device.STP(noraise=True) except GCSError as e: logger.exception(f"Stage stop failed - {e}") - def wait_on_target(self, axes=None): + def wait_on_target(self, axes: Optional[Any] = None): """Wait on target Parameters diff --git a/src/navigate/model/devices/stage/sutter.py b/src/navigate/model/devices/stage/sutter.py index 080003d0d..c41086b50 100644 --- a/src/navigate/model/devices/stage/sutter.py +++ b/src/navigate/model/devices/stage/sutter.py @@ -34,7 +34,7 @@ import time # from idlelib.debugger_r import DictProxy -from typing import Any, Dict +from typing import Any # Third-Party Imports from serial import SerialException @@ -58,7 +58,7 @@ def __init__( self, microscope_name: str, device_connection: Any, - configuration: Dict[str, Any], + configuration: dict[str, Any], device_id: int = 0, ) -> None: """Initialize the MP285Stage. @@ -152,7 +152,9 @@ def __del__(self) -> None: self.close() @classmethod - def connect(cls, port: str, baud_rate: int=115200, timeout: float=0.25) -> MP285: + def connect( + cls, port: str, baud_rate: int = 115200, timeout: float = 0.25 + ) -> MP285: """Connect to the MP285Stage.""" try: mp285_stage = MP285(port, baud_rate, timeout) @@ -163,9 +165,8 @@ def connect(cls, port: str, baud_rate: int=115200, timeout: float=0.25) -> MP285 raise UserWarning( "Could not communicate with Sutter MP-285 via COMPORT", port ) - - def report_position(self) -> dict: + def report_position(self) -> dict[str, float]: """Reports the position for all axes, and creates a position dictionary. Positions from the MP-285 are converted to microns. @@ -204,7 +205,7 @@ def report_position(self) -> dict: return position def move_axis_absolute( - self, axis: str, abs_pos: float, wait_until_done=False + self, axis: str, abs_pos: float, wait_until_done: bool = False ) -> bool: """Implement movement logic along a single axis. @@ -225,7 +226,9 @@ def move_axis_absolute( move_dictionary = {f"{axis}_abs": abs_pos} return self.move_absolute(move_dictionary, wait_until_done) - def move_absolute(self, move_dictionary: dict, wait_until_done=True) -> bool: + def move_absolute( + self, move_dictionary: dict[str, float], wait_until_done: bool = True + ) -> bool: """Move stage along a single axis. Parameters diff --git a/src/navigate/model/devices/stage/synthetic.py b/src/navigate/model/devices/stage/synthetic.py index dbc2fd930..872b0d60b 100644 --- a/src/navigate/model/devices/stage/synthetic.py +++ b/src/navigate/model/devices/stage/synthetic.py @@ -32,7 +32,7 @@ # Standard Imports import logging import time -from typing import Any, Dict +from typing import Any from typing import Optional @@ -55,7 +55,7 @@ def __init__( self, microscope_name: str, device_connection: Optional[Any], - configuration: Dict[str, Any], + configuration: dict[str, Any], device_id: int = 0, ): """Initialize the stage. @@ -92,7 +92,7 @@ def __del__(self) -> None: """Destructor.""" pass - def report_position(self): + def report_position(self) -> dict[str, float]: """Report the current position of the stage. Returns @@ -102,7 +102,9 @@ def report_position(self): """ return self.get_position_dict() - def move_axis_absolute(self, axis, abs_pos, wait_until_done=False): + def move_axis_absolute( + self, axis: str, abs_pos: float, wait_until_done: bool = False + ) -> bool: """Implement movement logic along a single axis. Parameters @@ -130,7 +132,9 @@ def move_axis_absolute(self, axis, abs_pos, wait_until_done=False): setattr(self, f"{axis}_pos", axis_abs) return True - def move_absolute(self, move_dictionary, wait_until_done=False): + def move_absolute( + self, move_dictionary: dict[str, float], wait_until_done: bool = False + ) -> bool: """Move stage along a single axis. Parameters @@ -176,7 +180,7 @@ def unload_sample(self): """ self.y_pos = self.y_unload_position - def get_axis_position(self, axis): + def get_axis_position(self, axis: str) -> float: """Get the current position of the stage along a single axis. Parameters @@ -191,7 +195,7 @@ def get_axis_position(self, axis): """ return getattr(self, f"{axis}_pos") - def set_speed(self, velocity_dict): + def set_speed(self, velocity_dict: dict[str, float]) -> None: """Set the speed of the stage. Parameters @@ -201,7 +205,7 @@ def set_speed(self, velocity_dict): """ pass - def get_speed(self, axis): + def get_speed(self, axis: str) -> float: """Get the speed of the stage. Parameters @@ -217,7 +221,13 @@ def get_speed(self, axis): return 1 - def scanr(self, start_position_mm, end_position_mm, enc_divide, axis="z"): + def scanr( + self, + start_position_mm: float, + end_position_mm: float, + enc_divide: float, + axis: str = "z", + ) -> None: """Scan the stage using the constant velocity mode along a single axis. Parameters @@ -235,7 +245,7 @@ def scanr(self, start_position_mm, end_position_mm, enc_divide, axis="z"): pass - def start_scan(self, axis): + def start_scan(self, axis: str) -> None: """Start a scan along a single axis. Parameters @@ -246,10 +256,14 @@ def start_scan(self, axis): """ pass - def stop_scan(self): + def stop_scan(self) -> None: """Stop a scan.""" pass - def update_waveform(self, waveform_dict): + def update_waveform(self, waveform_dict: dict[str, Any]) -> None: print("*** update waveform:", waveform_dict.keys()) pass + + def stop(self) -> None: + """Stop all stage movement abruptly.""" + pass diff --git a/src/navigate/model/devices/stage/thorlabs.py b/src/navigate/model/devices/stage/thorlabs.py index 0b00e1501..2f371ae89 100644 --- a/src/navigate/model/devices/stage/thorlabs.py +++ b/src/navigate/model/devices/stage/thorlabs.py @@ -34,6 +34,7 @@ import logging import time from multiprocessing.managers import ListProxy +from typing import Any, Optional # Local Imports from navigate.model.devices.stage.base import StageBase @@ -49,7 +50,13 @@ class KIM001Stage(StageBase, IntegratedDevice): """Thorlabs KIM Stage""" - def __init__(self, microscope_name, device_connection, configuration, device_id=0): + def __init__( + self, + microscope_name: str, + device_connection: Any, + configuration: dict[str, Any], + device_id: int = 0, + ) -> None: """Initialize the stage. Parameters @@ -91,7 +98,7 @@ def __init__(self, microscope_name, device_connection, configuration, device_id= else: self.serial_number = device_config["serial_number"] - def __del__(self): + def __del__(self) -> None: """Delete the KIM Connection""" try: self.stop() @@ -100,7 +107,7 @@ def __del__(self): logger.exception(e) @classmethod - def get_connect_params(cls): + def get_connect_params(cls) -> list[str]: """Register the parameters required to connect to the stage. Returns @@ -111,7 +118,7 @@ def get_connect_params(cls): return ["serial_number"] @classmethod - def connect(cls, serial_number: str): + def connect(cls, serial_number: str) -> Any: """Connect to the Thorlabs KIM Stage Parameters @@ -134,7 +141,9 @@ def connect(cls, serial_number: str): # Open the same serial number device if there are several devices connected to the # computer available_serialnum = kim_controller.TLI_GetDeviceListExt() - if not list(filter(lambda s: str(s) == str(serial_number), available_serialnum)): + if not list( + filter(lambda s: str(s) == str(serial_number), available_serialnum) + ): print( f"** Please make sure Thorlabs stage with serial number {serial_number} " f"is connected to the computer!" @@ -143,7 +152,7 @@ def connect(cls, serial_number: str): kim_controller.KIM_Open(str(serial_number)) return kim_controller - def report_position(self): + def report_position(self) -> dict[str, float]: """Report the position of the stage. Reports the position of the stage for all axes, and creates the hardware @@ -169,7 +178,9 @@ def report_position(self): return self.get_position_dict() - def move_axis_absolute(self, axis, abs_pos, wait_until_done=False): + def move_axis_absolute( + self, axis: str, abs_pos: float, wait_until_done: bool = False + ) -> bool: """Implement movement logic along a single axis. Parameters @@ -213,7 +224,9 @@ def move_axis_absolute(self, axis, abs_pos, wait_until_done=False): return False return True - def move_absolute(self, move_dictionary, wait_until_done=False): + def move_absolute( + self, move_dictionary: dict[str, float], wait_until_done: bool = False + ) -> bool: """Move stage along a single axis. Parameters @@ -244,16 +257,23 @@ def move_absolute(self, move_dictionary, wait_until_done=False): return result - def stop(self): + def stop(self) -> None: """Stop all stage channels move""" for i in self.kim_axes: self.kim_controller.KIM_MoveStop(self.serial_number, i) + @log_initialization class KST101Stage(StageBase): """Thorlabs KST Stage""" - def __init__(self, microscope_name, device_connection, configuration, device_id=0): + def __init__( + self, + microscope_name: str, + device_connection: Any, + configuration: dict[str, Any], + device_id: int = 0, + ) -> None: """Initialize the stage. Parameters @@ -308,7 +328,7 @@ def __del__(self): logger.exception(e) @classmethod - def get_connect_params(cls): + def get_connect_params(cls) -> list[str]: """Register the parameters required to connect to the stage. Returns @@ -319,7 +339,7 @@ def get_connect_params(cls): return ["serial_number"] @classmethod - def connect(cls, serial_number): + def connect(cls, serial_number: str) -> Any: """Connect to the Thorlabs KST Stage Parameters @@ -342,7 +362,9 @@ def connect(cls, serial_number): # Open the same serial number device if there are several devices connected to the # computer available_serial_numbers = kst_controller.TLI_GetDeviceListExt() - if not list(filter(lambda s: str(s) == str(serial_number), available_serial_numbers)): + if not list( + filter(lambda s: str(s) == str(serial_number), available_serial_numbers) + ): print( f"** Please make sure Thorlabs stage with serial number {serial_number} " f"is connected to the computer!" @@ -351,7 +373,7 @@ def connect(cls, serial_number): kst_controller.KST_Open(str(serial_number)) return kst_controller - def report_position(self): + def report_position(self) -> dict[str, float]: """ Report the position of the stage. @@ -378,7 +400,9 @@ def report_position(self): return self.get_position_dict() - def move_axis_absolute(self, axes, abs_pos, wait_until_done=False): + def move_axis_absolute( + self, axes: str, abs_pos: float, wait_until_done: bool = False + ) -> bool: """ Implement movement. @@ -419,7 +443,9 @@ def move_axis_absolute(self, axes, abs_pos, wait_until_done=False): return False return True - def move_absolute(self, move_dictionary, wait_until_done=False): + def move_absolute( + self, move_dictionary: dict[str, float], wait_until_done: bool = False + ) -> bool: """Move stage along a single axis. Parameters @@ -445,7 +471,7 @@ def move_absolute(self, move_dictionary, wait_until_done=False): return result - def move_to_position(self, position, wait_until_done=False): + def move_to_position(self, position: float, wait_until_done: bool = False) -> bool: """Perform a move to position Parameters @@ -479,12 +505,12 @@ def move_to_position(self, position, wait_until_done=False): else: return True - def run_homing(self): + def run_homing(self) -> None: """Run homing sequence.""" self.kst_controller.KST_HomeDevice(self.serial_number) self.move_to_position(12.5, wait_until_done=True) - def stop(self): + def stop(self) -> None: """ Stop all stage channels move """ diff --git a/src/navigate/model/devices/zoom/base.py b/src/navigate/model/devices/zoom/base.py index 6488d80f2..e5dd936dc 100644 --- a/src/navigate/model/devices/zoom/base.py +++ b/src/navigate/model/devices/zoom/base.py @@ -32,6 +32,8 @@ # Standard Library Imports import logging +from abc import ABC, abstractmethod +from typing import Any, Optional # Third Party Imports @@ -44,10 +46,17 @@ @log_initialization -class ZoomBase: +class ZoomBase(ABC): """ZoomBase parent class.""" - def __init__(self, microscope_name, device_controller, configuration, *args, **kwargs): + def __init__( + self, + microscope_name: str, + device_controller: Any, + configuration: dict, + *args: Optional[Any], + **kwargs: Optional[Any], + ) -> None: """Initialize the parent zoom class. Parameters @@ -80,7 +89,7 @@ def __str__(self) -> str: """Return the string representation of the ZoomBase object.""" return "ZoomBase" - def build_stage_dict(self): + def build_stage_dict(self) -> None: """ Construct a dictionary of stage offsets in between different zoom values. @@ -122,7 +131,8 @@ def build_stage_dict(self): focus_target - focus_curr ) - def set_zoom(self, zoom, wait_until_done=False): + @abstractmethod + def set_zoom(self, zoom: dict, wait_until_done=False) -> None: """Change the microscope zoom. Confirms tha the zoom position is available in the zoomdict @@ -142,7 +152,7 @@ def set_zoom(self, zoom, wait_until_done=False): logger.error(f"Zoom designation, {zoom}, not in the configuration") raise ValueError("Zoom designation not in the configuration") - def move(self, position=0, wait_until_done=False): + def move(self, position: int, wait_until_done: bool = False) -> None: """Move the Zoom Servo Parameters @@ -154,7 +164,7 @@ def move(self, position=0, wait_until_done=False): """ pass - def read_position(self): + def read_position(self) -> int: """Read the position of the Zoom Servo Returns diff --git a/src/navigate/model/devices/zoom/dynamixel.py b/src/navigate/model/devices/zoom/dynamixel.py index 3f58d0db1..b08c7252b 100644 --- a/src/navigate/model/devices/zoom/dynamixel.py +++ b/src/navigate/model/devices/zoom/dynamixel.py @@ -32,6 +32,7 @@ # Standard Library Imports import logging import time +from typing import Any, Optional # Third Party Imports @@ -50,7 +51,14 @@ class DynamixelZoom(ZoomBase, SerialDevice): """DynamixelZoom Class - Controls the Dynamixel Servo.""" - def __init__(self, microscope_name, device_connection, configuration, *args, **kwargs): + def __init__( + self, + microscope_name: str, + device_connection, + configuration: dict, + *args: Optional[Any], + **kwargs: Optional[Any], + ) -> None: """Initialize the DynamixelZoom Servo. Parameters @@ -112,7 +120,7 @@ def __init__(self, microscope_name, device_connection, configuration, *args, **k #: obj: DynamixelZoom port number. self.port_num = device_connection - def __del__(self): + def __del__(self) -> None: """Delete the DynamixelZoom Instance""" try: self.dynamixel.closePort(self.port_num) @@ -120,7 +128,9 @@ def __del__(self): logger.exception(e) @classmethod - def connect(cls, comport: str, baudrate: int = 115200, timeout: float = 0.25): + def connect( + cls, comport: str, baudrate: int = 115200, timeout: float = 0.25 + ) -> int: """Connect to the DynamixelZoom Servo. Parameters @@ -151,7 +161,7 @@ def connect(cls, comport: str, baudrate: int = 115200, timeout: float = 0.25): dynamixel.setBaudRate(port_num, baudrate) return port_num - def set_zoom(self, zoom, wait_until_done=False): + def set_zoom(self, zoom: dict, wait_until_done: bool = False) -> None: """Change the DynamixelZoom Servo. Confirms that the zoom position is available in the zoomdict, and then @@ -178,7 +188,7 @@ def set_zoom(self, zoom, wait_until_done=False): logger.error(f"Zoom designation, {zoom}, not in the configuration") raise ValueError("Zoom designation not in the configuration") - def move(self, position, wait_until_done=False): + def move(self, position: int, wait_until_done: bool = False) -> None: """Move the DynamixelZoom Servo Parameters @@ -233,7 +243,7 @@ def move(self, position, wait_until_done=False): self.port_num, 1, self.id, self.addr_mx_present_position ) - def read_position(self): + def read_position(self) -> int: """Read the position of the Zoom Servo. Returned position is an int between 0 and 4096. diff --git a/src/navigate/model/devices/zoom/synthetic.py b/src/navigate/model/devices/zoom/synthetic.py index 9016e280c..1147b4731 100644 --- a/src/navigate/model/devices/zoom/synthetic.py +++ b/src/navigate/model/devices/zoom/synthetic.py @@ -32,7 +32,7 @@ # Standard Library Imports import logging -from typing import Any, Dict, Optional +from typing import Any, Optional # Third Party Imports @@ -53,7 +53,7 @@ def __init__( self, microscope_name: str, device_connection: Any, - configuration: Dict[str, Any], + configuration: dict[str, Any], *args: Optional[Any], **kwargs: Optional[Any], ) -> None: @@ -73,3 +73,7 @@ def __init__( def __del__(self) -> None: """Delete the SyntheticZoom Servo.""" pass + + def set_zoom(self, zoom: dict, wait_until_done=False) -> None: + """Set the zoom position.""" + super().set_zoom(zoom, wait_until_done) diff --git a/test/model/devices/camera/test_camera_base.py b/test/model/devices/camera/test_camera_base.py index 6a07b304b..f3b3aec13 100644 --- a/test/model/devices/camera/test_camera_base.py +++ b/test/model/devices/camera/test_camera_base.py @@ -29,17 +29,17 @@ # IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) # ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE # POSSIBILITY OF SUCH DAMAGE. -# """ +#""" # Third Party Imports -from navigate.model.devices.camera.base import CameraBase +from navigate.model.devices.camera.synthetic import SyntheticCamera def test_start_camera(dummy_model): model = dummy_model for microscope_name in model.configuration["configuration"]["microscopes"].keys(): - camera = CameraBase(microscope_name, None, model.configuration) + camera = SyntheticCamera(microscope_name, None, model.configuration) assert ( camera.camera_parameters["hardware"]["serial_number"] == model.configuration["configuration"]["microscopes"][microscope_name][ @@ -53,7 +53,7 @@ def test_start_camera(dummy_model): ) raised_error = False try: - camera = CameraBase(microscope_name, None, model.configuration) + _ = SyntheticCamera(microscope_name, None, model.configuration) except NameError: raised_error = True assert ( @@ -69,7 +69,7 @@ def test_camera_base_functions(dummy_model): "microscope_name" ] - camera = CameraBase(microscope_name, None, model.configuration) + camera = SyntheticCamera(microscope_name, None, model.configuration) funcs = ["set_readout_direction", "calculate_light_sheet_exposure_time"] args = [[random.random()], [random.random(), random.random()]] diff --git a/test/model/devices/daq/test_daq_base.py b/test/model/devices/daq/test_daq_base.py index 8cf6d8795..8a2b152cb 100644 --- a/test/model/devices/daq/test_daq_base.py +++ b/test/model/devices/daq/test_daq_base.py @@ -1,19 +1,16 @@ -def test_initialize_daq(): - from navigate.model.devices.daq.base import DAQBase - from test.model.dummy import DummyModel +from navigate.model.devices.daq.synthetic import SyntheticDAQ +from test.model.dummy import DummyModel +import numpy as np + +def test_initialize_daq(): model = DummyModel() - DAQBase(model.configuration) + SyntheticDAQ(model.configuration) def test_calculate_all_waveforms(): - import numpy as np - - from navigate.model.devices.daq.base import DAQBase - from test.model.dummy import DummyModel - model = DummyModel() - daq = DAQBase(model.configuration) + daq = SyntheticDAQ(model.configuration) microscope_state = model.configuration["experiment"]["MicroscopeState"] microscope_name = microscope_state["microscope_name"] exposure_times = { diff --git a/test/model/devices/daq/test_daq_synthetic.py b/test/model/devices/daq/test_daq_synthetic.py index 089525ffb..022a00d11 100644 --- a/test/model/devices/daq/test_daq_synthetic.py +++ b/test/model/devices/daq/test_daq_synthetic.py @@ -20,25 +20,13 @@ def test_synthetic_daq_functions(): funcs = [ "add_camera", - "create_camera_task", - "create_master_trigger_task", - "create_galvo_remote_focus_tasks", - "start_tasks", - "stop_tasks", - "close_tasks", "prepare_acquisition", "run_acquisition", "stop_acquisition", - "write_waveforms_to_tasks", + "wait_acquisition_done", ] args = [ [microscope_name, model.camera[microscope_name]], - None, - None, - None, - None, - None, - None, [f"channel_{random.randint(1, 5)}"], None, None, diff --git a/test/model/devices/filter_wheel/test_fw_base.py b/test/model/devices/filter_wheel/test_fw_base.py index 96fccb286..c0539cef4 100644 --- a/test/model/devices/filter_wheel/test_fw_base.py +++ b/test/model/devices/filter_wheel/test_fw_base.py @@ -1,15 +1,18 @@ +from navigate.model.devices.filter_wheel.synthetic import SyntheticFilterWheel +from test.model.dummy import DummyModel + + def test_filter_wheel_base_functions(): - from navigate.model.devices.filter_wheel.base import FilterWheelBase - from test.model.dummy import DummyModel model = DummyModel() microscope_name = model.configuration["experiment"]["MicroscopeState"][ "microscope_name" ] - fw = FilterWheelBase( - microscope_name, - None, - model.configuration, + fw = SyntheticFilterWheel( + microscope_name=microscope_name, + device_connection=None, + configuration=model.configuration, + device_id=0, ) filter_dict = model.configuration["configuration"]["microscopes"][microscope_name][ diff --git a/test/model/devices/filter_wheel/test_fw_synthetic.py b/test/model/devices/filter_wheel/test_fw_synthetic.py index ff14c3c3b..8f0cab4e1 100644 --- a/test/model/devices/filter_wheel/test_fw_synthetic.py +++ b/test/model/devices/filter_wheel/test_fw_synthetic.py @@ -51,8 +51,8 @@ def test_synthetic_filter_wheel_functions(): 0 ) - funcs = ["filter_change_delay", "set_filter", "read", "close"] - args = [["channel_dummy"], ["channel_dummy"], [int(random.random() * 100)], None] + funcs = [ "set_filter", "close"] + args = [["channel_dummy"], None] for f, a in zip(funcs, args): if a is not None: diff --git a/test/model/devices/galvo/test_galvo_base.py b/test/model/devices/galvo/test_galvo_base.py index 11f718c93..4c59d8951 100644 --- a/test/model/devices/galvo/test_galvo_base.py +++ b/test/model/devices/galvo/test_galvo_base.py @@ -31,7 +31,7 @@ import unittest from unittest.mock import MagicMock -from navigate.model.devices.galvo.base import GalvoBase +from navigate.model.devices.galvo.synthetic import SyntheticGalvo from navigate.config import ( load_configs, get_configuration_paths, @@ -74,7 +74,7 @@ def setUp(self) -> None: self.device_connection = MagicMock() galvo_id = 0 - self.galvo = GalvoBase( + self.galvo = SyntheticGalvo( microscope_name=self.microscope_name, device_connection=self.device_connection, configuration=self.configuration, diff --git a/test/model/devices/lasers/test_laser_base.py b/test/model/devices/lasers/test_laser_base.py index 140f5fd65..9d20ee19e 100644 --- a/test/model/devices/lasers/test_laser_base.py +++ b/test/model/devices/lasers/test_laser_base.py @@ -1,16 +1,17 @@ -def test_laser_base_functions(): - import random +from navigate.model.devices.laser.synthetic import SyntheticLaser +from test.model.dummy import DummyModel +import random + - from navigate.model.devices.laser.base import LaserBase - from test.model.dummy import DummyModel +def test_laser_base_functions(): model = DummyModel() microscope_name = model.configuration["experiment"]["MicroscopeState"][ "microscope_name" ] - laser = LaserBase(microscope_name, None, model.configuration, 0) + laser = SyntheticLaser(microscope_name, None, model.configuration, 0) - funcs = ["set_power", "turn_on", "turn_off", "close", "initialize_laser"] + funcs = ["set_power", "turn_on", "turn_off", "close"] args = [[random.random()], None, None, None] for f, a in zip(funcs, args): diff --git a/test/model/devices/remote_focus/test_rf_base.py b/test/model/devices/remote_focus/test_rf_base.py index 394b77e37..a945d179f 100644 --- a/test/model/devices/remote_focus/test_rf_base.py +++ b/test/model/devices/remote_focus/test_rf_base.py @@ -37,22 +37,20 @@ import numpy as np # Local Imports +from navigate.model.devices.remote_focus.synthetic import SyntheticRemoteFocus +from test.model.dummy import DummyModel def test_remote_focus_base_init(): - from navigate.model.devices.remote_focus.base import RemoteFocusBase - from test.model.dummy import DummyModel - model = DummyModel() microscope_name = model.configuration["experiment"]["MicroscopeState"][ "microscope_name" ] - RemoteFocusBase(microscope_name, None, model.configuration) + SyntheticRemoteFocus(microscope_name, None, model.configuration) @pytest.mark.parametrize("smoothing", [0] + list(np.random.rand(5) * 100)) def test_remote_focus_base_adjust(smoothing): - from navigate.model.devices.remote_focus.base import RemoteFocusBase from test.model.dummy import DummyModel model = DummyModel() @@ -76,7 +74,7 @@ def test_remote_focus_base_adjust(smoothing): ] = smoothing channel["camera_exposure_time"] = np.random.rand() * 150 + 50 - rf = RemoteFocusBase(microscope_name, None, model.configuration) + rf = SyntheticRemoteFocus(microscope_name, None, model.configuration) # exposure_times = { # k: v["camera_exposure_time"] / 1000 diff --git a/test/model/devices/shutter/test_shutter_base.py b/test/model/devices/shutter/test_shutter_base.py index 252cebeae..38b51af77 100644 --- a/test/model/devices/shutter/test_shutter_base.py +++ b/test/model/devices/shutter/test_shutter_base.py @@ -30,7 +30,7 @@ # POSSIBILITY OF SUCH DAMAGE. # import unittest -from navigate.model.devices.shutter.base import ShutterBase +from navigate.model.devices.shutter.synthetic import SyntheticShutter from test.model.dummy import DummyModel @@ -41,7 +41,7 @@ class TestLaserBase(unittest.TestCase): microscope_name = "Mesoscale" def test_shutter_base_attributes(self): - shutter = ShutterBase( + shutter = SyntheticShutter( self.microscope_name, None, self.dummy_model.configuration ) diff --git a/test/model/devices/stages/test_stage_base.py b/test/model/devices/stages/test_stage_base.py index e7ee1c5ea..4772f307f 100644 --- a/test/model/devices/stages/test_stage_base.py +++ b/test/model/devices/stages/test_stage_base.py @@ -37,7 +37,7 @@ # Third Party Imports # Local Imports -from navigate.model.devices.stage.base import StageBase +from navigate.model.devices.stage.synthetic import SyntheticStage class TestStageBase: @@ -75,7 +75,7 @@ def setup_class(self, stage_configuration): def test_stage_attributes(self, axes, axes_mapping): self.stage_configuration["stage"]["hardware"]["axes"] = axes self.stage_configuration["stage"]["hardware"]["axes_mapping"] = axes_mapping - stage = StageBase(self.microscope_name, None, self.configuration) + stage = SyntheticStage(self.microscope_name, None, self.configuration) # Attributes for axis in axes: @@ -92,8 +92,9 @@ def test_stage_attributes(self, axes, axes_mapping): == self.stage_configuration["stage"][f"{axis}_max"] ) + # Check default axes mapping if axes_mapping is None: - assert stage.axes_mapping == {} + assert stage.axes_mapping == {axis: axis.capitalize() for axis in axes} else: for i, axis in enumerate(axes): assert stage.axes_mapping[axis] == axes_mapping[i] @@ -114,7 +115,7 @@ def test_stage_attributes(self, axes, axes_mapping): ) def test_get_position_dict(self, axes, axes_pos): self.stage_configuration["stage"]["hardware"]["axes"] = axes - stage = StageBase(self.microscope_name, None, self.configuration) + stage = SyntheticStage(self.microscope_name, None, self.configuration) for i, axis in enumerate(axes): setattr(stage, f"{axis}_pos", axes_pos[i]) @@ -137,7 +138,7 @@ def test_get_position_dict(self, axes, axes_pos): def test_get_abs_position(self, axes, axes_mapping): self.stage_configuration["stage"]["hardware"]["axes"] = axes self.stage_configuration["stage"]["hardware"]["axes_mapping"] = axes_mapping - stage = StageBase(self.microscope_name, None, self.configuration) + stage = SyntheticStage(self.microscope_name, None, self.configuration) for axis in axes: axis_min = self.stage_configuration["stage"][f"{axis}_min"] @@ -187,7 +188,7 @@ def test_get_abs_position(self, axes, axes_mapping): def test_verify_abs_position(self, axes, axes_mapping): self.stage_configuration["stage"]["hardware"]["axes"] = axes self.stage_configuration["stage"]["hardware"]["axes_mapping"] = axes_mapping - stage = StageBase(self.microscope_name, None, self.configuration) + stage = SyntheticStage(self.microscope_name, None, self.configuration) move_dict = {} abs_dict = {} @@ -198,7 +199,6 @@ def test_verify_abs_position(self, axes, axes_mapping): axis_abs = random.randrange(axis_min, axis_max) move_dict[f"{axis}_abs"] = axis_abs abs_dict[axis] = axis_abs - assert stage.verify_abs_position(move_dict) == abs_dict # turn off stage_limits @@ -206,9 +206,13 @@ def test_verify_abs_position(self, axes, axes_mapping): axis = random.choice(axes) axis_min = self.stage_configuration["stage"][f"{axis}_min"] axis_max = self.stage_configuration["stage"][f"{axis}_max"] + + # Test minimum boundary move_dict[f"{axis}_abs"] = axis_min - 1.5 abs_dict[axis] = axis_min - 1.5 assert stage.verify_abs_position(move_dict) == abs_dict + + # Test maximum boundary move_dict[f"{axis}_abs"] = axis_max + 1.5 abs_dict[axis] = axis_max + 1.5 assert stage.verify_abs_position(move_dict) == abs_dict @@ -225,13 +229,3 @@ def test_verify_abs_position(self, axes, axes_mapping): assert stage.verify_abs_position(move_dict) == abs_dict stage.stage_limits = False assert stage.verify_abs_position(move_dict) == abs_dict - - self.stage_configuration["stage"]["hardware"]["axes_mapping"] = axes_mapping[ - :-1 - ] - stage = StageBase(self.microscope_name, None, self.configuration) - abs_dict.pop(axes[-1]) - - assert stage.verify_abs_position(move_dict) == abs_dict - stage.stage_limits = False - assert stage.verify_abs_position(move_dict) == abs_dict diff --git a/test/model/devices/zoom/test_base.py b/test/model/devices/zoom/test_base.py index 34e2c1100..5bf123c7d 100644 --- a/test/model/devices/zoom/test_base.py +++ b/test/model/devices/zoom/test_base.py @@ -34,9 +34,11 @@ @pytest.fixture def dummy_zoom(dummy_model): - from navigate.model.devices.zoom.base import ZoomBase + from navigate.model.devices.zoom.synthetic import SyntheticZoom - return ZoomBase(dummy_model.active_microscope_name, None, dummy_model.configuration) + return SyntheticZoom( + dummy_model.active_microscope_name, None, dummy_model.configuration + ) def test_zoom_base_attributes(dummy_zoom):