In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import os
import pandas as pd
import numpy as np
import cv2
from torch.utils.data import Dataset, DataLoader
import albumentations as A
from albumentations.pytorch.transforms import ToTensorV2
from torchvision import transforms
from torch.optim import Adam
from torch.optim.lr_scheduler import ReduceLROnPlateau
from tqdm import tqdm
import matplotlib.pyplot as plt

In [None]:

class CustomDataset(Dataset):
    """Paired image / binary-mask dataset read from two directories of ``.png`` files.

    Images and masks are paired by position after sorting each directory
    listing, so both folders are expected to hold matching file names.

    Args:
        img_dir: Directory containing the input images.
        mask_dir: Directory containing the masks (read as grayscale).
        resize: Target size handed to ``cv2.resize`` as ``dsize``, i.e.
            ``(width, height)``. ``None`` keeps every sample at its native size.
        transform: Optional callable invoked as ``transform(image=..., mask=...)``
            returning a mapping with ``'image'`` and ``'mask'`` entries. The
            returned mask must support ``.unsqueeze(0)`` (e.g. a torch tensor
            produced by ``ToTensorV2``).

    Raises:
        ValueError: If the two directories contain a different number of
            ``.png`` files (positional pairing would be wrong).
    """

    def __init__(self, img_dir, mask_dir, resize=None, transform=None):
        self.img_dir = img_dir
        self.mask_dir = mask_dir
        self.resize = resize
        self.transform = transform
        self.image_files = sorted([f for f in os.listdir(self.img_dir) if f.endswith('.png')])
        self.mask_files = sorted([f for f in os.listdir(self.mask_dir) if f.endswith('.png')])

        # Pairing is purely positional, so a count mismatch means wrong pairs
        # (or an IndexError much later); fail early with a clear message.
        if len(self.image_files) != len(self.mask_files):
            raise ValueError(
                f"Found {len(self.image_files)} images in {self.img_dir} but "
                f"{len(self.mask_files)} masks in {self.mask_dir}"
            )

    def __len__(self):
        """Return the number of image/mask pairs."""
        return len(self.image_files)

    def __getitem__(self, idx):
        """Load, optionally resize and transform the ``idx``-th pair.

        Returns:
            ``(image, mask)``. Without a transform these are numpy arrays
            (RGB image, and a ``uint8`` mask of 0/1 values without a channel
            axis). With a transform, the transformed image and the transformed
            mask with a leading channel dimension added.

        Raises:
            FileNotFoundError: If OpenCV cannot read the image or the mask.
        """
        img_path = os.path.join(self.img_dir, self.image_files[idx])
        mask_path = os.path.join(self.mask_dir, self.mask_files[idx])

        # cv2.imread returns None instead of raising when a file is missing or unreadable.
        image = cv2.imread(img_path)
        if image is None:
            raise FileNotFoundError(f"Could not read image: {img_path}")
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)

        mask = cv2.imread(mask_path, cv2.IMREAD_GRAYSCALE)
        if mask is None:
            raise FileNotFoundError(f"Could not read mask: {mask_path}")

        if self.resize is not None:
            image = cv2.resize(image, self.resize)
            mask = cv2.resize(mask, self.resize)

        # Threshold after resizing so interpolated gray values become 0/1 again.
        mask = (mask > 127).astype(np.uint8)  # binary: white -> 1, black -> 0

        if self.transform:
            augmented = self.transform(image=image, mask=mask)
            image = augmented['image']
            mask = augmented['mask'].unsqueeze(0)  # Add channel dimension: (1, H, W)

        return image, mask



In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from torchvision import models
from torchvision.models.feature_extraction import create_feature_extractor

class DoubleConv(nn.Module):
    """Two consecutive 3x3 conv -> batch-norm -> ReLU stages.

    ``padding=1`` keeps the spatial size; the channel count goes from
    ``in_channels`` to ``out_channels`` in the first stage and stays at
    ``out_channels`` in the second.
    """

    def __init__(self, in_channels, out_channels):
        super().__init__()
        stages = []
        for stage_in in (in_channels, out_channels):
            stages.append(nn.Conv2d(stage_in, out_channels, kernel_size=3, padding=1))
            stages.append(nn.BatchNorm2d(out_channels))
            stages.append(nn.ReLU(inplace=True))
        # Attribute name and layer order determine the state_dict keys
        # (conv.0 ... conv.5), so they are kept exactly as before.
        self.conv = nn.Sequential(*stages)

    def forward(self, x):
        """Apply both stages to ``x`` and return the result."""
        out = self.conv(x)
        return out

class UNet(nn.Module):
    """U-Net style segmentation network with a torchvision ResNet-50 encoder.

    Five encoder feature maps are tapped with ``create_feature_extractor``.
    The decoder repeatedly resizes the deeper map to the next skip
    connection's spatial size, concatenates both along the channel axis and
    applies a ``DoubleConv``. The 1x1 output convolution runs at the
    resolution of the shallowest skip and its result is bilinearly resized to
    the input size, so ``forward`` returns raw logits (no activation) of shape
    ``(B, num_classes, H, W)``.

    Args:
        num_classes: Number of output channels of the final 1x1 convolution.
        pretrained: If true, initialise the encoder from
            ``ResNet50_Weights.IMAGENET1K_V2``; otherwise leave it randomly
            initialised (e.g. when a checkpoint is loaded right afterwards).
    """

    def __init__(self, num_classes=1, pretrained=True):
        super(UNet, self).__init__()
        resnet = models.resnet50(weights=models.ResNet50_Weights.IMAGENET1K_V2 if pretrained else None)
        # Shapes below follow the standard ResNet-50 strides for an input of (B, 3, H, W).
        self.encoder = create_feature_extractor(resnet, return_nodes={
            "relu": "enc1",          # (B, 64, H/2, W/2)    stem, before max-pool
            "layer1": "enc2",        # (B, 256, H/4, W/4)
            "layer2": "enc3",        # (B, 512, H/8, W/8)
            "layer3": "enc4",        # (B, 1024, H/16, W/16)
            "layer4": "bridge"       # (B, 2048, H/32, W/32)
        })
        # Each decoder stage consumes (upsampled deeper map + skip) channels.
        self.decoder4 = DoubleConv(2048 + 1024, 1024)
        self.decoder3 = DoubleConv(1024 + 512, 512)
        self.decoder2 = DoubleConv(512 + 256, 256)
        self.decoder1 = DoubleConv(256 + 64, 64)
        self.final_conv = nn.Conv2d(64, num_classes, kernel_size=1)

    @staticmethod
    def _upsample_concat(deep, skip):
        """Bilinearly resize ``deep`` to ``skip``'s spatial size and concatenate on channels."""
        deep = F.interpolate(deep, size=skip.shape[2:], mode="bilinear", align_corners=True)
        return torch.cat([deep, skip], dim=1)

    def forward(self, x):
        """Return segmentation logits with the same spatial size as ``x``."""
        features = self.encoder(x)

        dec4 = self.decoder4(self._upsample_concat(features["bridge"], features["enc4"]))
        dec3 = self.decoder3(self._upsample_concat(dec4, features["enc3"]))
        dec2 = self.decoder2(self._upsample_concat(dec3, features["enc2"]))
        dec1 = self.decoder1(self._upsample_concat(dec2, features["enc1"]))

        # final_conv runs at the enc1 resolution; resize the logits back to the input size.
        output = self.final_conv(dec1)
        output = F.interpolate(output, size=x.shape[2:], mode="bilinear", align_corners=True)

        return output

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.utils.data import DataLoader
import albumentations as A
from albumentations.pytorch import ToTensorV2
from tqdm import tqdm
import os
import cv2
import numpy as np
from torch.optim.lr_scheduler import ReduceLROnPlateau

class StructureLoss(nn.Module):
    """Locally weighted BCE plus a soft-IoU term, averaged over batch and channel.

    Pixels whose value differs from the 31x31 local mean of the mask receive
    a BCE weight of up to 1 + 5; the IoU term is computed on sigmoid
    probabilities with +1 smoothing.
    """

    def __init__(self):
        super().__init__()
        self.bce = nn.BCEWithLogitsLoss(reduction='none')

    def forward(self, pred, mask):
        """Compute the loss for logits ``pred`` against the target ``mask``.

        ``pred`` is first resized to the mask's spatial size. Both tensors are
        indexed as 4-D (the reductions run over dims 2 and 3).
        """
        spatial_dims = (2, 3)
        logits = F.interpolate(pred, size=mask.shape[2:], mode='bilinear', align_corners=True)
        target = mask.float()

        # Weight map: 1 everywhere, larger where a pixel deviates from its neighbourhood mean.
        neighbourhood_mean = F.avg_pool2d(target, kernel_size=31, stride=1, padding=15)
        weights = 1 + 5 * torch.abs(neighbourhood_mean - target)

        per_pixel_bce = self.bce(logits, target)
        weighted_bce = (weights * per_pixel_bce).sum(dim=spatial_dims) / weights.sum(dim=spatial_dims)

        # Soft IoU on probabilities.
        probs = torch.sigmoid(logits)
        overlap = (probs * mask).sum(dim=spatial_dims)
        total = (probs + mask).sum(dim=spatial_dims)
        iou_term = 1 - (overlap + 1) / (total - overlap + 1)

        combined = weighted_bce + iou_term
        return combined.mean()


# Training-time pipeline: random flips and colour jitter, then normalisation and
# conversion to tensors. It is called as transform(image=..., mask=...) by CustomDataset.
train_transform = A.Compose(
    [
        A.HorizontalFlip(p=0.4),
        A.VerticalFlip(p=0.4),
        A.RandomGamma(gamma_limit=(70, 130), p=0.2),
        A.RGBShift(p=0.3, r_shift_limit=10, g_shift_limit=10, b_shift_limit=10),
        # Same mean/std as the Normalize used by the inference and evaluation cells.
        A.Normalize(mean=(0.485, 0.456, 0.406), std=(0.229, 0.224, 0.225)),
        ToTensorV2(),
    ],
    additional_targets={'mask': 'mask'}  # NOTE(review): 'mask' appears to be a built-in Albumentations target already, so this mapping is likely redundant - confirm before removing
)

# ---- Hyperparameters ----
lr = 3e-4
batch_size = 8
epochs = 300
in_channels = 3   # not referenced by the rest of this cell
out_channels = 1  # passed to UNet as num_classes
H, W = 480, 480   # training resolution as (height, width)

# ---- Model and optimisation ----
model = UNet(num_classes=out_channels)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)

loss_fn = StructureLoss()
optimizer = optim.Adam(model.parameters(), lr=lr, weight_decay=1e-4)
# Scales the learning rate by 0.75 when the value given to scheduler.step() plateaus (patience=5).
scheduler = ReduceLROnPlateau(optimizer, mode='min', factor=0.75, patience=5)
# NOTE(review): the scaler is created but the training loop never calls it,
# so training runs in full precision. Kept so existing references keep working.
scaler = torch.cuda.amp.GradScaler()

# ---- Data ----
train_dataset = CustomDataset(
    img_dir='/kaggle/input/datacv/TrainDataset/TrainDataset/image',
    mask_dir='/kaggle/input/datacv/TrainDataset/TrainDataset/mask',
    # CustomDataset forwards this tuple to cv2.resize, whose dsize is
    # (width, height); passing (H, W) would swap the axes for non-square sizes.
    resize=(W, H),
    transform=train_transform,
)
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)

best_loss = float('inf')  # Lowest mean training loss seen so far

# Main training loop: one full pass over train_loader per epoch, full-precision
# forward/backward, then checkpointing and LR scheduling on the mean epoch loss.
for epoch in range(epochs):
    model.train()
    epoch_loss = 0.0
    progress_bar = tqdm(train_loader, desc=f"Epoch {epoch + 1}/{epochs}", leave=False)

    for images, masks in progress_bar:
        images = images.to(device)
        masks = masks.to(device).float()  # Cast to float for the loss; the channel dim is added by CustomDataset

        optimizer.zero_grad()
        outputs = model(images)
        loss = loss_fn(outputs, masks)

        loss.backward()
        optimizer.step()

        # Accumulate the per-batch loss; divided by the number of batches below.
        epoch_loss += loss.item()
        progress_bar.set_postfix(loss=loss.item())

    epoch_loss /= len(train_loader)
    print(f"Epoch {epoch + 1}/{epochs}, Loss: {epoch_loss:.4f}")

    # Save model if current loss is lower than best.
    # NOTE(review): "best" is judged on the training loss; no validation set is used here.
    if epoch_loss < best_loss:
        best_loss = epoch_loss
        torch.save(model.state_dict(), 'Unet.pth')
        print(f"✅ Saved new best model at epoch {epoch + 1} with loss {best_loss:.4f}")

    # ReduceLROnPlateau monitors the same mean training loss.
    scheduler.step(epoch_loss)


In [None]:
# Save the current weights of `model` to a separate file from the
# best-loss checkpoint ('Unet.pth') written by the training loop.
torch.save(model.state_dict(), "/kaggle/working/Unet_last.pth")
print("Model saved successfully!")

In [None]:
import matplotlib.pyplot as plt
# Pick the GPU when present and fall back to CPU otherwise, instead of
# hard-coding 'cuda' (which fails on CPU-only machines).
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
def infer(model, image_path, device, threshold=0.5, image_size=(480, 480)):
    """Run ``model`` on one image file and plot it next to the predicted binary mask.

    Args:
        model: Segmentation model returning logits (or a tuple whose first
            element is the logits). It is switched to eval mode.
        image_path: Path of the image to read with OpenCV.
        device: Device the input tensor is moved to; the model is expected to
            live on the same device already.
        threshold: Probability above which a pixel is marked as foreground.
        image_size: ``(height, width)`` the image is resized to before
            inference. Defaults to the previously hard-coded 480x480.

    Raises:
        FileNotFoundError: If OpenCV cannot read ``image_path``.
    """
    model.eval()

    # Load and preprocess image. cv2.imread returns None rather than raising
    # when the file is missing or unreadable.
    image = cv2.imread(image_path)
    if image is None:
        raise FileNotFoundError(f"Could not read image: {image_path}")
    image_rgb = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)

    height, width = image_size
    transformed = A.Compose([
        A.Resize(height, width),
        A.Normalize(mean=(0.485, 0.456, 0.406), std=(0.229, 0.224, 0.225)),
        ToTensorV2(),
    ])(image=image_rgb)

    input_tensor = transformed['image'].unsqueeze(0).to(device)

    with torch.no_grad():
        outputs = model(input_tensor)

        # Some models return (main, auxiliary...) outputs; only the first is used.
        main_output = outputs[0] if isinstance(outputs, tuple) else outputs

        # Binary output: apply sigmoid and threshold
        prob_mask = torch.sigmoid(main_output).squeeze().cpu().numpy()
        binary_mask = (prob_mask > threshold).astype(np.uint8) * 255  # foreground=255, background=0

    # Show results: the original (un-resized) image beside the mask at model resolution.
    plt.figure(figsize=(10, 5))
    plt.subplot(1, 2, 1)
    plt.imshow(image_rgb)
    plt.title("Original Image")
    plt.axis("off")

    plt.subplot(1, 2, 2)
    plt.imshow(binary_mask, cmap="gray")
    plt.title("Predicted Mask (Binary)")
    plt.axis("off")
    plt.show()
# Every parameter and buffer is overwritten by the checkpoint (strict load),
# so skip the ImageNet weight download with pretrained=False.
model = UNet(num_classes=1, pretrained=False).to(device)
# weights_only=True restricts unpickling to tensors and plain containers,
# which is all a state_dict holds.
model.load_state_dict(torch.load("/kaggle/working/Unet.pth", map_location=device, weights_only=True))
model.eval()
infer(model, "/kaggle/input/datacv/TrainDataset/TrainDataset/image/10.png", device)

In [None]:
import torch
from torch.utils.data import DataLoader
import numpy as np
from tqdm import tqdm
import os

# Assuming you already have UNet and CustomDataset defined
# Evaluation preprocessing: normalisation and tensor conversion only, no augmentation.
val_transform = A.Compose([
    A.Normalize(mean=(0.485, 0.456, 0.406), std=(0.229, 0.224, 0.225)),
    ToTensorV2(),
])
# Load test dataset
test_dataset = CustomDataset(
    img_dir='/kaggle/input/comvsdataprime/TestDataset/TestDataset/Kvasir/images',
    mask_dir='/kaggle/input/comvsdataprime/TestDataset/TestDataset/Kvasir/masks',
    resize=(480, 480),
    transform=val_transform,  # no strong augmentations
)

test_loader = DataLoader(test_dataset, batch_size=1, shuffle=False)

# Load trained model. The device is resolved once and reused for map_location.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# The checkpoint overwrites every parameter and buffer (strict load), so the
# ImageNet weights are not needed: pretrained=False avoids the download.
model = UNet(num_classes=1, pretrained=False)
# weights_only=True restricts unpickling to tensors and plain containers.
model.load_state_dict(torch.load("/kaggle/input/comvsdataprime/Unet.pth", map_location=device, weights_only=True))
model.to(device)
model.eval()

# Helper functions
def compute_iou(pred, mask, eps=1e-6):
    """Intersection-over-union of ``pred`` and ``mask`` after thresholding both at 0.5.

    ``eps`` is added to numerator and denominator, so two empty masks score 1.
    Returns a 0-dim tensor.
    """
    pred_bin = pred.gt(0.5).float()
    mask_bin = mask.gt(0.5).float()
    overlap = (pred_bin * mask_bin).sum()
    combined_area = pred_bin.sum() + mask_bin.sum() - overlap
    return (overlap + eps) / (combined_area + eps)

def compute_dice(pred, mask, eps=1e-6):
    """Dice coefficient of ``pred`` and ``mask`` after thresholding both at 0.5.

    ``eps`` is added to numerator and denominator, so two empty masks score 1.
    Returns a 0-dim tensor.
    """
    pred_bin, mask_bin = (t.gt(0.5).float() for t in (pred, mask))
    overlap = (pred_bin * mask_bin).sum()
    denominator = pred_bin.sum() + mask_bin.sum() + eps
    return (2 * overlap + eps) / denominator

# Run evaluation: score every test image individually, then average the scores.
ious, dices = [], []

with torch.no_grad():
    for images, masks in tqdm(test_loader, desc="Evaluating"):
        images = images.to(device)
        masks = masks.to(device).float()

        outputs = model(images)
        probs = torch.sigmoid(outputs)  # logits -> probabilities; thresholded inside the metric helpers

        # Iterate over the batch dimension so each image contributes one score.
        for pred, true_mask in zip(probs, masks):
            iou = compute_iou(pred, true_mask)
            dice = compute_dice(pred, true_mask)
            ious.append(iou.item())
            dices.append(dice.item())

# Unweighted mean of the per-image scores.
macro_iou = np.mean(ious)
macro_dice = np.mean(dices)

print(f"📊 Macro IoU: {macro_iou:.4f}")
print(f"📊 Macro Dice: {macro_dice:.4f}")


  model.load_state_dict(torch.load("/kaggle/input/comvsdataprime/Unet.pth", map_location='cuda' if torch.cuda.is_available() else 'cpu'))
Evaluating: 100%|██████████| 100/100 [00:06<00:00, 16.01it/s]

📊 Macro IoU: 0.8152
📊 Macro Dice: 0.8746





In [None]:
import torch
from torch.utils.data import DataLoader, ConcatDataset
import numpy as np
from tqdm import tqdm
import os
import cv2
import albumentations as A
from albumentations.pytorch import ToTensorV2
from torch.utils.data import Dataset

# CustomDataset class (as you already defined)
class CustomDataset(Dataset):
    """Paired image / binary-mask dataset read from two directories of ``.png`` files.

    Images and masks are paired by position after sorting each directory
    listing, so both folders are expected to hold matching file names.

    Args:
        img_dir: Directory containing the input images.
        mask_dir: Directory containing the masks (read as grayscale).
        resize: Target size handed to ``cv2.resize`` as ``dsize``, i.e.
            ``(width, height)``. ``None`` keeps every sample at its native size.
        transform: Optional callable invoked as ``transform(image=..., mask=...)``
            returning a mapping with ``'image'`` and ``'mask'`` entries. The
            returned mask must support ``.unsqueeze(0)`` (e.g. a torch tensor
            produced by ``ToTensorV2``).

    Raises:
        ValueError: If the two directories contain a different number of
            ``.png`` files (positional pairing would be wrong).
    """

    def __init__(self, img_dir, mask_dir, resize=None, transform=None):
        self.img_dir = img_dir
        self.mask_dir = mask_dir
        self.resize = resize
        self.transform = transform
        self.image_files = sorted([f for f in os.listdir(self.img_dir) if f.endswith('.png')])
        self.mask_files = sorted([f for f in os.listdir(self.mask_dir) if f.endswith('.png')])

        # Pairing is purely positional, so a count mismatch means wrong pairs
        # (or an IndexError much later); fail early with a clear message.
        if len(self.image_files) != len(self.mask_files):
            raise ValueError(
                f"Found {len(self.image_files)} images in {self.img_dir} but "
                f"{len(self.mask_files)} masks in {self.mask_dir}"
            )

    def __len__(self):
        """Return the number of image/mask pairs."""
        return len(self.image_files)

    def __getitem__(self, idx):
        """Load, optionally resize and transform the ``idx``-th pair.

        Returns:
            ``(image, mask)``. Without a transform these are numpy arrays
            (RGB image, and a ``uint8`` mask of 0/1 values without a channel
            axis). With a transform, the transformed image and the transformed
            mask with a leading channel dimension added.

        Raises:
            FileNotFoundError: If OpenCV cannot read the image or the mask.
        """
        img_path = os.path.join(self.img_dir, self.image_files[idx])
        mask_path = os.path.join(self.mask_dir, self.mask_files[idx])

        # cv2.imread returns None instead of raising when a file is missing or unreadable.
        image = cv2.imread(img_path)
        if image is None:
            raise FileNotFoundError(f"Could not read image: {img_path}")
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)

        mask = cv2.imread(mask_path, cv2.IMREAD_GRAYSCALE)
        if mask is None:
            raise FileNotFoundError(f"Could not read mask: {mask_path}")

        if self.resize is not None:
            image = cv2.resize(image, self.resize)
            mask = cv2.resize(mask, self.resize)

        # Threshold after resizing so interpolated gray values become 0/1 again.
        mask = (mask > 127).astype(np.uint8)

        if self.transform:
            augmented = self.transform(image=image, mask=mask)
            image = augmented['image']
            mask = augmented['mask'].unsqueeze(0)  # shape: (1, H, W)

        return image, mask

# Define transforms: normalisation and tensor conversion only, no augmentation.
val_transform = A.Compose([
    A.Normalize(mean=(0.485, 0.456, 0.406), std=(0.229, 0.224, 0.225)),
    ToTensorV2(),
])

# List of dataset subfolders
dataset_names = ['CVC-300', 'CVC-ClinicDB', 'CVC-ColonDB', 'ETIS-LaribPolypDB', 'Kvasir']
base_path = '/kaggle/input/comvsdataprime/TestDataset/TestDataset'

# Collect all datasets: each subfolder holds an 'images' and a 'masks' directory.
all_datasets = []
for name in dataset_names:
    img_dir = os.path.join(base_path, name, 'images')
    mask_dir = os.path.join(base_path, name, 'masks')
    ds = CustomDataset(img_dir=img_dir, mask_dir=mask_dir, resize=(480, 480), transform=val_transform)
    all_datasets.append(ds)

# Combine datasets into one flat dataset, in the order of dataset_names.
full_test_dataset = ConcatDataset(all_datasets)
test_loader = DataLoader(full_test_dataset, batch_size=1, shuffle=False)

# Load model. The device is resolved once and reused for map_location.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# The checkpoint overwrites every parameter and buffer (strict load), so the
# ImageNet weights are not needed: pretrained=False avoids the download.
model = UNet(num_classes=1, pretrained=False)
# weights_only=True restricts unpickling to tensors and plain containers.
model.load_state_dict(torch.load("/kaggle/input/comvsdataprime/Unet.pth", map_location=device, weights_only=True))
model.to(device)
model.eval()

# Metrics
def compute_iou(pred, mask, eps=1e-6):
    """Return the IoU (0-dim tensor) of ``pred`` and ``mask``, both binarised at 0.5.

    ``eps`` smooths numerator and denominator so an empty prediction against
    an empty mask yields 1 instead of dividing by zero.
    """
    binarised = [(t > 0.5).float() for t in (pred, mask)]
    pred_bin, mask_bin = binarised
    hits = (pred_bin * mask_bin).sum()
    covered = pred_bin.sum() + mask_bin.sum() - hits
    numerator = hits + eps
    return numerator / (covered + eps)

def compute_dice(pred, mask, eps=1e-6):
    """Return the Dice score (0-dim tensor) of ``pred`` and ``mask``, both binarised at 0.5.

    ``eps`` smooths numerator and denominator so an empty prediction against
    an empty mask yields 1 instead of dividing by zero.
    """
    pred_bin = pred.gt(0.5).float()
    mask_bin = mask.gt(0.5).float()
    hits = (pred_bin * mask_bin).sum()
    numerator = 2 * hits + eps
    return numerator / (pred_bin.sum() + mask_bin.sum() + eps)

# Run evaluation: score every image of the concatenated test set individually.
ious, dices = [], []

with torch.no_grad():
    for images, masks in tqdm(test_loader, desc="Evaluating all datasets"):
        images = images.to(device)
        masks = masks.to(device).float()

        outputs = model(images)
        probs = torch.sigmoid(outputs)  # logits -> probabilities; thresholded inside the metric helpers

        # Iterate over the batch dimension so each image contributes one score.
        for pred, true_mask in zip(probs, masks):
            iou = compute_iou(pred, true_mask)
            dice = compute_dice(pred, true_mask)
            ious.append(iou.item())
            dices.append(dice.item())

# Final results.
# The mean is taken per image over the concatenated set, so sub-datasets with
# more images weigh more; this is not a mean of per-dataset scores.
macro_iou = np.mean(ious)
macro_dice = np.mean(dices)

print(f"📊 Macro IoU (All datasets): {macro_iou:.4f}")
print(f"📊 Macro Dice (All datasets): {macro_dice:.4f}")


  model.load_state_dict(torch.load("/kaggle/input/comvsdataprime/Unet.pth", map_location='cuda' if torch.cuda.is_available() else 'cpu'))
Evaluating all datasets: 100%|██████████| 798/798 [00:44<00:00, 17.97it/s]

📊 Macro IoU (All datasets): 0.6403
📊 Macro Dice (All datasets): 0.7051



