<a href="https://colab.research.google.com/github/JSJeong-me/GPT-Insights/blob/main/Tunnel/11-DQN.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## “과부하 상황에서도 일정 수준의 실시간성 유지”를 목표로 한 DQN 기반 자원관리 전략

환경(Env)은 시계열을 따라 진행하며, 액션으로 CCTV_Load / Lighting_Controller / Dimming_Controller를 ±10%씩 조정합니다.

보상은 지연시간(Delay_Time_ms) 최소화, 안전 임계(CPU≥85, MEM≥90, Fire/Emergency) 회피, **서비스 유지**(너무 과격한 셰딩 억제)를 함께 반영합니다.

Anomaly==1이거나 지연이 하드 한계(LAT_HARD_MAX_MS = 250 ms)를 초과하면, 또는 데이터의 끝에 도달하면 에피소드가 종료됩니다.

타깃 네트워크, 리플레이 버퍼, 후버 손실(Huber loss), 그라디언트 클리핑 등 DQN 기본기를 갖춘 템플릿입니다.

In [4]:
# Create a ready-to-run PyTorch DQN template that learns a resource-management policy
# using the provided /content/Edge_Device_Training_Data.csv dataset.
#
# The environment is lightweight and gym-free to avoid extra dependencies.
# It treats actions as simple throttling/boosting of controllable loads
# (CCTV_Load, Lighting_Controller, Dimming_Controller) with clamps.
#
# Reward prioritizes: (1) keeping latency low (Delay_Time_ms), (2) avoiding
# anomaly conditions (CPU>=85, MEM>=90, Fire/Emergency), and (3) preserving
# service/throughput (discouraging aggressive shedding).
#
# Save script to /content/dqn_resource_manager.py so it can be run from a later cell.

import os, textwrap, json, pandas as pd, numpy as np

script_path = "/content/dqn_resource_manager.py"

code = r'''
import math
import random
import numpy as np
import pandas as pd
from dataclasses import dataclass
from typing import Tuple, List, Dict, Any

import torch
import torch.nn as nn
import torch.optim as optim

# ------------------------------
# Config
# ------------------------------
# CSV file loaded by train_dqn() via pd.read_csv.
CSV_PATH = "/content/Edge_Device_Training_Data.csv"
# One seed shared by Python's random, NumPy and PyTorch RNGs.
SEED = 42
random.seed(SEED)
np.random.seed(SEED)
torch.manual_seed(SEED)

# Control knobs and thresholds (tune as needed)
# EdgeEnv.step() subtracts 1.0 from the reward when CPU_usage >= CPU_HIGH,
# and another 1.0 when Memory_usage >= MEM_HIGH.
CPU_HIGH = 85.0
MEM_HIGH = 90.0
# Delay above LAT_TARGET_MS is penalised linearly (excess / 25.0) in the reward.
LAT_TARGET_MS = 80.0     # "real-time-ish" target
# An episode ends as soon as the (action-adjusted) delay exceeds this value.
LAT_HARD_MAX_MS = 250.0

# Action magnitudes
# Fractional step: an action multiplies one control column by (1 +/- ADJ_STEP).
ADJ_STEP = 0.10          # 10% increments

# Columns used
# Columns the agent can adjust through actions (see EdgeEnv._apply_action).
CTRL_COLS = ["CCTV_Load", "Lighting_Controller", "Dimming_Controller"]
# NOTE: BIN_COLS, SAFETY_COLS and CORE_COLS are not referenced anywhere else in
# this script; they only document how the columns are grouped.
BIN_COLS  = ["Generator", "Load_Switch", "Circuit_Breaker", "Jet_Fan"]  # kept as state only
SAFETY_COLS = ["Fire_Signal", "Emergency_Alarm"]
CORE_COLS = ["CPU_usage", "Memory_usage", "Delay_Time_ms"]
# Columns that must exist in the CSV (train_dqn raises ValueError otherwise).
# EdgeEnv uses this same list, in this order, as the observation features,
# so the "Anomaly" flag is part of the state the agent sees.
USE_COLS = ["CPU_usage","Memory_usage","ATD","ATS","Fire_Signal","Leak_Detector",
            "Dimming_Controller","Dimming_Addon","Generator","Transformer",
            "Load_Switch","Circuit_Breaker","Emergency_Alarm","Jet_Fan",
            "Lighting_Controller","Anemometer","CCTV_Load","Delay_Time_ms","Anomaly"]

# ------------------------------
# Utility: MinMax scaler (per column)
# ------------------------------
class MinMax:
    """Per-column min-max scaler.

    fit() records a (min, max) pair for each requested column, and
    transform_row() maps a row's values to (x - min) / (max - min) using
    those recorded pairs.
    """

    def __init__(self):
        # column name -> (lower bound, upper bound) learned by fit()
        self.ranges = {}

    def fit(self, df: pd.DataFrame, cols: List[str]):
        """Record the value range of each column in cols from df."""
        for name in cols:
            values = df[name].astype(float).values
            lower = float(np.min(values))
            upper = float(np.max(values))
            # A constant column would give a zero-width range; widen it to a
            # unit span so transform_row() never divides by zero.
            if math.isclose(upper, lower):
                upper = lower + 1.0
            self.ranges[name] = (lower, upper)

    def transform_row(self, row: pd.Series, cols: List[str]) -> np.ndarray:
        """Return the scaled values of row for cols as a float32 array."""

        def scale(name):
            lower, upper = self.ranges[name]
            return (float(row[name]) - lower) / (upper - lower)

        return np.array([scale(name) for name in cols], dtype=np.float32)

# ------------------------------
# Lightweight Env
# ------------------------------
class EdgeEnv:
    """
    Gym-free sequential environment replayed over a time-series DataFrame.

    Each step scores the current row after applying the chosen action to a
    copy of it, then advances one row in time. Observations are always the
    min-max scaled, unmodified dataset rows: an action changes the reward and
    the info dict of the step it is taken in, but not later observations.
    The effect of an action on CPU and delay is a clamped heuristic, so this
    is a pragmatic simulator rather than a model of the real device.
    """

    # action id -> (control column, sign of the ADJ_STEP adjustment).
    # Any id not listed here (including 0) is a no-op.
    _ACTION_EFFECTS = {
        1: ("CCTV_Load", -1.0),
        2: ("Lighting_Controller", -1.0),
        3: ("Dimming_Controller", -1.0),
        4: ("CCTV_Load", +1.0),
        5: ("Lighting_Controller", +1.0),
        6: ("Dimming_Controller", +1.0),
    }

    def __init__(self, df: pd.DataFrame):
        """Index df from 0 and fit the observation scaler on USE_COLS."""
        self.df = df.reset_index(drop=True)
        self.idx = 0
        self.n = len(self.df)
        self.ctrl_cols = CTRL_COLS
        self.state_cols = USE_COLS.copy()
        self.scaler = MinMax()
        self.scaler.fit(self.df, self.state_cols)

        # 7 discrete actions:
        # 0: no-op
        # 1: CCTV_Load -10%
        # 2: Lighting_Controller -10%
        # 3: Dimming_Controller -10%
        # 4: CCTV_Load +10%
        # 5: Lighting_Controller +10%
        # 6: Dimming_Controller +10%
        self.action_space_n = 7
        self.observation_space = len(self.state_cols)

        # Heuristic coefficients (tunable). With shed = +ADJ_STEP for a
        # "decrease" action, -ADJ_STEP for an "increase" action and 0 for no-op:
        #   CPU_usage     is multiplied by (1 - alpha_cpu * shed)
        #   Delay_Time_ms is reduced by     beta_lat * shed
        # NOTE(review): beta_lat is multiplied by the fractional step, so one
        # 10% action moves the delay by beta_lat * 0.10 ms, not beta_lat ms.
        # Confirm this is the intended scale.
        self.alpha_cpu = 0.20
        self.beta_lat  = 8.0
        self.gamma_thr = 0.001   # reward weight for preserving throughput

    def _get_obs(self) -> np.ndarray:
        """Return the scaled feature vector of the dataset row at self.idx."""
        row = self.df.iloc[self.idx]
        return self.scaler.transform_row(row, self.state_cols)

    def _apply_action(self, row: pd.Series, a: int) -> pd.Series:
        """Return a copy of row with action a applied.

        The selected control column is scaled by (1 +/- ADJ_STEP), every
        control column is clamped to [0, 100], and CPU_usage / Delay_Time_ms
        are adjusted by the shedding heuristic. The input row is not modified.
        """
        row = row.copy()
        # proportional delta for each control; at most one entry is non-zero
        delta = {c: 0.0 for c in self.ctrl_cols}
        effect = self._ACTION_EFFECTS.get(a)
        if effect is not None:
            col, sign = effect
            delta[col] = sign * ADJ_STEP

        # apply to control columns with clamping [0,100] for percentages
        # (the clamp runs for every control column, even when its delta is 0)
        for c, d in delta.items():
            base = float(row[c])
            newv = base * (1.0 + d)
            row[c] = max(0.0, min(100.0, newv))

        # Heuristic effect on CPU and Delay:
        # more shedding (negative delta) -> lower CPU and lower latency.
        # Memory_usage is deliberately left untouched (assumed to react slower).
        shed = -(delta["CCTV_Load"] + delta["Lighting_Controller"] + delta["Dimming_Controller"])  # positive if shedding
        cpu = float(row["CPU_usage"]) * (1.0 - self.alpha_cpu * shed)
        delay = float(row["Delay_Time_ms"]) - self.beta_lat * shed

        # clamp reasonable bounds
        row["CPU_usage"] = max(0.0, min(100.0, cpu))
        row["Delay_Time_ms"] = max(0.0, delay)

        return row

    def reset(self, start_idx: int = None) -> np.ndarray:
        """Start an episode and return its first observation.

        Args:
            start_idx: row to start from; when None a random row in
                [0, max(0, n - 2)] is drawn with the `random` module.
        """
        self.idx = start_idx if start_idx is not None else random.randint(0, max(0, self.n-2))
        return self._get_obs()

    def step(self, action: int) -> Tuple[np.ndarray, float, bool, Dict[str, Any]]:
        """Score the current row under action and advance one row.

        Returns:
            (obs, reward, done, info) where obs is the scaled next row, done
            is True on an anomaly row, when the adjusted delay exceeds
            LAT_HARD_MAX_MS, or when the data is about to run out, and info
            holds the adjusted cpu / mem / delay and the anomaly flag.
        """
        cur = self.df.iloc[self.idx]
        cur_adj = self._apply_action(cur, action)

        # Reward design
        cpu, mem = cur_adj["CPU_usage"], cur_adj["Memory_usage"]
        fire, alarm = int(cur_adj["Fire_Signal"]), int(cur_adj["Emergency_Alarm"])
        delay = cur_adj["Delay_Time_ms"]
        anomaly = int(cur_adj["Anomaly"])

        # priorities: keep delay below target, avoid anomaly, keep throughput (sum controls)
        r = 0.0
        # Latency shaping: only the excess over the target is penalised
        r += - max(0.0, (delay - LAT_TARGET_MS)) / 25.0
        # Safety penalties
        if cpu >= CPU_HIGH: r -= 1.0
        if mem >= MEM_HIGH: r -= 1.0
        if fire == 1: r -= 2.0
        if alarm == 1: r -= 2.0
        # Anomaly penalty (the anomaly also ends the episode below)
        if anomaly == 1: r -= 3.0

        # Throughput proxy (prefer higher service): mean control level in [0, 1]
        thr = (cur_adj["CCTV_Load"] + cur_adj["Lighting_Controller"] + cur_adj["Dimming_Controller"]) / 300.0
        r += self.gamma_thr * thr

        done = bool(anomaly == 1 or delay > LAT_HARD_MAX_MS or self.idx >= self.n - 2)

        # Advance time, but never past the last row: without the clamp a
        # one-row dataset, reset(start_idx=n-1) or a step taken after the
        # final row would make _get_obs() raise IndexError.
        self.idx = min(self.idx + 1, self.n - 1)
        obs = self._get_obs()
        info = {"cpu": float(cpu), "mem": float(mem), "delay": float(delay), "anomaly": anomaly}
        return obs, float(r), done, info

# ------------------------------
# DQN Components
# ------------------------------
class QNet(nn.Module):
    """Fully connected Q-network: in_dim -> 256 -> 128 -> out_dim with ReLU
    after each hidden layer; the output is one Q-value per action."""

    def __init__(self, in_dim: int, out_dim: int):
        super().__init__()
        widths = (in_dim, 256, 128)
        layers = []
        # Hidden stack: a Linear followed by a ReLU for each consecutive pair.
        for fan_in, fan_out in zip(widths[:-1], widths[1:]):
            layers.append(nn.Linear(fan_in, fan_out))
            layers.append(nn.ReLU())
        # Linear output head, no activation.
        layers.append(nn.Linear(widths[-1], out_dim))
        self.net = nn.Sequential(*layers)

    def forward(self, x):
        """Return the Q-values produced by the network for input x."""
        q_values = self.net(x)
        return q_values

@dataclass
class ReplayBuf:
    """Fixed-capacity experience replay buffer for (s, a, r, ns, d) transitions.

    Once `cap` transitions are stored, each push overwrites the oldest one
    in place (ring buffer), so a push is O(1) regardless of capacity. As a
    consequence the lists are not kept in insertion order after the buffer
    has wrapped; sampling is uniform, so this does not affect training.

    Raises:
        ValueError: if `cap` is not positive.
    """
    cap: int
    # Parallel lists, one entry per stored transition. Values passed to the
    # constructor for these fields are ignored: __post_init__ always starts
    # from empty lists.
    s: List[np.ndarray] = None    # states
    a: List[int] = None           # actions
    r: List[float] = None         # rewards
    ns: List[np.ndarray] = None   # next states
    d: List[bool] = None          # done flags

    def __post_init__(self):
        if self.cap <= 0:
            raise ValueError(f"ReplayBuf capacity must be positive, got {self.cap}")
        self.s, self.a, self.r, self.ns, self.d = [], [], [], [], []
        # Slot holding the oldest transition, i.e. the next one to overwrite
        # once the buffer is full.
        self._pos = 0

    def push(self, s, a, r, ns, d):
        """Store one transition, evicting the oldest when at capacity."""
        if len(self.s) < self.cap:
            self.s.append(s); self.a.append(a); self.r.append(r); self.ns.append(ns); self.d.append(d)
            return
        i = self._pos
        self.s[i] = s; self.a[i] = a; self.r[i] = r; self.ns[i] = ns; self.d[i] = d
        self._pos = (i + 1) % self.cap

    def sample(self, batch: int):
        """Draw `batch` distinct transitions uniformly at random.

        Returns:
            (S, A, R, NS, D): S and NS are float32 tensors stacking the stored
            state arrays; A is int64 and R, D are float32, each of shape
            (batch, 1).

        Raises:
            ValueError: if fewer than `batch` transitions are stored.
        """
        size = len(self.s)
        if batch > size:
            raise ValueError(
                f"Cannot sample {batch} transitions from a buffer holding {size}"
            )
        idx = np.random.choice(size, size=batch, replace=False)
        S = torch.tensor(np.array([self.s[i] for i in idx]), dtype=torch.float32)
        A = torch.tensor(np.array([self.a[i] for i in idx]), dtype=torch.int64).unsqueeze(1)
        R = torch.tensor(np.array([self.r[i] for i in idx]), dtype=torch.float32).unsqueeze(1)
        NS = torch.tensor(np.array([self.ns[i] for i in idx]), dtype=torch.float32)
        D = torch.tensor(np.array([self.d[i] for i in idx]), dtype=torch.float32).unsqueeze(1)
        return S, A, R, NS, D

def train_dqn(episodes=10, steps_per_ep=512, batch=64, gamma=0.99,
              eps_start=1.0, eps_end=0.05, eps_decay=0.995,
              lr=1e-3, buf_cap=20000, target_sync=200,
              csv_path=None, out_dir="/content/data"):
    """Train a DQN agent on EdgeEnv and save the weights and per-episode stats.

    Args:
        episodes: number of training episodes.
        steps_per_ep: maximum environment steps per episode.
        batch: minibatch size; updates start once the buffer holds this many
            transitions.
        gamma: discount factor for the TD target.
        eps_start, eps_end, eps_decay: epsilon-greedy schedule. Epsilon is
            multiplied by eps_decay once per episode (not per step) and is
            floored at eps_end.
        lr: AdamW learning rate.
        buf_cap: replay buffer capacity.
        target_sync: the target network is refreshed on global steps that are
            a multiple of this value (only on steps where an update ran).
        csv_path: CSV to load; defaults to the module-level CSV_PATH.
        out_dir: directory for the saved artifacts; created if missing.

    Returns:
        dict with per-episode lists "ep_reward", "final_delay" and
        "final_anomaly".

    Raises:
        ValueError: if the CSV lacks any column listed in USE_COLS.
    """
    import json
    import os

    # Load data
    df = pd.read_csv(CSV_PATH if csv_path is None else csv_path)
    # Basic cleaning/sorting: order rows by the unnamed first column when it
    # can be read as timestamps. This is best-effort, so a parse failure is
    # reported and the CSV row order is kept rather than aborting training.
    ts_col = "Unnamed: 0" if "Unnamed: 0" in df.columns else None
    if ts_col is not None:
        try:
            parsed = pd.to_datetime(df[ts_col])
        except (ValueError, TypeError, OverflowError) as err:
            print(f"[WARN] could not parse {ts_col!r} as timestamps ({err}); keeping CSV row order")
        else:
            df[ts_col] = parsed
            df = df.sort_values(ts_col)
    # Keep columns
    missing = [c for c in USE_COLS if c not in df.columns]
    if missing:
        raise ValueError(f"Missing columns in CSV: {missing}")
    df = df[USE_COLS].copy()

    env = EdgeEnv(df)
    q = QNet(env.observation_space, env.action_space_n)
    # Target network starts as an exact copy of the online network.
    tq = QNet(env.observation_space, env.action_space_n)
    tq.load_state_dict(q.state_dict())

    opt = optim.AdamW(q.parameters(), lr=lr)
    loss_fn = nn.SmoothL1Loss()  # Huber loss
    rb = ReplayBuf(buf_cap)

    eps = eps_start
    global_step = 0
    stats = {"ep_reward": [], "final_delay": [], "final_anomaly": []}

    for ep in range(episodes):
        s = env.reset()
        ep_r = 0.0
        last_info = {}
        for t in range(steps_per_ep):
            global_step += 1
            # epsilon-greedy action selection
            if random.random() < eps:
                a = random.randrange(env.action_space_n)
            else:
                with torch.no_grad():
                    qs = q(torch.tensor(s, dtype=torch.float32).unsqueeze(0))
                    a = int(torch.argmax(qs, dim=1).item())

            ns, r, done, info = env.step(a)
            rb.push(s, a, r, ns, done)
            s = ns
            ep_r += r
            last_info = info

            # train
            if len(rb.s) >= batch:
                S, A, R, NS, D = rb.sample(batch)
                with torch.no_grad():
                    # TD target; (1 - D) removes the bootstrap term on terminal steps
                    max_next = tq(NS).max(1, keepdim=True)[0]
                    y = R + gamma * (1.0 - D) * max_next
                qvals = q(S).gather(1, A)
                loss = loss_fn(qvals, y)
                opt.zero_grad()
                loss.backward()
                nn.utils.clip_grad_norm_(q.parameters(), 1.0)
                opt.step()

                if global_step % target_sync == 0:
                    tq.load_state_dict(q.state_dict())

            if done:
                break

        eps = max(eps_end, eps * eps_decay)
        stats["ep_reward"].append(ep_r)
        stats["final_delay"].append(float(last_info.get("delay", np.nan)))
        stats["final_anomaly"].append(int(last_info.get("anomaly", 0)))
        print(f"[EP {ep+1:03d}] reward={ep_r:.2f} eps={eps:.3f} delay={stats['final_delay'][-1]:.1f} anom={stats['final_anomaly'][-1]}")

    # Save artifacts. Create the directory first so a finished run is not
    # lost to a missing output folder.
    os.makedirs(out_dir, exist_ok=True)
    torch.save(q.state_dict(), os.path.join(out_dir, "dqn_resource_manager.pt"))
    with open(os.path.join(out_dir, "dqn_training_stats.json"), "w", encoding="utf-8") as f:
        json.dump(stats, f, indent=2)
    return stats

if __name__ == "__main__":
    # Short smoke run; raise `episodes` for real training.
    stats = train_dqn(episodes=5, steps_per_ep=512)
    # Report only the last three entries of each per-episode series.
    tail = {}
    for key, series in stats.items():
        tail[key] = series[-3:] if isinstance(series, list) else series
    print("Training finished. Stats:", tail)
'''

# Persist the generated training script so a later cell can run it.
with open(script_path, "w", encoding="utf-8") as script_file:
    script_file.write(code)

# Report where the script went and which artifacts a training run produces.
for message in (
    f"Script written to {script_path}",
    "Files that will be created after training:",
    "- /content/data/dqn_resource_manager.pt (trained model weights)",
    "- /content/data/dqn_training_stats.json (rewards/metrics)",
):
    print(message)


Script written to /content/dqn_resource_manager.py
Files that will be created after training:
- /content/data/dqn_resource_manager.pt (trained model weights)
- /content/data/dqn_training_stats.json (rewards/metrics)


In [6]:
!mkdir /content/data

In [7]:
!python /content/dqn_resource_manager.py

[EP 001] reward=-5.85 eps=0.995 delay=70.6 anom=1
[EP 002] reward=-14.74 eps=0.990 delay=153.2 anom=1
[EP 003] reward=-5.90 eps=0.985 delay=77.6 anom=1
[EP 004] reward=-27.52 eps=0.980 delay=141.7 anom=1
[EP 005] reward=-4.68 eps=0.975 delay=97.1 anom=1
Training finished. Stats: {'ep_reward': [-5.899677546413392, -27.515160439117018, -4.68222968570311], 'final_delay': [77.60492737015795, 141.66382881500138, 97.0663466186037], 'final_anomaly': [1, 1, 1]}


## https://chatgpt.com/s/t_68a00b47df348191b3062091eb1f5623