In [None]:
from collections import Counter
from pathlib import Path

import numpy as np
import pandas as pd
from tqdm import tqdm

# Allow up to 99 characters per cell when pandas renders a DataFrame.
pd.set_option("display.max_colwidth", 99)
# Register tqdm's pandas integration (per the tqdm docs this adds the `progress_*` methods).
tqdm.pandas()

In [None]:
# Root of the dataset tree; the path is relative to the notebook's working directory.
ROOT = Path("../../data-new/")
# Fail fast when the notebook is started from a directory where the relative path does not resolve.
assert ROOT.exists()

In [None]:
# Corpus-level metadata; the cells below rely on its `subset`, `label` and `sha256` columns.
df = pd.read_csv("corpus_metadata.csv")
# Rows that belong to the test split.
test = df.query('subset == "test"')
# sha256 values of every test row.
test_hashes = set(test["sha256"])
# sha256 values of the test rows labelled "dirty".
# Inside a `query` expression `&` binds like `and`, so this reads
# (subset == "test") and (label == "dirty") even without parentheses.
test_dirty_hashes = set(df.query('subset == "test" & label == "dirty"')["sha256"])

# CSVs + sources

## GPT4o + CONSTRAINED

In [None]:
# Metadata CSV and sample directory of the GPT4o constrained run.
CSV = ROOT / "adversarial/metadata/gpt4o-adversarial-generation-results_with_mapping_label_filtered.csv"
print(CSV.resolve().as_posix())
SAMPLES = ROOT / "adversarial/data/gpt4o-adversarial-generation-results/all_modified/"
print(SAMPLES.resolve().as_posix())
assert CSV.exists()
# is_dir() is already False for a missing path, so it covers the existence check as well.
assert SAMPLES.is_dir()

xs = pd.read_csv(CSV)

# Every listed sample must come from the test split (`test_hashes` holds the test sha256 values).
assert set(xs["filename"]).issubset(test_hashes)

assert len(xs) == 772
# Directory-size check left disabled, as in the original cell:
# assert (n := len(list(SAMPLES.iterdir()))) == 774, n

# Entries are joined to SAMPLES as-is -- no .py suffix is appended.
for sample_name in xs["filename"]:
    sample_path = SAMPLES / sample_name
    assert sample_path.exists(), sample_name

## GPT4o + CONSTRAINED + RAG

In [None]:
# Metadata CSV and sample directory of the GPT4o constrained + RAG run.
CSV = ROOT / "adversarial/metadata/gpt4o_rag_with_mapping_label_filtered.csv"
print(CSV.resolve().as_posix())
SAMPLES = ROOT / "adversarial/data/gpt4o-rag-generation/gpt4o-adversarial-generation-results/new_prompt-modified/"
print(SAMPLES.resolve().as_posix())
assert CSV.exists()
# is_dir() is already False for a missing path, so it covers the existence check as well.
assert SAMPLES.is_dir()

xs = pd.read_csv(CSV)

# NOTE(review): this cell checks against the whole corpus, not only the test split
# as the sibling cells do -- confirm that this is intentional.
assert set(xs["filename"]).issubset(set(df["sha256"]))

assert len(xs) == 666
assert len(tuple(SAMPLES.iterdir())) == 799

# Entries are joined to SAMPLES as-is -- no .py suffix is appended.
for sample_name in xs["filename"]:
    sample_path = SAMPLES / sample_name
    assert sample_path.exists(), sample_name

## GPT4o + UNCONSTRAINED

In [None]:
# Metadata CSV and sample directory of the GPT4o unconstrained run.
CSV = ROOT / "adversarial/metadata/gpt4o-scripts-full-unconstrained-parsable_with_mapping_label_filtered.csv"
print(CSV.resolve().as_posix())
SAMPLES = ROOT / "adversarial/data/gpt4o-unconstrained-parseable-scripts/gpt4o_scripts_full_unconstrained_ast_parsable/"
print(SAMPLES.resolve().as_posix())
assert CSV.exists()
# is_dir() is already False for a missing path, so it covers the existence check as well.
assert SAMPLES.is_dir()

xs = pd.read_csv(CSV)

# This CSV is validated through its `sha256` column (the `filename`-based check is disabled here).
# Every listed sample must come from the test split (`test_hashes` holds the test sha256 values).
assert set(xs["sha256"]).issubset(test_hashes)

assert len(xs) == 1650
assert len(tuple(SAMPLES.iterdir())) == 1671

# Entries are joined to SAMPLES as-is -- no .py suffix is appended.
for sample_name in xs["sha256"]:
    sample_path = SAMPLES / sample_name
    assert sample_path.exists(), sample_name

## Llama + CONSTRAINED

In [None]:
# Metadata CSV and sample directory of the Llama constrained run.
CSV = ROOT / "adversarial/metadata/llama_full_parsable_scripts_with_mapping_label_filtered.csv"
print(CSV.resolve().as_posix())
SAMPLES = ROOT / "adversarial/data/llama_parsable_scripts_full/llama-full/parsable_scripts/"
print(SAMPLES.resolve().as_posix())
assert CSV.exists()
# is_dir() is already False for a missing path, so it covers the existence check as well.
assert SAMPLES.is_dir()

xs = pd.read_csv(CSV)

# This CSV is validated through its `sha256` column (the `filename`-based check is disabled here).
# Every listed sample must come from the test split (`test_hashes` holds the test sha256 values).
assert set(xs["sha256"]).issubset(test_hashes)

assert len(xs) == 909
assert len(tuple(SAMPLES.iterdir())) == 1147

# Entries are joined to SAMPLES as-is -- no .py suffix is appended.
for sample_name in xs["sha256"]:
    sample_path = SAMPLES / sample_name
    assert sample_path.exists(), sample_name

## Llama + UNCONSTRAINED

In [None]:
# Metadata CSV and sample directory of the Llama unconstrained run.
CSV = ROOT / "adversarial/metadata/llama-scripts-full-unconstrained-parsable_with_mapping_label_filtered.csv"
print(CSV.resolve().as_posix())
SAMPLES = ROOT / "adversarial/data/llama-unconstrained-parseable-scripts/"
print(SAMPLES.resolve().as_posix())
assert CSV.exists()
# is_dir() is already False for a missing path, so it covers the existence check as well.
assert SAMPLES.is_dir()

xs = pd.read_csv(CSV)

# This CSV is validated through its `sha256` column (the `filename`-based check is disabled here).
# Every listed sample must come from the test split (`test_hashes` holds the test sha256 values).
assert set(xs["sha256"]).issubset(test_hashes)

assert len(xs) == 1416
assert len(tuple(SAMPLES.iterdir())) == 1595

# Entries are joined to SAMPLES as-is -- no .py suffix is appended.
for sample_name in xs["sha256"]:
    sample_path = SAMPLES / sample_name
    assert sample_path.exists(), sample_name

## Heuristics

In [None]:
# Write one metadata CSV per heuristic transformation, listing the generated sample files.
for k in ["add_comments", "add_documentation", "add_padding"]:
    # Reuse ROOT instead of repeating the "../../data-new" literal.
    s = ROOT / "adversarial/data/heuristically-generated-new" / k
    assert s.is_dir(), f"missing heuristic sample directory: {s}"
    # iterdir() yields entries in arbitrary order; sort so the exported CSV is reproducible.
    files = sorted(s.iterdir())
    print(k, len(files))
    # The column is named "sha256" but stores the complete file name; the evaluation
    # cell later strips everything from the first "_" onwards (presumably
    # "<sha256>_<suffix>" -- TODO confirm the naming scheme).
    pd.DataFrame({"sha256": [f.name for f in files]}).to_csv(
        ROOT / "adversarial/metadata" / f"heu_{k}.csv", index=False
    )

# Eval

In [None]:
def acc(pred, thr):
    """Return the fraction of decision values in ``pred`` that reach ``thr``.

    Parameters
    ----------
    pred : mapping
        Maps a sample identifier to its decision value; only the values are used.
    thr : float
        Decision threshold; a value ``>= thr`` counts as a hit.

    Returns
    -------
    float
        ``hits / len(pred)`` as a built-in ``float``.

    Raises
    ------
    ZeroDivisionError
        If ``pred`` is empty.
    """
    # Count with plain Python so the result does not depend on the decision values
    # being NumPy scalars (calling ``.item()`` on the ratio fails for built-in floats).
    hits = sum(1 for dv in pred.values() if dv >= thr)
    return hits / len(pred)


def label_change(pv, pa, thr):
    """Tally how each sample's verdict moves between the vanilla and adversarial runs.

    Parameters
    ----------
    pv : mapping
        Sample identifier -> decision value for the vanilla sample.
    pa : mapping
        Sample identifier -> decision value for the adversarial sample; must
        contain every key of ``pv`` (a missing key raises ``KeyError``).
    thr : float
        A decision value ``>= thr`` is tagged ``"det"``, anything else ``"und"``.

    Returns
    -------
    dict
        ``{(vanilla_tag, adversarial_tag): count}``; only transitions that were
        actually observed appear as keys.
    """

    def verdict(dv):
        return "det" if dv >= thr else "und"

    transitions = Counter(
        (verdict(dv_van), verdict(pa[name])) for name, dv_van in pv.items()
    )
    return dict(transitions)


def dv_diff(pv, pa):
    """Summarise the per-sample decision-value drop ``vanilla - adversarial``.

    Parameters
    ----------
    pv : mapping
        Sample identifier -> vanilla decision value.
    pa : mapping
        Sample identifier -> adversarial decision value; must contain every
        key of ``pv``.

    Returns
    -------
    dict
        ``pos_diff`` / ``neg_diff`` hold the number of differences that are
        ``>= 0`` and of all remaining ones. For each of ``mean``, ``median``
        and ``std`` there is a ``<stat>_pos_diff`` and a ``<stat>_neg_diff``
        entry, which is ``None`` when the corresponding group is empty.
    """
    deltas = [dv_van - pa[name] for name, dv_van in pv.items()]
    pos = [d for d in deltas if d >= 0]
    # "not >= 0" rather than "< 0" so that anything failing the first test lands here.
    neg = [d for d in deltas if not d >= 0]

    res = {"pos_diff": len(pos), "neg_diff": len(neg)}
    for func in ("mean", "median", "std"):
        stat = getattr(np, func)
        res[f"{func}_pos_diff"] = stat(pos).item() if pos else None
        res[f"{func}_neg_diff"] = stat(neg).item() if neg else None
    return res

In [None]:
def normalise_heu_names(xs):
    """Return a list with each name in ``xs`` cut off at its first underscore.

    Names without an underscore are returned unchanged.
    """
    stems = []
    for name in xs:
        stems.append(name.partition("_")[0])
    return stems

In [None]:
# Prediction file to evaluate; activate exactly one NAME to analyse a different attack.
# NAME = "pred_gpt4o_constrained"
# NAME = "pred_gpt4o_constrained_rag"
# NAME = "pred_gpt4o_unconstrained"
# NAME = "pred_llama_constrained"
# NAME = "pred_llama_unconstrained"
# NAME = "pred_heu_add_comments"
# NAME = "pred_heu_add_documentation"
NAME = "pred_heu_add_padding"

pred_adv = pd.read_parquet(f"{NAME}.parquet").set_index("run_id")

if "heu" in NAME:
    # Heuristic sample names carry a suffix after the first "_"; strip it so the
    # names can be matched against the vanilla predictions.
    pred_adv["files"] = pred_adv["files"].apply(normalise_heu_names)

# Per-run decision threshold and feature-function description, keyed by run_id.
thrs = pd.read_csv("vanilla_f1_thr.csv").set_index("run_id")["thr_f1"].to_dict()
exps2fxs = pd.read_csv("exp_ids_to_fx_funcs.csv").set_index("run_id")["fx_funcs"].to_dict()
exps = list(thrs.keys())
pred_van = pd.read_parquet("xgb_vanilla_test.parquet").set_index("run_id")

# All four tables must cover the same 255 runs.
assert 255 == len(pred_adv) == len(pred_van) == len(exps) == len(exps2fxs)

results = []
for exp in tqdm(exps):
    v = pred_van.loc[exp]
    a = pred_adv.loc[exp]
    if "labels" in a:
        assert (a["labels"] == 1).all()

    # Build the lookup set once: testing membership against the raw sequence is a
    # linear scan per element, which makes the filter below quadratic per run.
    adv_files = set(a["files"])
    assert adv_files <= set(v["files"])

    # Positions of the vanilla predictions that have an adversarial counterpart.
    idx = [i for i, x in enumerate(v["files"]) if x in adv_files]
    assert len(idx) == len(a["files"])
    assert sorted([v["files"][i] for i in idx]) == sorted(a["files"])

    # file name -> decision value, for the vanilla and the adversarial sample.
    pv = {v["files"][i]: v["preds"][i] for i in idx}
    pa = dict(zip(a["files"], a["preds"]))
    assert len(pv) == len(pa)
    assert pv.keys() == pa.keys()

    res = {
        "exp": exp,
        "fx_funcs": exps2fxs[exp],
    }
    res |= {
        "acc_van_05": acc(pv, 0.5),
        "acc_van_custom": acc(pv, thrs[exp]),
        "acc_adv_05": acc(pa, 0.5),
        "acc_adv_custom": acc(pa, thrs[exp]),
    }
    # Transition counts; only observed transitions become keys, so downstream
    # readers must treat a missing "<src>-><dst>_<thr>" key as zero.
    # (`src`/`dst` instead of `a`/`b` to avoid shadowing the loop's `a`.)
    res |= {f"{src}->{dst}_05": n for (src, dst), n in label_change(pv, pa, 0.5).items()}
    res |= {f"{src}->{dst}_custom": n for (src, dst), n in label_change(pv, pa, thrs[exp]).items()}
    res |= dv_diff(pv, pa)

    results.append(res)

# Results

In [None]:
# One row per experiment run; transition columns that a run never produced are left missing.
res_df = pd.DataFrame(results)

In [None]:
# Mean / median / std of the per-run accuracy, for adversarial ("adv") and vanilla
# ("van") samples at the fixed 0.5 threshold ("05") and the per-run threshold ("custom").
print(NAME)
for kind in ["adv", "van"]:
    print("\n", kind)
    for t in ["05", "custom"]:
        # Aggregate over `results` (all runs). `res` is only the dict of the last
        # run, and iterating it yields its string keys.
        accs = [r[f"acc_{kind}_{t}"] for r in results]
        for func in ["mean", "median", "std"]:
            x = getattr(np, func)(accs)
            print(f"{func} acc @ thr={t}: {x:.4f}")

In [None]:
# Mean over runs of P(adversarial undetected | vanilla detected), for the fixed 0.5
# threshold ("05") and the per-run threshold ("custom").
print(NAME)
for t in ["05", "custom"]:
    rates = []
    # Aggregate over `results` (all runs). `res` is only the dict of the last run,
    # and iterating it yields its string keys.
    for r in results:
        # A transition that never occurred in a run has no key, hence the 0 default.
        escaped = r.get(f"det->und_{t}", 0)
        detected = r.get(f"det->det_{t}", 0) + escaped
        # The conditional rate is undefined when nothing was detected on the vanilla
        # samples; skip such runs instead of dividing by zero.
        if detected:
            rates.append(escaped / detected)

    mean_rate = np.mean(rates) if rates else float("nan")
    print(f"mean (det->und)|det @ {t}: {mean_rate:.6f}")