In [1]:
import torch

# Prefer the GPU when CUDA is usable; otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")
print("Usando dispositivo:", device)

Usando dispositivo: cuda


In [None]:
import os
import glob
import numpy as np
import cv2
import torch
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms
from efficientnet_pytorch import EfficientNet  # opcional, si más adelante cargas el extractor

class DAiSEEVideoDataset(Dataset):
    """
    Video dataset laid out as ``<root_dir>/<label>/*.avi|*.mp4``.

    Every item is a clip of exactly ``num_frames`` RGB frames taken from one
    video. Each frame goes through ``transform`` and the results are stacked
    into a single tensor.

    Args:
        root_dir: Directory holding one sub-folder per class label.
        classes: Iterable of labels; ``str(label)`` is the sub-folder name and
            ``label`` itself is returned as the target.
        num_frames: Number of frames each clip is sampled or padded to.
        target_size: Size handed to the default resize / centre-crop steps.
        transform: Optional per-frame callable applied to every RGB frame.
            Defaults to PIL conversion, resize, centre crop, tensor conversion
            and normalisation with the ImageNet mean / std.
    """
    def __init__(self,
                 root_dir="../DAiSEE/DataSet/Aug",
                 classes=(0, 1, 2, 3),
                 num_frames=300,
                 target_size=(224, 224),
                 transform=None):
        self.samples = []
        for label in classes:
            folder = os.path.join(root_dir, str(label))
            for ext in ("*.avi", "*.mp4"):
                # glob order depends on the filesystem; sorting keeps the
                # index -> sample mapping stable between runs.
                for path in sorted(glob.glob(os.path.join(folder, ext))):
                    self.samples.append((path, label))
        self.num_frames = num_frames
        self.target_size = target_size
        # Default per-frame pipeline: resize / crop, then ImageNet normalisation.
        self.transform = transform or transforms.Compose([
            transforms.ToPILImage(),
            transforms.Resize(target_size),
            transforms.CenterCrop(target_size),
            transforms.ToTensor(),
            transforms.Normalize(mean=[0.485, 0.456, 0.406],
                                 std=[0.229, 0.224, 0.225])
        ])

    def __len__(self):
        """Return the number of (video path, label) pairs found on disk."""
        return len(self.samples)

    def _extract_frames(self, video_path):
        """
        Decode ``num_frames`` RGB frames from ``video_path``.

        Frames are sampled uniformly over the reported frame count. When the
        video is shorter than ``num_frames``, or decoding stops early, the last
        decoded frame is repeated so the returned list always has exactly
        ``num_frames`` entries.

        Raises:
            RuntimeError: if not a single frame could be decoded.
        """
        cap = cv2.VideoCapture(video_path)
        try:
            # The reported count may be 0 (or bogus) for unreadable files.
            total = max(int(cap.get(cv2.CAP_PROP_FRAME_COUNT)), 0)
            if total >= self.num_frames:
                # Uniformly spaced indices across the whole duration.
                idxs = np.linspace(0, total - 1, self.num_frames, dtype=int)
            else:
                # Short video: read every frame once, pad afterwards.
                idxs = range(total)
            frames = []
            for i in idxs:
                cap.set(cv2.CAP_PROP_POS_FRAMES, int(i))
                ret, frame = cap.read()
                if not ret:
                    # The reported frame count is not always decodable;
                    # stop here and pad with what was read so far.
                    break
                # OpenCV yields BGR; the transform pipeline expects RGB.
                frames.append(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
        finally:
            # Release the capture even if decoding raised.
            cap.release()

        if not frames:
            raise RuntimeError(
                f"No se pudo decodificar ningún frame de {video_path!r}"
            )
        # Repeat the last decoded frame so every clip has the same length
        # (a no-op when enough frames were read).
        frames.extend([frames[-1]] * (self.num_frames - len(frames)))
        return frames

    def __getitem__(self, idx):
        """Return ``(video_tensor, label)``; the tensor stacks the transformed frames."""
        video_path, label = self.samples[idx]
        frames = self._extract_frames(video_path)
        # One transform call per frame; with the default pipeline the result
        # has shape (num_frames, 3, H, W).
        video_tensor = torch.stack([self.transform(f) for f in frames])
        return video_tensor, label

In [None]:
import torch
import torch.nn as nn
from efficientnet_pytorch import EfficientNet

class FeatureExtractor(nn.Module):
    """
    Per-frame feature extractor built on an EfficientNet backbone.

    The backbone's final feature map is collapsed with global average pooling,
    giving one feature vector per input frame.

    Args:
        model_name: EfficientNet variant name passed to the factory methods.
        pretrained: When true the backbone comes from
            ``EfficientNet.from_pretrained`` (ImageNet weights per the original
            note); otherwise from ``EfficientNet.from_name``.

    Attributes:
        out_dim: Input width of the backbone's original ``_fc`` layer, stored
            before that layer is replaced so downstream modules can size
            themselves.
    """
    def __init__(self, model_name='efficientnet-b0', pretrained=True):
        super().__init__()
        if pretrained:
            self.backbone = EfficientNet.from_pretrained(model_name)
        else:
            self.backbone = EfficientNet.from_name(model_name)
        # Record the feature width *before* dropping the classifier head:
        # nn.Identity has no `in_features`, so it cannot be queried afterwards.
        self.out_dim = self.backbone._fc.in_features
        # Drop the final classifier.
        self.backbone._fc = nn.Identity()
        self.pool = nn.AdaptiveAvgPool2d(1)

    def forward(self, x):
        """
        Map a batch of frames to pooled feature vectors.

        Args:
            x: Frames, expected as [B*T, 3, H, W].

        Returns:
            Tensor of shape [B*T, C], C being the backbone's channel count.
        """
        feats = self.backbone.extract_features(x)   # [B*T, C, h', w']
        # Global average pool to [B*T, C, 1, 1], then drop the spatial dims.
        return self.pool(feats).flatten(1)          # [B*T, C]


class TemporalLSTM(nn.Module):
    """
    Aggregate a sequence of per-frame features with an (optionally
    bidirectional) LSTM and return one summary vector per sequence.

    Args:
        feat_dim: Size of each input feature vector.
        hidden_dim: Hidden size of the LSTM (per direction).
        num_layers: Number of stacked LSTM layers.
        bidir: Whether the LSTM is bidirectional.

    Attributes:
        out_dim: Width of the returned summary
            (``hidden_dim`` times 2 when bidirectional, else ``hidden_dim``).
    """
    def __init__(self, feat_dim, hidden_dim=256, num_layers=1, bidir=True):
        super().__init__()
        self.lstm = nn.LSTM(
            input_size=feat_dim,
            hidden_size=hidden_dim,
            num_layers=num_layers,
            batch_first=True,
            bidirectional=bidir
        )
        self.out_dim = hidden_dim * (2 if bidir else 1)

    def forward(self, x):
        """
        Args:
            x: Feature sequence of shape [B, T, feat_dim].

        Returns:
            Tensor of shape [B, out_dim].
        """
        # h_n holds the final hidden state of every layer and direction:
        # [num_layers * num_directions, B, hidden_dim].
        _, (h_n, _) = self.lstm(x)
        if self.lstm.bidirectional:
            # Slicing the output sequence at t = -1 would give the backward
            # direction after it has seen only the last frame. Its final state
            # (after the whole sequence) is h_n[-1]; the forward one is h_n[-2].
            return torch.cat([h_n[-2], h_n[-1]], dim=1)   # [B, 2 * hidden_dim]
        return h_n[-1]                                    # [B, hidden_dim]


class TemporalTransformer(nn.Module):
    """
    Aggregate a sequence of per-frame features with a TransformerEncoder.

    A learned positional embedding is added to the input, the sequence is
    encoded, and the encoder output is averaged over time.

    Args:
        feat_dim: Size of each input feature vector (the encoder's d_model).
        n_heads: Number of attention heads.
        ff_dim: Width of the encoder's feed-forward sub-layer.
        n_layers: Number of encoder layers.
        dropout: Dropout probability inside the encoder layers.
        max_len: Longest sequence the positional embedding can cover.
    """
    def __init__(self, feat_dim, n_heads=4, ff_dim=512, n_layers=2, dropout=0.1, max_len=300):
        super().__init__()
        # Learned positional embedding, one vector per time step up to max_len.
        self.pos_emb = nn.Parameter(torch.randn(1, max_len, feat_dim))
        encoder_layer = nn.TransformerEncoderLayer(
            d_model=feat_dim, nhead=n_heads,
            dim_feedforward=ff_dim, dropout=dropout,
            batch_first=True
        )
        self.encoder = nn.TransformerEncoder(encoder_layer, num_layers=n_layers)
        self.feat_dim = feat_dim

    def forward(self, x):
        """
        Args:
            x: Feature sequence of shape [B, T, feat_dim] with T <= max_len.

        Returns:
            Tensor of shape [B, feat_dim].

        Raises:
            ValueError: if the sequence is longer than the positional embedding.
        """
        _, seq_len, _ = x.shape
        max_len = self.pos_emb.size(1)
        if seq_len > max_len:
            # Slicing past the end would just return a shorter tensor, giving
            # a broadcast error or (for max_len == 1) a silent broadcast.
            raise ValueError(
                f"La secuencia tiene {seq_len} frames pero max_len es {max_len}"
            )
        x = x + self.pos_emb[:, :seq_len, :]   # [B, T, C]
        encoded = self.encoder(x)              # [B, T, C]
        # Temporal pooling: mean over the sequence.
        return encoded.mean(dim=1)             # [B, C]


class VideoClassifier(nn.Module):
    """
    Full model: per-frame extractor -> temporal module (LSTM or Transformer)
    -> classification head.

    Args:
        temporal_type: ``'lstm'`` or ``'transformer'``.
        feat_model_name: Backbone name forwarded to ``FeatureExtractor``.
        num_classes: Number of output logits.
        lstm_hidden: Hidden size used when ``temporal_type == 'lstm'``.
        transformer_heads: Attention heads used when
            ``temporal_type == 'transformer'``.
        pretrained: Forwarded to ``FeatureExtractor`` (load pretrained weights).

    Raises:
        ValueError: if ``temporal_type`` is not one of the supported values.
    """
    def __init__(self,
                 temporal_type='lstm',
                 feat_model_name='efficientnet-b0',
                 num_classes=4,
                 lstm_hidden=256,
                 transformer_heads=4,
                 pretrained=True):
        super().__init__()
        # Validate before building (and possibly downloading) the backbone.
        if temporal_type not in ('lstm', 'transformer'):
            raise ValueError("temporal_type debe ser 'lstm' o 'transformer'")
        # Frame-level feature extractor.
        self.extractor = FeatureExtractor(model_name=feat_model_name,
                                          pretrained=pretrained)
        # The extractor replaces `_fc` with nn.Identity, which has no
        # `in_features`. Read the channel count from the batch norm that
        # follows the backbone's head conv instead (original note: 1280 for B0).
        feat_dim = self.extractor.backbone._bn1.num_features
        # Temporal module.
        if temporal_type == 'lstm':
            self.temporal = TemporalLSTM(feat_dim, hidden_dim=lstm_hidden)
            temporal_dim = self.temporal.out_dim
        else:
            self.temporal = TemporalTransformer(feat_dim, n_heads=transformer_heads)
            temporal_dim = feat_dim
        # Classification head.
        self.classifier = nn.Sequential(
            nn.Linear(temporal_dim, temporal_dim // 2),
            nn.ReLU(inplace=True),
            nn.Dropout(0.3),
            nn.Linear(temporal_dim // 2, num_classes)
        )

    def forward(self, video):
        """
        Args:
            video: Tensor of shape [B, T, 3, H, W].

        Returns:
            Logits of shape [B, num_classes].
        """
        B, T, C, H, W = video.shape
        # Fold time into the batch so the extractor sees individual frames.
        # reshape (not view) also accepts non-contiguous inputs.
        frames = video.reshape(B * T, C, H, W)
        feats = self.extractor(frames)       # [B*T, feat_dim]
        feats = feats.reshape(B, T, -1)      # [B, T, feat_dim]
        # Aggregate over time.
        temp = self.temporal(feats)          # [B, temporal_dim]
        # Classify.
        return self.classifier(temp)         # [B, num_classes]


In [None]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# 1) Build the Dataset and the DataLoader.
ds = DAiSEEVideoDataset(
    root_dir="../DAiSEE/DataSet/Aug",
    num_frames=300,
    target_size=(224,224)
)
# The dataset path is relative, so a wrong working directory silently yields
# zero samples. Fail here with a readable message instead of further down.
if len(ds) == 0:
    raise RuntimeError("No se encontraron vídeos en ../DAiSEE/DataSet/Aug")
dl = DataLoader(ds, batch_size=4, shuffle=True, num_workers=4)

# 2) Build the model, the optimizer and the loss.
# NOTE(review): each batch holds 4 x 300 frames at 224x224 and the backbone is
# trained end to end, so activation memory is presumably very large — confirm
# it fits on the target GPU, or reduce num_frames / freeze the extractor.
model = VideoClassifier(
    temporal_type='lstm',
    feat_model_name='efficientnet-b0',
    num_classes=4
).to(device)
optimizer = torch.optim.Adam(model.parameters(), lr=1e-4)
criterion = torch.nn.CrossEntropyLoss()

# 3) (Optional) Stand-alone extractor, in case features are extracted and
#    cached by hand instead of going through VideoClassifier (see option B).
feat_extractor = EfficientNet.from_pretrained("efficientnet-b0").to(device)
feat_extractor.eval()

num_epochs = 250

# 4) Training loop.
for epoch in range(num_epochs):
    model.train()
    epoch_loss_sum = 0.0   # sum of per-sample losses over the epoch
    epoch_samples = 0
    for vids, labels in dl:
        vids = vids.to(device)       # [B, T, 3, H, W]
        labels = labels.to(device)

        optimizer.zero_grad()

        # --- Option A: run everything through the integrated model ---
        logits = model(vids)         # VideoClassifier does extraction + LSTM/Transformer
        loss = criterion(logits, labels)

        # --- Option B (separate extractor) ---
        # with torch.no_grad():
        #     B, T, C, H, W = vids.shape
        #     vids_flat = vids.view(B*T, C, H, W)
        #     feats_flat = feat_extractor.extract_features(vids_flat)
        #     # apply pooling and rebuild a [B, T, feat_dim] tensor
        #     # then feed that sequence to the LSTM/Transformer + classifier
        #     logits = tu_segundo_modulo(feats_seq)
        #     loss = criterion(logits, labels)

        loss.backward()
        optimizer.step()

        # criterion returns the batch mean; weight by batch size so the epoch
        # average stays correct when the last batch is smaller.
        batch_size = labels.size(0)
        epoch_loss_sum += loss.item() * batch_size
        epoch_samples += batch_size

    print(f"Época {epoch + 1}/{num_epochs} - loss: {epoch_loss_sum / epoch_samples:.4f}")

    # validation, checkpoint saving, etc. would go here