## Import

In [1]:
!pip install segmentation-models-pytorch -q

[0m[31mERROR: Could not find a version that satisfies the requirement segmentation-models-pytorch (from versions: none)[0m[31m
[0m[31mERROR: No matching distribution found for segmentation-models-pytorch[0m[31m
[0m

In [2]:
import os
import random
import numpy as np
from PIL import Image
import pandas as pd
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from torch.cuda.amp import GradScaler, autocast
import torchvision.transforms as transforms
from tqdm import tqdm
import matplotlib.pyplot as plt
import albumentations as A
from albumentations.pytorch import ToTensorV2
import time
from sklearn.model_selection import train_test_split

  data = fetch_version_info()


In [3]:
# Try importing segmentation_models_pytorch
try:
    import segmentation_models_pytorch as smp
except ImportError:
    print("Please install segmentation_models_pytorch: pip install segmentation-models-pytorch")
    exit()

Please install segmentation_models_pytorch: pip install segmentation-models-pytorch


## CONFIG

In [None]:
NAME_VERSION = "unet-v2.1"    
# Parameters
TRAIN_IMAGE_DIR = '/kaggle/input/gd-go-c-hcmus-aic-fragment-segmentation-track/train/images'
TRAIN_MASK_DIR = '/kaggle/input/gd-go-c-hcmus-aic-fragment-segmentation-track/train/masks'
VAL_IMAGE_DIR = '/kaggle/input/gd-go-c-hcmus-aic-fragment-segmentation-track/val/images'

OUTPUT_DIR = f'/kaggle/working/{NAME_VERSION}'

TARGET_SIZE = 512  # Higher resolution for better segmentation
BATCH_SIZE = 4     # Smaller batch size for higher resolution
NUM_EPOCHS = 50    # More epochs for better convergence
LEARNING_RATE = 5e-4
WEIGHT_DECAY = 1e-5  # L2 regularization
USE_AMP = False     # Use mixed precision for faster training

DEVICE = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

In [None]:
os.makedirs(OUTPUT_DIR, exist_ok=True)
print(f"Using device: {DEVICE}")

In [None]:
# Check if files exist in directories
if not os.path.exists(TRAIN_IMAGE_DIR):
    raise FileNotFoundError(f"Training image directory not found: {TRAIN_IMAGE_DIR}")
if not os.path.exists(TRAIN_MASK_DIR):
    raise FileNotFoundError(f"Training mask directory not found: {TRAIN_MASK_DIR}")
if not os.path.exists(VAL_IMAGE_DIR):
    raise FileNotFoundError(f"Validation image directory not found: {VAL_IMAGE_DIR}")

## Seed

In [None]:
# Set random seed for reproducibility
def set_seed(seed=42):
    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed(seed)
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False
set_seed()


## Dataset and Transformation

In [None]:
# Dataset class for training
class SegmentationDataset(Dataset):
    def __init__(self, image_dir, mask_dir=None, transform=None, image_list=None):
        self.image_dir = image_dir
        self.mask_dir = mask_dir
        self.transform = transform
        
        # Get list of image files
        if image_list is not None:
            self.images = image_list
        else:
            self.images = [f for f in os.listdir(image_dir) if f.endswith('.jpg')]
    
    def __len__(self):
        return len(self.images)
    
    def __getitem__(self, idx):
        img_name = self.images[idx]
        img_path = os.path.join(self.image_dir, img_name)
        
        image = np.array(Image.open(img_path).convert("RGB"))
        
        if self.mask_dir:  # Training or validation with masks
            mask_name = os.path.splitext(img_name)[0] + '.png'
            mask_path = os.path.join(self.mask_dir, mask_name)
            
            # Handle mask loading errors gracefully
            try:
                mask = np.array(Image.open(mask_path).convert("L"))  # Grayscale
                # Ensure mask is binary (0 or 1)
                mask = (mask > 0).astype(np.float32)
            except Exception as e:
                print(f"Error loading mask {mask_path}: {e}")
                # Create an empty mask if there's an error
                mask = np.zeros(image.shape[:2], dtype=np.float32)
            
            if self.transform:
                augmentations = self.transform(image=image, mask=mask)
                image = augmentations["image"]
                mask = augmentations["mask"]
            return image, mask
        else:  # Test set, no masks
            if self.transform:
                augmentations = self.transform(image=image)
                image = augmentations["image"]
            return image, img_name

In [None]:
# Define transformations with specific target size for blast fragments
def get_transforms(train=True, target_size=512):  # Increased size for better detail
    if train:
        return A.Compose([
            A.Resize(height=target_size, width=target_size),
            A.HorizontalFlip(p=0.5),
            A.VerticalFlip(p=0.5),
            A.RandomRotate90(p=0.5),
            A.ShiftScaleRotate(p=0.5),
            A.GaussianBlur(p=0.3),
            A.GaussNoise(p=0.2),
            A.RandomBrightnessContrast(p=0.3),
            A.Normalize(mean=(0.485, 0.456, 0.406), std=(0.229, 0.224, 0.225)),
            ToTensorV2(),
        ])
    else:
        return A.Compose([
            A.Resize(height=target_size, width=target_size),
            A.Normalize(mean=(0.485, 0.456, 0.406), std=(0.229, 0.224, 0.225)),
            ToTensorV2(),
        ])

## Utils

In [None]:
# RLE encoding function from the competition description
def mask_to_rle(mask):
    """
    Convert a binary mask to run-length encoding (RLE)
    """
    # Flatten mask
    pixels = mask.flatten()
    # Compress the mask with RLE
    pixels = np.concatenate([[0], pixels, [0]])
    runs = np.where(pixels[1:] != pixels[:-1])[0] + 1
    runs[1::2] -= runs[::2]
    # Convert to string
    rle = ' '.join(str(x) for x in runs)
    return rle

In [None]:
# RLE decoding function for the competition
def rle_to_mask(rle, shape):
    """
    Convert RLE to mask
    """
    if rle == '' or rle is None:
        return np.zeros(shape, dtype=np.uint8)
    
    s = rle.split()
    starts, lengths = [np.asarray(x, dtype=int) for x in (s[0:][::2], s[1:][::2])]
    starts -= 1  # RLE starts from 1, convert to 0-based indexing
    ends = starts + lengths
    img = np.zeros(shape[0]*shape[1], dtype=np.uint8)
    for lo, hi in zip(starts, ends):
        img[lo:hi] = 1
    return img.reshape(shape)

In [None]:
# IoU (Jaccard Index) for evaluation as per competition metric
def iou_score(pred, target, smooth=1e-6):
    pred = (pred > 0.5).float()
    intersection = (pred * target).sum()
    union = pred.sum() + target.sum() - intersection
    return (intersection + smooth) / (union + smooth)

In [None]:
# Display predictions
def display_predictions(model, dataset, device, num_samples=3, save_path='predictions.png'):
    model.eval()
    fig, axes = plt.subplots(num_samples, 3, figsize=(15, 5*num_samples))
    
    # Sample indices randomly
    indices = random.sample(range(len(dataset)), min(num_samples, len(dataset)))
    
    for i, idx in enumerate(indices):
        image, mask = dataset[idx]
        
        image_tensor = image.unsqueeze(0).to(device)
        with torch.no_grad():
            output = model(image_tensor)
            pred_mask = torch.sigmoid(output) > 0.5
        
        # Convert tensors to numpy for display
        image_np = image.permute(1, 2, 0).cpu().numpy()
        mask_np = mask.cpu().numpy()
        pred_mask_np = pred_mask.squeeze().cpu().numpy()
        
        # Denormalize image
        mean = np.array([0.485, 0.456, 0.406])
        std = np.array([0.229, 0.224, 0.225])
        image_np = std * image_np + mean
        image_np = np.clip(image_np, 0, 1)
        
        # Display
        axes[i, 0].imshow(image_np)
        axes[i, 0].set_title("Image")
        axes[i, 0].axis('off')
        
        axes[i, 1].imshow(mask_np, cmap='gray')
        axes[i, 1].set_title("Ground Truth")
        axes[i, 1].axis('off')
        
        axes[i, 2].imshow(pred_mask_np, cmap='gray')
        axes[i, 2].set_title("Prediction")
        axes[i, 2].axis('off')
    
    plt.tight_layout()
    plt.savefig(save_path)
    plt.close()

In [None]:
# Save training history plot and metrics
def save_training_history(train_losses, val_losses, train_ious, val_ious, save_dir):
    # Create directory if it doesn't exist
    os.makedirs(save_dir, exist_ok=True)
    
    # Save metrics to CSV
    history = {
        'epoch': list(range(1, len(train_losses) + 1)),
        'train_loss': train_losses,
        'val_loss': val_losses,
        'train_iou': train_ious,
        'val_iou': val_ious
    }
    pd.DataFrame(history).to_csv(os.path.join(save_dir, 'training_history.csv'), index=False)
    
    # Plot training history
    plt.figure(figsize=(15, 6))
    
    plt.subplot(1, 2, 1)
    plt.plot(history['epoch'], train_losses, 'b-', label='Training Loss')
    plt.plot(history['epoch'], val_losses, 'r-', label='Validation Loss')
    plt.title('Training and Validation Loss')
    plt.xlabel('Epoch')
    plt.ylabel('Loss')
    plt.legend()
    plt.grid(True)
    
    plt.subplot(1, 2, 2)
    plt.plot(history['epoch'], train_ious, 'b-', label='Training IoU')
    plt.plot(history['epoch'], val_ious, 'r-', label='Validation IoU')
    plt.title('Training and Validation IoU')
    plt.xlabel('Epoch')
    plt.ylabel('IoU Score')
    plt.legend()
    plt.grid(True)
    
    plt.tight_layout()
    plt.savefig(os.path.join(save_dir, 'training_history.png'))
    plt.close()

## Train and val helper

In [None]:
# Training function with mixed precision
def train_epoch(model, loader, optimizer, criterion, device, scaler=None):
    model.train()
    epoch_loss = 0
    iou_scores = []
    
    for images, masks in tqdm(loader):
        images = images.to(device)
        masks = masks.float().to(device).unsqueeze(1)  # Add channel dimension
        
        # Use mixed precision if scaler is provided
        if scaler is not None:
            with autocast():
                outputs = model(images)
                loss = criterion(outputs, masks)
                
            # Scale gradients and optimize
            scaler.scale(loss).backward()
            scaler.step(optimizer)
            scaler.update()
            optimizer.zero_grad()
        else:
            outputs = model(images)
            loss = criterion(outputs, masks)
            
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
        
        epoch_loss += loss.item()
        
        # Calculate IoU score for this batch
        with torch.no_grad():
            pred_masks = torch.sigmoid(outputs)
            iou = iou_score(pred_masks, masks)
            iou_scores.append(iou.item())
    
    return epoch_loss / len(loader), np.mean(iou_scores)

In [None]:
# Validation function
def valid_epoch(model, loader, criterion, device):
    model.eval()
    epoch_loss = 0
    iou_scores = []
    
    with torch.no_grad():
        for images, masks in tqdm(loader):
            images = images.to(device)
            masks = masks.float().to(device).unsqueeze(1)
            
            outputs = model(images)
            loss = criterion(outputs, masks)
            
            epoch_loss += loss.item()
            pred_masks = torch.sigmoid(outputs)
            iou = iou_score(pred_masks, masks)
            iou_scores.append(iou.item())
    
    return epoch_loss / len(loader), np.mean(iou_scores)

## Prediction Helpers

In [None]:
# Prediction and RLE encoding for competition submission
def predict_and_encode(model, loader, device, target_size=512):
    model.eval()
    predictions = []
    
    with torch.no_grad():
        for images, img_names in tqdm(loader):
            images = images.to(device)
            outputs = model(images)
            pred_masks = (torch.sigmoid(outputs) > 0.5).float().cpu().numpy()
            
            for pred_mask, img_name in zip(pred_masks, img_names):
                pred_mask = pred_mask.squeeze()  # Remove channel dimension
                # Convert to RLE format
                rle = mask_to_rle(pred_mask)
                image_id = os.path.splitext(img_name)[0]
                predictions.append({'id': image_id, 'rle': rle})
    
    return predictions

## Split train and val

In [None]:
# Get all images
all_images = [f for f in os.listdir(TRAIN_IMAGE_DIR) if f.endswith('.jpg')]
print(f"Found {len(all_images)} images in training directory")

# Verify mask files exist for training images
valid_images = []
for img_file in all_images:
    mask_file = os.path.splitext(img_file)[0] + '.png'
    if os.path.exists(os.path.join(TRAIN_MASK_DIR, mask_file)):
        valid_images.append(img_file)

if len(valid_images) < len(all_images):
    print(f"Warning: Only {len(valid_images)} of {len(all_images)} images have corresponding masks")

# Split data into train and validation
train_images, valid_images = train_test_split(
    valid_images, test_size=0.2, random_state=42
)

print(f"Training images: {len(train_images)}")
print(f"Validation images: {len(valid_images)}")

## DataLoader

In [None]:
# Define transformations
train_transform = get_transforms(train=True, target_size=TARGET_SIZE)
valid_transform = get_transforms(train=False, target_size=TARGET_SIZE)

# Data loaders with appropriate number of workers
num_workers = min(8, os.cpu_count() or 1)

# Create datasets
train_dataset = SegmentationDataset(
    image_dir=TRAIN_IMAGE_DIR,
    mask_dir=TRAIN_MASK_DIR,
    transform=train_transform,
    image_list=train_images
)

valid_dataset = SegmentationDataset(
    image_dir=TRAIN_IMAGE_DIR,
    mask_dir=TRAIN_MASK_DIR,
    transform=valid_transform,
    image_list=valid_images
)

test_dataset = SegmentationDataset(
    image_dir=VAL_IMAGE_DIR,
    mask_dir=None,
    transform=valid_transform
)


train_loader = DataLoader(
    train_dataset,
    batch_size=BATCH_SIZE,
    shuffle=True,
    num_workers=num_workers,
    pin_memory=True
)

valid_loader = DataLoader(
    valid_dataset,
    batch_size=BATCH_SIZE,
    shuffle=False,
    num_workers=num_workers,
    pin_memory=True
)

test_loader = DataLoader(
    test_dataset,
    batch_size=BATCH_SIZE,
    shuffle=False,
    num_workers=num_workers,
    pin_memory=True
)

## Model

In [None]:
# Model - try a different architecture
model = smp.UnetPlusPlus(  # UNet++ often performs better for medical-like segmentation
    encoder_name="efficientnet-b3",  # Higher capacity encoder
    encoder_weights="imagenet",
    in_channels=3,
    classes=1
).to(DEVICE)

## Loss/Optimizer/Scheduler/Scaler

In [None]:
# Loss function - Combined BCE and Dice loss for better boundary detection
class BCEDiceLoss(nn.Module):
    def __init__(self, weight=0.5):
        super(BCEDiceLoss, self).__init__()
        self.weight = weight  # Weight for BCE vs Dice
        self.bce = nn.BCEWithLogitsLoss()
        
    def forward(self, pred, target):
        # BCE Loss
        bce_loss = self.bce(pred, target)
        
        # Dice Loss
        pred_sigmoid = torch.sigmoid(pred)
        intersection = (pred_sigmoid * target).sum()
        dice_loss = 1 - (2. * intersection + 1) / (pred_sigmoid.sum() + target.sum() + 1)
        
        # Combine losses
        return self.weight * bce_loss + (1 - self.weight) * dice_loss

criterion = BCEDiceLoss(weight=0.7)

# Initialize mixed precision scaler
scaler = GradScaler() if USE_AMP and DEVICE.type == 'cuda' else None


# Optimizer with weight decay for regularization
optimizer = optim.AdamW(
    model.parameters(),
    lr=LEARNING_RATE,
    weight_decay=WEIGHT_DECAY
)

# Learning rate scheduler with cosine annealing
scheduler = optim.lr_scheduler.CosineAnnealingWarmRestarts(
    optimizer, 
    T_0=5,  # Restart every 5 epochs
    T_mult=1, 
    eta_min=1e-6
)

## Traing loop

In [None]:
# Training loop
best_iou = -1
model_save_path = os.path.join(OUTPUT_DIR, "best_model.pth")

# For tracking metrics
train_losses, val_losses = [], []
train_ious, val_ious = [], []

start_time = time.time()

for epoch in range(NUM_EPOCHS):
    print(f"Epoch {epoch+1}/{NUM_EPOCHS}")
    
    # Train
    train_loss, train_iou = train_epoch(model, train_loader, optimizer, criterion, DEVICE, scaler)
    
    # Validate
    valid_loss, valid_iou = valid_epoch(model, valid_loader, criterion, DEVICE)
    
    # Update learning rate
    scheduler.step()
    
    # Track metrics
    train_losses.append(train_loss)
    val_losses.append(valid_loss)
    train_ious.append(train_iou)
    val_ious.append(valid_iou)
    
    print(f"Train Loss: {train_loss:.4f}, Train IoU: {train_iou:.4f}")
    print(f"Valid Loss: {valid_loss:.4f}, Valid IoU: {valid_iou:.4f}")
    print(f"Learning rate: {optimizer.param_groups[0]['lr']:.6f}")
    
    # Save best model
    if valid_iou > best_iou:
        best_iou = valid_iou
        torch.save({
            'epoch': epoch,
            'model_state_dict': model.state_dict(),
            'optimizer_state_dict': optimizer.state_dict(),
            'scheduler_state_dict': scheduler.state_dict(),
            'best_iou': best_iou,
        }, model_save_path)
        print(f"Saved best model to {model_save_path}!")

# Calculate training time
time_elapsed = time.time() - start_time
print(f'Training complete in {time_elapsed // 60:.0f}m {time_elapsed % 60:.0f}s')
print(f'Best validation IoU: {best_iou:.4f}')

## Save

In [None]:
# Save training history
save_training_history(train_losses, val_losses, train_ious, val_ious, OUTPUT_DIR)

# Load best model for evaluation and prediction
checkpoint = torch.load(model_save_path)
model.load_state_dict(checkpoint['model_state_dict'])

# Validate with best model
valid_loss, valid_iou = valid_epoch(model, valid_loader, criterion, DEVICE)
print(f"Best model - Valid Loss: {valid_loss:.4f}, Valid IoU: {valid_iou:.4f}")

# Save visualizations of predictions
display_predictions(
    model, 
    valid_dataset, 
    DEVICE, 
    num_samples=5, 
    save_path=os.path.join(OUTPUT_DIR, 'predictions.png')
)
print(f"Predictions visualization saved to '{os.path.join(OUTPUT_DIR, 'predictions.png')}'")

## Infer

In [None]:
# Generate predictions for test set and save in RLE format
test_predictions = predict_and_encode(model, test_loader, DEVICE, target_size=TARGET_SIZE)
submission_df = pd.DataFrame(test_predictions)
submission_path = os.path.join(OUTPUT_DIR, 'submission.csv')
submission_df.to_csv(submission_path, index=False)
print(f"Submission file saved to '{submission_path}'")