In [7]:
import os
import torch
import torch.nn as nn
from torchvision import transforms, models
from PIL import Image
import sys

# === CONFIG ===
# Maximum number of frames read from a clip folder (use 16 if fewer frames were used in training).
CLIP_LENGTH = 32
# Side length, in pixels, that every frame is resized to; must match the training setup.
IMAGE_SIZE = 224
NUM_CLASSES = 2
# Index i of the model output is reported as CLASS_NAMES[i].
# NOTE(review): presumably this order matches the label indices used in training — confirm.
CLASS_NAMES = ['entrada', 'salida']
# Run on the GPU when CUDA is available, otherwise on the CPU.
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# === TRANSFORM ===
# Per-frame preprocessing: resize to IMAGE_SIZE x IMAGE_SIZE, then convert to a tensor.
# NOTE(review): no mean/std normalization is applied here; confirm this matches the
# preprocessing used when the checkpoint was trained.
transform = transforms.Compose([
    transforms.Resize((IMAGE_SIZE, IMAGE_SIZE)),
    transforms.ToTensor()
])

# === MODELO ===
class CNN_LSTM(nn.Module):
    """Clip classifier: a frozen MobileNetV2 feature extractor followed by an LSTM.

    Every frame of a clip is passed through the convolutional backbone and
    average-pooled to a single feature vector; the per-frame vectors are fed
    to an LSTM, and the LSTM output at the last time step is mapped to class
    logits by a linear layer.

    Args:
        hidden_dim: Hidden size of the LSTM.
        num_layers: Number of stacked LSTM layers.
        num_classes: Number of output logits.
        pretrained: Forwarded to ``models.mobilenet_v2``. Defaults to ``True``
            (the previous hard-coded behaviour). Pass ``False`` to skip the
            weight download when a checkpoint that contains the backbone
            weights is loaded right after construction.
    """

    def __init__(self, hidden_dim=512, num_layers=1, num_classes=2, pretrained=True):
        super().__init__()
        base_model = models.mobilenet_v2(pretrained=pretrained)
        self.cnn = base_model.features
        self.pool = nn.AdaptiveAvgPool2d((1, 1))
        # Must equal the number of channels produced by self.cnn, because the
        # pooled features are fed straight into the LSTM.
        self.feature_dim = 1280

        # Freeze the CNN: its parameters are never updated by an optimizer.
        for param in self.cnn.parameters():
            param.requires_grad = False

        self.lstm = nn.LSTM(input_size=self.feature_dim,
                            hidden_size=hidden_dim,
                            num_layers=num_layers,
                            batch_first=True)
        self.fc = nn.Linear(hidden_dim, num_classes)

    def forward(self, x):
        """Compute class logits for a batch of clips.

        Args:
            x: Tensor of shape (B, T, C, H, W).

        Returns:
            Tensor of shape (B, num_classes) with unnormalized logits.

        Raises:
            ValueError: If ``x`` is not 5-dimensional.
        """
        if x.dim() != 5:
            raise ValueError(
                f"expected input of shape (B, T, C, H, W), got {tuple(x.shape)}"
            )
        B, T, C, H, W = x.shape
        # reshape (not view) so non-contiguous inputs, e.g. after permute(),
        # are accepted instead of raising a RuntimeError.
        x = x.reshape(B * T, C, H, W)
        # The backbone is frozen, so no autograd graph is needed for it.
        with torch.no_grad():
            features = self.cnn(x)
            features = self.pool(features).reshape(B, T, -1)
        output, _ = self.lstm(features)
        # Only the output at the last time step is used for classification.
        final_output = output[:, -1, :]
        logits = self.fc(final_output)
        return logits

# === FUNCIÓN DE INFERENCIA ===
def inferir_clip(clip_path, model):
    """Classify one clip stored as a folder of frame images.

    Frames are ordered by *natural* sort (``frame_2`` before ``frame_10``), so
    file names without zero padding keep their temporal order. Entries that
    are not regular files with a common image extension are skipped, and at
    most ``CLIP_LENGTH`` frames are used.

    Args:
        clip_path: Directory containing the frame images of a single clip.
        model: Model that maps a (1, T, C, H, W) tensor to class logits.
            It is switched to eval mode as a side effect.

    Returns:
        Tuple ``(class_name, probs)`` where ``class_name`` is the entry of
        ``CLASS_NAMES`` with the highest probability and ``probs`` is a NumPy
        array of shape (1, number of logits) with the softmax probabilities.

    Raises:
        ValueError: If the folder contains no usable image frames.
    """
    import re  # local import: only needed for the natural-sort key

    image_exts = ('.jpg', '.jpeg', '.png', '.bmp', '.gif', '.tif', '.tiff', '.webp')

    def natural_key(name):
        # re.split with a capturing group alternates text / digit-run tokens,
        # so odd positions are always digit runs and compare as integers.
        parts = re.split(r'(\d+)', name)
        tokens = [int(tok) if idx % 2 else tok for idx, tok in enumerate(parts)]
        # The raw name breaks ties (e.g. "f_01" vs "f_1") deterministically.
        return tokens, name

    frame_names = [
        name for name in os.listdir(clip_path)
        if name.lower().endswith(image_exts)
        and os.path.isfile(os.path.join(clip_path, name))
    ]
    frame_names.sort(key=natural_key)
    frame_names = frame_names[:CLIP_LENGTH]

    if not frame_names:
        raise ValueError(f"No se encontraron imágenes de frames en: {clip_path}")

    clip = []
    for frame_name in frame_names:
        img_path = os.path.join(clip_path, frame_name)
        # Context manager closes the underlying file handle right away.
        with Image.open(img_path) as image:
            clip.append(transform(image.convert('RGB')))

    clip_tensor = torch.stack(clip, dim=0).unsqueeze(0).to(DEVICE)  # (1, T, C, H, W)

    model.eval()
    with torch.no_grad():
        logits = model(clip_tensor)
        probs = torch.softmax(logits, dim=1)
        predicted = torch.argmax(probs, dim=1).item()

    return CLASS_NAMES[predicted], probs.cpu().numpy()

# === EJECUCIÓN ===
if __name__ == "__main__":
    # Folder with the frames of the clip to classify; point it at your own test clip.
    clip_test_path = '../dataset/entrada/entrada_001'  # example

    # Build the network and place it on the target device before loading weights.
    model = CNN_LSTM(num_classes=NUM_CLASSES)
    model.to(DEVICE)

    # NOTE(review): torch.load unpickles the file; only load checkpoints from a
    # trusted source.
    state_dict = torch.load(
        '../checkpoints/cnn_lstm_supermercado.pth',
        map_location=DEVICE,
    )
    model.load_state_dict(state_dict)

    # Run inference on the clip and report the predicted class with its probabilities.
    clase, probs = inferir_clip(clip_test_path, model)
    print(f"🧠 Predicción: {clase} — Probabilidades: {probs}")




🧠 Predicción: entrada — Probabilidades: [[0.99555004 0.00444997]]


In [10]:
import cv2
import torch
import torch.nn as nn
from torchvision import transforms, models
from collections import deque
import numpy as np
from PIL import Image

# === CONFIG ===
# Number of frames in the sliding window that is classified as one clip.
CLIP_LENGTH = 32
# Side length, in pixels, that every frame is resized to before entering the network.
IMAGE_SIZE = 224
# Index i of the model output is reported as CLASS_NAMES[i].
# NOTE(review): presumably this order matches the label indices used in training — confirm.
CLASS_NAMES = ['entrada', 'salida']
# Run on the GPU when CUDA is available, otherwise on the CPU.
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# === TRANSFORM ===
# Per-frame preprocessing: resize to IMAGE_SIZE x IMAGE_SIZE, then convert to a tensor.
# NOTE(review): no mean/std normalization is applied here; confirm this matches the
# preprocessing used when the checkpoint was trained.
transform = transforms.Compose([
    transforms.Resize((IMAGE_SIZE, IMAGE_SIZE)),
    transforms.ToTensor()
])

# === MODELO ===
class CNN_LSTM(nn.Module):
    """Clip classifier: a frozen MobileNetV2 feature extractor followed by an LSTM.

    Every frame of a clip is passed through the convolutional backbone and
    average-pooled to a single feature vector; the per-frame vectors are fed
    to an LSTM, and the LSTM output at the last time step is mapped to class
    logits by a linear layer.

    Args:
        hidden_dim: Hidden size of the LSTM.
        num_layers: Number of stacked LSTM layers.
        num_classes: Number of output logits.
        pretrained: Forwarded to ``models.mobilenet_v2``. Defaults to ``True``
            (the previous hard-coded behaviour). Pass ``False`` to skip the
            weight download when a checkpoint that contains the backbone
            weights is loaded right after construction.
    """

    def __init__(self, hidden_dim=512, num_layers=1, num_classes=2, pretrained=True):
        super().__init__()
        base_model = models.mobilenet_v2(pretrained=pretrained)
        self.cnn = base_model.features
        self.pool = nn.AdaptiveAvgPool2d((1, 1))
        # Must equal the number of channels produced by self.cnn, because the
        # pooled features are fed straight into the LSTM.
        self.feature_dim = 1280

        # Freeze the CNN: its parameters are never updated by an optimizer.
        for param in self.cnn.parameters():
            param.requires_grad = False

        self.lstm = nn.LSTM(input_size=self.feature_dim,
                            hidden_size=hidden_dim,
                            num_layers=num_layers,
                            batch_first=True)
        self.fc = nn.Linear(hidden_dim, num_classes)

    def forward(self, x):
        """Compute class logits for a batch of clips.

        Args:
            x: Tensor of shape (B, T, C, H, W).

        Returns:
            Tensor of shape (B, num_classes) with unnormalized logits.

        Raises:
            ValueError: If ``x`` is not 5-dimensional.
        """
        if x.dim() != 5:
            raise ValueError(
                f"expected input of shape (B, T, C, H, W), got {tuple(x.shape)}"
            )
        B, T, C, H, W = x.shape
        # reshape (not view) so non-contiguous inputs, e.g. after permute(),
        # are accepted instead of raising a RuntimeError.
        x = x.reshape(B * T, C, H, W)
        # The backbone is frozen, so no autograd graph is needed for it.
        with torch.no_grad():
            features = self.cnn(x)
            features = self.pool(features).reshape(B, T, -1)
        output, _ = self.lstm(features)
        # Only the output at the last time step is used for classification.
        final_output = output[:, -1, :]
        logits = self.fc(final_output)
        return logits

# === INFERENCIA EN CLIP ===
def inferir_clip_tensor(clip_frames, model):
    """Classify a clip given as a sequence of already-preprocessed frame tensors.

    Args:
        clip_frames: Sequence of equally shaped frame tensors; they are stacked
            along a new time axis and given a leading batch axis of size 1.
        model: Model mapping the stacked clip to class logits. It is switched
            to eval mode as a side effect.

    Returns:
        Tuple ``(class_name, confidence)``: the ``CLASS_NAMES`` entry with the
        highest softmax probability and that probability as a Python float.
    """
    # (T, ...) -> (1, T, ...), moved to the inference device.
    batch = torch.stack(clip_frames, dim=0)[None].to(DEVICE)

    model.eval()
    with torch.no_grad():
        scores = torch.softmax(model(batch), dim=1)
        best_idx = scores.argmax(dim=1).item()
        confidence = scores[0, best_idx].item()

    return CLASS_NAMES[best_idx], confidence

# === PROCESAMIENTO DE VIDEO FIJO ===
def procesar_video_guardado(video_path, salida_path, model):
    """Label a stored video with sliding-window predictions and save the result.

    Frames are read one by one. Once ``CLIP_LENGTH`` frames have been seen, the
    most recent ``CLIP_LENGTH`` frames are classified on every new frame and the
    predicted class and its probability are drawn on that frame before it is
    written. The first ``CLIP_LENGTH - 1`` frames are written without text.

    Only the current frame is kept in memory: earlier frames have already been
    written, so drawing on them again would not change the output file.

    Args:
        video_path: Path of the input video.
        salida_path: Path of the annotated output video (written with the
            'mp4v' codec). The parent directory is created if missing.
        model: Model passed to ``inferir_clip_tensor``.

    Raises:
        IOError: If the input video cannot be opened or the output writer
            cannot be created.
    """
    import os  # local import: this cell does not import os at module level

    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        cap.release()
        raise IOError(f"No se pudo abrir el video de entrada: {video_path}")

    out = None
    try:
        # Keep the fractional frame rate (e.g. 29.97); truncating it to int
        # makes the output play at a slightly different speed.
        fps = cap.get(cv2.CAP_PROP_FPS)
        if not fps > 0:
            # Some containers report 0/NaN; fall back to a common default so
            # the writer can still be created.
            fps = 30.0
        ancho = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        alto = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

        out_dir = os.path.dirname(salida_path)
        if out_dir:
            os.makedirs(out_dir, exist_ok=True)

        out = cv2.VideoWriter(salida_path, cv2.VideoWriter_fourcc(*'mp4v'), fps, (ancho, alto))
        if not out.isOpened():
            raise IOError(f"No se pudo crear el video de salida: {salida_path}")

        # Sliding window with the last CLIP_LENGTH preprocessed frames.
        frame_queue = deque(maxlen=CLIP_LENGTH)

        while cap.isOpened():
            ret, frame = cap.read()
            if not ret:
                break

            # Preprocess for the network (OpenCV delivers BGR, the model gets RGB).
            pil_img = Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
            frame_queue.append(transform(pil_img))

            # Predict once the window is full.
            # NOTE: every call re-runs the model on all CLIP_LENGTH frames of the window.
            if len(frame_queue) == CLIP_LENGTH:
                pred, prob = inferir_clip_tensor(list(frame_queue), model)
                texto = f"{pred.upper()} ({prob:.2f})"
                cv2.putText(frame, texto, (10, 30),
                            cv2.FONT_HERSHEY_SIMPLEX, 1.0, (0, 255, 0), 2)

            # Write the frame, with or without text.
            out.write(frame)
    finally:
        # Always release the native handles, even if inference fails midway.
        cap.release()
        if out is not None:
            out.release()

    print(f"✅ Video procesado guardado en: {salida_path}")

# === MAIN ===
if __name__ == "__main__":
    # Input video to label and destination of the annotated copy.
    video_entrada = '../videos/entrada_001.mp4'
    video_salida = '../test-result/salida_etiquetada.mp4'

    # Build the network and place it on the target device before loading weights.
    model = CNN_LSTM(num_classes=2)
    model.to(DEVICE)

    # NOTE(review): torch.load unpickles the file; only load checkpoints from a
    # trusted source.
    state_dict = torch.load(
        '../checkpoints/cnn_lstm_supermercado.pth',
        map_location=DEVICE,
    )
    model.load_state_dict(state_dict)

    procesar_video_guardado(video_entrada, video_salida, model)


✅ Video procesado guardado en: ../test-result/salida_etiquetada.mp4
