# Introduction to semantic segmentation

**Instructor**: Ramon Mateo Navarro

**Course**: Artificial Intelligence Models

**Link**: [Image segmentation with a U-Net-like architecture](https://keras.io/examples/vision/oxford_pets_image_segmentation/)

In this notebook we walk step by step through an example of semantic segmentation. We follow the tutorial "Image segmentation with a U-Net-like architecture", available on the Keras website, which serves as an entry point to the world of segmentation.

## Downloading the data

The first step is to download the dataset. The commands below download the images and the annotations and then extract both archives.

In [None]:
!curl -O https://thor.robots.ox.ac.uk/datasets/pets/images.tar.gz
!curl -O https://thor.robots.ox.ac.uk/datasets/pets/annotations.tar.gz

!tar -xf images.tar.gz
!tar -xf annotations.tar.gz


## Imports

All the imports needed for this notebook.

In [None]:
import os
import numpy as np
import tensorflow as tf
import random
import matplotlib.pyplot as plt
from IPython.display import Image, display
from keras.utils import load_img
from PIL import ImageOps
from tensorflow import data as tf_data
from tensorflow import image as tf_image
from tensorflow import io as tf_io
from tensorflow.keras import layers
from tensorflow import keras
print(tf.config.list_physical_devices('GPU'))


## Preparing the dataset

First we set the paths where the images and their segmentation masks are stored. Then, using list comprehensions, we collect the paths of every image and of its corresponding mask, sorting both lists so that they follow the same order.

In [None]:

input_dir = "images/"
target_dir = "annotations/trimaps/"
img_size = (160, 160)
num_classes = 3
batch_size = 32

input_img_paths = sorted(
    [
        os.path.join(input_dir, fname)
        for fname in os.listdir(input_dir)
        if fname.endswith(".jpg")
    ]
)
target_img_paths = sorted(
    [
        os.path.join(target_dir, fname)
        for fname in os.listdir(target_dir)
        if fname.endswith(".png") and not fname.startswith(".")
    ]
)

print("Number of samples:", len(input_img_paths))

for input_path, target_path in zip(input_img_paths[:10], target_img_paths[:10]):
    print(input_path, "|", target_path)


**WARNING**: when working with this kind of data, where inputs and targets live in separate sets of files, it is essential to make sure that each X matches its Y. Images are often paired incorrectly, and it can cost a lot of time before the problem is noticed. Always print the paths and check that each input corresponds to the output you expect.
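One way to automate this check is to compare file names without their extensions. The sketch below assumes, as the printed paths suggest for this dataset, that an image and its mask share a base name and differ only in directory and extension. The helper name `find_mismatched_pairs` is hypothetical and the paths are toy values, not read from disk.

```python
import os


def find_mismatched_pairs(input_paths, target_paths):
    """Return (input, target) pairs whose base names differ.

    Assumes each image and its mask share a file name and differ only
    in directory and extension.
    """
    if len(input_paths) != len(target_paths):
        raise ValueError("The two lists have different lengths")

    def stem(path):
        return os.path.splitext(os.path.basename(path))[0]

    return [
        (inp, tgt)
        for inp, tgt in zip(input_paths, target_paths)
        if stem(inp) != stem(tgt)
    ]


# Toy paths for illustration only
good = find_mismatched_pairs(
    ["images/Abyssinian_1.jpg", "images/beagle_2.jpg"],
    ["annotations/trimaps/Abyssinian_1.png", "annotations/trimaps/beagle_2.png"],
)
bad = find_mismatched_pairs(
    ["images/Abyssinian_1.jpg", "images/beagle_2.jpg"],
    ["annotations/trimaps/Abyssinian_1.png", "annotations/trimaps/beagle_3.png"],
)
print(good)  # []
print(bad)   # [('images/beagle_2.jpg', 'annotations/trimaps/beagle_3.png')]
```

On the real lists, `find_mismatched_pairs(input_img_paths, target_img_paths)` would be expected to return an empty list.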

## Visualizing an example

Let's see what the dataset contains. First we display the original image and then its corresponding mask.

In [None]:
# Display the input image at index 9
display(Image(filename=input_img_paths[9]))

# Display auto-contrast version of corresponding target (per-pixel categories)
img = ImageOps.autocontrast(load_img(target_img_paths[9]))
display(img)

We can see three colors. What is going on? If we inspect the mask, we find that it contains three different classes.

In [None]:
np.unique(load_img(target_img_paths[9]))
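In the Oxford-IIIT Pet trimaps the three values are 1 (pet), 2 (background) and 3 (border or unclassified pixels). To see how many pixels fall into each class, `np.unique` can also return counts. The sketch below uses a small hand-made array in place of a real mask, so the numbers are purely illustrative.

```python
import numpy as np

# Hand-made 4x4 "trimap" standing in for a real mask (assumption)
toy_mask = np.array(
    [
        [2, 2, 2, 2],
        [2, 3, 3, 2],
        [2, 3, 1, 3],
        [2, 3, 3, 2],
    ],
    dtype=np.uint8,
)

# return_counts=True gives the number of pixels per distinct value
values, counts = np.unique(toy_mask, return_counts=True)
class_counts = {int(v): int(c) for v, c in zip(values, counts)}
print(class_counts)  # {1: 1, 2: 9, 3: 6}
```

Applied to a real mask, the same call gives a quick idea of how unbalanced the classes are.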

## Building the dataset pipeline

In [None]:
def get_dataset(
    batch_size,
    img_size,
    input_img_paths,
    target_img_paths,
    max_dataset_len=None,
):
    """Returns a TF Dataset."""

    def load_img_masks(input_img_path, target_img_path):
        # Load the input image
        input_img = tf_io.read_file(input_img_path)
        input_img = tf_io.decode_png(input_img, channels=3)
        input_img = tf_image.resize(input_img, img_size)
        input_img = tf_image.convert_image_dtype(input_img, "float32")

        # Load the mask, which is our target
        target_img = tf_io.read_file(target_img_path)
        target_img = tf_io.decode_png(target_img, channels=1)
        target_img = tf_image.resize(target_img, img_size, method="nearest")
        target_img = tf_image.convert_image_dtype(target_img, "uint8")

        # Ground truth labels are 1, 2, 3. Subtract one to make them 0, 1, 2:
        target_img -= 1
        return input_img, target_img

    # For faster debugging, limit the size of data
    if max_dataset_len:
        input_img_paths = input_img_paths[:max_dataset_len]
        target_img_paths = target_img_paths[:max_dataset_len]
    dataset = tf_data.Dataset.from_tensor_slices((input_img_paths, target_img_paths))
    dataset = dataset.map(load_img_masks, num_parallel_calls=tf_data.AUTOTUNE)
    return dataset.batch(batch_size)
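The loader resizes the mask with `method="nearest"` because label values are categories, not intensities: any interpolation that averages neighbouring pixels can produce values that are not valid classes. The NumPy sketch below illustrates this on a toy 4x4 mask. `resize_nearest` and `downsample_mean` are hypothetical helpers written for this illustration; they are not TensorFlow functions and do not reproduce `tf.image.resize` exactly.

```python
import numpy as np


def resize_nearest(mask, out_h, out_w):
    """Nearest-neighbour resize: every output pixel copies one input pixel."""
    in_h, in_w = mask.shape
    rows = (np.arange(out_h) * in_h / out_h).astype(int)
    cols = (np.arange(out_w) * in_w / out_w).astype(int)
    return mask[rows[:, None], cols]


def downsample_mean(mask, factor):
    """Averaging downsample, standing in for a smoothing interpolation."""
    h, w = mask.shape
    blocks = mask.reshape(h // factor, factor, w // factor, factor)
    return blocks.mean(axis=(1, 3))


mask = np.array(
    [
        [1, 1, 3, 2],
        [1, 1, 3, 2],
        [1, 3, 2, 2],
        [3, 3, 2, 2],
    ],
    dtype=np.uint8,
)

nearest = resize_nearest(mask, 2, 2)
averaged = downsample_mean(mask, 2)
print(nearest.tolist())   # [[1, 3], [1, 2]]
print(averaged.tolist())  # [[1.0, 2.5], [2.5, 2.0]]
```

The averaged result contains 2.5, which is not a class label, while the nearest-neighbour result only contains values that were already present in the mask.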

## Defining the model

Now let's define the model. Recall that we want to implement a U-Net-like model. This kind of model has many variants and architectures. You can try different models on the same dataset and see which one gives the best results.

In [None]:

def get_model(img_size, num_classes):
    inputs = keras.Input(shape=img_size + (3,))

    ### [First half of the network: downsampling inputs] ###

    # Entry block
    x = layers.Conv2D(32, 3, strides=2, padding="same")(inputs)
    x = layers.BatchNormalization()(x)
    x = layers.Activation("relu")(x)

    previous_block_activation = x  # Set aside residual

    # Blocks 1, 2, 3 are identical apart from the feature depth.
    for filters in [64, 128, 256]:
        x = layers.Activation("relu")(x)
        x = layers.SeparableConv2D(filters, 3, padding="same")(x)
        x = layers.BatchNormalization()(x)

        x = layers.Activation("relu")(x)
        x = layers.SeparableConv2D(filters, 3, padding="same")(x)
        x = layers.BatchNormalization()(x)

        x = layers.MaxPooling2D(3, strides=2, padding="same")(x)

        # Project residual
        residual = layers.Conv2D(filters, 1, strides=2, padding="same")(
            previous_block_activation
        )
        x = layers.add([x, residual])  # Add back residual
        previous_block_activation = x  # Set aside next residual

    ### [Second half of the network: upsampling inputs] ###

    for filters in [256, 128, 64, 32]:
        x = layers.Activation("relu")(x)
        x = layers.Conv2DTranspose(filters, 3, padding="same")(x)
        x = layers.BatchNormalization()(x)

        x = layers.Activation("relu")(x)
        x = layers.Conv2DTranspose(filters, 3, padding="same")(x)
        x = layers.BatchNormalization()(x)

        x = layers.UpSampling2D(2)(x)

        # Project residual
        residual = layers.UpSampling2D(2)(previous_block_activation)
        residual = layers.Conv2D(filters, 1, padding="same")(residual)
        x = layers.add([x, residual])  # Add back residual
        previous_block_activation = x  # Set aside next residual

    # Add a per-pixel classification layer
    outputs = layers.Conv2D(num_classes, 3, activation="softmax", padding="same")(x)

    # Define the model
    model = keras.Model(inputs, outputs)
    return model


# Build model
model = get_model(img_size, num_classes)
model.summary()
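To read the summary more easily, it can help to follow one spatial dimension through the network. The sketch below is plain arithmetic, not Keras code: it assumes that every stride-2 layer with `padding="same"` yields `ceil(size / 2)` and that `UpSampling2D(2)` doubles the size. The function name `trace_spatial_size` is hypothetical.

```python
import math


def trace_spatial_size(size, n_down_blocks=3, n_up_blocks=4):
    """Follow one spatial dimension through the encoder and decoder.

    Stride-2 layers with padding="same" give ceil(size / 2);
    UpSampling2D(2) doubles the size.
    """
    sizes = [size]
    size = math.ceil(size / 2)  # entry block: Conv2D with strides=2
    sizes.append(size)
    for _ in range(n_down_blocks):  # MaxPooling2D with strides=2
        size = math.ceil(size / 2)
        sizes.append(size)
    for _ in range(n_up_blocks):  # UpSampling2D(2)
        size *= 2
        sizes.append(size)
    return sizes


print(trace_spatial_size(160))  # [160, 80, 40, 20, 10, 20, 40, 80, 160]
print(trace_spatial_size(150))  # [150, 75, 38, 19, 10, 20, 40, 80, 160]
```

With four halvings and four doublings, the output resolution matches the input only when the input size is a multiple of 16, which is the case for `img_size = (160, 160)`.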

## Creating the training and validation datasets

Now we split the dataset into training and validation sets so that we can train our model.

In [None]:
val_samples = 1000
random.Random(1337).shuffle(input_img_paths)
random.Random(1337).shuffle(target_img_paths)
train_input_img_paths = input_img_paths[:-val_samples]
train_target_img_paths = target_img_paths[:-val_samples]
val_input_img_paths = input_img_paths[-val_samples:]
val_target_img_paths = target_img_paths[-val_samples:]

# Instantiate dataset for each split
# Limit input files in `max_dataset_len` for faster epoch training time.
# Remove the `max_dataset_len` arg when running with full dataset.
train_dataset = get_dataset(
    batch_size,
    img_size,
    train_input_img_paths,
    train_target_img_paths,
    max_dataset_len=1000,
)
valid_dataset = get_dataset(
    batch_size, img_size, val_input_img_paths, val_target_img_paths
)
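The split relies on shuffling both path lists with the same seed, which applies the same permutation to each list as long as they have the same length. The sketch below checks this on toy lists; the file names are made up and stand in for the real path lists.

```python
import random

# Toy stand-ins for the path lists (assumption: already aligned by index)
images = [f"img_{i}.jpg" for i in range(10)]
masks = [f"img_{i}.png" for i in range(10)]

# Two generators with the same seed produce the same permutation
random.Random(1337).shuffle(images)
random.Random(1337).shuffle(masks)

# Every image should still sit next to its own mask
still_aligned = all(
    img.rsplit(".", 1)[0] == msk.rsplit(".", 1)[0]
    for img, msk in zip(images, masks)
)

# Hold out the last `val_samples` items for validation
val_samples = 3
train_images, val_images = images[:-val_samples], images[-val_samples:]
print(still_aligned, len(train_images), len(val_images))  # True 7 3
```

If the two lists had different lengths, the permutations would no longer match, which is one more reason to check the pairing before shuffling.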

## Training the model

Now it is time to train the model. For this first iteration we use sparse categorical cross-entropy as the loss function, since the targets are integer class indices rather than one-hot encoded vectors. As the optimizer we use Adam.
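The sparse and the one-hot variants of the loss differ only in how the target is encoded. For a single pixel with predicted probabilities $p$ and true class $c$, both reduce to $-\log p_c$. The sketch below checks this by hand with made-up probabilities; it uses plain Python rather than the Keras loss classes.

```python
import math

probs = [0.7, 0.2, 0.1]  # softmax output for one pixel (made-up values)
label = 0                # integer class index, as stored in the mask

# Sparse form: pick the probability of the true class directly
sparse_ce = -math.log(probs[label])

# One-hot form: expand the label, then sum over all classes
one_hot = [1.0 if k == label else 0.0 for k in range(len(probs))]
categorical_ce = -sum(t * math.log(p) for t, p in zip(one_hot, probs))

print(round(sparse_ce, 4), round(categorical_ce, 4))  # 0.3567 0.3567
```

The sparse form avoids materialising a one-hot tensor with one channel per class for every pixel.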

In [None]:
def _to_one_hot(y_true, y_pred):
    """Turn integer labels and softmax outputs into one-hot tensors (B, H, W, C)."""
    depth = y_pred.shape[-1]  # number of classes
    y_pred = tf.one_hot(tf.argmax(y_pred, axis=-1), depth)  # predicted class
    y_true = tf.one_hot(tf.cast(tf.squeeze(y_true, axis=-1), tf.int32), depth)
    return y_true, y_pred


def mean_iou(y_true, y_pred):
    """Per-class intersection over union, averaged over classes (per batch)."""
    y_true, y_pred = _to_one_hot(y_true, y_pred)
    intersection = tf.reduce_sum(y_true * y_pred, axis=[0, 1, 2])
    union = tf.reduce_sum(y_true + y_pred, axis=[0, 1, 2]) - intersection
    smooth = 1e-7  # avoids division by zero when a class is absent
    return tf.reduce_mean((intersection + smooth) / (union + smooth))


def dice_coefficient(y_true, y_pred):
    """Per-class Dice coefficient, averaged over classes (per batch)."""
    y_true, y_pred = _to_one_hot(y_true, y_pred)
    intersection = tf.reduce_sum(y_true * y_pred, axis=[0, 1, 2])
    total = tf.reduce_sum(y_true + y_pred, axis=[0, 1, 2])
    smooth = 1e-7  # avoids division by zero when a class is absent
    return tf.reduce_mean((2.0 * intersection + smooth) / (total + smooth))


model.compile(
    optimizer=keras.optimizers.Adam(1e-4),
    loss="sparse_categorical_crossentropy",
    metrics=[mean_iou, dice_coefficient],
)

callbacks = [
    keras.callbacks.ModelCheckpoint("oxford_segmentation.keras", save_best_only=True)
]

# Train the model, doing validation at the end of each epoch.
epochs = 50
mdl_hist = model.fit(
    train_dataset,
    epochs=epochs,
    validation_data=valid_dataset,
    callbacks=callbacks,
)
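When sanity-checking the reported metric values, it can help to work out IoU and Dice by hand on a tiny example. The NumPy sketch below computes both per class on made-up 3x3 label maps. It is independent of the Keras metrics, and the helper name `per_class_iou_and_dice` is hypothetical.

```python
import numpy as np


def per_class_iou_and_dice(y_true, y_pred, n_classes):
    """Return two lists: IoU and Dice for each class index."""
    ious, dices = [], []
    for k in range(n_classes):
        t = y_true == k  # pixels labelled k
        p = y_pred == k  # pixels predicted as k
        inter = np.logical_and(t, p).sum()
        union = np.logical_or(t, p).sum()
        total = t.sum() + p.sum()
        # A class missing from both maps has no defined score
        ious.append(inter / union if union else float("nan"))
        dices.append(2 * inter / total if total else float("nan"))
    return ious, dices


# Made-up ground truth and prediction
y_true = np.array([[0, 0, 1], [0, 1, 1], [2, 2, 1]])
y_pred = np.array([[0, 0, 1], [0, 0, 1], [2, 1, 1]])

ious, dices = per_class_iou_and_dice(y_true, y_pred, n_classes=3)
print([round(float(v), 3) for v in ious])   # [0.75, 0.6, 0.5]
print([round(float(v), 3) for v in dices])  # [0.857, 0.75, 0.667]
print(round(float(np.mean(ious)), 4))       # 0.6167
```

Note that 7 of the 9 pixels are classified correctly (about 78% pixel accuracy), yet the mean IoU is about 0.62: overlap metrics penalise errors on small classes more heavily than plain accuracy does.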

## Visualizing the training

In [None]:
plt.figure(figsize=(18, 6))

# Panel 1: loss
plt.subplot(1, 3, 1)
plt.plot(mdl_hist.history['loss'], label='Loss - Training')
plt.plot(mdl_hist.history['val_loss'], label='Loss - Validation')
plt.xlabel('Epochs')
plt.ylabel('Loss')
plt.title('Loss during training')
plt.legend()

# Panel 2: intersection over union
plt.subplot(1, 3, 2)
plt.plot(mdl_hist.history['mean_iou'], label='IoU - Training')
plt.plot(mdl_hist.history['val_mean_iou'], label='IoU - Validation')
plt.xlabel('Epochs')
plt.ylabel('IoU')
plt.title('Intersection over union during training')
plt.legend()

# Panel 3: Dice coefficient (a third panel, so it does not overwrite the IoU plot)
plt.subplot(1, 3, 3)
plt.plot(mdl_hist.history['dice_coefficient'], label='Dice - Training')
plt.plot(mdl_hist.history['val_dice_coefficient'], label='Dice - Validation')
plt.xlabel('Epochs')
plt.ylabel('Dice')
plt.title('Dice coefficient during training')
plt.legend()

plt.tight_layout()
plt.show()
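Besides reading the curves, the best epoch can be pulled out of the history dictionary directly. Since `ModelCheckpoint` with `save_best_only=True` monitors `val_loss` by default, the epoch with the lowest validation loss is the one whose weights end up saved. The sketch below uses a made-up history, and the helper name `best_epoch` is hypothetical.

```python
def best_epoch(history, key="val_loss"):
    """Return (epoch_index, value) of the lowest value recorded under `key`."""
    values = history[key]
    idx = min(range(len(values)), key=values.__getitem__)
    return idx, values[idx]


# Made-up history for illustration; in the notebook this would be mdl_hist.history
toy_history = {"val_loss": [0.92, 0.71, 0.64, 0.66, 0.70]}
print(best_epoch(toy_history))  # (2, 0.64)
```

Epoch indices are zero-based here, so index 2 corresponds to the third epoch shown in the training log.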

## Visualizing the masks

Now let's see how well the model has done.

In [None]:
val_dataset = get_dataset(
    batch_size, img_size, val_input_img_paths, val_target_img_paths
)
val_preds = model.predict(val_dataset)


def display_mask(i):
    """Quick utility to display a model's prediction."""
    mask = np.argmax(val_preds[i], axis=-1)
    mask = np.expand_dims(mask, axis=-1)
    img = ImageOps.autocontrast(keras.utils.array_to_img(mask))
    display(img)


# Display results for validation image #10
i = 10

# Display input image
display(Image(filename=val_input_img_paths[i]))

# Display ground-truth target mask
img = ImageOps.autocontrast(load_img(val_target_img_paths[i]))
display(img)

# Display mask predicted by our model
display_mask(i)  # Note that the model only sees inputs at 160x160.
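The step from network output to a viewable mask can be sketched with NumPy alone: `argmax` over the class axis picks the most likely class per pixel, and the class indices are then stretched to grey levels so that they become visible. The probabilities below are made up, and the linear stretch is a simplified stand-in for `ImageOps.autocontrast`, not the same algorithm.

```python
import numpy as np

# Made-up softmax output for a 2x2 image with 3 classes
probs = np.array(
    [
        [[0.8, 0.1, 0.1], [0.2, 0.7, 0.1]],
        [[0.1, 0.3, 0.6], [0.5, 0.4, 0.1]],
    ]
)

# Most likely class per pixel: shape (2, 2), values in {0, 1, 2}
mask = np.argmax(probs, axis=-1)

# Stretch class indices towards the 0-255 range so they show as grey levels
scale = 255 // (probs.shape[-1] - 1)
display_mask_array = (mask * scale).astype(np.uint8)

print(mask.tolist())                # [[0, 1], [2, 0]]
print(display_mask_array.tolist())  # [[0, 127], [254, 0]]
```

Without the stretch, the values 0, 1 and 2 would all render as nearly black in an 8-bit image.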
