In [1]:
import os
import glob
import numpy as np
import random

import matplotlib.pyplot as plt
import imageio.v2 as imageio
import matplotlib.colors as colors

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim

import torchvision
from torchvision import transforms as T
from torchvision.transforms import functional as TF
from torch.utils.data import Dataset, DataLoader
from PIL import Image
#from torchsummary import summary
from tqdm import tqdm
#from cityscapesscripts.helpers.labels import trainId2label as t2l
import cv2
#from sklearn.model_selection import train_test_split
from torchmetrics.classification import MulticlassJaccardIndex

In [2]:
from torchvision.models.segmentation import deeplabv3_resnet50, DeepLabV3_ResNet50_Weights

In [3]:
mapping_20 = { 
        0: 0,
        1: 0,
        2: 0,
        3: 0,
        4: 0,
        5: 0,
        6: 0,
        7: 1,
        8: 2,
        9: 0,
        10: 0,
        11: 3,
        12: 4,
        13: 5,
        14: 0,
        15: 0,
        16: 0,
        17: 6,
        18: 0,
        19: 7,
        20: 8,
        21: 9,
        22: 10,
        23: 11,
        24: 12,
        25: 13,
        26: 14,
        27: 15,
        28: 16,
        29: 0,
        30: 0,
        31: 17,
        32: 18,
        33: 19,
        -1: 0
    }

In [24]:
import random

class RandomCropper:

    def __init__(self, crop_size = (256,256)):
        self.crop_size = crop_size

    def __call__(self, sample):
        image = sample['images']
        mask = sample['masks']

        h, w, _ = image.shape
        top = random.randint(0, h - self.crop_size[0])
        left = random.randint(0, w - self.crop_size[1])

        sample['images'] = image[top:top+self.crop_size[0], left:left+self.crop_size[1], :]
        sample['masks'] = mask[top:top+self.crop_size[0], left:left+self.crop_size[1]]

        return sample
    
class HorizontalFlip:

    def __init__(self, keys):
        self.keys = keys

    def __call__(self, sample):
        for key in self.keys:
#                 sample[key] = sample[key].transpose(sample[key].FLIP_LEFT_RIGHT)
                sample[key] = cv2.flip(sample[key], 1)
        return sample

class VerticalFlip:

    def __init__(self, keys):
        self.keys = keys

    def __call__(self, sample):
        for key in self.keys:
#                 sample[key] = sample[key].transpose(sample[key].FLIP_LEFT_RIGHT)
                sample[key] = cv2.flip(sample[key], 0)
        return sample

class RandomRotator:
    def __init__(self, keys):
        self.keys = keys

    def __call__(self, sample):
        for key in self.keys:
            image = sample[key]  # Get the image from the key
            angle = np.random.randint(-30, 30)  # Random angle between -30 and 30 degrees

            # Perform rotation
            h, w = image.shape[:2]
            center = (w // 2, h // 2)
            rotation_matrix = cv2.getRotationMatrix2D(center, angle, 1.0)
            rotated_image = cv2.warpAffine(image, rotation_matrix, (w, h))

            sample[key] = rotated_image  # Update the sample with the rotated image

        return sample

class ImageNormalizer():
    def __init__(self, keys):
        self.keys = keys

    def __call__(self, sample):
        for key in self.keys:
            images = sample[key].astype(np.float32)
            images /= 255.0
            images -= images.mean(axis=(0, 1), keepdims=True)
            images /= images.std(axis=(0, 1), keepdims=True)
            sample[key] = images

        return sample
    
import torch

class ToTensor():
    
    def __init__(self,keys):
        self.keys=keys
        
    def __call__(self,sample):
        for key in self.keys:
            sample[key]= torch.tensor(sample[key],dtype=torch.long)
            if key == 'image_paths':
                sample[key] = sample[key].permute(2, 0, 1)
                
        return sample
class OneHotEncodeLabels:
    
    def __init__(self, keys):
        self.keys = keys

    def __call__(self, sample):
        for key in self.keys:
            tens = sample[key]
            tens = torch.where(tens == 255, 20, tens)
            # print(tens.shape)
            # tens_max = torch.argmax(tens, dim=1)
            # print(tens_max)
            sample[key] = torch.nn.functional.one_hot(tens.long(), num_classes=20).float()
            sample[key] = sample[key].permute(2, 0, 1)
            
        return sample

In [23]:
class CityscapesDataset(Dataset):
    def __init__(self, image_dir, label_dir,transforms= None):
        #self.labelcolorpaths=[]
        self.image_dir=image_dir
        self.label_dir=label_dir
        self.transform=transforms
        self.imagepaths=sorted(glob.glob(self.image_dir))
        labelpaths=sorted(glob.glob(self.label_dir))
        self.label_paths=[]
        for img in labelpaths:
            if 'labelIds' in os.path.basename(img):
                self.label_paths.append(img)
     
        
    def __len__(self):
        return len(self.imagepaths)

    def __getitem__(self, idx):
        #print(self.imagepaths[idx])           
        image = Image.open(self.imagepaths[idx]).convert("RGB")
        mask = imageio.imread(self.label_paths[idx])    
        
        
        image = image.resize((512, 256))
        mask = cv2.resize(mask, (512,256))
        
        for i, j in np.ndindex(mask.shape):
            mask[i][j] = mapping_20[mask[i][j]]
        
        if self.transform is not None:
            #image = Image.fromarray(image)
            image = self.transform(image)
            #image = ToTensor()(image)
        
        mask = torch.tensor(mask, dtype=torch.uint8)
        mask = torch.tensor(mask.tolist())
        
#         mask_color = torch.tensor(mask_color, dtype=torch.float32)
#         mask_color = torch.tensor(mask_color.tolist())
#         mask_color = mask_color.permute(2, 0, 1)
        
        return image, mask#, mask_color 

In [5]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

transform=DeepLabV3_ResNet50_Weights.COCO_WITH_VOC_LABELS_V1.transforms()

batch_size = 1
NUM_CLASSES = 20
start_epoch = 0
epochs = 1
learning_rate = 1e-4

In [6]:
transform

SemanticSegmentation(
    resize_size=[520]
    mean=[0.485, 0.456, 0.406]
    std=[0.229, 0.224, 0.225]
    interpolation=InterpolationMode.BILINEAR
)

In [26]:
train_dir = '/app/duy55/segmentation/cityscapes/Cityspaces/images/train/*/*'
label_train_dir = '/app/duy55/segmentation/cityscapes/Cityspaces/gtFine/train/*/*'

val_dir = '/app/duy55/segmentation/cityscapes/Cityspaces/images/val/*/*'
label_val_dir = '/app/duy55/segmentation/cityscapes/Cityspaces/gtFine/val/*/*'

from torchvision.transforms import Compose, Resize, ToTensor, Normalize


train_dataset = CityscapesDataset(image_dir=train_dir, label_dir=label_train_dir,transforms=transform)
valid_dataset = CityscapesDataset(image_dir=val_dir, label_dir=label_val_dir,transforms=transform)

valid_dataset, test_valid_dataset = torch.utils.data.random_split(valid_dataset, [0.8, 0.2])

test_dataset = test_valid_dataset

#divisione del dataset da aggiustare
#train_dataset.__getitem__(3)    
# Get train and val data loaders
test_loader = DataLoader(test_dataset)
train_loader = DataLoader(train_dataset,batch_size=batch_size, shuffle=True)
val_loader = DataLoader(valid_dataset,batch_size=batch_size, shuffle=True)

print (len(test_dataset))

100


In [28]:
from torchvision.models.segmentation.deeplabv3 import DeepLabHead
from torchvision import models


def createDeepLabv3(outputchannels=20):
    """DeepLabv3 class with custom head

    Args:
        outputchannels (int, optional): The number of output channels
        in your dataset masks. Defaults to 1.

    Returns:
        model: Returns the DeepLabv3 model with the ResNet101 backbone.
    """
    model = models.segmentation.deeplabv3_resnet50(pretrained=True,progress=True)
    model.classifier = DeepLabHead(2048, outputchannels)
    # Set the model in training mode
    #model.train()
    return model

In [29]:
import torch
import torch.optim as optim
import torch.nn.functional as F

class FocalLoss(torch.nn.Module):
    def __init__(self, alpha=1, gamma=2):
        super(FocalLoss, self).__init__()
        self.alpha = alpha
        self.gamma = gamma

    def forward(self, input, target):
        ce_loss = F.cross_entropy(input, target, reduction='none')
        pt = torch.exp(-ce_loss)
        focal_loss = self.alpha * (1 - pt) ** self.gamma * ce_loss
        
        return focal_loss.mean()


In [10]:
import time
#si deve aggiustare IoU
def train(epochs, start_epoch, model, train_loader, val_loader, loss, optimizer, patch=False):
    jaccard_mean = MulticlassJaccardIndex(num_classes=NUM_CLASSES, ignore_index=0).to(device)
    train_losses = []
    test_losses = []
    val_loss = []
    val_iou = []
    train_iou = []
    min_loss = np.inf
    
    fit_time = time.time()
    
    for e in range(start_epoch, epochs):
        since = time.time()
        running_loss = 0
        iou_score = 0
        
        #training loop
        model.train()
        for data, target in tqdm(train_loader):
            #training phase

            mask= target.squeeze()
            data = data.to(device)
            mask = mask.to(device)

            optimizer.zero_grad()
            #forward
            
            output = model(torch.squeeze(data, dim=1))["out"].to(device)
            # Loss and IoU evaluation
            loss = loss_function(output, mask)
            iou_score += jaccard_mean(output, mask).item()
            # backward
            loss.backward()
            optimizer.step()
            
            running_loss += loss.item()

        model.eval()
        val_loss = 0
        val_iou_score = 0
        #validation loop
        with torch.no_grad():
            for data, target in tqdm(val_loader):
                output = model(data.to(device))
                mask= target.squeeze()
                mask = mask.to(device)
                
                #Loss and IoU evaluation
                val_iou_score +=  jaccard_mean(output, mask).item()
                loss = loss_function(output, mask)                          
                val_loss += loss.item()
            
        train_losses.append(running_loss/len(train_loader))
        test_losses.append(val_loss/len(val_loader))

        
        if min_loss > (val_loss/len(val_loader)):
            print('Loss Decreasing.. {:.3f} >> {:.3f} '.format(min_loss, (val_loss/len(val_loader))))
            min_loss = (val_loss/len(val_loader))

            print('saving model...')
            torch.save(model.state_dict(), 'UNet_best.pt')


        # Viene salvata la IoU ad ogni epoca sia per il training che per il validation
        val_iou.append(val_iou_score/len(val_loader))
        train_iou.append(iou_score/len(train_loader))

        print("Epoch:{}/{}..".format(e+1, epochs),
              "Train Loss: {:.3f}..".format(running_loss/len(train_loader)),
              "Val Loss: {:.3f}..".format(val_loss/len(val_loader)),
              "Train mIoU:{:.3f}..".format(iou_score/len(train_loader)),
              "Val mIoU: {:.3f}..".format(val_iou_score/len(val_loader)),
              "Time: {:.2f}m".format((time.time()-since)/60))
        print('saving model...')
        checkpoint(model, f"epoch-{e}.pth")
        
    history = {'train_loss' : train_losses, 'val_loss': test_losses,
               'train_miou' :train_iou, 'val_miou':val_iou,
              }
    print('Total time: {:.2f} m' .format((time.time()- fit_time)/60))
    return history

In [11]:
model=createDeepLabv3(outputchannels=20)
model=model.to(device)

Downloading: "https://download.pytorch.org/models/deeplabv3_resnet50_coco-cd0a2569.pth" to /root/.cache/torch/hub/checkpoints/deeplabv3_resnet50_coco-cd0a2569.pth


  0%|          | 0.00/161M [00:00<?, ?B/s]

In [32]:

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model=createDeepLabv3(outputchannels=20)
model=model.to(device)

loss_function = nn.CrossEntropyLoss(ignore_index=0)
loss_function = loss_function.to(device)
optimizer = optim.AdamW(model.parameters(), lr=learning_rate)


In [33]:
history = train(epochs, start_epoch, model, train_loader, val_loader, loss_function, optimizer)  

  0%|          | 0/2975 [00:00<?, ?it/s]


ValueError: Expected more than 1 value per channel when training, got input size torch.Size([1, 256, 1, 1])