In [None]:
!pip install --upgrade keras


Collecting keras
  Downloading keras-3.5.0-py3-none-any.whl.metadata (5.8 kB)
Collecting namex (from keras)
  Downloading namex-0.0.8-py3-none-any.whl.metadata (246 bytes)
Collecting optree (from keras)
  Downloading optree-0.12.1-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (47 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m47.8/47.8 kB[0m [31m777.6 kB/s[0m eta [36m0:00:00[0m
Downloading keras-3.5.0-py3-none-any.whl (1.1 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.1/1.1 MB[0m [31m7.5 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading namex-0.0.8-py3-none-any.whl (5.8 kB)
Downloading optree-0.12.1-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (347 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m347.7/347.7 kB[0m [31m19.0 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: namex, optree, keras
  Attempting uninstall: keras
    Found existing installation: keras 2.15.0


In [None]:
import os
os.environ["KERAS_BACKEND"] = "tensorflow"
import keras
from keras import ops
import tensorflow as tf
import matplotlib.pyplot as plt
import tensorflow_datasets as tfds
import numpy as np

AUTOTUNE = tf.data.AUTOTUNE

NUM_CLASSES = 4
INPUT_HEIGHT = 224
INPUT_WIDTH = 224
LEARNING_RATE = 1e-3
WEIGHT_DECAY = 1e-4
EPOCHS = 50
BATCH_SIZE = 32
MIXED_PRECISION = True
SHUFFLE = True

# Mixed-precision setting
if MIXED_PRECISION:
  policy = keras.mixed_precision.Policy("mixed_float16")
  keras.mixed_precision.set_global_policy(policy)

In [None]:
(train_ds, valid_ds, test_ds) = tfds.load(
  "oxford_iiit_pet",
  split=["train[:85%]", "train[85%:]", "test"],
  batch_size=BATCH_SIZE,
  shuffle_files=SHUFFLE,
)

In [None]:
# Image and Mask Pre-processing
# train_ds por ejemplo es un conjunto de objetos "section", cada section es un diccionario donde está tanto la image como la máscara
def unpack_resize_data(section):
  image = section["image"]
  segmentation_mask = section["segmentation_mask"]

  resize_layer = keras.layers.Resizing(INPUT_HEIGHT, INPUT_WIDTH)

  image = resize_layer(image)
  segmentation_mask = resize_layer(segmentation_mask)

  return image, segmentation_mask

# todas las imágenes y máscaras de los 3 conjuntos tendrán las mismas dimensiones (se hace zoom in/zoom out para redimensionar)
train_ds = train_ds.map(unpack_resize_data, num_parallel_calls=AUTOTUNE)
valid_ds = valid_ds.map(unpack_resize_data, num_parallel_calls=AUTOTUNE)
test_ds = test_ds.map(unpack_resize_data, num_parallel_calls=AUTOTUNE)

In [None]:
images, masks = next(iter(test_ds)) # se coge un único batch
random_idx = keras.random.uniform([], minval=0, maxval=BATCH_SIZE, seed=10)

test_image = images[int(random_idx)].numpy().astype("float")
test_mask = masks[int(random_idx)].numpy().astype("float")

# Overlay segmentation mask on top of image.
fig, ax = plt.subplots(nrows=1, ncols=2, figsize=(10, 5))

ax[0].set_title("Image")
ax[0].imshow(test_image / 255.0)

ax[1].set_title("Image with segmentation mask overlay")
ax[1].imshow(test_image / 255.0)
ax[1].imshow(
  test_mask,
  cmap="inferno",
  alpha=0.6,
)
plt.show()

In [None]:
# dada una imagen y su máscara, se preprocesa normalizando los valores de los píxeles para centrarlos en un rango
# lo importante es que sea el rango que sea, todas las imágenes tengan el mismo
# "it will convert the input images from RGB to BGR, then will zero-center each color channel with respect to the ImageNet dataset, without scaling"
def preprocess_data(image, segmentation_mask):
  image = keras.applications.vgg19.preprocess_input(image)

  return image, segmentation_mask

train_ds = (
  train_ds.map(preprocess_data, num_parallel_calls=AUTOTUNE)
  .shuffle(buffer_size=1024)
  .prefetch(buffer_size=1024)
)
valid_ds = (
  valid_ds.map(preprocess_data, num_parallel_calls=AUTOTUNE)
  .shuffle(buffer_size=1024)
  .prefetch(buffer_size=1024)
)
test_ds = (
  test_ds.map(preprocess_data, num_parallel_calls=AUTOTUNE)
  .shuffle(buffer_size=1024)
  .prefetch(buffer_size=1024)
)

In [None]:
# como backbone se usa VGG19 (se usan los outputs de ciertas capas, osea los features maps)
# y luego se le añaden más capas convolucionales propias
# capa de entrada de la FCN
input_layer = keras.Input(shape=(INPUT_HEIGHT, INPUT_WIDTH, 3))

# VGG Model backbone with pre-trained ImageNet weights.
# las capas densas que trae include_top serán convertidas en capas convolucionales
# una cosa es los filtros de VGG19 que te traes (genéricos) y otra cosa es la importancia de los mismos (los pesos entrenado en este caso con imagenet)
vgg_model = keras.applications.vgg19.VGG19(include_top=True, weights="imagenet")

# a partir de VGG19 construyo un modelo igual pero más pequeño que reusa el input y algnos outputs
fcn_backbone = keras.models.Model(
  inputs=vgg_model.layers[1].input,
  outputs=[
    vgg_model.get_layer(block_name).output
    for block_name in ["block3_pool", "block4_pool", "block5_pool"]
  ],
)

fcn_backbone.trainable = False # congela backbone
# paso mi input layer a partir del backbone, lo que produce 3 outputs (uno por cada "ouput" del backbone) que recojo en "x"
x = fcn_backbone(input_layer)

# el problema de las capas densas es que aplanan la imagen, pero mi capa convolucional no debería hacer eso
# una capa densa es como una capa convolucional con un número de filtros igual al número de píxeles de la imagen
# las capas convolucionales que creo van a "imitar" el comportamiento de las capas densas de vgg19 que había después de block3_pool", "block4_pool", "block5_pool
# vgg funciona bien extrayendo características, eso es reutilizable. pero a la hora de combinar eso en vez de hacerlo con capas densas lo hago con convolucionales
# para procesar la info localmente (kernel size). las densas og tenían 4096 uds, osea combis de features extraídas a las que se asignan pesos. incluso con convs
# yo puedo generar el mismo número, usando tal num de filters (dado que cada capa aplica los filtros al feature map de la capa anterior, luego esos filtros
# buscan "patrones dentro de patrones" osea estás generando las mismas combinaciones que lo que hacían las densas que demostraron funcionar bien)
units = [4096, 4096]
dense_convs = []

for filter_idx in range(len(units)): # 0,1
  dense_conv = keras.layers.Conv2D(
    filters=units[filter_idx], # 4096, 4096 <-- cada filtro busca un patrón, y genera un feature map distinto
    kernel_size=(7, 7) if filter_idx == 0 else (1, 1), # (7,7), (1,1)
    strides=(1, 1),
    activation="relu",
    padding="same",
    use_bias=False,
    kernel_initializer=keras.initializers.Constant(1.0),
  )
  dense_convs.append(dense_conv)
  dropout_layer = keras.layers.Dropout(0.5)
  dense_convs.append(dropout_layer)

dense_convs = keras.Sequential(dense_convs) # modelo secuencial construido a partir de la lista de capas convolucionales generada en el bucle anterior
dense_convs.trainable = False # congela modelo secuencial

x[-1] = dense_convs(x[-1]) # al final del modelo fcn que antes era solo el backbone, métele el resultado de
# pasar ese output a través de las capas que acabo de generar

pool3_output, pool4_output, pool5_output = x # unwrapea los 3 outputs del backbone

In [None]:
# esta capa conv sirve para dejar la imagen del tamaño que es (kernel de 1) pero tiene tantos filtros como número de clases
# porque al final la salida es "cuánto se activa cada filtro de clase con esta imagen de entrada"
pool5 = keras.layers.Conv2D(
  filters=NUM_CLASSES,
  kernel_size=(1, 1),
  padding="same",
  strides=(1, 1),
  activation="relu",
)

# esta es la capa final que realmente predice las probabilidades de clase, y trabaja sobre la anterior
fcn32s_conv_layer = keras.layers.Conv2D(
  filters=NUM_CLASSES,
  kernel_size=(1, 1),
  activation="softmax",
  padding="same",
  strides=(1, 1),
)

fcn32s_upsampling = keras.layers.UpSampling2D(
  size=(32, 32), # si ves el summary puedes ver cuánto se ha reducido en comparación con la imagen original y por tanto cuánto debes upsamplear
  data_format=keras.backend.image_data_format(),
  interpolation="bilinear",
)

final_fcn32s_pool = pool5(pool5_output) # ajustar num canales (salida final del modelo)
final_fcn32s_output = fcn32s_conv_layer(final_fcn32s_pool) # (salida def del modelo)
final_fcn32s_output = fcn32s_upsampling(final_fcn32s_output)

fcn32s_model = keras.Model(inputs=input_layer, outputs=final_fcn32s_output)

In [None]:
# a parte de mi modelo ya usable "fcn32s_model" voy a construir uno que combine la salida de ese modelo con la salida de la penúltima capa
# 1x1 convolution to set channels = number of classes
# aquí vuelvo a ajustar el número de canales que tiene la salida en este caso de la penúltima capa
pool4 = keras.layers.Conv2D(
  filters=NUM_CLASSES,
  kernel_size=(1, 1),
  padding="same",
  strides=(1, 1),
  activation="linear",
  kernel_initializer=keras.initializers.Zeros(),
)(pool4_output)

# como la salida de final_fcn32s_pool es de 1x1 y la quiero combinar con la salida de pool4 que es 2x2, upsampleo
pool5 = keras.layers.UpSampling2D(
  size=(2, 2),
  data_format=keras.backend.image_data_format(),
  interpolation="bilinear",
)(final_fcn32s_pool)

# capa final de ESTE modelo (predictor)
fcn16s_conv_layer = keras.layers.Conv2D(
  filters=NUM_CLASSES,
  kernel_size=(1, 1),
  activation="softmax",
  padding="same",
  strides=(1, 1),
)

fcn16s_upsample_layer = keras.layers.UpSampling2D(
  size=(16, 16),
  data_format=keras.backend.image_data_format(),
  interpolation="bilinear",
)

# combina el output de pool4 con el output (upsampleado) de pool5
final_fcn16s_pool = keras.layers.Add()([pool4, pool5])
# pásalo por el predictor
final_fcn16s_output = fcn16s_conv_layer(final_fcn16s_pool)
final_fcn16s_output = fcn16s_upsample_layer(final_fcn16s_output)
# esto es OTRO modelo
fcn16s_model = keras.models.Model(inputs=input_layer, outputs=final_fcn16s_output)


In [None]:
# 3er y último modelo, se combina el output del modelo fcn16 con el output de pool3
# para ello vuelvo a ajustar la profundida del output de la capa 3
pool3 = keras.layers.Conv2D(
  filters=NUM_CLASSES,
  kernel_size=(1, 1),
  padding="same",
  strides=(1, 1),
  activation="linear",
  kernel_initializer=keras.initializers.Zeros(),
)(pool3_output)

# upsampleo la salida del modelo anterior para poder combinarlo
intermediate_pool_output = keras.layers.UpSampling2D(
  size=(2, 2),
  data_format=keras.backend.image_data_format(),
  interpolation="bilinear",
)(final_fcn16s_pool)

# última capa de ESTE modelo (predictor)
fcn8s_conv_layer = keras.layers.Conv2D(
  filters=NUM_CLASSES,
  kernel_size=(1, 1),
  activation="softmax",
  padding="same",
  strides=(1, 1),
)

fcn8s_upsample_layer = keras.layers.UpSampling2D(
  size=(8, 8),
  data_format=keras.backend.image_data_format(),
  interpolation="bilinear",
)

# ahora combino esas 2
final_fcn8s_pool = keras.layers.Add()([pool3, intermediate_pool_output])
final_fcn8s_output = fcn8s_conv_layer(final_fcn8s_pool)
final_fcn8s_output = fcn8s_upsample_layer(final_fcn8s_output)

fcn8s_model = keras.models.Model(inputs=input_layer, outputs=final_fcn8s_output)

In [None]:
# esto es para reutilizar los pesos de las últimas capas densas de vgg, para construir una versión de estas capas como capas convolucionales
weights1 = vgg_model.get_layer("fc1").get_weights()[0]
weights2 = vgg_model.get_layer("fc2").get_weights()[0]

# las 2 últimas capas (convolucionales) de mi modelo tienen esta forma, osea son como las densas de vgg y por eso funciona bien que se le metan esos pesos
weights1 = weights1.reshape(7, 7, 512, 4096)
weights2 = weights2.reshape(1, 1, 4096, 4096)

dense_convs.layers[0].set_weights([weights1])
dense_convs.layers[2].set_weights([weights2])

In [None]:
fcn16s_optimizer = keras.optimizers.AdamW(
  learning_rate=LEARNING_RATE, weight_decay=WEIGHT_DECAY
)

fcn16s_loss = keras.losses.SparseCategoricalCrossentropy()

fcn16s_model.compile(
  optimizer=fcn16s_optimizer,
  loss=fcn16s_loss,
  metrics=[
    keras.metrics.MeanIoU(num_classes=NUM_CLASSES, sparse_y_pred=False),
    keras.metrics.SparseCategoricalAccuracy(),
  ],
)

fcn16s_history = fcn16s_model.fit(train_ds, epochs=EPOCHS, validation_data=valid_ds)

In [None]:
images, masks = next(iter(test_ds))
random_idx = keras.random.uniform([], minval=0, maxval=BATCH_SIZE,seed=10)

# Get random test image and mask
test_image = images[int(random_idx)].numpy().astype("float")
test_mask = masks[int(random_idx)].numpy().astype("float")

pred_image = ops.expand_dims(test_image, axis=0)
pred_image = keras.applications.vgg19.preprocess_input(pred_image)

# Perform inference on FCN-16S
pred_mask_16s = fcn16s_model.predict(pred_image, verbose=0).astype("float")
pred_mask_16s = np.argmax(pred_mask_16s, axis=-1)
pred_mask_16s = pred_mask_16s[0, ...]

# Plot all results
fig, ax = plt.subplots(nrows=2, ncols=2, figsize=(15, 8))

fig.delaxes(ax[0, 2])

ax[0, 0].set_title("Image")
ax[0, 0].imshow(test_image / 255.0)

ax[0, 1].set_title("Image with ground truth overlay")
ax[0, 1].imshow(test_image / 255.0)
ax[0, 1].imshow(
    test_mask,
    cmap="inferno",
    alpha=0.6,
)

ax[1, 1].set_title("Image with FCN-16S mask overlay")
ax[1, 1].imshow(test_image / 255.0)
ax[1, 1].imshow(pred_mask_16s, cmap="inferno", alpha=0.6)

plt.show()