In [1]:
# =========================
# Cell 0. 공용 설정
# =========================
import os, json, time, random, hashlib, pickle, subprocess, gc
import numpy as np

# --- Paths -------------------------------------------------------------
EXPERIMENT_ROOT = "experiments/Step_00"
DATASET_DIR = "dataset/preprocessed"
NAS_LOG_DIR = os.path.join(EXPERIMENT_ROOT, "nas_runs")
NAS_LOG_CSV = os.path.join(NAS_LOG_DIR, "results.csv")

# --- Experiment constants ----------------------------------------------
SEED = 42
N_CLASSES = 15

# Create the experiment root first, then the NAS run directory beneath it.
os.makedirs(EXPERIMENT_ROOT, exist_ok=True)
os.makedirs(NAS_LOG_DIR, exist_ok=True)

# Seed both the stdlib and the NumPy global random generators.
random.seed(SEED)
np.random.seed(SEED)

print("[Init] root:", EXPERIMENT_ROOT)


[Init] root: experiments/Step_00


In [2]:
# =========================
# Cell 1. Prepare the train/val split (backward compatible with label_map)
# =========================
# Builds (or reloads) a train/val split over "base ids" and keeps it in
# splits.pkl under EXPERIMENT_ROOT. Later cells read train_base, val_base
# and label_map from this cell.
import pandas as pd
from sklearn.model_selection import train_test_split

label_csv_path = os.path.join(DATASET_DIR, "labels.csv")
split_file_path = os.path.join(EXPERIMENT_ROOT, "splits.pkl")

df = pd.read_csv(label_csv_path)
# base_id = file name with everything after its last '_' stripped off.
df['base_id'] = df['file'].apply(lambda x: x.rsplit('_', 1)[0])
# One row per base_id (the first occurrence is kept).
base_df = df.drop_duplicates('base_id').copy()

# Safe assignment (avoids SettingWithCopyWarning)
# subject = last '_'-separated token of base_id; label_subject = "<label>_<subject>".
base_df.loc[:, 'subject'] = base_df['base_id'].apply(lambda x: x.split('_')[-1])
base_df.loc[:, 'label_subject'] = base_df['label'].astype(str) + "_" + base_df['subject']

label_subject_map = dict(zip(base_df['base_id'], base_df['label_subject']))
label_map = dict(zip(base_df['base_id'], base_df['label']))

if not os.path.exists(split_file_path):
    # No cached split yet: make a 90/10 split stratified on label+subject.
    # Note the split uses its own fixed random_state (18), not SEED.
    base_ids = base_df['base_id'].tolist()
    stratify_ls = [label_subject_map[bid] for bid in base_ids]
    train_base, val_base = train_test_split(base_ids, test_size=0.1,
                                            stratify=stratify_ls, random_state=18)
    with open(split_file_path, 'wb') as f:
        pickle.dump({'train': train_base, 'val': val_base, 'label_map': label_map}, f)
    print("[Info] splits.pkl 생성 완료 (train/val/label_map)")
else:
    # Reuse the cached split.
    # NOTE(review): pickle.load can execute arbitrary code; only load a
    # splits.pkl that this notebook produced.
    with open(split_file_path, 'rb') as f:
        split = pickle.load(f)
    # NOTE(review): .get() yields None when a key is missing, in which case
    # the len() calls in the summary print below raise TypeError.
    train_base = split.get('train')
    val_base   = split.get('val')
    if 'label_map' in split and isinstance(split['label_map'], dict):
        # The cached label_map takes precedence over the one computed above.
        label_map = split['label_map']
    else:
        # Compatibility with older files: use the freshly computed label_map
        label_map = dict(zip(base_df['base_id'], base_df['label']))
        # and rewrite the file so that it carries label_map from now on
        with open(split_file_path, 'wb') as f:
            pickle.dump({'train': train_base, 'val': val_base, 'label_map': label_map}, f)
        print("[Warn] 기존 splits.pkl에 label_map이 없어 갱신했습니다.")

print(f"[Split] Train={len(train_base)} | Val={len(val_base)} | classes={len(set(label_map.values()))}")


[Info] splits.pkl 생성 완료 (train/val/label_map)
[Split] Train=4860 | Val=540 | classes=15


In [3]:
# =========================
# Cell 2. Search space / genetic operators / constraints
# =========================
# Candidate output-channel widths keyed by stage index (1-4): each stage
# offers three widths in steps of 8, starting at 16 / 40 / 64 / 88.
SPACE_OUT = {
    si: [lowest, lowest + 8, lowest + 16]
    for si, lowest in enumerate((16, 40, 64, 88), start=1)
}
# The option lists below are drawn from with random.choice, so a value that
# appears several times is proportionally more likely to be picked.
SPACE_REPEATS = [2] * 4 + [4] * 3 + [6] * 2
SPACE_STRIDE = [1] * 3 + [2] * 2
SPACE_T = [2] * 4 + [4] * 3 + [6] * 2
SPACE_BLOCK = ['mbconv', 'fused']
SPACE_SE = [True, False]
CONV1x1_OUT = [112, 120, 128]

def sample_stage(si):
    """Draw one random stage description for stage index ``si``.

    Every field is picked independently with ``random.choice``; only the
    output-channel options depend on ``si`` (via ``SPACE_OUT[si]``).
    The draws happen in the order the keys are listed below.
    """
    stage = {}
    stage["block_type"] = random.choice(SPACE_BLOCK)
    stage["out_channels"] = random.choice(SPACE_OUT[si])
    stage["stride"] = random.choice(SPACE_STRIDE)
    stage["expand_ratio"] = random.choice(SPACE_T)
    stage["use_se"] = random.choice(SPACE_SE)
    stage["repeats"] = random.choice(SPACE_REPEATS)
    return stage

def sample_stream():
    """Sample one stream: stages 1-4 (in that order) plus ``conv1x1_out``."""
    stream = {f"stage{si}": sample_stage(si) for si in range(1, 5)}
    stream["conv1x1_out"] = random.choice(CONV1x1_OUT)
    return stream

def sample_config():
    """Sample one stream and use it for both "rtm" and "dtm".

    The "dtm" entry is an independent copy (made through a JSON round
    trip), so later in-place edits to one stream do not touch the other.
    """
    rtm_stream = sample_stream()
    dtm_stream = json.loads(json.dumps(rtm_stream))
    return {"rtm": rtm_stream, "dtm": dtm_stream}

def enforce_stride_per_stream(cfg, min_s2=1):
    """Ensure each stream has at least ``min_s2`` stages with stride 2.

    For each of the "rtm" and "dtm" streams, stages that do not yet use
    stride 2 are picked at random and switched to stride 2 until the stream
    holds ``min_s2`` such stages. The picks are made independently per
    stream, so the two streams may end up with different strides.

    Args:
        cfg: config dict with "rtm" and "dtm" streams, each holding
            "stage1".."stage4" dicts that have a "stride" entry.
            Modified in place.
        min_s2: minimum number of stride-2 stages per stream. Values above
            4 are treated as 4; values <= 0 leave ``cfg`` untouched.

    Returns:
        The same ``cfg`` object.
    """
    stage_ids = [1, 2, 3, 4]
    required = min(int(min_s2), len(stage_ids))
    for stream in ("rtm", "dtm"):
        stages = cfg[stream]
        while True:
            # Stages that could still be switched to stride 2.
            candidates = [i for i in stage_ids
                          if stages[f"stage{i}"]["stride"] != 2]
            if len(stage_ids) - len(candidates) >= required:
                break
            pick = random.choice(candidates)
            stages[f"stage{pick}"]["stride"] = 2
    return cfg

def cfg_hash(cfg):
    """Return the SHA-1 hex digest of ``cfg`` serialised as key-sorted JSON.

    Sorting the keys makes the hash independent of dict insertion order.
    """
    canonical = json.dumps(cfg, sort_keys=True).encode('utf-8')
    digest = hashlib.sha1()
    digest.update(canonical)
    return digest.hexdigest()

def crossover(cfgA, cfgB):
    """Build a child config by mixing the "rtm" streams of two parents.

    ``conv1x1_out`` is taken from one randomly chosen parent, then each of
    stage1..stage4 is copied (shallow dict copy) from a randomly chosen
    parent. Only the parents' "rtm" streams are read; the child's "dtm"
    stream is an independent copy of its "rtm" stream.
    """
    parents = [cfgA, cfgB]
    conv_options = [parent["rtm"]["conv1x1_out"] for parent in parents]
    child_stream = {"conv1x1_out": random.choice(conv_options)}
    for si in range(1, 5):
        key = f"stage{si}"
        donor = random.choice(parents)
        child_stream[key] = donor["rtm"][key].copy()
    # Both streams carry the same architecture.
    mirrored = json.loads(json.dumps(child_stream))
    return {"rtm": child_stream, "dtm": mirrored}


def mutate(cfg, prob=0.3):
    """Return a mutated copy of ``cfg``; ``cfg`` itself is not modified.

    Works on a copy of the "rtm" stream: every stage field, and finally
    ``conv1x1_out``, is independently re-drawn from its option list with
    probability ``prob``. The result uses the mutated stream for "rtm" and
    an independent copy of it for "dtm".
    """
    def hit():
        return random.random() < prob

    def mut_stage(st, si):
        # Option pools are wrapped in lambdas so they are only looked up
        # when the corresponding field is actually re-drawn.
        fields = (
            ("block_type", lambda: SPACE_BLOCK),
            ("out_channels", lambda: SPACE_OUT[si]),
            ("stride", lambda: SPACE_STRIDE),
            ("expand_ratio", lambda: SPACE_T),
            ("use_se", lambda: SPACE_SE),
            ("repeats", lambda: SPACE_REPEATS),
        )
        for name, pool in fields:
            if hit():
                st[name] = random.choice(pool())
        return st

    # JSON round trip = deep copy of the stream
    stream = json.loads(json.dumps(cfg["rtm"]))
    for si in (1, 2, 3, 4):
        key = f"stage{si}"
        stream[key] = mut_stage(stream[key], si)
    if hit():
        stream["conv1x1_out"] = random.choice(CONV1x1_OUT)

    mirrored = json.loads(json.dumps(stream))
    return {"rtm": stream, "dtm": mirrored}


def sample_config_with_constraint():
    """Sample a random config and apply the stride constraint to it."""
    candidate = sample_config()
    return enforce_stride_per_stream(candidate)


In [None]:
# =========================
# (REPLACE) Cell 3. subprocess 실행 & 유틸
# =========================

import shutil

def prune_non_elites(gen_idx: int, records: list, elite_k: int = 4):
    """Delete the run directories of generation ``gen_idx`` outside the top-k.

    Only directories whose base name starts with ``g{gen_idx:02d}_`` are
    considered, so directories carried over from earlier generations are
    never touched. A directory is deleted only when it lies strictly inside
    ``NAS_LOG_DIR``.

    Args:
        gen_idx: index of the generation that was just evaluated.
        records: evaluation records (dicts with "hash" and "run_dir"),
            already sorted best-first.
        elite_k: number of leading records whose directories are kept.
    """
    keep_hashes = {r["hash"] for r in records[:elite_k]}
    prefix = f"g{gen_idx:02d}_"
    # os.path.commonpath() returns a *normalised* path, so comparing it with
    # the raw NAS_LOG_DIR string fails whenever NAS_LOG_DIR is not already
    # normalised (mixed separators on Windows, "./", trailing slash, ...).
    # Compare absolute, normalised paths on both sides instead.
    log_root = os.path.abspath(NAS_LOG_DIR)
    removed = []

    for r in records:
        run_dir = r["run_dir"]
        base = os.path.basename(run_dir)

        # Safety: only this generation's folders, and never a top-k one.
        if not base.startswith(prefix) or r["hash"] in keep_hashes:
            continue

        target = os.path.abspath(run_dir)
        try:
            inside = (target != log_root
                      and os.path.commonpath([log_root, target]) == log_root)
        except ValueError as e:
            # e.g. paths on different drives
            print(f"[GC] skip {base}: {e}")
            continue

        # Never touch anything outside NAS_LOG_DIR.
        if not inside or not os.path.isdir(target):
            continue

        shutil.rmtree(target, ignore_errors=True)
        if os.path.exists(target):
            # rmtree swallowed an error; do not report it as removed.
            print(f"[GC] skip {base}: could not remove directory")
        else:
            removed.append(base)

    print(f"[GC] Gen {gen_idx}: removed {len(removed)} run dirs (non-top{elite_k})")
    if removed:
        # Print only the first few names when there are many.
        preview = ", ".join(removed[:8]) + (" ..." if len(removed) > 8 else "")
        print(f"     {preview}")



def run_cfg_in_subprocess(cfg, run_dir, train_base, val_base, label_map):
    """Train and evaluate one config in a child Python process.

    The split ids and the label map are pickled into ``run_dir``, the
    training script is launched with the config passed as a JSON string,
    and the metrics are read back from ``run_dir/result.json``.

    Args:
        cfg: JSON-serialisable architecture config.
        run_dir: directory for this run's inputs and outputs (created if
            missing).
        train_base, val_base, label_map: objects pickled to train.pkl,
            val.pkl and label.pkl for the child process.

    Returns:
        dict with "val_acc", "val_loss", "params" and "train_sec", or
        ``None`` when the run did not produce a usable result.
    """
    # Local import: the file's top-level import block does not provide sys.
    import sys

    os.makedirs(run_dir, exist_ok=True)
    run_name = os.path.basename(run_dir)

    # Remove any previous result so a stale file can never be mistaken for
    # the output of this run.
    result_path = os.path.join(run_dir, "result.json")
    if os.path.exists(result_path):
        try:
            os.remove(result_path)
        except OSError as e:
            print(f"[Subprocess Error] cannot remove stale result.json: {e}")
            return None

    # Use the interpreter running this notebook so the child sees the same
    # environment; a bare "python" may resolve to a different installation.
    # NOTE(review): as written, the interpreter receives `train` as the
    # script path and `train_one_cfg.py` as its first argument. Confirm this
    # is the intended invocation (vs. e.g. a `train/train_one_cfg.py` path).
    cmd = [
        sys.executable or "python", "train", "train_one_cfg.py",
        "--cfg_json", json.dumps(cfg),
    ]

    # Write the pickled inputs and pass their paths on the command line.
    inputs = (
        ("--train_ids", "train.pkl", train_base),
        ("--val_ids", "val.pkl", val_base),
        ("--label_map", "label.pkl", label_map),
    )
    for flag, filename, obj in inputs:
        path = os.path.join(run_dir, filename)
        with open(path, "wb") as f:
            pickle.dump(obj, f)
        cmd += [flag, path]
    cmd += ["--out_dir", run_dir]

    print(f"\n[Run-START] {run_name}")
    print(json.dumps(cfg, sort_keys=True))

    # Both streams are captured; only stderr is echoed, stdout is discarded.
    result = subprocess.run(cmd, capture_output=True, text=True)

    if result.stderr:
        print("[Subprocess STDERR]\n" + result.stderr)
    if result.returncode != 0:
        print(f"[Subprocess Error] exit code {result.returncode}")

    # Read the result file written by the child process.
    if not os.path.exists(result_path):
        print("[Subprocess Error] result.json이 생성되지 않았습니다.")
        return None

    try:
        with open(result_path, "r", encoding="utf-8") as f:
            payload = json.load(f)
    except (OSError, ValueError) as e:
        # Unreadable or malformed JSON must not abort the whole search.
        print(f"[Subprocess Error] cannot read result.json: {e}")
        return None

    if not isinstance(payload, dict) or payload.get("status") != "ok":
        print("[Subprocess Error] status!=ok :", payload)
        return None

    try:
        out = {key: payload[key]
               for key in ("val_acc", "val_loss", "params", "train_sec")}
    except KeyError as e:
        print(f"[Subprocess Error] result.json is missing field {e}")
        return None

    print(f"[Run-DONE ] {run_name} | time={out['train_sec']:.1f}s | acc={out['val_acc']:.4f} | loss={out['val_loss']:.4f} | params={out['params']:,}")
    print("-"*80)
    return out



def compute_fitness(val_acc, params, train_sec, alpha_p=1e-3, beta_t=1e-4):
    """Return validation accuracy minus size and training-time penalties.

    The size penalty is ``alpha_p`` per million parameters; the time
    penalty is ``beta_t`` per unit of ``train_sec``.
    """
    size_penalty = alpha_p * (params / 1e6)
    time_penalty = beta_t * train_sec
    score = val_acc - size_penalty
    score = score - time_penalty
    return float(score)

def append_log(row: dict, csv_path=NAS_LOG_CSV):
    """Append one result row to the CSV log at ``csv_path``.

    When the file already has a header with the same columns as ``row``,
    the row is appended in place instead of re-reading and rewriting the
    whole log on every call. When the columns differ, the existing log is
    merged with the new row and rewritten. A missing or empty file is
    created with a header.

    Args:
        row: mapping of column name to value for the new line.
        csv_path: path of the CSV log file.
    """
    new_row = pd.DataFrame([row])

    if os.path.exists(csv_path) and os.path.getsize(csv_path) > 0:
        # nrows=0 reads only the header line.
        existing_cols = list(pd.read_csv(csv_path, nrows=0).columns)
        if existing_cols == list(new_row.columns):
            new_row.to_csv(csv_path, mode="a", header=False, index=False)
            return
        # Schema changed: rewrite the file with the union of columns.
        merged = pd.concat([pd.read_csv(csv_path), new_row], ignore_index=True)
        merged.to_csv(csv_path, index=False)
        return

    new_row.to_csv(csv_path, index=False)

def print_leaderboard(records, gen_idx, topk=12):
    """Print the first ``topk`` records of generation ``gen_idx`` as a table.

    ``records`` is expected to be sorted best-first already; the rank
    column is simply the 1-based position in that list.
    """
    rows = []
    for rank, rec in enumerate(records[:topk], start=1):
        rows.append({
            "rank": rank,
            "hash8": rec["hash"][:8],
            "fitness": rec["fitness"],
            "acc": rec["val_acc"],
            "loss": rec["val_loss"],
            "params": rec["params"],
            "sec": rec["train_sec"],
        })
    table = pd.DataFrame(rows)
    shown = min(topk, len(records))
    print(f"\n[Gen {gen_idx}] Leaderboard (Top {shown})")
    print(table.to_string(index=False))


In [5]:
# =========================
# Cell 4. Evolutionary NAS loop
# =========================
# Per-generation make-up after generation 0: elites + children + immigrants.
ELITE_K, CHILDREN_K, IMMIGRANTS_K = 4, 6, 2
POP_SIZE = 12    # size of the random initial population
N_GEN = 20       # generations, counting generation 0
MUT_PROB = 0.5   # per-field mutation probability passed to mutate()

# Cache of every evaluated config: hash -> record
evaluated = dict()
# Best record of each generation, in order
history_best = list()

def ensure_unique_from(fn_make_cfg, blacklist, max_tries=200):
    """Generate a config whose hash is not in ``blacklist``.

    ``fn_make_cfg`` is called up to ``max_tries`` times. If every attempt
    collides, one fully random constrained config is returned as a last
    resort; that fallback is NOT checked against ``blacklist``.

    Returns:
        ``(cfg, hash)`` tuple.
    """
    for _ in range(max_tries):
        cfg = fn_make_cfg()
        h = cfg_hash(cfg)
        if h not in blacklist:
            return cfg, h
    # Kept colliding: fall back to a completely random config.
    fallback = sample_config_with_constraint()
    return fallback, cfg_hash(fallback)

def evaluate_cfg(cfg, h, gen_idx):
    """Evaluate ``cfg`` (hash ``h``) for generation ``gen_idx``, with caching.

    A config already present in ``evaluated`` is returned from the cache
    without retraining. Otherwise it is trained in a subprocess; on success
    the record is stored in ``evaluated``, appended to the CSV log and
    returned. Returns ``None`` when the subprocess run failed.
    """
    if h in evaluated:
        return evaluated[h]

    run_dir = os.path.join(NAS_LOG_DIR, f"g{gen_idx:02d}_{h[:8]}")
    out = run_cfg_in_subprocess(cfg, run_dir, train_base, val_base, label_map)
    if out is None:
        return None

    # Key order is kept as-is: it defines the column order of the CSV log.
    rec = {"hash": h, "gen": gen_idx}
    for metric in ("val_acc", "val_loss", "params", "train_sec"):
        rec[metric] = out[metric]
    rec["fitness"] = compute_fitness(out["val_acc"], out["params"], out["train_sec"])
    rec["run_dir"] = run_dir
    rec["cfg_json"] = json.dumps(cfg, sort_keys=True)

    evaluated[h] = rec
    append_log(rec)
    return rec


# ===== Generation 0 =====
# Draw POP_SIZE distinct random configs (distinct among themselves and from
# anything already evaluated), evaluate them and rank them best-first.
population = []
seen = set()
print(f"\n========= Generation 0 START (pop={POP_SIZE}, init random) =========")
while len(population) < POP_SIZE:
    cfg = sample_config_with_constraint()
    h = cfg_hash(cfg)
    if h not in seen and h not in evaluated:
        population.append((cfg, h))
        seen.add(h)

records = []
for cfg, h in population:
    rec = evaluate_cfg(cfg, h, 0)
    if rec:
        records.append(rec)

# Fail with a clear message instead of an IndexError further down when not
# a single run produced a result.
if not records:
    raise RuntimeError(
        "Generation 0: no configuration was evaluated successfully; "
        "see the [Subprocess ...] messages above."
    )

# Rank: higher fitness first, then higher accuracy, then lower loss.
records.sort(key=lambda x: (x["fitness"], x["val_acc"], -x["val_loss"]), reverse=True)
print_leaderboard(records, gen_idx=0)

prune_non_elites(0, records, ELITE_K)

best = records[0]
history_best.append(best)

# ===== Generations 1..N_GEN-1 =====
# Each generation: keep the top ELITE_K records, breed CHILDREN_K children
# from them, add IMMIGRANTS_K fresh random configs, evaluate the newcomers
# and re-rank.
for gen in range(1, N_GEN):
    print(f"\n========= Generation {gen} START (pop={POP_SIZE}) =========")
    elites = records[:ELITE_K]
    elite_hashes = {e["hash"] for e in elites}

    # --- children ---
    children = []
    child_hashes = set()
    blacklist = set(evaluated.keys())
    attempts = 0
    while len(children) < CHILDREN_K and attempts < 1000:
        attempts += 1
        taken = blacklist | child_hashes | elite_hashes

        child, ch = None, None
        # Crossover needs two distinct parents; random.sample raises
        # ValueError when fewer than two elites survived.
        if len(elites) >= 2:
            pa, pb = random.sample(elites, 2)
            ca = json.loads(pa["cfg_json"])
            cb = json.loads(pb["cfg_json"])
            child = mutate(crossover(ca, cb), prob=MUT_PROB)
            ch = cfg_hash(child)

        if child is not None and ch not in taken:
            children.append((child, ch))
        else:
            # The child collided with a known config (or could not be
            # bred): this slot is filled with a random config right away.
            child, ch = ensure_unique_from(sample_config_with_constraint, taken)
            children.append((child, ch))
        child_hashes.add(ch)

    # --- immigrants ---
    immigrants = []
    taken = set(evaluated.keys()) | child_hashes | elite_hashes
    while len(immigrants) < IMMIGRANTS_K:
        cfg, h = ensure_unique_from(sample_config_with_constraint, taken)
        immigrants.append((cfg, h))
        # Keep immigrants distinct from each other as well.
        taken.add(h)

    # next population (elites carry-over + children + immigrants)
    next_population = []
    for e in elites:
        cfg_e = json.loads(e["cfg_json"])
        next_population.append((cfg_e, e["hash"]))
    next_population.extend(children)
    next_population.extend(immigrants)

    # evaluate (elites reuse their cached record)
    records = []
    for cfg, h in next_population:
        if h in elite_hashes:
            records.append(evaluated[h])  # carry-over
        else:
            rec = evaluate_cfg(cfg, h, gen)
            if rec:
                records.append(rec)

    if not records:
        raise RuntimeError(
            f"Generation {gen}: no configuration was evaluated successfully; "
            "see the [Subprocess ...] messages above."
        )

    # Rank: higher fitness first, then higher accuracy, then lower loss.
    records.sort(key=lambda x: (x["fitness"], x["val_acc"], -x["val_loss"]), reverse=True)
    print_leaderboard(records, gen_idx=gen)

    prune_non_elites(gen, records, ELITE_K)

    best = records[0]
    history_best.append(best)

# ===== Final best =====
# Best record over everything ever evaluated: highest fitness, ties broken
# by higher accuracy and then by lower loss.
global_best = max(
    evaluated.values(),
    key=lambda rec: (rec["fitness"], rec["val_acc"], -rec["val_loss"]),
)

print("\n[Final Best]")
print(f"hash={global_best['hash']}, gen={global_best['gen']}")
print(f"fitness={global_best['fitness']:.6f}, acc={global_best['val_acc']:.4f}, loss={global_best['val_loss']:.4f}, params={global_best['params']:,}, sec={global_best['train_sec']:.1f}")
print(f"run_dir={global_best['run_dir']}")

print("\nBest Config JSON")
best_cfg = json.loads(global_best["cfg_json"])
print(json.dumps(best_cfg, indent=2))




[Run-START] g00_4874c2e7
{"dtm": {"conv1x1_out": 128, "stage1": {"block_type": "mbconv", "expand_ratio": 2, "out_channels": 16, "repeats": 2, "stride": 1, "use_se": true}, "stage2": {"block_type": "mbconv", "expand_ratio": 2, "out_channels": 56, "repeats": 2, "stride": 2, "use_se": false}, "stage3": {"block_type": "mbconv", "expand_ratio": 2, "out_channels": 64, "repeats": 6, "stride": 1, "use_se": true}, "stage4": {"block_type": "mbconv", "expand_ratio": 4, "out_channels": 104, "repeats": 6, "stride": 2, "use_se": true}}, "rtm": {"conv1x1_out": 128, "stage1": {"block_type": "mbconv", "expand_ratio": 2, "out_channels": 16, "repeats": 2, "stride": 1, "use_se": true}, "stage2": {"block_type": "mbconv", "expand_ratio": 2, "out_channels": 56, "repeats": 2, "stride": 2, "use_se": false}, "stage3": {"block_type": "mbconv", "expand_ratio": 2, "out_channels": 64, "repeats": 6, "stride": 1, "use_se": true}, "stage4": {"block_type": "mbconv", "expand_ratio": 4, "out_channels": 104, "repeats": 

In [6]:
# =========================
# Cell 5. Final Top-4 results
# =========================
# Reads the CSV log written during the search and shows its four best rows.
df = pd.read_csv(NAS_LOG_CSV)

# Sort: fitness descending, then accuracy descending, then loss ascending
df_sorted = df.sort_values(by=["fitness", "val_acc", "val_loss"], ascending=[False, False, True])

# Keep only the top 4, matching the cell title and the heading printed
# below (this used to take 10 rows).
top4 = df_sorted.head(4)

print("\n[Final Top-4 Models from results.csv]")
print(top4.to_string(index=False))



[Final Top-4 Models from results.csv]
                                    hash  gen  val_acc  val_loss  params  train_sec  fitness                                   run_dir                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                           