<a href="https://colab.research.google.com/github/Heoyuna0819/machine_learning/blob/main/Mhealth_CNN.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [12]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [13]:
import os
import pandas as pd

# 데이터셋 경로 설정
DATA_DIR = "/content/drive/MyDrive/MHEALTHDATASET"

# UCI MHEALTH dataset 컬럼 정의
COLS = [
    "chest_acc_x","chest_acc_y","chest_acc_z",
    "ecg_1","ecg_2",
    "ankle_acc_x","ankle_acc_y","ankle_acc_z",
    "ankle_gyro_x","ankle_gyro_y","ankle_gyro_z",
    "ankle_mag_x","ankle_mag_y","ankle_mag_z",
    "arm_acc_x","arm_acc_y","arm_acc_z",
    "arm_gyro_x","arm_gyro_y","arm_gyro_z",
    "arm_mag_x","arm_mag_y","arm_mag_z",
    "label"
]

# 모든 subject 데이터 읽어서 합치기
dfs = []
for sid in range(1, 11):  # subject 1~10
    file_path = os.path.join(DATA_DIR, f"mHealth_subject{sid}.log")
    df = pd.read_csv(file_path, sep="\t", header=None, names=COLS)
    df["subject"] = sid   # subject 번호 추가
    dfs.append(df)

# 하나의 DataFrame으로 합치기
full_df = pd.concat(dfs, ignore_index=True)

# Null 클래스(라벨=0)는 제거
full_df = full_df[full_df["label"] != 0].reset_index(drop=True)

print(full_df.shape)
print(full_df.head())

(343195, 25)
   chest_acc_x  chest_acc_y  chest_acc_z     ecg_1     ecg_2  ankle_acc_x  \
0      -9.7788      0.55690      1.19750  0.008373 -0.033490       2.6493   
1      -9.7733      0.27880      0.73036 -0.025118 -0.025118       2.4157   
2      -9.8609      0.11561      0.79988  0.025118  0.016745       2.3865   
3      -9.7409      0.17652      0.88957  0.180010  0.129770       2.3758   
4      -9.7821      0.21637      0.90368  0.092098  0.046049       2.3239   

   ankle_acc_y  ankle_acc_z  ankle_gyro_x  ankle_gyro_y  ...  arm_acc_y  \
0      -9.4517      0.37683      -0.20965      -0.88931  ...    -9.0618   
1      -9.5306      0.40179      -0.20965      -0.88931  ...    -9.2048   
2      -9.5991      0.48141      -0.20037      -0.86867  ...    -9.1945   
3      -9.5997      0.42919      -0.20037      -0.86867  ...    -9.1746   
4      -9.5406      0.40038      -0.20037      -0.86867  ...    -9.2039   

   arm_acc_z  arm_gyro_x  arm_gyro_y  arm_gyro_z  arm_mag_x  arm_mag_y  \

In [14]:
import numpy as np

# 윈도우 크기 & stride
FS = 50          # 주파수 50Hz
WIN = 2 * FS     # 2초 = 100 샘플
STRIDE = WIN // 2  # 절반 겹치기 = 50 샘플

# feature 컬럼 (라벨과 subject 제외)
FEATURE_COLS = [c for c in full_df.columns if c not in ["label", "subject"]]

def make_windows_by_subject(df, min_ratio=0.0):
    Xs, ys, subs = [], [], []

    # subject별로 안전하게 자르기
    for sid, part in df.groupby("subject", sort=True):
        arr = part[FEATURE_COLS].values.astype(np.float32)  # (N_s, 23)
        labels = part["label"].values.astype(np.int32)      # (N_s,)
        n = len(part)
        i = 0
        while i + WIN <= n:
            w_labels = labels[i:i+WIN]
            w_vals = arr[i:i+WIN]

            # 최빈값(label)과 비율
            binc = np.bincount(w_labels)
            maj = np.argmax(binc)
            maj_ratio = binc[maj] / WIN

            if maj_ratio >= min_ratio:
                Xs.append(w_vals)
                ys.append(maj)
                subs.append(sid)

            i += STRIDE

    X = np.array(Xs)             # (num_windows, 100, 23)
    y = np.array(ys, dtype=int)  # (num_windows,)
    s = np.array(subs, dtype=int)
    return X, y, s

# 사용 예시: 순도 조건 없이 모두 채택
X_all, y_all, s_all = make_windows_by_subject(full_df, min_ratio=0.0)

print("X_all shape:", X_all.shape)
print("y_all shape:", y_all.shape)
print("subject unique:", np.unique(s_all))
# 라벨 1~12 분포
counts = np.bincount(y_all)
print("라벨 분포(1~12):", counts[1:13])


X_all shape: (6849, 100, 23)
y_all shape: (6849,)
subject unique: [ 1  2  3  4  5  6  7  8  9 10]
라벨 분포(1~12): [610 615 615 614 608 567 587 587 614 614 616 202]


In [15]:
import numpy as np

# 5개 그룹
pairs = [(1,2), (3,4), (5,6), (7,8), (9,10)]

def split_by_subject(X, y, s, test_pair):
    """
    X: 윈도우 입력 데이터 (shape: [N, T, D])
    y: 라벨 (shape: [N])
    s: subject 번호 (shape: [N])
    test_pair: (예: (1,2)) 테스트 subject 번호 쌍
    """
    test_mask = np.isin(s, test_pair)   # 테스트셋에 해당하는 subject만 True
    X_train, y_train = X[~test_mask], y[~test_mask]
    X_test,  y_test  = X[test_mask],  y[test_mask]
    return X_train, y_train, X_test, y_test

# 다섯 fold 순환
for i, pair in enumerate(pairs, 1):
    X_tr, y_tr, X_te, y_te = split_by_subject(X_all, y_all, s_all, pair)
    print(f"Fold {i} | Test subjects={pair}")
    print("  Train set:", X_tr.shape, y_tr.shape)
    print("  Test set :", X_te.shape, y_te.shape)


Fold 1 | Test subjects=(1, 2)
  Train set: (5438, 100, 23) (5438,)
  Test set : (1411, 100, 23) (1411,)
Fold 2 | Test subjects=(3, 4)
  Train set: (5438, 100, 23) (5438,)
  Test set : (1411, 100, 23) (1411,)
Fold 3 | Test subjects=(5, 6)
  Train set: (5529, 100, 23) (5529,)
  Test set : (1320, 100, 23) (1320,)
Fold 4 | Test subjects=(7, 8)
  Train set: (5500, 100, 23) (5500,)
  Test set : (1349, 100, 23) (1349,)
Fold 5 | Test subjects=(9, 10)
  Train set: (5491, 100, 23) (5491,)
  Test set : (1358, 100, 23) (1358,)


In [16]:
import numpy as np

# 5개 그룹
pairs = [(1,2), (3,4), (5,6), (7,8), (9,10)]

def split_by_subject(X, y, s, test_pair):
    test_mask = np.isin(s, test_pair)
    X_train, y_train = X[~test_mask], y[~test_mask]
    X_test,  y_test  = X[test_mask],  y[test_mask]
    return X_train, y_train, X_test, y_test

def standardize(train_X, test_X):

    # 훈련 데이터 기준으로 mean/std 계산
    mean = train_X.mean(axis=(0,1), keepdims=True)
    std = train_X.std(axis=(0,1), keepdims=True)
    std[std == 0] = 1.0

    # 표준화 적용
    train_X = (train_X - mean) / std
    test_X  = (test_X  - mean) / std
    return train_X, test_X, mean, std

# 5-Fold 교차검증용 루프
for i, pair in enumerate(pairs, 1):
    X_tr, y_tr, X_te, y_te = split_by_subject(X_all, y_all, s_all, pair)
    X_tr, X_te, mean, std = standardize(X_tr, X_te)

    print(f"Fold {i} | Test subjects={pair}")
    print("  Train set:", X_tr.shape, y_tr.shape)
    print("  Test set :", X_te.shape, y_te.shape)
    print("  평균:", np.round(mean.mean(), 3), " | 표준편차:", np.round(std.mean(), 3))
    print("-" * 50)


Fold 1 | Test subjects=(1, 2)
  Train set: (5438, 100, 23) (5438,)
  Test set : (1411, 100, 23) (1411,)
  평균: -1.097  | 표준편차: 14.805
--------------------------------------------------
Fold 2 | Test subjects=(3, 4)
  Train set: (5438, 100, 23) (5438,)
  Test set : (1411, 100, 23) (1411,)
  평균: -1.105  | 표준편차: 14.749
--------------------------------------------------
Fold 3 | Test subjects=(5, 6)
  Train set: (5529, 100, 23) (5529,)
  Test set : (1320, 100, 23) (1320,)
  평균: -1.076  | 표준편차: 14.514
--------------------------------------------------
Fold 4 | Test subjects=(7, 8)
  Train set: (5500, 100, 23) (5500,)
  Test set : (1349, 100, 23) (1349,)
  평균: -1.137  | 표준편차: 14.941
--------------------------------------------------
Fold 5 | Test subjects=(9, 10)
  Train set: (5491, 100, 23) (5491,)
  Test set : (1358, 100, 23) (1358,)
  평균: -1.045  | 표준편차: 14.942
--------------------------------------------------


In [17]:
import numpy as np

# ---------- 유틸 ----------
def set_seed(seed=42):
    rng = np.random.RandomState(seed)
    return rng

def one_hot(y, num_classes):
    # y는 1~12 → 내부적으로 0~11로 변환
    oh = np.zeros((y.shape[0], num_classes), dtype=np.float32)
    oh[np.arange(y.shape[0]), y - 1] = 1.0
    return oh

def softmax(x):
    x = x - x.max(axis=1, keepdims=True)
    e = np.exp(x)
    return e / (e.sum(axis=1, keepdims=True) + 1e-12)

def cross_entropy(probs, y_onehot):
    # 평균 CE
    return -np.mean(np.sum(y_onehot * np.log(probs + 1e-12), axis=1))

def macro_f1(y_true, y_pred, num_classes=12):
    # y_true=1~12, y_pred=1~12
    f1s = []
    for c in range(1, num_classes+1):
        tp = np.sum((y_pred == c) & (y_true == c))
        fp = np.sum((y_pred == c) & (y_true != c))
        fn = np.sum((y_pred != c) & (y_true == c))
        prec = tp / (tp + fp + 1e-12)
        rec  = tp / (tp + fn + 1e-12)
        f1 = 2*prec*rec/(prec+rec+1e-12)
        f1s.append(f1)
    return float(np.mean(f1s))

# ---------- 레이어들 (Conv1D / ReLU / MaxPool1D / Dense) ----------
class Conv1D:
    # 입력: (N, T, C_in)
    # 파라미터: k(커널크기), C_in, C_out(filters)
    # 출력: (N, T_out=T-k+1, C_out)  (stride=1, valid)
    def __init__(self, k, c_in, c_out, rng):
        lim = np.sqrt(6.0/(k*c_in + c_out))
        self.W = rng.uniform(-lim, lim, size=(k, c_in, c_out)).astype(np.float32)
        self.b = np.zeros((c_out,), dtype=np.float32)
        # Adam 상태
        self.mW = np.zeros_like(self.W); self.vW = np.zeros_like(self.W)
        self.mb = np.zeros_like(self.b); self.vb = np.zeros_like(self.b)

    def forward(self, x):
        self.x = x  # (N,T,C_in)
        N, T, C_in = x.shape
        k, _, C_out = self.W.shape
        T_out = T - k + 1
        out = np.zeros((N, T_out, C_out), dtype=np.float32)
        # naive loop (충분히 작아서 수용 가능)
        for t in range(T_out):
            # x_slice: (N, k, C_in)
            xs = x[:, t:t+k, :]
            # (N, k, C_in) * (k, C_in, C_out) → (N, C_out)
            # tensordot over (k,C_in)
            out[:, t, :] = np.tensordot(xs, self.W, axes=([1,2],[0,1])) + self.b
        return out

    def backward(self, dout):
        # dout: (N, T_out, C_out)
        x = self.x
        N, T, C_in = x.shape
        k, _, C_out = self.W.shape
        T_out = T - k + 1

        dW = np.zeros_like(self.W)
        db = np.zeros_like(self.b)
        dx = np.zeros_like(x)

        # db
        db = dout.sum(axis=(0,1))

        # dW, dx
        for t in range(T_out):
            xs = x[:, t:t+k, :]                     # (N,k,C_in)
            # dW += sum over N of xs^T * dout[:,t,:]
            # (k,C_in,C_out) += tensordot((N,k,C_in),(N,C_out))
            dW += np.tensordot(xs, dout[:, t, :], axes=([0],[0]))
            # dx slice
            # dx[:, t:t+k, :] += sum over C_out of dout[:,t,:] * W
            # (N,k,C_in) += tensordot((N,C_out),(k,C_in,C_out)) over C_out
            dx[:, t:t+k, :] += np.tensordot(dout[:, t, :], self.W, axes=([1],[2]))

        self.dW, self.db = dW, db
        return dx

class ReLU:
    def forward(self, x):
        self.mask = (x > 0)
        return x * self.mask
    def backward(self, dout):
        return dout * self.mask

class MaxPool1D:
    # pool=2, stride=2
    def __init__(self, pool=2, stride=2):
        self.pool = pool
        self.stride = stride

    def forward(self, x):
        # x: (N, T, C)
        self.x = x
        N, T, C = x.shape
        assert T % self.pool == 0 or (T - self.pool) % self.stride == 0
        # valid pooling assuming divisible by stride
        T_out = 1 + (T - self.pool)//self.stride
        out = np.zeros((N, T_out, C), dtype=np.float32)
        self.argmax = np.zeros((N, T_out, C), dtype=np.int32)
        for t in range(T_out):
            start = t * self.stride
            end = start + self.pool
            window = x[:, start:end, :]   # (N, pool, C)
            # max over time-axis
            idx = np.argmax(window, axis=1)  # (N,C)
            out[:, t, :] = window[np.arange(N)[:,None], idx, np.arange(C)[None,:]]
            self.argmax[:, t, :] = start + idx
        return out

    def backward(self, dout):
        # dout: (N, T_out, C)
        N, T, C = self.x.shape
        _, T_out, _ = dout.shape
        dx = np.zeros_like(self.x)
        for t in range(T_out):
            idx = self.argmax[:, t, :]  # (N,C) 절대 인덱스
            # 각 채널마다 해당 위치로 그라디언트 흘려줌
            for n in range(N):
                dx[n, idx[n, :], np.arange(C)] += dout[n, t, :]
        return dx

class Flatten:
    def forward(self, x):
        self.x_shape = x.shape
        return x.reshape(x.shape[0], -1)
    def backward(self, dout):
        return dout.reshape(self.x_shape)

class Dense:
    def __init__(self, in_dim, out_dim, rng):
        lim = np.sqrt(6.0/(in_dim + out_dim))
        self.W = rng.uniform(-lim, lim, size=(in_dim, out_dim)).astype(np.float32)
        self.b = np.zeros((out_dim,), dtype=np.float32)
        self.mW = np.zeros_like(self.W); self.vW = np.zeros_like(self.W)
        self.mb = np.zeros_like(self.b); self.vb = np.zeros_like(self.b)

    def forward(self, x):
        self.x = x  # (N, in_dim)
        return x @ self.W + self.b

    def backward(self, dout):
        # dout: (N, out_dim)
        self.dW = self.x.T @ dout
        self.db = dout.sum(axis=0)
        dx = dout @ self.W.T
        return dx

# ---------- 모델 (Conv→ReLU→Pool→Conv→ReLU→Pool→Flatten→Dense→Softmax) ----------
class SimpleCNN:
    def __init__(self, T=100, D=23, num_classes=12, rng=None):
        self.rng = set_seed(42) if rng is None else rng
        # 하이퍼파라미터(필요시 조절)
        self.conv1 = Conv1D(k=5, c_in=D,   c_out=32, rng=self.rng)
        self.relu1 = ReLU()
        self.pool1 = MaxPool1D(pool=2, stride=2)  # T: 100→96→48

        self.conv2 = Conv1D(k=3, c_in=32,  c_out=64, rng=self.rng)
        self.relu2 = ReLU()
        self.pool2 = MaxPool1D(pool=2, stride=2)  # 48→46→23

        # 출력 시퀀스 길이 계산:
        # conv1: 100 - 5 + 1 = 96
        # pool1: 96 / 2 = 48
        # conv2: 48 - 3 + 1 = 46
        # pool2: 46 / 2 = 23
        T_out = 23
        self.flatten = Flatten()
        self.fc = Dense(in_dim=T_out*64, out_dim=num_classes, rng=self.rng)

        # Adam 설정
        self.t = 0
        self.lr = 1e-3; self.beta1=0.9; self.beta2=0.999; self.eps=1e-8
        self.clip = 5.0  # gradient clipping

    def forward(self, X):
        z = self.conv1.forward(X)
        z = self.relu1.forward(z)
        z = self.pool1.forward(z)

        z = self.conv2.forward(z)
        z = self.relu2.forward(z)
        z = self.pool2.forward(z)

        z = self.flatten.forward(z)
        logits = self.fc.forward(z)
        return logits

    def backward(self, dlogits):
        dz = self.fc.backward(dlogits)
        dz = self.flatten.backward(dz)

        dz = self.pool2.backward(dz)
        dz = self.relu2.backward(dz)
        dz = self.conv2.backward(dz)

        dz = self.pool1.backward(dz)
        dz = self.relu1.backward(dz)
        dz = self.conv1.backward(dz)
        return dz

    def _adam_update(self, param, grad, m, v):
        self.t += 1
        m[:] = self.beta1*m + (1-self.beta1)*grad
        v[:] = self.beta2*v + (1-self.beta2)*(grad*grad)
        mhat = m / (1 - self.beta1**self.t)
        vhat = v / (1 - self.beta2**self.t)
        param -= self.lr * mhat / (np.sqrt(vhat) + self.eps)

    def step(self):
        # gradient clipping
        for g in [self.conv1.dW, self.conv1.db, self.conv2.dW, self.conv2.db, self.fc.dW, self.fc.db]:
            np.clip(g, -self.clip, self.clip, out=g)

        # conv1
        self._adam_update(self.conv1.W, self.conv1.dW, self.conv1.mW, self.conv1.vW)
        self._adam_update(self.conv1.b, self.conv1.db, self.conv1.mb, self.conv1.vb)
        # conv2
        self._adam_update(self.conv2.W, self.conv2.dW, self.conv2.mW, self.conv2.vW)
        self._adam_update(self.conv2.b, self.conv2.db, self.conv2.mb, self.conv2.vb)
        # fc
        self._adam_update(self.fc.W, self.fc.dW, self.fc.mW, self.fc.vW)
        self._adam_update(self.fc.b, self.fc.db, self.fc.mb, self.fc.vb)

    def fit(self, Xtr, ytr, Xte, yte, epochs=5, batch=64, verbose=True):
        num_classes = 12
        N = Xtr.shape[0]
        indices = np.arange(N)
        for ep in range(1, epochs+1):
            self.rng.shuffle(indices)
            losses = []
            for i in range(0, N, batch):
                idx = indices[i:i+batch]
                xb = Xtr[idx]                   # (B, 100, 23)
                yb = ytr[idx]                   # (B,)
                yb_oh = one_hot(yb, num_classes)

                logits = self.forward(xb)       # (B, 12)
                probs = softmax(logits)
                loss = cross_entropy(probs, yb_oh)
                losses.append(loss)

                # dL/dlogits = probs - y_onehot
                dlogits = (probs - yb_oh) / yb.shape[0]
                self.backward(dlogits)
                self.step()

            # 에폭 끝 성능
            te_pred = self.predict(Xte)
            f1 = macro_f1(yte, te_pred, num_classes=num_classes)
            if verbose:
                print(f"[Epoch {ep}] loss={np.mean(losses):.4f}  macroF1(te)={f1:.4f}")
        return f1

    def predict(self, X):
        logits = self.forward(X)
        probs = softmax(logits)
        preds = np.argmax(probs, axis=1) + 1  # 0~11 → 1~12
        return preds

In [19]:
# ---------- 교차검증 실행 (표준화 포함) ----------
def run_cnn_crossval(X_all, y_all, s_all, pairs, epochs=5, batch=64, lr=1e-3):
    f1s = []
    for i, pair in enumerate(pairs, 1):
        X_tr, y_tr, X_te, y_te = split_by_subject(X_all, y_all, s_all, pair)
        # 표준화 (train 기준)
        X_tr, X_te, mean, std = standardize(X_tr, X_te)

        # 모델 생성 및 러닝레이트 설정
        model = SimpleCNN(T=X_tr.shape[1], D=X_tr.shape[2], num_classes=12)
        model.lr = lr

        print(f"\n=== Fold {i} | Test subjects={pair} ===")
        f1 = model.fit(X_tr, y_tr, X_te, y_te, epochs=epochs, batch=batch, verbose=True)
        f1s.append(f1)

        # 폴드별 요약
        te_pred = model.predict(X_te)
        acc = np.mean(te_pred == y_te)
        print(f"Fold {i} done. Acc={acc:.4f}  Macro-F1={f1:.4f}")

    print("\n=== Summary ===")
    print("Per-fold Macro-F1:", [f"{v:.4f}" for v in f1s])
    print("Mean Macro-F1:", f"{np.mean(f1s):.4f}")
    return f1s, float(np.mean(f1s))


In [20]:
f1s, mean_f1 = run_cnn_crossval(X_all, y_all, s_all, pairs, epochs=5, batch=64, lr=1e-3)
print("\n최종 평균 Macro-F1:", mean_f1)


=== Fold 1 | Test subjects=(1, 2) ===
[Epoch 1] loss=0.5677  macroF1(te)=0.7420
[Epoch 2] loss=0.0302  macroF1(te)=0.7557
[Epoch 3] loss=0.0129  macroF1(te)=0.7511
[Epoch 4] loss=0.0055  macroF1(te)=0.7686
[Epoch 5] loss=0.0013  macroF1(te)=0.7561
Fold 1 done. Acc=0.7853  Macro-F1=0.7561

=== Fold 2 | Test subjects=(3, 4) ===
[Epoch 1] loss=0.6280  macroF1(te)=0.8229
[Epoch 2] loss=0.0483  macroF1(te)=0.8199
[Epoch 3] loss=0.0188  macroF1(te)=0.8823
[Epoch 4] loss=0.0086  macroF1(te)=0.8087
[Epoch 5] loss=0.0032  macroF1(te)=0.8433
Fold 2 done. Acc=0.8668  Macro-F1=0.8433

=== Fold 3 | Test subjects=(5, 6) ===
[Epoch 1] loss=0.5643  macroF1(te)=0.7600
[Epoch 2] loss=0.0471  macroF1(te)=0.7858
[Epoch 3] loss=0.0135  macroF1(te)=0.8028
[Epoch 4] loss=0.0078  macroF1(te)=0.8236
[Epoch 5] loss=0.0026  macroF1(te)=0.8156
Fold 3 done. Acc=0.7955  Macro-F1=0.8156

=== Fold 4 | Test subjects=(7, 8) ===
[Epoch 1] loss=0.6010  macroF1(te)=0.7302
[Epoch 2] loss=0.0702  macroF1(te)=0.7355
[Epoch 