In [None]:
import re
import os
from tifffile import imread
import numpy as np
from pathlib import Path
from spatialdata import SpatialData 
from spatialdata.models import Image2DModel, Labels2DModel

'4'

# Renaming Core Image Files

In [None]:
# Top-level directory that the cells below scan for analysis folders.
# NOTE(review): hard-coded absolute drive path — adjust when running on another machine.
ROOT_DIR = Path(r'R:\Wayne\BLCA')
# Sub-folder whose presence marks a directory as an analysis folder; its children are the core folders.
CORE_STORAGE_NAME = "Core_Storage"
# Sub-folder whose presence marks a core folder as valid; the segmentation .tif is looked up inside it.
SEGMENTATION_STORAGE_NAME = "Segmentation Storage"
# Only files whose suffix equals this string exactly (case-sensitive) are treated as images.
VALID_IMAGE_EXT = ".tif"
# Substrings looked for in file names; a file needs at least one of them to count as an image.
VALID_MARKERS = ["Cy", "DAPI", "FITC"]

In [None]:
def find_valid_files(core_path: Path, verbose = False) -> list[Path]:
    """Collect the marker image files that sit directly inside a core folder.

    An entry qualifies when its suffix equals ``VALID_IMAGE_EXT`` and its
    name contains at least one of the substrings in ``VALID_MARKERS``.

    Args:
        core_path: Core folder to scan (direct children only).
        verbose: When true, print how many files qualified.

    Returns:
        The qualifying paths, sorted.
    """
    matches = [
        entry
        for entry in core_path.iterdir()
        if entry.suffix == VALID_IMAGE_EXT
        and any(tag in entry.name for tag in VALID_MARKERS)
    ]
    if verbose:
        print(f"{len(matches)} valid files found in {core_path.name}")
    return sorted(matches)

In [None]:
def valid_analysis_folder(folder_path: Path, verbose = False) -> bool:
    """Tell whether ``folder_path`` is an analysis folder.

    A path counts as an analysis folder when it is a directory that contains
    a sub-directory named ``CORE_STORAGE_NAME``.

    Args:
        folder_path: Candidate path to test.
        verbose: When true, print whether the folder was accepted or skipped.

    Returns:
        True if the folder qualifies, otherwise False.
    """
    has_core_storage = folder_path.is_dir() and (folder_path / CORE_STORAGE_NAME).is_dir()
    if verbose:
        print(
            f"Analysis folder found: {folder_path.name}"
            if has_core_storage
            else f"Skipped: {folder_path.name}"
        )
    return has_core_storage

In [None]:
def valid_core_folder(folder_path : Path, verbose = False) -> bool:
    """Tell whether ``folder_path`` is a usable core folder.

    A path counts as a core folder when it is a directory that contains a
    sub-directory named ``SEGMENTATION_STORAGE_NAME``. Only the folder's
    presence is checked, not its contents.

    Args:
        folder_path: Candidate path to test.
        verbose: When true, print whether the folder was accepted or skipped.

    Returns:
        True if the folder qualifies, otherwise False.
    """
    has_segmentation = folder_path.is_dir() and (folder_path / SEGMENTATION_STORAGE_NAME).is_dir()
    if verbose:
        print(
            f"Core folder found: {folder_path.name}"
            if has_segmentation
            else f"Skipped: {folder_path.name}"
        )
    return has_segmentation

In [None]:
def _save_rename_map(rename_map: dict, map_path: Path, verbose: bool) -> None:
    """Append one ``old -> new`` line per rename to the text file at ``map_path``."""
    # Append rather than truncate: the map is the only record for undoing
    # renames, so entries written by an earlier run must survive.
    with open(map_path, "a", encoding="utf-8") as f:
        for old_path, new_path in rename_map.items():
            f.write(f"{old_path} -> {new_path}\n")
    if verbose:
        print(f"Rename map saved to {map_path}")


def rename_BCLA_images(root_dir = ROOT_DIR, dry_run = True, verbose = True ) -> list[Path]:
    """Rename marker images in every core folder to ``<round>_<antibody>.tif``.

    Walks ``root_dir/<analysis folder>/CORE_STORAGE_NAME/<core folder>`` and,
    for each file returned by ``find_valid_files``, reads the round number and
    antibody name out of the file name. Files whose name does not match the
    expected pattern are left untouched.

    Args:
        root_dir: Directory containing the analysis folders (``Path`` or str).
        dry_run: When true, only print the planned renames; nothing on disk
            changes.
        verbose: Forwarded to the folder/file helpers and controls the
            "Rename map saved" message.

    Returns:
        The new paths of the files that were actually renamed (always empty
        for a dry run).

    Side effects:
        When not a dry run and at least one file was renamed, appends the
        ``old -> new`` pairs to ``root_dir / "renamed_map.txt"``. The map is
        written even if a later rename raises, so completed renames are
        always recorded.
    """
    root_dir = Path(root_dir)

    # "_<digits>.0.4_" gives the round number; the word run just before the
    # next "-" is taken as the antibody name.
    name_pattern = re.compile(r'_(\d+)\.0\.4_.*?_(\w+)-')

    renamed_images = []
    # old path -> new path, for the undo record
    rename_map = {}
    # targets already handed out in this run; needed because in a dry run the
    # first of two clashing files is not actually on disk under the new name
    claimed_targets = set()

    try:
        # each analysis folder inside the root folder
        for item in sorted(root_dir.iterdir()):
            if not valid_analysis_folder(item, verbose = verbose):
                continue

            core_storage_path = item / CORE_STORAGE_NAME

            # each core inside Core_Storage
            for core_path in sorted(core_storage_path.iterdir()):
                if not valid_core_folder(core_path, verbose = verbose):
                    continue

                for image_path in find_valid_files(core_path, verbose = verbose):
                    # search the file name only: re.search needs a str, and
                    # parent folder names must not influence the match
                    match = name_pattern.search(image_path.name)
                    if not match:
                        continue

                    # zero-pad the round number to two digits
                    round_num = f"{int(match.group(1)):02d}"
                    antibody = match.group(2)

                    new_name = f"{round_num}_{antibody}.tif"
                    new_path = image_path.parent / new_name

                    if image_path.name == new_name:
                        continue

                    # never overwrite: Path.rename silently replaces an
                    # existing file on POSIX and raises on Windows
                    if new_path in claimed_targets or new_path.exists():
                        print(f"Skipped (target name already in use):\n {image_path.name} -> {new_name}")
                        continue
                    claimed_targets.add(new_path)

                    if dry_run:
                        print(f"Would rename:\n {image_path.name} -> {new_name}")
                    else:
                        image_path.rename(new_path)
                        renamed_images.append(new_path)
                        rename_map[image_path] = new_path
    finally:
        # record whatever was renamed, even when the loop stopped on an error
        if not dry_run and rename_map:
            _save_rename_map(rename_map, root_dir / "renamed_map.txt", verbose)

    return renamed_images



In [None]:
# Called with the defaults (dry_run=True, verbose=True): planned renames are
# only printed and the returned list is empty. Pass dry_run=False to rename.
image_paths = rename_BCLA_images()

# Uploading Image + Segmentation into SpatialData

In [None]:
def _yx_chunks(chunk_size):
    """Reduce a 3-entry (c, y, x) chunk spec to its (y, x) part for 2D label data."""
    if isinstance(chunk_size, (tuple, list)) and len(chunk_size) == 3:
        return tuple(chunk_size[-2:])
    return chunk_size


def load_spatial_data(core_path, output_path, chunk_size = (1, 4096, 4096), scale_factors = [2,4], dry_run = True):
    """Build a SpatialData object for one core folder and optionally write it.

    The segmentation is the first (sorted) ``*.tif`` inside
    ``core_path / SEGMENTATION_STORAGE_NAME``; the images are the files
    returned by ``find_valid_files(core_path)``, each stored under its file
    stem.

    Args:
        core_path: Core folder holding the images and the segmentation folder.
        output_path: Destination of the zarr store.
        chunk_size: Chunk spec for the (c, y, x) images; when it has three
            entries, the last two are used for the 2D labels.
        scale_factors: Multiscale factors passed to the model parsers.
        dry_run: When true, everything is read and parsed but nothing is
            written.

    Returns:
        The written ``SpatialData`` object, or ``None`` when the segmentation
        file or the images are missing, or when ``dry_run`` is true.

    Raises:
        ValueError: If the segmentation array is not 2D (after dropping a
            leading singleton axis).
    """
    seg_folder = core_path / SEGMENTATION_STORAGE_NAME
    # sorted so the pick is deterministic should more than one file be present
    seg_files = sorted(seg_folder.glob("*.tif"))

    # skip the core if no segmentation file exists
    if not seg_files:
        return None
    if len(seg_files) > 1:
        print(f"Warning: {len(seg_files)} segmentation files in {seg_folder}, using {seg_files[0].name}")

    # skip the core if no images are found (checked before any file is read)
    image_paths = find_valid_files(core_path)
    if len(image_paths) == 0:
        return None

    # hand the parsers their own copy so the shared default list is never exposed
    factors = list(scale_factors) if scale_factors is not None else None

    seg_array = imread(seg_files[0])
    # Labels2DModel is a 2D (y, x) schema: drop a leading singleton axis if the
    # TIFF was stored as (1, y, x)
    if seg_array.ndim == 3 and seg_array.shape[0] == 1:
        seg_array = seg_array[0]
    if seg_array.ndim != 2:
        raise ValueError(
            f"Segmentation {seg_files[0].name} has shape {seg_array.shape}; expected a 2D (y, x) label image"
        )

    label_model = Labels2DModel.parse(
        seg_array, dims = ('y', 'x'), chunks = _yx_chunks(chunk_size), scale_factors = factors
    )

    # key = image name (file stem), value = image model
    images_dict = {}
    for img_path in image_paths:
        image_array = imread(img_path)
        # a single-channel TIFF loads as (y, x); add the channel axis that the
        # ('c', 'y', 'x') layout requires
        if image_array.ndim == 2:
            image_array = image_array[np.newaxis, ...]

        images_dict[img_path.stem] = Image2DModel.parse(
            image_array, dims = ('c', 'y', 'x'), chunks = chunk_size, scale_factors = factors
        )

    sdata = SpatialData(images = images_dict, labels = {"segmentation": label_model})

    # dry run: report only, do not write
    if dry_run:
        print(f"Would upload {output_path} into SpatialData")
        return None

    sdata.write(str(output_path), overwrite = True)
    return sdata




In [None]:
# True only reports what would be written; set to False to write the .zarr stores.
UPLOAD_DRY_RUN = True

for analysis_folder in sorted(ROOT_DIR.iterdir()):

    # skip anything that is not an analysis folder
    if not valid_analysis_folder(analysis_folder):
        continue

    core_storage_path = analysis_folder / CORE_STORAGE_NAME

    for core_path in sorted(core_storage_path.iterdir()):
        if not valid_core_folder(core_path):
            continue

        # output store is named after the core folder: <core>/<core>.zarr
        core_name = core_path.name
        output_path = core_path / f"{core_name}.zarr"

        try:
            sdata = load_spatial_data(core_path, output_path, dry_run = UPLOAD_DRY_RUN)
        except Exception as e:
            # best effort: report the failing core and carry on with the rest
            print(f"Error processing core: {core_name} : {e}")
            continue

        if sdata is not None:
            print(f"Successfully saved SpatialData for {core_name} -> {output_path}")
        elif UPLOAD_DRY_RUN:
            # load_spatial_data returns None for a dry run as well as for
            # missing inputs, so do not claim that something is missing
            print(f"Nothing written for {core_name} (dry run, or image/segmentation files missing)")
        else:
            print(f"Skipped {core_name} -> something missing")
