In [None]:
import argparse
import json
from pathlib import Path

import numpy as np
import pandas as pd


def parse_args():
    """Parse command-line options for the VNAT CNN feature builder.

    Returns:
        argparse.Namespace with string fields ``vnat_h5``, ``flows_parquet``,
        ``train_caps`` (text file of train capture ids, e.g.
        vnat_train_captures.txt), ``out_dir`` and ``h5_key``, plus integer
        fields ``N``, ``MIN_K`` and ``max_raw``.
    """
    parser = argparse.ArgumentParser()

    # Required input/output locations, all plain strings.
    for flag in ("--vnat_h5", "--flows_parquet", "--train_caps", "--out_dir"):
        parser.add_argument(flag, type=str, required=True)

    # Tunables with defaults.
    parser.add_argument("--N", type=int, default=100)
    parser.add_argument("--MIN_K", type=int, default=50)
    parser.add_argument(
        "--max_raw",
        type=int,
        default=0,
        help="0 disables cap; else cap raw packets per flow (e.g., 5000)",
    )
    parser.add_argument(
        "--h5_key",
        type=str,
        default="",
        help="H5 key; leave empty to auto-pick first key",
    )
    return parser.parse_args()


class RunningStats:
    """Running mean / population std, merged one batch at a time.

    Each ``update`` computes the batch mean and sum of squared deviations with
    NumPy and folds them into the running totals using Chan et al.'s parallel
    form of Welford's algorithm. Unlike the E[x^2] - E[x]^2 formula, this does
    not lose precision when the mean is large relative to the spread, and it
    stays vectorized (no Python loop over individual values).
    """

    def __init__(self):
        self.count = 0   # number of values folded in so far
        self.mean = 0.0  # running mean of all values seen
        self.m2 = 0.0    # running sum of squared deviations from the mean

    def update(self, vals: np.ndarray):
        """Fold every element of ``vals`` (any shape; flattened) into the stats."""
        v = np.asarray(vals, dtype=np.float64).ravel()
        n_b = int(v.size)
        if n_b == 0:
            return
        mean_b = float(v.mean())
        m2_b = float(np.square(v - mean_b).sum())

        # Chan et al. combination of (count, mean, m2) for two partitions.
        n_a = self.count
        n = n_a + n_b
        delta = mean_b - self.mean
        self.mean += delta * n_b / n
        self.m2 += m2_b + delta * delta * n_a * n_b / n
        self.count = n

    def finalize(self):
        """Return ``(mu, sigma)`` as Python floats.

        - No values: ``(0.0, 1.0)``.
        - One value: ``(that value, 1.0)``.
        - Otherwise the mean and population std; a zero std is replaced by
          1.0 so callers can divide by sigma safely.
        """
        if self.count == 0:
            return 0.0, 1.0
        mu = float(self.mean)
        if self.count == 1:
            return mu, 1.0
        sigma = float(np.sqrt(self.m2 / self.count))
        if sigma == 0.0:
            sigma = 1.0
        return mu, sigma


def _read_train_captures(path: Path) -> set:
    """Return the set of capture ids listed one per line in ``path``.

    Surrounding whitespace is stripped and blank lines are skipped, so stray
    spaces or a trailing blank line cannot produce ids that silently fail to
    match ``flows["capture_id"]``.
    """
    lines = path.read_text(encoding="utf-8").splitlines()
    return {line.strip() for line in lines if line.strip()}


def _load_vnat_frame(h5_path: Path, h5_key: str):
    """Load the whole VNAT frame from ``h5_path``; return ``(df, key_used)``.

    An empty/whitespace ``h5_key`` selects the first key in the store. The
    frame is returned with a fresh 0..n-1 index, so row ``i`` is paired with
    the i-th flow in flow_id order (the caller checks the row count).

    Raises:
        ValueError: if the store contains no keys.
    """
    key = h5_key.strip()
    if not key:
        with pd.HDFStore(h5_path, mode="r") as store:
            keys = store.keys()
        if not keys:
            raise ValueError("H5 file has no keys.")
        key = keys[0]
    # NOTE(review): this materializes the entire frame in memory; a notebook
    # run of this step ended in MemoryError. If the store is pandas "fixed"
    # format, object columns are likely stored as one pickled blob, so
    # start/stop or chunked reads would probably not bound memory either —
    # confirm the store format before attempting a chunked reader.
    df = pd.read_hdf(h5_path, key=key)
    return df.reset_index(drop=True), key


def _flow_features(ts, sz, dr, n_pkts, max_raw):
    """Featurize the earliest packets of one flow.

    Packets are ordered by timestamp with a *stable* sort (simultaneous
    packets keep their stored order). When ``max_raw > 0`` only the
    ``max_raw`` earliest packets are considered. The sort happens before the
    cap, so the cap never discards early packets just because the stored
    arrays are not time-ordered. The first ``k = min(count, n_pkts)`` packets
    are then transformed:

      size_feat = sign * log1p(|size|), sign = +1 where direction == 1 else -1
      iat_feat  = log1p(gap to previous packet), 0 for the first packet,
                  negative gaps clamped to 0

    Returns:
        ``(k, size_feat, iat_feat)`` with float64 feature arrays of length k.
    """
    order = np.argsort(ts, kind="stable")
    if max_raw > 0:
        order = order[:max_raw]
    k = int(min(order.size, n_pkts))
    first = order[:k]

    signed_sz = np.where(dr[first] == 1, 1.0, -1.0) * sz[first]

    iat = np.zeros(k, dtype=np.float64)
    if k > 1:
        iat[1:] = np.diff(ts[first])
        iat[iat < 0] = 0.0

    size_feat = np.sign(signed_sz) * np.log1p(np.abs(signed_sz))
    iat_feat = np.log1p(iat)
    return k, size_feat, iat_feat


def main():
    """Build fixed-length CNN inputs from raw VNAT packet arrays.

    Writes to --out_dir:
      cnn_X.npy      float32 (num_flows, N, 2): [size, iat] features of the
                     first K packets of each flow, standardized with train
                     statistics
      cnn_count.npy  float32 (num_flows,): K / N
      cnn_y.npy      int64 (num_flows,): flows["label"]
      cnn_k.npy      int32 (num_flows,): packets used per flow
      normalization_stats.json: the mean/std applied, computed only from
                     training flows (capture_id listed in --train_caps)
                     with K >= MIN_K
    """
    args = parse_args()

    vnat_h5 = Path(args.vnat_h5)
    flows_path = Path(args.flows_parquet)
    train_caps_txt = Path(args.train_caps)
    out_dir = Path(args.out_dir)
    out_dir.mkdir(parents=True, exist_ok=True)

    n_pkts = int(args.N)
    min_k = int(args.MIN_K)
    max_raw = int(args.max_raw)

    out_x = out_dir / "cnn_X.npy"
    out_c = out_dir / "cnn_count.npy"
    out_y = out_dir / "cnn_y.npy"
    out_k = out_dir / "cnn_k.npy"
    out_stats = out_dir / "normalization_stats.json"

    # --- load flows + train split ---
    flows = pd.read_parquet(flows_path).set_index("flow_id").sort_index()
    num_flows = len(flows)

    train_caps = _read_train_captures(train_caps_txt)
    is_train_flow = flows["capture_id"].isin(train_caps).to_numpy()

    Y = flows["label"].to_numpy(dtype=np.int64)

    # --- allocate outputs (flows with no packets stay all-zero with K = 0) ---
    X = np.zeros((num_flows, n_pkts, 2), dtype=np.float32)
    C = np.zeros((num_flows,), dtype=np.float32)
    K = np.zeros((num_flows,), dtype=np.int32)

    rs_size = RunningStats()
    rs_iat = RunningStats()

    # --- read the VNAT H5 frame; row i must correspond to flow i ---
    df, key = _load_vnat_frame(vnat_h5, args.h5_key)
    if len(df) != num_flows:
        raise ValueError(f"Mismatch: H5 rows={len(df)} vs flows={num_flows}. "
                         f"Your flow_id alignment assumption breaks.")

    # --- build X/C/K and collect train stats ---
    columns = zip(df["timestamps"], df["sizes"], df["directions"])
    for flow_id, (ts_raw, sz_raw, dr_raw) in enumerate(columns):
        ts = np.asarray(ts_raw, dtype=np.float64)
        sz = np.asarray(sz_raw, dtype=np.float64)
        dr = np.asarray(dr_raw, dtype=np.int64)

        if ts.size == 0:
            continue

        k, size_feat, iat_feat = _flow_features(ts, sz, dr, n_pkts, max_raw)
        K[flow_id] = k
        C[flow_id] = k / float(n_pkts)

        X[flow_id, :k, 0] = size_feat.astype(np.float32)
        X[flow_id, :k, 1] = iat_feat.astype(np.float32)

        # Normalization statistics come from TRAIN flows with enough packets only.
        if is_train_flow[flow_id] and (k >= min_k):
            rs_size.update(X[flow_id, :k, 0])
            rs_iat.update(X[flow_id, :k, 1])

        if (flow_id + 1) % 2000 == 0:
            print(f"Processed {flow_id+1}/{num_flows}")

    # The raw packet arrays are no longer needed; release them before saving.
    del df, columns

    mu_size, sigma_size = rs_size.finalize()
    mu_iat, sigma_iat = rs_iat.finalize()

    train_valid = int(np.sum(is_train_flow & (K >= min_k)))
    print("Train flows with k>=MIN_K:", train_valid)
    print("Norm stats:", {"mu_size": mu_size, "sigma_size": sigma_size, "mu_iat": mu_iat, "sigma_iat": sigma_iat})

    # --- normalize globally using train stats ---
    # NOTE(review): padded positions (index >= K) are standardized too, so
    # they become -mu/sigma rather than 0; downstream presumably masks them
    # with cnn_k.npy — confirm.
    X[:, :, 0] = (X[:, :, 0] - mu_size) / sigma_size
    X[:, :, 1] = (X[:, :, 1] - mu_iat) / sigma_iat

    # --- save ---
    np.save(out_x, X)
    np.save(out_c, C)
    np.save(out_y, Y)
    np.save(out_k, K)

    stats = {
        "N": n_pkts,
        "MIN_K_for_norm_stats": min_k,
        "MAX_RAW": max_raw,
        "h5_key_used": key,
        "mu": {"size": mu_size, "iat": mu_iat},
        "sigma": {"size": sigma_size, "iat": sigma_iat},
        "count_norm": "pkt_count_observed / N",
        "shapes": {"X": list(X.shape), "C": [int(C.shape[0])], "Y": [int(Y.shape[0])], "K": [int(K.shape[0])]},
    }
    out_stats.write_text(json.dumps(stats, indent=2), encoding="utf-8")

    print("Saved:")
    for path in (out_x, out_c, out_y, out_k, out_stats):
        print(" ", path)


if __name__ == "__main__":
    main()

Total flows: 33711
Excluded flows (pkt_len > 5000): 371 (1.1005%)
Safe ranges to read from H5: 302


MemoryError: 

- Fallback for when I cannot access better resources to run the cell above (that cell loads the entire H5 file at once and hit a `MemoryError`).

In [None]:
import numpy as np
import pandas as pd
from pathlib import Path
import json

VNAT_H5 = Path("../data/raw/vnat/VNAT_Dataframe_release_1.h5")
FLOWS   = Path("../data/processed/vnat/flows.parquet")
TRAIN_CAPS_TXT = Path("../data/splits/vnat_train_captures.txt")

OUT_DIR = Path("../data/processed/vnat")
OUT_DIR.mkdir(parents=True, exist_ok=True)

N = 100        # packets kept per flow (sequence length of X)
MIN_K = 50     # a TRAIN flow needs at least this many packets to feed the normalization stats
# Cap on raw packets per flow. It is applied after a row has been read, so it
# limits sorting/featurization work on monster flows, not the H5 read itself.
MAX_RAW = 5000

OUT_X = OUT_DIR / "cnn_X.npy"
OUT_C = OUT_DIR / "cnn_count.npy"
OUT_Y = OUT_DIR / "cnn_y.npy"
OUT_K = OUT_DIR / "cnn_k.npy"

flows = pd.read_parquet(FLOWS).set_index("flow_id").sort_index()
num_flows = len(flows)
Y = flows["label"].to_numpy(dtype=np.int64)

# One capture id per line. Strip whitespace and skip blank lines so stray
# spaces or a trailing blank line cannot create ids that never match.
train_caps = {
    line.strip()
    for line in TRAIN_CAPS_TXT.read_text(encoding="utf-8").splitlines()
    if line.strip()
}
# flow_id values (index of `flows`) whose capture belongs to the train split.
train_ids = set(flows.index[flows["capture_id"].isin(train_caps)].tolist())

X = np.zeros((num_flows, N, 2), dtype=np.float32)
C = np.zeros((num_flows,), dtype=np.float32)
K = np.zeros((num_flows,), dtype=np.int32)
# running stats (Welford)
class RunningStats:
    """Streaming mean / population std via Welford's algorithm, batched.

    Each ``update`` folds a whole batch in with Chan et al.'s parallel
    combination formula. NumPy computes the batch's mean and sum of squared
    deviations, and these are merged into the running totals. The result
    matches per-element Welford updates (up to float rounding) without a
    Python-level loop over every value.
    """

    def __init__(self):
        self.n = 0       # number of values seen
        self.mean = 0.0  # running mean
        self.M2 = 0.0    # running sum of squared deviations from the mean

    def update(self, arr):
        """Fold every element of ``arr`` (any shape; flattened) into the stats."""
        batch = np.asarray(arr, dtype=np.float64).ravel()
        n_b = int(batch.size)
        if n_b == 0:
            return
        mean_b = float(batch.mean())
        m2_b = float(np.square(batch - mean_b).sum())

        n_a = self.n
        n = n_a + n_b
        delta = mean_b - self.mean
        self.mean += delta * n_b / n
        self.M2 += m2_b + delta * delta * n_a * n_b / n
        self.n = n

    def finalize(self):
        """Return ``(mean, std)``; std is 1.0 for fewer than 2 values or zero spread."""
        if self.n < 2:
            return float(self.mean), 1.0
        var = self.M2 / self.n
        std = float(np.sqrt(var))
        if std == 0.0:
            std = 1.0
        return float(self.mean), std

# Train-only normalization accumulators for the size and IAT channels.
rs_size, rs_iat = RunningStats(), RunningStats()

# Counters reported during/after the loop: unreadable H5 rows, and flows
# truncated to MAX_RAW packets.
bad_rows = skipped_monster = 0

# Failure tallies keyed by exception type name, filled by safe_read_one_row so
# the cause behind bad_rows can be inspected during or after the loop.
READ_FAILURES: dict[str, int] = {}


def safe_read_one_row(i: int, path=None, key=None):
    """Read row ``i`` of the VNAT H5 frame, or return None if it cannot be read.

    Args:
        i: 0-based positional row index.
        path: H5 file to read; defaults to the notebook-level ``VNAT_H5``.
        key: H5 key forwarded to ``pd.read_hdf``; ``None`` lets pandas pick
            it, which pandas only allows when the file holds one dataset.

    Returns:
        The row as a Series, or None when the read fails or does not yield
        exactly one row.

    Failures stay non-fatal on purpose (the caller counts them as bad rows),
    but they are no longer silent. Each one is tallied in READ_FAILURES by
    exception type, and the first failure of each type is printed.

    NOTE(review): pandas "fixed"-format stores appear to keep object columns
    (such as per-flow arrays) as a single pickled blob. If so, start/stop does
    not limit how much is loaded per call, which would explain both the
    slowness and the many bad rows. Check READ_FAILURES to confirm.
    """
    h5_path = VNAT_H5 if path is None else path
    try:
        d = pd.read_hdf(h5_path, key=key, start=i, stop=i + 1)
    except Exception as exc:  # deliberate best-effort: e.g. MemoryError -> bad row
        name = type(exc).__name__
        READ_FAILURES[name] = READ_FAILURES.get(name, 0) + 1
        if READ_FAILURES[name] == 1:
            print(f"safe_read_one_row: first {name} at row {i}: {exc!r}")
        return None
    if len(d) != 1:
        return None
    return d.iloc[0]

# Flows whose H5 row could not be read. Their X/C/K rows stay zero, which is
# indistinguishable from an empty flow, so this mask is saved next to the
# other arrays for downstream filtering.
read_failed = np.zeros(num_flows, dtype=bool)

for flow_id in range(num_flows):
    row = safe_read_one_row(flow_id)
    if row is None:
        bad_rows += 1
        read_failed[flow_id] = True
        continue

    ts = np.asarray(row["timestamps"], dtype=np.float64)
    sz = np.asarray(row["sizes"], dtype=np.float64)
    dr = np.asarray(row["directions"], dtype=np.int64)

    if ts.size == 0:
        continue

    # Time-order the packets. The stable sort keeps simultaneous packets in
    # their stored order, which numpy's default quicksort does not guarantee.
    # Sorting BEFORE the MAX_RAW cap makes the cap keep the earliest packets
    # even if the stored arrays are not time-ordered.
    order = np.argsort(ts, kind="stable")
    if order.size > MAX_RAW:
        order = order[:MAX_RAW]
        skipped_monster += 1

    k = int(min(order.size, N))
    first = order[:k]
    K[flow_id] = k
    C[flow_id] = k / float(N)

    # signed sizes: +size where direction == 1, -size otherwise
    signs = np.where(dr[first] == 1, 1.0, -1.0)
    signed_sz = signs * sz[first]

    # IAT: 0 for the first packet, then the gap to the previous packet
    ts_k = ts[first]
    iat = np.zeros(k, dtype=np.float64)
    if k > 1:
        iat[1:] = np.diff(ts_k)
        iat[iat < 0] = 0.0

    # transforms (sign-preserving log for sizes)
    size_feat = np.sign(signed_sz) * np.log1p(np.abs(signed_sz))
    iat_feat  = np.log1p(iat)

    X[flow_id, :k, 0] = size_feat.astype(np.float32)
    X[flow_id, :k, 1] = iat_feat.astype(np.float32)

    # update normalization stats on TRAIN ONLY, and only if enough packets
    if (flow_id in train_ids) and (k >= MIN_K):
        rs_size.update(X[flow_id, :k, 0])
        rs_iat.update(X[flow_id, :k, 1])

    if (flow_id + 1) % 500 == 0:
        print(f"Processed {flow_id+1}/{num_flows} | bad_rows={bad_rows} | capped_monster={skipped_monster}")

mu_size, sigma_size = rs_size.finalize()
mu_iat,  sigma_iat  = rs_iat.finalize()

print("Final stats:", mu_size, sigma_size, mu_iat, sigma_iat)
print("Bad rows:", bad_rows, "| Monster flows capped:", skipped_monster)

# standardize (padded positions beyond K are shifted too; mask with cnn_k.npy downstream)
X[:, :, 0] = (X[:, :, 0] - mu_size) / sigma_size
X[:, :, 1] = (X[:, :, 1] - mu_iat) / sigma_iat

# save
OUT_READ_FAILED = OUT_DIR / "cnn_read_failed.npy"
np.save(OUT_X, X)
np.save(OUT_C, C)
np.save(OUT_Y, Y)
np.save(OUT_K, K)
np.save(OUT_READ_FAILED, read_failed)

stats = {
    "N": N,
    "MIN_K_for_norm_stats": MIN_K,
    "MAX_RAW_cap": MAX_RAW,
    "mu": {"size": mu_size, "iat": mu_iat},
    "sigma": {"size": sigma_size, "iat": sigma_iat},
    "count_norm": "pkt_count_observed / N",
    "bad_rows": int(bad_rows),
    "monster_flows_capped": int(skipped_monster),
    "read_failed_mask_file": str(OUT_READ_FAILED),
}

Path("../artifacts/cnn").mkdir(parents=True, exist_ok=True)
Path("../artifacts/cnn/normalization_stats.json").write_text(json.dumps(stats, indent=2), encoding="utf-8")

print("Saved:", OUT_X, OUT_C, OUT_Y, OUT_K, OUT_READ_FAILED)
print("X shape:", X.shape, "C min/max:", float(C.min()), float(C.max()))
print("Y labels:", set(np.unique(Y).tolist()))
print("Stats saved to artifacts/cnn/normalization_stats.json")

Processed 500/33711 | bad_rows=421 | capped_monster=0


Exception ignored in: <bound method IPythonKernel._clean_thread_parent_frames of <ipykernel.ipkernel.IPythonKernel object at 0x000001EAF48555D0>>
Traceback (most recent call last):
  File "c:\Users\scoti\AppData\Local\Programs\Python\Python311\Lib\site-packages\ipykernel\ipkernel.py", line 781, in _clean_thread_parent_frames
    def _clean_thread_parent_frames(

KeyboardInterrupt: 
