In [None]:
%pip install seqeval

In [None]:
import numpy as np 
import pandas as pd 

import warnings
warnings.filterwarnings('ignore')

In [None]:
# Keep only the first 1000 training rows and the first 500 validation rows.
train_df = pd.read_csv("/kaggle/input/df-ass4-small/train.csv").head(1000)
val_df = pd.read_csv("/kaggle/input/df-ass4-small/val.csv").head(500)

## Tokenization

In [None]:
# Pretrained checkpoint to fine-tune (mDeBERTa-v3).
model_name = "microsoft/mdeberta-v3-base"

# Tag set: "O" for non-location tokens, "S-LOC" for the first token of a
# location span and "I-LOC" for the following tokens of the same span.
label_list = ["O", "S-LOC", "I-LOC"]

# Two-way lookup between integer class ids and tag strings.
id2label = dict(enumerate(label_list))
label2id = {tag: idx for idx, tag in id2label.items()}

# Training hyperparameters.
batch_size, epochs, lr = 4, 2, 3e-5
num_labels = len(label_list)

# Label id the data collator writes into padded label positions.
padding = -100

# Maximum sequence length used when tokenizing the test texts.
max_len = 512

In [None]:
from transformers import AutoTokenizer

# Load the tokenizer that belongs to the `model_name` checkpoint.
# NOTE(review): later cells call it with `return_offsets_mapping=True`, which
# presumably requires a "fast" tokenizer — confirm for this checkpoint.
tokenizer = AutoTokenizer.from_pretrained(model_name) 

In [None]:
import ast

def char_markers_label(char_start, char_end, spans):
    """Classify a character range against a list of location spans.

    The spans are scanned in order and the first one that overlaps the range
    ``[char_start, char_end)`` decides the result; ranges that merely touch a
    span at its boundary do not count as overlapping.

    Args:
        char_start: Start offset of the range.
        char_end: End offset of the range.
        spans: Iterable of ``(span_start, span_end)`` pairs.

    Returns:
        ``"S-LOC"`` when the range begins exactly at the start of the
        overlapping span, ``"I-LOC"`` when it overlaps the span but begins
        elsewhere, and ``None`` when it overlaps no span at all.
    """
    for begin, finish in spans:
        overlaps = char_end > begin and char_start < finish
        if overlaps:
            return "S-LOC" if char_start == begin else "I-LOC"
    return None



def _parse_loc_markers(loc_markers):
    """Normalise raw location markers into a list of ``(start, end)`` int pairs.

    Accepts a list/tuple of 2-element spans or the string repr of one. Anything
    that cannot be interpreted (``None``, NaN, unparsable strings, spans of the
    wrong shape or with non-integer bounds) is dropped instead of raising.
    """
    if isinstance(loc_markers, str):
        try:
            loc_markers = ast.literal_eval(loc_markers)
        except (SyntaxError, ValueError):
            return []

    if not isinstance(loc_markers, (list, tuple)):
        return []

    spans = []
    for span in loc_markers:
        if not isinstance(span, (list, tuple)) or len(span) != 2:
            continue
        try:
            spans.append((int(span[0]), int(span[1])))
        except (TypeError, ValueError):
            # A bound that is not convertible to int makes the span unusable;
            # skip it like every other malformed span.
            continue
    return spans


def to_token_labels(text, loc_markers, tokenizer, label2id, max_length=512):
    """Tokenize `text` and derive one label id per token from character spans.

    Args:
        text: The raw text to tokenize.
        loc_markers: Location spans as a list/tuple of ``(start, end)`` pairs,
            or a string containing the Python literal of such a list. Missing
            or malformed values are treated as "no locations".
        tokenizer: Callable tokenizer that accepts ``return_offsets_mapping``,
            ``truncation`` and ``max_length`` and returns a mapping with an
            ``"offset_mapping"`` entry.
        label2id: Mapping with the keys ``"O"``, ``"S-LOC"`` and ``"I-LOC"``.
        max_length: Truncation length forwarded to the tokenizer.

    Returns:
        The tokenizer output with an added ``"labels"`` list, aligned with
        ``"offset_mapping"``.
    """
    spans = _parse_loc_markers(loc_markers)

    encoded = tokenizer(
        text,
        return_offsets_mapping=True,
        truncation=True,
        max_length=max_length,
    )

    outside_id = label2id["O"]
    labels = []
    prev_was_loc = False
    for start, end in encoded["offset_mapping"]:
        # Zero-width offsets (presumably special tokens) never carry a location.
        lbl = None if start == end else char_markers_label(start, end, spans)

        if lbl is None:
            labels.append(outside_id)
            prev_was_loc = False
            continue

        # A span's first token may not start exactly at the span start; the
        # first location token after a non-location token always opens a span.
        if lbl == "I-LOC" and not prev_was_loc:
            lbl = "S-LOC"
        labels.append(label2id[lbl])
        prev_was_loc = True

    encoded["labels"] = labels

    return encoded

In [None]:
from datasets import Dataset

# Wrap the text and span-marker columns of each split in a `Dataset`.
train_dataset = Dataset.from_pandas(train_df.loc[:, ["text", "loc_markers"]])
val_dataset = Dataset.from_pandas(val_df.loc[:, ["text", "loc_markers"]])


In [None]:
def encode(df):
    """Tokenize one example and attach its token-level labels.

    Args:
        df: Object indexable by ``"text"`` and ``"loc_markers"``.

    Returns:
        Whatever `to_token_labels` returns for that text and those markers,
        using the notebook-level `tokenizer` and `label2id`.
    """
    text, markers = df["text"], df["loc_markers"]
    return to_token_labels(text, markers, tokenizer, label2id)
    
# Apply `encode` to every example, one example at a time (batched=False).
# NOTE(review): both names are rebound in place, so re-running this cell
# encodes the already-encoded datasets again.
train_dataset = train_dataset.map(encode, batched=False)
val_dataset = val_dataset.map(encode, batched=False)

In [None]:
def _drop_columns_if_present(dataset, column_names):
    """Return `dataset` without the listed columns, ignoring absent names.

    `Dataset.remove_columns` raises when asked to remove a column the dataset
    does not have, so only the names actually present are passed on.
    """
    present = [name for name in column_names if name in dataset.column_names]
    return dataset.remove_columns(present) if present else dataset


# Helper columns that are not needed any more. "__index_level_0__" only exists
# when `Dataset.from_pandas` had to store a non-default DataFrame index, so it
# must not be assumed to be there.
_AUX_COLUMNS = ("offset_mapping", "loc_markers", "__index_level_0__")

train_dataset = _drop_columns_if_present(train_dataset, _AUX_COLUMNS)
val_dataset = _drop_columns_if_present(val_dataset, _AUX_COLUMNS)

## Model

In [None]:
from transformers import AutoModelForTokenClassification

# Load the `model_name` checkpoint as a token-classification model with one
# output class per entry of `label_list`, and register the id <-> tag mappings
# on it.
model = AutoModelForTokenClassification.from_pretrained(
    model_name,
    num_labels=num_labels,
    id2label=id2label,
    label2id=label2id,
)

In [None]:
from transformers import DataCollatorForTokenClassification

# Collator that builds batches with the notebook's tokenizer; padded label
# positions are filled with `padding` (-100), the value `compute_metrics` skips.
data_collator = DataCollatorForTokenClassification(
    tokenizer=tokenizer,
    label_pad_token_id=padding,
)

In [None]:
import numpy as np
from seqeval.metrics import f1_score, precision_score, recall_score

def compute_metrics(eval_pred):
    """Compute entity-level precision, recall and F1 with seqeval.

    Args:
        eval_pred: A ``(logits, labels)`` pair. The class scores are on the
            last axis of ``logits``; ``labels`` holds one label id per token,
            with ``-100`` marking positions to skip.

    Returns:
        Dict with the keys ``"precision"``, ``"recall"`` and ``"f1"``.
    """
    # Label id of positions that must not be scored (padded label positions).
    ignore_id = -100

    def to_tag(class_id):
        """Map a class id to the tag string handed to seqeval."""
        tag = id2label[int(class_id)]
        # seqeval treats an "S-" tag as a complete single-token entity, so a
        # gold "S-LOC I-LOC" span would be scored as two separate entities.
        # This notebook uses "S-LOC" as a span opener, which seqeval spells "B-".
        return "B-" + tag[2:] if tag.startswith("S-") else tag

    logits, labels = eval_pred
    preds = np.argmax(logits, axis=-1)

    true_tags = []
    pred_tags = []
    for p_seq, l_seq in zip(preds, labels):
        # Keep only the positions that carry a real label.
        kept = [(p, l) for p, l in zip(p_seq, l_seq) if l != ignore_id]
        true_tags.append([to_tag(l) for _, l in kept])
        pred_tags.append([to_tag(p) for p, _ in kept])

    return {
        "precision": precision_score(true_tags, pred_tags),
        "recall": recall_score(true_tags, pred_tags),
        "f1": f1_score(true_tags, pred_tags),
    }

In [None]:
from transformers import TrainingArguments, Trainer

# Training configuration built from the hyperparameters defined in the config
# cell: evaluation and checkpointing both happen once per epoch, and training
# progress is logged every 100 steps.
# NOTE(review): report_to="none" presumably disables external experiment
# trackers — confirm against the installed transformers version.
training_args = TrainingArguments(
    output_dir="./ner-mdeberta-v3",
    learning_rate=lr,
    per_device_train_batch_size=batch_size,
    per_device_eval_batch_size=batch_size,
    num_train_epochs=epochs,
    eval_strategy="epoch",
    save_strategy="epoch",
    logging_steps=100,
    report_to="none",  
)

In [None]:
# Wire model, data, collator and metric function together and fine-tune.
# NOTE(review): newer transformers releases reportedly prefer
# `processing_class=` over `tokenizer=` — confirm for the installed version.
trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=train_dataset,
    eval_dataset=val_dataset,
    tokenizer=tokenizer,
    data_collator=data_collator,
    compute_metrics=compute_metrics,
)

trainer.train()

## Testing

In [None]:
# Load the test CSV; later cells read its "text_id" and "text" columns.
test = pd.read_csv("/kaggle/input/kse-ua-location-extraction-2025/test.csv")

In [None]:
from datasets import Dataset

texts = test["text"].tolist()

# Tokenize all test texts in one call. Padding to a common length keeps the
# offsets and attention masks rectangular, so they convert to arrays below.
encoded = tokenizer(
    texts,
    return_offsets_mapping=True,
    max_length=max_len,
    truncation=True,
    padding=True,
)

# Only the model inputs go into the dataset; the offsets stay in `encoded`.
test_dataset = Dataset.from_dict(
    {column: encoded[column] for column in ("input_ids", "attention_mask")}
)

pred_output = trainer.predict(test_dataset)

# Highest-scoring class id per token.
logits = pred_output.predictions
pred_ids = logits.argmax(axis=-1)

# Per-token attention masks and character offsets, row-aligned with `texts`.
attn = np.array(encoded["attention_mask"])
offsets = np.array(encoded["offset_mapping"])

In [None]:
# print(f"{pred_output[:1]} /n
# {logits[:1]}, \n
# {pred_ids[:1]}, \n
# {offsets[:1]}, \n
# {attn[:1]}, \n
# ")

In [None]:
# One comma-separated string of predicted locations per test text.
all_locations_strings = []

for i, text in enumerate(texts):
    spans = []           # finished [start, end] character spans
    current_span = None  # span currently being extended, if any

    for token_id, (start, end), m in zip(pred_ids[i], offsets[i], attn[i]):
        # Skip padded positions and tokens whose offsets are (0, 0).
        if m == 0 or (start == 0 and end == 0):
            continue

        label = id2label[int(token_id)]

        # "I-LOC" stretches the open span; without one it is handled below
        # like any other non-opening token.
        if label.startswith("I-LOC") and current_span is not None:
            current_span[1] = end
            continue

        # Every other token closes the open span; "S-LOC" then opens a new one.
        if current_span is not None:
            spans.append(current_span)
        current_span = [start, end] if label.startswith("S-LOC") else None

    # Flush a span that runs to the end of the sequence.
    if current_span is not None:
        spans.append(current_span)

    # Cut the spans out of the text and drop the ones that are only whitespace.
    loc_strings = [
        loc for loc in (text[start:end].strip() for start, end in spans) if loc
    ]

    # De-duplicate while keeping first-occurrence order.
    unique_locs = list(dict.fromkeys(loc_strings))

    all_locations_strings.append(", ".join(unique_locs))

In [None]:
# Attach the decoded location strings to a copy of the id and text columns.
predictions = test[["text_id", "text"]].assign(locations=all_locations_strings)

# TODO: split locations

# predictions.head()
# Write the id/locations pairs to "baseline.csv".
predictions.loc[:, ["text_id", "locations"]].to_csv("baseline.csv", index=False)

---
# Code Ends