# Convert Multi-Camera TIFF Stacks to Volumetric OME-Zarr

This notebook converts IsoView TIFF stacks (converted from KLB) into volumetric OME-Zarr files.

## Structure
- Each timepoint directory (TM000000, TM000001, ...) contains:
  - 4 cameras × TIFF stacks (79 z-planes each)
  - XML metadata files (one per camera)
  - MAT files (minIntensity, configuration)

## Output
- One OME-Zarr volume per camera per timepoint
- Conforms to OME-NGFF v0.5 specification
- Metadata extracted from XML files

In [25]:
# Imports
from pathlib import Path
import numpy as np
import time
from datetime import datetime
import pandas as pd
from tqdm import tqdm
import xml.etree.ElementTree as ET
from typing import Dict, List, Tuple

In [26]:
import mbo_utilities as mbo
from mbo_utilities._writers import _write_zarr

## Configuration

In [27]:
# Paths
SOURCE_ROOT = Path(r"\\rbo-s1\S1_DATA\isoview\foconnell\io_corrected_test\tiff")
OUTPUT_ROOT = Path(r"\\rbo-s1\S1_DATA\isoview\foconnell\io_corrected_test\zarr")

# Processing options
COMPRESSION_LEVEL = 1  # GZip compression level (1-9, lower = faster)
USE_SHARDING = True     # Use Zarr v3 sharding for better performance
OVERWRITE = True        # Overwrite existing zarr files

# Pattern matching
CAMERA_PATTERN = "SPM00_TM*_CM*.tif"  # Match camera TIFF files
XML_PATTERN = "SPM00_TM*_CM*.xml"     # Match XML metadata files

# Create output directory
OUTPUT_ROOT.mkdir(parents=True, exist_ok=True)

print(f"Source: {SOURCE_ROOT}")
print(f"Output: {OUTPUT_ROOT}")
print(f"Source exists: {SOURCE_ROOT.exists()}")
print(f"Compression level: {COMPRESSION_LEVEL}")
print(f"Sharding: {USE_SHARDING}")

Source: \\rbo-s1\S1_DATA\isoview\foconnell\io_corrected_test\tiff
Output: \\rbo-s1\S1_DATA\isoview\foconnell\io_corrected_test\zarr
Source exists: True
Compression level: 1
Sharding: True


## Utility Functions

In [28]:
def parse_isoview_xml(xml_path: Path) -> Dict:
    """
    Parse IsoView XML metadata file from push_config format.

    Returns dict with keys:
    - z_step: z-step size in micrometers
    - dimensions: image dimensions
    - exposure_time: exposure time in ms
    - wavelength: illumination wavelength
    - camera_type: camera model
    - timestamp: acquisition timestamp
    """
    if not xml_path.exists():
        print(f"Warning: XML file not found: {xml_path}")
        return {}

    try:
        tree = ET.parse(xml_path)
        root = tree.getroot()

        metadata = {}

        # Parse all <info> elements
        for info in root.findall('.//info'):
            key = list(info.attrib.keys())[0] if info.attrib else None
            value = info.get(key) if key else None

            if key and value:
                # Store specific fields we care about
                if key == 'z_step':
                    metadata['z_step'] = float(value)
                elif key == 'dimensions':
                    metadata['dimensions'] = value
                elif key == 'exposure_time':
                    metadata['exposure_time'] = float(value)
                elif key == 'wavelength':
                    metadata['wavelength'] = int(value)
                elif key == 'camera_type':
                    metadata['camera_type'] = value
                elif key == 'timestamp':
                    metadata['timestamp'] = value
                elif key == 'time_point':
                    metadata['time_point'] = int(value)
                elif key == 'planes':
                    metadata['planes'] = value
                elif key == 'detection_objective':
                    metadata['detection_objective'] = value

        # Format for mbo_utilities
        # Default pixel sizes (can be refined if known)
        metadata['pixel_resolution'] = (0.65, 0.65)  # Approximate for 20x objective

        # Use z_step if available
        if 'z_step' in metadata:
            metadata['dz'] = metadata['z_step']
        else:
            metadata['dz'] = 1.0  # Default

        # Frame rate (IsoView typically ~1-10 fps depending on config)
        metadata['frame_rate'] = 1.0  # Default, update if known

        return metadata

    except Exception as e:
        print(f"Error parsing XML {xml_path}: {e}")
        return {}


def get_camera_files(timepoint_dir: Path) -> List[Tuple[Path, Path]]:
    """
    Get all camera TIFF files and their corresponding XML files for a timepoint.

    IsoView file naming:
    - TIFF: SPM00_TM000000_CM00_CHN01.tif (camera 00, channel 01)
    - XML:  SPM00_TM000000_CHN01.xml (channel 01 - shared across cameras)

    Returns list of (tiff_path, xml_path) tuples.
    """
    tiff_files = sorted(timepoint_dir.glob(CAMERA_PATTERN))

    camera_pairs = []
    for tiff_path in tiff_files:
        # Extract channel from filename: SPM00_TM000000_CM00_CHN01.tif -> CHN01
        # Pattern: SPM00_TM{timepoint}_CM{camera}_CHN{channel}.tif
        parts = tiff_path.stem.split('_')

        # Find the CHN part
        channel = None
        for part in parts:
            if part.startswith('CHN'):
                channel = part
                break

        if channel:
            # Construct XML filename: SPM00_TM{timepoint}_CHN{channel}.xml
            timepoint = parts[1]  # TM000000
            xml_name = f"SPM00_{timepoint}_{channel}.xml"
            xml_path = timepoint_dir / xml_name

            camera_pairs.append((tiff_path, xml_path))
        else:
            print(f"Warning: Could not extract channel from {tiff_path.name}")
            # Fallback: try with .xml extension (will likely not exist)
            xml_path = tiff_path.with_suffix('.xml')
            camera_pairs.append((tiff_path, xml_path))

    return camera_pairs


def get_timepoint_dirs(source_root: Path) -> List[Path]:
    """
    Get all timepoint directories (TM000000, TM000001, ...).
    """
    dirs = sorted([d for d in source_root.iterdir() if d.is_dir() and d.name.startswith('TM')])
    return dirs


def get_output_path(camera_tiff_path: Path, output_root: Path) -> Path:
    """
    Generate output zarr path from input TIFF path.

    Example:
    Input:  .../TM000001/SPM00_TM000001_CM00_CHN01.tif
    Output: .../TM000001/SPM00_TM000001_CM00_CHN01.zarr
    """
    timepoint_name = camera_tiff_path.parent.name
    camera_name = camera_tiff_path.stem  # Remove .tif extension

    output_dir = output_root / timepoint_name
    output_dir.mkdir(parents=True, exist_ok=True)

    return output_dir / f"{camera_name}.zarr"


print("Utility functions loaded")

Utility functions loaded


## Scan Directory Structure

In [None]:
# Get all timepoint directories
timepoint_dirs = get_timepoint_dirs(SOURCE_ROOT)

print(f"Found {len(timepoint_dirs)} timepoint directories")
print(f"Range: {timepoint_dirs[0].name} to {timepoint_dirs[-1].name}")

# Check first timepoint structure
if timepoint_dirs:
    first_tp = timepoint_dirs[0]
    camera_files = get_camera_files(first_tp)
    print(f"\nFirst timepoint has {len(camera_files)} camera files")

    # Check for missing XML files
    missing_xml = [xml for _, xml in camera_files if not xml.exists()]
    if missing_xml:
        print(f"WARNING: {len(missing_xml)} XML files not found")
        for xml in missing_xml[:3]:
            print(f"  Missing: {xml.name}")

Found 201 timepoint directories
Range: TM000000 to TM000200

First timepoint has 4 camera files


## Test Metadata Parsing

In [33]:
# Test metadata parsing on first file
if timepoint_dirs:
    first_tp = timepoint_dirs[0]
    camera_files = get_camera_files(first_tp)

    if camera_files:
        test_tiff, test_xml = camera_files[0]
        print(f"Testing metadata extraction:")
        print(f"  TIFF: {test_tiff.name}")
        print(f"  XML:  {test_xml.name}")

        # Check if XML exists and parse
        if test_xml.exists():
            metadata = parse_isoview_xml(test_xml)

            if metadata:
                print(f"\nExtracted metadata:")
                # Show only the most important fields
                for key in ['z_step', 'wavelength', 'exposure_time', 'camera_type', 'dimensions']:
                    if key in metadata:
                        print(f"  {key}: {metadata[key]}")
            else:
                print("\nWARNING: Failed to parse XML metadata")
        else:
            print(f"\nERROR: XML file not found")
            print(f"Expected: {test_xml.name}")

            # List available XML files
            xml_files = list(test_xml.parent.glob("*.xml"))
            if xml_files:
                print(f"\nAvailable XML files in {test_xml.parent.name}:")
                for xf in xml_files:
                    print(f"  {xf.name}")

Testing metadata extraction:
  TIFF: SPM00_TM000000_CM00_CHN01.tif
  XML:  SPM00_TM000000_CHN01.xml

Extracted metadata:
  z_step: 5.13
  wavelength: 488
  exposure_time: 4.8
  camera_type: C11440-22C,C11440-22C
  dimensions: 2048x752x79,2048x752x79


## Test Single Conversion

Convert one camera file to verify the process works

In [34]:
import tifffile

if timepoint_dirs:
    # Test on first camera of first timepoint
    test_tp = timepoint_dirs[0]
    camera_files = get_camera_files(test_tp)

    if camera_files:
        test_tiff, test_xml = camera_files[0]
        test_output = get_output_path(test_tiff, OUTPUT_ROOT)

        print(f"Test conversion: {test_tiff.name}")

        # Read TIFF stack using tifffile directly
        stack = tifffile.imread(test_tiff)
        print(f"  Input shape: {stack.shape}, dtype: {stack.dtype}")

        # Parse metadata
        metadata = parse_isoview_xml(test_xml)

        # Stack should be (Z, Y, X) for a single volume
        if stack.ndim == 2:
            data = stack[np.newaxis, ...]
        elif stack.ndim == 3:
            data = stack
        else:
            print(f"  WARNING: Unexpected dimensions: {stack.ndim}")
            data = stack

        # Ensure all required metadata fields
        if 'num_frames' not in metadata:
            metadata['num_frames'] = data.shape[0]
        if 'dz' not in metadata:
            metadata['dz'] = 5.13
        if 'pixel_resolution' not in metadata:
            metadata['pixel_resolution'] = (0.65, 0.65)
        if 'frame_rate' not in metadata:
            metadata['frame_rate'] = 1.0

        # Clear any cached zarr writers
        if hasattr(_write_zarr, '_arrays'):
            _write_zarr._arrays.clear()
            _write_zarr._offsets.clear()
            _write_zarr._groups.clear()

        # Convert to OME-Zarr
        start_time = time.time()

        _write_zarr(
            test_output,
            data,
            metadata=metadata,
            ome=True,
            sharded=USE_SHARDING,
            level=COMPRESSION_LEVEL,
            overwrite=OVERWRITE
        )

        elapsed = time.time() - start_time

        # Calculate statistics
        input_size_mb = test_tiff.stat().st_size / (1024**2)
        output_size_mb = sum(f.stat().st_size for f in test_output.rglob('*') if f.is_file()) / (1024**2)
        compression_ratio = input_size_mb / output_size_mb if output_size_mb > 0 else 0
        throughput_mb_s = output_size_mb / elapsed if elapsed > 0 else 0

        print(f"  Conversion: {elapsed:.1f}s @ {throughput_mb_s:.1f} MB/s")
        print(f"  Size: {input_size_mb:.1f} MB → {output_size_mb:.1f} MB ({compression_ratio:.2f}x compression)")

        # Verify readback
        try:
            verified = mbo.imread(test_output)
            print(f"  Verified: {verified.shape}")
        except Exception as e:
            # Try with zarr directly
            try:
                import zarr
                z = zarr.open(test_output, mode='r')
                print(f"  Verified: {z.shape}")
            except Exception as e2:
                print(f"  WARNING: Verification failed: {e2}")

Test conversion: SPM00_TM000000_CM00_CHN01.tif
  Input shape: (79, 752, 2048), dtype: uint16
  Conversion: 2.2s @ 46.6 MB/s
  Size: 232.1 MB → 101.9 MB (2.28x compression)
  Verified: (79, 1, 752, 2048)


## Batch Conversion

Convert all camera files across all timepoints

In [35]:
# Statistics tracking
conversion_stats = []
failed_conversions = []

total_start_time = time.time()

print(f"Converting {len(timepoint_dirs)} timepoints...")
print("=" * 80)

for tp_idx, tp_dir in enumerate(timepoint_dirs, 1):
    # Get all camera files for this timepoint
    camera_files = get_camera_files(tp_dir)

    if not camera_files:
        print(f"[{tp_idx}/{len(timepoint_dirs)}] {tp_dir.name}: No camera files found")
        continue

    tp_start_time = time.time()
    tp_successes = 0
    tp_failures = 0

    for cam_idx, (tiff_path, xml_path) in enumerate(camera_files, 1):
        output_path = get_output_path(tiff_path, OUTPUT_ROOT)

        # Skip if already exists and not overwriting
        if output_path.exists() and not OVERWRITE:
            continue

        try:
            file_start_time = time.time()

            # Read TIFF using tifffile directly
            stack = tifffile.imread(tiff_path)

            # Ensure correct shape (Z, Y, X)
            if stack.ndim == 2:
                data = stack[np.newaxis, ...]
            elif stack.ndim == 3:
                data = stack
            else:
                data = stack

            # Parse metadata
            metadata = parse_isoview_xml(xml_path)

            # Ensure all required metadata fields
            if 'num_frames' not in metadata:
                metadata['num_frames'] = data.shape[0]
            if 'dz' not in metadata:
                metadata['dz'] = 5.13
            if 'pixel_resolution' not in metadata:
                metadata['pixel_resolution'] = (0.65, 0.65)
            if 'frame_rate' not in metadata:
                metadata['frame_rate'] = 1.0

            # Clear zarr cache
            if hasattr(_write_zarr, '_arrays'):
                _write_zarr._arrays.clear()
                _write_zarr._offsets.clear()
                _write_zarr._groups.clear()

            # Write OME-Zarr
            _write_zarr(
                output_path,
                data,
                metadata=metadata,
                ome=True,
                sharded=USE_SHARDING,
                level=COMPRESSION_LEVEL,
                overwrite=OVERWRITE
            )

            file_elapsed = time.time() - file_start_time

            # Calculate sizes
            input_size_mb = tiff_path.stat().st_size / (1024**2)
            output_size_mb = sum(f.stat().st_size for f in output_path.rglob('*') if f.is_file()) / (1024**2)
            compression_ratio = input_size_mb / output_size_mb if output_size_mb > 0 else 0
            throughput = output_size_mb / file_elapsed if file_elapsed > 0 else 0

            # Record stats
            conversion_stats.append({
                'timepoint': tp_dir.name,
                'camera_file': tiff_path.name,
                'input_size_mb': input_size_mb,
                'output_size_mb': output_size_mb,
                'compression_ratio': compression_ratio,
                'time_sec': file_elapsed,
                'throughput_mb_per_sec': throughput,
                'num_z_planes': data.shape[0],
                'success': True
            })

            tp_successes += 1

        except Exception as e:
            print(f"[{tp_idx}/{len(timepoint_dirs)}] {tp_dir.name}/{tiff_path.name}: ERROR - {e}")
            failed_conversions.append({
                'timepoint': tp_dir.name,
                'camera_file': tiff_path.name,
                'error': str(e)
            })
            tp_failures += 1
            continue

    tp_elapsed = time.time() - tp_start_time

    # Only print if there were issues or every 10 timepoints
    if tp_failures > 0 or tp_idx % 10 == 0 or tp_idx == len(timepoint_dirs):
        status = f"{tp_successes} ok" + (f", {tp_failures} failed" if tp_failures > 0 else "")
        print(f"[{tp_idx}/{len(timepoint_dirs)}] {tp_dir.name}: {status} ({tp_elapsed:.1f}s)")

total_elapsed = time.time() - total_start_time

print("\n" + "=" * 80)
print(f"Completed in {total_elapsed:.1f}s ({total_elapsed/60:.1f} min)")
print(f"  Successful: {len(conversion_stats)}")
if failed_conversions:
    print(f"  Failed: {len(failed_conversions)}")

Converting 201 timepoints...
[10/201] TM000009: 4 ok (13.0s)
[20/201] TM000019: 4 ok (13.9s)
[30/201] TM000029: 4 ok (15.1s)
[40/201] TM000039: 4 ok (15.2s)
[50/201] TM000049: 4 ok (15.8s)
[60/201] TM000059: 4 ok (14.6s)
[70/201] TM000069: 4 ok (13.5s)
[80/201] TM000079: 4 ok (14.1s)
[90/201] TM000089: 4 ok (13.3s)
[100/201] TM000099: 4 ok (13.5s)
[110/201] TM000109: 4 ok (14.9s)
[120/201] TM000119: 4 ok (15.4s)
[130/201] TM000129: 4 ok (15.2s)
[140/201] TM000139: 4 ok (12.8s)
[150/201] TM000149: 4 ok (12.3s)
[160/201] TM000159: 4 ok (14.5s)
[170/201] TM000169: 4 ok (14.7s)
[180/201] TM000179: 4 ok (14.1s)
[190/201] TM000189: 4 ok (15.2s)
[200/201] TM000199: 4 ok (14.7s)
[201/201] TM000200: 4 ok (14.3s)

Completed in 2824.0s (47.1 min)
  Successful: 804


## Conversion Statistics

In [36]:
if conversion_stats:
    df_stats = pd.DataFrame(conversion_stats)

    print("\nCONVERSION STATISTICS")
    print("=" * 80)

    # Overall statistics
    total_input_mb = df_stats['input_size_mb'].sum()
    total_output_mb = df_stats['output_size_mb'].sum()
    avg_compression = df_stats['compression_ratio'].mean()
    avg_throughput = df_stats['throughput_mb_per_sec'].mean()
    total_conversion_time = df_stats['time_sec'].sum()

    print(f"Total data processed: {total_input_mb/1024:.2f} GB → {total_output_mb/1024:.2f} GB")
    print(f"Average compression: {avg_compression:.2f}x")
    print(f"Average throughput: {avg_throughput:.1f} MB/s")
    print(f"Total conversion time: {total_conversion_time/60:.1f} min")

    # Save statistics
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    stats_file = OUTPUT_ROOT / f"conversion_stats_{timestamp}.csv"
    df_stats.to_csv(stats_file, index=False)
    print(f"\nStatistics saved: {stats_file.name}")

    # Save failed conversions if any
    if failed_conversions:
        df_failed = pd.DataFrame(failed_conversions)
        failed_file = OUTPUT_ROOT / f"failed_conversions_{timestamp}.csv"
        df_failed.to_csv(failed_file, index=False)
        print(f"Failed conversions logged: {failed_file.name}")
else:
    print("No conversions completed")


CONVERSION STATISTICS
Total data processed: 182.22 GB → 75.52 GB
Average compression: 2.42x
Average throughput: 27.8 MB/s
Total conversion time: 46.6 min

Statistics saved: conversion_stats_20251030_152208.csv


## Verification

Verify a few random converted files can be read back correctly

In [None]:
if conversion_stats and len(conversion_stats) > 0:
    # Verify a few random samples
    num_to_test = min(3, len(conversion_stats))
    test_samples = df_stats.sample(n=num_to_test)

    print("\nVERIFYING RANDOM SAMPLES")
    print("=" * 80)

    for idx, row in test_samples.iterrows():
        timepoint = row['timepoint']
        camera_file = row['camera_file']

        # Reconstruct zarr path
        zarr_name = Path(camera_file).stem + ".zarr"
        zarr_path = OUTPUT_ROOT / timepoint / zarr_name

        try:
            # Try mbo.imread first
            verified = mbo.imread(zarr_path)
            print(f"{zarr_path.parent.name}/{zarr_path.name}: OK ({verified.shape})")
        except Exception as e:
            # Fallback to zarr
            try:
                import zarr
                z = zarr.open(zarr_path, mode='r')
                print(f"{zarr_path.parent.name}/{zarr_path.name}: OK ({z.shape})")
            except Exception as e2:
                print(f"{zarr_path.parent.name}/{zarr_path.name}: ERROR - {e2}")

## Summary

Final summary and next steps

In [50]:
print("\n" + "=" * 80)
print("SUMMARY")
print("=" * 80)

if conversion_stats:
    total_input_gb = df_stats['input_size_mb'].sum() / 1024
    total_output_gb = df_stats['output_size_mb'].sum() / 1024
    space_saved_gb = total_input_gb - total_output_gb
    space_saved_pct = (space_saved_gb / total_input_gb * 100) if total_input_gb > 0 else 0

    print(f"Processed {len(conversion_stats)} volumes in {total_elapsed/60:.1f} minutes")
    print(f"Data: {total_input_gb:.1f} GB → {total_output_gb:.1f} GB")
    print(f"Space saved: {space_saved_gb:.1f} GB ({space_saved_pct:.1f}%)")
    print(f"Average speed: {avg_throughput:.1f} MB/s")

    if failed_conversions:
        print(f"\nWARNING: {len(failed_conversions)} conversions failed (see logs)")

    print(f"\nOutput: {OUTPUT_ROOT}")
else:
    print("No files were converted")

print("=" * 80)


SUMMARY
Processed 804 volumes in 47.1 minutes
Data: 182.2 GB → 75.5 GB
Space saved: 106.7 GB (58.6%)
Average speed: 27.8 MB/s

Output: \\rbo-s1\S1_DATA\isoview\foconnell\io_corrected_test\zarr


In [54]:
def consolidate_camera_volume(camera_id: str, zarr_paths: List[Path],
                              output_path: Path, axis_order: str = "TZYX",
                              compression_level: int = 1) -> Dict:
    """
    Consolidate multiple single-timepoint zarr files into one multi-timepoint volume.

    Args:
        camera_id: Camera identifier (e.g., 'SPM00_CM00_CHN01')
        zarr_paths: List of zarr file paths sorted by timepoint
        output_path: Output path for consolidated zarr
        axis_order: Either "TZYX" (time, z, y, x) or "ZTYX" (z, time, y, x)
        compression_level: GZip compression level (1-9)

    Returns:
        Dict with statistics
    """
    import dask.array as da

    print(f"  Loading {len(zarr_paths)} timepoints...")

    # Load all timepoints as dask arrays
    volumes = []
    for zpath in zarr_paths:
        try:
            z = zarr.open(zpath, mode='r')
            # Get the data array (usually at '0' key for zarr v3)
            if hasattr(z, 'keys'):
                # It's a group, get the first array
                arr_key = list(z.keys())[0]
                arr = da.from_zarr(z[arr_key])
            else:
                # It's already an array
                arr = da.from_zarr(z)

            # Remove singleton channel dimension if present
            # Shape might be (Z, C, Y, X) or (Z, Y, X)
            if arr.ndim == 4 and arr.shape[1] == 1:
                arr = arr[:, 0, :, :]  # Now (Z, Y, X)

            volumes.append(arr)
        except Exception as e:
            print(f"    ERROR loading {zpath.name}: {e}")
            return None

    if not volumes:
        print(f"    ERROR: No volumes loaded")
        return None

    # Stack along time dimension
    # volumes is list of (Z, Y, X) arrays
    stacked = da.stack(volumes, axis=0)  # Shape: (T, Z, Y, X)

    # Reorder axes if needed
    if axis_order == "ZTYX":
        # Transpose from (T, Z, Y, X) to (Z, T, Y, X)
        stacked = da.transpose(stacked, (1, 0, 2, 3))
    elif axis_order != "TZYX":
        raise ValueError(f"Invalid axis_order: {axis_order}. Must be 'TZYX' or 'ZTYX'")

    print(f"  Consolidated shape: {stacked.shape} ({axis_order})")
    print(f"  Writing to {output_path.name}...")

    start_time = time.time()

    # Remove existing if overwriting
    if output_path.exists() and CONSOLIDATE_OVERWRITE:
        import shutil
        shutil.rmtree(output_path)

    # Compute chunk shape - one z-plane per chunk
    if axis_order == "TZYX":
        chunk_shape = (1, 1, stacked.shape[2], stacked.shape[3])
    else:  # ZTYX
        chunk_shape = (1, 1, stacked.shape[2], stacked.shape[3])

    # Write using dask to zarr directly (works with both v2 and v3)
    stacked.to_zarr(
        str(output_path),
        chunks=chunk_shape,
        overwrite=CONSOLIDATE_OVERWRITE,
        compute=True
    )

    elapsed = time.time() - start_time

    # Calculate size
    output_size_mb = sum(f.stat().st_size for f in output_path.rglob('*') if f.is_file()) / (1024**2)
    throughput = output_size_mb / elapsed if elapsed > 0 else 0

    print(f"  Complete: {elapsed:.1f}s @ {throughput:.1f} MB/s, {output_size_mb:.1f} MB")

    return {
        'camera_id': camera_id,
        'num_timepoints': len(zarr_paths),
        'shape': tuple(stacked.shape),
        'axis_order': axis_order,
        'output_size_mb': output_size_mb,
        'time_sec': elapsed,
        'throughput_mb_per_sec': throughput
    }

print("Consolidation function defined")

Consolidation function defined


In [55]:
# Consolidate all cameras
consolidation_stats = []
consolidation_start = time.time()

print(f"Consolidating {len(camera_volumes)} cameras into {AXIS_ORDER} volumes...")
print("=" * 80)

for idx, (camera_id, zarr_paths) in enumerate(camera_volumes.items(), 1):
    print(f"[{idx}/{len(camera_volumes)}] {camera_id}:")

    output_path = CONSOLIDATED_OUTPUT / f"{camera_id}.zarr"

    try:
        stats = consolidate_camera_volume(
            camera_id=camera_id,
            zarr_paths=zarr_paths,
            output_path=output_path,
            axis_order=AXIS_ORDER,
            compression_level=COMPRESSION_LEVEL
        )

        if stats:
            consolidation_stats.append(stats)

    except Exception as e:
        print(f"  ERROR: {e}")
        import traceback
        traceback.print_exc()

consolidation_elapsed = time.time() - consolidation_start

print("\n" + "=" * 80)
print(f"Consolidation complete in {consolidation_elapsed/60:.1f} minutes")
print(f"  Successful: {len(consolidation_stats)}/{len(camera_volumes)} cameras")

Consolidating 4 cameras into TZYX volumes...
[1/4] SPM00_CM00_CHN01:
  Loading 201 timepoints...
  Consolidated shape: (201, 79, 752, 2048) (TZYX)
  Writing to SPM00_CM00_CHN01.zarr...
  ERROR: zarr.api.synchronous.create() got multiple values for keyword argument 'chunks'
[2/4] SPM00_CM01_CHN01:
  Loading 201 timepoints...


Traceback (most recent call last):
  File "C:\Users\RBO\AppData\Local\Temp\ipykernel_31376\3587504223.py", line 14, in <module>
    stats = consolidate_camera_volume(
            ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\RBO\AppData\Local\Temp\ipykernel_31376\2515324663.py", line 77, in consolidate_camera_volume
    stacked.to_zarr(
  File "c:\Users\RBO\repos\millerbrainobservatory.github.io\.venv\Lib\site-packages\dask\array\core.py", line 3021, in to_zarr
    return to_zarr(self, *args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "c:\Users\RBO\repos\millerbrainobservatory.github.io\.venv\Lib\site-packages\dask\array\core.py", line 3933, in to_zarr
    z = zarr.create(
        ^^^^^^^^^^^^
TypeError: zarr.api.synchronous.create() got multiple values for keyword argument 'chunks'


  Consolidated shape: (201, 79, 752, 2048) (TZYX)
  Writing to SPM00_CM01_CHN01.zarr...
  ERROR: zarr.api.synchronous.create() got multiple values for keyword argument 'chunks'
[3/4] SPM00_CM02_CHN00:
  Loading 201 timepoints...


Traceback (most recent call last):
  File "C:\Users\RBO\AppData\Local\Temp\ipykernel_31376\3587504223.py", line 14, in <module>
    stats = consolidate_camera_volume(
            ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\RBO\AppData\Local\Temp\ipykernel_31376\2515324663.py", line 77, in consolidate_camera_volume
    stacked.to_zarr(
  File "c:\Users\RBO\repos\millerbrainobservatory.github.io\.venv\Lib\site-packages\dask\array\core.py", line 3021, in to_zarr
    return to_zarr(self, *args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "c:\Users\RBO\repos\millerbrainobservatory.github.io\.venv\Lib\site-packages\dask\array\core.py", line 3933, in to_zarr
    z = zarr.create(
        ^^^^^^^^^^^^
TypeError: zarr.api.synchronous.create() got multiple values for keyword argument 'chunks'


KeyboardInterrupt: 

In [46]:
# Consolidation statistics
if consolidation_stats:
    df_consol = pd.DataFrame(consolidation_stats)

    print("\nCONSOLIDATION STATISTICS")
    print("=" * 80)

    total_size_mb = df_consol['output_size_mb'].sum()
    total_time = df_consol['time_sec'].sum()
    avg_throughput = df_consol['throughput_mb_per_sec'].mean()

    print(f"Total consolidated size: {total_size_mb/1024:.2f} GB")
    print(f"Average throughput: {avg_throughput:.1f} MB/s")
    print(f"Total processing time: {total_time/60:.1f} min")

    print("\nPer-camera summary:")
    for _, row in df_consol.iterrows():
        print(f"  {row['camera_id']}: {row['shape']} ({row['axis_order']}), {row['output_size_mb']:.1f} MB")

    # Save statistics
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    stats_file = CONSOLIDATED_OUTPUT / f"consolidation_stats_{timestamp}.csv"
    df_consol.to_csv(stats_file, index=False)
    print(f"\nStatistics saved: {stats_file.name}")

    print(f"\nConsolidated volumes: {CONSOLIDATED_OUTPUT}")
else:
    print("No consolidations completed")

No consolidations completed


In [47]:
def consolidate_camera_volume(camera_id: str, zarr_paths: List[Path],
                              output_path: Path, axis_order: str = "TZYX",
                              compression_level: int = 1) -> Dict:
    """
    Consolidate multiple single-timepoint zarr files into one multi-timepoint volume.

    Args:
        camera_id: Camera identifier (e.g., 'SPM00_CM00_CHN01')
        zarr_paths: List of zarr file paths sorted by timepoint
        output_path: Output path for consolidated zarr
        axis_order: Either "TZYX" (time, z, y, x) or "ZTYX" (z, time, y, x)
        compression_level: GZip compression level (1-9)

    Returns:
        Dict with statistics
    """
    from zarr.codecs import BytesCodec, GzipCodec, ShardingCodec
    import dask.array as da

    print(f"  Loading {len(zarr_paths)} timepoints...")

    # Load all timepoints as dask arrays
    volumes = []
    for zpath in zarr_paths:
        try:
            z = zarr.open(zpath, mode='r')
            # Convert to dask array, shape is (Z, C, Y, X) or (Z, Y, X)
            arr = da.from_zarr(z)

            # Remove singleton channel dimension if present
            if arr.ndim == 4 and arr.shape[1] == 1:
                arr = arr[:, 0, :, :]  # Now (Z, Y, X)

            volumes.append(arr)
        except Exception as e:
            print(f"    ERROR loading {zpath.name}: {e}")
            return None

    if not volumes:
        print(f"    ERROR: No volumes loaded")
        return None

    # Stack along time dimension
    # volumes is list of (Z, Y, X) arrays
    stacked = da.stack(volumes, axis=0)  # Shape: (T, Z, Y, X)

    # Reorder axes if needed
    if axis_order == "ZTYX":
        # Transpose from (T, Z, Y, X) to (Z, T, Y, X)
        stacked = da.transpose(stacked, (1, 0, 2, 3))
    elif axis_order != "TZYX":
        raise ValueError(f"Invalid axis_order: {axis_order}. Must be 'TZYX' or 'ZTYX'")

    print(f"  Consolidated shape: {stacked.shape} ({axis_order})")
    print(f"  Writing to {output_path.name}...")

    start_time = time.time()

    # Write consolidated zarr
    # Use sharding for better performance
    chunk_shape = (1, 1, stacked.shape[2], stacked.shape[3])  # Single plane chunks

    if USE_SHARDING:
        shard_shape = (1, stacked.shape[1], stacked.shape[2], stacked.shape[3])  # One full volume
        codecs = [
            ShardingCodec(
                chunk_shape=chunk_shape,
                codecs=[
                    BytesCodec(),
                    GzipCodec(level=compression_level)
                ]
            )
        ]
    else:
        codecs = [
            BytesCodec(),
            GzipCodec(level=compression_level)
        ]

    # Remove existing if overwriting
    if output_path.exists() and CONSOLIDATE_OVERWRITE:
        import shutil
        shutil.rmtree(output_path)

    # Create zarr store
    store = zarr.DirectoryStore(output_path)

    # Write with zarr v3
    root = zarr.group(store=store, overwrite=CONSOLIDATE_OVERWRITE)

    # Create array
    z_arr = root.create_array(
        '0',
        shape=stacked.shape,
        chunks=chunk_shape,
        dtype=stacked.dtype,
        codecs=codecs,
        fill_value=0
    )

    # Write data
    da.to_zarr(stacked, z_arr, compute=True)

    elapsed = time.time() - start_time

    # Calculate size
    output_size_mb = sum(f.stat().st_size for f in output_path.rglob('*') if f.is_file()) / (1024**2)
    throughput = output_size_mb / elapsed if elapsed > 0 else 0

    print(f"  Complete: {elapsed:.1f}s @ {throughput:.1f} MB/s, {output_size_mb:.1f} MB")

    return {
        'camera_id': camera_id,
        'num_timepoints': len(zarr_paths),
        'shape': stacked.shape,
        'axis_order': axis_order,
        'output_size_mb': output_size_mb,
        'time_sec': elapsed,
        'throughput_mb_per_sec': throughput
    }

print("Consolidation function defined")

Consolidation function defined


In [48]:
import zarr
from collections import defaultdict

def discover_camera_volumes(zarr_root: Path) -> Dict[str, List[Path]]:
    """
    Discover all zarr files and group them by camera.

    Returns dict mapping camera name to list of zarr paths sorted by timepoint.
    Example: {'SPM00_CM00_CHN01': [path1, path2, ...]}
    """
    camera_groups = defaultdict(list)

    # Scan all timepoint directories
    for tp_dir in sorted(zarr_root.glob("TM*")):
        if not tp_dir.is_dir():
            continue

        # Find all zarr files in this timepoint
        for zarr_path in tp_dir.glob("*.zarr"):
            # Extract camera identifier from filename
            # SPM00_TM000000_CM00_CHN01.zarr -> SPM00_CM00_CHN01
            parts = zarr_path.stem.split('_')

            # Find CM and CHN parts
            camera_parts = ['SPM00']
            for part in parts:
                if part.startswith('CM') or part.startswith('CHN'):
                    camera_parts.append(part)

            camera_id = '_'.join(camera_parts)
            camera_groups[camera_id].append(zarr_path)

    # Sort each camera's timepoints
    for camera_id in camera_groups:
        camera_groups[camera_id] = sorted(camera_groups[camera_id])

    return dict(camera_groups)


# Discover all camera volumes
camera_volumes = discover_camera_volumes(OUTPUT_ROOT)

print(f"Discovered {len(camera_volumes)} unique cameras:")
for camera_id, paths in camera_volumes.items():
    print(f"  {camera_id}: {len(paths)} timepoints")

Discovered 4 unique cameras:
  SPM00_CM00_CHN01: 201 timepoints
  SPM00_CM01_CHN01: 201 timepoints
  SPM00_CM02_CHN00: 201 timepoints
  SPM00_CM03_CHN00: 201 timepoints


In [49]:
# Configuration for consolidation
CONSOLIDATED_OUTPUT = OUTPUT_ROOT.parent / "zarr_consolidated"
AXIS_ORDER = "TZYX"  # Options: "TZYX" (time first) or "ZTYX" (z first)
CONSOLIDATE_OVERWRITE = True

# Create output directory
CONSOLIDATED_OUTPUT.mkdir(parents=True, exist_ok=True)

print(f"Consolidation configuration:")
print(f"  Input: {OUTPUT_ROOT}")
print(f"  Output: {CONSOLIDATED_OUTPUT}")
print(f"  Axis order: {AXIS_ORDER}")
print(f"  Overwrite: {CONSOLIDATE_OVERWRITE}")

Consolidation configuration:
  Input: \\rbo-s1\S1_DATA\isoview\foconnell\io_corrected_test\zarr
  Output: \\rbo-s1\S1_DATA\isoview\foconnell\io_corrected_test\zarr_consolidated
  Axis order: TZYX
  Overwrite: True
