In [1]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import os
import pandas as pd
import numpy as np
import cv2
from torch.utils.data import Dataset, DataLoader
import albumentations as A
from albumentations.pytorch.transforms import ToTensorV2
from torchvision import transforms
from torch.optim import Adam
from torch.optim.lr_scheduler import ReduceLROnPlateau
from tqdm import tqdm
import matplotlib.pyplot as plt


  check_for_updates()


In [2]:
class CustomDataset(Dataset):
    """Segmentation dataset pairing each image with a colour-coded mask.

    Image and mask files are looked up under the same file name in
    ``img_dir`` and ``label_dir``. Masks are converted from colours to class
    indices: 0 = background, 1 = red pixels, 2 = green pixels.

    NOTE: this notebook defines ``CustomDataset`` again in the training cell;
    whichever definition was executed last is the one in effect.

    Args:
        img_dir: Directory containing the input images.
        label_dir: Directory containing the mask images.
        resize: Target size forwarded unchanged to ``cv2.resize`` as ``dsize``,
            i.e. ``(width, height)``. ``None`` keeps the native resolution.
        transform: Optional callable invoked as ``transform(image=..., mask=...)``
            that returns a mapping with ``'image'`` and ``'mask'`` entries.
    """

    def __init__(self, img_dir, label_dir, resize=None, transform=None):
        self.img_dir = img_dir
        self.label_dir = label_dir
        self.resize = resize
        self.transform = transform
        # os.listdir returns entries in arbitrary order; sort for a stable index.
        self.images = sorted(os.listdir(self.img_dir))

    def __len__(self):
        """Return the number of files found in ``img_dir``."""
        return len(self.images)

    def read_mask(self, mask_path):
        """Load a colour mask and return a ``uint8`` class map of shape (H, W, 1).

        Raises:
            FileNotFoundError: If OpenCV cannot read ``mask_path``.
        """
        mask_bgr = cv2.imread(mask_path)
        if mask_bgr is None:
            # cv2.imread signals failure by returning None instead of raising.
            raise FileNotFoundError(f"Could not read mask image: {mask_path}")
        if self.resize is not None:
            mask_bgr = cv2.resize(mask_bgr, self.resize)
        hsv = cv2.cvtColor(mask_bgr, cv2.COLOR_BGR2HSV)

        # Red wraps around the hue axis, so it needs two hue intervals.
        lower_red1 = np.array([0, 100, 20])
        upper_red1 = np.array([10, 255, 255])
        lower_red2 = np.array([160, 100, 20])
        upper_red2 = np.array([179, 255, 255])

        lower_mask_red = cv2.inRange(hsv, lower_red1, upper_red1)
        upper_mask_red = cv2.inRange(hsv, lower_red2, upper_red2)
        # Bitwise OR merges the two intervals without relying on uint8 addition.
        red_mask = cv2.bitwise_or(lower_mask_red, upper_mask_red)
        red_mask[red_mask != 0] = 1

        green_mask = cv2.inRange(hsv, (36, 25, 25), (70, 255, 255))
        green_mask[green_mask != 0] = 2

        full_mask = cv2.bitwise_or(red_mask, green_mask)
        full_mask = np.expand_dims(full_mask, axis=-1)
        full_mask = full_mask.astype(np.uint8)

        return full_mask

    def __getitem__(self, idx):
        """Return the ``(image, label)`` pair for sample ``idx``.

        With a transform, ``image`` is converted to float and ``label`` to a
        float tensor permuted from (H, W, 1) to (1, H, W). Without one, both
        are returned as NumPy arrays.

        Raises:
            FileNotFoundError: If the image or its mask cannot be read.
        """
        file_name = self.images[idx]
        img_path = os.path.join(self.img_dir, file_name)
        label_path = os.path.join(self.label_dir, file_name)

        image = cv2.imread(img_path)
        if image is None:
            raise FileNotFoundError(f"Could not read image: {img_path}")
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
        label = self.read_mask(label_path)
        if self.resize is not None:
            image = cv2.resize(image, self.resize)

        if self.transform:
            transformed = self.transform(image=image, mask=label)
            image = transformed['image'].float()
            label = transformed['mask'].float()
            # Assumes the transform leaves the mask channel-last (H, W, 1).
            label = label.permute(2, 0, 1)

        return image, label

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from torchvision import models

class ConvBlock(nn.Module):
    """3x3 convolution (padding 1) followed by batch norm and an in-place ReLU.

    The spatial size of the input is preserved; only the channel count changes
    from ``in_channels`` to ``out_channels``.
    """

    def __init__(self, in_channels, out_channels):
        super().__init__()
        # Attribute names double as state_dict keys, so they stay unchanged.
        self.conv = nn.Conv2d(in_channels, out_channels, kernel_size=3, padding=1)
        self.bn = nn.BatchNorm2d(out_channels)
        self.relu = nn.ReLU(inplace=True)

    def forward(self, x):
        """Apply conv -> batch norm -> ReLU to ``x`` and return the result."""
        return self.relu(self.bn(self.conv(x)))

class UNetPlusPlus(nn.Module):
    """UNet++-style decoder with nested skip connections on a ResNet-50 encoder.

    Node ``xIJ`` sits at encoder depth ``I`` and nesting level ``J``. Every
    decoder node concatenates all earlier nodes of its own depth with the
    node one level deeper (bilinearly upsampled x2 where the resolutions
    differ) and fuses them with a ``ConvBlock``.
    """

    def __init__(self, num_classes=3, H=480, W=480, pretrained=True):
        """
        Parameters:
            num_classes: Number of segmentation classes (output channels).
            H, W: Nominal input image dimensions. Kept as attributes; the
                forward pass sizes its output from the actual input tensor.
            pretrained: Load the ImageNet (IMAGENET1K_V2) encoder weights.
                Pass ``False`` when a full checkpoint is loaded right after
                construction, to skip the unnecessary download.
        """
        super(UNetPlusPlus, self).__init__()
        self.H = H
        self.W = W
        encoder_weights = models.ResNet50_Weights.IMAGENET1K_V2 if pretrained else None
        resnet = models.resnet50(weights=encoder_weights)

        # Encoder stages; shapes in the comments are for a 480x480 input.
        self.x00 = nn.Sequential(resnet.conv1, resnet.bn1, resnet.relu, resnet.maxpool)  # [B, 64, 120, 120]
        self.x10 = resnet.layer1   # [B, 256, 120, 120]
        self.x20 = resnet.layer2   # [B, 512, 60, 60]
        self.x30 = resnet.layer3   # [B, 1024, 30, 30]
        self.x40 = resnet.layer4   # [B, 2048, 15, 15]
        self.upsample = nn.Upsample(scale_factor=2, mode="bilinear", align_corners=True)

        # Nesting level 1: own-depth feature + next-deeper feature.
        self.conv01 = ConvBlock(in_channels=64 + 256, out_channels=64)
        self.conv11 = ConvBlock(in_channels=256 + 512, out_channels=256)
        self.conv21 = ConvBlock(in_channels=512 + 1024, out_channels=512)
        self.conv31 = ConvBlock(in_channels=1024 + 2048, out_channels=1024)

        # Nesting level 2.
        self.conv02 = ConvBlock(in_channels=64 + 64 + 256, out_channels=64)
        self.conv12 = ConvBlock(in_channels=256 + 256 + 512, out_channels=256)
        self.conv22 = ConvBlock(in_channels=512 + 512 + 1024, out_channels=512)

        # Nesting level 3.
        self.conv03 = ConvBlock(in_channels=64 + 64 + 64 + 256, out_channels=64)
        self.conv13 = ConvBlock(in_channels=256 + 256 + 256 + 512, out_channels=256)

        # Nesting level 4 (final decoder node).
        self.conv04 = ConvBlock(in_channels=64 + 64 + 64 + 64 + 256, out_channels=64)

        self.final_conv = nn.Conv2d(64, num_classes, kernel_size=1)

    def forward(self, x):
        """Return per-pixel class logits of shape [B, num_classes, h, w].

        ``h, w`` are the spatial dimensions of ``x`` itself, so the logits
        always line up with the input, whatever ``H``/``W`` the model was
        constructed with.

        NOTE(review): the fixed x2 upsampling only lines up with the skip
        features when the input height/width divide evenly through every
        encoder stage (e.g. 480); other sizes may fail in ``torch.cat``.
        """
        input_size = x.shape[-2:]

        # Shapes in the comments are for a 480x480 input.
        x00 = self.x00(x)   # [B, 64, 120, 120]
        x10 = self.x10(x00) # [B, 256, 120, 120]
        x20 = self.x20(x10) # [B, 512, 60, 60]
        x30 = self.x30(x20) # [B, 1024, 30, 30]
        x40 = self.x40(x30) # [B, 2048, 15, 15]

        # x00 and x10 share a resolution, so no upsampling is needed between them.
        x01 = self.conv01(torch.cat([x00, x10], dim=1))
        x11 = self.conv11(torch.cat([x10, self.upsample(x20)], dim=1))
        x21 = self.conv21(torch.cat([x20, self.upsample(x30)], dim=1))
        x31 = self.conv31(torch.cat([x30, self.upsample(x40)], dim=1))

        x02 = self.conv02(torch.cat([x00, x01, x11], dim=1))
        x12 = self.conv12(torch.cat([x10, x11, self.upsample(x21)], dim=1))
        x22 = self.conv22(torch.cat([x20, x21, self.upsample(x31)], dim=1))

        x03 = self.conv03(torch.cat([x00, x01, x02, x12], dim=1))
        x13 = self.conv13(torch.cat([x10, x11, x12, self.upsample(x22)], dim=1))

        x04 = self.conv04(torch.cat([x00, x01, x02, x03, x13], dim=1))

        output = self.final_conv(x04)
        # Resize the logits back to the resolution of the tensor that was fed in.
        output = F.interpolate(output, size=tuple(input_size), mode="bilinear", align_corners=True)

        return output


In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.utils.data import DataLoader
import albumentations as A
from albumentations.pytorch import ToTensorV2
from tqdm import tqdm
import os
import cv2
import numpy as np
from torch.optim.lr_scheduler import ReduceLROnPlateau

class IoULoss(nn.Module):
    """Soft IoU (Jaccard) loss computed on softmax probabilities.

    ``forward`` takes logits of shape [B, C, H, W] and integer class targets
    of shape [B, H, W], and returns ``1 - mean IoU`` averaged over batch and
    classes. ``eps`` keeps the ratio finite when a class is absent.
    """

    def __init__(self, eps=1e-6):
        super().__init__()
        self.eps = eps

    def forward(self, preds, targets):
        probs = preds.softmax(dim=1)
        class_count = probs.shape[1]
        # one_hot appends the class axis last; move it to the channel position.
        onehot = F.one_hot(targets, num_classes=class_count).permute(0, 3, 1, 2)

        spatial_axes = (2, 3)
        overlap = (probs * onehot).sum(dim=spatial_axes)
        combined = (probs + onehot).sum(dim=spatial_axes) - overlap
        ratio = (overlap + self.eps) / (combined + self.eps)
        return 1 - ratio.mean()

class CombinedLoss(nn.Module):
    """Weighted sum of cross-entropy and IoU loss.

    The result is ``alpha * CE + (1 - alpha) * IoU``; both terms receive the
    same logits and integer class targets.
    """

    def __init__(self, alpha=0.5):
        super().__init__()
        self.alpha = alpha
        self.ce_loss = nn.CrossEntropyLoss()
        self.iou_loss = IoULoss()

    def forward(self, preds, targets):
        ce_term = self.ce_loss(preds, targets)
        iou_term = self.iou_loss(preds, targets)
        ce_weight = self.alpha
        iou_weight = 1 - self.alpha
        return ce_weight * ce_term + iou_weight * iou_term

# Normalisation statistics shared by every pipeline; the encoder is
# initialised from ImageNet weights, so inputs use the ImageNet mean/std.
IMAGENET_MEAN = (0.485, 0.456, 0.406)
IMAGENET_STD = (0.229, 0.224, 0.225)

# Training pipeline: random flips and mild colour jitter, then normalise and
# convert to a tensor.
train_transform = A.Compose([
    A.HorizontalFlip(p=0.4),
    A.VerticalFlip(p=0.4),
    A.RandomGamma(gamma_limit=(70, 130), p=0.2),
    A.RGBShift(p=0.3, r_shift_limit=10, g_shift_limit=10, b_shift_limit=10),
    A.Normalize(mean=IMAGENET_MEAN, std=IMAGENET_STD),
    ToTensorV2(),
])

# Validation pipeline: deterministic, normalisation only.
val_transform = A.Compose([
    A.Normalize(mean=IMAGENET_MEAN, std=IMAGENET_STD),
    ToTensorV2(),
])

# Hyper-parameters.
lr = 0.005
batch_size = 8
epochs = 150
in_channels = 3   # RGB input; not read anywhere in this cell
out_channels = 3  # number of segmentation classes
H, W = 480, 480   # working resolution (height, width)

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# Pass H/W explicitly so the model cannot silently diverge from the
# configured resolution if these constants are edited.
model = UNetPlusPlus(num_classes=out_channels, H=H, W=W)
model.to(device)

loss_fn = CombinedLoss(alpha=0.5)

class CustomDataset(Dataset):
    """Segmentation dataset yielding ``(image, class-index mask)`` pairs.

    Image and mask files are looked up under the same file name in
    ``img_dir`` and ``label_dir``. Masks are converted from colours to class
    indices: 0 = background, 1 = red pixels, 2 = green pixels.

    NOTE: this definition replaces the ``CustomDataset`` declared in an
    earlier cell of this notebook. Here masks stay 2-D (H, W) and are
    returned as ``long`` tensors when a transform is supplied.

    Args:
        img_dir: Directory containing the input images.
        label_dir: Directory containing the mask images.
        resize: Target size forwarded unchanged to ``cv2.resize`` as ``dsize``,
            i.e. ``(width, height)``. ``None`` keeps the native resolution.
        transform: Optional callable invoked as ``transform(image=..., mask=...)``
            that returns a mapping with ``'image'`` and ``'mask'`` entries.
    """

    def __init__(self, img_dir, label_dir, resize=None, transform=None):
        self.img_dir = img_dir
        self.label_dir = label_dir
        self.resize = resize
        self.transform = transform
        # os.listdir returns entries in arbitrary order; sort for a stable index.
        self.images = sorted(os.listdir(self.img_dir))

    def __len__(self):
        """Return the number of files found in ``img_dir``."""
        return len(self.images)

    def read_mask(self, mask_path):
        """Load a colour mask and return a ``uint8`` class map of shape (H, W).

        Raises:
            FileNotFoundError: If OpenCV cannot read ``mask_path``.
        """
        mask_bgr = cv2.imread(mask_path)
        if mask_bgr is None:
            # cv2.imread signals failure by returning None instead of raising.
            raise FileNotFoundError(f"Could not read mask image: {mask_path}")
        if self.resize is not None:
            mask_bgr = cv2.resize(mask_bgr, self.resize)
        hsv = cv2.cvtColor(mask_bgr, cv2.COLOR_BGR2HSV)

        # Red wraps around the hue axis, so it needs two hue intervals.
        lower_red1 = np.array([0, 100, 20])
        upper_red1 = np.array([10, 255, 255])
        lower_red2 = np.array([160, 100, 20])
        upper_red2 = np.array([179, 255, 255])

        lower_mask_red = cv2.inRange(hsv, lower_red1, upper_red1)
        upper_mask_red = cv2.inRange(hsv, lower_red2, upper_red2)
        # Bitwise OR merges the two intervals without relying on uint8 addition.
        red_mask = cv2.bitwise_or(lower_mask_red, upper_mask_red)
        red_mask[red_mask != 0] = 1

        green_mask = cv2.inRange(hsv, (36, 25, 25), (70, 255, 255))
        green_mask[green_mask != 0] = 2

        full_mask = cv2.bitwise_or(red_mask, green_mask)
        full_mask = full_mask.astype(np.uint8)

        return full_mask

    def __getitem__(self, idx):
        """Return the ``(image, label)`` pair for sample ``idx``.

        With a transform, ``image`` is converted to float and ``label`` to
        long (class indices). Without one, both are returned as NumPy arrays.

        Raises:
            FileNotFoundError: If the image or its mask cannot be read.
        """
        file_name = self.images[idx]
        img_path = os.path.join(self.img_dir, file_name)
        label_path = os.path.join(self.label_dir, file_name)

        image = cv2.imread(img_path)
        if image is None:
            raise FileNotFoundError(f"Could not read image: {img_path}")
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
        label = self.read_mask(label_path)
        if self.resize is not None:
            image = cv2.resize(image, self.resize)

        if self.transform:
            transformed = self.transform(image=image, mask=label)
            image = transformed['image'].float()
            label = transformed['mask'].long()

        return image, label

# Training data. `resize` is handed straight to cv2.resize, whose `dsize`
# argument is ordered (width, height), hence (W, H) rather than (H, W).
train_dataset = CustomDataset(
    img_dir='/kaggle/input/bkai-igh-neopolyp/train/train',
    label_dir='/kaggle/input/bkai-igh-neopolyp/train_gt/train_gt',
    resize=(W, H),
    transform=train_transform,
)
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)

optimizer = optim.Adam(model.parameters(), lr=lr, weight_decay=1e-4)
# Halve the learning rate after 5 epochs without improvement in the monitored loss.
scheduler = ReduceLROnPlateau(optimizer, mode='min', factor=0.5, patience=5)

for epoch in range(epochs):
    model.train()
    epoch_loss = 0.0
    progress_bar = tqdm(train_loader, desc=f"Epoch {epoch + 1}/{epochs}", leave=False)

    for images, masks in progress_bar:
        images = images.to(device)
        masks = masks.to(device)

        optimizer.zero_grad()

        outputs = model(images)
        loss = loss_fn(outputs, masks)

        loss.backward()
        optimizer.step()

        epoch_loss += loss.item()
        progress_bar.set_postfix(loss=loss.item())

    # Mean loss over the batches of this epoch.
    epoch_loss /= len(train_loader)
    print(f"Epoch {epoch + 1}/{epochs}, Loss: {epoch_loss:.4f}")

    # No validation split is used here, so the scheduler monitors training loss.
    scheduler.step(epoch_loss)



                                                                         

KeyboardInterrupt: 

In [None]:
# Persist only the learned parameters (state_dict), not the whole module object.
checkpoint_path = "/kaggle/working/model_weights.pth"
torch.save(model.state_dict(), checkpoint_path)


In [None]:

def infer(model, image_path, device, size=(480, 480)):
    """Run the model on one image and plot it next to the predicted class map.

    Args:
        model: Segmentation network returning logits of shape [B, C, h, w].
            It is switched to eval mode as a side effect.
        image_path: Path of the image to segment.
        device: Device the input tensor is moved to; must match the model's.
        size: ``(height, width)`` the image is resized to before inference.
            The default matches the 480x480 resolution used for training in
            this notebook, so inference sees the same preprocessing.

    Raises:
        FileNotFoundError: If OpenCV cannot read ``image_path``.
    """
    image = cv2.imread(image_path)
    if image is None:
        # cv2.imread signals failure by returning None instead of raising.
        raise FileNotFoundError(f"Could not read image: {image_path}")
    image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)

    height, width = size
    preprocess = A.Compose([
        A.Resize(height, width),
        A.Normalize(mean=(0.485, 0.456, 0.406), std=(0.229, 0.224, 0.225)),
        ToTensorV2(),
    ])
    input_tensor = preprocess(image=image)['image'].unsqueeze(0).to(device)

    model.eval()
    with torch.no_grad():
        output = model(input_tensor)
        prediction = torch.argmax(output, dim=1).squeeze(0).cpu().numpy()

    plt.figure(figsize=(10, 5))
    plt.subplot(1, 2, 1)
    plt.imshow(image)
    plt.title("Original Image")
    plt.axis("off")

    plt.subplot(1, 2, 2)
    # Pin the colour scale to the full class range so a class keeps its colour
    # even when other classes are absent from this prediction.
    plt.imshow(prediction, cmap="jet", vmin=0, vmax=max(output.shape[1] - 1, 1))
    plt.title("Predicted Mask")
    plt.axis("off")
    plt.show()
# Pick the device before it is used and keep the CPU fallback; overriding it
# with a bare "cuda" string afterwards breaks inference on CPU-only machines.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = UNetPlusPlus(num_classes=3).to(device)
model.load_state_dict(torch.load("/kaggle/working/model_weights.pth", map_location=device))
model.eval()
infer(model, "/kaggle/input/bkai-igh-neopolyp/train/train/0081835cf877e004e8bfb905b78a9139.jpeg", device)