# Dataset Preprocessing

In [19]:
AIG_rewrites_path = "./Methods/UncoveringLLM/rewrites/AIGCodeSet"
CodeMirage_rewrites_path = "./Methods/UncoveringLLM/rewrites/CodeMirage"
SunEtAl_rewrites_path = "./Methods/UncoveringLLM/rewrites/SunEtAl"
AIG_Dataset_path = "./Dataset/AIGCodeSet/"
CodeMirage_path = "./Dataset/CodeMirage/"
SunEtAl_path = "./Dataset/SunEtAl/SunEtAlNew"

In [57]:
from datasets import load_from_disk

AIG = load_from_disk(AIG_Dataset_path)
CodeMirage = load_from_disk(CodeMirage_path)
SunEtAl = load_from_disk(SunEtAl_path)\

AIG_R= AIG["rightcode"]
AIG_W= AIG["wrongcode"]

In [58]:
from datasets import load_dataset



def extract_assistant_content(example):
    for m in example["messages"]:
        if m.get("role") == "assistant":
            return {"assistant_content": m.get("content")}
    return {"assistant_content": None}



def adding_rewriting(ds, rewrites_path, idx="idx", i=0):
    """
    Returns: {datasets.Dataset}
        The original dataset with additional columns `rewritedCode_0`, `rewritedCode_1`, etc.,
        one for each rewrite file provided.
    """


    # Sanity checks for required columns
    if idx not in ds.column_names:
        print(f"{idx} is not inside ds")


    # Mapping function to add a single rewrite column to each row
    def add_column(row, ds_map, i, idx):
        rewrited = ds_map.get(row[idx])  # use .get() to avoid KeyError
        return {f"rewritedCode_{i}": rewrited}

    # Process each rewrite file
    # Load rewrite JSONL as Dataset
    ds_jsonl = load_dataset("json", data_files=rewrites_path)["train"]

    # Flatten nested structures (e.g., metadata.index → top-level)
    ds_jsonl = ds_jsonl.flatten()

    ds_jsonl = ds_jsonl.map(extract_assistant_content)



     # Select only the needed columns
    ds_jsonl = ds_jsonl.select_columns([f"metadata.{idx}", "assistant_content"])

    # Rename columns to match our convention
    ds_jsonl = ds_jsonl.rename_column(f"metadata.{idx}", idx)
    ds_jsonl = ds_jsonl.rename_column("assistant_content", "content")

    # Convert rewrite dataset to dict: index → content
    ds_map = {row[idx]: row["content"] for row in ds_jsonl}

    # Add the rewrite column to the main dataset
    ds = ds.map(add_column, fn_kwargs={"ds_map": ds_map, "i": i, "idx": idx})

    return ds


In [59]:
for i in range(4):
    AIG_R=adding_rewriting(AIG_R, "./Methods/UncoveringLLM/rewrites/" + str(i) + "AIGCodeSet" + ".jsonl", i=i)
print(AIG_R)

Dataset({
    features: ['problem_id', 'submission_id', 'status_in_folder', 'LLM', 'code', 'ada_embedding', 'label', 'lines', 'code_lines', 'comments', 'functions', 'blank_lines', 'cleared_code', 'idx', 'rewritedCode_0', 'rewritedCode_1', 'rewritedCode_2', 'rewritedCode_3'],
    num_rows: 249
})


In [60]:
for i in range(4):
    AIG_W=adding_rewriting(AIG_W, "./Methods/UncoveringLLM/rewrites/" + str(i) + "AIGCodeSet" + ".jsonl", i=i)
print(AIG_W)

Map:   0%|          | 0/664 [00:00<?, ? examples/s]

Map:   0%|          | 0/664 [00:00<?, ? examples/s]

Map:   0%|          | 0/664 [00:00<?, ? examples/s]

Map:   0%|          | 0/664 [00:00<?, ? examples/s]

Dataset({
    features: ['problem_id', 'submission_id', 'status_in_folder', 'LLM', 'code', 'ada_embedding', 'label', 'lines', 'code_lines', 'comments', 'functions', 'blank_lines', 'cleared_code', 'idx', 'rewritedCode_0', 'rewritedCode_1', 'rewritedCode_2', 'rewritedCode_3'],
    num_rows: 664
})


In [None]:
#@TODO
for i in range(4):
    CodeMirage=adding_rewriting(CodeMirage, "./Methods/UncoveringLLM/rewrites/" + str(i) + "CodeMirage" + ".jsonl", i=i)
print(CodeMirage)

In [None]:
#@TODO
for i in range(4):
    SunEtAl=adding_rewriting(CodeMirage, CodeMirage_path + i + ".jsonl")

In [None]:
#@TODO
for i in range(4):
    SunEtAl=adding_rewriting(SunEtAl, SunEtAl_path + i + ".jsonl")

# Testing

Model preparation

In [63]:
import os
from pathlib import Path
import torch, torch.nn.functional as F
from transformers import AutoTokenizer, AutoModel, AutoConfig


DEVICE = "cuda" if torch.cuda.is_available() else "cpu"


GC_Tokenizer = "microsoft/graphcodebert-base"  # same training backbone (GraphCodeBert) tokenizer
GC_FineTuned =  Path("./Methods/UncoveringLLM/model/") # trained model



tokenizer = AutoTokenizer.from_pretrained(GC_Tokenizer)
encoder   = AutoModel.from_pretrained(GC_FineTuned).to(DEVICE).eval() 


Some weights of RobertaModel were not initialized from the model checkpoint at Methods\UncoveringLLM\model and are newly initialized: ['pooler.dense.bias', 'pooler.dense.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


In [None]:

#TODO: ultimi blocchi semantici per rientrare in lunghezza 

def encode_codes(original_code: str, rewrites: list[str], max_len: int = 512):
    texts = [original_code] + list(rewrites)          # List[str]
    enc = tokenizer(
        texts,
        padding=True,
        truncation=True,
        max_length=max_len,
        return_tensors="pt",
    )

    if len(enc) > max_len:
        enc = enc[-max_len:] ## !! We use only the last tokens !! ##
        print("!! We use only the last tokens !!")

    enc = {k: v.to(DEVICE) for k, v in enc.items()}

    with torch.no_grad():
        out = encoder(**enc).last_hidden_state        # [B, L, H]
        cls = out[:, 0, :]                            # [B, H]  (embedding del primo token)
        cls = F.normalize(cls, p=2, dim=-1)           # SimCSE: L2 normalization

    orig_emb = cls[0]          # [H]
    rewrite_embs = cls[1:]     # [R, H]  (R = len(rewrites))
    return orig_emb, rewrite_embs



# best number of rewrites is 4
def detect(original_code: str, rewrites: list[str]):
    if len(rewrites) != 4:
        print(f"Number of rewrites is not 4 but: {len(rewrites)}")

    e0, eR = encode_codes(original_code, rewrites)  # e0: [H], eR: [R, H]
    sims = F.cosine_similarity(eR, e0.unsqueeze(0).expand_as(eR), dim=-1).numpy()
    score = float(sims.mean()) if len(sims) else float("nan")
    return score

# Start testing

In [70]:
from datasets import load_from_disk
ds2 = load_from_disk("./Dataset/AIGCodeSet")


def evaluation(row): 
    score = detect(
        row["code"], 
        [row["rewritedCode_0"], row["rewritedCode_1"], row["rewritedCode_0"], row["rewritedCode_3"]]
        )
    return {"score":score}
    

AIG_RR = AIG_R.map(evaluation)

print(AIG_RR)

Map:   0%|          | 0/249 [00:00<?, ? examples/s]

AttributeError: 'tuple' object has no attribute 'expand_as'