In [1]:
%%capture
!pip install mediapipe opencv-python pandas dtw hmmlearn fastdtw

==================== Recuperacion de datos ===================

In [2]:
import os

# Para hacer formating de los videos originales
video_folder = "Videos"
person_prefix = "Person"
action_prefix = "Action"
take_prefix = "take"

video_files = sorted([f for f in os.listdir(video_folder) if f.startswith("DSC_")])


person_count = 1
action_count = 1
take_count = 1
files_per_person = 15
files_per_action = 3

# Guardarlos con sus nuevos nombres para manejo mas sencillo
for i, filename in enumerate(video_files):

    new_name = f"{person_prefix} {person_count} {action_prefix} {action_count} {take_prefix} {take_count}{os.path.splitext(filename)[1]}"
    old_path = os.path.join(video_folder, filename)
    new_path = os.path.join(video_folder, new_name)

    os.rename(old_path, new_path)
    print(f"Renamed: {filename} -> {new_name}")

    take_count += 1
    if take_count > files_per_action:
        take_count = 1
        action_count += 1

    if action_count > 5:
        action_count = 1
        person_count += 1


In [3]:
import os
import cv2
import mediapipe as mp
import pandas as pd
import json

# Inicializar Mediapipe
mp_pose = mp.solutions.pose
pose = mp_pose.Pose()

# Funcion paratomar todos los videos y sus caracteristicas por frame 
def get_videos(video_folder):
    output = []
    video_files = sorted([f for f in os.listdir(video_folder) if f.endswith(('.mp4', '.avi', '.MOV'))])
    
    for video_file in video_files:
        video_path = os.path.join(video_folder, video_file)
        cap = cv2.VideoCapture(video_path)
        frame_count = 0
        video_data = []
    
        while cap.isOpened():
            ret, frame = cap.read()
            if not ret:
                break
    
            frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            results = pose.process(frame_rgb)
    
            if results.pose_landmarks:
                frame_keypoints = {}
                for idx, landmark in enumerate(results.pose_landmarks.landmark):
                    frame_keypoints[f"landmark_{idx}"] = {
                        "x": landmark.x,
                        "y": landmark.y,
                        "z": landmark.z,
                        "visibility": landmark.visibility
                    }
                video_data.append({"frame": frame_count, "keypoints": frame_keypoints})
    
            frame_count += 1
    
        cap.release()
        output.append({"video": video_file, "frames": video_data})
    return output

I0000 00:00:1739422966.448556 19529424 gl_context.cc:369] GL version: 2.1 (2.1 Metal - 88), renderer: Apple M1 Pro
INFO: Created TensorFlow Lite XNNPACK delegate for CPU.


In [4]:
output_data = get_videos("Videos")
print(output_data)

W0000 00:00:1739422966.505243 19529570 inference_feedback_manager.cc:114] Feedback manager requires a model with a single signature inference. Disabling support for feedback tensors.
W0000 00:00:1739422966.518876 19529569 inference_feedback_manager.cc:114] Feedback manager requires a model with a single signature inference. Disabling support for feedback tensors.
W0000 00:00:1739422966.542566 19529572 landmark_projection_calculator.cc:186] Using NORM_RECT without IMAGE_DIMENSIONS is only supported for the square ROI. Provide IMAGE_DIMENSIONS or use PROJECTION_MATRIX.
IOPub data rate exceeded.
The Jupyter server will temporarily stop sending output
to the client in order to avoid crashing it.
To change this limit, set the config variable
`--ServerApp.iopub_data_rate_limit`.

Current values:
ServerApp.iopub_data_rate_limit=1000000.0 (bytes/sec)
ServerApp.rate_limit_window=3.0 (secs)



==================== Aumentar Datos y normalizacion ====================

In [5]:
def center_pose(keypoints):
    # Usar los puntos de las caderas para centrar al sujeto
    lh = keypoints["landmark_23"]
    rh = keypoints["landmark_24"]
    
    center_x = (lh["x"] + rh["x"]) / 2
    center_y = (lh["y"] + rh["y"]) / 2
    
    centered = {}
    for key, coords in keypoints.items():
        centered[key] = {
            "x": coords["x"] - center_x,
            "y": coords["y"] - center_y,
            "z": coords["z"],
            "visibility": coords["visibility"]
        }
    return centered


In [6]:
import math

def scale_pose(centered_keypoints):
    # Calcular que tan lejos esta por los dos puntos de la cadera y escalarlo para tener datos mas similares
    lh = centered_keypoints["landmark_23"]
    rh = centered_keypoints["landmark_24"]
    scale = math.sqrt((lh["x"] - rh["x"])**2 + (lh["y"] - rh["y"])**2)

    # En caso de que la escala sea 0 mejor ignorarlo y no escalar
    if scale == 0:
        scale = 1.0

    scaled = {}
    for key, coords in centered_keypoints.items():
        scaled[key] = {
            "x": coords["x"] / scale,
            "y": coords["y"] / scale,
            "z": coords["z"],
            "visibility": coords["visibility"]
        }
    return scaled


In [7]:
def normalize_pose(keypoints):
    centered = center_pose(keypoints)
    normalized = scale_pose(centered)
    return normalized


In [8]:
def extract_normalized_features(keypoints):
    normalized = normalize_pose(keypoints)
    features = []
    for i in range(33):  # Assuming 33 landmarks
        lm = normalized.get(f"landmark_{i}", {"x": 0, "y": 0, "z": 0, "visibility": 0})
        features.extend([lm["x"], lm["y"], lm["z"], lm["visibility"]])
    return features


In [9]:
def augment_sequence(sequence, noise_std=0.01):
    #Esta secuencia de augmentacion realiza solamente la inversion de la imagen y anade un poco de ruido al reflejo apra evitar que sean exactamente iguales
    seq_aug = sequence.copy()
    num_landmarks = seq_aug.shape[1] // 4
    T = seq_aug.shape[0]
    
    for t in range(T): 
        for i in range(num_landmarks):
            base_idx = i * 4
            
            original_x = seq_aug[t, base_idx]
            seq_aug[t, base_idx] = 1.0 - original_x + np.random.normal(0, noise_std)
            
            seq_aug[t, base_idx + 1] += np.random.normal(0, noise_std)
            seq_aug[t, base_idx + 2] += np.random.normal(0, noise_std)
    
    return seq_aug

In [10]:
import numpy as np
# Crear el CSV y aplicar la aumentacion  de datos y normalizacion
def data_to_csv(output, name):
    csv_data = []
    for video in output:
        video_name_parts = video["video"].split()
        action_num = int(video_name_parts[3])
        take_num = int(video_name_parts[5].split(".")[0])
    
        completion_value = {1: 1.0, 2: 0.5, 3: 0.0}.get(take_num, 0.0)
    
        #Procesamiento de los frames originales
        original_sequence_features = []
        for frame in video["frames"]:
            # Normalizar
            normalized_keypoints = normalize_pose(frame["keypoints"])
            features = extract_normalized_features(normalized_keypoints)
            original_sequence_features.append(features)
            
            row = {
                "video": video["video"],
                "frame": frame["frame"],
                "action": f"Action {action_num}",
                "completion": completion_value
            }
            for idx, value in enumerate(features):
                row[f"feat_{idx}"] = value
                
            csv_data.append(row)
        
        sequence_array = np.array(original_sequence_features)
        
        # Aumentar los datos
        augmented_sequence = augment_sequence(sequence_array, noise_std=0.01)

        for i, frame in enumerate(video["frames"]):
            features = augmented_sequence[i]
            row = {
                "video": video["video"] + "_aug",
                "frame": frame["frame"],
                "action": f"Action {action_num}",
                "completion": completion_value
            }
            for idx, value in enumerate(features):
                row[f"feat_{idx}"] = value
            csv_data.append(row)

    # Guardarlo en CSV
    df = pd.DataFrame(csv_data)
    csv_output_path = name
    df.to_csv(csv_output_path, index=False)
    
    print(f"Pose data saved to {csv_output_path}")

In [11]:
data_to_csv(output_data,"pose_data.csv")

Pose data saved to pose_data.csv


==================== Modelos ====================

======== Para predecir Accion ==========

In [21]:
# Cargar y preparar datos desde "pose_data.csv"
df = pd.read_csv("pose_data.csv")
# Seleccionar las columnas de caracteristicas que comienzan con "feat"
feature_cols = [col for col in df.columns if col.startswith("feat")]

# Ordenar por video y frame
df = df.sort_values(['video', 'frame'])

# Agrupar los frames por video (cada video/toma es una secuencia)
grouped = df.groupby('video')

X_sequences = []
y_action_seq = []
for video, group in grouped:
    seq = group[feature_cols].values
    X_sequences.append(seq)
    y_action_seq.append(group['action'].iloc[0])

# Dividir en conjuntos de entrenamiento y prueba
indices = np.arange(len(X_sequences))
train_idx, test_idx = train_test_split(indices, test_size=0.3, random_state=42)

X_train_seq = [X_sequences[i] for i in train_idx]
y_train_seq = [y_action_seq[i] for i in train_idx]
X_test_seq  = [X_sequences[i] for i in test_idx]
y_test_seq  = [y_action_seq[i] for i in test_idx]

# Funcion para calcular la distancia DTW entre dos secuencias usando distancia
def dtw_distance(seq1, seq2):
    distance, path = fastdtw(seq1, seq2, dist=euclidean)
    return distance

# Funcion para predecir la etiqueta de acción mediante kNN (con DTW)
def predict_knn_dtw(test_seq, train_sequences, train_labels, k=1):
    distances = []
    for seq, label in zip(train_sequences, train_labels):
        d = dtw_distance(test_seq, seq)
        distances.append((d, label))
    distances.sort(key=lambda x: x[0])
    # Para k=1, se retorna la etiqueta del vecino más cercano.
    return distances[0][1]

# Predecir en el conjunto de prueba usando DTW kNN
y_pred_dtw = [predict_knn_dtw(seq, X_train_seq, y_train_seq, k=1) for seq in X_test_seq]
print("=== Clasificación de Acción con DTW kNN ===")
print("Precisión:", accuracy_score(y_test_seq, y_pred_dtw))


=== Clasificación de Acción con DTW kNN ===
Precisión: 0.9666666666666667


In [22]:
# Cargar y preparar datos desde "pose_data.csv"
df = pd.read_csv("pose_data.csv")
feature_cols = [col for col in df.columns if col.startswith("feat")]

# Ordenar por video y frame
df = df.sort_values(['video', 'frame'])

# Agrupar frames por video
grouped = df.groupby('video')

# Funcion para extraer caracteristicas de una secuencia
def extract_features_from_sequence(seq):
    mean_features = np.mean(seq, axis=0)
    std_features = np.std(seq, axis=0)
    return np.concatenate([mean_features, std_features])

X_features = []
y_action = []
y_completion = []

for video, group in grouped:
    seq = group[feature_cols].values
    features = extract_features_from_sequence(seq)
    X_features.append(features)
    
    y_action.append(group['action'].iloc[0])
    y_completion.append(group['completion'].iloc[0])

X_features = np.array(X_features)
y_action = np.array(y_action)
y_completion = np.array(y_completion)

# Codificar las etiquetas de accion para la clasificacion
le = LabelEncoder()
y_action_enc = le.fit_transform(y_action)

# Dividir los datos por toma (videos)
X_train, X_test, y_train_action, y_test_action, y_train_completion, y_test_completion = train_test_split(
    X_features, y_action_enc, y_completion, test_size=0.3, random_state=42
)

clf = RandomForestClassifier(n_estimators=100, random_state=42)
clf.fit(X_train, y_train_action)
y_pred_action = clf.predict(X_test)

print("\n=== Clasificacion de Acción con RandomForest ===")
print("Precisión:", accuracy_score(y_test_action, y_pred_action))
print(classification_report(y_test_action, y_pred_action, target_names=le.classes_))


=== Clasificacion de Acción con RandomForest ===
Precisión: 0.9777777777777777
              precision    recall  f1-score   support

    Action 1       1.00      1.00      1.00        16
    Action 2       1.00      1.00      1.00        16
    Action 3       0.91      1.00      0.95        21
    Action 4       1.00      0.89      0.94        18
    Action 5       1.00      1.00      1.00        19

    accuracy                           0.98        90
   macro avg       0.98      0.98      0.98        90
weighted avg       0.98      0.98      0.98        90



======== Para predecir completion ==========

In [30]:
# Entrenar un regresor RandomForest para la ver el completion
reg = RandomForestRegressor(n_estimators=100, random_state=42)
reg.fit(X_train, y_train_completion)
y_pred_completion = reg.predict(X_test)

# Definir las categorías validas de las etiquetas
completion_mapping = {0.0: 0, 0.5: 1, 1.0: 2}

# Funcion para redondear las predicciones al valor de finalización más cercano (0.0, 0.5 o 1.0)
def round_to_closest_completion(value):
    return min([0.0, 0.5, 1.0], key=lambda x: abs(x - value))

y_pred_completion_rounded = np.array([round_to_closest_completion(y) for y in y_pred_completion])

# Convertir los valores de finalizacion a etiquetas discretas
y_test_completion_class = np.array([completion_mapping[val] for val in y_test_completion])
y_pred_completion_rounded_class = np.array([completion_mapping[val] for val in y_pred_completion_rounded])

# Calcular la precisión de las predicciones redondeadas tratandolas como clases
accuracy_completion = np.mean(y_test_completion_class == y_pred_completion_rounded_class)

# Calcular el mae
mae_completion = np.mean(np.abs(y_test_completion - y_pred_completion))

print("=== Regresión de Finalización con RandomForest ===")
print("MSE:", mean_squared_error(y_test_completion, y_pred_completion))
print("MAE:", mae_completion)
print("Precision:", accuracy_completion * 100, "%")

# Calcular y mostrar la matriz de confusión
conf_matrix = confusion_matrix(y_test_completion_class, y_pred_completion_rounded_class, labels=[0, 1, 2])
print("\nMatriz de Confusion:")
print(pd.DataFrame(conf_matrix, index=["Verdadero 0.0", "Verdadero 0.5", "Verdadero 1.0"], 
                   columns=["Predicho 0.0", "Predicho 0.5", "Predicho 1.0"]))

=== Regresión de Finalización con RandomForest ===
MSE: 0.05085194444444444
MAE: 0.16727777777777778
Precision: 75.55555555555556 %

Matriz de Confusion:
               Predicho 0.0  Predicho 0.5  Predicho 1.0
Verdadero 0.0            23             5             2
Verdadero 0.5             3            20             2
Verdadero 1.0             0            10            25


==================== Pruebas ====================

In [36]:
output_data = get_videos("Test_Videos")

In [37]:
data_to_csv(output_data,"pose_data_test.csv")

Pose data saved to pose_data_test.csv


In [38]:
df_test = pd.read_csv("pose_data_test.csv")
df_test = df_test.sort_values(['video', 'frame'])

X_test_new_seq = []
y_test_new_seq = []  
test_videos = df_test["video"].unique()

for video in test_videos:
    group = df_test[df_test["video"] == video]
    seq = group[feature_cols].values
    X_test_new_seq.append(seq)
    

    if "action" in df_test.columns:
        y_test_new_seq.append(group['action'].iloc[0])

y_pred_new_dtw = [predict_knn_dtw(seq, X_train_seq, y_train_seq, k=1) for seq in X_test_new_seq]

print("=== DTW kNN ===")
for video_name, predicted_action in zip(test_videos, y_pred_new_dtw):
    print(f"En el video: {video_name} -> Se predijo: {predicted_action}")

if len(y_test_new_seq) > 0:
    accuracy = accuracy_score(y_test_new_seq, y_pred_new_dtw)
    print("\nPrecision:", accuracy)


=== DTW kNN ===
En el video: Person 1 Action 1 take 1.mp4 -> Se predijo: Action 1
En el video: Person 1 Action 1 take 1.mp4_aug -> Se predijo: Action 1
En el video: Person 2 Action 1 take 2.mp4 -> Se predijo: Action 2
En el video: Person 2 Action 1 take 2.mp4_aug -> Se predijo: Action 2

Precision: 0.5


In [39]:

df_test = pd.read_csv("pose_data_test.csv")
df_test = df_test.sort_values(['video', 'frame'])

X_test_new_features = []
y_test_new_action = []
y_test_new_completion = []
test_videos = df_test["video"].unique()

for video in test_videos:
    group = df_test[df_test["video"] == video]
    seq = group[feature_cols].values
    features = extract_features_from_sequence(seq)
    X_test_new_features.append(features)
    
    if "action" in df_test.columns:
        y_test_new_action.append(group['action'].iloc[0])
    if "completion" in df_test.columns:
        y_test_new_completion.append(group['completion'].iloc[0])

X_test_new_features = np.array(X_test_new_features)

y_pred_new_action = clf.predict(X_test_new_features)
y_pred_new_completion = reg.predict(X_test_new_features)

y_pred_new_completion_rounded = np.array([round_to_closest_completion(y) for y in y_pred_new_completion])

print("=== Random Forest Predictions on pose_data_test.csv ===")
for video_name, predicted_action, predicted_completion in zip(test_videos, y_pred_new_action, y_pred_new_completion_rounded):
    predicted_action_label = le.inverse_transform([predicted_action])[0]
    print(f"Video: {video_name} -> Predicted Action: {predicted_action_label}, Predicted Completion: {predicted_completion}")

if len(y_test_new_action) > 0:
    y_test_new_action_enc = le.transform(y_test_new_action)
    accuracy_action = accuracy_score(y_test_new_action_enc, y_pred_new_action)
    print("\nAction Classification Accuracy on pose_data_test.csv:", accuracy_action)

if len(y_test_new_completion) > 0:
    y_test_new_completion_class = np.array([completion_mapping[val] for val in y_test_new_completion])
    y_pred_new_completion_class = np.array([completion_mapping[val] for val in y_pred_new_completion_rounded])

    accuracy_completion = np.mean(y_test_new_completion_class == y_pred_new_completion_class)
    print("\nCompletion Prediction Accuracy on pose_data_test.csv:", accuracy_completion)


=== Random Forest Predictions on pose_data_test.csv ===
Video: Person 1 Action 1 take 1.mp4 -> Predicted Action: Action 1, Predicted Completion: 1.0
Video: Person 1 Action 1 take 1.mp4_aug -> Predicted Action: Action 1, Predicted Completion: 1.0
Video: Person 2 Action 1 take 2.mp4 -> Predicted Action: Action 2, Predicted Completion: 0.5
Video: Person 2 Action 1 take 2.mp4_aug -> Predicted Action: Action 2, Predicted Completion: 0.5

Action Classification Accuracy on pose_data_test.csv: 0.5

Completion Prediction Accuracy on pose_data_test.csv: 1.0
