In [1]:
from PIL import Image
import numpy as np
import napari
import ipywidgets as widgets
import os
import math
from cellpose import models
import torch
from readlif.reader import LifFile


# turn on for 4 channel images
import sys
sys.path.append('../')
# from readlif_versions.readlif_fix.reader import LifFile

In [2]:
def find_name_and_replicate_info(mosaic_indication, pre_img, m):
    if mosaic_indication:
        name = pre_img.name
        if '/' in name:
            row = name.split('/')[-2]
            col = name.split('/')[-1]
            image_name = f'{row}{col}_{m}'
            well_sample = image_name.split('_')[:-1][0]
            replicate = m
        else:
            image_name = name
            try:
                well_sample = '/'.join(image_name.split('_'))  
                replicate = m
            except:
                well_sample = image_name
                replicate = '1'
    else:
        name = pre_img.name
        region_scan = False
        if '/' in name:
            # get the last part of the name, to get rid of the path
            image_name = name.split('/')[-1]
            #check the first image in the queue if it is a region scan, assume the entire file is built like this
            region_names = ['R'+str(x) for x in range(1,51)]
            if image_name in region_names:
                region_scan = True          
                region = name.split('/')[-1]
                well = f"{name.split('/')[-3]}{name.split('/')[-2]}"
                rest = '_'.join(name.split('/')[:-3])
                # buildup default variables
                image_name = image_name
                well_sample = '_'.join([rest, well])
                replicate = region
                print(image_name, well_sample, replicate)
        else:
            image_name = name
        if region_scan == False:
            try:
                well_sample = '_'.join(image_name.split('_')[:-1])
                replicate = image_name.split('_')[-1]
            except:
                well_sample = image_name
                replicate = '1'
        
    return image_name, well_sample, replicate

def collect_images(raw_data, img_sel):
    image_dict_list = []

    # check how many stacks, channels and mosaics there are
    raw_image = raw_data.get_image(img_sel)
    image_name = raw_image.name
    scale = tuple(abs(1/x) for x in raw_image.scale if x != None)
    z_nr_list = list(range(0,len([i for i in raw_image.get_iter_z()])))
    c_nr_list = list(range(0,len([i for i in raw_image.get_iter_c()])))
    m_nr_list = list(range(0,len([i for i in raw_image.get_iter_m()])))
    DimX = raw_image.dims[0]
    DimY = raw_image.dims[1]
    DimZ = raw_image.dims[2]

    # get info on if it is a mosaic tile
    M_positions = raw_image.mosaic_position # list with lenght of nr mosaics, each tuple contains (FieldX, FieldY, PosX, PosY)
    mosaic_list = list(range(0,len(M_positions)))
    mosaic_indication = True
    if mosaic_list == []:
        mosaic_list = [0]
        mosaic_indication = False

    for m in m_nr_list:
        name, well_sample, replicate  = find_name_and_replicate_info(mosaic_indication, raw_image, m)
        if 'Merged' in name:
            continue

        # collect info
        image_dict = {}
        image_dict['image_name'] = name
        image_dict['well_sample'] = well_sample
        image_dict['replicate'] = replicate
        image_dict['i_nr'] = img_sel
        image_dict['m_nr'] = m
        image_dict['scale'] = scale
        image_dict['M_positions'] = M_positions
        image_dict['DimX'] = DimX
        image_dict['DimY'] = DimY
        image_dict['DimZ'] = DimZ
        # create 2d numpy array's per channel
        for c_nr in c_nr_list:
            for z_nr in z_nr_list:
                if z_nr == 0:
                    layers = []
                
                layer = np.asarray(raw_image.get_frame(z = z_nr, t = 0, c = c_nr, m = m))
                if np.max(layer)>255:
                    # print('The image is not 8 bit, transforming to 8 bit for visualization.')
                    layer = convert_intensity_range(layer, 0, 255, np.uint8)
                layers.append(layer)
            layers = np.stack(layers, axis = 2)  
            image_dict[f'channel_{c_nr}_image'] = layers
        
        image_dict_list.append(image_dict)
        
    return image_dict_list

def get_rgb(color):
    if color == 'blue':
        rgb = (0,0,1)
    elif color == 'green':
        rgb = (0,1,0)
    elif color == 'red':
        rgb = (1,0,0)
    elif color == 'yellow':
        rgb = (1,1,0)
    elif color == 'cyan':
        rgb = (0,1,1)
    elif color == 'magenta':
        rgb = (1,0,1)
    return rgb

def convert_intensity_range(img, target_type_min, target_type_max, target_type, min_quantile = False, max_quantile = False):
    imin = img.min()
    # correct if the image is binary and find min value
    if imin == False:
        img = img*1
        imin = img.min()
    if min_quantile != False:
        imin = np.quantile(img, q = min_quantile)
        
    # find max value
    imax = img.max()
    if max_quantile != False:
        imax = np.quantile(img, q = max_quantile)
    
    # build new image
    a = (target_type_max - target_type_min) / (imax - imin)
    b = target_type_max - a * imax
    new_img = (a * img + b)
    new_img[new_img>255] = 255
    new_img[new_img<0] = 0
    new_img = new_img.astype(target_type)
    return new_img

def segment_nuclei(image_dict, nucleus_channel, model):
    nucleus_image = image_dict[f"{nucleus_channel} image"] 

    # segment nuclei using cellpose for quick and easy use
    # model =  models.Cellpose(gpu=True, model_type='nuclei')
    nuc_labels, flows, styles, diams = model.eval(nucleus_image, diameter = 18, channels = [0,0], net_avg=False, mask_threshold=0.01, rescale = 0.5, tile = False)
    torch.cuda.empty_cache()
    nuc_mask = nuc_labels > 0

    # collect results in image_dict
    image_dict[f"{nucleus_channel} mask image"] = nuc_mask
    image_dict[f"{nucleus_channel} labels image"] = nuc_labels
    return image_dict

def organize_channels(image_dict, channel_list, channel_info):
    # you have to have at least nuclei and myotubes to do something with the script.
    ch_name_list = [x.split('_col')[0] for x in list(channel_info.keys())]
    if 'Nuclei' not in ch_name_list:
        print('The nuclei channel has not been defined. This file is skipped until filled in correctly.', flush=True)
        return None

    if len(list(set(['Desmin', 'Myosin', 'Actin']) - set(ch_name_list))) > 2: # these 3 channels now describe the myotubes. If more need to be added to this list, don't forget to update the workflow manager too.
        print('No myotube channel has been defined. This file is skipped until filled in correctly.', flush=True)
        print(len(list(set(['Desmin', 'Myosin', 'Actin']) - set(ch_name_list))))
        return None

    # bind the channel descriptions to the channel images
    for x, channel_name in enumerate(ch_name_list):
        if channel_name == None:
            continue
        try:
            if f'{channel_name} image' not in image_dict.keys():
                image_dict[f'{channel_name} image'] = channel_list[x]
                image_dict[f'{channel_name} color'] = channel_info[f'{channel_name}_col']
            else:
                print('It seems like you have filled in 2 channels as the same. This file is skipped until filled in correctly.', flush=True)
                return None
        except:
            print(f'It seems that there are not that many channels in the image. This file is skipped until filled in correctly.', flush=True)
            return None
    return image_dict

def load_images(data_source, data_link, img, n, channel_info):
    model =  models.Cellpose(gpu=True, model_type='nuclei')
    image_dict_list = []
    if data_source == 'HCA':
        # set the image name
        img_name = img.split('_')[-1] 

        # set HCA specific parameters
        scale = (1,1)

        # check how many channels this image has
        path = './tmp/HCA-images/'
        all_image_list =[f for f in os.listdir(path) if os.path.isfile(os.path.join(path, f))]
        n_channels_this_image = len([x for x in all_image_list if img_name in x])

        large_image_list = []
        for c in range(n_channels_this_image):
            large_image = np.asarray(Image.open(f'{path}{img}_sx_1_sy_1_w{c+1}.tif').convert('I'))
            large_image_list.append(large_image)
        
        # assumes the image is 5x5, an entire 96 well
        m = 1
        for row in range(0,5, 1):
            for col in range(0,5,1):
                channel_list = []
                for large_image, channel_description in zip(large_image_list, channel_info):
                    tile_size_x = math.floor(large_image.shape[0]/5)
                    tile_size_y = math.floor(large_image.shape[1]/5)
                    mosaic_borders = [row*tile_size_x, (row+1)*tile_size_x, col*tile_size_y, (col+1)*tile_size_y]
                    small_image = large_image[mosaic_borders[0]:mosaic_borders[1], mosaic_borders[2]:mosaic_borders[3]]
                    if all(x not in channel_description for x in ['EDU', 'Transcription factor 1','Transcription factor 2']):
                        small_image = convert_intensity_range(small_image, 0,255, np.uint8, min_quantile = 0.05, max_quantile=0.95)
                    channel_list.append(small_image)

                # collect info
                image_dict = {}
                image_dict['data_link'] = data_link
                image_dict['data_source'] = data_source
                image_dict['i_nr'] = n+1
                image_dict['m_nr'] = m
                image_dict['total_picture_dimensions'] = large_image.shape
                image_dict['mosaic_borders'] = mosaic_borders
                image_dict['name'] = img
                image_dict['scale'] = scale
                image_dict['well_sample'] = img_name        
                image_dict['replicate'] = m
                image_dict['results'] = {}
                image_dict = organize_channels(image_dict, channel_list, channel_info)
                image_dict = segment_nuclei(image_dict, 'Nuclei', model) # this needs to be here since it is a GPU process
                image_dict_list.append(image_dict)
                m += 1

    elif data_source == 'LasX':
        # define the path and load the data
        path = "./tmp/lif-files/lif-file.lif"
        raw_data = LifFile(path)

        # check how many stacks, channels and mosaics there are
        raw_image = raw_data.get_image(img)
        scale = tuple(abs(1/x) for x in raw_image.scale if x != None)
        z_nr_list = list(range(0,len([i for i in raw_image.get_iter_z()])))
        c_nr_list = list(range(0,len([i for i in raw_image.get_iter_c()])))
        m_nr_list = list(range(0,len([i for i in raw_image.get_iter_m()])))
        DimX = raw_image.dims[0]
        DimY = raw_image.dims[1]

        # get info on if it is a mosaic tile
        M_positions = raw_image.mosaic_position # list with lenght of nr mosaics, each tuple contains (FieldX, FieldY, PosX, PosY)
        mosaic_list = list(range(0,len(M_positions)))
        mosaic_indication = 1
        if mosaic_list == []:
            mosaic_list = [0]
            mosaic_indication = 0 

        for m in m_nr_list:
            # create numpy array's per channel
            channel_list = []
            for c_nr, channel_description in zip(c_nr_list, channel_info):
                for z_nr in z_nr_list:
                    print(f"loading mosaic {m}, channel {c} and z{z_nr}", flush = True)
                    if z_nr == 0:
                        layers = []
                    
                    layer = np.asarray(raw_image.get_frame(z = z_nr, t = 0, c = c_nr, m = m))
                    if all(x not in channel_description for x in ['EDU', 'Transcription factor 1','Transcription factor 2']): # fill this list with images from which the info needs to be retained
                        layer = convert_intensity_range(layer, 0,255, np.uint8, min_quantile = 0.05, max_quantile=0.99)
                    layers.append(layer)
                layers = np.stack(layers, axis = 2)
                # make the image 2d (sufficient for this analysis)
                if len(layers.shape) > 2:
                    layers = np.max(layers, axis=2)
                channel_list.append(layers)

            # get image name
            name, well_sample, replicate  = find_name_and_replicate_info(mosaic_indication, raw_image, m)

            # collect info
            image_dict = {}
            image_dict['data_link'] = data_link
            image_dict['data_source'] = data_source
            image_dict['i_nr'] = img
            image_dict['m_nr'] = m
            image_dict['name'] = name
            image_dict['well_sample'] = well_sample
            image_dict['replicate'] = replicate
            image_dict['scale'] = (scale[0], scale[1])
            image_dict['M_positions'] =  M_positions
            image_dict['DimX'] =  DimX
            image_dict['DimY'] =  DimY
            image_dict['results'] = {}
            image_dict = organize_channels(image_dict, channel_list, channel_info)
            image_dict = segment_nuclei(image_dict, 'Nuclei', model) # this needs to be here since it is a GPU process
            image_dict_list.append(image_dict)

    return image_dict_list
    

In [3]:
location = 'test-files'
file_list = [f for f in os.listdir(location) if os.path.isfile(os.path.join(location, f))]

file_selection = widgets.Dropdown(options = file_list, description = 'Fi name')
display(file_selection)

Dropdown(description='Fi name', options=('SA_22.083_08-07-2022-new.lif', '22.088_22.103_TH_20220816.lif', '17.…

In [6]:
raw_data = LifFile(f"{location}/{file_selection.value}")
image_list = list(range(0,len([i for i in raw_data.get_iter_image()])))
image_name_list = []
for i in image_list:
    raw_image = raw_data.get_image(i)
    image_name_list.append(raw_image.name)

image_selection = widgets.Dropdown(options = image_name_list, description = 'Image name')
display(image_selection)

Dropdown(description='Image name', options=('Plate1/A2', 'Plate1/A3', 'Plate1/A4', 'Plate1/A1', 'Plate1/B1', '…

In [7]:
# set to true if you want to clean up the viewers
close_existing_viewer = True
if close_existing_viewer == True:
    try:
        viewer.close()
    except:
        print("No viewer was opened yet")
image_sel = [n for n, x in enumerate(image_name_list) if x == image_selection.value][0]
image_dict_list = collect_images(raw_data, image_sel)

No viewer was opened yet


In [8]:
# build the large image framework
image_dict = image_dict_list[0]
# find image dimensions
DimX = image_dict['DimX']
DimY = image_dict['DimY']
M_positions = image_dict['M_positions']
scale = image_dict['scale']

if M_positions != []:
    empty_image = None
    if empty_image==None:
        # find MaxX and MaxY
        MaxX = max(M_positions, key= lambda x: x[0])[0]
        MaxY = max(M_positions, key= lambda x: x[1])[1]
        MaxZ = image_dict['DimZ']
        empty_image = np.zeros(((DimY * (MaxY+1)), (DimX * (MaxX+1)), MaxZ))
else:
    empty_image = np.zeros_like(image_dict['channel_0_image'])

if empty_image.shape[0] > 2048 and empty_image.shape[2] > 10:
    print(f"Since the image shape is {empty_image.shape}, it is recommended to visualize individual mosaics.")
    print(f"There are {len(M_positions)} mosaic tiles.")
    vis_mosaic = True
else:
    print(f"Since the image shape is {empty_image.shape}, you can likely visualize the entire image.")
    vis_mosaic = False

# uncomment if you want to see the entire image
# vis_mosaic = False

# build the large image for all channels
if vis_mosaic == False:
    visualization_image_dict = {}
    for image_dict in image_dict_list:
        # find sub-image specific x-y coords        
        m_nr = image_dict['m_nr']
        if M_positions != []:
            FieldX = M_positions[m_nr][0]
            FieldY = M_positions[m_nr][1]

            # find coordinates in merged image
            Min_PixX = FieldX * DimX
            Max_PixX = ((FieldX+1) * DimX)
            Min_PixY = FieldY * DimY
            Max_PixY = ((FieldY+1) * DimY)
            
            for name, channel_image in [(name, img) for name, img in image_dict.items() if 'channel' in name ]:
                if name not in visualization_image_dict.keys():
                    visualization_image_dict[name] = empty_image.copy()
           
                # fill in the image
                visualization_image_dict[name][Min_PixY:Max_PixY, Min_PixX:Max_PixX, 0:MaxZ] = channel_image
        else:
            for name, channel_image in [(name, img) for name, img in image_dict.items() if 'channel' in name ]:
                visualization_image_dict[name] = channel_image
                
elif vis_mosaic == True:
    mosaic_selection = widgets.IntSlider(min=0,max=(len(M_positions)-1))
    display(mosaic_selection)

Since the image shape is (5120, 5120, 1), you can likely visualize the entire image.


In [9]:
if vis_mosaic == True:
    mosaic_sel = mosaic_selection.value
    visualization_image_dict = {}
    for image_dict in image_dict_list:
        if image_dict['m_nr'] == mosaic_sel:
            for name, channel_image in [(name, img) for name, img in image_dict.items() if 'channel' in name ]:
                if name not in visualization_image_dict.keys():
                    visualization_image_dict[name] = channel_image
                    
color_list = ['blue', 'green', 'red', 'yellow', 'magenta', 'cyan']

if close_existing_viewer == True:
    try:
        viewer.close()
    except:
        print("No viewer was opened yet")

viewer = napari.Viewer()
for n,image in enumerate(visualization_image_dict.values()):
    if image.shape[2] == 1:
        image = np.squeeze(image, axis = 2)
    viewer.add_image(convert_intensity_range(image, 0, 255, np.uint8), colormap=color_list[n], blending='additive', scale = scale, contrast_limits=(10, 255))

viewer.dims.ndisplay = 3
viewer.dims.order = (2,1,0)


No viewer was opened yet


v0.5.0. It is considered an "implementation detail" of the napari
application, not part of the napari viewer model. If your use case
requires access to qt_viewer, please open an issue to discuss.
  self.tools_menu = ToolsMenu(self, self.qt_viewer.viewer)


# for maximum projection images

In [None]:
# if vis_mosaic == True:
#     mosaic_sel = mosaic_selection.value
#     visualization_image_dict = {}
#     for image_dict in image_dict_list:
#         if image_dict['m_nr'] == mosaic_sel:
#             for name, channel_image in [(name, img) for name, img in image_dict.items() if 'channel' in name ]:
#                 if name not in visualization_image_dict.keys():
#                     visualization_image_dict[name] = np.max(channel_image, axis = 2)

# color_list = ['blue', 'green', 'red', 'yellow', 'magenta', 'cyan']

# viewer = napari.Viewer()
# for n,image in enumerate(visualization_image_dict.values()):
#     viewer.add_image(convert_intensity_range(image, 0, 255, np.uint8), colormap=color_list[n], blending='additive', scale = (scale[0], scale[1]), contrast_limits=(10, 255))

# # viewer.dims.ndisplay = 3
# # viewer.dims.order = (2,1,0)