In [6]:
"""
Run the illumination correction task after a custom init, which creates
a parallelization list with different entries for different channels.

NOTES:
1. The illumination-correction task used here is illumination_correction_B,
   which is only meant to run after the init_channel_parallelization task.
2. Within the current version, in the intermediate state (after the init
   task) there exist the new empty images on-disk (in case of not
   overwriting input images) but they do not exist in the image list. This
   is consistent with the custom-parallelization-list execution branch (as
   opposed to the default-filtering scenario, where the init task already
   creates the new images in the image-list metadata).
3. Because of the creation of on-disk images within the init task, also the
   `overwrite_input` argument was moved from the actual
   illumination-correction (parallel) task into the init task. Communication
   with the parallel task takes place through an extra function argument
   `raw_path`.
""";

In [7]:
# Preliminary imports and auxiliary-function definition

from pathlib import Path
from devtools import debug
import tempfile

from fractal_server.app.runner.v2.models import Dataset, WorkflowTask
from fractal_server.app.runner.v2.runner import execute_tasks_v2
from fractal_server.images import SingleImage

from concurrent.futures import ThreadPoolExecutor

import sys
# Extend the module search path before the import right below. The path is
# relative, so it is resolved against the kernel's working directory.
# NOTE(review): presumably fractal_tasks_core_mock lives in that tests
# folder and this is what makes the import resolve — confirm.
sys.path.append("../tests/v2/04_runner")
from fractal_tasks_core_mock import TASK_LIST

# Executor instance handed to the execute_tasks_v2 calls of this notebook
# (default ThreadPoolExecutor arguments).
executor = ThreadPoolExecutor()


def image_data_exist_on_disk(image_list: list[SingleImage]):
    """
    Report, image by image, whether mock data are present on disk.

    An image counts as having data when a `data` entry exists directly
    under its `path`. One status line is printed for every image, in the
    order of `image_list`.

    Args:
        image_list: Images to inspect; only their `path` attribute is read.

    Returns:
        `True` when no image lacks data (including the empty-list case),
        `False` as soon as at least one image has no `data` entry.
    """
    tag = "[image_data_exist_on_disk]"
    missing = 0
    for img in image_list:
        if not Path(img.path, "data").exists():
            missing += 1
            print(f"{tag} {img.path} does *not* contain data")
            continue
        print(f"{tag} {img.path} contains data")
    return missing == 0

In [8]:
# Scratch space for the mocked zarrs: a fresh temporary directory, with
# `zarr_dir` being the POSIX-style string path of a `zarr_dir` entry in it
tmp_path = Path(tempfile.mkdtemp())
zarr_dir = tmp_path.joinpath("zarr_dir").as_posix().rstrip("/")
print(f"{zarr_dir=}")

zarr_dir='/tmp/tmp52dbxn0l/zarr_dir'


In [9]:
# Run the create_ome_zarr_compound task on a new dataset rooted at zarr_dir
# (its log lines are tagged create_ome_zarr and yokogawa_to_zarr)
dataset = execute_tasks_v2(
    wf_task_list=[
        WorkflowTask(
            task=TASK_LIST["create_ome_zarr_compound"],
            args_non_parallel={"image_dir": "/tmp/input_images"},
        )
    ],
    dataset=Dataset(zarr_dir=zarr_dir),
    executor=executor,
)

# Show the dataset returned by the runner
debug(dataset);

[create_ome_zarr] START
[create_ome_zarr] image_dir='/tmp/input_images'
[create_ome_zarr] zarr_dir='/tmp/tmp52dbxn0l/zarr_dir'
[create_ome_zarr] zarr_path='/tmp/tmp52dbxn0l/zarr_dir/my_plate.zarr'
[create_ome_zarr] END
[yokogawa_to_zarr] START
[yokogawa_to_zarr] path='/tmp/tmp52dbxn0l/zarr_dir/my_plate.zarr/A/01/0'
[yokogawa_to_zarr] raw_path='/tmp/input_images/A_01_0.tif'
[yokogawa_to_zarr] START
[yokogawa_to_zarr] path='/tmp/tmp52dbxn0l/zarr_dir/my_plate.zarr/A/02/0'
[yokogawa_to_zarr] raw_path='/tmp/input_images/A_02_0.tif'
[yokogawa_to_zarr] END
[yokogawa_to_zarr] END
/tmp/ipykernel_135597/3454169311.py:14 <module>
    dataset: Dataset(
        id=None,
        history=[None],
        zarr_dir='/tmp/tmp52dbxn0l/zarr_dir',
        images=[
            SingleImage(
                path='/tmp/tmp52dbxn0l/zarr_dir/my_plate.zarr/A/01/0',
                attributes={
                    'well': 'A01',
                    'plate': 'my_plate.zarr',
                    'data_dimensionality'

In [11]:
# Run the illumination_correction_compound task with overwrite_input=True
dataset = execute_tasks_v2(
    wf_task_list=[
        WorkflowTask(
            task=TASK_LIST["illumination_correction_compound"],
            args_non_parallel={"overwrite_input": True},
        )
    ],
    dataset=dataset,
    executor=executor,
)

# The image list is expected to hold exactly two images, each of them
# flagged with illumination_correction=True
assert len(dataset.images) == 2
for image in dataset.images:
    assert image.attributes.get("illumination_correction") is True

# Show the dataset returned by the runner
debug(dataset);

/tmp/ipykernel_135597/479568765.py:21 <module>
    dataset: Dataset(
        id=None,
        history=[
            None,
            None,
            None,
        ],
        zarr_dir='/tmp/tmp52dbxn0l/zarr_dir',
        images=[
            SingleImage(
                path='/tmp/tmp52dbxn0l/zarr_dir/my_plate.zarr/A/01/0',
                attributes={
                    'well': 'A01',
                    'plate': 'my_plate.zarr',
                    'data_dimensionality': 3,
                    'illumination_correction': True,
                },
            ),
            SingleImage(
                path='/tmp/tmp52dbxn0l/zarr_dir/my_plate.zarr/A/02/0',
                attributes={
                    'well': 'A02',
                    'plate': 'my_plate.zarr',
                    'data_dimensionality': 3,
                    'illumination_correction': True,
                },
            ),
        ],
        filters={
            'plate': 'my_plate.zarr',
            'data_dimen

In [None]:
# Run the illumination_correction_B task on the current dataset
dataset = execute_tasks_v2(
    wf_task_list=[WorkflowTask(task=TASK_LIST["illumination_correction_B"])],
    dataset=dataset,
    executor=executor,
)

# Show the dataset returned by the runner
debug(dataset);

[illumination_correction_B] START
[illumination_correction_B] path='/tmp/tmppm9uwfi7/zarr_dir/my_plate.zarr/A/01/0'
[illumination_correction_B] raw_path='/tmp/tmppm9uwfi7/zarr_dir/my_plate.zarr/A/01/0'
[illumination_correction_B] subsets={'C_index': 1}
[illumination_correction_B] START
[illumination_correction_B] path='/tmp/tmppm9uwfi7/zarr_dir/my_plate.zarr/A/02/0'
[illumination_correction_B] raw_path='/tmp/tmppm9uwfi7/zarr_dir/my_plate.zarr/A/02/0'
[illumination_correction_B] subsets={'C_index': 0}
[illumination_correction] END
[illumination_correction] END
/tmp/ipykernel_116267/3866893277.py:16 <module>
    dataset: Dataset(
        id=None,
        history=[
            'create_ome_zarr',
            'yokogawa_to_zarr',
            'init_channel_parallelization',
            'illumination_correction_B',
        ],
        images=[
            SingleImage(
                path='/tmp/tmppm9uwfi7/zarr_dir/my_plate.zarr/A/01/0',
                attributes={
                    'well': 

In [None]:
# Release the thread pool once all runner calls are done (blocks until
# pending work has finished, which is the default behaviour of shutdown)
executor.shutdown(wait=True)