In [None]:
# Mount Google Drive (Colab-only: the `google.colab` package ships with the Colab runtime)
from google.colab import drive

# Attach Drive under /content/drive; later cells read and write below /content/drive/MyDrive
drive.mount('/content/drive')

In [None]:
import os
import zipfile

# Load zip from Google Drive
zip_path = "/content/drive/MyDrive/678_Team4_Dataset/ISPY1_Sample.zip"
extract_path = "/content"

# zipfile.is_zipfile() returns False both when the file is missing and when it
# is not a zip archive, so report that case instead of silently doing nothing.
if zipfile.is_zipfile(zip_path):
    with zipfile.ZipFile(zip_path, 'r') as zip_ref:
        zip_ref.extractall(extract_path)
    print("✅ Extracted patch dataset")
else:
    print(f"⚠️ Nothing extracted: no valid zip archive at {zip_path}")

In [None]:
!pip install pydicom

# DICOM Image Metadata Extraction and Organization


In [None]:
import os
import pandas as pd

# --- Paths -------------------------------------------------------------------
input_file = "/content/drive/MyDrive/metadata.csv"
output_folder = "/content/drive/MyDrive/image_metadata"
output_file = os.path.join(output_folder, "image_metadata.csv")

# The destination folder is created up front so the final write cannot fail on it.
os.makedirs(output_folder, exist_ok=True)

# --- Load ---------------------------------------------------------------------
df = pd.read_csv(input_file)

# --- Clean the per-series image count -----------------------------------------
# Anything that cannot be parsed as a number (or is missing) counts as one image.
image_counts = pd.to_numeric(df['Number of Images'], errors='coerce')
df['Number of Images'] = image_counts.fillna(1).astype(int)

# --- One output row per image --------------------------------------------------
# Each index label is repeated as many times as its series has images.
repeated_labels = df.index.repeat(df['Number of Images'])
df_expanded = df.loc[repeated_labels].reset_index(drop=True)

# --- Keep only the columns needed downstream ------------------------------------
kept_columns = [
    'Subject ID',
    'Study Date',
    'Series Description',
    'File Location',
    'Study Description',
    'Number of Images',
]
refined_df = df_expanded[kept_columns]

# --- Write image_metadata.csv ---------------------------------------------------
refined_df.to_csv(output_file, index=False)

print(f"Refined metadata saved at: {output_file}")


# Creating Labels for the Images

In [None]:
import zipfile
import os
import numpy as np

# Archive on Google Drive and the folder its contents are unpacked into
zip_file_path = '/content/drive/MyDrive/Tumor Progression ML Datasets/NPY_data1.zip'
extract_folder = '/content/drive/MyDrive/Tumor Progression ML Datasets/NPY_data1/NPY_data1'

# An existing target folder is taken to mean "already unpacked"; only unzip otherwise.
if os.path.exists(extract_folder):
    print(f"Files already extracted to: {extract_folder}")
else:
    with zipfile.ZipFile(zip_file_path, 'r') as zip_ref:
        zip_ref.extractall(extract_folder)
    print(f"Extracted files to: {extract_folder}")

# The .npy files are searched for below this directory
npy_dir = extract_folder

# Recursively collect the .npy files below a directory
def list_npy_files(directory):
    """Return the full paths of all ``.npy`` files under ``directory``.

    The search is recursive and the extension match is case-insensitive.
    The result is sorted, because ``os.walk`` yields entries in a
    filesystem-dependent order; a stable order keeps the sample order (and
    therefore the seeded train/val/test split) reproducible between runs.

    Args:
        directory: Root folder to search.

    Returns:
        Sorted list of file paths; empty if nothing matches or the folder
        does not exist.
    """
    npy_files = [
        os.path.join(root, file)
        for root, _dirs, files in os.walk(directory)
        for file in files
        if file.lower().endswith('.npy')
    ]
    npy_files.sort()
    return npy_files

def resize_image_2d(img, target_shape):
    """Resample a 2-D image to ``target_shape`` with bilinear interpolation.

    ``np.resize`` must not be used for this: it only repeats or truncates the
    flattened buffer, which scrambles the picture instead of rescaling it.

    Args:
        img: 2-D NumPy array (H, W).
        target_shape: ``(height, width)`` of the result.

    Returns:
        float32 NumPy array of shape ``target_shape``.

    Raises:
        ValueError: if ``img`` is not 2-D (axis meaning would be ambiguous).
    """
    import torch
    import torch.nn.functional as F

    if img.ndim != 2:
        raise ValueError(f"Expected a 2-D image, got shape {img.shape}")
    source = torch.from_numpy(np.ascontiguousarray(img, dtype=np.float32))
    # interpolate() expects (N, C, H, W), hence the two leading singleton axes
    resized = F.interpolate(
        source[None, None], size=tuple(target_shape), mode='bilinear', align_corners=False
    )
    return resized[0, 0].numpy()


# List all .npy files in the directory
npy_files = list_npy_files(npy_dir)

# Check if npy_files is empty
if not npy_files:
    print("No .npy files found in the extracted directory.")
else:
    # Initialize empty lists for image data and labels
    image_data = []
    labels = []

    # Every image is brought to this (height, width)
    target_shape = (256, 256)

    for file in npy_files:
        # Load as float32 so resized and untouched images share one dtype
        img = np.asarray(np.load(file), dtype=np.float32)

        # Rescale (with interpolation) when the shape differs from the target
        if img.shape != target_shape:
            img = resize_image_2d(img, target_shape)

        image_data.append(img)

        # NOTE(review): every image is labelled 1 (tumor), so the dataset
        # contains a single class — confirm this is intended.
        labels.append(1)

    # Convert lists to NumPy arrays
    image_data = np.array(image_data)
    labels = np.array(labels)

    # Display the first few labels for verification (fewer if the set is small)
    print(f"First 5 images and labels:")
    for i in range(min(5, len(labels))):
        print(f"Image {i+1} label: {labels[i]}")


#Saving images and Labels to folders

In [None]:
# Persist the stacked image array to Drive; the splitting cell reloads it from this path
np.save('/content/drive/MyDrive/Tumor Progression ML Datasets/image_data.npy', image_data)
print(f"Image data saved to /content/drive/MyDrive/Tumor Progression ML Datasets/image_data.npy")

# Persist the matching label vector next to it
np.save('/content/drive/MyDrive/Tumor Progression ML Datasets/labels.npy', labels)
print(f"Labels saved to /content/drive/MyDrive/Tumor Progression ML Datasets/labels.npy")

#Splitting the Data into Training, Testing and Validation

In [None]:
import os
import numpy as np
from sklearn.model_selection import train_test_split

# Define the paths to the saved .npy files
image_data_path = '/content/drive/MyDrive/Tumor Progression ML Datasets/image_data.npy'
labels_path = '/content/drive/MyDrive/Tumor Progression ML Datasets/labels.npy'

# Stop here if an input is missing; continuing would split stale or undefined
# `image_data` / `labels` left over from an earlier cell.
missing_paths = [path for path in (image_data_path, labels_path) if not os.path.exists(path)]
if missing_paths:
    raise FileNotFoundError(f"Image data or labels file does not exist: {missing_paths}")

image_data = np.load(image_data_path)
labels = np.load(labels_path)
print("Image data and labels successfully loaded.")

# Split the data into 80% training, 10% validation, and 10% test.
# First hold out 20% of the samples ...
X_train, X_temp, y_train, y_temp = train_test_split(image_data, labels, test_size=0.2, random_state=42)
# ... then cut that hold-out in half: 10% validation and 10% test overall.
X_val, X_test, y_val, y_test = train_test_split(X_temp, y_temp, test_size=0.5, random_state=42)

# Verify the shapes of the resulting splits
print(f"Training data shape: {X_train.shape}, Training labels shape: {y_train.shape}")
print(f"Validation data shape: {X_val.shape}, Validation labels shape: {y_val.shape}")
print(f"Test data shape: {X_test.shape}, Test labels shape: {y_test.shape}")

#Converting to tensors

In [None]:
import torch
from torch.utils.data import DataLoader, TensorDataset

# Image arrays become float32 tensors ...
X_train_tensor, X_val_tensor, X_test_tensor = (
    torch.tensor(split_images, dtype=torch.float32)
    for split_images in (X_train, X_val, X_test)
)
# ... and label arrays become int64 ("long") tensors, the dtype used for class indices.
y_train_tensor, y_val_tensor, y_test_tensor = (
    torch.tensor(split_labels, dtype=torch.long)
    for split_labels in (y_train, y_val, y_test)
)

# Pair images with labels, one dataset per split
train_dataset = TensorDataset(X_train_tensor, y_train_tensor)
val_dataset = TensorDataset(X_val_tensor, y_val_tensor)
test_dataset = TensorDataset(X_test_tensor, y_test_tensor)

# Batches of 32; only the training split is shuffled
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=32, shuffle=False)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)

# Report how many batches each loader yields
print(f"Training loader size: {len(train_loader)} batches")
print(f"Validation loader size: {len(val_loader)} batches")
print(f"Test loader size: {len(test_loader)} batches")

# Printing Shapes and Labels of Sample Images

In [None]:
# Inspect up to five samples from each split. The count is capped by the split
# size so a small validation/test split does not raise an IndexError.
for split_name, split_images, split_labels in (
    ("training", X_train, y_train),
    ("validation", X_val, y_val),
    ("test", X_test, y_test),
):
    print(f"First 5 {split_name} samples:")
    for i in range(min(5, len(split_images))):
        print(f"Image {i+1} shape: {split_images[i].shape}, Label: {split_labels[i]}")

# Saving tensors as .pt files

In [None]:
import torch
# Write all six split tensors to the current working directory
# (relative paths; later cells read them back from /content/*.pt).
torch.save(X_train_tensor, 'X_train.pt')
torch.save(y_train_tensor, 'y_train.pt')
torch.save(X_val_tensor, 'X_val.pt')
torch.save(y_val_tensor, 'y_val.pt')
torch.save(X_test_tensor, 'X_test.pt')
torch.save(y_test_tensor, 'y_test.pt')

# Read the training tensors back from the files just written (round-trip check) ...
X_train_tensor = torch.load('X_train.pt')
y_train_tensor = torch.load('y_train.pt')
# ... and store a copy of them on Google Drive so they outlive the session.
torch.save(X_train_tensor, '/content/drive/MyDrive/Tumor Progression ML Datasets/X_train.pt')
torch.save(y_train_tensor, '/content/drive/MyDrive/Tumor Progression ML Datasets/y_train.pt')

In [None]:
# Store the validation and test tensors on Google Drive as well
torch.save(X_val_tensor, '/content/drive/MyDrive/Tumor Progression ML Datasets/X_val.pt')
torch.save(y_val_tensor, '/content/drive/MyDrive/Tumor Progression ML Datasets/y_val.pt')
torch.save(X_test_tensor, '/content/drive/MyDrive/Tumor Progression ML Datasets/X_test.pt')
torch.save(y_test_tensor, '/content/drive/MyDrive/Tumor Progression ML Datasets/y_test.pt')

# Reload the training tensors from Drive. weights_only=True restricts unpickling
# to tensors and other plain data instead of arbitrary Python objects.
X_train_tensor = torch.load('/content/drive/MyDrive/Tumor Progression ML Datasets/X_train.pt', weights_only=True)
y_train_tensor = torch.load('/content/drive/MyDrive/Tumor Progression ML Datasets/y_train.pt', weights_only=True)

In [None]:
# import os
# import zipfile

In [None]:
# # ⬇️ Load zip from Google Drive
# zip_path = "/content/drive/MyDrive/678_Team4_Dataset/X_train.pt.zip"
# extract_path = "/content"

# if zipfile.is_zipfile(zip_path):
#     with zipfile.ZipFile(zip_path, 'r') as zip_ref:
#         zip_ref.extractall(extract_path)
#         print("✅ Extracted patch dataset")

In [None]:
# # ⬇️ Load zip from Google Drive
# zip_path = "/content/drive/MyDrive/678_Team4_Dataset/X_test.pt.zip"
# extract_path = "/content"

# if zipfile.is_zipfile(zip_path):
#     with zipfile.ZipFile(zip_path, 'r') as zip_ref:
#         zip_ref.extractall(extract_path)
#         print("✅ Extracted patch dataset")

In [None]:
# # ⬇️ Load zip from Google Drive
# zip_path = "/content/drive/MyDrive/678_Team4_Dataset/X_val.pt.zip"
# extract_path = "/content"

# if zipfile.is_zipfile(zip_path):
#     with zipfile.ZipFile(zip_path, 'r') as zip_ref:
#         zip_ref.extractall(extract_path)
#         print("✅ Extracted patch dataset")

In [None]:
# import shutil

# src_path = '/content/drive/MyDrive/678_Team4_Dataset/y_train.pt'
# dst_path = '/content'

# shutil.copy(src_path, dst_path)
# print("✅ File copied successfully.")

In [None]:
# import shutil

# src_path = '/content/drive/MyDrive/678_Team4_Dataset/y_test.pt'
# dst_path = '/content'

# shutil.copy(src_path, dst_path)
# print("✅ File copied successfully.")

In [None]:
# import shutil

# src_path = '/content/drive/MyDrive/678_Team4_Dataset/y_val.pt'
# dst_path = '/content'

# shutil.copy(src_path, dst_path)
# print("✅ File copied successfully.")

In [None]:
!pip install torch torchvision pytorch-lightning tqdm

In [None]:
import torch
# Prefer the GPU when CUDA is available; otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")
print(f"Using device: {device}")

### Loading and Preprocessing Image Tensors

In [None]:
import torch

# Paths to the locally stored training tensors (under /content, not Google Drive)
X_train_path = '/content/X_train.pt'
y_train_path = '/content/y_train.pt'

# Load onto the CPU with weights_only=True (tensors / plain data only), matching
# the safer loading already used when the tensors are read back from Drive.
X_train_tensor = torch.load(X_train_path, map_location="cpu", weights_only=True)
y_train_tensor = torch.load(y_train_path, map_location="cpu", weights_only=True)


def add_channel_dimension(tensor):
    """Return ``tensor`` with a 3-channel axis at position 1.

    A 3-D tensor first receives a singleton axis at position 1; after that, a
    tensor whose axis 1 has size 1 is tiled three times along that axis. Any
    other tensor is returned unchanged.
    """
    # e.g. (N, H, W) -> (N, 1, H, W)
    with_channel = tensor.unsqueeze(1) if tensor.dim() == 3 else tensor
    # e.g. (N, 1, H, W) -> (N, 3, H, W): copy the single channel three times
    return with_channel.repeat(1, 3, 1, 1) if with_channel.shape[1] == 1 else with_channel

# Expand the training images to three channels (no-op if they already have them)
X_train_tensor = add_channel_dimension(X_train_tensor)

# Print the resulting shape as a sanity check
print(f"Training images shape: {X_train_tensor.shape}")

In [None]:
X_val_path = '/content/X_val.pt'
y_val_path = '/content/y_val.pt'

# Load onto the CPU with weights_only=True (tensors / plain data only), matching
# the safer loading already used when the tensors are read back from Drive.
X_val_tensor = torch.load(X_val_path, map_location="cpu", weights_only=True)
y_val_tensor = torch.load(y_val_path, map_location="cpu", weights_only=True)

# NOTE(review): same helper as in the training-tensor cell; it is repeated here
# so that this cell can be run on its own.
def add_channel_dimension(tensor):
    """Return ``tensor`` with a 3-channel axis at position 1.

    A 3-D tensor gets a singleton axis at position 1; a tensor whose axis 1 has
    size 1 is then tiled three times along it. Other tensors pass through.
    """
    if len(tensor.shape) == 3:
        tensor = tensor.unsqueeze(1)  # Add channel dimension (1 channel, grayscale)

    if tensor.shape[1] == 1:  # If it has only 1 channel
        tensor = tensor.repeat(1, 3, 1, 1)  # Repeat the grayscale image across 3 channels
    return tensor

X_val_tensor = add_channel_dimension(X_val_tensor)
print(f"Validation images shape: {X_val_tensor.shape}")

In [None]:
X_test_path = '/content/X_test.pt'
y_test_path = '/content/y_test.pt'

# Load onto the CPU with weights_only=True (tensors / plain data only), matching
# the safer loading already used when the tensors are read back from Drive.
X_test_tensor = torch.load(X_test_path, map_location="cpu", weights_only=True)
y_test_tensor = torch.load(y_test_path, map_location="cpu", weights_only=True)

# NOTE(review): same helper as in the training-tensor cell; it is repeated here
# so that this cell can be run on its own.
def add_channel_dimension(tensor):
    """Return ``tensor`` with a 3-channel axis at position 1.

    A 3-D tensor gets a singleton axis at position 1; a tensor whose axis 1 has
    size 1 is then tiled three times along it. Other tensors pass through.
    """
    if len(tensor.shape) == 3:
        tensor = tensor.unsqueeze(1)  # Add channel dimension (1 channel, grayscale)
    if tensor.shape[1] == 1:  # If it has only 1 channel
        tensor = tensor.repeat(1, 3, 1, 1)  # Repeat the grayscale image across 3 channels
    return tensor

X_test_tensor = add_channel_dimension(X_test_tensor)
print(f"Test images shape: {X_test_tensor.shape}")

#Loading data

In [None]:
# Reload all six split tensors onto the CPU. weights_only=True limits unpickling
# to tensors / plain data, consistent with the "safer loading" used for the Drive copies.
X_train = torch.load("/content/X_train.pt", map_location="cpu", weights_only=True)
y_train = torch.load("/content/y_train.pt", map_location="cpu", weights_only=True)
X_val = torch.load("/content/X_val.pt", map_location="cpu", weights_only=True)
y_val = torch.load("/content/y_val.pt", map_location="cpu", weights_only=True)
X_test = torch.load("/content/X_test.pt", map_location="cpu", weights_only=True)
y_test = torch.load("/content/y_test.pt", map_location="cpu", weights_only=True)

### Custom Tumor Dataset and DataLoader for Efficient Training


In [None]:
from torch.utils.data import Dataset, DataLoader

class TumorDataset(Dataset):
    """Dataset over an image tensor and a label tensor indexed in parallel.

    Each item is ``(image, label)``. A 2-D image gets a leading channel axis,
    and an image whose first axis has size 1 is tiled to three channels.
    """

    def __init__(self, image_tensor, label_tensor):
        self.images, self.labels = image_tensor, label_tensor

    def __len__(self):
        # The number of samples is the length of the image tensor
        return len(self.images)

    def __getitem__(self, idx):
        sample = self.images[idx]
        # (H, W) → (1, H, W)
        sample = sample.unsqueeze(0) if sample.ndim == 2 else sample
        # (1, H, W) → (3, H, W)
        if sample.size(0) == 1:
            sample = sample.repeat(3, 1, 1)
        return sample, self.labels[idx]

import os

import torch

# A fixed num_workers=12 exceeds the CPU count of typical Colab machines, which
# makes DataLoader warn and can slow loading down; cap it at the available cores.
NUM_WORKERS = min(12, os.cpu_count() or 1)
# Pinned host memory only helps host-to-GPU copies, so enable it only with CUDA.
PIN_MEMORY = torch.cuda.is_available()

# Create a DataLoader for the training dataset
train_dataset = TumorDataset(image_tensor=X_train, label_tensor=y_train)
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True, num_workers=NUM_WORKERS, pin_memory=PIN_MEMORY)

# Create a DataLoader for the validation dataset
val_dataset = TumorDataset(image_tensor=X_val, label_tensor=y_val)
val_loader = DataLoader(val_dataset, batch_size=32, shuffle=False, num_workers=NUM_WORKERS, pin_memory=PIN_MEMORY)

# To verify, print the shapes of the first batch from the DataLoader
for images, labels in train_loader:
    print(f"First batch - Images shape: {images.shape}, Labels shape: {labels.shape}")
    break

#UNet Architecture

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F

class UNet(nn.Module):
    """Four-level U-Net built from double 3x3 convolution blocks.

    The encoder halves the resolution four times with max-pooling while the
    channel count doubles at each level (``init_features`` up to
    ``16 * init_features`` in the bottleneck). The decoder upsamples with
    transposed convolutions, resizes to the matching encoder map, concatenates
    that skip connection and applies another double-conv block. A final 1x1
    convolution maps to ``out_channels``.
    """

    def __init__(self, in_channels=3, out_channels=3, init_features=64):
        super().__init__()
        c1 = init_features
        c2, c3, c4, c5 = c1 * 2, c1 * 4, c1 * 8, c1 * 16

        # Contracting path: double-conv block followed by 2x2 max-pooling
        self.enc1 = UNet._block(in_channels, c1, name="enc1")
        self.pool1 = nn.MaxPool2d(kernel_size=2, stride=2)

        self.enc2 = UNet._block(c1, c2, name="enc2")
        self.pool2 = nn.MaxPool2d(kernel_size=2, stride=2)

        self.enc3 = UNet._block(c2, c3, name="enc3")
        self.pool3 = nn.MaxPool2d(kernel_size=2, stride=2)

        self.enc4 = UNet._block(c3, c4, name="enc4")
        self.pool4 = nn.MaxPool2d(kernel_size=2, stride=2)

        # Lowest-resolution stage
        self.bottleneck = UNet._block(c4, c5, name="bottleneck")

        # Expanding path: each decoder block sees upsampled + skip channels
        self.upconv4 = nn.ConvTranspose2d(c5, c4, kernel_size=2, stride=2)
        self.dec4 = UNet._block(c5, c4, name="dec4")

        self.upconv3 = nn.ConvTranspose2d(c4, c3, kernel_size=2, stride=2)
        self.dec3 = UNet._block(c4, c3, name="dec3")

        self.upconv2 = nn.ConvTranspose2d(c3, c2, kernel_size=2, stride=2)
        self.dec2 = UNet._block(c3, c2, name="dec2")

        self.upconv1 = nn.ConvTranspose2d(c2, c1, kernel_size=2, stride=2)
        self.dec1 = UNet._block(c2, c1, name="dec1")

        # 1x1 projection to the requested number of output channels
        self.final_conv = nn.Conv2d(c1, out_channels, kernel_size=1)

    def forward(self, x):
        # Contracting path; every level's output is kept as a skip connection
        skip1 = self.enc1(x)
        skip2 = self.enc2(self.pool1(skip1))
        skip3 = self.enc3(self.pool2(skip2))
        skip4 = self.enc4(self.pool3(skip3))

        out = self.bottleneck(self.pool4(skip4))

        # Expanding path, deepest level first
        stages = (
            (self.upconv4, self.dec4, skip4),
            (self.upconv3, self.dec3, skip3),
            (self.upconv2, self.dec2, skip2),
            (self.upconv1, self.dec1, skip1),
        )
        for upconv, decoder, skip in stages:
            out = upconv(out)
            # Match the skip map's spatial size (they differ for odd input sizes)
            out = F.interpolate(out, size=skip.shape[2:], mode='bilinear', align_corners=False)
            out = decoder(torch.cat((out, skip), dim=1))

        return self.final_conv(out)

    @staticmethod
    def _block(in_channels, out_channels, name):
        """Two rounds of Conv3x3 (no bias) -> BatchNorm -> ReLU; ``name`` is unused."""
        layers = []
        for conv_in in (in_channels, out_channels):
            layers.append(nn.Conv2d(conv_in, out_channels, kernel_size=3, padding=1, bias=False))
            layers.append(nn.BatchNorm2d(out_channels))
            layers.append(nn.ReLU(inplace=True))
        return nn.Sequential(*layers)

#Importing necessary libraries

In [None]:
import pytorch_lightning as pl
import torch
import torch.nn.functional as F
from torch.utils.data import DataLoader, Dataset
import os
import matplotlib.pyplot as plt
from pytorch_lightning.callbacks import EarlyStopping, ModelCheckpoint, LearningRateMonitor
from torch.optim.lr_scheduler import CosineAnnealingLR

# DDPM Model for Tumor Progression Prediction


In [None]:
class DDPM(pl.LightningModule):
    """Noise-prediction diffusion model trained with an MSE objective.

    Images are noised at a random timestep of a linear beta schedule and the
    U-Net is trained to predict the added noise. Per-epoch losses are appended
    to ``<log_dir>/loss_log.txt`` and plotted to ``<log_dir>/loss_curve.png``
    when training ends.

    NOTE(review): the U-Net receives neither the timestep ``t`` nor the label
    ``y``; both are accepted by the methods below but ignored.

    Args:
        num_steps: Number of diffusion timesteps.
        lr: Adam learning rate.
        T_max: ``T_max`` of the cosine-annealing scheduler.
        log_dir: Folder for the loss log and loss curve. The original code read
            an undefined ``log_dir`` name here; it is now a parameter with a
            default so the class (and ``load_from_checkpoint``) can be built.
    """

    def __init__(self, num_steps=1000, lr=1e-4, T_max=100, log_dir="ddpm_logs"):
        super().__init__()
        # Store constructor arguments in checkpoints so load_from_checkpoint
        # rebuilds the model with the same settings.
        self.save_hyperparameters()
        self.num_steps = num_steps

        betas = torch.linspace(1e-4, 0.02, num_steps)
        alphas = 1.0 - betas
        alpha_cumprod = torch.cumprod(alphas, dim=0)

        # Registered as buffers so they follow the module across devices.
        # persistent=False keeps them out of the state_dict, so checkpoints
        # written before this change still load.
        self.register_buffer('betas', betas, persistent=False)
        self.register_buffer('alphas', alphas, persistent=False)
        self.register_buffer('alpha_cumprod', alpha_cumprod, persistent=False)

        # Precomputed factors of the closed-form forward process
        self.register_buffer('sqrt_alpha_cumprod', torch.sqrt(alpha_cumprod))
        self.register_buffer('sqrt_one_minus_alpha_cumprod', torch.sqrt(1.0 - alpha_cumprod))

        self.unet = UNet(in_channels=3, out_channels=3, init_features=64)
        self.lr = lr
        self.T_max = T_max

        self.log_dir = log_dir
        os.makedirs(self.log_dir, exist_ok=True)
        self.log_file = os.path.join(self.log_dir, 'loss_log.txt')
        self.train_losses, self.val_losses = [], []

    def forward(self, x, t=None, y=None):
        """Run the U-Net on ``x``; ``t`` and ``y`` are accepted but unused."""
        return self.unet(x)

    def forward_process(self, x_0, y=None):
        """
        Apply the forward diffusion process to the input image.

        ``y`` is unused and therefore optional.

        Returns:
        - noisy_images: The noisy image at a random timestep
        - noise: The noise that was added
        - t: The random timestep
        """
        batch_size = x_0.size(0)
        device = x_0.device

        # Sample random timesteps for each image in the batch
        t = torch.randint(0, self.num_steps, (batch_size,), device=device).long()

        # Sample noise
        noise = torch.randn_like(x_0)

        # Per-sample schedule factors, shaped to broadcast over (C, H, W)
        sqrt_alpha = self.sqrt_alpha_cumprod[t].view(-1, 1, 1, 1)
        sqrt_one_minus_alpha = self.sqrt_one_minus_alpha_cumprod[t].view(-1, 1, 1, 1)

        # Add noise to the images
        noisy_images = sqrt_alpha * x_0 + sqrt_one_minus_alpha * noise

        return noisy_images, noise, t

    def reverse_process(self, x_t, t, y):
        """Predict the noise in ``x_t`` with the U-Net (``t`` and ``y`` are unused)."""
        return self.unet(x_t)

    def _noise_prediction_loss(self, batch):
        """MSE between the U-Net output and the noise added to a batch."""
        x_0, y = batch
        noisy_x, noise, t = self.forward_process(x_0, y)
        pred_noise = self.reverse_process(noisy_x, t, y)
        return F.mse_loss(pred_noise, noise)

    def training_step(self, batch, batch_idx):
        loss = self._noise_prediction_loss(batch)
        self.log('train_loss', loss, prog_bar=True, on_epoch=True)
        return loss

    def validation_step(self, batch, batch_idx):
        loss = self._noise_prediction_loss(batch)
        self.log('val_loss', loss, prog_bar=True, on_epoch=True)
        return loss

    def test_step(self, batch, batch_idx):
        loss = self._noise_prediction_loss(batch)
        self.log('test_loss', loss, prog_bar=True, on_epoch=True)
        return loss

    def configure_optimizers(self):
        optimizer = torch.optim.Adam(self.parameters(), lr=self.lr)
        scheduler = CosineAnnealingLR(optimizer, T_max=self.T_max)
        return [optimizer], [scheduler]

    def _logged_epoch_loss(self, name):
        """Return the logged epoch value of ``name`` as a float, or None if absent."""
        metrics = self.trainer.callback_metrics
        value = metrics.get(f'{name}_epoch', metrics.get(name))
        return None if value is None else value.item()

    def on_train_start(self):
        # Clear the log file
        open(self.log_file, 'w').close()

    def on_train_epoch_end(self):
        avg = self._logged_epoch_loss('train_loss')
        if avg is None:
            return  # nothing logged for this epoch; do not record a made-up value
        self.train_losses.append(avg)
        with open(self.log_file, 'a') as f:
            f.write(f" Epoch {self.current_epoch + 1}: train_loss = {avg:.6f}\n")

    def on_validation_epoch_end(self):
        # The pre-training sanity check also triggers this hook; recording it
        # would shift the validation curve by one epoch against the train curve.
        if self.trainer.sanity_checking:
            return
        avg = self._logged_epoch_loss('val_loss')
        if avg is None:
            return
        self.val_losses.append(avg)
        with open(self.log_file, 'a') as f:
            f.write(f" Epoch {self.current_epoch + 1}: val_loss = {avg:.6f}\n")

    def on_train_end(self):
        # Plot both loss histories and save the figure next to the loss log
        plt.figure(figsize=(8, 6))
        plt.plot(range(1, len(self.train_losses)+1), self.train_losses, label='Train', marker='o')
        plt.plot(range(1, len(self.val_losses)+1), self.val_losses, label='Val', marker='s')
        plt.xlabel('Epoch')
        plt.ylabel('Loss')
        plt.title('Loss Convergence')
        plt.legend()
        plt.grid(True)
        plt.savefig(os.path.join(self.log_dir, 'loss_curve.png'))
        plt.close()
In [None]:
# del ddpm_model
# torch.cuda.empty_cache()

# Initialize the model and trainer

In [None]:
ddpm_model = DDPM(num_steps=1000, lr=1e-4, T_max=100)

# Early stopping callback
early_stop_callback = EarlyStopping(
    monitor='val_loss',   # Monitor validation loss
    patience=10,          # Stop after 10 epochs without improvement
    verbose=True,
    mode='min'            # Minimize the validation loss
)

# Define ModelCheckpoint callback to save model checkpoints during training
checkpoint_callback = pl.callbacks.ModelCheckpoint(
    monitor='val_loss',
    dirpath='/content/drive/MyDrive/678_Team4_Outputs_new',  # Folder in Google Drive
    filename='ddpm-{epoch:02d}-{val_loss:.5f}',
    save_top_k=5,         # Keep the five checkpoints with the lowest val_loss
    mode='min',
    save_weights_only=True
)

# Learning Rate Monitor callback (to log learning rate at each step)
lr_monitor = LearningRateMonitor(logging_interval='step')

use_gpu = torch.cuda.is_available()

# Initialize the trainer
trainer = pl.Trainer(
    max_epochs=50,
    # fp16 mixed precision is a GPU feature; use full precision on CPU
    precision="16-mixed" if use_gpu else "32-true",
    # `devices` must be a positive count for every accelerator; 0 is rejected on CPU
    devices=1,
    accelerator="cuda" if use_gpu else "cpu",
    callbacks=[checkpoint_callback, early_stop_callback, lr_monitor]
)

# Train the model, then run one more validation pass with the final weights
trainer.fit(ddpm_model, train_loader, val_loader)
trainer.validate(ddpm_model, val_loader)

In [None]:
import os

import torch

# Create a DataLoader for the test dataset.
# num_workers is capped at the available CPU cores (a fixed 12 exceeds what
# typical Colab machines provide), and pinned memory is only requested when a
# GPU is present.
test_dataset = TumorDataset(image_tensor=X_test, label_tensor=y_test)
test_loader = DataLoader(
    test_dataset,
    batch_size=32,
    shuffle=False,
    num_workers=min(12, os.cpu_count() or 1),
    pin_memory=torch.cuda.is_available(),
)

# Model Evaluation Using PyTorch Lightning Trainer


In [None]:
from pytorch_lightning import Trainer

# NOTE(review): this filename is hard-coded to one specific training run
# (epoch 29, val_loss 0.05851); update it after retraining.
checkpoint_path = '/content/drive/MyDrive/678_Team4_Outputs_new/ddpm-epoch=29-val_loss=0.05851.ckpt'

# Rebuild a DDPM instance and load the checkpoint's weights into it
ddpm_model = DDPM.load_from_checkpoint(checkpoint_path)

# A fresh Trainer with default settings, used only for evaluation
trainer = Trainer()

# Evaluate on validation data (runs validation_step, which logs 'val_loss')
trainer.validate(ddpm_model, dataloaders=val_loader)

# Evaluate on test data (runs test_step, which logs 'test_loss')
trainer.test(ddpm_model, dataloaders=test_loader)



# Visualization and Saving of Original, Noisy, and Denoised Images


In [None]:
import matplotlib.pyplot as plt
import numpy as np
import os

# Create a directory to store the images if it doesn't exist
output_folder = '/content/drive/MyDrive/678_Team4_Images/Images_final/'
os.makedirs(output_folder, exist_ok=True)

# Get a batch of test images and labels, placed on the model's device
images, labels = next(iter(test_loader))
ddpm_model.eval()
images = images.to(ddpm_model.device)
labels = labels.to(ddpm_model.device)

with torch.no_grad():
    # Forward (noising) process: returns the noisy images, the noise and the timesteps
    noisy_images, _, t = ddpm_model.forward_process(images, labels)

    # The network predicts the *noise*; the denoised estimate of the clean image is
    # obtained by inverting x_t = sqrt(a_bar) * x_0 + sqrt(1 - a_bar) * noise.
    pred_noise = ddpm_model.reverse_process(noisy_images, t, labels)
    sqrt_alpha = ddpm_model.sqrt_alpha_cumprod[t].view(-1, 1, 1, 1)
    sqrt_one_minus_alpha = ddpm_model.sqrt_one_minus_alpha_cumprod[t].view(-1, 1, 1, 1)
    predictions = (noisy_images - sqrt_one_minus_alpha * pred_noise) / (sqrt_alpha + 1e-8)


def normalize_image(image):
    """Clip pixel values to [0, 1] so matplotlib can display the image."""
    return np.clip(image, 0, 1)


def save_image(image_tensor, title, path):
    """Save one CHW tensor as a titled, axis-free PNG at ``path``."""
    img = image_tensor.detach().cpu().float().numpy().transpose(1, 2, 0)  # CHW -> HWC
    fig, ax = plt.subplots()
    ax.imshow(normalize_image(img))
    ax.set_title(title)
    ax.axis('off')
    fig.savefig(path)
    plt.close(fig)  # release the figure; one is created per saved image


batch_size = images.shape[0]

# Save the original, noisy and denoised version of every image in the batch
for i in range(batch_size):
    # .item() gives the plain number; formatting the tensor would yield "tensor(1)"
    label = labels[i].item()
    save_image(images[i], f"True Label: {label}", f"{output_folder}original_{i+1}_label_{label}.png")
    save_image(noisy_images[i], "Noisy Image", f"{output_folder}noisy_{i+1}.png")
    save_image(predictions[i], f"Pred: {label}", f"{output_folder}predicted_{i+1}_label_{label}.png")

print(f"Images saved to: {output_folder}")


# Enhanced Image Saving Function for Model Output Visualization


In [None]:
# Enhanced saving function
def save_samples(model, dataloader, device, output_folder, max_samples=100):
    """Save side-by-side original / noisy / denoised figures for up to ``max_samples`` images.

    For every batch the images are noised with ``model.forward_process``, the
    noise is predicted with ``model.reverse_process`` and the denoised estimate
    is reconstructed by inverting the forward-process formula. One PNG named
    ``sample_<k>.png`` is written per image.

    Args:
        model: Diffusion model exposing ``forward_process``, ``reverse_process``,
            ``sqrt_alpha_cumprod`` and ``sqrt_one_minus_alpha_cumprod``.
        dataloader: Iterable of ``(images, labels)`` batches.
        device: Device the model and batches are moved to.
        output_folder: Destination folder (created if missing); a trailing
            slash is not required.
        max_samples: Maximum number of figures to write.
    """
    os.makedirs(output_folder, exist_ok=True)
    saved_count = 0

    model.eval()
    model.to(device)

    with torch.no_grad():
        for images, labels in dataloader:
            if saved_count >= max_samples:
                break

            images = images.to(device)
            labels = labels.to(device)

            # Noise the batch, predict the noise, then reconstruct the clean estimate
            noisy_images, _, t = model.forward_process(images, labels)
            pred_noise = model.reverse_process(noisy_images, t, labels)
            sqrt_alpha = model.sqrt_alpha_cumprod[t].view(-1, 1, 1, 1)
            sqrt_one_minus_alpha = model.sqrt_one_minus_alpha_cumprod[t].view(-1, 1, 1, 1)
            predictions = (noisy_images - sqrt_one_minus_alpha * pred_noise) / (sqrt_alpha + 1e-8)

            # Save each image in batch
            for i in range(images.size(0)):
                if saved_count >= max_samples:
                    break

                panels = (
                    (images[i], f"Original (Label: {labels[i].item()})"),
                    (noisy_images[i], "Noisy"),
                    (predictions[i], "Denoised"),
                )
                fig, axes = plt.subplots(1, 3, figsize=(12, 4))
                for ax, (tensor, title) in zip(axes, panels):
                    # CHW -> HWC, clipped to the displayable [0, 1] range
                    ax.imshow(tensor.cpu().float().permute(1, 2, 0).clip(0, 1).numpy())
                    ax.set_title(title)
                    ax.axis('off')

                fig.tight_layout()
                fig.savefig(
                    os.path.join(output_folder, f"sample_{saved_count+1}.png"),
                    bbox_inches='tight',
                    dpi=100,
                )
                plt.close(fig)

                saved_count += 1

    print(f"Saved {saved_count} samples to {output_folder}")

#Saving in a folder

In [None]:
# Usage: write figures for the first 100 test images into Images_final2/ on Drive
save_samples(ddpm_model, test_loader, device, "/content/drive/MyDrive/678_Team4_Images/Images_final2/", max_samples=100)

# Denoising Metrics

In [None]:
import torch
from tqdm import tqdm

def evaluate_denoising(model, dataloader, device, max_batches=None):
    """
    Denoising evaluation (MSE + PSNR only)

    Each batch is noised at random timesteps, the model predicts the noise and
    the clean image is reconstructed from that prediction. MSE and PSNR are
    computed per image against the original and averaged over all images.

    Args:
        model: Diffusion model exposing ``num_steps``, ``reverse_process`` and
            the ``sqrt_alpha_cumprod`` / ``sqrt_one_minus_alpha_cumprod`` buffers.
        dataloader: Iterable of ``(images, labels)`` batches.
        device: Device the model and batches are moved to.
        max_batches: Stop after this many batches (``None`` = all).

    Returns:
        ``{'MSE': float, 'PSNR': float}``; both are NaN if no image was seen.
    """
    model.eval()
    # Model and data must share a device; a Trainer may have left the model on CPU
    model.to(device)
    metrics = {'MSE': 0.0, 'PSNR': 0.0}
    count = 0

    with torch.no_grad():
        for batch_idx, (images, labels) in enumerate(tqdm(dataloader, desc="Evaluating")):
            if max_batches is not None and batch_idx >= max_batches:
                break

            images = images.to(device)
            labels = labels.to(device)
            B = images.size(0)

            # Generate noise + timestep
            t = torch.randint(0, model.num_steps, (B,), device=device)
            noise = torch.randn_like(images)

            # Forward (diffusion) process. The registered buffers are used because
            # they live on the model's device; the plain `alpha_cumprod` attribute
            # may sit on the CPU and cannot be indexed with device-side timesteps.
            sqrt_alpha = model.sqrt_alpha_cumprod[t].view(-1, 1, 1, 1)
            sqrt_1m_alpha = model.sqrt_one_minus_alpha_cumprod[t].view(-1, 1, 1, 1)
            noisy = sqrt_alpha * images + sqrt_1m_alpha * noise

            # Reverse (denoising) process: invert the forward formula with the predicted noise
            pred_noise = model.reverse_process(noisy, t, labels)
            denoised = (noisy - pred_noise * sqrt_1m_alpha) / (sqrt_alpha + 1e-8)
            denoised = denoised.clamp(0, 1)

            # Per-image MSE and PSNR (peak value 1.0), computed on the device
            mse = torch.mean((images - denoised) ** 2, dim=(1, 2, 3))
            psnr = 10 * torch.log10(1.0 / (mse + 1e-10))

            metrics['MSE'] += mse.sum().item()
            metrics['PSNR'] += psnr.sum().item()
            count += B

    if count == 0:
        # Empty loader or max_batches=0: there is nothing to average
        return {k: float('nan') for k in metrics}
    return {k: v / count for k, v in metrics.items()}

In [None]:
# Evaluate on at most the first 20 test batches
metrics = evaluate_denoising(ddpm_model, test_loader, device, max_batches=20)
# Raw dictionary first, then the same values formatted
print(f"Denoising Metrics: {metrics}")
print(f"Denoising Metrics:\nMSE: {metrics['MSE']:.4f}, PSNR: {metrics['PSNR']:.2f} dB")

In [None]:
# Report: the raw dictionary followed by one formatted line per metric
report_lines = [
    f"Denoising Metrics: {metrics}\n",
    f"MSE: {metrics['MSE']:.4f}\n",
    f"PSNR: {metrics['PSNR']:.2f} dB\n",
]
text = "".join(report_lines)

# Write the report to Drive
output_path = "/content/drive/MyDrive/678_Team4_Images/denoising_metrics.txt"
with open(output_path, "w") as report_file:
    report_file.write(text)

print(f"✅ Metrics saved to: {output_path}")

#Evaluation of Classification Accuracy Using DDPM Model

In [None]:
def evaluate_classification(model, dataloader, device, num_samples=2):
    """Return the fraction of images whose thresholded mean prediction equals the label.

    For each batch, ``num_samples`` noisy versions are created at one set of
    random timesteps, the model's noise predictions are averaged, and an image
    is assigned label 1 when the mean of its averaged prediction exceeds 0.5
    (otherwise 0).

    NOTE(review): the quantity being thresholded is the model's *noise*
    prediction, not a class score — confirm this is a meaningful classifier
    before reporting the number.

    Args:
        model: Diffusion model exposing ``num_steps``, ``reverse_process`` and
            the ``sqrt_alpha_cumprod`` / ``sqrt_one_minus_alpha_cumprod`` buffers.
        dataloader: Iterable of ``(images, labels)`` batches.
        device: Device the model and batches are moved to.
        num_samples: Number of noise draws averaged per batch.

    Returns:
        Accuracy in [0, 1], or NaN when the dataloader yields no samples.
    """
    model.eval()
    # Model and data must share a device; a Trainer may have left the model on CPU
    model.to(device)
    correct = 0
    total = 0

    with torch.no_grad():
        for images, labels in dataloader:
            images = images.to(device)
            labels = labels.to(device)

            # One set of timesteps per batch, shared by all noise draws
            t = torch.randint(0, model.num_steps, (images.size(0),), device=device)
            sqrt_alpha = model.sqrt_alpha_cumprod[t].view(-1, 1, 1, 1)
            sqrt_one_minus_alpha = model.sqrt_one_minus_alpha_cumprod[t].view(-1, 1, 1, 1)

            preds = []
            for _ in range(num_samples):
                noise = torch.randn_like(images)
                noisy_images = sqrt_alpha * images + sqrt_one_minus_alpha * noise
                preds.append(model.reverse_process(noisy_images, t, labels))

            # Average over the draws, then reduce each image to a single scalar
            avg_pred = torch.mean(torch.stack(preds), dim=0)
            predicted_labels = (avg_pred.mean(dim=[1, 2, 3]) > 0.5).long()
            correct += (predicted_labels == labels).sum().item()
            total += labels.size(0)

    if total == 0:
        # Empty loader: accuracy is undefined rather than a ZeroDivisionError
        return float('nan')
    return correct / total

In [None]:
# Accuracy over the whole test loader with the default num_samples=2
accuracy = evaluate_classification(ddpm_model, test_loader, device)
# `.2%` renders the fraction as a percentage with two decimals
print(f"Classification Accuracy: {accuracy:.2%}")

In [None]:
# Single-line report holding the raw accuracy value
text = f"Classification Accuracy: {accuracy}\n"

# Write the report to Drive
output_path = "/content/drive/MyDrive/678_Team4_Images/classification_accuracy.txt"
with open(output_path, "w") as report_file:
    report_file.write(text)

print(f"✅ Metrics saved to: {output_path}")

# Evaluation of Denoised Images Using PSNR and SSIM Metrics

In [None]:
import torch
import numpy as np
from skimage.metrics import peak_signal_noise_ratio, structural_similarity
from tqdm import tqdm

# Ensure your model is in eval mode and moved to the correct device
ddpm_model.eval()
ddpm_model.to(device)

# Lists to store PSNR and SSIM scores
psnr_scores = []
ssim_scores = []

# Turn off gradients for evaluation
with torch.no_grad():
    for images, labels in tqdm(test_loader, desc="Evaluating"):
        images = images.to(device)
        labels = labels.to(device)

        # The network outputs predicted *noise*, so a denoised image has to be
        # reconstructed: noise the batch, predict the noise, then invert
        # x_t = sqrt(a_bar) * x_0 + sqrt(1 - a_bar) * noise.
        noisy, _, t = ddpm_model.forward_process(images, labels)
        pred_noise = ddpm_model.reverse_process(noisy, t, labels)
        sqrt_alpha = ddpm_model.sqrt_alpha_cumprod[t].view(-1, 1, 1, 1)
        sqrt_one_minus_alpha = ddpm_model.sqrt_one_minus_alpha_cumprod[t].view(-1, 1, 1, 1)
        predictions = (noisy - sqrt_one_minus_alpha * pred_noise) / (sqrt_alpha + 1e-8)  # [B, 3, H, W]

        # Process each image in the batch individually
        for i in range(images.size(0)):
            # Ground truth and denoised image, CHW → HWC, clipped to [0, 1]
            gt = np.clip(images[i].cpu().float().permute(1, 2, 0).numpy(), 0, 1)
            pred = np.clip(predictions[i].cpu().float().permute(1, 2, 0).numpy(), 0, 1)

            # PSNR
            psnr = peak_signal_noise_ratio(gt, pred, data_range=1.0)
            psnr_scores.append(psnr)

            # SSIM (last axis holds the colour channels)
            ssim = structural_similarity(gt, pred, channel_axis=-1, data_range=1.0)
            ssim_scores.append(ssim)

# Calculate and print average scores
avg_psnr = np.mean(psnr_scores)
avg_ssim = np.mean(ssim_scores)

print(f"\n✅ Average PSNR: {avg_psnr:.2f}")
print(f"✅ Average SSIM: {avg_ssim:.4f}")