In [22]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from scipy.interpolate import interp1d
from scipy import signal
from numpy import linalg as LA
import os


### 実験設定


In [23]:
PARTICIPANTS = ['oba', 'ono', 'pon', 'kuno', 'john', 'konan', 
                'obara', 'fukuzawa', 'kiuchi', 'yanaze', 'adachi', 'iwasaki']
MASSES = [60.9, 63.8, 68.7, 65.9, 77, 74, 63.8, 64.5, 73.2, 53.5, 70.7, 47.9]
CONDITIONS = ['h', 'm', 'l']

# サンプリング周波数
DEVICE_FREQ = 100
MOCAP_FREQ = 250
FORCE_FREQ = 1000

# フィルタ設定
CUTOFF_FREQ = 6
FILTER_ORDER = 4

### 1. データ読み込み関数

In [24]:
def load_participant_data(participant, condition, mass):
    """
    指定された実験協力者・条件のデータを読み込む
    
    Parameters:
        participant: 実験協力者名
        condition: 実験条件 ('h', 'm', 'l')
        mass: 体重 [kg]
    
    Returns:
        dict: 各種データフレームと体重情報
    """
    print(f"\n{'='*60}")
    print(f"Processing: {participant} - Condition: {condition.upper()} (Mass: {mass} kg)")
    print(f"{'='*60}")
    
    # ファイルパス生成
    file_path_left = f"WearableDevices/{participant}_{condition}_left_foot_data.csv"
    file_path_right = f"WearableDevices/{participant}_{condition}_right_foot_data.csv"
    file_path_mocap = f"MotionCaptures/{participant}_{condition}_mocap.csv"
    file_path_force = f"3DGroundForces/{participant}_{condition}_force.csv"
    
    # データ読み込み
    df_left = pd.read_csv(file_path_left, header=0)
    df_right = pd.read_csv(file_path_right, header=0)
    df_mocap = pd.read_csv(file_path_mocap, header=[2, 5, 6])
    df_force = pd.read_csv(file_path_force, header=10, encoding='shift_jis')
    
    return {
        'left': df_left,
        'right': df_right,
        'mocap': df_mocap,
        'force': df_force,
        'mass': mass,
        'participant': participant,
        'condition': condition
    }

### 2. モーションキャプチャデータの列名整理

In [25]:
def clean_mocap_columns(df_mocap):
    """モーションキャプチャデータの列名を整理"""
    new_columns = []
    
    for col in df_mocap.columns:
        if col[2] == 'Frame':
            new_columns.append('Frame')
        elif 'Time' in col[2]:
            new_columns.append('Time (Seconds)')
        else:
            body_num = col[0].replace('Rigid Body', '').strip()
            name = f"{body_num}_{col[1]}_{col[2]}"
            new_columns.append(name)
    
    df_mocap.columns = new_columns
    return df_mocap

### 3. 床反力データの列名整理

In [26]:
def clean_force_columns(df_force):
    """床反力データの列名を英語に変換"""
    columns_mapping = {
        'Unnamed: 0': 'Time (Seconds)',
        '右-Fx': 'Right_Fx', '右-Fy': 'Right_Fy', '右-Fz': 'Right_Fz',
        '右-Mx': 'Right_Mx', '右-My': 'Right_My', '右-Mz': 'Right_Mz',
        '右-COPx': 'Right_COPx', '右-COPy': 'Right_COPy',
        '左-Fx': 'Left_Fx', '左-Fy': 'Left_Fy', '左-Fz': 'Left_Fz',
        '左-Mx': 'Left_Mx', '左-My': 'Left_My', '左-Mz': 'Left_Mz',
        '左-COPx': 'Left_COPx', '左-COPy': 'Left_COPy',
    }
    return df_force.rename(columns=columns_mapping)

 ### 4. リサンプリング


In [27]:
def process_resampling(df_input, sampling_interval=10):
    """
    データのリサンプリング，軸反転，単位変換を実行
    
    Parameters:
        df_input: 入力データフレーム
        sampling_interval: サンプリング間隔 [ms]
    """
    df = df_input.copy()
    
    # 欠損値補完
    exclude_cols = ['Marker']
    target_cols = df.columns.difference(exclude_cols)
    df[target_cols] = df[target_cols].interpolate(method='linear', axis=0)
    
    # 新しい時間軸作成
    time_min = 0
    time_max = df['ElapsedTime'].max()
    new_time = np.arange(time_min, time_max, sampling_interval)
    df_resampled = pd.DataFrame({'ElapsedTime': new_time})
    
    # マーカー列の処理
    if exclude_cols:
        df_markers = df[['ElapsedTime'] + exclude_cols].dropna(subset=exclude_cols, how='all').copy()
        df_markers['ElapsedTime_rounded'] = (df_markers['ElapsedTime'] / sampling_interval).round() * sampling_interval
        df_markers = df_markers.drop_duplicates(subset=['ElapsedTime_rounded'])
        
        df_resampled['MergeKey'] = df_resampled['ElapsedTime'].round().astype(int)
        df_markers['MergeKey'] = df_markers['ElapsedTime_rounded'].round().astype(int)
        
        df_resampled = pd.merge(df_resampled, df_markers[['MergeKey'] + exclude_cols],
                                on='MergeKey', how='left')
        df_resampled = df_resampled.drop(columns=['MergeKey'])
    
    # 連続値データの補間
    columns_to_exclude = ['ElapsedTime'] + exclude_cols
    for column in df.columns:
        if column in columns_to_exclude:
            continue
        interpolator = interp1d(df['ElapsedTime'], df[column], 
                                kind='linear', fill_value='extrapolate')
        df_resampled[column] = interpolator(new_time)
    
    # 単位変換
    df_resampled['ElapsedTime'] = df_resampled['ElapsedTime'] / 1000
    df_resampled = df_resampled.rename(columns={'ElapsedTime': 'Time (Seconds)'})
    
    
    return df_resampled

### 5. ローパスフィルタ適用

In [28]:
def apply_lowpass_filter(data, cutoff, fs, order=4):
    """Butterworthローパスフィルタ"""
    nyquist = 0.5 * fs
    normal_cutoff = cutoff / nyquist
    b, a = signal.butter(order, normal_cutoff, btype='low', analog=False)
    return signal.filtfilt(b, a, data, axis=0)

def process_smoothing_dataframe(df_input, fs=DEVICE_FREQ, cutoff=CUTOFF_FREQ, order=FILTER_ORDER):
    """データフレーム全体にフィルタを適用"""
    df_smooth = df_input.copy()
    
    exclude_keywords = ['Time (Seconds)', 'Marker']
    filter_target_cols = [col for col in df_smooth.columns 
                          if col not in exclude_keywords and 'Marker' not in col]
    
    # 欠損値補完
    df_smooth[filter_target_cols] = df_smooth[filter_target_cols].interpolate(
        method='linear', limit_direction='both')
    df_smooth[filter_target_cols] = df_smooth[filter_target_cols].fillna(0)
    
    # フィルタ適用
    df_smooth[filter_target_cols] = apply_lowpass_filter(
        df_smooth[filter_target_cols].values, cutoff=cutoff, fs=fs, order=order)
    
    # 圧力データのクリッピング（負値を0に）
    pressure_cols = [col for col in filter_target_cols if 'kPa' in col]
    if pressure_cols:
        df_pressure = df_smooth[pressure_cols]
        df_smooth[pressure_cols] = df_pressure.mask(df_pressure < 0, 0)
    
    return df_smooth

### 6. 関節角度計算

In [29]:
# マーカーインデックス定義
R_ILIUM_INDEX = 1
R_GREATER_TROCHANTER_INDEX = 2
R_KNEE_INDEX = 3
R_MALLEOLUS_INDEX = 4
R_TOE_INDEX = 5
L_ILIUM_INDEX = 6
L_GREATER_TROCHANTER_INDEX = 7
L_KNEE_INDEX = 8
L_MALLEOLUS_INDEX = 9
L_TOE_INDEX = 10
JOINT_NO = 6

class CalculateAngle:
    """関節角度計算クラス"""
    
    @staticmethod
    def _vec(A, B, C):
        return np.stack((A - B, C - B), axis=0)
    
    @staticmethod
    def _angle2d(a, b):
        inner = np.dot(a, b)
        norm_a = LA.norm(a)
        norm_b = LA.norm(b)
        
        if norm_a < 1e-4 or norm_b < 1e-4:
            return 0.0
        
        norm = norm_a * norm_b
        return np.degrees(np.arccos(np.clip(inner / norm, -1, 1)))
    
    def _angle3d(self, v):
        deg = np.zeros(3)
        pairs = [
            (slice(None, 2), 0),
            (slice(1, None), 1),
            ((2, 0), 2)
        ]
        
        for i, (sl, _) in enumerate(pairs):
            a = v[0][sl] if isinstance(sl, slice) else v[0][list(sl)]
            b = v[1][sl] if isinstance(sl, slice) else v[1][list(sl)]
            S = 0.5 * (b[0] * a[1] - b[1] * a[0])
            ang = self._angle2d(a, b)
            deg[i] = 360 - ang if S < 0 else ang
        return deg
    
    def angles(self, pos):
        pos = pos[:, [0, 2, 1]].copy()
        pos[:, 0] *= -1
        out = np.zeros((JOINT_NO, 3))
        idxs = [
            (R_ILIUM_INDEX - 1, R_GREATER_TROCHANTER_INDEX - 1, R_KNEE_INDEX - 1),
            (L_ILIUM_INDEX - 1, L_GREATER_TROCHANTER_INDEX - 1, L_KNEE_INDEX - 1),
            (R_GREATER_TROCHANTER_INDEX - 1, R_KNEE_INDEX - 1, R_MALLEOLUS_INDEX - 1),
            (L_GREATER_TROCHANTER_INDEX - 1, L_KNEE_INDEX - 1, L_MALLEOLUS_INDEX - 1),
            (R_KNEE_INDEX - 1, R_MALLEOLUS_INDEX - 1, R_TOE_INDEX - 1),
            (L_KNEE_INDEX - 1, L_MALLEOLUS_INDEX - 1, L_TOE_INDEX - 1),
        ]
        for i, (a, b, c) in enumerate(idxs):
            out[i] = self._angle3d(self._vec(pos[a], pos[b], pos[c]))
        return out

def calculate_angles_from_positions(df, target_cols):
    """データフレームから関節角度を計算"""
    calculator = CalculateAngle()
    positions_flat = df[target_cols].values
    n_frames = positions_flat.shape[0]
    
    if n_frames == 0:
        return np.empty((0, 18))
    
    positions_3d = positions_flat.reshape(n_frames, 10, 3)
    all_angles = []
    
    for frame_idx in range(n_frames):
        current_pos = positions_3d[frame_idx]
        angles = calculator.angles(current_pos)
        all_angles.append(angles.flatten())
    
    return np.array(all_angles)

def process_mocap_data_target_calibration(df_target, df_ref):
    """
    モーションキャプチャデータから関節角度を計算し，キャリブレーション補正を実施
    
    Parameters:
        df_target: 解析対象データ
        df_ref: タイミング特定用データ
    """
    target_cols = []
    for i in range(1, 11):
        prefix = f"{i:03}"
        target_cols.extend([f"{prefix}_Position_X", f"{prefix}_Position_Y", f"{prefix}_Position_Z"])
    
    # キャリブレーション時間の特定
    trigger_rows = df_ref[df_ref['Marker'] == 2]
    if trigger_rows.empty:
        raise ValueError("Marker == 2 が見つかりません")
    
    trigger_time = trigger_rows.iloc[0]['Time (Seconds)']
    start_time = trigger_time - 20.0
    end_time = trigger_time - 15.0
    
    # 基準姿勢データの抽出
    mask = (df_target['Time (Seconds)'] >= start_time) & (df_target['Time (Seconds)'] <= end_time)
    df_base_window = df_target.loc[mask].copy()
    
    if len(df_base_window) == 0:
        raise ValueError("基準区間にデータが存在しません")
    
    # 基準姿勢の計算
    ref_angles = calculate_angles_from_positions(df_base_window, target_cols)
    base_pose_mean = np.mean(ref_angles, axis=0)
    
    # 全データの角度計算と補正
    target_angles = calculate_angles_from_positions(df_target, target_cols)
    relative_angles = target_angles - base_pose_mean
    
    # DataFrame作成
    joint_names = ["Right_Hip", "Left_Hip", "Right_Knee", "Left_Knee", "Right_Ankle", "Left_Ankle"]
    plane_names = ["XY", "YZ", "ZX"]
    columns = []
    for joint in joint_names:
        for plane in plane_names:
            columns.append(f"{joint}_{plane}")
    
    df_result = pd.DataFrame(relative_angles, columns=columns)
    
    if 'Time (Seconds)' in df_target.columns:
        df_result.insert(0, 'Time (Seconds)', df_target['Time (Seconds)'].values)
    
    return df_result, base_pose_mean, columns

### 7. 床反力の正規化（体重比）[%BW]

In [30]:
def normalize_force_by_bodyweight(df_force, mass):
    """床反力を体重で正規化"""
    columns = ['Time (Seconds)', 'Right_Fx', 'Right_Fy', 'Right_Fz', 
               'Left_Fx', 'Left_Fy', 'Left_Fz']
    force_columns = ['Right_Fx', 'Right_Fy', 'Right_Fz', 'Left_Fx', 'Left_Fy', 'Left_Fz']
    
    body_weight = mass * 9.81
    df_normalized = df_force[columns].copy()
    df_normalized[force_columns] /= body_weight
    
    return df_normalized

### 8. 圧力データ (Pressure) -> Min-Max (0~1)
###    IMUデータ (その他)    -> Z-score (平均0, 分散1)

In [31]:
def process_hybrid_normalization(df_input, duration=300):
    """
    マーカー2から指定時間のデータを使用し、
    - 圧力データ (Pressure) -> Min-Max (0~1)
    - IMUデータ (その他)    -> Z-score (平均0, 分散1)
    に正規化する
    """
    df_norm = df_input.copy()
    df_norm.columns = df_norm.columns.str.replace('kPa', 'Pressure')
    
    exclude_keywords = ['Time (Seconds)', 'Marker']

    # ---------------------------------------------------------
    # 1. 列の分類 (圧力か、それ以外(IMU)か)
    # ---------------------------------------------------------
    # 'Pressure' という文字が含まれる列を抽出
    pressure_cols = [col for col in df_norm.columns if 'Pressure' in col]
    
    # それ以外の数値データ列をIMUとして扱う
    imu_cols = [col for col in df_norm.columns 
                if col not in exclude_keywords 
                and 'Marker' not in col 
                and col not in pressure_cols]
    
    # ---------------------------------------------------------
    # 2. 基準となる期間(300秒)のデータ抽出
    # ---------------------------------------------------------
    start_time = 0
    marker_cols = [col for col in df_norm.columns if 'Marker' in col]
    
    if marker_cols:
        marker_2_rows = df_norm[(df_norm[marker_cols] == 2).any(axis=1)]
        if not marker_2_rows.empty:
            start_time = marker_2_rows.iloc[0]['Time (Seconds)']
    
    end_time = start_time + duration
    mask = (df_norm['Time (Seconds)'] >= start_time) & (df_norm['Time (Seconds)'] <= end_time)
    df_stats_base = df_norm.loc[mask]
    
    # ---------------------------------------------------------
    # 3. IMUデータの正規化 -> Z-score
    # ---------------------------------------------------------
    if imu_cols:
        means = df_stats_base[imu_cols].mean()
        stds = df_stats_base[imu_cols].std()
        stds = stds.replace(0, 1) # 0除算回避
        
        # 全データに対して適用
        df_norm[imu_cols] = (df_norm[imu_cols] - means) / stds

    # ---------------------------------------------------------
    # 4. 足底圧力データの正規化 -> Min-Max (0~1)
    # ---------------------------------------------------------
    if pressure_cols:
        mins = df_stats_base[pressure_cols].min()
        maxs = df_stats_base[pressure_cols].max()
        ranges = maxs - mins
        ranges = ranges.replace(0, 1) # 0除算回避
        
        # 全データに対して適用
        df_norm[pressure_cols] = (df_norm[pressure_cols] - mins) / ranges

    # 戻り値: データフレームと、後で確認・逆変換できるように統計量も返す
    stats_info = {
        "imu_mean": means if imu_cols else None,
        "imu_std": stds if imu_cols else None,
        "press_min": mins if pressure_cols else None,
        "press_max": maxs if pressure_cols else None
    }
    
    return df_norm, stats_info

### 9. データ同期と結合

In [32]:
def calculate_fine_offset_pressure(df_target, df_ref, col_target_pressure_list, 
                                   col_ref_name, t_start, duration=300, fs=100):
    """足底圧と床反力の相互相関によるタイミング補正"""
    t_end = t_start + duration
    
    mask_tgt = (df_target['Time (Seconds)'] >= t_start) & (df_target['Time (Seconds)'] <= t_end)
    df_t = df_target.loc[mask_tgt].copy()
    
    mask_ref = (df_ref['Time (Seconds)'] >= t_start) & (df_ref['Time (Seconds)'] <= t_end)
    df_r = df_ref.loc[mask_ref].copy()
    
    if len(df_t) < fs or len(df_r) < fs:
        return 0.0
    
    t_min = max(df_t['Time (Seconds)'].min(), df_r['Time (Seconds)'].min())
    t_max = min(df_t['Time (Seconds)'].max(), df_r['Time (Seconds)'].max())
    
    if t_min >= t_max:
        return 0.0
    
    common_t = np.arange(t_min, t_max, 1.0/fs)
    
    pressure_sum = df_t[col_target_pressure_list].sum(axis=1)
    f_tgt = interp1d(df_t['Time (Seconds)'], pressure_sum, 
                     kind='linear', fill_value=0, bounds_error=False)
    sig_tgt = f_tgt(common_t)
    
    f_ref = interp1d(df_r['Time (Seconds)'], df_r[col_ref_name], 
                     kind='linear', fill_value=0, bounds_error=False)
    sig_ref = f_ref(common_t)
    
    sig_tgt_norm = (sig_tgt - np.mean(sig_tgt)) / (np.std(sig_tgt) + 1e-6)
    sig_ref_norm = (sig_ref - np.mean(sig_ref)) / (np.std(sig_ref) + 1e-6)
    
    correlation = signal.correlate(sig_ref_norm, sig_tgt_norm, mode='full')
    lags = signal.correlation_lags(len(sig_ref_norm), len(sig_tgt_norm), mode='full')
    best_lag = lags[np.argmax(correlation)]
    
    return best_lag / fs

def synchronize_merge_and_extract(df_left, df_right, df_angles, df_force, target_freq=100):
    """
    全データの同期・結合・抽出
    Marker 1で粗調整 → 足底圧で微調整 → 結合 → Marker 2から300秒抽出
    """
    # Marker 1による粗調整
    trigger_marker = 1
    marker_rows_l = df_left[df_left['Marker'] == trigger_marker]
    t_marker_left_1 = 0.0
    if not marker_rows_l.empty:
        t_marker_left_1 = marker_rows_l.iloc[0]['Time (Seconds)']
    
    df_right_rough = df_right.copy()
    marker_rows_r = df_right[df_right['Marker'] == trigger_marker]
    if not marker_rows_r.empty:
        t_marker_r_1 = marker_rows_r.iloc[0]['Time (Seconds)']
        offset_r_rough = t_marker_left_1 - t_marker_r_1
        df_right_rough['Time (Seconds)'] += offset_r_rough
    
    df_force_rough = df_force.copy()
    df_force_rough['Time (Seconds)'] += t_marker_left_1
    
    # Marker 2 + 300sによる微調整
    fine_tune_marker = 2
    duration = 300
    cols_Pressure_left = [f'Left_Pressure_{i}' for i in range(1, 9)]
    cols_Pressure_right = [f'Right_Pressure_{i}' for i in range(1, 9)]
    
    marker_rows_l_2 = df_left[df_left['Marker'] == fine_tune_marker]
    
    df_left_final = df_left.copy()
    df_right_final = df_right_rough.copy()
    df_angles_final = df_angles.copy()
    
    if not marker_rows_l_2.empty:
        t_start_fine = marker_rows_l_2.iloc[0]['Time (Seconds)']
        
        offset_l_fine = calculate_fine_offset_pressure(
            df_target=df_left, df_ref=df_force_rough, 
            col_target_pressure_list=cols_Pressure_left,
            col_ref_name='Left_Fz', t_start=t_start_fine, 
            duration=duration, fs=target_freq
        )
        
        offset_r_fine = calculate_fine_offset_pressure(
            df_target=df_right_rough, df_ref=df_force_rough, 
            col_target_pressure_list=cols_Pressure_right,
            col_ref_name='Right_Fz', t_start=t_start_fine, 
            duration=duration, fs=target_freq
        )
        
        df_left_final['Time (Seconds)'] += offset_l_fine
        df_angles_final['Time (Seconds)'] += offset_l_fine
        df_right_final['Time (Seconds)'] += offset_r_fine
    
    df_force_final = df_force_rough
    
    # 統合用時間軸の作成
    t_start = max(df_left_final['Time (Seconds)'].min(), 
                  df_right_final['Time (Seconds)'].min(),
                  df_angles_final['Time (Seconds)'].min(), 
                  df_force_final['Time (Seconds)'].min())
    t_end = min(df_left_final['Time (Seconds)'].max(), 
                df_right_final['Time (Seconds)'].max(),
                df_angles_final['Time (Seconds)'].max(), 
                df_force_final['Time (Seconds)'].max())
    
    common_time = np.arange(t_start, t_end, 1.0/target_freq)
    df_merged = pd.DataFrame({'Time (Seconds)': common_time})
    
    # リサンプリングと結合
    data_sources = {'L_Dev': df_left_final, 'R_Dev': df_right_final, 
                    'Mocap': df_angles_final, 'Force': df_force_final}
    
    for prefix, df_src in data_sources.items():
        time_col = 'Time (Seconds)' if 'Time (Seconds)' in df_src.columns else 'Time'
        numeric_cols = df_src.select_dtypes(include=[np.number]).columns
        cols_to_interp = [c for c in numeric_cols if c != time_col and 'Marker' not in c]
        
        if not cols_to_interp:
            continue
        
        f = interp1d(df_src[time_col], df_src[cols_to_interp], axis=0, 
                     kind='linear', fill_value="extrapolate")
        interp_data = f(common_time)
        df_temp = pd.DataFrame(interp_data, columns=cols_to_interp)
        df_merged = pd.concat([df_merged, df_temp], axis=1)
    
    df_merged = df_merged.loc[:, ~df_merged.columns.duplicated()]
    
    # Marker 2から300秒間の抽出
    marker_rows_sync = df_left_final[df_left_final['Marker'] == 2]
    
    if not marker_rows_sync.empty:
        synced_start_time = marker_rows_sync.iloc[0]['Time (Seconds)']
        synced_end_time = synced_start_time + 300.0
        
        df_analysis = df_merged[
            (df_merged['Time (Seconds)'] >= synced_start_time) & 
            (df_merged['Time (Seconds)'] <= synced_end_time)
        ].copy()
        
        df_analysis['Time (Seconds)'] = df_analysis['Time (Seconds)'] - synced_start_time
        df_analysis = df_analysis.reset_index(drop=True)
        
        return df_analysis
    else:
        return df_merged

### 10. ストライド検出と抽出

In [33]:
def detect_fz_heel_strikes(signal_array, threshold=0.05, min_dist_samples=40):
    """Fzの立ち上がり検出"""
    is_contact = signal_array > threshold
    rising_edge = np.diff(is_contact.astype(int), prepend=0) == 1
    potential_indices = np.where(rising_edge)[0]
    
    if len(potential_indices) == 0:
        return np.array([])
    
    true_indices = [potential_indices[0]]
    for idx in potential_indices[1:]:
        if idx - true_indices[-1] > min_dist_samples:
            true_indices.append(idx)
    
    return np.array(true_indices)

def slice_strides_with_constraints(df_input, target_col, side_name="Left", 
                                   threshold=0.05, fs=100, 
                                   min_duration=0.7, max_duration=1.8):
    """ストライド時間制約を満たす歩行のみ抽出"""
    signal = df_input[target_col].values
    time_array = df_input['Time (Seconds)'].values
    
    min_dist_samples = int(0.4 * fs)
    hs_indices = detect_fz_heel_strikes(signal, threshold=threshold, 
                                        min_dist_samples=min_dist_samples)
    
    valid_strides = []
    
    for i in range(1, len(hs_indices) - 2):
        start_idx = hs_indices[i]
        end_idx = hs_indices[i+1]
        
        start_t = time_array[start_idx]
        end_t = time_array[end_idx]
        duration = end_t - start_t
        
        if min_duration <= duration <= max_duration:
            stride_df = df_input.iloc[start_idx:end_idx].copy()
            valid_strides.append(stride_df)
    
    print(f"[{side_name}] Accepted Strides: {len(valid_strides)}")
    return valid_strides


### 11. ストライドの正規化

In [34]:
def normalize_strides(stride_list, target_cols, n_points=200):
    """各ストライドを0-100%に正規化"""
    normalized_dfs = []
    data_collector = []
    
    gait_cycle = np.linspace(0, 100, n_points)
    x_new = np.linspace(0, 1, n_points)
    
    for stride_df in stride_list:
        n_len = len(stride_df)
        x_old = np.linspace(0, 1, n_len)
        
        new_df = pd.DataFrame()
        new_df['Gait Cycle (%)'] = gait_cycle
        
        stride_matrix = []
        
        for col in target_cols:
            if col in stride_df.columns:
                y_old = stride_df[col].values
                f = interp1d(x_old, y_old, kind='linear', fill_value="extrapolate")
                y_new = f(x_new)
                new_df[col] = y_new
                stride_matrix.append(y_new)
            else:
                zeros = np.zeros(n_points)
                new_df[col] = zeros
                stride_matrix.append(zeros)
        
        normalized_dfs.append(new_df)
        data_collector.append(np.array(stride_matrix).T)
    
    if len(data_collector) > 0:
        ensemble_array = np.array(data_collector)
    else:
        ensemble_array = np.empty((0, n_points, len(target_cols)))
    
    return normalized_dfs, ensemble_array

### 12. 外れ値ストライドの除去

In [35]:
def filter_outlier_strides(ensemble_array, stride_dfs, n_sigmas=3, 
                           outlier_ratio_threshold=0.05):
    """集団から大きく乖離した外れ値ストライドを除外"""
    median_curve = np.median(ensemble_array, axis=0)
    std_curve = np.std(ensemble_array, axis=0)
    
    upper_limit = median_curve + (n_sigmas * std_curve)
    lower_limit = median_curve - (n_sigmas * std_curve)
    
    upper_limit_bc = upper_limit[np.newaxis, :, :]
    lower_limit_bc = lower_limit[np.newaxis, :, :]
    
    is_outlier_matrix = (ensemble_array > upper_limit_bc) | (ensemble_array < lower_limit_bc)
    
    total_points_per_stride = ensemble_array.shape[1] * ensemble_array.shape[2]
    outlier_counts = np.sum(is_outlier_matrix, axis=(1, 2))
    outlier_ratios = outlier_counts / total_points_per_stride
    
    keep_mask = outlier_ratios <= outlier_ratio_threshold
    
    clean_ensemble = ensemble_array[keep_mask]
    clean_dfs = [df for i, df in enumerate(stride_dfs) if keep_mask[i]]
    
    return clean_ensemble, clean_dfs, keep_mask

 ### 13. 左右データの統合処理

In [36]:
def merge_left_right_data(left_ensemble, right_ensemble, left_cols, right_cols):
    """
    左右のデータを統合し，左足データを適切に反転して同じデータとして扱う
    
    【反転処理の詳細】
    - 加速度: Left_Accel_X を -1倍
    - 角速度: Left_Gyro_Y, Left_Gyro_Z を -1倍
    - 関節角度: Left_{Joint}_ZX を -1倍（前額面の角度）
    - 床反力: Left_Fx を -1倍
    
    Returns:
        merged_ensemble: 統合された3次元配列 (左右合計ストライド数, 200, 特徴量数)
        merged_cols: 統合されたカラム名リスト（Left_/Right_プレフィックスを削除）
    """
    # 右足データはそのまま使用
    right_data = right_ensemble.copy()
    
    # 左足データをコピーして反転処理
    left_data = left_ensemble.copy()
    
    print("\n--- 左足データの反転処理 ---")
    
    # 1. 加速度X軸の反転
    if 'Left_Accel_X' in left_cols:
        idx = left_cols.index('Left_Accel_X')
        left_data[:, :, idx] *= -1
        print(f"  ✓ Left_Accel_X を反転 (index: {idx})")
    
    # 2. 角速度Y, Z軸の反転
    for axis in ['Y', 'Z']:
        col_name = f'Left_Gyro_{axis}'
        if col_name in left_cols:
            idx = left_cols.index(col_name)
            left_data[:, :, idx] *= -1
            print(f"  ✓ Left_Gyro_{axis} を反転 (index: {idx})")
    
    # 3. 関節角度ZX平面の反転
    for joint in ['Hip', 'Knee', 'Ankle']:
        col_name = f'Left_{joint}_ZX'
        if col_name in left_cols:
            idx = left_cols.index(col_name)
            left_data[:, :, idx] *= -1
            print(f"  ✓ Left_{joint}_ZX を反転 (index: {idx})")
    
    # 4. 床反力Fxの反転
    if 'Left_Fx' in left_cols:
        idx = left_cols.index('Left_Fx')
        left_data[:, :, idx] *= -1
        print(f"  ✓ Left_Fx を反転 (index: {idx})")
    
    # 左右データの結合
    merged_ensemble = np.concatenate([left_data, right_data], axis=0)
    
    # カラム名は左右で共通化（Left_/Right_プレフィックスを削除）
    # 例: Left_Accel_X, Right_Accel_X → Accel_X
    merged_cols = [col.replace('Left_', '').replace('Right_', '') for col in left_cols]
    
    print(f"\n統合完了:")
    print(f"  左足ストライド数: {left_ensemble.shape[0]}")
    print(f"  右足ストライド数: {right_ensemble.shape[0]}")
    print(f"  合計ストライド数: {merged_ensemble.shape[0]}")
    print(f"  特徴量数: {merged_ensemble.shape[2]}")
    
    return merged_ensemble, merged_cols

### 14. メイン処理パイプライン

In [37]:
def process_single_participant_condition(participant, condition, mass):
    """1人の1条件分のデータを処理"""
    
    # データ読み込み
    data = load_participant_data(participant, condition, mass)
    
    # 列名整理
    df_mocap = clean_mocap_columns(data['mocap'])
    df_force = clean_force_columns(data['force'])
    
    # リサンプリング
    df_left_processed = process_resampling(data['left'], sampling_interval=10)
    df_right_processed = process_resampling(data['right'], sampling_interval=10)
    
    # ローパスフィルタ
    df_left_smoothed = process_smoothing_dataframe(df_left_processed, fs=DEVICE_FREQ)
    df_right_smoothed = process_smoothing_dataframe(df_right_processed, fs=DEVICE_FREQ)
    df_mocap_smoothed = process_smoothing_dataframe(df_mocap, fs=MOCAP_FREQ)
    df_force_smoothed = process_smoothing_dataframe(df_force, fs=FORCE_FREQ)
    
    # 関節角度計算
    df_angles, _, _ = process_mocap_data_target_calibration(
        df_mocap_smoothed, df_left_smoothed)
    
    # 床反力の正規化
    df_normalized = normalize_force_by_bodyweight(df_force_smoothed, mass)
    
    # Zスコア正規化
    df_left_z, _ = process_hybrid_normalization(df_left_smoothed, duration=300)
    df_right_z, _ = process_hybrid_normalization(df_right_smoothed, duration=300)
    
    # データ同期と結合
    df_final = synchronize_merge_and_extract(
        df_left_z, df_right_z, df_angles, df_normalized, target_freq=100)
    
    # ストライド抽出
    left_strides = slice_strides_with_constraints(
        df_final, 'Left_Fz', side_name="Left", threshold=0.01)
    right_strides = slice_strides_with_constraints(
        df_final, 'Right_Fz', side_name="Right", threshold=0.01)
    
    # カラム定義
    cols_left = [
        'Left_Pressure_1', 'Left_Pressure_2', 'Left_Pressure_3', 'Left_Pressure_4', 
        'Left_Pressure_5', 'Left_Pressure_6', 'Left_Pressure_7', 'Left_Pressure_8',
        'Left_Accel_X', 'Left_Accel_Y', 'Left_Accel_Z', 
        'Left_Gyro_X', 'Left_Gyro_Y', 'Left_Gyro_Z', 
        'Left_Hip_XY', 'Left_Hip_YZ', 'Left_Hip_ZX', 
        'Left_Knee_XY', 'Left_Knee_YZ', 'Left_Knee_ZX', 
        'Left_Ankle_XY', 'Left_Ankle_YZ', 'Left_Ankle_ZX', 
        'Left_Fx', 'Left_Fy', 'Left_Fz'
    ]
    cols_right = [c.replace('Left', 'Right') for c in cols_left]
    
    # 正規化
    left_norm_dfs, left_ensemble = normalize_strides(left_strides, cols_left, n_points=200)
    right_norm_dfs, right_ensemble = normalize_strides(right_strides, cols_right, n_points=200)
    
    # 外れ値除去
    L_ens_clean, L_dfs_clean, _ = filter_outlier_strides(
        left_ensemble, left_norm_dfs, n_sigmas=3, outlier_ratio_threshold=0.01)
    R_ens_clean, R_dfs_clean, _ = filter_outlier_strides(
        right_ensemble, right_norm_dfs, n_sigmas=3, outlier_ratio_threshold=0.01)
    
    # 左右データの統合（反転処理を含む）
    merged_ensemble, merged_cols = merge_left_right_data(
        L_ens_clean, R_ens_clean, cols_left, cols_right)
    
    return {
        'participant': participant,
        'condition': condition,
        'mass': mass,
        'ensemble': merged_ensemble,
        'columns': merged_cols,
        'left_dfs': L_dfs_clean,
        'right_dfs': R_dfs_clean
    }


### 15. 全実験協力者・全条件の一括処理

In [38]:
def process_all_data():
    """全実験協力者・全条件のデータを処理"""
    all_results = []
    
    for i, participant in enumerate(PARTICIPANTS):
        mass = MASSES[i]
        
        for condition in CONDITIONS:
            try:
                result = process_single_participant_condition(participant, condition, mass)
                all_results.append(result)
                
                print(f"\n✓ 完了: {participant}_{condition}")
                print(f"  統合ストライド数: {result['ensemble'].shape[0]}")
                print(f"  特徴量数: {result['ensemble'].shape[2]}")
                
            except Exception as e:
                print(f"\n✗ エラー: {participant}_{condition}")
                print(f"  {str(e)}")
                continue
    
    return all_results

### 16. データセットの保存

In [39]:
def save_dataset(all_results, output_dir='processed_data'):
    """処理済みデータをnpz形式で保存"""
    os.makedirs(output_dir, exist_ok=True)

    # 1. 被験者名とIDの対応表を作成 (例: {'oba': 0, 'ono': 1, ...})
    # all_resultsからユニークな名前を取得してソートすることでIDを固定
    unique_participants = sorted(list(set(r['participant'] for r in all_results)))
    participant_map = {name: i for i, name in enumerate(unique_participants)}
    
    print(f"ID Map: {participant_map}")

    # 統合用リスト
    all_ensembles = []
    all_subject_ids = []  # ★ここに追加: IDを格納するリスト
    
    # 個別ファイルとして保存
    for result in all_results:
        filename = f"{result['participant']}_{result['condition']}_processed.npz"
        filepath = os.path.join(output_dir, filename)
        
        np.savez(
            filepath,
            ensemble=result['ensemble'],
            columns=result['columns'],
            participant=result['participant'],
            condition=result['condition'],
            mass=result['mass']
        )
        print(f"保存完了: {filepath}")
        
        # 統合用データの準備
        data = result['ensemble']
        name = result['participant']
        
        # ★ここが重要: データ数と同じ長さのID配列を作成
        # 例: データが337個でIDが0なら、[0, 0, ..., 0] (長さ337) を作る
        n_samples = data.shape[0]
        current_id = participant_map[name]
        ids = np.full(n_samples, current_id)
        
        all_ensembles.append(data)
        all_subject_ids.append(ids)
    
    # 全データを統合
    combined_ensemble = np.concatenate(all_ensembles, axis=0) # (N, 200, 26)
    combined_ids = np.concatenate(all_subject_ids, axis=0)    # (N, )
    
    # 保存
    combined_path = os.path.join(output_dir, 'all_data_combined.npz')
    np.savez(
        combined_path,
        ensemble=combined_ensemble,
        subject_ids=combined_ids,      # ★追加: 被験者ID配列
        columns=all_results[0]['columns'],
        id_map=participant_map         # 参考: IDと名前の対応表も保存しておくと便利
    )
    
    print(f"\\n統合データ保存完了: {combined_path}")
    print(f"総ストライド数 (ensemble): {combined_ensemble.shape}")
    print(f"総ID数 (subject_ids): {combined_ids.shape}")

### 実行

In [40]:
if __name__ == "__main__":
    # 全データ処理
    all_results = process_all_data()
    
    # データセット保存
    save_dataset(all_results)
    
    print("\n" + "="*60)
    print("全処理が完了しました")
    print("="*60)


Processing: oba - Condition: H (Mass: 60.9 kg)


KeyboardInterrupt: 