In [3]:
import pandas as pd
from ultralytics import YOLO
import cv2

# Cargar el modelo YOLOv8 pre-entrenado
model = YOLO("yolov8n.pt")

def extract_features_from_video(video_path, clip_name, label):
    """
    Procesa un video para extraer una lista de detecciones.
    En un caso real, aquí se integraría un tracker como Deep Sort.
    """
    cap = cv2.VideoCapture(video_path)
    frame_num = 0
    video_detections = []

    while cap.isOpened():
        success, frame = cap.read()
        if not success:
            break

        # Realizar detección de personas (clase 0)
        results = model.predict(frame, classes=[0], verbose=False)

        # Para cada persona detectada, guardar sus datos
        for r in results:
            for box in r.boxes:
                x1, y1, x2, y2 = box.xyxy[0].cpu().numpy()
                video_detections.append({
                    'clip': clip_name,
                    'frame': frame_num,
                    'left': x1, 'top': y1,
                    'width': x2 - x1, 'height': y2 - y1,
                    'confidence': float(box.conf[0].cpu().numpy()),
                    'label': label
                })
        frame_num += 1

    cap.release()
    return video_detections

# Se procesaría un video y se guardaría el resultado
detections = extract_features_from_video('path/to/anomaly_video.mp4', 'Video_01', 'anomaly')
df_frames = pd.DataFrame(detections)

# Guardar el dataset para la siguiente fase
df_frames.to_csv("dataframe_frames.csv", index=False)

print("Dataset a nivel de frame generado:")
print(df_frames.head())

Dataset a nivel de frame generado:
Empty DataFrame
Columns: []
Index: []


In [3]:
import numpy as np
import pandas as pd
from tsai.all import *

# Cargar el dataset específico
df_frames = pd.read_csv("dataframe_frames.csv")

# Selección de Características
features = [
    'center_x', 'center_y',       # Posición central
    'width', 'height',            # Dimensiones
    'bbox_area',                  # Tamaño total del cuadro
    'movement_since_last_frame',  # ¡Característica temporal clave!
    'confidence',                 # Confianza de la detección
    'distance_from_center',       # Contexto espacial
    'total_persons_in_frame'      # Contexto del entorno
]
n_vars = len(features)
print(f"Se utilizarán {n_vars} variables: {features}")

# Agrupamos por 'video_folder' y 'person_id' para crear una secuencia
# única para cada persona en cada video.
X_list, y_list = [], []
for (video, person), group in df_frames.groupby(['video_folder', 'person_id']):
    if len(group) < 10:
        continue

    # Usamos .to_numpy() para garantizar que obtenemos un array de NumPy
    sequence = group[features].to_numpy().T
    X_list.append(sequence)
    y_list.append(group['label'].iloc[0])

# Unificar la longitud de las secuencias con padding
X_padded = pad_sequences(X_list, padding='post', truncating='post')

# Convertir a arrays de NumPy y codificar etiquetas
X = np.array(X_padded)
y, classes = pd.factorize(np.array(y_list))

print(f"\nForma final del array de datos (X): {X.shape}")
print(f"Clases: {classes}")
print(f"Total de secuencias de personas a analizar: {len(X)}")

Se utilizarán 9 variables: ['center_x', 'center_y', 'width', 'height', 'bbox_area', 'movement_since_last_frame', 'confidence', 'distance_from_center', 'total_persons_in_frame']

Forma final del array de datos (X): (2899, 9, 10)
Clases: ['anomaly' 'normal']
Total de secuencias de personas a analizar: 2899


In [8]:
# Entrenamiento y Evaluación con TSAI
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report
import numpy as np

# Se asume que X, y, n_vars, y classes ya existen del paso anterior

# Crear los splits usando los índices
indices = np.arange(len(X))
train_idx, test_idx = train_test_split(
    indices, test_size=0.2, random_state=42, stratify=y
)
splits = [train_idx, test_idx]

# 1. Crear los DataLoaders
dls = TSDataLoaders.from_numpy(X, y, splits=splits, bs=32)

# 2. Instanciar la arquitectura del modelo
model = XceptionTime(n_vars, dls.c)

# 3. Crear el 'Learner' que orquesta todo el proceso
learn = Learner(dls, model, metrics=[accuracy, F1Score(average='macro')])

# 4. Entrenar el modelo con la política 'fit_one_cycle'
learn.fit_one_cycle(15, lr_max=1e-3)


# 5. Crear un DataLoader de prueba limpio para evitar inconsistencias de tamaño
test_dl = dls.test_dl(X[test_idx])

# 6. Obtener las predicciones usando el nuevo DataLoader de prueba
probas, _, preds = learn.get_preds(dl=test_dl, with_decoded=True)


# Imprimir el reporte de clasificación final
print("\n### Reporte de Clasificación del Modelo XceptionTime ###")
print(classification_report(y[test_idx], preds.numpy(), target_names=classes))

epoch,train_loss,valid_loss,accuracy,f1_score,time
0,0.599552,0.631089,0.654577,0.640391,00:15
1,0.584131,0.639277,0.647668,0.641656,00:16
2,0.575248,0.63103,0.65285,0.645744,00:16
3,0.567615,0.671879,0.632124,0.434998,00:15
4,0.563222,0.597698,0.677029,0.65636,00:15
5,0.548442,0.710244,0.594128,0.594123,00:15
6,0.540353,0.586495,0.694301,0.646972,00:15
7,0.529988,0.583106,0.708117,0.668572,00:16
8,0.52209,0.564846,0.690846,0.672226,00:16
9,0.508199,0.581265,0.709845,0.646561,00:15



### Reporte de Clasificación del Modelo XceptionTime ###
              precision    recall  f1-score   support

     anomaly       0.72      0.71      0.72       234
      normal       0.81      0.81      0.81       346

    accuracy                           0.77       580
   macro avg       0.76      0.76      0.76       580
weighted avg       0.77      0.77      0.77       580

