## 설정 & 경로

In [1]:
# Mount Google Drive at /content/drive; DATA_PATH_TEMPLATE below points into this mount.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [2]:
# Per-subject log file; "{sid}" is substituted with the subject id in load_all_subjects.
DATA_PATH_TEMPLATE = "/content/drive/MyDrive/Colab Notebooks/mHealth_subject{sid}.log"

SAMPLE_RATE_HZ = 50  # NOTE(review): not referenced by the visible code; presumably the sensor rate
WIN = 64             # window length, in rows, of one training example
STRIDE = 64          # step between window starts; equal to WIN, so windows do not overlap
MAJ_THRESH = 0.80    # minimum fraction of rows in a window that must share the majority label

N_CLASSES = 12       # number of output classes (make_windows maps label k to class k-1)
IN_CHANNELS = 23     # feature columns per row (read_subject_file parses 23 floats per line)

CONV_FILTERS = 8     # output channels of the convolution layer
KERNEL_SIZE = 3      # convolution kernel width, in time steps
POOL_SIZE = 2        # max-pool window (and step), in time steps
HIDDEN_UNITS = 16    # width of the first dense layer

EPOCHS = 2           # NOTE(review): not referenced by the visible code; run_cv uses its own MAX_EPOCHS
BATCH_SIZE = 16      # NOTE(review): not referenced by the visible code; train_epoch updates per sample
LR = 0.01            # initial learning rate used by run_cv
LABEL_SMOOTHING = 0.05  # eps passed to cross_entropy_with_label_smoothing during training

# CV folds (subject-based): each tuple lists the subjects held out as the test set of one fold.
FOLDS = [(1,2), (3,4), (5,6), (7,8), (9,10)]

## 데이터 로딩 및 통합 테이블 구성

In [3]:
# ==== Cell 2: Pure-Python Data Loading & "Table" builder ====
def read_subject_file(path, sid):
    """Parse one subject's whitespace-separated log file into rows.

    Each usable line must contain at least 24 tokens: the first 23 are parsed
    as float features and the 24th as the integer label (it is read through
    ``float`` first, so "3.0" is accepted). Blank lines, short lines and lines
    with unparsable numbers are skipped silently.

    Args:
        path: Path of the log file to read.
        sid: Subject id attached to every returned row.

    Returns:
        A list of ``(features, label, sid)`` tuples in file order. The list is
        empty when the file does not exist (a warning is printed in that case).
    """
    n_feats = 23
    rows = []
    try:
        with open(path, 'r') as f:
            for line in f:
                # split() with no argument ignores leading/trailing whitespace,
                # so a blank line yields no tokens and is rejected below.
                parts = line.split()
                if len(parts) <= n_feats:
                    continue
                try:
                    feats = [float(tok) for tok in parts[:n_feats]]
                    # int(float("inf")) raises OverflowError, int(float("nan"))
                    # raises ValueError; both mean "not a usable label".
                    lbl = int(float(parts[n_feats]))
                except (ValueError, OverflowError):
                    # Only parse failures are swallowed; KeyboardInterrupt and
                    # other unrelated errors now propagate.
                    continue
                rows.append((feats, lbl, sid))
    except FileNotFoundError:
        print(f"[WARN] File not found for subject {sid}: {path}")
    return rows

def load_all_subjects(path_template, sids=range(1,11)):
    """Load every subject file and merge the rows into one flat table.

    Args:
        path_template: Format string containing ``{sid}``; it is expanded once
            per subject id to obtain the file path.
        sids: Iterable of subject ids to load. It may be a one-shot iterable;
            it is materialised once so that it can be both iterated and counted.

    Returns:
        A list of dicts with keys ``'features'``, ``'label'`` and ``'subject'``,
        in subject order and then file order.
    """
    # Materialise first: counting a generator after the loop would report 0.
    sids = list(sids)
    table = []
    for sid in sids:
        path = path_template.format(sid=sid)
        for feats, lbl, sub in read_subject_file(path, sid):
            table.append({'features': feats, 'label': lbl, 'subject': sub})
    print(f"[INFO] Loaded {len(table)} raw rows from {len(sids)} subjects.")
    return table

def peek_table(table, n=3):
    """Print the subject, label and first five features of the first ``n`` rows."""
    for pos, row in enumerate(table[:n]):
        subject, label, preview = row['subject'], row['label'], row['features'][:5]
        print(pos, "subject=", subject, "label=", label, "x[:5]=", preview)

# Load subjects 1..10 into one in-memory table and print the first three rows as a sanity check.
table = load_all_subjects(DATA_PATH_TEMPLATE, sids=range(1,11))
peek_table(table, 3)


[INFO] Loaded 1215745 raw rows from 10 subjects.
0 subject= 1 label= 0 x[:5]= [-9.8184, 0.009971, 0.29563, 0.0041863, 0.0041863]
1 subject= 1 label= 0 x[:5]= [-9.8489, 0.52404, 0.37348, 0.0041863, 0.016745]
2 subject= 1 label= 0 x[:5]= [-9.6602, 0.18185, 0.43742, 0.016745, 0.037677]


## Label==0 제거 + 윈도우링(다수라벨 규칙) + 표준화 옵션

In [4]:
# ==== Cell 3: Filtering label==0, windowing, and (optional) normalization ====
def make_windows(examples, win=100, stride=50, drop_label0=True, maj_thresh=0.80):
    """Cut each subject's rows into fixed-length windows labelled by majority vote.

    Rows are grouped by ``'subject'`` (windows never span two subjects) and a
    window of ``win`` consecutive rows is taken every ``stride`` rows. A window
    is kept only when its most frequent label is non-zero and covers at least
    ``maj_thresh`` of the window. Kept windows get the class ``label - 1``.

    Args:
        examples: Iterable of dicts with keys ``'features'``, ``'label'`` and
            ``'subject'``.
        win: Window length in rows; must be positive.
        stride: Step between window starts in rows; must be positive.
        drop_label0: Kept for interface compatibility. Windows whose majority
            label is 0 are dropped regardless of this flag, because such a
            window would otherwise receive the invalid class index -1.
        maj_thresh: Minimum fraction of the window held by the majority label.

    Returns:
        ``(Xw, Yw)``: a list of windows (each a list of ``win`` feature lists)
        and the list of matching zero-based class indices.

    Raises:
        ValueError: If ``win`` or ``stride`` is not positive (a non-positive
            stride would otherwise never advance).
    """
    from collections import Counter

    if win <= 0:
        raise ValueError(f"win must be positive, got {win}")
    if stride <= 0:
        raise ValueError(f"stride must be positive, got {stride}")

    by_subj = {}
    for r in examples:
        by_subj.setdefault(r['subject'], []).append(r)

    Xw, Yw = [], []
    for seq in by_subj.values():
        # Start positions i with i + win <= len(seq).
        for start in range(0, len(seq) - win + 1, stride):
            chunk = seq[start:start + win]
            counts = Counter(r['label'] for r in chunk)
            # Ties go to the label seen first in the window (insertion order).
            maj = max(counts, key=counts.get)

            if maj == 0:
                continue
            if counts[maj] / float(win) < maj_thresh:
                continue

            # Features are collected only for windows that survive the filters.
            Xw.append([r['features'] for r in chunk])
            Yw.append(maj - 1)

    return Xw, Yw

def standardize_per_channel(train_X, valid_or_test_X):
    """Z-score both window sets using per-channel statistics of ``train_X`` only.

    The mean and (population) standard deviation of each of the first
    ``IN_CHANNELS`` channels are accumulated over every time step of every
    training window. A channel with no samples, or with variance not above
    1e-12, is given a standard deviation of 1.0 so it is only mean-centred.

    Returns:
        ``(train_scaled, other_scaled)`` as new nested lists; the inputs are
        not modified.
    """
    n_ch = IN_CHANNELS
    channels = range(n_ch)
    total = [0.0] * n_ch
    total_sq = [0.0] * n_ch
    seen = [0] * n_ch

    # Single pass over the training data collecting sum, sum of squares, count.
    for window in train_X:
        for step in window:
            for ch in channels:
                value = step[ch]
                total[ch] += value
                total_sq[ch] += value * value
                seen[ch] += 1

    means = []
    stds = []
    for ch in channels:
        if seen[ch] == 0:
            means.append(0.0)
            stds.append(1.0)
            continue
        mu = total[ch] / seen[ch]
        # Var[x] = E[x^2] - E[x]^2
        variance = (total_sq[ch] / seen[ch]) - (mu * mu)
        means.append(mu)
        stds.append((variance ** 0.5) if variance > 1e-12 else 1.0)

    def apply(X):
        return [
            [[(step[ch] - means[ch]) / stds[ch] for ch in channels] for step in window]
            for window in X
        ]

    return apply(train_X), apply(valid_or_test_X)

# This cell only defines helpers; no windowing runs here. build_fold_data calls
# make_windows separately on the train and test rows of each fold.
print("[INFO] Windowing on full table (will be done per fold later after split).")


[INFO] Windowing on full table (will be done per fold later after split).


## 데이터 분할 유틸: 주체 기반 CV 스플릿

In [5]:
# ==== Cell 4: Subject-wise CV Split utilities ====
def split_by_subject(table, test_subjects):
    """Partition rows into (train, test) by whether their subject is held out.

    Row order within each partition follows the order in ``table``.
    """
    train, test = [], []
    for row in table:
        bucket = test if row['subject'] in test_subjects else train
        bucket.append(row)
    return train, test

def build_fold_data(table, test_pair, win=WIN, stride=STRIDE, maj_thresh=MAJ_THRESH):
    """Build standardized train/test windows for one cross-validation fold.

    Rows are split by subject first, windowed separately, and then both sets
    are scaled with statistics computed from the training windows only.

    Returns:
        ``(Xtr, Ytr, Xte, Yte)``.
    """
    window_kwargs = dict(win=win, stride=stride, drop_label0=True, maj_thresh=maj_thresh)
    train_rows, test_rows = split_by_subject(table, test_pair)
    train_X, train_y = make_windows(train_rows, **window_kwargs)
    test_X, test_y = make_windows(test_rows, **window_kwargs)
    train_X, test_X = standardize_per_channel(train_X, test_X)
    return train_X, train_y, test_X, test_y


## 순수 파이썬 1D-CNN 레이어
- Conv/ReLU/MaxPool/Flatten/Dense/Softmax

In [6]:
# ==== Cell 5: Pure-Python 1D-CNN implementation (forward/backward) ====
import math
import random

def zeros(shape):
    """Return a nested list of 0.0 with the given shape (a non-empty sequence of ints).

    Every sub-list is a distinct object, so rows can be mutated independently.
    """
    count = shape[0]
    if len(shape) == 1:
        return [0.0] * count
    inner_shape = shape[1:]
    result = []
    for _ in range(count):
        result.append(zeros(inner_shape))
    return result

def randn(shape, scale=0.01):
    """Return a nested list of the given shape filled with scaled Gaussian samples.

    Each value is a Box-Muller normal sample (built from two draws of
    ``random.random()``) multiplied by ``scale``. Values are generated in
    row-major order.
    """
    import math
    import random

    two_pi = 2 * math.pi

    def sample():
        # u1 is floored at 1e-12 so that log(u1) is always defined.
        u1 = max(1e-12, random.random())
        u2 = random.random()
        radius = math.sqrt(-2.0 * math.log(u1))
        return radius * math.cos(two_pi * u2)

    count = shape[0]
    if len(shape) == 1:
        return [sample() * scale for _ in range(count)]
    inner_shape = shape[1:]
    return [randn(inner_shape, scale) for _ in range(count)]

class Conv1D:
    """Unpadded, stride-1 1-D convolution over a ``[T, C]`` nested list.

    ``W`` has shape ``[out_ch][in_ch][ksize]`` and ``b`` has shape ``[out_ch]``.
    ``backward`` both returns the input gradient and applies one plain SGD step
    to the parameters.
    """

    def __init__(self, in_ch, out_ch, ksize):
        self.in_ch = in_ch
        self.out_ch = out_ch
        self.ksize = ksize
        # Small random kernels; a scale of 0.0 makes every bias start at zero.
        self.W = randn((out_ch, in_ch, ksize), scale=0.05)
        self.b = randn((out_ch,), scale=0.0)
        # Gradient buffers, overwritten on every backward call.
        self.gW = zeros((out_ch, in_ch, ksize))
        self.gb = zeros((out_ch,))
        self.x = None

    def forward(self, x):  # x: [T, C]
        """Return the ``[T - ksize + 1, out_ch]`` response and cache ``x`` for backward."""
        width = self.ksize
        n_out = len(x) - width + 1
        y = []
        for start in range(n_out):
            row = []
            for oc in range(self.out_ch):
                acc = self.b[oc]
                kernel = self.W[oc]
                for ic in range(self.in_ch):
                    taps = kernel[ic]
                    for kk in range(width):
                        acc += taps[kk] * x[start + kk][ic]
                row.append(acc)
            y.append(row)
        self.x = x
        self.y = y
        return y

    def backward(self, dy, lr):
        """Back-propagate ``dy`` (``[outT, out_ch]``), update W and b, return dL/dx (``[T, in_ch]``)."""
        x = self.x
        width = self.ksize
        n_in = len(x)
        n_out = n_in - width + 1
        out_range = range(self.out_ch)
        in_range = range(self.in_ch)
        tap_range = range(width)

        # Clear the gradient buffers in place.
        for oc in out_range:
            self.gb[oc] = 0.0
            for ic in in_range:
                for kk in tap_range:
                    self.gW[oc][ic][kk] = 0.0

        dx = [[0.0] * self.in_ch for _ in range(n_in)]
        for t in range(n_out):
            for oc in out_range:
                g = dy[t][oc]
                self.gb[oc] += g
                w_oc = self.W[oc]
                gw_oc = self.gW[oc]
                for ic in in_range:
                    for kk in tap_range:
                        gw_oc[ic][kk] += g * x[t + kk][ic]
                        # Uses the pre-update weights: the SGD step happens below.
                        dx[t + kk][ic] += g * w_oc[ic][kk]

        # Plain SGD step.
        for oc in out_range:
            self.b[oc] -= lr * self.gb[oc]
            for ic in in_range:
                for kk in tap_range:
                    self.W[oc][ic][kk] -= lr * self.gW[oc][ic][kk]
        return dx

class ReLU:
    """Element-wise rectifier over a ``[T, C]`` nested list."""

    def __init__(self):
        self.mask = None

    def forward(self, x):  # x: [T, C]
        """Zero out non-positive entries and remember which entries passed."""
        mask, activated = [], []
        for row in x:
            mask_row, out_row = [], []
            for v in row:
                if v > 0:
                    mask_row.append(1.0)
                    out_row.append(v)
                else:
                    mask_row.append(0.0)
                    out_row.append(0.0)
            mask.append(mask_row)
            activated.append(out_row)
        self.mask = mask
        return activated

    def backward(self, dy, lr):
        """Multiply the upstream gradient by the saved 0/1 mask; ``lr`` is unused."""
        dx = []
        for t in range(len(dy)):
            # Width is taken from the first row, as all rows share one length.
            width = len(dy[0])
            dx.append([dy[t][c] * self.mask[t][c] for c in range(width)])
        return dx

class MaxPool1D:
    """Non-overlapping max pooling along time for a ``[T, C]`` nested list."""

    def __init__(self, pool):
        self.pool = pool
        self.idx = None

    def forward(self, x):  # x: [T, C]
        """Return ``[T // pool, C]`` maxima; trailing rows that do not fill a window are ignored."""
        size = self.pool
        n_out = len(x) // size
        n_ch = len(x[0])
        self.x = x
        pooled, winners = [], []
        for t in range(n_out):
            base = t * size
            vals, args = [], []
            for c in range(n_ch):
                # Running maximum starts at -1e18; on ties the earliest offset wins.
                top = -1e18
                arg = 0
                for off in range(size):
                    cand = x[base + off][c]
                    if cand > top:
                        top, arg = cand, off
                vals.append(top)
                args.append(arg)
            pooled.append(vals)
            winners.append(args)
        self.idx = winners
        self.y = pooled
        return pooled

    def backward(self, dy, lr):
        """Route each upstream gradient to the position that won its window; ``lr`` is unused."""
        size = self.pool
        n_in = len(self.x)
        n_ch = len(self.x[0])
        dx = [[0.0] * n_ch for _ in range(n_in)]
        for t in range(len(dy)):
            base = t * size
            for c in range(n_ch):
                dx[base + self.idx[t][c]][c] += dy[t][c]
        return dx

class Flatten:
    """Reshape a ``[T, C]`` nested list to a flat ``[T*C]`` list and back."""

    def __init__(self):
        self.shape = None

    def forward(self, x): # x: [T, C]
        """Concatenate the rows of ``x`` in order, remembering ``(T, C)``."""
        self.shape = (len(x), len(x[0]))
        return [value for row in x for value in row]  # [T*C]

    def backward(self, dy, lr):
        """Fold the flat gradient back into ``T`` rows of ``C`` values; ``lr`` is unused."""
        n_rows, n_cols = self.shape
        return [
            [dy[r * n_cols + c] for c in range(n_cols)]
            for r in range(n_rows)
        ]

class Dense:
    """Fully connected layer ``y = W x + b`` on flat lists, trained with per-call SGD."""

    def __init__(self, in_dim, out_dim):
        # Small random weights; a scale of 0.0 makes every bias start at zero.
        self.W = randn((out_dim, in_dim), scale=0.05)
        self.b = randn((out_dim,), scale=0.0)
        # Gradient buffers; backward leaves them zeroed again.
        self.gW = zeros((out_dim, in_dim))
        self.gb = zeros((out_dim,))
        self.x = None

    def forward(self, x):  # x: [D]
        """Return one output per row of ``W`` and cache ``x`` for backward."""
        self.x = x
        n_in = len(x)
        outputs = []
        for o, row in enumerate(self.W):
            outputs.append(self.b[o] + sum(row[i] * x[i] for i in range(n_in)))
        return outputs  # logits

    def backward(self, dlogits, lr):
        """Accumulate gradients, apply one SGD step, reset the buffers, return dL/dx."""
        n_out = len(self.W)
        n_in = len(self.W[0])
        dx = [0.0] * n_in
        for o in range(n_out):
            g = dlogits[o]
            self.gb[o] += g
            w_row = self.W[o]
            gw_row = self.gW[o]
            for i in range(n_in):
                gw_row[i] += g * self.x[i]
                # Uses the pre-update weights: the SGD step happens below.
                dx[i] += g * w_row[i]
        for o in range(n_out):
            self.b[o] -= lr * self.gb[o]
            self.gb[o] = 0.0
            w_row = self.W[o]
            gw_row = self.gW[o]
            for i in range(n_in):
                w_row[i] -= lr * gw_row[i]
                gw_row[i] = 0.0
        return dx

def softmax(logits):
    """Return the softmax of a non-empty list of logits.

    The maximum logit is subtracted before exponentiating so that ``exp``
    cannot overflow.
    """
    peak = max(logits)
    weights = []
    for value in logits:
        weights.append(math.exp(value - peak))
    total = sum(weights)
    return [w / total for w in weights]

def cross_entropy_with_label_smoothing(probs, y, num_classes, eps=0.0):
    """Cross-entropy against a label-smoothed target, plus its gradient w.r.t. the logits.

    The target puts ``1 - eps`` on class ``y`` and spreads ``eps`` evenly over
    the remaining ``num_classes - 1`` classes. With a single class there is
    nothing to spread to, so the target is simply ``[1.0]``.

    Args:
        probs: Softmax probabilities, at least ``num_classes`` long.
        y: Zero-based target class index.
        num_classes: Number of classes ``K``.
        eps: Smoothing mass moved away from the target class.

    Returns:
        ``(loss, dlogits)`` where ``dlogits[k] = probs[k] - target[k]``, the
        gradient of the loss with respect to the pre-softmax logits.

    Raises:
        IndexError: If ``y`` is not in ``[0, num_classes)``. Negative values
            are rejected explicitly because list indexing would otherwise
            silently wrap them to a class counted from the end.
    """
    K = num_classes
    if not 0 <= y < K:
        raise IndexError(f"label {y} is outside [0, {K})")
    if K == 1:
        # Avoids eps / (K - 1) dividing by zero.
        t = [1.0]
    else:
        t = [eps/(K-1)]*K
        t[y] = 1.0 - eps
    loss = 0.0
    for k in range(K):
        # Probabilities are floored at 1e-12 so log() never sees zero.
        loss -= t[k] * math.log(max(1e-12, probs[k]))
    dlogits = [probs[k] - t[k] for k in range(K)]
    return loss, dlogits


## 모델 조립 & 트레이너

In [7]:
# ==== Cell 6: Build model (Conv->ReLU->Pool->Flatten->Dense->Softmax) and Trainer ====
import math
import random

class TinyCNN:
    """Conv1D -> ReLU -> MaxPool1D -> Flatten -> Dense -> clamp -> Dense (logits).

    The only operation between the two dense layers is a lower clamp at -50;
    softmax is applied by the caller.

    Args:
        in_channels: Channels per time step of the input window.
        conv_filters: Output channels of the convolution.
        kernel: Convolution kernel width.
        pool: Max-pool window size.
        hidden: Width of the first dense layer.
        n_classes: Number of output logits.
        seq_len: Number of time steps per input window, used to size the first
            dense layer. Defaults to the module-level ``WIN``.
    """

    def __init__(self, in_channels, conv_filters, kernel, pool, hidden, n_classes, seq_len=None):
        self.conv = Conv1D(in_channels, conv_filters, kernel)
        self.relu = ReLU()
        self.pool = MaxPool1D(pool)
        self.flat = Flatten()
        T_in = WIN if seq_len is None else seq_len
        conv_T = T_in - kernel + 1      # unpadded convolution shortens the sequence
        pool_T = conv_T // pool         # pooling drops any incomplete trailing window
        flat_dim = pool_T * conv_filters
        self.fc = Dense(flat_dim, hidden)
        self.out = Dense(hidden, n_classes)
        # Per-unit flags from the latest forward pass: True where the clamp
        # replaced the value, i.e. where no gradient may flow back.
        self._clamped = None

    def forward(self, x_seq):
        """Return the logits for one ``[T, C]`` window."""
        z = self.conv.forward(x_seq)
        z = self.relu.forward(z)
        z = self.pool.forward(z)
        z = self.flat.forward(z)
        z = self.fc.forward(z)
        # max(v, -50) yields -50 exactly when -50 > v; remember those units.
        self._clamped = [-50 > v for v in z]
        z = [max(v, -50) for v in z]  # mild clamp (kept from the original code)
        z = self.out.forward(z)       # logits
        return z

    def backward(self, dlogits, lr):
        """Back-propagate ``dlogits`` through every layer, updating parameters with step ``lr``."""
        dz = self.out.backward(dlogits, lr)
        if self._clamped is not None:
            # The clamp has zero slope where it was active, so the gradient
            # must not reach the first dense layer through those units.
            dz = [0.0 if hit else g for g, hit in zip(dz, self._clamped)]
        dz = self.fc.backward(dz, lr)
        dz = self.flat.backward(dz, lr)
        dz = self.pool.backward(dz, lr)
        dz = self.relu.backward(dz, lr)
        dz = self.conv.backward(dz, lr)
        return dz

def iterate_minibatches(X, Y, batch_size):
    """Yield ``(batch_X, batch_Y)`` lists covering the data once in shuffled order.

    The last batch may be smaller than ``batch_size``.
    """
    order = list(range(len(X)))
    random.shuffle(order)
    for start in range(0, len(order), batch_size):
        picks = order[start:start + batch_size]
        batch_x = [X[j] for j in picks]
        batch_y = [Y[j] for j in picks]
        yield batch_x, batch_y

def train_epoch(model, X, Y, lr, label_smoothing=0.05):
    """Run one pass of per-sample SGD over the data in shuffled order.

    Each sample is forwarded, scored with label-smoothed cross-entropy and
    immediately back-propagated with step size ``lr``. Progress is printed
    every 512 samples.

    Returns:
        ``(mean_loss, accuracy)`` measured on the predictions made during the
        pass; ``(0.0, 0.0)`` when there is no data.
    """
    order = list(range(len(X)))
    random.shuffle(order)

    loss_sum = 0.0
    hits = 0
    processed = 0
    for processed, i in enumerate(order, start=1):
        sample, target = X[i], Y[i]
        probs = softmax(model.forward(sample))
        loss, dlogits = cross_entropy_with_label_smoothing(
            probs, target, N_CLASSES, eps=label_smoothing
        )
        loss_sum += loss
        if probs.index(max(probs)) == target:
            hits += 1
        model.backward(dlogits, lr)

        if processed % 512 == 0:
            print(f"    progress: {processed}/{len(X)} samples")

    mean_loss = loss_sum / max(1, processed)
    accuracy = (hits / float(processed)) if processed > 0 else 0.0
    return mean_loss, accuracy

def evaluate(model, X, Y):
    """Score ``model`` on ``(X, Y)`` without updating it.

    Returns:
        ``(accuracy, cm)`` where ``cm[true][pred]`` counts predictions in an
        ``N_CLASSES`` x ``N_CLASSES`` nested list. Accuracy is 0.0 for empty data.
    """
    n_classes = N_CLASSES
    cm = [[0] * n_classes for _ in range(n_classes)]
    hits = 0
    total = 0
    for i, window in enumerate(X):
        probs = softmax(model.forward(window))
        pred = probs.index(max(probs))
        truth = Y[i]
        cm[truth][pred] += 1
        if pred == truth:
            hits += 1
        total += 1
    accuracy = hits / float(total) if total > 0 else 0.0
    return accuracy, cm

def print_confusion_matrix(cm):
    """Print a confusion matrix one row per true class, prefixed with the class index."""
    print("Confusion Matrix (rows=True, cols=Pred):")
    for true_cls, row in enumerate(cm):
        print(f"y={true_cls:2d}: ", row)


## 5-Fold 주체 CV 실행

In [9]:
# ==== Cell 7: Run 5-fold subject-wise CV (EarlyStop + LR decay + F1 report) ====
def copy_params(model):
    """Return a deep-copied snapshot of the conv, fc and out weights and biases.

    The result is a dict keyed ``'<layer>_W'`` / ``'<layer>_b'`` that shares no
    lists with the model, so later training does not alter it.
    """
    import copy

    snapshot = {}
    for prefix, layer in (('conv', model.conv), ('fc', model.fc), ('out', model.out)):
        snapshot[prefix + '_W'] = copy.deepcopy(layer.W)
        snapshot[prefix + '_b'] = copy.deepcopy(layer.b)
    return snapshot

def load_params(model, p):
    """Write a ``copy_params`` snapshot back into the model, element by element.

    The model's existing lists are overwritten in place, so the layers keep
    the same list objects and the snapshot stays independent of the model.
    """
    conv = model.conv
    for oc, kernel in enumerate(conv.W):
        for ic in range(len(conv.W[0])):
            for k in range(len(conv.W[0][0])):
                kernel[ic][k] = p['conv_W'][oc][ic][k]
        conv.b[oc] = p['conv_b'][oc]

    # Both dense layers share the same 2-D weight / 1-D bias layout.
    for layer, w_key, b_key in ((model.fc, 'fc_W', 'fc_b'), (model.out, 'out_W', 'out_b')):
        for o, row in enumerate(layer.W):
            for i in range(len(layer.W[0])):
                row[i] = p[w_key][o][i]
            layer.b[o] = p[b_key][o]

def run_cv(table):
    """Run subject-wise cross-validation over ``FOLDS`` and print a report per fold.

    For every fold a fresh TinyCNN is trained for at most ``MAX_EPOCHS`` epochs.
    After each epoch the model is scored on the fold's held-out windows; the
    parameters with the best score are kept, the learning rate is multiplied
    by ``LR_DECAY`` whenever the score does not improve, and training stops
    once ``PATIENCE`` epochs in a row bring no improvement. The best
    parameters are then restored and accuracy, confusion matrix and
    precision/recall/F1 are printed. A summary of fold accuracies and their
    mean is printed at the end. Nothing is returned.

    NOTE(review): checkpoint selection, LR decay and early stopping are all
    driven by accuracy on the same windows (Xte/Yte) that are reported as the
    test result, so the reported numbers are likely optimistic — confirm
    whether a separate validation split was intended.
    """
    MAX_EPOCHS = 5    # upper bound on epochs per fold
    PATIENCE   = 2    # consecutive non-improving epochs tolerated before stopping
    LR_DECAY   = 0.5  # learning-rate multiplier applied after a non-improving epoch

    # For reporting
    from sklearn.metrics import classification_report, f1_score, precision_recall_fscore_support, accuracy_score, confusion_matrix

    fold_results = []
    for fold_id, pair in enumerate(FOLDS, start=1):
        print("\n==============================")
        print(f"Fold {fold_id}: Test subjects = {pair}")
        Xtr, Ytr, Xte, Yte = build_fold_data(table, pair, win=WIN, stride=STRIDE, maj_thresh=MAJ_THRESH)
        print(f"Train windows: {len(Xtr)} | Test windows: {len(Xte)}")
        if len(Xtr) == 0 or len(Xte) == 0:
            print("[WARN] No data in this fold (maybe files missing). Skipping.")
            continue

        model = TinyCNN(IN_CHANNELS, CONV_FILTERS, KERNEL_SIZE, POOL_SIZE, HIDDEN_UNITS, N_CLASSES)
        lr = LR
        best_acc = -1.0  # below any real accuracy, so the first epoch always becomes the best
        best_params = None
        bad_epochs = 0

        for ep in range(1, MAX_EPOCHS+1):
            loss, tr_acc = train_epoch(model, Xtr, Ytr, lr, label_smoothing=LABEL_SMOOTHING)
            te_acc, _ = evaluate(model, Xte, Yte)
            print(f"  Epoch {ep:02d} | lr={lr:.5f} | loss={loss:.4f} | train_acc={tr_acc*100:.2f}% | test_acc={te_acc*100:.2f}%")

            if te_acc > best_acc:
                best_acc = te_acc
                best_params = copy_params(model)
                bad_epochs = 0
            else:
                bad_epochs += 1
                # No improvement: decay the learning rate
                lr *= LR_DECAY

            if bad_epochs >= PATIENCE:
                print(f"  Early stop at epoch {ep} (best_acc={best_acc*100:.2f}%)")
                break

        # Restore the best parameters, then evaluate and report
        if best_params is not None:
            load_params(model, best_params)

        # Generate the final predictions
        y_pred = []
        for w in Xte:
            logits = model.forward(w)
            probs = softmax(logits)
            y_pred.append(int(probs.index(max(probs))))

        # Accuracy / confusion matrix / report
        acc_final = accuracy_score(Yte, y_pred)
        cm = confusion_matrix(Yte, y_pred, labels=list(range(N_CLASSES)))
        fold_results.append(acc_final)
        print(f"[Fold {fold_id}] Best Test Accuracy: {acc_final*100:.2f}%")
        print_confusion_matrix(cm)  # plain-text confusion matrix

        # === Additional F1 report ===
        # Uses a module-level CLASS_NAMES if one exists, otherwise the class indices as strings.
        class_names = globals().get("CLASS_NAMES", [str(i) for i in range(N_CLASSES)])
        macro_p, macro_r, macro_f1, _ = precision_recall_fscore_support(
            Yte, y_pred, labels=list(range(N_CLASSES)), average='macro', zero_division=0
        )
        weighted_p, weighted_r, weighted_f1, _ = precision_recall_fscore_support(
            Yte, y_pred, labels=list(range(N_CLASSES)), average='weighted', zero_division=0
        )

        print("\n=== Summary (Fold {} ) ===".format(fold_id))
        print(f"Accuracy          : {acc_final*100:6.2f}%")
        print(f"Macro Precision   : {macro_p*100:6.2f}%")
        print(f"Macro Recall      : {macro_r*100:6.2f}%")
        print(f"Macro F1          : {macro_f1*100:6.2f}%")
        print(f"Weighted Precision: {weighted_p*100:6.2f}%")
        print(f"Weighted Recall   : {weighted_r*100:6.2f}%")
        print(f"Weighted F1       : {weighted_f1*100:6.2f}%")

        print("\n=== Per-class (sklearn) ===")
        print(classification_report(
            Yte, y_pred,
            labels=list(range(N_CLASSES)),
            target_names=class_names,
            zero_division=0
        ))

    # Skipped folds contribute nothing, so the summary numbers the computed folds 1..n in order.
    if fold_results:
        avg = sum(fold_results)/len(fold_results)
        print("\n===== CV Summary =====")
        for i, a in enumerate(fold_results, start=1):
            print(f"Fold {i}: {a*100:.2f}%")
        print(f"Mean: {avg*100:.2f}%")
    else:
        print("[INFO] No folds computed.")

# Launch CV: trains and evaluates one model per entry of FOLDS on the loaded table.
run_cv(table)



Fold 1: Test subjects = (1, 2)
Train windows: 4240 | Test windows: 1101
    progress: 512/4240 samples
    progress: 1024/4240 samples
    progress: 1536/4240 samples
    progress: 2048/4240 samples
    progress: 2560/4240 samples
    progress: 3072/4240 samples
    progress: 3584/4240 samples
    progress: 4096/4240 samples
  Epoch 01 | lr=0.01000 | loss=1.0801 | train_acc=73.35% | test_acc=68.76%
    progress: 512/4240 samples
    progress: 1024/4240 samples
    progress: 1536/4240 samples
    progress: 2048/4240 samples
    progress: 2560/4240 samples
    progress: 3072/4240 samples
    progress: 3584/4240 samples
    progress: 4096/4240 samples
  Epoch 02 | lr=0.01000 | loss=0.5574 | train_acc=95.90% | test_acc=83.02%
    progress: 512/4240 samples
    progress: 1024/4240 samples
    progress: 1536/4240 samples
    progress: 2048/4240 samples
    progress: 2560/4240 samples
    progress: 3072/4240 samples
    progress: 3584/4240 samples
    progress: 4096/4240 samples
  Epoch 03 |