In [1]:
import numpy as np
import os

import vodex as vx
import numan as nu
import tifffile as tif
from patchify import patchify

## create test 4D movie: t=20 x,y,z=10x10x10 

In [2]:
test_movie = np.zeros((20,10,10,10), dtype=np.int16)
test_movie[:,0:2,3:5,2:4]=1
test_movie[:,0:2,3:5,4:6]=10
test_movie = test_movie*np.arange(20)[:,None,None,None]
#print(test_movie)
tif.imwrite('test_movie.tif', test_movie.astype(np.int16), shape=(20,10,10,10), 
            metadata={'axes': 'TZYX'}, imagej=True)

## create test 3D mask 10x10x10 (2 masks, 1&2 values)

In [3]:
test_data = np.zeros((10,10,10), dtype=np.int16)
test_data[0:5,0:5,0:5]=1
test_data[5:,5:,5:]=2
tif.imwrite('test_mask.tif', test_data, imagej=True)

# define class Voxelizer

In [12]:
class Voxelizer:
    
    def __init__(self, mask_file, superVoxel_size, roi_id):
        self.SV_size = superVoxel_size # order: z,y,x
        self.roi_id = roi_id
        # define binary mask for the stack volume
        self.mask = self._load_mask(mask_file)
        # define mask bounding box on mask
        self.bbox_lim = self._get_bbox()
        # define normalization factors for super voxels (SV) inside a mask (normalization = number of mask voxels in a SV)
        self.SV_normalization = self._get_mask_in_SV()
        self.cell_avg = None
        
    def process_movie(self, experiment, n_vol): #patchify in SV and calculated avg signal inside each SV
        vol_count = 0
        tot_vol = experiment.n_volumes
        self.cell_avg = np.zeros((tot_vol, self.SV_normalization.shape[0],self.SV_normalization.shape[1],self.SV_normalization.shape[2]))
        chuncks = experiment.batch_volumes(n_vol, full_only=True, overlap=0)
        for chunck in chuncks:
            data = experiment.load_volumes(chunck, verbose=False)
            # cycle across volumes (i.e. time points)
            for vol in data:
                masked_data = vol*self.mask
                bbox_data = masked_data[self.bbox_lim[0]:self.bbox_lim[1], self.bbox_lim[2]:self.bbox_lim[3], self.bbox_lim[4]:self.bbox_lim[5]]
                data_patched = patchify(bbox_data, self.SV_size, step=self.SV_size)
                sum_data = np.zeros((data_patched.shape[0],data_patched.shape[1],data_patched.shape[2]))
                # cycle across all patches in a volume
                for z in np.arange(data_patched.shape[0]):
                    for y in np.arange(data_patched.shape[1]):
                        for x in np.arange(data_patched.shape[2]):
                           sum_data[z,y,x] = sum(sum(sum(data_patched[z,y,x,:,:,:])))
                # this is a matrix of avg intensity values for all patches in the specific volume
                avg_data = sum_data/self.SV_normalization
                #print(avg_data)
                self._save_to_cell_table(avg_data, vol_count)
                vol_count+=1
        # this is only last element process, no real meaning to extract, only to test
        return avg_data
    
    def create_cell_signal_table(self, experiment): # from patched avg data, create a 2D table timeXcells
        if self.cell_avg.any() != None:
            cell_avg_matrix = self.cell_avg.reshape(experiment.n_volumes,-1)
            return cell_avg_matrix  # volumes(time) X SV(cells)
        else:
            return None
    
    def save_ROI_to_file(self, path):   
        tif.imwrite(path+'ROI'+str(self.roi_id)+'.tif', self.cell_avg.astype(np.int16), imagej=True)
        
    def _load_mask(self, mask_file):
        im = tif.imread(mask_file)
        np_seg = np.array(im)
        roi_mask = np.where(np_seg == self.roi_id, 1, 0)
        return roi_mask
    
    def _get_bbox(self):
        bbox_lim = 0, 0, 0, 0, 0, 0 # order: z,y,x
        if len(self.mask) != 0 and len(self.mask[2]) != 0 and len(self.mask[1]) != 0 and len(self.mask[0]) != 0:
            temporal_indicies = np.where(np.any(self.mask, axis=0))[0]
            vertical_indicies = np.where(np.any(self.mask, axis=1))[0]
            horizontal_indicies = np.where(np.any(self.mask, axis=2))[0]
            z_min, z_max = temporal_indicies[[0, -1]]
            y_min, y_max = vertical_indicies[[0, -1]]
            x_min, x_max = horizontal_indicies[[0, -1]]
            z_max += 1
            y_max += 1
            x_max += 1
            
            # adjust bounding box to fit an exact number of super voxels
            if self.SV_size != None:
                if (x_max-x_min)%self.SV_size[2] != 0:
                    x_max += self.SV_size[2]-((x_max-x_min)%self.SV_size[2])
                if (y_max-y_min)%self.SV_size[1] != 0:
                    y_max += self.SV_size[1]-((y_max-y_min)%self.SV_size[1])
                if (z_max-z_min)%self.SV_size[0] != 0:
                    z_max += self.SV_size[0]-((z_max-z_min)%self.SV_size[0])
            
            # adjust bounding box to fit volume dimentions (if it exceed it is shifted back)
            if z_max > self.mask.shape[0]:
                z_min -= (z_max - self.mask.shape[0])
                z_max -= (z_max - self.mask.shape[0])
            if y_max > self.mask.shape[1]:
                y_min -= (y_max - self.mask.shape[1])
                y_max -= (y_max - self.mask.shape[1])
            if x_max > self.mask.shape[2]:
                x_min -= (x_max - self.mask.shape[2])
                x_max -= (x_max - self.mask.shape[2])
                
            bbox_lim = [x_min, x_max, y_min, y_max, z_min, z_max]
        else:
            # Handle error case where segmentation image cannot be read or is empty
            print("Error: Segmentation image could not be read or is empty.")           
        return bbox_lim
    
    def _get_mask_in_SV(self):
        bbox = self.mask[self.bbox_lim[0]:self.bbox_lim[1],self.bbox_lim[2]:self.bbox_lim[3],self.bbox_lim[4]:self.bbox_lim[5]]
        patches = patchify(bbox, self.SV_size, step=self.SV_size)
        normalization_voxels = np.zeros((patches.shape[0],patches.shape[1],patches.shape[2]))
        for z in np.arange(patches.shape[0]):
            for y in np.arange(patches.shape[1]):
                for x in np.arange(patches.shape[2]):
                    normalization_voxels[z,y,x] = sum(sum(sum(patches[z,y,x,:,:,:])))
        return normalization_voxels
    
    def _save_to_cell_table(self, data, index):   
        for z in np.arange(data.shape[0]):
            for y in np.arange(data.shape[1]):
                for x in np.arange(data.shape[2]):
                    self.cell_avg[index,z,y,x]=data[z,y,x]

# Test

In [7]:
movie_dir = "test_movie"
mask_file = "test_mask.tif"
superVoxel_size = [2,2,2] # order: z,y,x
roi_id = 1 #select mask, the test checks fit mask 1, not 2
n_vol_batch = 3

frames_per_volume = 10
starting_slice = 0
experiment = vx.Experiment.from_dir(movie_dir, frames_per_volume, starting_slice, verbose=True)

voxelizer = Voxelizer(mask_file, superVoxel_size, roi_id)
#voxelizer.process_movie(experiment, n_vol_batch)
#final_cell_matrix = voxelizer.create_cell_signal_table(experiment)

Image files information :

files directory: test_movie
files [number of frames]: 
0) test_movie.tif [200]

Total frames : 200
Volumes start on frame : 0
Total good volumes : 20
Frames per volume : 10
Tailing frames (not a full volume , at the end) : 0



### test _load_mask()

In [8]:
mask = voxelizer._load_mask(mask_file)
print(mask.shape)
sum(sum(sum(mask))) == 125

(10, 10, 10)


True

### test _get_bbox()

In [9]:
bbox_lim = voxelizer._get_bbox()
print(bbox_lim)
bbox_lim == [0, 6, 0, 6, 0, 6]

[0, 6, 0, 6, 0, 6]


True

### test _get_mask_in_SV()

In [10]:
normalization_voxels = voxelizer._get_mask_in_SV()
# this is true for test mask 1
normalization_voxels == [[[8.,8.,4.],[8.,8.,4.],[4.,4.,2.]],[[8.,8.,4.],[8.,8.,4.],[4.,4.,2.]],[[4.,4.,2.],[4.,4.,2.],[2.,2.,1.]]]

array([[[ True,  True,  True],
        [ True,  True,  True],
        [ True,  True,  True]],

       [[ True,  True,  True],
        [ True,  True,  True],
        [ True,  True,  True]],

       [[ True,  True,  True],
        [ True,  True,  True],
        [ True,  True,  True]]])

### test process_movie()

In [11]:
last_avg = voxelizer.process_movie(experiment, n_vol_batch)
# this is true for test mask 1
last_avg == [[[0.,0.,0.],[0.,9.5,95.],[0.,19.,190.]],[[0.,0.,0.],[0.,0.,0.],[0.,0.,0.]],[[0.,0.,0.],[0.,0.,0.],[0.,0.,0.]]]

UnboundLocalError: local variable 'data_patched' referenced before assignment

### test create_cell_signal_table(experiment)

In [80]:
final_cell_matrix = voxelizer.create_cell_signal_table(experiment)
final_cell_matrix.shape == (20,27) # volumes(time) X SV(cells)

True

### test save_ROI_to_file(path)

In [17]:
path = (movie_file+'/ROI_patched/')
check_folder = os.path.isdir(path)
# If folder doesn't exist, then create it.
if not check_folder:
    os.makedirs(path)
    print("created folder : ", path)
else:
    print(path, "folder already exists.")
voxelizer.save_ROI_to_file(movie_file+'/ROI_patched/')
voxelizer.cell_avg.shape

test_movie/ROI_patched/ folder already exists.


(20, 3, 3, 3)

# Run complete analysis

In [16]:
movie_file = "test_movie"
mask_file = "test_mask.tif"
superVoxel_size = [2,2,2]
roi_id = 1
n_vol_batch = 5

frames_per_volume = 10
starting_slice = 0
experiment = vx.Experiment.from_dir(movie_file, frames_per_volume, starting_slice, verbose=True)

voxelizer = Voxelizer(mask_file, superVoxel_size, roi_id)
voxelizer.process_movie(experiment, n_vol_batch)
final_cell_matrix = voxelizer.create_cell_signal_table(experiment)
print(final_cell_matrix.shape) # volumes(time) X SV(cells)

path = (movie_file+'/ROI_patched/')
check_folder = os.path.isdir(path)
# If folder doesn't exist, then create it.
if not check_folder:
    os.makedirs(path)
    print("created folder : ", path)
else:
    print(path, "folder already exists.")
voxelizer.save_ROI_to_file(movie_file+'/ROI_patched/')

Image files information :

files directory: test_movie
files [number of frames]: 
0) test_movie.tif [200]

Total frames : 200
Volumes start on frame : 0
Total good volumes : 20
Frames per volume : 10
Tailing frames (not a full volume , at the end) : 0

(20, 27)
test_movie/ROI_patched/ folder already exists.
