In [None]:
import numpy as np
import pandas as pd
import os
import matplotlib.pyplot as plt
import json
import seaborn as sns
from sklearn.model_selection import train_test_split
from sklearn.linear_model import LinearRegression
from sklearn.metrics import r2_score, mean_squared_error
from itertools import combinations


# allow dataframes to show all columns
pd.set_option('display.max_columns', None)

In [None]:
# Read the trial table from CSV. It is joined to the JSON results on the
# "file" and "side" columns in the merge cell below.
# NOTE(review): the remaining columns are not visible here - presumably trial
# metadata such as id / slope / condition; confirm against the file.
df = pd.read_csv("ramp_downhill.csv")
df.head()

In [None]:
# Parse the JSON results file into plain Python objects.
# The encoding is given explicitly: JSON is UTF-8 by specification, and without
# it `open` falls back to the platform's locale encoding, which can mis-decode
# non-ASCII characters on some systems.
with open("MuscleJointForces_downhill.json", "r", encoding="utf-8") as f:
    data = json.load(f)

In [None]:
# Flatten the nested JSON records into one row per record, then drop the
# leading "results." prefix that flattening puts on the nested column names.
df_all = pd.json_normalize(data)
df_all = df_all.set_axis(
    df_all.columns.str.replace(r"^results\.", "", regex=True),
    axis="columns",
)
df_all.head()

In [None]:
# Sanity check: store the number of samples in each hip-angle trace and list
# it next to the keys that identify the trial.
df_all['angleHip_length'] = df_all['angleHip'].map(len)
print(df_all.loc[:, ['file', 'side', 'angleHip_length']])

In [None]:
# Join the flattened JSON results with the CSV table on (file, side).
# The join is an inner join, so rows without a partner on the other side are dropped.
df_merged = df_all.merge(df, how="inner", on=["file", "side"])
df_merged.head()

In [None]:
# Human-readable axis labels (with units) for every plotted variable,
# keyed by the column name used in the merged DataFrame.
label_map = dict([
    # Ground reaction force components
    ("GRF_ap", "GRF Anterior-Posterior (N)"),
    ("GRF_ml", "GRF Medio-Lateral (N)"),
    ("GRF_vert", "GRF Vertical (N)"),
    # Joint angles
    ("angleHip", "Hip Angle (°)"),
    ("angleKnee", "Knee Angle (°)"),
    ("angleAnkle", "Ankle Angle (°)"),
    # Joint moments
    ("momentHip", "Hip Moment (Nm)"),
    ("momentKnee", "Knee Moment (Nm)"),
    ("momentAnkle", "Ankle Moment (Nm)"),
    # Joint reaction / contact forces
    ("ankleFres", "Ankle Joint Reaction Force (N)"),
    ("kneeFres", "Knee Joint Reaction Force (N)"),
    ("hipFres", "Hip Joint Reaction Force (N)"),
    ("knee_PFc", "Patellofemoral Contact Force (N)"),
    # Muscle forces
    ("gastrocnemius", "Gastrocnemius Force (N)"),
    ("soleus", "Soleus Force (N)"),
    ("tib_ant", "Tibialis Anterior Force (N)"),
    ("quadriceps", "Quadriceps Force (N)"),
    ("iliopsoas", "Iliopsoas Force (N)"),
    ("hamstrings", "Hamstrings Force (N)"),
])

In [None]:
import matplotlib.pyplot as plt
import numpy as np

# Line colour per slope, keyed by the slope value formatted as an integer string.
color_slopes = {
    '0': '#3c6d56',
    '-6': '#1b5962',
    '-12': '#103f60',
    '-18': '#011959'
}

# Axis labels are looked up in `label_map`, defined in the labels cell above;
# it is not redefined here so the two copies cannot drift apart.
variables_a_tracer = ['angleKnee', 'angleHip', 'angleAnkle', 'GRF_vert', 'knee_PFc', 'kneeFres']

# Slopes in descending numeric order (0 first, steepest decline last).
slopes = sorted(df_merged['slope'].unique(), reverse=True)

# One panel per slope. The grid is sized from the data instead of being fixed
# at four columns, so a dataset with more slopes no longer runs off the axes
# array and one with fewer needs no panels deleted afterwards.
n_panels = max(len(slopes), 1)

for var in variables_a_tracer:
    fig, axes = plt.subplots(
        1, n_panels,
        figsize=(3.75 * n_panels, 4),  # 15 x 4 inches for the usual four slopes
        sharex=True, sharey=True, squeeze=False,
    )
    axes = axes.flatten()

    y_label = label_map.get(var, var)

    for i, (ax, slope) in enumerate(zip(axes, slopes)):
        # str(int(slope)) turns e.g. -6.0 into '-6' to match the colour keys;
        # slopes without an entry fall back to grey.
        color = color_slopes.get(str(int(slope)), "grey")

        # Draw every trial recorded at this slope; cells that do not hold a
        # time series (e.g. NaN) are skipped.
        for trace in df_merged.loc[df_merged['slope'] == slope, var]:
            if isinstance(trace, (list, np.ndarray)):
                ax.plot(trace, color=color, alpha=0.7)

        ax.set_title(f"Slope: {slope}°")
        ax.set_xlabel('Frames')
        if i == 0:
            # The y axis is shared, so labelling the first panel is enough.
            ax.set_ylabel(y_label)

    fig.suptitle(f"Analysis of {y_label}", y=1.05, fontsize=14)
    fig.tight_layout()
    plt.show()

In [None]:
# Working copy of the merged table; the cells below read from `df`.
# NOTE: this rebinds `df`, which until now held the CSV table that the merge
# cell joins against. Re-running the merge cell after this one would merge
# against the already-merged frame - restart the kernel and run top to bottom.
df = df_merged.copy()

# 0. Datasets

* Datasets for peak 1

In [None]:
def _values_at_index(row, index, event_name):
    """Collapse one trial row to a single time point.

    Parameters
    ----------
    row : pandas.Series
        One row of the trial table (column name -> value).
    index : int
        Frame at which every time-series column is sampled.
    event_name : str
        Stored under the 'Event_Type' key of the result.

    Returns
    -------
    dict
        'Event_Type' followed by one entry per column of `row`: list/ndarray
        values are replaced by their element at `index` (NaN when the series
        is too short), every other value is copied unchanged.
    """
    snapshot = {'Event_Type': event_name}
    for col, val in row.items():
        if isinstance(val, (list, np.ndarray)):
            try:
                snapshot[col] = val[index]
            except IndexError:
                snapshot[col] = np.nan
        else:
            snapshot[col] = val
    return snapshot


extracted_rows_p1 = []

# One output row per trial, taken at the frame of the first vertical-GRF peak.
for _, row in df.iterrows():
    grf = row['GRF_vert']

    # Skip trials whose GRF trace is missing (not a list/array) or empty.
    if not isinstance(grf, (list, np.ndarray)) or len(grf) == 0:
        continue

    # The first peak is searched in the first half of the leading 60% of the
    # trace, i.e. roughly its first 30%.
    stance_limit = int(len(grf) * 0.6)
    mid_stance = int(stance_limit / 2)

    # Traces too short to give a non-empty search window are skipped.
    if mid_stance <= 0:
        continue

    idx_peak_grf = int(np.argmax(grf[:mid_stance]))
    extracted_rows_p1.append(_values_at_index(row, idx_peak_grf, 'First_Peak_GRF'))

# Build the result; when no trial qualified, keep the expected columns so the
# reordering and filtering below still work on an empty frame.
if extracted_rows_p1:
    df_peak1 = pd.DataFrame(extracted_rows_p1)
else:
    df_peak1 = pd.DataFrame(columns=['Event_Type', *df.columns])

# --- COLUMN REORGANIZATION ---
# Identification columns first, everything else in its existing order.
priority_cols = ['id', 'condition', 'Event_Type']
other_cols = [c for c in df_peak1.columns if c not in priority_cols]
final_order = priority_cols + other_cols
df_peak1 = df_peak1[final_order]

# --- SEPARATION INTO TWO DATAFRAMES ---
df_self_selected = df_peak1[df_peak1['condition'] == 'self-selected'].copy()
df_constant = df_peak1[df_peak1['condition'] == 'constant'].copy()

# --- DISPLAY RESULTS ---
print(f"Number of rows (Self Selected): {len(df_self_selected)}")
print(f"Number of rows (Constant): {len(df_constant)}")

print("\n--- Preview Self Selected ---")
display(df_self_selected.head(20))

print("\n--- Preview Constant ---")
display(df_constant.head(20))

* Datasets for peak 2

In [None]:
def _values_at_index(row, index, event_name):
    """Collapse one trial row to a single time point.

    Parameters
    ----------
    row : pandas.Series
        One row of the trial table (column name -> value).
    index : int
        Frame at which every time-series column is sampled.
    event_name : str
        Stored under the 'Event_Type' key of the result.

    Returns
    -------
    dict
        'Event_Type' followed by one entry per column of `row`: list/ndarray
        values are replaced by their element at `index` (NaN when the series
        is too short), every other value is copied unchanged.
    """
    snapshot = {'Event_Type': event_name}
    for col, val in row.items():
        if isinstance(val, (list, np.ndarray)):
            try:
                snapshot[col] = val[index]
            except IndexError:
                snapshot[col] = np.nan
        else:
            snapshot[col] = val
    return snapshot


extracted_rows_p2 = []

# One output row per trial, taken at the frame of the second vertical-GRF peak.
for _, row in df.iterrows():
    grf = row['GRF_vert']

    # Skip trials whose GRF trace is missing (not a list/array) or empty.
    if not isinstance(grf, (list, np.ndarray)) or len(grf) == 0:
        continue

    stance_limit = int(len(grf) * 0.6)  # ~60% of the trace
    mid_stance = int(stance_limit / 2)  # ~30% of the trace

    # The search window is [mid_stance, stance_limit); skip traces too short
    # for it to contain at least one frame.
    if mid_stance >= stance_limit:
        continue

    # argmax is relative to the window, so shift it back to a trace index.
    idx_peak_grf = int(np.argmax(grf[mid_stance:stance_limit])) + mid_stance
    extracted_rows_p2.append(_values_at_index(row, idx_peak_grf, 'Second_Peak_GRF'))

# Build the result; when no trial qualified, keep the expected columns so the
# reordering and filtering below still work on an empty frame.
if extracted_rows_p2:
    df_peak2 = pd.DataFrame(extracted_rows_p2)
else:
    df_peak2 = pd.DataFrame(columns=['Event_Type', *df.columns])

# --- COLUMN REORGANIZATION ---
# Identification columns first, everything else in its existing order.
priority_cols = ['id', 'condition', 'Event_Type']
other_cols = [c for c in df_peak2.columns if c not in priority_cols]
final_order = priority_cols + other_cols
df_peak2 = df_peak2[final_order]

# --- SEPARATION INTO TWO DATAFRAMES ---
df_peak2_self_selected = df_peak2[df_peak2['condition'] == 'self-selected'].copy()
df_peak2_constant = df_peak2[df_peak2['condition'] == 'constant'].copy()

# --- DISPLAY RESULTS ---
print(f"Number of rows (Self Selected - Peak 2): {len(df_peak2_self_selected)}")
print(f"Number of rows (Constant - Peak 2): {len(df_peak2_constant)}")

print("\n--- Preview Self Selected (Peak 2) ---")
display(df_peak2_self_selected.head(20))

print("\n--- Preview Constant (Peak 2) ---")
display(df_peak2_constant.head(20))

In [None]:
# Sorted list of all participant ids present in the merged table, with a count.
tous_les_participants = sorted(df_merged['id'].unique())
print(f"Participants : {tous_les_participants}")
print(f"Total number of participants : {len(tous_les_participants)}")

# Visualization for each participant and each trial

In [None]:
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
import seaborn as sns

def plot_subject_analysis(participant_id, df_data):
    """Print a peak-timing table and plot every trial of one participant, per slope.

    For each trial the vertical GRF and patellofemoral force traces are searched
    for a first peak in the first 30% of the frames and a second peak in the
    30-60% window. The mean peak frames and PFC-minus-GRF delays are printed per
    slope, then one figure per variable is drawn with one panel per slope,
    showing all trials with the four peak frames marked on each curve.

    Parameters
    ----------
    participant_id : int or str
        Compared, as a stripped string, against the 'id' column.
    df_data : pandas.DataFrame
        Needs the columns 'id', 'slope', 'GRF_vert', 'knee_PFc' and the plotted
        variables, the latter holding one list-like trace per row.

    Returns
    -------
    None
        Output is printed and shown as figures. Axis labels are read from the
        module-level `label_map`.
    """

    def _peak_frames(grf, pfc):
        """Return (l30, l60, grf1, grf2, pfc1, pfc2) frame indices, or None
        when the traces are too short for both search windows to be non-empty."""
        g, p = np.atleast_1d(grf), np.atleast_1d(pfc)
        l30, l60 = int(len(g) * 0.3), int(len(g) * 0.6)
        # l30 == 0 means fewer than 4 frames; argmax on an empty window raises.
        if l30 == 0 or len(p) <= l30:
            return None
        return (
            l30, l60,
            int(np.argmax(g[:l30])), int(np.argmax(g[l30:l60])) + l30,
            int(np.argmax(p[:l30])), int(np.argmax(p[l30:l60])) + l30,
        )

    # 1. Normalise the id and select this participant's trials
    p_id = str(participant_id).strip()
    df_sub = df_data[df_data['id'].astype(str).str.strip() == p_id].copy()

    if df_sub.empty:
        print(f"No data found for participant: {p_id}")
        return

    variables = ['GRF_vert', 'knee_PFc', 'angleKnee', 'angleAnkle', 'angleHip', 'quadriceps', 'GRF_ap']
    slopes = sorted(df_sub['slope'].unique(), reverse=True)

    # --- COLOR EXPLANATION ---
    print(f"\n{' BIOMECHANICAL ANALYSIS GUIDE ':^80}")
    print("-" * 80)
    print("PHASES (Background Shading):")
    print("  - Orange Area : Impact Phase (0-30% of stance)")
    print("  - Blue Area   : Propulsion Phase (30-60% of stance)")
    print("\nPEAK TIMING MARKERS:")
    print("  - RED Circles (●) : Vertical GRF Peak Frame")
    print("  - GREEN Cross (X) : Patellofemoral Force Peak Frame")
    print("-" * 80)

    # 2. Per-trial peak frames and delays, averaged per slope
    delay_records = []
    for slope in slopes:
        for _, row in df_sub[df_sub['slope'] == slope].iterrows():
            peaks = _peak_frames(row['GRF_vert'], row['knee_PFc'])
            if peaks is None:
                continue  # trial too short for peak detection
            _, _, f_g1, f_g2, f_p1, f_p2 = peaks
            delay_records.append({
                'Slope': slope,
                'P1_GRF_Frame': f_g1,
                'P1_PFC_Frame': f_p1,
                'P1_Delay': f_p1 - f_g1,
                'P2_GRF_Frame': f_g2,
                'P2_PFC_Frame': f_p2,
                'P2_Delay': f_p2 - f_g2
            })

    print("\nDETAILED PEAK TIMING SUMMARY (Average Frames per Slope)")
    if delay_records:
        # Mean over all trials of each slope
        summary_df = pd.DataFrame(delay_records).groupby('Slope').mean().round(2)
        print(summary_df)
    else:
        print("No trial is long enough for peak detection.")
    print("\n" + "="*80)

    # 3. One figure per variable, one panel per slope
    c_g1, c_g2 = '#FFADAD', '#8B0000'  # Reds: first / second GRF peak
    c_p1, c_p2 = '#B7E4C7', '#1B4332'  # Greens: first / second PFC peak

    for var in variables:
        fig, axes = plt.subplots(1, len(slopes), figsize=(22, 5), squeeze=False)
        var_display_name = label_map.get(var, var)

        for i, slope in enumerate(slopes):
            ax = axes[0, i]
            shaded = False

            for _, row in df_sub[df_sub['slope'] == slope].iterrows():
                y_val = np.atleast_1d(row[var])
                ax.plot(y_val, alpha=0.5, linewidth=1.2)

                # Windows are derived from the GRF trace, the same reference
                # the summary table above uses.
                peaks = _peak_frames(row['GRF_vert'], row['knee_PFc'])
                if peaks is None:
                    continue
                l30, l60, ig1, ig2, ip1, ip2 = peaks

                # Shade the two search windows once per panel, using the first
                # usable trial as reference.
                if not shaded:
                    ax.axvspan(0, l30, color='orange', alpha=0.1)
                    ax.axvspan(l30, l60, color='blue', alpha=0.05)
                    shaded = True

                # A plotted trace shorter than the peak frames cannot be marked.
                if max(ig2, ip2) >= len(y_val):
                    continue

                ax.scatter([ig1, ig2], [y_val[ig1], y_val[ig2]], c=[c_g1, c_g2],
                           s=80, edgecolors='black', label='GRF Peak', zorder=5)
                ax.scatter([ip1, ip2], [y_val[ip1], y_val[ip2]], c=[c_p1, c_p2],
                           marker='X', s=100, edgecolors='black', label='PFC Peak', zorder=6)

            ax.set_title(f"Slope {slope}°", fontweight='bold')
            if i == 0:
                ax.set_ylabel(var_display_name)
            ax.grid(True, linestyle='--', alpha=0.3)

        # Gather legend entries from every panel and keep one handle per label.
        # Reading only the last panel left the legend empty, because labelled
        # artists used to exist on the first panel only.
        by_label = {}
        for panel in axes.ravel():
            for handle, label in zip(*panel.get_legend_handles_labels()):
                by_label.setdefault(label, handle)
        fig.legend(list(by_label.values()), list(by_label.keys()),
                   loc='upper center', ncol=6, bbox_to_anchor=(0.5, 1.08))

        fig.tight_layout()
        plt.show()

# Run the per-participant analysis for participant 11
plot_subject_analysis(participant_id=11, df_data=df)

In [None]:
plot_subject_analysis(12,df)

In [None]:
plot_subject_analysis(13,df)

In [None]:
plot_subject_analysis(14,df)

In [None]:
plot_subject_analysis(15,df)

In [None]:
plot_subject_analysis(16,df)

In [None]:
plot_subject_analysis(17,df)

In [None]:
plot_subject_analysis(18,df)

In [None]:
plot_subject_analysis(20,df)

In [None]:
plot_subject_analysis(21,df)

In [None]:
plot_subject_analysis(22,df)

In [None]:
plot_subject_analysis(23,df)

In [None]:
plot_subject_analysis(24,df)

In [None]:
plot_subject_analysis(25,df)

In [None]:
plot_subject_analysis(26,df)

In [None]:
plot_subject_analysis(27,df)

In [None]:
plot_subject_analysis(29,df)

In [None]:
plot_subject_analysis(30,df)

In [None]:
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
import seaborn as sns
from scipy.interpolate import interp1d

def plot_population_analysis_with_peaks(df_data, variables=None):
    """Plot each participant's mean curve per slope, with mean peak timings marked.

    Every trial is resampled onto a 0-100% axis with 101 points and averaged per
    participant and slope. GRF and patellofemoral peaks are detected in the
    first 30% and the 30-60% window of each trial, converted to the same
    percentage axis, averaged, and drawn on the participant's mean curve.

    Parameters
    ----------
    df_data : pandas.DataFrame
        Needs the columns 'id', 'slope', 'GRF_vert', 'knee_PFc' and the plotted
        variables, the latter holding one list-like trace per row.
    variables : list of str, optional
        Columns to plot, one figure each. Defaults to
        ['GRF_vert', 'knee_PFc', 'angleKnee', 'quadriceps', 'GRF_ap'].

    Returns
    -------
    None
        Output is printed and shown as figures. Axis labels are read from the
        module-level `label_map`.
    """
    # Nothing to draw: a grid with zero columns cannot be created.
    if df_data.empty:
        print("No data to plot.")
        return

    if variables is None:
        variables = ['GRF_vert', 'knee_PFc', 'angleKnee', 'quadriceps', 'GRF_ap']

    def _peak_percentages(grf, pfc):
        """Return (grf1, grf2, pfc1, pfc2) peak positions in percent, or None
        when the traces are too short for both search windows to be non-empty."""
        g, p = np.atleast_1d(grf), np.atleast_1d(pfc)
        l30, l60 = int(len(g) * 0.3), int(len(g) * 0.6)
        # l30 == 0 means fewer than 4 frames; argmax on an empty window raises.
        if l30 == 0 or len(p) <= l30:
            return None
        # Frame k of an n-frame trace sits at 100*k/(n-1) on the resampled
        # axis (linspace(0, 100, n)), so the peaks use the same mapping.
        g_scale, p_scale = 100.0 / (len(g) - 1), 100.0 / (len(p) - 1)
        return (
            np.argmax(g[:l30]) * g_scale,
            (np.argmax(g[l30:l60]) + l30) * g_scale,
            np.argmax(p[:l30]) * p_scale,
            (np.argmax(p[l30:l60]) + l30) * p_scale,
        )

    unique_slopes = sorted(df_data['slope'].unique(), reverse=True)
    participants = sorted(df_data['id'].unique())
    palette = sns.color_palette("tab20", len(participants))
    x_new = np.linspace(0, 100, 101)  # common 0-100% axis

    # Peak Colors
    c_g1, c_g2 = '#FF4D4D', '#8B0000'  # Reds: first / second GRF peak
    c_p1, c_p2 = '#4DFF4D', '#006400'  # Greens: first / second PFC peak

    # --- EXPLANATION GUIDE ---
    print(f"\n{' POPULATION BIOMECHANICAL GUIDE ':^80}")
    print("-" * 80)
    print("SHADING: Orange (0-30% Impact) | Blue (30-60% Propulsion)")
    print("MARKERS: Circle = GRF Peak Timing | X = PFC Peak Timing")
    print("Each line represents the average of one participant.")
    print("-" * 80)

    for var in variables:
        fig, axes = plt.subplots(1, len(unique_slopes), figsize=(24, 6), sharey=False, squeeze=False)
        var_display_name = label_map.get(var, var)

        for i, slope in enumerate(unique_slopes):
            ax = axes[0, i]

            # Background shading of the two search windows
            ax.axvspan(0, 30, color='orange', alpha=0.1)
            ax.axvspan(30, 60, color='blue', alpha=0.05)

            for p_idx, p_id in enumerate(participants):
                df_sub = df_data[(df_data['id'] == p_id) & (df_data['slope'] == slope)]

                norm_trials, peak_pcts = [], []
                for _, row in df_sub.iterrows():
                    y = np.atleast_1d(row[var])
                    pcts = _peak_percentages(row['GRF_vert'], row['knee_PFc'])
                    if pcts is None or y.size == 0:
                        continue  # unusable trial
                    peak_pcts.append(pcts)
                    # Linear resampling of the trial onto the common axis
                    norm_trials.append(np.interp(x_new, np.linspace(0, 100, len(y)), y))

                if not norm_trials:
                    continue

                # Mean trajectory and mean peak positions
                mean_y = np.mean(norm_trials, axis=0)
                m_g1, m_g2, m_p1, m_p2 = np.mean(peak_pcts, axis=0)

                # --- PLOT LINE ---
                ax.plot(x_new, mean_y, color=palette[p_idx], alpha=0.6, linewidth=1.5,
                        label=f"Subj {p_id}")

                # --- PLOT PEAKS ---
                # The marker height is interpolated on the mean curve, so each
                # marker lies exactly on the line at its (fractional) percentage.
                ax.scatter([m_g1, m_g2], np.interp([m_g1, m_g2], x_new, mean_y),
                           color=[c_g1, c_g2], s=50, edgecolors='white', zorder=5)
                ax.scatter([m_p1, m_p2], np.interp([m_p1, m_p2], x_new, mean_y),
                           color=[c_p1, c_p2], marker='X', s=70, edgecolors='black', zorder=6)

            ax.set_title(f"Slope {slope}°", fontweight='bold')
            ax.set_xlabel("% of Stance")
            if i == 0:
                ax.set_ylabel(var_display_name)
            ax.grid(True, linestyle=':', alpha=0.5)

        # Legend: gather entries from every panel, one handle per participant.
        # Reading only the last panel left the legend empty, because labelled
        # lines used to exist on the first panel only.
        by_label = {}
        for panel in axes.ravel():
            for handle, label in zip(*panel.get_legend_handles_labels()):
                by_label.setdefault(label, handle)
        fig.legend(list(by_label.values()), list(by_label.keys()),
                   loc='center right', bbox_to_anchor=(1.0, 0.5), title="Participants")

        fig.suptitle(f"Population Comparison: {var_display_name}\nRed=GRF Timing | Green=PFC Timing",
                     y=1.05, fontsize=16, fontweight='bold')
        fig.tight_layout(rect=[0, 0.03, 0.93, 0.95])
        plt.show()

# Run the population-level analysis on the working DataFrame
plot_population_analysis_with_peaks(df)

In [None]:
import pandas as pd
import numpy as np

def generate_super_summary_v4(df_data):
    """Summarise GRF vs. patellofemoral peak timing per participant and slope.

    For every trial the first peak is searched in the first 30% of the frames
    and the second in the 30-60% window, in both 'GRF_vert' and 'knee_PFc'.
    Peak frames are averaged over the trials of each (participant, slope) pair
    and the delay is the mean PFC frame minus the mean GRF frame.

    Parameters
    ----------
    df_data : pandas.DataFrame
        Needs the columns 'id', 'slope', 'GRF_vert' and 'knee_PFc', the last
        two holding one list-like trace per row.

    Returns
    -------
    pandas.DataFrame
        One row per (participant, slope) with columns ID, Slope, P1_GRF_Fr,
        P1_PFC_Fr, P1_Delay, P2_GRF_Fr, P2_PFC_Fr, P2_Delay. Frame columns are
        rounded to 2 decimals. The delay columns are strings with 2 decimals,
        suffixed with '*' when the delay is above 4 frames or negative.
        Trials too short for peak detection are skipped, and a
        (participant, slope) pair without any usable trial is omitted.
    """
    columns = ['ID', 'Slope', 'P1_GRF_Fr', 'P1_PFC_Fr', 'P1_Delay',
               'P2_GRF_Fr', 'P2_PFC_Fr', 'P2_Delay']

    def _peak_frames(grf, pfc):
        """Return (grf1, pfc1, grf2, pfc2) peak frames, or None when the traces
        are too short for both search windows to be non-empty."""
        g, p = np.atleast_1d(grf), np.atleast_1d(pfc)
        l30, l60 = int(len(g) * 0.3), int(len(g) * 0.6)
        # l30 == 0 means fewer than 4 frames; argmax on an empty window raises.
        if l30 == 0 or len(p) <= l30:
            return None
        return (
            np.argmax(g[:l30]),
            np.argmax(p[:l30]),
            np.argmax(g[l30:l60]) + l30,
            np.argmax(p[l30:l60]) + l30,
        )

    def apply_asterisk(val):
        """Format a delay with 2 decimals; flag it with '*' if > 4 or < 0."""
        if val > 4 or val < 0:
            return f"{val:.2f}*"
        return f"{val:.2f}"

    all_records = []
    for p_id in sorted(df_data['id'].unique()):
        df_sub = df_data[df_data['id'] == p_id]

        for slope in sorted(df_sub['slope'].unique(), reverse=True):
            df_slope = df_sub[df_sub['slope'] == slope]

            # Peak frames of every usable trial of this participant and slope
            trial_frames = []
            for _, row in df_slope.iterrows():
                frames = _peak_frames(row['GRF_vert'], row['knee_PFc'])
                if frames is not None:
                    trial_frames.append(frames)
            if not trial_frames:
                continue

            # Column-wise mean over trials
            m_g1, m_p1, m_g2, m_p2 = np.mean(trial_frames, axis=0)

            all_records.append({
                'ID': p_id,
                'Slope': slope,
                'P1_GRF_Fr': round(m_g1, 2),
                'P1_PFC_Fr': round(m_p1, 2),
                'P1_Delay': m_p1 - m_g1,
                'P2_GRF_Fr': round(m_g2, 2),
                'P2_PFC_Fr': round(m_p2, 2),
                'P2_Delay': m_p2 - m_g2
            })

    # Naming the columns keeps the schema stable even when nothing was recorded.
    super_df = pd.DataFrame(all_records, columns=columns)

    if not super_df.empty:
        super_df['P1_Delay'] = super_df['P1_Delay'].apply(apply_asterisk)
        super_df['P2_Delay'] = super_df['P2_Delay'].apply(apply_asterisk)

    return super_df

# Build the peak-delay summary from the working DataFrame
final_table = generate_super_summary_v4(df)

# Display: banner, the criterion behind the '*' flag, then the full table as plain text
print(f"\n{' BIOMECHANICAL PEAK DELAY SUMMARY ':=^95}")
print(f"{'Criterio *: Delay > 4 frames o Negativo':^95}")
print("-" * 95)
print(final_table.to_string(index=False))
print("-" * 95)
    
    
    