In [26]:
from pathlib import Path
import pandas as pd
import numpy as np
import json
import time
import random

# All locations hang off the parent of the current working directory.
PROJECT_ROOT = Path("..").resolve()
DATA_ROOT = PROJECT_ROOT.joinpath("data_csv")
CURIEUS_ROOT = DATA_ROOT.joinpath("curieus")
CORE7_ROOT = PROJECT_ROOT.joinpath("artifact", "core7")

# Make sure the artifact directory exists before anything is written to it.
CORE7_ROOT.mkdir(parents=True, exist_ok=True)

# RUN_TAG is embedded in both output file names.
RUN_TAG = "counterfactual_soft"
OUT_EVENT_LOG, OUT_DIAG_LOG = (
    CORE7_ROOT / file_name
    for file_name in (
        f"core7_04_{RUN_TAG}_event_log.csv",
        f"core7_04_{RUN_TAG}_diagnostics.csv",
    )
)

# Display the two output paths as the cell result.
(OUT_EVENT_LOG, OUT_DIAG_LOG)

(PosixPath('/Users/mac/Desktop/De/Developability_Data/core/artifact/core7/core7_04_counterfactual_soft_event_log.csv'),
 PosixPath('/Users/mac/Desktop/De/Developability_Data/core/artifact/core7/core7_04_counterfactual_soft_diagnostics.csv'))

In [27]:
# Antibody keys to simulate; the same keys index BASELINE_RISK further down.
ANTIBODIES = [
    "antibody_A",
    "antibody_B",
    "antibody_C",
]

# Experimental arms: case id -> config holding a single "governed" switch.
CASES = {
    case_name: {"governed": is_governed}
    for case_name, is_governed in (
        ("A_ALWAYS_ALLOW", False),
        ("B_GOVERNED", True),
    )
}

# Risk axes a candidate mutation can be aimed at.
INTENDED_AXES = [
    "IMMUNO",
    "VISCOSITY",
    "CHARGE",
    "PROCESS",
]

# Seed both global generators (stdlib `random` and NumPy) with the same value
# so the simulated sequence is fixed when the notebook runs top to bottom.
GLOBAL_SEED = 404
np.random.seed(GLOBAL_SEED)
random.seed(GLOBAL_SEED)

In [28]:
def load_baseline_risk(ab_key: str, root=None):
    """Build the baseline risk level per axis for one antibody.

    Every axis starts at "MEDIUM". "VISCOSITY" is raised to "HIGH" when the
    antibody's directory contains a ``*DeepViscosity*.csv`` file holding at
    least one data row.

    Parameters
    ----------
    ab_key : str
        Name of the antibody sub-directory under ``root``.
    root : path-like, optional
        Directory containing the per-antibody sub-directories. Defaults to the
        module-level ``CURIEUS_ROOT``.

    Returns
    -------
    dict
        Mapping of axis name ("IMMUNO", "VISCOSITY", "CHARGE", "PROCESS") to
        a risk level string.
    """
    base = (CURIEUS_ROOT if root is None else Path(root)) / ab_key

    risk = {
        "IMMUNO": "MEDIUM",
        "VISCOSITY": "MEDIUM",
        "CHARGE": "MEDIUM",
        "PROCESS": "MEDIUM",
    }

    # glob() yields matches in arbitrary filesystem order; sort so that the
    # file inspected is the same on every machine and every run.
    visco_files = sorted(base.glob("*DeepViscosity*.csv"))
    if visco_files:
        try:
            # Only emptiness matters, so one parsed row is enough.
            df = pd.read_csv(visco_files[0], nrows=1)
        except pd.errors.EmptyDataError:
            # A zero-byte file has no header to parse; treat it as "no data".
            df = pd.DataFrame()
        if not df.empty:
            risk["VISCOSITY"] = "HIGH"

    return risk

# Baseline risk profile for every antibody key, computed once and in order.
BASELINE_RISK = dict(zip(ANTIBODIES, map(load_baseline_risk, ANTIBODIES)))
BASELINE_RISK

def build_mutation_pool(n=25):
    """Create the pool of candidate mutations.

    Returns a list of ``n`` dicts. Entry ``i`` carries a zero-padded id
    (``M000``, ``M001``, ...), a description (``CDR_mut_<i>``) and an intended
    axis drawn with ``random.choice`` from ``INTENDED_AXES`` (one draw per
    entry, in index order).
    """
    return [
        {
            "candidate_id": f"M{i:03d}",
            "mutation_desc": f"CDR_mut_{i}",
            "intended_axis": random.choice(INTENDED_AXES),
        }
        for i in range(n)
    ]

# Build the candidate pool once with the default size; the simulation loop
# samples from this list at every step.
MUTATION_POOL = build_mutation_pool()
# Display the pool size as the cell result.
len(MUTATION_POOL)

25

In [29]:
def generate_event(baseline_risk, intended_axis):
    """Draw one counterfactual event for a mutation aimed at ``intended_axis``.

    The conflict probability depends on the baseline level of that axis:
    0.45 for "HIGH", 0.20 for "MEDIUM", 0.05 for anything else. A conflicting
    event is "HIGH" severity with probability 0.3 and "MEDIUM" otherwise; a
    non-conflicting event is "LOW" or "MEDIUM" with equal chance.

    Randomness comes from the global NumPy generator (conflict and HIGH draws)
    and the global ``random`` module (non-conflict severity).

    Returns a dict with keys "event_type", "severity" and "conflict_flag".
    """
    base_level = baseline_risk[intended_axis]

    conflict_prob = (
        0.45 if base_level == "HIGH"
        else 0.20 if base_level == "MEDIUM"
        else 0.05
    )

    conflict = np.random.rand() < conflict_prob

    if conflict:
        # Second NumPy draw happens only for conflicting events.
        severity = "HIGH" if np.random.rand() < 0.3 else "MEDIUM"
    else:
        severity = random.choice(["LOW", "MEDIUM"])

    return {
        "event_type": intended_axis,
        "severity": severity,
        "conflict_flag": conflict,
    }

def update_state(prev_state, event, mutation_count):
    """Return the next ``{"SoD", "SoMS"}`` state after applying ``event``.

    SoD always grows: by a severity weight (LOW 0.3, MEDIUM 0.7, HIGH 1.2)
    plus 0.15 per already-counted mutation. SoMS is cumulative: +0.6 on a
    conflict, otherwise it decays by 0.05 but never below 0.

    ``prev_state`` is not modified; a new dict is returned.
    """
    sod, soms = prev_state["SoD"], prev_state["SoMS"]

    severity_weights = {"LOW": 0.3, "MEDIUM": 0.7, "HIGH": 1.2}
    sod_increment = severity_weights[event["severity"]] + 0.15 * mutation_count
    next_sod = sod + sod_increment

    next_soms = soms + 0.6 if event["conflict_flag"] else max(0, soms - 0.05)

    return {"SoD": next_sod, "SoMS": next_soms}

In [30]:
# Counterfactual governance policy ("slow to block"). The whole dict is
# serialized into the diagnostics log, so every threshold the decision logic
# enforces should appear here.
POLICY = {
    # SoMS level at which a step is vetoed / the run is frozen.
    "VETO_SOMS": 3.5,
    "FREEZE_SOMS": 6.5,

    # NOTE(review): not read by governance_decision as visible in this
    # notebook; kept so the serialized policy stays backward-compatible.
    "VETO_CONFLICT_STREAK": 3,
    # Consecutive vetoes after which the run is frozen.
    "FREEZE_VETO_STREAK": 3,
    # Consecutive conflicts at which governance_decision issues the
    # soft-escalation veto; recorded so the diagnostics show the threshold
    # actually in effect (2).
    "SOFT_VETO_CONFLICT_STREAK": 2,

    # Step index at which the governed case is vetoed unconditionally
    # (insurance so the two cases always differ for comparison).
    "FORCED_VETO_STEP": 18,
}

def governance_decision(state, conflict_streak, veto_streak, step, case_id):
    """Decide the governance action for the current step.

    Rules are evaluated in order and the first match wins:

    1. Forced veto for case "B_GOVERNED" at step ``POLICY["FORCED_VETO_STEP"]``.
    2. FREEZE when ``veto_streak`` reaches ``POLICY["FREEZE_VETO_STREAK"]``.
    3. FREEZE when SoMS reaches ``POLICY["FREEZE_SOMS"]``.
    4. VETO when SoMS reaches ``POLICY["VETO_SOMS"]``.
    5. Soft escalation: VETO when ``conflict_streak`` reaches
       ``POLICY["SOFT_VETO_CONFLICT_STREAK"]`` (2 when the key is absent).
    6. Otherwise ALLOW.

    Parameters
    ----------
    state : dict
        Current state; only ``state["SoMS"]`` is read.
    conflict_streak : int
        Number of consecutive conflicting events, including the current one.
    veto_streak : int
        Number of consecutive VETO decisions before this step.
    step : int
        Zero-based step index.
    case_id : str
        Case identifier; only "B_GOVERNED" triggers the forced veto.

    Returns
    -------
    tuple[str, str]
        ``(action, reason_code)`` where action is "ALLOW", "VETO" or "FREEZE"
        and reason_code is "" for ALLOW.
    """
    # 1. Forced veto for comparison purposes (insurance).
    if case_id == "B_GOVERNED" and step == POLICY["FORCED_VETO_STEP"]:
        return "VETO", "REASON_FORCED_COMPARISON"

    # 2. Accumulated vetoes -> freeze.
    if veto_streak >= POLICY["FREEZE_VETO_STREAK"]:
        return "FREEZE", "REASON_VETO_ACCUMULATION"

    # 3. Accumulated SoMS -> freeze.
    if state["SoMS"] >= POLICY["FREEZE_SOMS"]:
        return "FREEZE", "REASON_SOMS_CUMSUM"

    # 4. SoMS threshold -> veto.
    if state["SoMS"] >= POLICY["VETO_SOMS"]:
        return "VETO", "REASON_SOMS_THRESHOLD"

    # 5. Soft escalation: veto after only a few consecutive conflicts, so
    #    that a veto streak can be observed. The threshold comes from POLICY
    #    like every other rule; the fallback of 2 keeps the previous
    #    hard-coded behaviour when the key is not configured.
    soft_streak = POLICY.get("SOFT_VETO_CONFLICT_STREAK", 2)
    if conflict_streak >= soft_streak:
        return "VETO", "REASON_SOFT_CONFLICT_ESCALATION"

    # 6. Default: allow.
    return "ALLOW", ""

In [31]:
# Main simulation: for every case and antibody, run up to N_STEPS mutation
# attempts, record one event-log row per step and one diagnostics row per run.
#
# NOTE(review): every random draw below comes from the global `random` /
# `np.random` generators and nothing in this cell re-seeds them, so the result
# depends on the generator state left by earlier cells. Re-running only this
# cell produces a different sequence than a fresh top-to-bottom run, and the
# cases run back to back on the same streams rather than on identical event
# sequences. Confirm this is intended.

# Run identifier built from the current Unix time in whole seconds.
RUN_ID = f"core7_04_{int(time.time())}"

# Accumulators: one dict per step / one dict per (case, antibody) run.
event_logs = []
diag_logs = []

N_STEPS = 30 # maximum number of steps per (case, antibody) run

for case_id, case_cfg in CASES.items():
    for ab in ANTIBODIES:
        baseline = BASELINE_RISK[ab]

        # Each run starts from a zeroed state and zeroed counters.
        state = {"SoD": 0.0, "SoMS": 0.0}
        mutation_count = 0
        conflict_streak = 0
        veto_streak = 0

        for step in range(N_STEPS):
            # Pick a candidate (with replacement) from the shared pool.
            mutation = random.choice(MUTATION_POOL)

            # The event is applied to the state before the decision is made,
            # so SoD/SoMS change regardless of the resulting action.
            event = generate_event(baseline, mutation["intended_axis"])
            state = update_state(state, event, mutation_count)

            # Count consecutive conflicts; any non-conflict resets to 0.
            conflict_streak = conflict_streak + 1 if event["conflict_flag"] else 0

            # Only governed cases consult the policy; otherwise always allow.
            if case_cfg["governed"]:
                action, reason = governance_decision(
                    state, conflict_streak, veto_streak, step, case_id
                )
            else:
                action, reason = "ALLOW", ""

            # Count consecutive vetoes; any other action resets to 0.
            if action == "VETO":
                veto_streak += 1
            else:
                veto_streak = 0

            # Only allowed mutations count towards mutation_count, which
            # feeds the SoD increment of later steps.
            blocked = action != "ALLOW"
            if action == "ALLOW":
                mutation_count += 1

            # SoD / SoMS are logged rounded to 3 decimals; the state itself
            # keeps full precision.
            event_logs.append({
                "run_id": RUN_ID,
                "case_id": case_id,
                "antibody_id": ab,
                "step": step,
                "attempted": True,
                "action": action,
                "reason_code": reason,
                "SoD": round(state["SoD"], 3),
                "SoMS": round(state["SoMS"], 3),
                "conflict_flag": event["conflict_flag"],
                "mutation_desc": mutation["mutation_desc"],
                "intended_axis": mutation["intended_axis"],
                "blocked": blocked,
            })

            # A freeze ends this run; the freezing step has already been logged.
            if action == "FREEZE":
                break

        # n_steps is the number of steps actually executed (shorter after a
        # freeze). NOTE(review): relies on the loop variable `step`, so it
        # assumes N_STEPS >= 1.
        diag_logs.append({
            "run_id": RUN_ID,
            "case_id": case_id,
            "antibody_id": ab,
            "n_steps": step + 1,
            "seed": GLOBAL_SEED,
            "policy": json.dumps(POLICY),
            "mutation_pool_size": len(MUTATION_POOL),
        })

In [32]:
# Turn the accumulated records into one DataFrame per log.
event_df = pd.DataFrame(event_logs)
diag_df = pd.DataFrame(diag_logs)

# Write each frame to its CSV artifact, omitting the index column.
for _frame, _csv_path in ((event_df, OUT_EVENT_LOG), (diag_df, OUT_DIAG_LOG)):
    _frame.to_csv(_csv_path, index=False)

# Display the two output paths as the cell result.
(OUT_EVENT_LOG, OUT_DIAG_LOG)

(PosixPath('/Users/mac/Desktop/De/Developability_Data/core/artifact/core7/core7_04_counterfactual_soft_event_log.csv'),
 PosixPath('/Users/mac/Desktop/De/Developability_Data/core/artifact/core7/core7_04_counterfactual_soft_diagnostics.csv'))