In [None]:
%pip install pyvi transformers seqeval evaluate google-cloud-bigquery-storage protobuf

## Library

In [None]:
import itertools
import json
import os
import re
from collections import Counter

import evaluate
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
from datasets import Dataset, DatasetDict
from pyvi import ViTokenizer
# Both of the next two modules export `accuracy_score`; seqeval is imported last, so its version is the one bound.
from sklearn.metrics import precision_recall_fscore_support, accuracy_score, confusion_matrix
from seqeval.metrics import classification_report, f1_score, accuracy_score
from tqdm import tqdm
from transformers import AutoModel, RobertaTokenizerFast
from transformers import AutoModelForTokenClassification, AutoModelForSequenceClassification, TrainingArguments, Trainer, DataCollatorForTokenClassification, DataCollatorWithPadding, EarlyStoppingCallback

## Loading Data

In [None]:
print("Loading data files...")
TRAIN_PATH = '../data/04_model_input/train_dataset.json'
TEST_PATH = '../data/04_model_input/test_dataset.json'
DEV_PATH = '../data/04_model_input/dev_dataset.json'


def _read_json(path):
    """Open `path` as UTF-8 text and return the parsed JSON content."""
    with open(path, 'r', encoding='utf-8') as handle:
        return json.load(handle)


# Files are read in the order train -> test -> dev.
train_json = _read_json(TRAIN_PATH)
test_json = _read_json(TEST_PATH)
dev_json = _read_json(DEV_PATH)

print(f"-> Train raw docs: {len(train_json)}")
print(f"-> Test raw docs: {len(test_json)}")
print(f"-> Dev raw docs: {len(dev_json)}")

## Tokenizer Initialization

In [None]:
# Checkpoint whose tokenizer files are loaded; `add_prefix_space=True` is forwarded unchanged.
PHOBERT_CHECKPOINT = "vinai/phobert-base-v2"
tokenizer = RobertaTokenizerFast.from_pretrained(PHOBERT_CHECKPOINT, add_prefix_space=True)

## Prepare Data

In [None]:
def prepare_re_data_from_json(json_data):
    """Expand annotated tasks into one sample per ordered entity pair.

    For every task, the first annotation's `result` list is scanned:
    items of type 'labels' become entities (keyed by their id) and items of
    type 'relation' that carry a non-empty 'labels' list become directed
    relations (from_id -> to_id). Tasks with no annotations are skipped.

    Args:
        json_data: iterable of task dicts with 'data' -> 'text' and
            'annotations' -> [ {'result': [...]} ] entries.

    Returns:
        list of dicts with keys 'text', 'ent1', 'ent2', 'label'. Every ordered
        pair of distinct entities of a task yields one dict; pairs without an
        annotated relation get the label 'NO_RELATION'.
    """
    samples = []
    for task in json_data:
        text = task['data']['text']
        if not task['annotations']:
            continue

        entities = {}
        gold_relations = {}
        for item in task['annotations'][0]['result']:
            kind = item['type']
            if kind == 'labels':
                value = item['value']
                entities[item['id']] = {
                    'id': item['id'],
                    'text': value['text'],
                    'start': value['start'],
                    'end': value['end'],
                    'label': value['labels'][0],
                }
            elif kind == 'relation' and item.get('labels'):
                # A later relation on the same (from, to) pair overrides an earlier one.
                gold_relations[(item['from_id'], item['to_id'])] = item['labels'][0]

        samples.extend(
            {
                'text': text,
                'ent1': entities[head_id],
                'ent2': entities[tail_id],
                'label': gold_relations.get((head_id, tail_id), 'NO_RELATION'),
            }
            for head_id, tail_id in itertools.permutations(entities, 2)
        )

    return samples

In [None]:
def insert_entity_markers(text, ent1, ent2):
    """Wrap the two entity spans of `text` in role-specific marker tags.

    `ent1` is always wrapped as ``[E1] ... [/E1]`` and `ent2` as
    ``[E2] ... [/E2]``, regardless of which one appears first in the text.
    Tagging by role (not by position) keeps the pair (a, b) distinguishable
    from the pair (b, a), so the direction of the relation is visible in the
    marked text.

    Args:
        text: the raw text the entity offsets refer to.
        ent1: dict with integer 'start' and 'end' character offsets (first/head entity).
        ent2: dict with integer 'start' and 'end' character offsets (second/tail entity).

    Returns:
        A new string with the four markers inserted. All offsets are resolved
        against the original `text`, so overlapping or nested spans do not
        shift each other.
    """
    insertions = []
    for role, (ent, name) in enumerate(((ent1, "E1"), (ent2, "E2"))):
        start, end = ent["start"], ent["end"]
        # Sort key layout: (position, kind, tie-breakers). At one position a
        # closing tag (kind 0) precedes an opening tag (kind 1); an empty span
        # closes after its own opening tag (kind 2). For equal positions the
        # outer span opens first and the inner span closes first.
        insertions.append((start, 1, -end, role, f"[{name}] "))
        insertions.append((end, 0 if end > start else 2, -start, -role, f" [/{name}]"))
    insertions.sort(key=lambda ins: ins[:4])

    pieces = []
    cursor = 0
    for position, _, _, _, marker in insertions:
        pieces.append(text[cursor:position])
        pieces.append(marker)
        cursor = position
    pieces.append(text[cursor:])

    return "".join(pieces)

In [None]:
def build_re_samples(json_data):
    """Turn raw annotated tasks into marked-text classification samples.

    Each entity pair produced by `prepare_re_data_from_json` is converted to a
    dict holding the text with entity markers inserted ('text') and the
    relation label of the pair ('label').
    """
    return [
        {
            "text": insert_entity_markers(pair["text"], pair["ent1"], pair["ent2"]),
            "label": pair["label"],
        }
        for pair in prepare_re_data_from_json(json_data)
    ]

In [None]:
print("\nConverting to RE format...")

# Split name -> raw JSON documents; the dev file backs the "validation" split.
_split_sources = {
    "train": train_json,
    "validation": dev_json,
    "test": test_json,
}
raw_datasets_re = DatasetDict({
    split_name: Dataset.from_list(build_re_samples(docs))
    for split_name, docs in _split_sources.items()
})
print("done!")

In [None]:
# Print the first marked sample and its label for the train and validation splits.
# The banner strings are Vietnamese; their characters were mis-decoded
# (UTF-8 bytes read as Mac Roman) and are restored here.
print("\n--- KIỂM TRA MẪU TẬP TRAIN ---")
print(raw_datasets_re['train'][0]['text'])
print(raw_datasets_re['train'][0]['label'])


print("\n--- KIỂM TRA MẪU TẬP VALIDATION ---")
print(raw_datasets_re['validation'][0]['text'])
print(raw_datasets_re['validation'][0]['label'])

In [None]:
# Gather every label string that occurs in any split.
re_label_set = {
    lbl
    for split in raw_datasets_re.keys()
    for lbl in raw_datasets_re[split]["label"]
}

# Alphabetical order fixes the label <-> id mapping.
re_label_list = sorted(re_label_set)
re_label2id = {label: idx for idx, label in enumerate(re_label_list)}
re_id2label = dict(enumerate(re_label_list))

print("RE labels:", re_label_list)

In [None]:
def encode_re_label(example):
    """Add the integer class id of `example["label"]` under the key "labels"."""
    label_id = re_label2id[example["label"]]
    example["labels"] = label_id
    return example


# Applied to every split; the string "label" column is kept alongside the new "labels" column.
raw_datasets_re = raw_datasets_re.map(encode_re_label)

In [None]:
# Register the four entity markers as additional tokens of the tokenizer.
tokenizer.add_tokens(["[E1]", "[/E1]", "[E2]", "[/E2]"])

def tokenize_re(example):
    """Tokenize the marked text of a batch of samples.

    Args:
        example: a batch from `Dataset.map(..., batched=True)`;
            `example["text"]` is a list of marked strings.

    Returns:
        The tokenizer output, truncated to at most 256 tokens per sample.
        No padding is applied here: samples keep their own length and are
        padded per batch by the data collator, which avoids padding every
        sample to 256 tokens.
    """
    # NOTE(review): truncation may cut off entity markers in long texts - confirm this is acceptable.
    return tokenizer(
        example["text"],
        truncation=True,
        max_length=256
    )

# Batched mapping lets the fast tokenizer process many texts per call.
tokenized_datasets_re = raw_datasets_re.map(
    tokenize_re,
    batched=True,
    remove_columns = ['text', 'label'])

In [None]:
# Show the remaining columns and the first encoded training sample.
train_tokenized_re = tokenized_datasets_re["train"]
print(train_tokenized_re.column_names)
print(train_tokenized_re[0])

## Train Phase

In [None]:
def compute_class_weights_re(dataset, num_labels):
    """Compute inverse-frequency class weights normalised to a mean of 1.

    Args:
        dataset: iterable of rows whose "labels" entry is the class id.
        num_labels: number of classes; ids 0 .. num_labels - 1 get a weight.

    Returns:
        A float32 tensor of length `num_labels`. A class that never occurs is
        counted as if it occurred once.
    """
    label_counts = Counter()
    for row in dataset:
        label_counts[int(row["labels"])] += 1

    per_class = np.fromiter(
        (label_counts.get(class_id, 1) for class_id in range(num_labels)),
        dtype=np.float64,
        count=num_labels,
    )
    inverse_freq = per_class.sum() / per_class
    normalised = inverse_freq / inverse_freq.mean()
    return torch.tensor(normalised, dtype=torch.float)

In [None]:
class WeightedTrainerRE(Trainer):
    """Trainer that computes a (optionally class-weighted) cross-entropy loss.

    Args:
        class_weights: optional 1-D tensor with one weight per class; when
            None, the cross-entropy loss is unweighted.
        All other arguments are forwarded to `Trainer`.
    """

    def __init__(self, *args, class_weights=None, **kwargs):
        super().__init__(*args, **kwargs)
        self.class_weights = class_weights
        # The loss module is created lazily and rebuilt whenever the logits' device changes.
        self._loss_fct = None
        self._loss_fct_device = None

    def compute_loss(self, model, inputs, return_outputs=False, **kwargs):
        """Run the model without labels and score its logits with the cached loss.

        "labels" is removed from `inputs` before the forward pass, so the loss
        returned here is the only loss computed.
        """
        labels = inputs.pop("labels")
        outputs = model(**inputs)
        logits = outputs.logits
        device = logits.device

        needs_rebuild = self._loss_fct is None or self._loss_fct_device != device
        if needs_rebuild:
            weight = None if self.class_weights is None else self.class_weights.to(device)
            self._loss_fct = nn.CrossEntropyLoss(weight=weight)
            self._loss_fct_device = device

        loss = self._loss_fct(logits, labels)
        if return_outputs:
            return loss, outputs
        return loss

In [None]:
from transformers import AutoModelForSequenceClassification

# Classification-head settings derived from the label vocabulary.
_re_head_config = dict(
    num_labels=len(re_label_list),
    id2label=re_id2label,
    label2id=re_label2id,
)
model_re = AutoModelForSequenceClassification.from_pretrained(
    "vinai/phobert-base-v2", **_re_head_config
)

# The tokenizer received extra marker tokens, so the embedding matrix is resized to its current length.
model_re.resize_token_embeddings(len(tokenizer))

In [None]:
def compute_metrics_re_ignore_no_relation(eval_pred):
    """Macro precision / recall / F1 over the relation classes only.

    NO_RELATION is excluded from the averaged classes through the `labels`
    argument, while every sample stays in the computation. As a result:
      * a gold NO_RELATION pair predicted as a relation counts as a false
        positive for that relation (dropping those rows would hide such errors);
      * a gold relation predicted as NO_RELATION counts as a false negative,
        without NO_RELATION itself entering the macro average as an extra
        zero-score class.

    Args:
        eval_pred: pair `(logits, labels)`; the class is taken as the argmax
            over the last axis of `logits`.

    Returns:
        dict with keys "macro_f1_no_relation", "precision" and "recall".
    """
    logits, labels = eval_pred
    preds = np.argmax(logits, axis=-1)

    no_rel_id = re_label2id.get("NO_RELATION")
    relation_ids = [idx for idx in sorted(re_label2id.values()) if idx != no_rel_id]
    if not relation_ids:
        # Nothing to average over when NO_RELATION is the only class.
        return {"macro_f1_no_relation": 0.0, "precision": 0.0, "recall": 0.0}

    precision, recall, f1, _ = precision_recall_fscore_support(
        labels,
        preds,
        labels=relation_ids,
        average="macro",
        zero_division=0
    )

    return {
        "macro_f1_no_relation": f1,
        "precision": precision,
        "recall": recall,
    }

In [None]:
# Evaluation and checkpointing run on the same step interval.
EVAL_SAVE_INTERVAL_STEPS = 2165

training_args_re = TrainingArguments(
    output_dir="./phobert-re-results",
    # optimisation
    num_train_epochs=5,
    learning_rate=2e-5,
    lr_scheduler_type="cosine",
    warmup_ratio=0.06,
    weight_decay=0.01,
    max_grad_norm=1.0,
    label_smoothing_factor=0.0,
    fp16=True,
    # batching
    per_device_train_batch_size=32,
    per_device_eval_batch_size=64,
    gradient_accumulation_steps=4,
    # evaluation / checkpointing
    eval_strategy="steps",
    eval_steps=EVAL_SAVE_INTERVAL_STEPS,
    save_strategy="steps",
    save_steps=EVAL_SAVE_INTERVAL_STEPS,
    load_best_model_at_end=True,
    metric_for_best_model="macro_f1_no_relation",
    greater_is_better=True,
    # logging
    logging_strategy="steps",
    logging_steps=500,
    report_to="none",
)

In [None]:
# Class weights are derived from the label distribution of the training split.
class_weights_re = compute_class_weights_re(
    tokenized_datasets_re["train"],
    len(re_label_list),
)

# Stops training after 3 evaluations without improvement of the tracked metric.
_early_stopping = EarlyStoppingCallback(early_stopping_patience=3)

trainer_re = WeightedTrainerRE(
    model=model_re,
    args=training_args_re,
    train_dataset=tokenized_datasets_re["train"],
    eval_dataset=tokenized_datasets_re["validation"],
    processing_class=tokenizer,
    compute_metrics=compute_metrics_re_ignore_no_relation,
    callbacks=[_early_stopping],
    class_weights=class_weights_re,
)

In [None]:
# The status messages are Vietnamese; their characters were mis-decoded
# (UTF-8 bytes read as Mac Roman) and are restored here.
print("\n🚀 BẮT ĐẦU TRAINING...")
trainer_re.train()

print("\n✅ TRAINING HOÀN TẤT!")

# Save the model and the tokenizer (including the added marker tokens) to the same directory.
save_path = "./phobert-re-final"
trainer_re.save_model(save_path)
tokenizer.save_pretrained(save_path)
print(f"Model đã được lưu tại: {save_path}")

## Debug

In [None]:
def token_level_confusion(trainer, dataset, label_list):
    """Build a confusion matrix from a trainer's predictions on `dataset`.

    Works for token-level logits (batch, seq, classes) as well as
    sequence-level logits (batch, classes): the class is the argmax over the
    last axis, and positions whose label is -100 are ignored.

    Args:
        trainer: object whose `predict(dataset)` returns `predictions`
            (logits) and `label_ids`.
        dataset: dataset passed to `trainer.predict`.
        label_list: class names; index i names class id i.

    Returns:
        pandas DataFrame with gold labels as rows and predicted labels as columns.
    """
    preds_out = trainer.predict(dataset)
    labels = np.asarray(preds_out.label_ids)
    preds = np.argmax(preds_out.predictions, axis=-1)

    # -100 marks positions that carry no label (ignored by the loss).
    keep = labels != -100
    y_true = labels[keep].astype(int)
    y_pred = preds[keep].astype(int)

    cm = confusion_matrix(y_true, y_pred, labels=list(range(len(label_list))))
    return pd.DataFrame(cm, index=label_list, columns=label_list)

## Test Phase

In [None]:
# Absolute path of the directory the fine-tuned model and tokenizer were saved to.
MODEL_PATH = os.path.abspath("./phobert-re-final")

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

tokenizer = RobertaTokenizerFast.from_pretrained(MODEL_PATH)
model = AutoModelForSequenceClassification.from_pretrained(MODEL_PATH)
model.to(device)

id2label = model.config.id2label
label2id = model.config.label2id
# Order the names by class id explicitly: the dict's own order is not
# guaranteed to follow the ids (a saved config may list keys in string order,
# e.g. "10" before "2"), which would misalign index-based lookups.
label_list = [id2label[class_id] for class_id in sorted(id2label)]

print("Labels:", label_list)

In [None]:
# Columns exposed as torch tensors for prediction.
_eval_columns = ["labels", "input_ids", "attention_mask"]

tokenized_test = tokenized_datasets_re['test']
tokenized_test.set_format(type="torch", columns=_eval_columns)

In [None]:
# Pads each evaluation batch with the reloaded tokenizer.
data_collator = DataCollatorWithPadding(tokenizer)

# Inference-only settings; nothing is reported to external trackers.
args = TrainingArguments(
    output_dir="./tmp",
    per_device_eval_batch_size=64,
    report_to="none",
)

# NOTE: this rebinds `trainer_re` to a plain Trainer wrapping the reloaded model.
trainer_re = Trainer(model=model, args=args, data_collator=data_collator)

pred_output = trainer_re.predict(tokenized_test)

In [None]:
logits = pred_output.predictions
y_true = pred_output.label_ids
y_pred = np.argmax(logits, axis=-1)

no_rel_id = label2id["NO_RELATION"]

# Relation classes only, in class-id order.
labels_no_rel_ids = sorted(idx for idx in label2id.values() if idx != no_rel_id)

# Precision / recall / F1 are computed on ALL test pairs, with NO_RELATION
# excluded from the scored classes via `labels=`. Relations predicted for
# gold NO_RELATION pairs therefore count as false positives, and
# NO_RELATION never enters the average as a zero-score class.
precision, recall, f1, _ = precision_recall_fscore_support(
    y_true,
    y_pred,
    labels=labels_no_rel_ids,
    average="macro",
    zero_division=0
)

micro_p, micro_r, micro_f1, _ = precision_recall_fscore_support(
    y_true,
    y_pred,
    labels=labels_no_rel_ids,
    average="micro",
    zero_division=0
)

# Pairs whose gold label is an actual relation.
mask = y_true != no_rel_id
y_true_filt = y_true[mask]
y_pred_filt = y_pred[mask]

# Accuracy is measured on gold-relation pairs only; computed with numpy so it
# does not depend on which library's `accuracy_score` is currently bound.
accuracy = float(np.mean(y_true_filt == y_pred_filt)) if mask.any() else 0.0

print("\n===== RE TEST RESULTS =====")
print(f"Macro F1 (no NO_RELATION): {f1:.4f}")
print(f"Macro Precision:          {precision:.4f}")
print(f"Macro Recall:             {recall:.4f}")
print(f"Micro F1:                 {micro_f1:.4f}")
print(f"Accuracy:                 {accuracy:.4f}")

# Confusion matrix among relation classes for gold-relation pairs.
# Predictions of NO_RELATION fall outside `labels` and are not shown.
cm = confusion_matrix(
    y_true_filt,
    y_pred_filt,
    labels=labels_no_rel_ids
)

labels_no_rel = [id2label[i] for i in labels_no_rel_ids]

df_cm = pd.DataFrame(
    cm,
    index=labels_no_rel,
    columns=labels_no_rel
)

In [None]:
# Display the confusion-matrix DataFrame as the cell output.
df_cm