# Semantic segmentation with Unet and Resnet50 backbone

In [1]:
# from google.colab import drive
# drive.mount('/content/drive')

In [3]:
import os
import shutil
import traceback

from IPython.display import Image, display
from keras.utils import load_img
from PIL import ImageOps

import cv2
import numpy as np
import albumentations as A
from albumentations.core.composition import Compose

from sklearn.model_selection import train_test_split

from tensorflow.keras.layers import Conv2D, BatchNormalization, Activation, Conv2DTranspose, Concatenate, Input
from tensorflow.keras.models import Model
from tensorflow.keras.applications import ResNet50

import tensorflow as tf
from tensorflow.keras import backend as K

from tensorflow.keras.callbacks import EarlyStopping, ModelCheckpoint

In [None]:
IMAGE_PATH = "../data/P8_Cityscapes_leftImg8bit_trainvaltest.zip"
LABEL_PATH = "../data/P8_Cityscapes_gtFine_trainvaltest.zip"
#!unzip -q {IMAGE_PATH} -d /content/drive/MyDrive/Openclassrooms/p8
#!unzip -q {LABEL_PATH} -d /content/drive/MyDrive/Openclassrooms/p8

In [None]:
input_dir = "../data/leftImg8bit"
target_dir = "../data/gtFine"
num_classes = 8
batch_size = 128
directory = ['val', 'test', 'train']

## Préprocessing des images

In [6]:
def images_in_one_folder(base_dir, directories):
    for dir in directories:
        print(f"Directory: {dir}")
        folder_path = os.path.join(base_dir, dir)
        try:
            if not os.path.exists(folder_path):
                print(f"Le chemin {folder_path} n'existe pas.")
                continue
            for folder in os.listdir(folder_path):
                folder_full_path = os.path.join(folder_path, folder)
                try:
                    if os.path.isdir(folder_full_path):
                        for file in os.listdir(folder_full_path):
                            file_full_path = os.path.join(folder_full_path, file)
                            try:
                                shutil.move(file_full_path, folder_path)
                                print(f"Fichier déplacé: {file_full_path} vers {folder_path}")
                            except Exception as e:
                                print(f"Erreur lors du déplacement du fichier {file_full_path}: {e}")
                                traceback.print_exc()
                        try:
                            os.rmdir(folder_full_path)
                            print(f"Dossier supprimé: {folder_full_path}")
                        except Exception as e:
                            print(f"Erreur lors de la suppression du dossier {folder_full_path}: {e}")
                            traceback.print_exc()
                except Exception as e:
                    print(f"Erreur lors du traitement du dossier {folder_full_path}: {e}")
                    traceback.print_exc()
        except Exception as e:
            print(f"Erreur lors du traitement du répertoire {dir}: {e}")
            traceback.print_exc()
    print("Tous les fichiers ont été déplacés.\n")



def clean_directories(base_dir, directories, tag):
    for dir in directories:
        print(f"Directory: {dir}")
        folder_path = os.path.join(base_dir, dir)
        try:
            if not os.path.exists(folder_path):
                print(f"Le chemin {folder_path} n'existe pas.")
                continue
            for file in os.listdir(folder_path):
                file_full_path = os.path.join(folder_path, file)
                try:
                    if os.path.isfile(file_full_path) and tag not in file:
                        os.remove(file_full_path)
                        print(f"Fichier supprimé: {file_full_path}")
                except Exception as e:
                    print(f"Erreur lors du traitement du fichier {file_full_path}: {e}")
                    traceback.print_exc()
        except Exception as e:
            print(f"Erreur lors du traitement du répertoire {dir}: {e}")
            traceback.print_exc()
    print("Nettoyage terminé.\n")



In [7]:
images_in_one_folder(input_dir, directory)
images_in_one_folder(target_dir, directory)
clean_directories(target_dir, directory, 'gtFine_color')

input_img_paths = sorted([os.path.join(input_dir, 'train', img) for img in os.listdir(os.path.join(input_dir, 'train'))])
target_img_paths = sorted([os.path.join(target_dir, 'train', img) for img in os.listdir(os.path.join(target_dir, 'train'))])

print(input_img_paths)
print(target_img_paths)

display(Image(filename=input_img_paths[11]))

if os.path.exists(target_img_paths[11]):
    img = ImageOps.autocontrast(load_img(target_img_paths[11]))
    display(img)
else:
    print(f"Le fichier {target_img_paths[11]} n'existe pas.")

Directory: val
Le chemin /content/drive/MyDrive/Openclassrooms/p8/leftImg8bit\val n'existe pas.
Directory: test
Le chemin /content/drive/MyDrive/Openclassrooms/p8/leftImg8bit\test n'existe pas.
Directory: train
Le chemin /content/drive/MyDrive/Openclassrooms/p8/leftImg8bit\train n'existe pas.
Tous les fichiers ont été déplacés.

Directory: val
Le chemin /content/drive/MyDrive/Openclassrooms/p8/gtFine\val n'existe pas.
Directory: test
Le chemin /content/drive/MyDrive/Openclassrooms/p8/gtFine\test n'existe pas.
Directory: train
Le chemin /content/drive/MyDrive/Openclassrooms/p8/gtFine\train n'existe pas.
Tous les fichiers ont été déplacés.

Directory: val
Le chemin /content/drive/MyDrive/Openclassrooms/p8/gtFine\val n'existe pas.
Directory: test
Le chemin /content/drive/MyDrive/Openclassrooms/p8/gtFine\test n'existe pas.
Directory: train
Le chemin /content/drive/MyDrive/Openclassrooms/p8/gtFine\train n'existe pas.
Nettoyage terminé.



FileNotFoundError: [WinError 3] Le chemin d’accès spécifié est introuvable: '/content/drive/MyDrive/Openclassrooms/p8/leftImg8bit\\train'

In [None]:
# Définir les transformations
transform = A.Compose([
    A.HorizontalFlip(p=0.5),
    A.RandomBrightnessContrast(p=0.2),
])

# Fonction pour appliquer les transformations
def apply_transformations(image_path, label_path):
    image = cv2.imread(image_path)
    image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
    label = cv2.imread(label_path, cv2.IMREAD_GRAYSCALE)

    transformed = transform(image=image, mask=label)
    transformed_image = transformed['image']
    transformed_label = transformed['mask']

    return transformed_image, transformed_label

# Appliquer les transformations à vos images
transformed_images = []
transformed_labels = []

for img_path, label_path in zip(input_img_paths, target_img_paths):
    transformed_image, transformed_label = apply_transformations(img_path, label_path)
    transformed_images.append(transformed_image)
    transformed_labels.append(transformed_label)

# Convertir les listes en tableaux numpy
transformed_images = np.array(transformed_images)
transformed_labels = np.array(transformed_labels)

# Diviser les données en ensembles d'entraînement et de validation
x_train, x_val, y_train, y_val = train_test_split(transformed_images, transformed_labels, test_size=0.2, random_state=42)

## Build the model

In [None]:
def conv_block(input, num_filters):
    x = Conv2D(num_filters, 3, padding="same")(input)
    x = BatchNormalization()(x)
    x = Activation("relu")(x)
    x = Conv2D(num_filters, 3, padding="same")(x)
    x = BatchNormalization()(x)
    x = Activation("relu")(x)
    return x

def decoder_block(input, skip_features, num_filters):
    x = Conv2DTranspose(num_filters, (2, 2), strides=2, padding="same")(input)
    x = Concatenate()([x, skip_features])
    x = conv_block(x, num_filters)
    return x

def build_resnet50_unet(input_shape):
    inputs = Input(input_shape)
    resnet50 = ResNet50(include_top=False, weights="imagenet", input_tensor=inputs)
    s1 = resnet50.get_layer("input_layer").output
    s2 = resnet50.get_layer("conv1_relu").output
    s3 = resnet50.get_layer("conv2_block3_out").output
    s4 = resnet50.get_layer("conv3_block4_out").output
    b1 = resnet50.get_layer("conv4_block6_out").output
    d1 = decoder_block(b1, s4, 512)
    d2 = decoder_block(d1, s3, 256)
    d3 = decoder_block(d2, s2, 128)
    d4 = decoder_block(d3, s1, 64)
    outputs = Conv2D(num_classes, 1, padding="same", activation="sigmoid")(d4)
    model = Model(inputs, outputs, name="ResNet50_U-Net")
    return model

In [None]:
input_shape = (160, 160, 3)
model = build_resnet50_unet(input_shape)

### Define the metrics

In [None]:
def recall(y_true, y_pred):
    true_positives = K.sum(K.round(K.clip(y_true * y_pred, 0, 1)))
    possible_positives = K.sum(K.round(K.clip(y_true, 0, 1)))
    recall = true_positives / (possible_positives + K.epsilon())
    return recall

def precision(y_true, y_pred):
    true_positives = K.sum(K.round(K.clip(y_true * y_pred, 0, 1)))
    predicted_positives = K.sum(K.round(K.clip(y_pred, 0, 1)))
    precision = true_positives / (predicted_positives + K.epsilon())
    return precision

def f1_score(y_true, y_pred):
    precision = precision(y_true, y_pred)
    recall = recall(y_true, y_pred)
    return 2 * ((precision * recall) / (precision + recall + K.epsilon()))

def mean_iou(y_true, y_pred):
    y_pred = tf.round(y_pred)
    intersection = K.sum(K.abs(y_true * y_pred), axis=[1, 2, 3])
    union = K.sum(y_true, axis=[1, 2, 3]) + K.sum(y_pred, axis=[1, 2, 3]) - intersection
    iou = K.mean((intersection + K.epsilon()) / (union + K.epsilon()), axis=0)
    return iou

model.compile(optimizer='adam', loss='binary_crossentropy', metrics=[recall, precision, f1_score, mean_iou])


### Train the model

In [None]:
early_stopping = EarlyStopping(monitor='val_loss', patience=10, restore_best_weights=True)
model_checkpoint = ModelCheckpoint('best_model.h5', monitor='val_loss', save_best_only=True)


history = model.fit(
    x_train, y_train,
    validation_data=(x_val, y_val),
    batch_size=8,
    epochs=50,
    callbacks=[early_stopping, model_checkpoint],
    verbose=1
)



### Eval the model

In [None]:
results = model.evaluate(x_val, y_val, batch_size=8)
print("Validation Loss:", results[0])
print("Validation Recall:", results[1])
print("Validation Precision:", results[2])
print("Validation F1 Score:", results[3])
print("Validation Mean IoU:", results[4])