### Complete PET-to-CT Translation Pipeline
 **Architecture**: ResNet-34 Encoder + ViT Bottleneck + CNN Decoder  
 **Features**:
 - TCIA API download
 - NPY/PNG preprocessing (7GB storage)
 - Mixed precision training
 - Multi-scale SSIM loss
 - Model checkpointing

### 0. Install Dependencies

In [1]:
%pip install pydicom numpy pillow tqdm requests torch torchvision pytorch-msssim einops kaggle --quiet

[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m2.4/2.4 MB[0m [31m36.5 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m363.4/363.4 MB[0m [31m4.2 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m13.8/13.8 MB[0m [31m68.5 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m24.6/24.6 MB[0m [31m36.1 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m883.7/883.7 kB[0m [31m49.7 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m664.8/664.8 MB[0m [31m2.2 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m211.5/211.5 MB[0m [31m5.9 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m56.3/56.3 MB[0m [31m13.2 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━

In [3]:
%pip install optuna

Collecting optuna
  Downloading optuna-4.3.0-py3-none-any.whl.metadata (17 kB)
Collecting alembic>=1.5.0 (from optuna)
  Downloading alembic-1.15.2-py3-none-any.whl.metadata (7.3 kB)
Collecting colorlog (from optuna)
  Downloading colorlog-6.9.0-py3-none-any.whl.metadata (10 kB)
Downloading optuna-4.3.0-py3-none-any.whl (386 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m386.6/386.6 kB[0m [31m28.0 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading alembic-1.15.2-py3-none-any.whl (231 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m231.9/231.9 kB[0m [31m23.2 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading colorlog-6.9.0-py3-none-any.whl (11 kB)
Installing collected packages: colorlog, alembic, optuna
Successfully installed alembic-1.15.2 colorlog-6.9.0 optuna-4.3.0


In [4]:
import os
import numpy as np
import pydicom
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms, models
from tqdm import tqdm
from multiprocessing import Pool
from pytorch_msssim import MS_SSIM
from einops import rearrange
from torch.cuda.amp import autocast, GradScaler
import requests
import zipfile
import io
import random
from torch.utils.tensorboard import SummaryWriter
import optuna
from skimage.metrics import peak_signal_noise_ratio as psnr
from skimage.metrics import structural_similarity as ssim
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_squared_error


#### 1. Mount Google Drive and Set QIN-Breast Data Paths

In [5]:
# Colab-only: make Google Drive visible under /content/drive so the paths
# below resolve. Mounting requires interactive authorization in the notebook.
from google.colab import drive
drive.mount('/content/drive')

# Input directories for your original DICOM files
# (read by preprocess_dataset, which only picks up names ending in '.dcm').
pet_dir = '/content/drive/MyDrive/PIX/PET'
ct_dir  = '/content/drive/MyDrive/PIX/CT'

# Output directories for processed .npy files
# (PET and CT outputs are kept in separate folders).
processed_pet_dir = '/content/drive/MyDrive/PIX/processed/PET'
processed_ct_dir  = '/content/drive/MyDrive/PIX/processed/CT'


MessageError: Error: credential propagation was unsuccessful

#### 2. Preprocess to NPY/PNG

In [None]:
def process_dicom_file(args):
    """
    Read one DICOM file and store its pixel data as a NumPy ``.npy`` file.

    The output keeps the input's base name with the extension swapped to
    ``.npy``. The output directory is created if it does not exist.

    Args:
        args (tuple): ``(dicom_file, output_dir)`` where
                      - dicom_file (str): Full path to the DICOM file.
                      - output_dir (str): Directory that receives the .npy file.

    Returns:
        None. Any error raised while reading, converting or saving is
        printed and swallowed so that one bad file does not abort a batch.
    """
    src_path, dest_dir = args
    os.makedirs(dest_dir, exist_ok=True)

    try:
        # pixel_array holds the stored pixel values exactly as pydicom decodes them.
        pixels = pydicom.dcmread(src_path).pixel_array

        stem, _ = os.path.splitext(os.path.basename(src_path))
        target = os.path.join(dest_dir, f"{stem}.npy")

        np.save(target, pixels)
        print(f"Converted '{src_path}' to '{target}'.")
    except Exception as err:
        print(f"Error processing {src_path}: {err}")


In [None]:
def preprocess_dataset(pet_dir, ct_dir, processed_pet_dir, processed_ct_dir):
    """
    Convert every ``.dcm`` file in the PET and CT folders to a ``.npy`` file.

    PET is handled first, then CT. Within each folder the files are visited
    in sorted path order and handed one by one to ``process_dicom_file``.

    Args:
        pet_dir (str): Folder containing the PET DICOM files.
        ct_dir (str): Folder containing the CT DICOM files.
        processed_pet_dir (str): Destination folder for converted PET arrays.
        processed_ct_dir (str): Destination folder for converted CT arrays.
    """
    jobs = (
        ("PET", pet_dir, processed_pet_dir),
        ("CT", ct_dir, processed_ct_dir),
    )

    for modality, source_dir, target_dir in jobs:
        # Only names ending in '.dcm' are considered; everything else is ignored.
        dicom_paths = sorted(
            os.path.join(source_dir, name)
            for name in os.listdir(source_dir)
            if name.endswith('.dcm')
        )
        print(f"Found {len(dicom_paths)} {modality} files.")
        for path in dicom_paths:
            process_dicom_file((path, target_dir))

    print("Preprocessing complete.")

# Run preprocessing. NOTE: this call is unconditional -- executing the cell
# again converts every DICOM file again. Skip or comment out this cell if the
# processed .npy files already exist.
preprocess_dataset(pet_dir, ct_dir, processed_pet_dir, processed_ct_dir)


#### 3. Split the Processed Dataset
Since your processed files now exist as .npy files in separate folders for PET and CT—with matching filenames—we can list the PET folder (or CT, as they should be identical) and then randomly split that list.

In [None]:
def get_file_splits(processed_pet_dir, test_size=0.1, val_size=0.1):
    """
    Partition the processed ``.npy`` filenames into train / validation / test.

    Only ``processed_pet_dir`` is listed; the CT folder is assumed to hold
    files with the same names. Both splits use ``random_state=42`` so the
    partition is reproducible for a given directory listing.

    Args:
        processed_pet_dir (str): Folder with the processed PET ``.npy`` files.
        test_size (float): Fraction of all files reserved for testing.
        val_size (float): Fraction of all files reserved for validation.

    Returns:
        tuple: ``(train_files, val_files, test_files)`` lists of filenames.
    """
    npy_names = sorted(
        name for name in os.listdir(processed_pet_dir) if name.endswith('.npy')
    )
    print("Total number of processed paired images:", len(npy_names))

    # Stage 1: carve off the combined validation + test portion.
    holdout_fraction = test_size + val_size
    train_files, holdout = train_test_split(
        npy_names, test_size=holdout_fraction, random_state=42
    )

    # Stage 2: divide the hold-out portion so that validation and test keep
    # the requested proportions relative to each other.
    test_share = 1 - val_size / holdout_fraction
    val_files, test_files = train_test_split(
        holdout, test_size=test_share, random_state=42
    )

    for label, subset in (
        ("Train samples:", train_files),
        ("Validation samples:", val_files),
        ("Test samples:", test_files),
    ):
        print(label, len(subset))

    return train_files, val_files, test_files

# Get the file splits
#train_files, val_files, test_files = get_file_splits(processed_pet_dir, test_size=0.1, val_size=0.1)


In [None]:
import torch
from torch.utils.data import Dataset

class QinBreastDataset(Dataset):
    """
    Paired PET/CT dataset backed by ``.npy`` files.

    Each sample is looked up by filename in two folders: the same name is
    loaded once from the PET folder and once from the CT folder.
    """

    def __init__(self, filenames, processed_pet_dir, processed_ct_dir, transform=None):
        """
        Args:
            filenames: List of filenames (e.g., "1-01.npy") present in both folders.
            processed_pet_dir: Folder holding the processed PET .npy files.
            processed_ct_dir: Folder holding the processed CT .npy files.
            transform: Optional callable applied separately to the PET array
                and to the CT array (e.g., normalization, tensor conversion).
        """
        self.filenames = filenames
        self.pet_dir = processed_pet_dir
        self.ct_dir = processed_ct_dir
        self.transform = transform

    def __len__(self):
        """Return the number of PET/CT pairs."""
        return len(self.filenames)

    def __getitem__(self, idx):
        """Load and return the ``(pet, ct)`` pair stored under ``filenames[idx]``."""
        name = self.filenames[idx]
        pet_img, ct_img = (
            np.load(os.path.join(folder, name))
            for folder in (self.pet_dir, self.ct_dir)
        )

        if not self.transform:
            return pet_img, ct_img

        # The same transform object handles both modalities, PET first.
        return self.transform(pet_img), self.transform(ct_img)


#### 4. Model Architecture

In [None]:
# %% [code]
# ======================

class ViTBlock(nn.Module):
    """
    Transformer encoder block: self-attention followed by an MLP.

    Each sub-layer is wrapped as ``LayerNorm(x + sublayer(x))`` (normalization
    after the residual sum). ``nn.MultiheadAttention`` is built without
    ``batch_first``, so inputs follow its default sequence-first layout.

    Args:
        dim: Embedding size of every token.
        heads: Number of attention heads.
        dropout: Dropout probability used in the attention and in the MLP.
    """

    def __init__(self, dim=512, heads=8, dropout=0.1):
        super().__init__()
        hidden_dim = dim * 4
        self.attention = nn.MultiheadAttention(dim, heads, dropout=dropout)
        self.norm1 = nn.LayerNorm(dim)
        self.norm2 = nn.LayerNorm(dim)
        # GELU and Dropout for better stability.
        mlp_layers = [
            nn.Linear(dim, hidden_dim),
            nn.GELU(),
            nn.Dropout(dropout),
            nn.Linear(hidden_dim, dim),
        ]
        self.mlp = nn.Sequential(*mlp_layers)

    def forward(self, x):
        """Apply attention and MLP sub-layers; the output has the shape of ``x``."""
        # Index 0 is the attention output; the attention weights are not used.
        attended = self.attention(x, x, x)[0]
        residual = self.norm1(x + attended)
        return self.norm2(residual + self.mlp(residual))

class Generator(nn.Module):
    """
    PET-to-CT generator: ResNet-34 encoder, ViT bottleneck, transposed-conv decoder.

    Input:  single-channel image batch ``(B, 1, H, W)``.
    Output: single-channel image batch in ``[-1, 1]`` (final ``Tanh``).

    The encoder reduces the spatial size by a factor of 32 (stride-2 stem
    conv, max-pool, and three stride-2 ResNet stages). The decoder therefore
    uses five stride-2 transposed convolutions, so for ``H`` and ``W`` that
    are multiples of 32 the output has the same spatial size as the input and
    can be compared pixel-wise with the target CT.

    NOTE: the decoder has more layers than earlier revisions of this class,
    so decoder weights from old checkpoints will not load.
    """

    def __init__(self):
        super().__init__()
        # Encoder (ResNet-34, ImageNet weights). Constructing this may
        # download the weights on first use.
        resnet = models.resnet34(weights=models.ResNet34_Weights.DEFAULT)

        # [1:-2] drops the first child (the 3-channel stem conv, replaced by
        # the 1-channel conv below) and the last two children (global average
        # pooling and the fc classification head).
        self.encoder = nn.Sequential(
            nn.Conv2d(1, 64, kernel_size=7, stride=2, padding=3, bias=False),
            *list(resnet.children())[1:-2],
        )

        # ViT bottleneck operating on the 512-channel encoder features.
        # Add more ViTBlock(dim=512) entries here for a deeper bottleneck.
        self.vit = nn.Sequential(
            ViTBlock(dim=512),
        )

        # Decoder: 5 x (2x upsampling) = 32x, mirroring the encoder's stride.
        self.decoder = nn.Sequential(
            *self._up_block(512, 256),
            *self._up_block(256, 128),
            *self._up_block(128, 64),
            *self._up_block(64, 32),
            nn.ConvTranspose2d(32, 1, kernel_size=4, stride=2, padding=1),
            nn.Tanh(),
        )

    @staticmethod
    def _up_block(in_ch, out_ch):
        """Return the layers of one 2x upsampling stage (ConvTranspose, norm, ReLU)."""
        return [
            nn.ConvTranspose2d(in_ch, out_ch, kernel_size=4, stride=2, padding=1),
            nn.InstanceNorm2d(out_ch),
            nn.ReLU(True),
        ]

    def forward(self, x):
        """Translate a PET batch ``(B, 1, H, W)`` into a predicted CT batch."""
        x = self.encoder(x)
        _, _, h, w = x.shape

        # Flatten the spatial grid into a token sequence (sequence-first, as
        # expected by ViTBlock's attention), then restore the grid.
        x = rearrange(x, 'b c h w -> (h w) b c')
        x = self.vit(x)
        x = rearrange(x, '(h w) b c -> b c h w', h=h, w=w)

        # The features already live on the module's device; no explicit
        # transfer (and no dependency on a global `device` name) is needed.
        return self.decoder(x)



The Multi-Scale Discriminator assesses images at several resolutions, which improves the stability of adversarial training.

In [None]:
class MultiScaleDiscriminator(nn.Module):
    """
    Three small convolutional critics applied to progressively downsampled input.

    The first critic sees the input at full resolution, the second at half
    and the third at quarter resolution (nearest-neighbour downsampling).
    Every critic ends in global average pooling, so each contributes one
    score per sample; the scores are concatenated along the channel axis,
    giving an output of shape ``(B, 3, 1, 1)``.

    Args:
        input_channels: Number of channels of the images to be scored.
    """

    def __init__(self, input_channels=1):
        super().__init__()
        self.discriminators = nn.ModuleList([
            self._make_discriminator(input_channels, 64),
            self._make_discriminator(input_channels, 32),
            self._make_discriminator(input_channels, 16),
        ])

    def _make_discriminator(self, in_ch, base_ch):
        """Build one critic with ``base_ch`` channels in its first layer."""
        return nn.Sequential(
            # Spectral norm constrains weight norms to improve training stability.
            nn.utils.spectral_norm(nn.Conv2d(in_ch, base_ch, 4, 2, 1)),
            nn.LeakyReLU(0.2),
            nn.utils.spectral_norm(nn.Conv2d(base_ch, base_ch * 2, 4, 2, 1)),
            # Instance norm keeps feature statistics in a stable range.
            nn.InstanceNorm2d(base_ch * 2),
            nn.LeakyReLU(0.2),
            nn.utils.spectral_norm(nn.Conv2d(base_ch * 2, 1, 4, 1, 1)),
            nn.AdaptiveAvgPool2d(1),
        )

    def forward(self, x):
        """Score ``x`` at three scales and return the scores as ``(B, 3, 1, 1)``."""
        # Follow the module's own device instead of relying on a global
        # `device` variable, which is not defined at module level.
        x = x.to(next(self.parameters()).device)

        outputs = []
        last = len(self.discriminators) - 1
        for index, disc in enumerate(self.discriminators):
            outputs.append(disc(x))
            # Downsample only when another critic still needs the result.
            if index < last:
                x = nn.functional.interpolate(x, scale_factor=0.5, mode='nearest')
        return torch.cat(outputs, dim=1)

####  5. Training Utilities

In [None]:
#from torchvision.models import vgg19
import torch
import torch.nn as nn
import torch.nn.functional as F
from torchvision.models import vgg19, VGG19_Weights

class VGGLoss(nn.Module):
    """
    Perceptual loss: summed L1 distance between VGG-19 feature maps.

    The pretrained ``vgg19(...).features`` stack is cut into five consecutive
    slices (layers 0-1, 2-6, 7-11, 12-20, 21-29). Both inputs are pushed
    through the slices in sequence and the L1 distance between the two
    feature maps is accumulated after every slice.

    Args:
        requires_grad: If False (default) all VGG weights are frozen.

    NOTE(review): inputs are fed to VGG as given; no ImageNet mean/std
    normalization is applied here -- confirm whether callers should rescale.
    """

    def __init__(self, requires_grad=False):
        super(VGGLoss, self).__init__()
        # Load a pre-trained VGG19 model (may download weights on first use).
        vgg = vgg19(weights=VGG19_Weights.IMAGENET1K_V1).features

        # Consecutive slices of the feature extractor; each one continues
        # where the previous one stopped.
        self.slice1 = nn.Sequential(*[vgg[i] for i in range(2)])
        self.slice2 = nn.Sequential(*[vgg[i] for i in range(2, 7)])
        self.slice3 = nn.Sequential(*[vgg[i] for i in range(7, 12)])
        self.slice4 = nn.Sequential(*[vgg[i] for i in range(12, 21)])
        self.slice5 = nn.Sequential(*[vgg[i] for i in range(21, 30)])

        # Freeze the VGG parameters if not training them.
        if not requires_grad:
            for param in self.parameters():
                param.requires_grad = False

    @staticmethod
    def _as_three_channel(img):
        """Replicate a single-channel batch to three channels for the VGG stem."""
        if img.size(1) == 1:
            return img.expand(-1, 3, -1, -1)
        return img

    def forward(self, x, y):
        """Return the summed L1 feature distance between ``x`` and ``y``.

        Both inputs are ``(B, C, H, W)`` batches with ``C`` equal to 1 or 3.
        """
        x = self._as_three_channel(x)
        y = self._as_three_channel(y)

        loss = 0
        # Each slice must consume the previous slice's output: slice2..slice5
        # start with convolutions that expect 64+ input channels, so feeding
        # them the raw image fails.
        for stage in (self.slice1, self.slice2, self.slice3, self.slice4, self.slice5):
            x, y = stage(x), stage(y)
            loss = loss + F.l1_loss(x, y)

        return loss


In [None]:
class TotalLoss(nn.Module):
    """
    Combined generator objective.

    ``100 * L1 + (1 - MS-SSIM) + 0.1 * VGG + 10 * (adversarial + gradient penalty)``

    NOTE(review): ``MS_SSIM`` is configured with ``data_range=1.0`` while the
    ``psnr`` helper in this file treats tensors as ``[-1, 1]``-normalized --
    confirm which range the images passed here actually use.
    """

    def __init__(self):
        super().__init__()
        self.l1 = nn.L1Loss()
        self.vgg = VGGLoss()
        self.ms_ssim = MS_SSIM(data_range=1.0, channel=1)

    def forward(self, gen_ct, real_ct, D_real, D_fake, D):
        """
        Args:
            gen_ct: Generated CT batch.
            real_ct: Ground-truth CT batch.
            D_real: Discriminator output on real images (currently unused;
                kept for interface compatibility).
            D_fake: Discriminator output on generated images.
            D: The discriminator itself, called inside the gradient penalty.

        Returns:
            Scalar tensor with the weighted sum of all loss terms.
        """
        # Reconstruction losses
        l1_loss = self.l1(gen_ct, real_ct)
        ms_ssim_loss = 1 - self.ms_ssim(gen_ct, real_ct)
        vgg_loss = self.vgg(gen_ct, real_ct)

        # Adversarial loss (the generator wants high critic scores on fakes)
        adv_loss = -torch.mean(D_fake)

        # Gradient penalty. The fake batch is detached, so this term does not
        # backpropagate into the generator through gen_ct.
        gp = self._gradient_penalty(D, real_ct, gen_ct.detach())

        return 100 * l1_loss + ms_ssim_loss + 0.1 * vgg_loss + 10 * (adv_loss + gp)

    def _gradient_penalty(self, D, real, fake):
        """
        Penalize deviation of the critic's per-sample gradient norm from 1.

        Points are sampled uniformly on the line between each real/fake pair;
        the gradient of ``D`` w.r.t. each point is flattened to one vector per
        sample before its L2 norm is taken.
        """
        alpha = torch.rand(real.size(0), 1, 1, 1, device=real.device)
        interpolates = (alpha * real + (1 - alpha) * fake).requires_grad_(True)
        d_interpolates = D(interpolates).view(-1)

        gradients = torch.autograd.grad(
            outputs=d_interpolates,
            inputs=interpolates,
            grad_outputs=torch.ones_like(d_interpolates),
            create_graph=True,
            retain_graph=True,
        )[0]

        # Flatten to (B, C*H*W): the norm must cover the whole gradient of a
        # sample, not just its channel axis.
        per_sample = gradients.reshape(gradients.size(0), -1)
        return ((per_sample.norm(2, dim=1) - 1) ** 2).mean()

def psnr(output, target):
    """
    Peak signal-to-noise ratio (dB) for tensors normalized to [-1, 1].

    Both tensors are shifted to [0, 1] first, so the peak value is 1.0. The
    mean squared error is floored at 1e-8, which caps the result at 80 dB
    for identical inputs.

    NOTE: this definition rebinds the name ``psnr`` that the notebook also
    imports from ``skimage.metrics``.
    """
    # [-1, 1] -> [0, 1] for both tensors
    output, target = ((t + 1) / 2 for t in (output, target))

    squared_error = (output - target) ** 2
    # Floor the MSE to avoid division by zero.
    mse = torch.clamp(torch.mean(squared_error), min=1e-8)
    return 20 * torch.log10(1.0 / torch.sqrt(mse))

#### 6. Main Training Loop

In [None]:
def train():
    """
    Train the Generator with a plain L1 reconstruction loss.

    Reads the module-level ``processed_pet_dir`` / ``processed_ct_dir`` paths,
    splits the files 80/10/10, trains for a fixed number of epochs and prints
    the mean training loss (and, if the validation split is non-empty, the
    mean validation loss) after every epoch. The test split is held out and
    not touched here.

    NOTE(review): datasets are created with ``transform=None``, so the arrays
    reach the model with whatever value range the .npy files have, while the
    Generator ends in ``Tanh`` -- confirm that the data is normalized to
    [-1, 1] elsewhere or add a normalizing transform.
    """
    # 1. Get file splits from the processed PET directory (paired with CT files)
    train_files, val_files, _test_files = get_file_splits(
        processed_pet_dir, test_size=0.1, val_size=0.1
    )

    # 2. Create Dataset objects for training and validation.
    train_dataset = QinBreastDataset(train_files, processed_pet_dir, processed_ct_dir, transform=None)
    val_dataset = QinBreastDataset(val_files, processed_pet_dir, processed_ct_dir, transform=None)

    # 3. Create DataLoaders
    train_loader = DataLoader(train_dataset, batch_size=4, shuffle=True, num_workers=2)
    val_loader = DataLoader(val_dataset, batch_size=4, shuffle=False, num_workers=2)

    # 4. Define device and initialize the model
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    generator = Generator().to(device)

    # 5. Setup optimizer and loss function (L1 loss for reconstruction)
    optimizer = torch.optim.Adam(generator.parameters(), lr=1e-4)
    criterion = nn.L1Loss()

    num_epochs = 10  # Adjust number of training epochs

    def to_model_input(batch):
        # The DataLoader already collates the numpy arrays into tensors;
        # as_tensor converts dtype without the copy-construct warning that
        # torch.tensor(tensor) raises. Assumes samples are [H, W], giving
        # [B, H, W] batches that become [B, 1, H, W].
        return torch.as_tensor(batch, dtype=torch.float32).unsqueeze(1).to(device)

    # 6. Training loop
    for epoch in range(num_epochs):
        generator.train()
        epoch_loss = 0.0

        for pet_img, ct_img in train_loader:
            pet_img = to_model_input(pet_img)
            ct_img = to_model_input(ct_img)

            optimizer.zero_grad()

            # Forward pass: the Generator maps a PET image to a predicted CT image.
            pred_ct = generator(pet_img)

            # Loss between the predicted CT and the real CT image.
            loss = criterion(pred_ct, ct_img)

            loss.backward()
            optimizer.step()

            epoch_loss += loss.item()

        avg_loss = epoch_loss / len(train_loader)
        print(f"Epoch [{epoch+1}/{num_epochs}], Loss: {avg_loss:.4f}")

        # 7. Validation pass (no gradient tracking, eval-mode layers)
        if len(val_loader) > 0:
            generator.eval()
            val_loss = 0.0
            with torch.no_grad():
                for pet_img, ct_img in val_loader:
                    pet_img = to_model_input(pet_img)
                    ct_img = to_model_input(ct_img)
                    val_loss += criterion(generator(pet_img), ct_img).item()
            avg_val_loss = val_loss / len(val_loader)
            print(f"Epoch [{epoch+1}/{num_epochs}], Val Loss: {avg_val_loss:.4f}")

    print("Training complete!")

####  7. Execute Pipeline

In [None]:
# Start training when this file (or notebook cell) runs as the main module;
# importing it from another module does not trigger training.
if __name__ == "__main__":
    train()