# Moduli import

In [2]:
import torch
import torch.nn as nn
import torch.nn.utils.prune as prune
import torch.quantization as quant
from torch.ao.quantization import get_default_qconfig
from torch.ao.quantization.quantize_fx import prepare_fx, convert_fx
from torch.ao.quantization import QConfigMapping

In [3]:
# Code with dataset loader for VOC12 and Cityscapes (adapted from bodokaiser/piwise code)
# Sept 2017
# Eduardo Romera
#######################

import numpy as np
import os

from PIL import Image

from torch.utils.data import Dataset, DataLoader
from torchvision.transforms import Compose, Resize, ToTensor


class Relabel:

    def __init__(self, olabel, nlabel):
        self.olabel = olabel
        self.nlabel = nlabel

    def __call__(self, tensor):
        assert isinstance(tensor, torch.LongTensor) or isinstance(tensor, torch.ByteTensor) , 'tensor needs to be LongTensor'
        tensor[tensor == self.olabel] = self.nlabel
        return tensor


class ToLabel:

    def __call__(self, image):
        return torch.from_numpy(np.array(image)).long()#.unsqueeze(0)


EXTENSIONS = ['.jpg', '.png']

def load_image(file):
    return Image.open(file)

def is_image(filename):
    return any(filename.endswith(ext) for ext in EXTENSIONS)

def is_label(filename):
    return filename.endswith("_labelTrainIds.png")

def image_path(root, basename, extension):
    return os.path.join(root, f'{basename}{extension}')

def image_path_city(root, name):
    return os.path.join(root, f'{name}')

def image_basename(filename):
    return os.path.basename(os.path.splitext(filename)[0])

def get_cityscapes_loader(datadir, batch_size, subset,num_workers=4,size = 256):

    # preprocessign of the input images
    input_transform_cityscapes = Compose([
        Resize(size, Image.BILINEAR),
        ToTensor(),
    ])
    target_transform_cityscapes = Compose([
        Resize(size, Image.NEAREST),
        ToLabel(),
        Relabel(255, 19),   #ignore label to 19
    ])

    return DataLoader(cityscapes(datadir, input_transform_cityscapes, target_transform_cityscapes, subset=subset), num_workers=num_workers, batch_size=batch_size, shuffle=False)



class VOC12(Dataset):

    def __init__(self, root, input_transform=None, target_transform=None):
        self.images_root = os.path.join(root, 'images')
        self.labels_root = os.path.join(root, 'labels')

        self.filenames = [image_basename(f)
            for f in os.listdir(self.labels_root) if is_image(f)]
        self.filenames.sort()

        self.input_transform = input_transform
        self.target_transform = target_transform

    def __getitem__(self, index):
        filename = self.filenames[index]

        with open(image_path(self.images_root, filename, '.jpg'), 'rb') as f:
            image = load_image(f).convert('RGB')
        with open(image_path(self.labels_root, filename, '.png'), 'rb') as f:
            label = load_image(f).convert('P')

        if self.input_transform is not None:
            image = self.input_transform(image)
        if self.target_transform is not None:
            label = self.target_transform(label)

        return image, label

    def __len__(self):
        return len(self.filenames)


class cityscapes(Dataset):

    def __init__(self, root, input_transform=None, target_transform=None, subset='val'):

        #unisce la root di cityscapes con la cartella delle immagini e delle label e il corretto subset
        self.images_root = os.path.join(root, 'leftImg8bit/' + subset)
        self.labels_root = os.path.join(root, 'gtFine/' + subset)

        # crea una lista dei path di tutte le immagini (compresi di root)
        self.filenames = [os.path.join(dp, f) for dp, dn, fn in os.walk(os.path.expanduser(self.images_root)) for f in fn if is_image(f)]
        self.filenames.sort()

        # crea una lista dei path di tutte le label (compresi di root)
        self.filenamesGt = [os.path.join(dp, f) for dp, dn, fn in os.walk(os.path.expanduser(self.labels_root)) for f in fn if is_label(f)]
        self.filenamesGt.sort()

        self.input_transform = input_transform
        self.target_transform = target_transform

    def __getitem__(self, index):

        filename = self.filenames[index]
        filenameGt = self.filenamesGt[index]

        with open(filename, 'rb') as f:
            image = load_image(f).convert('RGB')
        with open(filenameGt, 'rb') as f:
            label = load_image(f).convert('P')

        if self.input_transform is not None:
            image = self.input_transform(image)
        if self.target_transform is not None:
            label = self.target_transform(label)

        return image, label#, filename, filenameGt

    def __len__(self):
        return len(self.filenames)

In [4]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from itertools import chain

def initialize_weights(*models):
    for model in models:
        for m in model.modules():
            if isinstance(m, nn.Conv2d):
                nn.init.kaiming_normal_(m.weight.data, nonlinearity='relu')
            elif isinstance(m, nn.BatchNorm2d):
                m.weight.data.fill_(1.)
                m.bias.data.fill_(1e-4)
            elif isinstance(m, nn.Linear):
                m.weight.data.normal_(0.0, 0.0001)
                m.bias.data.zero_()

class InitalBlock(nn.Module):
    def __init__(self, in_channels, use_prelu=True):
        super(InitalBlock, self).__init__()
        self.pool = nn.MaxPool2d(kernel_size=2, stride=2, ceil_mode=True)
        self.conv = nn.Conv2d(in_channels, 16 - in_channels, 3, padding=1, stride=2)
        self.bn = nn.BatchNorm2d(16)
        self.prelu = nn.PReLU(16) if use_prelu else nn.ReLU(inplace=True)

    def forward(self, x):
        x = torch.cat((self.pool(x), self.conv(x)), dim=1)
        x = self.bn(x)
        x = self.prelu(x)
        return x

class BottleNeck(nn.Module):
    def __init__(self, in_channels, out_channels=None, activation=None, dilation=1, downsample=False, proj_ratio=4,
                        upsample=False, asymetric=False, regularize=True, p_drop=None, use_prelu=True):
        super(BottleNeck, self).__init__()

        self.pad = 0
        self.upsample = upsample
        self.downsample = downsample
        if out_channels is None: out_channels = in_channels
        else: self.pad = out_channels - in_channels

        if regularize: assert p_drop is not None
        if downsample: assert not upsample
        elif upsample: assert not downsample
        inter_channels = in_channels//proj_ratio

        # Main
        if upsample:
            self.spatil_conv = nn.Conv2d(in_channels, out_channels, 1, bias=False)
            self.bn_up = nn.BatchNorm2d(out_channels)
            self.unpool = nn.MaxUnpool2d(kernel_size=2, stride=2)
        elif downsample:
            self.pool = nn.MaxPool2d(kernel_size=2, stride=2, return_indices=True)

        # Bottleneck
        if downsample:
            self.conv1 = nn.Conv2d(in_channels, inter_channels, 2, stride=2, bias=False)
        else:
            self.conv1 = nn.Conv2d(in_channels, inter_channels, 1, bias=False)
        self.bn1 = nn.BatchNorm2d(inter_channels)
        self.prelu1 = nn.PReLU() if use_prelu else nn.ReLU(inplace=True)

        if asymetric:
            self.conv2 = nn.Sequential(
                nn.Conv2d(inter_channels, inter_channels, kernel_size=(1,5), padding=(0,2)),
                nn.BatchNorm2d(inter_channels),
                nn.PReLU() if use_prelu else nn.ReLU(inplace=True),
                nn.Conv2d(inter_channels, inter_channels, kernel_size=(5,1), padding=(2,0)),
            )
        elif upsample:
            self.conv2 = nn.ConvTranspose2d(inter_channels, inter_channels, kernel_size=3, padding=1,
                                            output_padding=1, stride=2, bias=False)
        else:
            self.conv2 = nn.Conv2d(inter_channels, inter_channels, 3, padding=dilation, dilation=dilation, bias=False)
        self.bn2 = nn.BatchNorm2d(inter_channels)
        self.prelu2 = nn.PReLU() if use_prelu else nn.ReLU(inplace=True)

        self.conv3 = nn.Conv2d(inter_channels, out_channels, 1, bias=False)
        self.bn3 = nn.BatchNorm2d(out_channels)
        self.prelu3 = nn.PReLU() if use_prelu else nn.ReLU(inplace=True)

        self.regularizer = nn.Dropout2d(p_drop) if regularize else None
        self.prelu_out = nn.PReLU() if use_prelu else nn.ReLU(inplace=True)

    def forward(self, x, indices=None, output_size=None):
        # Main branch
        identity = x
        if self.upsample:
            assert (indices is not None) and (output_size is not None)
            identity = self.bn_up(self.spatil_conv(identity))
            if identity.size() != indices.size():
                pad = (indices.size(3) - identity.size(3), 0, indices.size(2) - identity.size(2), 0)
                identity = F.pad(identity, pad, "constant", 0)
            identity = self.unpool(identity, indices=indices)#, output_size=output_size)
        elif self.downsample:
            identity, idx = self.pool(identity)

        '''
        if self.pad > 0:
            if self.pad % 2 == 0 : pad = (0, 0, 0, 0, self.pad//2, self.pad//2)
            else: pad = (0, 0, 0, 0, self.pad//2, self.pad//2+1)
            identity = F.pad(identity, pad, "constant", 0)
        '''

        if self.pad > 0:
            extras = torch.zeros((identity.size(0), self.pad, identity.size(2), identity.size(3)))
            if torch.cuda.is_available(): extras = extras.cuda(0)
            identity = torch.cat((identity, extras), dim = 1)

        # Bottleneck
        x = self.conv1(x)
        x = self.bn1(x)
        x = self.prelu1(x)
        x = self.conv2(x)
        x = self.bn2(x)
        x = self.prelu2(x)
        x = self.conv3(x)
        x = self.bn3(x)
        x = self.prelu3(x)
        if self.regularizer is not None:
            x = self.regularizer(x)

        # When the input dim is odd, we might have a mismatch of one pixel
        if identity.size() != x.size():
            pad = (identity.size(3) - x.size(3), 0, identity.size(2) - x.size(2), 0)
            x = F.pad(x, pad, "constant", 0)

        x += identity
        x = self.prelu_out(x)

        if self.downsample:
            return x, idx
        return x

class ENet(nn.Module):
    def __init__(self, num_classes, in_channels=3, freeze_bn=False, **_):
        super(ENet, self).__init__()
        self.initial = InitalBlock(in_channels)

        # Stage 1
        self.bottleneck10 = BottleNeck(16, 64, downsample=True, p_drop=0.01)
        self.bottleneck11 = BottleNeck(64, p_drop=0.01)
        self.bottleneck12 = BottleNeck(64, p_drop=0.01)
        self.bottleneck13 = BottleNeck(64, p_drop=0.01)
        self.bottleneck14 = BottleNeck(64, p_drop=0.01)

        # Stage 2
        self.bottleneck20 = BottleNeck(64, 128, downsample=True, p_drop=0.1)
        self.bottleneck21 = BottleNeck(128, p_drop=0.1)
        self.bottleneck22 = BottleNeck(128, dilation=2, p_drop=0.1)
        self.bottleneck23 = BottleNeck(128, asymetric=True, p_drop=0.1)
        self.bottleneck24 = BottleNeck(128, dilation=4, p_drop=0.1)
        self.bottleneck25 = BottleNeck(128, p_drop=0.1)
        self.bottleneck26 = BottleNeck(128, dilation=8, p_drop=0.1)
        self.bottleneck27 = BottleNeck(128, asymetric=True, p_drop=0.1)
        self.bottleneck28 = BottleNeck(128, dilation=16, p_drop=0.1)

        # Stage 3
        self.bottleneck31 = BottleNeck(128, p_drop=0.1)
        self.bottleneck32 = BottleNeck(128, dilation=2, p_drop=0.1)
        self.bottleneck33 = BottleNeck(128, asymetric=True, p_drop=0.1)
        self.bottleneck34 = BottleNeck(128, dilation=4, p_drop=0.1)
        self.bottleneck35 = BottleNeck(128, p_drop=0.1)
        self.bottleneck36 = BottleNeck(128, dilation=8, p_drop=0.1)
        self.bottleneck37 = BottleNeck(128, asymetric=True, p_drop=0.1)
        self.bottleneck38 = BottleNeck(128, dilation=16, p_drop=0.1)

        # Stage 4
        self.bottleneck40 = BottleNeck(128, 64, upsample=True, p_drop=0.1, use_prelu=False)
        self.bottleneck41 = BottleNeck(64, p_drop=0.1, use_prelu=False)
        self.bottleneck42 = BottleNeck(64, p_drop=0.1, use_prelu=False)

        # Stage 5
        self.bottleneck50 = BottleNeck(64, 16, upsample=True, p_drop=0.1, use_prelu=False)
        self.bottleneck51 = BottleNeck(16, p_drop=0.1, use_prelu=False)

        # Stage 6
        self.fullconv = nn.ConvTranspose2d(16, num_classes, kernel_size=3, padding=1,
                                            output_padding=1, stride=2, bias=False)
        initialize_weights(self)
        if freeze_bn: self.freeze_bn()

    def forward(self, x):
        x = self.initial(x)

        # Stage 1
        sz1 = x.size()
        x, indices1 = self.bottleneck10(x)
        x = self.bottleneck11(x)
        x = self.bottleneck12(x)
        x = self.bottleneck13(x)
        x = self.bottleneck14(x)

        # Stage 2
        sz2 = x.size()
        x, indices2 = self.bottleneck20(x)
        x = self.bottleneck21(x)
        x = self.bottleneck22(x)
        x = self.bottleneck23(x)
        x = self.bottleneck24(x)
        x = self.bottleneck25(x)
        x = self.bottleneck26(x)
        x = self.bottleneck27(x)
        x = self.bottleneck28(x)

        # Stage 3
        x = self.bottleneck31(x)
        x = self.bottleneck32(x)
        x = self.bottleneck33(x)
        x = self.bottleneck34(x)
        x = self.bottleneck35(x)
        x = self.bottleneck36(x)
        x = self.bottleneck37(x)
        x = self.bottleneck38(x)

        # Stage 4
        x = self.bottleneck40(x, indices=indices2, output_size=sz2)
        x = self.bottleneck41(x)
        x = self.bottleneck42(x)

        # Stage 5
        x = self.bottleneck50(x, indices=indices1, output_size=sz1)
        x = self.bottleneck51(x)

        # Stage 6
        x = self.fullconv(x)
        return x

    def get_backbone_params(self):
        # There is no backbone for unet, all the parameters are trained from scratch
        return []

    def get_decoder_params(self):
        return self.parameters()

    def freeze_bn(self):
        for module in self.modules():
            if isinstance(module, nn.BatchNorm2d): module.eval()

In [5]:
import torch
import torch.nn as nn
import torch.nn.functional as F

class DownsampleBlock(nn.Module):
    def __init__(self, in_channels, out_channels):
        super().__init__()
        self.conv = nn.Conv2d(in_channels, out_channels - in_channels, kernel_size=3, stride=2, padding=1, bias=False)
        self.pool = nn.MaxPool2d(kernel_size=2, stride=2)
        self.bn = nn.BatchNorm2d(out_channels)
        self.relu = nn.ReLU()

    def forward(self, x):
        conv_out = self.conv(x)
        pool_out = self.pool(x)
        out = torch.cat([conv_out, pool_out], dim=1)
        return self.relu(self.bn(out))

class Bottleneck(nn.Module):
    def __init__(self, in_channels, out_channels, downsample=False, dilation=1):
        super().__init__()
        stride = 2 if downsample else 1
        self.downsample = downsample
        self.conv1 = nn.Conv2d(in_channels, in_channels // 4, kernel_size=1, bias=False)
        self.bn1 = nn.BatchNorm2d(in_channels // 4)
        self.conv2 = nn.Conv2d(in_channels // 4, in_channels // 4, kernel_size=3, stride=stride, padding=dilation, dilation=dilation, bias=False)
        self.bn2 = nn.BatchNorm2d(in_channels // 4)
        self.conv3 = nn.Conv2d(in_channels // 4, out_channels, kernel_size=1, bias=False)
        self.bn3 = nn.BatchNorm2d(out_channels)
        self.relu = nn.ReLU()
        self.match_channels = None
        if in_channels != out_channels or downsample:
            self.match_channels = nn.Conv2d(in_channels, out_channels, kernel_size=1, stride=stride, bias=False)

    def forward(self, x):
        residual = x
        out = self.relu(self.bn1(self.conv1(x)))
        out = self.relu(self.bn2(self.conv2(out)))
        out = self.bn3(self.conv3(out))
        if self.match_channels:
            residual = self.match_channels(residual)
        return self.relu(out + residual)

class MidENet(nn.Module):
    def __init__(self, num_classes=20):
        super().__init__()
        self.initial = DownsampleBlock(3, 16)
        self.bottleneck1 = nn.Sequential(
            Bottleneck(16, 64, downsample=True),
            Bottleneck(64, 64),
            Bottleneck(64, 64)
        )
        self.bottleneck2 = nn.Sequential(
            Bottleneck(64, 128, downsample=True),
            Bottleneck(128, 128, dilation=2),
            Bottleneck(128, 128, dilation=4)
        )
        self.classifier = nn.Conv2d(128, num_classes, kernel_size=1)

    def forward(self, x):
        x = self.initial(x)
        x = self.bottleneck1(x)
        x = self.bottleneck2(x)
        x = F.interpolate(x, scale_factor=8, mode='bilinear', align_corners=False)  # restore input resolution
        return self.classifier(x)


## Funzioni load

In [None]:
from torch.ao.quantization import get_default_qconfig, QConfigMapping
from torch.ao.quantization.quantize_fx import prepare_fx, convert_fx
import torch


datadir = '/content/drive/MyDrive/dataset/Cityscapes'

def load_my_state_dict(model, state_dict):  #custom function to load model when not all dict elements
        own_state = model.state_dict()
        for name, param in state_dict.items():
            if name not in own_state:
                if name.startswith("module."):
                    own_state[name.split("module.")[-1]].copy_(param)
                else:
                    print(name, " not loaded")
                    continue
            else:
                own_state[name].copy_(param)
        return model

def load_my_quant_fx_state_dict(filepath,device='cpu',printing=False):
    model = MidENet(num_classes=20)
    model.eval()
    qconfig_opt = get_default_qconfig("x86")

    qconfig_mapping = QConfigMapping().set_global(qconfig_opt).set_object_type(
                                      torch.nn.ConvTranspose2d, get_default_qconfig("qnnpack")
                                  )  # qconfig_opt is an optional qconfig, either a valid qconfig or None
    dataloader = get_cityscapes_loader(datadir, 1, 'val',num_workers=2,size = 256)
    example_inputs = dataloader.dataset[0][0].unsqueeze(0)
    model = prepare_fx(model, qconfig_mapping, example_inputs)
    if printing:
        print('model.graph: ')
        print(model.graph)
    model = convert_fx(model)
    if printing:
        print('model: ')
        print(model)
    if device != 'cuda':
        model.to(device)
        model.load_state_dict(torch.load(filepath, map_location=device))
    else:
        model.load_state_dict(torch.load(filepath))
    if printing:
      print("model loaded successfully")

    return model


# Knowledge Distillation


In [None]:
import torch
import torch.nn.functional as F
from torch.optim import Adam

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

teacher_model = ENet(num_classes=20).to(device)
student_model = MidENet(num_classes=20).to(device)

teacher_model = load_my_state_dict(teacher_model,torch.load('/content/drive/MyDrive/trained_models/Enet/enet_finetuned.pth'))
student_model = load_my_state_dict(student_model,torch.load('/content/midenet_last_2.pth'))

teacher_model.eval()
for param in teacher_model.parameters():
    param.requires_grad = False

optimizer = Adam(student_model.parameters(), lr=1e-2)

T = 4.0
alpha = 0.5

def distillation_loss(student_logits, teacher_logits, target, alpha=0.5, T=4.0):
    loss_kd = F.kl_div(
        F.log_softmax(student_logits / T, dim=1),
        F.softmax(teacher_logits / T, dim=1),
        reduction='batchmean'
    ) * (T * T)
    loss_ce = F.cross_entropy(student_logits, target)
    return alpha * loss_ce + (1 - alpha) * loss_kd

def train_epoch(student_model, teacher_model, dataloader, optimizer, device):
    student_model.train()
    teacher_model.eval()
    i=0
    total_loss = 0
    for images, labels in dataloader:
        images = images.to(device)
        labels = labels.to(device)

        optimizer.zero_grad()
        with torch.no_grad():
            teacher_logits = teacher_model(images)

        student_logits = student_model(images)

        loss = distillation_loss(student_logits, teacher_logits, labels, alpha, T)
        loss.backward()
        optimizer.step()

        total_loss += loss.item()
        i+=1
        if i%100 == 0 or i%100 == 1 or i%100 == 2:
          print("step: ", i)
        if i >= 1200:
          break

    return total_loss / len(dataloader)

def validate(student_model, dataloader, device):
    student_model.eval()
    correct = 0
    i=0
    total = 0
    with torch.no_grad():
        for images, labels in dataloader:

            images = images.to(device)
            labels = labels.to(device)

            outputs = student_model(images)
            preds = outputs.argmax(dim=1)
            correct += (preds == labels).sum().item()
            total += labels.size(0)
            i+=1
            if i%100 == 0:
              print("validation step: ", i)
            if i >= 1200:
              break

    return correct / total
print("loading loaders ...")
train_loader = get_cityscapes_loader('/content/drive/MyDrive/dataset/Cityscapes', batch_size=1,num_workers=2, subset='train')
val_loader = get_cityscapes_loader('/content/drive/MyDrive/dataset/Cityscapes', batch_size=1,num_workers=2, subset='val')

print("starting training...")
# Ciclo di training
num_epochs = 20
for epoch in range(num_epochs):
    train_loss = train_epoch(student_model, teacher_model, train_loader, optimizer, device)
    val_acc = validate(student_model, val_loader, device)
    print(f"Epoch {epoch+1}/{num_epochs} - Train Loss: {train_loss:.4f} - Val Accuracy: {val_acc:.4f}")


loading loaders ...
starting training...
step:  1
step:  2
step:  100
step:  101
step:  102
step:  200
step:  201
step:  202
step:  300
step:  301
step:  302
step:  400
step:  401
step:  402
step:  500
step:  501
step:  502
step:  600
step:  601
step:  602
step:  700
step:  701
step:  702
step:  800
step:  801
step:  802
step:  900
step:  901
step:  902
step:  1000
step:  1001
step:  1002
step:  1100
step:  1101
step:  1102
step:  1200
validation step:  100
validation step:  200
validation step:  300
validation step:  400
validation step:  500
Epoch 1/20 - Train Loss: 39307.8051 - Val Accuracy: 91622.6780
step:  1
step:  2
step:  100
step:  101
step:  102
step:  200
step:  201
step:  202
step:  300
step:  301
step:  302
step:  400
step:  401
step:  402
step:  500
step:  501
step:  502
step:  600
step:  601
step:  602
step:  700
step:  701
step:  702
step:  800
step:  801
step:  802
step:  900
step:  901
step:  902
step:  1000
step:  1001
step:  1002
step:  1100
step:  1101
step:  1102


In [None]:
torch.save(student_model.state_dict(), "midenet_last_3.pth")

# Pruning

In [None]:
model = MidENet(20)
model = load_my_state_dict(model,torch.load('/content/drive/MyDrive/trained_models/Enet/midenet_last.pth'))

In [None]:
for name, module in model.named_modules():
    if isinstance(module, torch.nn.Conv2d):  # Prune solo i layer Conv2d
        prune.ln_structured(module, name='weight', amount=0.3, n=2,dim=0)  # Rimuovi il 30% dei pesi meno significativi
for name, module in model.named_modules():
    if isinstance(module, torch.nn.Conv2d):
        prune.remove(module, 'weight')  # Rimuovi la maschera


In [None]:
import torch.optim as optim
from torch.cuda.amp import GradScaler, autocast

# Definizione della funzione di perdita e ottimizzatore
criterion = torch.nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=1e-4)
#optimizer = optim.Adam(model.parameters()[-1], lr=1e-4)




device = 'cuda'
datadir = '/content/drive/MyDrive/dataset/Cityscapes'
dataloader = get_cityscapes_loader(datadir, 1, 'train',num_workers=2,size = 256)
#scaler = GradScaler('cuda')

model.to(device)
model.train()
for epoch in range(10):
    i=0

    for inputs, targets in dataloader:
        inputs, targets = inputs.to(device), targets.to(device)
        i+=1
        if i%100 == 0:
          print("step: ", i)
        if i >= 1200:
          break
        # Forward pass
        #with autocast():
        outputs = model(inputs)
        loss = criterion(outputs, targets)

        # Backward pass
        """scaler.scale(loss).backward()
        scaler.step(optimizer)
        scaler.update()"""
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

    print(f"Epoch {epoch + 1}, Loss: {loss.item()}")


step:  100
step:  200
step:  300
step:  400
step:  500
step:  600
step:  700
step:  800
step:  900
step:  1000
step:  1100
step:  1200
Epoch 1, Loss: 3.550808906555176
step:  100
step:  200
step:  300
step:  400
step:  500
step:  600
step:  700
step:  800
step:  900
step:  1000
step:  1100
step:  1200
Epoch 2, Loss: 3.53474760055542
step:  100
step:  200
step:  300
step:  400
step:  500
step:  600
step:  700
step:  800
step:  900
step:  1000
step:  1100
step:  1200
Epoch 3, Loss: 3.503751754760742
step:  100
step:  200
step:  300
step:  400
step:  500
step:  600
step:  700
step:  800
step:  900
step:  1000
step:  1100
step:  1200
Epoch 4, Loss: 3.471660614013672
step:  100
step:  200
step:  300
step:  400
step:  500
step:  600
step:  700
step:  800
step:  900
step:  1000
step:  1100
step:  1200
Epoch 5, Loss: 3.446265697479248
step:  100
step:  200
step:  300
step:  400
step:  500
step:  600
step:  700
step:  800
step:  900
step:  1000
step:  1100
step:  1200
Epoch 6, Loss: 3.418218135

In [None]:
torch.save(model.state_dict(), '/content/drive/MyDrive/trained_models/Enet/midenet_30%.pth')


# FXQ


In [7]:
Original_model_path = '/content/drive/MyDrive/trained_models/Enet/midenet_pruned_last.pth'


Original_model = MidENet(num_classes=20)
Original_model.to('cpu')

load_my_state_dict(Original_model,torch.load(Original_model_path,map_location=torch.device('cpu')))

model_to_quantize = MidENet(num_classes=20)
model_to_quantize.to('cpu')

load_my_state_dict(model_to_quantize,torch.load(Original_model_path,map_location=torch.device('cpu')))

dataloader = get_cityscapes_loader(datadir, 1, 'val',num_workers=2,size = 256)

model_to_quantize.eval()


MidENet(
  (initial): DownsampleBlock(
    (conv): Conv2d(3, 13, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1), bias=False)
    (pool): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
    (bn): BatchNorm2d(16, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (relu): ReLU()
  )
  (bottleneck1): Sequential(
    (0): Bottleneck(
      (conv1): Conv2d(16, 4, kernel_size=(1, 1), stride=(1, 1), bias=False)
      (bn1): BatchNorm2d(4, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (conv2): Conv2d(4, 4, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1), bias=False)
      (bn2): BatchNorm2d(4, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (conv3): Conv2d(4, 64, kernel_size=(1, 1), stride=(1, 1), bias=False)
      (bn3): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (relu): ReLU()
      (match_channels): Conv2d(16, 64, kernel_size=(1, 1), stride=(2, 2), bias=Fa

In [None]:


qconfig_opt = get_default_qconfig("x86")

qconfig_mapping = QConfigMapping().set_global(qconfig_opt)


In [9]:
example_inputs = dataloader.dataset[0][0].unsqueeze(0)
prepared_model = prepare_fx(model_to_quantize, qconfig_mapping, example_inputs)
print(prepared_model.graph)

graph():
    %x : [num_users=1] = placeholder[target=x]
    %activation_post_process_0 : [num_users=2] = call_module[target=activation_post_process_0](args = (%x,), kwargs = {})
    %initial_conv : [num_users=1] = call_module[target=initial.conv](args = (%activation_post_process_0,), kwargs = {})
    %activation_post_process_1 : [num_users=1] = call_module[target=activation_post_process_1](args = (%initial_conv,), kwargs = {})
    %initial_pool : [num_users=1] = call_module[target=initial.pool](args = (%activation_post_process_0,), kwargs = {})
    %activation_post_process_2 : [num_users=1] = call_module[target=activation_post_process_2](args = (%initial_pool,), kwargs = {})
    %cat : [num_users=1] = call_function[target=torch.cat](args = ([%activation_post_process_1, %activation_post_process_2],), kwargs = {dim: 1})
    %activation_post_process_3 : [num_users=1] = call_module[target=activation_post_process_3](args = (%cat,), kwargs = {})
    %initial_bn : [num_users=1] = call_module[



In [10]:

def calibrate(model, data_loader):
    step = 0
    model.eval()
    with torch.no_grad():
        for image, target in data_loader:
            model(image)

            step += 1
            print(f"Step {step}/{len(data_loader)}")


calibrate(prepared_model, dataloader)

Step 1/500
Step 2/500
Step 3/500
Step 4/500
Step 5/500
Step 6/500
Step 7/500
Step 8/500
Step 9/500
Step 10/500
Step 11/500
Step 12/500
Step 13/500
Step 14/500
Step 15/500
Step 16/500
Step 17/500
Step 18/500
Step 19/500
Step 20/500
Step 21/500
Step 22/500
Step 23/500
Step 24/500
Step 25/500
Step 26/500
Step 27/500
Step 28/500
Step 29/500
Step 30/500
Step 31/500
Step 32/500
Step 33/500
Step 34/500
Step 35/500
Step 36/500
Step 37/500
Step 38/500
Step 39/500
Step 40/500
Step 41/500
Step 42/500
Step 43/500
Step 44/500
Step 45/500
Step 46/500
Step 47/500
Step 48/500
Step 49/500
Step 50/500
Step 51/500
Step 52/500
Step 53/500
Step 54/500
Step 55/500
Step 56/500
Step 57/500
Step 58/500
Step 59/500
Step 60/500
Step 61/500
Step 62/500
Step 63/500
Step 64/500
Step 65/500
Step 66/500
Step 67/500
Step 68/500
Step 69/500
Step 70/500
Step 71/500
Step 72/500
Step 73/500
Step 74/500
Step 75/500
Step 76/500
Step 77/500
Step 78/500
Step 79/500
Step 80/500
Step 81/500
Step 82/500
Step 83/500
Step 84/500
S

In [11]:
quantized_model = convert_fx(prepared_model)
print(quantized_model)

GraphModule(
  (initial): Module(
    (conv): QuantizedConv2d(3, 13, kernel_size=(3, 3), stride=(2, 2), scale=0.6466743350028992, zero_point=84, padding=(1, 1), bias=False)
    (pool): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
    (bn): QuantizedBNReLU2d(16, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  )
  (bottleneck1): Module(
    (0): Module(
      (conv1): QuantizedConvReLU2d(16, 4, kernel_size=(1, 1), stride=(1, 1), scale=0.37500613927841187, zero_point=0)
      (conv2): QuantizedConvReLU2d(4, 4, kernel_size=(3, 3), stride=(2, 2), scale=0.36437860131263733, zero_point=0, padding=(1, 1))
      (conv3): QuantizedConv2d(4, 64, kernel_size=(1, 1), stride=(1, 1), scale=0.7879742980003357, zero_point=76)
      (match_channels): QuantizedConv2d(16, 64, kernel_size=(1, 1), stride=(2, 2), scale=1.2675620317459106, zero_point=100, bias=False)
    )
    (1): Module(
      (conv1): QuantizedConvReLU2d(64, 16, kernel_size=(1, 1), stride=(1,

In [12]:
fx_graph_mode_model_file_path = '/content/drive/MyDrive/trained_models/Enet/midenet_quantized_fx_last.pth'

#torch.save(quantized_model,'model_'+ fx_graph_mode_model_file_path)
torch.save(quantized_model.state_dict(),  fx_graph_mode_model_file_path)

# Eval


In [37]:
# Code for evaluating IoU
# Nov 2017
# Eduardo Romera
#######################


class iouEval:

    # the ignoreIndex is the class that will be ignored for the evaluation (20th class in the case of Cityscapes)
    def __init__(self, nClasses, ignoreIndex=19):
        self.nClasses = nClasses
        self.ignoreIndex = ignoreIndex if nClasses>ignoreIndex else -1 #if ignoreIndex is larger than nClasses, consider no ignoreIndex
        self.reset()

    def reset (self):
        classes = self.nClasses if self.ignoreIndex==-1 else self.nClasses-1
        self.tp = torch.zeros(classes).double()
        self.fp = torch.zeros(classes).double()
        self.fn = torch.zeros(classes).double()

    def addBatch(self, x, y):   #x=preds, y=targets
        #sizes should be "batch_size x nClasses x H x W"

        #print ("X size: ", x.size())
        #print ("Y size: ", y.size())

        #print ("X is cuda: ", x.is_cuda)
        #print ("Y is cuda: ", y.is_cuda)

        #print("Nclasses: ", self.nClasses)

        if (x.is_cuda or y.is_cuda):
            x = x.cuda()
            y = y.cuda()

        #if size is "batch_size x 1 x H x W" scatter to onehot
        if (x.size(1) == 1):
            x_onehot = torch.zeros(x.size(0), self.nClasses, x.size(2), x.size(3))
            if x.is_cuda:
                x_onehot = x_onehot.cuda()
            x_onehot.scatter_(1, x, 1).float()
        else:
            x_onehot = x.float()

        if (y.size(1) == 1):
            y_onehot = torch.zeros(y.size(0), self.nClasses, y.size(2), y.size(3))
            if y.is_cuda:
                y_onehot = y_onehot.cuda()
            y_onehot.scatter_(1, y, 1).float()
        else:
            y_onehot = y.float()

        if (self.ignoreIndex != -1):
            ignores = y_onehot[:,self.ignoreIndex].unsqueeze(1)
            x_onehot = x_onehot[:, :self.ignoreIndex]
            y_onehot = y_onehot[:, :self.ignoreIndex]
        else:
            ignores=0

        tpmult = x_onehot * y_onehot    #times prediction and gt coincide is 1
        tp = torch.sum(torch.sum(torch.sum(tpmult, dim=0, keepdim=True), dim=2, keepdim=True), dim=3, keepdim=True).squeeze()
        fpmult = x_onehot * (1-y_onehot-ignores) #times prediction says its that class and gt says its not (subtracting cases when its ignore label!)
        fp = torch.sum(torch.sum(torch.sum(fpmult, dim=0, keepdim=True), dim=2, keepdim=True), dim=3, keepdim=True).squeeze()
        fnmult = (1-x_onehot) * (y_onehot) #times prediction says its not that class and gt says it is
        fn = torch.sum(torch.sum(torch.sum(fnmult, dim=0, keepdim=True), dim=2, keepdim=True), dim=3, keepdim=True).squeeze()

        self.tp += tp.double().cpu()
        self.fp += fp.double().cpu()
        self.fn += fn.double().cpu()

    def getIoU(self):
        num = self.tp
        den = self.tp + self.fp + self.fn + 1e-15
        iou = num / den
        return torch.mean(iou), iou     #returns "iou mean", "iou per class"

# Class for colors
class colors:
    RED       = '\033[31;1m'
    GREEN     = '\033[32;1m'
    YELLOW    = '\033[33;1m'
    BLUE      = '\033[34;1m'
    MAGENTA   = '\033[35;1m'
    CYAN      = '\033[36;1m'
    BOLD      = '\033[1m'
    UNDERLINE = '\033[4m'
    ENDC      = '\033[0m'

# Colored value output if colorized flag is activated.
def getColorEntry(val):
    if not isinstance(val, float):
        return colors.ENDC
    if (val < .20):
        return colors.RED
    elif (val < .40):
        return colors.YELLOW
    elif (val < .60):
        return colors.BLUE
    elif (val < .80):
        return colors.CYAN
    else:
        return colors.GREEN

In [38]:
# Code to calculate IoU (mean and per-class) in a dataset
# Nov 2017
# Eduardo Romera
#######################

import torch
import time
from PIL import Image

from torch.autograd import Variable


# verificare come utilizzare il parametro method

def eval_iou(model, datadir, cpu, num_classes, ignoreIndex=19):

    # load the dataset
    bs = 10
    loader = get_cityscapes_loader(datadir, bs, 'val')

    # create the IoU evaluator
    iouEvalVal = iouEval(num_classes, ignoreIndex=ignoreIndex)

    # start the timer used for the prints
    start = time.time()

    if cpu:
        model.to('cpu') # Move model to CPU
        #model = model.to(torch.float32) # Move to float32 for CPU
    else:
        model.to('cuda')

    # start the timer used for the prints
    start = time.time()



    for step, (images, labels) in enumerate(loader):

        # if the cpu flag is not set, move the data to the gpu
        if cpu:
            images = images.cpu()
            labels = labels.cpu()
        else:
            images = images.cuda()
            labels = labels.cuda()

        # launch the model with the images as input while disabling gradient computation
        inputs = Variable(images)
        if cpu:
             inputs = inputs[:, :3, :, :]
        dev = next(model.parameters()).device
        if dev == 'cpu':
          print(f"Il modello si trova su: {dev}")
          print(f"Il input si trova su: { inputs.device}")
          print(f"Il label si trova su: {labels.device}")


        #model.eval()
        with torch.no_grad():
          model.eval()
          out = model(inputs)



        # get the max logit value for each pixel
        outputs = out.max(1)[1].unsqueeze(1).data
        labels = labels.unsqueeze(1).data

        # add the batch to the IoU evaluator
        iouEvalVal.addBatch(outputs, labels)

        # print the filename of the image
        #filenameSave = filename[0].split("leftImg8bit/")[1]
        #print (step, filenameSave)

        if step in [0, 10, 20, 30, 40, 50, 60, 70, 80, 90]:
           #print_output(out[0, :, :, :], filename[0].split("leftImg8bit/")[1])
           pass

    # get the IoU results
    iouVal, iou_classes = iouEvalVal.getIoU()

    iou_classes_str = []

    for i in range(iou_classes.size(0)):
        iouStr = getColorEntry(iou_classes[i])+'{:0.2f}'.format(iou_classes[i]*100) + '\033[0m'
        iou_classes_str.append(iouStr)

    print("---------------------------------------")
    print("Took ", time.time()-start, "seconds -> ", len(loader)*bs/(time.time()-start), "images/seconds")
    print("=======================================")
    #print("TOTAL IOU: ", iou * 100, "%")
    print("Per-Class IoU:")
    print(iou_classes_str[0], "Road")
    print(iou_classes_str[1], "sidewalk")
    print(iou_classes_str[2], "building")
    print(iou_classes_str[3], "wall")
    print(iou_classes_str[4], "fence")
    print(iou_classes_str[5], "pole")
    print(iou_classes_str[6], "traffic light")
    print(iou_classes_str[7], "traffic sign")
    print(iou_classes_str[8], "vegetation")
    print(iou_classes_str[9], "terrain")
    print(iou_classes_str[10], "sky")
    print(iou_classes_str[11], "person")
    print(iou_classes_str[12], "rider")
    print(iou_classes_str[13], "car")
    print(iou_classes_str[14], "truck")
    print(iou_classes_str[15], "bus")
    print(iou_classes_str[16], "train")
    print(iou_classes_str[17], "motorcycle")
    print(iou_classes_str[18], "bicycle")
    if ignoreIndex == -1:
        print(iou_classes_str[19], "void")
    print("=======================================")
    iouStr = getColorEntry(iouVal)+'{:0.2f}'.format(iouVal*100) + '\033[0m'
    print ("MEAN IoU: ", iouStr, "%")

    return iouVal

In [None]:
#import torch.quantization as quant
#import torch.nn as nn
#import torch.nn.utils.prune as prune
device = 'cpu'

enet = ENet(num_classes=20)
enet= load_my_state_dict(enet,torch.load('/content/drive/MyDrive/trained_models/Enet/enet_finetuned.pth',map_location=torch.device(device)))
enet = enet.to(device) # Assign the original model to the correct variable

midenet = MidENet(num_classes=20)
midenet = load_my_state_dict(midenet,torch.load('/content/drive/MyDrive/trained_models/Enet/midenet_pruned_last.pth',map_location=torch.device(device)))
midenet = midenet.to(device)

midenet_pq = MidENet(num_classes=20)
midenet_pq = load_my_quant_fx_state_dict('/content/drive/MyDrive/trained_models/Enet/midenet_quantized_fx_last.pth')

inputs = (torch.randn(1, 3, 512, 1024) + torch.ones(1,3,512,1024)).to('cpu')  # Esempio di input




In [36]:
#calculate FLOPS, Memory usage, dimension,

#dimension
# Salva i modelli
torch.save(enet.state_dict(), 'EvalDimensionOriginal.pth')
torch.save(midenet_pq.state_dict(), 'EvalDimensionQuantized.pth')
torch.save(midenet.state_dict(), 'EvalDimensionMini.pth')

# Misura le dimensioni
size_original = os.path.getsize('EvalDimensionOriginal.pth') / (1024 ** 2)  # Dimensioni in MB
size_pruned = os.path.getsize('EvalDimensionQuantized.pth') / (1024 ** 2)   #doesn't take count of the zeros
size_final = os.path.getsize('EvalDimensionMini.pth') / (1024 ** 2)

print(f"ENet Size: {size_original:.2f} MB")
print(f"Quantized MidENet Size: {size_pruned:.2f} MB")
print(f"Mid ENet Size: {size_final:.2f} MB")

ENet Size: 1.60 MB
Quantized MidENet Size: 0.12 MB
Mid ENet Size: 0.28 MB


In [17]:


datadir = '/content/drive/MyDrive/dataset/Cityscapes'

iou_M = eval_iou(midenet_pq,datadir,cpu=True,num_classes=20,ignoreIndex = 19)

  cat = torch.cat([initial_conv, initial_pool], dim = 1);  initial_conv = initial_pool = None


---------------------------------------
Took  161.46820163726807 seconds
Per-Class IoU:
[0m56.72[0m Road
[0m26.07[0m sidewalk
[0m50.05[0m building
[0m0.25[0m wall
[0m4.16[0m fence
[0m3.96[0m pole
[0m0.00[0m traffic light
[0m6.34[0m traffic sign
[0m57.08[0m vegetation
[0m16.52[0m terrain
[0m7.39[0m sky
[0m4.50[0m person
[0m0.00[0m rider
[0m27.98[0m car
[0m0.01[0m truck
[0m0.02[0m bus
[0m0.00[0m train
[0m0.00[0m motorcycle
[0m5.51[0m bicycle
MEAN IoU:  [0m14.03[0m %


In [None]:
device = 'cuda'

midenet = MidENet(num_classes=20)
midenet = load_my_state_dict(midenet,torch.load('/content/midenet_last_2.pth',map_location=torch.device(device)))
midenet = midenet.to(device)

datadir = '/content/drive/MyDrive/dataset/Cityscapes'

iou_M = eval_iou(midenet,datadir,cpu=False,num_classes=20,ignoreIndex = 19)



---------------------------------------
Took  54.185189962387085 seconds
Per-Class IoU:
[0m86.42[0m Road
[0m44.42[0m sidewalk
[0m69.76[0m building
[0m4.08[0m wall
[0m2.13[0m fence
[0m4.68[0m pole
[0m0.00[0m traffic light
[0m10.96[0m traffic sign
[0m74.15[0m vegetation
[0m25.84[0m terrain
[0m70.27[0m sky
[0m24.64[0m person
[0m0.00[0m rider
[0m54.86[0m car
[0m0.16[0m truck
[0m0.13[0m bus
[0m0.00[0m train
[0m0.00[0m motorcycle
[0m5.02[0m bicycle
MEAN IoU:  [0m25.13[0m %


In [26]:
# prompt: calcualte number of flops using flopconutanalysys library

# Install flopcoountanalysis if not already installed
try:

    from fvcore.nn import FlopCountAnalysis, flop_count_table
except ImportError:
    !pip install fvcore

    from fvcore.nn import FlopCountAnalysis, flop_count_table


# Assuming 'enet', 'midenet', and 'midenet_pq' are already defined and loaded as per the preceding code.
# Assuming 'inputs' is already defined as a sample input tensor.

# Calculate FLOPs, IOPS, and parameters for each model
enet_stats = FlopCountAnalysis(enet, inputs)
midenet_stats = FlopCountAnalysis(midenet, inputs)

print(f"ENet FLOPs: {enet_stats.total()}")
print(f"MidENet FLOPs: {midenet_stats.total()}")



#flop_count_table(enet_stats)
print(flop_count_table(midenet_stats, show_param_shapes = True))

"""midenet_pq_stats = FlopCountAnalysis(midenet_pq, inputs)

# Print total parameters for each model
print(f"midenetpq FLOPS:{midenet_pq_stats.total()}")"""





# ENet FLOPs: 4,266,524,672

# MidENet FLOPs: 2,450,391,040



ENet FLOPs: 4456054784




MidENet FLOPs: 2498363392
| module                          | #parameters or shape   | #flops     |
|:--------------------------------|:-----------------------|:-----------|
| model                           | 63.155K                | 2.498G     |
|  initial                        |  0.383K                |  56.492M   |
|   initial.conv                  |   0.351K               |   46.006M  |
|    initial.conv.weight          |    (13, 3, 3, 3)       |            |
|   initial.bn                    |   32                   |   10.486M  |
|    initial.bn.weight            |    (16,)               |            |
|    initial.bn.bias              |    (16,)               |            |
|  bottleneck1                    |  10.72K                |  0.385G    |
|   bottleneck1.0                 |   1.632K               |   68.813M  |
|    bottleneck1.0.conv1          |    64                  |    8.389M  |
|    bottleneck1.0.bn1            |    8                   |    2.621M  |
|    bottlen

'midenet_pq_stats = FlopCountAnalysis(midenet_pq, inputs)\n\n# Print total parameters for each model\nprint(f"midenetpq FLOPS:{midenet_pq_stats.total()}")'

In [23]:
import os
import glob
import random
import torch
import numpy as np
from ood_metrics import fpr_at_95_tpr
from sklearn.metrics import average_precision_score
import torch.nn.functional as F
from PIL import Image
from tqdm import tqdm
from torchvision.transforms import Compose, Resize, ToTensor


# *********************************************************************************************************************

def get_anomaly_score(result, method='MSP'):

    if method == 'MSP':
        probabilities = F.softmax(result, dim=1)
        retval = 1 - np.max(probabilities.squeeze(0).data.cpu().numpy(), axis=0)
        return retval

    elif method == 'MaxEntropy':
        probabilities = F.softmax(result, dim=1)
        entropy = - np.sum(probabilities.squeeze(0).data.cpu().numpy() * np.log(probabilities.squeeze(0).data.cpu().numpy() + 1e-10), axis=0)
        return entropy

    elif method == 'MaxLogit':
        retval = - np.max(result.squeeze(0).data.cpu().numpy(), axis=0)
        return retval

    elif method == 'VoidClass':
        probabilities = F.softmax(result, dim=1)
        retval = probabilities.squeeze(0).data.cpu().numpy()[-1, :, :]
        return retval

# ********************************************************************************************************************



def evalAnomaly(dataset_dir, model, method, print_images=0, imagesize=(512, 1024), cpu=False):

    input_transform = Compose([Resize(imagesize, Image.BILINEAR), ToTensor()])
    target_transform = Compose([Resize(imagesize, Image.NEAREST), ToLabel()]) #transform label 255 (ignore label) to 19

    # crea due liste vuote dove salvare i risultati
    ood_gts_list = []
    anomaly_score_list = []

    path_list = glob.glob(dataset_dir)

    if print_images != 0:
        print_index = random.sample(range(len(path_list)), print_images)

    # for each path in the input path list (glob.glob returns a list of paths expanding the * wildcard)
    for step, path in enumerate(tqdm(path_list,desc = f"evaluating {dataset_dir}")):

        image = input_transform(Image.open(path).convert('RGB')).unsqueeze(0).float()

        if not cpu:
            image = image.cuda()
        # launches the model with the image as input while disabling gradient computation (saves memory and computation time)
        with torch.no_grad():
            # result size is 1 x 20 x H x W
            # the model returns for each pixel the logits for each class
            result = model(image)

        # calculates the anomaly score using the method specified
        # anomaly_result size is H x W
        # the anomaly score is a measure of confident the model is about the prediction
        # a high anomaly score means the pixel might represent an object class out of the distribution
        anomaly_result = get_anomaly_score(result, method)

        if print_images != 0 and step in print_index:
            pass#print_anomaly(anomaly_result, path)

        # creates the path for the ground truth mask
        pathGT = path.replace("images", "labels_masks")

        # corrects the ground truth format if different from the images
        if "RoadObsticle21" in pathGT :
           pathGT = pathGT.replace("webp", "png")
        if "fs_static" in pathGT:
           pathGT = pathGT.replace("jpg", "png")
        if "RoadAnomaly" in pathGT:
           pathGT = pathGT.replace("jpg", "png")

        # opens the ground truth mask image and converts it to a numpy tensor
        mask = target_transform(Image.open(pathGT))
        # ood_gts stands for out-of-distribution ground truths
        # the ground truth mask highlights the pixels that are not part of any class
        ood_gts = np.array(mask)

        # corrects the gray scale values of the ground truth mask (???)
        if "RoadAnomaly" in pathGT:
            ood_gts = np.where((ood_gts==2), 1, ood_gts)
        if "LostAndFound" in pathGT:
            ood_gts = np.where((ood_gts==0), 255, ood_gts)
            ood_gts = np.where((ood_gts==1), 0, ood_gts)
            ood_gts = np.where((ood_gts>1)&(ood_gts<201), 1, ood_gts)
        if "Streethazard" in pathGT:
            ood_gts = np.where((ood_gts==14), 255, ood_gts)
            ood_gts = np.where((ood_gts<20), 0, ood_gts)
            ood_gts = np.where((ood_gts==255), 1, ood_gts)

        # checks if the ground truth mask contains at least one pixel with value 1
        if 1 not in np.unique(ood_gts):
            continue
        else:
            # if the ground truth contains an anomaly, appends the ground truth mask and the anomaly score to the lists
            ood_gts_list.append(ood_gts)
            anomaly_score_list.append(anomaly_result)

        # releases the memory used by the result, anomaly_result, ood_gts and mask tensors
        del result, anomaly_result, ood_gts, mask
        torch.cuda.empty_cache()

    print(f'Number of images: {len(ood_gts_list)}')

    # creates two numpy tensor from the lists
    ood_gts_np = np.array(ood_gts_list)
    anomaly_scores_np = np.array(anomaly_score_list)

    # creates two boolean lists of masks for the out-of-distribution and in-distribution ground truths
    ood_mask = (ood_gts_np == 1)
    ind_mask = (ood_gts_np == 0)

    # creates two lists filtering anomaly scores for the out-of-distribution and in-distribution ground truths
    ood_out = anomaly_scores_np[ood_mask]
    ind_out = anomaly_scores_np[ind_mask]

    # creates two lists of labes
    ood_label = np.ones(len(ood_out))
    ind_label = np.zeros(len(ind_out))

    # concatenates the lists of anomaly scores and labels
    val_out = np.concatenate((ind_out, ood_out))
    val_label = np.concatenate((ind_label, ood_label))

    # the result is two lists, one for anomaly scores and the other for the labels indicating if the pixel is out-of-distribution or in-distribution
    # both lists are ordered by the label value

    print("Calculating AUPRC and FPR@TPR95...")

    # calculates the AUPRC score and the FPR@TPR95 score
    # both metrics work on anomaly scores and labels because they elaborate the right threshold and separate the two classes
    prc_auc = average_precision_score(val_label, val_out)
    fpr = fpr_at_95_tpr(val_out, val_label)

    print(f'AUPRC score: {prc_auc*100.0}')
    print(f'FPR@TPR95: {fpr*100.0}')
    print("\n")

    return prc_auc, fpr

In [24]:
DatasetDir = {
    "LostFound": "/content/drive/MyDrive/dataset/Validation_Dataset/FS_LostFound_full/images/*.png",
    "FSstatic": "/content/drive/MyDrive/dataset/Validation_Dataset/fs_static/images/*.jpg",
    "RoadAnomaly": "/content/drive/MyDrive/dataset/Validation_Dataset/RoadAnomaly/images/*.jpg",
    "RoadAnomaly21": "/content/drive/MyDrive/dataset/Validation_Dataset/RoadAnomaly21/images/*.png",
    "RoadObstacle21": "/content/drive/MyDrive/dataset/Validation_Dataset/RoadObsticle21/images/*.webp",
              }

datasets =["RoadObstacle21"]# DatasetDir.keys()

mod = midenet_pq
IMAGESIZE = (512, 1024)

for dataset in datasets:
    dataset_string = "Dataset " + dataset
    dataset_dir = DatasetDir[dataset]
    prc_auc, fpr = evalAnomaly(dataset_dir, mod, "VoidClass", cpu=True, imagesize=IMAGESIZE)
    result_string = 'AUPRC score:' + str(prc_auc*100.0) + '\tFPR@TPR95:' + str(fpr*100.0)
    #print(result_string)


  cat = torch.cat([initial_conv, initial_pool], dim = 1);  initial_conv = initial_pool = None
evaluating /content/drive/MyDrive/dataset/Validation_Dataset/RoadObsticle21/images/*.webp: 100%|██████████| 30/30 [00:56<00:00,  1.87s/it]


Number of images: 30
Calculating AUPRC and FPR@TPR95...
AUPRC score: 0.4146223017976108
FPR@TPR95: 99.74885398587686


AUPRC score:0.4146223017976108	FPR@TPR95:99.74885398587686


In [45]:
##inference times

##models = enet, midenet, midenet_pq

datadir = '/content/drive/MyDrive/dataset/Cityscapes'

iou_P = eval_iou(midenet_pq,datadir,cpu=True,num_classes=20,ignoreIndex = 19)


iou_M = eval_iou(midenet,datadir,cpu=True,num_classes=20,ignoreIndex = 19)

enet.to('cpu')
iou_E = eval_iou(enet,datadir,cpu=True,num_classes=20,ignoreIndex = 19)


---------------------------------------
Took  159.3864586353302 seconds ->  3.137028410066859 images/seconds
Per-Class IoU:
[0m56.72[0m Road
[0m26.07[0m sidewalk
[0m50.05[0m building
[0m0.25[0m wall
[0m4.16[0m fence
[0m3.96[0m pole
[0m0.00[0m traffic light
[0m6.34[0m traffic sign
[0m57.08[0m vegetation
[0m16.52[0m terrain
[0m7.39[0m sky
[0m4.50[0m person
[0m0.00[0m rider
[0m27.98[0m car
[0m0.01[0m truck
[0m0.02[0m bus
[0m0.00[0m train
[0m0.00[0m motorcycle
[0m5.51[0m bicycle
MEAN IoU:  [0m14.03[0m %
---------------------------------------
Took  217.76006197929382 seconds ->  2.296104675900188 images/seconds
Per-Class IoU:
[0m82.72[0m Road
[0m37.23[0m sidewalk
[0m64.09[0m building
[0m0.04[0m wall
[0m2.12[0m fence
[0m6.94[0m pole
[0m0.00[0m traffic light
[0m10.94[0m traffic sign
[0m71.10[0m vegetation
[0m25.15[0m terrain
[0m71.51[0m sky
[0m19.21[0m person
[0m0.02[0m rider
[0m52.00[0m car
[0m0.09[0m truck
[0m0.09[0m b