In [None]:
from google.colab import drive

# Mount Google Drive at /content/drive (this only works inside a Colab runtime).
drive.mount("/content/drive")

Mounted at /content/drive


In [None]:
# Change this to the folder that stores the images, the annotations and base_checkpoint.pt
%cd "/content/drive/MyDrive/TA_FINAL_TAC/"

/content/drive/.shortcut-targets-by-id/1-O_N2YOeF3elR-Y6QYwBa1mK2MnxvzHb/TA_FINAL_TAC


## Imports

In [None]:
import torch
from torch import nn
from torch.nn import functional as F
from torchvision.transforms import functional as F_vision

import torchvision.transforms as T
from torchvision.models import resnet50
from torchvision.ops.boxes import box_area

torch.random.manual_seed(0)

<torch._C.Generator at 0x7f139d777a10>

In [None]:
import math
import json
import copy
import shutil
import random
import requests
from PIL import Image
from os import listdir
from os.path import exists
import matplotlib.pyplot as plt
from scipy.optimize import linear_sum_assignment
from sklearn.model_selection import train_test_split

%matplotlib inline
%config InlineBackend.figure_format = "retina"

## Model Architecture

In [None]:
class Transformer(nn.Module):
    """Encoder/decoder transformer operating on a 2-D feature map and a set of queries.

    The encoder and decoder are stacks of ``TransformerEncoderLayer`` and
    ``TransformerDecoderLayer``. After construction every parameter with more
    than one dimension is re-initialised with Xavier-uniform.
    """

    def __init__(self, d_model=512, nhead=8, num_encoder_layers=6,
                 num_decoder_layers=6, dim_feedforward=2048, dropout=0.1,
                 activation="relu", normalize_before=False,
                 return_intermediate_dec=False):
        super().__init__()

        # Arguments shared by the encoder-layer and decoder-layer prototypes.
        layer_args = (d_model, nhead, dim_feedforward, dropout, activation, normalize_before)

        # The encoder is built before the decoder on purpose: the order in which
        # modules are created determines RNG consumption and parameter order.
        self.encoder = TransformerEncoder(
            TransformerEncoderLayer(*layer_args),
            num_encoder_layers,
            # A final encoder norm is only used in the pre-norm configuration.
            nn.LayerNorm(d_model) if normalize_before else None,
        )
        self.decoder = TransformerDecoder(
            TransformerDecoderLayer(*layer_args),
            num_decoder_layers,
            nn.LayerNorm(d_model),
            return_intermediate=return_intermediate_dec,
        )

        self._reset_parameters()

        self.d_model = d_model
        self.nhead = nhead

    def _reset_parameters(self):
        """Apply Xavier-uniform initialisation to every parameter with more than one dimension."""
        for weight in (p for p in self.parameters() if p.dim() > 1):
            nn.init.xavier_uniform_(weight)

    def forward(self, src, mask, query_embed, pos_embed):
        """Run the encoder over the flattened feature map and decode the queries.

        Args:
            src: feature map of shape (N, C, H, W).
            mask: per-pixel mask of shape (N, H, W), forwarded as key padding mask.
            query_embed: query embeddings of shape (num_queries, C).
            pos_embed: positional encoding with the same shape as ``src``.

        Returns:
            A tuple ``(hs, memory)``: the decoder output with dimensions 1 and 2
            swapped, and the encoder output reshaped back to (N, C, H, W).
        """
        batch_size, channels, height, width = src.shape

        # (N, C, H, W) -> (H*W, N, C), the sequence-first layout the layers expect.
        flat_src = src.flatten(2).permute(2, 0, 1)
        flat_pos = pos_embed.flatten(2).permute(2, 0, 1)
        flat_mask = mask.flatten(1)

        # One copy of the query embeddings per batch element; decoding starts from zeros.
        queries = query_embed.unsqueeze(1).repeat(1, batch_size, 1)
        decoder_input = torch.zeros_like(queries)

        memory = self.encoder(flat_src, src_key_padding_mask=flat_mask, pos=flat_pos)
        hs = self.decoder(decoder_input, memory, memory_key_padding_mask=flat_mask,
                          pos=flat_pos, query_pos=queries)

        restored_memory = memory.permute(1, 2, 0).view(batch_size, channels, height, width)
        return hs.transpose(1, 2), restored_memory


class TransformerEncoder(nn.Module):
    """Stack of ``num_layers`` deep copies of ``encoder_layer`` with an optional final norm."""

    def __init__(self, encoder_layer, num_layers, norm=None):
        super().__init__()
        self.layers = _get_clones(encoder_layer, num_layers)
        self.num_layers = num_layers
        self.norm = norm

    def forward(self, src,
                mask = None,
                src_key_padding_mask = None,
                pos = None):
        """Feed ``src`` through every layer in order, then through ``norm`` when one is set."""
        hidden = src
        for encoder_layer in self.layers:
            hidden = encoder_layer(
                hidden,
                src_mask=mask,
                src_key_padding_mask=src_key_padding_mask,
                pos=pos,
            )

        return hidden if self.norm is None else self.norm(hidden)


class TransformerDecoder(nn.Module):
    """Stack of ``num_layers`` deep copies of ``decoder_layer`` with an optional final norm.

    When ``return_intermediate`` is True the output of every layer is returned
    (stacked on a new leading dimension) instead of only the last one.
    """

    def __init__(self, decoder_layer, num_layers, norm=None, return_intermediate=False):
      super().__init__()
      self.layers = _get_clones(decoder_layer, num_layers)
      self.num_layers = num_layers
      self.norm = norm
      self.return_intermediate = return_intermediate

    def forward(self, tgt, memory,
                tgt_mask = None,
                memory_mask = None,
                tgt_key_padding_mask = None,
                memory_key_padding_mask = None,
                pos = None,
                query_pos = None):
      """Decode ``tgt`` against ``memory``.

      Returns:
          ``torch.stack`` of the per-layer outputs when ``return_intermediate``
          is True, otherwise the final output with a leading dimension of size 1.
          Outputs are passed through ``norm`` when one is configured.
      """
      output = tgt

      intermediate = []

      for layer in self.layers:
        output = layer(output, memory, tgt_mask=tgt_mask,
                        memory_mask=memory_mask,
                        tgt_key_padding_mask=tgt_key_padding_mask,
                        memory_key_padding_mask=memory_key_padding_mask,
                        pos=pos, query_pos=query_pos)
        if self.return_intermediate:
          # norm is optional (default None): store the raw output in that case
          # instead of trying to call None.
          intermediate.append(output if self.norm is None else self.norm(output))

      if self.norm is not None:
        output = self.norm(output)
        if self.return_intermediate:
          # Make the last stored entry the exact tensor used as final output.
          intermediate.pop()
          intermediate.append(output)

      if self.return_intermediate:
        return torch.stack(intermediate)

      return output.unsqueeze(0)


class TransformerEncoderLayer(nn.Module):
    """Self-attention block followed by a feed-forward block, in post- or pre-norm form."""

    def __init__(self, d_model, nhead, dim_feedforward=2048, dropout=0.1,
                 activation="relu", normalize_before=False):
        super().__init__()
        # NOTE: sub-modules are created in a fixed order; it determines both
        # RNG consumption at init time and the parameter ordering.
        self.self_attn = nn.MultiheadAttention(d_model, nhead, dropout=dropout)
        # Feed-forward network
        self.linear1 = nn.Linear(d_model, dim_feedforward)
        self.dropout = nn.Dropout(dropout)
        self.linear2 = nn.Linear(dim_feedforward, d_model)

        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)
        self.dropout1 = nn.Dropout(dropout)
        self.dropout2 = nn.Dropout(dropout)

        self.activation = _get_activation_fn(activation)
        self.normalize_before = normalize_before

    def with_pos_embed(self, tensor, pos):
        """Return ``tensor + pos``, or ``tensor`` unchanged when ``pos`` is None."""
        if pos is None:
            return tensor
        return tensor + pos

    def _self_attention(self, content, pos, attn_mask, key_padding_mask):
        """Self-attend with position-augmented queries/keys and plain ``content`` as values."""
        query = key = self.with_pos_embed(content, pos)
        return self.self_attn(query, key, value=content, attn_mask=attn_mask,
                              key_padding_mask=key_padding_mask)[0]

    def _feed_forward(self, features):
        """linear1 -> activation -> dropout -> linear2."""
        expanded = self.activation(self.linear1(features))
        return self.linear2(self.dropout(expanded))

    def forward_post(self,
                     src,
                     src_mask = None,
                     src_key_padding_mask = None,
                     pos = None):
        """Post-norm variant: LayerNorm is applied after each residual sum."""
        attended = self._self_attention(src, pos, src_mask, src_key_padding_mask)
        src = self.norm1(src + self.dropout1(attended))
        transformed = self._feed_forward(src)
        return self.norm2(src + self.dropout2(transformed))

    def forward_pre(self, src,
                    src_mask = None,
                    src_key_padding_mask = None,
                    pos = None):
        """Pre-norm variant: LayerNorm is applied to the input of each sub-block."""
        normed = self.norm1(src)
        attended = self._self_attention(normed, pos, src_mask, src_key_padding_mask)
        src = src + self.dropout1(attended)
        transformed = self._feed_forward(self.norm2(src))
        return src + self.dropout2(transformed)

    def forward(self, src,
                src_mask = None,
                src_key_padding_mask = None,
                pos = None):
        """Dispatch to the pre-norm or post-norm path according to ``normalize_before``."""
        run = self.forward_pre if self.normalize_before else self.forward_post
        return run(src, src_mask, src_key_padding_mask, pos)


class TransformerDecoderLayer(nn.Module):
    """Self-attention, cross-attention over ``memory`` and a feed-forward block.

    Supports both the post-norm and the pre-norm arrangement, selected by
    ``normalize_before``.
    """

    def __init__(self, d_model, nhead, dim_feedforward=2048, dropout=0.1,
                 activation="relu", normalize_before=False):
        super().__init__()
        # NOTE: sub-modules are created in a fixed order; it determines both
        # RNG consumption at init time and the parameter ordering.
        self.self_attn = nn.MultiheadAttention(d_model, nhead, dropout=dropout)
        self.multihead_attn = nn.MultiheadAttention(d_model, nhead, dropout=dropout)
        # Feed-forward network
        self.linear1 = nn.Linear(d_model, dim_feedforward)
        self.dropout = nn.Dropout(dropout)
        self.linear2 = nn.Linear(dim_feedforward, d_model)

        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)
        self.norm3 = nn.LayerNorm(d_model)
        self.dropout1 = nn.Dropout(dropout)
        self.dropout2 = nn.Dropout(dropout)
        self.dropout3 = nn.Dropout(dropout)

        self.activation = _get_activation_fn(activation)
        self.normalize_before = normalize_before

    def with_pos_embed(self, tensor, pos):
        """Return ``tensor + pos``, or ``tensor`` unchanged when ``pos`` is None."""
        if pos is None:
            return tensor
        return tensor + pos

    def _self_attention(self, content, query_pos, attn_mask, key_padding_mask):
        """Self-attend with position-augmented queries/keys and plain ``content`` as values."""
        query = key = self.with_pos_embed(content, query_pos)
        return self.self_attn(query, key, value=content, attn_mask=attn_mask,
                              key_padding_mask=key_padding_mask)[0]

    def _cross_attention(self, content, memory, query_pos, pos, attn_mask, key_padding_mask):
        """Attend from ``content`` (+ query_pos) to ``memory`` (+ pos), reading values from ``memory``."""
        return self.multihead_attn(query=self.with_pos_embed(content, query_pos),
                                   key=self.with_pos_embed(memory, pos),
                                   value=memory, attn_mask=attn_mask,
                                   key_padding_mask=key_padding_mask)[0]

    def _feed_forward(self, features):
        """linear1 -> activation -> dropout -> linear2."""
        expanded = self.activation(self.linear1(features))
        return self.linear2(self.dropout(expanded))

    def forward_post(self, tgt, memory,
                     tgt_mask = None,
                     memory_mask = None,
                     tgt_key_padding_mask = None,
                     memory_key_padding_mask = None,
                     pos = None,
                     query_pos = None):
        """Post-norm variant: LayerNorm is applied after each residual sum."""
        attended = self._self_attention(tgt, query_pos, tgt_mask, tgt_key_padding_mask)
        tgt = self.norm1(tgt + self.dropout1(attended))

        attended = self._cross_attention(tgt, memory, query_pos, pos,
                                         memory_mask, memory_key_padding_mask)
        tgt = self.norm2(tgt + self.dropout2(attended))

        transformed = self._feed_forward(tgt)
        return self.norm3(tgt + self.dropout3(transformed))

    def forward_pre(self, tgt, memory,
                    tgt_mask = None,
                    memory_mask = None,
                    tgt_key_padding_mask = None,
                    memory_key_padding_mask = None,
                    pos = None,
                    query_pos = None):
        """Pre-norm variant: LayerNorm is applied to the input of each sub-block."""
        normed = self.norm1(tgt)
        attended = self._self_attention(normed, query_pos, tgt_mask, tgt_key_padding_mask)
        tgt = tgt + self.dropout1(attended)

        normed = self.norm2(tgt)
        attended = self._cross_attention(normed, memory, query_pos, pos,
                                         memory_mask, memory_key_padding_mask)
        tgt = tgt + self.dropout2(attended)

        transformed = self._feed_forward(self.norm3(tgt))
        return tgt + self.dropout3(transformed)

    def forward(self, tgt, memory,
                tgt_mask = None,
                memory_mask = None,
                tgt_key_padding_mask = None,
                memory_key_padding_mask = None,
                pos = None,
                query_pos = None):
        """Dispatch to the pre-norm or post-norm path according to ``normalize_before``."""
        run = self.forward_pre if self.normalize_before else self.forward_post
        return run(tgt, memory, tgt_mask, memory_mask,
                   tgt_key_padding_mask, memory_key_padding_mask, pos, query_pos)

def _get_clones(module, N):
    return nn.ModuleList([copy.deepcopy(module) for i in range(N)])

def _get_activation_fn(activation):
  """Retorno de una función de activación a partir de una cadena"""
  if activation == "relu":
    return F.relu
  if activation == "gelu":
    return F.gelu
  if activation == "glu":
    return F.glu
  raise RuntimeError(F"activation should be relu/gelu, not {activation}.")

In [None]:
class PositionEmbeddingSine(nn.Module):
  """Fixed sine/cosine 2-D positional encoding derived from a boolean mask.

  Positions are the cumulative count of un-masked entries along each spatial
  axis; each axis contributes ``num_pos_feats`` channels (y first, then x).
  """

  def __init__(self, num_pos_feats=64, temperature=10000, normalize=False, scale=None):
    super().__init__()

    # A custom scale only has an effect on normalised coordinates.
    if (scale is not None) and (normalize is False):
      raise ValueError("normalize should be True if scale is passed")

    self.num_pos_feats = num_pos_feats
    self.temperature = temperature
    self.normalize = normalize
    self.scale = 2 * math.pi if scale is None else scale

  def forward(self, x, masks):
    """Build the encoding for ``masks`` of shape (N, H, W).

    ``x`` is only used to pick the device of the frequency vector.
    Returns a tensor of shape (N, 2 * num_pos_feats, H, W).
    """
    assert masks is not None

    valid = ~masks

    # Running count of valid entries along rows (y) and columns (x).
    y_embed = valid.cumsum(1, dtype=torch.float32)
    x_embed = valid.cumsum(2, dtype=torch.float32)

    if self.normalize:
      eps = 1e-6
      # Divide by the last cumulative value so coordinates span (0, scale].
      y_embed = y_embed / (y_embed[:, -1:, :] + eps) * self.scale
      x_embed = x_embed / (x_embed[:, :, -1:] + eps) * self.scale

    # Consecutive channel pairs share a frequency: temperature ** (2 * (i // 2) / num_pos_feats).
    channel_index = torch.arange(self.num_pos_feats, dtype=torch.float32, device=x.device)
    dim_t = self.temperature ** (2 * torch.div(channel_index, 2, rounding_mode="trunc") / self.num_pos_feats)

    def encode(coordinates):
      # sin on even channels, cos on odd channels, interleaved back together.
      scaled = coordinates[:, :, :, None] / dim_t
      pairs = torch.stack((scaled[:, :, :, 0::2].sin(), scaled[:, :, :, 1::2].cos()), dim=4)
      return pairs.flatten(3)

    # Concatenate y then x on the channel axis and move channels to dim 1.
    return torch.cat((encode(y_embed), encode(x_embed)), dim=3).permute(0, 3, 1, 2)

In [None]:
class DETR(nn.Module):
  """Detection model: ResNet-50 features -> transformer -> class and box heads."""

  def __init__(self, num_classes, num_queries=100, hidden_dim=256, embedding_size=50, nheads=8, num_encoder_layers=6, num_decoder_layers=6):
    super().__init__()

    # NOTE: sub-modules are created in a fixed order; it determines RNG
    # consumption at init time and the parameter / state_dict ordering.

    # ResNet-50 backbone without its classification head (avgpool + fc).
    self.backbone = resnet50(pretrained=True)
    del self.backbone.fc
    del self.backbone.avgpool

    # Spatial positional encodings. Their spatial size follows the backbone output, e.g.
    #   inputs: (batch_size, 3, 800, 800)   -> backbone output: (batch_size, 2048, 25, 25)
    #   inputs: (batch_size, 3, 1600, 1600) -> backbone output: (batch_size, 2048, 50, 50)
    # Half of the hidden channels encode y and the other half encode x.
    self.position_embedding = PositionEmbeddingSine(num_pos_feats=hidden_dim // 2, normalize=True)

    # 1x1 convolution projecting the 2048 backbone channels to hidden_dim.
    self.conv = nn.Conv2d(2048, hidden_dim, 1)

    # Learned query embeddings (one per predicted slot).
    self.query_pos = nn.Embedding(num_queries, hidden_dim)

    self.transformer = Transformer(hidden_dim, nheads, num_encoder_layers, num_decoder_layers, normalize_before=False, return_intermediate_dec=True)

    # Box regression head: two hidden layers followed by a 4-value output.
    self.linear_1 = nn.Linear(hidden_dim, hidden_dim)
    self.linear_2 = nn.Linear(hidden_dim, hidden_dim)
    self.linear_bbox = nn.Linear(hidden_dim, 4)

    # Classification head; the extra class stands for "no object".
    self.linear_class = nn.Linear(hidden_dim, num_classes + 1)

  def forward(self, inputs_and_masks):
    """Predict class logits and boxes.

    Args:
        inputs_and_masks: indexable pair ``(images, masks)``; images are
            (batch_size, 3, height, width) and masks are (batch_size, height, width).

    Returns:
        Dict with ``"pred_logits"`` and ``"pred_bboxes"`` taken from the last
        decoder layer; boxes are passed through a sigmoid.
    """
    inputs, masks = inputs_and_masks[0], inputs_and_masks[1]

    # Run the ResNet-50 stem and the four residual stages in sequence.
    backbone = self.backbone
    features = inputs
    for stage in (backbone.conv1, backbone.bn1, backbone.relu, backbone.maxpool,
                  backbone.layer1, backbone.layer2, backbone.layer3, backbone.layer4):
      features = stage(features)

    # Resize the masks to the spatial size of the feature map.
    feature_size = features.shape[2:]
    masks = F.interpolate(masks[None].float(), size=feature_size).to(torch.bool)[0]

    pos = self.position_embedding(features, masks)

    # 2048 -> hidden_dim feature planes for the transformer.
    projected = self.conv(features)

    decoded = self.transformer(projected, masks, self.query_pos.weight, pos)[0]

    # Heads are applied to every decoder layer output; only the last one is kept.
    pred_logits = self.linear_class(decoded)[-1]

    hidden = F.relu(self.linear_1(decoded))
    hidden = F.relu(self.linear_2(hidden))
    pred_bboxes = self.linear_bbox(hidden).sigmoid()[-1]

    return {"pred_logits": pred_logits, "pred_bboxes": pred_bboxes}

## Model Creation

In [None]:
def create_model(num_classes):
  """Release cached CUDA memory, then build a DETR model with ``num_classes`` classes."""
  torch.cuda.empty_cache()
  return DETR(num_classes)

In [None]:
# Prefer the GPU when one is available, otherwise fall back to the CPU.
device_condition = "cuda" if torch.cuda.is_available() else "cpu"

device = torch.device(device_condition)

# Bare expression: shows the selected device as the cell output.
device

device(type='cuda')

In [None]:
# Change the number of classes to match the dataset
num_classes = 6

model = create_model(num_classes=num_classes)

# Move the model parameters to the selected device.
model = model.to(device)

  f"The parameter '{pretrained_param}' is deprecated since 0.13 and will be removed in 0.15, "
Downloading: "https://download.pytorch.org/models/resnet50-0676ba61.pth" to /root/.cache/torch/hub/checkpoints/resnet50-0676ba61.pth


  0%|          | 0.00/97.8M [00:00<?, ?B/s]

## Functions for Loading the Model without the Classification Layer

In [None]:
def load_pre_trained_model_weights(model, model_file):
  """Load the weights stored in ``model_file`` into ``model`` and return the model.

  Loading is non-strict: keys missing from the checkpoint (or unexpected
  ones) are ignored instead of raising.

  Args:
      model: module that receives the weights.
      model_file: path to a state dict saved with ``torch.save``.
  """
  # Deserialize onto the CPU so a checkpoint saved from a GPU also loads on a
  # CPU-only runtime; load_state_dict then copies into the model's own device.
  state_dict = torch.load(model_file, map_location="cpu")
  model.load_state_dict(state_dict, strict=False)

  return model

In [None]:
# Checkpoint path, relative to the current working directory.
model_file = "base_checkpoint.pt"

# Non-strict load: see load_pre_trained_model_weights.
model = load_pre_trained_model_weights(model, model_file)

In [None]:
# Freeze the backbone: its parameters receive no gradients during training.
for parameter in model.backbone.parameters():
  parameter.requires_grad = False

## Functions for Training and Evaluation

In [None]:
def save_weights(model, optimizer, lr_scheduler, model_file, optimizer_file, lr_scheduler_file):
  """Write the state dicts of the model, optimizer and LR scheduler to their files."""
  checkpoints = (
      (model, model_file),
      (optimizer, optimizer_file),
      (lr_scheduler, lr_scheduler_file),
  )
  for stateful, destination in checkpoints:
    torch.save(stateful.state_dict(), destination)

def reset_weights(model, optimizer, lr_scheduler, model_file, optimizer_file, lr_scheduler_file):
  """Restore model, optimizer and LR scheduler from their saved state dicts.

  Returns the same three objects, in the order (model, optimizer, lr_scheduler).
  """
  checkpoints = (
      (model, model_file),
      (optimizer, optimizer_file),
      (lr_scheduler, lr_scheduler_file),
  )
  for stateful, source in checkpoints:
    stateful.load_state_dict(torch.load(source))

  return model, optimizer, lr_scheduler

In [None]:
def crop(image, target, region):
    """Crop ``image`` to ``region`` and adjust the boxes/labels in ``target``.

    Args:
        image: image accepted by ``torchvision.transforms.functional.crop``.
        target: dict with ``"labels"`` and, optionally, ``"boxes"`` in
            (x0, y0, x1, y1) pixel coordinates. A shallow copy is modified.
        region: ``(top, left, height, width)`` of the crop.

    Returns:
        ``(cropped_image, target)``; boxes are shifted into the crop frame,
        clipped to it, and boxes left with zero width or height are dropped
        together with their labels.
    """
    cropped_image = F_vision.crop(image, *region)

    target = target.copy()
    top, left, height, width = region

    if "boxes" in target:
        offset = torch.as_tensor([left, top, left, top])
        limits = torch.as_tensor([width, height], dtype=torch.float32)

        # View each box as its two corners so both can be clipped at once.
        corners = (target["boxes"] - offset).reshape(-1, 2, 2)
        corners = torch.min(corners, limits).clamp(min=0)

        # Keep only boxes whose bottom-right corner is strictly beyond the top-left one.
        keep = torch.all(corners[:, 1, :] > corners[:, 0, :], dim=1)

        target["labels"] = target["labels"][keep]
        target["boxes"] = corners.reshape(-1, 4)[keep]

    return cropped_image, target

def resize(image, target, size, max_size=None):
    """Resize ``image`` and scale the boxes in ``target`` by the same ratios.

    Args:
        image: image exposing ``.size`` as (width, height).
        target: dict that may hold ``"boxes"`` in pixel coordinates, or None.
        size: either the desired length of the shorter side (scalar) or an
            explicit ``(w, h)`` list/tuple.
        max_size: optional upper bound for the longer side when ``size`` is a scalar.

    Returns:
        ``(resized_image, target)``; ``target`` is None when none was given,
        otherwise a shallow copy with rescaled boxes.
    """

    def get_size_with_aspect_ratio(image_size, size, max_size=None):
        # Returns (height, width) keeping the aspect ratio of image_size.
        width, height = image_size
        if max_size is not None:
            short_side = float(min((width, height)))
            long_side = float(max((width, height)))
            # Shrink the requested size if the longer side would exceed max_size.
            if long_side / short_side * size > max_size:
                size = int(round(max_size * short_side / long_side))

        shorter_side_matches = (width <= height and width == size) or (height <= width and height == size)
        if shorter_side_matches:
            return (height, width)

        if width < height:
            return (int(size * height / width), size)
        return (size, int(size * width / height))

    def get_size(image_size, size, max_size=None):
        # An explicit (w, h) is flipped to (h, w); a scalar goes through the aspect-ratio logic.
        if not isinstance(size, (list, tuple)):
            return get_size_with_aspect_ratio(image_size, size, max_size)
        return size[::-1]

    output_size = get_size(image.size, size, max_size)
    rescaled_image = F_vision.resize(image, output_size)

    if target is None:
        return rescaled_image, None

    # Per-axis scale factors measured on the actual resized image.
    ratio_width, ratio_height = tuple(
        float(new) / float(old) for new, old in zip(rescaled_image.size, image.size)
    )

    target = target.copy()
    if "boxes" in target:
        box_scale = torch.as_tensor([ratio_width, ratio_height, ratio_width, ratio_height])
        target["boxes"] = target["boxes"] * box_scale

    return rescaled_image, target

def hflip(image, target):
    """Flip ``image`` horizontally and mirror the boxes/masks in ``target``.

    ``target`` is shallow-copied; ``"boxes"`` are expected as (x0, y0, x1, y1)
    in pixels and ``"masks"`` (if present) are flipped along their last axis.
    """
    flipped_image = F_vision.hflip(image)

    width, _ = image.size

    target = target.copy()
    if "boxes" in target:
        # Swap x0/x1, negate them and shift by the width: x -> width - x.
        swapped = target["boxes"][:, [2, 1, 0, 3]]
        sign = torch.as_tensor([-1, 1, -1, 1])
        shift = torch.as_tensor([width, 0, width, 0])
        target["boxes"] = swapped * sign + shift

    if "masks" in target:
        target['masks'] = target['masks'].flip(-1)

    return flipped_image, target

def bbox_xyxy_to_cxcywh(x):
  """Convert boxes from (x0, y0, x1, y1) corners to (x_center, y_center, width, height).

  The last dimension of ``x`` must hold the four coordinates.
  """
  x_min, y_min, x_max, y_max = x.unbind(-1)
  center_x = (x_min + x_max) / 2
  center_y = (y_min + y_max) / 2
  width = x_max - x_min
  height = y_max - y_min
  return torch.stack((center_x, center_y, width, height), dim=-1)

class Normalize(object):
    """Normalise an image tensor and express the boxes relative to the image size."""

    def __init__(self, mean, std):
        self.mean = mean
        self.std = std

    def __call__(self, image, target=None):
        """Return ``(image, target)``.

        The image is normalised with ``mean``/``std``. When a target is given,
        its ``"boxes"`` are converted from (x0, y0, x1, y1) pixels to
        (cx, cy, w, h) divided by the image width/height, on a shallow copy.
        """
        image = F_vision.normalize(image, mean=self.mean, std=self.std)
        if target is None:
            return image, None

        target = target.copy()
        height, width = image.shape[-2:]
        if "boxes" in target:
            image_scale = torch.tensor([width, height, width, height], dtype=torch.float32)
            target["boxes"] = bbox_xyxy_to_cxcywh(target["boxes"]) / image_scale
        return image, target

class ToTensor(object):
    """Convert the image to a tensor and pass the target through untouched."""

    def __call__(self, img, target=None):
        tensor_image = F_vision.to_tensor(img)
        return tensor_image, target

class RandomSizeCrop(object):
    """Crop a random region whose sides lie between ``min_size`` and ``max_size`` (capped by the image)."""

    def __init__(self, min_size: int, max_size: int):
        self.min_size = min_size
        self.max_size = max_size

    def __call__(self, img, target):
        # Width is drawn before height; the order matters for reproducibility under a fixed seed.
        crop_width = random.randint(self.min_size, min(img.width, self.max_size))
        crop_height = random.randint(self.min_size, min(img.height, self.max_size))
        region = T.RandomCrop.get_params(img, [crop_height, crop_width])
        return crop(img, target, region)

class RandomResize(object):
    """Resize to a size picked uniformly at random from ``sizes``."""

    def __init__(self, sizes, max_size=None):
        assert isinstance(sizes, (list, tuple))
        self.sizes = sizes
        self.max_size = max_size

    def __call__(self, img, target=None):
        chosen_size = random.choice(self.sizes)
        return resize(img, target, chosen_size, self.max_size)

class RandomSelect(object):
    """
    Apply one of two transforms chosen at random:
    ``transforms1`` with probability ``p``, ``transforms2`` with probability ``1 - p``.
    """
    def __init__(self, transforms1, transforms2, p=0.5):
        self.transforms1 = transforms1
        self.transforms2 = transforms2
        self.p = p

    def __call__(self, img, target):
        chosen = self.transforms1 if random.random() < self.p else self.transforms2
        return chosen(img, target)

class RandomHorizontalFlip(object):
    """Horizontally flip the image and its target with probability ``p``."""

    def __init__(self, p=0.5):
        self.p = p

    def __call__(self, img, target):
        should_flip = random.random() < self.p
        if not should_flip:
            return img, target
        return hflip(img, target)

class TargetToTensor(object):
    """Convert the ``"labels"`` and ``"boxes"`` entries of a target into tensors."""

    def __call__(self, img, target):
        """Return ``(img, target)`` with tensor-valued labels and boxes.

        Args:
            img: passed through unchanged.
            target: dict with ``"labels"`` (class ids) and ``"boxes"``
                (4 values per box), or None.

        Returns:
            The image and a shallow copy of the target in which ``"labels"``
            is an int64 tensor of shape (n,) and ``"boxes"`` a float32 tensor
            of shape (n, 4). A None target is returned as None.
        """
        if target is not None:
            # Work on a copy so the caller's dict is left untouched, like the
            # other transforms in this file do.
            target = target.copy()
            # Explicit dtypes and shapes keep images without annotations
            # usable: an empty list would otherwise become a float32 tensor of
            # shape (0,), which breaks the `boxes[:, ...]` indexing downstream.
            target["labels"] = torch.as_tensor(target["labels"], dtype=torch.int64).reshape(-1)
            target["boxes"] = torch.as_tensor(target["boxes"], dtype=torch.float32).reshape(-1, 4)

        return img, target

class Compose(object):
    """Chain transforms that take ``(image, target)``.

    A transform may return an ``(image, target)`` tuple or a bare image; in
    the latter case the target is reset to None for the following steps.
    """

    def __init__(self, transforms):
        self.transforms = transforms

    def __call__(self, image, target=None):
        """Apply every transform in order.

        Returns the image alone when the final target is None, otherwise
        ``(image, target)``.
        """
        for transform in self.transforms:
            result = transform(image, target)
            if type(result) is tuple:
                image, target = result
            else:
                image, target = result, None
        return image if target is None else (image, target)

    def __repr__(self):
        # One indented line per transform, wrapped in "ClassName(" ... ")".
        lines = ["\n" + "    {0}".format(transform) for transform in self.transforms]
        return self.__class__.__name__ + "(" + "".join(lines) + "\n)"

In [None]:
def get_tensors_shape(tensor_list):
  """Return ``[len(tensor_list), *max_shape]``.

  ``max_shape`` is the element-wise maximum of the shapes of all tensors,
  i.e. the smallest shape that can hold every tensor in the list.
  """
  shapes = [list(tensor.shape) for tensor in tensor_list]

  largest = shapes[0]
  for shape in shapes[1:]:
    largest = [max(pair) for pair in zip(largest, shape)]

  return [len(tensor_list), *largest]

def padding_tensors_and_generate_masks(tensor_list, return_masks):
  """Zero-pad 3-D tensors to a common size and stack them into one batch.

  Args:
      tensor_list: tensors of shape (channels, height, width); sizes may differ.
      return_masks: when equal to True, also build boolean masks that are
          True on padded positions and False where real data was copied.

  Returns:
      ``(padded, masks)`` if ``return_masks == True``, ``padded`` if
      ``return_masks == False`` and None for any other value.
  """
  reference = tensor_list[0]

  batch_size, channels, height, width = get_tensors_shape(tensor_list)

  tensors_padded = torch.zeros((batch_size, channels, height, width),
                               dtype=reference.dtype, device=reference.device)

  with_masks = return_masks == True
  masks = None
  if with_masks:
    # Start fully masked; the area covered by each tensor is cleared below.
    masks = torch.ones((batch_size, height, width), dtype=torch.bool, device=reference.device)

  for index, original_tensor in enumerate(tensor_list):
    c, h, w = original_tensor.shape[:3]
    # Data is anchored at the top-left corner; the rest stays zero.
    tensors_padded[index, :c, :h, :w] = original_tensor
    if with_masks:
      masks[index, :h, :w] = False

  if with_masks:
    return tensors_padded, masks
  elif return_masks == False:
    return tensors_padded

def get_transformation(mode="train"):
  """Build the preprocessing pipeline for ``mode``.

  ``"train"`` adds a random horizontal flip and random multi-scale
  resizing/cropping; ``"eval"`` resizes with size 800. Both finish with
  tensor conversion and mean/std normalisation.

  Raises:
      ValueError: if ``mode`` is neither ``"train"`` nor ``"eval"``.
  """
  # 480, 512, ..., 800 in steps of 32.
  scales = list(range(480, 801, 32))

  # Either resize directly, or resize -> random crop -> resize again.
  direct_resize = RandomResize(scales, max_size=1333)
  resize_crop_resize = Compose([
      RandomResize([400, 500, 600]),
      RandomSizeCrop(384, 600),
      RandomResize(scales, max_size=1333),
  ])
  random_select = RandomSelect(direct_resize, resize_crop_resize)

  normalize = Compose([
      ToTensor(),
      Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
  ])

  if mode == "train":
    steps = [TargetToTensor(), RandomHorizontalFlip(), random_select, normalize]
  elif mode == "eval":
    steps = [TargetToTensor(), RandomResize([800], max_size=1333), normalize]
  else:
    raise ValueError(f"Unknown mode ({mode})")

  return Compose(steps)

def process_images(images, mode, device, return_masks=False):
  """Transform, pad and batch ``images``, then move the result to ``device``.

  Returns the padded batch, plus the padding masks when ``return_masks`` is True.
  """
  transform = get_transformation(mode=mode)

  # Resize and mean/std-normalise every input image.
  tensors = [transform(image) for image in images]

  padded = padding_tensors_and_generate_masks(tensors, return_masks)

  if return_masks == True:
    batch, masks = padded
    return batch.to(device), masks.to(device)

  return padded.to(device)

In [None]:
class ApplyTransformationToDataset(torch.utils.data.Dataset):
  """Dataset that pairs image files with JSON annotation files and applies a transform.

  Args:
      images_dataset: indexable collection of image paths.
      annotations_dataset: indexable collection of JSON annotation paths,
          aligned index-by-index with ``images_dataset``.
      transform: callable taking ``(image, target)`` and returning the
          transformed ``(image, target)`` pair.
  """

  def __init__(self, images_dataset, annotations_dataset, transform):
    self.images_dataset = images_dataset
    self.annotations_dataset = annotations_dataset
    self.transform = transform

  def __getitem__(self, index):
    """Load, decode and transform the sample at ``index``."""
    image_path = self.images_dataset[index]
    annotation_path = self.annotations_dataset[index]

    # Force 3-channel RGB: grayscale, palette or RGBA files would not match
    # the 3-value mean/std normalisation applied later. convert() also reads
    # the pixel data, so the file handle can be closed right away.
    with Image.open(image_path) as image_file:
      image = image_file.convert("RGB")

    with open(annotation_path, "r") as json_file:
      target = json.load(json_file)

    image, target = self.transform(image, target)

    return image, target

  def __len__(self):
    """Number of samples (one per image path)."""
    return len(self.images_dataset)

In [None]:
def padding_sample_batch(sample_batch):
  """Collate ``(image, target)`` samples into a padded batch.

  Returns a pair: a dict with the padded ``"images"`` and their ``"masks"``,
  and the list of per-sample targets.
  """
  images, targets = zip(*sample_batch)

  padded_images, masks = padding_tensors_and_generate_masks(images, True)

  batch_inputs = {"images": padded_images, "masks": masks}
  return batch_inputs, list(targets)

In [None]:
def box_iou(boxes1, boxes2):
    """Pairwise IoU between two sets of (x0, y0, x1, y1) boxes.

    Returns ``(iou, union)``, both of shape [N, M] for N boxes in ``boxes1``
    and M boxes in ``boxes2``.
    """
    area1 = box_area(boxes1)
    area2 = box_area(boxes2)

    # Corners of the intersection rectangle for every pair.
    top_left = torch.max(boxes1[:, None, :2], boxes2[:, :2])  # [N,M,2]
    bottom_right = torch.min(boxes1[:, None, 2:], boxes2[:, 2:])  # [N,M,2]

    # Negative extents mean the boxes do not overlap: clamp them to zero.
    overlap = (bottom_right - top_left).clamp(min=0)  # [N,M,2]
    inter = overlap[:, :, 0] * overlap[:, :, 1]  # [N,M]

    union = area1[:, None] + area2 - inter

    return inter / union, union


def generalized_box_iou(boxes1, boxes2):
    """Pairwise generalized IoU between two sets of (x0, y0, x1, y1) boxes.

    Returns a [N, M] tensor: IoU minus the fraction of the smallest enclosing
    box that is not covered by the union.
    """
    # Degenerate boxes would produce inf/nan results, so reject them early.
    assert (boxes1[:, 2:] >= boxes1[:, :2]).all()
    assert (boxes2[:, 2:] >= boxes2[:, :2]).all()
    iou, union = box_iou(boxes1, boxes2)

    # Smallest box enclosing each pair.
    enclosing_top_left = torch.min(boxes1[:, None, :2], boxes2[:, :2])
    enclosing_bottom_right = torch.max(boxes1[:, None, 2:], boxes2[:, 2:])

    extent = (enclosing_bottom_right - enclosing_top_left).clamp(min=0)  # [N,M,2]
    enclosing_area = extent[:, :, 0] * extent[:, :, 1]

    return iou - (enclosing_area - union) / enclosing_area

def matching_function_for_bounding_boxes(predictions, targets):
  """Box-related matching costs between every prediction and every target.

  Args:
      predictions: predicted boxes of shape [batch_size, num_queries, 4].
      targets: list of per-image target box tensors; they are concatenated.

  Returns:
      ``(cost_bbox, cost_giou)``: the pairwise L1 distance and the negated
      generalized IoU, one row per prediction and one column per target.
  """
  flat_predictions = predictions.flatten(0, 1) # [batch_size * num_queries, 4]
  all_targets = torch.cat(targets)

  # p = 1 selects the L1 (Manhattan) distance in torch.cdist.
  cost_bbox = torch.cdist(flat_predictions, all_targets, p=1)

  # A higher GIoU is a better match, hence the negation to turn it into a cost.
  giou = generalized_box_iou(bbox_cxcywh_to_xyxy(flat_predictions), bbox_cxcywh_to_xyxy(all_targets))

  return cost_bbox, -giou

def matching_function_for_classification(predictions, targets):
  """Classification matching cost between every prediction and every target.

  Args:
      predictions: class scores of shape [batch_size, num_queries, num_classes].
      targets: list of per-image label tensors; they are concatenated.

  Returns:
      Tensor with one row per prediction and one column per target holding
      minus the score the prediction assigns to the target's class.
  """
  flat_scores = predictions.flatten(0, 1) # [batch_size * num_queries, num_classes]
  all_labels = torch.cat(targets)

  # Unlike the loss, the cost does not use the negative log-likelihood; it is
  # approximated by 1 - score[target class]. The constant 1 does not affect
  # the matching, so it is left out.
  return -flat_scores[:, all_labels]

def matching_function(prediction_logits, prediction_bboxes, target_labels, target_bboxes, cost_for_classification=1, cost_for_bbox=5, cost_for_giou=2):
  """Match predictions to targets, image by image, with the Hungarian algorithm.

  Args:
      prediction_logits: tensor [batch_size, num_queries, num_classes(+1)].
      prediction_bboxes: tensor [batch_size, num_queries, 4].
      target_labels: list with one label tensor per image.
      target_bboxes: list with one box tensor per image.
      cost_for_classification: weight of the classification cost.
      cost_for_bbox: weight of the L1 box cost.
      cost_for_giou: weight of the generalized IoU cost.

  Returns:
      A list with one ``(rows, columns)`` pair of int64 tensors per image:
      ``rows`` are query indices and ``columns`` are indices into that image's
      own targets.
  """
  prediction_probabilities = prediction_logits.softmax(-1)

  batch_size, num_queries, _ = prediction_probabilities.shape

  cost_classification = matching_function_for_classification(prediction_probabilities, target_labels)

  cost_bbox, cost_giou = matching_function_for_bounding_boxes(prediction_bboxes, target_bboxes)

  # Weighted sum of the three cost matrices.
  total_cost = (cost_for_classification * cost_classification) + (cost_for_bbox * cost_bbox) + (cost_for_giou * cost_giou)

  # The columns span the targets of ALL images concatenated, in batch order.
  targets_per_image = [len(labels) for labels in target_labels]

  # The explicit column count keeps the reshape valid when there are no targets at all.
  total_cost = total_cost.view(batch_size, num_queries, sum(targets_per_image)).detach().cpu()

  indices = list() # [(rows_1, columns_1), ..., (rows_b, columns_b)], one pair per image

  # Each image must be matched against its own slice of columns only. Taking
  # the first len(targets) columns for every image would compare the queries
  # of later images with the targets of the first image.
  for batch_index, image_costs in enumerate(total_cost.split(targets_per_image, dim=-1)):
    # linear_sum_assignment returns the row/column pairs of minimum total cost.
    rows, columns = linear_sum_assignment(image_costs[batch_index].numpy())

    rows = torch.as_tensor(rows, dtype=torch.int64)
    columns = torch.as_tensor(columns, dtype=torch.int64)

    indices.append((rows, columns))

  return indices

In [None]:
@torch.no_grad()
def accuracy(output, target, topk=(1,)):
    """Compute the top-k accuracy (in percent) for each k in ``topk``.

    Args:
        output: score tensor of shape [num_samples, num_classes].
        target: tensor of shape [num_samples] with the true class indices.
        topk: iterable of k values.

    Returns:
        A list with one scalar tensor per k. When ``target`` is empty a
        single-element list holding a zero tensor is returned.
    """
    if target.numel() == 0:
        return [torch.zeros([], device=output.device)]
    maxk = max(topk)
    batch_size = target.size(0)

    # Indices of the maxk highest scores per sample, transposed to [maxk, num_samples].
    _, pred = output.topk(maxk, 1, True, True)
    pred = pred.t()
    correct = pred.eq(target.view(1, -1).expand_as(pred))

    res = []
    for k in topk:
        # reshape instead of view: the transposed top-k layout is not
        # contiguous when maxk > 1, and view() raises on such a slice.
        correct_k = correct[:k].reshape(-1).float().sum(0)
        res.append(correct_k.mul_(100.0 / batch_size))
    return res

class SetCriterion(nn.Module):
    """Loss computation for DETR.

    Each forward pass works in two stages:
        1) match the model outputs to the ground-truth boxes with `matcher`;
        2) supervise every matched ground-truth / prediction pair (class and box).
    """
    def __init__(self, num_classes, matcher, eos_coef=0.1, loss_ce=1, loss_bbox=5, loss_giou=2):
        """Build the criterion.

        Parameters:
            num_classes: number of object categories, not counting the "no-object" class.
            matcher: callable that returns, per sample, the matched (prediction, target) indices.
            eos_coef: relative classification weight given to the "no-object" class.
            loss_ce, loss_bbox, loss_giou: weights of the three terms in the total loss.
        """
        super().__init__()
        self.num_classes = num_classes
        self.matcher = matcher
        self.eos_coef = eos_coef
        self.loss_ce = loss_ce
        self.loss_bbox = loss_bbox
        self.loss_giou = loss_giou

        # Per-class weights for the cross entropy: 1 everywhere, eos_coef on the last
        # ("no-object") class.
        class_weights = torch.ones(self.num_classes + 1)
        class_weights[-1] = self.eos_coef

        # Registered as a persistent buffer: not a trainable parameter, but it is part
        # of the state_dict and is reachable as self.empty_weight.
        self.register_buffer('empty_weight', class_weights, persistent=True)

    def loss_labels(self, outputs, targets, indices, num_boxes, log=True):
        """Classification loss (weighted cross entropy).

        Every query defaults to the "no-object" class; matched queries receive the
        label of their assigned target. When `log` is True the result also carries
        'class_error' (100 minus the top-1 accuracy on the matched queries).
        """
        assert 'pred_logits' in outputs
        logits = outputs['pred_logits']

        # (batch index, query index) of every matched prediction
        matched_positions = self._get_src_permutation_idx(indices)
        matched_labels = torch.cat([target["labels"][target_ids] for target, (_, target_ids) in zip(targets, indices)])

        dense_labels = torch.full(logits.shape[:2], self.num_classes,
                                  dtype=torch.int64, device=logits.device)
        dense_labels[matched_positions] = matched_labels

        losses = {'loss_ce': F.cross_entropy(logits.transpose(1, 2), dense_labels, self.empty_weight)}

        if log == True:
            top1 = accuracy(logits[matched_positions], matched_labels)[0]
            losses['class_error'] = 100 - top1
        return losses

    @torch.no_grad()
    def loss_cardinality(self, outputs, targets, indices, num_boxes):
        """Absolute error between the number of predicted objects and the number of targets.

        Logging metric only: it runs under no_grad and propagates no gradients.
        """
        logits = outputs['pred_logits']
        no_object_class = logits.shape[-1] - 1
        true_counts = torch.as_tensor([len(target["labels"]) for target in targets], device=logits.device)
        # A query counts as an object when its arg-max class is not the last one
        predicted_counts = (logits.argmax(-1) != no_object_class).sum(1)
        return {'cardinality_error': F.l1_loss(predicted_counts.float(), true_counts.float())}

    def loss_boxes(self, outputs, targets, indices, num_boxes):
        """Box losses for the matched pairs: L1 regression and GIoU.

        Both sums are divided by `num_boxes`. Boxes are converted with
        bbox_cxcywh_to_xyxy before the GIoU is computed.
        """
        assert 'pred_bboxes' in outputs
        matched_positions = self._get_src_permutation_idx(indices)
        predicted = outputs['pred_bboxes'][matched_positions]
        expected = torch.cat([target['boxes'][target_ids] for target, (_, target_ids) in zip(targets, indices)], dim=0)

        l1_per_coordinate = F.l1_loss(predicted, expected, reduction='none')

        # Only the diagonal (prediction i vs. its own target i) of the pairwise matrix is needed
        pairwise_giou = generalized_box_iou(
            bbox_cxcywh_to_xyxy(predicted),
            bbox_cxcywh_to_xyxy(expected))
        giou_per_pair = 1 - torch.diag(pairwise_giou)

        return {
            'loss_bbox': l1_per_coordinate.sum() / num_boxes,
            'loss_giou': giou_per_pair.sum() / num_boxes,
        }

    def _get_src_permutation_idx(self, indices):
        """Flatten the per-sample matches into (batch indices, query indices) tensors."""
        batch_parts = []
        query_parts = []
        for sample_index, (query_ids, _) in enumerate(indices):
            # Same shape as query_ids, filled with the position of the sample in the batch
            batch_parts.append(torch.full_like(input=query_ids, fill_value=sample_index))
            query_parts.append(query_ids)
        return torch.cat(batch_parts), torch.cat(query_parts)

    def forward(self, outputs, targets):
        """Compute the weighted total loss plus two logging metrics.

        Parameters:
            outputs: dict with the keys "pred_logits" and "pred_bboxes".
            targets: list of dicts (one per sample) with the keys "labels" and "boxes".

        Returns:
            (total_loss, class_error, cardinality_error)
        """
        # Match the model outputs against the targets
        indices = self.matcher(
            outputs["pred_logits"],
            outputs["pred_bboxes"],
            [target["labels"] for target in targets],
            [target["boxes"] for target in targets],
        )

        # Total number of target boxes in the batch (at least 1), used for normalisation
        num_boxes = float(max(sum(len(target["labels"]) for target in targets), 1))

        losses = {}
        for compute_loss in (self.loss_labels, self.loss_cardinality, self.loss_boxes):
            losses.update(compute_loss(outputs, targets, indices, num_boxes))

        total_loss = (losses["loss_ce"] * self.loss_ce) + (losses["loss_bbox"] * self.loss_bbox) + (losses["loss_giou"] * self.loss_giou)

        return total_loss, losses["class_error"], losses["cardinality_error"]

In [None]:
def _move_batch_to_device(batch_inputs, batch_targets, target_device):
  """Move one batch (images, masks and every per-image target tensor) to target_device, updating the containers in place."""
  batch_inputs["images"] = batch_inputs["images"].to(target_device)
  batch_inputs["masks"] = batch_inputs["masks"].to(target_device)

  for targets in batch_targets:
    targets["labels"] = targets["labels"].to(target_device)
    targets["boxes"] = targets["boxes"].to(target_device)


def train_model(model, criterion, optimizer, lr_scheduler, epochs, train_dataset_loader, test_dataset_loader, max_norm=0.1):
  """Train `model` for `epochs` epochs, evaluating on the test loader after each one.

  Relies on two notebook-level names: `device` (where every batch is moved) and
  `save_weights` (called after each epoch with "my_model/..._epoch_<n>.pt" paths).

  Parameters:
      model: called as model([images, masks]).
      criterion: called as criterion(predictions, targets); must return
          (loss, class_error, cardinality_error).
      optimizer: stepped once per training batch.
      lr_scheduler: stepped once per epoch.
      epochs: number of epochs to run.
      train_dataset_loader, test_dataset_loader: loaders yielding (inputs, targets) batches.
      max_norm: gradient-norm clipping threshold; clipping is skipped when <= 0.

  Returns:
      (train_losses, test_losses, train_accuracies, test_accuracies): one float per
      completed epoch. If a training batch produces a non-finite loss, training stops
      and the histories collected so far are returned, so callers can still unpack
      the four lists.
  """
  train_losses = []
  test_losses = []

  train_accuracies = []
  test_accuracies = []

  for epoch in range(epochs):
    ###################################################################################################################

    #Put the model and the criterion in training mode
    model.train()
    criterion.train()

    train_loss = 0.0
    train_accuracy = 0.0

    for batch_x_train, batch_y_train in train_dataset_loader:
      _move_batch_to_device(batch_x_train, batch_y_train, device)

      #PyTorch accumulates gradients, so they are cleared before every batch
      model.zero_grad()

      batch_train_predictions = model([batch_x_train["images"], batch_x_train["masks"]])

      batch_train_loss, train_class_error, _ = criterion(batch_train_predictions, batch_y_train)

      train_loss_value = batch_train_loss.item()

      if not math.isfinite(train_loss_value):
        print("Batch Train Loss is {}, stopping training".format(train_loss_value))
        #Return the same 4-tuple as a normal run instead of None
        return train_losses, test_losses, train_accuracies, test_accuracies

      batch_train_loss.backward()

      if max_norm > 0:
        torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm)

      optimizer.step()

      train_loss += train_loss_value
      train_accuracy += (100 - train_class_error.item())

    lr_scheduler.step()

    #NOTE(review): the summed batch losses are divided by the number of samples while the
    #accuracy is divided by the number of batches; kept as in the original - confirm the scale.
    #max(..., 1) avoids a division by zero with an empty loader.
    train_loss /= max(len(train_dataset_loader.dataset), 1)
    train_accuracy /= max(len(train_dataset_loader), 1)

    train_losses.append(train_loss)
    train_accuracies.append(train_accuracy)

    ###################################################################################################################

    #Put the model and the criterion in evaluation mode
    model.eval()
    criterion.eval()

    test_loss = 0.0
    test_accuracy = 0.0

    with torch.no_grad():
      for batch_x_test, batch_y_test in test_dataset_loader:
        _move_batch_to_device(batch_x_test, batch_y_test, device)

        batch_test_predictions = model([batch_x_test["images"], batch_x_test["masks"]])

        batch_test_loss, test_class_error, _ = criterion(batch_test_predictions, batch_y_test)

        test_loss += batch_test_loss.item()
        test_accuracy += (100 - test_class_error.item())

    test_loss /= max(len(test_dataset_loader.dataset), 1)
    test_accuracy /= max(len(test_dataset_loader), 1)

    test_losses.append(test_loss)
    test_accuracies.append(test_accuracy)

    ###################################################################################################################

    print("Epoch {}/{}\n----> loss: {:.4f} - accuracy: {:.4f} - val_loss: {:.4f} - val_accuracy: {:.4f}".format(epoch+1, epochs, train_loss, train_accuracy, test_loss, test_accuracy))

    #Checkpoint the model, optimizer and scheduler after every epoch
    model_file = "my_model/model_epoch_{}.pt".format(epoch+1)
    optimizer_file = "my_model/optimizer_epoch_{}.pt".format(epoch+1)
    lr_scheduler_file = "my_model/lr_scheduler_epoch_{}.pt".format(epoch+1)

    save_weights(model, optimizer, lr_scheduler, model_file, optimizer_file, lr_scheduler_file)

  return train_losses, test_losses, train_accuracies, test_accuracies

In [None]:
def show_results(train_losses, test_losses, train_accuracies, test_accuracies):
  """Plot the per-epoch loss (left) and accuracy (right) curves for train and test."""
  fig, axes = plt.subplots(1, 2, figsize=(14,4))
  loss_axis, accuracy_axis = axes

  panels = (
    (loss_axis, train_losses, test_losses, 'loss'),
    (accuracy_axis, train_accuracies, test_accuracies, 'accuracy'),
  )

  for axis, train_series, test_series, y_label in panels:
    axis.plot(train_series, label='train')
    axis.plot(test_series, label='test')
    axis.set_xlabel('epoch')
    axis.set_ylabel(y_label)
    axis.legend()

In [None]:
#Post-processing helpers for the predicted bounding boxes
def bbox_cxcywh_to_xyxy(x):
  """Convert boxes from (x_center, y_center, width, height) to (x_0, y_0, x_1, y_1).

  The four coordinates are read from the last axis, so any number of leading
  dimensions is accepted: (4,), (N, 4), (B, N, 4), ...
  The result has the same shape as the input.
  """
  x_c, y_c, w, h = x.unbind(-1)
  b = [(x_c - 0.5 * w), (y_c - 0.5 * h),
       (x_c + 0.5 * w), (y_c + 0.5 * h)]
  return torch.stack(b, dim=-1)

def rescale_bboxes(out_bbox, img_w, img_h):
  """Convert (cx, cy, w, h) boxes to (x_0, y_0, x_1, y_1) and scale them by the image size.

  Parameters:
      out_bbox: boxes in (cx, cy, w, h) format, as accepted by bbox_cxcywh_to_xyxy.
      img_w, img_h: image width and height used as multipliers for x and y.
  """
  b = bbox_cxcywh_to_xyxy(out_bbox)
  #Build the scale on the same device as the boxes to avoid a device mismatch
  scale = torch.tensor([img_w, img_h, img_w, img_h], dtype=torch.float32, device=b.device)
  return b * scale

In [None]:
def predict(model, images, device, threshold=0.7):
  """Run the model on a list of images and keep the confident detections.

  Parameters:
      model: called as model([images, masks]); must return a dict with
          "pred_logits" and "pred_bboxes".
      images: list of objects exposing .width and .height (e.g. PIL images),
          pre-processed by the notebook-level process_images helper.
      device: device passed to process_images.
      threshold: a query is kept when its best class probability (ignoring the
          last class) is strictly greater than this value.

  Returns:
      A list with one [probabilities, bboxes] pair per image, both on the CPU.
      The probabilities exclude the last class; the boxes are rescaled with
      rescale_bboxes using the original image width and height.
  """
  images_width = [image.width for image in images]
  images_height = [image.height for image in images]

  images, masks = process_images(images, mode="eval", device=device, return_masks=True)

  #Inference only: no autograd graph is needed
  with torch.no_grad():
    outputs = model([images, masks])

    images_probabilities = outputs["pred_logits"].softmax(-1)
    images_bboxes = outputs["pred_bboxes"]

  images_probabilities_and_bboxes = list()

  for image_probabilities, image_bboxes, image_width, image_height in zip(images_probabilities, images_bboxes, images_width, images_height):
    #Drop the last class column
    image_probabilities = image_probabilities[:, :-1]

    #Keep only the queries whose best probability is > threshold
    keep = image_probabilities.max(-1).values > threshold

    image_probabilities = image_probabilities[keep].cpu()
    image_bboxes = image_bboxes[keep].cpu()

    #Scale the boxes by the original image width and height
    image_bboxes = rescale_bboxes(image_bboxes, image_width, image_height)

    images_probabilities_and_bboxes.append([image_probabilities, image_bboxes])

  return images_probabilities_and_bboxes

In [None]:
def plot_results(pil_img, prob, bboxes, classes, colors):
  """Show the image with one rectangle and one "<class>: <score>" caption per detection.

  Parameters:
      pil_img: image passed to plt.imshow.
      prob: per-detection class probabilities (one row per detection).
      bboxes: per-detection (xmin, ymin, xmax, ymax) boxes.
      classes: class names indexed by the arg-max of each probability row.
      colors: colour list, repeated so consecutive boxes cycle through it.
  """
  plt.figure(figsize=(16,10))
  plt.imshow(pil_img)

  if prob.shape[0] == 0:
    print("There is no objects")
  else:
    axis = plt.gca()
    for scores, box, color in zip(prob, bboxes.tolist(), colors * 100):
      xmin, ymin, xmax, ymax = box
      outline = plt.Rectangle((xmin, ymin), xmax - xmin, ymax - ymin,
                              fill=False, color=color, linewidth=3)
      axis.add_patch(outline)

      best_class = scores.argmax()
      caption = f"{classes[best_class]}: {scores[best_class]:0.2f}"
      print(caption)
      axis.text(xmin, ymin, caption, fontsize=15,
                bbox=dict(facecolor="yellow", alpha=0.5))

  plt.axis("off")
  plt.show()

## Model Training

In [None]:
#Create the "my_model" folder where train_model writes its per-epoch checkpoints (-p: no error if it already exists)
%mkdir -p my_model

In [None]:
#Folders (relative to the current working directory) with the images and their JSON annotations
images_folder_path = "ImagesT"
annotations_folder_path = "AnnotationsT"

In [None]:
#Pair every image with its annotation file (same base name, ".json" extension).
#Images without an annotation file are skipped.
images_filenames = list()
annotations_filenames = list()

#listdir returns entries in arbitrary order; sorting keeps the seeded train/test split reproducible
for file_name in sorted(listdir(images_folder_path)):
  image_path = images_folder_path + "/" + file_name
  annotation_path = annotations_folder_path + "/" + file_name.rsplit(".", 1)[0] + ".json"

  if exists(image_path) and exists(annotation_path):
    images_filenames.append(image_path)
    annotations_filenames.append(annotation_path)

In [None]:
#Hold out 15% of the (image, annotation) pairs for testing; random_state fixes the shuffle seed
x_train, x_test, y_train, y_test = train_test_split(images_filenames, annotations_filenames, test_size=0.15, random_state=0)

#Display the sizes of the four splits as the cell output
len(x_train), len(y_train), len(x_test), len(y_test)

In [None]:
#Tune this value to improve training; it is the batch size of both data loaders
#batch_size = 1
batch_size = 8

In [None]:
#Training pipeline: "train"-mode transformations -> dataset wrapper -> shuffled DataLoader.
#get_transformation, ApplyTransformationToDataset and padding_sample_batch are defined elsewhere in the notebook.
train_transformations = get_transformation(mode="train")

train_data = ApplyTransformationToDataset(x_train, y_train, train_transformations)

#padding_sample_batch is the collate function - presumably pads the images of a batch to a common size; verify against its definition
train_dataset_loader = torch.utils.data.DataLoader(train_data, batch_size=batch_size, collate_fn=padding_sample_batch, shuffle=True)

In [None]:
#Evaluation pipeline: "eval"-mode transformations -> dataset wrapper -> DataLoader with a fixed (unshuffled) order
test_transformations = get_transformation(mode="eval")

test_data = ApplyTransformationToDataset(x_test, y_test, test_transformations)

test_dataset_loader = torch.utils.data.DataLoader(test_data, batch_size=batch_size, collate_fn=padding_sample_batch, shuffle=False)

In [None]:
#Loss criterion with the default loss weights; matching_function is used as the matcher.
#num_classes and device are defined elsewhere in the notebook.
criterion = SetCriterion(num_classes, matching_function)

#Moves the criterion's registered buffer (the class weights) to the training device
criterion.to(device)

In [None]:
#Two parameter groups over the trainable parameters:
#  - parameters whose name does not contain "backbone": use the optimizer's default lr (0.0001)
#  - parameters whose name contains "backbone": use a smaller lr (0.00001)
param_dicts = [
                {"params": [parameter for name, parameter in model.named_parameters() if ("backbone" not in name) and (parameter.requires_grad == True)]},
                {
                    "params": [parameter for name, parameter in model.named_parameters() if ("backbone" in name) and (parameter.requires_grad == True)],
                    "lr": 0.00001,
                },
              ]

optimizer = torch.optim.AdamW(param_dicts, lr=0.0001, weight_decay=0.0001)

In [None]:
#NOTE(review): when the notebook runs top to bottom, this reassignment replaces the optimizer built
#from param_dicts in the previous cell, so the smaller backbone learning rate is not used.
#Keep only one of the two optimizer cells - confirm which one is intended.
optimizer = torch.optim.AdamW(model.parameters(), lr=0.0001, weight_decay=0.0001)

In [None]:
#NOTE(review): train_model calls lr_scheduler.step() once per epoch, so with step_size=200 the learning
#rate is not reduced during a 50-epoch run (unless a restored scheduler state says otherwise) - confirm intended.
lr_scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=200)

In [None]:
#Number of epochs for this training run
epochs = 50

#Checkpoint files passed to reset_weights (defined elsewhere; presumably restores the saved states - verify)
model_file = "best_my_model/model_epoch_17.pt"
optimizer_file = "best_my_model/optimizer_epoch_17.pt"
lr_scheduler_file = "best_my_model/lr_scheduler_epoch_17.pt"

model, optimizer, lr_scheduler = reset_weights(model, optimizer, lr_scheduler, model_file, optimizer_file, lr_scheduler_file)
#NOTE(review): train_model switches the model to train mode at the start of every epoch, so this eval() has no lasting effect here
model.eval();

torch.cuda.empty_cache()

#train_model writes its checkpoints to "my_model/", numbered from epoch 1
train_losses, test_losses, train_accuracies, test_accuracies = train_model(model, criterion, optimizer, lr_scheduler, epochs, train_dataset_loader, test_dataset_loader)

In [None]:
#Plot the loss and accuracy curves collected by train_model
show_results(train_losses, test_losses, train_accuracies, test_accuracies)

## Model Predictions

In [None]:
#Adjust to the dataset: class names, indexed by the predicted class id
classes = ['vehicles', 'Ambulance', 'Bus', 'Car', 'Motorcycle', 'Truck']

#Colors used to draw the bounding boxes
colors = [[0.000, 0.447, 0.741], [0.850, 0.325, 0.098], [0.929, 0.694, 0.125],
          [0.494, 0.184, 0.556], [0.466, 0.674, 0.188], [0.301, 0.745, 0.933]]

In [None]:
#Change these paths to point at the best epoch
model_file = "my_model/model_epoch_26.pt"
optimizer_file = "my_model/optimizer_epoch_26.pt"
lr_scheduler_file = "my_model/lr_scheduler_epoch_26.pt"

#reset_weights is defined elsewhere in the notebook; presumably it loads the saved states into the three objects - verify
model, optimizer, lr_scheduler = reset_weights(model, optimizer, lr_scheduler, model_file, optimizer_file, lr_scheduler_file)

In [None]:
#Switch the model to evaluation mode before predicting; the trailing ";" suppresses the cell output
model.eval();

In [None]:
#Image used for the demo prediction (the commented line is an alternative source folder)
#image_path = "images1/00aaf0a0a9ee7e71_jpg.rf.808b1e59067887493dffad63561c2a9d.jpg"
image_path = "ImagesT/00aaf0a0a9ee7e71_jpg.rf.808b1e59067887493dffad63561c2a9d.jpg"

image = Image.open(image_path)

#predict takes a list of images and returns one [probabilities, bboxes] pair per image
images_probabilities_and_bboxes = predict(model, [image], device)

In [None]:
#Only one image was passed to predict, so its detections are the first entry
scores, boxes = images_probabilities_and_bboxes[0]

plot_results(image, scores, boxes, classes, colors)