In [2]:
import os
import numpy as np
import torch
import torch.nn as nn
import matplotlib.pyplot as plt
from tqdm.auto import tqdm
from PIL import Image
from monai.transforms import LoadImage, EnsureChannelFirst, ScaleIntensity, EnsureType, Compose, Resize
from monai.data import Dataset, DataLoader
from monai.metrics import DiceMetric
from sklearn.model_selection import train_test_split


In [5]:
# Define paths
images_dir = r"D:\PROJECTS_FINAL\Cancer Treatment Prediction\final stuff\Dataset_BUSI_with_GT\train\images"
masks_dir = r"D:\PROJECTS_FINAL\Cancer Treatment Prediction\final stuff\Dataset_BUSI_with_GT\train\masks"
output_dir = os.path.join(base_dir, "output_images")
os.makedirs(output_dir, exist_ok=True)

# Load data
image_files = sorted([os.path.join(images_dir, file) for file in os.listdir(images_dir) if file.endswith(".png")])
mask_files = sorted([os.path.join(masks_dir, file) for file in os.listdir(masks_dir) if file.endswith(".png")])

# Train-test split
train_images, test_images, train_masks, test_masks = train_test_split(image_files, mask_files, test_size=0.2, random_state=42)

# Define transforms
train_transforms = Compose([
    LoadImage(image_only=True),
    EnsureChannelFirst(),
    ScaleIntensity(),
    Resize((128, 128)),
    EnsureType()
])

test_transforms = Compose([
    LoadImage(image_only=True),
    EnsureChannelFirst(),
    ScaleIntensity(),
    Resize((128, 128)),
    EnsureType()
])

# Create datasets
train_dataset = Dataset(data=[{"image": img, "label": msk} for img, msk in zip(train_images, train_masks)], transform=train_transforms)
test_dataset = Dataset(data=[{"image": img, "label": msk} for img, msk in zip(test_images, test_masks)], transform=test_transforms)

# Create dataloaders
train_loader = DataLoader(train_dataset, batch_size=4, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=4)


FileNotFoundError: [WinError 3] The system cannot find the path specified: 'D:\\PROJECTS_FINAL\\Cancer Treatment Prediction\\final stuff\\Dataset_BUSI_with_GT\\images'

In [None]:
class AttentionBlock(nn.Module):
    def __init__(self, F_g, F_l, F_int):
        super(AttentionBlock, self).__init__()
        self.W_g = nn.Sequential(
            nn.Conv2d(F_g, F_int, kernel_size=1, stride=1, padding=0, bias=True),
            nn.BatchNorm2d(F_int)
        )
        self.W_x = nn.Sequential(
            nn.Conv2d(F_l, F_int, kernel_size=1, stride=1, padding=0, bias=True),
            nn.BatchNorm2d(F_int)
        )
        self.psi = nn.Sequential(
            nn.Conv2d(F_int, 1, kernel_size=1, stride=1, padding=0, bias=True),
            nn.BatchNorm2d(1),
            nn.Sigmoid()
        )
        self.relu = nn.ReLU(inplace=True)

    def forward(self, g, x):
        g1 = self.W_g(g)
        x1 = self.W_x(x)
        psi = self.relu(g1 + x1)
        psi = self.psi(psi)
        return x * psi

class AttentionUNet(nn.Module):
    def __init__(self, in_channels, out_channels):
        super(AttentionUNet, self).__init__()
        self.encoder1 = nn.Sequential(nn.Conv2d(in_channels, 64, kernel_size=3, padding=1),
                                       nn.ReLU(inplace=True))
        self.pool1 = nn.MaxPool2d(kernel_size=2, stride=2)

        self.encoder2 = nn.Sequential(nn.Conv2d(64, 128, kernel_size=3, padding=1),
                                       nn.ReLU(inplace=True))
        self.pool2 = nn.MaxPool2d(kernel_size=2, stride=2)

        self.encoder3 = nn.Sequential(nn.Conv2d(128, 256, kernel_size=3, padding=1),
                                       nn.ReLU(inplace=True))
        self.pool3 = nn.MaxPool2d(kernel_size=2, stride=2)

        self.center = nn.Sequential(nn.Conv2d(256, 512, kernel_size=3, padding=1),
                                     nn.ReLU(inplace=True))

        self.att3 = AttentionBlock(F_g=256, F_l=128, F_int=64)
        self.up3 = nn.ConvTranspose2d(512, 256, kernel_size=2, stride=2)

        self.decoder3 = nn.Sequential(nn.Conv2d(256, 128, kernel_size=3, padding=1),
                                       nn.ReLU(inplace=True))

        self.final = nn.Conv2d(128, out_channels, kernel_size=1)

    def forward(self, x):
        e1 = self.encoder1(x)
        p1 = self.pool1(e1)

        e2 = self.encoder2(p1)
        p2 = self.pool2(e2)

        e3 = self.encoder3(p2)
        p3 = self.pool3(e3)

        c = self.center(p3)

        d3 = self.up3(c)
        d3 = self.decoder3(d3)

        return self.final(d3)


In [None]:
# Initialize model, loss, optimizer
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = AttentionUNet(in_channels=1, out_channels=1).to(device)
loss_function = nn.BCEWithLogitsLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=1e-4)
dice_metric = DiceMetric(include_background=False, reduction="mean")

# Training loop
num_epochs = 10
for epoch in range(num_epochs):
    print(f"Epoch {epoch + 1}/{num_epochs}")
    model.train()
    epoch_loss = 0

    for batch in tqdm(train_loader, desc="Training"):
        inputs, labels = batch["image"].to(device), batch["label"].to(device)
        optimizer.zero_grad()
        outputs = model(inputs)
        loss = loss_function(outputs, labels)
        loss.backward()
        optimizer.step()
        epoch_loss += loss.item()

    print(f"Epoch {epoch + 1} Loss: {epoch_loss / len(train_loader)}")


In [None]:
model.eval()
for test_batch in tqdm(test_loader, desc="Inferencing"):
    images, labels = test_batch["image"].to(device), test_batch["label"].to(device)
    with torch.no_grad():
        preds = torch.sigmoid(model(images))
        preds = (preds > 0.5).float()

    for idx in range(images.size(0)):
        img = images[idx, 0].cpu().numpy()
        mask = preds[idx, 0].cpu().numpy()

        overlay = np.stack([img, img, img], axis=-1)
        overlay[mask > 0] = [255, 0, 0]  # Mark tumor in red

        plt.imshow(overlay)
        plt.show()
