In [27]:
import os
import numpy as np
from PIL import Image
from sklearn.model_selection import train_test_split
import torch

DEVICE = 'cuda' if torch.cuda.is_available() else "cpu"
# Your existing function to load images and masks
def load_images_and_masks(image_dir, mask_dir, image_filenames, mask_filenames):
    images = []
    masks = []
    instance_class_mapping = []

    for img_file, mask_file in zip(image_filenames, mask_filenames):
        # Load image and mask
        img = Image.open(os.path.join(image_dir, img_file))
        mask = Image.open(os.path.join(mask_dir, mask_file))
        
        # Convert to numpy arrays
        img = np.array(img)
        mask = np.array(mask)
        
        # Extract class labels from the mask (assuming the class is encoded as wound_class * 15)
        mask_class = mask // 15  # This extracts the class label for each pixel

        # Create an instance mask (simply for this example, assuming each unique value is an instance)
        instance_ids = mask  # Use the raw mask values as instance IDs (assuming each unique value is an instance)

        # Find all unique instance IDs (ignoring background class 0)
        unique_instance_ids = np.unique(instance_ids[instance_ids != 0])

        # Create a dictionary mapping each instance ID to its class
        class_mapping = {}
        for instance_id in unique_instance_ids:
            # Map the instance ID to the class by taking the first pixel of that instance
            class_mapping[instance_id] = mask_class[mask == instance_id][0]

        # Append the results
        images.append(img)
        masks.append(mask_class)  # Use the class-based mask
        instance_class_mapping.append(class_mapping)  # Map instance IDs to their classes
    
    return np.array(images), np.array(masks), instance_class_mapping


image_dir ='C:/users/comi/Desktop/Wound_segmentation_III/Data/new_images_640_1280'
mask_dir = 'C:/users/comi/Desktop/Wound_segmentation_III/Data/new_masks_640_1280'
image_filenames = sorted([f for f in os.listdir(image_dir) if f.lower().endswith(('.jpg', '.jpeg', '.png', '.bmp', '.tiff'))])
mask_filenames = sorted([f for f in os.listdir(mask_dir) if f.lower().endswith(('.jpg', '.jpeg', '.png', '.bmp', '.tiff'))])

images, masks, instance_class_mapping = load_images_and_masks(image_dir, mask_dir, image_filenames, mask_filenames)

# Split data into training and validation sets
X_train, X_val, y_train, y_val, mapping_train, mapping_val = train_test_split(
    images, masks, instance_class_mapping, test_size=0.2, random_state=42
)

# Output the number of training and validation images
print(f"Training images: {len(X_train)}, Validation images: {len(X_val)}")
print(f"Instance-Class Mapping for the first training image: {mapping_train[0]}")


Training images: 1600, Validation images: 401
Instance-Class Mapping for the first training image: {15: 1}


In [13]:
def preprocess_mask(mask):
    # Assuming mask has pixel values in steps of 15, representing classes 1, 2, 3, etc.
    class_mask = mask // 15  # Convert pixel values to class indices
    return class_mask.astype(np.int64)  # Convert to the expected dtype for cross_entropy


In [14]:
import torch
from torch.utils.data import Dataset


class PanopticSegmentationDataset(Dataset):
    def __init__(self, images, masks, transforms=None):
        self.images = images
        self.masks = masks
        self.transforms = transforms

    def __len__(self):
        return len(self.images)

    def __getitem__(self, idx):
        image = self.images[idx]
        mask = self.masks[idx]
        
        # Preprocess the mask to convert grayscale values to class indices
        
        #this has already been done
        #mask = preprocess_mask(mask)

        # Apply transforms if any
        if self.transforms:
            image = self.transforms(image)
        
        image = torch.tensor(image, dtype=torch.float32).permute(2, 0, 1)  # HWC -> CHW
        # mask_tensor = torch.tensor(mask, dtype=torch.long)  # Masks should be long type for labels

        return image, torch.tensor(mask, dtype=torch.long)


In [21]:
class PanopticSegmentationDataset2(Dataset):
    def __init__(self, images, masks, feature_extractor):
        self.images = images
        self.masks = masks
        self.feature_extractor = feature_extractor

    def __len__(self):
        return len(self.images)

    def __getitem__(self, idx):
        image = self.images[idx]
        mask = self.masks[idx]

        # Use the feature extractor
        encoded_inputs = self.feature_extractor(
            images=image,
            segmentation_maps=mask,
            return_tensors="pt"
        )

        # Extract pixel values and labels
        pixel_values = encoded_inputs['pixel_values']  # Shape: [channels, 512, 512]
        labels = encoded_inputs['labels']  # Shape: [512, 512]

        return pixel_values, labels.long()


In [22]:
num_classes = 14

def compute_class_weights(masks, num_classes):
    # Flatten all masks to a 1D array
    all_labels = np.concatenate([mask.flatten() for mask in masks])
    class_counts = np.bincount(all_labels, minlength=num_classes)
    total_counts = class_counts.sum()
    class_weights = total_counts / (num_classes * class_counts + 1e-6)
    return class_weights

class_weights = compute_class_weights(y_train, num_classes)
class_weights_tensor = torch.tensor(class_weights, dtype=torch.float).to(DEVICE)


In [23]:
from transformers import SegformerForSemanticSegmentation
from transformers import SegformerFeatureExtractor

num_classes = 14

model = SegformerForSemanticSegmentation.from_pretrained(
    'nvidia/segformer-b0-finetuned-ade-512-512',
    num_labels=num_classes,
    ignore_mismatched_sizes = True
)



feature_extractor = SegformerFeatureExtractor.from_pretrained(
    'nvidia/segformer-b0-finetuned-ade-512-512',
    reduce_labels=False,
    size=512  # Resize images and masks to 512x512
)


Some weights of SegformerForSemanticSegmentation were not initialized from the model checkpoint at nvidia/segformer-b0-finetuned-ade-512-512 and are newly initialized because the shapes did not match:
- decode_head.classifier.bias: found shape torch.Size([150]) in the checkpoint and torch.Size([14]) in the model instantiated
- decode_head.classifier.weight: found shape torch.Size([150, 256, 1, 1]) in the checkpoint and torch.Size([14, 256, 1, 1]) in the model instantiated
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


In [24]:
from torch.optim import Adam
from torch.utils.data import DataLoader
import torch.nn.functional as F

# Hyperparameters
batch_size = 4
num_epochs = 20
learning_rate = 1e-4
num_classes = 14
epochs = 10
size = 2

# Create DataLoaders
# train_dataset = PanopticSegmentationDataset(X_train, y_train)
# val_dataset = PanopticSegmentationDataset(X_val, y_val)

train_dataset = PanopticSegmentationDataset(X_train, y_train, feature_extractor)
val_dataset = PanopticSegmentationDataset(X_val, y_val, feature_extractor)

train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)

# Initialize optimizer
optimizer = Adam(model.parameters(), lr=learning_rate)
criterion = torch.nn.CrossEntropyLoss(weight=class_weights_tensor, ignore_index=255)



In [25]:
# IoU calculation function
def calculate_iou(pred, target, num_classes):
    ious = []
    pred = pred.view(-1)  # Flatten
    target = target.view(-1)

    for cls in range(num_classes):
        pred_inds = pred == cls
        target_inds = target == cls

        intersection = (pred_inds & target_inds).sum().float()
        union = (pred_inds | target_inds).sum().float()

        if union == 0:
            ious.append(float('nan'))  # No ground truth
        else:
            ious.append((intersection / union).item())

    # Return mean IoU excluding NaN values
    return torch.tensor(ious).nanmean().item()


In [26]:
import torch.nn.functional as F
from tqdm import tqdm

for epoch in range(num_epochs):
    model.train()
    total_loss = 0
    total_iou = 0
    num_batches = 0

    # Training loop
    for batch in tqdm(train_loader, desc=f"Training Epoch {epoch+1}/{num_epochs}"):
        optimizer.zero_grad()
        images, labels = batch

        # Move data to the device
        images = images.to(DEVICE)
        labels = labels.to(DEVICE)

        # Forward pass
        outputs = model(pixel_values=images)
        logits = outputs.logits  # Shape: [batch_size, num_classes, H', W']

        # Upsample logits to match the size of labels
        logits = F.interpolate(
            logits, size=labels.shape[-2:], mode='bilinear', align_corners=False
        )  # Now logits shape is [batch_size, num_classes, H, W]

        # Compute loss
        loss = criterion(logits, labels)

        # Backward pass and optimization
        loss.backward()
        optimizer.step()

        total_loss += loss.item()

        # Calculate per-batch IoU
        preds = logits.argmax(dim=1)  # Shape: [batch_size, H, W]
        batch_iou = calculate_iou(preds, labels, num_classes)
        total_iou += batch_iou
        num_batches += 1

        # Optionally, print batch metrics
        # print(f"Batch Loss: {loss.item():.4f}, Batch IoU: {batch_iou:.4f}")

    avg_loss = total_loss / num_batches
    avg_iou = total_iou / num_batches
    print(f"Epoch [{epoch+1}/{num_epochs}] Training Loss: {avg_loss:.4f}, Training IoU: {avg_iou:.4f}")

    # Validation loop
    model.eval()
    val_total_loss = 0
    val_total_iou = 0
    val_num_batches = 0
    with torch.no_grad():
        for batch in tqdm(val_loader, desc=f"Validation Epoch {epoch+1}/{num_epochs}"):
            images, labels = batch
            images = images.to(DEVICE)
            labels = labels.to(DEVICE)

            outputs = model(pixel_values=images)
            logits = outputs.logits

            # Upsample logits to match the size of labels
            logits = F.interpolate(
                logits, size=labels.shape[-2:], mode='bilinear', align_corners=False
            )
            loss = criterion(logits, labels)

            val_total_loss += loss.item()

            preds = logits.argmax(dim=1)
            batch_iou = calculate_iou(preds, labels, num_classes)
            val_total_iou += batch_iou
            val_num_batches += 1

    val_avg_loss = val_total_loss / val_num_batches
    val_avg_iou = val_total_iou / val_num_batches
    print(f"Epoch [{epoch+1}/{num_epochs}] Validation Loss: {val_avg_loss:.4f}, Validation IoU: {val_avg_iou:.4f}")


Training Epoch 1/20:   0%|          | 0/400 [00:00<?, ?it/s]


ValueError: could not determine the shape of object type 'BatchFeature'

Some weights of SegformerForSemanticSegmentation were not initialized from the model checkpoint at nvidia/segformer-b0-finetuned-ade-512-512 and are newly initialized because the shapes did not match:
- decode_head.classifier.bias: found shape torch.Size([150]) in the checkpoint and torch.Size([14]) in the model instantiated
- decode_head.classifier.weight: found shape torch.Size([150, 256, 1, 1]) in the checkpoint and torch.Size([14, 256, 1, 1]) in the model instantiated
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


In [38]:
import torch
import torch.nn.functional as F
from tqdm import tqdm

# IoU calculation
def calculate_iou(pred, target, num_classes):
    ious = []
    pred = pred.view(-1)  # Flatten the prediction and target tensors
    target = target.view(-1)

    for cls in range(0, num_classes):  # Loop over all classes
        pred_inds = (pred == cls)
        target_inds = (target == cls)

        intersection = (pred_inds[target_inds]).sum().float()
        union = pred_inds.sum().float() + target_inds.sum().float() - intersection

        if union == 0:
            ious.append(float('nan'))  # If no ground truth, do not include this class
        else:
            ious.append(intersection / union)

    return torch.tensor(ious).mean().item()  # Return the mean IoU


In [45]:
def train(model, train_loader, val_loader, num_epochs, num_classes, optimizer):
    model.train()

    for epoch in range(num_epochs):
        print(f"\nEpoch {epoch+1}/{num_epochs}")

        running_loss = 0.0
        total_iou = 0.0

        # Training phase
        model.train()
        with tqdm(total=len(train_loader), desc="Training", unit="batch") as pbar:
            for batch_idx, (images, masks) in enumerate(train_loader):
                optimizer.zero_grad()

                # Forward pass
                outputs = model(images)

                # Resize the masks to match the output shape
                logits_height, logits_width = outputs.logits.shape[2], outputs.logits.shape[3]
                resized_masks = F.interpolate(masks.unsqueeze(1).float(), size=(logits_height, logits_width), mode='nearest').squeeze(1).long()

                # Calculate loss
                loss = F.cross_entropy(outputs.logits, resized_masks)
                loss.backward()
                optimizer.step()

                # Update running loss
                running_loss += loss.item()

                # Calculate IoU for this batch
                preds = torch.argmax(outputs.logits, dim=1)
                batch_iou = calculate_iou(preds, resized_masks, num_classes)
                total_iou += batch_iou

                # Update progress bar with loss and IoU
                pbar.set_postfix({'loss': running_loss / (batch_idx + 1), 'iou': total_iou / (batch_idx + 1)})
                pbar.update(1)

        # End of epoch
        avg_train_loss = running_loss / len(train_loader)
        avg_train_iou = total_iou / len(train_loader)
        print(f"Epoch {epoch+1}/{num_epochs} - Avg Training Loss: {avg_train_loss:.4f}, Avg IoU: {avg_train_iou:.4f}")

        # Validation phase
        model.eval()
        val_loss = 0.0
        val_iou = 0.0
        with torch.no_grad():
            with tqdm(total=len(val_loader), desc="Validating", unit="batch") as pbar:
                for batch_idx, (images, masks) in enumerate(val_loader):
                    outputs = model(images)

                    # Resize masks for validation
                    resized_masks = F.interpolate(masks.unsqueeze(1).float(), size=(logits_height, logits_width), mode='nearest').squeeze(1).long()

                    # Calculate validation loss
                    loss = F.cross_entropy(outputs.logits, resized_masks)
                    val_loss += loss.item()

                    # Calculate IoU for validation batch
                    preds = torch.argmax(outputs.logits, dim=1)
                    batch_iou = calculate_iou(preds, resized_masks, num_classes)
                    val_iou += batch_iou

                    # Update progress bar
                    pbar.set_postfix({'val_loss': val_loss / (batch_idx + 1), 'val_iou': val_iou / (batch_idx + 1)})
                    pbar.update(1)

        avg_val_loss = val_loss / len(val_loader)
        avg_val_iou = val_iou / len(val_loader)
        print(f"Epoch {epoch+1} - Avg Validation Loss: {avg_val_loss:.4f}, Avg IoU: {avg_val_iou:.4f}")


In [46]:
train(model, train_loader, val_loader, epochs, num_classes, optimizer)



Epoch 1/10


Training:  16%|█▌        | 62/400 [11:16<1:01:27, 10.91s/batch, loss=2.71, iou=0.00165]


KeyboardInterrupt: 

In [5]:
def evaluate(model, val_loader):
    model.eval()
    iou_scores = []

    with torch.no_grad():
        for images, masks in val_loader:
            outputs = model(images)
            preds = torch.argmax(outputs.logits, dim=1)
            
            # Calculate metrics like IoU for evaluation
            iou = compute_iou(preds, masks)  # Implement a suitable IoU function
            iou_scores.append(iou)
    
    mean_iou = np.mean(iou_scores)
    print(f"Mean IoU: {mean_iou:.4f}")

evaluate(model, val_loader)


RuntimeError: Given groups=1, weight of size [32, 3, 7, 7], expected input[4, 1280, 640, 3] to have 3 channels, but got 1280 channels instead