In [6]:
# =========================
# CELL 1 — Config & Paths (clean + judge-friendly)
# - Tách rõ: DATA/MODEL vs AUTOSCALING/SIM
# - Window-aware + Metric-aware (buffer/capacity)
# - Không phá các helper đã dùng ở cell sau
# =========================

import os, re, json, math
from datetime import datetime, timezone
from typing import Dict, Any, List, Tuple, Optional

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

# Seed NumPy's global RNG so NumPy-based randomness in this notebook is reproducible.
np.random.seed(42)

# -------------------------
# Paths
# -------------------------
from pathlib import Path
import os, shutil

# The root is the current working directory; the notebook is expected to run from notebooks/.
PROJECT_ROOT = Path.cwd()   # => .../AUTOSCALING-ANALYSIS/notebooks

# (Optional) If the data lives in ../data, copy it into notebooks/data so everything
# the notebook needs sits under notebooks/.
src_data = (PROJECT_ROOT / ".." / "data").resolve()
dst_data = (PROJECT_ROOT / "data").resolve()

# Copy only when the local data/raw is missing and the sibling ../data/raw exists.
if not (dst_data / "raw").exists() and (src_data / "raw").exists():
    dst_data.mkdir(parents=True, exist_ok=True)
    shutil.copytree(src_data, dst_data, dirs_exist_ok=True)
    print(f"✅ Copied data from {src_data} -> {dst_data}")

PROJECT_ROOT = str(PROJECT_ROOT)  # keep a plain string for the existing os.path-based code
OUT_02 = os.path.join(PROJECT_ROOT, "outputs", "02_eda")
OUT_03 = os.path.join(PROJECT_ROOT, "outputs", "03_features")
OUT_04 = os.path.join(PROJECT_ROOT, "outputs", "04_models")
OUT_04P = os.path.join(OUT_04, "predictions")
OUT_05 = os.path.join(PROJECT_ROOT, "outputs", "05_scaling")

# Make sure every output directory exists before later cells write to it.
for p in [OUT_02, OUT_03, OUT_04, OUT_04P, OUT_05]:
    os.makedirs(p, exist_ok=True)

# -------------------------
# Core helpers (keep as-is for other cells)
# -------------------------
def tag_minutes(tag: str) -> int:
    """Return the bucket width in minutes for a window tag ("1m", "5m" or "15m").

    Raises:
        KeyError: if `tag` is not one of the supported tags.
    """
    minutes_by_tag = {"1m": 1, "5m": 5, "15m": 15}
    return minutes_by_tag[tag]

def steps_per_day(tag: str) -> int:
    """Return how many buckets of the given window tag fit into one day."""
    minutes_in_day = 24 * 60
    return int(minutes_in_day / tag_minutes(tag))

def steps_per_hour(tag: str) -> int:
    """Return how many buckets of the given window tag fit into one hour."""
    minutes_in_hour = 60
    return int(minutes_in_hour / tag_minutes(tag))

def resolve_roll_windows(tag: str, roll_windows: List[str]) -> Dict[str, int]:
    """Translate rolling-window names into a number of buckets for `tag`.

    Args:
        tag: window tag understood by `steps_per_hour` / `steps_per_day`.
        roll_windows: window names; only "1h", "6h" and "1d" are supported.

    Returns:
        Dict mapping each requested window name to its length in buckets,
        in the order the names were given.

    Raises:
        ValueError: if a window name is not supported.
    """
    per_hour = steps_per_hour(tag)
    per_day = steps_per_day(tag)
    steps_by_window = {"1h": 1 * per_hour, "6h": 6 * per_hour, "1d": 1 * per_day}

    resolved = {}
    for name in roll_windows:
        if name not in steps_by_window:
            raise ValueError(f"Unsupported roll window: {name}")
        resolved[name] = steps_by_window[name]
    return resolved

# -------------------------
# CFG (one source of truth)
# -------------------------
# Single configuration dict read by the cells below: dataset/feature/model settings
# at the top level, autoscaling-simulation settings nested under CFG["SCALING"].
CFG: Dict[str, Any] = {
    # ===== Dataset =====
    "RAW_LOG_PATH": os.path.join(PROJECT_ROOT, "data", "access_log.txt"),  # optional
    "TAGS": ["1m", "5m", "15m"],
    "TIME_COL_RAW": "timestamp",
    "TIME_COL_BUCKET": "bucket_start",

    # Storm gap (problem statement)
    "STORM_START": pd.Timestamp("1995-08-01 14:52:01"),
    "STORM_END":   pd.Timestamp("1995-08-03 04:36:13"),

    # ===== Feature engineering =====
    "LAG_DAYS": [1,2,3,4,5,6,7],
    "ROLL_WINDOWS": ["1h","6h","1d"],
    "ROLL_USE_STD": True,
    "USE_CYCLIC": True,
    "HORIZON_STEPS": 1,
    "KEEP_RAW_EXTRA": [
        "unique_hosts","err_4xx","err_5xx","error_rate",
        "is_missing_bucket","is_gap_storm","is_gap_unknown"
    ],
    "REQUIRE_COLS": ["bucket_start","hits","bytes_sum","is_gap"],

    # ===== Modeling =====
    "TARGETS": ["hits", "bytes_sum"],
    "XGB_PARAMS": dict(
        booster="gbtree",
        n_estimators=5000,
        early_stopping_rounds=50,
        objective="reg:squarederror",
        max_depth=6,
        learning_rate=0.05,
        subsample=0.9,
        colsample_bytree=0.9,
        reg_lambda=1.0,
        random_state=42,
    ),
    "CV_SPLITS": 5,
    "CV_TEST_DAYS": 2,
    "CV_GAP_STEPS": 1,

    # ==========================================================
    # AUTOSCALING / SIMULATION CONFIG (Window-aware + Metric-aware)
    # ==========================================================
    "SCALING": {
        # bounds
        "min_instances": 2,
        "max_instances": 50,

        # unit cost
        "cost_per_instance_per_hour": 0.05,

        # window -> minutes
        "window_minutes": {"1m": 1, "5m": 5, "15m": 15},

        # --- Metric-aware safety buffer (avoid under-provisioning for bytes_sum)
        # hits is usually fine with a moderate buffer; bytes_sum tends to burst => higher buffer
        # NOTE(review): both metrics are currently set to the same value (0.3) — confirm intended.
        "safety_buffer_by_metric": {"hits": 0.3, "bytes_sum": 0.3},

        # --- Per-instance capacity (tune so required_instances shows visible variation)
        # NOTE: to demo a visibly reacting predictive policy, lower the bytes_sum capacity
        "capacity_per_instance": {
            ("hits","1m"): 20, ("hits","5m"): 100, ("hits","15m"): 350,
            ("bytes_sum","1m"): 350_000, ("bytes_sum","5m"): 1_200_000, ("bytes_sum","15m"): 3_500_000,
        },

        # --- Step change per window (15m should not jump too much, for a cleaner demo)
        "max_step_change_by_window": {"1m": 6, "5m": 10, "15m": 15},

        # --- Hysteresis per window (1m is noisy => larger high/low)
        # high: number of consecutive windows above the threshold before scaling out
        # low : number of consecutive windows below the threshold before scaling in
        "hysteresis_by_window": {
            "1m": {"high": 2, "low": 6, "in_margin": 0.18},
            "5m": {"high": 1, "low": 4, "in_margin": 0.15},
            "15m":{"high": 1, "low": 2, "in_margin": 0.12},
        },

        "predictive_deadband_by_window": {"1m": 0.5, "5m": 0.5, "15m": 0.5},

        # --- cooldown (expressed in minutes, converted in code)
        "cooldown_minutes": {"base": 8, "spike": 15},

        # --- provisioning per window
        "provisioning_by_window": {
            "1m": {"warmup_windows": 1, "min_uptime_windows": 6},
            "5m": {"warmup_windows": 1, "min_uptime_windows": 4},
            "15m":{"warmup_windows": 0, "min_uptime_windows": 2},
        },

        # --- Reactive (rescue) knobs
        "reactive": {
            "enabled": True,
            "overload_scale_out_immediate": True,
            "rescue_extra_instances": 3,
            "queue_low_fraction": 0.05,
            "queue_high_multiplier": 4.0,  # higher to reduce false rescues => cleaner demo
        },

        # --- SLO / latency model (simplified)
        "slo": {
            "base_latency_ms": 80.0,
            "alpha_latency_per_unit_queue": 0.15,
            "p95_latency_target_ms": 300.0,
        },

        # --- Anomaly detection (MAD) over a lookback given in hours (converted in code)
        "anomaly": {
            "enabled": True,
            "method": "mad",
            "lookback_hours": 2,
            "mad_k": 6.0,
            "min_points": 10,
            "max_flag_rate": 0.30,
        },

        # --- DDoS mode (force step per window)
        "ddos_mode": {
            "enabled": True,
            "force_scale_out_step_by_window": {"1m": 6, "5m": 10, "15m": 12},
            "max_instances_during_ddos": 50,
        },
    }
}

print("✅ Cell 1 done — paths ready + CFG ready (CFG['SCALING'] exists)")


✅ Cell 1 done — paths ready + CFG ready (CFG['SCALING'] exists)


In [7]:
# CELL 5 — Feature engineering segment-safe -> outputs/03_features/*
# Column names shared by the feature-engineering helpers in this cell.
TIME_COL = "bucket_start"  # bucket timestamp column
GAP_COL  = "is_gap"        # gap flag column; build_segment_id treats 0 as a valid row
SEG_COL  = "segment_id"    # segment id column written by build_segment_id (-1 on gap rows)

from typing import List

def assert_required_cols(df: pd.DataFrame, cols: List[str], name: str):
    """Fail fast when `df` lacks any of the columns listed in `cols`.

    Args:
        df: frame to validate.
        cols: column names that must be present.
        name: label for the frame, used as a prefix in the error message.

    Raises:
        ValueError: listing the missing columns in the order given in `cols`.
    """
    absent = list(filter(lambda col: col not in df.columns, cols))
    if not absent:
        return
    raise ValueError(f"[{name}] missing required cols: {absent}")

def build_segment_id(df: pd.DataFrame) -> pd.DataFrame:
    """Return a copy of `df` with a normalised gap flag and a segment id column.

    The gap column is coerced to int8 (unparseable/missing values become 0).
    A new segment starts at every non-gap row whose previous row is a gap; the
    first row counts as preceded by a gap. Segment ids are numbered from 1 in
    row order, and gap rows receive -1. Rows are processed in their existing
    order; no sorting is done here.
    """
    out = df.copy()

    gap_flag = pd.to_numeric(out[GAP_COL], errors="coerce").fillna(0).astype("int8")
    out[GAP_COL] = gap_flag

    valid = gap_flag.eq(0)
    # The shift leaves the first row without a predecessor; fill with 1 so it opens a segment.
    follows_gap = gap_flag.shift(1).fillna(1).astype("int8").eq(1)

    segment_starts = (valid & follows_gap).astype("int8")
    running_id = segment_starts.cumsum().astype("int32")

    out[SEG_COL] = running_id.where(valid, other=-1).astype("int32")
    return out

def add_ratio_features(df: pd.DataFrame) -> pd.DataFrame:
    """Return a copy of `df` with an `avg_bytes_per_req` column added.

    `hits` and `bytes_sum` are coerced to float (unparseable/missing -> 0.0).
    The denominator is floored at 1.0, so rows with zero hits do not divide by zero.
    """
    out = df.copy()

    def _as_float(column: str) -> pd.Series:
        return pd.to_numeric(out[column], errors="coerce").fillna(0.0).astype(float)

    requests = _as_float("hits")
    total_bytes = _as_float("bytes_sum")
    out["avg_bytes_per_req"] = total_bytes / requests.clip(lower=1.0)
    return out

def create_time_features(df: pd.DataFrame, tag: str, ref_time: pd.Timestamp, use_cyclic: bool=True):
    """Add calendar features and a step index to a copy of `df`.

    Args:
        df: frame holding the timestamp column named by TIME_COL.
        tag: window tag; its width (via `tag_minutes`) defines one step of `time_idx`.
        ref_time: origin for `time_idx` (number of steps elapsed since this instant).
        use_cyclic: when True, also add sin/cos encodings of hour and day of week.

    Returns:
        Tuple `(frame, feature_names)`: the augmented copy and the list of
        feature column names that were added.
    """
    out = df.copy()

    stamps = pd.to_datetime(out[TIME_COL])
    ref_time = pd.to_datetime(ref_time)

    # Make both sides agree on timezone awareness before subtracting:
    # whichever side is naive is localized to the other side's timezone.
    stamps_tz = getattr(stamps.dt, "tz", None)
    ref_tz = getattr(ref_time, "tzinfo", None)
    if stamps_tz is not None and ref_tz is None:
        ref_time = ref_time.tz_localize(stamps_tz)
    elif stamps_tz is None and ref_tz is not None:
        stamps = stamps.dt.tz_localize(ref_tz)

    for part in ("hour", "minute", "dayofweek", "month", "dayofyear"):
        out[part] = getattr(stamps.dt, part).astype("int16")
    out["is_weekend"] = (out["dayofweek"] >= 5).astype("int8")

    seconds_per_step = tag_minutes(tag) * 60
    elapsed_seconds = (stamps - ref_time).dt.total_seconds()
    out["time_idx"] = (elapsed_seconds / seconds_per_step).astype("int64")

    feature_names = ["hour","minute","dayofweek","month","dayofyear","is_weekend","time_idx"]
    if not use_cyclic:
        return out, feature_names

    # Map hour (period 24) and day of week (period 7) onto the unit circle.
    hour_angle = 2*np.pi*out["hour"].astype(float)/24.0
    dow_angle = 2*np.pi*out["dayofweek"].astype(float)/7.0
    out["hour_sin"] = np.sin(hour_angle)
    out["hour_cos"] = np.cos(hour_angle)
    out["dow_sin"]  = np.sin(dow_angle)
    out["dow_cos"]  = np.cos(dow_angle)
    feature_names += ["hour_sin","hour_cos","dow_sin","dow_cos"]
    return out, feature_names

def add_lags(df: pd.DataFrame, tag: str, target: str):
    """Add day-lag features of `target`, computed separately inside each segment.

    For every value in CFG["LAG_DAYS"] a column `<target>_lag_<d>d` is added that
    holds the target value `d` days earlier (d * steps_per_day(tag) rows back)
    within the same segment. When at least two lag days are configured, a
    difference column between the first two lags is added as well. Rows with a
    negative segment id (gap rows) get no lag values (NaN after the concat).

    Args:
        df: frame containing TIME_COL, SEG_COL and `target`.
        tag: window tag used to convert days into a number of rows.
        target: name of the column to lag.

    Returns:
        Tuple `(frame, feature_names)`: a new frame sorted by TIME_COL with a
        fresh index, and the names of the lag feature columns.
    """
    lag_days = list(CFG["LAG_DAYS"])
    spd = steps_per_day(tag)
    pref = f"{target}_"

    d = df.copy()
    ok = d[d[SEG_COL] >= 0].sort_values(TIME_COL).reset_index(drop=True)
    gap = d[d[SEG_COL] < 0].copy()

    # Grouped shift instead of DataFrameGroupBy.apply: applying a function to a
    # frame that still contains the grouping column is deprecated since
    # pandas 2.2, and the grouped shift is vectorised.
    y = pd.to_numeric(ok[target], errors="coerce").astype(float)
    by_segment = y.groupby(ok[SEG_COL])

    feat_cols = []
    for days in lag_days:
        col = f"{pref}lag_{days}d"
        ok[col] = by_segment.shift(int(days * spd))
        feat_cols.append(col)

    if len(lag_days) >= 2:
        d0, d1 = lag_days[0], lag_days[1]
        diff_col = f"{pref}diff_lag_{d0}d_{d1}d"
        ok[diff_col] = ok[f"{pref}lag_{d0}d"] - ok[f"{pref}lag_{d1}d"]
        feat_cols.append(diff_col)

    out = pd.concat([ok, gap], ignore_index=True).sort_values(TIME_COL).reset_index(drop=True)
    return out, feat_cols

def add_rolling(df: pd.DataFrame, tag: str, target: str):
    """Add rolling mean (and optionally std) features of `target` per segment.

    The target is shifted by one row inside each segment before the rolling
    statistic is taken, so the value at a row only uses strictly earlier rows
    (no leakage of the current value). Windows come from CFG["ROLL_WINDOWS"]
    and require a full window (`min_periods=win`). Std columns are added only
    when CFG["ROLL_USE_STD"] is truthy. Rows with a negative segment id (gap
    rows) get no rolling values (NaN after the concat).

    Args:
        df: frame containing TIME_COL, SEG_COL and `target`.
        tag: window tag used to convert window names into a number of rows.
        target: name of the column to aggregate.

    Returns:
        Tuple `(frame, feature_names)`: a new frame sorted by TIME_COL with a
        fresh index, and the names of the rolling feature columns.
    """
    roll_map = resolve_roll_windows(tag, CFG["ROLL_WINDOWS"])
    use_std = CFG["ROLL_USE_STD"]
    pref = f"{target}_"

    d = df.copy()
    ok = d[d[SEG_COL] >= 0].sort_values(TIME_COL).reset_index(drop=True)
    gap = d[d[SEG_COL] < 0].copy()

    # Grouped shift/transform instead of DataFrameGroupBy.apply: applying a
    # function to a frame that still contains the grouping column is
    # deprecated since pandas 2.2.
    seg_key = ok[SEG_COL]
    y = pd.to_numeric(ok[target], errors="coerce").astype(float)
    y_shift = y.groupby(seg_key).shift(1)  # prevent leakage
    shifted_by_segment = y_shift.groupby(seg_key)

    cols = []
    for wname, win in roll_map.items():
        mean_col = f"{pref}roll_mean_{wname}"
        ok[mean_col] = shifted_by_segment.transform(
            lambda s, w=win: s.rolling(w, min_periods=w).mean()
        )
        cols.append(mean_col)
        if use_std:
            std_col = f"{pref}roll_std_{wname}"
            ok[std_col] = shifted_by_segment.transform(
                lambda s, w=win: s.rolling(w, min_periods=w).std()
            )
            cols.append(std_col)

    out = pd.concat([ok, gap], ignore_index=True).sort_values(TIME_COL).reset_index(drop=True)
    return out, cols

def add_labels(df: pd.DataFrame, target: str, label_col: str, horizon_steps: int):
    """Add a forecasting label: `target` shifted `horizon_steps` rows ahead, per segment.

    The label at a row is the target value `horizon_steps` rows later within
    the same segment, so the last `horizon_steps` rows of each segment get NaN.
    Rows with a negative segment id (gap rows) get no label (NaN after the concat).

    Args:
        df: frame containing TIME_COL, SEG_COL and `target`.
        target: name of the column to predict.
        label_col: name of the label column to create.
        horizon_steps: how many rows ahead the label looks.

    Returns:
        A new frame sorted by TIME_COL with a fresh index.
    """
    d = df.copy()
    ok = d[d[SEG_COL] >= 0].sort_values(TIME_COL).reset_index(drop=True)
    gap = d[d[SEG_COL] < 0].copy()

    # Grouped shift instead of DataFrameGroupBy.apply: applying a function to a
    # frame that still contains the grouping column is deprecated since
    # pandas 2.2.
    y = pd.to_numeric(ok[target], errors="coerce").astype(float)
    ok[label_col] = y.groupby(ok[SEG_COL]).shift(-horizon_steps)

    out = pd.concat([ok, gap], ignore_index=True).sort_values(TIME_COL).reset_index(drop=True)
    return out

# ===== helpers for loading ts3 produced by 01 notebook =====
# Input directories; overridable through the SAVE_TRAIN_DIR / SAVE_TEST_DIR environment variables.
TRAIN_DIR = os.environ.get("SAVE_TRAIN_DIR", "data/train")
TEST_DIR  = os.environ.get("SAVE_TEST_DIR",  "data/test")
def load_ts3(split, k):
    """Read `ts3_<k>.parquet` for the given split and parse its `bucket_start` column.

    Args:
        split: "train" reads from TRAIN_DIR; any other value reads from TEST_DIR.
        k: window tag used in the file name.

    Returns:
        The loaded DataFrame with `bucket_start` converted by `pd.to_datetime`.
    """
    base_dir = TRAIN_DIR if split=="train" else TEST_DIR
    frame = pd.read_parquet(os.path.join(base_dir, f"ts3_{k}.parquet"))
    frame["bucket_start"] = pd.to_datetime(frame["bucket_start"], utc=False)
    return frame

# =========================
# Build features per tag
# =========================
# For every window tag: load the train/test series, build segment-safe features,
# write train / test-features / test-truth parquet files plus a JSON metadata
# file to OUT_03, and collect one summary row per tag.

def _to_tznaive_dt64(s: pd.Series) -> pd.Series:
    """Parse `s` as datetimes and drop timezone info (tz-aware values are converted to UTC first)."""
    s = pd.to_datetime(s, errors="coerce")
    if getattr(s.dt, "tz", None) is not None:
        s = s.dt.tz_convert(None)
    return s

build_rows = []

for tag in CFG["TAGS"]:
    tr = load_ts3("train", tag)
    te = load_ts3("test", tag)

    assert_required_cols(tr, CFG["REQUIRE_COLS"], f"train_{tag}")
    assert_required_cols(te, CFG["REQUIRE_COLS"], f"test_{tag}")

    tr = build_segment_id(tr)
    te = build_segment_id(te)

    # Train: exclude gaps
    tr_clean = tr[tr[GAP_COL] == 0].copy()
    tr_clean = add_ratio_features(tr_clean)

    # ref_time: earliest of train+test (consistent time_idx)
    ref_time = pd.to_datetime(pd.concat([tr[[TIME_COL]], te[[TIME_COL]]], ignore_index=True)[TIME_COL].min())

    tr_clean, time_cols = create_time_features(tr_clean, tag, ref_time, use_cyclic=CFG["USE_CYCLIC"])

    tr_clean, hits_lag   = add_lags(tr_clean, tag, "hits")
    tr_clean, hits_roll  = add_rolling(tr_clean, tag, "hits")
    tr_clean, bytes_lag  = add_lags(tr_clean, tag, "bytes_sum")
    tr_clean, bytes_roll = add_rolling(tr_clean, tag, "bytes_sum")
    # NOTE: ratio_lag columns are computed but not included in any feature list below.
    tr_clean, ratio_lag  = add_lags(tr_clean, tag, "avg_bytes_per_req")
    tr_clean, ratio_roll = add_rolling(tr_clean, tag, "avg_bytes_per_req")

    h = int(CFG["HORIZON_STEPS"])
    tr_clean = add_labels(tr_clean, "hits",      "y_hits_next",       h)
    tr_clean = add_labels(tr_clean, "bytes_sum", "y_bytes_sum_next",  h)

    keep_extra = [c for c in CFG["KEEP_RAW_EXTRA"] if c in tr_clean.columns]

    hits_feat_cols  = time_cols + hits_lag + hits_roll
    bytes_feat_cols = time_cols + bytes_lag + bytes_roll + ratio_roll + hits_roll
    all_feat_cols   = sorted(set(hits_feat_cols + bytes_feat_cols))

    keep_cols_train = (
        [TIME_COL, GAP_COL, SEG_COL, "hits","bytes_sum","avg_bytes_per_req"]
        + keep_extra
        + ["y_hits_next","y_bytes_sum_next"]
        + all_feat_cols
    )

    # Only rows without a label are dropped; NaNs in features are kept.
    before = len(tr_clean)
    tr_out = tr_clean[keep_cols_train].copy()
    tr_out = tr_out.dropna(subset=["y_hits_next","y_bytes_sum_next"]).reset_index(drop=True)
    after = len(tr_out)

    tr_out.to_parquet(os.path.join(OUT_03, f"xgb_train_{tag}.parquet"), index=False)

    # =========================
    # Test features: concat history + test -> compute -> slice back safely
    # =========================
    # Lags/rolling windows at the start of the test period need the tail of the
    # training history, so features are computed on train+test and then joined
    # back onto the test timestamps.
    hist_and_test = pd.concat([tr, te], ignore_index=True).sort_values(TIME_COL).reset_index(drop=True)
    hist_and_test = build_segment_id(hist_and_test)
    hist_and_test = add_ratio_features(hist_and_test)
    hist_and_test, _ = create_time_features(hist_and_test, tag, ref_time, use_cyclic=CFG["USE_CYCLIC"])

    hist_and_test, _ = add_lags(hist_and_test, tag, "hits")
    hist_and_test, _ = add_rolling(hist_and_test, tag, "hits")
    hist_and_test, _ = add_lags(hist_and_test, tag, "bytes_sum")
    hist_and_test, _ = add_rolling(hist_and_test, tag, "bytes_sum")
    hist_and_test, _ = add_lags(hist_and_test, tag, "avg_bytes_per_req")
    hist_and_test, _ = add_rolling(hist_and_test, tag, "avg_bytes_per_req")

    # Slice test back safely: tz-safe TIME_COL join using an int64 key.
    # Normalize TIME_COL on both sides first (prevents tz-aware vs tz-naive mismatch).
    # NOTE(review): from here on `te` (and therefore both test outputs) carries
    # tz-naive timestamps, while the train output keeps TIME_COL as loaded —
    # confirm downstream cells expect this.
    hist_and_test[TIME_COL] = _to_tznaive_dt64(hist_and_test[TIME_COL])
    te[TIME_COL]            = _to_tznaive_dt64(te[TIME_COL])

    # Build an exact int64 nanosecond key. The view is taken on the NumPy array
    # because Series.view is deprecated in recent pandas releases.
    hist_and_test["_tkey"] = hist_and_test[TIME_COL].to_numpy(dtype="datetime64[ns]").view("int64")
    te_key = te[[TIME_COL]].copy()
    te_key["_tkey"] = te_key[TIME_COL].to_numpy(dtype="datetime64[ns]").view("int64")

    # guard against duplicates (should not happen, but fail fast if it does)
    if hist_and_test["_tkey"].duplicated().any():
        dup_n = int(hist_and_test["_tkey"].duplicated().sum())
        raise ValueError(f"[CELL5] hist_and_test has duplicate timestamps: {dup_n}")

    te_features = (
        te_key.merge(
            hist_and_test.drop(columns=[TIME_COL]),
            on="_tkey",
            how="left",
            validate="one_to_one"
        )
        .sort_values(TIME_COL)
        .reset_index(drop=True)
    )

    # cleanup
    te_features.drop(columns=["_tkey"], inplace=True)
    hist_and_test.drop(columns=["_tkey"], inplace=True)

    # sanity check: more than 1% missing raw values means the join did not match
    for c in ["hits", "bytes_sum"]:
        na_rate = float(te_features[c].isna().mean())
        if na_rate > 0.01:
            raise ValueError(f"[CELL5] join failed: te_features[{c}] NA rate = {na_rate:.4f}")

    keep_cols_test_feat = [TIME_COL, GAP_COL, SEG_COL, "hits","bytes_sum","avg_bytes_per_req"] + all_feat_cols
    te_features = te_features[keep_cols_test_feat].copy()
    te_features.to_parquet(os.path.join(OUT_03, f"xgb_test_features_{tag}.parquet"), index=False)

    te_truth = te[[TIME_COL,"hits","bytes_sum",GAP_COL,SEG_COL]].copy()
    te_truth = te_truth.rename(columns={"hits":"hits_true","bytes_sum":"bytes_sum_true"})
    te_truth.to_parquet(os.path.join(OUT_03, f"xgb_test_{tag}.parquet"), index=False)

    meta = {
        "generated_at": datetime.now(timezone.utc).isoformat(),
        "tag": tag,
        "time_col": TIME_COL,
        "gap_col": GAP_COL,
        "segment_col": SEG_COL,
        "horizon_steps": h,
        "labels": {"hits":"y_hits_next","bytes_sum":"y_bytes_sum_next"},
        "time_features": time_cols,
        "hits_feature_cols": hits_feat_cols,
        "bytes_feature_cols": bytes_feat_cols,
        "all_feature_cols": all_feat_cols,
        "paths": {
            # Record the files load_ts3 actually read for this tag.
            "train_in": os.path.join(TRAIN_DIR, f"ts3_{tag}.parquet"),
            "test_in":  os.path.join(TEST_DIR, f"ts3_{tag}.parquet"),
            "train_out": os.path.join(OUT_03, f"xgb_train_{tag}.parquet"),
            "test_truth_out": os.path.join(OUT_03, f"xgb_test_{tag}.parquet"),
            "test_features_out": os.path.join(OUT_03, f"xgb_test_features_{tag}.parquet"),
        },
        "spec": {
            "gap_policy": "train excludes is_gap==1; segment-safe; keep NaN in features; drop NaN only in labels",
            "lag_days": CFG["LAG_DAYS"],
            "roll_windows": CFG["ROLL_WINDOWS"],
        }
    }
    with open(os.path.join(OUT_03, f"meta_{tag}.json"), "w", encoding="utf-8") as f:
        json.dump(meta, f, ensure_ascii=False, indent=2)

    build_rows.append({
        "tag": tag,
        "rows_out_train": after,
        "dropped_na_train_labels": before-after,
        "n_features": len(all_feat_cols),
        "train_time_min": str(tr_out[TIME_COL].min()) if len(tr_out) else None,
        "train_time_max": str(tr_out[TIME_COL].max()) if len(tr_out) else None,
        "test_rows": len(te_truth),
        "test_time_min": str(te_truth[TIME_COL].min()) if len(te_truth) else None,
        "test_time_max": str(te_truth[TIME_COL].max()) if len(te_truth) else None,
    })

report_df = pd.DataFrame(build_rows).sort_values("tag").reset_index(drop=True)
print(report_df.to_string(index=False))
print(" Cell 5 done — features saved to outputs/03_features/")


tag  rows_out_train  dropped_na_train_labels  n_features            train_time_min            train_time_max  test_rows       test_time_min       test_time_max
15m            4605                        3          45 1995-07-01 00:00:00-04:00 1995-08-22 23:30:00-04:00        864 1995-08-23 04:00:00 1995-09-01 03:45:00
 1m           69106                        3          45 1995-07-01 00:00:00-04:00 1995-08-22 23:58:00-04:00      12960 1995-08-23 04:00:00 1995-09-01 03:59:00
 5m           13819                        3          45 1995-07-01 00:00:00-04:00 1995-08-22 23:50:00-04:00       2592 1995-08-23 04:00:00 1995-09-01 03:55:00
 Cell 5 done — features saved to outputs/03_features/
