In [1]:
import os
import cv2
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from glob import glob
from tqdm import tqdm

import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms
from sklearn.model_selection import train_test_split
from sklearn.metrics import jaccard_score, f1_score

ModuleNotFoundError: No module named 'cv2'

In [None]:
# Pick the compute device once for the whole notebook: CUDA when a usable GPU
# is present, otherwise the CPU.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
print("Using device: {}".format(device))

In [None]:
class Config:
    """Central place for paths and hyper-parameters used by the notebook.

    Note: the output directory tree is created as a side effect of defining
    this class (the ``os.makedirs`` calls run in the class body).
    """

    # --- Paths ---
    DATA_PATH = "MFSD"  # Change this to your dataset path
    OUTPUT_PATH = "results/unet"

    # --- Dataset parameters ---
    IMG_SIZE = 256
    BATCH_SIZE = 8

    # --- Training parameters ---
    EPOCHS = 50
    LEARNING_RATE = 1e-4
    EARLY_STOPPING_PATIENCE = 10

    # --- Model parameters ---
    N_CLASSES = 1  # Binary segmentation
    INIT_FEATURES = 32  # Number of features in the first layer

    # --- Output directories (root first, then its sub-folders) ---
    os.makedirs(OUTPUT_PATH, exist_ok=True)
    for _subdir in ("models", "visualizations"):
        os.makedirs(os.path.join(OUTPUT_PATH, _subdir), exist_ok=True)
    del _subdir  # keep the loop variable from becoming a class attribute

In [None]:
 
    # NOTE: this cell previously held an indented copy of the Config class body
    # without its `class` header. It could not run on its own (IndentationError,
    # and OUTPUT_PATH is not defined at module level), which broke
    # "Restart Kernel -> Run All". All of these settings live in the Config
    # class defined in the previous cell; this cell is intentionally empty and
    # can be deleted.

In [None]:
class MaskDataset(Dataset):
    """Dataset yielding (image, mask) tensor pairs for binary mask segmentation.

    Each item is read from disk with OpenCV, scaled to [0, 1], resized to a
    square of ``img_size`` pixels and converted to float tensors:
    the image as (3, img_size, img_size) in RGB order and the mask as
    (1, img_size, img_size) containing only 0.0 and 1.0.
    """

    def __init__(self, images_path, masks_path, transform=None, img_size=None):
        """
        Initialize the dataset.

        Args:
            images_path: List of paths to images
            masks_path: List of paths to corresponding masks (same order and
                length as ``images_path``)
            transform: Optional callable applied to a
                ``{"image": tensor, "mask": tensor}`` dict; it must return a
                dict with the same two keys
            img_size: Side length of the square output in pixels. Defaults to
                ``Config.IMG_SIZE`` when omitted.

        Raises:
            ValueError: If the two path lists differ in length.
        """
        if len(images_path) != len(masks_path):
            raise ValueError(
                f"Got {len(images_path)} images but {len(masks_path)} masks; "
                "every image needs exactly one mask.")

        self.images_path = images_path
        self.masks_path = masks_path
        self.transform = transform
        self.img_size = Config.IMG_SIZE if img_size is None else img_size

    def __len__(self):
        return len(self.images_path)

    def __getitem__(self, idx):
        """Load, preprocess and return the ``idx``-th (image, mask) pair.

        Raises:
            FileNotFoundError: If the image or the mask cannot be read.
        """
        image_file = self.images_path[idx]
        mask_file = self.masks_path[idx]

        # cv2.imread signals failure by returning None instead of raising,
        # so check explicitly to get an error that names the offending file.
        image = cv2.imread(image_file)
        if image is None:
            raise FileNotFoundError(f"Could not read image: {image_file}")
        mask = cv2.imread(mask_file, cv2.IMREAD_GRAYSCALE)
        if mask is None:
            raise FileNotFoundError(f"Could not read mask: {mask_file}")

        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)

        # Normalize to [0, 1]
        image = image / 255.0
        mask = mask / 255.0

        # Resize. The mask uses nearest-neighbour sampling and is thresholded
        # afterwards so that it stays strictly binary: interpolation (or lossy
        # compression of the mask file) would otherwise introduce fractional
        # values along the edges.
        size = (self.img_size, self.img_size)
        image = cv2.resize(image, size)
        mask = cv2.resize(mask, size, interpolation=cv2.INTER_NEAREST)
        mask = (mask > 0.5).astype(np.float32)

        # Convert to tensors: HWC -> CHW for the image, add a channel axis to the mask
        image = torch.from_numpy(image).permute(2, 0, 1).float()
        mask = torch.from_numpy(mask).unsqueeze(0).float()

        if self.transform:
            # Apply transforms if provided
            sample = self.transform({"image": image, "mask": mask})
            image, mask = sample["image"], sample["mask"]

        return image, mask

In [None]:
class DoubleConv(nn.Module):
    """Two consecutive 3x3 conv -> batch norm -> ReLU stages (U-Net building block).

    Padding of 1 keeps the spatial size unchanged; only the channel count
    changes from ``in_channels`` to ``out_channels``.
    """

    def __init__(self, in_channels, out_channels):
        super().__init__()

        stages = []
        # The first stage maps in_channels -> out_channels, the second keeps
        # out_channels. Layers are created in the same order in which they run.
        for stage_inputs in (in_channels, out_channels):
            stages.append(nn.Conv2d(stage_inputs, out_channels, kernel_size=3, padding=1))
            stages.append(nn.BatchNorm2d(out_channels))
            stages.append(nn.ReLU(inplace=True))

        self.double_conv = nn.Sequential(*stages)

    def forward(self, x):
        features = self.double_conv(x)
        return features


In [None]:
class UNet(nn.Module):
    """U-Net architecture for image segmentation.

    Four encoder stages (a DoubleConv followed by 2x2 max pooling), a
    bottleneck, and four decoder stages that upsample with a transposed
    convolution and concatenate the matching encoder output. A final 1x1
    convolution followed by a sigmoid produces per-pixel probabilities
    (not logits).

    Args:
        n_channels: Number of channels of the input image.
        n_classes: Number of output channels.
        init_features: Channel width of the first encoder stage; every deeper
            stage doubles it.
    """

    def __init__(self, n_channels=3, n_classes=1, init_features=32):
        super(UNet, self).__init__()

        # Encoder
        self.enc1 = DoubleConv(n_channels, init_features)
        self.enc2 = DoubleConv(init_features, init_features * 2)
        self.enc3 = DoubleConv(init_features * 2, init_features * 4)
        self.enc4 = DoubleConv(init_features * 4, init_features * 8)

        # Bottleneck
        self.bottleneck = DoubleConv(init_features * 8, init_features * 16)

        # Decoder: each DoubleConv takes twice the upconv width because the
        # encoder skip connection is concatenated along the channel axis.
        self.upconv4 = nn.ConvTranspose2d(init_features * 16, init_features * 8, kernel_size=2, stride=2)
        self.dec4 = DoubleConv(init_features * 16, init_features * 8)

        self.upconv3 = nn.ConvTranspose2d(init_features * 8, init_features * 4, kernel_size=2, stride=2)
        self.dec3 = DoubleConv(init_features * 8, init_features * 4)

        self.upconv2 = nn.ConvTranspose2d(init_features * 4, init_features * 2, kernel_size=2, stride=2)
        self.dec2 = DoubleConv(init_features * 4, init_features * 2)

        self.upconv1 = nn.ConvTranspose2d(init_features * 2, init_features, kernel_size=2, stride=2)
        self.dec1 = DoubleConv(init_features * 2, init_features)

        # Output layer
        self.out = nn.Conv2d(init_features, n_classes, kernel_size=1)

        # Max pooling (parameter-free, shared by all encoder stages)
        self.pool = nn.MaxPool2d(kernel_size=2, stride=2)

        # Initialize weights
        self._initialize_weights()

    @staticmethod
    def _concat_skip(upsampled, skip):
        """Concatenate an upsampled decoder map with its encoder skip tensor.

        Max pooling floors odd sizes, so after the 2x transposed convolution
        the decoder map can be one pixel smaller than the skip tensor in
        height and/or width. In that case the decoder map is zero-padded to
        the skip size; when the sizes already match nothing is changed.
        """
        diff_h = skip.size(2) - upsampled.size(2)
        diff_w = skip.size(3) - upsampled.size(3)
        if diff_h or diff_w:
            # F.pad order for the last two dims: (left, right, top, bottom)
            upsampled = F.pad(
                upsampled,
                [diff_w // 2, diff_w - diff_w // 2, diff_h // 2, diff_h - diff_h // 2])
        return torch.cat([upsampled, skip], dim=1)

    def forward(self, x):
        """Return sigmoid probabilities with the same height/width as ``x``.

        ``x`` is indexed as a 4-D (batch, channels, height, width) tensor.
        Height and width need not be multiples of 16; size mismatches caused
        by pooling are padded away in the decoder.
        """
        # Encoder
        enc1 = self.enc1(x)
        enc2 = self.enc2(self.pool(enc1))
        enc3 = self.enc3(self.pool(enc2))
        enc4 = self.enc4(self.pool(enc3))

        # Bottleneck
        x = self.bottleneck(self.pool(enc4))

        # Decoder
        x = self.dec4(self._concat_skip(self.upconv4(x), enc4))
        x = self.dec3(self._concat_skip(self.upconv3(x), enc3))
        x = self.dec2(self._concat_skip(self.upconv2(x), enc2))
        x = self.dec1(self._concat_skip(self.upconv1(x), enc1))

        # Output
        return torch.sigmoid(self.out(x))

    def _initialize_weights(self):
        """Initialize model weights for better convergence.

        Convolutions (regular and transposed) get Kaiming-normal weights and
        zero biases; batch-norm layers start as the identity (weight 1, bias 0).
        """
        for m in self.modules():
            if isinstance(m, (nn.Conv2d, nn.ConvTranspose2d)):
                nn.init.kaiming_normal_(m.weight, mode='fan_out', nonlinearity='relu')
                if m.bias is not None:
                    nn.init.constant_(m.bias, 0)
            elif isinstance(m, nn.BatchNorm2d):
                nn.init.constant_(m.weight, 1)
                nn.init.constant_(m.bias, 0)

In [None]:
class DiceLoss(nn.Module):
    """Dice loss for segmentation.

    Computes ``1 - dice`` where the Dice coefficient is evaluated over all
    elements of the batch at once (not per sample) and ``smooth`` is added to
    numerator and denominator to avoid division by zero.

    Args:
        smooth: Additive smoothing constant.
    """

    def __init__(self, smooth=1.0):
        super(DiceLoss, self).__init__()
        self.smooth = smooth

    def forward(self, predictions, targets):
        """Return the scalar Dice loss.

        Args:
            predictions: Predicted values; the model in this notebook passes
                sigmoid probabilities.
            targets: Ground-truth tensor with the same number of elements.
        """
        # Flatten. reshape() (unlike view()) also accepts non-contiguous
        # tensors such as transposed or sliced inputs.
        predictions = predictions.reshape(-1)
        targets = targets.reshape(-1)

        # Calculate Dice coefficient
        intersection = (predictions * targets).sum()
        dice = (2.0 * intersection + self.smooth) / (predictions.sum() + targets.sum() + self.smooth)

        return 1 - dice

In [None]:
class MaskSegmentation:
    """Class for mask segmentation using U-Net."""
    
    def __init__(self, config):
        """
        Initialize the segmentation model.
        
        Args:
            config: Configuration class with model parameters
        """
        self.config = config
        self.model = None
        self.best_model_path = None
        
        # Setup data paths
        self.images_path = os.path.join(config.DATA_PATH, "images")
        self.masks_path = os.path.join(config.DATA_PATH, "masks")
        
        # Get list of image files that have masks
        self.image_files = sorted(glob(os.path.join(self.images_path, "with_mask", "*.jpg")))
        self.mask_files = [os.path.join(self.masks_path, os.path.basename(img)) for img in self.image_files]
        
        print(f"Found {len(self.image_files)} images with masks.")
    
    def prepare_data(self):
        """
        Prepare datasets and dataloaders.
        
        Returns:
            Tuple of train, validation, and test dataloaders
        """
        # Split data into train, validation, and test sets
        X_train, X_temp, y_train, y_temp = train_test_split(
            self.image_files, self.mask_files, test_size=0.3, random_state=42)
        
        X_val, X_test, y_val, y_test = train_test_split(
            X_temp, y_temp, test_size=0.5, random_state=42)
        
        print(f"Train: {len(X_train)}, Validation: {len(X_val)}, Test: {len(X_test)}")
        
        # Create datasets
        train_dataset = MaskDataset(X_train, y_train)
        val_dataset = MaskDataset(X_val, y_val)
        test_dataset = MaskDataset(X_test, y_test)
        
        # Create dataloaders
        train_loader = DataLoader(
            train_dataset, batch_size=self.config.BATCH_SIZE, shuffle=True, num_workers=4)
        val_loader = DataLoader(
            val_dataset, batch_size=self.config.BATCH_SIZE, shuffle=False, num_workers=4)
        test_loader = DataLoader(
            test_dataset, batch_size=self.config.BATCH_SIZE, shuffle=False, num_workers=4)
        
        return train_loader, val_loader, test_loader
    
    def build_model(self):
        """Create the U-Net described by the config, move it to ``device``,
        store it on ``self.model`` and return it."""
        cfg = self.config
        network = UNet(
            n_channels=3,
            n_classes=cfg.N_CLASSES,
            init_features=cfg.INIT_FEATURES,
        )
        self.model = network.to(device)
        return self.model
    
    def train(self, train_loader, val_loader):
        """
        Train the U-Net model.
        
        Args:
            train_loader: DataLoader for training data
            val_loader: DataLoader for validation data
            
        Returns:
            History of training metrics
        """
        model = self.model
        criterion = DiceLoss()
        optimizer = torch.optim.Adam(model.parameters(), lr=self.config.LEARNING_RATE)
        
        # Learning rate scheduler
        scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(
            optimizer, mode='min', factor=0.5, patience=5, verbose=True)
        
        # Initialize variables for early stopping
        best_val_loss = float('inf')
        patience_counter = 0
        
        # Initialize history
        history = {
            'train_loss': [],
            'val_loss': [],
            'val_iou': [],
            'val_dice': []
        }
        
        # Training loop
        for epoch in range(self.config.EPOCHS):
            # Training phase
            model.train()
            train_loss = 0
            
            for images, masks in tqdm(train_loader, desc=f"Epoch {epoch+1}/{self.config.EPOCHS} (Train)"):
                images = images.to(device)
                masks = masks.to(device)
                
                # Forward pass
                optimizer.zero_grad()
                outputs = model(images)
                loss = criterion(outputs, masks)
                
                # Backward pass
                loss.backward()
                optimizer.step()
                
                train_loss += loss.item()
            
            train_loss /= len(train_loader)
            
            # Validation phase
            model.eval()
            val_loss = 0
            val_iou = 0
            val_dice = 0
            
            with torch.no_grad():
                for images, masks in tqdm(val_loader, desc=f"Epoch {epoch+1}/{self.config.EPOCHS} (Val)"):
                    images = images.to(device)
                    masks = masks.to(device)
                    
                    # Forward pass
                    outputs = model(images)
                    loss = criterion(outputs, masks)
                    
                    val_loss += loss.item()
                    
                    # Calculate IoU and Dice scores
                    preds = (outputs > 0.5).float()
                    iou = self.calculate_iou(preds, masks)
                    dice = 1 - loss.item()  # Since we're using Dice loss
                    
                    val_iou += iou
                    val_dice += dice
            
            val_loss /= len(val_loader)
            val_iou /= len(val_loader)
            val_dice /= len(val_loader)
            
            # Update learning rate
            scheduler.step(val_loss)
            
            # Print metrics
            print(f"Epoch {epoch+1}/{self.config.EPOCHS}, "
                  f"Train Loss: {train_loss:.4f}, "
                  f"Val Loss: {val_loss:.4f}, "
                  f"Val IoU: {val_iou:.4f}, "
                  f"Val Dice: {val_dice:.4f}")
            
            # Save metrics to history
            history['train_loss'].append(train_loss)
            history['val_loss'].append(val_loss)
            history['val_iou'].append(val_iou)
            history['val_dice'].append(val_dice)
            
            # Save best model
            if val_loss < best_val_loss:
                best_val_loss = val_loss
                patience_counter = 0
                
                # Save model
                model_path = os.path.join(
                    self.config.OUTPUT_PATH, "models", f"unet_epoch_{epoch+1}_val_loss_{val_loss:.4f}.pt")
                torch.save(model.state_dict(), model_path)
                self.best_model_path = model_path
                
                print(f"Model saved to {model_path}")
            else:
                patience_counter += 1
            
            # Early stopping
            if patience_counter >= self.config.EARLY_STOPPING_PATIENCE:
                print(f"Early stopping after {epoch+1} epochs")
                break
        
        # Save training history
        history_df = pd.DataFrame(history)
        history_df.to_csv(os.path.join(self.config.OUTPUT_PATH, "training_history.csv"), index=False)
        
        # Plot training history
        self.plot_training_history(history)
        
        return history
    
    def calculate_iou(self, pred, target):
        """
        Calculate IoU score.
        
        Args:
            pred: Predicted mask
            target: Ground truth mask
            
        Returns:
            IoU score
        """
        # Flatten tensors
        pred = pred.view(-1).cpu().numpy()
        target = target.view(-1).cpu().numpy()
        
        # Calculate IoU
        intersection = np.logical_and(pred, target).sum()
        union = np.logical_or(pred, target).sum()
        
        if union == 0:
            return 1.0
        
        return intersection / union
    
    def plot_training_history(self, history):
        """
        Draw the loss curves and the validation metrics side by side and save
        the figure as ``training_history.png`` in the output directory.

        Args:
            history: Dictionary of training metrics
        """
        # (panel title, y-axis label, ((history key, legend label), ...))
        panels = (
            ('Loss', 'Loss',
             (('train_loss', 'Train Loss'), ('val_loss', 'Validation Loss'))),
            ('Metrics', 'Score',
             (('val_iou', 'Validation IoU'), ('val_dice', 'Validation Dice'))),
        )

        plt.figure(figsize=(15, 5))

        for position, (title, y_label, curves) in enumerate(panels, start=1):
            plt.subplot(1, 2, position)
            for key, legend_label in curves:
                plt.plot(history[key], label=legend_label)
            plt.title(title)
            plt.xlabel('Epoch')
            plt.ylabel(y_label)
            plt.legend()

        plt.tight_layout()
        target_file = os.path.join(self.config.OUTPUT_PATH, "training_history.png")
        plt.savefig(target_file)
        plt.close()
    
    def load_best_model(self):
        """Load the best model from training."""
        if self.best_model_path is None:
            print("No best model path found. Please train the model first.")
            return False
        
        self.model.load_state_dict(torch.load(self.best_model_path))
        self.model.eval()
        return True
    
    def evaluate(self, test_loader):
        """
        Evaluate the model on test data.
        
        Args:
            test_loader: DataLoader for test data
            
        Returns:
            Dictionary of evaluation metrics
        """
        model = self.model
        model.eval()
        
        test_iou = []
        test_dice = []
        
        # For visualization
        vis_images = []
        vis_gt_masks = []
        vis_pred_masks = []
        
        with torch.no_grad():
            for images, masks in tqdm(test_loader, desc="Evaluating"):
                images = images.to(device)
                masks = masks.to(device)
                
                # Forward pass
                outputs = model(images)
                preds = (outputs > 0.5).float()
                
                # Calculate metrics
                batch_iou = self.calculate_iou(preds, masks)
                batch_dice = self.calculate_dice(preds, masks)
                
                test_iou.append(batch_iou)
                test_dice.append(batch_dice)
                
                # Store some samples for visualization
                if len(vis_images) < 5:  # Store only the first 5 samples
                    for i in range(min(images.size(0), 5 - len(vis_images))):
                        vis_images.append(images[i].cpu())
                        vis_gt_masks.append(masks[i].cpu())
                        vis_pred_masks.append(preds[i].cpu())
        
        # Calculate average metrics
        avg_iou = sum(test_iou) / len(test_iou)
        avg_dice = sum(test_dice) / len(test_dice)
        
        # Print results
        print(f"Test IoU: {avg_iou:.4f}")
        print(f"Test Dice: {avg_dice:.4f}")
        
        # Visualize results
        self.visualize_predictions(vis_images, vis_gt_masks, vis_pred_masks)
        
        return {
            "iou": avg_iou,
            "dice": avg_dice
        }

    def calculate_dice(self, pred, target):
        """
        Calculate Dice coefficient.
        
        Args:
            pred: Predicted mask
            target: Ground truth mask
            
        Returns:
            Dice coefficient
        """
        # Flatten tensors
        pred = pred.view(-1).cpu().numpy()
        target = target.view(-1).cpu().numpy()
        
        # Calculate Dice
        intersection = np.logical_and(pred, target).sum()
        return (2.0 * intersection) / (pred.sum() + target.sum() + 1e-8)

    def visualize_predictions(self, images, gt_masks, pred_masks):
        """
        Visualize and save predictions.
        
        Args:
            images: List of input images
            gt_masks: List of ground truth masks
            pred_masks: List of predicted masks
        """
        n_samples = len(images)
        fig, axes = plt.subplots(n_samples, 3, figsize=(15, 5 * n_samples))
        
        if n_samples == 1:
            axes = axes.reshape(1, -1)
        
        for i in range(n_samples):
            # Original image
            img = images[i].permute(1, 2, 0).numpy()
            axes[i, 0].imshow(img)
            axes[i, 0].set_title("Original Image")
            axes[i, 0].axis("off")
            
            # Ground truth mask
            gt_mask = gt_masks[i].squeeze().numpy()
            axes[i, 1].imshow(gt_mask, cmap="gray")
            axes[i, 1].set_title("Ground Truth Mask")
            axes[i, 1].axis("off")
            
            # Predicted mask
            pred_mask = pred_masks[i].squeeze().numpy()
            axes[i, 2].imshow(pred_mask, cmap="gray")
            axes[i, 2].set_title("Predicted Mask")
            axes[i, 2].axis("off")
        
        plt.tight_layout()
        plt.savefig(os.path.join(self.config.OUTPUT_PATH, "visualizations/predictions.png"))
        plt.close()

    def predict(self, image_path):
        """
        Predict mask for a single image.
        
        Args:
            image_path: Path to input image
            
        Returns:
            Predicted mask as numpy array
        """
        # Load and preprocess image
        image = cv2.imread(image_path)
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
        
        # Normalize and resize
        image = image / 255.0
        image = cv2.resize(image, (Config.IMG_SIZE, Config.IMG_SIZE))
        
        # Convert to tensor
        image = torch.from_numpy(image).permute(2, 0, 1).float().unsqueeze(0).to(device)
        
        # Make prediction
        self.model.eval()
        with torch.no_grad():
            output = self.model(image)
            pred_mask = (output > 0.5).float()
        
        # Convert to numpy
        pred_mask = pred_mask.squeeze().cpu().numpy()
        
        return pred_mask

    def predict_and_visualize(self, image_path, output_path=None):
        """
        Predict mask for a single image and visualize result.

        Saves a two-panel figure (resized input image, predicted mask) to
        ``output_path`` or, when that is omitted, to
        ``<OUTPUT_PATH>/visualizations/pred_<image file name>``.

        Args:
            image_path: Path to input image
            output_path: Path to save visualization (optional)

        Returns:
            Predicted mask as numpy array

        Raises:
            FileNotFoundError: If the image cannot be read.
        """
        # Load image for display. cv2.imread returns None on failure instead
        # of raising, so check explicitly before using it.
        image = cv2.imread(image_path)
        if image is None:
            raise FileNotFoundError(f"Could not read image: {image_path}")
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)

        # Get prediction
        pred_mask = self.predict(image_path)

        # Visualize
        fig, axes = plt.subplots(1, 2, figsize=(12, 6))

        # Original image, shown at the resolution the model works on
        size = (self.config.IMG_SIZE, self.config.IMG_SIZE)
        axes[0].imshow(cv2.resize(image, size))
        axes[0].set_title("Original Image")
        axes[0].axis("off")

        # Predicted mask
        axes[1].imshow(pred_mask, cmap="gray")
        axes[1].set_title("Predicted Mask")
        axes[1].axis("off")

        fig.tight_layout()

        if not output_path:
            output_path = os.path.join(
                self.config.OUTPUT_PATH, "visualizations",
                f"pred_{os.path.basename(image_path)}")
        fig.savefig(output_path)

        plt.close(fig)

        return pred_mask

    def run_experiment(self):
        """Run the full experiment: prepare data, train, and evaluate."""
        print("Preparing data...")
        train_loader, val_loader, test_loader = self.prepare_data()
        
        print("Building model...")
        self.build_model()
        
        print("Training model...")
        history = self.train(train_loader, val_loader)
        
        print("Loading best model...")
        self.load_best_model()
        
        print("Evaluating model...")
        results = self.evaluate(test_loader)
        
        # Save results to file
        with open(os.path.join(self.config.OUTPUT_PATH, "results.txt"), "w") as f:
            f.write(f"IoU Score: {results['iou']:.4f}\n")
            f.write(f"Dice Coefficient: {results['dice']:.4f}\n")
        
        return results

In [None]:
# Entry point: seed the RNGs, build the pipeline from Config and run it end to end
if __name__ == "__main__":
    # Fixed seeds so that repeated runs start from the same random state
    np.random.seed(42)
    torch.manual_seed(42)

    # Configuration and the segmentation pipeline built from it
    config = Config()
    segmentation = MaskSegmentation(config)

    # Prepare data, train, evaluate
    results = segmentation.run_experiment()

    print("Experiment completed!")
    print("Results: IoU={:.4f}, Dice={:.4f}".format(results['iou'], results['dice']))