<a href="https://colab.research.google.com/github/fabiobento/dnn-course-2024-1/blob/main/00_course_folder/adv_cv/class_2/21%20-%20%20Laborat%C3%B3rio/C3_W2_Lab_3_Interactive_Eager_Few_Shot_OD.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

*Esta é uma cópia deste [tutorial oficial](https://colab.research.google.com/github/tensorflow/models/blob/master/research/object_detection/colab_tutorials/eager_few_shot_od_training_tf2_colab.ipynb) com revisões mínimas na seção `Imports` para fazê-lo funcionar com versões mais recentes do Tensorflow*

# Colab _Eager Few Shot Object Detection_ (Detecção de objetos com poucos disparos)

Neste colab, demonstramos o ajuste fino de uma arquitetura RetinaNet (compatível com TF2) em pouquíssimos exemplos de uma nova classe após a inicialização de um ponto de verificação COCO pré-treinado.

O treinamento é executado no modo eager.

Tempo estimado para executar este laboratório (com GPU): < 5 minutos.

## Importações

In [None]:
import os
import pathlib

# Clonar o repositório de modelos do Tensorflow se ele ainda não existir
if "models" in pathlib.Path.cwd().parts:
  while "models" in pathlib.Path.cwd().parts:
    os.chdir('..')
elif not pathlib.Path('models').exists():
  !git clone --depth 1 https://github.com/tensorflow/models

In [None]:
# Para compatibilidade. Fixe a versão oficial do tf-models para que ele use o Tensorflow 2.15.
!sed -i 's/tf-models-official>=2.5.1/tf-models-official==2.15.0/g' ./models/research/object_detection/packages/tf2/setup.py

# Instalar a API de detecção de objetos

In [None]:
%%bash
cd models/research/
protoc object_detection/protos/*.proto --python_out=.
cp object_detection/packages/tf2/setup.py .
python -m pip install .

_Nota: No Google Colab, você precisa reiniciar a runtime para finalizar a instalação dos pacotes. Você pode fazer isso selecionando Runtime > Restart Runtime na barra de menus. **Não prossiga para a próxima seção sem reiniciar.**_

In [None]:
import matplotlib
import matplotlib.pyplot as plt

import os
import random
import io
import imageio
import glob
import scipy.misc
import numpy as np
from six import BytesIO
from PIL import Image, ImageDraw, ImageFont
from IPython.display import display, Javascript
from IPython.display import Image as IPyImage

import tensorflow as tf

from object_detection.utils import label_map_util
from object_detection.utils import config_util
from object_detection.utils import visualization_utils as viz_utils
from object_detection.builders import model_builder


try:
  import google.colab
  # Testar se estamos no Google Colab
  IN_COLAB = True
  from object_detection.utils import colab_utils
except:
  IN_COLAB = False
  
%matplotlib inline

# Utilities

In [None]:
def load_image_into_numpy_array(path):
  """Carrega uma imagem de um arquivo em uma matriz numpy.

  Coloca a imagem em uma matriz numpy para alimentar o gráfico do tensorflow.
  Observe que, por convenção, nós a colocamos em uma matriz numpy com a forma
  (altura, largura, canais), onde canais=3 para RGB.

  Args:
    path: um caminho de arquivo.

  Retorna:
    matriz numpy uint8 com formato (img_height, img_width, 3)
  """
  img_data = tf.io.gfile.GFile(path, 'rb').read()
  image = Image.open(BytesIO(img_data))
  (im_width, im_height) = image.size
  return np.array(image.getdata()).reshape(
      (im_height, im_width, 3)).astype(np.uint8)

def plot_detections(image_np,
                    boxes,
                    classes,
                    scores,
                    category_index,
                    figsize=(12, 16),
                    image_name=None):
  """Função de wrapper para visualizar as detecções.

  Args:
    image_np: matriz numérica uint8 com formato (img_height, img_width, 3)
    boxes: uma matriz numérica de forma [N, 4]
    classes: uma matriz numérica de formato [N]. Observe que os índices de classe são baseados em 1,
      e correspondem às chaves no mapa de rótulos.
    scores: uma matriz numpy de forma [N] ou None.  Se scores=None, então
      essa função pressupõe que as caixas a serem plotadas são caixas de verdade
      e plotará todas as caixas como pretas, sem classes ou pontuações.
    category_index: um dict contendo dicionários de categorias (cada um contendo
      índice de categoria `id` e nome de categoria `name`), codificados por índices de categoria.
    figsize: tamanho da figura.
    image_name: um nome para o arquivo de imagem.
  """
  image_np_with_annotations = image_np.copy()
  viz_utils.visualize_boxes_and_labels_on_image_array(
      image_np_with_annotations,
      boxes,
      classes,
      scores,
      category_index,
      use_normalized_coordinates=True,
      min_score_thresh=0.8)
  if image_name:
    plt.imsave(image_name, image_np_with_annotations)
  else:
    plt.imshow(image_np_with_annotations)


# Dados do "_Rubber Ducky_"

Começaremos com alguns dados que consistem em 5 imagens de um pato de borracha.

Observe que o conjunto de dados [coco](https://cocodataset.org/#explore) contém vários animais, mas, notavelmente, ele *não* contém patos de borracha (ou mesmo patos de verdade), portanto, essa é uma classe nova.

In [None]:
# Carregar imagens e visualizar
train_image_dir = 'models/research/object_detection/test_images/ducky/train/'
train_images_np = []
for i in range(1, 6):
  image_path = os.path.join(train_image_dir, 'robertducky' + str(i) + '.jpg')
  train_images_np.append(load_image_into_numpy_array(image_path))

plt.rcParams['axes.grid'] = False
plt.rcParams['xtick.labelsize'] = False
plt.rcParams['ytick.labelsize'] = False
plt.rcParams['xtick.top'] = False
plt.rcParams['xtick.bottom'] = False
plt.rcParams['ytick.left'] = False
plt.rcParams['ytick.right'] = False
plt.rcParams['figure.figsize'] = [14, 7]

for idx, train_image_np in enumerate(train_images_np):
  plt.subplot(2, 3, idx+1)
  plt.imshow(train_image_np)
plt.show()

# Anotar imagens com caixas delimitadoras

Nesta célula, você anotará os patinhos de borracha - desenhe uma caixa ao redor do patinho de borracha em cada imagem; clique em "próxima imagem" para ir para a próxima imagem e em "enviar" quando não houver mais imagens.

Se você quiser pular a etapa de anotação manual, nós entendemos perfeitamente.  Nesse caso, basta ignorar essa célula e executar a próxima célula, na qual preenchemos previamente o groundtruth com caixas delimitadoras pré-anotadas.



In [None]:
gt_boxes = []
colab_utils.annotate(train_images_np, box_storage_pointer=gt_boxes)

# Caso você não queira rotular...

Execute essa célula somente se você não tiver anotado nada acima e
preferir usar apenas nossas caixas pré-anotadas.  Não se esqueça de descomentar.

In [None]:
gt_boxes = [
            np.array([[0.436, 0.591, 0.629, 0.712]], dtype=np.float32),
            np.array([[0.539, 0.583, 0.73, 0.71]], dtype=np.float32),
            np.array([[0.464, 0.414, 0.626, 0.548]], dtype=np.float32),
            np.array([[0.313, 0.308, 0.648, 0.526]], dtype=np.float32),
            np.array([[0.256, 0.444, 0.484, 0.629]], dtype=np.float32)
]

# Preparar dados para treinamento

Abaixo, adicionamos as anotações de classe (para simplificar, considerei uma única classe neste colab, embora deva ser fácil estender isso para lidar com várias classes).

Também foi tudo convertido para o formato que o loop de treinamento abaixo espera (por exemplo, tudo convertido em tensores, classes convertidas em representações de um único ponto etc.).

In [None]:

# Por convenção, nossas classes sem histórico começam a contar em 1.
# Já que estaremos prevendo apenas uma classe, atribuiremos a ela uma
# classe de 1.
duck_class_id = 1
num_classes = 1

category_index = {duck_class_id: {'id': duck_class_id, 'name': 'rubber_ducky'}}

# Converta rótulos de classe em um único disparo; converta tudo em tensores.
# O `label_id_offset` aqui desloca todas as classes em um determinado número de índices;
# Fazemos isso aqui para que o modelo receba rótulos de um único instante em que as classes que não são de fundo
# classes começam a contar no índice zero.  Normalmente, isso é tratado
# automaticamente em nossos binários de treinamento, mas precisamos reproduzi-lo aqui.
label_id_offset = 1
train_image_tensors = []
gt_classes_one_hot_tensors = []
gt_box_tensors = []
for (train_image_np, gt_box_np) in zip(
    train_images_np, gt_boxes):
  train_image_tensors.append(tf.expand_dims(tf.convert_to_tensor(
      train_image_np, dtype=tf.float32), axis=0))
  gt_box_tensors.append(tf.convert_to_tensor(gt_box_np, dtype=tf.float32))
  zero_indexed_groundtruth_classes = tf.convert_to_tensor(
      np.ones(shape=[gt_box_np.shape[0]], dtype=np.int32) - label_id_offset)
  gt_classes_one_hot_tensors.append(tf.one_hot(
      zero_indexed_groundtruth_classes, num_classes))
print('Dados de preparação concluídos.')


# Vamos apenas visualizar os patos de borracha


In [None]:
dummy_scores = np.array([1.0], dtype=np.float32)   # Dar às caixas uma pontuação de 100%

plt.figure(figsize=(30, 15))
for idx in range(5):
  plt.subplot(2, 3, idx+1)
  plot_detections(
      train_images_np[idx],
      gt_boxes[idx],
      np.ones(shape=[gt_boxes[idx].shape[0]], dtype=np.int32),
      dummy_scores, category_index)
plt.show()

# Criar modelo e restaurar pesos para todas as camadas, exceto a última

Nesta célula, criamos uma arquitetura de detecção de estágio único (RetinaNet) e restauramos tudo, exceto a camada de classificação no topo (que será automaticamente inicializada de forma aleatória).

Para simplificar, codificamos várias coisas nesta célula para a arquitetura RetinaNet específica que temos em mãos (incluindo a suposição de que o tamanho da imagem será sempre 640x640), mas não é difícil generalizar para outras configurações de modelo.

In [None]:
# Faça o download do ponto de verificação e coloque-o em models/research/object_detection/test_data/

!wget http://download.tensorflow.org/models/object_detection/tf2/20200711/ssd_resnet50_v1_fpn_640x640_coco17_tpu-8.tar.gz
!tar -xf ssd_resnet50_v1_fpn_640x640_coco17_tpu-8.tar.gz
!mv ssd_resnet50_v1_fpn_640x640_coco17_tpu-8/checkpoint models/research/object_detection/test_data/

In [None]:
tf.keras.backend.clear_session()

print('Criação de modelo e restauração de pesos para ajuste fino...', flush=True)
num_classes = 1
pipeline_config = 'models/research/object_detection/configs/tf2/ssd_resnet50_v1_fpn_640x640_coco17_tpu-8.config'
checkpoint_path = 'models/research/object_detection/test_data/checkpoint/ckpt-0'

# Carregue a configuração do pipeline e crie um modelo de detecção.
#
# Como estamos trabalhando com uma arquitetura COCO que prevê 90
# classes por padrão, substituímos o campo `num_classes` aqui para que seja apenas
# apenas um (para a nossa nova classe de pato de borracha).
configs = config_util.get_configs_from_pipeline_file(pipeline_config)
model_config = configs['model']
model_config.ssd.num_classes = num_classes
model_config.ssd.freeze_batchnorm = True
detection_model = model_builder.build(
      model_config=model_config, is_training=True)

# Configure a restauração de pontos de verificação baseada em objetos --- O RetinaNet tem duas previsões
# uma para classificação e a outra para regressão de caixa.  Vamos
# restaurar a cabeça de regressão de caixa, mas inicializar a cabeça de classificação
# do zero (mostramos a omissão abaixo comentando a linha que
# adicionaríamos se quiséssemos restaurar os dois cabeçotes)
fake_box_predictor = tf.compat.v2.train.Checkpoint(
    _base_tower_layers_for_heads=detection_model._box_predictor._base_tower_layers_for_heads,
    # _prediction_heads=detection_model._box_predictor._prediction_heads,
    #    (i.e., the classification head that we *will not* restore)
    _box_prediction_head=detection_model._box_predictor._box_prediction_head,
    )
fake_model = tf.compat.v2.train.Checkpoint(
          _feature_extractor=detection_model._feature_extractor,
          _box_predictor=fake_box_predictor)
ckpt = tf.compat.v2.train.Checkpoint(model=fake_model)
ckpt.restore(checkpoint_path).expect_partial()

# Execute o modelo por meio de uma imagem fictícia para que as variáveis sejam criadas
image, shapes = detection_model.preprocess(tf.zeros([1, 640, 640, 3]))
prediction_dict = detection_model.predict(image, shapes)
_ = detection_model.postprocess(prediction_dict, shapes)
print('Weights restored!')

# Loop de treinamento personalizado do modo Eager

In [None]:
tf.keras.backend.set_learning_phase(True)

# Esses parâmetros podem ser ajustados; como nosso conjunto de treinamento tem 5 imagens
# não faz sentido ter um tamanho de lote muito maior, embora pudéssemos
# caber mais exemplos na memória, se quisermos.
batch_size = 4
learning_rate = 0.01
num_batches = 100

# Selecione as variáveis nas camadas superiores para fazer o ajuste fino.
trainable_variables = detection_model.trainable_variables
to_fine_tune = []
prefixes_to_train = [
  'WeightSharedConvolutionalBoxPredictor/WeightSharedConvolutionalBoxHead',
  'WeightSharedConvolutionalBoxPredictor/WeightSharedConvolutionalClassHead']
for var in trainable_variables:
  if any([var.name.startswith(prefix) for prefix in prefixes_to_train]):
    to_fine_tune.append(var)

# Configure a propagação forward + backward para uma única etapa de treino.
def get_model_train_step_function(model, optimizer, vars_to_fine_tune):
  """Get a tf.function for training step."""

  # Use o tf.function para ter um pouco mais de velocidade.
  # Comente o decorador tf.function se você quiser que o interior da função
  # da função seja executado com ansiedade.

  @tf.function
  def train_step_fn(image_tensors,
                    groundtruth_boxes_list,
                    groundtruth_classes_list):
    """Uma única iteração de treinamento.

    Args:
      image_tensors: Uma lista de [1, height, width, 3] Tensor do tipo tf.float32.
        Observe que a altura e a largura podem variar entre as imagens, pois elas são
        remodeladas dentro dessa função para serem 640x640.
      groundtruth_boxes_list: Uma lista de tensores de forma [N_i, 4] com o tipo
        tf.float32 representando caixas de verdade para cada imagem no lote.
      groundtruth_classes_list: Uma lista de tensores de forma [N_i, num_classes]
        com o tipo tf.float32 representando caixas de verdade para cada imagem no
        do lote.

    Retorna:
      Um tensor escalar que representa a perda total para o lote de entrada.
    """
    shapes = tf.constant(batch_size * [[640, 640, 3]], dtype=tf.int32)
    model.provide_groundtruth(
        groundtruth_boxes_list=groundtruth_boxes_list,
        groundtruth_classes_list=groundtruth_classes_list)
    with tf.GradientTape() as tape:
      preprocessed_images = tf.concat(
          [detection_model.preprocess(image_tensor)[0]
           for image_tensor in image_tensors], axis=0)
      prediction_dict = model.predict(preprocessed_images, shapes)
      losses_dict = model.loss(prediction_dict, shapes)
      total_loss = losses_dict['Loss/localization_loss'] + losses_dict['Loss/classification_loss']
      gradients = tape.gradient(total_loss, vars_to_fine_tune)
      optimizer.apply_gradients(zip(gradients, vars_to_fine_tune))
    return total_loss

  return train_step_fn

optimizer = tf.keras.optimizers.SGD(learning_rate=learning_rate, momentum=0.9)
train_step_fn = get_model_train_step_function(
    detection_model, optimizer, to_fine_tune)

print('Começar o fine-tuning!', flush=True)
for idx in range(num_batches):
  # Obter chaves para um subconjunto aleatório de exemplos
  all_keys = list(range(len(train_images_np)))
  random.shuffle(all_keys)
  example_keys = all_keys[:batch_size]

  # Observe que não aumentamos os dados nesta demonstração.  Se você quiser um
  # um exercício divertido, recomendamos fazer experiências com inversão horizontal aleatória
  # e cortes aleatórios :)
  gt_boxes_list = [gt_box_tensors[key] for key in example_keys]
  gt_classes_list = [gt_classes_one_hot_tensors[key] for key in example_keys]
  image_tensors = [train_image_tensors[key] for key in example_keys]

  # Etapa de treinamento (passe para frente + passe para trás)
  total_loss = train_step_fn(image_tensors, gt_boxes_list, gt_classes_list)

  if idx % 10 == 0:
    print('batch ' + str(idx) + ' of ' + str(num_batches)
    + ', loss=' +  str(total_loss.numpy()), flush=True)

print('Fine-tuning concluído!')

# Carregue as imagens de teste e execute a inferência com o novo modelo!

In [None]:
test_image_dir = 'models/research/object_detection/test_images/ducky/test/'
test_images_np = []
for i in range(1, 50):
  image_path = os.path.join(test_image_dir, 'out' + str(i) + '.jpg')
  test_images_np.append(np.expand_dims(
      load_image_into_numpy_array(image_path), axis=0))

# Descomente esse decorador se quiser executar a inferência eager
@tf.function
def detect(input_tensor):
  """Executa a detecção em uma imagem de entrada.

  Args:
    input_tensor: Um [1, altura, largura, 3] Tensor do tipo tf.float32.
      Observe que a altura e a largura podem ser qualquer coisa, pois a imagem será
      imediatamente redimensionada de acordo com as necessidades do modelo dentro dessa
      função.

  Retorna:
    Um dict contendo 3 Tensores (`detection_boxes`, `detection_classes`,
      e `detection_scores`).
  """
  preprocessed_image, shapes = detection_model.preprocess(input_tensor)
  prediction_dict = detection_model.predict(preprocessed_image, shapes)
  return detection_model.postprocess(prediction_dict, shapes)

# Observe que o primeiro quadro acionará o rastreamento da função tf.function, o que levará algum tempo.
# levará algum tempo, após o qual a inferência deverá ser rápida.

label_id_offset = 1
for i in range(len(test_images_np)):
  input_tensor = tf.convert_to_tensor(test_images_np[i], dtype=tf.float32)
  detections = detect(input_tensor)

  plot_detections(
      test_images_np[i][0],
      detections['detection_boxes'][0].numpy(),
      detections['detection_classes'][0].numpy().astype(np.uint32)
      + label_id_offset,
      detections['detection_scores'][0].numpy(),
      category_index, figsize=(15, 20), image_name="gif_frame_" + ('%02d' % i) + ".jpg")

In [None]:
imageio.plugins.freeimage.download()

anim_file = 'duckies_test.gif'

filenames = glob.glob('gif_frame_*.jpg')
filenames = sorted(filenames)
last = -1
images = []
for filename in filenames:
  image = imageio.imread(filename)
  images.append(image)

imageio.mimsave(anim_file, images, 'GIF-FI', fps=5)

display(IPyImage(open(anim_file, 'rb').read()))