# PIPELINE

In [2]:
import nibabel as nib
import numpy as np
import torch
import torchvision.models as models
import torch.nn as nn
import pandas as pd
from sklearn.metrics.pairwise import cosine_similarity

In [3]:
# File paths
ct_path = "../data/3702_left_knee.nii.gz"
mask_path = "../data/original_mask.nii.gz"
output_csv = "../output/similarities.csv"

## Step 1: Segmentation-Based Splitting

In [4]:
def load_and_segment_ct(ct_path, mask_path):
    # Load CT scan and mask
    ct_img = nib.load(ct_path).get_fdata()
    mask_img = nib.load(mask_path).get_fdata()

    # Verify mask values
    print(f"Mask Unique Values: {np.unique(mask_img)}")
    
    # Account for swapped labels: tibia = 2, femur = 1
    tibia_mask = (mask_img == 2).astype(np.float32)  # Tibia (value=2)
    femur_mask = (mask_img == 1).astype(np.float32)  # Femur (value=1)
    background_mask = (mask_img == 0).astype(np.float32)  # Background

    # Apply masks to CT scan
    tibia_region = ct_img * tibia_mask
    femur_region = ct_img * femur_mask
    background_region = ct_img * background_mask

    return tibia_region, femur_region, background_region

## Step 2: Convert 2D Pretrained Model to 3D

In [5]:
def inflate_densenet121_to_3d():
    # Load pretrained 2D DenseNet121
    model_2d = models.densenet121(weights='DEFAULT')  # Use 'weights' for newer PyTorch
    model_2d.eval()

    # Create a new model for 3D
    class DenseNet1213D(nn.Module):
        def __init__(self, model_2d):
            super(DenseNet1213D, self).__init__()
            self.features = nn.Sequential()
            for name, module in model_2d.features.named_children():
                if isinstance(module, nn.Conv2d):
                    # Inflate Conv2d to Conv3d
                    out_channels = module.out_channels
                    in_channels = module.in_channels
                    kernel_size = module.kernel_size[0]  # Assume square kernel
                    stride = module.stride[0]
                    padding = module.padding[0]
                    weight_2d = module.weight.data  # Shape: (out_channels, in_channels, h, w)
                    depth = kernel_size  # New depth dimension
                    weight_3d = weight_2d.unsqueeze(2).repeat(1, 1, depth, 1, 1) / depth  # Normalize by depth
                    conv3d = nn.Conv3d(in_channels, out_channels, 
                                      kernel_size=(kernel_size, kernel_size, kernel_size),
                                      stride=(stride, stride, stride), 
                                      padding=(padding, padding, padding))
                    conv3d.weight.data = weight_3d
                    if module.bias is not None:
                        conv3d.bias.data = module.bias.data
                    self.features.add_module(name, conv3d)
                else:
                    # Copy non-convolutional layers
                    if isinstance(module, nn.MaxPool2d):
                        self.features.add_module(name, nn.MaxPool3d(
                            kernel_size=module.kernel_size,
                            stride=module.stride,
                            padding=module.padding))
                    elif isinstance(module, nn.BatchNorm2d):
                        self.features.add_module(name, nn.BatchNorm3d(module.num_features))
                    else:
                        self.features.add_module(name, module)

        def forward(self, x):
            return self.features(x)

    return DenseNet1213D(model_2d)

In [6]:
# Step 3: Feature Extraction
def get_conv_layers(model):
    conv_layers = []
    for name, module in model.features.named_children():
        if isinstance(module, nn.Conv3d):
            conv_layers.append(name)
    # Ensure we have enough conv layers
    if len(conv_layers) < 5:
        raise ValueError(f"Not enough Conv3d layers ({len(conv_layers)} found, need at least 5)")
    return conv_layers[-1], conv_layers[-3], conv_layers[-5]

def extract_features(model, volume, device='cuda'):
    model = model.to(device)
    # Ensure volume is 3D and add batch and channel dimensions
    if volume.ndim != 3:
        raise ValueError(f"Expected 3D volume, got shape {volume.shape}")
    volume = torch.from_numpy(volume).float().unsqueeze(0).unsqueeze(0).to(device)  # Shape: (1, 1, D, H, W)

    # Get convolutional layer names
    last_layer, third_last_layer, fifth_last_layer = get_conv_layers(model)
    feature_maps = {}

    # Define hook to capture feature maps
    def hook_fn(name):
        def hook(module, input, output):
            feature_maps[name] = output
        return hook

    # Register hooks
    for name, module in model.features.named_children():
        if name in [last_layer, third_last_layer, fifth_last_layer]:
            module.register_forward_hook(hook_fn(name))

    # Run model
    model.eval()
    with torch.no_grad():
        _ = model(volume)

    # Apply global average pooling
    feature_vectors = {}
    for name in feature_maps:
        fmap = feature_maps[name].squeeze(0)  # Shape: (C, D, H, W)
        gap = nn.AdaptiveAvgPool3d(1)(fmap).squeeze().cpu().numpy()  # Shape: (C,)
        feature_vectors[name] = gap

    return feature_vectors

# Step 4: Feature Comparison
def compute_cosine_similarity(features_tibia, features_femur, features_background):
    layers = list(features_tibia.keys())
    similarities = {
        'Tibia_Femur': [],
        'Tibia_Background': [],
        'Femur_Background': []
    }

    for layer in layers:
        tibia_vec = features_tibia[layer].reshape(1, -1)
        femur_vec = features_femur[layer].reshape(1, -1)
        background_vec = features_background[layer].reshape(1, -1)

        sim_tibia_femur = cosine_similarity(tibia_vec, femur_vec)[0][0]
        sim_tibia_background = cosine_similarity(tibia_vec, background_vec)[0][0]
        sim_femur_background = cosine_similarity(femur_vec, background_vec)[0][0]

        similarities['Tibia_Femur'].append(sim_tibia_femur)
        similarities['Tibia_Background'].append(sim_tibia_background)
        similarities['Femur_Background'].append(sim_femur_background)

    return similarities, layers

# Step 5: Result Organization
def save_results(similarities, layers, output_csv):
    # Create DataFrame with one row for the image pair
    df_data = {}
    for i, layer in enumerate(layers):
        df_data[f'{layer}_Tibia_Femur'] = [similarities['Tibia_Femur'][i]]
        df_data[f'{layer}_Tibia_Background'] = [similarities['Tibia_Background'][i]]
        df_data[f'{layer}_Femur_Background'] = [similarities['Femur_Background'][i]]
    df = pd.DataFrame(df_data)
    df.to_csv(output_csv, index=False)
    print(f"Results saved to {output_csv}")

# Main Pipeline
def main(ct_path, mask_path, output_csv):
    # Step 1: Segment CT scan
    tibia, femur, background = load_and_segment_ct(ct_path, mask_path)
    print(f"Segmented regions - Tibia shape: {tibia.shape}, Femur shape: {femur.shape}, Background shape: {background.shape}")

    # Step 2: Initialize 3D model
    model_3d = inflate_densenet121_to_3d()
    print("3D DenseNet121 model initialized")

    # Step 3: Extract features
    device = 'cuda' if torch.cuda.is_available() else 'cpu'
    print(f"Using device: {device}")
    features_tibia = extract_features(model_3d, tibia, device)
    features_femur = extract_features(model_3d, femur, device)
    features_background = extract_features(model_3d, background, device)
    print("Features extracted for all regions")

    # Step 4: Compute cosine similarities
    similarities, layers = compute_cosine_similarity(features_tibia, features_femur, features_background)
    print(f"Cosine similarities computed for layers: {layers}")

    # Step 5: Save results
    save_results(similarities, layers, output_csv)

if __name__ == "__main__":
    main(ct_path, mask_path, output_csv)

Mask Unique Values: [0. 1. 2.]
Segmented regions - Tibia shape: (512, 512, 216), Femur shape: (512, 512, 216), Background shape: (512, 512, 216)


Downloading: "https://download.pytorch.org/models/densenet121-a639ec97.pth" to C:\Users\ACER/.cache\torch\hub\checkpoints\densenet121-a639ec97.pth
100.0%


3D DenseNet121 model initialized
Using device: cpu


ValueError: Not enough Conv3d layers (1 found, need at least 5)