Complete Medical Image Analysis Pipeline
========================================

A unified pipeline for knee CT scan analysis implementing:
1. Bone Segmentation (femur and tibia)
2. Mask Expansion (2mm and 4mm)
3. Randomized Contour Generation
4. Tibial Landmark Detection



In [None]:
import os
import json
import numpy as np
import nibabel as nib
import SimpleITK as sitk
from skimage import morphology, measure
from scipy.ndimage import binary_dilation, distance_transform_edt
import random
from datetime import datetime

# CONFIGURATION PARAMETERS


In [None]:
class Config:
    """Configuration parameters for the entire pipeline"""

    # Input/Output paths
    INPUT_CT_PATH = "../left_knee.nii.gz"
    RESULTS_DIR = "results"

    # Segmentation parameters
    BONE_THRESHOLD_HU = 300  # Hounsfield Units for bone detection
    MIN_OBJECT_SIZE = 1000   # Minimum voxels for noise removal
    MIN_HOLE_SIZE = 1000     # Minimum hole size to fill
    NUM_LARGEST_COMPONENTS = 2  # Keep femur and tibia

    # Expansion parameters
    EXPANSION_2MM = 2.0      # 2mm expansion distance
    EXPANSION_4MM = 4.0      # 4mm expansion distance
    MAX_RANDOMIZATION_MM = 2.0  # Maximum randomization distance

    # Randomization parameters
    RANDOM_SEEDS = [42, 43]  # Seeds for reproducible randomization

    # Output file names
    ORIGINAL_MASK = "original_mask.nii.gz"
    EXPANDED_2MM_MASK = "expanded_mask_2mm.nii.gz"
    EXPANDED_4MM_MASK = "expanded_mask_4mm.nii.gz"
    RANDOMIZED_MASK_1 = "randomized_mask_1_seed42.nii.gz"
    RANDOMIZED_MASK_2 = "randomized_mask_2_seed43.nii.gz"
    LANDMARKS_OUTPUT = "tibial_landmarks.json"

# UTILITY FUNCTIONS


In [None]:
def setup_directories():
    """Create necessary directories for output"""
    os.makedirs(Config.RESULTS_DIR, exist_ok=True)
    print(f"✓ Results directory created: {Config.RESULTS_DIR}")

def log_step(step_name, details=""):
    """Log pipeline steps with timestamp"""
    timestamp = datetime.now().strftime("%H:%M:%S")
    print(f"[{timestamp}] {step_name}")
    if details:
        print(f"    {details}")

# TASK 1: BONE SEGMENTATION


In [None]:
def load_nii(path):
    """Load NIfTI image and return data, affine, and header"""
    if not os.path.exists(path):
        raise FileNotFoundError(f"Input file not found: {path}")

    img = nib.load(path)
    return img.get_fdata(), img.affine, img.header

def save_nii(data, affine, header, path):
    """Save data as NIfTI file"""
    nib.save(nib.Nifti1Image(data.astype(np.uint8), affine, header), path)
    print(f"    Saved: {path}")

def segment_bone(ct_volume, threshold=Config.BONE_THRESHOLD_HU):
    """Segment bone based on intensity thresholding"""
    # Apply threshold
    bone_mask = ct_volume > threshold

    # Remove small objects (noise)
    bone_mask = morphology.remove_small_objects(
        bone_mask.astype(bool),
        min_size=Config.MIN_OBJECT_SIZE
    )

    # Fill small holes
    bone_mask = morphology.remove_small_holes(
        bone_mask,
        area_threshold=Config.MIN_HOLE_SIZE
    )

    return bone_mask.astype(np.uint8)

def keep_largest_components(mask, num_components=Config.NUM_LARGEST_COMPONENTS):
    """Keep the N largest connected components (femur and tibia)"""
    labeled = measure.label(mask)
    props = measure.regionprops(labeled)
    props = sorted(props, key=lambda x: x.area, reverse=True)

    output_mask = np.zeros_like(mask)
    for i in range(min(num_components, len(props))):
        output_mask[labeled == props[i].label] = 1

    return output_mask

def task1_bone_segmentation(ct_volume, affine, header):
    """Complete bone segmentation pipeline"""
    log_step("TASK 1: Bone Segmentation", "Segmenting femur and tibia")

    # Segment bone tissue
    raw_bone_mask = segment_bone(ct_volume)
    print(f"    Raw bone voxels: {np.sum(raw_bone_mask)}")

    # Keep only femur and tibia
    final_bone_mask = keep_largest_components(raw_bone_mask)
    print(f"    Final bone voxels: {np.sum(final_bone_mask)}")

    # Save original mask
    output_path = os.path.join(Config.RESULTS_DIR, Config.ORIGINAL_MASK)
    save_nii(final_bone_mask, affine, header, output_path)

    return final_bone_mask


# TASK 2: MASK EXPANSION


In [None]:
def expand_mask(mask_array, spacing, expansion_mm):
    """Expand mask uniformly by specified distance in mm"""
    # Convert mm to voxels (spacing is x,y,z; array is z,y,x)
    expansion_voxels = [
        max(1, int(np.ceil(expansion_mm / spacing[2]))),  # z direction
        max(1, int(np.ceil(expansion_mm / spacing[1]))),  # y direction
        max(1, int(np.ceil(expansion_mm / spacing[0])))   # x direction
    ]

    print(f"    {expansion_mm}mm → voxels: {expansion_voxels}")

    # Create 3D structuring element
    struct_shape = tuple(2 * ev + 1 for ev in expansion_voxels)
    structuring_element = np.ones(struct_shape, dtype=np.uint8)

    # Perform binary dilation
    expanded = binary_dilation(mask_array, structure=structuring_element)
    return expanded.astype(np.uint8)

def task2_mask_expansion(original_mask, spacing, affine, header):
    """Complete mask expansion for 2mm and 4mm"""
    log_step("TASK 2: Mask Expansion", "Creating 2mm and 4mm expanded masks")

    # 2mm expansion
    expanded_2mm = expand_mask(original_mask, spacing, Config.EXPANSION_2MM)
    output_path_2mm = os.path.join(Config.RESULTS_DIR, Config.EXPANDED_2MM_MASK)
    save_nii(expanded_2mm, affine, header, output_path_2mm)
    print(f"    2mm expanded voxels: {np.sum(expanded_2mm)}")

    # 4mm expansion
    expanded_4mm = expand_mask(original_mask, spacing, Config.EXPANSION_4MM)
    output_path_4mm = os.path.join(Config.RESULTS_DIR, Config.EXPANDED_4MM_MASK)
    save_nii(expanded_4mm, affine, header, output_path_4mm)
    print(f"    4mm expanded voxels: {np.sum(expanded_4mm)}")

    return expanded_2mm, expanded_4mm

# TASK 3: RANDOMIZED CONTOUR GENERATION


In [None]:
def create_randomized_contour(original_mask, max_expanded_mask, spacing,
                            random_seed, max_expansion_mm):
    """Create randomized contour between original and expanded boundaries"""
    random.seed(random_seed)
    np.random.seed(random_seed)

    # Find expansion zone
    expansion_zone = np.logical_and(max_expanded_mask,
                                  np.logical_not(original_mask))

    # Calculate distance from original boundary
    distance_from_original = distance_transform_edt(
        np.logical_not(original_mask),
        sampling=[spacing[2], spacing[1], spacing[0]]
    )

    # Start with original mask
    randomized_mask = np.copy(original_mask)

    # Process expansion zone
    expansion_indices = np.where(expansion_zone)

    for i in range(len(expansion_indices[0])):
        z, y, x = expansion_indices[0][i], expansion_indices[1][i], expansion_indices[2][i]
        dist = distance_from_original[z, y, x]

        if dist <= max_expansion_mm:
            # Distance-based probability
            probability = 1 - (dist / max_expansion_mm) ** 1.5
            probability = max(0.0, min(1.0, probability))

            if random.random() < probability:
                randomized_mask[z, y, x] = 1

    return randomized_mask

def validate_randomized_mask(original, expanded, randomized, max_exp_mm, spacing, seed):
    """Validate randomized mask meets all constraints"""
    violations = []

    # Check containment constraints
    if not np.all((original == 1) <= (randomized == 1)):
        violations.append("Original mask not fully contained")

    if not np.all((randomized == 1) <= (expanded == 1)):
        violations.append("Randomized mask exceeds expansion boundary")

    # Check size constraints
    if np.sum(original) > np.sum(randomized):
        violations.append("Randomized mask smaller than original")

    if np.sum(randomized) > np.sum(expanded):
        violations.append("Randomized mask larger than expanded")

    # Check distance constraint
    distance_from_original = distance_transform_edt(
        np.logical_not(original),
        sampling=[spacing[2], spacing[1], spacing[0]]
    )
    randomized_only = np.logical_and(randomized, np.logical_not(original))

    if np.any(randomized_only):
        max_distance = np.max(distance_from_original[randomized_only])
        if max_distance > max_exp_mm + 0.01:
            violations.append(f"Distance constraint violated: {max_distance:.2f}mm")

    # Report results
    if violations:
        print(f"    ⚠️  Validation issues for seed {seed}:")
        for v in violations:
            print(f"       - {v}")
    else:
        print(f"    ✓ Validation passed for seed {seed}")
        print(f"      Voxels: {np.sum(original)} → {np.sum(randomized)} → {np.sum(expanded)}")

def task3_randomized_contours(original_mask, expanded_2mm, spacing, affine, header):
    """Generate randomized contours with validation"""
    log_step("TASK 3: Randomized Contour Generation",
             f"Creating {len(Config.RANDOM_SEEDS)} randomized versions")

    randomized_masks = []
    filenames = [Config.RANDOMIZED_MASK_1, Config.RANDOMIZED_MASK_2]

    for i, seed in enumerate(Config.RANDOM_SEEDS):
        print(f"    Generating randomized mask {i+1} (seed {seed})")

        # Create randomized contour
        randomized = create_randomized_contour(
            original_mask, expanded_2mm, spacing, seed, Config.MAX_RANDOMIZATION_MM
        )

        # Save mask
        output_path = os.path.join(Config.RESULTS_DIR, filenames[i])
        save_nii(randomized, affine, header, output_path)

        # Validate
        validate_randomized_mask(
            original_mask, expanded_2mm, randomized,
            Config.MAX_RANDOMIZATION_MM, spacing, seed
        )

        randomized_masks.append(randomized)

    return randomized_masks

# TASK 4: TIBIAL LANDMARK DETECTION


In [None]:
def find_tibia_from_mask(mask_array):
    """Extract tibia (lower bone) from mask"""
    # Convert to SimpleITK for connected component analysis
    mask_sitk = sitk.GetImageFromArray(mask_array)
    labeled_sitk = sitk.ConnectedComponent(mask_sitk)
    labeled_array = sitk.GetArrayFromImage(labeled_sitk)

    # Get component statistics
    stats = sitk.LabelShapeStatisticsImageFilter()
    stats.Execute(labeled_sitk)

    # Find component with lowest centroid (tibia)
    min_z = float('inf')
    tibia_label = None

    for label in stats.GetLabels():
        centroid = stats.GetCentroid(label)
        if centroid[2] < min_z:  # z-coordinate (superior-inferior)
            min_z = centroid[2]
            tibia_label = label

    # Create tibia-only mask
    tibia_mask = (labeled_array == tibia_label).astype(np.uint8)
    return tibia_mask

def find_tibial_landmarks(tibia_mask, spacing):
    """Find medial and lateral lowest points on tibial surface"""
    # Find lowest slice with bone
    z_slices = np.any(tibia_mask, axis=(1, 2))
    if not np.any(z_slices):
        return None, None

    lowest_z = np.where(z_slices)[0][-1]
    lowest_slice = tibia_mask[lowest_z]

    # Find boundary of bone in lowest slice
    boundary_sitk = sitk.GetImageFromArray(lowest_slice.astype(np.uint8))
    boundary_sitk = sitk.BinaryContour(boundary_sitk)
    boundary_array = sitk.GetArrayFromImage(boundary_sitk)

    # Get boundary points
    boundary_points = np.where(boundary_array)
    if len(boundary_points[0]) == 0:
        return None, None

    y_coords = boundary_points[0]
    x_coords = boundary_points[1]

    # Find leftmost (medial) and rightmost (lateral) points
    left_idx = np.argmin(x_coords)
    right_idx = np.argmax(x_coords)

    # Convert to physical coordinates (mm)
    medial_point = [
        float(x_coords[left_idx] * spacing[0]),   # x
        float(y_coords[left_idx] * spacing[1]),   # y
        float(lowest_z * spacing[2])              # z
    ]

    lateral_point = [
        float(x_coords[right_idx] * spacing[0]),  # x
        float(y_coords[right_idx] * spacing[1]),  # y
        float(lowest_z * spacing[2])              # z
    ]

    return medial_point, lateral_point

def task4_landmark_detection(all_masks, spacing):
    """Detect tibial landmarks for all mask types"""
    log_step("TASK 4: Tibial Landmark Detection", "Processing all 5 masks")

    mask_names = [
        "original",
        "expanded_2mm",
        "expanded_4mm",
        "randomized_1",
        "randomized_2"
    ]

    results = {}

    for i, (mask_name, mask_array) in enumerate(zip(mask_names, all_masks)):
        print(f"    Processing {mask_name} mask...")

        # Extract tibia
        tibia_mask = find_tibia_from_mask(mask_array)

        # Find landmarks
        medial_point, lateral_point = find_tibial_landmarks(tibia_mask, spacing)

        if medial_point is not None and lateral_point is not None:
            results[mask_name] = {
                "medial_point": medial_point,
                "lateral_point": lateral_point,
                "tibia_voxels": int(np.sum(tibia_mask))
            }
            print(f"      Medial:  [{medial_point[0]:.2f}, {medial_point[1]:.2f}, {medial_point[2]:.2f}]")
            print(f"      Lateral: [{lateral_point[0]:.2f}, {lateral_point[1]:.2f}, {lateral_point[2]:.2f}]")
        else:
            print(f"      ⚠️  Could not detect landmarks for {mask_name}")
            results[mask_name] = {
                "medial_point": None,
                "lateral_point": None,
                "tibia_voxels": 0,
                "error": "Landmark detection failed"
            }

    return results

# MAIN PIPELINE


In [None]:
def main():
    """Execute the complete medical image analysis pipeline"""

    print("="*70)
    print("MEDICAL IMAGE ANALYSIS PIPELINE")
    print("="*70)
    print(f"Input: {Config.INPUT_CT_PATH}")
    print(f"Output: {Config.RESULTS_DIR}/")
    print("="*70)

    # Setup
    setup_directories()

    try:
        # Load input CT scan
        log_step("LOADING DATA", f"Reading CT scan: {Config.INPUT_CT_PATH}")
        ct_volume, affine, header = load_nii(Config.INPUT_CT_PATH)

        # Get spatial information
        ct_sitk = sitk.ReadImage(Config.INPUT_CT_PATH)
        spacing = ct_sitk.GetSpacing()  # (x, y, z)
        print(f"    Image shape: {ct_volume.shape}")
        print(f"    Voxel spacing: {spacing} mm")
        print(f"    Intensity range: [{ct_volume.min():.1f}, {ct_volume.max():.1f}] HU")

        # Task 1: Bone Segmentation
        original_mask = task1_bone_segmentation(ct_volume, affine, header)

        # Task 2: Mask Expansion
        expanded_2mm, expanded_4mm = task2_mask_expansion(
            original_mask, spacing, affine, header
        )

        # Task 3: Randomized Contours
        randomized_masks = task3_randomized_contours(
            original_mask, expanded_2mm, spacing, affine, header
        )

        # Prepare all masks for landmark detection
        all_masks = [
            original_mask,
            expanded_2mm,
            expanded_4mm,
            randomized_masks[0],
            randomized_masks[1]
        ]

        # Task 4: Landmark Detection
        landmark_results = task4_landmark_detection(all_masks, spacing)

        # Save landmark results
        log_step("SAVING RESULTS", "Writing landmark coordinates")
        results_path = os.path.join(Config.RESULTS_DIR, Config.LANDMARKS_OUTPUT)

        # Add metadata to results
        final_results = {
            "metadata": {
                "input_file": Config.INPUT_CT_PATH,
                "processing_timestamp": datetime.now().isoformat(),
                "voxel_spacing_mm": list(spacing),
                "parameters": {
                    "bone_threshold_hu": Config.BONE_THRESHOLD_HU,
                    "expansion_2mm": Config.EXPANSION_2MM,
                    "expansion_4mm": Config.EXPANSION_4MM,
                    "randomization_seeds": Config.RANDOM_SEEDS,
                    "max_randomization_mm": Config.MAX_RANDOMIZATION_MM
                }
            },
            "landmarks": landmark_results
        }

        with open(results_path, 'w') as f:
            json.dump(final_results, f, indent=2)
        print(f"    Saved: {results_path}")

        # Summary
        log_step("PIPELINE COMPLETE", "All tasks finished successfully")
        print("\n" + "="*70)
        print("SUMMARY")
        print("="*70)
        print("Generated Files:")
        for filename in [Config.ORIGINAL_MASK, Config.EXPANDED_2MM_MASK,
                        Config.EXPANDED_4MM_MASK, Config.RANDOMIZED_MASK_1,
                        Config.RANDOMIZED_MASK_2, Config.LANDMARKS_OUTPUT]:
            filepath = os.path.join(Config.RESULTS_DIR, filename)
            if os.path.exists(filepath):
                print(f"  ✓ {filename}")
            else:
                print(f"  ✗ {filename} (MISSING)")

        print(f"\nLandmark Detection Results:")
        for mask_name, result in landmark_results.items():
            if result.get("medial_point") is not None:
                print(f"  ✓ {mask_name}: Landmarks detected")
            else:
                print(f"  ✗ {mask_name}: {result.get('error', 'Failed')}")

        print("="*70)

    except FileNotFoundError as e:
        print(f"❌ ERROR: {e}")
        print("Please ensure the input CT file exists at the specified path.")
        return 1
    except Exception as e:
        print(f"❌ PIPELINE ERROR: {e}")
        import traceback
        traceback.print_exc()
        return 1

    return 0

if __name__ == "__main__":
    exit_code = main()
    exit(exit_code)

MEDICAL IMAGE ANALYSIS PIPELINE
Input: /content/left_knee.nii.gz
Output: results/
✓ Results directory created: results
[17:31:51] LOADING DATA
    Reading CT scan: /content/left_knee.nii.gz
    Image shape: (512, 512, 216)
    Voxel spacing: (0.8691409826278687, 0.8691409826278687, 2.0) mm
    Intensity range: [-3024.0, 1769.0] HU
[17:31:55] TASK 1: Bone Segmentation
    Segmenting femur and tibia
    Raw bone voxels: 288032
    Final bone voxels: 269826
    Saved: results/original_mask.nii.gz
[17:32:06] TASK 2: Mask Expansion
    Creating 2mm and 4mm expanded masks
    2.0mm → voxels: [1, 3, 3]
    Saved: results/expanded_mask_2mm.nii.gz
    2mm expanded voxels: 612786
    4.0mm → voxels: [2, 5, 5]
    Saved: results/expanded_mask_4mm.nii.gz
    4mm expanded voxels: 805907
[17:33:01] TASK 3: Randomized Contour Generation
    Creating 2 randomized versions
    Generating randomized mask 1 (seed 42)
    Saved: results/randomized_mask_1_seed42.nii.gz
    ✓ Validation passed for seed 42
 