In [None]:
import torch
import torchvision.transforms as transforms
import torchvision.models.video as models
import cv2
import numpy as np
from PIL import Image
from imblearn.over_sampling import SMOTE
import re
import os
from sklearn.model_selection import train_test_split
from collections import Counter
import json
import pickle
from sklearn.preprocessing import LabelEncoder

In [None]:
#Función que carga las etiquetas
def load_action_labels(json_path):

    with open(json_path, 'r') as f:
        data = json.load(f)
    
    action_labels = {}
    for action_id, action_data in data['Actions'].items():
        label = action_data['Severity']  # Extraer la etiqueta de 'Action class'
        action_name = f"action_{action_id}"  # Formar el nombre de la acción
        action_labels[action_name] = label
    
    return action_labels

In [None]:
with open('F:/MVFoulRecognition/features/Mvit/train/severity/end/train_combinados.pkl', 'rb') as file:  # 'rb' es lectura en modo binario
    loaded_data = pickle.load(file)

resultado_train = {k: v for dic in loaded_data for k, v in dic.items()}

dic_ordenado_train = dict(sorted(resultado_train.items(), key=lambda x: int(x[0].split('_')[1])))

#print(dic_ordenado_train)
with open('F:/MVFoulRecognition/features/Mvit/test/test_combined.pkl', 'rb') as file:  # 'rb' es lectura en modo binario
    loaded_data = pickle.load(file)

resultado_test = {k: v for dic in loaded_data for k, v in dic.items()}

# Salida: {'a': 1, 'b': 2, 'c': 3, 'd': 4, 'e': 5, 'f': 6}
dic_ordenado_test = dict(sorted(resultado_test.items(), key=lambda x: int(x[0].split('_')[1])))


In [None]:
ls = load_action_labels("F:/data/mvfouls/train/annotations.json")
ls = sorted(ls.items())
sorted_labels_train = sorted(ls, key=lambda x: int(x[0].split('_')[1]))

ls = load_action_labels("F:/data/mvfouls/test/annotations.json")
ls = sorted(ls.items())
sorted_labels_test = sorted(ls, key=lambda x: int(x[0].split('_')[1]))

In [None]:
#Comprobación de las acciones que no se han extraido características correctamente

diccionario = {i: 0 for i in range(0, 4052)}

for r in dic_ordenado_train:

    action = int(r.split("_")[1])  
    if action in diccionario:
        diccionario[action] = 1

# Mostrar el diccionario actualizado
diccionario_filtrado = {k: v for k, v in diccionario.items() if v == 0}
print(diccionario_filtrado)

In [None]:
sorted_labels_train

In [None]:
#Comprobamos el número de acciones
len(dic_ordenado_train)

In [None]:
#AÑADIMOS NUEVAS ACCIONES AL DICCIONARIO DE LABELS 



# Calcular cuántas nuevas acciones tenemos hasta llegar a 3174 (Primer grupo de acciones aumentadas)
new_actions_needed = 3174 - len(sorted_labels_train)+1

# Generar nuevas acciones con la etiqueta '3.0'
new_actions = [('action_' + str(i), '3.0') for i in range(len(sorted_labels_train), len(sorted_labels_train) + new_actions_needed)]

# Agregar las nuevas acciones al diccionario
updated_labels = sorted_labels_train + new_actions

print(f"Total de acciones después de la ampliación: {len(updated_labels)}")
print(updated_labels[:])  # Mostrar las últimas 10 acciones para verificar

# Calcular cuántas nuevas acciones tenemos hasta llegar a 4052 (Segundo grupo de acciones aumentadas)
new_actions_needed = 4052 - len(updated_labels)+1

# Crear las nuevas acciones con etiqueta '5.0' desde el último índice actual
start_index = int(updated_labels[-1][0].split('_')[1]) + 1  # último índice + 1

# Generar nuevas acciones
new_actions_5_0 = [('action_' + str(i), '5.0') for i in range(start_index, start_index + new_actions_needed)]

# Agregar al listado actualizado
updated_labels += new_actions_5_0

# Verificación
print(f"Total final de acciones: {len(updated_labels)}")
print("Últimas acciones añadidas:")
print(updated_labels[-new_actions_needed:])


In [None]:
# Comprobamos la distribución de etiquetas
# Extraer solo las etiquetas
only_labels = [label for (_, label) in sorted_labels_train]

# Contar la frecuencia de cada clase
print(Counter(only_labels))


In [None]:
# Diccionario de mapeo
conversion = {
    "1.0": "no card",
    "2.0": "no card",
    "3.0": "yellow card",
    "4.0": "yellow card",
    "5.0": "red card",
    "": ""  # dejamos las vacías tal cual
}

# Aplicar conversión
updated_labels = [(action, conversion[label]) for action, label in updated_labels]

# Verificamos algunas etiquetas convertidas
print(sorted_labels_train[:10])



In [None]:
# Comprobamos la distribución de etiquetas
# Extraer solo las etiquetas
only_labels = [label for (_, label) in sorted_labels_train]

# Contar la frecuencia de cada clase
print(Counter(only_labels))


In [None]:
# Filtrar las acciones con etiqueta '""'
offence_actions = [item for item in updated_labels if item[1] == ""]

actions_to_remove = offence_actions

actions_to_remove_list = actions_to_remove

updated_labels = [item for item in updated_labels if item not in actions_to_remove_list]

for action in actions_to_remove_list:
    dic_ordenado_train.pop(action[0], None) 
print("Diccionario después de eliminar las acciones:", dic_ordenado_train.keys())

In [None]:
# Comprobamos la distribución de etiquetas
# Extraer solo las etiquetas
only_labels = [label for (_, label) in updated_labels]

# Contar la frecuencia de cada clase
print(Counter(only_labels))

In [None]:
# Filtrar las acciones con etiqueta 'no card'
offence_actions = [item for item in updated_labels if item[1] == 'no card']

# Seleccionar las ultima acciones con etiqueta 'no card'
actions_to_remove = offence_actions[-500:]

actions_to_remove_list = actions_to_remove

updated_labels = [item for item in updated_labels if item not in actions_to_remove_list]

for action in actions_to_remove_list:
    dic_ordenado_train.pop(action[0], None) 

print("Diccionario después de eliminar las acciones:", dic_ordenado_train.keys())

In [None]:
# Comprobamos la distribución de etiquetas
# Extraer solo las etiquetas
only_labels = [label for (_, label) in updated_labels]

# Contar la frecuencia de cada clase
print(Counter(only_labels))

In [None]:
#Eliminamos las acciones que no se han extraido correctamente
updated_labels = [item for item in updated_labels if item[0] != 'action_3320' and item[0] != 'action_3691' and item[0] != 'action_3848' ]


In [None]:
# Comprobamos la distribución de etiquetas de test
# Extraer solo las etiquetas
only_labels = [label for (_, label) in sorted_labels_test]

# Contar la frecuencia de cada clase
print(Counter(only_labels))

In [None]:
# Cambiamos y agrupamos las etiquetas
conversion = {
    "1.0": "no card",
    "2.0": "no card",
    "3.0": "yellow card",
    "4.0": "yellow card",
    "5.0": "red card",
    "": ""  # dejamos las vacías tal cual
}

# Aplicar conversión
sorted_labels_test = [(action, conversion[label]) for action, label in sorted_labels_test]

# Verificamos algunas etiquetas convertidas
print(sorted_labels_test[:10])



In [None]:
# 1. Filtrar las acciones con etiqueta '""'
offence_actions = [item for item in sorted_labels_test if item[1] == ""]

# 2. Seleccionar las primeras mil acciones con etiqueta '""'
# Aseguramos que hay al menos mil acciones con etiqueta '""'
actions_to_remove = offence_actions

# 3. Guardar estas mil acciones en una lista
actions_to_remove_list = actions_to_remove

sorted_labels_test = [item for item in sorted_labels_test if item not in actions_to_remove_list]

for action in actions_to_remove_list:
    dic_ordenado_test.pop(action[0], None) 
# Ver el diccionario después de la eliminación
print("Diccionario después de eliminar las acciones:", dic_ordenado_test.keys())

In [None]:
X_train  =np.array(list(dic_ordenado_train.values()))
labels_dic_train = dict(updated_labels)
Y_train = np.array(list(labels_dic_train.values()))

X_test = np.array(list(dic_ordenado_test.values()))
labels_dict_test = dict(sorted_labels_test)
Y_test = np.array(list(labels_dict_test.values()))

print("Ejemplo de etiquetas en Y_train:", Y_train[:10])
print("Ejemplo de etiquetas en Y_valid:", Y_test[:10])

label_encoder = LabelEncoder()

# Ajustamos el codificador con TODAS las etiquetas posibles
label_encoder.fit(np.concatenate([Y_train, Y_test]))

# Transformamos las etiquetas de texto a números
Y_train_encoded = label_encoder.transform(Y_train)
Y_test_encoded = label_encoder.transform(Y_test)

print("Etiquetas originales:", np.unique(Y_train)) 
print("Etiquetas codificadas:", np.unique(Y_train_encoded))
print("Diccionario de conversión:", dict(zip(label_encoder.classes_, label_encoder.transform(label_encoder.classes_))))


In [None]:
print(f"X_train shape: {X_train.shape}, Y_train shape: {Y_train.shape}")
print(f"X_valid shape: {X_test.shape}, Y_valid shape: {Y_test.shape}")

In [None]:
from imblearn.over_sampling import SMOTE
from xgboost import XGBClassifier
from sklearn.model_selection import RandomizedSearchCV
from sklearn.metrics import classification_report, confusion_matrix, accuracy_score
from scipy.stats import uniform
import numpy as np
from sklearn.model_selection import train_test_split

# Aplanar la segunda dimensión de X_train y X_valid
X_train_flattened = X_train.squeeze(1)  # (2916, 400)
X_test_flattened = X_test.squeeze(1)  # (301, 400)

# Suponiendo que estos son tus datos completos
X_total = np.concatenate((X_train_flattened, X_test_flattened))
Y_total = np.concatenate((Y_train_encoded, Y_test_encoded))

# Nueva división estratificada
X_train, X_test, Y_train, Y_test = train_test_split(
    X_total, Y_total,
    test_size=0.2,  # o el porcentaje que quieras
    stratify=Y_total,  # 👈 esto asegura la misma proporción de clases
    random_state=42
)

# Aplicar SMOTE para balancear las clases en el conjunto de entrenamiento
smote = SMOTE(random_state=42)
X_train_resampled, Y_train_resampled = smote.fit_resample(X_train, Y_train)

# Definir el espacio de búsqueda de hiperparámetros
param_dist = {
    'n_estimators': [100, 200, 300, 400, 500],  # Número de árboles
    'learning_rate': uniform(0.01, 0.2),         # Tasa de aprendizaje
    'max_depth': [3, 4, 5, 6, 7],                # Profundidad máxima de los árboles
    'min_child_weight': [1, 2, 3, 4],            # Peso mínimo de los hijos
    'subsample': uniform(0.6, 0.4),              # Submuestra de las instancias
    'colsample_bytree': uniform(0.6, 0.4),       # Submuestra de las columnas
    'gamma': [0, 0.1, 0.2, 0.3],                 # Penalización por complejidad
    'class_weight': ['balanced', None],          # Manejo del desbalance
}

# Definir el modelo XGBoost
xgb = XGBClassifier(random_state=42)

# Realizar Randomized Search
random_search = RandomizedSearchCV(
    estimator=xgb,
    param_distributions=param_dist,
    n_iter=30,  # Número de combinaciones a probar
    scoring='recall_weighted',  # Usar F1 macro como métrica de optimización
    cv=3,  # Validación cruzada
    random_state=42,
    verbose=2,
    n_jobs=-1  # Para usar todos los núcleos del procesador
)

# Ajustar al conjunto de entrenamiento con las muestras balanceadas
random_search.fit(X_train_resampled, Y_train_resampled)

# Resultados de los mejores parámetros
print(f"Mejores parámetros encontrados: {random_search.best_params_}")

# Usar el mejor modelo
best_xgb = random_search.best_estimator_

# Realizar predicciones en el conjunto de prueba
Y_test_pred = best_xgb.predict(X_test)

# Evaluar el modelo
print(f"Precisión del modelo optimizado: {accuracy_score(Y_test, Y_test_pred)}")
print("Matriz de Confusión:")
print(confusion_matrix(Y_test, Y_test_pred))
print("Reporte de Clasificación:")
print(classification_report(Y_test, Y_test_pred))


In [None]:
import pickle

# Guarda el modelo en un archivo
with open('modelo_xgboost_severity.pkl', 'wb') as f:
    pickle.dump(best_xgb, f)
