In [1]:
# =========================
# 0) Mount Google Drive
# =========================
# Colab-only step: mounts Google Drive at /content/drive. Every path used by
# this notebook lives under /content/drive/MyDrive, so run this cell first.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [2]:
import os, json, zipfile
from pathlib import Path
import networkx as nx

# =========================
# 1) CONFIG
# =========================
# Drive folder that holds the zipped GraphML datasets.
ZIP_BASE = Path("/content/drive/MyDrive/KLTN/file_graphml")

# Archives to process, given as (sub-folder, archive name) pairs under ZIP_BASE.
zip_paths = [
    ZIP_BASE / subdir / archive
    for subdir, archive in (
        ("benign",   "benign1033.zip"),
        ("benign",   "graph_data_benign_reports.zip"),
        ("rans2021", "graph_ransomware_reports.zip"),
        ("rans2021", "ransomware1033.zip"),
        ("rans2025", "graph_ransomware2025_reports.zip"),
    )
]

# Extraction target under MyDrive (keeps things tidy, avoids overwriting files).
EXTRACT_ROOT = Path("/content/drive/MyDrive/KLTN/graphml_unzipped")

# Vocabulary output folder (created if missing) and the two output files.
OUT_DIR = Path("/content/drive/MyDrive/KLTN/graph_vocab_out")
OUT_DIR.mkdir(parents=True, exist_ok=True)
OUT_TOKEN2ID = OUT_DIR.joinpath("graph_vocab.json")
OUT_ID2TOKEN = OUT_DIR.joinpath("graph_vocab_id2token.json")

# =========================
# 2) Unzip (each zip -> its own folder)
# =========================
EXTRACT_ROOT.mkdir(parents=True, exist_ok=True)

# Fail fast: abort on the first missing archive before extracting anything.
print("[INFO] Checking zip files...")
for zp in zip_paths:
    if zp.is_file():
        continue
    raise FileNotFoundError(f"Không thấy file: {zp}")
print("[OK] All zip files exist.")

def unzip_to_folder(zip_path: Path, out_root: Path):
    """
    Extract ``zip_path`` into its own sub-folder of ``out_root``.

    The sub-folder is named after the archive's stem. Once extraction has
    finished, a ``.unzipped_ok`` marker file is written inside it; if that
    marker already exists the archive is not extracted again.

    Args:
        zip_path: Path of the zip archive to extract.
        out_root: Directory under which the per-archive folder is created.

    Returns:
        Path of the folder holding the extracted content.
    """
    target_dir = out_root / zip_path.stem  # one folder per archive
    done_flag = target_dir / ".unzipped_ok"

    if not done_flag.exists():
        target_dir.mkdir(parents=True, exist_ok=True)
        print(f"[UNZIP] {zip_path.name} -> {target_dir}")
        with zipfile.ZipFile(zip_path, 'r') as archive:
            archive.extractall(target_dir)
        # Written last, so an interrupted extraction is retried on the next run.
        done_flag.write_text("ok", encoding="utf-8")
    else:
        print(f"[SKIP] Already unzipped: {zip_path.name} -> {target_dir}")

    return target_dir

# Extract each archive in turn; results are appended one by one, so the list
# keeps whatever was extracted even if a later archive fails.
extracted_folders = []
extracted_folders.extend(unzip_to_folder(zp, EXTRACT_ROOT) for zp in zip_paths)

print("\n[INFO] Extracted folders:")
for d in extracted_folders:
    print(" -", d)

# =========================
# 3) Build vocab for graphml
# =========================
def canonical_token(nid: str, a: dict) -> str:
    """
    Build the canonical vocabulary token for a single graph node.

    IMPORTANT: keep the token rules below identical to the copy in gen_pt.py.

    The token is derived from the node's ``node_type`` and that type's key
    attributes; a missing attribute is rendered as an empty string. A node
    whose ``node_type`` is missing or not listed below falls back to its id.

    The ``name`` / ``shared name`` attributes do not influence the token:
    the name-first shortcut is disabled in this version.

    Args:
        nid: Node id, used as the fallback token.
        a: Attribute mapping of the node.

    Returns:
        The token string for the node.
    """
    # The previous per-node append to "debug_token.txt" was removed: it was
    # debugging output that reopened a file for every named node and never
    # affected the returned token.
    nt = a.get('node_type', '')
    if nt == 'feature':
        return f"feature:{a.get('feature_type','')}:{a.get('feature_value','')}"
    if nt == 'api':
        return f"api:{a.get('api','')}"
    if nt == 'process':
        return f"process:{a.get('process_name','')}:{a.get('path','')}:{a.get('cmdline','')}"
    if nt == 'dropped_file':
        return f"dropped:{a.get('filepath','')}"
    if nt == 'network':
        return f"network:{a.get('category','')}"
    if nt == 'signature':
        return f"signature:{a.get('signature_name','')}"
    return str(nid)

def build_vocab(graphml_root: Path):
    """
    Build the token vocabulary from every ``*.graphml`` file under a folder.

    Each file is read with networkx; every node is turned into a token with
    ``canonical_token`` (whitespace-stripped) and collected into a set. A file
    that raises while being read or tokenised is reported and counted, and the
    scan continues. Tokens are sorted before ids are assigned, so the mapping
    does not depend on file discovery order.

    Args:
        graphml_root: Folder searched recursively for ``*.graphml`` files.

    Returns:
        Tuple ``(token2id, id2token, n_files, n_failed)``: the token -> id
        dict, the id -> token dict, the number of graphml files found, and
        the number of files that failed.

    Raises:
        AssertionError: If no ``*.graphml`` file is found under the folder.
    """
    graph_files = list(graphml_root.rglob("*.graphml"))
    total = len(graph_files)
    assert total > 0, f"No .graphml found under {graphml_root}"

    seen_tokens = set()
    failures = 0
    for idx, path in enumerate(graph_files, start=1):
        # Progress line every 200 files.
        if idx % 200 == 0:
            print(f"  [SCAN] {idx}/{total}: {path}")
        try:
            graph = nx.read_graphml(path, node_type=str)
            for node_id, attrs in graph.nodes(data=True):
                seen_tokens.add(canonical_token(node_id, attrs).strip())
        except Exception as err:
            failures += 1
            print(f"[WARN] Failed to read: {path} | err={err}")

    ordered = sorted(seen_tokens)  # sorted -> stable id assignment
    token2id = {tok: pos for pos, tok in enumerate(ordered)}
    id2token = dict(enumerate(ordered))
    return token2id, id2token, total, failures

print("\n[INFO] Building graph vocab from:", EXTRACT_ROOT)
token2id, id2token, n_gml, n_bad = build_vocab(EXTRACT_ROOT)

# Write both mappings as indented UTF-8 JSON with non-ASCII characters kept as-is.
# Note: JSON object keys are strings, so id2token's integer ids are stored as strings.
for out_path, payload in (
    (OUT_TOKEN2ID, {"size": len(token2id), "token2id": token2id}),
    (OUT_ID2TOKEN, id2token),
):
    with open(out_path, "w", encoding="utf-8") as f:
        json.dump(payload, f, ensure_ascii=False, indent=2)

print("\n[DONE]")
for line in (
    f"  Total graphml files found : {n_gml}",
    f"  Failed graphml files      : {n_bad}",
    f"  Vocab size (unique tokens): {len(token2id)}",
    f"  Saved token2id -> {OUT_TOKEN2ID}",
    f"  Saved id2token -> {OUT_ID2TOKEN}",
):
    print(line)


[INFO] Checking zip files...
[OK] All zip files exist.
[UNZIP] benign1033.zip -> /content/drive/MyDrive/KLTN/graphml_unzipped/benign1033
[UNZIP] graph_data_benign_reports.zip -> /content/drive/MyDrive/KLTN/graphml_unzipped/graph_data_benign_reports
[UNZIP] graph_ransomware_reports.zip -> /content/drive/MyDrive/KLTN/graphml_unzipped/graph_ransomware_reports
[UNZIP] ransomware1033.zip -> /content/drive/MyDrive/KLTN/graphml_unzipped/ransomware1033
[UNZIP] graph_ransomware2025_reports.zip -> /content/drive/MyDrive/KLTN/graphml_unzipped/graph_ransomware2025_reports

[INFO] Extracted folders:
 - /content/drive/MyDrive/KLTN/graphml_unzipped/benign1033
 - /content/drive/MyDrive/KLTN/graphml_unzipped/graph_data_benign_reports
 - /content/drive/MyDrive/KLTN/graphml_unzipped/graph_ransomware_reports
 - /content/drive/MyDrive/KLTN/graphml_unzipped/ransomware1033
 - /content/drive/MyDrive/KLTN/graphml_unzipped/graph_ransomware2025_reports

[INFO] Building graph vocab from: /content/drive/MyDrive/KL