In [1]:
import zipfile
import os
from PIL import Image
import pandas as pd
import numpy as np
import torch
from torch import nn
from torch.utils.data import Dataset, DataLoader, random_split
import torchvision.transforms as transforms
import torchvision.transforms.functional as TF

# Define transforms
def get_transforms():
    return transforms.Compose([
        transforms.Resize((256, 256)),
        transforms.ToTensor(),
    ])

# UNet model
class UNet(nn.Module):
    def __init__(self, in_channels=3, out_channels=1):
        super(UNet, self).__init__()
        self.enc1 = self.double_conv(in_channels, 64)
        self.enc2 = self.double_conv(64, 128)
        self.enc3 = self.double_conv(128, 256)
        self.enc4 = self.double_conv(256, 512)
        self.enc5 = self.double_conv(512, 1024)

        self.pool = nn.MaxPool2d(2, 2)

        self.up4 = self.upconv(1024, 512)
        self.dec4 = self.double_conv(1024, 512)

        self.up3 = self.upconv(512, 256)
        self.dec3 = self.double_conv(512, 256)

        self.up2 = self.upconv(256, 128)
        self.dec2 = self.double_conv(256, 128)

        self.up1 = self.upconv(128, 64)
        self.dec1 = self.double_conv(128, 64)

        self.final_conv = nn.Conv2d(64, out_channels, kernel_size=1)

    def double_conv(self, in_channels, out_channels):
        return nn.Sequential(
            nn.Conv2d(in_channels, out_channels, 3, padding=1),
            nn.BatchNorm2d(out_channels),
            nn.ReLU(inplace=True),
            nn.Conv2d(out_channels, out_channels, 3, padding=1),
            nn.BatchNorm2d(out_channels),
            nn.ReLU(inplace=True)
        )

    def upconv(self, in_channels, out_channels):
        return nn.ConvTranspose2d(in_channels, out_channels, 2, stride=2)

    def forward(self, x):
        enc1 = self.enc1(x)
        enc2 = self.enc2(self.pool(enc1))
        enc3 = self.enc3(self.pool(enc2))
        enc4 = self.enc4(self.pool(enc3))
        enc5 = self.enc5(self.pool(enc4))

        x = self.up4(enc5)
        x = torch.cat([x, enc4], dim=1)
        x = self.dec4(x)

        x = self.up3(x)
        x = torch.cat([x, enc3], dim=1)
        x = self.dec3(x)

        x = self.up2(x)
        x = torch.cat([x, enc2], dim=1)
        x = self.dec2(x)

        x = self.up1(x)
        x = torch.cat([x, enc1], dim=1)
        x = self.dec1(x)

        return self.final_conv(x)

# Custom Dataset
class FaceDataset(Dataset):
    def __init__(self, csv_file, face_crop_dir, face_segmentation_dir, transform=None):
        self.data = pd.read_csv(csv_file)
        self.face_crop_dir = face_crop_dir
        self.face_segmentation_dir = face_segmentation_dir
        self.transform = transform
        self.data = self.data[self.data['with_mask'] == 1]

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        row = self.data.iloc[idx]
        img_name = row['filename'].replace('.jpg', '_1.jpg')
        x1, y1, x2, y2 = row['x1'], row['y1'], row['x2'], row['y2']

        img_path = os.path.join(self.face_crop_dir, img_name)
        mask_path = os.path.join(self.face_segmentation_dir, img_name)

        if not os.path.exists(img_path) or not os.path.exists(mask_path):
            return None  # Skip this sample

        try:
            img = Image.open(img_path).convert('RGB').crop((x1, y1, x2, y2))
            mask = Image.open(mask_path).convert('L').crop((x1, y1, x2, y2))

            if self.transform:
                img = self.transform(img)
                mask = self.transform(mask)
            return img, mask
        except:
            return None  # Handle corrupted or unreadable files

# Collate function to filter out None samples
def custom_collate_fn(batch):
    batch = [b for b in batch if b is not None]
    return torch.utils.data.dataloader.default_collate(batch)

# Evaluation Metrics
def dice_score(pred, target, threshold=0.5):
    pred = torch.sigmoid(pred) > threshold
    target = target > threshold
    intersection = (pred & target).float().sum((1, 2, 3))
    union = pred.float().sum((1, 2, 3)) + target.float().sum((1, 2, 3))
    dice = (2. * intersection + 1e-8) / (union + 1e-8)
    return dice.mean().item()

def iou_score(pred, target, threshold=0.5):
    pred = torch.sigmoid(pred) > threshold
    target = target > threshold
    intersection = (pred & target).float().sum((1, 2, 3))
    union = (pred | target).float().sum((1, 2, 3))
    iou = (intersection + 1e-8) / (union + 1e-8)
    return iou.mean().item()

# Training loop with validation
def train_unet(model, train_loader, val_loader, optimizer, device, num_epochs=3):
    criterion = nn.BCEWithLogitsLoss()

    for epoch in range(num_epochs):
        model.train()
        total_loss = 0.0
        for images, masks in train_loader:
            images, masks = images.to(device), masks.to(device)
            optimizer.zero_grad()
            outputs = model(images)
            loss = criterion(outputs, masks)
            loss.backward()
            optimizer.step()
            total_loss += loss.item()

        model.eval()
        with torch.no_grad():
            val_iou, val_dice = 0.0, 0.0
            for images, masks in val_loader:
                images, masks = images.to(device), masks.to(device)
                outputs = model(images)
                val_iou += iou_score(outputs, masks)
                val_dice += dice_score(outputs, masks)

        print(f"Epoch {epoch+1}/{num_epochs} - Loss: {total_loss:.4f}, "
              f"Val IoU: {val_iou/len(val_loader):.4f}, Val Dice: {val_dice/len(val_loader):.4f}")

# Paths (modify as needed)
csv_file = '/kaggle/input/mfsd-dataset/MSFD/1/dataset.csv'
face_crop_dir = '/kaggle/input/mfsd-dataset/MSFD/1/face_crop'
face_segmentation_dir = '/kaggle/input/mfsd-dataset/MSFD/1/face_crop_segmentation'

# Dataset and split
transform = get_transforms()
full_dataset = FaceDataset(csv_file, face_crop_dir, face_segmentation_dir, transform=transform)

train_size = int(0.8 * len(full_dataset))
val_size = len(full_dataset) - train_size
train_dataset, val_dataset = random_split(full_dataset, [train_size, val_size])

train_loader = DataLoader(train_dataset, batch_size=10, shuffle=True, collate_fn=custom_collate_fn)
val_loader = DataLoader(val_dataset, batch_size=10, shuffle=False, collate_fn=custom_collate_fn)

# Model setup
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = UNet(in_channels=3, out_channels=1).to(device)
optimizer = torch.optim.Adam(model.parameters(), lr=1e-4)

# Train
print("Running train UNet")
train_unet(model, train_loader, val_loader, optimizer, device, num_epochs=10)


Running train UNet
Epoch 1/10 - Loss: 193.6751, Val IoU: 0.8646, Val Dice: 0.8985
Epoch 2/10 - Loss: 80.6396, Val IoU: 0.8812, Val Dice: 0.9110
Epoch 3/10 - Loss: 53.0609, Val IoU: 0.8937, Val Dice: 0.9208
Epoch 4/10 - Loss: 41.2830, Val IoU: 0.9016, Val Dice: 0.9259
Epoch 5/10 - Loss: 35.2688, Val IoU: 0.8775, Val Dice: 0.9075
Epoch 6/10 - Loss: 31.9530, Val IoU: 0.8848, Val Dice: 0.9157
Epoch 7/10 - Loss: 29.6244, Val IoU: 0.9140, Val Dice: 0.9361
Epoch 8/10 - Loss: 28.7847, Val IoU: 0.9180, Val Dice: 0.9379
Epoch 9/10 - Loss: 26.9050, Val IoU: 0.9060, Val Dice: 0.9283
Epoch 10/10 - Loss: 26.1493, Val IoU: 0.9167, Val Dice: 0.9355
