In [1]:
# IMPORTS
# - standard imports

import os
os.chdir('..')

import ipynbname

import os.path
from collections import namedtuple

# - additional imports
import numpy as np
import napari

# LOGGER
log = csutils.get_logger(__name__)

NameError: name 'csutils' is not defined

In [None]:
%load_ext autoreload
%autoreload 2

from utilities import *
import c_swain_python_utils as csutils
import prairie_view_imports as pvi
import imaging_dataset as imd

In [None]:
# GLOBAL VARIABLES
# - paths
sources_dir = os.path.abspath(os.getcwd())
config_dir = os.path.abspath(os.path.join(sources_dir, '..', 'configs', ))

# - logging setup
debug_mode = False
default_window_level = 'info'

run_tag = '00'

# - custom global variables
DatasetIngestSpec = namedtuple('DatasetLoadInfo',
                               ['name', 'load_kwargs', 'napari_kwargs'])
TRANSLATE_REL_KEY = 'translate_relative_to'

# environment update
os.environ.update({'NUMEXPR_MAX_THREADS': str(os.cpu_count())})

# custom functions
def get_config(config_name):
    config_path = os.path.join(config_dir, config_name)
    return ZMIAConfig(config_path)


def choose_dataset_loader(dataset_type):
    if dataset_type == 'two-photon':
        return pvi.load_dataset
    elif dataset_type == 'behavioral-camera':
        return AVIDataset.from_dataset_info
    elif dataset_type == 'confocal':
        return NISDataset.from_dataset_info
    else:
        msg = (f'Provided dataset type does not have an associated reader, '
               f'"{dataset_type}".')
        raise ValueError(msg)


# handle running of the job
if __name__ == '__main__':
    # get jobfile name without extension
    _file = ipynbname.name()

    # setup logging
    log_path = os.path.join(sources_dir, '..', 'logs', f'{_file}.log')
    csutils.apply_standard_logging_config(
        file_path=log_path,
        window_level='debug' if debug_mode else default_window_level,
        window_format='debug' if debug_mode else 'default')

In [None]:
# small matter of programing
config = get_config('cs-ii-14_shared-config.yml')

main_dataset_list = [
    DatasetIngestSpec('bidir-tz-vid-01', {}, {'colormap': 'bop orange'}),

    DatasetIngestSpec('tail-vid-01', {'pixel_pitch_um': 11.1, 'fps': 150},
                      {'translate': {imd.Dim.Y: -2920, imd.Dim.X: -470}}),

    DatasetIngestSpec('morph-ref-anterior', {},
                      {'colormap': 'bop blue',
                     TRANSLATE_REL_KEY: 'bidir-tz-vid-01'}),

    DatasetIngestSpec('morph-ref-anterior-ventral-append', {},
                      {TRANSLATE_REL_KEY: 'bidir-tz-vid-01',
                     'colormap': 'green'}),

    DatasetIngestSpec('fixed-tissue-gcamp-stain', {'lazy': False}, {}),
]

im_datasets = dict()

for spec in main_dataset_list[0:4]:
    log.info('Loading dataset info for: {} > {}.',
             config.name or '[unnamed]', spec.name)
    ds_info = config.get_dataset_info(name=spec.name)
    ds_loader = choose_dataset_loader(ds_info.type)

    log.info('Loading the dataset based on the configured type.')
    ds = ds_loader(dataset_info=ds_info, **spec.load_kwargs)

    log.info('Converting the loaded dataset into an ImagingDataset '
             'instance.')
    im_dataset = imd.ImagingDataset.from_dataset(ds)

    if isinstance(im_dataset, imd.ImagingDatasetDict):
        if (im_dataset.key_type
                is imd.DatasetListKeyType.PRAIRIE_VIEW_TRACK):
            log.info('Averaging multi-track morphology reference into a '
                     'single dataset.')
            im_stack = np.stack([imds.get_image_data()
                                 for imds in im_dataset.values()])
            new_im = np.mean(im_stack, axis=0, keepdims=False)
            im_dataset = imd.ImagingDataset(
                path=im_dataset[0].path,
                name=ds.name,
                image_ndarray=new_im,
                dimensions=im_dataset[0].dimensions,
                coordinates=im_dataset[0].coordinates,
                position=im_dataset[0].position,
                raw_data=ds)

    if not isinstance(im_dataset, imd.ImagingDataset):
            msg = (f'Unexpected datatype for im_dataset received, got an '
                   f'instance of {im_dataset.__class__.__name__}.')
            raise RuntimeError(msg)

    log.info('\nDataset {} Coordinate Pitches', im_dataset.name)
    for d, coord in im_dataset.coordinates.items():
        if d not in (imd.Dim.X, imd.Dim.Y, imd.Dim.Z, imd.Dim.TIME):
            continue
        log.info('> {:s} - {:.3f} {:s}', d, coord.median_pitch, coord.unit)

    im_datasets[spec.name] = im_dataset

    log.info('Dataset {:s} is successfully loaded.', repr(im_dataset))

    log.info('Veiwing Dataset with napari.')
    viewer = napari.Viewer()
    im_dataset.add_to_napari(viewer, colormap='magma')
    imd.ImagingDataset.apply_standard_napari_config(viewer)
    napari.run()
    log.info('Napari session ended.')

In [None]:
# OVERLAYING 2 PHOTON DATA
two_photon_data_idxs = [0, 2, 3]
viewer2 = napari.Viewer()

world_extents = dict()
for i_dataset in two_photon_data_idxs:
    spec = main_dataset_list[i_dataset]
    im_dataset = im_datasets[spec.name]

    position = im_dataset.position

    if TRANSLATE_REL_KEY in spec.napari_kwargs:
        translate_rel_to_idx = spec.napari_kwargs.pop(TRANSLATE_REL_KEY)
        ref_loc = im_datasets[translate_rel_to_idx].position
        current_loc = im_dataset.position
        translate_dict = dict()
        for dim in (imd.Dim.Z, imd.Dim.Y, imd.Dim.X):
            translate_dict[dim] = current_loc[dim] - ref_loc[dim]
        spec.napari_kwargs['translate'] = translate_dict

    for dim in im_dataset.dimensions:
        if not dim.is_numeric:
            continue

        im_extent = im_dataset.extents[dim]

        try:
            current_extent = world_extents[dim]
        except KeyError:
            current_extent = world_extents[dim] = im_extent

        world_extents[dim] = [
            min(current_extent[0], im_extent[0]),
            max(current_extent[1], im_extent[1])]

for i_dataset in two_photon_data_idxs:
    spec = main_dataset_list[i_dataset]
    im_dataset = im_datasets[spec.name]

    log.info('Adding dataset {} to napari as a layer.', im_dataset)

    imd.ImagingDataset.add_to_napari(im_dataset,
                                     viewer2,
                                     blending='additive',
                                     with_world_extents=world_extents,
                                     **spec.napari_kwargs)

log.info('Displaying over-layed two-photon data in napari.')
imd.ImagingDataset.apply_standard_napari_config(viewer2)
napari.run()
log.info('Napari session ended.')