In [2]:
%pip install monai




In [70]:
%pip install nilearn

Collecting nilearn
  Using cached nilearn-0.10.4-py3-none-any.whl.metadata (7.8 kB)
Using cached nilearn-0.10.4-py3-none-any.whl (10.4 MB)
Installing collected packages: nilearn
Successfully installed nilearn-0.10.4
Note: you may need to restart the kernel to use updated packages.




# Imports

In [102]:
import os
import nibabel as nib
import numpy as np
import matplotlib.pyplot as plt
import torch
import torch.nn as nn
import monai
from nilearn.image import resample_img

# Function to load and preprocess Nifti images
https://www.kaggle.com/code/mechaman/resizing-reshaping-and-resampling-nifti-files/notebook

In [104]:
def preprocess_3d_image(image, dataset):
    """
    Preprocess the input 3D image by standardizing shape and affine transformation.
    This includes resizing the image and setting it to a consistent resolution.

    Args:
        image (nifti image): A 3D image in NIfTI format to be preprocessed.
        dataset (list of dict): A list of dictionaries, where each dictionary contains medical images,
            stored under keys like 'mr' or 'ct', as numpy arrays.

    Returns:
        nifti preprcesse image: A preprocessed NIfTI image with standardized shape and affine transformation.
                         The image has been resampled to the target shape and resolution.
    
    """
    # Step 1: Gather the shapes of all images in the dataset
    all_shapes = []
    for element in dataset:
        all_shapes.append(element['mr'].shape[:3])  # Only the 3D shape is considered

    # Step 2: Calculate a target shape (new_size) based on the average shape across the dataset.
    # The size is rounded to the nearest power of 2 to ensure compatibility with common deep learning models.
    new_size = (2 ** np.round(np.log2(np.array(all_shapes).mean(0)))).astype(int)
    #print(f"Calculated target shape (new_size): {new_size}")

    # Step 3: Define a new resolution for the resampling.
    # Here, each voxel is set to represent 2 mm in each spatial dimension.
    new_resolution = [2,] * 3  # Sets a voxel resolution of 2 mm x 2 mm x 2 mm
    new_affine = np.zeros((4, 4))  # Initialize a 4x4 affine matrix for the transformation
    new_affine[:3, :3] = np.diag(new_resolution)  # Sets the diagonal to the new resolution

    # Step 4: Position the (0,0,0) point in the center of the new volume.
    # This is done by shifting the translation part of the affine matrix.
    new_affine[:3, 3] = new_size * new_resolution / 2. * -1
    new_affine[3, 3] = 1.  # Set the bottom-right corner to 1, completing the affine matrix

    # Step 5: Resample the image to the new affine and target shape using nearest-neighbor interpolation
    preprocessed_image = resample_img(
        image, target_affine=new_affine, target_shape=new_size, interpolation='nearest'
    )

    return preprocessed_image.get_fdata()


In [126]:
def load_images(directory_pelvis: str,output_folder="./data"):
    """
    Load nifti images and preprocess them for each patient pair MRI-CT.

    Args:
        directory_pelvis (str): directory containing each patient's folders.
    
    Return:
        list: A list of dictionaries, each containing 'mri' and 'ct' images as numpy arrays.
    """
    
    os.makedirs(output_folder, exist_ok=True)

    dataset = []
    preprocessed_images = []
    
    for folder_patient in os.listdir(directory_pelvis)[:5]:
        patient_path = os.path.join(directory_pelvis, folder_patient)

        # Load MRI and CT images for each patient
        mri_path = os.path.join(patient_path, "mr.nii.gz")
        ct_path = os.path.join(patient_path, "ct.nii.gz")

        if os.path.exists(mri_path) and os.path.exists(ct_path):
            mri_image = nib.load(mri_path).get_fdata()  # Load as numpy array
            ct_image = nib.load(ct_path).get_fdata()    # Load as numpy array
            # Append the MRI-CT pair and label to the dataset
            dataset.append({"mr": mri_image, "ct": ct_image})

            # Preprocess MRI and CT images
            preprocessed_mri =  torch.tensor(preprocess_3d_image(nib.load(mri_path), dataset), dtype=torch.float32)
            preprocessed_ct = torch.tensor(preprocess_3d_image(nib.load(ct_path), dataset), dtype=torch.float32)

            # Save preprocessed images in the list
            preprocessed_images.append({"mr": preprocessed_mri, "ct": preprocessed_ct})
            
    return preprocessed_images

preprocessed_images = load_images("C:\\Users\\catar\\OneDrive - Universidade de Coimbra\\Ambiente de Trabalho\\Master Thesis\\Dataset SynthRAD\\Task1\\pelvis")



# GAN Model Setup
https://medium.com/@Avizyt/understanding-generative-neural-networks-with-pytorch-a-step-by-step-guide-164a314062e6

In [140]:
# Define the Generator network using 3D transpose convolutions
class Generator(nn.Module):
    def __init__(self, latent_dim):
        super(Generator, self).__init__()
        self.init_size = 8  # Starting size (adjust based on target shape)
        self.l1 = nn.Sequential(nn.Linear(latent_dim, 512 * self.init_size * self.init_size * self.init_size))
        
        self.conv_blocks = nn.Sequential(
            nn.BatchNorm3d(512),
            nn.Upsample(scale_factor=2),  # Upsample to [batch_size, 512, 16, 16, 16]
            nn.Conv3d(512, 256, 3, stride=1, padding=1),
            nn.BatchNorm3d(256),
            nn.ReLU(inplace=True),
            nn.Upsample(scale_factor=2),  # Upsample to [batch_size, 256, 32, 32, 32]
            nn.Conv3d(256, 128, 3, stride=1, padding=1),
            nn.BatchNorm3d(128),
            nn.ReLU(inplace=True),
            nn.Upsample(scale_factor=(16, 8, 4)),  # Upsample to [batch_size, 128, 512, 256, 128]
            nn.Conv3d(128, 1, 3, stride=1, padding=1),
            nn.Tanh()  # Use Tanh to match the range of real images (e.g., normalized to [-1, 1])
        )

    def forward(self, z):
        out = self.l1(z)
        out = out.view(out.shape[0], 512, self.init_size, self.init_size, self.init_size)  # Reshape to initial size
        img = self.conv_blocks(out)
        return img

# Define the Discriminator network
class Discriminator(nn.Module):
    def __init__(self):
        super(Discriminator, self).__init__()
        self.model = nn.Sequential(
            nn.Conv3d(1, 64, 3, stride=2, padding=1),  # [batch_size, 64, 256, 128, 64]
            nn.LeakyReLU(0.2, inplace=True),
            nn.Conv3d(64, 128, 3, stride=2, padding=1),  # [batch_size, 128, 128, 64, 32]
            nn.BatchNorm3d(128),
            nn.LeakyReLU(0.2, inplace=True),
            nn.Conv3d(128, 256, 3, stride=2, padding=1),  # [batch_size, 256, 64, 32, 16]
            nn.BatchNorm3d(256),
            nn.LeakyReLU(0.2, inplace=True),
            nn.Conv3d(256, 512, 3, stride=2, padding=1),  # [batch_size, 512, 32, 16, 8]
            nn.BatchNorm3d(512),
            nn.LeakyReLU(0.2, inplace=True),
            nn.Conv3d(512, 1, 3, stride=1, padding=0),   # Output a single value
            nn.Sigmoid()  # For binary classification (real/fake)
        )

    def forward(self, img):
        return self.model(img).view(img.size(0), -1)  # Flatten to [batch_size, 1]


# Instances of the generator and discriminator

In [142]:
latent_dim = 100

generator = Generator(latent_dim) # Define the output shape for your generator
discriminator = Discriminator() # Define the input shape for your discriminator

# Loss function and optimizers for generator and discriminator

In [138]:
criterion = nn.BCELoss()
optimizer_G = torch.optim.Adam(generator.parameters(), lr=0.0002, betas=(0.5, 0.999))
optimizer_D = torch.optim.Adam(discriminator.parameters(), lr=0.0002, betas=(0.5, 0.999))

# Training the GAN

In [143]:
# Training loop
num_epochs = 10
batch_size = 4

train_loader = torch.utils.data.DataLoader(
            preprocessed_images,
            batch_size,
            num_workers=10,
            shuffle=True
)

for epoch in range(num_epochs):
    for i, batch in enumerate(train_loader):
        # Sample a batch of real medical images
        real_images = batch["mr"].unsqueeze(1)
        print(real_images.shape) # torch.Size([4, 1, 512, 256, 128])

        # Generate random noise as input for the generator
        noise = torch.randn(real_images.shape[0], latent_dim)

        # Generate fake medical images using the generator
        fake_images = generator(noise)
        print(fake_images.shape)

        # Train the discriminator on real images
        optimizer_D.zero_grad()
        real_labels = torch.ones(batch_size, 1)
        d_loss_real = criterion(discriminator(real_images), real_labels)
        d_loss_real.backward()

        # Train the discriminator on fake images
        fake_labels = torch.zeros(batch_size, 1)
        d_loss_fake = criterion(discriminator(fake_images.detach()), fake_labels)
        d_loss_fake.backward()
        optimizer_D.step()

        # Calculate discriminator loss
        d_loss = d_loss_real + d_loss_fake

        # Train the generator
        optimizer_G.zero_grad()
        g_loss = criterion(discriminator(fake_images), real_labels)
        g_loss.backward()
        optimizer_G.step()

        # Print progress
        print(f"Epoch [{epoch}/{num_epochs}], Batch [{i}/{preprocessed_images.shape[0]}], D Loss: {d_loss.item()}, G Loss: {g_loss.item()}")

torch.Size([4, 1, 512, 256, 128])


RuntimeError: [enforce fail at alloc_cpu.cpp:114] data. DefaultCPUAllocator: not enough memory: you tried to allocate 34359738368 bytes.