In [28]:
import pandas as pd
import matplotlib.pyplot as plt
import numpy as np
from scipy.signal import find_peaks, savgol_filter, peak_widths
import os

In [29]:
# Google Drive project folder shared by the input and output locations below.
_PROJECT_ROOT = '/content/drive/MyDrive/Colab Notebooks/HAR_Research_Project/transition'

# Folder scanned for the mHealth_subject<N>.log files.
DATA_DIR = f"{_PROJECT_ROOT}/data/MHEALTHDATASET"
# Folder that receives the saved per-activity figures.
BASE_OUTPUT_DIR = f"{_PROJECT_ROOT}/results"
# Date stamp embedded in the output file names (a fixed string, not computed at run time).
TODAY_DATE = "20251206"
# Samples per second; used to convert `min_dist_sec` into a peak distance in samples.
FS = 50

In [30]:
# Configuration for every activity, keyed by the integer activity label.
# is_repetitive: True  -> repetitive motion, peak detection is run on the signal
#                False -> ordinary motion, only the waveform is plotted
def _continuous(name):
    """Entry for a non-repetitive activity, using the 'al*' accelerometer columns."""
    return {'name': name, 'sensors': ['alx', 'aly', 'alz'], 'is_repetitive': False}


def _repetitive(name, sensors, min_dist_sec, prominence_scale, smooth_window):
    """Entry for a repetitive activity together with its peak-detection parameters."""
    return {
        'name': name,
        'sensors': list(sensors),
        'is_repetitive': True,
        'min_dist_sec': min_dist_sec,          # minimum spacing between peaks, in seconds
        'prominence_scale': prominence_scale,  # multiplied by the smoothed signal's std
        'smooth_window': smooth_window,        # Savitzky-Golay window length, in samples
        'rel_height': 0.6,                     # identical for every repetitive activity
    }


ALL_ACTIVITY_CONFIG = {
    1: _continuous('Standing still'),
    2: _continuous('Sitting and relaxing'),
    3: _continuous('Lying down'),
    4: _continuous('Walking'),
    5: _continuous('Climbing stairs'),

    # --- Repetitive activities (targets for ground-truth generation) ---
    # Chest
    6: _repetitive('Waist bends forward', ('acx', 'acy', 'acz'),
                   min_dist_sec=2.0, prominence_scale=0.5, smooth_window=51),
    # Right arm
    7: _repetitive('Frontal elevation of arms', ('arx', 'ary', 'arz'),
                   min_dist_sec=2.0, prominence_scale=0.3, smooth_window=51),
    # Left ankle (gyroscope)
    8: _repetitive('Knees bending (Crouching)', ('glx', 'gly', 'glz'),
                   min_dist_sec=2.0, prominence_scale=0.5, smooth_window=51),

    9: _continuous('Cycling'),
    10: _continuous('Jogging'),
    11: _continuous('Running'),

    # Left ankle
    12: _repetitive('Jump front & back', ('alx', 'aly', 'alz'),
                    min_dist_sec=0.65, prominence_scale=1.5, smooth_window=15),
}

In [31]:
def load_mhealth_logs(data_dir, subject_ids=range(1, 11)):
    """Load the per-subject mHealth log files from `data_dir` into one DataFrame.

    For every id in `subject_ids` the file ``mHealth_subject<id>.log`` is read as a
    header-less, whitespace-separated table. Files that are missing, unreadable,
    or that do not have exactly one field per expected column are reported on
    stdout and skipped, so one bad file does not abort the whole load.

    Args:
        data_dir: Directory containing the ``mHealth_subject<id>.log`` files.
        subject_ids: Iterable of subject numbers to look for. Defaults to 1..10.

    Returns:
        A DataFrame with the 24 named columns plus a ``subject`` column holding the
        id the rows came from, or ``None`` when no file could be loaded.
    """
    column_names = [
        'acx', 'acy', 'acz', 'ecg1', 'ecg2',
        'alx', 'aly', 'alz', 'glx', 'gly', 'glz', 'mlx', 'mly', 'mlz',
        'arx', 'ary', 'arz', 'grx', 'gry', 'grz', 'mrx', 'mry', 'mrz',
        'Activity'
    ]
    frames = []
    print(f"Loading data from {data_dir}...")

    for subject_id in subject_ids:
        file_name = f"mHealth_subject{subject_id}.log"
        file_path = os.path.join(data_dir, file_name)

        if not os.path.exists(file_path):
            # Report the gap instead of skipping silently, so a wrong DATA_DIR is noticeable.
            print(f"Skipping {file_name}: file not found")
            continue

        try:
            # Raw string: '\s' inside a plain literal is an invalid escape sequence
            # and makes Python emit a warning when the cell is compiled.
            df = pd.read_csv(file_path, header=None, sep=r'\s+')
        except Exception as e:
            # Best-effort load: report the failure and carry on with the other subjects.
            print(f"Error reading {file_name}: {e}")
            continue

        if df.shape[1] != len(column_names):
            print(f"Skipping {file_name}: Invalid columns {df.shape[1]}")
            continue

        df.columns = column_names
        df['subject'] = subject_id
        frames.append(df)

    if not frames:
        print("데이터를 찾을 수 없습니다.")
        return None

    final_df = pd.concat(frames, ignore_index=True)
    print(f"총 {len(final_df)} 샘플 로드 완료.")
    return final_df

  df = pd.read_csv(file_path, header=None, sep='\s+')


In [32]:
def get_sensor_pairs(config_sensors):
    """
    Work out which body part a configured sensor list (e.g. ['glx', ...]) belongs to
    and return that part's accelerometer and gyroscope column names.

    Only the first entry of `config_sensors` is inspected. The result is a tuple
    (body_part, acc_cols, gyro_cols); when nothing matches, body_part is "Unknown"
    and both lists are empty.
    """
    # (body part, substrings that identify it, accelerometer columns, gyroscope columns).
    # Rows are checked top to bottom and the first match wins.
    placements = (
        # The chest entry has no gyroscope columns.
        ("Chest", ('ac',), ['acx', 'acy', 'acz'], []),
        ("Left Ankle", ('al', 'gl'), ['alx', 'aly', 'alz'], ['glx', 'gly', 'glz']),
        ("Right Arm", ('ar', 'gr'), ['arx', 'ary', 'arz'], ['grx', 'gry', 'grz']),
    )

    reference = config_sensors[0]  # sensor used to identify the body part
    for part, tags, accel, gyro in placements:
        if any(tag in reference for tag in tags):
            return part, accel, gyro

    return "Unknown", [], []

In [33]:
# Colours and title formatting for the two panel types (acc titles are bold, gyro titles are not).
_ACC_PANEL_STYLE = {'raw': 'black', 'smooth': 'blue', 'peak': 'red',
                    'title': {'fontsize': 11, 'fontweight': 'bold'}}
_GYRO_PANEL_STYLE = {'raw': 'gray', 'smooth': 'purple', 'peak': 'orange',
                     'title': {'fontsize': 11}}


def _usable_savgol_window(n_samples, requested, polyorder):
    """Return a Savitzky-Golay window that fits `n_samples`, or None if none exists."""
    if requested <= n_samples:
        return requested
    # savgol_filter (default mode) rejects windows longer than the signal:
    # shrink to the largest odd length that still fits.
    window = n_samples if n_samples % 2 == 1 else n_samples - 1
    # The window must also be longer than the polynomial order.
    return window if window > polyorder else None


def _draw_magnitude_panel(ax, sub_data, cols, config, title_prefix, style, show_legend):
    """Plot the 3-axis magnitude of `cols`; add smoothing and peaks for repetitive activities."""
    # Euclidean magnitude of the three axes.
    magnitude = np.sqrt((sub_data[cols].to_numpy(dtype=float) ** 2).sum(axis=1))

    # Raw signal as a faint background trace.
    ax.plot(sub_data.index, magnitude, color=style['raw'], alpha=0.5, linewidth=0.8, label='Raw Mag')

    if config['is_repetitive']:
        polyorder = 3
        window = _usable_savgol_window(len(magnitude), config.get('smooth_window', 51), polyorder)
        if window is None:
            # Segment too short to smooth: keep the raw trace instead of raising.
            ax.set_title(f"{title_prefix} (too short for peak detection)", **style['title'])
        else:
            smoothed = savgol_filter(magnitude, window_length=window, polyorder=polyorder)

            # Prominence threshold scales with this signal's own spread.
            prominence = config['prominence_scale'] * np.std(smoothed)
            # find_peaks requires distance >= 1 sample.
            distance = max(1, int(config['min_dist_sec'] * FS))
            peaks, _ = find_peaks(smoothed, distance=distance, prominence=prominence)

            ax.plot(sub_data.index, smoothed, color=style['smooth'], linewidth=1.5, label='Smoothed')
            ax.plot(peaks, smoothed[peaks], "x", color=style['peak'], markersize=8)
            ax.set_title(f"{title_prefix} ({len(peaks)} peaks)", **style['title'])
    else:
        ax.set_title(f"{title_prefix} (Continuous)", **style['title'])

    ax.grid(True, alpha=0.3)
    if show_legend:
        ax.legend(loc='upper right', fontsize='small')


def visualize_all_activities_dual(df):
    """Plot accelerometer and gyroscope magnitudes per subject for every configured activity.

    For each entry of ALL_ACTIVITY_CONFIG one figure is built with two stacked panels
    per subject (accelerometer on top, gyroscope below). Activities flagged
    `is_repetitive` additionally get a Savitzky-Golay smoothed curve and the peaks
    found on it. Each figure is saved as a PNG in BASE_OUTPUT_DIR, shown, and closed.

    Args:
        df: DataFrame with 'Activity' and 'subject' columns plus the sensor columns
            returned by get_sensor_pairs for each activity.

    Returns:
        None.
    """
    # savefig does not create missing folders.
    os.makedirs(BASE_OUTPUT_DIR, exist_ok=True)

    for act_id, config in ALL_ACTIVITY_CONFIG.items():
        print(f"\nProcessing Activity [{act_id}]: {config['name']}...")

        act_df = df[df['Activity'] == act_id]
        if act_df.empty:
            print("  - No data found.")
            continue

        # One pass over the rows; sort=False keeps subjects in order of first appearance.
        per_subject = list(act_df.groupby('subject', sort=False))
        n_subjects = len(per_subject)

        # Two rows per subject (acc above, gyro below). There are always >= 2 rows,
        # so `axes` is always a 1-D array.
        fig, axes = plt.subplots(n_subjects * 2, 1, figsize=(12, 4 * n_subjects), sharex=True)
        fig.suptitle(f"Activity {act_id}: {config['name']} (Acc vs Gyro)", fontsize=16, y=1.01, fontweight='bold')

        # All sensor columns of the body part this activity is configured for.
        body_part, acc_cols, gyro_cols = get_sensor_pairs(config['sensors'])

        for i, (sub, sub_rows) in enumerate(per_subject):
            sub_data = sub_rows.reset_index(drop=True)
            ax_acc = axes[i * 2]
            ax_gyro = axes[i * 2 + 1]
            first_subject = (i == 0)  # the legend is drawn only on the first subject's panels

            # [Plot 1] Accelerometer
            if acc_cols:
                _draw_magnitude_panel(ax_acc, sub_data, acc_cols, config,
                                      f"S{sub} - {body_part} Acc", _ACC_PANEL_STYLE, first_subject)

            # [Plot 2] Gyroscope (peaks recomputed on the gyro signal, for comparison)
            if gyro_cols:
                _draw_magnitude_panel(ax_gyro, sub_data, gyro_cols, config,
                                      f"S{sub} - {body_part} Gyro", _GYRO_PANEL_STYLE, first_subject)
            else:
                # No gyroscope columns for this body part (e.g. the chest sensor).
                # Axes coordinates keep the note centred; in data coordinates the shared
                # x-axis would place x=0.5 at the far left of the panel.
                ax_gyro.text(0.5, 0.5, "Gyroscope Data Not Available\n(Sensor not attached or missing)",
                             ha='center', va='center', fontsize=12, color='red', alpha=0.6,
                             transform=ax_gyro.transAxes)
                ax_gyro.set_title(f"S{sub} - {body_part} Gyro", fontsize=11)
                ax_gyro.grid(True, alpha=0.3)

        # The x-axis is the per-subject row index after reset_index.
        axes[-1].set_xlabel("Sample index")

        plt.tight_layout()

        # --- Save to file: strip spaces / special characters from the name ---
        safe_name = config['name'].replace(' ', '_').replace('/', '_').replace('&', 'and')
        file_name = f"Act{act_id}_{safe_name}_{TODAY_DATE}.png"
        save_path = os.path.join(BASE_OUTPUT_DIR, file_name)

        fig.savefig(save_path, dpi=150, bbox_inches='tight')
        print(f"   Saved: {file_name}")
        plt.show()      # also display inline
        plt.close(fig)  # release this figure's memory

In [34]:
if __name__ == "__main__":
    # Load the logs and, only when something was loaded (the loader returns None
    # otherwise), run the visualisation for all activities.
    if (df := load_mhealth_logs(DATA_DIR)) is not None:
        visualize_all_activities_dual(df)

Output hidden; open in https://colab.research.google.com to view.