In [None]:
import re
import json
import argparse
from itertools import chain
from functools import partial

import torch
from transformers import AutoTokenizer, Trainer, TrainingArguments
from transformers import AutoModelForTokenClassification, DataCollatorForTokenClassification
import evaluate
from datasets import Dataset, features
import numpy as np
import pandas as pd

In [None]:
! pip3 list

# Loading the Kaggle-provided data
* Loading the JSON files
* Downsampling the negative samples (documents whose labels are all "O")
* Defining and applying the tokenizer

In [None]:
# Load the training documents. The context manager closes the file handle
# (the bare `open(...)` never did) and the encoding is pinned so the result
# does not depend on the platform's default codec.
with open(
    "kaggle/input/pii-detection-removal-from-educational-data/train.json",
    encoding="utf-8",
) as train_file:
    data = json.load(train_file)

# Partition the documents by whether any token carries a label other than "O".
# NOTE(review): `p` and `n` are not referenced again in the visible notebook;
# the actual downsampling is done later by `downsample`.
p = []  # positive samples: at least one label differs from "O"
n = []  # negative samples: every label is "O"
for d in data:
    # A plain generator avoids building a NumPy array per document and also
    # behaves sensibly for an empty label list.
    if any(label != "O" for label in d["labels"]):
        p.append(d)
    else:
        n.append(d)
print("original datapoints: ", len(data))

In [None]:
# Label vocabulary: every distinct label string seen in the training data,
# sorted so that the id assignment is stable across runs.
all_labels = sorted(set(chain.from_iterable(doc["labels"] for doc in data)))

# Forward (label -> id) and reverse (id -> label) lookups.
label2id = {label: idx for idx, label in enumerate(all_labels)}
id2label = {idx: label for label, idx in label2id.items()}

# Every label other than "O", listed explicitly.
target = [
    'B-EMAIL',
    'B-ID_NUM',
    'B-NAME_STUDENT',
    'B-PHONE_NUM',
    'B-STREET_ADDRESS',
    'B-URL_PERSONAL',
    'B-USERNAME',
    'I-ID_NUM',
    'I-NAME_STUDENT',
    'I-PHONE_NUM',
    'I-STREET_ADDRESS',
    'I-URL_PERSONAL',
]

print(id2label)

In [None]:
def tokenize(example, tokenizer, label2id, max_length):
    """Tokenize one document and project its word-level labels onto the tokenizer's tokens.

    The text is rebuilt from ``example["tokens"]`` and
    ``example["trailing_whitespace"]`` while a parallel list holds one label
    per character (an inserted space is labelled "O"). Each tokenizer token
    then takes the label of the first non-whitespace character it covers.

    Args:
        example: mapping with the keys "tokens", "provided_labels" and
            "trailing_whitespace" (three parallel sequences).
        tokenizer: callable returning an encoding that exposes
            ``offset_mapping`` and ``input_ids`` and can be unpacked with ``**``.
        label2id: mapping from label string to integer id; must contain "O".
        max_length: maximum sequence length forwarded to the tokenizer.

    Returns:
        dict with every field of the tokenizer output plus "labels" (one label
        id per tokenizer token) and "length" (number of input ids).
    """
    # Rebuild the raw text and, in lock-step, one label per character.
    pieces = []
    char_labels = []
    for token, label, has_space in zip(
        example["tokens"], example["provided_labels"], example["trailing_whitespace"]
    ):
        pieces.append(token)
        char_labels.extend([label] * len(token))
        if has_space:
            pieces.append(" ")
            char_labels.append("O")
    text = "".join(pieces)

    # `truncation=True` makes the intent of `max_length` explicit instead of
    # relying on the tokenizer's fallback behaviour (and its warning).
    tokenized = tokenizer(
        text, return_offsets_mapping=True, max_length=max_length, truncation=True
    )

    last_char = len(char_labels) - 1
    token_labels = []
    for start_idx, end_idx in tokenized.offset_mapping:
        # A (0, 0) span marks a token without source text (e.g. CLS).
        if start_idx == 0 and end_idx == 0:
            token_labels.append(label2id["O"])
            continue

        # Token starts with whitespace: read the label of the next character.
        # Guarded so that a whitespace token at the very end of the text does
        # not step past the last character (previously an IndexError).
        if text[start_idx].isspace() and start_idx < last_char:
            start_idx += 1

        token_labels.append(label2id[char_labels[start_idx]])

    return {**tokenized, "labels": token_labels, "length": len(tokenized.input_ids)}

In [None]:
# Checkpoint whose tokenizer is applied to the data, and the maximum sequence length.
TRAIN_MODEL_PATH = "microsoft/deberta-base"
TRAIN_MAX_LENGTH = 1024

tokenizer = AutoTokenizer.from_pretrained(TRAIN_MODEL_PATH)

# Column-wise view of the raw documents. The JSON "labels" field is stored as
# "provided_labels" because `tokenize` emits its own "labels" column.
raw_columns = {
    "full_text": [],
    "document": [],
    "tokens": [],
    "trailing_whitespace": [],
    "provided_labels": [],
}
for doc in data:
    raw_columns["full_text"].append(doc["full_text"])
    raw_columns["document"].append(str(doc["document"]))
    raw_columns["tokens"].append(doc["tokens"])
    raw_columns["trailing_whitespace"].append(doc["trailing_whitespace"])
    raw_columns["provided_labels"].append(doc["labels"])

ds = Dataset.from_dict(raw_columns)

# Tokenize every document and attach the per-token label ids.
tokenize_kwargs = {
    "tokenizer": tokenizer,
    "label2id": label2id,
    "max_length": TRAIN_MAX_LENGTH,
}
ds = ds.map(tokenize, fn_kwargs=tokenize_kwargs, num_proc=3)

In [None]:
def downsample(df, percent, random_state=42):
    """Keep every labelled document and a random fraction of the unlabelled ones.

    A document counts as labelled when at least one entry of its
    ``provided_labels`` differs from "O".

    Args:
        df: DataFrame with a "provided_labels" column of label sequences.
            The input frame is not modified.
        percent: fraction (0.0-1.0) of the unlabelled documents to keep.
        random_state: seed for the sampling; the default of 42 reproduces the
            previously hard-coded behaviour.

    Returns:
        A new DataFrame holding all labelled rows followed by the sampled
        unlabelled rows. It carries an extra boolean "is_labels" column and
        keeps the original index labels.
    """
    df = df.copy()

    # `.astype(bool)` guarantees a boolean mask even for an empty frame, where
    # `apply` would not infer a bool dtype and `~` would misbehave.
    df['is_labels'] = (
        df['provided_labels']
        .apply(lambda labels: any(label != "O" for label in labels))
        .astype(bool)
    )
    has_labels = df['is_labels']

    kept_negatives = df[~has_labels].sample(frac=percent, random_state=random_state)

    return pd.concat([df[has_labels], kept_negatives])

In [None]:
# Downsample the negative samples of the dataset: keep every labelled
# document and 20% of the unlabelled ones.
df_train = pd.DataFrame(ds)
df_train = downsample(df_train, 0.2)
df_train = df_train.drop(columns=['is_labels'])

# `downsample` returns rows with their original (now non-contiguous) index;
# `preserve_index=False` keeps that index from being stored as an extra
# `__index_level_0__` column in the dataset.
ds = Dataset.from_pandas(df_train, preserve_index=False)
# Splitting the dataset into training and validation sets for performance evaluation
ds = ds.train_test_split(test_size=0.1, seed=42)

In [None]:
# Sanity check on the first training example: the labelled words as provided,
# then the labelled tokenizer tokens, so the two views can be compared by eye.
x = ds['train'][0]

for word, word_label in zip(x["tokens"], x["provided_labels"]):
    if word_label == "O":
        continue
    print((word, word_label))

print("*"*100)

subword_tokens = tokenizer.convert_ids_to_tokens(x["input_ids"])
for subword, label_id in zip(subword_tokens, x["labels"]):
    subword_label = id2label[label_id]
    if subword_label != "O":
        print((subword, subword_label))

# Training and evaluating the model
* Defining the metrics (precision, recall, and F5 score)
* Training the model
* Evaluating on the validation data

In [None]:
from seqeval.metrics import precision_score, recall_score

def metrics(p, all_labels):
    """Compute precision, recall and F5 for a batch of token-classification predictions.

    Args:
        p: pair ``(scores, label_ids)``. ``scores`` is reduced with
            ``argmax`` over axis 2 to obtain predicted label ids;
            ``label_ids`` holds the reference ids, with -100 marking positions
            to ignore.
        all_labels: sequence indexed by label id that yields the label string.

    Returns:
        dict with the keys "precision", "recall" and "f5". "f5" is 0.0 when
        precision and recall are both zero.
    """
    scores, label_ids = p
    pred_ids = np.argmax(scores, axis=2)

    # Drop ignored positions (label id -100) and convert ids to label strings.
    true_predictions = []
    true_labels = []
    for pred_row, label_row in zip(pred_ids, label_ids):
        kept = [(pred, gold) for pred, gold in zip(pred_row, label_row) if gold != -100]
        true_predictions.append([all_labels[pred] for pred, _ in kept])
        true_labels.append([all_labels[gold] for _, gold in kept])

    precision = precision_score(true_labels, true_predictions, average='micro')
    recall = recall_score(true_labels, true_predictions, average='micro')

    # F-beta with beta = 5. Guard the denominator: when nothing is predicted
    # correctly both terms are 0 and the plain formula divides by zero.
    beta_sq = 5 ** 2
    denominator = beta_sq * precision + recall
    if denominator > 0:
        f5_score = (1 + beta_sq) * (precision * recall) / denominator
    else:
        f5_score = 0.0

    return {
        "precision": precision,
        "recall": recall,
        "f5": f5_score,
    }

In [None]:
# Token-classification model on top of the base checkpoint, with one output
# per label in `all_labels`.
model_kwargs = dict(
    num_labels=len(all_labels),
    id2label=id2label,
    label2id=label2id,
    ignore_mismatched_sizes=True,
)
model = AutoModelForTokenClassification.from_pretrained(TRAIN_MODEL_PATH, **model_kwargs)

# Pick an accelerator if one is present: MPS first, then CUDA, else stay on CPU.
use_mps = torch.backends.mps.is_available()
if use_mps:
    mps_device = torch.device("mps")
    model.to(mps_device)
    print("Model moved to MPS device.")
elif torch.cuda.is_available():
    model.cuda()
    print("Model moved to CUDA device.")
else:
    print("No GPU available, using CPU.")

# Batches are padded to a multiple of 16 tokens.
collator = DataCollatorForTokenClassification(tokenizer, pad_to_multiple_of=16)


# Training

In [None]:
# Training configuration. Evaluation runs every 100 optimizer steps on the
# validation split.
args = TrainingArguments(
    output_dir='kaggle/output',
    learning_rate=2e-5,
    num_train_epochs=3,
    per_device_train_batch_size=4,
    gradient_accumulation_steps=2,
    report_to="none",
    evaluation_strategy="steps",
    eval_steps=100,
    # Was `False`, which contradicts evaluation_strategy="steps".
    do_eval=True,
    save_total_limit=1,
    logging_steps=20,
    lr_scheduler_type='cosine',
    # Must name a key returned by `metrics` ("precision", "recall", "f5").
    # The previous value "f1" is not produced, so looking up the best-model
    # metric at checkpoint time would fail.
    metric_for_best_model="f5",
    greater_is_better=True,
    warmup_ratio=0.1,
    weight_decay=0.01
)


trainer = Trainer(
    model,
    args,
    train_dataset=ds['train'],
    eval_dataset=ds["test"],
    data_collator=collator,
    tokenizer=tokenizer,
    compute_metrics=partial(metrics, all_labels=all_labels)
)


In [None]:
# Fine-tune, then write the model weights and the tokenizer files into the
# same directory so the pair can be reloaded together.
trainer.train()

save_dir = "deberta3base_1024_downsampled"
trainer.save_model(save_dir)
tokenizer.save_pretrained(save_dir)

In [None]:
# Reload the fine-tuned tokenizer and model from the directory that
# `trainer.save_model` / `tokenizer.save_pretrained` wrote to. The path
# previously carried a "content/" prefix that the save calls do not use, so a
# fresh top-to-bottom run could not find the files.
model_path = "deberta3base_1024_downsampled"

# Load the tokenizer and the model
tokenizer = AutoTokenizer.from_pretrained(model_path)
model = AutoModelForTokenClassification.from_pretrained(model_path)
model.to("cpu")

In [None]:
# Minimal Trainer used only for prediction: one example per batch, no
# external reporting, no metric callback.
args = TrainingArguments(
    output_dir=".",
    per_device_eval_batch_size=1,
    report_to="none",
)

trainer_kwargs = dict(
    model=model,
    args=args,
    data_collator=collator,
    tokenizer=tokenizer,
)
trainer = Trainer(**trainer_kwargs)

In [None]:
# Pattern for an e-mail address: local part, "@", domain, ".", suffix.
email_regex = re.compile(r'[\w.+-]+@[\w-]+\.[\w.-]+')
# Pattern for "(ddd)ddd-dddd" (optionally followed by word characters) or
# "ddd.ddd.dddd"; either form must be followed by one whitespace character.
phone_num_regex = re.compile(r"(\(\d{3}\)\d{3}\-\d{4}\w*|\d{3}\.\d{3}\.\d{4})\s")

# Compiled pattern -> id of the label assigned when the pattern matches.
regex_dict = dict([
    (email_regex, label2id["B-EMAIL"]),
    (phone_num_regex, label2id["B-PHONE_NUM"]),
])

def label_email_num(regex_dict, tokens, preds):
    """Override predictions for tokens that match one of the given regexes.

    Every token of every document is tested with ``regex.match``. For the
    first pattern that matches, the prediction at the same
    (document, position) is set to that pattern's label id.

    Previously the regex test ran once per document, after the token loop, so
    only the last token was checked; a match then overwrote the whole
    prediction row of the document, and the input array was modified in place.

    Args:
        regex_dict: mapping of compiled pattern -> label id.
        tokens: list of token lists, one per document.
        preds: array-like of shape (docs, positions) holding label ids, or
            (docs, positions, labels) holding per-label scores.

    Returns:
        A new NumPy array shaped like ``preds``. For 2-D input the matching
        cells hold the label id; for 3-D input the matching score vector is
        changed so that ``argmax`` yields the label id. ``preds`` itself is
        left untouched.

    NOTE(review): position ``j`` of ``tokens[i]`` is used directly as position
    ``j`` of ``preds[i]``. If ``preds`` comes from a subword tokenizer the two
    are not aligned one-to-one; mapping through the tokenizer's offset mapping
    (as the original comment suggested) is still to be done.
    """
    patched = np.array(preds, copy=True)
    if patched.ndim < 2:
        # Nothing addressable per (document, position).
        return patched

    n_docs, n_positions = patched.shape[0], patched.shape[1]
    for doc_idx, doc_tokens in enumerate(tokens[:n_docs]):
        for tok_idx, token in enumerate(doc_tokens[:n_positions]):
            for regex, label_id in regex_dict.items():
                if not regex.match(token):
                    continue
                if patched.ndim == 3:
                    # Lift the chosen label above every other score.
                    token_scores = patched[doc_idx, tok_idx]
                    token_scores[label_id] = token_scores.max() + 1
                else:
                    patched[doc_idx, tok_idx] = label_id
                break  # first matching pattern wins
    return patched


In [None]:
# Run inference over the whole validation split and unpack the result into
# the raw scores, the reference label ids and the metrics dict.
prediction_output = trainer.predict(ds['test'])
preds, labels, metric = prediction_output


In [None]:
# Word-level tokens of every validation document, in dataset order.
tokens = [x['tokens'] for x in ds['test']]
preds2 = label_email_num(regex_dict, tokens, preds)

# Number of individual prediction values the regex pass changed. The previous
# `sum(a != b for ...)` added boolean arrays together and produced an array,
# not a count.
differences = int(np.sum(np.asarray(preds) != np.asarray(preds2)))

# NOTE(review): the metrics are computed on `preds`, not on the
# regex-adjusted `preds2` — confirm which one is meant to be reported.
metrics((preds, labels), all_labels)

In [None]:
def tokenize(example, tokenizer, max_length):
    """Tokenize one unlabelled document for inference.

    The text is rebuilt from ``example["tokens"]``, appending a single space
    after each token whose ``example["trailing_whitespace"]`` flag is set.

    Note: this definition rebinds the name ``tokenize``; from here on it
    replaces the earlier training-time version that also takes ``label2id``.

    Args:
        example: mapping with the keys "tokens" and "trailing_whitespace".
        tokenizer: callable returning an encoding that exposes ``input_ids``
            and can be unpacked with ``**``.
        max_length: maximum sequence length forwarded to the tokenizer.

    Returns:
        dict with every field of the tokenizer output plus "length"
        (number of input ids).
    """
    text = "".join(
        token + (" " if has_space else "")
        for token, has_space in zip(example["tokens"], example["trailing_whitespace"])
    )

    # `truncation=True` makes the intent of `max_length` explicit instead of
    # relying on the tokenizer's fallback behaviour (and its warning).
    tokenized = tokenizer(
        text, return_offsets_mapping=True, max_length=max_length, truncation=True
    )

    return {**tokenized, "length": len(tokenized.input_ids)}


In [None]:
# Load the test documents. The context manager closes the file handle and the
# encoding is pinned so the result does not depend on the platform default.
with open(
    "kaggle/input/pii-detection-removal-from-educational-data/test.json",
    encoding="utf-8",
) as test_file:
    test_data = json.load(test_file)
test_frame = pd.DataFrame(test_data)

In [None]:
# Trainer used for inference on the test set: one example per batch, no
# external reporting, with the `metrics` callback bound to the label list.
args = TrainingArguments(
    output_dir=".",
    per_device_eval_batch_size=1,
    report_to="none",
)

score_fn = partial(metrics, all_labels=all_labels)
trainer = Trainer(
    model=model,
    args=args,
    data_collator=collator,
    tokenizer=tokenizer,
    compute_metrics=score_fn,
)

In [None]:
# Preview the first rows of the test frame to check which columns were loaded.
test_frame.head()

In [None]:
# Build a dataset from the test documents, tokenize it and run the model.
# The test JSON is a list of dicts without labels; its keys are
# 'document', 'full_text', 'tokens' and 'trailing_whitespace'.
ds_test = Dataset.from_pandas(test_frame)

inference_kwargs = {"tokenizer": tokenizer, "max_length": TRAIN_MAX_LENGTH}
ds_test = ds_test.map(tokenize, fn_kwargs=inference_kwargs, num_proc=3)

# Only the raw scores are needed; label ids and metrics are discarded.
preds, _, _ = trainer.predict(ds_test)


In [None]:
def write_output(test_frame, id2label, preds, output_path="kaggle/output/output.csv"):
    """Turn model scores into a table of non-"O" predictions and save it as CSV.

    Args:
        test_frame: DataFrame with the columns "document" and "tokens"; its
            rows must be in the same order as the first axis of ``preds``.
        id2label: mapping from label id to label string.
        preds: array of shape (documents, positions, labels); the label id is
            taken as ``argmax`` over the last axis.
        output_path: where the CSV is written. The default is the path that
            was previously hard-coded; missing parent directories are created.

    Returns:
        DataFrame with the columns "row_id", "document", "token", "label",
        one row per predicted non-"O" label.

    Raises:
        IndexError: if ``preds`` has fewer rows than ``test_frame``.

    NOTE(review): position ``j`` of a prediction row is reported as token
    index ``j``. If ``preds`` comes from a subword tokenizer the positions are
    not word-aligned — confirm, or map them through the offset mapping first.
    NOTE(review): "row_id" is the position of the document in ``test_frame``,
    so it repeats for every token of that document — confirm this is what the
    consumer of the CSV expects.
    """
    import os  # local import: used only to create the output directory

    label_ids = np.argmax(preds, axis=2)

    output = []
    documents_and_tokens = zip(test_frame["document"], test_frame["tokens"])
    # enumerate() gives the row position; the frame's index labels are not
    # used, so a filtered or re-indexed frame still lines up with `preds`.
    for doc_pos, (document, tokens) in enumerate(documents_and_tokens):
        prediction = label_ids[doc_pos]
        # zip() stops at the shorter side, i.e. tokens beyond the number of
        # predicted positions are skipped.
        for token_idx, label_id in zip(range(len(tokens)), prediction):
            label = id2label[label_id]
            if label != "O":
                output.append((doc_pos, document, token_idx, label))

    df_output = pd.DataFrame(output, columns=["row_id", "document", "token", "label"])

    out_dir = os.path.dirname(output_path)
    if out_dir:
        os.makedirs(out_dir, exist_ok=True)
    df_output.to_csv(output_path, index=False)

    return df_output


In [None]:
# Convert the test-set scores into labelled rows, save them as CSV and display the resulting frame.
write_output(test_frame, id2label, preds)