In [1]:
import auxiliary as aux
from transformers import AutoModelForTokenClassification, TrainingArguments, Trainer, AutoTokenizer, DataCollatorForTokenClassification
import matplotlib.pyplot as plt
import seaborn as sns
import numpy as np
import torch
from kingbert import KingBert

  from .autonotebook import tqdm as notebook_tqdm
2025-05-13 16:02:30.443444: E external/local_xla/xla/stream_executor/cuda/cuda_fft.cc:467] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered
E0000 00:00:1747144950.480605 2095921 cuda_dnn.cc:8579] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered
E0000 00:00:1747144950.494972 2095921 cuda_blas.cc:1407] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered
W0000 00:00:1747144950.525131 2095921 computation_placer.cc:177] computation placer already registered. Please check linkage and avoid linking the same target more than once.
W0000 00:00:1747144950.525189 2095921 computation_placer.cc:177] computation placer already registered. Please check linkage and avoid linking the same target more than once.
W0000 00:00:1747144950.525195 2095921

In [2]:
# ------------------------------------------------------------------
# Helper: fitness for ONE mutated sentence  (uses aux.compute_metrics)
# ------------------------------------------------------------------
from copy import deepcopy
import random

from tqdm import tqdm


def fitness_score(model, datum, mutated_ids):
    """Return the attack fitness of one mutated token sequence.

    The sequence is run through ``aux.inference`` as a batch of one, using the
    attention mask stored in ``datum``.  The resulting predictions are scored
    against ``datum["labels"]`` with ``aux.compute_metrics``.

    Args:
        model: Model object handed straight to ``aux.inference``.
        datum: Mapping that provides ``"attention_mask"`` and ``"labels"``.
        mutated_ids: Candidate token ids for the sentence.

    Returns:
        ``1.0 - recall``; a larger value means the model's recall on this
        candidate is lower, i.e. the attack is more successful.
    """
    # Both tensors get a leading batch dimension of size one.
    ids_batch = torch.tensor([mutated_ids])
    mask_batch = torch.tensor([datum["attention_mask"]])

    # Only the second element of the 4-tuple (the predictions) is used here.
    _, predictions, _, _ = aux.inference(model, ids_batch, mask_batch)

    predicted_labels = predictions.tolist()[0]
    recall = aux.compute_metrics([predicted_labels], [datum["labels"]])["recall"]
    return 1.0 - recall


# ------------------------------------------------------------------
# Evolutionary attack on ONE sentence
# ------------------------------------------------------------------
def evolve_sentence(
    model,
    datum,
    valid_token_ids,
    pop_size=30,
    n_generations=20,
    mutation_rate=0.15,
    elite_frac=0.2,
    target_recall=0.2,
    seed=42,
    mutable_label=18,
    crossover_rate=0.3,
):
    """Search for a token-level perturbation of one sentence that lowers recall.

    A simple evolutionary loop: candidates are copies of ``datum["input_ids"]``
    in which tokens at positions labelled ``mutable_label`` are randomly
    replaced by ids drawn from ``valid_token_ids``.  Each generation is scored
    with ``fitness_score`` (``1 - recall``), the top ``elite_frac`` share is
    kept unchanged, and the remainder is refilled with mutated (and sometimes
    crossed-over) copies of the elites.

    Args:
        model: Model object forwarded to ``fitness_score``.
        datum: Mapping with ``"input_ids"``, ``"labels"`` and whatever
            ``fitness_score`` reads (``"attention_mask"``).
        valid_token_ids: Sequence of token ids allowed as replacements.
        pop_size: Number of candidates per generation.
        n_generations: Maximum number of generations to score.
        mutation_rate: Per-position probability of replacing a mutable token.
        elite_frac: Share of the population carried over unchanged (at least
            one candidate is always kept).
        target_recall: Stop early once the best candidate's recall is at or
            below this value.
        seed: Seed for the search's private random generator.
        mutable_label: Label id marking positions that may be replaced.
            Defaults to 18, the value that used to be hard-coded
            (presumably the non-entity tag -- confirm against the label map).
        crossover_rate: Probability that an offspring is spliced with a second
            elite.  Defaults to 0.3, the value that used to be hard-coded.

    Returns:
        ``(best_ids, best_recall)`` for the best candidate of the last scored
        generation.  When ``pop_size`` or ``n_generations`` is below 1 no
        search can run, so the unmodified sentence and its recall are returned.
    """
    # A private generator yields the same draws as random.seed(seed) followed
    # by module-level calls, but leaves the process-wide RNG state untouched.
    rng = random.Random(seed)
    original = list(datum["input_ids"])
    seq_len = len(original)
    mutable_positions = [
        idx for idx, lab in enumerate(datum["labels"]) if lab == mutable_label
    ]

    def random_mutation(base):
        """Return a copy of ``base`` with mutable positions randomly replaced."""
        child = list(base)
        for idx in mutable_positions:
            if rng.random() < mutation_rate:
                child[idx] = rng.choice(valid_token_ids)
        return child

    # Degenerate settings previously crashed (unbound result / empty ranking);
    # fall back to the unperturbed sentence instead.
    if pop_size < 1 or n_generations < 1:
        return original, 1.0 - fitness_score(model, datum, original)

    n_elite = max(1, int(elite_frac * pop_size))
    population = [random_mutation(original) for _ in range(pop_size)]

    for generation in range(n_generations):
        fitness_vals = [fitness_score(model, datum, ind) for ind in population]
        ranked = sorted(
            zip(population, fitness_vals), key=lambda pair: pair[1], reverse=True
        )
        best_ind, best_fit = ranked[0]
        best_rec = 1.0 - best_fit

        # Stop on success, and skip breeding after the final generation since
        # those offspring would never be scored.
        if best_rec <= target_recall or generation == n_generations - 1:
            break

        elites = [list(ind) for ind, _ in ranked[:n_elite]]

        offspring = []
        while len(offspring) < pop_size - n_elite:
            child = random_mutation(rng.choice(elites))
            # A one-point crossover needs an interior cut point, which only
            # exists for sequences of at least three tokens.
            if seq_len >= 3 and rng.random() < crossover_rate:
                other = rng.choice(elites)
                cut = rng.randint(1, seq_len - 2)
                child = child[:cut] + other[cut:]
            offspring.append(child)

        population = elites + offspring

    return best_ind, best_rec

# ------------------------------------------------------------------
# Attack the whole dataset and report new metrics
# ------------------------------------------------------------------
def adversarial_dataset(
    model,
    tokenizer,
    dataset,
    pop_size=30,
    n_generations=20,
    mutation_rate=0.15,
):
    """Evolve an adversarial variant of every example and score the result.

    Each example in ``dataset`` is passed to ``evolve_sentence``; the evolved
    token ids are then run through ``aux.inference`` once more and the
    predictions are scored against the original labels with
    ``aux.compute_metrics``.

    Args:
        model: Model object forwarded to ``evolve_sentence`` / ``aux.inference``.
        tokenizer: Supplies ``all_special_ids``, ``get_vocab()`` and
            ``convert_ids_to_tokens()``.
        dataset: Iterable, sized collection of examples that supports
            ``add_column``; every example provides ``"labels"`` and
            ``"attention_mask"`` (plus whatever ``evolve_sentence`` reads).
        pop_size: Forwarded to ``evolve_sentence``.
        n_generations: Forwarded to ``evolve_sentence``.
        mutation_rate: Forwarded to ``evolve_sentence``.

    Returns:
        ``(metrics, preds, adv_inputs, dataset)`` where ``dataset`` is the
        value returned by ``add_column`` after attaching the ``adv_inputs``
        and ``adv_tokens`` columns.
    """
    # Replacement candidates: every vocabulary id that is not a special token.
    excluded_ids = set(tokenizer.all_special_ids)
    valid_token_ids = [
        token_id
        for token_id in tokenizer.get_vocab().values()
        if token_id not in excluded_ids
    ]
    search_options = dict(
        pop_size=pop_size,
        n_generations=n_generations,
        mutation_rate=mutation_rate,
    )

    adv_inputs, adv_tokens, adv_labels = [], [], []
    for example in tqdm(dataset, desc="Evolving sentences"):
        evolved_ids, _ = evolve_sentence(
            model, example, valid_token_ids, **search_options
        )
        adv_inputs.append(evolved_ids)
        adv_tokens.append(tokenizer.convert_ids_to_tokens(evolved_ids))
        # The gold labels are kept as-is; only the input tokens were perturbed.
        adv_labels.append(example["labels"])

    # Second pass: collect the model's predictions on the evolved inputs.
    paired = tqdm(
        zip(adv_inputs, dataset),
        total=len(dataset),
        desc="Inference on adversarial set",
    )
    preds = []
    for evolved_ids, example in paired:
        ids_batch = torch.tensor([evolved_ids])
        mask_batch = torch.tensor([example["attention_mask"]])
        _, batch_preds, _, _ = aux.inference(model, ids_batch, mask_batch)
        preds.append(batch_preds.tolist()[0])

    dataset = dataset.add_column('adv_inputs', adv_inputs)
    dataset = dataset.add_column('adv_tokens', adv_tokens)

    metrics = aux.compute_metrics(preds, adv_labels)
    return metrics, preds, adv_inputs, dataset

In [3]:
import warnings
from sklearn.exceptions import UndefinedMetricWarning

# Ignore only sklearn's UndefinedMetricWarning (the original note attributes
# it to seqeval); every other warning category is still shown.
warnings.filterwarnings("ignore", category=UndefinedMetricWarning)

# Step 1: load the token-classification model, its tokenizer and the training
# data. All three paths are relative to the notebook's working directory.
model      = AutoModelForTokenClassification.from_pretrained('to_share/distilbert1')
tokenizer  = AutoTokenizer.from_pretrained('to_share/distilbert1')
train_ds   = aux.json_to_Dataset("data/distilbert_train.json")

# Step 2: run the evolutionary attack. `dataset` receives the returned copy of
# the selected examples with the `adv_inputs` / `adv_tokens` columns attached.
adv_metrics, adv_preds, adv_inputs, dataset = adversarial_dataset(
    model,
    tokenizer,
    train_ds.select(range(10)),    # first 10 examples only, to keep the run short
    pop_size        = 20,
    n_generations   = 10,
    mutation_rate   = 0.05,
)

# Report every metric except "confusion_matrix" (presumably not a scalar, so
# the `.4f` format below would not apply to it -- confirm in aux.compute_metrics).
print("Metrics on evolved examples:")
for k, v in adv_metrics.items():
    if k != "confusion_matrix":
        print(f"{k:>12}: {v:.4f}")


  return torch.load(checkpoint_file, map_location=map_location)
Evolving sentences: 100%|██████████| 10/10 [01:09<00:00,  6.90s/it]
Inference on adversarial set: 100%|██████████| 10/10 [00:00<00:00, 39.78it/s]

Metrics on evolved examples:
   precision: 0.4717
      recall: 0.4902
          f1: 0.4808
    accuracy: 0.9488





In [8]:
import json
from datasets import Dataset
from typing import Union

def write_dataset_to_json(dataset: Dataset, filepath: str) -> None:
    """Write a Hugging Face Dataset to a UTF-8 JSON file.

    The layout is intended to match the input JSON consumed by
    ``json_to_Dataset``.  The file holds a list with one object per row:

    - id: the row's position in ``dataset`` as a string (an existing ``id``
      column is not consulted)
    - tokens: copied from ``row["tokens"]``
    - token_ids: copied from ``row["input_ids"]``
    - bio_labels: copied from ``row["labels"]``
    - source_text: copied from ``row["source_text"]``
    - adv_inputs / adv_tokens: copied only when the row contains them

    Args:
        dataset: Iterable of row mappings with the columns listed above.
        filepath: Destination path; an existing file is overwritten.

    Raises:
        KeyError: If a row lacks one of the four mandatory columns.
    """
    optional_fields = ("adv_inputs", "adv_tokens")
    json_data = []

    for idx, row in enumerate(dataset):
        entry = {
            "id": str(idx),  # You can replace this with row["id"] if it exists
            "tokens": row["tokens"],
            "token_ids": row["input_ids"],
            "bio_labels": row["labels"],
            "source_text": row["source_text"],
        }

        # Adversarial columns only exist after the attack has been run.
        for field in optional_fields:
            if field in row:
                entry[field] = row[field]

        json_data.append(entry)

    # ensure_ascii=False writes non-ASCII characters verbatim, so the encoding
    # must be pinned; the platform default may be unable to represent them.
    with open(filepath, "w", encoding="utf-8") as f:
        json.dump(json_data, f, indent=2, ensure_ascii=False)
