### Importing images

In [1]:
import os, json
import subprocess, sys
import random
import math
import matplotlib.pyplot as plt
from pathlib import Path
import numpy as np
from tqdm import tqdm
import copy
import cv2
from PIL import Image, ImageDraw

import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms
from transformers import MobileViTImageProcessor, MobileViTForSemanticSegmentation

2025-05-09 13:35:44.420055: E external/local_xla/xla/stream_executor/cuda/cuda_fft.cc:477] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered
E0000 00:00:1746797744.614492      31 cuda_dnn.cc:8310] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered
E0000 00:00:1746797744.669158      31 cuda_blas.cc:1418] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered


In [None]:
# Make sure the `evaluate` package is importable, installing it with pip
# into the running interpreter when it is missing.
package_name = "evaluate"

try:
    __import__(package_name)
except ImportError:
    print(f"{package_name} is NOT installed! Installing now...")
    subprocess.check_call(
        [sys.executable, "-m", "pip", "install", package_name]
    )
else:
    print('already installed')

evaluate is NOT installed! Installing now...


In [None]:
import wandb

# Weights & Biases entity / project that the runs are logged to.
WANDB_USER = "chri-project"
WANDB_PROJECT = "ML4CV--assignment"

# The API key must never live in the notebook source: it is read from the
# WANDB_API_KEY environment variable (e.g. a Kaggle secret exported before
# running this cell). When the variable is unset, `key=None` lets wandb fall
# back to its own credential lookup / interactive prompt.
# NOTE: the key that used to be hard-coded here is exposed and should be revoked.
wandb.login(key=os.environ.get("WANDB_API_KEY"))

In [None]:
!pip install segmentation-models-pytorch albumentations --no-deps

In [None]:
'''
#wget -c http://images.cocodataset.org/zips/train2017.zip -q
#unzip -q train2017.zip
#rm train2017.zip
#mv train2017 coco_images_train
'''

In [None]:
%%bash
# Download the COCO 2017 annotations and validation images.
# - `set -euo pipefail` stops at the first failing command instead of silently
#   continuing with a half-downloaded dataset.
# - each step is skipped when its target folder already exists, so the cell can
#   be re-run (a second `mv annotations coco_annotations` would otherwise nest
#   the folder inside the existing one).
set -euo pipefail

if [ ! -d coco_annotations ]; then
    wget -c http://images.cocodataset.org/annotations/annotations_trainval2017.zip -q
    unzip -q annotations_trainval2017.zip
    rm annotations_trainval2017.zip
    mv annotations coco_annotations
fi

if [ ! -d coco_images_val ]; then
    wget -c http://images.cocodataset.org/zips/val2017.zip -q
    unzip -q val2017.zip
    rm val2017.zip
    mv val2017 coco_images_val
fi

In [None]:
def fix_random(seed: int) -> None:
    """
    Seed every random number generator used in the notebook and ask cuDNN
    for deterministic behaviour.

    Args:
        seed: value handed to NumPy, Python's `random`, and PyTorch
            (`torch.manual_seed` and `torch.cuda.manual_seed`).
    """
    seeders = (
        np.random.seed,
        random.seed,
        torch.manual_seed,
        torch.cuda.manual_seed,
    )
    for seed_generator in seeders:
        seed_generator(seed)

    # No autotuned kernels, deterministic algorithms only.
    cudnn = torch.backends.cudnn
    cudnn.benchmark = False
    cudnn.deterministic = True

fix_random(seed=42)

In [None]:
# Run on the GPU when one is visible to PyTorch, otherwise on the CPU.
DEVICE = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

In [None]:
import evaluate

"""
Source: https://github.com/hendrycks/anomaly-seg/issues/15#issuecomment-890300278
"""
# RGB colour (0-255) of every class id: row `i` is the colour that
# `visualize_annotation` paints wherever the annotation map equals `i`.
COLORS = np.array([
    [  0,   0,   0],  # unlabeled    =   0,
    [ 70,  70,  70],  # building     =   1,
    [190, 153, 153],  # fence        =   2, 
    [250, 170, 160],  # other        =   3,
    [220,  20,  60],  # pedestrian   =   4, 
    [153, 153, 153],  # pole         =   5,
    [157, 234,  50],  # road line    =   6, 
    [128,  64, 128],  # road         =   7,
    [244,  35, 232],  # sidewalk     =   8,
    [107, 142,  35],  # vegetation   =   9, 
    [  0,   0, 142],  # car          =  10,
    [102, 102, 156],  # wall         =  11, 
    [220, 220,   0],  # traffic sign =  12,
    [ 60, 250, 240],  # anomaly      =  13,
]) 

## TODO: Show the class imbalance, if any, so that errors on the under-represented classes can be justified.
## TODO: Create a COCO dataset that pairs each image with its own annotations, so that the for loop inside random_anomaly_injection can be removed.

In [None]:
class StreetHazardsDataset(Dataset):
    """
    Image / segmentation-mask pairs listed in an ``.odgt`` index file, with
    optional resizing, augmentation and synthetic anomaly injection.
    """

    def __init__(self, odgt_file, coco_path_annotations = None, coco_path_images=None, image_resize=None, augment_both=None, augment_images=None, inject_anomalies = False):
        """
        Args:
            odgt_file (str): Path to the .odgt file (train, val, or test). It is parsed
                with ``json.load``; every entry must provide ``fpath_img`` and
                ``fpath_segm``, both relative to the folder containing the index.
            coco_path_annotations (str, optional): COCO instances JSON. Only read when
                ``inject_anomalies`` is True.
            coco_path_images (str, optional): Folder with the COCO images referenced by
                the annotations. Only used when ``inject_anomalies`` is True.
            image_resize (optional): Size forwarded to ``transforms.Resize``
                (bilinear for the image, nearest for the mask). No resize when falsy.
            augment_both (callable, optional): Called as ``augment_both(image, labels)``
                on the PIL pair and must return the transformed pair.
            augment_images (callable, optional): Applied to the image tensor only,
                after the conversion to tensor.
            inject_anomalies (bool): When True, ``random_anomaly_injection`` is applied
                to every sample.

        Raises:
            ValueError: if ``inject_anomalies`` is True but a COCO path is missing.
        """

        self.augment_both = augment_both
        self.augment_images = augment_images
        self.image_resize = image_resize
        self.inject_anomalies = inject_anomalies
        self.coco_path_images = coco_path_images
        self.coco_data = None

        if self.inject_anomalies:
            # Fail early with a clear message instead of an opaque open(None) error.
            if coco_path_annotations is None or coco_path_images is None:
                raise ValueError(
                    "inject_anomalies=True requires both coco_path_annotations and coco_path_images"
                )
            with open(coco_path_annotations, 'r') as file:
                self.coco_data = json.load(file)

        # Load the .odgt file
        with open(odgt_file, "r") as f:
            odgt_data = json.load(f)

        # Paths inside the index are relative to the index location.
        root = Path(odgt_file).parent
        self.paths = [
            {
                "image": os.path.join(root, data["fpath_img"]),
                "labels": os.path.join(root, data["fpath_segm"]),
            }
            for data in odgt_data
        ]

    def __len__(self):
        """Number of samples listed in the index."""
        return len(self.paths)

    def __getitem__(self, idx):
        """
        Load sample ``idx``.

        Returns:
            dict: ``{'image': float tensor from ToTensor (after augment_images),
            'labels': int64 tensor holding the stored label values minus 1}``.
        """
        entry = self.paths[idx]
        image = Image.open(entry["image"]).convert("RGB")
        labels = Image.open(entry["labels"])

        if self.image_resize:
            # nearest-neighbour for the mask so that no new class ids are invented
            image = transforms.Resize(self.image_resize, transforms.InterpolationMode.BILINEAR)(image)
            labels = transforms.Resize(self.image_resize, transforms.InterpolationMode.NEAREST)(labels)

        if self.augment_both:
            image, labels = self.augment_both(image, labels)

        if self.inject_anomalies:
            injected = random_anomaly_injection(image, labels, self.coco_data, self.coco_path_images)
            # The injection may yield nothing (e.g. unusable COCO object); in that
            # case keep the clean sample instead of crashing on the unpacking.
            if injected is not None:
                image, labels = injected

        #to_tensor
        image = transforms.ToTensor()(image)
        # stored labels are shifted down by one (an injected anomaly written as 14 becomes 13)
        labels = torch.as_tensor(transforms.functional.pil_to_tensor(labels), dtype=torch.int64) - 1
        labels = labels.squeeze(0)

        if self.augment_images:
            image = self.augment_images(image)

        return {'image' : image, 'labels' : labels}

In [None]:
def visualize_annotation(annotation_img: np.ndarray|torch.Tensor, ax=None):
    """
    Adapted from https://github.com/CVLAB-Unibo/ml4cv-assignment/blob/master/utils/visualize.py
    """
    if ax is None: ax = plt.gca()
    annotation_img = np.asarray(annotation_img)
    img_new = np.zeros((*annotation_img.shape, 3))

    for index, color in enumerate(COLORS):
        img_new[annotation_img == index] = color

    ax.imshow(img_new / 255.0)
    ax.set_xticks([])
    ax.set_yticks([])
    #wandb.log({"examples": [wandb.Image(img_new / 255.0)]})
    
    

def visualize_scene(image: np.ndarray|torch.Tensor, ax=None):
    if ax is None: ax = plt.gca()
    image = np.asarray(image)
    ax.imshow(np.moveaxis(image, 0, -1))
    ax.set_xticks([])
    ax.set_yticks([])

In [None]:
'''class CocoDataset(Dataset):
    def __init__(self, annotations_path, images_path):
        """
        Args:
            odgt_file (str): Path to the .odgt file (train, val, or test).
            transform (callable, optional): Transformations to apply to images and masks.
        """

        self.images_path = images_path
        
        with open(annotations_path, "r") as f:
            self.coco_data = json.load(f)

        
    def __len__(self):
        return len(os.listdir(self.images_path))

    def __getitem__(self, idx):

        annotations = self.coco_data['annotations'][idx]
        image_id = annotations['image_id']
        
        img_segmentation = annotations['segmentation'][0]
        bbox = list(map(int, annotations['bbox']))
        
        for img in self.coco_data['images']:
            if image_id == img['id']:
                name = img['file_name']
                im = cv2.imread(os.path.join(self.images_path, name))
                
                polygon = np.array([[int(img_segmentation[i]), int(img_segmentation[i + 1])] for i in range(0, len(img_segmentation), 2)])
                mask = np.zeros((im.shape[0], im.shape[1]))
                cv2.fillConvexPoly(mask, polygon, 1)
        
                mask = mask > 0
                out = np.zeros_like(im)
                out[mask] = im[mask]
                x, y, w, h = bbox
                image_out = out[y:y+h, x:x+w]
                #plt.imshow(out)
        
                return image_out'''

In [None]:
def _load_coco_cutout(annotation, images_info, coco_image_path):
    """Return (RGB crop, uint8 mask crop) for one COCO annotation, or None when it is unusable."""
    segmentation = annotation.get('segmentation')
    # Crowd annotations store an RLE dict instead of a list of polygons.
    if not isinstance(segmentation, list) or not segmentation:
        return None

    # Only the first polygon of the instance is used.
    outline = segmentation[0]
    if len(outline) < 6 or len(outline) % 2:
        return None

    name = next(
        (info['file_name'] for info in images_info if info['id'] == annotation['image_id']),
        None,
    )
    if name is None:
        return None

    coco_img = cv2.imread(f"{coco_image_path}/{name}")
    if coco_img is None:  # missing or unreadable file
        return None

    # Create a binary mask from the segmentation polygon
    polygon = np.array(outline).reshape((-1, 2)).astype(np.int32)
    mask = np.zeros(coco_img.shape[:2], dtype=np.uint8)
    cv2.fillPoly(mask, [polygon], 255)

    # Crop object and mask to the polygon's bounding box
    x, y, w, h = cv2.boundingRect(mask)
    if w == 0 or h == 0:
        return None
    segmented_object = cv2.cvtColor(coco_img[y:y + h, x:x + w], cv2.COLOR_BGR2RGB)
    return segmented_object, mask[y:y + h, x:x + w]


def random_anomaly_injection(hazard_image, hazard_label, coco_data, coco_image_path, overlay_scale_range = (0.1, 0.4), anomaly_label = 14, max_attempts = 10):
    """
    Paste a randomly chosen COCO object onto a scene and mark it in the label map.

    Args:
        hazard_image: RGB scene (PIL image or H x W x 3 array).
        hazard_label: label map of the scene (PIL image or H x W array).
        coco_data (dict): parsed COCO instances JSON (uses its 'annotations' and 'images').
        coco_image_path (str): folder holding the COCO images.
        overlay_scale_range (tuple): (min, max) size of the pasted object, as a
            fraction of the scene width and height.
        anomaly_label (int): value written into the label map under the object.
        max_attempts (int): how many annotations are tried before giving up.

    Returns:
        tuple: (PIL image, PIL label map). When no usable object is found within
        `max_attempts` draws, the pair is returned unmodified (the function never
        returns None).
    """
    hazard_img = np.array(hazard_image)
    hazard_lbl = np.array(hazard_label)
    hazard_height, hazard_width = hazard_img.shape[:2]

    # Randomly compute the overlay size. Width is derived from the scene width and
    # height from the scene height, so the patch always fits inside the scene.
    overlay_scale = random.uniform(*overlay_scale_range)
    overlay_w = max(1, min(hazard_width, int(hazard_width * overlay_scale)))
    overlay_h = max(1, min(hazard_height, int(hazard_height * overlay_scale)))

    annotations = coco_data['annotations']
    attempts = max_attempts if len(annotations) > 0 else 0
    for _ in range(attempts):
        # Sample from however many annotations the file really contains.
        annotation = annotations[np.random.randint(len(annotations))]
        cutout = _load_coco_cutout(annotation, coco_data['images'], coco_image_path)
        if cutout is None:
            continue
        segmented_object, mask_cropped = cutout

        # Resize object and mask (cv2 expects dsize as (width, height))
        resized_object = cv2.resize(segmented_object, (overlay_w, overlay_h), interpolation=cv2.INTER_LINEAR)
        resized_mask = cv2.resize(mask_cropped, (overlay_w, overlay_h), interpolation=cv2.INTER_NEAREST)
        resized_mask_binary = resized_mask > 0

        # Random region in the scene where the anomaly is inserted
        roi_x = np.random.randint(0, hazard_width - overlay_w + 1)
        roi_y = np.random.randint(0, hazard_height - overlay_h + 1)
        region = (slice(roi_y, roi_y + overlay_h), slice(roi_x, roi_x + overlay_w))

        # Copy the object pixels straight into the scene (views write through);
        # the mask is binary, so no float blending is needed.
        hazard_img[region][resized_mask_binary] = resized_object[resized_mask_binary]

        # Apply the anomaly also in the label
        hazard_lbl[region][resized_mask_binary] = anomaly_label
        break

    return Image.fromarray(hazard_img), Image.fromarray(hazard_lbl)

In [None]:
def compute_mean_std(loader):
    """
    Average the per-image channel mean and channel standard deviation over a loader.

    Each batch is a dict whose "image" entry is viewed as (batch, channels, -1);
    the per-image statistics over the last axis are summed and finally divided
    by the number of images seen.

    Returns:
        tuple: (mean, std), one value per channel.
    """
    mean_total = 0.0
    std_total = 0.0
    seen_images = 0

    for batch in tqdm(loader):
        images = batch["image"]
        n_images, n_channels = images.size(0), images.size(1)
        per_pixel = images.view(n_images, n_channels, -1)

        mean_total += per_pixel.mean(2).sum(0)
        std_total += per_pixel.std(2).sum(0)
        seen_images += n_images
        del batch

    mean_total /= seen_images
    std_total /= seen_images
    return mean_total, std_total

In [None]:
# Size handed to transforms.Resize / RandomResizedCrop for every split.
image_resize = (512, 896)

# Plain (resized, un-augmented, un-normalised) training set; the next cell uses
# it to measure the channel statistics of the training images.
train_dataset = StreetHazardsDataset(
    odgt_file="/kaggle/input/ml4cv-data/streethazards_train/train/train.odgt",
    image_resize = image_resize,
    augment_both=None,
    augment_images=None,
    inject_anomalies=False
)

In [None]:
#compute mean and std on resized images should give better results, if the resize is the same used in the training
train_dl = DataLoader(train_dataset, batch_size=8, shuffle=True, num_workers=2)
mean_streethazards, std_streethazards = compute_mean_std(train_dl)
print(mean_streethazards, std_streethazards)

In [None]:
# Per-channel normalisation constants passed to transforms.Normalize in the cells below.
mean_imagenet, std_imagenet = [0.485, 0.456, 0.406], [0.229, 0.224, 0.225]

In [None]:
# Geometric augmentation applied jointly to the (image, labels) pair of the
# training set. Nearest-neighbour interpolation is used for both inputs.
augment_both = transforms.v2.Compose([
    #transforms.v2.RandomCrop(image_resize_scale),
    transforms.v2.RandomResizedCrop(image_resize, interpolation=transforms.InterpolationMode.NEAREST),
    transforms.v2.RandomHorizontalFlip(),
])

# Image-only transform of the training set (applied after the conversion to tensor).
augment_images = transforms.Compose([
    transforms.Normalize(mean = mean_imagenet, std = std_imagenet),
    #transforms.Normalize(mean = mean_streethazards, std = std_streethazards),
    #transforms.RandomErasing(scale=(0.02, 0.15))
])

# Validation / test: only resize, to_tensor and normalization (currently with the
# ImageNet constants; the StreetHazards statistics are kept commented out).
augment_val_test = transforms.Normalize(
    #mean = mean_streethazards, std = std_streethazards
    mean = mean_imagenet, std = std_imagenet
)

# Create the datasets.
# Training: no fixed resize (RandomResizedCrop already outputs `image_resize`),
# joint augmentation, normalisation and COCO anomaly injection.
train_dataset = StreetHazardsDataset(
    odgt_file="/kaggle/input/ml4cv-data/streethazards_train/train/train.odgt",
    image_resize = None,
    augment_both=augment_both,
    augment_images=augment_images,
    inject_anomalies = True,
    coco_path_annotations = "/kaggle/working/coco_annotations/instances_val2017.json",
    coco_path_images = "/kaggle/working/coco_images_val",
)

# Validation: deterministic resize + normalisation; the COCO paths are unused
# because inject_anomalies is False.
val_dataset = StreetHazardsDataset(
    odgt_file="/kaggle/input/ml4cv-data/streethazards_train/train/validation.odgt",
    image_resize = image_resize,
    augment_both=None,
    augment_images=augment_val_test,
    inject_anomalies = False,
    coco_path_annotations = "/kaggle/working/coco_annotations/instances_val2017.json",
    coco_path_images = "/kaggle/working/coco_images_val",
)

# Test: same preprocessing as validation.
test_dataset = StreetHazardsDataset(
    odgt_file="/kaggle/input/ml4cv-data/streethazards_test/test/test.odgt",
    image_resize = image_resize,
    augment_both=None,
    augment_images=augment_val_test,
    inject_anomalies = False,
)

In [None]:
# Preview one training sample. The dataset returns {'image': ..., 'labels': ...},
# so unpacking `.values()` relies on that key order.
# The image has already been normalised by `augment_images`, so the displayed
# colours are not the original ones.
idx = 2
img, lbl = train_dataset[idx].values()
visualize_scene(img)

In [None]:
# Label map of the same sample, coloured with the COLORS palette.
visualize_annotation(lbl)

### Data loader creation

In [None]:
# One loader per split; only the training loader is shuffled.
# `train_dl` replaces the loader that was created earlier for the statistics.
train_dl = DataLoader(train_dataset, batch_size=8, shuffle=True, num_workers=2)
val_dl = DataLoader(val_dataset, batch_size=8, shuffle=False, num_workers=2)
test_dl = DataLoader(test_dataset, batch_size=8, shuffle=False, num_workers=2)

In [None]:
def as_numpy(obj):
    """Convert `obj` to a NumPy array; tensors are moved to the CPU first."""
    return obj.cpu().numpy() if torch.is_tensor(obj) else np.array(obj)

In [None]:
from transformers import AutoImageProcessor, SegformerForSemanticSegmentation

def get_model(model_name, num_classes, weights_path = None):
    """
    Build the image processor and segmentation model for `model_name`.

    Args:
        model_name (str): Hugging Face checkpoint name. Names containing "segformer"
            load a SegFormer; names containing "deeplabv3" or "mobilevit" load a
            MobileViT (DeepLabV3 head).
        num_classes (int): number of output labels of the segmentation head.
        weights_path (str, optional): state dict loaded on top of the checkpoint.

    Returns:
        tuple: (processor, model), with the model already moved to DEVICE.

    Raises:
        ValueError: if `model_name` matches none of the supported families.
    """
    name = model_name.lower()

    if "segformer" in name:
        processor = AutoImageProcessor.from_pretrained(model_name)
        model = SegformerForSemanticSegmentation.from_pretrained(model_name, num_labels=num_classes, ignore_mismatched_sizes=True, semantic_loss_ignore_index = 13)
    # the original `elif "deeplabv3":` tested a non-empty string and was always true
    elif "deeplabv3" in name or "mobilevit" in name:
        processor = MobileViTImageProcessor.from_pretrained(model_name)
        model = MobileViTForSemanticSegmentation.from_pretrained(model_name, num_labels=num_classes, ignore_mismatched_sizes=True, semantic_loss_ignore_index = 13)
    else:
        raise ValueError(f"Unsupported model name: {model_name!r}")

    if weights_path:
        # map_location lets weights saved on a GPU be restored on a CPU-only machine
        model.load_state_dict(torch.load(weights_path, map_location=DEVICE))

    model.to(DEVICE)

    return processor, model

In [None]:
class MeanIoU:
    """
    Streaming segmentation metrics accumulated in a confusion matrix
    (rows: true class, columns: predicted class).

    taken from https://github.com/Jun-CEN/Open-World-Semantic-Segmentation/blob/main/DeepLabV3Plus-Pytorch/metrics/stream_metrics.py
    """
    def __init__(self, n_classes= 13):
        self.n_classes = n_classes
        self.confusion_matrix = np.zeros((n_classes, n_classes))

    def update(self, label_trues, logits):
        """Add a batch: predictions are the argmax of `logits` over dim 1."""
        predictions = torch.argmax(logits, dim=1).cpu().numpy()
        targets = label_trues.cpu().numpy()
        for target, prediction in zip(targets, predictions):
            self.confusion_matrix += self._fast_hist(target.flatten(), prediction.flatten())

    def _fast_hist(self, label_true, label_pred):
        """Confusion matrix of one sample; pixels whose true label is outside [0, n_classes) are ignored."""
        n = self.n_classes
        valid = (label_true >= 0) & (label_true < n)
        # encode every (true, predicted) pair as one flat bin index
        flat_index = n * label_true[valid].astype(int) + label_pred[valid]
        return np.bincount(flat_index, minlength=n ** 2).reshape(n, n)

    def get_results(self):
        """Returns accuracy score evaluation result.
            - overall accuracy
            - mean accuracy
            - mean IU
            - fwavacc
        """
        hist = self.confusion_matrix
        hits = np.diag(hist)
        true_totals = hist.sum(axis=1)
        pred_totals = hist.sum(axis=0)
        n_pixels = hist.sum()

        acc = hits.sum() / n_pixels
        # classes that never occur give NaN and are skipped by nanmean
        acc_cls = np.nanmean(hits / true_totals)
        iu = hits / (true_totals + pred_totals - hits)
        mean_iu = np.nanmean(iu, axis= 0)
        freq = true_totals / n_pixels
        present = freq > 0
        fwavacc = (freq[present] * iu[present]).sum()

        return {
                "Overall Acc": acc,
                "Mean Acc": acc_cls,
                "FreqW Acc": fwavacc,
                "Mean IoU": mean_iu,
                "Class IoU": dict(zip(range(self.n_classes), iu)),
            }


In [None]:
def get_measures(_pos, _neg):
    """
    AUROC and AUPR of scores split into a positive and a negative group.

    Args:
        _pos: scores of the samples labelled 1.
        _neg: scores of the samples labelled 0.

    Returns:
        tuple: (auroc, aupr) as computed by sklearn's `metrics`.
    """
    pos_column = np.array(_pos[:]).reshape((-1, 1))
    neg_column = np.array(_neg[:]).reshape((-1, 1))

    # positives first, then negatives
    scores = np.squeeze(np.vstack((pos_column, neg_column)))
    n_scores = len(scores)
    targets = np.concatenate((
        np.ones(len(pos_column), dtype=np.int32),
        np.zeros(n_scores - len(pos_column), dtype=np.int32),
    ))

    auroc = metrics.roc_auc_score(targets, scores)
    aupr = metrics.average_precision_score(targets, scores)
    return auroc, aupr

In [None]:
from sklearn import metrics

def get_aupr(confs, seg_labels, out_label=13):
    """
    AUPR separating in-distribution pixels from pixels labelled `out_label`.

    Args:
        confs: per-pixel scores, indexable with a boolean mask shaped like `seg_labels`.
        seg_labels: per-pixel labels.
        out_label (int): label of the out-of-distribution (anomalous) pixels.

    Returns:
        The AUPR from `get_measures`, or None when either group of pixels is
        empty (the metric is undefined in that case).
    """
    in_scores = confs[seg_labels != out_label]
    out_scores = confs[seg_labels == out_label]

    # Without both groups there is nothing to rank; previously this path hit an
    # unbound `aupr` and raised UnboundLocalError.
    if len(out_scores) == 0 or len(in_scores) == 0:
        return None

    # NOTE(review): in-distribution pixels are passed as the positive class -
    # confirm this matches the convention of the scores given in `confs`.
    _, aupr = get_measures(in_scores, out_scores)
    return aupr

In [None]:
from kornia.morphology import dilation, erosion
from scipy import ndimage as ndi

# Structuring elements for the morphological operations below. They are created
# on DEVICE (instead of a hard-coded `.cuda()`) so the cell also runs without a GPU.
selem = torch.ones((3, 3), device=DEVICE)
selem_dilation = torch.as_tensor(ndi.generate_binary_structure(2, 1), dtype=torch.float32, device=DEVICE)


def _build_expansion_kernel(radius):
    """Dilate a single centre pixel `radius` times with `selem_dilation`; returns a (2r+1, 2r+1) kernel."""
    size = 2 * radius + 1
    kernel = torch.zeros((1, 1, size, size), device=DEVICE)
    kernel[:, :, radius, radius] = 1
    for _ in range(radius):
        kernel = dilation(kernel, selem_dilation)
    return kernel.squeeze(0).squeeze(0)


# Expansion kernels indexed by radius (1..9), used by `expand_boundaries`.
d_ks = {radius: _build_expansion_kernel(radius) for radius in range(1, 10)}

def find_boundaries(labels):
    """
    Boundary mask of a 4-D label map: 1.0 wherever the dilated and the eroded
    map disagree, 0.0 elsewhere.
    """
    assert len(labels.shape) == 4
    label_map = labels.float()
    dilated = dilation(label_map, selem_dilation)
    eroded = erosion(label_map, selem)
    return (dilated != eroded).float()

def expand_boundaries(boundaries, r=0):
    """
    Widen a boundary map by dilating it with the pre-built kernel `d_ks[r]`;
    `r == 0` returns the input untouched.
    """
    return boundaries if r == 0 else dilation(boundaries, d_ks[r])

In [None]:
class BoundarySuppressionWithSmoothing(nn.Module):
    """
    Apply boundary suppression and dilated smoothing to a score map.

    Boundary suppression iteratively replaces the scores on class boundaries
    with the average of the neighbouring non-boundary scores; dilated smoothing
    then convolves the map with a normalised Gaussian kernel.
    """
    def __init__(self, boundary_suppression=True, boundary_width=4, boundary_iteration=4,
                 dilated_smoothing=True, kernel_size=7, dilation=6):
        """
        Args:
            boundary_suppression (bool): enable the boundary-averaging stage.
            boundary_width (int): initial boundary expansion; must be a multiple of
                `boundary_iteration` when that is non-zero.
            boundary_iteration (int): number of suppression iterations.
            dilated_smoothing (bool): enable the Gaussian smoothing stage.
            kernel_size (int): side of the Gaussian kernel (odd values keep the
                spatial size unchanged).
            dilation (int): dilation of the smoothing convolution.
        """
        super(BoundarySuppressionWithSmoothing, self).__init__()
        self.kernel_size = kernel_size
        self.dilation = dilation
        self.boundary_suppression = boundary_suppression
        self.boundary_width = boundary_width
        self.boundary_iteration = boundary_iteration
        self.dilated_smoothing = dilated_smoothing

        # Normalised 2-D Gaussian (sigma = 1) whose size follows `kernel_size`
        # (it used to be fixed to 7x7 regardless of the argument).
        sigma = 1.0
        offsets = np.arange(kernel_size, dtype=np.float64) - (kernel_size - 1) / 2
        squared_dist = offsets[:, None] ** 2 + offsets[None, :] ** 2
        gaussian_kernel = np.exp(-squared_dist / (2 * sigma ** 2)) / (2 * math.pi * sigma ** 2)
        gaussian_kernel /= np.sum(gaussian_kernel)
        gaussian_kernel = torch.Tensor(gaussian_kernel).unsqueeze(0).unsqueeze(0)

        # 3x3 box filter: sums values / counts valid pixels in the neighbourhood
        self.first_conv = nn.Conv2d(1, 1, kernel_size=3, stride=1, bias=False)
        self.first_conv.weight = torch.nn.Parameter(torch.ones_like((self.first_conv.weight)))

        self.second_conv = nn.Conv2d(1, 1, kernel_size=self.kernel_size, stride=1, dilation=self.dilation, bias=False)
        self.second_conv.weight = torch.nn.Parameter(gaussian_kernel)

    def _smooth(self, out):
        """Replication-pad and apply the dilated Gaussian convolution."""
        # half of the receptive field of the dilated kernel (18 for the defaults)
        num_pad = self.dilation * (self.kernel_size - 1) // 2
        out = nn.ReplicationPad2d(num_pad)(out)
        return self.second_conv(out)

    def forward(self, x, prediction=None):
        """
        Args:
            x: score map of shape (B, H, W) or (B, 1, H, W).
            prediction: predicted class map of shape (B, H, W); required when
                boundary suppression is enabled.

        Returns:
            Tensor: processed score map with the channel axis squeezed out.

        Raises:
            ValueError: if boundary suppression is enabled and `prediction` is None.
        """
        if len(x.shape) == 3:
            x = x.unsqueeze(1)
        # B x 1 x H x W
        assert len(x.shape) == 4
        out = x

        if self.boundary_suppression:
            if prediction is None:
                raise ValueError("`prediction` is required when boundary_suppression is enabled")
            # obtain the boundary map of width 2 by default
            # this can be calculated by the difference of dilation and erosion
            boundaries = find_boundaries(prediction.unsqueeze(1))
            if self.boundary_iteration != 0:
                assert self.boundary_width % self.boundary_iteration == 0
                diff = self.boundary_width // self.boundary_iteration

            for iteration in range(self.boundary_iteration):
                if len(out.shape) != 4:
                    out = out.unsqueeze(1)
                prev_out = out
                # if it is the last iteration or boundary width is zero
                if self.boundary_width == 0 or iteration == self.boundary_iteration - 1:
                    expansion_width = 0
                # reduce the expansion width for each iteration
                else:
                    expansion_width = self.boundary_width - diff * iteration - 1
                # expand the boundary obtained from the prediction (width of 2) by expansion rate
                expanded_boundaries = expand_boundaries(boundaries, r=expansion_width)
                # invert it so that we can obtain non-boundary mask
                non_boundary_mask = 1. * (expanded_boundaries == 0)

                num_pad = 1

                # making boundary regions to 0
                x_masked = out * non_boundary_mask
                x_padded = nn.ReplicationPad2d(num_pad)(x_masked)
                non_boundary_mask_padded = nn.ReplicationPad2d(num_pad)(non_boundary_mask)

                # sum up the values in the receptive field
                y = self.first_conv(x_padded)
                # count non-boundary elements in the receptive field
                num_calced_elements = self.first_conv(non_boundary_mask_padded)
                num_calced_elements = num_calced_elements.long()

                # take an average by dividing y by count
                # if there is no non-boundary element in the receptive field,
                # keep the original value
                avg_y = torch.where((num_calced_elements == 0), prev_out, y / num_calced_elements)
                out = avg_y

                # update boundaries only
                out = torch.where((non_boundary_mask == 0), out, prev_out)
                del expanded_boundaries, non_boundary_mask

        # second stage; apply dilated smoothing
        if self.dilated_smoothing:
            out = self._smooth(out)

        return out.squeeze(1)


In [None]:
def compute_anomaly_score(score, mode='energy', num_classes=13):
    """
    Per-pixel anomaly score computed from segmentation logits.

    Args:
        score: logits of shape (B, C, H, W) or (C, H, W).
        mode (str): 'energy' (negative log-sum-exp of the logits) or 'entropy'
            (softmax entropy divided by the log of the number of classes used).
        num_classes (int): only the first `num_classes` channels are used.

    Returns:
        Tensor: Gaussian-smoothed scores of shape (B, H, W) for batched input
        (also when B == 1) and (H, W) for a single (C, H, W) map.

    Raises:
        NotImplementedError: for an unknown `mode`.
        ValueError: if `score` is neither 3-D nor 4-D.
    """
    single_map = score.dim() == 3
    if single_map:
        score = score.unsqueeze(0)
    if score.dim() != 4:
        raise ValueError(f"expected logits of shape (B, C, H, W) or (C, H, W), got {tuple(score.shape)}")

    # Keep the known classes only. The slice is taken on the channel axis; the
    # previous `squeeze(0)[:13]` sliced the batch axis for batched input.
    score = score[:, :num_classes]

    if mode == 'energy':
        anomaly_score = -(1. * torch.logsumexp(score, dim=1))
    elif mode == 'entropy':
        log_prob = torch.log_softmax(score, dim=1)
        entropy = -torch.sum(log_prob.exp() * log_prob, dim=1)
        # normalise by the maximum entropy of the classes actually used
        anomaly_score = entropy / math.log(max(score.shape[1], 2))
    else:
        raise NotImplementedError

    # regular gaussian smoothing, applied to every map independently
    anomaly_score = anomaly_score.unsqueeze(0)
    anomaly_score = transforms.GaussianBlur(7, sigma=1)(anomaly_score)
    anomaly_score = anomaly_score.squeeze(0)

    return anomaly_score[0] if single_map else anomaly_score

In [None]:
from torch import Tensor, einsum

def simplex(t: Tensor, axis=1) -> bool:
    """
    Check that `t` sums to one along `axis` (up to `torch.allclose` tolerance).

    taken from https://github.com/LIVIAETS/boundary-loss/blob/master/utils.py
    (without the `typing.cast` wrapper, which was never imported here and made
    every call fail with a NameError).
    """
    _sum = t.sum(axis).type(torch.float32)
    _ones = torch.ones_like(_sum, dtype=torch.float32)
    return torch.allclose(_sum, _ones)

class GeneralizedDice():
    """
    Generalized Dice loss over the classes listed in `idc`.

    taken from https://github.com/LIVIAETS/boundary-loss/blob/master/losses.py
    """
    def __init__(self, **kwargs):
        # `idc` selects (via fancy indexing) the classes of the target mask that
        # contribute to the loss.
        self.idc = kwargs["idc"]
        print(f"Initialized {self.__class__.__name__} with {kwargs}")

    def __call__(self, probs: Tensor, target: Tensor) -> Tensor:
        """Both inputs are (B, K, W, H) and must sum to one along the class axis."""
        assert simplex(probs) and simplex(target)

        eps = 1e-10
        pc = probs[:, self.idc, ...].type(torch.float32)
        tc = target[:, self.idc, ...].type(torch.float32)

        # per-class weight: inverse squared volume of the target
        target_volume = einsum("bkwh->bk", tc)
        class_weight = 1 / ((target_volume.type(torch.float32) + eps) ** 2)

        weighted_overlap = class_weight * einsum("bkwh,bkwh->bk", pc, tc)
        weighted_total = class_weight * (einsum("bkwh->bk", pc) + target_volume)

        numerator = einsum("bk->b", weighted_overlap) + eps
        denominator = einsum("bk->b", weighted_total) + eps
        per_sample = 1 - 2 * numerator / denominator

        return per_sample.mean()

class BoundaryLoss():
    """
    Boundary loss: mean of the predicted probabilities weighted by the
    pre-computed distance maps, over the classes listed in `idc`.

    taken from https://github.com/LIVIAETS/boundary-loss/blob/master/losses.py
    """
    def __init__(self, **kwargs):
        # Self.idc is used to filter out some classes of the target mask. Use fancy indexing
        self.idc = kwargs["idc"]
        print(f"Initialized {self.__class__.__name__} with {kwargs}")

    @staticmethod
    def _is_one_hot(t: Tensor, axis=1) -> bool:
        """True when `t` holds only 0/1 values and sums to one along `axis`.

        Stands in for the reference repository's `one_hot` helper, which is not
        defined in this notebook.
        """
        only_binary = bool(((t == 0) | (t == 1)).all())
        return only_binary and simplex(t, axis)

    def __call__(self, probs: Tensor, dist_maps: Tensor) -> Tensor:
        """
        Args:
            probs: (B, K, W, H) probabilities summing to one along the class axis.
            dist_maps: (B, K, W, H) distance maps (must not be a one-hot mask).
        """
        assert simplex(probs)
        # guards against passing the segmentation mask instead of its distance map
        assert not self._is_one_hot(dist_maps)

        pc = probs[:, self.idc, ...].type(torch.float32)
        dc = dist_maps[:, self.idc, ...].type(torch.float32)

        # element-wise product
        multipled = einsum("bkwh,bkwh->bkwh", pc, dc)

        loss = multipled.mean()

        return loss

In [None]:
class Trainer:
    """
    Training loop with per-epoch evaluation, early stopping on the validation
    mean IoU, checkpointing and Weights & Biases logging.
    """

    # weight of the energy term in the energy-based loss (taken from paper)
    ENERGY_WEIGHT = 0.05

    def __init__(self,
                 #processor,
                 model: nn.Module,
                 train_loader: DataLoader,
                 val_loader: DataLoader,
                 device: torch.device,
                 num_classes: int,
                 loss,
                 cfg: dict,
                 model_name: str,
                 resume_ckpt: dict = None,

        ) -> None:
        """
        Args:
            model: network to train; called as `model(imgs)`.
            train_loader / val_loader: loaders yielding dicts with 'image' and 'labels'.
            device: device the model and the batches are moved to.
            num_classes: number of classes (stored on the instance).
            loss: callable `loss(logits, labels)`, or `energy_loss` for the
                two-output (vanilla, mix) setup.
            cfg: needs "patience", "num_epochs", "lr", "wd" and "run_name".
            model_name: file stem of the saved weights.
            resume_ckpt: checkpoint dict as written by `eval` ('epoch',
                'model_state_dict', 'optimizer_state_dict', 'scheduler_state_dict',
                optionally 'mean_iou').
        """
        self.model_name = model_name
        self.train_loader = train_loader
        self.val_loader = val_loader
        self.device = device
        self.num_classes = num_classes
        self.patience = cfg["patience"]
        self.multi_scale = BoundarySuppressionWithSmoothing()
        self.multi_scale.to(device)
        self.loss = loss
        #self.processor = processor

        self.model = model.to(device)
        self.optimizer = torch.optim.AdamW(self.model.parameters(), lr=cfg["lr"], weight_decay=cfg["wd"])
        # The schedule always spans the full run; on resume its progress is
        # restored from the checkpoint below.
        total_steps = cfg["num_epochs"] * len(train_loader)
        self.scheduler = torch.optim.lr_scheduler.OneCycleLR(self.optimizer, cfg["lr"], total_steps=total_steps)

        self.start_epoch = 0
        self.mean_iou = 0.0

        if resume_ckpt:
            self.model.load_state_dict(resume_ckpt['model_state_dict'])
            self.scheduler.load_state_dict(resume_ckpt['scheduler_state_dict'])
            # loaded after the scheduler so that the saved learning rate wins
            self.optimizer.load_state_dict(resume_ckpt['optimizer_state_dict'])
            # 'epoch' is the 0-based index of the last completed epoch, hence +1;
            # running one epoch too many would step OneCycleLR past its total.
            self.start_epoch = resume_ckpt['epoch'] + 1
            # keep the best score so a worse epoch does not overwrite the checkpoint
            self.mean_iou = resume_ckpt.get('mean_iou', 0.0)

        # number of epochs that are still to be run
        self.num_epochs = max(cfg["num_epochs"] - self.start_epoch, 0)

        self.step = 0
        self.best_acc = 0.0
        self.best_model = None

        self.ckpt_path = Path("ckpts")
        self.ckpt_path.mkdir(exist_ok=True)

        wandb.init(name=cfg["run_name"], entity=WANDB_USER, project=WANDB_PROJECT, config=cfg)

    def _compute_loss(self, imgs, labels):
        """Forward pass; returns (loss, logits used for the IoU)."""
        if self.loss == energy_loss:
            vanilla_logits, mix_logits = self.model(imgs)
            loss_dict = self.loss(logits=mix_logits, targets=labels.clone(),
                                 vanilla_logits=vanilla_logits)
            inlier_loss = loss_dict["entropy_part"] + loss_dict["reg"]
            outlier_loss = loss_dict["energy_part"] * self.ENERGY_WEIGHT
            return inlier_loss + outlier_loss, vanilla_logits

        logits = self.model(imgs)
        return self.loss(logits, labels), logits

    def train(self, verbose= False) -> None:
        """Run the remaining epochs, evaluating after each one; stops early once
        the validation mean IoU has not improved for more than `patience` epochs."""
        epochs = range(self.start_epoch, self.start_epoch + self.num_epochs)
        for epoch in tqdm(epochs, desc="Epoch"):

            self.model.train()

            for batch in self.train_loader:
                imgs = batch['image'].to(self.device)
                labels = batch['labels'].to(self.device)

                loss_res, _ = self._compute_loss(imgs, labels)

                self.optimizer.zero_grad()
                loss_res.backward()
                self.optimizer.step()
                self.scheduler.step()

            self.eval("train", epoch)
            self.eval("val", epoch)

            if self.patience < self.step:
                break


    @torch.no_grad()
    def eval(self, split: str, epoch: int) -> None:
        """
        Evaluate on the 'train' or 'val' split and log loss / mean IoU to wandb.

        On 'val' the metrics are also printed, the early-stopping counter is
        updated and, on improvement, the weights and a resumable checkpoint are saved.
        """
        self.model.eval()

        loader = self.train_loader if split == "train" else self.val_loader

        iou_meter = MeanIoU()
        losses = []

        for batch in loader:
            imgs = batch['image'].to(self.device)
            labels = batch['labels'].to(self.device)

            loss_res, logits = self._compute_loss(imgs, labels)
            losses.append(loss_res.item())

            iou_meter.update(labels, logits)

        mean_iou = iou_meter.get_results()['Mean IoU']
        # an empty loader has no loss to average
        l = sum(losses) / len(losses) if losses else float("nan")

        if split != "val":
            # the train metrics used to be computed and then thrown away
            wandb.log({
            "train_loss": l,
            "train_mean_iou": mean_iou,
            }, step=(epoch + 1))
            return

        print(f"Epoch {epoch + 1} | {split.upper()} Metrics:")
        print(f"  Loss: {l:.4f}")
        print(f"  Mean IoU: {mean_iou:.4f}\n")

        wandb.log({
        "val_loss": l,
        "mean_iou": mean_iou,
        }, step=(epoch + 1))

        if mean_iou > self.mean_iou:
            self.mean_iou = mean_iou
            weights_file = self.ckpt_path / f"{self.model_name}.pt"
            checkpoint_file = self.ckpt_path / "best_checkpoint"

            torch.save(self.model.state_dict(), weights_file)
            torch.save({
                'epoch': epoch,
                'mean_iou': self.mean_iou,
                #'loss': loss,
                'model_state_dict': self.model.state_dict(),
                'optimizer_state_dict': self.optimizer.state_dict(),
                'scheduler_state_dict': self.scheduler.state_dict(),
                }, checkpoint_file)

            wandb.save(str(weights_file))
            wandb.save(str(checkpoint_file))

            self.best_model = copy.deepcopy(self.model)
            self.step = 0
        else:
            # no improvement: one more epoch towards early stopping
            self.step += 1

In [None]:
@torch.no_grad()
def predict(model, loader, verbose= False, out_idx = 13):
    """Evaluate ``model`` on ``loader``: closed-set mean IoU and anomaly AUPR.

    For every batch the logits are fed to ``MeanIoU.update`` and turned into a
    per-pixel anomaly score with ``compute_anomaly_score(..., mode="energy")``.
    Each image's score map is compared against the mask ``labels == out_idx``
    with ``compute_aupr`` and the per-image values are averaged.

    Args:
        model: callable returning either a logits tensor or a 2-tuple whose
            second element holds the logits to evaluate.
        loader: iterable of dict batches with ``'image'`` and ``'labels'``.
        verbose: currently unused; kept for interface compatibility.
        out_idx: label id that marks anomalous pixels (default 13).

    Returns:
        dict with ``"aupr"`` (mean per-image AUPR in percent, ``nan`` when no
        image was scored) and ``"mean_iou"`` (``MeanIoU.get_results()``).
    """
    model.eval()
    mean_iou = MeanIoU()

    aupr_sum = 0.0
    aupr_count = 0
    for batch in tqdm(loader):

        imgs = batch['image'].to(DEVICE)
        labels = batch['labels'].to(DEVICE)

        pred = model(imgs)

        # Two-output models return (first, second); the second output is scored.
        if isinstance(pred, tuple):
            _, logits = pred
        else:
            logits = pred

        mean_iou.update(label_trues=labels, logits= logits)

        # Energy-based anomaly score. Other scorers defined in this notebook
        # that were tried here: maximum_softmax_probability, max_logit,
        # standardized_max_logit and euclidean_distance_sum.
        anomaly_score = compute_anomaly_score(logits, mode="energy").cpu()

        for i in range(len(anomaly_score)):
            image_aupr, image_count = compute_aupr(anomaly_score[i], (labels[i] == out_idx))
            aupr_sum += image_aupr
            aupr_count += image_count

        del imgs
        del labels

    # An empty loader used to raise ZeroDivisionError; report NaN instead.
    mean_aupr = aupr_sum / aupr_count * 100 if aupr_count else float("nan")
    return {"aupr": mean_aupr, "mean_iou": mean_iou.get_results()}

In [None]:
def compute_aupr(preds, labels):
    """Sum the average-precision score over one or more score maps.

    A 2-D ``preds`` / ``labels`` tensor is treated as a single map and gets a
    leading axis. Both tensors are moved to the CPU, then every
    (scores, targets) pair is flattened and passed to
    ``metrics.average_precision_score`` with the targets cast to int32 and
    the scores cast to float32.

    Returns:
        (sum of the per-map scores, number of maps scored)
    """
    # Promote a single map to a batch of one.
    if preds.dim() == 2:
        preds = preds.unsqueeze(0)
    if labels.dim() == 2:
        labels = labels.unsqueeze(0)

    per_map_scores = [
        metrics.average_precision_score(
            target_map.type(torch.int32).flatten().numpy(),
            score_map.type(torch.float32).flatten().numpy(),
        )
        for score_map, target_map in zip(preds.cpu(), labels.cpu())
    ]

    return sum(per_map_scores), len(per_map_scores)

In [None]:
def get_mean_std(logits, num_classes = 13):
    """Accumulate per-class statistics of the max-logit map over a batch.

    For every element of ``logits`` the maximum over dim 0 and its argmax are
    taken. For each class ``c`` the max-logit map is kept where the argmax
    equals ``c`` and set to 0 elsewhere; the mean and standard deviation of
    that zero-filled map are added to the running totals of class ``c``.

    NOTE: the statistics are taken over the whole zero-filled map, not only
    over the pixels predicted as ``c``, and the totals are summed (not
    averaged) over the batch elements.

    Args:
        logits: iterable of tensors whose dim 0 indexes the classes.
        num_classes: number of classes to collect statistics for.

    Returns:
        (mean_sum, std_sum ** 2): two numpy arrays of length ``num_classes``.
    """
    mean_avg = np.zeros(num_classes)
    std_avg = np.zeros(num_classes)

    for logit in logits:
        conf, labels = torch.max(logit, 0)
        for c in range(num_classes):
            tens = torch.where(labels == c, conf, torch.zeros_like(conf))
            # torch.std_mean returns (std, mean) in that order; the previous
            # code unpacked it as (mean, std) and so swapped the two.
            std, mean = torch.std_mean(tens)

            mean_avg[c] += mean.item()
            std_avg[c] += std.item()

    return mean_avg, std_avg**2

In [None]:
def maximum_softmax_probability(logits, multi_scale = None):
    """
    Anomaly score defined as one minus the maximum softmax probability.

    taken from https://github.com/Jun-CEN/Open-World-Semantic-Segmentation/blob/main/anomaly/eval_ood_traditional.py#L185

    The softmax and the maximum are both taken over dim 1 of ``logits``. When
    ``multi_scale`` is truthy it is called as ``multi_scale(conf, prediction)``
    under ``torch.no_grad()`` and its result replaces the confidence map
    (hook taken from standardized max logit).
    """
    class_probabilities = torch.softmax(logits, dim=1)
    top_probability, top_class = class_probabilities.max(dim=1)

    if multi_scale:
        with torch.no_grad():
            top_probability = multi_scale(top_probability, top_class)

    return 1 - top_probability

In [None]:
def max_logit(logits, multi_scale = None):
    """
    Maximum logit over dim 1 of ``logits``.

    taken from https://github.com/Jun-CEN/Open-World-Semantic-Segmentation/blob/main/anomaly/eval_ood_traditional.py#L185

    When ``multi_scale`` is truthy it is called as
    ``multi_scale(conf, prediction)`` under ``torch.no_grad()`` and its result
    is returned instead (hook taken from standardized max logit).

    NOTE(review): the raw maximum is returned un-negated; confirm that callers
    that need "larger = more anomalous" flip the sign themselves.
    """
    top_logit, top_class = logits.max(dim=1)

    if not multi_scale:
        return top_logit

    with torch.no_grad():
        return multi_scale(top_logit, top_class)

In [None]:
# Per-class statistics (13 entries each) meant to be passed as `class_mean` /
# `class_var` to `standardized_max_logit`.
# Presumably produced with `get_mean_std` — TODO confirm on which split.
# Entries 4 and 12 are exactly zero in both lists.
MEAN_PER_CLASS = [1.72267947e+01, 7.66881425e+00, 5.77455433e-01, 4.47677614e-02,
 0.00000000e+00, 3.41044600e-01, 1.39990459e+00, 2.11263614e+01,
 7.31018189e+00, 7.46783295e+00, 1.15309967e-02, 2.41669891e+00,
 0.00000000e+00]
VAR_PER_CLASS = [1.33243940e+02, 1.53900278e+01, 2.14974713e-02, 3.87089461e-05,
 0.00000000e+00, 7.03953145e-03, 8.06856298e-02, 2.19127594e+02,
 5.62604913e+00, 8.48272837e+00, 1.52454267e-05, 8.70028476e-01,
 0.00000000e+00]

def standardized_max_logit(logits, multi_scale, class_mean, class_var, num_classes = 13, eps = 1e-8):
    """Max logit standardised with the statistics of the predicted class.

    The maximum over dim 1 of ``logits`` and its argmax are computed; every
    pixel predicted as class ``c`` becomes
    ``(max_logit - class_mean[c]) / sqrt(class_var[c])``.

    Args:
        logits: tensor whose dim 1 indexes the classes.
        multi_scale: optional callable; when truthy it is called as
            ``multi_scale(conf, prediction)`` under ``torch.no_grad()`` and
            its result is returned.
        class_mean: per-class means, indexable by class id.
        class_var: per-class variances, indexable by class id.
        num_classes: number of class ids to standardise.
        eps: lower bound for the standard deviation, so that a class with
            zero variance no longer yields inf/NaN.

    Returns:
        tensor of standardised max logits (``logits`` without dim 1).
    """
    conf, prediction = torch.max(logits,dim=1)

    standardized = conf
    for c in range(num_classes):
        # Floor the deviation: a zero variance would otherwise divide by zero.
        class_std = max(float(np.sqrt(class_var[c])), eps)
        standardized = torch.where(
            prediction == c,
            (conf - class_mean[c]) / class_std,
            standardized)

    if multi_scale:
        with torch.no_grad():
            standardized = multi_scale(standardized, prediction)

    return standardized

In [None]:
def euclidean_distance_sum(logits, blend_with_softmax = False):
    """
    Anomaly score from the negated sum of the logits over dim 1.

    taken from https://github.com/Jun-CEN/Open-World-Semantic-Segmentation/blob/main/anomaly/eval_ood_traditional.py#L185

    The negated class-sum is capped at 400 and min-max normalised over the
    whole array. By default that map is returned, which is what this function
    effectively returned before (the blended value was computed and then
    overwritten). The blend with the max-softmax map is available on request.

    The leading axis of ``logits`` is always kept. The previous version
    squeezed it for a single-image batch and then reduced the softmax over
    the wrong axis, which could raise a broadcasting error.

    Args:
        logits: tensor whose dim 1 indexes the classes.
        blend_with_softmax: when True, return
            ``coef * dis_sum + (1 - coef) * prob_map`` with
            ``coef = 1 / (1 + exp(50 * (dis_sum - 0.2)))``.

    Returns:
        numpy array shaped like ``logits`` without dim 1.
    """

    def _min_max(x):
        lo, hi = np.min(x), np.max(x)
        if hi == lo:
            # Constant map: avoid 0/0 and return zeros.
            return np.zeros_like(x)
        return (x - lo) / (hi - lo)

    def _coefficient_map(x, thre, lamda = 50):
        return 1 / (1 + np.exp(lamda * (x - thre)))

    dis_sum = -torch.sum(logits, dim=1).detach().cpu().numpy()
    dis_sum = np.minimum(dis_sum, 400)
    dis_sum = _min_max(dis_sum)

    if not blend_with_softmax:
        return dis_sum

    prob_map = torch.softmax(logits, dim=1).max(dim=1).values.detach().cpu().numpy()
    prob_map = _min_max(prob_map)
    coefficient = _coefficient_map(dis_sum, 0.2)
    return coefficient * dis_sum + (1 - coefficient) * prob_map

### mm-opensegmentation lab

### get the model

Only specify the weights path if you do not want to train the model.

In [None]:
# Disabled cell: the whole body is a string literal, so nothing here executes.
# It records the Hugging Face checkpoints that were tried and the optional
# weights path.
'''#model_name = "apple/deeplabv3-mobilevit-small"
#model_name = "nvidia/segformer-b1-finetuned-cityscapes-1024-1024" #results 0.61 on validation, but i have used 512, 512 img size, 1024, 1024 too big
model_name = "nvidia/segformer-b4-finetuned-ade-512-512"
#model_name = "nvidia/segformer-b2-finetuned-ade-512-512" #no-augmentation: (0,633 on val_set)


#weights_path = "/kaggle/input/weights/segformer_23_mln.pt"
weights_path = None

#processor, model = get_model(model_name, num_classes = len(COLORS)-1, weights_path = weights_path)
'''

In [None]:
import segmentation_models_pytorch as smp
from torchinfo import summary

# Encoder backbone, its pretrained weights, and the output activation for the
# segmentation model.
ENCODER = 'efficientnet-b0'
ENCODER_WEIGHTS = 'imagenet'
ACTIVATION = None # could be None for logits or 'softmax2d' for multiclass segmentation

def get_deeplab_model(encoder_name="efficientnet-b0", encoder_weights = "imagenet", activation= None, num_classes= 13):
    """Build a ``smp.DeepLabV3Plus`` segmentation model.

    Args:
        encoder_name: forwarded as ``encoder_name``.
        encoder_weights: forwarded as ``encoder_weights``.
        activation: forwarded as ``activation``.
        num_classes: forwarded as ``classes``.

    Returns:
        the constructed ``smp.DeepLabV3Plus`` instance.
    """
    deeplab_kwargs = dict(
        encoder_name=encoder_name,
        encoder_weights=encoder_weights,
        classes=num_classes,
        activation=activation,
    )
    return smp.DeepLabV3Plus(**deeplab_kwargs)

# Build the baseline network from the constants defined above (same values as
# the literals previously repeated here).
model = get_deeplab_model(encoder_name=ENCODER, encoder_weights=ENCODER_WEIGHTS, activation=ACTIVATION, num_classes=13)
input_size = (8, 3, 512, 512)
# Keep the result under its own name: assigning it to `summary` rebinds the
# imported torchinfo function, so this cell would fail when run a second time.
model_summary = summary(model, input_size=input_size)
print(model_summary)

In [None]:
# Count only the parameters that currently require gradients.
trainable_params = sum(
    weight.numel()
    for weight in filter(lambda w: w.requires_grad, model.parameters())
)
params_in_millions = trainable_params/1000000
print(f"model trainable parameters before freezing are {params_in_millions} milions")

In [None]:
# Disabled cell: the whole body is a string literal, so nothing here executes.
# It would freeze `model.backbone` except `layer4` and keep the classifier
# heads trainable, then print the trainable-parameter count.
'''for name, param in model.backbone.named_parameters():
    param.requires_grad = False
for name, param in model.backbone.layer4.named_parameters():
    param.requires_grad = True
for name, param in model.classifier.named_parameters():
    param.requires_grad = True
for name, param in model.aux_classifier.named_parameters():
    param.requires_grad = True

trainable_params = sum(p.numel() for p in model.parameters() if p.requires_grad)
print(f"model trainable parameters are {trainable_params/1000000} milions")'''

### model training

In [None]:
# Name of the baseline model; used as cfg["run_name"] in the next cell.
model_name = "deeplabv3plus_efficientnet_b0"

In [None]:
# Hyper-parameters and descriptive metadata for the baseline run.
cfg = dict(
    num_epochs=200,
    lr=2e-4,
    wd=0.001,
    patience=1000,
    loss="CrossEntropyLoss",
    optimizer="AdamW",
    data_augmentation="transforms.v2.RandomCrop(image_resize_scale),transforms.v2.RandomHorizontalFlip(), transforms.v2.Resize(image_resize_scale, transforms.InterpolationMode.NEAREST)",
    validation="not augmented with anomalies",
    run_name=model_name,
)

In [None]:
def resume_run(run_id, model_name=None):
    """Download the ``ckpts/`` files of a W&B run and return their local path(s).

    This notebook defines ``resume_run`` in two cells with different
    signatures. This version accepts both call styles, so every call in the
    notebook keeps working whichever definition ran last.

    Args:
        run_id: run path passed to ``wandb.Api().run``.
        model_name: optional checkpoint stem. When given, the single path
            ``/kaggle/working/ckpts/{model_name}.pt`` is returned.

    Returns:
        When ``model_name`` is None, a list with one
        ``/kaggle/working/<file name>`` string per downloaded file, in the
        order the API lists them; otherwise a single path string.

    NOTE(review): the returned paths assume the files land under
    /kaggle/working — confirm when running outside Kaggle.
    """
    run = wandb.Api().run(run_id)

    ckpts_files = []
    for f in run.files():
        if f.name.startswith("ckpts/"):
            f.download(replace=True)
            ckpts_files.append(f"/kaggle/working/{f.name}")

    if model_name is None:
        return ckpts_files
    return f"/kaggle/working/ckpts/{model_name}.pt"

In [None]:
# Download the checkpoints of a previous baseline run and load the weights.
run_id = "chri-project/ML4CV--assignment/1udth88b"
resume_ckpt = resume_run(run_id)
# NOTE(review): index 1 is assumed to be the model state dict; this depends on
# the order in which the run's files are listed — confirm.
model_weigths = torch.load(resume_ckpt[1], weights_only=True)
model.load_state_dict(model_weigths)

In [None]:
# Disabled cell: the Trainer construction is wrapped in a string literal and
# the train() call is commented out, so no baseline training runs here.
'''trainer = Trainer(
    #processor = processor,
    model= model,
    train_loader= train_dl,
    val_loader= val_dl ,
    loss = F.cross_entropy,
    device= DEVICE,
    num_classes = len(COLORS)-1,
    model_name = model_name,
    cfg= cfg,
    resume_ckpt = torch.load(resume_ckpt[0])
)'''

#trainer.train()

In [None]:
# Evaluate the baseline model on the training loader (AUPR and mean IoU).
result_train = predict(model, train_dl)
print("train results: ", result_train)

In [None]:
# Evaluate the baseline model on the validation loader (AUPR and mean IoU).
result_val = predict(model, val_dl)
print("validation results: ", result_val)

In [None]:
# Evaluate the baseline model on the test loader (AUPR and mean IoU).
result_test = predict(model, test_dl)
print("test_results: ", result_test)

In [None]:
@torch.no_grad()
def get_predictions(img, model):
    """Predict a class-index map for a single image.

    ``img`` gets a leading batch axis and is moved to ``DEVICE``. The model
    output is moved to the CPU, reduced with argmax over dim 1, and the batch
    axis is removed again.

    NOTE: a later cell redefines ``get_predictions`` with the argument order
    ``(model, img)``; this definition is only valid until that cell runs.
    """
    batch = img.unsqueeze(0).to(DEVICE)
    logits = model(batch).cpu()
    return logits.argmax(dim=1).squeeze(0)

In [None]:
# Side-by-side view of the baseline prediction (left) and the ground-truth
# annotation (right) for one validation sample.
dataset = val_dataset
fig, ax = plt.subplots(1, 2, figsize=(10, 12))

idx = 0
img, lbl = dataset[idx].values()
visualize_annotation(get_predictions(img, model), ax[0])
visualize_annotation(lbl, ax[1])

In [None]:
# Disabled cell: the whole body is a string literal, so nothing here executes.
# It is the same prediction/annotation comparison for a training sample.
'''idx = 500
dataset = train_dataset
fig, ax = plt.subplots(1, 2)

tr_img, tr_lbl = dataset[idx].values()
visualize_annotation(get_predictions(tr_img, model), ax[0])
visualize_annotation(tr_lbl, ax[1])

#wandb.finish()'''

### AUPR Results:
- standardized max logit is the worst performer: 6.11 (with boundary suppression it goes up to roughly 7.x)
- max logit: 7.65
- maximum softmax probability: 11.33
- euclidean distance: 15.31

### TODO: implement class AUPR

# RPL implementation

Make the following arrays (lists of layers):
- one containing all the layers up to `layer2` (the representation extracted from the shallow layers, which then goes through a conv2d)
- one containing everything up to the end of the ResNet (which goes through both the ASPP and the ResidualBlock)
- another one for the ASPP (to be concatenated with the first)

In [None]:
def disimilarity_entropy(logits, vanilla_logits, t=1.):
    """Per-pixel squared difference between the entropies of two predictions.

    Both logit tensors are turned into probabilities with a softmax over
    dim 1 (clamped to at least 1e-7), their entropies are computed over dim 1
    and divided by ``t``, and the element-wise squared error between the two
    entropy maps is returned.

    Raises:
        AssertionError: if the result contains NaN.
    """
    n_prob = torch.clamp(torch.softmax(vanilla_logits, dim=1), min=1e-7)
    a_prob = torch.clamp(torch.softmax(logits, dim=1), min=1e-7)

    n_entropy = -torch.sum(n_prob * torch.log(n_prob), dim=1) / t
    a_entropy = -torch.sum(a_prob * torch.log(a_prob), dim=1) / t

    entropy_disimilarity = F.mse_loss(input=a_entropy, target=n_entropy, reduction="none")
    # Build a real message; the previous code used the return value of
    # print() as the message, which is always None.
    assert not torch.isnan(entropy_disimilarity).any(), (
        f"NaN in entropy dissimilarity: min n_entropy={torch.min(n_entropy)}, "
        f"max a_entropy={torch.max(a_entropy)}"
    )

    return entropy_disimilarity


def energy_loss(logits, targets, vanilla_logits, out_idx=13, t=1.):
    """Loss terms for training on images that contain outlier pixels.

    Pixels labelled ``out_idx`` are outliers and pixels labelled 255 are void;
    every other pixel is an inlier.

    * ``entropy_part``: mean cross-entropy between ``logits`` and the argmax of
      ``vanilla_logits``, over the inlier pixels.
    * ``reg``: mean of ``disimilarity_entropy`` over the inlier pixels.
    * ``energy_part``: mean of ``relu(logsumexp(logits, dim=1))`` over the
      outlier pixels, or a one-element zero tensor when there are none.

    Args:
        logits: (batch, classes, H, W) logits of the branch being trained.
        targets: (batch, H, W) integer labels.
        vanilla_logits: logits of the reference branch, same shape as ``logits``.
        out_idx: label id of outlier pixels.
        t: currently unused; kept for interface compatibility.

    Returns:
        dict with keys ``"entropy_part"``, ``"reg"`` and ``"energy_part"``.

    NOTE: if no pixel is an inlier, ``entropy_part`` and ``reg`` are means of
    empty tensors and therefore NaN.
    """
    out_msk = (targets == out_idx)
    void_msk = (targets == 255)
    inlier_msk = ~(out_msk | void_msk)

    pseudo_targets = torch.argmax(vanilla_logits, dim=1)
    entropy_part = F.cross_entropy(input=logits, target=pseudo_targets, reduction='none')[inlier_msk]
    reg = disimilarity_entropy(logits=logits, vanilla_logits=vanilla_logits)[inlier_msk]
    if torch.sum(out_msk) > 0:
        # logsumexp equals log(sum(exp(.))) but does not overflow for large logits.
        energy_part = F.relu(torch.logsumexp(logits, dim=1)[out_msk]).mean()
    else:
        energy_part = torch.tensor([.0], device=targets.device)
    return {"entropy_part": entropy_part.mean(), "reg": reg.mean(), "energy_part": energy_part}

In [None]:
from copy import deepcopy

class RPLDeepLab(nn.Module):
    """Wrap a DeepLabV3+ style model with a trainable residual anomaly branch.

    ``model`` must expose ``encoder``, ``decoder`` (with ``aspp``, ``up``,
    ``block1`` and ``block2``) and ``segmentation_head``. Frozen deep copies of
    these parts form the unchanged segmentation path. Trainable copies of
    ``decoder.aspp`` and ``decoder.up``, followed by a 1x1 convolution
    (256 -> 304 channels), produce a residual that is added to the
    concatenated decoder features before the frozen final layers.

    The frozen copies are pinned to eval mode, including after ``train()``.
    Otherwise their Dropout and BatchNorm layers would keep changing the
    reference output even though their parameters do not receive gradients.
    """

    def __init__(self, model):
        super().__init__()

        self.encoder = self.copy_un_freeze_params(model.encoder, unfreeze=False)
        self.decoder = self.copy_un_freeze_params(model.decoder, unfreeze=False)
        self.final = nn.Sequential(
            self.copy_un_freeze_params(model.decoder.block2, unfreeze=False),
            self.copy_un_freeze_params(model.segmentation_head, unfreeze=False),
        )

        # Projects the residual branch to the channel count of the
        # concatenated decoder features.
        self.atten_aspp_final = nn.Conv2d(256, 304, kernel_size=1, bias=False)

        self.residual_anomaly_block = nn.Sequential(
            self.copy_un_freeze_params(model.decoder.aspp, unfreeze=True),
            self.copy_un_freeze_params(model.decoder.up, unfreeze=True),
            self.atten_aspp_final
        )

    def copy_un_freeze_params(self, layer: nn.Module, unfreeze: bool=True) -> nn.Module:
        """
        Create a deep copy of ``layer`` and set ``requires_grad`` of all its
        parameters to ``unfreeze``. A frozen copy is also put in eval mode.

        return: the deep copy, frozen or unfrozen
        """
        layer_copy = deepcopy(layer)
        for param in layer_copy.parameters():
            param.requires_grad = unfreeze
        if not unfreeze:
            layer_copy.eval()
        return layer_copy

    def train(self, mode: bool = True):
        """Set the training mode, keeping the frozen sub-modules in eval mode."""
        super().train(mode)
        for frozen in (self.encoder, self.decoder, self.final):
            frozen.eval()
        return self

    def forward(self, x):
        """Return ``(out1, out2)``: logits without and with the residual."""

        features = self.encoder(x)
        aspp_features = self.decoder.aspp(features[-1])
        aspp_features = self.decoder.up(aspp_features)
        high_res_features = self.decoder.block1(features[2])
        concat_features = torch.cat([aspp_features, high_res_features], dim=1)

        res = self.residual_anomaly_block(features[-1])

        out1 = self.final(concat_features)
        out2 = self.final(concat_features + res)

        return out1, out2

In [None]:
def resume_run(run_id, model_name=None):
    """Download the ``ckpts/`` files of a W&B run and return their local path(s).

    This notebook defines ``resume_run`` in two cells with different
    signatures. This version accepts both call styles, so every call in the
    notebook keeps working whichever definition ran last.

    Args:
        run_id: run path passed to ``wandb.Api().run``.
        model_name: checkpoint stem. When given, the single path
            ``/kaggle/working/ckpts/{model_name}.pt`` is returned.

    Returns:
        The path string when ``model_name`` is given; otherwise a list with
        one ``/kaggle/working/<file name>`` string per downloaded file.

    NOTE(review): the returned paths assume the files land under
    /kaggle/working — confirm when running outside Kaggle.
    """
    run = wandb.Api().run(run_id)

    downloaded = []
    for f in run.files():
        if f.name.startswith("ckpts/"):  # or adjust the filter
            f.download(replace=True)
            downloaded.append(f"/kaggle/working/{f.name}")

    if model_name is None:
        return downloaded
    directory = f"/kaggle/working/ckpts/{model_name}.pt"
    return directory

In [None]:
# Rebuild the baseline network and load the weights saved by an earlier run.
model = get_deeplab_model(encoder_name="efficientnet-b0", encoder_weights = "imagenet", activation= None, num_classes= 13)

run_id = "chri-project/ML4CV--assignment/cn4tyspf"
model_name = "deeplabv3plus_efficientnet_b0-continue"
# `directory` is the local path of the downloaded `<model_name>.pt` file.
directory = resume_run(run_id, model_name)
model_weigths = torch.load(directory, weights_only=True)
model.load_state_dict(model_weigths)

In [None]:
from torchinfo import summary
# Wrap the loaded segmentation model in the RPL module and show its layer summary.
rpl = RPLDeepLab(model)
summary(rpl, input_size= (8, 3, 512, 512))

In [None]:
model_name = "rpl_deeplabv3plus_efficientnet_b0"

# Hyper-parameters and descriptive metadata for the RPL run.
# NOTE(review): "loss" says CrossEntropyLoss while the RPL trainer below is
# given `energy_loss` — confirm which value should be recorded.
rpl_cfg = dict(
    num_epochs=200,
    lr=2e-4,
    wd=0.001,
    patience=1000,
    loss="CrossEntropyLoss",
    optimizer="AdamW",
    data_augmentation="transforms.v2.RandomCrop(image_resize_scale),transforms.v2.RandomHorizontalFlip(), transforms.v2.Resize(image_resize_scale, transforms.InterpolationMode.NEAREST)",
    train="augmented with anomalies",
    validation="augmented with anomalies",
    run_name=model_name,
)

## The model head needs to remain at 13 classes

In [None]:
# Download the checkpoint of a previous RPL run and load it into `rpl`.
run_id_rpl = "chri-project/ML4CV--assignment/c8ij9i65"
rpl_model_name = "rpl_deeplabv3plus_efficientnet_b0_first_try"
directory = resume_run(run_id_rpl, rpl_model_name)
# Despite its name, `model_weights_path` holds the loaded state dict, not a path.
model_weights_path = torch.load(directory, weights_only=True)
rpl.load_state_dict(model_weights_path)

In [None]:
# Trainer for the RPL model, using `energy_loss`. Training itself is disabled
# below (the train() call is commented out).
# NOTE(review): the disabled baseline Trainer call earlier in the notebook
# passes len(COLORS)-1 as num_classes; confirm which value Trainer expects here.
rpl_trainer = Trainer(
    model= rpl,
    train_loader= train_dl,
    val_loader= val_dl ,
    loss = energy_loss,
    device= DEVICE,
    num_classes = len(COLORS),
    model_name = model_name,
    cfg= rpl_cfg,
    #resume_ckpt = torch.load(resume_ckpt)
)

#rpl_trainer.train()

In [None]:
import matplotlib.pyplot as plt
import numpy as np
import torch

def plot_anomaly_heatmap(anomaly_score, image=None, title="Anomaly Heatmap"):
    """
    Plots a heatmap where red = high anomaly, blue = low anomaly.

    The scores are min-max normalised to [0, 1] before plotting; a constant
    map is shown as all zeros instead of dividing by zero.

    Parameters:
    - anomaly_score: torch.Tensor or np.array of shape [H, W]
    - image: optional image in range [0, 1] or [0, 255]; when given it is
      drawn next to the heatmap with `visualize_scene`
    - title: string title for plot
    """
    if isinstance(anomaly_score, torch.Tensor):
        anomaly_score = anomaly_score.squeeze().detach().cpu().numpy()

    # Normalize to [0, 1]
    score_min = anomaly_score.min()
    score_range = anomaly_score.max() - score_min
    if score_range == 0:
        anomaly_score = np.zeros_like(anomaly_score, dtype=float)
    else:
        anomaly_score = (anomaly_score - score_min) / score_range

    # Create the figure before drawing. Previously plt.figure(...) was called
    # after the plot, which opened an extra empty figure and made
    # tight_layout() act on that empty figure instead of the heatmap.
    if image is not None:
        fig, ax = plt.subplots(1, 2, figsize=(10, 5))
        heatmap_ax = ax[0]
        if image.max() > 1.0:
            image = image / 255.0
    else:
        fig, heatmap_ax = plt.subplots(figsize=(10, 12))

    heatmap = heatmap_ax.imshow(anomaly_score, cmap='jet')  # 'jet': blue -> red
    fig.colorbar(heatmap, ax=heatmap_ax, label='Anomaly Score')
    heatmap_ax.set_title(title)
    heatmap_ax.axis("off")

    if image is not None:
        visualize_scene(image, ax[1])

    fig.tight_layout()
    plt.show()


In [None]:
@torch.no_grad()
def get_predictions(model, img):
    """Plot the energy-based anomaly heatmap of ``model`` for a single image.

    ``img`` gets a leading batch axis and is moved to ``DEVICE``. The model
    must return a 2-tuple; its second element is scored with
    ``compute_anomaly_score(..., mode="energy")`` and the result is shown with
    ``plot_anomaly_heatmap``. Nothing is returned.

    NOTE: this redefines the earlier ``get_predictions(img, model)`` with the
    arguments in the opposite order.
    """
    batch = img.unsqueeze(0).to(DEVICE)
    _inlier_logits, outlier_logits = model(batch)

    # NOTE(review): the model output already has a batch axis; confirm that
    # compute_anomaly_score expects this additional leading axis.
    outlier_logits = outlier_logits.unsqueeze(0)

    anomaly_score = compute_anomaly_score(outlier_logits, mode="energy").cpu()
    plot_anomaly_heatmap(anomaly_score=anomaly_score, image=None) #try alpha= None

In [None]:
# Anomaly heatmap of the RPL model for one validation image.
img = val_dataset[1000]['image']
get_predictions(rpl, img)

In [None]:
# Anomaly heatmap of the RPL model for one test image.
img = test_dataset[1024]['image']
get_predictions(rpl, img)

In [None]:
# AUPR and mean IoU of the RPL model on the validation loader.
res = predict(rpl, val_dl)
print(res)

In [None]:
# AUPR and mean IoU of the RPL model on the test loader.
res = predict(rpl, test_dl)
print(res)

In [None]:
# Run the garbage collector and release PyTorch's cached CUDA memory.
import gc
gc.collect()
torch.cuda.empty_cache()

rpl: 0.5121710741219887 <br>
maximum softmax probability: <br>
max logit: <br>
standardized max logit: 

In [None]:
# Compare the two RPL outputs (without / with the residual branch) against the
# ground-truth annotation for one validation sample.
dataset = val_dataset
fig, ax = plt.subplots(1, 3)

idx = 500
img, lbl = dataset[idx].values()

# The `get_predictions(model, img)` in scope at this point only plots a heatmap
# and returns None (and takes its arguments in the other order), so the class
# maps are computed here directly from the two model outputs.
rpl.eval()
with torch.no_grad():
    inlier_logits, outlier_logits = rpl(img.unsqueeze(0).to(DEVICE))
inlier = torch.argmax(inlier_logits.cpu(), dim=1).squeeze(0)
outlier = torch.argmax(outlier_logits.cpu(), dim=1).squeeze(0)

visualize_annotation(inlier, ax[0])
visualize_annotation(outlier, ax[1])
visualize_annotation(lbl, ax[2])