In [None]:
%load_ext autoreload
%autoreload 2

# FliCamera

> This class provides implementations of all abstract methods defined in the BaseCameraInterface for FLI cameras using the FliSdk_V2 SDK. It handles camera initialization, image acquisition, configuration management, and resource cleanup. There are additional camera-specific functions as well.


In [None]:
#| default_exp cameras

In [None]:
#| hide
from nbdev.showdoc import *

In [None]:
#| export
#| hide

# bring forth **kwargs from an inherited class for documentation
from fastcore.meta import delegates

# # monkey patching class methods using @patch
# from fastcore.foundation import *
# from fastcore.foundation import patch

# external
import numpy as np
import time
import ctypes

# internal
from sail_cameras.BaseCameraInterface import BaseCameraInterface, goodtimer

In [None]:
#| export

@delegates()
class FliCamera(BaseCameraInterface):
    """
    First Light Imaging (FLI) camera driver built on the FliSdk_V2 SDK.

    The class opens an SDK context, selects one camera, registers a new-image
    callback and then exposes helpers for configuration (serial commands),
    single-frame and multi-frame acquisition, dark-frame handling and shutdown.

    Notes
    -----
    * ``flisdk`` is imported lazily inside ``__init__`` and the SDK module is
      kept on the instance as ``self._sdk``; every method reaches the SDK
      through that attribute.
    * New frames are handed to ``newim_callbackfunc``, which is registered with
      the SDK in ``__init__``. The acquisition methods communicate with it
      through the ``update_latestim`` and ``nims_lefttolog`` attributes.
    * This class defines no ``__enter__``/``__exit__`` itself; context-manager
      support, if any, has to come from ``BaseCameraInterface`` (not visible
      here - TODO confirm).
    """

    # Settings used by set_camera_defaults() for every key the caller does not
    # override through ``cam_settings``.
    _FACTORY_SETTINGS = {
        "sensitivity": "low",
        "bias mode": "off",
        "flat mode": "off",
        "badpixel mode": "on",
        "fps": 600,  # original note: 1e6 sets the maximum
        "tint": 0.00005,  # original note: 1 sets the maximum
    }

    def __init__(
        self,
        camera_index=0,
        camera_id=None,
        verbose=False,
        cam_settings=None,
        cropdims=None,
        darkpath="./",
        darkfile=None,
        buffersize_ims=1000,
        **kwargs
    ):
        """
        Open the SDK, select a camera, configure it and start acquisition.

        Parameters
        ----------
        camera_index : int, default=0
            Index into the list of detected cameras. Ignored when ``camera_id``
            is given.
        camera_id : str, optional
            Substring of a detected camera name; the matching camera is used.
        verbose : bool, default=False
            If True, prints the detected cameras and the serial commands sent.
        cam_settings : dict, optional
            Settings that override the built-in defaults. Keys that are missing
            fall back to the defaults.
        cropdims : sequence of 4 ints, optional
            Cropping window as (first column, last column, first row, last row).
            Cropping is switched off when this is None.
        darkpath : str, default='./'
            Prefix prepended to dark-frame file names (plain string
            concatenation, so include the trailing separator).
        darkfile : str, optional
            Name (without ``.npy``) of a dark frame to load at start-up.
        buffersize_ims : int, default=1000
            SDK buffer size, in images.
        **kwargs
            Forwarded to ``BaseCameraInterface.__init__``.

        Raises
        ------
        ValueError
            If ``camera_id`` matches none of the detected cameras.
        IndexError
            If ``camera_index`` does not address a detected camera.
        """
        super().__init__(**kwargs)
        # Imported here so the module can be imported without the SDK installed.
        from flisdk import FliSdk_V2

        # The import above is local to __init__, so keep a handle on the module
        # for all other methods.
        self._sdk = FliSdk_V2

        # Merge user settings over the defaults so a partial dict is enough.
        self.dflt_settings = dict(self._FACTORY_SETTINGS)
        if cam_settings is not None:
            self.dflt_settings.update(cam_settings)

        self.cam_context = self._sdk.Init()
        self.latest_im = None
        self.nims_lefttolog = 0  # frames the callback still has to log
        self.nims_tolog = 1  # frames per get_n_images() acquisition
        self.update_latestim = False  # set True to request a fresh latest_im
        self.syncdelay = None

        callback_fps = 0  # 0 for full speed

        self.darkpath = darkpath
        if darkfile is not None:
            self.dark = np.load(self.darkpath + darkfile + ".npy")
            if verbose:
                print("Using darkfile " + self.darkpath + darkfile)
        else:
            self.dark = 0

        # The grabber list itself is not used; the call is kept because it has
        # always preceded camera detection in this driver.
        self._sdk.DetectGrabbers(self.cam_context)
        camera_list = self._sdk.DetectCameras(self.cam_context)
        num_cameras = len(camera_list)

        if camera_id is not None:
            matches = [i for i, s in enumerate(camera_list) if camera_id in s]
            if not matches:
                raise ValueError(
                    "Camera id %r not found among detected cameras: %r"
                    % (camera_id, list(camera_list))
                )
            if len(matches) > 1:
                print(
                    "Warning: camera id %r matches %d cameras, using the first match"
                    % (camera_id, len(matches))
                )
            camera_index = matches[0]

        self.camera_index = camera_index

        if verbose:
            print("%d cameras detected: " % num_cameras)
            for k in range(num_cameras):
                print("%d: " % k + camera_list[k])
            print("Using camera %d" % camera_index)

        # Negative indices stay valid, exactly as with plain list indexing.
        if not -num_cameras <= camera_index < num_cameras:
            raise IndexError(
                "camera_index %d is out of range: %d camera(s) detected"
                % (camera_index, num_cameras)
            )

        # NOTE(review): the values returned by SetCamera/Update are not checked.
        self._sdk.SetCamera(self.cam_context, camera_list[camera_index])
        self._sdk.SetMode(
            self.cam_context, self._sdk.Mode.Full
        )  # Enables grab and config
        self._sdk.Update(self.cam_context)

        if cropdims is not None:
            self.send_command(
                "set cropping columns %d-%d" % (cropdims[0], cropdims[1]),
                verbose=verbose,
            )
            self.send_command(
                "set cropping rows %d-%d" % (cropdims[2], cropdims[3]), verbose=verbose
            )
            self.send_command("set cropping on", verbose=verbose)
        else:
            self.send_command("set cropping off", verbose=verbose)
        # Read the dimensions only after cropping has been configured.
        self.camdims = self._sdk.GetCurrentImageDimension(self.cam_context)

        self._sdk.SetBufferSizeInImages(self.cam_context, buffersize_ims)

        self.loggedims_cube = self._empty_cube(self.nims_tolog)
        self.loggedims_times_arr = np.zeros(self.nims_tolog)

        # The wrapped callback is stored on the instance so that the wrapper
        # object stays referenced for as long as the camera object lives.
        self.wrappedFunc = self._sdk.CWRAPPER(self.newim_callbackfunc)
        self._sdk.AddCallBackNewImage(
            self.cam_context, self.wrappedFunc, callback_fps, True, 0
        )

        self.external_trigger(enabled=False)
        self._sdk.Start(self.cam_context)
        self.set_camera_defaults(verbose=verbose)

    def _empty_cube(self, nims):
        """Return a zeroed int16 cube of shape (nims, camdims[1], camdims[0])."""
        return np.zeros((nims, self.camdims[1], self.camdims[0]), dtype=np.int16)

    def _frame_from_pointer(self, image):
        """Copy the frame behind the SDK pointer ``image`` into a new 2D int16 array.

        The SDK memory is declared as unsigned 16-bit for the pointer cast but
        is deliberately viewed as ``np.int16``. The copy detaches the result
        from the SDK-owned memory.
        """
        ArrayType = ctypes.c_uint16 * self.camdims[0] * self.camdims[1]
        pa = ctypes.cast(image, ctypes.POINTER(ArrayType))
        view = np.ndarray(
            (self.camdims[1], self.camdims[0]), dtype=np.int16, buffer=pa.contents
        )
        return np.copy(view)

    def send_command(self, commandstr, return_response=True, verbose=False):
        """
        Send a serial command string to the camera through the SDK.

        Parameters
        ----------
        commandstr : str
            Command string to send to the camera.
        return_response : bool, default=True
            If True, returns the camera's response.
        verbose : bool, default=False
            If True, prints the command and the response.

        Returns
        -------
        object or None
            The response returned by the SDK if ``return_response`` is True,
            otherwise None.
        """
        # NOTE(review): the error value returned by the SDK is not checked.
        _errorval, response = self._sdk.FliSerialCamera.SendCommand(
            self.cam_context, commandstr
        )
        if verbose:
            print(commandstr)
            print(response)
        if return_response:
            return response

    def set_camera_defaults(self, verbose=True):
        """
        Send every entry of ``self.dflt_settings`` to the camera.

        The settings are sent in this order: sensitivity, bias, flat, badpixel,
        fps, tint.

        Parameters
        ----------
        verbose : bool, default=True
            If True, prints each command and response, and finally the
            responses to the ``fps`` and ``tint`` queries.
        """
        print("Setting camera default settings:")
        command_table = (
            ("set sensitivity ", "sensitivity"),
            ("set bias ", "bias mode"),
            ("set flat ", "flat mode"),
            ("set badpixel ", "badpixel mode"),
            ("set fps ", "fps"),
            ("set tint ", "tint"),
        )
        for prefix, key in command_table:
            self.send_command(prefix + str(self.dflt_settings[key]), verbose=verbose)

        if verbose:
            print(" ")
            print(self.send_command("fps"))
            print(self.send_command("tint"))

    def reset_buffer(self):
        """Call the SDK's ``ResetBuffer`` for this camera context."""
        self._sdk.ResetBuffer(self.cam_context)

    def camera_start(self):
        """Start acquisition by calling the SDK's ``Start``."""
        self._sdk.Start(self.cam_context)

    def start_capture(self):
        """Compatibility alias: calls ``self.camera_start()``."""
        self.camera_start()

    def camera_stop(self):
        """Stop acquisition by calling the SDK's ``Stop``."""
        self._sdk.Stop(self.cam_context)

    def stop_capture(self):
        """Compatibility alias: calls ``self.camera_stop()``."""
        self.camera_stop()

    def close(self):
        """Stop acquisition, exit the SDK context and print a confirmation."""
        self._sdk.Stop(self.cam_context)
        self._sdk.Exit(self.cam_context)
        print("Closed context for camera %d" % self.camera_index)

    def external_trigger(self, enabled, syncdelay=None, verbose=False):
        """Enable or disable external synchronisation.

        Parameters
        ----------
        enabled : bool
            If True, sends the ``extsynchro`` commands that switch external
            synchronisation on; if False, sends ``set extsynchro off``.
        syncdelay : float, optional
            Synchronisation delay in milliseconds. Only used when ``enabled``
            is True; it is stored in ``self.syncdelay`` and sent to the camera
            divided by 1000.
        verbose : bool, default=False
            If True, prints the commands and responses.
        """
        if enabled:
            self.send_command("set extsynchro exposure internal", verbose=verbose)
            self.send_command("set extsynchro source external", verbose=verbose)
            self.send_command("set extsynchro on", verbose=verbose)
            self.send_command("set extsynchro polarity standard", verbose=verbose)
            if syncdelay is not None:
                self.syncdelay = syncdelay
                self.send_command(
                    "set syncdelay " + str(syncdelay / 1000), verbose=verbose
                )
            print("External synchronisation ENABLED")
        else:
            self.send_command("set extsynchro off", verbose=verbose)
            print("External synchronisation DISABLED")

    def set_tint(self, tint, verbose=False):
        """Send ``set tint <tint>`` to the camera.

        Parameters
        ----------
        tint : float
            Integration time; presumably in seconds - TODO confirm against the
            camera's command reference.
        verbose : bool, default=False
            If True, prints the command and response.
        """
        self.send_command("set tint " + str(tint), verbose=verbose)

    def take_dark(self, darkfile="default_dark", navs=1000, save=False):
        """
        Acquire a dark frame as the mean of ``navs`` frames.

        The result is stored in ``self.dark``. Note that this leaves
        ``nims_tolog`` set to ``navs``, so later ``get_n_images`` calls acquire
        ``navs`` frames until ``set_nims_tolog`` is called again.

        Parameters
        ----------
        darkfile : str, default="default_dark"
            File name (without ``.npy``) used when saving, appended to
            ``self.darkpath``.
        navs : int, default=1000
            Number of frames to average.
        save : bool, default=False
            If True, saves the dark frame with ``np.save``.

        Returns
        -------
        numpy.ndarray
            The averaged dark frame.
        """
        self.set_nims_tolog(navs)
        darkframe = self.get_n_images(return_ims=True, coadd=True)
        self.dark = darkframe
        print("New darkframe acquired")
        if save:
            print("Saving darkframe to " + self.darkpath + darkfile + ".npy")
            np.save(self.darkpath + darkfile + ".npy", darkframe)
        return darkframe

    def load_dark(self, darkfile="default_dark"):
        """
        Load a dark frame from ``self.darkpath + darkfile + ".npy"``.

        Parameters
        ----------
        darkfile : str, default="default_dark"
            Name of the dark frame file to load (without extension).
        """
        darkframe = np.load(self.darkpath + darkfile + ".npy")
        self.dark = darkframe
        print("Loaded darkframe from " + self.darkpath + darkfile + ".npy")

    def get_latest_image(self, return_im=True, waitfornewframe=True):
        """
        Update ``self.latest_im`` and optionally return it.

        Parameters
        ----------
        return_im : bool, default=True
            If True, returns the image; otherwise only ``self.latest_im`` is
            updated.
        waitfornewframe : bool, default=True
            If True, waits until the new-image callback has stored a frame.
            There is no timeout, so this blocks for as long as no frame
            arrives. If False, reads image index -1 directly from the SDK.

        Returns
        -------
        numpy.ndarray or None
            The image as a 2D array if ``return_im`` is True, otherwise None.
        """
        if waitfornewframe:
            # Ask the callback for a frame and spin until it clears the flag.
            self.update_latestim = True
            while self.update_latestim:
                pass
        else:
            # GetRawImageAsNumpyArray is not used because it does not support
            # signed ints (original author's note); -1 gets the most recent image.
            image = self._sdk.GetRawImage(self.cam_context, -1)
            self.latest_im = self._frame_from_pointer(image)

        # The timestamp buffer is replaced by a one-element list holding the
        # time of this read; get_n_images() restores the full-length array.
        self.loggedims_times_arr = [time.perf_counter()]
        if return_im:
            return self.latest_im

    def check_nims_buffer(self):
        """
        Report the SDK buffer filling plus one.

        Returns
        -------
        int
            ``GetBufferFilling(...) + 1``; presumably the number of images
            currently in the buffer - TODO confirm the SDK's convention.
        """
        n_buffer_ims = self._sdk.GetBufferFilling(self.cam_context) + 1
        return n_buffer_ims

    def get_buffer_images(self, return_im=True, verbose=False):
        """
        Copy every image reported by ``check_nims_buffer`` out of the SDK buffer.

        ``self.loggedims_cube`` is replaced by a cube sized for the number of
        buffered images.

        Parameters
        ----------
        return_im : bool, default=True
            If True, returns the image cube; otherwise only
            ``self.loggedims_cube`` is updated.
        verbose : bool, default=False
            If True, prints the number of buffered images and a completion
            message.

        Returns
        -------
        numpy.ndarray or None
            3D array (frames, rows, columns) if ``return_im`` is True,
            otherwise None.
        """
        n_buffer_ims = self.check_nims_buffer()
        if verbose:
            print("Num images in buffer: %.1f" % n_buffer_ims)

        self.loggedims_cube = self._empty_cube(n_buffer_ims)
        for k in range(n_buffer_ims):
            image = self._sdk.GetRawImage(self.cam_context, k)
            self.loggedims_cube[k, :, :] = self._frame_from_pointer(image)
        if verbose:
            print("get_buffer_ims: loop done")
        if return_im:
            return self.loggedims_cube

    def newim_callbackfunc(self, image, ctx):
        """
        New-image callback registered with the SDK in ``__init__``.

        Copies the incoming frame, stores it as ``self.latest_im`` when a fresh
        frame was requested, and appends it (with a ``time.perf_counter``
        timestamp) to the log buffers while ``nims_lefttolog`` is positive.

        Parameters
        ----------
        image : pointer
            Pointer to the raw image data.
        ctx : pointer
            Context value passed by the SDK (unused).
        """
        new_im = self._frame_from_pointer(image)

        if self.update_latestim:
            self.latest_im = new_im
            self.update_latestim = False

        if self.nims_lefttolog > 0:
            slot = self.nims_tolog - self.nims_lefttolog
            self.loggedims_cube[slot, :, :] = new_im
            self.loggedims_times_arr[slot] = time.perf_counter()
            # Decrement last: the waiting loop in get_n_images ends at zero.
            self.nims_lefttolog -= 1

    def get_n_images(
        self, blocking=True, return_ims=False, coadd=False, subtract_dark=False
    ):
        """
        Acquire ``self.nims_tolog`` frames into ``self.loggedims_cube``.

        Parameters
        ----------
        blocking : bool, default=True
            If True, waits until all frames are logged. If False, only arms the
            acquisition and returns None immediately; the other options are
            then ignored.
        return_ims : bool, default=False
            If True, returns the acquired images.
        coadd : bool, default=False
            If True (and ``return_ims``), returns the mean over frames.
        subtract_dark : bool, default=False
            If True, replaces ``self.loggedims_cube`` by the cube minus
            ``self.dark``.

        Returns
        -------
        numpy.ndarray or None
            With ``blocking`` and ``return_ims``: the 3D cube, or the 2D mean
            if ``coadd`` is True. The cube is ``self.loggedims_cube`` itself,
            not a copy. Otherwise None.
        """
        # Other methods can leave the log buffers with the wrong length:
        # get_latest_image stores a one-element timestamp list and
        # get_buffer_images resizes the cube. The callback indexes both up to
        # nims_tolog - 1, and an exception inside it would skip the decrement
        # of nims_lefttolog, so the wait below would never finish. Make both
        # buffers fit before arming the acquisition.
        if len(self.loggedims_cube) != self.nims_tolog:
            self.loggedims_cube = self._empty_cube(self.nims_tolog)
        if len(self.loggedims_times_arr) != self.nims_tolog:
            self.loggedims_times_arr = np.zeros(self.nims_tolog)

        self.nims_lefttolog = self.nims_tolog

        if not blocking:
            return None

        while self.nims_lefttolog > 0:
            time.sleep(0.001)
        if subtract_dark:
            self.loggedims_cube = self.loggedims_cube - self.dark
        if return_ims:
            if coadd:
                return np.mean(self.loggedims_cube, axis=0)
            return self.loggedims_cube

    def set_nims_tolog(self, nims):
        """
        Set the number of frames per acquisition and reallocate the log buffers.

        Parameters
        ----------
        nims : int
            Number of images to acquire and store in ``get_n_images``.
        """
        self.nims_tolog = nims
        self.loggedims_cube = self._empty_cube(self.nims_tolog)
        self.loggedims_times_arr = np.zeros(self.nims_tolog)


In [None]:
# Render the API documentation for FliCamera.external_trigger.
show_doc(FliCamera.external_trigger)

---

[source](https://github.com/SAIL-Labs/sail-cameras/blob/main/sail_cameras/cameras.py#L247){target="_blank" style="float:right; font-size:smaller"}

### FliCamera.external_trigger

>      FliCamera.external_trigger (enabled, syncdelay=None, verbose=False)

*Configure external triggering for the camera.*

|    | **Type** | **Default** | **Details** |
| -- | -------- | ----------- | ----------- |
| enabled | bool |  | If True, enables external triggering; if False, disables it. |
| syncdelay | NoneType | None | Synchronization delay in milliseconds. |
| verbose | bool | False | If True, prints the commands and responses. |

In [None]:
# Render the API documentation for FliCamera.check_nims_buffer.
show_doc(FliCamera.check_nims_buffer)

---

[source](https://github.com/SAIL-Labs/sail-cameras/blob/main/sail_cameras/cameras.py#L370){target="_blank" style="float:right; font-size:smaller"}

### FliCamera.check_nims_buffer

>      FliCamera.check_nims_buffer ()

*Check the number of images in the camera buffer.*

In [None]:
# Render the API documentation for FliCamera.get_buffer_images.
show_doc(FliCamera.get_buffer_images)

---

[source](https://github.com/SAIL-Labs/sail-cameras/blob/main/sail_cameras/cameras.py#L382){target="_blank" style="float:right; font-size:smaller"}

### FliCamera.get_buffer_images

>      FliCamera.get_buffer_images (return_im=True, verbose=False)

*Retrieve all images currently stored in the camera buffer.*

|    | **Type** | **Default** | **Details** |
| -- | -------- | ----------- | ----------- |
| return_im | bool | True | If True, returns the image cube; otherwise just updates internal buffer. |
| verbose | bool | False | If True, prints additional information. |
| **Returns** | **numpy.ndarray or None** |  | **3D array of images (frames, height, width) if return_im is True, otherwise None.** |

In [None]:
# Render the API documentation for FliCamera.get_latest_image.
show_doc(FliCamera.get_latest_image)

---

[source](https://github.com/SAIL-Labs/sail-cameras/blob/main/sail_cameras/cameras.py#L329){target="_blank" style="float:right; font-size:smaller"}

### FliCamera.get_latest_image

>      FliCamera.get_latest_image (return_im=True, waitfornewframe=True)

*Retrieve the latest image frame from the camera.

Implementation of the abstract get_image method from BaseCameraInterface.*

|    | **Type** | **Default** | **Details** |
| -- | -------- | ----------- | ----------- |
| return_im | bool | True | If True, returns the image; otherwise just updates the internal buffer. |
| waitfornewframe | bool | True | If True, blocks until a new frame is available. |
| **Returns** | **numpy.ndarray or None** |  | **The image as a 2D numpy array if return_im is True, otherwise None.** |

In [None]:
# Render the API documentation for FliCamera.get_n_images.
show_doc(FliCamera.get_n_images)

---

[source](https://github.com/SAIL-Labs/sail-cameras/blob/main/sail_cameras/cameras.py#L458){target="_blank" style="float:right; font-size:smaller"}

### FliCamera.get_n_images

>      FliCamera.get_n_images (blocking=True, return_ims=False, coadd=False,
>                              subtract_dark=False)

*Acquire a specified number of images from the camera.*

|    | **Type** | **Default** | **Details** |
| -- | -------- | ----------- | ----------- |
| blocking | bool | True | If True, blocks until all requested images are acquired. |
| return_ims | bool | False | If True, returns the acquired images. |
| coadd | bool | False | If True, returns the average of all acquired images. |
| subtract_dark | bool | False | If True, subtracts the dark frame from each image. |
| **Returns** | **numpy.ndarray or None** |  | **If return_ims is True, returns either a 3D array of images or<br>a 2D array (if coadd=True). Otherwise returns None.** |

In [None]:
# NOTE(review): FliCamera does not define `mro` in this notebook; this
# presumably resolves to the built-in `type.mro`, so the cell looks
# auto-generated and is probably not meant to be documented - confirm and drop.
show_doc(FliCamera.mro)

In [None]:
# Render the API documentation for FliCamera.newim_callbackfunc.
show_doc(FliCamera.newim_callbackfunc)

In [None]:
# Render the API documentation for FliCamera.reset_buffer.
show_doc(FliCamera.reset_buffer)

In [None]:
# Render the API documentation for FliCamera.send_command.
show_doc(FliCamera.send_command)

In [None]:
# Render the API documentation for FliCamera.set_nims_tolog.
show_doc(FliCamera.set_nims_tolog)

In [None]:
# Render the API documentation for FliCamera.set_tint.
show_doc(FliCamera.set_tint)

In [None]:
#| hide
# Run nbdev's export step (hidden from the rendered docs by the `#| hide` directive above).
import nbdev; nbdev.nbdev_export()