In [6]:
import os
import random
import numpy as np
import cv2
import torch
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms
import torch
import numpy as np
from sklearn.metrics import confusion_matrix, classification_report
import pandas as pd


In [3]:
# 1. Selección de frames basados en área de máscara
def select_candidate_frames(mask_npy_path, n_samples=5, tol=0.2):
    """
    - Carga el .npy con máscaras (shape: D x H x W).
    - Calcula el área (número de pixeles != 0) por frame.
    - Encuentra el índice del frame de área máxima.
    - Selecciona aleatoriamente hasta n_samples frames adicionales
      cuyo área >= (1 - tol) * área_máxima.
    - Devuelve lista de índices [idx_max, idx1, idx2, ...].
    """
    masks = np.load(mask_npy_path)  # (D, H, W)
    # Área = conteo de pixeles > 0
    areas = (masks > 0).reshape(masks.shape[0], -1).sum(axis=1)
    idx_max = int(np.argmax(areas))
    max_area = areas[idx_max]

    # candidatos con área suficiente (excluyendo el máximo)
    eligible = [i for i, a in enumerate(areas)
                if i != idx_max and a >= (1 - tol) * max_area]
    # muestreo aleatorio
    sampled = random.sample(eligible, min(len(eligible), n_samples))
    return [idx_max] + sampled

In [4]:
# 2. Función para leer sólo los frames necesarios de un video
def load_frames_from_video(video_path, frame_indices):
    """
    - Usa cv2.VideoCapture para leer sólo los índices indicados.
    - Devuelve lista de arrays (H x W x C).
    """
    cap = cv2.VideoCapture(video_path)
    frames = {}
    for idx in sorted(frame_indices):
        cap.set(cv2.CAP_PROP_POS_FRAMES, idx)
        ret, frame = cap.read()
        if not ret:
            raise RuntimeError(f"Error leyendo frame {idx} de {video_path}")
        # convertir BGR → RGB
        frames[idx] = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
    cap.release()
    # devolver en orden original de indices
    return [frames[i] for i in frame_indices]



In [5]:
# 3. Dataset para clasificación
class FrameDataset(Dataset):
    def __init__(self, frame_list, transform=None, patient_id=None):
        """
        frame_list: lista de np.array HxWxC
        """
        self.frames = frame_list
        self.transform = transform

    def __len__(self):
        return len(self.frames)

    def __getitem__(self, idx):
        img = self.frames[idx]
        if self.transform:
            img = self.transform(img)
        return img, 0, 0  # (image, dummy_label, dummy_patient_id)



In [None]:
# 4. Pipeline de inferencia
def run_classification_on_npy_masks(
    mask_dir, video_dir, model_list, device, transform,
    n_per_patient=5, tol=0.2, batch_size=8
):
    """
    Para cada paciente:
      1. Leer mask.npy y video.mp4 (o .avi).
      2. Seleccionar índices de frames.
      3. Leer esos frames.
      4. Crear DataLoader y pasar por ensemble_predictions.
    Devuelve dict paciente → etiqueta final.
    """
    from your_module import ensemble_predictions  # tu función de ensemble

    results = {}
    for fname in os.listdir(mask_dir):
        if not fname.endswith(".npy"):
            continue
        patient_id = os.path.splitext(fname)[0]
        mask_path  = os.path.join(mask_dir, fname)
        video_path = os.path.join(video_dir, patient_id + ".mp4")

        # 1. seleccionar frames
        idxs = select_candidate_frames(mask_path, n_per_patient, tol)
        # 2. cargar imágenes
        frames = load_frames_from_video(video_path, idxs)
        # 3. dataset + dataloader
        ds = FrameDataset(frames, transform=transform)
        dl = DataLoader(ds, batch_size=batch_size, shuffle=False, num_workers=2)
        # 4. inferencia ensemble
        pred = ensemble_predictions(model_list, dl, device, method="average")
        results[patient_id] = pred  # asume un único valor por paciente

    return results



In [None]:
# Ejemplo de uso:
if __name__ == "__main__":
    # cargar modelos (adaptar a tu load_model)
    from your_module import load_model
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    models = []
    for ckpt in ["densenet.ckpt","mobilenet.ckpt","vgg16.ckpt"]:
        model,_ = load_model(ckpt, model_name=os.path.splitext(ckpt)[0])
        models.append(model)

    # transformaciones
    transform = transforms.Compose([
        transforms.ToPILImage(),
        transforms.Resize((224,224)),
        transforms.ToTensor(),
        transforms.Normalize([0.5]*3,[0.5]*3)
    ])

    preds = run_classification_on_npy_masks(
        mask_dir="masks_npy",
        video_dir="videos",
        model_list=models,
        device=device,
        transform=transform,
        n_per_patient=5,
        tol=0.2,
        batch_size=8
    )
    print(preds)