In [12]:
import os
import numpy as np
from PIL import Image
import torch as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
import evaluate
import pytorch_lightning as pl
from transformers import SegformerForSemanticSegmentation, SegformerImageProcessor
import matplotlib.pyplot as plt
from transformers import SegformerFeatureExtractor
from pytorch_lightning.callbacks.early_stopping import EarlyStopping
from pytorch_lightning.callbacks.model_checkpoint import ModelCheckpoint
import multiprocessing

In [24]:

# === Configurations ===
TRAIN_IMAGE_DIR = 'C:/Users/DSBG-Public/segformer/dataset/train/images'
TRAIN_MASK_DIR = 'C:/Users/DSBG-Public/segformer/dataset/train/masks'
VAL_IMAGE_DIR = 'C:/Users/DSBG-Public/segformer/dataset/val/images'
VAL_MASK_DIR = 'C:/Users/DSBG-Public/segformer/dataset/val/masks'
TEST_IMAGE_DIR = 'C:/Users/DSBG-Public/segformer/dataset/test/images'
TEST_MASK_DIR = 'C:/Users/DSBG-Public/segformer/dataset/test/masks'
NUM_CLASSES = 2

# === Dataset ===
class SemanticSegmentationDataset(Dataset):
    def __init__(self, image_dir, mask_dir, feature_extractor, resize=(512, 512)):
        self.image_dir = image_dir
        self.mask_dir = mask_dir
        self.resize = resize
        self.feature_extractor = feature_extractor
        self.images = sorted(os.listdir(image_dir))
        self.masks = sorted(os.listdir(mask_dir))

    def __len__(self):
        return len(self.images)
    
    def __getitem__(self, idx):
        image = Image.open(os.path.join(self.image_dir, self.images[idx])).convert("RGB")
        mask = Image.open(os.path.join(self.mask_dir, self.masks[idx])).convert("L")
        image = image.resize(self.resize)
        mask = mask.resize(self.resize)
        
        encoding = self.feature_extractor(images=image, return_tensors="pt")
        pixel_values = encoding['pixel_values'].squeeze(0)
        
        mask = np.array(mask)
        mask = (mask > 127).astype(np.uint8)
        mask = np.clip(mask, 0, NUM_CLASSES - 1)
        mask = nn.tensor(mask, dtype=nn.long)
        
        encoding['labels'] = mask
        encoding['pixel_values'] = pixel_values
        return encoding

# === PyTorch Lightning Module ===
class SegformerFinetuner(pl.LightningModule):

    def __init__(self, train_dataloader=None, val_dataloader=None, test_dataloader=None, metrics_interval=100):
        super(SegformerFinetuner, self).__init__()
        self.metrics_interval = metrics_interval
        self.train_dl = train_dataloader
        self.val_dl = val_dataloader
        self.test_dl = test_dataloader

        self.num_classes = NUM_CLASSES 

        self.model = SegformerForSemanticSegmentation.from_pretrained(
            "nvidia/segformer-b0-finetuned-ade-512-512", 
            return_dict=False, 
            num_labels=self.num_classes,
            ignore_mismatched_sizes=True,
        )

        self.train_mean_iou = evaluate.load("mean_iou")
        self.val_mean_iou = evaluate.load("mean_iou")
        self.test_mean_iou = evaluate.load("mean_iou")


    def forward(self, images, masks):
        return self.model(pixel_values=images, labels=masks)
    
    def training_step(self, batch, batch_idx):
        labels = batch["labels"]
        pixel_values = batch["pixel_values"]
        outputs = self.model(pixel_values, labels)
        loss, logits = outputs[0], outputs[1]
        # Resize labels to match logits spatial dimensions if necessary.
        upsampled_logits = F.interpolate(
            logits, 
            size=labels.shape[-2:], 
            mode="bilinear", 
            align_corners=False
        )

        predicted = upsampled_logits.argmax(dim=1)

        self.train_mean_iou.add_batch(
            predictions=predicted.detach().cpu().numpy(), 
            references=labels.detach().cpu().numpy()
        )
        if batch_idx % self.metrics_interval == 0:

            metrics = self.train_mean_iou.compute(
                num_labels=self.num_classes, 
                ignore_index=255, 
                reduce_labels=False,
            )
            
            metrics = {'loss': loss, "mean_iou": metrics["mean_iou"], "mean_accuracy": metrics["mean_accuracy"]}
            
            for k,v in metrics.items():
                self.log(k,v)
            
            return(metrics)
        else:
            return({'loss': loss})
    
    def validation_step(self, batch, batch_idx):
        labels = batch["labels"]
        pixel_values = batch["pixel_values"]
        outputs = self.model(pixel_values,labels)
        loss, logits = outputs[0], outputs[1]
        upsampled_logits = F.interpolate(
            logits, 
            size=labels.shape[-2:], 
            mode="bilinear", 
            align_corners=False
        )
        
        predicted = upsampled_logits.argmax(dim=1)
        
        self.val_mean_iou.add_batch(
            predictions=predicted.detach().cpu().numpy(), 
            references=labels.detach().cpu().numpy()
        )
        
        return({'val_loss': loss})

    
    def test_step(self, batch, batch_idx):
        
        images, masks = batch['pixel_values'], batch['labels']
        
        outputs = self(images, masks)
        
        loss, logits = outputs[0], outputs[1]
        
        upsampled_logits = F.interpolate(
            logits, 
            size=masks.shape[-2:], 
            mode="bilinear", 
            align_corners=False
        )
        
        predicted = upsampled_logits.argmax(dim=1)
        
        self.test_mean_iou.add_batch(
            predictions=predicted.detach().cpu().numpy(), 
            references=masks.detach().cpu().numpy()
        )
            
        return({'test_loss': loss})
    
    def configure_optimizers(self):
        return nn.optim.Adam([p for p in self.parameters() if p.requires_grad], lr=2e-05, eps=1e-08)
    def train_dataloader(self):
        return self.train_dl
    
    def val_dataloader(self):
        return self.val_dl
    
    def test_dataloader(self):
        return self.test_dl


In [18]:
feature_extractor = SegformerFeatureExtractor.from_pretrained("nvidia/segformer-b0-finetuned-ade-512-512")
feature_extractor.reduce_labels = False

train_dataset = SemanticSegmentationDataset(TRAIN_IMAGE_DIR,TRAIN_MASK_DIR, feature_extractor)
val_dataset = SemanticSegmentationDataset(VAL_IMAGE_DIR,VAL_MASK_DIR, feature_extractor)
test_dataset = SemanticSegmentationDataset(TEST_IMAGE_DIR,TEST_MASK_DIR, feature_extractor)

batch_size = 8
train_dataloader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
val_dataloader = DataLoader(val_dataset, batch_size=batch_size)
test_dataloader = DataLoader(test_dataset, batch_size=batch_size)

segformer_finetuner = SegformerFinetuner(
    train_dataloader=train_dataloader, 
    val_dataloader=val_dataloader, 
    test_dataloader=test_dataloader, 
    metrics_interval=10,
)

early_stop_callback = EarlyStopping(
    monitor="loss", 
    min_delta=0.00, 
    patience=3, 
    verbose=False, 
    mode="min",
)

checkpoint_callback = ModelCheckpoint(save_top_k=1, monitor="val_loss")

trainer = pl.Trainer(
    callbacks=[early_stop_callback],
    max_epochs=1,
    val_check_interval=len(train_dataloader),
)

trainer.fit(segformer_finetuner)

Some weights of SegformerForSemanticSegmentation were not initialized from the model checkpoint at nvidia/segformer-b0-finetuned-ade-512-512 and are newly initialized because the shapes did not match:
- decode_head.classifier.bias: found shape torch.Size([150]) in the checkpoint and torch.Size([2]) in the model instantiated
- decode_head.classifier.weight: found shape torch.Size([150, 256, 1, 1]) in the checkpoint and torch.Size([2, 256, 1, 1]) in the model instantiated
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.
GPU available: True (cuda), used: True
TPU available: False, using: 0 TPU cores
HPU available: False, using: 0 HPUs
LOCAL_RANK: 0 - CUDA_VISIBLE_DEVICES: [0]

  | Name  | Type                             | Params | Mode
------------------------------------------------------------------
0 | model | SegformerForSemanticSegmentation | 3.7 M  | eval
------------------------------------------------------------------

Epoch 0: 100%|██████████| 39/39 [00:24<00:00,  1.62it/s, v_num=4]          

`Trainer.fit` stopped: `max_epochs=1` reached.


Epoch 0: 100%|██████████| 39/39 [00:24<00:00,  1.62it/s, v_num=4]


In [None]:
%load_ext tensorboard
%tensorboard --logdir lightning_logs/version_4

In [25]:
res = trainer.test(ckpt_path="best")

Restoring states from the checkpoint path at c:\Users\DSBG-Public\segformer\segformer-1\lightning_logs\version_4\checkpoints\epoch=0-step=39.ckpt
LOCAL_RANK: 0 - CUDA_VISIBLE_DEVICES: [0]
Loaded model weights from the checkpoint at c:\Users\DSBG-Public\segformer\segformer-1\lightning_logs\version_4\checkpoints\epoch=0-step=39.ckpt


Testing: |          | 0/? [00:11<?, ?it/s]


FileNotFoundError: [Errno 2] No such file or directory: 'C:/Users/DSBG-Public/segformer/dataset/test/images\\Thumbs.db'

In [None]:
color_map = {
    0:(0,0,0),
    1:(255,0,0),
}

def prediction_to_vis(prediction):
    vis_shape = prediction.shape + (3,)
    vis = np.zeros(vis_shape)
    for i,c in color_map.items():
        vis[prediction == i] = color_map[i]
    return Image.fromarray(vis.astype(np.uint8))

for batch in test_dataloader:
    images, masks = batch['pixel_values'], batch['labels']
    outputs = segformer_finetuner.model(images, masks)
        
    loss, logits = outputs[0], outputs[1]

    upsampled_logits = nn.functional.interpolate(
        logits, 
        size=masks.shape[-2:], 
        mode="bilinear", 
        align_corners=False
    )

    predicted = upsampled_logits.argmax(dim=1).cpu().numpy()
    masks = masks.cpu().numpy()

from matplotlib import pyplot as plt
f, axarr = plt.subplots(predicted.shape[0],2)
for i in range(predicted.shape[0]):
    axarr[i,0].imshow(prediction_to_vis(predicted[i,:,:]))
    axarr[i,1].imshow(prediction_to_vis(masks[i,:,:]))
