## Inputs
---

In [1]:
# reading data and working with arrays
import zarr, nrrd
import numpy as np

# Stitched N5 exports for the two imaging rounds: the R1 export is used as the
# fixed (reference) volume and the R2 export as the moving volume.
p = '/nrs/scicompsoft/rokicki/flyefish/output'
fix_path = f'{p}/G1_B1_F_R1_Gpb5_488_Eh_546_CCAP_669_1x/stitching/export.n5'
mov_path = f'{p}/G1_B1_F_R2_CCHa1_488_ILP7_546_RyA_669_1x/stitching/export.n5'

# Everything this notebook produces is written beneath this directory.
out_p = '/nrs/scicompsoft/rokicki/flyefish/output_bigstream'

# Divisor applied to the stored pixel resolution (1 leaves it unchanged).
exp_factor = 1

# Fixed volume: open read-only and derive the voxel spacing at each scale.
# The stored 'dimensions' list is reversed before use
# (presumably x,y,z in the metadata vs z,y,x array axes — TODO confirm).
fix_zarr = zarr.open(store=zarr.N5Store(fix_path), mode='r')
fix_meta = fix_zarr.attrs.asdict()
fix_spacing = np.array(fix_meta['pixelResolution']['dimensions'][::-1]) / exp_factor
fix_spacing_s1, fix_spacing_s2, fix_spacing_s3 = (
    fix_spacing * factors for factors in ([1, 2, 2], [2, 4, 4], [4, 8, 8])
)
print(f"fix_spacing_s3: {fix_spacing_s3}")

# Moving volume: same procedure as for the fixed volume.
mov_zarr = zarr.open(store=zarr.N5Store(mov_path), mode='r')
mov_meta = mov_zarr.attrs.asdict()
mov_spacing = np.array(mov_meta['pixelResolution']['dimensions'][::-1]) / exp_factor
mov_spacing_s1, mov_spacing_s2, mov_spacing_s3 = (
    mov_spacing * factors for factors in ([1, 2, 2], [2, 4, 4], [4, 8, 8])
)
print(f"mov_spacing_s3: {mov_spacing_s3}")

# Optional: dump channel c3 of each round at scale s3 for visual inspection.
# nrrd.write(out_p+'/fix.nrrd', fix_zarr['/c3/s3'][...].transpose(2,1,0), compression_level=2)
# nrrd.write(out_p+'/mov.nrrd', mov_zarr['/c3/s3'][...].transpose(2,1,0), compression_level=2)

fix_spacing_s3: [1.68 1.84 1.84]
mov_spacing_s3: [1.68 1.84 1.84]


  fix_zarr = zarr.open(store=zarr.N5Store(fix_path), mode='r')
  mov_zarr = zarr.open(store=zarr.N5Store(mov_path), mode='r')


## Masking
---

### fixed

In [11]:
# tools for coarse whole brain segmentation
from bigstream import level_set
from scipy.ndimage import zoom, binary_closing, binary_dilation

# Read channel c3 at scale s3 into memory, then keep every 4th voxel along each
# axis so the segmentation runs on a much smaller volume.
fix = fix_zarr['/c3/s3'][...]
fix_skip = fix[(slice(None, None, 4),) * 3]
skip_spacing = np.multiply(fix_spacing_s3, [4, 4, 4])  # spacing of the subsampled grid

# Coarse foreground segmentation of the subsampled volume.
fix_mask = level_set.foreground_segmentation(
    fix_skip,
    skip_spacing,
    background=0,
    lambda2=16.0,
    iterations=[80, 40, 10],
    smooth_sigmas=[6, 3, 1.5],
    mask_smoothing=1,
    return_largest_cc_only=False,
)

# Close, then dilate, the mask with a 5x5x5 box element, and resample it by the
# ratio of the s3 shape to the subsampled shape. order=0 is nearest-neighbour
# interpolation, so no intermediate label values are introduced.
fix_mask = binary_closing(fix_mask, np.ones((5, 5, 5))).astype(np.uint8)
fix_mask = binary_dilation(fix_mask, np.ones((5, 5, 5))).astype(np.uint8)
fix_mask = zoom(fix_mask, np.divide(fix.shape, fix_skip.shape), order=0)

# Persist the mask; the array is transposed to (2,1,0) axis order for the file.
nrrd.write(f'{out_p}/fix_mask.nrrd', fix_mask.transpose(2, 1, 0), compression_level=2)

# Read the stored mask back, undoing the transpose applied when writing.
fix_mask = nrrd.read(f'{out_p}/fix_mask.nrrd')[0].transpose(2, 1, 0)

### moving

In [12]:
# tools for coarse whole brain segmentation
from bigstream import level_set
from scipy.ndimage import zoom, binary_closing, binary_dilation

# Read channel c3 at scale s3 into memory, then keep every 4th voxel along each
# axis so the segmentation runs on a much smaller volume.
mov = mov_zarr['/c3/s3'][...]
mov_skip = mov[(slice(None, None, 4),) * 3]
skip_spacing = np.multiply(mov_spacing_s3, [4, 4, 4])  # spacing of the subsampled grid

# Coarse foreground segmentation of the subsampled volume.
mov_mask = level_set.foreground_segmentation(
    mov_skip,
    skip_spacing,
    background=0,
    lambda2=16.0,
    iterations=[80, 40, 10],
    smooth_sigmas=[6, 3, 1.5],
    mask_smoothing=1,
    return_largest_cc_only=False,
)

# Close, then dilate, the mask with a 5x5x5 box element, and resample it by the
# ratio of the s3 shape to the subsampled shape. order=0 is nearest-neighbour
# interpolation, so no intermediate label values are introduced.
mov_mask = binary_closing(mov_mask, np.ones((5, 5, 5))).astype(np.uint8)
mov_mask = binary_dilation(mov_mask, np.ones((5, 5, 5))).astype(np.uint8)
mov_mask = zoom(mov_mask, np.divide(mov.shape, mov_skip.shape), order=0)

# Persist the mask; the array is transposed to (2,1,0) axis order for the file.
nrrd.write(f'{out_p}/mov_mask.nrrd', mov_mask.transpose(2, 1, 0), compression_level=2)

# Read the stored mask back, undoing the transpose applied when writing.
mov_mask = nrrd.read(f'{out_p}/mov_mask.nrrd')[0].transpose(2, 1, 0)

## Alignment
---

### affine

In [2]:
# alignment functions
from bigstream.align import alignment_pipeline
from bigstream.transform import apply_transform

# Global alignment uses channel c3 at scale s3, read fully into memory.
fix, mov = (volume['/c3/s3'][...] for volume in (fix_zarr, mov_zarr))

# A single affine stage makes up the whole pipeline.
affine_kwargs = dict(
    initial_condition='CENTER',
    alignment_spacing=4.0,  # increase this to throw out data
    shrink_factors=(2,),
    smooth_sigmas=(2.,),
    optimizer_args=dict(
        learningRate=0.25,
        minStep=0.,
        numberOfIterations=1200,
    ),
)
steps = [('affine', affine_kwargs)]

# Estimate the affine transform. The masks are currently not passed in.
affine = alignment_pipeline(
    fix, mov,
    fix_spacing_s3, mov_spacing_s3,
    steps,
    # fix_mask=fix_mask,
    # mov_mask=mov_mask,
)

# Resample the moving volume with the affine alone.
affine_aligned = apply_transform(
    fix, mov,
    fix_spacing_s3, mov_spacing_s3,
    transform_list=[affine],
)

# Store the matrix as text and the resampled volume as NRRD (axes transposed to
# (2,1,0) order for the file).
np.savetxt(f'{out_p}/affine.mat', affine)
nrrd.write(f'{out_p}/affine.nrrd', affine_aligned.transpose(2, 1, 0), compression_level=2)

# Continue with the matrix exactly as it was stored on disk.
affine = np.loadtxt(f'{out_p}/affine.mat')

IBM Spectrum LSF 10.1.0.0 build 601088, Apr 15 2022
Copyright International Business Machines Corp. 1992, 2016.
US Government Users Restricted Rights - Use, duplication or disclosure restricted by GSA ADP Schedule Contract with IBM Corp.

  binary type: linux3.10-glibc2.17-x86_64


Wed May 29 08:25:17 2024 Run affine {'initial_condition': 'CENTER', 'alignment_spacing': 4.0, 'shrink_factors': (2,), 'smooth_sigmas': (2.0,), 'optimizer_args': {'learningRate': 0.25, 'minStep': 0.0, 'numberOfIterations': 1200}}
LEVEL:  0  ITERATION:  0  METRIC:  -0.031640618539351205
LEVEL:  0  ITERATION:  1  METRIC:  -0.03201256079230089
LEVEL:  0  ITERATION:  2  METRIC:  -0.03240122384818903
LEVEL:  0  ITERATION:  3  METRIC:  -0.03279418425150044
LEVEL:  0  ITERATION:  4  METRIC:  -0.03321812864486358
LEVEL:  0  ITERATION:  5  METRIC:  -0.033678935262144896
LEVEL:  0  ITERATION:  6  METRIC:  -0.034145858519718936
LEVEL:  0  ITERATION:  7  METRIC:  -0.03466401268661937
LEVEL:  0  ITERATION:  8  METRIC:  -0.03523438725416853
LEVEL:  0  ITERATION:  9  METRIC:  -0.03581938169957404
LEVEL:  0  ITERATION:  10  METRIC:  -0.03639288319154232
LEVEL:  0  ITERATION:  11  METRIC:  -0.03696623117577791
LEVEL:  0  ITERATION:  12  METRIC:  -0.0375401775749682
LEVEL:  0  ITERATION:  13  METRIC:  -0

In [2]:
# Pick up the affine matrix stored by an earlier run instead of recomputing it.
affine = np.loadtxt(f'{out_p}/affine.mat')

### deform

In [3]:
%%time
from bigstream.piecewise_align import distributed_piecewise_alignment_pipeline

# FASTER

# Channel c3 at scale s2. No [...] indexing is applied here, so the zarr
# handles themselves (not in-memory copies) are passed to the pipeline.
fix, mov = fix_zarr['/c3/s2'], mov_zarr['/c3/s2']

# Per-block affine refinement.
affine_kwargs = dict(
    smooth_sigmas=(0.25,),
    optimizer_args=dict(
        learningRate=0.25,
        minStep=0.,
        numberOfIterations=150,
    ),
    alignment_spacing=2.0,
)

# Per-block deformable stage.
deform_kwargs = dict(
    smooth_sigmas=(0.25,),
    control_point_spacing=50.0,
    control_point_levels=(1,),
    optimizer_args=dict(
        learningRate=2.5,
        minStep=0.,
        numberOfIterations=25,
    ),
    alignment_spacing=2.0,
)

# Cluster sizing and dask configuration. The 'config' keys contain dots, so
# that mapping has to stay a literal dict.
cluster_kwargs = dict(
    walltime='11:59',
    project='scicompsoft',
    ncpus=1,
    threads=1,
    min_workers=10,
    max_workers=200,
    death_timeout=1200,
    config={
        'distributed.worker.memory.target': 0.9,
        'distributed.worker.memory.spill': 0.9,
        'distributed.worker.memory.pause': 0.9,
        'distributed.comm.retry.count': 10,
        'distributed.comm.timeouts.connect': '600s',
        'distributed.scheduler.unknown-task-duration': '60m',
    },
)

steps = [
    ('affine', affine_kwargs),
    ('deform', deform_kwargs),
]

# Block-wise alignment on top of the global affine, which is supplied as a
# static transform. The result is written to deform.zarr. Masks are not passed.
deform = distributed_piecewise_alignment_pipeline(
    fix, mov,
    fix_spacing_s2, mov_spacing_s2,
    steps=steps,
    blocksize=[128, 128, 128],
    # fix_mask=fix_mask,
    # mov_mask=mov_mask,
    static_transform_list=[affine],
    write_path=f'{out_p}/deform.zarr',
    cluster_kwargs=cluster_kwargs,
)

# Re-open the stored result read-only and use that handle from here on.
deform = zarr.open(f'{out_p}/deform.zarr', mode='r')

IBM Spectrum LSF 10.1.0.0 build 601088, Apr 15 2022
Copyright International Business Machines Corp. 1992, 2016.
US Government Users Restricted Rights - Use, duplication or disclosure restricted by GSA ADP Schedule Contract with IBM Corp.

  binary type: linux3.10-glibc2.17-x86_64


Cluster dashboard link:  http://10.40.4.195:8787/status
Cluster adapting between 10 and 200 workers with 1 cores per worker
*** This cluster has an upper bound cost of 14.0 dollars per hour ***


2024-05-29 15:31:42,967 - distributed.deploy.adaptive_core - INFO - Adaptive stop


CPU times: user 6min 58s, sys: 25.3 s, total: 7min 23s
Wall time: 1h 1min 40s


### invert

In [2]:
from bigstream.piecewise_transform import distributed_invert_displacement_vector_field

# The inverse transforms are not recomputed in this cell; the recipe below is
# kept for reference only.
# NOTE(review): the recipe passes fix_spacing_s1 and project 'ahrens', while the
# deform cell in this notebook works at s2 spacing under project 'scicompsoft'.
# Confirm both before re-enabling it.

# # invert affine
# affine_inv = np.linalg.inv(affine)
# np.savetxt(out_p+'/affine_inverse.mat', affine_inv)

# # invert deform
# deform_inv = distributed_invert_displacement_vector_field(
#     deform,
#     fix_spacing_s1,
#     blocksize=(256,)*3,
#     write_path=out_p+'/deform_inverse.zarr',
#     cluster_kwargs={
#         'project':'ahrens',
#         'ncpus':1,
#         'threads':1,
#         'min_workers':20,
#         'max_workers':200,
#     },
# )

# Use the inverse transforms already present in the output directory.
affine_inv = np.loadtxt(f'{out_p}/affine_inverse.mat')
deform_inv = zarr.open(f'{out_p}/deform_inverse.zarr', mode='r')

IBM Spectrum LSF 10.1.0.0 build 601088, Apr 15 2022
Copyright International Business Machines Corp. 1992, 2016.
US Government Users Restricted Rights - Use, duplication or disclosure restricted by GSA ADP Schedule Contract with IBM Corp.

  binary type: linux3.10-glibc2.17-x86_64


## Resampling
---

### move image data

In [4]:
from bigstream.piecewise_transform import distributed_apply_transform

# Cluster sizing, block size, and the channels to resample.
cluster_kwargs = dict(
    project='scicompsoft',
    ncpus=1,
    threads=1,
    min_workers=20,
    max_workers=100,
)
blocksize = (64, 64, 64)
channels = ['c3', 'c2', 'c1', 'c0']

# Resample every moving channel at scale s3 onto the fixed c3/s3 grid using the
# affine followed by the deformation, and save each result as NRRD.
fix = fix_zarr['/c3/s3']
for channel in channels:
    mov = mov_zarr[f'/{channel}/s3']
    aligned = distributed_apply_transform(
        fix, mov,
        fix_spacing_s3, mov_spacing_s3,
        transform_list=[affine, deform],
        # spacing of the displacement field: fix_spacing * [2, 4, 4]
        transform_spacing=fix_spacing_s2,
        blocksize=blocksize,
        cluster_kwargs=cluster_kwargs,
    )
    # axes are transposed to (2,1,0) order for the file
    nrrd.write(out_p+f'/r2{channel}s3_aligned_to_r1.nrrd', aligned.transpose(2, 1, 0), compression_level=2)


Cluster dashboard link:  http://10.40.4.195:8787/status
Cluster adapting between 20 and 100 workers with 1 cores per worker
*** This cluster has an upper bound cost of 7.0 dollars per hour ***


2024-05-29 15:35:39,781 - distributed.deploy.adaptive_core - INFO - Adaptive stop


Cluster dashboard link:  http://10.40.4.195:8787/status
Cluster adapting between 20 and 100 workers with 1 cores per worker
*** This cluster has an upper bound cost of 7.0 dollars per hour ***


2024-05-29 15:37:10,412 - distributed.deploy.adaptive_core - INFO - Adaptive stop


Cluster dashboard link:  http://10.40.4.195:8787/status
Cluster adapting between 20 and 100 workers with 1 cores per worker
*** This cluster has an upper bound cost of 7.0 dollars per hour ***


2024-05-29 15:38:35,561 - distributed.deploy.adaptive_core - INFO - Adaptive stop


Cluster dashboard link:  http://10.40.4.195:8787/status
Cluster adapting between 20 and 100 workers with 1 cores per worker
*** This cluster has an upper bound cost of 7.0 dollars per hour ***


2024-05-29 15:40:01,659 - distributed.deploy.adaptive_core - INFO - Adaptive stop


### move spots

In [3]:
# ### EML: This has not been done yet
from bigstream.piecewise_transform import distributed_apply_transform_to_coordinates

# define cluster settings and partition
# NOTE(review): the project here is 'ahrens', while the image cells in this
# notebook use 'scicompsoft' — confirm which one should be used.
cluster_kwargs = {
    'project':'ahrens',
    'ncpus':1,
    'threads':1,
    'min_workers':10,
    'max_workers':10,
}
partition_size = 50.
# channels = ['c0', 'c1', 'c2',]
channels = ['c1', 'c2',]

# warp all spots
for channel in channels:
    # NOTE(review): the input is read from an 'R1' directory but the output file
    # is named 'r2..._warped_to_r1'. The inverse transforms applied below are
    # presumably meant for round-2 (moving) spots — verify the input path.
    # ndmin=2 keeps a file with a single spot as a (1, n) array, so the column
    # slicing below works for it as well.
    spots = np.loadtxt(p+ f'/R1/spot_detection/{channel}_spots.txt', ndmin=2)
    # Only the first three columns are treated as coordinates; all remaining
    # columns are carried through unchanged.
    spots[:, :3] = distributed_apply_transform_to_coordinates(
        spots[:, :3], [deform_inv, affine_inv],
        partition_size=partition_size,
        # The displacement field in this notebook is computed on the s2 grid
        # (fix_spacing_s2), so its inverse is given that spacing as well
        # (previously fix_spacing_s1, which does not match the field).
        transform_spacing=fix_spacing_s2,
        cluster_kwargs=cluster_kwargs
    )
    np.savetxt(out_p+f'/r2{channel}_spots_warped_to_r1.txt', spots)

Cluster dashboard link:  http://10.36.60.20:8787/status
Cluster adapting between 10 and 10 workers with 1 cores per worker
*** This cluster has an upper bound cost of 0.7 dollars per hour ***


This may cause some slowdown.
Consider scattering data ahead of time and using futures.
2024-05-08 22:19:51,990 - distributed.deploy.adaptive_core - INFO - Adaptive stop


Cluster dashboard link:  http://10.36.60.20:8787/status
Cluster adapting between 10 and 10 workers with 1 cores per worker
*** This cluster has an upper bound cost of 0.7 dollars per hour ***


This may cause some slowdown.
Consider scattering data ahead of time and using futures.
2024-05-08 22:43:15,651 - distributed.deploy.adaptive_core - INFO - Adaptive stop
