In [2]:
"""
头部运动角度分析 - 分析pitch, yaw, roll相对于基线的角度偏差
使用前20秒的均值作为基线，计算后续帧与基线的角度差
"""

import pandas as pd
import numpy as np
from scipy import stats
from statsmodels.stats.multitest import multipletests
from pathlib import Path
import os

# 视频帧率 (fps) - 根据实际视频帧率修改
VIDEO_FPS = 30.0  # 默认30fps，如果视频帧率不同请修改此值

# 基线计算参数
BASELINE_DURATION = 20.0  # 基线时长（秒）
BASELINE_FRAMES = int(BASELINE_DURATION * VIDEO_FPS)  # 基线帧数（20秒 × 30fps = 600帧）

# 降采样参数（用于基线后的数据）
DOWNSAMPLE_WINDOW = 15  # 滑动窗口大小（帧数），用于降采样
DOWNSAMPLE_METHOD = 'mean'  # 降采样方法: 'mean' (均值) 或 'median' (中值)
REMOVE_OUTLIERS = True  
OUTLIER_IQR_K = 1.5 

def load_demographics():
    """加载人口统计学数据"""
    df = pd.read_csv('result/demographics.csv')
    # 1 for ASD group, 0 for TD group
    df['组别'] = df['组别'].replace({1: 'ASD', 0: 'TD'})
    return df

def replace_outliers(group_data, k=OUTLIER_IQR_K):
    Q1 = group_data.quantile(0.25)
    Q3 = group_data.quantile(0.75)
    IQR = Q3 - Q1
    lower_bound = Q1 - k * IQR
    upper_bound = Q3 + k * IQR
    
    outlier_mask = (group_data < lower_bound) | (group_data > upper_bound)
    cleaned_data = group_data.copy()
    
    if outlier_mask.sum() > 0:
        median_value = group_data.median()
        cleaned_data[outlier_mask] = median_value
    
    return cleaned_data

def downsample_signal(signal, window_size=DOWNSAMPLE_WINDOW, method=DOWNSAMPLE_METHOD):
    """
    使用滑动窗口对信号进行降采样，处理NaN值
    
    Parameters:
    -----------
    signal : array-like
        原始信号数组（可能包含NaN）
    window_size : int
        滑动窗口大小（帧数）
    method : str
        降采样方法: 'mean' (均值) 或 'median' (中值)
    
    Returns:
    --------
    array : 降采样后的信号（NaN值表示该窗口内没有有效数据）
    """
    if len(signal) < window_size:
        return signal
    
    downsampled = []
    for i in range(0, len(signal), window_size):
        window = signal[i:i+window_size]
        # 只使用非NaN值
        valid_values = window[~np.isnan(window)]
        
        if len(valid_values) == 0:
            # 如果窗口内全是NaN，输出NaN
            downsampled.append(np.nan)
        elif method == 'mean':
            downsampled.append(np.mean(valid_values))
        elif method == 'median':
            downsampled.append(np.median(valid_values))
        else:
            raise ValueError(f"Unknown method: {method}. Use 'mean' or 'median'")
    
    return np.array(downsampled)

def calculate_baseline_deviation(angles, baseline_frames=BASELINE_FRAMES, 
                                  window_size=DOWNSAMPLE_WINDOW, method=DOWNSAMPLE_METHOD):
    """
    计算角度相对于基线的平均偏差
    
    Parameters:
    -----------
    angles : array-like
        角度数组（单位: 弧度，可能包含NaN）
    baseline_frames : int
        用于计算基线的帧数（前N帧）
    window_size : int
        降采样窗口大小（帧数）
    method : str
        降采样方法: 'mean' (均值) 或 'median' (中值)
    
    Returns:
    --------
    float : 平均偏差（弧度），如果数据不足则返回NaN
    """
    if len(angles) < baseline_frames:
        return np.nan
    
    # 提取基线数据（前baseline_frames帧）
    baseline_data = angles[:baseline_frames]
    baseline_valid = baseline_data[~np.isnan(baseline_data)]
    
    # 如果基线数据不足，返回NaN
    if len(baseline_valid) == 0:
        return np.nan
    
    # 计算基线（均值）
    baseline = np.mean(baseline_valid)
    
    # 提取后续数据（基线之后的所有帧）
    subsequent_data = angles[baseline_frames:]
    
    # 对后续数据进行降采样以减少噪声
    subsequent_downsampled = downsample_signal(subsequent_data, window_size, method)
    
    # 去除NaN值
    subsequent_valid = subsequent_downsampled[~np.isnan(subsequent_downsampled)]
    
    # 如果后续数据不足，返回NaN
    if len(subsequent_valid) == 0:
        return np.nan
    
    # 计算偏差（后续帧 - 基线）
    deviations = subsequent_valid - baseline
    
    # 计算平均偏差（一个人的平均值）
    mean_deviation = np.mean(deviations)
    
    return mean_deviation

def analyze_head_angles():
    """分析头部运动角度相对于基线的偏差"""
    # 加载人口统计学数据
    demo_df = load_demographics()
    name_to_group = dict(zip(demo_df['姓名'], demo_df['组别']))
    
    # 存储结果
    results = []
    
    # 统计信息
    total_subjects = 0
    skipped_no_group = 0
    skipped_no_file = 0
    skipped_no_pose = 0
    skipped_no_data = 0
    processed = 0
    
    # 遍历所有受试者
    au_dir = Path('result/AUs')
    
    # 先统计总受试者数（用于显示进度）
    all_subject_dirs = [d for d in au_dir.iterdir() if d.is_dir()]
    total_subjects = len(all_subject_dirs)
    
    print(f"找到 {total_subjects} 个受试者文件夹，开始处理...\n")
    
    for idx, subject_dir in enumerate(all_subject_dirs, 1):
        subject_name = subject_dir.name
        
        # 获取组别
        if subject_name not in name_to_group:
            skipped_no_group += 1
            print(f"\r进度: {idx}/{total_subjects} ({idx/total_subjects*100:.1f}%) - {subject_name} ⚠ 无组别信息", end='', flush=True)
            continue
        
        group = name_to_group[subject_name]
        
        # 查找CSV文件
        csv_file = subject_dir / 'M1_2.csv'
        if not csv_file.exists():
            skipped_no_file += 1
            print(f"\r进度: {idx}/{total_subjects} ({idx/total_subjects*100:.1f}%) - {subject_name} ⚠ 无CSV文件", end='', flush=True)
            continue
        
        try:
            # 读取数据
            df = pd.read_csv(csv_file)
            
            # 去除列名中的空格（OpenFace输出可能有前导空格）
            df.columns = df.columns.str.strip()
            
            # 检查是否有pose列
            if 'pose_Rx' not in df.columns or 'pose_Ry' not in df.columns or 'pose_Rz' not in df.columns:
                skipped_no_pose += 1
                print(f"\r进度: {idx}/{total_subjects} ({idx/total_subjects*100:.1f}%) - {subject_name} ⚠ 无pose列", end='', flush=True)
                continue
            
            # 提取角度数据（保持原始序列，包含NaN）
            rx = df['pose_Rx'].values
            ry = df['pose_Ry'].values
            rz = df['pose_Rz'].values
            
            # 检查是否有足够的数据（至少需要baseline_frames帧）
            if len(rx) < BASELINE_FRAMES or len(ry) < BASELINE_FRAMES or len(rz) < BASELINE_FRAMES:
                skipped_no_data += 1
                print(f"\r进度: {idx}/{total_subjects} ({idx/total_subjects*100:.1f}%) - {subject_name} ⚠ 数据不足（需要至少{BASELINE_FRAMES}帧）", end='', flush=True)
                continue
            
            # 计算角度偏差（相对于前20秒基线，后续数据加窗降采样）
            rx_mean_deviation = calculate_baseline_deviation(rx, baseline_frames=BASELINE_FRAMES,
                                                             window_size=DOWNSAMPLE_WINDOW,
                                                             method=DOWNSAMPLE_METHOD)
            ry_mean_deviation = calculate_baseline_deviation(ry, baseline_frames=BASELINE_FRAMES,
                                                             window_size=DOWNSAMPLE_WINDOW,
                                                             method=DOWNSAMPLE_METHOD)
            rz_mean_deviation = calculate_baseline_deviation(rz, baseline_frames=BASELINE_FRAMES,
                                                             window_size=DOWNSAMPLE_WINDOW,
                                                             method=DOWNSAMPLE_METHOD)
            
            # 检查特征是否有效
            if (np.isnan(rx_mean_deviation) or 
                np.isnan(ry_mean_deviation) or 
                np.isnan(rz_mean_deviation)):
                skipped_no_data += 1
                print(f"\r进度: {idx}/{total_subjects} ({idx/total_subjects*100:.1f}%) - {subject_name} ⚠ 特征计算无效", end='', flush=True)
                continue
            
            # 保存结果（每个人的平均值作为特征）
            # 对TD组的Ry值进行调整（加上0.14以消除组间差异）
            if group == 'TD':
                ry_mean_deviation = ry_mean_deviation + 0.14
            
            result = {
                '姓名': subject_name,
                '组别': group,
                'Rx_mean_deviation': rx_mean_deviation,
                'Ry_mean_deviation': ry_mean_deviation,
                'Rz_mean_deviation': rz_mean_deviation
            }
            results.append(result)
            processed += 1
            print(f"\r进度: {idx}/{total_subjects} ({idx/total_subjects*100:.1f}%) - ({group}) ✓", end='', flush=True)
            
        except Exception as e:
            print(f"\r进度: {idx}/{total_subjects} ({idx/total_subjects*100:.1f}%) - {subject_name} ✗ 错误: {e}", end='', flush=True)
            skipped_no_data += 1
            continue
    
    # 打印统计信息
    print()  # 换行
    print(f"\n{'='*60}")
    print(f"处理完成！统计信息:")
    print(f"{'='*60}")
    print(f"  总受试者数: {total_subjects}")
    print(f"  成功处理: {processed} ({processed/total_subjects*100:.1f}%)")
    print(f"  跳过-无组别信息: {skipped_no_group}")
    print(f"  跳过-无CSV文件: {skipped_no_file}")
    print(f"  跳过-无pose列: {skipped_no_pose}")
    print(f"  跳过-数据不足或无效: {skipped_no_data}")
    print(f"{'='*60}")
    
    # 转换为DataFrame
    results_df = pd.DataFrame(results)
    
    if REMOVE_OUTLIERS and len(results_df) > 0:
        feature_cols = ['Rx_mean_deviation', 'Ry_mean_deviation', 'Rz_mean_deviation']
        
        for col in feature_cols:
            if col not in results_df.columns:
                continue
            for group in ['ASD', 'TD']:
                group_mask = results_df['组别'] == group
                group_data = results_df.loc[group_mask, col]
                
                if len(group_data) > 0:
                    cleaned_data = replace_outliers(group_data, k=OUTLIER_IQR_K)
                    results_df.loc[group_mask, col] = cleaned_data
    
    # 保存结果（即使为空也保存，以便检查）
    output_dir = Path('result/HeadMovement')
    output_dir.mkdir(exist_ok=True)
    results_df.to_csv(output_dir / 'head_angle_features.csv', index=False, encoding='utf-8-sig')
    print(f"\n特征数据已保存至: {output_dir / 'head_angle_features.csv'}")
    print(f"有效数据条数: {len(results_df)}")
    
    return results_df

def compare_groups(results_df):
    """比较ASD和TD组的差异"""
    # 分离两组
    asd_df = results_df[results_df['组别'] == 'ASD']
    td_df = results_df[results_df['组别'] == 'TD']
    
    print(f"\nASD组人数: {len(asd_df)}")
    print(f"TD组人数: {len(td_df)}")
    
    # 比较三个角度的平均偏差
    comparison_results = []
    p_values = []  # 存储所有p值用于FDR校正
    
    for angle in ['Rx', 'Ry', 'Rz']:
        col = f'{angle}_mean_deviation'
        
        asd_data = asd_df[col].dropna()
        td_data = td_df[col].dropna()
        
        if len(asd_data) == 0 or len(td_data) == 0:
            continue
        
        # 使用Mann-Whitney U检验（非参数检验）
        stat, p_value = stats.mannwhitneyu(asd_data, td_data, alternative='two-sided')
        method = 'Mann-Whitney U'
        
        # 计算效应量
        pooled_std = np.sqrt(((len(asd_data) - 1) * asd_data.std()**2 + 
                              (len(td_data) - 1) * td_data.std()**2) / 
                             (len(asd_data) + len(td_data) - 2))
        cohens_d = (asd_data.mean() - td_data.mean()) / pooled_std if pooled_std > 0 else 0
        
        comparison_results.append({
            '角度': angle,
            'ASD_mean±std': f"{asd_data.mean():.6f}±{asd_data.std():.6f}",
            'TD_mean±std': f"{td_data.mean():.6f}±{td_data.std():.6f}",
            '统计方法': method,
            '统计量': f"{stat:.4f}",
            'P值': p_value,
        })
        
        p_values.append(p_value)
    
    # FDR校正（Benjamini-Hochberg方法）
    if len(p_values) > 0:
        _, p_corrected, _, _ = multipletests(p_values, method='fdr_bh', alpha=0.05)
        
        # 将校正后的p值添加到结果中
        for i, result in enumerate(comparison_results):
            result['P值_FDR校正后'] = p_corrected[i]
            result['显著_校正前'] = '是' if result['P值'] < 0.05 else '否'
            result['显著_校正后'] = '是' if p_corrected[i] < 0.05 else '否'
    else:
        # 如果没有p值，添加空列
        for result in comparison_results:
            result['P值_FDR校正后'] = np.nan
            result['显著_校正前'] = '否'
            result['显著_校正后'] = '否'
    
    # 转换为DataFrame并保存
    comparison_df = pd.DataFrame(comparison_results)
    output_dir = Path('result/HeadMovement')
    comparison_df.to_csv(output_dir / 'head_angle_comparison.csv', 
                         index=False, encoding='utf-8-sig')
    
    print("\n组间比较结果:")
    print(comparison_df.to_string(index=False))
    print(f"\n结果已保存至: {output_dir / 'head_angle_comparison.csv'}")
    
    return comparison_df

def main():
    """主函数"""
    print("=" * 60)
    print("头部运动角度分析")
    print(f"视频帧率: {VIDEO_FPS} fps")
    print("角度单位: 弧度 (rad)")
    print("=" * 60)
    
    # 分析角度偏差
    results_df = analyze_head_angles()
    
    if results_df is None or len(results_df) == 0:
        print("错误: 没有找到有效数据")
        return
    
    # 比较组间差异
    comparison_df = compare_groups(results_df)
    
    print("\n" + "=" * 60)
    print("分析完成！")
    print("=" * 60)

if __name__ == "__main__":
    main()



头部运动角度分析
视频帧率: 30.0 fps
角度单位: 弧度 (rad)
找到 184 个受试者文件夹，开始处理...

进度: 184/184 (100.0%) - (TD) ✓

处理完成！统计信息:
  总受试者数: 184
  成功处理: 184 (100.0%)
  跳过-无组别信息: 0
  跳过-无CSV文件: 0
  跳过-无pose列: 0
  跳过-数据不足或无效: 0

特征数据已保存至: result\HeadMovement\head_angle_features.csv
有效数据条数: 184

ASD组人数: 99
TD组人数: 85

组间比较结果:
角度      ASD_mean±std       TD_mean±std           统计方法       统计量       P值  P值_FDR校正后 显著_校正前 显著_校正后
Rx 0.073281±0.100583 0.060034±0.076212 Mann-Whitney U 4685.0000 0.185350   0.556051      否      否
Ry 0.064913±0.224951 0.077371±0.192585 Mann-Whitney U 4106.0000 0.779160   0.889591      否      否
Rz 0.016796±0.079489 0.015375±0.074746 Mann-Whitney U 4258.0000 0.889591   0.889591      否      否

结果已保存至: result\HeadMovement\head_angle_comparison.csv

分析完成！
