In [1]:
# Mount Google Drive so the dataset stored under /content/drive is reachable
# from this Colab runtime.
from google.colab import drive
drive.mount('/content/drive')


Mounted at /content/drive


In [2]:
import os

# Sanity check: list the dataset root to confirm the Drive mount exposes the
# expected sub-folders before anything else runs.
base_dir = '/content/drive/MyDrive/TFG/data'
print("Contenido de la carpeta data:")
print(os.listdir(base_dir))


Contenido de la carpeta data:
['all_images', 'test_vigilancia']


In [None]:
# Uncomment and run once if the runtime does not already provide Ultralytics:
# %pip install -q ultralytics

In [2]:
# Standard library
import gc
import glob
import os
import shutil
import time
import xml.etree.ElementTree as ET

# Third-party
import cv2
import matplotlib.patches as patches
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import torch
import torchvision
import torchvision.transforms.functional as F
from PIL import Image
# Unambiguous alias for the PIL module: the bare name `Image` is rebound to
# IPython.display.Image by the last import of this cell.
from PIL import Image as PILImage
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay
from sklearn.model_selection import train_test_split
from torch.utils.data import Dataset
from torch.utils.data import DataLoader
from torch.utils.data import random_split
from torchvision import transforms
from tqdm import tqdm
from ultralytics import YOLO

# Notebook / Colab helpers
from google.colab import files
# Keep this import last: cells that display PNG files call `Image(path)` and
# rely on `Image` being IPython.display.Image.
from IPython.display import Image, display

In [3]:
# Pick the GPU when one is available, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print("Usando dispositivo:", device)

# The CUDA queries below raise on a CPU-only runtime, so only run them when a
# GPU was actually selected.
if device.type == "cuda":
    print("Nombre de GPU:", torch.cuda.get_device_name(0))
    print(f"VRAM disponible: {torch.cuda.get_device_properties(0).total_memory / 1e9:.2f} GB")
else:
    print("GPU no disponible: se usará la CPU (entrenamiento e inferencia serán mucho más lentos).")


Usando dispositivo: cuda
Nombre de GPU: NVIDIA A100-SXM4-40GB
VRAM disponible: 42.47 GB


In [6]:
# Root of the dataset inside Google Drive.
DATA_DIR = '/content/drive/MyDrive/TFG/data'

# Single folder that holds every image together with its annotation.
ALL_IMAGES_DIR = os.path.join(DATA_DIR, 'all_images')
IMAGES_DIR, ANNOTATIONS_DIR = (
    os.path.join(ALL_IMAGES_DIR, sub_dir) for sub_dir in ('JPEGImages', 'Annotations')
)

# Separate set of images kept aside for manual testing.
TEST_VIG_DIR = os.path.join(DATA_DIR, 'test_vigilancia')
TEST_VIG_IMAGES_DIR, TEST_VIG_ANNOTATIONS_DIR = (
    os.path.join(TEST_VIG_DIR, sub_dir) for sub_dir in ('JPEGImages', 'Annotations')
)

for caption, folder in (("Directorio de imágenes:", IMAGES_DIR),
                        ("Directorio de anotaciones:", ANNOTATIONS_DIR)):
    print(caption, folder)


Directorio de imágenes: /content/drive/MyDrive/TFG/data/all_images/JPEGImages
Directorio de anotaciones: /content/drive/MyDrive/TFG/data/all_images/Annotations


In [None]:
# Collect every image that has a matching VOC annotation.
# glob returns paths in arbitrary order; sorting makes the seeded split below
# reproducible from one runtime to the next.
all_image_files = sorted(
    glob.glob(os.path.join(IMAGES_DIR, '*.png')) + glob.glob(os.path.join(IMAGES_DIR, '*.jpg'))
)

valid_image_files = []
for img_path in all_image_files:
    base_name = os.path.basename(img_path)
    xml_name = os.path.splitext(base_name)[0] + ".xml"
    xml_path = os.path.join(ANNOTATIONS_DIR, xml_name)
    if os.path.exists(xml_path):
        valid_image_files.append(img_path)

print(f"Total imágenes válidas con anotación: {len(valid_image_files)}")

# Train/Test split (fixed seed).
train_ratio = 0.8
test_ratio = 0.2

train_files, test_files = train_test_split(
    valid_image_files,
    test_size=test_ratio,
    random_state=42
)

print("Filtrando train_files para quitar imágenes sin cajas...")

# Keep only training images whose annotation has at least one 'person' object.
# The test list is left untouched, so it may contain images without people.
filtered_train_files = []
for img_path in tqdm(train_files):
    xml_name = os.path.splitext(os.path.basename(img_path))[0] + ".xml"
    xml_path = os.path.join(ANNOTATIONS_DIR, xml_name)
    try:
        root = ET.parse(xml_path).getroot()
        # findtext returns None for a missing <name>; treat that object as
        # "not a person" instead of discarding the whole image.
        found = any(
            (obj.findtext("name") or "").lower() == "person"
            for obj in root.findall("object")
        )
        if found:
            filtered_train_files.append(img_path)
    except Exception as e:
        print(f"Error procesando {xml_path}: {e}")

print(f"Train antes del filtro: {len(train_files)}")
print(f"Train después del filtro: {len(filtered_train_files)}")
train_files = filtered_train_files

# Persist both lists (one path per line) in the current working directory.
# NOTE(review): a later cell reads them from /content/, which matches only when
# the working directory is /content (the Colab default) — confirm.
with open("train_files.txt", "w") as f:
    f.writelines(f"{path}\n" for path in train_files)

with open("test_files.txt", "w") as f:
    f.writelines(f"{path}\n" for path in test_files)

In [35]:
class CustomFRCNNDataset(Dataset):
    """Detection dataset that pairs each image with the 'person' boxes of its VOC XML.

    Args:
        image_paths: sequence of image file paths.
        annotations_dir: directory holding one ``<image stem>.xml`` per image.
        transforms: optional callable applied to the image only (boxes are
            not transformed).
    """

    def __init__(self, image_paths, annotations_dir, transforms=None):
        self.image_paths = image_paths
        self.annotations_dir = annotations_dir
        self.transforms = transforms

    def __len__(self):
        return len(self.image_paths)

    def parse_voc_xml(self, xml_file):
        """Return ``(boxes, labels)`` for every 'person' object in ``xml_file``.

        ``boxes`` is a list of ``[xmin, ymin, xmax, ymax]`` integers (coordinates
        are truncated) and ``labels`` holds ``1`` for each box. Objects without
        a ``<name>`` or ``<bndbox>`` element are skipped.
        """
        root = ET.parse(xml_file).getroot()
        boxes = []
        labels = []
        for obj in root.findall("object"):
            name = obj.findtext("name")
            if name is None or name.lower() != "person":
                continue
            bbox = obj.find("bndbox")
            if bbox is None:
                continue
            boxes.append([
                int(float(bbox.find(tag).text))
                for tag in ("xmin", "ymin", "xmax", "ymax")
            ])
            labels.append(1)  # single class: 'person'
        return boxes, labels

    def __getitem__(self, idx):
        """Load image ``idx`` and build its target dict (boxes, labels, image_id)."""
        img_path = self.image_paths[idx]
        # PILImage is the PIL module; the bare name `Image` is rebound to
        # IPython.display.Image by the imports cell and has no `open`.
        img = PILImage.open(img_path).convert("RGB")

        xml_name = os.path.splitext(os.path.basename(img_path))[0] + ".xml"
        xml_path = os.path.join(self.annotations_dir, xml_name)
        boxes, labels = self.parse_voc_xml(xml_path)

        # reshape(-1, 4) keeps the (N, 4) layout even when the image has no
        # person, where as_tensor alone would yield shape (0,).
        boxes = torch.as_tensor(boxes, dtype=torch.float32).reshape(-1, 4)
        labels = torch.as_tensor(labels, dtype=torch.int64)

        target = {
            'boxes': boxes,
            'labels': labels,
            'image_id': torch.tensor([idx])
        }

        if self.transforms:
            img = self.transforms(img)

        return img, target


In [46]:
# Image-only transform pipeline used for Faster R-CNN.
transform_frcnn = transforms.Compose([transforms.ToTensor()])


def _collate_detection_batch(batch):
    """Turn a list of (image, target) pairs into (images, targets) tuples."""
    return tuple(zip(*batch))


# Train split: shuffled mini-batches of 8.
train_dataset = CustomFRCNNDataset(train_files, ANNOTATIONS_DIR, transforms=transform_frcnn)
train_loader = DataLoader(
    train_dataset,
    batch_size=8,
    shuffle=True,
    collate_fn=_collate_detection_batch,
)

# Test split: one image at a time, in file order.
test_dataset = CustomFRCNNDataset(test_files, ANNOTATIONS_DIR, transforms=transform_frcnn)
test_loader = DataLoader(
    test_dataset,
    batch_size=1,
    shuffle=False,
    collate_fn=_collate_detection_batch,
)


In [None]:
def show_sample_with_boxes(image_tensor, target):
    """Display one dataset sample with its ground-truth boxes outlined in red.

    Args:
        image_tensor: image tensor, converted to PIL for display.
        target: dict whose 'boxes' entry iterates over (xmin, ymin, xmax, ymax).
    """
    fig, ax = plt.subplots(figsize=(8, 8))
    ax.imshow(F.to_pil_image(image_tensor))
    for gt_box in target['boxes']:
        left, top, right, bottom = gt_box
        ax.add_patch(
            patches.Rectangle(
                (left, top),
                right - left,
                bottom - top,
                linewidth=2,
                edgecolor='red',
                facecolor='none',
            )
        )
    ax.axis('off')
    plt.show()

# Visualize the first image of one training batch.
# The batch is fetched here because no earlier cell defines `images`/`targets`.
images, targets = next(iter(train_loader))
show_sample_with_boxes(images[0], targets[0])


In [None]:
# Load Faster R-CNN (ResNet-50 FPN) with pretrained weights, move it to the
# selected device and switch it to eval mode for inference.
# NOTE(review): recent torchvision releases deprecate `pretrained=` in favour
# of `weights=` — confirm against the installed version.

model_frcnn = torchvision.models.detection.fasterrcnn_resnet50_fpn(pretrained=True)
model_frcnn.to(device)
model_frcnn.eval()

print("Modelo Faster R-CNN cargado y listo para inferencia.")


In [39]:
# Helper that reads the ground-truth boxes from a VOC XML file.
def parse_voc_boxes(xml_path):
    """Return the 'person' boxes of a VOC annotation as [xmin, ymin, xmax, ymax] ints.

    Best effort: any error (missing file, malformed XML, missing tags) is
    printed, and the boxes collected up to that point are returned.
    """
    person_boxes = []
    try:
        annotation = ET.parse(xml_path).getroot()
        for node in annotation.findall("object"):
            if node.find("name").text.lower() == "person":
                bnd = node.find("bndbox")
                # Coordinates may be written as floats; they are truncated to int.
                person_boxes.append([
                    int(float(bnd.find(tag).text))
                    for tag in ("xmin", "ymin", "xmax", "ymax")
                ])
    except Exception as e:
        print(f"Error parsing {xml_path}: {e}")
    return person_boxes


In [40]:
# Helper that displays detection results.
def show_image_with_boxes(image_pil, pred_boxes, pred_scores, true_boxes, threshold=0.5):
    """Show an image with ground-truth boxes (red) and predicted boxes (blue).

    Args:
        image_pil: image to display.
        pred_boxes: predicted boxes as (xmin, ymin, xmax, ymax).
        pred_scores: one confidence score per predicted box.
        true_boxes: ground-truth boxes as (xmin, ymin, xmax, ymax).
        threshold: predictions scoring below this value are not drawn.
    """
    def _outline(corners, color):
        # Build an unfilled rectangle patch from corner coordinates.
        left, top, right, bottom = corners
        return patches.Rectangle(
            (left, top), right - left, bottom - top,
            linewidth=2, edgecolor=color, facecolor='none'
        )

    plt.figure(figsize=(12,9))
    plt.imshow(image_pil)
    ax = plt.gca()

    # Ground truth in red.
    for gt_box in true_boxes:
        ax.add_patch(_outline(gt_box, 'red'))

    # Predictions in blue, labelled with their score.
    for candidate, score in zip(pred_boxes, pred_scores):
        if score >= threshold:
            left, top, _, _ = candidate
            ax.add_patch(_outline(candidate, 'blue'))
            ax.text(left, top - 5, f"{score:.2f}", color='blue', fontsize=9)

    # Legend.
    legend_entries = [
        patches.Patch(edgecolor=color, facecolor='none', label=label, linewidth=2)
        for color, label in (('red', 'Real'), ('blue', 'Predicha'))
    ]
    ax.legend(handles=legend_entries, loc='upper right')

    plt.axis('off')
    plt.show()


In [None]:
# Inference over every TEST_VIG image with the current Faster R-CNN weights.

# No earlier cell defines `vig_image_files`; build it here from the TEST_VIG
# image folder so the cell runs on a fresh kernel.
vig_image_files = sorted(
    os.path.join(TEST_VIG_IMAGES_DIR, file_name)
    for file_name in os.listdir(TEST_VIG_IMAGES_DIR)
    if file_name.lower().endswith(('.jpg', '.jpeg', '.png'))
)

model_frcnn.eval()  # detections are only returned in eval mode

for img_path in vig_image_files:
    print(f"\nProcesando imagen: {os.path.basename(img_path)}")

    # Load the image. PILImage is the PIL module; the bare name `Image` is
    # rebound to IPython.display.Image by the imports cell.
    image_pil = PILImage.open(img_path).convert("RGB")
    image_tensor = transform_frcnn(image_pil).to(device)

    # Inference
    with torch.no_grad():
        prediction = model_frcnn([image_tensor])[0]

    pred_boxes = prediction['boxes'].cpu()
    pred_scores = prediction['scores'].cpu()
    pred_labels = prediction['labels'].cpu()

    # Keep ONLY the 'person' class (label == 1)
    person_mask = pred_labels == 1
    pred_boxes = pred_boxes[person_mask]
    pred_scores = pred_scores[person_mask]

    # Load the ground-truth boxes from the XML annotation
    xml_name = os.path.splitext(os.path.basename(img_path))[0] + ".xml"
    xml_path = os.path.join(TEST_VIG_ANNOTATIONS_DIR, xml_name)
    true_boxes = parse_voc_boxes(xml_path)

    # Show the image with ground-truth and predicted boxes
    show_image_with_boxes(
        image_pil,
        pred_boxes,
        pred_scores,
        true_boxes,
        threshold=0.5
    )


In [None]:
# Hold out part of the training set for validation during fine-tuning.
val_ratio = 0.2
val_size = int(len(train_dataset) * val_ratio)
train_size = len(train_dataset) - val_size

# Seeded generator so the train/validation partition is the same on every run
# (same seed as the train/test split).
split_generator = torch.Generator().manual_seed(42)
train_subset, val_subset = random_split(
    train_dataset, [train_size, val_size], generator=split_generator
)

print(f"Train subset: {len(train_subset)} imágenes")
print(f"Validation subset: {len(val_subset)} imágenes")

# Loaders: the collate function regroups a batch of (image, target) pairs
# into an (images, targets) pair of tuples.
train_loader_frcnn = DataLoader(train_subset, batch_size=8, shuffle=True,
                                collate_fn=lambda x: tuple(zip(*x)))
val_loader_frcnn = DataLoader(val_subset, batch_size=8, shuffle=False,
                              collate_fn=lambda x: tuple(zip(*x)))

In [None]:
# Print the backbone structure to see the layer names used when freezing.
print(model_frcnn.backbone)


In [None]:
# Fine-tuning: inside the backbone body only `layer4` stays trainable.
for param_name, backbone_param in model_frcnn.backbone.body.named_parameters():
    backbone_param.requires_grad = "layer4" in param_name

# Parameters outside backbone.body are not touched by the loop above.

# Collect what is left trainable and report its size.
trainable_params = [p for p in model_frcnn.parameters() if p.requires_grad]
total_params = sum(p.numel() for p in trainable_params)
print(f"Parámetros entrenables: {total_params}")

# Optimizer over the trainable parameters only.
optimizer = torch.optim.Adam(trainable_params, lr=1e-4)


In [None]:
# Fine-tune Faster R-CNN with early stopping on the validation loss.
best_val_loss = float('inf')
patience = 3
counter = 0
epochs = 100

train_losses = []
val_losses = []

for epoch in range(epochs):
    print(f"\n Epoch {epoch+1}/{epochs} iniciando...")
    epoch_start = time.time()

    # =======================
    # TRAIN
    # =======================
    model_frcnn.train()
    train_loss_epoch = 0
    train_batches_ok = 0  # batches that actually contributed to the loss
    num_batches = len(train_loader_frcnn)

    for batch_idx, (images, targets) in enumerate(train_loader_frcnn):
        batch_start = time.time()

        images = [img.to(device) for img in images]
        targets = [{k: v.to(device) for k, v in t.items()} for t in targets]

        try:
            loss_dict = model_frcnn(images, targets)
            loss = sum(loss for loss in loss_dict.values())

            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            batch_time = time.time() - batch_start
            train_loss_epoch += loss.item()
            train_batches_ok += 1

            if batch_idx % 10 == 0:
                print(f"   Batch {batch_idx}/{num_batches} - Loss: {loss.item():.4f} - {batch_time:.2f}s")

        except Exception as e:
            # Best effort: report the failing batch and keep training.
            print(f" Error en batch {batch_idx}: {e}")
            continue

    # Average over the batches that succeeded; dividing by the total would
    # understate the loss whenever a batch was skipped.
    avg_train_loss = train_loss_epoch / train_batches_ok if train_batches_ok else float('inf')
    train_losses.append(avg_train_loss)

    # =======================
    # VALIDATION
    # =======================
    # Train mode is kept on purpose: torchvision detection models return the
    # loss dict only in training mode. no_grad() prevents any weight update.
    model_frcnn.train()
    val_loss_epoch = 0
    val_batches_ok = 0
    num_val_batches = len(val_loader_frcnn)

    with torch.no_grad():
        for batch_idx, (images, targets) in enumerate(val_loader_frcnn):
            batch_start = time.time()

            images = [img.to(device) for img in images]
            targets = [{k: v.to(device) for k, v in t.items()} for t in targets]

            try:
                loss_dict = model_frcnn(images, targets)
                loss = sum(loss for loss in loss_dict.values())

                batch_time = time.time() - batch_start
                val_loss_epoch += loss.item()
                val_batches_ok += 1

                if batch_idx % 10 == 0:
                    print(f"   Val Batch {batch_idx}/{num_val_batches} - Loss: {loss.item():.4f} - {batch_time:.2f}s")

            except Exception as e:
                print(f" Error en batch val {batch_idx}: {e}")
                continue

    # With no successful validation batch the epoch must not count as an
    # improvement, hence +inf rather than 0.
    avg_val_loss = val_loss_epoch / val_batches_ok if val_batches_ok else float('inf')
    val_losses.append(avg_val_loss)

    epoch_time = time.time() - epoch_start
    print(f"\n Epoch {epoch+1} completada en {epoch_time:.2f}s")
    print(f" Train Loss: {avg_train_loss:.4f} | Val Loss: {avg_val_loss:.4f}")

    # =======================
    # Early Stopping
    # =======================
    if avg_val_loss < best_val_loss:
        best_val_loss = avg_val_loss
        counter = 0
        torch.save(model_frcnn.state_dict(), "best_fasterrcnn.pth")
        print(" Mejor modelo guardado")
    else:
        counter += 1
        print(f" No mejora. EarlyStopping contador: {counter}/{patience}")
        if counter >= patience:
            print(f" Early stopping activado en epoch {epoch+1}")
            break

    # =======================
    # Memory cleanup
    # =======================
    torch.cuda.empty_cache()
    gc.collect()

# =======================
# Restore the best checkpoint
# =======================
# After early stopping the in-memory weights belong to the last epoch, not the
# best one. Reload the checkpoint saved above so later cells use the best model.
if best_val_loss < float('inf') and os.path.exists("best_fasterrcnn.pth"):
    model_frcnn.load_state_dict(torch.load("best_fasterrcnn.pth", map_location=device))
    print(" Pesos del mejor modelo restaurados desde best_fasterrcnn.pth")

# =======================
# Save the loss history
# =======================
history_df = pd.DataFrame({
    'epoch': range(1, len(train_losses)+1),
    'train_loss': train_losses,
    'val_loss': val_losses
})
history_df.to_csv("loss_history.csv", index=False)
print(" Historial de pérdidas guardado en loss_history.csv")


In [None]:
# Loss vs. epoch

# Read back the loss history written by the training cell.
history = pd.read_csv("loss_history.csv")

# One figure, two curves (train and validation).
fig, ax = plt.subplots(figsize=(10,6))
for column, label, marker in (
    ('train_loss', 'Train Loss', 'o'),
    ('val_loss', 'Validation Loss', 's'),
):
    ax.plot(history['epoch'], history[column], label=label, marker=marker)
ax.set_xlabel('Epoch')
ax.set_ylabel('Loss')
ax.set_title('Evolución del Loss durante el entrenamiento')
ax.legend()
ax.grid(True)
plt.show()

In [None]:
# Score thresholds swept for the precision/recall curves.
thresholds = np.arange(0.1, 0.95, 0.05)

# Metrics stored per threshold.
all_precisions = []
all_recalls = []
all_APs = []

all_iou_scores = []

all_true_labels = []
all_pred_labels = []

inference_times = []

# CUDA queries raise on a CPU-only runtime, so guard them.
on_cuda = torch.cuda.is_available() and device.type == "cuda"
gpu_name = torch.cuda.get_device_name(0) if torch.cuda.is_available() else "CPU"

print(f"\nEntorno de medición: Google Colab Pro con GPU {gpu_name}, batch_size=1\n")


def _box_iou(box_a, box_b):
    """IoU of two (xmin, ymin, xmax, ymax) boxes; 1e-6 keeps the division safe."""
    x_a = max(box_a[0], box_b[0])
    y_a = max(box_a[1], box_b[1])
    x_b = min(box_a[2], box_b[2])
    y_b = min(box_a[3], box_b[3])
    inter_area = max(0, x_b - x_a) * max(0, y_b - y_a)
    area_a = (box_a[2] - box_a[0]) * (box_a[3] - box_a[1])
    area_b = (box_b[2] - box_b[0]) * (box_b[3] - box_b[1])
    return inter_area / (area_a + area_b - inter_area + 1e-6)


model_frcnn.eval()

with torch.no_grad():
    for images, targets in tqdm(test_loader, desc="Evaluando imágenes de TEST"):
        images = [img.to(device) for img in images]
        true_boxes = targets[0]['boxes'].cpu().numpy()
        # Image-level label: 1 when the image contains at least one person.
        true_label = 1 if len(true_boxes) > 0 else 0
        all_true_labels.append(true_label)

        # Time the forward pass. CUDA kernels run asynchronously, so wait for
        # the GPU before reading the clock on both sides.
        if on_cuda:
            torch.cuda.synchronize()
        start_time = time.time()
        outputs = model_frcnn(images)
        if on_cuda:
            torch.cuda.synchronize()
        end_time = time.time()

        inference_times.append(end_time - start_time)

        pred_boxes = outputs[0]['boxes'].cpu().numpy()
        pred_scores = outputs[0]['scores'].cpu().numpy()
        pred_labels = outputs[0]['labels'].cpu().numpy()

        # Keep only the "person" class
        mask_person = pred_labels == 1
        pred_boxes = pred_boxes[mask_person]
        pred_scores = pred_scores[mask_person]

        # Best IoU of each ground-truth box against all person predictions
        # (0 when nothing was predicted).
        for t_box in true_boxes:
            best_iou = 0
            for p_box in pred_boxes:
                best_iou = max(best_iou, _box_iou(t_box, p_box))
            all_iou_scores.append(best_iou)

        # For each threshold, record the predicted image-level label
        # (presence/absence of a person).
        for thresh in thresholds:
            pred_label = 1 if np.any(pred_scores >= thresh) else 0
            all_pred_labels.append((thresh, pred_label))


# =========================
# Mean IoU
# =========================
avg_iou_best_fasterrcnn = float(np.mean(all_iou_scores)) if all_iou_scores else float('nan')
print(f"\nIoU promedio en TEST: {avg_iou_best_fasterrcnn:.4f}")

# =========================
# Precision and recall per threshold
# =========================
# Entries were appended image by image with thresholds innermost, so the flat
# list reshapes to (n_images, n_thresholds). Indexing by column avoids
# matching float thresholds with `==`.
true_arr = np.array(all_true_labels)
pred_matrix = np.array(
    [pred for _, pred in all_pred_labels], dtype=int
).reshape(len(all_true_labels), len(thresholds))

for thresh_idx in range(len(thresholds)):
    pred_bin = pred_matrix[:, thresh_idx]
    TP = int(np.sum((pred_bin == 1) & (true_arr == 1)))
    FP = int(np.sum((pred_bin == 1) & (true_arr == 0)))
    FN = int(np.sum((pred_bin == 0) & (true_arr == 1)))

    precision = TP / (TP + FP + 1e-6)
    recall = TP / (TP + FN + 1e-6)

    all_precisions.append(precision)
    all_recalls.append(recall)

# NOTE(review): this score is the mean of precision*recall over the
# thresholds, computed on image-level labels. It is not a detection AP/mAP
# (area under the PR curve with IoU matching) — confirm how it is reported.
all_APs = [p * r for p, r in zip(all_precisions, all_recalls)]
mAP_best_fasterrcnn = np.mean(all_APs)
print(f"\nAP aproximado (mAP) en TEST: {mAP_best_fasterrcnn:.4f}")

# =========================
# Confusion matrix at the selected threshold
# =========================
# The selected threshold is the one maximizing precision*recall.
best_thresh_idx = np.argmax(all_APs)
best_threshold = thresholds[best_thresh_idx]
print(f"\nThreshold óptimo encontrado: {best_threshold:.2f}")

final_preds = pred_matrix[:, best_thresh_idx].tolist()
# labels=[0, 1] forces a 2x2 matrix even when one class is missing from the
# data, which would otherwise not match the two display labels.
cm = confusion_matrix(all_true_labels, final_preds, labels=[0, 1])
class_names = ['Sin persona', 'Persona']
disp = ConfusionMatrixDisplay(confusion_matrix=cm, display_labels=class_names)
disp.plot(cmap=plt.cm.Blues, values_format='d')
plt.title('Matriz de Confusión en TEST')
plt.show()

# =========================
# Precision and recall vs. threshold
# =========================
plt.figure(figsize=(10,6))
plt.plot(thresholds, all_precisions, label='Precision', marker='o')
plt.plot(thresholds, all_recalls, label='Recall', marker='x')
plt.title("Precision y Recall vs. Threshold")
plt.xlabel("Threshold")
plt.ylabel("Score")
plt.legend()
plt.grid()
plt.show()

# =========================
# Mean inference time
# =========================
avg_inference_time_best_fasterrcnn = np.mean(inference_times)
print(f"\nTiempo promedio de inferencia por imagen (batch_size=1, Google Colab Pro, GPU {gpu_name}): {avg_inference_time_best_fasterrcnn:.4f} segundos")


In [None]:
# Inference over every TEST_VIG image, drawn at the threshold selected by the
# evaluation cell (`best_threshold`).

# No earlier cell defines `vig_image_files`; build it here from the TEST_VIG
# image folder so the cell runs on a fresh kernel.
vig_image_files = sorted(
    os.path.join(TEST_VIG_IMAGES_DIR, file_name)
    for file_name in os.listdir(TEST_VIG_IMAGES_DIR)
    if file_name.lower().endswith(('.jpg', '.jpeg', '.png'))
)

model_frcnn.eval()  # detections are only returned in eval mode

for img_path in vig_image_files:
    print(f"\nProcesando imagen: {os.path.basename(img_path)}")

    # Load the image. PILImage is the PIL module; the bare name `Image` is
    # rebound to IPython.display.Image by the imports cell.
    image_pil = PILImage.open(img_path).convert("RGB")
    image_tensor = transform_frcnn(image_pil).to(device)

    # Inference
    with torch.no_grad():
        prediction = model_frcnn([image_tensor])[0]

    pred_boxes = prediction['boxes'].cpu()
    pred_scores = prediction['scores'].cpu()
    pred_labels = prediction['labels'].cpu()

    # Keep ONLY the 'person' class (label == 1)
    person_mask = pred_labels == 1
    pred_boxes = pred_boxes[person_mask]
    pred_scores = pred_scores[person_mask]

    # Load the ground-truth boxes from the XML annotation
    xml_name = os.path.splitext(os.path.basename(img_path))[0] + ".xml"
    xml_path = os.path.join(TEST_VIG_ANNOTATIONS_DIR, xml_name)
    true_boxes = parse_voc_boxes(xml_path)

    # Show the image with ground-truth and predicted boxes
    show_image_with_boxes(
        image_pil,
        pred_boxes,
        pred_scores,
        true_boxes,
        threshold=best_threshold
    )

In [3]:
# Project root on Drive and the folder that receives the YOLO-format dataset.
BASE_DRIVE = '/content/drive/MyDrive/TFG'
YOLO_DATA_DIR = os.path.join(BASE_DRIVE, 'yolo_data')
# Same value as the TEST_VIG_DIR assigned in the paths cell; re-assigned here.
TEST_VIG_DIR = os.path.join(DATA_DIR, 'test_vigilancia')

def voc_to_yolo(xml_path, img_w, img_h):
    """Convert the 'person' boxes of a VOC XML file into YOLO label lines.

    Each line is ``"0 cx cy w h"`` with centre and size normalized by the
    image dimensions and printed with six decimals.

    Boxes are clipped to the image so every value stays within [0, 1]. Boxes
    with no area after clipping, and objects without a ``<name>``, are skipped.

    Args:
        xml_path: path of the VOC annotation file.
        img_w: image width in pixels.
        img_h: image height in pixels.

    Returns:
        List of label lines. Best effort: on any error (missing file,
        malformed XML, non-positive image size) the problem is printed and
        the lines built so far are returned.
    """
    yolo_lines = []
    try:
        if img_w <= 0 or img_h <= 0:
            raise ValueError(f"invalid image size {img_w}x{img_h}")

        root = ET.parse(xml_path).getroot()
        for obj in root.findall("object"):
            name = obj.findtext("name")
            if name is None or name.lower() != "person":
                continue
            bbox = obj.find("bndbox")
            xmin, ymin, xmax, ymax = (
                float(bbox.find(tag).text) for tag in ("xmin", "ymin", "xmax", "ymax")
            )

            # Clip to the image so the normalized values cannot leave [0, 1].
            xmin = min(max(xmin, 0.0), float(img_w))
            xmax = min(max(xmax, 0.0), float(img_w))
            ymin = min(max(ymin, 0.0), float(img_h))
            ymax = min(max(ymax, 0.0), float(img_h))

            # A box with no area carries no usable annotation.
            if xmax <= xmin or ymax <= ymin:
                continue

            cx = ((xmin + xmax) / 2) / img_w
            cy = ((ymin + ymax) / 2) / img_h
            w = (xmax - xmin) / img_w
            h = (ymax - ymin) / img_h

            yolo_lines.append(f"0 {cx:.6f} {cy:.6f} {w:.6f} {h:.6f}")
    except Exception as e:
        print(f"Error parsing {xml_path}: {e}")
    return yolo_lines

In [77]:
def process_split(split_name, file_list_path):
    """Export one dataset split to the YOLO folder layout.

    Reads image paths (one per line) from ``file_list_path``. For every image
    with at least one 'person' box, copies the image to
    ``YOLO_DATA_DIR/images/<split_name>`` and writes its label file to
    ``YOLO_DATA_DIR/labels/<split_name>``. Images without an annotation file,
    unreadable images, and images without person boxes are skipped.

    Args:
        split_name: sub-folder name of the split (e.g. 'train', 'val').
        file_list_path: text file listing the image paths of the split.
    """
    with open(file_list_path, 'r') as f:
        image_paths = [line.strip() for line in f if line.strip()]

    images_out = os.path.join(YOLO_DATA_DIR, 'images', split_name)
    labels_out = os.path.join(YOLO_DATA_DIR, 'labels', split_name)
    # The copy and write below fail if the destination folders do not exist.
    os.makedirs(images_out, exist_ok=True)
    os.makedirs(labels_out, exist_ok=True)

    count_total = 0
    count_converted = 0

    for img_path in image_paths:
        # Count every listed image, not only the converted ones, so the
        # summary shows how many were skipped.
        count_total += 1

        base_name = os.path.basename(img_path)
        xml_name = os.path.splitext(base_name)[0] + '.xml'
        xml_path = os.path.join(DATA_DIR, 'all_images', 'Annotations', xml_name)

        if not os.path.exists(xml_path):
            continue

        try:
            # PILImage is the PIL module; the bare name `Image` is rebound to
            # IPython.display.Image by the imports cell.
            with PILImage.open(img_path) as im:
                w, h = im.size
        except Exception as e:
            print(f"Error opening image {img_path}: {e}")
            continue

        # Convert the annotation
        yolo_lines = voc_to_yolo(xml_path, w, h)
        if len(yolo_lines) == 0:
            continue  # skip images without boxes

        # Copy the image
        shutil.copy2(img_path, os.path.join(images_out, base_name))

        # Write the label file
        txt_name = os.path.splitext(base_name)[0] + '.txt'
        txt_path = os.path.join(labels_out, txt_name)
        with open(txt_path, 'w') as f:
            f.write("\n".join(yolo_lines))

        count_converted += 1

    print(f"Procesadas {count_total} imágenes. Convertidas con cajas: {count_converted}.")


In [None]:
# Export both splits. The list written as test_files.txt is used as the YOLO
# 'val' split.
process_split('train', '/content/train_files.txt')
process_split('val', '/content/test_files.txt')


In [None]:
# Convert the TEST_VIG images and annotations to the YOLO layout.
TEST_VIG_YOLO = os.path.join(BASE_DRIVE, 'test_vigilancia_yolo')

TEST_VIG_YOLO_IMAGES = os.path.join(TEST_VIG_YOLO, 'images')
TEST_VIG_YOLO_LABELS = os.path.join(TEST_VIG_YOLO, 'labels')

# The copy and write below fail if the destination folders do not exist.
os.makedirs(TEST_VIG_YOLO_IMAGES, exist_ok=True)
os.makedirs(TEST_VIG_YOLO_LABELS, exist_ok=True)

test_vig_imgs = [f for f in os.listdir(TEST_VIG_IMAGES_DIR) if f.lower().endswith(('.jpg', '.png'))]

print(f"Total imágenes en TEST_VIG: {len(test_vig_imgs)}")

converted_count = 0

for img_file in test_vig_imgs:
    img_path = os.path.join(TEST_VIG_IMAGES_DIR, img_file)
    xml_path = os.path.join(TEST_VIG_ANNOTATIONS_DIR, os.path.splitext(img_file)[0] + '.xml')

    try:
        # PILImage is the PIL module; the bare name `Image` is rebound to
        # IPython.display.Image by the imports cell.
        with PILImage.open(img_path) as im:
            w, h = im.size
    except Exception as e:
        # Catch Exception rather than a bare except, so KeyboardInterrupt
        # still stops the loop; report the cause.
        print(f"Error abriendo {img_file}: {e}")
        continue

    # Convert to YOLO format
    yolo_lines = voc_to_yolo(xml_path, w, h)

    # ALWAYS copy the image, even when it has no boxes
    shutil.copy2(img_path, os.path.join(TEST_VIG_YOLO_IMAGES, img_file))

    # Write the label file (empty when there are no boxes)
    txt_name = os.path.splitext(img_file)[0] + '.txt'
    txt_path = os.path.join(TEST_VIG_YOLO_LABELS, txt_name)
    with open(txt_path, 'w') as f:
        f.write("\n".join(yolo_lines))

    converted_count += 1

print(f"Conversión completada: {converted_count} imágenes procesadas en test_vigilancia.")

In [None]:
# Dataset description consumed by the YOLO training and validation cells:
# dataset root, image folders of each split, and the single 'person' class.
yaml_content = """
path: /content/drive/MyDrive/TFG/yolo_data
train: images/train
val: images/val

nc: 1
names: ['person']
"""

# strip() drops the blank lines that surround the triple-quoted text.
with open('/content/drive/MyDrive/TFG/yolo_data/data.yaml', 'w') as f:
    f.write(yaml_content.strip())

print("data.yaml generado en /content/drive/MyDrive/TFG/yolo_data")


In [5]:
# Load the pretrained Ultralytics YOLOv8m checkpoint.
yolov8m = YOLO('yolov8m.pt')

In [None]:
# Train (fine-tuning) on the dataset described by data.yaml.
# Results are written under `project`/`name` on Drive.
yolov8m.train(
    data='/content/drive/MyDrive/TFG/yolo_data/data.yaml',
    epochs=100,
    imgsz=800,
    batch=4,
    lr0=0.0001,
    optimizer='Adam',
    patience=3,
    workers=2,           # fewer workers to limit host RAM usage
    project='/content/drive/MyDrive/TFG/yolo_data',
    name='train_yolov8m',
    device=0
)

In [None]:
# Load the best checkpoint of the fine-tuning run and show its results plot.
# NOTE(review): the folder is 'train_yolov8m2' while the training cell passes
# name='train_yolov8m' — confirm this is the intended run directory.
model_path = '/content/drive/MyDrive/TFG/yolo_data/train_yolov8m2/weights/best.pt'
best_yolov8m = YOLO(model_path)
# `Image` here is IPython.display.Image (the last `Image` import in the
# imports cell), used to render the PNG as the cell output.
Image('/content/drive/MyDrive/TFG/yolo_data/train_yolov8m2/results.png')


In [None]:
# Validate the fine-tuned model on the 'val' split declared in data.yaml and
# show the confusion matrix written to the run folder.
# NOTE(review): imgsz=1024 here differs from the imgsz=800 used in the
# training cell — confirm this is intentional.
results = best_yolov8m.val(
    data='/content/drive/MyDrive/TFG/yolo_data/data.yaml',
    project="/content/drive/MyDrive/TFG/yolo_data",
    name="val_manual",
    imgsz=1024,
    batch=8
)
# `Image` here is IPython.display.Image (the last `Image` import in the
# imports cell).
Image('/content/drive/MyDrive/TFG/yolo_data/val_manual/confusion_matrix.png')


In [None]:
# Folders holding the TEST_VIG images and labels in YOLO layout.
TEST_VIG_YOLO_IMAGES_DIR = '/content/drive/MyDrive/TFG/test_vigilancia_yolo/images'
TEST_VIG_YOLO_LABELS_DIR = '/content/drive/MyDrive/TFG/test_vigilancia_yolo/labels'

def load_yolo_labels(label_path, img_w, img_h):
    """
    Read a YOLO label file and return its boxes in pixel xyxy format.

    Each line is expected as ``class cx cy w h`` with normalized values. Only
    the first five fields are read, so lines with extra trailing fields are
    accepted. Lines with fewer than five fields are skipped.

    Args:
        label_path: path of the .txt label file.
        img_w: image width in pixels.
        img_h: image height in pixels.

    Returns:
        List of ``[xmin, ymin, xmax, ymax]`` floats; empty when the file does
        not exist or has no valid line.
    """
    boxes = []
    if not os.path.exists(label_path):
        return boxes
    with open(label_path, 'r') as f:
        for line in f:
            parts = line.strip().split()
            if len(parts) < 5:
                continue
            # Slice before unpacking: a line with more than five fields passes
            # the length check above but would fail a 5-way unpack.
            _cls, cx, cy, w, h = map(float, parts[:5])
            xmin = (cx - w/2) * img_w
            ymin = (cy - h/2) * img_h
            xmax = (cx + w/2) * img_w
            ymax = (cy + h/2) * img_h
            boxes.append([xmin, ymin, xmax, ymax])
    return boxes


def show_image_with_yolo_preds_and_truth(image, pred_boxes, true_boxes, pred_scores, threshold=0.1):
    """Show an image with ground-truth boxes (red) and YOLO predictions (blue).

    Args:
        image: image to display.
        pred_boxes: predicted boxes as (xmin, ymin, xmax, ymax).
        true_boxes: ground-truth boxes as (xmin, ymin, xmax, ymax).
        pred_scores: one confidence score per predicted box.
        threshold: predictions scoring below this value are not drawn.
    """
    def _outline(corners, color):
        # Build an unfilled rectangle patch from corner coordinates.
        left, top, right, bottom = corners
        return patches.Rectangle((left, top), right - left, bottom - top,
                                 linewidth=2, edgecolor=color, facecolor='none')

    fig, ax = plt.subplots(figsize=(12,9))
    ax.imshow(image)

    # Ground truth in red.
    for gt_box in true_boxes:
        ax.add_patch(_outline(gt_box, 'red'))

    # Predictions in blue, labelled with their score.
    for candidate, score in zip(pred_boxes, pred_scores):
        if score < threshold:
            continue
        left, top, _, _ = candidate
        ax.add_patch(_outline(candidate, 'blue'))
        ax.text(left, top - 5, f"{score:.2f}", color='blue', fontsize=9)

    # Legend.
    legend_entries = [
        patches.Patch(edgecolor=color, facecolor='none', label=label, linewidth=2)
        for color, label in (('red', 'Real'), ('blue', 'Predicha'))
    ]
    ax.legend(handles=legend_entries, loc='upper right')

    ax.axis('off')
    plt.show()

# List the images to process
test_vig_image_files = sorted([
    os.path.join(TEST_VIG_YOLO_IMAGES_DIR, f)
    for f in os.listdir(TEST_VIG_YOLO_IMAGES_DIR)
    if f.lower().endswith(('.jpg', '.jpeg', '.png'))
])

print(f"Total imágenes a procesar: {len(test_vig_image_files)}")

for img_path in test_vig_image_files:
    print(f"Procesando imagen: {os.path.basename(img_path)}")

    # Load the image; cv2.imread returns None when the file cannot be read.
    img_bgr = cv2.imread(img_path)
    if img_bgr is None:
        print(f"No se pudo leer la imagen: {img_path}")
        continue
    img_rgb = cv2.cvtColor(img_bgr, cv2.COLOR_BGR2RGB)  # RGB copy for display only
    img_h, img_w = img_rgb.shape[:2]

    # Inference. Ultralytics treats numpy arrays as BGR (OpenCV order), so the
    # model gets the BGR frame; passing the RGB copy would swap the channels.
    results = best_yolov8m(img_bgr)
    boxes_xyxy = results[0].boxes.xyxy.cpu().numpy()
    scores = results[0].boxes.conf.cpu().numpy()
    labels = results[0].boxes.cls.cpu().numpy()

    # Keep only the 'person' class (class 0)
    mask_person = labels == 0
    pred_boxes = boxes_xyxy[mask_person]
    pred_scores = scores[mask_person]

    # Load the ground-truth boxes
    label_txt = os.path.join(TEST_VIG_YOLO_LABELS_DIR, os.path.splitext(os.path.basename(img_path))[0] + ".txt")
    true_boxes = load_yolo_labels(label_txt, img_w, img_h)

    # Show the image with both sets of boxes
    show_image_with_yolo_preds_and_truth(
        img_rgb,
        pred_boxes,
        true_boxes,
        pred_scores,
        threshold=0.1  # replace with a tuned threshold if desired
    )


In [None]:
# Folders holding the TEST_VIG images and labels in YOLO layout
# (same values as assigned in the previous cell).
TEST_VIG_YOLO_IMAGES_DIR = '/content/drive/MyDrive/TFG/test_vigilancia_yolo/images'
TEST_VIG_YOLO_LABELS_DIR = '/content/drive/MyDrive/TFG/test_vigilancia_yolo/labels'

def load_yolo_labels(label_path, img_w, img_h):
    """
    Read a YOLO label file and return its boxes in pixel xyxy format.

    Each line is expected as ``class cx cy w h`` with normalized values. Only
    the first five fields are read, so lines with extra trailing fields are
    accepted. Lines with fewer than five fields are skipped.

    Args:
        label_path: path of the .txt label file.
        img_w: image width in pixels.
        img_h: image height in pixels.

    Returns:
        List of ``[xmin, ymin, xmax, ymax]`` floats; empty when the file does
        not exist or has no valid line.
    """
    boxes = []
    if not os.path.exists(label_path):
        return boxes
    with open(label_path, 'r') as f:
        for line in f:
            parts = line.strip().split()
            if len(parts) < 5:
                continue
            # Slice before unpacking: a line with more than five fields passes
            # the length check above but would fail a 5-way unpack.
            _cls, cx, cy, w, h = map(float, parts[:5])
            xmin = (cx - w/2) * img_w
            ymin = (cy - h/2) * img_h
            xmax = (cx + w/2) * img_w
            ymax = (cy + h/2) * img_h
            boxes.append([xmin, ymin, xmax, ymax])
    return boxes


def show_image_with_yolo_preds_and_truth(image, pred_boxes, true_boxes, pred_scores, threshold=0.1):
    """Show an image with ground-truth boxes (red) and YOLO predictions (blue).

    Args:
        image: image to display.
        pred_boxes: predicted boxes as (xmin, ymin, xmax, ymax).
        true_boxes: ground-truth boxes as (xmin, ymin, xmax, ymax).
        pred_scores: one confidence score per predicted box.
        threshold: predictions scoring below this value are not drawn.
    """
    def _outline(corners, color):
        # Build an unfilled rectangle patch from corner coordinates.
        left, top, right, bottom = corners
        return patches.Rectangle((left, top), right - left, bottom - top,
                                 linewidth=2, edgecolor=color, facecolor='none')

    fig, ax = plt.subplots(figsize=(12,9))
    ax.imshow(image)

    # Ground truth in red.
    for gt_box in true_boxes:
        ax.add_patch(_outline(gt_box, 'red'))

    # Predictions in blue, labelled with their score.
    for candidate, score in zip(pred_boxes, pred_scores):
        if score < threshold:
            continue
        left, top, _, _ = candidate
        ax.add_patch(_outline(candidate, 'blue'))
        ax.text(left, top - 5, f"{score:.2f}", color='blue', fontsize=9)

    # Legend.
    legend_entries = [
        patches.Patch(edgecolor=color, facecolor='none', label=label, linewidth=2)
        for color, label in (('red', 'Real'), ('blue', 'Predicha'))
    ]
    ax.legend(handles=legend_entries, loc='upper right')

    ax.axis('off')
    plt.show()

# List the images to process
test_vig_image_files = sorted([
    os.path.join(TEST_VIG_YOLO_IMAGES_DIR, f)
    for f in os.listdir(TEST_VIG_YOLO_IMAGES_DIR)
    if f.lower().endswith(('.jpg', '.jpeg', '.png'))
])

print(f"Total imágenes a procesar: {len(test_vig_image_files)}")

# Baseline: the pretrained YOLOv8m checkpoint, without fine-tuning.
model = YOLO('yolov8m.pt')
print("Modelo YOLOv8m preentrenado cargado.")

for img_path in test_vig_image_files:
    print(f"Procesando imagen: {os.path.basename(img_path)}")

    # Load the image; cv2.imread returns None when the file cannot be read.
    img_bgr = cv2.imread(img_path)
    if img_bgr is None:
        print(f"No se pudo leer la imagen: {img_path}")
        continue
    img_rgb = cv2.cvtColor(img_bgr, cv2.COLOR_BGR2RGB)  # RGB copy for display only
    img_h, img_w = img_rgb.shape[:2]

    # Inference. Ultralytics treats numpy arrays as BGR (OpenCV order), so the
    # model gets the BGR frame; passing the RGB copy would swap the channels.
    results = model(img_bgr)
    boxes_xyxy = results[0].boxes.xyxy.cpu().numpy()
    scores = results[0].boxes.conf.cpu().numpy()
    labels = results[0].boxes.cls.cpu().numpy()

    # Keep only the 'person' class (class 0)
    mask_person = labels == 0
    pred_boxes = boxes_xyxy[mask_person]
    pred_scores = scores[mask_person]

    # Load the ground-truth boxes
    label_txt = os.path.join(TEST_VIG_YOLO_LABELS_DIR, os.path.splitext(os.path.basename(img_path))[0] + ".txt")
    true_boxes = load_yolo_labels(label_txt, img_w, img_h)

    # Show the image with both sets of boxes
    show_image_with_yolo_preds_and_truth(
        img_rgb,
        pred_boxes,
        true_boxes,
        pred_scores,
        threshold=0.1  # replace with a tuned threshold if desired
    )
