In [None]:
"""
Script collection: limpieza, unificación y entrenamiento para tus CSV de Motive (MoCap)
Contiene 4 secciones / scripts (en un solo archivo para que puedas copiarlos separadamente si lo prefieres):

1) parse_single_csv(input_path, label, output_path)
- Parsea un CSV "raw" exportado por Motive (una línea por frame con tokens mixtos)
- Extrae solo los marcadores válidos (los de la mano) y rellena con NaN los faltantes
- Guarda un csv limpio por take

2) process_all(dataset_root, output_folder)
- Recorre la estructura de carpetas esperada (/rock/, /paper/, /scissors/)
- Llama a parse_single_csv para cada take y concatena todo en mocap_dataset_final.csv

3) train_models(mocap_csv_path)
- Carga mocap_dataset_final.csv
- Imputa NaN (SimpleImputer - mean), estandariza, entrena SVM, RandomForest y XGBoost (si instalado)
- Guarda modelos y muestra métricas

4) prepare_fusion_example(mocap_csv, rgb_csv)
- Ejemplo de cómo preparar la fusión early-level si tienes coincidencia por "image_file" o por orden/label

USO RÁPIDO (desde terminal):
- Para procesar todo: python motive_mocap_processing_and_training.py --process_all /ruta/a/dataset_root /ruta/de/salida
- Para entrenar: python motive_mocap_processing_and_training.py --train /ruta/de/salida/mocap_dataset_final.csv

Ajustes que puedes modificar en el archivo:
- valid_labels: lista de labels exactos que llevarán a columnas
- tolerance/handling de tokens
- strategy de imputación (ahora usa mean)

NOTA: Asumimos que los labels en Motive son exactamente: P1,I1,M1,A1,E1, I2,M2,A2, P2,I3,M3,A3,E2, R,C,U
(16 marcadores). Si encuentras diferencias o prefieres otro orden, edita `MARKER_ORDER`.
"""

import os
import argparse
import csv
import math
from typing import List, Dict

import pandas as pd
import numpy as np

# ML libs
from sklearn.impute import SimpleImputer
from sklearn.preprocessing import StandardScaler, LabelEncoder
from sklearn.model_selection import train_test_split
from sklearn.svm import SVC
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import classification_report, confusion_matrix
import joblib

# Try import xgboost (opcional)
try:
from xgboost import XGBClassifier
XGBOOST_AVAILABLE = True
except Exception:
XGBOOST_AVAILABLE = False

In [None]:
# ---------------------------
# CONFIG: marcadores válidos
# ---------------------------
# Lista exacta de labels esperados en los CSV (según tu confirmación)
MARKER_ORDER = [
"P1","I1","M1","A1","E1", # Distales
"I2","M2","A2", # PIP
"P2","I3","M3","A3","E2", # MCP (nudillos)
"R","C","U" # Muñeca
]
# Comprobar que tenemos 16 marcadores
assert len(MARKER_ORDER) == 16, f"Expected 16 markers, got {len(MARKER_ORDER)}"


# Column names for final dataframe
def make_columns(marker_order: List[str]) -> List[str]:
cols = []
for m in marker_order:
cols += [f"{m}_x", f"{m}_y", f"{m}_z"]
return cols


FINAL_COLUMNS = ["label"] + make_columns(MARKER_ORDER)

In [None]:
# ---------------------------
# Si token not in marker_order but next token is in marker_order, quizás formato id,label,x,y,z
if i+1 < n and tokens[i+1].strip() in marker_order:
label = tokens[i+1].strip()
try:
x = float(tokens[i+2])
y = float(tokens[i+3])
z = float(tokens[i+4])
parsed[f"{label}_x"] = x
parsed[f"{label}_y"] = y
parsed[f"{label}_z"] = z
i += 5
continue
except Exception:
i += 1
continue
i += 1
return parsed




def parse_single_csv(input_path: str, output_path: str, label_value: str = None) -> pd.DataFrame:
"""
Lee un CSV exportado por Motive (raw) y devuelve un DataFrame con columnas FINAL_COLUMNS.
- input_path: ruta al archivo raw
- output_path: si se indica se guarda el csv limpio
- label_value: si se indica se añade la columna 'label' con ese valor para todas las filas
"""
rows = []
with open(input_path, 'r', encoding='utf-8', errors='ignore') as f:
reader = csv.reader(f)
# Intentamos detectar si el CSV ya está tokenizado en columnas (caso raro)
# Si la primera fila tiene muchos campos y contiene alguno de los markers en header, no hacemos parsing raw
# Pero: asumimos formato raw como en tus ejemplos: una fila con muchos tokens
for raw in reader:
# raw is a list resulting from csv.reader splitting by commas
# convertir cada celda a string
tokens = [str(x).strip() for x in raw if str(x).strip() != ""]
if len(tokens) == 0:
continue
# parse line
parsed = parse_motive_line_to_dict(tokens, MARKER_ORDER)
# construir fila con NaN por defecto
row = {c: np.nan for c in FINAL_COLUMNS}
if label_value is not None:
row['label'] = label_value
# completar con valores encontrados
for k, v in parsed.items():
if k in row:
row[k] = v
rows.append(row)
df = pd.DataFrame(rows, columns=FINAL_COLUMNS)
if output_path is not None:
df.to_csv(output_path, index=False)
return df

In [None]:
# ---------------------------
# Process all dataset
# ---------------------------


def process_all(dataset_root: str, output_folder: str) -> str:
"""
Recorre dataset_root/rock, /paper, /scissors. Para cada CSV llama parse_single_csv.
Guarda cada cleaned csv en output_folder/cleaned/ y concatena todo en output_folder/mocap_dataset_final.csv
Devuelve la ruta al CSV final.
"""
os.makedirs(output_folder, exist_ok=True)
cleaned_dir = os.path.join(output_folder, 'cleaned')
os.makedirs(cleaned_dir, exist_ok=True)


final_dfs = []
for label in ['rock', 'paper', 'scissors']:
folder = os.path.join(dataset_root, label)
if not os.path.isdir(folder):
print(f"Warning: folder {folder} not found, saltando")
continue
for fname in os.listdir(folder):
if not fname.lower().endswith('.csv'):
continue
inpath = os.path.join(folder, fname)
outname = f"{label}_{os.path.splitext(fname)[0]}_cleaned.csv"
outpath = os.path.join(cleaned_dir, outname)
print(f"Procesando {inpath} -> {outpath}")
df = parse_single_csv(inpath, outpath, label_value=label)
final_dfs.append(df)
if len(final_dfs) == 0:
raise RuntimeError("No se procesaron archivos. Revisa la estructura de carpetas o nombres.")
df_all = pd.concat(final_dfs, ignore_index=True)
final_csv = os.path.join(output_folder, 'mocap_dataset_final.csv')
df_all.to_csv(final_csv, index=False)
print(f"CSV final guardado en: {final_csv}")
return final_csv

In [None]:
# ---------------------------
# Imputar NaN con mean (Option A)
imputer = SimpleImputer(strategy='mean')
X_imp = imputer.fit_transform(X)


# Escalar
scaler = StandardScaler()
Xs = scaler.fit_transform(X_imp)


# Encode labels
le = LabelEncoder()
y_enc = le.fit_transform(y)


X_train, X_test, y_train, y_test = train_test_split(Xs, y_enc, test_size=test_size, random_state=random_state, stratify=y_enc)


# SVM
svm = SVC(kernel='rbf', probability=True, random_state=random_state)
svm.fit(X_train, y_train)
y_pred = svm.predict(X_test)
print('\n--- SVM Results ---')
print(classification_report(y_test, y_pred, target_names=le.classes_))
print('Confusion matrix:')
print(confusion_matrix(y_test, y_pred))


# RandomForest
rf = RandomForestClassifier(n_estimators=200, random_state=random_state)
rf.fit(X_train, y_train)
y_pred_rf = rf.predict(X_test)
print('\n--- RandomForest Results ---')
print(classification_report(y_test, y_pred_rf, target_names=le.classes_))
print('Confusion matrix:')
print(confusion_matrix(y_test, y_pred_rf))


# XGBoost (opcional)
if XGBOOST_AVAILABLE:
xgb = XGBClassifier(use_label_encoder=False, eval_metric='mlogloss', random_state=random_state)
xgb.fit(X_train, y_train)
y_pred_xgb = xgb.predict(X_test)
print('\n--- XGBoost Results ---')
print(classification_report(y_test, y_pred_xgb, target_names=le.classes_))
print('Confusion matrix:')
print(confusion_matrix(y_test, y_pred_xgb))


# Guardar artefactos
os.makedirs(out_models_dir, exist_ok=True)
joblib.dump(imputer, os.path.join(out_models_dir, 'imputer.joblib'))
joblib.dump(scaler, os.path.join(out_models_dir, 'scaler.joblib'))
joblib.dump(le, os.path.join(out_models_dir, 'label_encoder.joblib'))
joblib.dump(svm, os.path.join(out_models_dir, 'svm_model.joblib'))
joblib.dump(rf, os.path.join(out_models_dir, 'rf_model.joblib'))
if XGBOOST_AVAILABLE:
joblib.dump(xgb, os.path.join(out_models_dir, 'xgb_model.joblib'))
print(f"Modelos y artefactos guardados en {out_models_dir}")

In [None]:
# ---------------------------
# Ejemplo de preparación para fusión con RGB
# ---------------------------


def prepare_fusion_example(mocap_csv: str, rgb_csv: str, out_csv: str):
"""
Ejemplo simple de fusión: si RGB CSV contiene 'image_file' que incluye un frame id comparable
o si ambas tablas comparten la columna 'label' y el mismo número de filas por take, podemos
concatenar por orden o join por nombre de archivo.


Este es solo un ejemplo: en tu caso lo ideal es tener timestamps y usar merge_asof.
"""
df_m = pd.read_csv(mocap_csv)
df_r = pd.read_csv(rgb_csv)


# Caso A: si ambos tienen 'Time' -> usar merge_asof
if 'Time' in df_m.columns and 'Time' in df_r.columns:
df_m = df_m.sort_values('Time')
df_r = df_r.sort_values('Time')
merged = pd.merge_asof(df_r, df_m, on='Time', direction='nearest', tolerance=0.02)
merged.to_csv(out_csv, index=False)
print('Fusión por Time guardada en', out_csv)
return


# Caso B: si no hay timestamps, pero los archivos están ordenados y tienen la misma cantidad de frames por take
# WARNING: solo válido si estás seguro que las filas corresponden en orden
min_len = min(len(df_r), len(df_m))
merged = pd.concat([df_r.iloc[:min_len].reset_index(drop=True), df_m.iloc[:min_len].reset_index(drop=True)], axis=1)
merged.to_csv(out_csv, index=False)
print('Fusión por orden guardada en', out_csv)

In [None]:
# ---------------------------
# CLI wrapper
# ---------------------------


def main():
parser = argparse.ArgumentParser(description='Procesamiento y entrenamiento MoCap -> CSV limpio y modelos')
parser.add_argument('--process_all', nargs=2, help='Procesar dataset: <dataset_root> <output_folder>')
parser.add_argument('--train', nargs=1, help='Entrenar modelos: <mocap_csv_path>')
parser.add_argument('--prepare_fusion', nargs=3, help='Preparar fusión: <mocap_csv> <rgb_csv> <out_csv>')


args = parser.parse_args()


if args.process_all:
dataset_root, output_folder = args.process_all
process_all(dataset_root, output_folder)
return
if args.train:
mocap_csv = args.train[0]
train_models(mocap_csv)
return
if args.prepare_fusion:
mocap_csv, rgb_csv, out_csv = args.prepare_fusion
prepare_fusion_example(mocap_csv, rgb_csv, out_csv)
return


parser.print_help()


if __name__ == '__main__':
main()

In [None]:
''''
Coloca tu dataset con la estructura /dataset_root/rock/*.csv, /dataset_root/paper/*.csv, /dataset_root/scissors/*.csv.

Desde la terminal ejecuta (ejemplo):

python motive_mocap_processing_and_training.py --process_all /ruta/a/dataset_root /ruta/de/salida

luego: python motive_mocap_processing_and_training.py --train /ruta/de/salida/mocap_dataset_final.csv

Si necesitas ajustar nombres de markers o la estrategia de imputación, edita la lista MARKER_ORDER y/o la sección de imputación en el script.
''''