In [None]:
# Install necessary libraries for the project
!pip install -q "pytorch-lightning<2.0.0"
!pip install -q transformers datasets
!pip install -q huggingface_hub evaluate
!pip install -q albumentations

In [10]:
# Import required libraries
import pytorch_lightning as pl
from pytorch_lightning.callbacks.early_stopping import EarlyStopping
from pytorch_lightning.callbacks.model_checkpoint import ModelCheckpoint
from pytorch_lightning.loggers import TensorBoardLogger
from transformers import SegformerImageProcessor, SegformerForSemanticSegmentation
from huggingface_hub import Repository
from evaluate import combine,load
# Import required libraries
import torch
from torch import nn
from torch.utils.data import Dataset, DataLoader
from datasets import load_dataset
# Import required libraries
import os
from PIL import Image
# Import required libraries
import numpy as np
# Import required libraries
import random
# Import required libraries
import albumentations as A
from albumentations.pytorch import ToTensorV2
from huggingface_hub import HfApi,hf_hub_download
import json
from torchmetrics import JaccardIndex,Dice
from collections import namedtuple

In [28]:
class SegformerFinetuner(pl.LightningModule):
    """
    A PyTorch Lightning Module for fine-tuning the SegFormer model.

    The module loads ``nvidia/segformer-b0-finetuned-ade-512-512`` with a
    classification head sized to ``id2label`` and tracks Jaccard (IoU) and Dice
    metrics separately for the train, validation and test stages.

    Args:
        id2label (dict): Mapping of class IDs to labels.
        train_dataloader (DataLoader): Dataloader for training data.
        validation_dataloader (DataLoader): Dataloader for validation data.
        test_dataloader (DataLoader): Dataloader for test data.
        metrics_interval (int): Number of training batches between two
            computations/logs of the running training metrics.
    """
    def __init__(self, id2label, train_dataloader=None, validation_dataloader=None, test_dataloader=None, metrics_interval=100):
        """
        Stores the configuration, loads the pretrained model and creates the metrics.
        """
        super().__init__()
        self.id2label = id2label
        self.metrics_interval = metrics_interval
        self.train_dl = train_dataloader
        self.val_dl = validation_dataloader
        self.test_dl = test_dataloader

        self.num_classes = len(id2label)
        self.label2id = {v: k for k, v in self.id2label.items()}
        # Class index excluded from every metric below.
        # NOTE(review): the loss is computed inside the Hugging Face model and is
        # not given this value here - confirm whether the loss should ignore the
        # same index as the metrics.
        self.ignore_index = 0

        self.model = SegformerForSemanticSegmentation.from_pretrained(
            "nvidia/segformer-b0-finetuned-ade-512-512",
            return_dict=False,
            num_labels=self.num_classes,
            id2label=self.id2label,
            label2id=self.label2id,
            ignore_mismatched_sizes=True
        )

        # One metric pair per stage so that their accumulated states never mix.
        self.train_mean_iou = self._build_iou()
        self.train_dice = self._build_dice()

        self.val_mean_iou = self._build_iou()
        self.val_dice = self._build_dice()

        self.test_mean_iou = self._build_iou()
        self.test_dice = self._build_dice()

    def _build_iou(self):
        """Creates a multiclass Jaccard index that skips ``self.ignore_index``."""
        return JaccardIndex(task='multiclass', num_classes=self.num_classes, ignore_index=self.ignore_index)

    def _build_dice(self):
        """Creates a micro-averaged Dice metric that skips ``self.ignore_index``."""
        return Dice(average='micro', num_classes=self.num_classes, ignore_index=self.ignore_index)

    def forward(self, images, masks=None):
        """
        Runs the wrapped model.

        Args:
            images: Batch passed to the model as ``pixel_values``.
            masks: Optional batch passed to the model as ``labels``. Leave it
                out to run the model without ground truth.

        Returns:
            The model output. The model is loaded with ``return_dict=False``;
            when ``masks`` is given, ``forward_pass`` reads the loss at index 0
            and the logits at index 1 of this output.
        """
        return self.model(pixel_values=images, labels=masks)

    def forward_pass(self, images, masks):
        """
        Runs the model and converts its logits to a per-pixel class prediction.

        Returns:
            tuple: ``(loss, predicted)`` where ``predicted`` is the argmax over
            the class dimension of the logits resized to the mask resolution.
        """
        outputs = self(images, masks)
        loss, logits = outputs[0], outputs[1]

        # The logits are resized to the spatial size of the masks before the argmax
        # so that predictions and targets can be compared element-wise.
        upsampled_logits = nn.functional.interpolate(
            logits,
            size=masks.shape[-2:],
            mode="bilinear",
            align_corners=False
        )
        predicted = upsampled_logits.argmax(dim=1)

        return loss, predicted

    def _shared_step(self, batch, iou_metric, dice_metric):
        """Runs one batch, accumulates it into the given metrics and returns the loss."""
        images, masks = batch['pixel_values'], batch['labels']
        loss, predicted = self.forward_pass(images, masks)

        # update() only accumulates state; calling the metric object would also
        # compute a per-batch value that nobody reads.
        iou_metric.update(predicted, masks)
        dice_metric.update(predicted, masks)

        return loss

    def _log_eval_epoch(self, stage, outputs, iou_metric, dice_metric):
        """Logs the epoch-level IoU, Dice and mean loss for ``stage`` and resets the metrics."""
        iou = iou_metric.compute()
        dice = dice_metric.compute()
        avg_loss = torch.stack([x[f"{stage}_loss"] for x in outputs]).mean()

        self.log(f'{stage}_mean_iou', iou, on_epoch=True, prog_bar=True)
        self.log(f'{stage}_dice', dice, on_epoch=True, prog_bar=True)
        self.log(f'{stage}_loss', avg_loss, on_epoch=True, prog_bar=True)

        # Reset so that the next epoch starts from an empty state.
        iou_metric.reset()
        dice_metric.reset()

    def training_step(self, batch, batch_nb):
        """Runs one training batch and periodically logs the running training metrics."""
        loss = self._shared_step(batch, self.train_mean_iou, self.train_dice)

        # Make the training loss visible in the logger next to the validation loss.
        self.log('train_loss', loss, on_step=True, on_epoch=True)

        if batch_nb % self.metrics_interval == 0:
            iou = self.train_mean_iou.compute()
            dice = self.train_dice.compute()

            # Log metrics
            self.log('train_mean_iou', iou, on_step=True, on_epoch=True, prog_bar=True)
            self.log('train_dice', dice, on_step=True, on_epoch=True, prog_bar=True)

            # Reset metrics after logging
            self.train_mean_iou.reset()
            self.train_dice.reset()

        return {'loss': loss}

    def validation_step(self, batch, batch_nb):
        """Runs one validation batch and returns its loss under ``'val_loss'``."""
        loss = self._shared_step(batch, self.val_mean_iou, self.val_dice)
        return {'val_loss': loss}

    def validation_epoch_end(self, outputs):
        """Logs ``val_mean_iou``, ``val_dice`` and the mean ``val_loss`` of the epoch."""
        self._log_eval_epoch('val', outputs, self.val_mean_iou, self.val_dice)

    def test_step(self, batch, batch_nb):
        """Runs one test batch and returns its loss under ``'test_loss'``."""
        loss = self._shared_step(batch, self.test_mean_iou, self.test_dice)
        return {'test_loss': loss}

    def test_epoch_end(self, outputs):
        """Logs ``test_mean_iou``, ``test_dice`` and the mean ``test_loss`` of the epoch."""
        self._log_eval_epoch('test', outputs, self.test_mean_iou, self.test_dice)

    def configure_optimizers(self):
        """Returns an Adam optimizer over the parameters that require gradients."""
        return torch.optim.Adam([p for p in self.parameters() if p.requires_grad], lr=2e-05, eps=1e-08)

    def train_dataloader(self):
        """Returns the training dataloader given to the constructor."""
        return self.train_dl

    def val_dataloader(self):
        """Returns the validation dataloader given to the constructor."""
        return self.val_dl

    def test_dataloader(self):
        """Returns the test dataloader given to the constructor."""
        return self.test_dl


In [24]:
# SemanticSegmentationDataset class handles data loading and augmentation for semantic segmentation tasks
class SemanticSegmentationDataset(Dataset):
    """
    Dataset class for semantic segmentation tasks.

    Args:
        data: Indexable collection of records; each record exposes an
            ``'image'`` and a ``'mask'`` entry that support ``.convert(mode)``.
        feature_extractor: Callable invoked as
            ``feature_extractor(image, mask, return_tensors="pt")``. It is
            required to read samples.
        transform: Optional callable invoked as
            ``transform(image=image, mask=mask)``. It may return either a
            mapping with ``'image'`` and ``'mask'`` keys or an
            ``(image, mask)`` pair.
    """
    def __init__(self, data, feature_extractor=None, transform=None):
        """
        Initializes the class with the provided arguments.
        """
        self.data = data  # Data loaded from Hugging Face
        self.feature_extractor = feature_extractor
        self.transform = transform

    def __len__(self):
        """Returns the number of records in ``data``."""
        return len(self.data)

    def __getitem__(self, idx):
        """
        Loads, optionally augments and encodes the sample at ``idx``.

        Returns:
            The feature extractor's output with the leading batch dimension
            removed from every entry.

        Raises:
            ValueError: If the dataset was created without a feature extractor.
        """
        if self.feature_extractor is None:
            # Without this check the method failed later on an unbound local variable.
            raise ValueError(
                "SemanticSegmentationDataset needs a feature_extractor to encode samples."
            )

        record = self.data[idx]
        image = np.array(record['image'].convert("RGB"))
        mask = np.array(record['mask'].convert("L"))

        if self.transform:
            augmented = self.transform(image=image, mask=mask)
            if isinstance(augmented, (tuple, list)):
                # Wrapper-style transforms return an (image, mask) pair.
                image, mask = augmented
            else:
                # Mapping-style transforms return {'image': ..., 'mask': ...}.
                image, mask = augmented['image'], augmented['mask']

        encoded_inputs = self.feature_extractor(image, mask, return_tensors="pt")

        for value in encoded_inputs.values():
            # Remove only the batch dimension; other size-1 dimensions are kept.
            value.squeeze_(0)

        return encoded_inputs


In [None]:
class ImageTransform:
    """Callable wrapper around an Albumentations augmentation pipeline.

    Calling an instance with an image and a mask returns the augmented
    ``(image, mask)`` pair.
    """

    def __init__(self):
        # Flips and 90-degree rotations, each applied with probability 0.5.
        flips_and_rotation = [
            A.HorizontalFlip(p=0.5),
            A.VerticalFlip(p=0.5),
            A.RandomRotate90(p=0.5),
        ]
        # With probability 0.3, one of the two photometric changes is applied.
        photometric = A.OneOf(
            [A.RandomBrightnessContrast(), A.RandomGamma()],
            p=0.3,
        )
        # With probability 0.3, one of the three distortions is applied.
        distortion = A.OneOf(
            [A.ElasticTransform(), A.GridDistortion(), A.OpticalDistortion()],
            p=0.3,
        )
        # ToTensorV2 is the last step of the pipeline.
        self.transform = A.Compose(
            [*flips_and_rotation, photometric, distortion, ToTensorV2()]
        )

    def __call__(self, image, mask):
        """Runs the pipeline on ``image`` and ``mask`` and returns them as a pair."""
        result = self.transform(image=image, mask=mask)
        return result['image'], result['mask']

In [22]:
class PushToHubCallback(pl.Callback):
    """
    Lightning callback that uploads the best checkpointed model to the Hugging Face Hub.

    Args:
        repo_id (str): Target repository on the Hub.
        hf_model: Module whose ``state_dict`` matches the Lightning checkpoint
            and which exposes the Hugging Face model as ``.model``.
            NOTE(review): the checkpoint weights are loaded into this object on
            every push; if it is the module being trained, its current weights
            are overwritten - confirm this is intended.
        token (str, optional): Hub token forwarded to ``upload_folder``.
        check_every_n_epochs (int): Push every this many epochs. A falsy value
            disables the periodic push (the final push still happens).
        model_dir (str, optional): Directory the model is exported to before
            the upload. Defaults to ``/workspace/hf_model``.
    """
    def __init__(self, repo_id, hf_model, token=None, check_every_n_epochs=5, model_dir=None):
        super().__init__()
        self.repo_id = repo_id
        self.hf_model = hf_model
        self.token = token
        self.check_every_n_epochs = check_every_n_epochs
        # Keep the previous location as the default so existing callers see no change.
        self.model_dir = model_dir if model_dir is not None else os.path.join("/workspace", "hf_model")

    def push_model(self, trainer):
        """
        Exports the best checkpoint and uploads it to the Hub.

        The push is best effort: every failure is reported with ``print`` and
        never interrupts training.
        """
        try:
            checkpoint_path = getattr(trainer.checkpoint_callback, "best_model_path", "")
            if not checkpoint_path:
                # Nothing has been checkpointed yet, so there is nothing to upload.
                print("No checkpoint available yet; skipping push to the Hugging Face Hub.")
                return

            print("Pushing Hugging Face model...")
            # Load on CPU first; load_state_dict copies the values into the
            # model's existing parameters.
            checkpoint = torch.load(checkpoint_path, map_location="cpu")
            self.hf_model.load_state_dict(checkpoint['state_dict'])

            # Directory to save the Hugging Face model
            os.makedirs(self.model_dir, exist_ok=True)
            self.hf_model.model.save_pretrained(self.model_dir)

            api = HfApi()
            api.upload_folder(
                folder_path=self.model_dir,
                repo_id=self.repo_id,
                repo_type="model",
                token=self.token
            )
            print(f"Model pushed to {self.repo_id}")
        except Exception as e:
            print(f"Error saving or pushing Hugging Face model: {e}")

    def on_train_epoch_end(self, trainer, pl_module):
        """Pushes the model every ``check_every_n_epochs`` epochs."""
        epoch_number = trainer.current_epoch + 1
        if self.check_every_n_epochs and epoch_number % self.check_every_n_epochs == 0:
            print(f"Epoch {epoch_number}: Pushing the checkpointed model.")
            self.push_model(trainer)

    def on_train_end(self, trainer, pl_module):
        """Pushes the model one last time when training ends."""
        self.push_model(trainer)

In [None]:
# Load the dataset splits from the Hugging Face Hub.
dataset = load_dataset('nave1616/landcover-urban-climate') #nave1616/building-urban-climate for building segmentation

train_data = dataset['train']
validation_data = dataset['validation']
test_data = dataset['test']

# Preprocessor matching the pretrained checkpoint used for fine-tuning.
feature_extractor = SegformerImageProcessor.from_pretrained("nvidia/segformer-b0-finetuned-ade-512-512")
feature_extractor.do_reduce_labels = False
# NOTE(review): size is assigned a bare int - confirm the installed transformers
# version accepts this form for the processor's size attribute.
feature_extractor.size = 512


transform = ImageTransform()
# Only the first 10 training records are used for training.
# NOTE(review): looks like a quick-run subset - confirm before a full training run.
small_train_data = train_data.select(range(10))
train_dataset = SemanticSegmentationDataset(
    data=small_train_data,  # Data loaded from Hugging Face
    feature_extractor=feature_extractor,
    transform=transform,
)

validation_dataset = SemanticSegmentationDataset(
    data=validation_data,  # Data loaded from Hugging Face
    feature_extractor=feature_extractor
)
test_dataset = SemanticSegmentationDataset(
    data=test_data,  # Data loaded from Hugging Face
    feature_extractor=feature_extractor
)
batch_size = 8
# os.cpu_count() may return None; fall back to loading in the main process (0 workers).
num_workers = os.cpu_count() or 0

train_dataloader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True,num_workers=num_workers)
validation_dataloader = DataLoader(validation_dataset, batch_size=batch_size,num_workers=num_workers)
test_dataloader = DataLoader(test_dataset, batch_size=batch_size,num_workers=num_workers)

In [None]:
import numpy as np
from collections import namedtuple
import matplotlib.pyplot as plt
import cv2

# Land-cover classes: each entry is (name, label, RGB colour).
# The label is the real value found in the dataset and still needs to be mapped.
lc_labels = namedtuple('LandCoverClasses', ['name', 'label', 'color'])
lc_classes = [
    lc_labels(name='background', label=1, color=(255, 255, 255)),  # White
    lc_labels(name='building', label=2, color=(255, 0, 0)),        # Red
    lc_labels(name='road', label=3, color=(255, 255, 0)),          # Yellow
    lc_labels(name='water', label=4, color=(0, 0, 255)),           # Blue
    lc_labels(name='barren', label=5, color=(139, 69, 19)),        # Brown
    lc_labels(name='forest', label=6, color=(0, 255, 0)),          # Green
    lc_labels(name='agriculture', label=7, color=(0, 255, 255)),   # Cyan
]

# label -> name lookup for the land-cover classes
lc_id2label = dict((entry.label, entry.name) for entry in lc_classes)

# Colours in the same order as lc_classes
lc_label_id_to_color = [color for _name, _label, color in lc_classes]

# Building classes: same layout; the label is the real value found in the
# dataset and still needs to be mapped.
building_labels = namedtuple('BuildingDataset', ['name', 'label', 'color'])
building_class = [
    building_labels(name='background', label=1, color=(255, 255, 255)),  # White
    building_labels(name='building', label=255, color=(255, 0, 0)),      # Red
]

# label -> name lookup for the building classes
building_id2label = dict((entry.label, entry.name) for entry in building_class)

# Colours in the same order as building_class
building_label_id_to_color = [color for _name, _label, color in building_class]


# Turn a label mask into an RGB image using the colours of the given classes
def apply_color_map(mask, classes):
    """Colours every pixel of ``mask`` whose value equals a class label.

    Args:
        mask: Array of label values; its first two dimensions give the output size.
        classes: Iterable of entries exposing ``.label`` and ``.color``.

    Returns:
        A ``uint8`` array of shape ``(mask.shape[0], mask.shape[1], 3)``.
        Pixels whose value matches no class stay ``(0, 0, 0)``.
    """
    rows, cols = mask.shape[0], mask.shape[1]
    colored = np.zeros((rows, cols, 3), dtype=np.uint8)
    present_values = np.unique(mask)
    for entry in classes:
        if entry.label not in present_values:
            continue  # this class does not occur in the mask
        colored[mask == entry.label] = entry.color
    return colored

# Define the function to visualize the real image, ground truth, and prediction
def view_predict(image_path, mask_path, prediction_path, classes):
    """
    Shows the image, its ground-truth mask and a predicted mask side by side.

    Args:
        image_path: Path of the image file.
        mask_path: Path of the ground-truth mask, read as grayscale.
        prediction_path: Path of the predicted mask, read as grayscale.
        classes: Class entries handed to ``apply_color_map`` to colour both masks.

    Raises:
        FileNotFoundError: If any of the three files cannot be read.
    """
    # cv2.imread returns None instead of raising, so each read is checked right away.
    image = cv2.imread(image_path)
    if image is None:
        raise FileNotFoundError(f"Error: Unable to read the image at {image_path}")

    mask = cv2.imread(mask_path, cv2.IMREAD_GRAYSCALE)
    if mask is None:
        raise FileNotFoundError(f"Error: Unable to read the mask image at {mask_path}")

    prediction = cv2.imread(prediction_path, cv2.IMREAD_GRAYSCALE)
    if prediction is None:
        raise FileNotFoundError(f"Error: Unable to read the prediction image at {prediction_path}")

    # One (title, picture) pair per panel, in display order.
    panels = [
        ("Real Image", cv2.cvtColor(image, cv2.COLOR_BGR2RGB)),  # Convert BGR to RGB for display
        ("Ground Truth", apply_color_map(mask, classes)),
        ("Prediction", apply_color_map(prediction, classes)),
    ]

    fig, axes = plt.subplots(1, len(panels), figsize=(16, 8))
    for ax, (title, picture) in zip(axes, panels):
        ax.imshow(picture)
        ax.set_title(title)
        ax.axis('off')

    # Show the plot
    fig.tight_layout()
    plt.show()

# Shift every positive mask label down by one for a whole dataset
def reduce_labels(dataset):
    """
    Build a new list of samples whose mask labels are reduced by 1.

    Labels greater than 0 are decremented; a label of 0 is left as it is.
    The input masks are not modified.

    Args:
        dataset (Dataset): Iterable of records with ``'image'`` and ``'mask'`` entries.

    Returns:
        transformed_data (list): One ``{'image', 'mask'}`` dictionary per record,
        with the image passed through and the mask as a new numpy array.
    """
    def _shift_down(raw_mask):
        # np.array makes a fresh copy, so the caller's mask stays untouched.
        shifted = np.array(raw_mask)
        shifted[shifted > 0] -= 1
        return shifted

    return [
        {'image': record['image'], 'mask': _shift_down(record['mask'])}
        for record in dataset
    ]

# Replace the value 255 with 0 in every mask of a binary dataset
def map_binary_dataset(dataset):
    """
    Build a new list of samples in which every mask value of 255 becomes 0.

    All other mask values are kept. The input masks are not modified.

    Args:
        dataset (Dataset): Iterable of records with ``'image'`` and ``'mask'`` entries.

    Returns:
        transformed_data (list): One ``{'image', 'mask'}`` dictionary per record,
        with the image passed through and the mask as a new numpy array.
    """
    remapped_samples = []

    for record in dataset:
        picture = record['image']

        # Work on a copy so the record's own mask is left alone.
        remapped = np.array(record['mask'])
        is_255 = remapped == 255
        remapped[is_255] = 0

        remapped_samples.append(dict(image=picture, mask=remapped))

    return remapped_samples


In [None]:

# Label map used to size the model head (use building_id2label for the building dataset).
# NOTE(review): the keys of lc_id2label start at 1 - confirm they match the class
# indices the model and the masks are expected to use.
id2label = lc_id2label

# Build the Lightning module around the three dataloaders; training metrics
# are computed every 10 batches.
segformer_finetuner = SegformerFinetuner(
    id2label=id2label,
    metrics_interval=10,
    train_dataloader=train_dataloader,
    validation_dataloader=validation_dataloader,
    test_dataloader=test_dataloader,
)

In [None]:
torch.set_float32_matmul_precision('medium')

# Stop training once the validation loss has not improved for 10 validation runs.
early_stop_callback = EarlyStopping(
    monitor="val_loss",  # Monitor validation loss for early stopping
    min_delta=0.00,
    patience=10,
    verbose=False,
    mode="min",  # Lower loss is better
)

# Keep only the checkpoint with the lowest validation loss.
checkpoint_callback = ModelCheckpoint(
    dirpath="checkpoints/",
    # The template uses the metric that is logged and monitored ("val_loss");
    # no metric named "val_iou" is logged by the module.
    filename="{epoch}-{val_loss:.2f}",
    save_top_k=1,  # Keep the single best checkpoint
    monitor="val_loss",  # Rank checkpoints by validation loss
    mode="min",  # Lower loss is better
)

accelerator = 'gpu' if torch.cuda.is_available() else 'cpu'

trainer = pl.Trainer(
    accelerator=accelerator,  # Use GPU if available, otherwise CPU
    devices=1 if accelerator == 'gpu' else None,  # Use one GPU or default to CPU
    callbacks=[early_stop_callback, checkpoint_callback],  # All callbacks
    max_epochs=50,  # Maximum number of epochs to train
    val_check_interval=len(train_dataloader),  # Perform validation after each epoch
    log_every_n_steps=10,  # Log more frequently to monitor training progress
)

trainer.fit(segformer_finetuner)

In [None]:
def evaluate_best_model(trainer, test_dataloader):
    """Run the trainer's test loop from the best checkpoint.

    Args:
        trainer: Object whose ``test`` method accepts ``ckpt_path`` and ``dataloaders``.
        test_dataloader: Passed to ``trainer.test`` as ``dataloaders``.

    Returns:
        Whatever ``trainer.test`` returns.
    """
    return trainer.test(dataloaders=test_dataloader, ckpt_path="best")

# Example usage
# Requires `trainer` and `test_dataloader` from the cells above; the test loop
# is run with ckpt_path="best" inside evaluate_best_model.
test_results = evaluate_best_model(trainer, test_dataloader)
print("Test Results:", test_results)