In [None]:
%load_ext autoreload
%autoreload 2

from openlm.detectors.demo.demo import DemoDetector
from openlm.structures import DetectorSettings, ImageSettings, SerialSettings
import matplotlib.pyplot as plt

# initialise logging
import logging
logging.basicConfig(level=logging.INFO)


In [None]:
det_settings = DetectorSettings(
    name="demo",
    serial_settings=SerialSettings(
        port="ttyUSB0",
        baudrate=9600,
        timeout=0.1),
    pixel_size=1e-6,
    resolution=[2048, 2048],

)

detector = DemoDetector(detector_settings=det_settings)
detector.init_camera()

In [None]:
image_settings = ImageSettings(
    pixel_size=1e-6,
    exposure=1e-3
    )

image = detector.grab_image(image_settings)
print(image.shape)

In [None]:
plt.imshow(image, cmap="gray")
plt.show()

## Setup Session


In [None]:
%load_ext autoreload
%autoreload 2

from openlm import utils
from openlm.microscopes.base import BaseLightMicroscope

lc, det, obj = utils.setup_session()

lm: BaseLightMicroscope = utils.create_microscope("test", det, lc, obj)


In [None]:
lm.get_detector()
lm.get_laser_controller()
lm.get_objective()

In [None]:
lm._laser_controller.get_laser("laser_1")


In [None]:
%load_ext autoreload
%autoreload 2

import matplotlib.pyplot as plt
import numpy as np
import glob 
filenames = sorted(glob.glob(r"../image_*.npy"))

print(filenames)
import os

# plot as subplots
fig, axs = plt.subplots(len(filenames)//4, 4, figsize=(15, 10))
for i, fname in enumerate(filenames):
    arr = np.load(fname)
    axs[i//4, i%4].imshow(arr, cmap="gray")
    axs[i//4, i%4].set_title(os.path.basename(fname))

In [None]:
%load_ext autoreload
%autoreload 2

import matplotlib.pyplot as plt
import numpy as np

from openlm.structures import LightImage

import glob
import os

# path = r"D:\David\piescope_dev\tile"
path = "../"
filenames = sorted(glob.glob(os.path.join(path, "2023-05-08-04-1*.tif")))

for fname in filenames:
    img = LightImage.load(fname)

    # subplots
    fig, axs = plt.subplots(1, img.data.shape[2], figsize=(15, 10))

    for i, channels in enumerate(img.metadata.channels):
        cmap = "Greens" if i == 0 else "Blues"
        axs[i].imshow(img.data[:, :, i], cmap=cmap)
        axs[i].set_title(f"channel {channels}")

        #centre crosshair, dashed line
        axs[i].axvline(img.data.shape[0]//2, linestyle="--", color="yellow")
        axs[i].axhline(img.data.shape[1]//2, linestyle="--", color="yellow")
    print(fname)
    print(img.data.shape)
    print(img.metadata.channels)

    plt.show()

## Microscope State

In [None]:
%load_ext autoreload
%autoreload 2

import logging

from openlm import utils
from openlm import config as cfg
import os

from fibsem import utils as fibsem_utils


import os
from openlm import utils
from openlm import config
from openlm.structures import ImageMode, SynchroniserMessage, ImageSettings, DetectorSettings
import time
import logging
import numpy as np
from PIL import Image

from dataclasses import dataclass
from openlm.structures import (LaserSettings, ImageMode, TriggerEdge, TriggerSource, 
    DetectorSettings, ObjectiveSettings, LightImage, LightImageMetadata)

from openlm.workflow import test_acq


In [None]:
CFG_PATH = os.path.join(cfg.BASE_PATH, "config", "piedisc.yaml")
microscope, settings = utils.setup_session(config_path=CFG_PATH)
microscope.fibsem_microscope, microscope.fibsem_settings = fibsem_utils.setup_session()

In [None]:
# SINGLE IMAGE



# what is a workflow
# a list of workflow steps

# what is a workflow step
# a function to run




## Workflow Generation

In [None]:
%load_ext autoreload
%autoreload 2

import numpy as np

from pprint import pprint

In [None]:
def _gen_tiles(n_rows: int = 1, n_cols: int = 1, dx:float = 0, dy: float = 0.0)-> list:
    # relative position of each tile
    return [{"type": "move_stage", "dx":j*dx, "dy":i*dy} for i in range(n_rows) for j in range(n_cols)]


def _gen_vol():
    return


In [None]:
tile_list = _gen_tiles(n_rows=3, n_cols=3, dx=1, dy=1)
pprint(tile_list)

In [None]:
from dataclasses import dataclass

from openlm.microscope import LightMicroscope
from abc import ABC, abstractmethod

@dataclass
class OpenLMSWorkflowStep(ABC):
    type: str
    name: str
    params: dict

    @abstractmethod
    def run(self, microscope: LightMicroscope, return_func=None, *args, **kwargs):
        pass

class OpenLMSWorkflowStepMoveStage(OpenLMSWorkflowStep):
    def __init__(self, name: str, params: dict):
        super().__init__(type="move_stage", name=name, params=params)

    def run(self,microscope: LightMicroscope, return_func=None ):
        worker = microscope.move_stage(dx=self.params["dx"], dy=self.params["dy"])
        worker.returned.connect(return_func)  # type: ignore
        worker.start()

class OpenLMSWorkflowStepAcquireImage(OpenLMSWorkflowStep):
    def __init__(self, name: str, params: dict):
        super().__init__(type="acquire_image", name=name, params=params)

    def run(self, microscope: LightMicroscope, return_func=None ):
        self.params["stop_event"].clear()
        microscope.setup_acquisition()

        # TODO: disable other microscope interactions
        worker = microscope.consume_image_queue(save=True, parent_ui=self.params["parent_ui"])
        worker.returned.connect(return_func)  # type: ignore
        worker.start()

        time.sleep(1)

        # acquire image
        self.params["parent_ui"].image_queue, self.params["parent_ui"].stop_event = self.microscope.acquire_image(
            image_settings=self.params["settings"],
            sync_message=self.params["sync"],
            stop_event=self.params["parent_ui"].stop_event,
        )

class OpenLMSWorkflowStepMoveObjective(OpenLMSWorkflowStep):
    def __init__(self, name: str, params: dict):
        super().__init__(type="move_objective", name=name, params=params)

    def run(self, microscope: LightMicroscope, return_func=None, ):
        logging.info(f"Objective Move: {self.params['dz']}")
        worker = microscope.move_objective_stage(dz=self.params["dz"])
        worker.returned.connect(return_func)  # type: ignore
        worker.start()







### Worfklow

In [19]:
%load_ext autoreload
%autoreload 2

import numpy as np
import itertools
from pprint import pprint

from openlm.workflow import _gen_tiling_workflow, _gen_volume_workflow, _gen_workflow
from openlm.structures import ImageSettings, SynchroniserMessage

The autoreload extension is already loaded. To reload it, use:
  %reload_ext autoreload


In [23]:


image_settings = ImageSettings()
sync_message = SynchroniserMessage(exposures=None, pins=None)

tile_coords = _gen_tiling_workflow(n_rows=2, n_cols=2, dx=100e-6, dy=100e-6)
# This gives us the relative x, y coordinates for each imaging position

volume_coords = _gen_volume_workflow(n_slices=3, step_size=5e-6)
# This gives us the relative z coordinates for each imaging position

workflow = _gen_workflow(tile_coords, volume_coords, 
                         image_settings=image_settings, 
                         sync_message=sync_message,
                         ) 


In [24]:
# print(f"Workflow Length: {len(workflow)}")
# pprint(workflow)

# counter how many dicts have type = "image"
from collections import Counter
counter = Counter([step["type"] for step in workflow])

pprint(counter)

# import pandas as pd
# df = pd.DataFrame(workflow)
# pd.set_option("display.max_rows", None)
# display(df)


Counter({'move_objective': 13, 'acquire_image': 12, 'move_stage': 4})


In [None]:


def test_gen_workflow(n_rows, n_cols, n_slices):
    initial_position = [0, 0, 0]

    tile_coords = _gen_tiling_workflow(n_rows=n_rows, n_cols=n_cols, dx=1, dy=1)
    volume_coords = _gen_volume_workflow(n_slices=n_slices, step_size=1)
    workflow = _gen_workflow(tile_coords, volume_coords, initial_position=initial_position)

    counter = Counter([step["type"] for step in workflow])

    assert counter["move_stage"] == n_rows * n_cols, f"Expected {n_rows * n_cols} move_stage steps, got {counter['move_stage']}"
    assert counter["move_objective"] == n_slices * n_cols * n_rows + 1, f"Expected {n_slices * n_cols * n_rows + 1} move_objective steps, got {counter['move_objective']}"
    assert counter["acquire_image"] == n_rows * n_cols * n_slices, f"Expected {n_rows * n_cols * n_slices} acquire_image steps, got {counter['acquire_image']}"


# test_gen_workflow(n_rows=2, n_cols=2, n_slices=1)
# test_gen_workflow(n_rows=2, n_cols=2, n_slices=2)
test_gen_workflow(n_rows=2, n_cols=2, n_slices=3)