In [1]:
import os
import glob
import random
import numpy as np
import pandas as pd

# ---------------------------------------------------------------------
# 1) Strict Seeding
# ---------------------------------------------------------------------
def set_strict_seed(seed: int):
    """Seed Python's and NumPy's global random generators with ``seed``.

    ``PYTHONHASHSEED`` is also written to ``os.environ``.  Hash randomization
    is configured when an interpreter starts, so that variable can only
    influence processes launched afterwards, not the one already running.
    """
    os.environ["PYTHONHASHSEED"] = str(seed)
    for reseed in (random.seed, np.random.seed):
        reseed(seed)

# ---------------------------------------------------------------------
# 2) Data Loading (RAW)
# ---------------------------------------------------------------------
def load_mhealth_dataset_raw(data_dir, target_activities_map, column_names):
    """Load per-subject mHealth logs and split them by target activity.

    Every ``mHealth_subject*.log`` file in ``data_dir`` is read as a
    header-less, tab-separated table.  Only the first ``len(column_names)``
    columns are kept and named after ``column_names``, which must contain
    ``'activity_id'``.

    Args:
        data_dir: Directory that holds the log files.
        target_activities_map: Mapping ``activity_id value -> activity name``.
        column_names: Names assigned to the leading columns of each log.

    Returns:
        ``full_dataset[subj_key][act_name] = raw_df`` where ``raw_df`` holds
        every named column except ``activity_id``.  Activities without rows
        for a subject are omitted.  An empty dict is returned when no log
        file matches; files that fail to load are reported and skipped.
    """
    full_dataset = {}
    file_list = sorted(glob.glob(os.path.join(data_dir, "mHealth_subject*.log")))
    if not file_list:
        print(f"[Warning] No mHealth logs found in {data_dir}")
        return {}

    for file_path in file_list:
        file_name = os.path.basename(file_path)
        subj_part = file_name.split('.')[0]
        try:
            # Normalise e.g. "mHealth_subject03" to "subject3".
            subj_id_num = int(''.join(filter(str.isdigit, subj_part)))
            subj_key = f"subject{subj_id_num}"
        except ValueError:
            # No usable digits in the name: keep the raw file stem as the key.
            subj_key = subj_part

        try:
            df = pd.read_csv(file_path, sep="\t", header=None)
            df = df.iloc[:, :len(column_names)]
            df.columns = column_names

            subj_data = {}
            for label_code, activity_name in target_activities_map.items():
                activity_df = df[df['activity_id'] == label_code]
                if not activity_df.empty:
                    # drop() already returns a new frame, so no extra copy is needed.
                    subj_data[activity_name] = activity_df.drop(columns=['activity_id'])
            full_dataset[subj_key] = subj_data
        except Exception as e:
            # Best-effort load: report the broken file and carry on with the rest.
            print(f"Error loading {file_name}: {e}")

    return full_dataset

def prepare_trial_list_raw(label_config, full_data, target_map, feature_map):
    """Build one trial dict with the raw signal per labelled recording.

    Args:
        label_config: Iterable of ``(subject, act_id, gt_count)`` tuples.
        full_data: ``full_data[subject][activity_name]`` -> DataFrame.
        target_map: ``act_id -> activity_name``.
        feature_map: ``act_id ->`` list of column names to extract.

    Returns:
        List of dicts with keys ``raw`` ((T, C) float32 array), ``count``
        (ground-truth count as float), ``subj``, ``act_id`` and ``meta``.
        Entries whose subject or activity is absent from ``full_data`` are
        skipped without any message.
    """
    trials = []
    for subject, activity_id, true_count in label_config:
        activity = target_map.get(activity_id)
        columns = feature_map.get(activity_id)

        if subject not in full_data or activity not in full_data[subject]:
            continue  # nothing recorded for this (subject, activity) pair

        signal = full_data[subject][activity][columns].values.astype(np.float32)
        trial = {
            "raw": signal,
            "count": float(true_count),
            "subj": subject,
            "act_id": int(activity_id),
            "meta": f"{subject}_{activity}",
        }
        trials.append(trial)
    return trials

# ---------------------------------------------------------------------
# 3) Integrator Baseline (alpha-only, beta=0)
# ---------------------------------------------------------------------
def compute_energy_integral_raw(x_raw: np.ndarray, fs: int, eps=1e-12) -> float:
    """Return the energy integral of a (T, C) signal.

    Each channel has its own mean removed (DC removal), then all squared
    samples are summed over time and channels and divided by ``fs``:
    ``E = (1/fs) * sum_t sum_c x_dc[t, c]**2``.

    Args:
        x_raw: Signal array; converted to float32 before processing.
        fs: Sampling rate used as the divisor.
        eps: Lower bound for the divisor so it can never be zero.
    """
    centered = np.asarray(x_raw, dtype=np.float32)
    centered = centered - centered.mean(axis=0, keepdims=True)
    divisor = max(float(fs), eps)
    total_energy = np.sum(np.square(centered))
    return float(total_energy / divisor)

def fit_linear_energy_calibrator_alpha_only(train_trials, fs: int, ridge: float = 1e-8):
    """Fit the scale ``alpha`` of the intercept-free model ``y ≈ alpha * E``.

    ``E`` is the energy integral of each trial's ``"raw"`` signal and ``y``
    its ``"count"``.  Closed form: ``alpha = (E·y) / (E·E + ridge)``.

    Returns:
        ``alpha`` as a float, or ``0.0`` when ``train_trials`` is empty.
    """
    if len(train_trials) == 0:
        return 0.0

    samples = [
        (compute_energy_integral_raw(trial["raw"], fs=fs), float(trial["count"]))
        for trial in train_trials
    ]
    energies, counts = (np.asarray(col, dtype=np.float64) for col in zip(*samples))

    numerator = float(np.sum(energies * counts))
    denominator = float(np.sum(energies * energies) + ridge)
    return float(numerator / denominator)

def baselineE2_predict_count(alpha_only: float, x_raw: np.ndarray, fs: int) -> float:
    """Return ``alpha_only`` times the energy integral of ``x_raw`` (no intercept)."""
    energy = compute_energy_integral_raw(x_raw, fs=fs)
    scaled = alpha_only * energy
    return float(scaled)

# ---------------------------------------------------------------------
# 4) Metrics
# ---------------------------------------------------------------------
def summarize_errors(preds, gts):
    """Summarise absolute, percentage and signed errors of predictions.

    Args:
        preds: Predicted values.
        gts: Ground-truth values, same length as ``preds``.

    Returns:
        Dict with ``MAE_*``, ``MAPE_*`` (in percent) and ``Bias_*`` entries,
        each as ``_mean`` and ``_std`` (sample std, ``ddof=1``; ``0.0`` when
        fewer than two values), plus ``n``, the number of predictions.
    """
    predicted = np.asarray(preds, dtype=np.float64)
    truth = np.asarray(gts, dtype=np.float64)

    signed = predicted - truth
    absolute = np.abs(signed)
    # The 1e-6 offset keeps the percentage finite for a zero ground truth.
    percent = absolute / (np.abs(truth) + 1e-6) * 100.0

    summary = {}
    for label, values in (("MAE", absolute), ("MAPE", percent), ("Bias", signed)):
        summary[f"{label}_mean"] = float(values.mean())
        if len(values) > 1:
            summary[f"{label}_std"] = float(values.std(ddof=1))
        else:
            summary[f"{label}_std"] = 0.0
    summary["n"] = int(len(predicted))
    return summary

# ---------------------------------------------------------------------
# 5) Main: Table VI 3 scenarios only
# ---------------------------------------------------------------------
def main():
    """Evaluate the alpha-only energy-integral baseline on the Table VI scenarios.

    For each (train activity -> test activity) pair, ``alpha`` is fitted on
    the available trials of the train activity and used to predict counts for
    the trials of the test activity; MAE, MAPE and bias are printed.
    """
    target_activities = {
        6: 'Waist bends forward',
        7: 'Frontal elevation of arms',
        8: 'Knees bending',
        10: 'Jogging',
        12: 'Jump front & back'
    }

    # Every activity uses the same 15 acc/gyro columns so the input dimension
    # stays identical across activities (kept as in the original code).
    shared_features = [
        'acc_chest_x', 'acc_chest_y', 'acc_chest_z',
        'acc_ankle_x', 'acc_ankle_y', 'acc_ankle_z',
        'gyro_ankle_x', 'gyro_ankle_y', 'gyro_ankle_z',
        'acc_arm_x', 'acc_arm_y', 'acc_arm_z',
        'gyro_arm_x', 'gyro_arm_y', 'gyro_arm_z',
    ]

    subjects = [f"subject{i}" for i in range(1, 11)]

    # Ground-truth counts per activity, listed in subject1..subject10 order.
    counts_in_subject_order = {
        6:  [21, 19, 21, 20, 20, 20, 20, 21, 21, 20],
        7:  [20, 20, 20, 20, 20, 20, 20, 19, 19, 20],
        8:  [20, 21, 21, 19, 20, 20, 21, 21, 21, 21],
        10: [157, 161, 154, 154, 160, 156, 153, 160, 166, 156],
        12: [20, 22, 21, 21, 20, 21, 19, 20, 20, 20],
    }

    CONFIG = {
        "seed": 42,
        "data_dir": "/content/drive/MyDrive/Colab Notebooks/HAR_data/MHEALTHDATASET",
        "fs": 50,
        "COLUMN_NAMES": [
            'acc_chest_x', 'acc_chest_y', 'acc_chest_z',
            'ecg_1', 'ecg_2',
            'acc_ankle_x', 'acc_ankle_y', 'acc_ankle_z',
            'gyro_ankle_x', 'gyro_ankle_y', 'gyro_ankle_z',
            'mag_ankle_x', 'mag_ankle_y', 'mag_ankle_z',
            'acc_arm_x', 'acc_arm_y', 'acc_arm_z',
            'gyro_arm_x', 'gyro_arm_y', 'gyro_arm_z',
            'mag_arm_x', 'mag_arm_y', 'mag_arm_z',
            'activity_id'
        ],
        "TARGET_ACTIVITIES_MAP": target_activities,
        "ACT_FEATURE_MAP": {
            act_id: list(shared_features) for act_id in target_activities
        },
        # COUNT_TABLE[act_id][subject] = GT count
        "COUNT_TABLE": {
            act_id: dict(zip(subjects, row))
            for act_id, row in counts_in_subject_order.items()
        },
        # Table VI scenarios only: (case label, train act_id, test act_id)
        "TABLE_VI_SCENARIOS": [
            ("Success case",        7,  8),   # Frontal elevation -> Knees bending
            ("Partial failure",     6, 12),   # Waist bends -> Jump front & back
            ("Complete breakdown",  8, 10),   # Knees bending -> Jogging
        ],
    }

    set_strict_seed(CONFIG["seed"])

    full_data = load_mhealth_dataset_raw(
        CONFIG["data_dir"],
        CONFIG["TARGET_ACTIVITIES_MAP"],
        CONFIG["COLUMN_NAMES"]
    )
    if not full_data:
        print("[ERROR] dataset load failed")
        return

    fs = CONFIG["fs"]

    def build_labels(act_id):
        # (subject, act_id, gt_count) for every subject that has a count entry.
        per_subject = CONFIG["COUNT_TABLE"].get(act_id, {})
        return [(s, act_id, per_subject[s]) for s in subjects if s in per_subject]

    def trials_for(act_id):
        return prepare_trial_list_raw(
            build_labels(act_id),
            full_data,
            CONFIG["TARGET_ACTIVITIES_MAP"],
            CONFIG["ACT_FEATURE_MAP"]
        )

    rule = "=" * 110
    print("\n" + rule)
    print("Integrator baseline (Energy integral + alpha-only calibration), evaluated on Table VI scenarios")
    print(rule)

    for case_type, train_act, test_act in CONFIG["TABLE_VI_SCENARIOS"]:
        train_name = CONFIG["TARGET_ACTIVITIES_MAP"][train_act]
        test_name = CONFIG["TARGET_ACTIVITIES_MAP"][test_act]

        train_trials = trials_for(train_act)
        test_trials = trials_for(test_act)

        if len(train_trials) == 0 or len(test_trials) == 0:
            print(f"[Skip] {train_name} -> {test_name} (missing trials)")
            continue

        # Fit alpha on the TRAIN activity only (beta fixed to 0).
        alpha = fit_linear_energy_calibrator_alpha_only(train_trials, fs=fs, ridge=1e-8)

        # Predict every TEST trial with that alpha.
        preds = [baselineE2_predict_count(alpha, tr["raw"], fs=fs) for tr in test_trials]
        gts = [float(tr["count"]) for tr in test_trials]

        summary = summarize_errors(preds, gts)

        print(f"\n[{case_type}] {train_name} -> {test_name}")
        print(f"  alpha={alpha:.6e} | n={summary['n']}")
        print(f"  MAE (mean±std):  {summary['MAE_mean']:.3f} ± {summary['MAE_std']:.3f}")
        print(f"  MAPE% (mean±std): {summary['MAPE_mean']:.2f} ± {summary['MAPE_std']:.2f}")
        print(f"  Bias (mean±std): {summary['Bias_mean']:.3f} ± {summary['Bias_std']:.3f}")

    print("\n" + rule)


# Entry point: run the evaluation only when this code is executed directly,
# not when it is imported as a module.
if __name__ == "__main__":
    main()



Integrator baseline (Energy integral + alpha-only calibration), evaluated on Table VI scenarios

[Success case] Frontal elevation of arms -> Knees bending
  alpha=4.348327e-03 | n=10
  MAE (mean±std):  9.991 ± 2.786
  MAPE% (mean±std): 48.56 ± 12.64
  Bias (mean±std): -9.991 ± 2.786

[Partial failure] Waist bends forward -> Jump front & back
  alpha=6.295517e-03 | n=10
  MAE (mean±std):  54.727 ± 10.184
  MAPE% (mean±std): 268.63 ± 49.40
  Bias (mean±std): 54.727 ± 10.184

[Complete breakdown] Knees bending -> Jogging
  alpha=8.060321e-03 | n=10
  MAE (mean±std):  64.715 ± 25.632
  MAPE% (mean±std): 41.18 ± 16.62
  Bias (mean±std): 64.715 ± 25.632



In [2]:
import os
import glob
import random
import numpy as np
import pandas as pd

# ---------------------------------------------------------------------
# 1) Strict Seeding
# ---------------------------------------------------------------------
def set_strict_seed(seed: int):
    """Seed Python's and NumPy's global random generators with ``seed``.

    ``PYTHONHASHSEED`` is also written to ``os.environ``.  Hash randomization
    is configured when an interpreter starts, so that variable can only
    influence processes launched afterwards, not the one already running.
    """
    os.environ["PYTHONHASHSEED"] = str(seed)
    for reseed in (random.seed, np.random.seed):
        reseed(seed)

# ---------------------------------------------------------------------
# 2) Data Loading (RAW)
# ---------------------------------------------------------------------
def load_mhealth_dataset_raw(data_dir, target_activities_map, column_names):
    """Load per-subject mHealth logs and split them by target activity.

    Every ``mHealth_subject*.log`` file in ``data_dir`` is read as a
    header-less, tab-separated table.  Only the first ``len(column_names)``
    columns are kept and named after ``column_names``, which must contain
    ``'activity_id'``.

    Args:
        data_dir: Directory that holds the log files.
        target_activities_map: Mapping ``activity_id value -> activity name``.
        column_names: Names assigned to the leading columns of each log.

    Returns:
        ``full_dataset[subj_key][act_name] = raw_df`` where ``raw_df`` holds
        every named column except ``activity_id``.  Activities without rows
        for a subject are omitted.  An empty dict is returned when no log
        file matches; files that fail to load are reported and skipped.
    """
    full_dataset = {}
    file_list = sorted(glob.glob(os.path.join(data_dir, "mHealth_subject*.log")))
    if not file_list:
        print(f"[Warning] No mHealth logs found in {data_dir}")
        return {}

    for file_path in file_list:
        file_name = os.path.basename(file_path)
        subj_part = file_name.split('.')[0]
        try:
            # Normalise e.g. "mHealth_subject03" to "subject3".
            subj_id_num = int(''.join(filter(str.isdigit, subj_part)))
            subj_key = f"subject{subj_id_num}"
        except ValueError:
            # No usable digits in the name: keep the raw file stem as the key.
            subj_key = subj_part

        try:
            df = pd.read_csv(file_path, sep="\t", header=None)
            df = df.iloc[:, :len(column_names)]
            df.columns = column_names

            subj_data = {}
            for label_code, activity_name in target_activities_map.items():
                activity_df = df[df['activity_id'] == label_code]
                if not activity_df.empty:
                    # drop() already returns a new frame, so no extra copy is needed.
                    subj_data[activity_name] = activity_df.drop(columns=['activity_id'])
            full_dataset[subj_key] = subj_data
        except Exception as e:
            # Best-effort load: report the broken file and carry on with the rest.
            print(f"Error loading {file_name}: {e}")

    return full_dataset


def prepare_trial_list_raw(label_config, full_data, target_map, feature_map):
    """Build one trial dict with the raw signal per labelled recording.

    Args:
        label_config: Iterable of ``(subject, act_id, gt_count)`` tuples.
        full_data: ``full_data[subject][activity_name]`` -> DataFrame.
        target_map: ``act_id -> activity_name``.
        feature_map: ``act_id ->`` list of column names to extract.

    Returns:
        List of dicts with keys ``raw`` ((T, C) float32 array), ``count``
        (ground-truth count as float), ``subj``, ``act_id`` and ``meta``.
        Entries whose subject or activity is absent from ``full_data`` are
        skipped without any message.
    """
    trials = []
    for subject, activity_id, true_count in label_config:
        activity = target_map.get(activity_id)
        columns = feature_map.get(activity_id)

        if subject not in full_data or activity not in full_data[subject]:
            continue  # nothing recorded for this (subject, activity) pair

        signal = full_data[subject][activity][columns].values.astype(np.float32)
        trial = {
            "raw": signal,
            "count": float(true_count),
            "subj": subject,
            "act_id": int(activity_id),
            "meta": f"{subject}_{activity}",
        }
        trials.append(trial)

    return trials


# ---------------------------------------------------------------------
# 3) Integrator Baseline (alpha+beta)
# ---------------------------------------------------------------------
def compute_energy_integral_raw(x_raw: np.ndarray, fs: int, eps=1e-12) -> float:
    """Return the energy integral of a (T, C) signal.

    Each channel has its own mean removed (DC removal), then all squared
    samples are summed over time and channels and divided by ``fs``:
    ``E = (1/fs) * sum_t sum_c x_dc[t, c]**2``.

    Args:
        x_raw: Signal array; converted to float32 before processing.
        fs: Sampling rate used as the divisor.
        eps: Lower bound for the divisor so it can never be zero.
    """
    centered = np.asarray(x_raw, dtype=np.float32)
    centered = centered - centered.mean(axis=0, keepdims=True)
    divisor = max(float(fs), eps)
    total_energy = np.sum(np.square(centered))
    return float(total_energy / divisor)


def fit_linear_energy_calibrator_alpha_beta(train_trials, fs: int, ridge: float = 1e-8):
    """Fit ``(alpha, beta)`` of the affine model ``y ≈ alpha * E + beta``.

    ``E`` is the energy integral of each trial's ``"raw"`` signal and ``y``
    its ``"count"``.  The ridge-regularised normal equations
    ``theta = (X^T X + ridge * I)^-1 X^T y`` are solved with ``X = [E, 1]``,
    so the ridge term applies to both alpha and beta.

    Returns:
        ``(alpha, beta)`` as floats, or ``(0.0, 0.0)`` for no trials.
    """
    if len(train_trials) == 0:
        return 0.0, 0.0

    samples = [
        (compute_energy_integral_raw(trial["raw"], fs=fs), float(trial["count"]))
        for trial in train_trials
    ]
    energies, counts = (np.asarray(col, dtype=np.float64) for col in zip(*samples))

    # Design matrix (N, 2): energy column followed by a constant column.
    design = np.stack([energies, np.ones_like(energies)], axis=1)

    gram = design.T @ design + ridge * np.eye(2, dtype=np.float64)
    moment = design.T @ counts

    alpha, beta = np.linalg.solve(gram, moment)
    return float(alpha), float(beta)


def baseline_predict_count(alpha: float, beta: float, x_raw: np.ndarray, fs: int) -> float:
    """Return ``alpha * E + beta`` where ``E`` is the energy integral of ``x_raw``."""
    energy = compute_energy_integral_raw(x_raw, fs=fs)
    affine = alpha * energy + beta
    return float(affine)


# ---------------------------------------------------------------------
# 4) Metrics
# ---------------------------------------------------------------------
def summarize_errors(preds, gts):
    """Summarise absolute, percentage and signed errors of predictions.

    Args:
        preds: Predicted values.
        gts: Ground-truth values, same length as ``preds``.

    Returns:
        Dict with ``MAE_*``, ``MAPE_*`` (in percent) and ``Bias_*`` entries,
        each as ``_mean`` and ``_std`` (sample std, ``ddof=1``; ``0.0`` when
        fewer than two values), plus ``n``, the number of predictions.
    """
    predicted = np.asarray(preds, dtype=np.float64)
    truth = np.asarray(gts, dtype=np.float64)

    signed = predicted - truth
    absolute = np.abs(signed)
    # The 1e-6 offset keeps the percentage finite for a zero ground truth.
    percent = absolute / (np.abs(truth) + 1e-6) * 100.0

    summary = {}
    for label, values in (("MAE", absolute), ("MAPE", percent), ("Bias", signed)):
        summary[f"{label}_mean"] = float(values.mean())
        if len(values) > 1:
            summary[f"{label}_std"] = float(values.std(ddof=1))
        else:
            summary[f"{label}_std"] = 0.0
    summary["n"] = int(len(predicted))
    return summary


# ---------------------------------------------------------------------
# 5) Main: Table VI 3 scenarios only
# ---------------------------------------------------------------------
def main():
    """Evaluate the alpha+beta energy-integral baseline on the Table VI scenarios.

    For each (train activity -> test activity) pair, ``(alpha, beta)`` is
    fitted on the available trials of the train activity and used to predict
    counts for the trials of the test activity; MAE, MAPE and bias are printed.
    """
    target_activities = {
        6: 'Waist bends forward',
        7: 'Frontal elevation of arms',
        8: 'Knees bending',
        10: 'Jogging',
        12: 'Jump front & back'
    }

    # Every activity uses the same 15 acc/gyro columns so the input dimension
    # stays identical across activities.
    shared_features = [
        'acc_chest_x', 'acc_chest_y', 'acc_chest_z',
        'acc_ankle_x', 'acc_ankle_y', 'acc_ankle_z',
        'gyro_ankle_x', 'gyro_ankle_y', 'gyro_ankle_z',
        'acc_arm_x', 'acc_arm_y', 'acc_arm_z',
        'gyro_arm_x', 'gyro_arm_y', 'gyro_arm_z',
    ]

    subjects = [f"subject{i}" for i in range(1, 11)]

    # Ground-truth counts per activity, listed in subject1..subject10 order.
    counts_in_subject_order = {
        6:  [21, 19, 21, 20, 20, 20, 20, 21, 21, 20],
        7:  [20, 20, 20, 20, 20, 20, 20, 19, 19, 20],
        8:  [20, 21, 21, 19, 20, 20, 21, 21, 21, 21],
        10: [157, 161, 154, 154, 160, 156, 153, 160, 166, 156],
        12: [20, 22, 21, 21, 20, 21, 19, 20, 20, 20],
    }

    CONFIG = {
        "seed": 42,
        "data_dir": "/content/drive/MyDrive/Colab Notebooks/HAR_data/MHEALTHDATASET",
        "fs": 50,
        "COLUMN_NAMES": [
            'acc_chest_x', 'acc_chest_y', 'acc_chest_z',
            'ecg_1', 'ecg_2',
            'acc_ankle_x', 'acc_ankle_y', 'acc_ankle_z',
            'gyro_ankle_x', 'gyro_ankle_y', 'gyro_ankle_z',
            'mag_ankle_x', 'mag_ankle_y', 'mag_ankle_z',
            'acc_arm_x', 'acc_arm_y', 'acc_arm_z',
            'gyro_arm_x', 'gyro_arm_y', 'gyro_arm_z',
            'mag_arm_x', 'mag_arm_y', 'mag_arm_z',
            'activity_id'
        ],
        "TARGET_ACTIVITIES_MAP": target_activities,
        "ACT_FEATURE_MAP": {
            act_id: list(shared_features) for act_id in target_activities
        },
        # COUNT_TABLE[act_id][subject] = GT count
        "COUNT_TABLE": {
            act_id: dict(zip(subjects, row))
            for act_id, row in counts_in_subject_order.items()
        },
        # Table VI scenarios only: (case label, train act_id, test act_id)
        "TABLE_VI_SCENARIOS": [
            ("Success case",        7,  8),   # Frontal elevation -> Knees bending
            ("Partial failure",     6, 12),   # Waist bends -> Jump front & back
            ("Complete breakdown",  8, 10),   # Knees bending -> Jogging
        ],
    }

    set_strict_seed(CONFIG["seed"])

    full_data = load_mhealth_dataset_raw(
        CONFIG["data_dir"],
        CONFIG["TARGET_ACTIVITIES_MAP"],
        CONFIG["COLUMN_NAMES"]
    )
    if not full_data:
        print("[ERROR] dataset load failed")
        return

    fs = CONFIG["fs"]

    def build_labels(act_id):
        # (subject, act_id, gt_count) for every subject that has a count entry.
        per_subject = CONFIG["COUNT_TABLE"].get(act_id, {})
        return [(s, act_id, per_subject[s]) for s in subjects if s in per_subject]

    def trials_for(act_id):
        return prepare_trial_list_raw(
            build_labels(act_id),
            full_data,
            CONFIG["TARGET_ACTIVITIES_MAP"],
            CONFIG["ACT_FEATURE_MAP"]
        )

    rule = "=" * 110
    print("\n" + rule)
    print("Integrator baseline (Energy integral + alpha+beta calibration), evaluated on Table VI scenarios")
    print(rule)

    for case_type, train_act, test_act in CONFIG["TABLE_VI_SCENARIOS"]:
        train_name = CONFIG["TARGET_ACTIVITIES_MAP"][train_act]
        test_name = CONFIG["TARGET_ACTIVITIES_MAP"][test_act]

        train_trials = trials_for(train_act)
        test_trials = trials_for(test_act)

        if len(train_trials) == 0 or len(test_trials) == 0:
            print(f"[Skip] {train_name} -> {test_name} (missing trials)")
            continue

        # Fit slope and intercept on the TRAIN activity only.
        alpha, beta = fit_linear_energy_calibrator_alpha_beta(train_trials, fs=fs, ridge=1e-8)

        # Predict every TEST trial with that calibration.
        preds = [baseline_predict_count(alpha, beta, tr["raw"], fs=fs) for tr in test_trials]
        gts = [float(tr["count"]) for tr in test_trials]

        summary = summarize_errors(preds, gts)

        print(f"\n[{case_type}] {train_name} -> {test_name}")
        print(f"  alpha={alpha:.6e} | beta={beta:.6e} | n={summary['n']}")
        print(f"  MAE (mean±std):   {summary['MAE_mean']:.3f} ± {summary['MAE_std']:.3f}")
        print(f"  MAPE% (mean±std): {summary['MAPE_mean']:.2f} ± {summary['MAPE_std']:.2f}")
        print(f"  Bias (mean±std):  {summary['Bias_mean']:.3f} ± {summary['Bias_std']:.3f}")

    print("\n" + rule)


# Entry point: run the evaluation only when this code is executed directly,
# not when it is imported as a module.
if __name__ == "__main__":
    main()



Integrator baseline (Energy integral + alpha+beta calibration), evaluated on Table VI scenarios

[Success case] Frontal elevation of arms -> Knees bending
  alpha=-5.992391e-05 | beta=2.005494e+01 | n=10
  MAE (mean±std):   0.767 ± 0.466
  MAPE% (mean±std): 3.71 ± 2.23
  Bias (mean±std):  -0.590 ± 0.697

[Partial failure] Waist bends forward -> Jump front & back
  alpha=1.490611e-04 | beta=1.984672e+01 | n=10
  MAE (mean±std):   1.376 ± 0.552
  MAPE% (mean±std): 6.85 ± 2.99
  Bias (mean±std):  1.226 ± 0.860

[Complete breakdown] Knees bending -> Jogging
  alpha=-4.056960e-04 | beta=2.148045e+01 | n=10
  MAE (mean±std):   147.414 ± 4.025
  MAPE% (mean±std): 93.48 ± 0.78
  Bias (mean±std):  -147.414 ± 4.025



In [2]:
import os
import glob
import random
import numpy as np
import pandas as pd

from scipy.signal import find_peaks

# ---------------------------------------------------------------------
# 1) Strict Seeding
# ---------------------------------------------------------------------
def set_strict_seed(seed: int):
    """Seed Python's and NumPy's global random generators with ``seed``.

    ``PYTHONHASHSEED`` is also written to ``os.environ``.  Hash randomization
    is configured when an interpreter starts, so that variable can only
    influence processes launched afterwards, not the one already running.
    """
    os.environ["PYTHONHASHSEED"] = str(seed)
    for reseed in (random.seed, np.random.seed):
        reseed(seed)


# ---------------------------------------------------------------------
# 2) Data Loading (RAW)
# ---------------------------------------------------------------------
def load_mhealth_dataset_raw(data_dir, target_activities_map, column_names):
    """Load per-subject mHealth logs and split them by target activity.

    Every ``mHealth_subject*.log`` file in ``data_dir`` is read as a
    header-less, tab-separated table.  Only the first ``len(column_names)``
    columns are kept and named after ``column_names``, which must contain
    ``'activity_id'``.

    Args:
        data_dir: Directory that holds the log files.
        target_activities_map: Mapping ``activity_id value -> activity name``.
        column_names: Names assigned to the leading columns of each log.

    Returns:
        ``full_dataset[subj_key][act_name] = raw_df`` where ``raw_df`` holds
        every named column except ``activity_id``.  Activities without rows
        for a subject are omitted.  An empty dict is returned when no log
        file matches; files that fail to load are reported and skipped.
    """
    full_dataset = {}
    file_list = sorted(glob.glob(os.path.join(data_dir, "mHealth_subject*.log")))
    if not file_list:
        print(f"[Warning] No mHealth logs found in {data_dir}")
        return {}

    for file_path in file_list:
        file_name = os.path.basename(file_path)
        subj_part = file_name.split('.')[0]
        try:
            # Normalise e.g. "mHealth_subject03" to "subject3".
            subj_id_num = int(''.join(filter(str.isdigit, subj_part)))
            subj_key = f"subject{subj_id_num}"
        except ValueError:
            # No usable digits in the name: keep the raw file stem as the key.
            subj_key = subj_part

        try:
            df = pd.read_csv(file_path, sep="\t", header=None)
            df = df.iloc[:, :len(column_names)]
            df.columns = column_names

            subj_data = {}
            for label_code, activity_name in target_activities_map.items():
                activity_df = df[df['activity_id'] == label_code]
                if not activity_df.empty:
                    # drop() already returns a new frame, so no extra copy is needed.
                    subj_data[activity_name] = activity_df.drop(columns=['activity_id'])
            full_dataset[subj_key] = subj_data
        except Exception as e:
            # Best-effort load: report the broken file and carry on with the rest.
            print(f"Error loading {file_name}: {e}")

    return full_dataset


def prepare_trial_list_raw(label_config, full_data, target_map, feature_map):
    """Build one trial dict with the raw signal per labelled recording.

    Args:
        label_config: Iterable of ``(subject, act_id, gt_count)`` tuples.
        full_data: ``full_data[subject][activity_name]`` -> DataFrame.
        target_map: ``act_id -> activity_name``.
        feature_map: ``act_id ->`` list of column names to extract.

    Returns:
        List of dicts with keys ``raw`` ((T, C) float32 array), ``count``
        (ground-truth count as float), ``subj``, ``act_id`` and ``meta``.
        Entries whose subject or activity is absent from ``full_data`` are
        skipped without any message.
    """
    trials = []
    for subject, activity_id, true_count in label_config:
        activity = target_map.get(activity_id)
        columns = feature_map.get(activity_id)

        if subject not in full_data or activity not in full_data[subject]:
            continue  # nothing recorded for this (subject, activity) pair

        signal = full_data[subject][activity][columns].values.astype(np.float32)
        trial = {
            "raw": signal,
            "count": float(true_count),
            "subj": subject,
            "act_id": int(activity_id),
            "meta": f"{subject}_{activity}",
        }
        trials.append(trial)
    return trials


# ---------------------------------------------------------------------
# 3) Integrator Baseline (alpha-only, beta=0) + (alpha+beta)
# ---------------------------------------------------------------------
def compute_energy_integral_raw(x_raw: np.ndarray, fs: int, eps=1e-12) -> float:
    """Return the energy integral of a (T, C) signal.

    Each channel has its own mean removed (DC removal), then all squared
    samples are summed over time and channels and divided by ``fs``:
    ``E = (1/fs) * sum_t sum_c x_dc[t, c]**2``.

    Args:
        x_raw: Signal array; converted to float32 before processing.
        fs: Sampling rate used as the divisor.
        eps: Lower bound for the divisor so it can never be zero.
    """
    centered = np.asarray(x_raw, dtype=np.float32)
    centered = centered - centered.mean(axis=0, keepdims=True)
    divisor = max(float(fs), eps)
    total_energy = np.sum(np.square(centered))
    return float(total_energy / divisor)


def fit_linear_energy_calibrator_alpha_only(train_trials, fs: int, ridge: float = 1e-8):
    """Fit the scale ``alpha`` of the intercept-free model ``y ≈ alpha * E``.

    ``E`` is the energy integral of each trial's ``"raw"`` signal and ``y``
    its ``"count"``.  Closed form: ``alpha = (E·y) / (E·E + ridge)``.

    Returns:
        ``alpha`` as a float, or ``0.0`` when ``train_trials`` is empty.
    """
    if len(train_trials) == 0:
        return 0.0

    samples = [
        (compute_energy_integral_raw(trial["raw"], fs=fs), float(trial["count"]))
        for trial in train_trials
    ]
    energies, counts = (np.asarray(col, dtype=np.float64) for col in zip(*samples))

    numerator = float(np.sum(energies * counts))
    denominator = float(np.sum(energies * energies) + ridge)
    return float(numerator / denominator)


def baselineE2_predict_count_alpha_only(alpha_only: float, x_raw: np.ndarray, fs: int) -> float:
    """Return ``alpha_only`` times the energy integral of ``x_raw`` (no intercept)."""
    energy = compute_energy_integral_raw(x_raw, fs=fs)
    scaled = alpha_only * energy
    return float(scaled)


def fit_linear_energy_calibrator_alpha_beta(train_trials, fs: int, ridge: float = 1e-8):
    """Fit ``(alpha, beta)`` of ``y ≈ alpha * E + beta`` with ridge on alpha only.

    ``E`` is the energy integral of each trial's ``"raw"`` signal and ``y``
    its ``"count"``.  The 2x2 normal equations are solved directly::

        [sum(E^2) + ridge, sum(E)] [alpha]   [sum(E*y)]
        [sum(E),           n     ] [beta ] = [sum(y)  ]

    Returns:
        ``(alpha, beta)`` as floats, or ``(0.0, 0.0)`` for no trials.

    NOTE(review): with a single trial, or trials of identical energy, this
    system is singular up to ``ridge`` — confirm callers never rely on it.
    """
    if len(train_trials) == 0:
        return 0.0, 0.0

    samples = [
        (compute_energy_integral_raw(trial["raw"], fs=fs), float(trial["count"]))
        for trial in train_trials
    ]
    energies, counts = (np.asarray(col, dtype=np.float64) for col in zip(*samples))

    sum_e = float(np.sum(energies))
    normal_matrix = np.array(
        [
            [float(np.sum(energies * energies) + ridge), sum_e],
            [sum_e, float(len(counts))],
        ],
        dtype=np.float64,
    )
    rhs = np.array(
        [float(np.sum(energies * counts)), float(np.sum(counts))],
        dtype=np.float64,
    )

    alpha, beta = np.linalg.solve(normal_matrix, rhs)
    return float(alpha), float(beta)


def baselineE2_predict_count_alpha_beta(alpha: float, beta: float, x_raw: np.ndarray, fs: int) -> float:
    """Return ``alpha * E + beta`` where ``E`` is the energy integral of ``x_raw``."""
    energy = compute_energy_integral_raw(x_raw, fs=fs)
    affine = alpha * energy + beta
    return float(affine)


# ---------------------------------------------------------------------
# 4) Peak Counter Baseline (train-only global threshold)
# ---------------------------------------------------------------------
def _scalar_magnitude(x_raw: np.ndarray) -> np.ndarray:
    """Collapse a (T, C) signal into a (T,) magnitude trace.

    Each channel is mean-centred, then the L2 norm across channels is taken
    per time step.  A 1e-12 offset is added under the square root.  The
    result is float32.
    """
    centered = np.asarray(x_raw, dtype=np.float32)
    centered = centered - centered.mean(axis=0, keepdims=True)
    squared_norm = np.sum(np.square(centered), axis=1)
    return np.sqrt(squared_norm + 1e-12).astype(np.float32)


def _smooth_gaussian_1d_np(x: np.ndarray, sigma: float) -> np.ndarray:
    """Gaussian-smooth a 1-D signal using NumPy only (no scipy.ndimage).

    Args:
        x: Input samples; converted to float32.
        sigma: Standard deviation of the kernel in samples.  For
            ``sigma <= 0`` the float32 input is returned unsmoothed.

    Returns:
        float32 array with the same length as ``x``.
    """
    signal = np.asarray(x, dtype=np.float32)
    if sigma <= 0:
        return signal

    # Kernel spans about +/- 3 sigma, with at least one sample on each side.
    half_width = int(max(1, round(3.0 * sigma)))
    offsets = np.arange(-half_width, half_width + 1, dtype=np.float32)
    weights = np.exp(-(offsets * offsets) / (2.0 * float(sigma) * float(sigma)))
    weights /= np.sum(weights) + 1e-12

    # Reflect-pad so the "valid" convolution returns len(x) samples.
    padded = np.pad(signal, (half_width, half_width), mode="reflect")
    smoothed = np.convolve(padded, weights, mode="valid")
    return smoothed.astype(np.float32)


def fit_peak_threshold_global(train_trials, fs: int, smooth_sigma_sec=0.12, thr_frac=0.35):
    """Derive one global peak-height threshold from the training trials.

    For each trial the magnitude trace is Gaussian-smoothed
    (``smooth_sigma_sec * fs`` samples) and its maximum is taken; the
    threshold is ``thr_frac`` times the median of those maxima.

    Returns:
        The threshold as a float, or ``0.0`` when ``train_trials`` is empty.
    """
    if len(train_trials) == 0:
        return 0.0

    sigma_samples = float(smooth_sigma_sec) * float(fs)
    peak_levels = [
        float(np.max(_smooth_gaussian_1d_np(_scalar_magnitude(trial["raw"]), sigma=sigma_samples)))
        for trial in train_trials
    ]

    median_peak = float(np.median(np.asarray(peak_levels, dtype=np.float64)))
    return float(thr_frac * median_peak)


def peak_counter_predict_count(x_raw: np.ndarray, fs: int, height_thr: float,
                               smooth_sigma_sec=0.12, min_dist_sec=0.25,
                               prominence_frac=0.10):
    """
    Predict a repetition count as the number of peaks in the smoothed magnitude.

    Peak constraints passed to find_peaks:
      - height:     height_thr (the global threshold)
      - distance:   min_dist_sec converted to samples, at least 1
      - prominence: prominence_frac * max(height_thr, 1e-6)

    Returns the peak count as a float.
    """
    smoothed = _smooth_gaussian_1d_np(
        _scalar_magnitude(x_raw),
        sigma=float(smooth_sigma_sec) * float(fs),
    )

    min_gap_samples = int(max(1, round(float(min_dist_sec) * float(fs))))
    # Floor the threshold so the prominence requirement never collapses to zero.
    min_prominence = float(prominence_frac) * float(max(height_thr, 1e-6))

    peak_idx = find_peaks(
        smoothed,
        height=height_thr,
        distance=min_gap_samples,
        prominence=min_prominence,
    )[0]
    return float(len(peak_idx))


# ---------------------------------------------------------------------
# 5) FFT Counter Baseline (dominant frequency)
# ---------------------------------------------------------------------
def fft_counter_predict_count(x_raw: np.ndarray, fs: int, fmin=0.3, fmax=5.0,
                              smooth_sigma_sec=0.12):
    """
    Predict a repetition count from the dominant frequency of the signal.

    Pipeline: scalar magnitude -> Gaussian smoothing -> mean removal and Hann
    window -> real FFT power -> strongest bin inside [fmin, fmax] Hz.
    The estimate is f_peak * duration, with duration = n_samples / fs.

    Returns 0.0 when the smoothed signal has fewer than 8 samples or when no
    FFT bin falls inside the requested band.
    """
    smoothed = _smooth_gaussian_1d_np(
        _scalar_magnitude(x_raw),
        sigma=float(smooth_sigma_sec) * float(fs),
    )

    n_samples = int(smoothed.shape[0])
    if n_samples < 8:
        return 0.0
    duration_sec = float(n_samples) / float(fs)

    # Remove the DC component, then taper with a Hann window.
    hann = np.hanning(n_samples).astype(np.float32)
    windowed = (smoothed - float(np.mean(smoothed))) * hann

    # One-sided power spectrum and the frequency (Hz) of each bin.
    spectrum = np.fft.rfft(windowed.astype(np.float64))
    freqs = np.fft.rfftfreq(n_samples, d=1.0 / float(fs))
    power = spectrum.real * spectrum.real + spectrum.imag * spectrum.imag

    in_band = (freqs >= float(fmin)) & (freqs <= float(fmax))
    if not in_band.any():
        return 0.0

    # Strongest in-band bin (first one on ties).
    band_bins = np.flatnonzero(in_band)
    best_bin = band_bins[int(np.argmax(power[band_bins]))]
    dominant_hz = float(freqs[best_bin])

    return float(dominant_hz * duration_sec)


# ---------------------------------------------------------------------
# 6) Metrics
# ---------------------------------------------------------------------
def summarize_errors(preds, gts):
    """
    Aggregate per-trial count errors into mean / sample-std statistics.

    Returns a dict with:
      MAE_*  : absolute error |pred - gt|
      MAPE_* : absolute error as a percentage of |gt| (+1e-6 in the denominator)
      Bias_* : signed error pred - gt
      n      : number of predictions
    Standard deviations use ddof=1 and are reported as 0.0 when there is only
    one value.
    """
    pred_arr = np.asarray(preds, dtype=np.float64)
    gt_arr = np.asarray(gts, dtype=np.float64)

    signed_err = pred_arr - gt_arr
    abs_err = np.abs(signed_err)
    pct_err = abs_err / (np.abs(gt_arr) + 1e-6) * 100.0

    def _mean_and_std(values):
        spread = float(values.std(ddof=1)) if len(values) > 1 else 0.0
        return float(values.mean()), spread

    mae_mean, mae_std = _mean_and_std(abs_err)
    mape_mean, mape_std = _mean_and_std(pct_err)
    bias_mean, bias_std = _mean_and_std(signed_err)

    return {
        "MAE_mean": mae_mean,
        "MAE_std": mae_std,
        "MAPE_mean": mape_mean,
        "MAPE_std": mape_std,
        "Bias_mean": bias_mean,
        "Bias_std": bias_std,
        "n": int(len(pred_arr)),
    }


def print_summary_block(method_name, summary, extra_line=None, indent="  "):
    """
    Print one method's error summary (as produced by summarize_errors).

    Every line is prefixed with `indent`. When extra_line is given it is
    printed first, followed by the method name with n, then the MAE, MAPE%
    and Bias rows as "mean ± std".
    """
    def emit(text):
        print(f"{indent}{text}")

    if extra_line is not None:
        emit(extra_line)

    emit(f"{method_name} | n={summary['n']}")
    emit(f"MAE (mean±std):   {summary['MAE_mean']:.3f} ± {summary['MAE_std']:.3f}")
    emit(f"MAPE% (mean±std): {summary['MAPE_mean']:.2f} ± {summary['MAPE_std']:.2f}")
    emit(f"Bias (mean±std):  {summary['Bias_mean']:.3f} ± {summary['Bias_std']:.3f}")


# ---------------------------------------------------------------------
# 7) Main: Table VI 3 scenarios only
# ---------------------------------------------------------------------
def main():
    """
    Run the classic counting baselines on the three Table VI transfer scenarios.

    For each (train activity -> test activity) pair: calibrate the two energy
    integrators and the global peak threshold on the train-activity trials,
    predict a count for every test-activity trial with all four baselines, and
    print MAE / MAPE / bias summaries. Returns None; all output is printed.
    """
    subjects = [f"subject{i}" for i in range(1, 11)]

    activity_names = {
        6: 'Waist bends forward',
        7: 'Frontal elevation of arms',
        8: 'Knees bending',
        10: 'Jogging',
        12: 'Jump front & back'
    }

    # Every target activity uses the same 15 accelerometer / gyroscope
    # channels; the ECG and magnetometer columns are not selected.
    imu_channels = [
        'acc_chest_x', 'acc_chest_y', 'acc_chest_z',
        'acc_ankle_x', 'acc_ankle_y', 'acc_ankle_z',
        'gyro_ankle_x', 'gyro_ankle_y', 'gyro_ankle_z',
        'acc_arm_x', 'acc_arm_y', 'acc_arm_z',
        'gyro_arm_x', 'gyro_arm_y', 'gyro_arm_z',
    ]

    # Ground-truth repetition counts, listed in subject1..subject10 order.
    counts_by_activity = {
        6:  [21, 19, 21, 20, 20, 20, 20, 21, 21, 20],
        7:  [20, 20, 20, 20, 20, 20, 20, 19, 19, 20],
        8:  [20, 21, 21, 19, 20, 20, 21, 21, 21, 21],
        10: [157, 161, 154, 154, 160, 156, 153, 160, 166, 156],
        12: [20, 22, 21, 21, 20, 21, 19, 20, 20, 20],
    }

    CONFIG = {
        "seed": 42,
        "data_dir": "/content/drive/MyDrive/Colab Notebooks/HAR_data/MHEALTHDATASET",
        "fs": 50,

        "COLUMN_NAMES": [
            'acc_chest_x', 'acc_chest_y', 'acc_chest_z',
            'ecg_1', 'ecg_2',
            'acc_ankle_x', 'acc_ankle_y', 'acc_ankle_z',
            'gyro_ankle_x', 'gyro_ankle_y', 'gyro_ankle_z',
            'mag_ankle_x', 'mag_ankle_y', 'mag_ankle_z',
            'acc_arm_x', 'acc_arm_y', 'acc_arm_z',
            'gyro_arm_x', 'gyro_arm_y', 'gyro_arm_z',
            'mag_arm_x', 'mag_arm_y', 'mag_arm_z',
            'activity_id'
        ],

        "TARGET_ACTIVITIES_MAP": activity_names,

        # Independent list per activity, so no two entries share one object.
        "ACT_FEATURE_MAP": {act_id: list(imu_channels) for act_id in activity_names},

        "COUNT_TABLE": {
            act_id: dict(zip(subjects, counts))
            for act_id, counts in counts_by_activity.items()
        },

        # Table VI scenarios only (Train -> Test)
        "TABLE_VI_SCENARIOS": [
            ("Success case",        7,  8),   # Frontal elevation -> Knees bending
            ("Partial failure",     6, 12),   # Waist bends -> Jump front & back
            ("Complete breakdown",  8, 10),   # Knees bending -> Jogging
        ],

        # Peak / FFT hyperparams (fixed, no per-activity tuning)
        "PEAK_smooth_sigma_sec": 0.12,
        "PEAK_thr_frac": 0.35,
        "PEAK_min_dist_sec": 0.25,
        "PEAK_prom_frac": 0.10,

        "FFT_fmin": 0.3,
        "FFT_fmax": 5.0,
        "FFT_smooth_sigma_sec": 0.12,
    }

    set_strict_seed(CONFIG["seed"])

    full_data = load_mhealth_dataset_raw(
        CONFIG["data_dir"],
        CONFIG["TARGET_ACTIVITIES_MAP"],
        CONFIG["COLUMN_NAMES"]
    )
    if not full_data:
        print("[ERROR] dataset load failed")
        return

    fs = CONFIG["fs"]

    def build_labels(act_id):
        # (subject, activity id, ground-truth count) for every subject that
        # has an entry in the count table for this activity.
        per_subject = CONFIG["COUNT_TABLE"].get(act_id, {})
        return [(s, act_id, per_subject[s]) for s in subjects if s in per_subject]

    def trials_for(act_id):
        return prepare_trial_list_raw(
            build_labels(act_id),
            full_data,
            CONFIG["TARGET_ACTIVITIES_MAP"],
            CONFIG["ACT_FEATURE_MAP"]
        )

    rule = "=" * 110
    print("\n" + rule)
    print("Classic baselines (Integrator / Peak / FFT), evaluated on Table VI 3 scenarios")
    print(rule)

    # Reporting order of the four baselines.
    method_order = (
        "Integrator (alpha-only)",
        "Integrator (alpha+beta)",
        "Peak Counter",
        "FFT Counter",
    )

    for case_type, train_act, test_act in CONFIG["TABLE_VI_SCENARIOS"]:
        train_name = CONFIG["TARGET_ACTIVITIES_MAP"][train_act]
        test_name = CONFIG["TARGET_ACTIVITIES_MAP"][test_act]

        train_trials = trials_for(train_act)
        test_trials = trials_for(test_act)

        if not (len(train_trials) != 0 and len(test_trials) != 0):
            print(f"[Skip] {train_name} -> {test_name} (missing trials)")
            continue

        # ---- Calibrate on TRAIN only: integrator coefficients and peak threshold
        alpha_only = fit_linear_energy_calibrator_alpha_only(train_trials, fs=fs, ridge=1e-8)
        alpha_ab, beta_ab = fit_linear_energy_calibrator_alpha_beta(train_trials, fs=fs, ridge=1e-8)
        peak_thr = fit_peak_threshold_global(
            train_trials, fs=fs,
            smooth_sigma_sec=CONFIG["PEAK_smooth_sigma_sec"],
            thr_frac=CONFIG["PEAK_thr_frac"]
        )

        # ---- Predict on every TEST trial
        preds = {name: [] for name in method_order}
        gts = []
        for trial in test_trials:
            signal = trial["raw"]
            truth = float(trial["count"])

            preds["Integrator (alpha-only)"].append(
                baselineE2_predict_count_alpha_only(alpha_only, signal, fs=fs)
            )
            preds["Integrator (alpha+beta)"].append(
                baselineE2_predict_count_alpha_beta(alpha_ab, beta_ab, signal, fs=fs)
            )
            preds["Peak Counter"].append(
                peak_counter_predict_count(
                    signal, fs=fs, height_thr=peak_thr,
                    smooth_sigma_sec=CONFIG["PEAK_smooth_sigma_sec"],
                    min_dist_sec=CONFIG["PEAK_min_dist_sec"],
                    prominence_frac=CONFIG["PEAK_prom_frac"]
                )
            )
            preds["FFT Counter"].append(
                fft_counter_predict_count(
                    signal, fs=fs,
                    fmin=CONFIG["FFT_fmin"], fmax=CONFIG["FFT_fmax"],
                    smooth_sigma_sec=CONFIG["FFT_smooth_sigma_sec"]
                )
            )
            gts.append(truth)

        # Compute all summaries before printing anything for this scenario.
        summaries = [(name, summarize_errors(preds[name], gts)) for name in method_order]

        print(f"\n[{case_type}] {train_name} -> {test_name}")
        print(f"  Train-calibration stats: alpha_only={alpha_only:.6e} | alpha={alpha_ab:.6e}, beta={beta_ab:.6e} | peak_thr={peak_thr:.6e}")
        for name, summary in summaries:
            print_summary_block(name, summary, indent="  ")

    print("\n" + rule)


# Run main() only when this module is executed directly, not when it is imported.
if __name__ == "__main__":
    main()



Classic baselines (Integrator / Peak / FFT), evaluated on Table VI 3 scenarios

[Success case] Frontal elevation of arms -> Knees bending
  Train-calibration stats: alpha_only=4.348327e-03 | alpha=-5.992398e-05, beta=2.005494e+01 | peak_thr=4.373187e+00
  Integrator (alpha-only) | n=10
  MAE (mean±std):   9.991 ± 2.786
  MAPE% (mean±std): 48.56 ± 12.64
  Bias (mean±std):  -9.991 ± 2.786
  Integrator (alpha+beta) | n=10
  MAE (mean±std):   0.767 ± 0.466
  MAPE% (mean±std): 3.71 ± 2.23
  Bias (mean±std):  -0.590 ± 0.697
  Peak Counter | n=10
  MAE (mean±std):   23.900 ± 7.047
  MAPE% (mean±std): 117.49 ± 38.35
  Bias (mean±std):  23.900 ± 7.047
  FFT Counter | n=10
  MAE (mean±std):   16.900 ± 5.763
  MAPE% (mean±std): 82.24 ± 27.83
  Bias (mean±std):  16.700 ± 6.378

[Partial failure] Waist bends forward -> Jump front & back
  Train-calibration stats: alpha_only=6.295517e-03 | alpha=1.490610e-04, beta=1.984672e+01 | peak_thr=3.656669e+00
  Integrator (alpha-only) | n=10
  MAE (mean±std