In [14]:
import os, warnings
# Process-wide switches: turn off tokenizers' own parallelism and disable
# Weights & Biases, then silence Python warnings for the rest of the session.
os.environ.update({
    "TOKENIZERS_PARALLELISM": "false",
    "WANDB_DISABLED": "true",
})
warnings.filterwarnings("ignore")

print("Environment ready.")




Environment ready.


In [15]:
import os

# Show which files the attached Kaggle dataset provides, so the exact
# file names (including the " (1)" suffixes) can be copied into later cells.
input_dir = "/kaggle/input/binary-code-detection-a"
print(os.listdir(input_dir))


['label_to_id (1).json', 'id_to_label (1).json', 'task_a_trial (1).parquet']


In [16]:
import pandas as pd

PATH = "/kaggle/input/binary-code-detection-a/task_a_trial (1).parquet"
df = pd.read_parquet(PATH)

# Quick look at the schema, the row count and the first two rows.
for summary in (df.columns, len(df), df.head(2)):
    print(summary)


Index(['code', 'generator', 'label', 'language'], dtype='object')
10000
                                                      code  \
991293   #include <iostream>\n#include <string>\n#inclu...   
1044782  #include <bits/stdc++.h>\n\n\n\n// #include <e...   

                                  generator  label language  
991293   microsoft/Phi-3-medium-4k-instruct      0      C++  
1044782                               Human      1      C++  


In [17]:
import os, warnings, torch
# Same process-wide switches as the first cell (kept so this cell also works
# on its own): no tokenizers parallelism, no wandb login prompt, no warnings.
for _name, _value in (("TOKENIZERS_PARALLELISM", "false"), ("WANDB_DISABLED", "true")):
    os.environ[_name] = _value
warnings.filterwarnings("ignore")

# Report the PyTorch build and, when a GPU is visible, which one it is.
cuda_ok = torch.cuda.is_available()
print("PyTorch:", torch.__version__, "| CUDA available:", cuda_ok)
if cuda_ok:
    print("Device name:", torch.cuda.get_device_name(0))


PyTorch: 2.6.0+cu124 | CUDA available: True
Device name: Tesla T4


In [18]:
import os, re, unicodedata
from pathlib import Path
import numpy as np
import pandas as pd

from sklearn.model_selection import train_test_split
from sklearn.metrics import f1_score, classification_report, confusion_matrix

import torch
from torch.utils.data import Dataset

from transformers import (
    AutoTokenizer,
    AutoModelForSequenceClassification,
    TrainingArguments,
    Trainer,
)
import transformers
print("Transformers:", transformers.__version__)


Transformers: 4.53.3


In [19]:
DATA_DIR = Path("/kaggle/input/binary-code-detection-a")
PARQUET = DATA_DIR / "task_a_trial (1).parquet"  # exact file name as listed in the input directory

df = pd.read_parquet(PARQUET)
print("Columns:", list(df.columns), "Shape:", df.shape)
display(df.head(2))

# Ensure required columns exist
assert "code" in df.columns, "Expected a 'code' column."
assert "label" in df.columns, "Expected a 'label' (0/1 or 'human'/'machine') column."

# Drop incomplete rows BEFORE encoding the target: a missing label would
# otherwise make the integer cast below fail with an opaque error.
df = df.dropna(subset=["code", "label"]).reset_index(drop=True)

# Encode the target as an integer column `y`.
# NOTE(review): the string mapping assumes 0=human / 1=machine, but the two rows
# displayed from this file show generator "Human" with label 1 and an LLM with
# label 0. Confirm the convention against the id_to_label JSON shipped with the
# dataset before trusting any "human"/"AI" class names used downstream.
if df["label"].dtype == object:
    mapping = {"human": 0, "machine": 1, "ai": 1}
    # Compare case-insensitively so "Human", "AI", "MACHINE", ... all resolve.
    label_key = df["label"].astype(str).str.strip().str.lower()
    unknown = sorted(set(label_key) - set(mapping))
    if unknown:
        # Fail loudly with the offending values instead of a NaN-to-int cast error.
        raise ValueError(f"Unrecognised label values: {unknown}")
    df["y"] = label_key.map(mapping).astype(int)
else:
    df["y"] = df["label"].astype(int)

print("After cleaning:", df.shape)


Columns: ['code', 'generator', 'label', 'language'] Shape: (10000, 4)


Unnamed: 0,code,generator,label,language
991293,#include <iostream>\n#include <string>\n#inclu...,microsoft/Phi-3-medium-4k-instruct,0,C++
1044782,#include <bits/stdc++.h>\n\n\n\n// #include <e...,Human,1,C++


After cleaning: (10000, 5)


In [20]:
def normalize_code(s: str) -> str:
    """Return a canonical text form of a code snippet.

    The input is coerced to ``str`` and then, in order: Unicode-normalised
    with NFKC, converted to LF line endings, has every run of spaces/tabs
    squeezed to a single space (this includes leading indentation), has runs
    of three or more newlines reduced to two, and is stripped of leading and
    trailing whitespace.
    """
    text = unicodedata.normalize("NFKC", str(s))
    # Handle CRLF before lone CR so a Windows line ending yields one newline.
    text = "\n".join(text.replace("\r\n", "\n").split("\r"))
    text = re.compile(r"[ \t]+").sub(" ", text)
    text = re.compile(r"\n{3,}").sub("\n\n", text)
    return text.strip()

# Store the normalised text next to the raw code; the model is trained on this column.
df["code_norm"] = df["code"].map(normalize_code)

# 80/20 hold-out split, stratified on the target and seeded for repeatability.
train_df, valid_df = train_test_split(
    df,
    test_size=0.2,
    stratify=df["y"],
    random_state=42,
)
print("Train:", train_df.shape, "Valid:", valid_df.shape)


Train: (8000, 6) Valid: (2000, 6)


In [21]:
# Checkpoint name shared by the tokenizer here and the classifier built later.
MODEL_NAME = "microsoft/codebert-base"
tokenizer = AutoTokenizer.from_pretrained(
    MODEL_NAME,
    use_fast=True,
)

class CodeClsDataset(Dataset):
    """Map-style dataset that tokenizes one snippet per access.

    Args:
        texts: iterable of code strings (materialised into a list).
        labels: iterable of class ids, aligned with ``texts``.
        tokenizer: callable invoked with the text plus ``padding``,
            ``truncation``, ``max_length`` and ``return_tensors`` keywords.
        max_len: length every sample is padded/truncated to.

    Each item is a dict holding the tokenizer's tensors with their leading
    dimension squeezed away, plus a ``labels`` scalar of dtype ``torch.long``.
    """

    def __init__(self, texts, labels, tokenizer, max_len=512):
        self.texts, self.labels = list(texts), list(labels)
        self.tokenizer = tokenizer
        self.max_len = max_len

    def __len__(self):
        return len(self.texts)

    def __getitem__(self, idx):
        # Fixed-length padding keeps every sample the same shape, so samples
        # can be stacked into a batch without a custom collate function.
        encoded = self.tokenizer(
            self.texts[idx],
            padding="max_length",
            truncation=True,
            max_length=self.max_len,
            return_tensors="pt",
        )
        sample = {}
        for key, tensor in encoded.items():
            sample[key] = tensor.squeeze(0)
        sample["labels"] = torch.tensor(int(self.labels[idx]), dtype=torch.long)
        return sample

# One dataset per split, both reading the normalised code and the integer target.
dtrain, dvalid = (
    CodeClsDataset(split["code_norm"], split["y"], tokenizer)
    for split in (train_df, valid_df)
)
len(dtrain), len(dvalid)


(8000, 2000)

In [22]:
device = "cuda" if torch.cuda.is_available() else "cpu"

# Seed every RNG BEFORE the model is built: the classification head is newly
# initialised (not loaded from the checkpoint), so without a seed each run
# starts from different weights. 42 matches the split's random_state.
transformers.set_seed(42)

model = AutoModelForSequenceClassification.from_pretrained(MODEL_NAME, num_labels=2).to(device)
model.config.problem_type = "single_label_classification"

def compute_metrics(eval_pred):
    """Turn an evaluation ``(logits, labels)`` pair into a macro-F1 dict.

    The predicted class is the arg-max over the last axis of the logits; the
    result is returned under the key ``"macro_f1"``.
    """
    scores, gold = eval_pred
    predicted = np.asarray(scores).argmax(axis=-1)
    return {"macro_f1": f1_score(gold, predicted, average="macro")}

# Hyper-parameters shared by both spellings of the evaluation-strategy keyword.
args_kw = dict(
    output_dir="codebert_out",
    per_device_train_batch_size=8,
    per_device_eval_batch_size=16,
    learning_rate=2e-5,
    num_train_epochs=3,
    save_strategy="epoch",
    load_best_model_at_end=True,
    metric_for_best_model="macro_f1",
    greater_is_better=True,
    fp16=True,
    report_to="none",  # no wandb
)
# The keyword is `eval_strategy` in current transformers releases and
# `evaluation_strategy` in older ones. Try the current name first so the
# normal path does not go through an exception, and fall back for old installs.
try:
    training_args = TrainingArguments(eval_strategy="epoch", **args_kw)
except TypeError:
    training_args = TrainingArguments(evaluation_strategy="epoch", **args_kw)

# Wire the model, the two datasets and the macro-F1 metric into a Trainer.
trainer_kwargs = {
    "model": model,
    "args": training_args,
    "train_dataset": dtrain,
    "eval_dataset": dvalid,
    "compute_metrics": compute_metrics,
}
trainer = Trainer(**trainer_kwargs)


Some weights of RobertaForSequenceClassification were not initialized from the model checkpoint at microsoft/codebert-base and are newly initialized: ['classifier.dense.bias', 'classifier.dense.weight', 'classifier.out_proj.bias', 'classifier.out_proj.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


In [23]:
# Run fine-tuning; the returned TrainOutput is shown as the cell result.
train_output = trainer.train()
train_output


Epoch,Training Loss,Validation Loss,Macro F1
1,0.3788,0.282935,0.893483
2,0.2378,0.266978,0.902913
3,0.1723,0.275037,0.916496


TrainOutput(global_step=1500, training_loss=0.2629658660888672, metrics={'train_runtime': 1541.0793, 'train_samples_per_second': 15.574, 'train_steps_per_second': 0.973, 'total_flos': 6314665328640000.0, 'train_loss': 0.2629658660888672, 'epoch': 3.0})

In [24]:
from sklearn.metrics import f1_score, classification_report, confusion_matrix

# Score the validation split once more with the model the Trainer ended up with.
eval_res = trainer.evaluate()
final_macro_f1 = eval_res.get("eval_macro_f1")
print("FINAL VALID MACRO F1:", final_macro_f1)

def predict_batched(ds, batch_size=64):
    """Predict a class id for every item of an indexable dataset.

    Args:
        ds: object supporting ``len()`` and integer indexing, whose items are
            dicts with equally shaped ``input_ids`` and ``attention_mask``
            tensors (as produced by ``CodeClsDataset``).
        batch_size: number of items stacked into one forward pass.

    Returns:
        1-D integer ``numpy`` array of arg-max class ids, in dataset order
        (empty for an empty dataset).
    """
    # Do not rely on whatever mode an earlier cell left the model in:
    # train-only behaviour such as dropout must be off for prediction.
    model.eval()
    preds = []
    total = len(ds)
    with torch.no_grad():
        for start in range(0, total, batch_size):
            batch = [ds[j] for j in range(start, min(start + batch_size, total))]
            input_ids = torch.stack([b["input_ids"] for b in batch]).to(device)
            attn = torch.stack([b["attention_mask"] for b in batch]).to(device)
            logits = model(input_ids=input_ids, attention_mask=attn).logits
            preds.extend(torch.argmax(logits, dim=-1).cpu().tolist())
    # Explicit dtype keeps the result integer even when there are no predictions.
    return np.array(preds, dtype=int)

# Re-predict the validation split with the manual loop and compare to the targets.
y_pred = predict_batched(dvalid)
y_true = valid_df["y"].to_numpy()

matrix = confusion_matrix(y_true, y_pred)
print("\nConfusion matrix:\n", matrix)

# NOTE(review): the class names below assume id 0 is human and id 1 is AI; the
# rows displayed when the data was loaded show generator "Human" with label 1,
# so confirm the convention before reading per-class numbers.
report = classification_report(y_true, y_pred, target_names=["human(0)","AI(1)"])
print("\nReport:\n", report)


FINAL VALID MACRO F1: 0.916496471975941

Confusion matrix:
 [[910  86]
 [ 81 923]]

Report:
               precision    recall  f1-score   support

    human(0)       0.92      0.91      0.92       996
       AI(1)       0.91      0.92      0.92      1004

    accuracy                           0.92      2000
   macro avg       0.92      0.92      0.92      2000
weighted avg       0.92      0.92      0.92      2000



In [25]:
def detect_language(snippet: str) -> str:
    """Guess the programming language of a snippet from substring markers.

    The match is case-insensitive and purely heuristic. Checks run from the
    most specific markers to the most generic ones, so that broad markers such
    as ``import `` or ``def `` cannot shadow a more specific language.

    Args:
        snippet: source text; any non-string value is converted with ``str``.

    Returns:
        One of "PHP", "C#", "Java", "Go", "C++", "C", "JavaScript", "Ruby",
        "Python" or "Unknown".
    """
    s = str(snippet).lower()

    def has_any(*needles):
        return any(needle in s for needle in needles)

    # --- Markers that are specific to one language ---------------------------
    if "<?php" in s:
        return "PHP"
    # C# must precede Java: lower-cased "public static void Main" looks like Java's.
    if has_any("using system;", "console.writeline"):
        return "C#"
    if has_any("public static void main", "system.out.println", "import java."):
        return "Java"
    if has_any("package main", "func main()"):
        return "Go"

    # --- C family ------------------------------------------------------------
    if has_any("std::", "cout<<", "cout <<", "cin>>", "cin >>",
               "using namespace", "<iostream>", "<bits/stdc++"):
        return "C++"
    if "#include" in s:
        # printf/scanf with no C++ marker reads as plain C; any other
        # #include-only snippet keeps the previous default of C++.
        return "C" if has_any("printf(", "scanf(") else "C++"

    if "console.log(" in s:
        return "JavaScript"

    # Ruby before Python, because both use `def`. Require a line that is just
    # `end` (or a `puts` statement) so Python code such as `def f(start, end):`
    # is not mistaken for Ruby.
    if re.search(r"^[ \t]*puts\s", s, re.MULTILINE) or (
        re.search(r"^[ \t]*def\b", s, re.MULTILINE)
        and re.search(r"^[ \t]*end[ \t]*$", s, re.MULTILINE)
    ):
        return "Ruby"

    # --- Generic markers, in the original order --------------------------------
    if has_any("def ", "print(", "import ", "except:", "try:"):
        return "Python"
    if has_any("function(", "=>", "var ", "let ", "const "):
        return "JavaScript"
    if "echo " in s and "$" in s:
        return "PHP"
    if "fmt." in s:
        return "Go"
    if re.search(r"\bdef\b.*\bend\b", s):
        return "Ruby"

    return "Unknown"


In [26]:
def predict_texts(texts, batch_size=128):
    """Predict a class id for each raw text.

    Args:
        texts: any iterable of strings (list, pandas Series, generator, ...).
        batch_size: number of texts tokenized and scored per forward pass.

    Returns:
        1-D integer ``numpy`` array of arg-max class ids, in input order.
    """
    # Materialise once so generators and label-indexed Series behave the same.
    texts = list(texts)
    # Do not rely on the mode an earlier cell left the model in.
    model.eval()
    preds = []
    with torch.no_grad():
        for start in range(0, len(texts), batch_size):
            enc = tokenizer(
                texts[start:start + batch_size],
                padding=True,          # pad only to the longest text of this batch
                truncation=True,
                max_length=512,
                return_tensors="pt",
            )
            enc = {k: v.to(device) for k, v in enc.items()}
            logits = model(**enc).logits
            preds.extend(torch.argmax(logits, dim=-1).cpu().tolist())
    return np.array(preds, dtype=int)

# Score every row of the cleaned frame (training and validation rows alike).
pred_all = predict_texts(df["code_norm"])

# Prefer the dataset's own generator / language columns; synthesise a column
# only when the dataset does not provide it.
gen_col = (
    df["generator"].astype(str)
    if "generator" in df.columns
    else pd.Series(np.where(pred_all==0, "human", "AI"), index=df.index)
)
lang_col = (
    df["language"].astype(str)
    if "language" in df.columns
    else df["code"].apply(detect_language)
)
shared_cols = {"generator": gen_col, "language": lang_col}

# File 1: exactly four columns, where `label` is the model's predicted class id.
final_predictions = pd.DataFrame({"code": df["code"], "label": pred_all, **shared_cols})
final_predictions.to_csv("/kaggle/working/final_predictions.csv", index=False)
print("Saved /kaggle/working/final_predictions.csv")

# File 2: the same rows with the true and the predicted label side by side, for analysis.
with_true_and_pred = pd.DataFrame(
    {
        "code": df["code"],
        "label_true": df["y"].astype(int),
        "label_pred": pred_all,
        **shared_cols,
    }
)
with_true_and_pred.to_csv("/kaggle/working/with_true_and_pred.csv", index=False)
print("Saved /kaggle/working/with_true_and_pred.csv")

display(final_predictions.head(3))


Saved /kaggle/working/final_predictions.csv
Saved /kaggle/working/with_true_and_pred.csv


Unnamed: 0,code,label,generator,language
0,#include <iostream>\n#include <string>\n#inclu...,0,microsoft/Phi-3-medium-4k-instruct,C++
1,#include <bits/stdc++.h>\n\n\n\n// #include <e...,1,Human,C++
2,class node:\n value = 0\n index = -1\n ...,1,ibm-granite/granite-8b-code-base-4k,Python


In [27]:
def predict_single(code: str):
    """Classify one code snippet and guess its language.

    The snippet goes through ``normalize_code`` before tokenization, i.e. the
    same preprocessing as the ``code_norm`` column the model was trained on.

    Args:
        code: source text; a non-string value is scored as an empty snippet.

    Returns:
        dict with the untouched input under ``"code"``, the predicted class id
        under ``"label"``, a name derived from that id under ``"generator"``
        and the ``detect_language`` guess under ``"language"``.
    """
    text = code if isinstance(code, str) else ""

    # Do not rely on the mode an earlier cell left the model in.
    model.eval()
    enc = tokenizer(
        normalize_code(text),
        padding="max_length",
        truncation=True,
        max_length=512,
        return_tensors="pt",
    )
    enc = {k: v.to(device) for k, v in enc.items()}
    with torch.no_grad():
        pred = int(model(**enc).logits.argmax(-1).item())

    # NOTE(review): this assumes class id 0 means human. The rows displayed when
    # the data was loaded show generator "Human" with label 1; confirm the
    # convention against the dataset's id_to_label JSON.
    gen = "human" if pred == 0 else "AI"
    return {"code": code, "label": pred, "generator": gen, "language": detect_language(text)}

# Example: classify a short C++ loop. The call is the cell's last expression,
# so the returned dict is displayed as the cell output.
ex = """
for(int i=0;i<n;i++){
    cout << a[i] << endl;
}

"""
predict_single(ex)


{'code': '\nfor(int i=0;i<n;i++){\n    cout << a[i] << endl;\n}\n\n',
 'label': 0,
 'generator': 'human',
 'language': 'C++'}

In [31]:
# Second example: a small Python snippet passed as text (it is not executed).
# NOTE(review): the output saved under this cell contains a
# "# simple python code example" line that is absent from the string below,
# and the execution count jumps from 27 to 31, so the stored output looks
# stale. Re-run the cell before sharing the notebook.
ex1 = """

a = 10
b = 20

sum = a + b

print("Sum =", sum)


"""
predict_single(ex1)


{'code': '\n# simple python code example\n\na = 10\nb = 20\n\nsum = a + b\n\nprint("Sum =", sum)\n\n\n',
 'label': 0,
 'generator': 'human',
 'language': 'Python'}