<a href="https://colab.research.google.com/github/Abhi-213/Deep-learning/blob/main/ATTENTION_UNET_MODEL_TRAINING.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [8]:
!pip install wget



Collecting wget
  Downloading wget-3.2.zip (10 kB)
  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: wget
  Building wheel for wget (setup.py) ... [?25l[?25hdone
  Created wheel for wget: filename=wget-3.2-py3-none-any.whl size=9656 sha256=6f0dbe40f54d91fcb49ee3f4953c8e756608d55bd1abd8b198233eee517afb39
  Stored in directory: /root/.cache/pip/wheels/8b/f1/7f/5c94f0a7a505ca1c81cd1d9208ae2064675d97582078e6c769
Successfully built wget
Installing collected packages: wget
Successfully installed wget-3.2


In [9]:
import wget


url = "https://dicom5c.blob.core.windows.net/public/Data.zip"

dataset_filename = wget.download(url)

In [10]:
!unzip /content/Data.zip


[1;30;43mStreaming output truncated to the last 5000 lines.[0m
  inflating: Data/TCGA_DU_5855_19951217/TCGA_DU_5855_19951217_1_mask.tif  
  inflating: __MACOSX/Data/TCGA_DU_5855_19951217/._TCGA_DU_5855_19951217_1_mask.tif  
  inflating: Data/TCGA_DU_5855_19951217/TCGA_DU_5855_19951217_17_mask.tif  
  inflating: __MACOSX/Data/TCGA_DU_5855_19951217/._TCGA_DU_5855_19951217_17_mask.tif  
  inflating: Data/TCGA_DU_5855_19951217/TCGA_DU_5855_19951217_16_mask.tif  
  inflating: __MACOSX/Data/TCGA_DU_5855_19951217/._TCGA_DU_5855_19951217_16_mask.tif  
  inflating: Data/TCGA_DU_5855_19951217/TCGA_DU_5855_19951217_24_mask.tif  
  inflating: __MACOSX/Data/TCGA_DU_5855_19951217/._TCGA_DU_5855_19951217_24_mask.tif  
  inflating: Data/TCGA_DU_5855_19951217/TCGA_DU_5855_19951217_25_mask.tif  
  inflating: __MACOSX/Data/TCGA_DU_5855_19951217/._TCGA_DU_5855_19951217_25_mask.tif  
  inflating: Data/TCGA_DU_5855_19951217/TCGA_DU_5855_19951217_11.tif  
  inflating: __MACOSX/Data/TCGA_DU_5855_19951217/._

In [11]:
!pip install albumentations



In [12]:
import os
import cv2
import numpy as np
import torch
from torch.utils.data import Dataset, DataLoader
from sklearn.model_selection import train_test_split
import albumentations as A
from albumentations.pytorch import ToTensorV2

# Define the CLAHE preprocessing function
def apply_clahe(image):
    clahe = cv2.createCLAHE(clipLimit=2.0, tileGridSize=(8, 8))
    return clahe.apply(image)

# Custom dataset class for Brain MRI images and masks
class BrainMRIDataset(Dataset):
    def __init__(self, image_paths, mask_paths, transform=None):
        self.image_paths = image_paths
        self.mask_paths = mask_paths
        self.transform = transform

    def __len__(self):
        return len(self.image_paths)

    def __getitem__(self, idx):
        # Load and preprocess the image and mask
        image = cv2.imread(self.image_paths[idx], cv2.IMREAD_GRAYSCALE)
        mask = cv2.imread(self.mask_paths[idx], cv2.IMREAD_GRAYSCALE)

        # Apply CLAHE to the image
        image = apply_clahe(image)

        # Resize the image and mask to (256, 256) to match U-Net input
        #image = cv2.resize(image, (256, 256))
        #mask = cv2.resize(mask, (256, 256))

        # Normalize the image to [0, 1]
        image = image / 255.0

        # Binarize the mask
        mask = np.where(mask > 0, 1, 0).astype(np.float32)

        if self.transform:
            augmented = self.transform(image=image, mask=mask)
            image = augmented['image']
            mask = augmented['mask']
        image = image.clone().detach().float()  # Convert image to tensor and add channel dimension
        mask = mask.unsqueeze(0)  # Convert mask to tensor and add channel dimension
        return image, mask

# Function to load the images and masks
def load_data(image_dir):
    images = []
    masks = []

    for folder in os.listdir(image_dir):
        folder_path = os.path.join(image_dir, folder)
        if os.path.isdir(folder_path):
            for file in os.listdir(folder_path):
                file_path = os.path.join(folder_path, file)
                if 'mask' in file:
                    masks.append(file_path)
                else:
                    images.append(file_path)
    return images, masks

# Define transformations (augmentation + tensor conversion)
train_transform = A.Compose([
    A.HorizontalFlip(p=0.5),
    A.VerticalFlip(p=0.5),
    A.Rotate(limit=20, p=0.5),
    A.RandomBrightnessContrast(p=0.5),
    A.GaussianBlur(p=0.2),
    ToTensorV2()
])
val_transform = A.Compose([
    ToTensorV2()
])

# Load images and masks
image_dir = '/content/Data'  # Change this to your actual image directory
images, masks = load_data(image_dir)

# Split dataset into train and validation
train_images, val_images, train_masks, val_masks = train_test_split(images, masks, test_size=0.2, random_state=42)

train_dataset = BrainMRIDataset(train_images, train_masks, transform=train_transform)
val_dataset = BrainMRIDataset(val_images, val_masks,transform=val_transform)

# Data loaders
train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=64, shuffle=False)

dataloaders = {'train': train_loader, 'val': val_loader}

In [3]:
import torch
import torch.nn as nn
import torch.nn.functional as F

# Define a convolution block with two convolution layers
class ConvBlock(nn.Module):
    def __init__(self, in_channels, out_channels):
        super(ConvBlock, self).__init__()
        self.conv1 = nn.Conv2d(in_channels, out_channels, kernel_size=3, padding=1)
        self.conv2 = nn.Conv2d(out_channels, out_channels, kernel_size=3, padding=1)
        self.relu = nn.ReLU()

    def forward(self, x):
        x = self.relu(self.conv1(x))
        x = self.relu(self.conv2(x))
        return x

# Define the attention block
class AttentionBlock(nn.Module):
    def __init__(self, g_channels, x_channels, intermediate_channels):
        super(AttentionBlock, self).__init__()
        self.W_g = nn.Conv2d(g_channels, intermediate_channels, kernel_size=1)
        self.W_x = nn.Conv2d(x_channels, intermediate_channels, kernel_size=1)
        self.psi = nn.Conv2d(intermediate_channels, 1, kernel_size=1)
        self.relu = nn.ReLU()
        self.sigmoid = nn.Sigmoid()

    def forward(self, g, x):
        g1 = self.W_g(g)
        x1 = self.W_x(x)
        psi = self.relu(g1 + x1)
        psi = self.sigmoid(self.psi(psi))
        return x * psi

# Define the Attention U-Net model
class AttentionUNet(nn.Module):
    def __init__(self, in_channels, out_channels):
        super(AttentionUNet, self).__init__()

        # Downsampling path
        self.enc1 = ConvBlock(in_channels, 64)
        self.enc2 = ConvBlock(64, 128)
        self.enc3 = ConvBlock(128, 256)
        self.enc4 = ConvBlock(256, 512)

        # Bottleneck
        self.bottleneck = ConvBlock(512, 1024)

        # Upsampling path
        self.upconv4 = nn.ConvTranspose2d(1024, 512, kernel_size=2, stride=2)
        self.att4 = AttentionBlock(512, 512, 256)
        self.dec4 = ConvBlock(1024, 512)

        self.upconv3 = nn.ConvTranspose2d(512, 256, kernel_size=2, stride=2)
        self.att3 = AttentionBlock(256, 256, 128)
        self.dec3 = ConvBlock(512, 256)

        self.upconv2 = nn.ConvTranspose2d(256, 128, kernel_size=2, stride=2)
        self.att2 = AttentionBlock(128, 128, 64)
        self.dec2 = ConvBlock(256, 128)

        self.upconv1 = nn.ConvTranspose2d(128, 64, kernel_size=2, stride=2)
        self.att1 = AttentionBlock(64, 64, 32)
        self.dec1 = ConvBlock(128, 64)

        # Output layer for segmentation
        self.conv_out = nn.Conv2d(64, out_channels, kernel_size=1)

    def forward(self, x):
        # Encoder path
        enc1 = self.enc1(x)
        enc2 = self.enc2(F.max_pool2d(enc1, 2))
        enc3 = self.enc3(F.max_pool2d(enc2, 2))
        enc4 = self.enc4(F.max_pool2d(enc3, 2))

        # Bottleneck
        bottleneck = self.bottleneck(F.max_pool2d(enc4, 2))

        # Decoder path
        up4 = self.upconv4(bottleneck)
        att4 = self.att4(up4, enc4)
        dec4 = self.dec4(torch.cat([up4, att4], dim=1))

        up3 = self.upconv3(dec4)
        att3 = self.att3(up3, enc3)
        dec3 = self.dec3(torch.cat([up3, att3], dim=1))

        up2 = self.upconv2(dec3)
        att2 = self.att2(up2, enc2)
        dec2 = self.dec2(torch.cat([up2, att2], dim=1))

        up1 = self.upconv1(dec2)
        att1 = self.att1(up1, enc1)
        dec1 = self.dec1(torch.cat([up1, att1], dim=1))

        # Output layer for segmentation
        out = self.conv_out(dec1)
        return out



In [19]:
import torch.optim as optim
from tqdm import tqdm

# Dice coefficient metric
def dice_coef(y_true, y_pred, smooth=1):
    y_true_f = y_true.view(-1)
    y_pred_f = y_pred.view(-1)
    intersection = (y_true_f * y_pred_f).sum()
    return (2. * intersection + smooth) / (y_true_f.sum() + y_pred_f.sum() + smooth)

# Loss function: BCE + Dice Loss
def bce_dice_loss(preds, targets):
    bce = F.binary_cross_entropy_with_logits(preds, targets)
    dice = dice_coef(torch.sigmoid(preds), targets)
    return bce - torch.log(dice)

# Training function
def train_model(model, dataloaders, optimizer, num_epochs=25):
    best_dice = 0.0
    for epoch in range(num_epochs):
        print(f'Epoch {epoch+1}/{num_epochs}')
        model.train()
        running_loss = 0.0
        running_dice = 0.0

        for images, masks in tqdm(dataloaders['train']):
            images = images.to(device)
            masks = masks.to(device)

            optimizer.zero_grad()
            outputs = model(images)
            loss = bce_dice_loss(outputs, masks)
            loss.backward()
            optimizer.step()

            running_loss += loss.item() * images.size(0)
            running_dice += dice_coef(torch.sigmoid(outputs), masks).item() * images.size(0)

        epoch_loss = running_loss / len(dataloaders['train'].dataset)
        epoch_dice = running_dice / len(dataloaders['train'].dataset)

        print(f'Training Loss: {epoch_loss:.4f} | Training Dice: {epoch_dice:.4f}')

        # Evaluate model on validation set
        model.eval()
        val_dice = 0.0
        with torch.no_grad():
            for images, masks in dataloaders['val']:
                images = images.to(device)
                masks = masks.to(device)
                outputs = model(images)
                val_dice += dice_coef(torch.sigmoid(outputs), masks).item() * images.size(0)

        val_dice /= len(dataloaders['val'].dataset)
        print(f'Validation Dice: {val_dice:.4f}')

        if val_dice > best_dice:
            best_dice = val_dice
            torch.save(model.state_dict(), 'best_model.pth')
            print(f'Saving best model with Dice: {best_dice:.4f}')

        attention_unet_checkpoint = {
            'model_state_dict': model.state_dict(),
            'optimizer_state_dict': optimizer.state_dict(),
            'best_dice': best_dice
        }
        torch.save(attention_unet_checkpoint, f'/content/drive/MyDrive/ATTENTION_U_NET/attention_unet_checkpoint_epoch_{epoch+1}.pth')

    return best_dice

# Initialize DataLoader, Model, Optimizer
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Instantiate both models
attention_unet = AttentionUNet(in_channels=1, out_channels=1).to(device)

# Optimizers
optimizer_attention_unet = optim.Adam(attention_unet.parameters(), lr=1e-4)

dataloaders = {
    'train': train_loader,
    'val': val_loader
}

# best_dice_attention_unet = train_model(attention_unet, dataloaders, optimizer_attention_unet, num_epochs=5)

# print(f"Best Dice score for Attention U-Net: {best_dice_attention_unet}")


In [20]:
from tqdm import tqdm

# Evaluation function to compute metrics
def evaluate_model(model, dataloader):
    model.eval()
    dice_total = 0.0
    iou_total = 0.0
    with torch.no_grad():
        for images, masks in tqdm(dataloader):
            images = images.to(device)
            masks = masks.to(device)

            # Forward pass
            outputs = model(images)
            preds = torch.sigmoid(outputs)  # Apply sigmoid to get probabilities
            preds = (preds > 0.5).float()  # Binarize predictions at 0.5 threshold

            # Calculate Dice coefficient
            dice = dice_coef(preds, masks).item()
            dice_total += dice * images.size(0)

            # Calculate IoU
            intersection = (preds * masks).sum(dim=(2, 3))
            union = (preds + masks).sum(dim=(2, 3)) - intersection
            iou = (intersection + 1e-6) / (union + 1e-6)
            iou_total += iou.mean().item() * images.size(0)

    dice_avg = dice_total / len(dataloader.dataset)
    iou_avg = iou_total / len(dataloader.dataset)
    return dice_avg, iou_avg


In [23]:
import torch
from torch.optim import Adam

# Ensure the Attention U-Net architecture is already defined
# If not, define the AttentionUNet class here

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Initialize the Attention U-Net model
attention_unet = AttentionUNet(in_channels=1, out_channels=1).to(device)

# Load the saved checkpoint
checkpoint = torch.load('/content/drive/MyDrive/ATTENTION_U_NET/attention_unet_checkpoint_epoch_3.pth')

# Load the model state dict
attention_unet.load_state_dict(checkpoint['model_state_dict'])

# If optimizer state is needed (not mandatory for evaluation):
optimizer = Adam(attention_unet.parameters(), lr=1e-4)
optimizer.load_state_dict(checkpoint['optimizer_state_dict'])

# Set the model to evaluation mode
attention_unet.eval()


  checkpoint = torch.load('/content/drive/MyDrive/ATTENTION_U_NET/attention_unet_checkpoint_epoch_3.pth')


AttentionUNet(
  (enc1): ConvBlock(
    (conv1): Conv2d(1, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (conv2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (relu): ReLU()
  )
  (enc2): ConvBlock(
    (conv1): Conv2d(64, 128, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (conv2): Conv2d(128, 128, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (relu): ReLU()
  )
  (enc3): ConvBlock(
    (conv1): Conv2d(128, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (conv2): Conv2d(256, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (relu): ReLU()
  )
  (enc4): ConvBlock(
    (conv1): Conv2d(256, 512, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (conv2): Conv2d(512, 512, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (relu): ReLU()
  )
  (bottleneck): ConvBlock(
    (conv1): Conv2d(512, 1024, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (conv2): Conv2d(1024, 1024, kernel_size=(3, 3), stride

In [24]:
import numpy as np
from tqdm import tqdm

# Assuming you have a DataLoader for test images and masks
# Modify the `test_loader` if necessary
test_loader = DataLoader(test_dataset, batch_size=1, shuffle=False)

# Evaluation function for the test set
def evaluate_model(model, dataloader):
    model.eval()  # Set model to evaluation mode
    total_dice = 0.0

    with torch.no_grad():
        for images, masks in tqdm(dataloader):
            images = images.to(device)
            masks = masks.to(device)

            # Forward pass
            outputs = model(images)
            outputs = torch.sigmoid(outputs)  # Apply sigmoid to get probabilities
            preds = (outputs > 0.5).float()  # Threshold the probabilities to get binary predictions

            # Calculate Dice coefficient
            dice = dice_coef(preds, masks)
            total_dice += dice.item()

    # Average Dice score
    avg_dice = total_dice / len(dataloader)
    print(f'Average Dice Coefficient: {avg_dice:.4f}')
    return avg_dice

# Evaluate the loaded Attention U-Net model
evaluate_model(attention_unet, test_loader)


NameError: name 'test_dataset' is not defined