# LLM Authorship Attribution (JavaScript) - 5-class Machine Learning

## STEP 0: Installing dependencies

In [None]:
# ============================ Imports ============================
import pandas as pd
import time
import numpy as np
import matplotlib.pyplot as plt
import json
import warnings
import joblib
import math

from operator import methodcaller
from pathlib import Path

from sklearn.model_selection import GroupShuffleSplit
from sklearn.linear_model import LogisticRegression
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.preprocessing import LabelEncoder, FunctionTransformer
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score
from sklearn.pipeline import Pipeline

from sklearn.neighbors import KNeighborsClassifier
from sklearn.ensemble import RandomForestClassifier
from sklearn.svm import LinearSVC
from xgboost import XGBClassifier

warnings.filterwarnings("ignore")

# ============================ Step 1: Load Dataset ============================
# Download the dataset archive, unpack it (overwriting existing files) and delete
# the archive afterwards. The output of all three shell commands is discarded.
print("[*] Downloading dataset...")
!wget -O LLM-NodeJS-medium.json.zip https://github.com/LLM-NodeJS-dataset/LLM-NodeJS-dataset/releases/download/LLM-NodeJS-medium/LLM-NodeJS-medium.json.zip > /dev/null 2>&1
!unzip -o LLM-NodeJS-medium.json.zip > /dev/null 2>&1
!rm LLM-NodeJS-medium.json.zip > /dev/null 2>&1

# NOTE(review): only the main dataset is fetched above; the cross-check file is
# presumably placed under /content by other means - confirm.
MAIN_DATASET_PATH = "/content/LLM-NodeJS-medium.json"
CROSS_CHECK_DATASET_PATH = "/content/CROSSCHECK-v2.json"

# Main dataset: used for training and the held-out test split.
with open(MAIN_DATASET_PATH, "r", encoding="utf-8") as f:
    data = json.load(f)
# Wrap a single top-level object in a list so json_normalize always gets records.
rows = data if isinstance(data, list) else [data]
# Nested keys are flattened into dotted column names (e.g. "<variant>.js_code").
df = pd.json_normalize(rows, sep=".")
print("[*] Training, Test Dataset loaded:", df.shape)

# Cross-check dataset: loaded the same way; `data` and `rows` are reused.
with open(CROSS_CHECK_DATASET_PATH, "r", encoding="utf-8") as f:
    data = json.load(f)
rows = data if isinstance(data, list) else [data]
crosscheck_df = pd.json_normalize(rows, sep=".")
print("[*] Cross-Check Dataset loaded:", crosscheck_df.shape)

## STEP 2: Machine Learning Training

In [1]:
# -------------------------------- Configuration ---------------------------------
# Exactly one TARGET_MODELS definition should be active; the other class sets
# are kept commented out so the experiment can be switched between them.

# 20-class
# TARGET_MODELS = [
#     "gpt-4o", "gpt-4o-mini", "gpt-5-mini", "gpt-5-nano", "gpt-oss-120b",  # OpenAI
#     "gemini-2.0-flash", "gemini-2.5-flash-lite", "gemma-3-27b", # Google
#     "llama-3.1-8b", "llama-3.3-70b", "llama-4-scout",  # Meta
#     "codestral-2508", "mixtral-8x7b",  # Mistral
#     "qwen-2.5-7b", "qwen-2.5-coder-32b", "qwen3-coder",  # Alibaba
#     "grok-3-mini", "grok-code-fast-1",  # xAI
#     "deepseek-v3.1",  # DeepSeek
#     "phi-4-reasoning-plus"  # Microsoft
# ]

# 10-class
# Values are matched against the "model_name" column when the datasets are filtered.
TARGET_MODELS = [
    "gpt-4o","gpt-4o-mini","gpt-5-mini","gpt-5-nano","gpt-oss-120b",   # OpenAI
    "gemini-2.5-flash-lite",  # Google
    "llama-3.3-70b",  # Meta
    "mixtral-8x7b",  # Mistral
    "qwen-2.5-coder-32b",  # Alibaba
    "deepseek-v3.1"  # DeepSeek
]

# 5-class
# TARGET_MODELS = ["gpt-4o", "gpt-4o-mini" ,"gpt-5-mini", "gpt-5-nano", "gpt-oss-120b"]

# Code variants to train on. Each name selects the "<name>.js_code" column and
# is also used as the sub-directory name under "trained_models".
USED_DATASET_TYPES = ["js_original", "terser_mangled", "js_deobfuscated"]
# Target row counts handed to run_training_pipeline (one training run per entry).
SAMPLE_SIZES = (12500,)


# -------------------------- Step 1: Model Definitions ---------------------------

def get_models():
    """Create a fresh set of unfitted classifiers.

    Returns:
        dict: display name -> estimator instance, in the order
        KNN, Random Forest, SVM (Linear), XGBoost, Logistic Regression.
        Every estimator that accepts a seed is seeded with 42.
    """
    seed = 42

    knn = KNeighborsClassifier()
    forest = RandomForestClassifier(n_estimators=400, random_state=seed)
    linear_svm = LinearSVC(max_iter=2000, random_state=seed)
    boosted = XGBClassifier(
        n_estimators=400,
        max_depth=9,
        use_label_encoder=False,
        eval_metric="mlogloss",
        random_state=seed,
    )
    logreg = LogisticRegression(max_iter=2000, random_state=seed)

    return {
        "KNN": knn,
        "Random Forest": forest,
        "SVM (Linear)": linear_svm,
        "XGBoost": boosted,
        "Logistic Regression": logreg,
    }


# ---------------------------- Step 2: Data Splitting ----------------------------

def get_dataframes(df, *, sample_size=None, test_size=0, val_size=0, random_state=42):
    """Split ``df`` into train/validation/test frames grouped by prompt.

    Rows sharing the same ``"prompt"`` value always end up in the same split,
    so no prompt leaks from training into validation or test data.

    Args:
        df: DataFrame containing at least a ``"prompt"`` column.
        sample_size: Approximate number of rows to keep before splitting.
            Whole prompt groups are sampled, so the actual row count can
            differ from the request. ``None`` keeps every row.
        test_size: Fraction of the (sampled) data reserved for testing;
            ``0`` disables the test split.
        val_size: Fraction of the (sampled) data reserved for validation;
            ``0`` disables the validation split.
        random_state: Seed for group sampling and both splits.

    Returns:
        tuple: ``(train_df, val_df, test_df)``. Disabled splits are returned
        as empty DataFrames with the columns of ``df``.

    Raises:
        ValueError: If ``val_size`` relative to the non-test share reaches
            or exceeds 1.0.
    """
    groups = df["prompt"].unique()

    current_df = df

    if sample_size is not None:
        if sample_size < len(df):
            # Translate the requested row count into a number of prompt groups
            # using the average group size, then draw that many groups.
            avg_rows_per_group = len(df) / len(groups)
            n_groups_needed = int(sample_size / avg_rows_per_group)

            if n_groups_needed < len(groups):
                rng = np.random.RandomState(random_state)
                selected_groups = rng.choice(groups, size=n_groups_needed, replace=False)

                current_df = df[df["prompt"].isin(selected_groups)].copy()
        elif sample_size > len(df):
            # The request cannot be satisfied; continue with all available rows.
            print(f"WARNING: The size of the Dataframe ({len(df)}) is smaller than the sample size ({sample_size})")

    if test_size > 0:
        splitter = GroupShuffleSplit(n_splits=1, test_size=test_size, random_state=random_state)
        train_val_idx, test_idx = next(splitter.split(current_df, groups=current_df["prompt"]))

        train_val_df = current_df.iloc[train_val_idx]
        test_df = current_df.iloc[test_idx].copy()
    else:
        train_val_df = current_df
        test_df = pd.DataFrame(columns=df.columns)

    if val_size > 0:
        # val_size refers to the whole data set; rescale it to the share that
        # is left after the test rows were removed.
        relative_val_size = val_size / (1 - test_size)
        if relative_val_size >= 1.0:
             raise ValueError("ERROR: The sum of test and val size reaches or exceeds 1.0!")

        splitter = GroupShuffleSplit(n_splits=1, test_size=relative_val_size, random_state=random_state)
        train_idx, val_idx = next(splitter.split(train_val_df, groups=train_val_df["prompt"]))

        train_df = train_val_df.iloc[train_idx].copy()
        val_df = train_val_df.iloc[val_idx].copy()
    else:
        train_df = train_val_df.copy()
        val_df = pd.DataFrame(columns=df.columns)

    return train_df, val_df, test_df


# -------------------------- Step 3: Feature Extraction --------------------------

def prepare_features(train_df, val_df, test_df, vectorizer):
    """Turn the ``js_code`` column of each split into dense feature arrays.

    The vectorizer is fitted on the training split only and then applied to
    the validation and test splits. Missing code is treated as an empty
    string. An empty validation frame yields empty arrays for ``X_val`` and
    ``y_val``.

    Returns:
        tuple: ``(X_train, y_train, X_val, y_val, X_test, y_test)``.
    """
    def _code_of(frame):
        return frame["js_code"].fillna("")

    def _labels_of(frame):
        return frame["label"].to_numpy()

    has_validation = not val_df.empty

    X_train = vectorizer.fit_transform(_code_of(train_df)).toarray()
    if has_validation:
        X_val = vectorizer.transform(_code_of(val_df)).toarray()
    else:
        X_val = np.array([])
    X_test = vectorizer.transform(_code_of(test_df)).toarray()

    y_train = _labels_of(train_df)
    y_val = _labels_of(val_df) if has_validation else np.array([])
    y_test = _labels_of(test_df)

    return X_train, y_train, X_val, y_val, X_test, y_test


# ------------------------ Step 4: Training & Evaluation -------------------------

def train_models(models, X_train, y_train):
    """Fit every model in place and record how long each fit took.

    Args:
        models: Mapping of display name -> estimator exposing ``fit(X, y)``.
        X_train: Training feature matrix.
        y_train: Training labels.

    Returns:
        dict: ``{name: {"train_time_sec": float}}`` for every model that was
        fitted successfully. A model whose ``fit`` raises is reported on
        stdout and left out of the result, so one failure does not abort
        the remaining models.
    """
    metrics = {}

    for name, model in models.items():
        try:
            # perf_counter is monotonic, so the measured duration cannot be
            # distorted by system clock adjustments during a long fit.
            start_time = time.perf_counter()
            model.fit(X_train, y_train)
            elapsed = time.perf_counter() - start_time
        except Exception as e:
            print(f"ERROR: {name} failed: {e}")
        else:
            metrics[name] = {
                "train_time_sec": elapsed
            }

    return metrics


def evaluate_models(models, X_test, y_test):
    """Score every model on the given test data.

    Args:
        models: Mapping of display name -> fitted estimator or pipeline
            exposing ``predict(X)``.
        X_test: Input accepted by the models' ``predict`` method.
        y_test: True labels.

    Returns:
        dict: ``{name: {"accuracy", "precision", "recall", "f1_score"}}``.
        Precision, recall and F1 are weighted averages over the classes.
        A model whose prediction or scoring raises is reported on stdout
        and left out of the result.
    """
    metrics = {}

    for name, model in models.items():
        try:
            y_pred = model.predict(X_test)
            # zero_division=0 is passed to all three averaged scores so that
            # classes without predictions/samples are handled the same way.
            metrics[name] = {
                "accuracy": accuracy_score(y_test, y_pred),
                "precision": precision_score(y_test, y_pred, average="weighted", zero_division=0),
                "recall": recall_score(y_test, y_pred, average="weighted", zero_division=0),
                "f1_score": f1_score(y_test, y_pred, average="weighted", zero_division=0)
            }
        except Exception as e:
            print(f"ERROR: {name} failed: {e}")

    return metrics


def save_models(vectorizer, encoder, models, sample_size, base_dir):
    """Persist the label encoder and one inference pipeline per model.

    Layout written below ``base_dir``::

        label_encoder.joblib
        size_<sample_size>/pipeline_<model name with "_" for spaces>.joblib

    Each pipeline chains the given vectorizer, a sparse-to-dense step and
    the classifier, so a loaded pipeline can be called on raw code strings.

    Args:
        vectorizer: Vectorizer used to build the training features.
        encoder: Label encoder mapping model names to class ids.
        models: Mapping of display name -> classifier.
        sample_size: Sample size the models were trained with.
        base_dir: Root directory for the saved artifacts.
    """
    root_path = Path(base_dir)
    size_path = root_path / f"size_{sample_size}"
    size_path.mkdir(parents=True, exist_ok=True)

    joblib.dump(encoder, root_path / "label_encoder.joblib")
    # The classifiers were trained on dense arrays, so the pipeline converts
    # the vectorizer's sparse output before it reaches the classifier.
    to_dense_transformer = FunctionTransformer(methodcaller("toarray"), accept_sparse=True)

    for name, model in models.items():
        pipeline = Pipeline([
            ("tfidf", vectorizer),
            ("to_dense", to_dense_transformer),
            ("classifier", model)
        ])
        # Built outside the f-string: reusing the same quote character inside
        # an f-string expression is a SyntaxError before Python 3.12.
        file_stem = name.replace(" ", "_")
        joblib.dump(pipeline, size_path / f"pipeline_{file_stem}.joblib")

    print("[*] Trained Models Saved")


# ---------------------------- Step 5: Visualization -----------------------------

def plot_sample_size_results(all_results, metrics=("accuracy", "precision", "recall", "f1_score")):
    """Plot each metric against the sample size, one line per model.

    Args:
        all_results: ``{sample_size: {model_name: {metric_name: value}}}``.
            The model list is taken from the first entry.
        metrics: Names of the metrics to plot, one subplot each (two
            subplots per row).

    Nothing is drawn when ``all_results`` or ``metrics`` is empty. A value
    that is missing for a model/size is plotted as NaN (a gap in the line).
    """
    if not all_results or not metrics:
        print("WARNING: No results to plot.")
        return

    sample_sizes = sorted(all_results.keys())
    models = list(next(iter(all_results.values())).keys())

    n_metrics = len(metrics)
    n_cols = 2
    n_rows = math.ceil(n_metrics / n_cols)

    # squeeze=False always yields a 2-D array of axes, so flattening works for
    # any metric count (including a single metric, which still gets a 1x2 grid).
    fig, axes = plt.subplots(n_rows, n_cols, figsize=(14, 5 * n_rows), squeeze=False)
    axes = axes.flatten()

    for ax, metric in zip(axes, metrics):
        for model in models:
            values = [
                all_results[size].get(model, {}).get(metric, math.nan)
                for size in sample_sizes
            ]
            ax.plot(sample_sizes, values, marker="o", label=model)

        ax.set_xlabel("Sample Size")
        ax.set_ylabel(metric.title())
        ax.set_ylim(0, 1.05)
        ax.grid(True, linestyle="--", alpha=0.6)

    # Remove the unused subplot left over by an odd number of metrics.
    for unused_ax in axes[n_metrics:]:
        fig.delaxes(unused_ax)

    # All subplots share the same model lines, so one figure-level legend suffices.
    handles, labels = axes[0].get_legend_handles_labels()
    fig.legend(handles, labels, loc='center right', bbox_to_anchor=(0.98, 0.5), fontsize='medium')

    plt.tight_layout()
    # Reserve space on the right for the legend.
    plt.subplots_adjust(right=0.85)

    plt.show()


# -------------------------- Step 6: Training Pipeline ---------------------------

def run_training_pipeline(df, *, encoder=None, save_models_dir=None, sample_sizes=(12500,)):
    """Train and evaluate all models once per requested sample size.

    For every size the data is sampled and split 80/20 by prompt, TF-IDF
    features are fitted on the training part, every model from
    ``get_models`` is trained and scored on the test part, and the results
    are printed. Finally the metrics are plotted against the sample size.

    Args:
        df: DataFrame with ``"prompt"``, ``"js_code"`` and ``"label"`` columns.
        encoder: Label encoder to store next to the models; models are only
            saved when both ``encoder`` and ``save_models_dir`` are given.
        save_models_dir: Directory for the saved pipelines.
        sample_sizes: Sample sizes to evaluate; sizes larger than ``df`` are
            skipped.

    Returns:
        dict: ``{sample_size: {model_name: metrics}}`` where ``metrics``
        merges the training time with the evaluation scores. Entries of a
        model that failed to train or evaluate lack the corresponding keys.
    """
    print("[*] Precomputing full feature matrix...")
    all_results = {}
    for size in sample_sizes:
        if size > len(df):
            print(f"Skipping size {size}, dataset too small ({len(df)})")
            continue

        train_df, val_df, test_df = get_dataframes(df, sample_size=size, val_size=0, test_size=0.2)

        vectorizer = TfidfVectorizer(max_features=400, token_pattern=r"(?u)\b\w+\b")
        X_train, y_train, _, _, X_test, y_test = prepare_features(train_df, val_df, test_df, vectorizer)

        print(f"\n[*] Evaluating Sample Size: {size}, Actual Size: {len(train_df) + len(val_df) + len(test_df)}")
        models = get_models()
        train_metrics = train_models(models, X_train, y_train)
        eval_metrics = evaluate_models(models, X_test, y_test)

        if save_models_dir and encoder:
            save_models(vectorizer, encoder, models, sample_size=size, base_dir=save_models_dir)

        metrics = {name: train_metrics.get(name, {}) | eval_metrics.get(name, {}) for name in models}
        all_results[size] = metrics

        print("\n" + f" Results for Sample Size {size} ".center(80, "-"))
        for model_name, scores in metrics.items():
            # A model that failed during training or evaluation has no scores;
            # report NaN for it instead of aborting the whole run.
            acc = scores.get("accuracy", math.nan)
            prec = scores.get("precision", math.nan)
            rec = scores.get("recall", math.nan)
            f1 = scores.get("f1_score", math.nan)
            time_sec = scores.get("train_time_sec", math.nan)
            print(f"{model_name:20s} | Acc: {acc:.4f} | Prec: {prec:.4f} | Recall: {rec:.4f} | F1: {f1:.4f} | Time: {time_sec:.4f}s")

    # Plot curves (only when at least one sample size was actually evaluated)
    if all_results:
        plot_sample_size_results(all_results)
    else:
        print("WARNING: No sample size could be evaluated; nothing to plot.")

    return all_results


# ---------------------------- Step 7: Main Execution ----------------------------

# Keep only selected models
filtered_df = df[df["model_name"].isin(TARGET_MODELS)].copy()

# Encode labels
# The fitted encoder is passed to run_training_pipeline below so it can be
# stored alongside the trained models.
encoder = LabelEncoder()
filtered_df["label"] = encoder.fit_transform(filtered_df["model_name"])

print("[*] Filtered Dataset Shape:", filtered_df.shape)
print("[*] Classes:", dict(zip(encoder.classes_, encoder.transform(encoder.classes_))), end="\n\n")

# Train one model set per code variant; results are keyed by dataset type.
all_test_metrics = {}
for dataset_type in USED_DATASET_TYPES:
    # Expose this variant's source code under the generic "js_code" column
    # that the feature extraction reads. The column is overwritten each pass.
    filtered_df["js_code"] = filtered_df[f"{dataset_type}.js_code"]

    print(f" {dataset_type} Dataset Type ".center(80, "="))
    test_metrics = run_training_pipeline(filtered_df, encoder=encoder, save_models_dir=Path("trained_models") / dataset_type, sample_sizes=SAMPLE_SIZES)
    all_test_metrics[dataset_type] = test_metrics

NameError: name 'df' is not defined

## STEP 3: Model Loading & Final Testing

The `CROSSCHECK-v2.json` file was generated using the *OpenAI API* instead of the *OpenRouter.ai* service, and its code snippets were generated with entirely different prompts. There are only 4 classes in the cross-check dataset.

In [None]:
def load_models(models_path):
    """Load every ``pipeline_*.joblib`` file found directly in ``models_path``.

    The display name is recovered from the file name by dropping the
    ``pipeline_`` prefix and turning underscores back into spaces. Files are
    processed in sorted order so the result order is reproducible.

    Args:
        models_path: Directory containing the saved pipelines.

    Returns:
        dict: display name -> loaded pipeline. Files that fail to load are
        reported on stdout and skipped.
    """
    models = {}
    models_path = Path(models_path)

    # glob() yields files in an unspecified order; sort for stable reports.
    for file_path in sorted(models_path.glob("pipeline_*.joblib")):
        try:
            # NOTE: joblib.load unpickles the file, which can execute arbitrary
            # code - only load pipelines from a trusted location.
            pipeline = joblib.load(file_path)
        except Exception as e:
            print(f"ERROR: Failed to load {file_path.name}: {e}")
            continue

        # removeprefix only strips the leading marker; replace() would also
        # remove "pipeline_" from the middle of a model name.
        name = file_path.stem.removeprefix("pipeline_").replace("_", " ")
        models[name] = pipeline

    return models


def cross_check(df, base_dir):
    """Evaluate all saved pipelines below ``base_dir`` on an external data set.

    Args:
        df: DataFrame with a ``"js_code"`` column (model input) and a
            ``"model_name"`` column (true label).
        base_dir: Directory holding ``label_encoder.joblib`` and one
            ``size_<n>`` sub-directory per trained sample size.

    Returns:
        dict: ``{size (str): {model_name: metrics}}`` ordered by ascending
        numeric size. Empty when no usable ``size_<n>`` directory exists.
    """
    root_dir = Path(base_dir)
    X_test = df["js_code"].fillna("").astype(str)

    # NOTE: joblib.load unpickles the file - only use trusted model directories.
    encoder = joblib.load(root_dir / "label_encoder.joblib")
    y_test = encoder.transform(df["model_name"])

    # Collect (numeric size, size label, path); entries whose suffix is not an
    # integer are ignored instead of crashing the numeric sort.
    size_entries = []
    for candidate in root_dir.glob("size_*"):
        size_label = candidate.name.split("_", 1)[1]
        try:
            size_entries.append((int(size_label), size_label, candidate))
        except ValueError:
            continue
    size_entries.sort(key=lambda entry: entry[0])

    if not size_entries:
        print(f"ERROR: No 'size_*' directories found in {base_dir}")
        # Same (empty) mapping type as the regular return value, so callers
        # can treat both outcomes uniformly.
        return {}

    all_metrics = {}
    for _, size, size_path in size_entries:
        loaded_pipelines = load_models(size_path)

        if not loaded_pipelines:
            continue

        metrics = evaluate_models(loaded_pipelines, X_test, y_test)
        print("\n" + f" Results for sample size {size} ".center(80, "-"))
        for model_name, scores in metrics.items():
            acc = scores["accuracy"]
            prec = scores["precision"]
            rec = scores["recall"]
            f1 = scores["f1_score"]
            print(f"{model_name:20s} | Acc: {acc:.4f} | Prec: {prec:.4f} | Recall: {rec:.4f} | F1: {f1:.4f}")

        all_metrics[size] = metrics

    return all_metrics


# NOTE(review): the threshold is 5 although the message speaks of 4 classes -
# confirm which class count is intended.
if len(TARGET_MODELS) > 5:
    print("WARNING: There are only 4 classes in cross-check dataset!\n")

# Keep only cross-check rows whose model is one of the configured targets.
filtered_crosscheck_df = crosscheck_df[crosscheck_df["model_name"].isin(TARGET_MODELS)].copy()
base_models_folder = Path("trained_models")

# Evaluate models by dataset type
all_final_test_metrics = {}
for dataset_type in USED_DATASET_TYPES:
    print(f" {dataset_type} Dataset Type ".center(80, "="))
    # Expose this variant's source code under the generic "js_code" column
    # that cross_check reads. The column is overwritten each pass.
    filtered_crosscheck_df["js_code"] = filtered_crosscheck_df[f"{dataset_type}.js_code"]

    # Only variants with a saved model directory can be evaluated.
    model_dir = base_models_folder / dataset_type
    if model_dir.exists():
        final_test_metrics = cross_check(filtered_crosscheck_df, base_dir=model_dir)
        all_final_test_metrics[dataset_type] = final_test_metrics
    else:
        print(f"Skipping {dataset_type}: directory not found.")

    print(end="\n\n")

[*] Dataset loaded: (50000, 27)

------------------------ Results for sample size 12500 -------------------------
Random Forest        | Acc: 0.8320 | Prec: 0.8360 | Recall: 0.8320 | F1: 0.8331
KNN                  | Acc: 0.5560 | Prec: 0.5706 | Recall: 0.5560 | F1: 0.5548
Logistic Regression  | Acc: 0.7840 | Prec: 0.7857 | Recall: 0.7840 | F1: 0.7831
SVM (Linear)         | Acc: 0.7960 | Prec: 0.7961 | Recall: 0.7960 | F1: 0.7950
XGBoost              | Acc: 0.8460 | Prec: 0.8490 | Recall: 0.8460 | F1: 0.8469



------------------------ Results for sample size 12500 -------------------------
Random Forest        | Acc: 0.7940 | Prec: 0.7979 | Recall: 0.7940 | F1: 0.7945
KNN                  | Acc: 0.5620 | Prec: 0.5624 | Recall: 0.5620 | F1: 0.5553
Logistic Regression  | Acc: 0.7580 | Prec: 0.7582 | Recall: 0.7580 | F1: 0.7573
SVM (Linear)         | Acc: 0.7700 | Prec: 0.7700 | Recall: 0.7700 | F1: 0.7685
XGBoost              | Acc: 0.8180 | Prec: 0.8204 | Recall: 0.8180 | F1: 0.8181




## STEP 4: Divergence Analysis - Compare Final Test Results with Test Results

There are only 4 classes in the cross-check dataset.

In [85]:
def analyze_performance_divergence(internal_results, external_results_dict, dataset_types):
    """Print how much accuracy and F1 differ between the two evaluations.

    For every dataset type, sample size and model present in both result
    sets, the difference ``external - internal`` is printed; the averages
    over all compared entries are printed at the end.

    Args:
        internal_results: ``{dataset_type: {size: {model: metrics}}}`` from
            the held-out test evaluation.
        external_results_dict: Same structure from the cross-check
            evaluation. Sizes may be keyed as ``int`` or ``str``; a dataset
            type mapped to ``None`` is treated as having no results.
        dataset_types: Dataset types to report, in output order.

    Returns:
        None. All results are written to stdout.
    """
    print("===================================")
    print("| PERFORMANCE DIVERGENCE ANALYSIS |")
    print("===================================")

    all_accuracy_diffs = []
    all_f1_diffs = []

    for dtype in dataset_types:
        if dtype not in internal_results or dtype not in external_results_dict:
            continue

        print(f" >> Dataset Type: {dtype.upper()}")

        # Either side may be None/empty when its evaluation produced nothing.
        int_data_by_size = internal_results[dtype] or {}
        ext_data_by_size = external_results_dict[dtype] or {}

        for size in sorted(int_data_by_size):
            # External sizes are parsed from directory names and may therefore
            # be strings; try the string key first, then the original key.
            ext_models = ext_data_by_size.get(str(size))
            if ext_models is None:
                ext_models = ext_data_by_size.get(size)
            if ext_models is None:
                print(f"   [!] Warning: No cross-check data for size {size}.")
                continue

            int_models = int_data_by_size[size]

            print(f"\n   [Sample Size: {size}]")
            print(f"   {'-'*90}")
            print(f"   {'Model':<25} | {'Acc (Int)':<10} | {'Acc (Ext)':<10} | {'Diff (Acc)':<12} | {'Diff (F1)':<12}")
            print(f"   {'-'*90}")

            for model_name, int_scores in int_models.items():
                if model_name not in ext_models:
                    continue
                ext_scores = ext_models[model_name]

                acc_int = int_scores['accuracy']
                acc_ext = ext_scores['accuracy']
                diff_acc = acc_ext - acc_int
                diff_f1 = ext_scores['f1_score'] - int_scores['f1_score']

                all_accuracy_diffs.append(diff_acc)
                all_f1_diffs.append(diff_f1)

                # Column widths mirror the header; "+" always prints the sign.
                print(f"   {model_name:<25} | {acc_int:<10.4f} | {acc_ext:<10.4f} | {diff_acc:<+12.4f} | {diff_f1:<+12.4f}")
            print("\n")

    if all_accuracy_diffs:
        avg_acc_diff = sum(all_accuracy_diffs) / len(all_accuracy_diffs)
        avg_f1_diff = sum(all_f1_diffs) / len(all_f1_diffs)

        print(f"{'='*100}")
        print(" AGGREGATED RESULTS (Average divergence between sets)")
        print(f"{'='*100}")
        print(f" Average Accuracy Divergence: {avg_acc_diff:+.4f}  (Negative = Worse performance on Cross-Check)")
        print(f" Average F1-Score Divergence: {avg_f1_diff:+.4f}")

        print(f"{'='*100}")

# Compare the held-out test metrics with the cross-check metrics for every
# configured dataset type.
analyze_performance_divergence(all_test_metrics, all_final_test_metrics, USED_DATASET_TYPES)


| PERFORMANCE DIVERGENCE ANALYSIS |
 >> Dataset Type: JS_ORIGINAL

   [Sample Size: 12500]
   ------------------------------------------------------------------------------------------
   Model                     | Acc (Int)  | Acc (Ext)  | Diff (Acc)   | Diff (F1)   
   ------------------------------------------------------------------------------------------
   KNN                       | 0.4976     | 0.5560     | +0.0584       | +0.0631
   Random Forest             | 0.8255     | 0.8320     | +0.0065       | +0.0068
   SVM (Linear)              | 0.7853     | 0.7960     | +0.0107       | +0.0096
   XGBoost                   | 0.8502     | 0.8460     | -0.0042       | -0.0041
   Logistic Regression       | 0.7765     | 0.7840     | +0.0075       | +0.0064


 >> Dataset Type: TERSER_MANGLED

   [Sample Size: 12500]
   ------------------------------------------------------------------------------------------
   Model                     | Acc (Int)  | Acc (Ext)  | Diff (Acc)   | Diff 