# <div align="center"><b> ETIQUETADO - PROYECTO FINAL </b></div>

<div align="right">📝 <em><small><font color='Gray'>Nota:</font></small></em></div>

<div align="right"> <em><small><font color='Gray'> La funcionalidad de visualización de jupyter notebooks en <a href="https://github.com/" target="_blank">github</a> es solamente un preview.</font></small></em> </div>

<div align="right"> <em><small><font color='Gray'> Para mejor visualización se sugiere utilizar el visualizador recomendado por la comunidad: <a href="https://nbviewer.org/" target="_blank">nbviewer</a></font></small></em> </div>

<div align="right"> <em><small><font color='Gray'> Puedes a acceder al siguiente enlace para ver este notebook en dicha página: <a href="https://nbviewer.org/ruta/de/archivo.ipynb">Ruta archivo</a></font></small></em> </div>

* * *

<style>
/* Limitar la altura de las celdas de salida en html */
.jp-OutputArea.jp-Cell-outputArea {
    max-height: 500px;
}
</style>

✋ <em><font color='DodgerBlue'>Importaciones:</font></em> ✋

In [1]:
import os, sys, logging, json, shutil, zipfile, copy, datetime, re, requests
from pathlib import Path
from typing import List, Dict, Tuple, Any, Optional, Literal
from concurrent.futures import ProcessPoolExecutor

sys.path.append(os.path.abspath("../"))  # Agregar el directorio padre al path

from pprint import pprint
from dotenv import load_dotenv

from mini_apps_config.settings import Config
from mini_apps_utils.utils import MongoDB, S3Client, set_log_to_file

from cvat_sdk import make_client
from cvat_sdk.core.proxies.types import Location

from pymongo import UpdateOne

from pycocotools.coco import COCO

import matplotlib.pyplot as plt

import layoutparser as lp

OPENCV_IO_MAX_IMAGE_PIXELS = 50000 * 50000  # Para imágenes grandes, ej: barrio3Ombues_20180801_dji_pc_3cm.jpg
os.environ["OPENCV_IO_MAX_IMAGE_PIXELS"] = str(OPENCV_IO_MAX_IMAGE_PIXELS)

import cv2 as cv
import numpy as np

from fastkml import kml
import kml2geojson

import geopandas as gpd
from shapely.geometry import Point, Polygon, box
import pandas as pd

from tqdm import tqdm

🔧 <em><font color='tomato'>Configuraciones:</font></em> 🔧


In [2]:
load_dotenv("../.env.dev")

# Crear instancia de Config
CONFIG = Config().config_data

# Configuración de MongoDB
DB = MongoDB(CONFIG["mongodb"]["connection_string"], CONFIG["mongodb"]["database"]).db

# Configuración minio
MINIO_CLIENT = S3Client(CONFIG["minio"]).client

# Configuración del logger
logging.basicConfig(
    format=CONFIG["logging"]["format"],
    level=logging.INFO if CONFIG["logging"]["level"] == "INFO" else logging.DEBUG,
)
logger = logging.getLogger()

MINIO_BUCKET = CONFIG["minio"]["bucket"]

MINIO_PATCHES_FOLDER = CONFIG["minio"]["paths"]["patches"]
download_folder = Path(CONFIG["folders"]["download_folder"])
DOWNLOAD_TASK_FOLDER = download_folder / "tasks"
DOWNLOAD_JOB_FOLDER = download_folder / "jobs"
DOWNLOAD_TEMP_FOLDER = download_folder / "temp"
DOWNLOAD_COCO_ANNOTATIONS_FOLDER = download_folder / "coco_annotations"
DOWNLOAD_IMAGES_FOLDER = download_folder / "images"
DOWNLOAD_PATCHES_FOLDER = download_folder / "patches"
DOWNLOAD_CUTOUTS_FOLDER = download_folder / "cutouts"
DOWNLOAD_CUTOUTS_METADATA_FOLDER = download_folder / "cutouts_metadata"
DOWNLOAD_GOOGLE_MAPS_FOLDER = download_folder / "google_maps"
KMLS_FOLDER = download_folder / "kmls"
GEOJSON_FOLDER = download_folder / "geojson"

logger.info("Configuración cargada correctamente.")

2025-05-17 12:02:46,092 - root - INFO - <module> - Configuración cargada correctamente.


<!-- Colab -->
<!-- <div align="center"><img src="https://drive.google.com/uc?export=view&id=1QSNrTsz1hQbmZwpgwx0qpfpNtLW19Orm" width="600" alt="Figura 1: A data scientist is working on word generation using the Lord of the Rings lore. The image is dark and moody, with a focus on the scientist's computer screen. The screen displays a visualization the one ring, with a map of Middle Earth in the background. - Generada con DALL-E3"></div> -->

<!-- <div align="center"><img src="./ceia-materia/resources/portada.jpeg" width="600" alt="Figura 1: A data scientist playing with convolutional neural networks. - Generada con Microsoft Image Creator"></div>

<div align="center"><small><em>Figura 1: A data scientist playing with convolutional neural networks. - Generada con Microsoft Image Creator</em></small></div> -->

<div align="center">✨Datos del proyecto:✨</div>

<p></p>

<div align="center">

| Subtitulo       | Etiquetado                                                                                                                             |
| --------------- | -------------------------------------------------------------------------------------------------------------------------------------- |
| **Descrpción**  | Herramientas y scripts para el etiquetado y guardado de los datos                                                                      |
| **Integrantes** | Bruno Masoller (brunomaso1@gmail.com)                                                                                                  |

</div>

## Consinga

El objetivo de este proyecto es brindar herramientas y scripts para el etiquetado y guardado de los datos.

## Resolución

Utilidades de Ultralytics:

- [https://docs.ultralytics.com/es/usage/simple-utilities/](https://docs.ultralytics.com/es/usage/simple-utilities/)

### Anotaciones

Para las anotaciones, entre varios formatos estudiados, se eligió el formato de COCO.

- [https://roboflow.com/formats/coco-json](https://roboflow.com/formats/coco-json)
- [https://docs.voxel51.com/recipes/convert_datasets.html](https://docs.voxel51.com/recipes/convert_datasets.html)
- [https://stackoverflow.com/questions/75927857/how-to-convert-coco-json-to-yolov8-segmentation-format](https://stackoverflow.com/questions/75927857/how-to-convert-coco-json-to-yolov8-segmentation-format)

#### File

##### Cargar anotaciones desde file

In [3]:
def load_annotations_from_file(file_path: Path) -> Dict[str, Any]:
    """
    Carga anotaciones desde un archivo JSON.

    Args:
        file_path (Path): Ruta al archivo JSON.

    Returns:
        Dict[str, Any]: Anotaciones cargadas desde el archivo JSON.

    Raises:
        FileNotFoundError: Si el archivo no existe.
        json.JSONDecodeError: Si el archivo no es un JSON válido.
    """
    try:
        with open(file_path, "r", encoding="utf-8") as f:
            annotations = json.load(f)
        return annotations
    except FileNotFoundError as e:
        raise FileNotFoundError(f"No se encontró el archivo: {file_path}")
    except json.JSONDecodeError as e:
        raise json.JSONDecodeError(f"Error al decodificar JSON en {file_path}: {e.msg}", e.doc, e.pos)
    except Exception as e:
        raise Exception(f"Error inesperado al cargar el archivo {file_path}: {e}")

#### CVAT

##### Descargar anotaciones CVAT

In [4]:
def download_annotations_from_cvat(
    task_id: Optional[int] = None,
    job_id: Optional[int] = None,
    output_filename: Optional[Path] = None,
) -> Optional[Path]:
    """Descarga las anotaciones de una tarea o de un job de CVAT y las guarda en un archivo JSON.
    Si el archivo ya existe, se elimina y se vuelve a descargar.

    Se guarda el archivo JSON en la carpeta de descarga especificada en la configuración.
    Se eliminan los archivos temporales después de la descarga.

    Args:
        task_id (int): Identificador de la tarea de CVAT a descargar.
        job_id (int): Identificador del job de CVAT a descargar.
        output_filename (Path, optional): Nombre del archivo a descargar. Si no se proporciona, se utiliza el nombre por defecto.

    Returns:
        Path: Ruta del archivo JSON descargado, o None en caso de error.
    """
    if bool(task_id is None) == bool(job_id is None):  # xor
        raise ValueError("Se debe proporcionar un task_id o un job_id.")
    try:
        client = make_client(
            host=CONFIG["cvat"]["url"],
            credentials=(CONFIG["cvat"]["user"], CONFIG["cvat"]["password"]),
        )

        try:
            retrieved = client.jobs.retrieve(job_id).fetch() if job_id else client.tasks.retrieve(task_id).fetch()
            retrieved_id = job_id if job_id else task_id

            Path(DOWNLOAD_TEMP_FOLDER).mkdir(parents=True, exist_ok=True)
            if output_filename is None:
                output_filename = Path(DOWNLOAD_TEMP_FOLDER) / f"cvat_task_{retrieved_id}.zip"

            if output_filename.exists():
                output_filename.unlink()
                logger.warning(
                    f"Archivo {output_filename} ya existía, se eliminó para seguir el proceso (se lo descarga otra vez)."
                )

            retrieved.export_dataset(
                format_name=CONFIG["cvat"]["export_format"],
                filename=str(output_filename),
                include_images=False,
                location=Location.LOCAL,
            )
        except Exception as e:
            msg = (
                f"Error al descargar el job {job_id}: {e}" if job_id else f"Error al descargar la tarea {task_id}: {e}"
            )
            raise ValueError(msg)
        logger.debug(f"Archivo {output_filename} descargado correctamente.")
    except TimeoutError as e:
        raise ConnectionError(f"Error al conectar con CVAT: {e}")
    except Exception as e:
        raise Exception(f"Error con CVAT: {e}")

    try:
        with zipfile.ZipFile(output_filename, "r") as zip_ref:
            zip_ref.extractall(DOWNLOAD_TEMP_FOLDER)
        logger.debug(f"Archivo {output_filename} descomprimido correctamente.")
        # Chequeamos si el archivo descargado fue correcto
        json_file = DOWNLOAD_TEMP_FOLDER / "annotations" / "instances_default.json"
        if not json_file.exists():
            raise FileNotFoundError(f"El archivo {json_file} no existe después de descomprimir.")
    except FileNotFoundError as e:
        raise
    except zipfile.BadZipFile as e:
        raise Exception(f"Error al descomprimir el archivo {output_filename}") from e

    if job_id:
        Path(DOWNLOAD_JOB_FOLDER).mkdir(parents=True, exist_ok=True)
        output_json_file = Path(DOWNLOAD_JOB_FOLDER) / f"cvat_job_{job_id}.json"
    else:
        Path(DOWNLOAD_TASK_FOLDER).mkdir(parents=True, exist_ok=True)
        output_json_file = Path(DOWNLOAD_TASK_FOLDER) / f"cvat_task_{task_id}.json"

    shutil.copy(json_file, output_json_file)
    logger.debug(f"Archivo JSON copiado a {output_json_file}.")

    try:
        output_filename.unlink()
        logger.debug(f"Archivo {output_filename} eliminado correctamente.")

        json_file.unlink()
        logger.debug(f"Archivo {json_file} eliminado correctamente.")
    except OSError as e:
        logger.warning(f"Error al eliminar el archivos: {e}")

    return output_json_file

##### Cargar anotaciones CVAT

In [5]:
def load_annotations_from_cvat(
    task_id: int = None, job_id: int = None, clean_files: bool = True
) -> Optional[Dict[str, Any]]:
    """Carga las anotaciones desde CVAT.

    Este método permite descargar y cargar anotaciones desde CVAT, ya sea para una tarea
    específica o un trabajo específico. Las anotaciones se descargan en formato JSON y
    se cargan en un diccionario.

    Args:
        task_id (int, optional): Identificador de la tarea en CVAT. Defaults to None.
        job_id (int, optional): Identificador del trabajo en CVAT. Defaults to None.
        clean_files (bool, optional): Si se deben eliminar los archivos temporales después de la carga. Defaults to True.

    Raises:
        ValueError: Si no se proporciona ni un task_id ni un job_id.

    Returns:
        Dict[str, Any]: Diccionario con las anotaciones cargadas en formato COCO.
    """
    if bool(task_id is None) == bool(job_id is None):  # xor
        raise ValueError("Se debe proporcionar un task_id o un job_id.")
    try:
        file_path = (
            download_annotations_from_cvat(task_id=task_id)
            if task_id
            else download_annotations_from_cvat(job_id=job_id)
        )
    except Exception as e:
        raise Exception(f"Error al descargar las anotaciones: {e}")

    if file_path:
        annotations = load_annotations_from_file(file_path)
        logger.debug(f"Anotaciones cargadas desde {file_path}.")
        if clean_files and file_path and file_path.exists():
            try:
                file_path.unlink()
                logger.debug(f"Archivo {file_path} eliminado correctamente.")
            except OSError as e:
                logger.warning(f"Error al eliminar el archivo: {e}")
        return annotations
    else:
        raise FileNotFoundError(f"No se pudo encontrar el archivo de anotaciones en {file_path}.")

#### Funciones de traslación entre coordenadas

##### Funciones sobre bboxes

In [6]:
def convert_bbox_patch_to_image(patch_bbox: list, x_start: int, y_start: int) -> list:
    """Convierte las coordenadas de un bounding box (bbox) de un parche a coordenadas de la imagen
    (bbox_local -> bbox_image).

    En el formato COCO, los bounding boxes se representan como [x_min, y_min, width, height].
    Este método transforma las coordenadas locales de un parche a las coordenadas globales
    de la imagen original aplicando una traslación basada en los valores de desplazamiento
    proporcionados (x_start, y_start).

    Args:
        patch_bbox (list): Bounding box del parche en formato [x_min, y_min, width, height].
        x_start (int): Coordenada x inicial del parche en la imagen original.
        y_start (int): Coordenada y inicial del parche en la imagen original.

    Returns:
        list: Bounding box transformado a coordenadas de la imagen en formato [x_global, y_global, width, height].
    """
    x_min, y_min, width, height = patch_bbox
    x_image = x_min + x_start
    y_image = y_min + y_start
    return [x_image, y_image, width, height]

In [7]:
def convert_bbox_image_to_patch(
    bbox_image: list, x_start: int, y_start: int, patch_width: int, patch_height: int
) -> Optional[list]:
    """Convierte las coordenadas de un bounding box (bbox) de una imagen a coordenadas locales de un parche.

    Este método transforma las coordenadas globales de un bounding box en una imagen
    a coordenadas locales dentro de un parche específico, siempre y cuando el bounding box
    esté completamente contenido dentro del parche.

    Args:
      bbox_global (list): Bounding box global en formato [x_min, y_min, width, height].
      x_start (int): Coordenada x inicial del parche en la imagen original.
      y_start (int): Coordenada y inicial del parche en la imagen original.
      patch_width (int): Ancho del parche.
      patch_height (int): Alto del parche.

    Returns:
      list: Bounding box transformado a coordenadas locales del parche en formato [x_local, y_local, width, height].
          Devuelve None si el bounding box no está completamente contenido dentro del parche.
    """
    x_min, y_min, w, h = bbox_image

    # Verificar que el bbox global esté dentro del parche
    if x_min < x_start or y_min < y_start or x_min + w > x_start + patch_width or y_min + h > y_start + patch_height:
        return None  # Fuera de los límites del parche

    # Transformar a coordenadas locales
    xl = x_min - x_start
    yl = y_min - y_start
    return [xl, yl, w, h]

In [8]:
def get_bbox_center(bbox: list) -> Tuple[float, float]:
    """Calcula el centro de un bounding box (bbox) en formato COCO.

    Args:
        bbox (list): Bounding box en formato [x_min, y_min, width, height].

    Returns:
        tuple: Coordenadas del centro del bounding box (x_center, y_center).
    """
    x_min, y_min, width, height = bbox
    x_center = x_min + width / 2
    y_center = y_min + height / 2
    return (x_center, y_center)

In [9]:
# TODO: Definir el tipo jgw_data

def convert_bbox_image_to_world(bbox: list, jgw_data: Dict[str, Any]) -> Dict[str, tuple]:
    """Convierte un bounding box de coordenadas de imagen a coordenadas del mundo.

    Este método transforma las coordenadas de un bounding box definido en el sistema
    de coordenadas de la imagen a coordenadas en el sistema del mundo, utilizando
    los parámetros de transformación proporcionados.

    Args:
      bbox (list): Bounding box en coordenadas de la imagen en formato [x_min, y_min, width, height].
      jgw_data (Dict[str, Any]): Diccionario con los parámetros de transformación del archivo JGW.
                     Debe contener las claves:
                     - "x_pixel_size": Tamaño del píxel en X.
                     - "y_rotation": Rotación en Y.
                     - "x_rotation": Rotación en X.
                     - "y_pixel_size": Tamaño del píxel en Y.
                     - "x_origin": Origen en X.
                     - "y_origin": Origen en Y.

    Returns:
      Dict[str, tuple]: Bounding box en coordenadas del mundo. Cada clave representa un vértice
                del bounding box ("tl", "tr", "br", "bl") y su valor es una tupla (X, Y).
    """
    A = jgw_data["x_pixel_size"]
    D = jgw_data["y_rotation"]
    B = jgw_data["x_rotation"]
    E = jgw_data["y_pixel_size"]
    C = jgw_data["x_origin"]
    F = jgw_data["y_origin"]

    x0, y0, w, h = bbox
    # Definir los 4 vértices en píxeles
    corners = {
        "tl": (x0, y0),
        "tr": (x0 + w, y0),
        "br": (x0 + w, y0 + h),
        "bl": (x0, y0 + h),
    }

    world_bbox = {}
    for name, (px, py) in corners.items():
        X = A * px + B * py + C
        Y = D * px + E * py + F
        world_bbox[name] = (X, Y)

    return world_bbox

In [10]:
def convert_bbox_world_to_image(world_bbox: list, jgw_data: Dict[str, Any]) -> list:
    """Convierte un bounding box en coordenadas del mundo a coordenadas de la imagen.

    Este método transforma las coordenadas de un bounding box definido en el sistema de coordenadas del mundo
    a coordenadas en píxeles dentro de la imagen, utilizando los parámetros de transformación proporcionados.

    Args:
        world_bbox (list): Bounding box en coordenadas del mundo. Debe ser un diccionario con las claves:
                           "tl" (top-left), "tr" (top-right), "br" (bottom-right), "bl" (bottom-left),
                           donde cada clave tiene un valor de tupla (X, Y).
        jgw_data (dict): Diccionario con los parámetros de transformación del archivo JGW. Debe contener las claves:
                         - "x_pixel_size": Tamaño del píxel en X.
                         - "y_rotation": Rotación en Y.
                         - "x_rotation": Rotación en X.
                         - "y_pixel_size": Tamaño del píxel en Y.
                         - "x_origin": Origen en X.
                         - "y_origin": Origen en Y.

    Raises:
        ValueError: Si la transformación no es invertible (determinante ≈ 0).

    Returns:
        list: Bounding box en coordenadas de la imagen en formato [x_min, y_min, width, height].

    Ejemplo:
        world_bbox = {
            "tl": (100.0, 200.0),
            "tr": (150.0, 200.0),
            "br": (150.0, 250.0),
            "bl": (100.0, 250.0),
        }
        jgw_data = {
            "x_pixel_size": 0.5,
            "y_rotation": 0.0,
            "x_rotation": 0.0,
            "y_pixel_size": -0.5,
            "x_origin": 50.0,
            "y_origin": 300.0,
        }
        bbox_image = convert_bbox_world_to_image(world_bbox, jgw_data)
        # bbox_image = [100.0, 100.0, 50.0, 50.0]
    """
    # Leer parámetros
    A = jgw_data["x_pixel_size"]
    B = jgw_data["x_rotation"]
    D = jgw_data["y_rotation"]
    E = jgw_data["y_pixel_size"]
    C = jgw_data["x_origin"]
    F = jgw_data["y_origin"]

    # Montar matriz y calcular su inversa
    M = np.array([[A, B], [D, E]])
    det = A * E - B * D
    if abs(det) < 1e-12:
        raise ValueError("Transformación no invertible (det≈0).")
    M_inv = (1.0 / det) * np.array([[E, -B], [-D, A]])

    # Transformar cada vértice
    pixels = []
    for corner in ("tl", "tr", "br", "bl"):
        X, Y = world_bbox[corner]
        vec = np.dot(M_inv, np.array([X - C, Y - F]))
        pixels.append(vec)

    # Extraer coordenadas
    xs = [p[0] for p in pixels]
    ys = [p[1] for p in pixels]

    x_min, x_max = min(xs), max(xs)
    y_min, y_max = min(ys), max(ys)
    return [x_min, y_min, x_max - x_min, y_max - y_min]

##### Funciones sobre puntos

In [11]:
def convert_point_patch_to_image(point: Tuple[float, float], x_start: int, y_start: int) -> Tuple[float, float]:
    """Convierte las coordenadas de un punto de un parche a coordenadas de la imagen.

    Este método transforma las coordenadas locales de un punto dentro de un parche
    a coordenadas en la imagen original aplicando una traslación basada
    en los valores de desplazamiento proporcionados (x_start, y_start).

    Args:
        point (tuple): Coordenadas del punto en el parche en formato (x, y).
        x_start (int): Coordenada x inicial del parche en la imagen original.
        y_start (int): Coordenada y inicial del parche en la imagen original.

    Returns:
        tuple: Coordenadas del punto en la imagen en formato (x_image, y_image).
    """
    x_p, y_p = point
    return x_p + x_start, y_p + y_start

In [12]:
def convert_point_image_to_patch(
    point: Tuple[float, float], x_start: int, y_start: int, patch_width: int, patch_height: int
) -> tuple:
    """Convierte las coordenadas de un punto de la imagen a coordenadas locales de un parche.

    Este método transforma las coordenadas globales de un punto en la imagen
    a coordenadas locales dentro de un parche específico, siempre y cuando el punto
    esté contenido dentro del parche.

    Args:
        point (tuple): Coordenadas del punto en la imagen en formato (x, y).
        x_start (int): Coordenada x inicial del parche en la imagen original.
        y_start (int): Coordenada y inicial del parche en la imagen original.
        patch_width (int): Ancho del parche.
        patch_height (int): Alto del parche.

    Returns:
        tuple: Coordenadas locales del punto en el parche en formato (x_local, y_local).
               Devuelve None si el punto no está contenido dentro del parche.
    """
    x_p, y_p = point

    # Verificar que el punto esté dentro del parche
    if x_p < x_start or y_p < y_start or x_p > x_start + patch_width or y_p > y_start + patch_height:
        return None  # Fuera de los límites del parche

    # Transformar a coordenadas locales
    xl = x_p - x_start
    yl = y_p - y_start
    return xl, yl

In [13]:
def convert_point_image_to_world(point: Tuple[float, float], jgw_data: Dict[str, Any]) -> tuple:
    """
    Convierte un punto de coordenadas de la imagen a coordenadas del mundo.

    Este método transforma las coordenadas de un punto definido en el sistema
    de coordenadas de la imagen a coordenadas en el sistema del mundo, utilizando
    los parámetros de transformación proporcionados.

    Args:
        point (tuple): Coordenadas del punto en la imagen en formato (x, y).
        jgw_data (Dict[str, Any]): Diccionario con los parámetros de transformación del archivo JGW.
                                   Debe contener las claves:
                                   - "x_pixel_size": Tamaño del píxel en X.
                                   - "y_rotation": Rotación en Y.
                                   - "x_rotation": Rotación en X.
                                   - "y_pixel_size": Tamaño del píxel en Y.
                                   - "x_origin": Origen en X.
                                   - "y_origin": Origen en Y.

    Returns:
        tuple: Coordenadas del punto en el sistema del mundo en formato (X, Y).
    """
    x_p, y_p = point
    A = jgw_data["x_pixel_size"]
    B = jgw_data["x_rotation"]
    D = jgw_data["y_rotation"]
    E = jgw_data["y_pixel_size"]
    C = jgw_data["x_origin"]
    F = jgw_data["y_origin"]

    X = A * x_p + B * y_p + C
    Y = D * x_p + E * y_p + F
    return X, Y

In [14]:
def convert_point_world_to_image(point: Tuple[float, float], jgw_data: Dict[str, Any]) -> Tuple[float, float]:
    """
    Convierte un punto de coordenadas del mundo a coordenadas de la imagen.

    Este método transforma las coordenadas de un punto definido en el sistema
    de coordenadas del mundo a coordenadas en píxeles dentro de la imagen,
    utilizando los parámetros de transformación proporcionados.

    Args:
        point (tuple): Coordenadas del punto en el sistema del mundo en formato (X, Y).
        jgw_data (Dict[str, Any]): Diccionario con los parámetros de transformación del archivo JGW.
                                   Debe contener las claves:
                                   - "x_pixel_size": Tamaño del píxel en X.
                                   - "y_rotation": Rotación en Y.
                                   - "x_rotation": Rotación en X.
                                   - "y_pixel_size": Tamaño del píxel en Y.
                                   - "x_origin": Origen en X.
                                   - "y_origin": Origen en Y.

    Raises:
        ValueError: Si la transformación no es invertible (determinante ≈ 0).

    Returns:
        tuple: Coordenadas del punto en el sistema de la imagen en formato (x, y).
    """
    X, Y = point
    A, B = jgw_data["x_pixel_size"], jgw_data["x_rotation"]
    D, E = jgw_data["y_rotation"], jgw_data["y_pixel_size"]
    C, F = jgw_data["x_origin"], jgw_data["y_origin"]
    det = A * E - B * D
    if abs(det) < 1e-12:
        raise ValueError("Transformación no invertible (det≈0).")
    M_inv = (1.0 / det) * np.array([[E, -B], [-D, A]])
    vec = np.dot(M_inv, np.array([X - C, Y - F]))
    return float(vec[0]), float(vec[1])

#### Mongo DB

##### Guardar anotaciones Mongo DB

In [15]:
def convert_image_annotations_to_cvat_annotations(
    images: Dict[str, Any], annotations: Dict[str, Any]
) -> Tuple[Dict[str, Any], Dict[str, Any]]:
    """Esta función convierte las anotaciones de las imágenes a un formato igual al que se descarga de CVAT. O sea, en base a los parches de la imagen.

    Se utiliza para cargar directamente las anotaciones en que las imagenes es la imagen completa y no parches.
    Para ello, se transforman las imágenes a los parches asociados. O sea, los bbox de las imágenes se asocian a
    parches correspondientes.

    Este método modifica las anotaciones, ya que los bbox de la imagen ahora pertenece a los parches.
    Como resumen, el diccionario "images" tendría los parches y el diccionario "annotations" tendría los bbox de los parches,
    pero todo esto en concordancia con los bboxes de la imagen original.

    Args:
        images (Dict[str, Any]): Diccionario que contiene las imágenes y sus metadatos.
        annotations (Dict[str, Any]): Diccionario que contiene las anotaciones y sus metadatos.

    Returns:
        Tuple[Dict[str, Any], Dict[str, Any]]: imágenes y anotaciones convertidas al formato de CVAT.
    """
    output_images = []
    output_annotations = []

    imagenes = DB.get_collection("imagenes")
    patches_data_list = []
    for image in images:
        image_annotations = [ann for ann in annotations if ann["image_id"] == image["id"]]
        if not image_annotations:
            logger.warning(f"No se encontraron anotaciones para la imagen {image['id']}.")
            continue

        # Obtener los parches asociados a la imagen
        image_name = image["file_name"].split(".")[0]  # Obtenemos el nombre de la imagen sin la extensión
        db_image = imagenes.find_one({"id": image_name})
        if not db_image:
            raise ValueError(
                f"No se encontraron parches para la imagen {image['id']}. Toda imagen debe tener al menos un parche asociados."
            )

        for db_patch in db_image["patches"]:
            patch_data = {}
            file_name = f"{MINIO_PATCHES_FOLDER}/{image_name}/{db_patch['patch_name']}.jpg"
            patch_data["image"] = {
                "file_name": file_name,
                "height": db_patch["height"],
                "width": db_patch["width"],
                "date_captured": db_image["date_captured"].strftime("%Y-%m-%d %H:%M:%S"),
            }
            patch_data["annotations"] = []
            for annotation in image_annotations:
                # Verificar si el bbox de la imagen está dentro del parche
                patch_bbox = convert_bbox_image_to_patch(
                    annotation["bbox"], db_patch["x_start"], db_patch["y_start"], db_patch["width"], db_patch["height"]
                )
                if patch_bbox is not None:
                    # Transformar el bbox de la imagen a coordenadas locales del parche
                    patch_data["annotations"].append({**annotation, "bbox": patch_bbox})
            if not patch_data["annotations"]:
                logger.warning(
                    f"No se encontraron anotaciones para el parche {db_patch['patch_name']} de la imagen {image_name}."
                )
                continue

            # Agregar el parche a la lista de parches
            patches_data_list.append(patch_data)

    # Procesar los parches
    for id, patch_data in enumerate(patches_data_list):
        image_id = id + 1
        image = {
            "id": image_id,
            **patch_data["image"],
        }

        annotations = [
            {
                "id": i + 1,
                "image_id": image_id,
                **ann,
            }
            for i, ann in enumerate(patch_data["annotations"])
        ]

        output_images.append(image)
        output_annotations.extend(annotations)

    return output_images, output_annotations

In [16]:
def convert_patch_annotations_to_cvat_annotations(
    images: Dict[str, Any], annotations: Dict[str, Any]
) -> Tuple[Dict[str, Any], Dict[str, Any]]:
    """Esta función convierte las anotaciones de los parches a un formato igual al que se descarga de CVAT.

    Se utiliza para cargar directamente las anotaciones en que las imágenes son parches, por lo que están
    en el formato:
    {
        "id": 0,
        "width": 4096,
        "height": 4096,
        "file_name": "8deOctubreyCentenario-EspLibreLarranaga_20190828_dji_pc_5cm_patch_0.jpg",
        "date_captured": "2025-05-05 21:00:34"
    },

    El objetivo es simplemente cambiar el "file_name" agregando el nombre de la imagen original (obtenido de la base de datos)
    y el prefijo "parche", para que se pueda procesar como si fuera descargado de CVAT.
    Para el diccionario de anotaciones, no se realiza ningún cambio, ya que se espera que
    ya esté en el formato correcto.

    Args:
        images (Dict[str, Any]): Diccionario con las imágenes y sus metadatos.
        annotations (Dict[str, Any]): Diccionario con las anotaciones y sus metadatos.

    Returns:
        Tuple[Dict[str, Any], Dict[str, Any]]: imagenes y anotaciones convertidas al formato de CVAT.
    """
    imagenes = DB.get_collection("imagenes")
    for image in images:
        file_name = image["file_name"]
        patch_name = file_name.split(".")[0]  # Quitar la extensión

        # Obtener el nombre de la imagen original desde la base de datos
        image_name = imagenes.find_one({"patches.patch_name": patch_name})
        if not image_name:
            raise ValueError(f"No se encontró la imagen original para el parche {patch_name}.")

        image["file_name"] = f"{MINIO_PATCHES_FOLDER}/{image_name['id']}/{image["file_name"]}"

    return images, annotations

In [17]:
def save_coco_annotations(coco_annotations: Dict[str, Any], field_name: str, annotation_type: Literal["images", "patches", "cvat"] = "cvat") -> bool:
    """Guarda las anotaciones en la base de datos MongoDB. Las anotaciones se guardan
    en los parches correspondientes, por más que se pase una anotaciones de una imagen completa.
    Se eliminan las anotaciones existentes para cada parche antes de guardar las nuevas.

    Este método procesa las anotaciones en formato COCO y las guarda en la base de datos MongoDB
    asociándolas a sus parches correspondientes. Se eliminan las anotaciones existentes
    para cada parche antes de guardar las nuevas. Las anotaciones se guardan en un campo específico
    de la base de datos, cuyo nombre se pasa como argumento.

    Se espera que las anotaciones sean todos parches o todas imágenes completas. No una combinación de ambos.
    Si se pasan anotaciones de imágenes completas

    Args:
        annotations (dict[str, any]): Diccionario con las anotaciones en formato COCO.
        field_name (str): Nombre del campo donde se guardarán las anotaciones en la base de datos.

    Returns:
        bool: True si las anotaciones se guardaron correctamente, False en caso contrario.
    """
    images = coco_annotations["images"]
    annotations = coco_annotations["annotations"]
    categories = coco_annotations["categories"]

    category_map = {cat["id"]: cat["name"] for cat in categories}

    image_patch_pairs = []
    upsert_operations = []

    if annotation_type == "images":
        images, annotations = convert_image_annotations_to_cvat_annotations(images, annotations)
    elif annotation_type == "patches":
        images, annotations = convert_patch_annotations_to_cvat_annotations(images, annotations)

    for image in images:
        file_name = image["file_name"]
        image_id = image["id"]

        # Obtener el nombre de la imagen y el parche
        # Ejemplo de file_name: "patches/Barrio17metros_20231212_dji_rtk_pc_5cm/Barrio17metros_20231212_dji_rtk_pc_5cm_patch_4.jpg"
        # imagen: quedarme con la subcarpeta de la imagen
        # parche: quedarme con el ultimo elemento del path y quitar la extension
        image_name = file_name.split("/")[-2]
        patch_name = file_name.split("/")[-1].split(".")[0]

        # Preparar los datos filtrados sin image_id
        patch_annotations = [
            {k: v for k, v in copy.deepcopy(ann).items() if k != "image_id"}
            for ann in annotations
            if ann["image_id"] == image_id
        ]

        if not patch_annotations:
            logger.warning(f"No se encontraron anotaciones para la imagen {file_name}.")
        else:
            logger.debug(f"Se encontraron {len(patch_annotations)} anotaciones para {file_name}.")

            # Mapear las categorías a los nombres correspondientes
            patch_annotations = [
                {**ann, "category_name": category_map[ann["category_id"]]} for ann in patch_annotations
            ]

            # Eliminar la clave category_id de las anotaciones
            patch_annotations = [{k: v for k, v in ann.items() if k != "category_id"} for ann in patch_annotations]

            # Crear operación de actualización para MongoDB
            upsert_operations.append(
                UpdateOne(
                    {"id": image_name, "patches.patch_name": patch_name},
                    {
                        "$set": {
                            # Actualizar anotaciones en el patch específico
                            f"patches.$.{field_name}_annotations": patch_annotations,
                            # Actualizar la fecha de modificación a hoy
                            f"patches.$.last_modified": datetime.datetime.now(),
                        }
                    },
                    upsert=True,
                )
            )

            image_patch_pairs.append((image_name, patch_name))

    # Proceder con las operaciones en la base de datos
    if image_patch_pairs:
        imagenes = DB.get_collection("imagenes")

        # Primero, para cada par de imagen/parche, eliminar las anotaciones existentes
        for image_name, patch_name in image_patch_pairs:
            # Utilizar arrayFilters para actualizar sólo el elemento específico del array
            imagenes.update_one(
                {"id": image_name, "patches.patch_name": patch_name},
                {"$set": {f"patches.$.{field_name}_annotations": []}},
            )

        logger.info(f"Se eliminaron las anotaciones de {len(image_patch_pairs)} imágenes/parches.")

        # Luego realizar las operaciones de actualización/inserción
        if upsert_operations:
            logger.info(f"Se van a ejecutar {len(upsert_operations)} operaciones de actualización.")
            result = imagenes.bulk_write(upsert_operations, ordered=False)
            logger.info(f"Documentos modificados: {result.modified_count}")
        return True
    else:
        logger.warning("No se encontraron pares de imagen/parche para actualizar.")
        return False

##### Descargar anotación MongoDB

In [18]:
def get_image_id_from_annotations(image_name: str, coco_annotations: Dict[str, Any]) -> Optional[int]:
    """
    Obtiene el ID de una imagen a partir de las anotaciones en formato COCO.

    Busca el ID de la imagen en las anotaciones COCO utilizando el nombre de la imagen.
    Si el nombre de la imagen no tiene extensión, se le añade ".jpg" para la búsqueda,
    dado que el formato COCO suele incluir la extensión en el campo "file_name".
    Si no se encuentra el ID, se lanza una excepción.

    Args:
        image_name (str): Nombre de la imagen (con o sin extensión .jpg).
        coco_annotations (Dict[str, Any]): Diccionario con las anotaciones en formato COCO.

    Raises:
        ValueError: Si no se encuentra el ID de la imagen en las anotaciones.

    Returns:
        Optional[int]: ID de la imagen si se encuentra, de lo contrario None.
    """
    if not coco_annotations["annotations"]:
        logger.warning("No hay anotaciones en el archivo COCO.")
        return None

    image_id = next(
        (
            img["id"]
            for img in coco_annotations["images"]
            if img["file_name"] == image_name or img["file_name"] == f"{image_name}.jpg"
        ),
        None,
    )

    if not image_id:
        raise ValueError(f"No se encontró el id de la imagen {image_name} en las anotaciones.")
    return image_id

In [19]:
def create_patch_fields(
    db_image: Dict[str, Any],
    field_name: str,
    category_map: Dict[str, Any],
    patch_name: str,
) -> Optional[Tuple[List[Dict[str, Any]], List[Dict[str, Any]]]]:
    """
    Crea anotaciones en formato COCO para un parche específico.

    Este método toma las anotaciones de un parche almacenadas en la base de datos,
    las procesa y las convierte al formato COCO, incluyendo las categorías y la
    información de la imagen asociada al parche.

    Args:
        db_image (Dict[str, Any]): Documento de la imagen en la base de datos que contiene
                                   información sobre los parches y sus anotaciones.
        field_name (str): Nombre del campo en la base de datos que contiene las anotaciones.
        category_map (Dict[str, Any]): Mapeo de nombres de categorías a sus IDs en formato COCO.
        patch_name (str): Nombre del parche cuyas anotaciones se desean procesar.

    Returns:
        Optional[Tuple[List[Dict[str, Any]], List[Dict[str, Any]]]]:
            Una tupla que contiene:
            - Una lista de anotaciones en formato COCO.
            - Una lista con la información de la imagen asociada al parche.
            Devuelve None si no se encuentran anotaciones para el parche.
    """
    patch = next((p for p in db_image.get("patches", []) if p["patch_name"] == patch_name), None)

    annotations = patch.get(f"{field_name}_annotations", [])
    if not annotations:
        logger.warning(f"No se encontraron anotaciones para el parche {patch_name}.")

    image = {
        "id": 1,
        "width": patch["width"],
        "height": patch["height"],
        "file_name": f"{patch_name}.jpg",
        "date_captured": patch.get("last_modified", db_image["date_captured"]).strftime("%Y-%m-%d %H:%M:%S"),
    }

    images = [image]

    # Mapeamos las categorías a los nombres correspondientes
    annotations = [
        {
            **ann,
            "category_id": category_map[ann["category_name"]],
            "image_id": image["id"],
        }
        for ann in annotations
    ]

    # Eliminamos la clave category_name de las anotaciones
    annotations = [{k: v for k, v in ann.items() if k != "category_name"} for ann in annotations]

    return annotations, images

In [20]:
def create_images_fields(
    db_image: Dict[str, Any],
    field_name: str,
    category_map: Dict[str, Any],
    image_name: str,
) -> Optional[Tuple[List[Dict[str, Any]], List[Dict[str, Any]]]]:
    """
    Crea anotaciones en formato COCO para una imagen específica.

    Este método toma las anotaciones de los parches asociados a una imagen almacenada
    en la base de datos, las procesa y las convierte al formato COCO, incluyendo las
    categorías y la información de la imagen.

    Args:
        db_image (Dict[str, Any]): Documento de la imagen en la base de datos que contiene
                                   información sobre los parches y sus anotaciones.
        field_name (str): Nombre del campo en la base de datos que contiene las anotaciones.
        category_map (Dict[str, Any]): Mapeo de nombres de categorías a sus IDs en formato COCO.
        image_name (str): Nombre de la imagen cuyas anotaciones se desean procesar.

    Returns:
        Optional[Tuple[List[Dict[str, Any]], List[Dict[str, Any]]]]:
            Una tupla que contiene:
            - Una lista de anotaciones en formato COCO.
            - Una lista con la información de la imagen asociada.
            Devuelve None si no se encuentran anotaciones para la imagen.
    """
    image = {
        "id": 1,
        "width": db_image["width"],
        "height": db_image["height"],
        "file_name": f"{db_image["id"]}.jpg",
        "date_captured": db_image["date_captured"].strftime("%Y-%m-%d %H:%M:%S"),
    }

    images = [image]

    # 2 - Obtener los nombres de los parches asociados a la imagen
    db_patches = db_image.get("patches", [])
    annotations = []

    # 3 - Para cada annotación, mapeamos los bboxes a bboxes de la imagen
    for patch in db_patches:
        patch_annotations = patch.get(f"{field_name}_annotations", [])
        if not patch_annotations:
            logger.warning(f"No se encontraron anotaciones para el parche {patch['patch_name']}.")
            continue

        # Mapeamos las categorías a los nombres correspondientes
        # Agregamos el campo image_id y modificamos el id de la anotación
        # Convertimos los bboxes del parche a la imagen
        patch_annotations = [
            {
                **ann,
                "category_id": category_map[ann["category_name"]],
                "image_id": image["id"],
                "bbox": convert_bbox_patch_to_image(ann["bbox"], patch["x_start"], patch["y_start"]),
            }
            for i, ann in enumerate(patch_annotations)
        ]

        # Eliminamos la clave category_name de las anotaciones
        patch_annotations = [{k: v for k, v in ann.items() if k != "category_name"} for ann in patch_annotations]

        # Agregamos las anotaciones a la lista de anotaciones
        annotations.extend(patch_annotations)

    # Agregamos el id del parche a cada anotación
    # Se podría mejorar esto agregándolo en el for, utilizando un generador para mejorar
    # la performance.
    annotations = [{**ann, "id": i + 1} for i, ann in enumerate(annotations)]

    return annotations, images

In [21]:
def download_annotation_as_coco_from_mongodb(
    field_name: str,
    patch_name: Optional[str] = None,
    image_name: Optional[str] = None,
    output_filename: Optional[Path] = None,
) -> Optional[Path]:
    """
    Descarga anotaciones en formato COCO desde MongoDB.

    Este método permite descargar anotaciones almacenadas en MongoDB en formato COCO,
    ya sea para un parche específico o para una imagen completa. Las anotaciones se
    guardan en un archivo JSON en la ubicación especificada o en una carpeta predeterminada.

    Args:
        field_name (str): Nombre del campo en la base de datos que contiene las anotaciones.
        patch_name (Optional[str], optional): Nombre del parche cuyas anotaciones se desean descargar.
                                              Si se proporciona, se descargan las anotaciones del parche.
                                              Defaults to None.
        image_name (Optional[str], optional): Nombre de la imagen cuyas anotaciones se desean descargar.
                                              Si se proporciona, se descargan las anotaciones de la imagen.
                                              Defaults to None.
        output_filename (Optional[Path], optional): Ruta del archivo donde se guardarán las anotaciones.
                                                    Si no se proporciona, se utiliza una ruta predeterminada.
                                                    Defaults to None.

    Raises:
        ValueError: Si no se proporciona ni `patch_name` ni `image_name`.
        ValueError: Si no se encuentra la imagen o el parche en la base de datos.

    Returns:
        Optional[Path]: Ruta del archivo JSON generado con las anotaciones en formato COCO.
                        Devuelve None si no se encuentran anotaciones.
    """
    if bool(patch_name is None) == bool(image_name is None):  # xor
        raise ValueError("Se debe proporcionar un patch_name o un image_name.")

    categories = CONFIG["coco_dataset"]["categories"]
    category_map = {cat["name"]: cat["id"] for cat in categories}

    db_images = DB.get_collection("imagenes")
    db_image = (
        db_images.find_one({"patches.patch_name": patch_name}) if patch_name else db_images.find_one({"id": image_name})
    )
    if not db_image:
        msg = (
            f"No se encontró la imagen {image_name} en la base de datos."
            if image_name
            else f"No se encontró la imagen con el parche {patch_name} en la base de datos."
        )
        raise ValueError(msg)

    annotations, images = (
        create_patch_fields(db_image, field_name, category_map, patch_name)
        if patch_name
        else create_images_fields(db_image, field_name, category_map, image_name)
    )
    if not annotations:
        logger.warning(f"No se encontraron anotaciones para el parche {patch_name}.")

    coco_annotations = {
        "info": CONFIG["coco_dataset"]["info"],
        "licenses": CONFIG["coco_dataset"]["licenses"],
        "categories": categories,
        "images": images,
        "annotations": annotations,
    }

    if not output_filename:
        Path(DOWNLOAD_COCO_ANNOTATIONS_FOLDER).mkdir(parents=True, exist_ok=True)
        output_filename = DOWNLOAD_COCO_ANNOTATIONS_FOLDER / (
            f"{field_name}_{patch_name}_annotations.json"
            if patch_name
            else f"{field_name}_{image_name}_annotations.json"
        )

    with open(output_filename, "w") as f:
        json.dump(coco_annotations, f, indent=4)
        logger.debug(f"Archivo JSON guardado en {output_filename}.")

    return output_filename

##### Cargar anotacion MongoDB

In [22]:
def load_coco_annotation_from_mongodb(
    field_name: str,
    patch_name: Optional[str] = None,
    image_name: Optional[str] = None,
    clean_files: bool = True,
) -> Optional[Dict[str, Any]]:
    """
    Carga anotaciones desde MongoDB en formato COCO.

    Este método permite descargar y cargar anotaciones almacenadas en MongoDB,
    ya sea para un parche específico o para una imagen completa. Las anotaciones
    se descargan en formato JSON y se cargan en un diccionario.

    Args:
        field_name (str): Nombre del campo en la base de datos que contiene las anotaciones.
        patch_name (Optional[str], optional): Nombre del parche cuyas anotaciones se desean cargar.
                                              Si se proporciona, se cargan las anotaciones del parche.
                                              Defaults to None.
        image_name (Optional[str], optional): Nombre de la imagen cuyas anotaciones se desean cargar.
                                              Si se proporciona, se cargan las anotaciones de la imagen.
                                              Defaults to None.
        clean_files (bool, optional): Si se deben eliminar los archivos temporales después de la carga.
                                      Defaults to True.

    Raises:
        ValueError: Si no se proporciona ni `patch_name` ni `image_name`.
        Exception: Si ocurre un error al descargar las anotaciones.
        FileNotFoundError: Si no se encuentra el archivo de anotaciones descargado.

    Returns:
        Optional[Dict[str, Any]]: Diccionario con las anotaciones cargadas en formato COCO.
                                   Devuelve None si no se encuentran anotaciones.
    """
    if bool(patch_name is None) == bool(image_name is None):
        raise ValueError("Se debe proporcionar un patch_name o un image_name.")
    try:
        file_path = (
            download_annotation_as_coco_from_mongodb(field_name=field_name, patch_name=patch_name)
            if patch_name
            else download_annotation_as_coco_from_mongodb(field_name=field_name, image_name=image_name)
        )
    except Exception as e:
        raise Exception(f"Error al descargar las anotaciones: {e}")

    if file_path:
        annotations = load_annotations_from_file(file_path)
        logger.debug(f"Anotaciones cargadas desde {file_path}.")
        if clean_files and file_path and file_path.exists():
            try:
                file_path.unlink()
                logger.debug(f"Archivo {file_path} eliminado correctamente.")
            except OSError as e:
                logger.warning(f"Error al eliminar el archivo: {e}")
        return annotations
    else:
        raise FileNotFoundError(f"No se pudo encontrar el archivo de anotaciones en {file_path}.")

##### Descargar anotaciones MongoDB

In [23]:
def create_patches_coco_annotations(
    field_name: str, patches_names: List[str]
) -> Tuple[List[Dict[str, Any]], List[Dict[str, Any]]]:
    """
    Crea anotaciones en formato COCO para una lista de parches.

    Este método toma las anotaciones de una lista de parches almacenadas en MongoDB,
    las procesa y las convierte al formato COCO, incluyendo las categorías y la
    información de las imágenes asociadas a los parches.

    Args:
        field_name (str): Nombre del campo en la base de datos que contiene las anotaciones.
        patches_names (List[str]): Lista de nombres de los parches cuyas anotaciones se desean procesar.

    Returns:
        Tuple[List[Dict[str, Any]], List[Dict[str, Any]]]:
            Una tupla que contiene:
            - Una lista de anotaciones en formato COCO.
            - Una lista con la información de las imágenes asociadas a los parches.
    """
    images = []
    annotations = []
    for id, patch_name in enumerate(patches_names):
        image_id = id + 1
        patch_annotations = load_coco_annotation_from_mongodb(field_name=field_name, patch_name=patch_name)

        if not patch_annotations:
            logger.warning(f"No se encontraron anotaciones para el parche {patch_name}.")

        patch_annotations["images"][0]["id"] = image_id
        patch_annotations["annotations"] = [{**ann, "image_id": image_id} for ann in patch_annotations["annotations"]]

        # Agregar las imágenes y anotaciones al diccionario principal
        images.extend(patch_annotations["images"])
        annotations.extend(patch_annotations["annotations"])

        logger.debug(f"Anotaciones del parche {patch_name} agregadas correctamente.")

    return annotations, images

In [24]:
def create_image_coco_annotations(
    field_name: str, images_names: List[str]
) -> Tuple[List[Dict[str, Any]], List[Dict[str, Any]]]:
    """
    Crea anotaciones en formato COCO para una lista de imágenes.

    Este método toma las anotaciones de una lista de imágenes almacenadas en MongoDB,
    las procesa y las convierte al formato COCO, incluyendo las categorías y la
    información de las imágenes asociadas.

    Args:
        field_name (str): Nombre del campo en la base de datos que contiene las anotaciones.
        images_names (List[str]): Lista de nombres de las imágenes cuyas anotaciones se desean procesar.

    Returns:
        Tuple[List[Dict[str, Any]], List[Dict[str, Any]]]:
            Una tupla que contiene:
            - Una lista de anotaciones en formato COCO.
            - Una lista con la información de las imágenes asociadas.
    """
    images = []
    annotations = []
    for id, image_name in enumerate(images_names):
        image_id = id + 1
        image_annotations = load_coco_annotation_from_mongodb(field_name=field_name, image_name=image_name)

        if not image_annotations["annotations"]:
            logger.warning(f"No se encontraron anotaciones para la imagen {image_name}.")

        image_annotations["images"][0]["id"] = image_id
        image_annotations["annotations"] = [{**ann, "image_id": image_id} for ann in image_annotations["annotations"]]

        # Agregar las imágenes y anotaciones al diccionario principal
        images.extend(image_annotations["images"])
        annotations.extend(image_annotations["annotations"])

        logger.debug(f"Anotaciones de la imagen {image_name} agregadas correctamente.")

    return annotations, images

In [25]:
def download_annotations_as_coco_from_mongodb(
    field_name: str,
    patches_names: list[str] = None,
    images_names: list[str] = None,
    output_filename: Optional[Path] = None,
) -> Optional[Path]:
    """
    Descarga anotaciones en formato COCO desde MongoDB.

    Este método permite descargar anotaciones almacenadas en MongoDB en formato COCO,
    ya sea para una lista de parches o para una lista de imágenes. Las anotaciones se
    guardan en un archivo JSON en la ubicación especificada o en una carpeta predeterminada.

    Args:
        field_name (str): Nombre del campo en la base de datos que contiene las anotaciones.
        patches_names (list[str], optional): Lista de nombres de los parches cuyas anotaciones se desean descargar.
                                             Si se proporciona, se descargan las anotaciones de los parches.
                                             Defaults to None.
        images_names (list[str], optional): Lista de nombres de las imágenes cuyas anotaciones se desean descargar.
                                            Si se proporciona, se descargan las anotaciones de las imágenes.
                                            Defaults to None.
        output_filename (Optional[Path], optional): Ruta del archivo donde se guardarán las anotaciones.
                                                    Si no se proporciona, se utiliza una ruta predeterminada.
                                                    Defaults to None.

    Raises:
        ValueError: Si no se proporciona ni `patches_names` ni `images_names`.

    Returns:
        Path: Ruta del archivo JSON generado con las anotaciones en formato COCO.
    """
    if bool(patches_names is None) == bool(images_names is None):  # xor
        raise ValueError("Se debe proporcionar una lista de nombres de parches o imágenes.")

    annotations, images = (
        create_patches_coco_annotations(field_name, patches_names)
        if patches_names
        else create_image_coco_annotations(field_name, images_names)
    )

    coco_annotations = {
        "info": CONFIG["coco_dataset"]["info"],
        "licenses": CONFIG["coco_dataset"]["licenses"],
        "categories": CONFIG["coco_dataset"]["categories"],
        "images": images,
        "annotations": annotations,
    }

    if not output_filename:
        Path(DOWNLOAD_COCO_ANNOTATIONS_FOLDER).mkdir(parents=True, exist_ok=True)
        output_filename = DOWNLOAD_COCO_ANNOTATIONS_FOLDER / (
            f"{field_name}_patches_annotations.json" if patches_names else f"{field_name}_images_annotations.json"
        )

    with open(output_filename, "w") as f:
        json.dump(coco_annotations, f, indent=4)
        logger.debug(f"Archivo JSON guardado en {output_filename}.")

    return output_filename

##### Cargar anotaciones MongoDB

In [26]:
def load_coco_annotations_from_mongodb(
    field_name: str,
    patches_names: Optional[List[str]] = None,
    images_names: Optional[List[str]] = None,
    clean_files: bool = True,
) -> Optional[Dict[str, Any]]:
    if bool(patches_names is None) == bool(images_names is None):  # xor
        raise ValueError("Se debe proporcionar una lista de nombres de parches o imágenes.")

    try:
        file_path = (
            download_annotations_as_coco_from_mongodb(field_name=field_name, patches_names=patches_names)
            if patches_names
            else download_annotations_as_coco_from_mongodb(field_name=field_name, images_names=images_names)
        )
    except Exception as e:
        raise Exception(f"Error al descargar las anotaciones: {e}")

    if file_path:
        annotations = load_annotations_from_file(file_path)
        logger.debug(f"Anotaciones cargadas desde {file_path}.")
        if clean_files and file_path and file_path.exists():
            try:
                file_path.unlink()
                logger.debug(f"Archivo {file_path} eliminado correctamente.")
            except OSError as e:
                logger.warning(f"Error al eliminar el archivo: {e}")
        return annotations
    else:
        raise FileNotFoundError(f"No se pudo encontrar el archivo de anotaciones en {file_path}.")

##### Cargar JGW MongoDB

In [27]:
def load_jgw_file_from_mongodb(image_name: Optional[str] = None, patch_name: Optional[str] = None) -> Optional[Dict[str, Any]]:
    """
    Carga un archivo JGW desde MongoDB.

    Este método busca y carga un archivo JGW asociado a una imagen o parche específico
    almacenado en la base de datos MongoDB. Si se proporciona el nombre de un parche,
    se busca el archivo jgw de la imagen y se hace la conversión al parche. Si se proporciona el nombre
    de una imagen, se busca el archivo JGW asociado a esa imagen.

    Args:
        image_name (Optional[str], optional): Nombre de la imagen cuya información JGW se desea cargar.
                                              Defaults to None.
        patch_name (Optional[str], optional): Nombre del parche cuya información JGW se desea cargar.
                                              Defaults to None.

    Raises:
        ValueError: Si no se proporciona ni `image_name` ni `patch_name`.

    Returns:
        Optional[Dict[str, Any]]: Diccionario con la información del archivo JGW.
                                  Devuelve None si no se encuentra el archivo.
    """
    if bool(image_name is None) == bool(patch_name is None):  # xor
        raise ValueError("Se debe proporcionar un image_name o un patch_name.")

    db_images = DB.get_collection("imagenes")
    db_image = (
        db_images.find_one({"patches.patch_name": patch_name}) if patch_name else db_images.find_one({"id": image_name})
    )
    if not db_image:
        msg = (
            f"No se encontró la imagen {image_name} en la base de datos."
            if image_name
            else f"No se encontró la imagen con el parche {patch_name} en la base de datos."
        )
        raise ValueError(msg)

    image_jgw_data = jgw_data = db_image.get("jgw_data", None)
    if not image_jgw_data:
        logger.warning(f"No se encontró el archivo JGW para la imagen {image_name}.")

    if image_name:
        jgw_data = image_jgw_data
    else:
        # Si se proporciona un nombre de parche, debemos convertir el jgw_data a las coordenadas del parche
        patch = next((p for p in db_image.get("patches", []) if p["patch_name"] == patch_name), None)
        if not patch:
            raise ValueError(f"No se encontró el parche {patch_name} en la imagen {image_name}.")
        
        # Convertir el jgw_data a las coordenadas del parche
        x_origin_patch = image_jgw_data["x_origin"] + (patch["x_start"] * image_jgw_data["x_pixel_size"])
        y_origin_patch = image_jgw_data["y_origin"] + (patch["y_start"] * image_jgw_data["y_pixel_size"])

        jgw_data = {
            "x_pixel_size": image_jgw_data["x_pixel_size"],
            "y_rotation": image_jgw_data["y_rotation"],
            "x_rotation": image_jgw_data["x_rotation"],
            "y_pixel_size": image_jgw_data["y_pixel_size"],
            "x_origin": x_origin_patch,
            "y_origin": y_origin_patch,
        }

    return jgw_data

#### Obtener imágenes del repositorio

In [28]:
def download_image_from_minio(image_name: str, output_filename: Optional[Path] = None) -> Optional[Path]:
    """Descarga una imagen desde MinIO y la guarda en la ruta especificada.

    Este método permite descargar una imagen desde el almacenamiento MinIO y guardarla
    en una ruta local. Si no se proporciona una ruta, la imagen se guarda en la carpeta
    de descargas configurada.

    Args:
        image_name (str): Identificador de la imagen a descargar.
        path (str, optional): Ruta local donde se guardará la imagen. Defaults to None.

    Ejemplo de uso:

        >>> download_image_from_minio("8deOctubreyCentenario-EspLibreLarranaga_20190828_dji_pc_5cm", "imagen.jpg")
        >>> # Esto descargará la imagen con el ID especificado y la guardará como "imagen.jpg".
    """
    if not output_filename:
        Path(DOWNLOAD_IMAGES_FOLDER).mkdir(parents=True, exist_ok=True)
        output_filename = DOWNLOAD_IMAGES_FOLDER / f"{image_name}.jpg"

    image_key = f"{CONFIG["minio"]["paths"]["images"]}/{image_name}.jpg"

    try:
        MINIO_CLIENT.download_file(Bucket=MINIO_BUCKET, Key=image_key, Filename=output_filename)
        logger.debug(f"Imagen {image_name} descargada correctamente en {output_filename}.")
    except Exception as e:
        raise Exception(f"Error al descargar la imagen {image_name}: {e}")

    return output_filename

In [29]:
def download_images_from_minio(images_name: list[str]) -> bool:
    """Descarga una lista de imágenes desde MinIO.

    Este método permite descargar múltiples imágenes desde el almacenamiento MinIO y guardarlas
    en la carpeta de descargas configurada.

    Args:
        images_name (list[str]): Lista de identificadores de las imágenes a descargar.

    Ejemplo de uso:

        >>> images_name = [
        ...    "8deOctubreyCentenario-EspLibreLarranaga_20190828_dji_pc_5cm",
        ...    "AntelArena_20200804_dji_pc_5c"
        ... ]
        >>> download_images_from_minio(images_name)
        >>> # Esto descargará las imágenes con los IDs especificados y las guardará en la carpeta de descargas.
    """
    try:
        for image_name in images_name:
            download_image_from_minio(image_name)
        logger.debug(f"Se descargaron {len(images_name)} imágenes de MinIO.")
    except Exception as e:
        logger.error(f"Error al descargar imágenes de MinIO: {e}")
        raise e
    return True

In [30]:
def download_patch_from_minio(patch_name: str, output_filename: Optional[Path] = None) -> Optional[Path]:
    """Descarga un parche desde MinIO y lo guarda en la ruta especificada.

    Este método permite descargar un parche desde el almacenamiento MinIO y guardarlo
    en una ruta local. Si no se proporciona una ruta, el parche se guarda en la carpeta
    de descargas configurada.

    Args:
        patch_name (str): Nombre del parche a descargar.
        path (str, optional): Ruta local donde se guardará el parche. Defaults to None.

    Ejemplo de uso:
        >>> download_patch_from_minio("8deOctubreyCentenario-EspLibreLarranaga_20190828_dji_pc_5cm_patch_0.jpg", "parche.jpg")
        >>> # Esto descargará el parche con el nombre especificado y lo guardará como "parche.jpg".
    """
    if not output_filename:
        Path(DOWNLOAD_PATCHES_FOLDER).mkdir(parents=True, exist_ok=True)
        output_filename = DOWNLOAD_PATCHES_FOLDER / f"{patch_name}.jpg"

    # Obtener el nombre de la imagen del parche desde mongodb
    imagenes = DB.get_collection("imagenes")
    image = imagenes.find_one({"patches.patch_name": patch_name})
    if not image:
        raise ValueError(f"No se encontró la imagen del parche {patch_name} en la base de datos.")

    patch = next((p for p in image.get("patches", []) if p["patch_name"] == patch_name), None)

    if patch["is_white"]:
        logger.warning(f"El parche {patch_name} es blanco, no se descargará.")
        return None

    image_name = image["id"]
    patch_key = f"{CONFIG['minio']['patches_path']}/{image_name}/{patch_name}.jpg"

    try:
        MINIO_CLIENT.download_file(Bucket=MINIO_BUCKET, Key=patch_key, Filename=output_filename)
        logger.debug(f"Parche {patch_name} descargado correctamente en {output_filename}.")
    except Exception as e:
        raise Exception(f"Error al descargar el parche {patch_name}: {e}")

    return output_filename

In [31]:
def download_patches_from_minio(patch_names: list[str]) -> None:
    """Descarga una lista de parches desde MinIO.

    Este método permite descargar múltiples parches desde el almacenamiento MinIO y guardarlos
    en la carpeta de descargas configurada.

    Args:
        patch_names (list[str]): Lista de nombres de los parches a descargar.

    Ejemplo de uso:

        >>> patch_names = [
        ...    "8deOctubreyCentenario-EspLibreLarranaga_20190828_dji_pc_5cm_patch_0.jpg",
        ...    "8deOctubreyCentenario-EspLibreLarranaga_20190828_dji_pc_5cm_patch_2.jpg"
        ... ]
        >>> download_patches_from_minio(patch_names)
        >>> # Esto descargará los parches con los nombres especificados y los guardará en la carpeta de descargas.
    """
    try:
        for patch_name in patch_names:
            download_patch_from_minio(patch_name)
        logger.debug(f"Se descargaron {len(patch_names)} parches de MinIO.")
    except Exception as e:
        raise Exception(f"Error al descargar parches de MinIO: {e}")
    return True

#### Mostrar anotaciones

In [32]:
def load_coco_annotations(annotations, coco=None):
    """
    Args:
        annotations (List):
            a list of coco annotaions for the current image
        coco (`optional`, defaults to `False`):
            COCO annotation object instance. If set, this function will
            convert the loaded annotation category ids to category names
            set in COCO.categories
    """
    layout = lp.Layout()

    for ele in annotations:

        x, y, w, h = ele["bbox"]

        layout.append(
            lp.TextBlock(
                block=lp.Rectangle(x, y, w + x, h + y),
                type=(ele["category_id"] if coco is None else coco.cats[ele["category_id"]]["name"]),
                id=ele["id"],
            )
        )

    return layout

In [33]:
def show_anotated_image(
    image_path: Path,
    coco_annotations: Optional[Dict[str, Any]] = None,
    annotation_path: Optional[Path] = None,
    fig_size: Optional[Tuple[int, int]] = None,
    use_layoutparser: bool = False,
) -> None:
    """Muestra una imagen anotada con las anotaciones proporcionadas en formato COCO.

    Args:
        image_path (str): Ruta al archivo de imagen.
        coco_annotations (dict, optional): Diccionario con las anotaciones en formato COCO. Defaults to None.
        annotation_path (str, optional): Ruta al archivo JSON con las anotaciones en formato COCO. Defaults to None.
        fig_size (tuple, optional): Tamaño de la figura para la visualización. Defaults to None.
        use_layoutparser (bool, optional): Si se debe usar layoutparser para dibujar las anotaciones. Defaults to False.

    Raises:
        FileNotFoundError: Si el archivo de imagen no existe.
        ValueError: Si se proporcionan tanto `coco_annotations` como `annotation_path`.
        ValueError: Si no se encuentra el archivo de anotaciones especificado.
        ValueError: Si no se encuentra el ID de la imagen en las anotaciones.

    Example:
        >>> anottation_path = download_image_annotations_as_coco("8deOctubreyCentenario-EspLibreLarranaga_20190828_dji_pc_5cm", "cvat")
        >>> print(f"Archivo generado en: {anottation_path}")
        >>> image_path = download_image_from_minio("8deOctubreyCentenario-EspLibreLarranaga_20190828_dji_pc_5cm")
        >>> print(f"Imagen descargada en: {image_path}")
        >>> show_anotated_image(image_path, annotation_path=anottation_path, fig_size=(20, 20))
        >>> show_anotated_image(
        ...     image_path=image_path,
        ...     annotation_path=anottation_path,
        ...     fig_size=(10, 10),
        ...     use_layoutparser=False
        ... )
    """

    if not image_path.exists():
        raise FileNotFoundError(f"El archivo de imagen {image_path} no existe.")
    if bool(coco_annotations is None) == bool(annotation_path is None):  # xor
        raise ValueError("Se debe proporcionar coco_annotations o annotation_path.")
    if annotation_path.exists():
        coco_annotations = load_annotations_from_file(annotation_path)

    # Buscamos el id de la imagen en las anotaciones
    image_name = image_path.name
    image_id = get_image_id_from_annotations(image_name, coco_annotations)

    image = cv.imread(image_path)
    if use_layoutparser:
        coco = COCO(annotation_path)
        annotations = coco.loadAnns(coco.getAnnIds([image_id]))

        layout = load_coco_annotations(annotations, coco)

        if fig_size:
            plt.figure(figsize=fig_size)

        layoutparser_draw_box_config = CONFIG["layoutparser"]["draw_box"]
        viz = lp.draw_box(
            image,
            layout,
            box_width=layoutparser_draw_box_config["box_width"],
            box_alpha=layoutparser_draw_box_config["box_alpha"],
            color_map=layoutparser_draw_box_config["color_map"],
        )
        display(viz)
    else:
        if fig_size:
            plt.figure(figsize=fig_size)
        else:
            plt.figure()

        color_map = {k: tuple(v) for k, v in CONFIG["opencv"]["draw_box"]["color_map"].items()}
        category_map = {cat["id"]: cat["name"] for cat in coco_annotations["categories"]}

        # Dibujar las anotaciones en la imagen
        for annotation in coco_annotations["annotations"]:
            if annotation["image_id"] == image_id:
                x, y, w, h = annotation["bbox"]
                color = color_map.get(annotation["category_id"], (0, 255, 0))
                cv.rectangle(
                    image,
                    (int(x), int(y)),
                    (int(x + w), int(y + h)),
                    color,
                    CONFIG["opencv"]["draw_box"]["box_width"],
                )
                cv.putText(
                    image,
                    str(category_map[annotation["category_id"]]),
                    (int(x), int(y) - 10),
                    cv.FONT_HERSHEY_SIMPLEX,
                    CONFIG["opencv"]["draw_box"]["font_scale"],
                    color,
                    CONFIG["opencv"]["draw_box"]["font_thickness"],
                )

        plt.imshow(image)
        plt.axis("off")
        plt.title(f"Imagen: {os.path.basename(image_path)}")
        plt.show()

#### Recortes

In [34]:
def cutout_bbox_from_image(image: np.ndarray, bbox: list) -> np.ndarray:
    """Recorta una región de interés (ROI) de una imagen utilizando un bounding box.

    Args:
        image (numpy.ndarray): Imagen de entrada.
        bbox (list): Bounding box en formato [x_min, y_min, width, height].

    Returns:
        numpy.ndarray: Imagen recortada.
    """
    x_min, y_min, width, height = bbox
    x_max = int(x_min + width)
    y_max = int(y_min + height)

    # Recortar la imagen utilizando el bounding box
    return image[int(y_min) : int(y_max), int(x_min) : int(x_max)]

In [35]:
def cut_palms_from_image(
    image: np.ndarray,
    coco_annotations: Dict[str, Any],
    pic_name: str,
    with_metadata: bool = True,
    cutout_folder: Optional[Path] = None,
    metadata_folder: Optional[Path] = None,
) -> Optional[Path]:
    """Recorta las palmas de una imagen utilizando las anotaciones proporcionadas en formato COCO.
    También guarda los metadatos de las imágenes recortadas en un archivo JSON.

    Este método carga una imagen y sus anotaciones asociadas en formato COCO, recorta las regiones de interés
    (ROIs) correspondientes a las palmas y guarda los recortes en una carpeta especificada.

    Args:
        image (numpy.ndarray): Imagen de entrada.
        coco_annotations (dict): Diccionario con las anotaciones en formato COCO.
        pic_name (str): Nombre de la imagen o parche para identificar los recortes.
        with_metadata (bool, optional): Si se deben guardar los metadatos de las imágenes recortadas. Defaults to True.
        cutout_folder (str, optional): Carpeta donde se guardarán los recortes. Si no se proporciona, se crea una
                                       carpeta en la ubicación predeterminada. Defaults to None.
        metadata_folder (str, optional): Carpeta donde se guardarán los metadatos de las imágenes recortadas. Si no
                                        se proporciona, se crea una carpeta en la ubicación predeterminada.

    Raises:
        FileNotFoundError: Si el archivo de imagen no existe.
        ValueError: Si no se encuentran anotaciones para la imagen o si no se puede cargar la imagen.

    Returns:
        Optional[Path]: Ruta de la carpeta donde se guardaron los recortes.

    Ejemplo de uso:

        >>> image_name = "8deOctubreyCentenario-EspLibreLarranaga_20190828_dji_pc_5cm"
        >>> patch_name = "8deOctubreyCentenario-EspLibreLarranaga_20190828_dji_pc_5cm_patch_0"
        >>> annotations_field = "cvat"
        >>> pic_name = patch_name
        >>> if pic_name == patch_name:
        >>>     image_path = download_patch_from_minio(patch_name)
        >>>     annotations_path = download_annotation_as_coco_from_mongodb(
        ...         field_name=annotations_field, patch_name=patch_name
        ...     )
        >>> else:
        >>>     image_path = download_image_from_minio(image_name)
        >>>     annotations_path = download_annotation_as_coco_from_mongodb(
        ...         field_name=annotations_field, image_name=image_name
        ...     )
        >>> image = cv.imread(image_path)
        >>> coco_annotations = load_annotations_from_file(annotations_path)
        >>> cutout_folder = cut_palms_from_image(
        ...     image=image,
        ...     coco_annotations=coco_annotations,
        ...     pic_name=pic_name,
        ... )
    """
    # Buscar el id de la imagen en las anotaciones dependiendo de si se proporciona el nombre del parche o de la imagen
    image_id = get_image_id_from_annotations(pic_name, coco_annotations)

    # Filtrar las anotaciones para la imagen actual
    annotations: list = [ann for ann in coco_annotations["annotations"] if ann["image_id"] == image_id]
    if not annotations:
        logger.warning(f"No se encontraron anotaciones para la imagen {pic_name}.")
        return None

    # Crear una carpeta para guardar los recortes
    if not cutout_folder:
        cutout_folder = DOWNLOAD_CUTOUTS_FOLDER / pic_name
        Path(cutout_folder).mkdir(parents=True, exist_ok=True)
        logger.debug(f"Carpeta de recortes creada: {cutout_folder}")
    if not metadata_folder:
        metadata_folder = DOWNLOAD_CUTOUTS_METADATA_FOLDER / pic_name
        Path(metadata_folder).mkdir(parents=True, exist_ok=True)
        logger.debug(f"Carpeta de metadatos creada: {metadata_folder}")

    # Guardar los metadatos de la imagen recortada
    cutout_coco_images_field: list = []
    cutout_coco_annotations_field: list = []

    # Recortar las regiones de interés (ROIs) y guardarlas
    for i, annotation in enumerate(annotations):
        bbox = annotation["bbox"]
        cutout_image = cutout_bbox_from_image(image, bbox)

        # Guardar la imagen recortada
        cutout_name = f"{pic_name}_cutout_{i + 1}"
        cutout_path = cutout_folder / f"{cutout_name}.jpg"
        cv.imwrite(cutout_path, cutout_image)
        logger.debug(f"Recorte guardado: {cutout_path}")

        # Guardar los metadatos de la imagen recortada
        output_coco_image = {
            "id": i + 1,
            "width": cutout_image.shape[1],
            "height": cutout_image.shape[0],
            "file_name": cutout_name,
            "date_captured": datetime.date.today().strftime("%Y-%m-%d"),
        }

        output_coco_annotation = {
            "id": 1,
            "image_id": output_coco_image["id"],
            "category_id": annotation["category_id"],
        }

        cutout_coco_images_field.append(output_coco_image)
        cutout_coco_annotations_field.append(output_coco_annotation)

    # Guardar los metadatos de la imagen recortada en un archivo JSON
    cutout_coco_annotations = {
        "info": coco_annotations["info"],
        "licenses": coco_annotations["licenses"],
        "categories": coco_annotations["categories"],
        "images": cutout_coco_images_field,
        "annotations": cutout_coco_annotations_field,
    }

    if with_metadata:
        cutout_metadata_path = metadata_folder / f"{pic_name}_metadata.json"
        with open(cutout_metadata_path, "w") as f:
            json.dump(cutout_coco_annotations, f, indent=4)
            logger.debug(f"Metadatos guardados en {cutout_metadata_path}.")

    return cutout_folder

In [36]:
def clear_minio_cutouts_folder(folder: str, is_metadata: bool = False) -> None:
    """
    Elimina el contenido de una carpeta específica en MinIO.

    Este método permite eliminar todos los objetos dentro de una carpeta en MinIO,
    ya sea para recortes o metadatos de recortes, según el prefijo proporcionado.

    Args:
        folder (str): Nombre de la carpeta cuyo contenido se desea eliminar.
        is_metadata (bool, optional): Indica si se trata de una carpeta de metadatos.
                                      Defaults to False.
    """
    prefix_key_path = (
        f"{CONFIG['minio']['cutouts_path']}/{folder}/"
        if not is_metadata
        else f"{CONFIG['minio']['cutouts_metadata_path']}/{folder}/"
    )
    objects_to_delete = MINIO_CLIENT.list_objects(Bucket=MINIO_BUCKET, Prefix=prefix_key_path)
    delete_keys = {"Objects": []}
    delete_keys["Objects"] = [{"Key": k} for k in [obj["Key"] for obj in objects_to_delete.get("Contents", [])]]
    if delete_keys["Objects"]:
        MINIO_CLIENT.delete_objects(Bucket=MINIO_BUCKET, Delete=delete_keys)
        logger.debug(f"Se eliminaron {len(delete_keys['Objects'])} objetos de MinIO con el prefijo {prefix_key_path}.")
    else:
        logger.debug(f"No se encontraron objetos para eliminar con el prefijo {prefix_key_path}.")

In [37]:
def upload_cutouts_to_mino(
    cutout_folder: Path = DOWNLOAD_CUTOUTS_FOLDER,
    cutout_metadata_folder: Optional[Path] = DOWNLOAD_CUTOUTS_METADATA_FOLDER,
) -> None:
    """
    Sube los recortes y sus metadatos a MinIO.

    Este método permite subir las imágenes recortadas y sus metadatos almacenados localmente
    a MinIO. Antes de subir los archivos, elimina el contenido existente en las carpetas
    correspondientes en MinIO.

    Args:
        cutout_folder (Path, optional): Carpeta local que contiene los recortes. 
                                         Defaults to DOWNLOAD_CUTOUTS_FOLDER.
        cutout_metadata_folder (Optional[Path], optional): Carpeta local que contiene los metadatos
                                                           de los recortes. Defaults to DOWNLOAD_CUTOUTS_METADATA_FOLDER.

    Raises:
        FileNotFoundError: Si la carpeta de recortes no existe.
        FileNotFoundError: Si la carpeta de metadatos no existe (si se proporciona).
    """
    if not cutout_folder.exists():
        raise FileNotFoundError(f"La carpeta de recortes {cutout_folder} no existe.")
    if cutout_metadata_folder and not cutout_metadata_folder.exists():
        raise FileNotFoundError(f"La carpeta de metadatos {cutout_metadata_folder} no existe.")

    for folder in os.listdir(cutout_folder):
        folder_path = cutout_folder / folder

        # Eliminamos el contenido del folder en MinIO
        clear_minio_cutouts_folder(folder, is_metadata=False)

        # Subir las imágenes a MinIO
        for _, _, files in os.walk(folder_path):
            for filename in files:
                MINIO_CLIENT.upload_file(
                    Filename=os.path.join(folder_path, filename),
                    Bucket=MINIO_BUCKET,
                    Key=f"{CONFIG['minio']['cutouts_path']}/{folder}/{filename}",
                )
                logger.debug(f"Imagen {filename} subida a MinIO.")

    if cutout_metadata_folder:
        for folder in os.listdir(cutout_metadata_folder):
            folder_path = cutout_metadata_folder / folder

            # Eliminamos el contenido del folder en MinIO
            clear_minio_cutouts_folder(folder, is_metadata=True)

            # Subimos los metadatos a MinIO
            for _, _, files in os.walk(folder_path):
                for filename in files:
                    # Subir el archivo JSON de metadatos a MinIO
                    MINIO_CLIENT.upload_file(
                        Filename=os.path.join(folder_path, filename),
                        Bucket=MINIO_BUCKET,
                        Key=f"{CONFIG['minio']['cutouts_metadata_path']}/{folder}/{filename}",
                    )
                    logger.debug(f"Archivo JSON de metadatos {filename} subido a MinIO.")

### Interacción con google maps

#### Generación de mapa

Procesamiento de archivos KML:
- `fastkml` $\rightarrow$ [https://fastkml.readthedocs.io/en/latest/quickstart.html](https://fastkml.readthedocs.io/en/latest/quickstart.html)

Conversión de archivos KML a GeoJSON:
- `kml2geojson` $\rightarrow$ [https://pypi.org/project/kml2geojson/](https://pypi.org/project/kml2geojson/) | [https://github.com/mrcagney/kml2geojson](https://github.com/mrcagney/kml2geojson)

Visualización:
- `folium` $\rightarrow$ [https://python-visualization.github.io/folium/latest/](https://python-visualization.github.io/folium/latest/)
- `geojson.io` $\rightarrow$ [https://geojson.io/#map=2/0/20](https://geojson.io/#map=2/0/20)

Trabajo con archivos GeoJSON:
- `geojson` $\rightarrow$ [https://github.com/jazzband/geojson](https://github.com/jazzband/geojson) | [https://geojson.org/](https://geojson.org/)
- `geopandas` $\rightarrow$ [https://geopandas.org/](https://geopandas.org/)
- `shapely` $\rightarrow$ [https://shapely.readthedocs.io/en/latest/manual.html](https://shapely.readthedocs.io/en/latest/manual.html)

In [38]:
def download_kmz_from_gmaps(
    base_url: str = CONFIG["google_maps"]["base_url"],
    mid: str = CONFIG["google_maps"]["mid"],
    output_filename: Optional[Path] = None,
) -> Optional[Path]:
    """Descarga un archivo KMZ desde Google Maps y lo descomprime.

    Este método utiliza la configuración proporcionada para construir la URL de descarga
    del archivo KMZ desde Google Maps. Una vez descargado, el archivo se descomprime
    y se guarda en la carpeta especificada.

    Args:
        filename (str, optional): Ruta donde se guardará el archivo KMZ descargado.
                                  Si no se proporciona, se utiliza una ruta temporal.

    Returns:
        str: Ruta del archivo KML descargado y descomprimido.

    Raises:
        Exception: Si ocurre un error al acceder a la URL de descarga.
    """
    url = f"{base_url}?mid={mid}"
    if not output_filename:
        Path(DOWNLOAD_TEMP_FOLDER).mkdir(parents=True, exist_ok=True)
        output_filename = DOWNLOAD_TEMP_FOLDER / "google_maps.kmz"
    try:
        response = requests.get(url)
        response.raise_for_status()
        with open(output_filename, "wb") as f:
            f.write(response.content)
        logger.debug(f"Archivo KMZ descargado y guardado en {output_filename}.")

        extract_path = (
            DOWNLOAD_GOOGLE_MAPS_FOLDER / f"google_maps_{mid}_{datetime.datetime.now().strftime("%Y%m%d%H%M%S")}.kmz"
        )

        # Descomprimir el archivo KMZ
        with zipfile.ZipFile(output_filename, "r") as zip_ref:
            zip_ref.extractall(extract_path)
        logger.debug(f"Archivo KMZ descomprimido en {extract_path}.")

        # Clear the temporary file
        os.remove(output_filename)

        return f"{extract_path}/doc.kml"

    except requests.exceptions.HTTPError as err:
        raise Exception(f"Error al acceder a {url}. Razón: {err}")

In [39]:
def convert_kml_to_geojson(kml_file_path: Path, geojson_file_path: Optional[Path]) -> Optional[Path]:
    """
    Convierte un archivo KML a formato GeoJSON.

    Args:
        kml_file_path (str): Ruta al archivo KML de entrada.
        geojson_file_path (str): Ruta donde se guardará el archivo GeoJSON convertido.
    """
    try:
        geojson = kml2geojson.main.convert(kml_file_path)[0]
        logger.info(f"Conversión exitosa de '{kml_file_path}' a '{geojson_file_path}'.")

        if not geojson_file_path:
            Path(DOWNLOAD_GOOGLE_MAPS_FOLDER).mkdir(parents=True, exist_ok=True)
            geojson_file_path = DOWNLOAD_GOOGLE_MAPS_FOLDER / f"{kml_file_path.stem}.geojson"
        # Guardar el archivo GeoJSON
        with open(geojson_file_path, "w") as f:
            json.dump(geojson, f, indent=4)
            return geojson_file_path
        logger.info(f"Archivo GeoJSON guardado en '{geojson_file_path}'.")

    except FileNotFoundError:
        logger.error(f"Error: Archivo KML no encontrado en '{kml_file_path}'.")
    except ValueError as e:
        logger.error(f"Error de valor: {e}")
    except Exception as e:
        logger.error(f"Ocurrió un error inesperado: {e}")
    return None

In [40]:
def create_geojson_from_annotation(
    pic_name: str,
    coco_annotation: Dict[str, Any],
    jgw_data: Dict[str, Any],
    output_filename: Optional[Path] = None,
    upload_to_drive: bool = False,
    geo_sistema_referencia: str = CONFIG["georeferenciacion"]["codigo_epsg"],
) -> gpd.GeoDataFrame:
    """
    Crea un GeoDataFrame a partir de las anotaciones COCO y datos de georreferenciación.

    Args:
        pic_name (str): Nombre de la imagen para la cual se generará el GeoJSON.
        coco_annotation (Dict[str, Any]): Anotaciones en formato COCO que incluyen categorías y bounding boxes.
        jgw_data (Dict[str, Any]): Datos de georreferenciación provenientes de un archivo JGW.
        output_filename (Optional[Path], optional): Ruta donde se guardará el archivo GeoJSON. Defaults to None.
        upload_to_drive (bool, optional): Indica si el archivo GeoJSON debe subirse a Google Drive. Defaults to False.
        geo_sistema_referencia (str, optional): Código EPSG del sistema de referencia geográfico. Defaults to CONFIG["georeferenciacion"]["codigo_epsg"].

    Raises:
        NotImplementedError: Si se solicita subir el archivo a Google Drive, pero la funcionalidad no está implementada.

    Returns:
        gpd.GeoDataFrame: GeoDataFrame que contiene las geometrías y propiedades de las anotaciones.

    Ejemplo de uso:

        >>> image_name = "8deOctubreyCentenario-EspLibreLarranaga_20190828_dji_pc_5cm"
        >>> patch_name = "8deOctubreyCentenario-EspLibreLarranaga_20190828_dji_pc_5cm_patch_0"
        >>> annotations_field = "cvat"
        >>> pic_name = image_name
        >>> if pic_name == image_name:
        >>>     coco_annotations = load_coco_annotation_from_mongodb(
        ...         field_name=annotations_field, image_name=image_name
        ...     )
        >>>     jgw_data = load_jgw_file_from_mongodb(image_name=image_name)
        >>> else:
        >>>     coco_annotations = load_coco_annotation_from_mongodb(
        ...         field_name=annotations_field, patch_name=patch_name
        ...     )
        >>>     jgw_data = load_jgw_file_from_mongodb(patch_name=patch_name)
        >>> gdf = create_geojson_from_annotation(
        ...     pic_name=pic_name,
        ...     coco_annotation=coco_annotations,
        ...     jgw_data=jgw_data,
        ...     output_filename=DOWNLOAD_TEMP_FOLDER / f"{pic_name}.geojson",
        ... )

        >>> # Cambiar proyectar en otro sistema de coordenadas
        >>> # gdf_crs = gdf.to_crs("EPSG:4326")
        >>> # gdf_crs.to_file(
        ... #     DOWNLOAD_TEMP_FOLDER / f"{pic_name}_4326.geojson",
        ... #     driver="GeoJSON",
        ... # )
    """
    # 1 - Configuraciones generales
    category_map = {cat["id"]: cat["name"] for cat in coco_annotation["categories"]}

    # 2 - Obtener el id de la imagen en las anotaciones
    image_id = get_image_id_from_annotations(pic_name, coco_annotation)

    # 3 - Obtener las anotaciones de la imagen
    annotations = [ann for ann in coco_annotation["annotations"] if ann["image_id"] == image_id]
    if not annotations:
        logger.warning(f"No se encontraron anotaciones para la imagen {pic_name}.")
        return gpd.GeoDataFrame()

    # 4 - Preparar listas para almacenar los datos
    geometries = []
    properties = []

    # 5 - Para cada anotación, obtener el bbox y la categoría
    for annotation in annotations:
        bbox = annotation["bbox"]
        category_name = category_map.get(annotation["category_id"], "Sin categoría")

        # 5.1 - Convertir el bbox a coordenadas geográficas utilizando los datos del archivo JGW
        global_coordinates = convert_bbox_image_to_world(bbox, jgw_data)

        # 5.2 - Obtener el centroide del bbox
        x_coords = [
            global_coordinates["tl"][0],
            global_coordinates["tr"][0],
            global_coordinates["br"][0],
            global_coordinates["bl"][0],
        ]
        y_coords = [
            global_coordinates["tl"][1],
            global_coordinates["tr"][1],
            global_coordinates["br"][1],
            global_coordinates["bl"][1],
        ]
        centroid_x = sum(x_coords) / len(x_coords)
        centroid_y = sum(y_coords) / len(y_coords)
        centroid = (centroid_x, centroid_y)

        # 5.3 - Crear un objeto Point de Shapely con las coordenadas del centroide
        point = Point(centroid)

        # 5.4 - Guardar la geometría y propiedades
        geometries.append(point)
        properties.append(
            {
                "category": category_name,
                "annotation_id": annotation.get("id", None),
                "bbox_x": bbox[0],
                "bbox_y": bbox[1],
                "bbox_width": bbox[2],
                "bbox_height": bbox[3],
                "global_tl_x": global_coordinates["tl"][0],
                "global_tl_y": global_coordinates["tl"][1],
                "global_br_x": global_coordinates["br"][0],
                "global_br_y": global_coordinates["br"][1],
            }
        )

    # 6 - Crear un DataFrame con las propiedades
    properties_df = pd.DataFrame(properties)

    # 7 - Crear un GeoDataFrame con las geometrías y propiedades
    gdf = gpd.GeoDataFrame(properties_df, geometry=geometries)

    # 8 - Configurar el sistema de coordenadas (CRS)
    gdf.crs = geo_sistema_referencia

    # 9 - Guardar como GeoJSON si se proporciona un nombre de archivo
    if output_filename:
        output_path = output_filename
        Path(output_path.parent).mkdir(parents=True, exist_ok=True)
        gdf.to_file(output_path, driver="GeoJSON")
        logger.info(f"GeoJSON guardado en {output_path}")

        # 10 - Opcional: Subir a Google Drive si se solicita
        if upload_to_drive:
            # Aquí iría tu código para subir a Drive
            raise NotImplementedError("Subida a Google Drive no implementada.")
    return gdf

In [41]:
# Se está generando el archivo con unos tags incorrectos.
def create_kml_from_geojson(
    gdf: gpd.GeoDataFrame,
    kml_filename: Optional[Path] = None,
    category_column: str = "category",
    target_category: Optional[str] = "palmera",
    reproject: bool = True,
):
    """
    Crea un archivo KML a partir de un GeoDataFrame.

    Este método toma un GeoDataFrame con geometrías y propiedades, filtra las categorías
    deseadas, y genera un archivo KML con las geometrías y propiedades correspondientes.

    Args:
        gdf (gpd.GeoDataFrame): GeoDataFrame que contiene las geometrías y propiedades.
        kml_filename (Optional[Path], optional): Ruta donde se guardará el archivo KML.
                                                 Si no se proporciona, se utiliza una ruta predeterminada.
                                                 Defaults to None.
        category_column (str, optional): Nombre de la columna que contiene las categorías.
                                         Defaults to "category".
        target_category (Optional[str], optional): Categoría objetivo que se desea incluir en el KML.
                                                   Si no se proporciona, se incluyen todas las categorías.
                                                   Defaults to "palmera".
        reproject (bool, optional): Si se debe reproyectar el GeoDataFrame a EPSG:4326 (WGS84).
                                    Defaults to True.

    Raises:
        ValueError: Si ocurre un error al reproyectar el GeoDataFrame.

    Returns:
        None: El archivo KML se guarda en la ubicación especificada o predeterminada.

    Ejemplo de uso:

        >>> image_name = "8deOctubreyCentenario-EspLibreLarranaga_20190828_dji_pc_5cm"
        >>> patch_name = "8deOctubreyCentenario-EspLibreLarranaga_20190828_dji_pc_5cm_patch_0"
        >>> annotations_field = "cvat"
        >>> pic_name = image_name
        >>> if pic_name == image_name:
        >>>     coco_annotations = load_coco_annotation_from_mongodb(
        ...         field_name=annotations_field, image_name=image_name
        ...     )
        >>>     jgw_data = load_jgw_file_from_mongodb(image_name=image_name)
        >>> else:
        >>>     coco_annotations = load_coco_annotation_from_mongodb(
        ...         field_name=annotations_field, patch_name=patch_name
        ...     )
        >>>     jgw_data = load_jgw_file_from_mongodb(patch_name=patch_name)
        >>> gdf = create_geojson_from_annotation(
        ...     pic_name=pic_name,
        ...     coco_annotation=coco_annotations,
        ...     jgw_data=jgw_data,
        ...     output_filename=DOWNLOAD_TEMP_FOLDER / f"{pic_name}.geojson",
        ... )
        >>> create_kml_from_geojson(
        ...     gdf=gdf,
        ...     kml_filename=KMLS_FOLDER / f"{pic_name}.kml",
        ...     category_column="category",
        ...     target_category=None
        ... )
    """
    k = kml.KML()
    ns = "{http://www.opengis.net/kml/2.2}"

    # Crear un documento KML
    palm_document = kml.Document(ns, id="docid", description="PalmTrees")
    k.append(palm_document)

    # Crear una carpeta para las palmeras
    palm_folder = kml.Folder(ns, id="palmeras_folder", name="Palmeras")
    palm_document.append(palm_folder)

    if target_category:
        palm_gdf = gdf[gdf[category_column] == target_category].copy()
    else:
        palm_gdf = gdf.copy()

    if palm_gdf.empty:
        logger.warning(
            f"No se encontraron elementos con la categoría '{target_category}'. No se creará el archivo KML."
        )
        return

    # Reproyectar a WGS84 (EPSG:4326) si no está ya en ese CRS
    if reproject and palm_gdf.crs is not None and palm_gdf.crs != "EPSG:4326":
        try:
            palm_gdf = palm_gdf.to_crs(epsg=4326)
            logger.debug("GeoDataFrame reproyectado a EPSG:4326 para el KML.")
        except Exception as e:
            raise ValueError(
                f"Error al reproyectar el GeoDataFrame a EPSG:4326: {e}. Asegúrate de que el CRS original sea válido."
            )

    for index, row in palm_gdf.iterrows():
        if row.geometry.geom_type == "Point":
            coords = (row.geometry.x, row.geometry.y)
            point = Point(coords)
            p = kml.Placemark(ns, id=f"palmera_{index}", name=f"{row.category}", geometry=point)
            palm_folder.append(p)
        else:
            logger.warning(f"La geometría del elemento con índice {index} no es un Point. No se agregará al KML.")

    if kml_filename:
        Path(kml_filename).parent.mkdir(parents=True, exist_ok=True)
    else:
        Path(KMLS_FOLDER).mkdir(parents=True, exist_ok=True)
        kml_filename = KMLS_FOLDER / f"kml_{target_category}_{datetime.datetime.now().strftime('%Y%m%d%H%M%S')}.kml"
    try:
        k.write(kml_filename)
        logger.info(f"Archivo KML guardado en {kml_filename}")
    except Exception as e:
        logger.error(f"Error al guardar el archivo KML: {e}")

In [42]:
def load_gdf_from_file(
    file_path: Path,
    crs: Optional[str] = None,
) -> gpd.GeoDataFrame:
    """Carga un archivo GeoJSON y lo convierte a un GeoDataFrame.

    Args:
        file_path (str): Ruta al archivo GeoJSON.
        driver (str, optional): Controla el formato del archivo. Defaults to "GeoJSON".
        crs (str, optional): Sistema de referencia de coordenadas. Defaults to None.

    Returns:
        gpd.GeoDataFrame: GeoDataFrame que contiene los datos del archivo.
    """
    gdf = gpd.read_file(file_path)
    if crs:
        gdf.crs = crs
    return gdf

#### Procesamiento de mapas

In [43]:
def process_single_patch(
    imagen: Dict[str, Any],
    patch: Dict[str, Any],
    gdf: gpd.GeoDataFrame,
    bbox_size: Tuple[float, float] = (CONFIG["bbox_size"]["width"], CONFIG["bbox_size"]["height"]),
) -> Tuple[Dict[str, Any], List[Dict[str, Any]]]:
    image = {
        "width": patch["width"],
        "height": patch["height"],
        "file_name": f"{patch["patch_name"]}.jpg",
        "date_captured": imagen["date_captured"].strftime("%Y-%m-%d %H:%M:%S"),
    }
    annotations = []
    jgw_data = imagen.get("jgw_data")
    if not jgw_data:
        logger.warning(f"No se encontró el archivo JGW para la imagen {imagen['name']}.")
        return image, annotations

    # Obtener las coordenadas del parche
    x_start, y_start, patch_width, patch_height = (
        patch["x_start"],
        patch["y_start"],
        patch["width"],
        patch["height"],
    )
    # Esquinas del parche dentro de la imagen
    esquinas_imagen = [
        (x_start, y_start),  # esquina superior izquierda
        (x_start + patch_width, y_start),  # esquina superior derecha
        (x_start + patch_width, y_start + patch_height),  # esquina inferior derecha
        (x_start, y_start + patch_height),  # esquina inferior izquierda
    ]

    # Convertir las coordenadas del parche a coordenadas globales
    esquinas_mundo = [convert_point_image_to_world(punto, jgw_data=jgw_data) for punto in esquinas_imagen]

    poligono_parche = Polygon(esquinas_mundo)

    # Filtrar los puntos del GeoDataFrame que están dentro del polígono del parche
    puntos_en_parche = gdf[gdf.geometry.within(poligono_parche)]

    if puntos_en_parche.empty:
        logger.debug(f"No se encontraron puntos dentro del parche {patch['patch_name']}.")
        return image, annotations

    # Procesar cada punto encontrado del parche
    puntos_en_parche.reset_index(inplace=True, drop=True)
    for index, row in puntos_en_parche.iterrows():
        punto_mundo = (row.geometry.x, row.geometry.y)

        # Convertir a coordenadas de imagen
        punto_imagen = convert_point_world_to_image(punto_mundo, jgw_data)

        # Convertir a coordenadas locales del parche
        punto_parche = convert_point_image_to_patch(punto_imagen, x_start, y_start, patch_width, patch_height)

        # Crear el bounding box
        bbox_ancho, bbox_alto = bbox_size
        x_centro, y_centro = punto_parche

        # Asegurarse que el bbox no exceda los límites del parche
        x_min = max(0, x_centro - bbox_ancho / 2)
        y_min = max(0, y_centro - bbox_alto / 2)
        x_max = min(patch_width, x_centro + bbox_ancho / 2)
        y_max = min(patch_height, y_centro + bbox_alto / 2)

        # Calcular dimensiones finales del bbox
        ancho = x_max - x_min
        alto = y_max - y_min
        area = ancho * alto

        category_name = "palmera-google-maps"
        annotation = {
            "id": index + 1,
            "segmentation": [],
            "iscrowd": 0,
            "attributes": {
                "occluded": False,
                "rotation": 0.0,
            },
            "category_name": category_name,
            "area": area,
            "bbox": [x_min, y_min, ancho, alto],
        }

        annotations.append(annotation)

    return image, annotations

In [44]:
def create_annotations_from_geojson(
    gdf: gpd.GeoDataFrame,
    output_filename: Optional[Path] = None,
    use_parallel: bool = True,
    max_workers: int = 10,
) -> Dict[str, Any]:
    coco_annotations = {
        "info": CONFIG["coco_dataset"]["info"],
        "licenses": CONFIG["coco_dataset"]["licenses"],
        "categories": CONFIG["google_maps"]["categories"],
        "images": [],
        "annotations": [],
    }

    category_map = {cat["name"]: cat["id"] for cat in coco_annotations["categories"]}

    coco_images = []
    image_annotations = []
    imagenes = DB.get_collection("imagenes")

    # Consulta con agregación para filtrar imágenes y sus patches
    pipeline = [
        # Filtrar imágenes donde downloaded = true
        {"$match": {"downloaded": True}},
        # Crear un nuevo campo 'patches_filtrados' que contenga solo los patches donde is_white = false
        {
            "$addFields": {
                "patches_filtrados": {
                    "$filter": {"input": "$patches", "as": "patch", "cond": {"$eq": ["$$patch.is_white", False]}}
                }
            }
        },
        # Filtrar para incluir solo imágenes que tienen al menos un patch válido
        {"$match": {"patches_filtrados.0": {"$exists": True}}},
        # Opcionalmente: proyectar solo campos necesarios con $project (mejorar optimización, pero queda hardcodeado)
    ]

    filtered_images = list(imagenes.aggregate(pipeline))
    logger.debug(f"Se encontraron {len(filtered_images)} imágenes con parches no blancos.")

    # Creamos tareas asíncronas para cada imagen y parche
    tareas = [(imagen, patch) for imagen in filtered_images for patch in imagen["patches_filtrados"]]
    logger.debug(f"Se encontraron {len(tareas)} tareas para procesar.")

    if tareas:
        annotation_images = []
        if use_parallel:
            with ProcessPoolExecutor(max_workers=max_workers) as executor:
                futures = [(executor.submit(process_single_patch, imagen, patch, gdf)) for imagen, patch in tareas]
                for future in tqdm(futures, desc="Procesando parches"):
                    try:
                        image, annotations = future.result()
                        if image and annotations:
                            annotation_images.append((image, annotations))
                    except Exception as e:
                        logger.error(f"Error procesando el parche: {e}")
        else:
            for imagen, patch in tqdm(tareas, desc="Procesando parches"):
                try:
                    image, annotations = process_single_patch(imagen, patch, gdf)
                    if image and annotations:
                        annotation_images.append((image, annotations))
                except Exception as e:
                    logger.error(f"Error procesando el parche: {e}")

        for id, image, annotations in enumerate(annotation_images):
            image_id = id + 1
            image = {"id": image_id, **image}

            annotations = [
                {
                    **annotation,
                    "image_id": image_id,
                    "category_id": category_map[annotation["category_name"]],
                }
                for annotation in annotations
            ]

            coco_images.append(image)
            image_annotations.append(annotations)

        coco_annotations["images"] = coco_images
        coco_annotations["annotations"] = image_annotations

        if output_filename:
            with open(output_filename, "w") as f:
                json.dump(coco_annotations, f, indent=4)
                logger.debug(f"Anotaciones guardadas en {output_filename}")

        pprint(coco_annotations)
    else:
        logger.warning("No se encontraron tareas para procesar.")

In [45]:
# gdf = load_gdf_from_file(DOWNLOAD_TEMP_FOLDER / "google_maps.geojson", crs=CONFIG["georeferenciacion"]["codigo_epsg"])
# output_filename = None
# use_parallel = False
# max_workers = 10

In [46]:
def merge_annotations():
    pass

### Ejemplo de carga de anotaciones

- Desde CVAT:
```python
coco_annotations = load_annotations_from_cvat(task_id=8)
save_coco_annotations(coco_annotations=coco_annotations, field_name="cvat")
```

- Desde un archivo:
```python
coco_annotations = load_annotations_from_file(file_path="cvat.json")
save_coco_annotations(coco_annotations=coco_annotations, field_name="cvat")
```

In [47]:
# patches_list = [
#     "8deOctubreyCentenario-EspLibreLarranaga_20190828_dji_pc_5cm_patch_0",
#     "8deOctubreyCentenario-EspLibreLarranaga_20190828_dji_pc_5cm_patch_2",
# ]
# images_list = ["8deOctubreyCentenario-EspLibreLarranaga_20190828_dji_pc_5cm", "AntelArena_20200804_dji_pc_5c"]
# coco_annotations = load_coco_annotations_from_mongodb(field_name="cvat", images_names=images_list, clean_files=False)
# save_coco_annotations(coco_annotations, "test")

### Fixes

In [48]:
def fix_image_width_height(image_id: str, width: int, height: int) -> bool:
    """Actualiza el ancho y alto de una imagen en la base de datos.

    Args:
        image_id (str): Identificador de la imagen a actualizar.
        width (int): Nuevo ancho de la imagen.
        height (int): Nuevo alto de la imagen.

    Returns:
        bool: True si la actualización fue exitosa, False en caso contrario.
    """
    try:
        imagenes = DB.get_collection("imagenes")
        result = imagenes.update_one(
            {"id": image_id},
            {
                "$set": {
                    "width": width,
                    "height": height,
                }
            },
        )
        if result.modified_count > 0:
            logger.debug(f"Ancho y alto de la imagen {image_id} actualizados correctamente.")
            return True
        else:
            logger.warning(f"No se encontraron cambios para la imagen {image_id}.")
            return False
    except Exception as e:
        logger.error(f"Error al actualizar el ancho y alto de la imagen {image_id}: {e}")
        return False

In [49]:
def fix_width_height_images():
    """Corrige el ancho y alto de las imágenes en la base de datos.

    Esta función descarga las imágenes desde MinIO, obtiene sus dimensiones (ancho y alto),
    actualiza estos valores en la base de datos y elimina las imágenes descargadas localmente.

    Pasos realizados:
    1. Configura el logger para registrar las actualizaciones.
    2. Obtiene todas las imágenes almacenadas en la base de datos.
    3. Descarga cada imagen desde MinIO si está marcada como descargada.
    4. Calcula las dimensiones de la imagen descargada.
    5. Actualiza las dimensiones en la base de datos.
    6. Elimina la imagen descargada localmente.

    Raises:
        OSError: Si ocurre un error al eliminar la imagen descargada localmente.
    """

    set_log_to_file(f"updates_{datetime.date.today()}.log")
    logger.setLevel(logging.INFO)
    logger.info("Iniciando actualización de imágenes y parches.")
    # Actualizar el width y height de las imágenes en la base de datos
    # 1 - Obtener todas las imágenes de la base de datos
    imagenes = DB.get_collection("imagenes")

    # 2 - Para cada imagen:
    for image in imagenes.find():
        image_id = image["id"]
        downloaded: bool = image["downloaded"]
        if not downloaded:
            logger.warning(f"La imagen {image_id} no está descargada, se omitirá.")
            continue
        logger.info(f"Actualizando imagen {image_id}...")
        # 2.1 - Descargar la imagen desde MinIO

        jpg_path = download_image_from_minio(image_id)
        logger.info(f"Imagen {image_id} descargada correctamente.")
        # 2.2 - Obtener el ancho y alto de la imagen descargada
        img = cv.imread(jpg_path)
        height, width = img.shape[:2]
        # 2.3 - Actualizar el ancho y alto de la imagen en la base de datos
        fix_image_width_height(image_id, width, height)
        logger.info(f"Ancho y alto de la imagen {image_id} actualizados correctamente.")
        # 2.4 - Eliminar la imagen descargada
        try:
            os.remove(jpg_path)
            logger.info(f"Imagen {jpg_path} eliminada correctamente.")
        except OSError as e:
            logger.warning(f"Error al eliminar la imagen {jpg_path}: {e}")
            continue
    logger.info("Actualización de imágenes y parches finalizada.")

In [50]:
def fix_dates_images():
    set_log_to_file(f"fix_dates_images_{datetime.date.today()}.log")
    logger.setLevel(logging.INFO)
    logger.info("Iniciando actualización de fechas de imágenes.")
    # Actualizar el width y height de las imágenes en la base de datos
    # 1 - Obtener todas las imágenes de la base de datos
    imagenes = DB.get_collection("imagenes")
    date_format = "%d/%m/%Y"

    # 2 - Para cada imagen:
    for image in imagenes.find():
        # 2.1 - Obtener el título de la imagen
        image_title = image["title"]

        # 2.2 - Extraer la fecha del título de la imagen
        match = re.search(r"\d{2}/\d{2}/\d{4}", image_title)
        date_str = match.group(0) if match else None
        if not date_str:
            logger.warning(f"No se encontró una fecha válida en el título de la imagen {image_title}.")
            continue

        # 2.3 - Actualizar la fecha de captura en la base de datos
        try:
            date_captured = datetime.datetime.strptime(date_str, date_format)
            imagenes.update_one(
                {"id": image["id"]},
                {
                    "$set": {
                        "date_captured": date_captured,
                    }
                },
            )
            logger.info(f"Fecha de captura de la imagen {image_title} actualizada correctamente.")
        except Exception as e:
            logger.warning(f"Error al actualizar la fecha de captura: {e}")

    logger.info("Actualización de fechas de imágenes finalizada.")

In [51]:
def fix_patch_name():
    """Corrige el nombre de los parches en la base de datos.
    Le saca el .jpg al final del nombre del parche y lo actualiza en la base de datos.
    """
    set_log_to_file(f"fix_patch_name_{datetime.date.today()}.log")
    logger.setLevel(logging.INFO)
    logger.info("Iniciando actualización de nombres de parches.")
    # 1 - Obtener todas las imágenes de la base de datos
    imagenes = DB.get_collection("imagenes")

    # 2 - Para cada imagen:
    for image in imagenes.find():
        image_id = image["id"]
        # 2.1 - Obtener los parches de la imagen
        try:
            patches = image["patches"]
        except KeyError:
            logger.warning(f"No se encontraron parches para la imagen {image_id}.")
            continue
        # 2.2 - Para cada parche:
        for patch in patches:
            patch_name = patch["patch_name"]
            if patch_name.endswith(".jpg"):
                new_patch_name = patch_name[:-4]
                # 2.3 - Actualizar el nombre del parche en la base de datos
                imagenes.update_one(
                    {"id": image_id, "patches.patch_name": patch_name},
                    {"$set": {"patches.$.patch_name": new_patch_name}},
                )
                logger.info(f"Nombre del parche {patch_name} actualizado a {new_patch_name}.")
            else:
                logger.warning(f"El parche {patch_name} no tiene .jpg al final.")
    logger.info("Actualización de nombres de parches finalizada.")