## **Problem 7: Transformers**

# Part 2.

Transformer Layers = 4

In [1]:
import torch
import torch.nn as nn
import math
import numpy as np
from collections import Counter
from itertools import chain
import random
from torch.utils.data import Dataset, DataLoader
from seqeval.metrics import classification_report, f1_score, accuracy_score
import os
import warnings
warnings.filterwarnings("ignore")

def load_clean_ner_data(tokens_path, labels_path):
    with open(tokens_path, "r", encoding="utf-8") as token_file, open(labels_path, "r", encoding="utf-8") as label_file:
        token_lines = token_file.readlines()
        label_lines = label_file.readlines()

    sentences, labels = [], []
    
    for t_line, l_line in zip(token_lines, label_lines):
        tokens = t_line.strip().split()
        lbls = l_line.strip().split()
        if len(tokens) == len(lbls) and len(tokens) > 0:
            sentences.append([t.strip() for t in tokens])
            labels.append([l.strip().lower().replace("_", "-") for l in lbls])
    return sentences, labels

# Read and merge ARMAN & PEYMA datasets
arman_sent, arman_lab = load_clean_ner_data("arman-tokens.txt", "arman-labels.txt")
peyma_sent, peyma_lab = load_clean_ner_data("peyma-tokens.txt", "peyma-labels.txt")
all_sentences = arman_sent + peyma_sent
all_labels = arman_lab + peyma_lab

# Combine and shuffle all data, then split into train (%80) and test (%20) sets
combined = list(zip(all_sentences, all_labels))
random.seed(42)
random.shuffle(combined)
split_idx = int(0.8 * len(combined))
train_sentences, train_labels = zip(*combined[:split_idx])
test_sentences, test_labels = zip(*combined[split_idx:])

all_labels_unique = sorted(set(l for seq in (train_labels + test_labels) for l in seq))
label2id = {l: i for i, l in enumerate(all_labels_unique)}
id2label = {i: l for l, i in label2id.items()}

token_counter = Counter(chain(*train_sentences))
token2id = {"<pad>": 0, "<unk>": 1}
for tok, count in token_counter.items():
    if count > 1:
        token2id[tok] = len(token2id)
id2token = {i: t for t, i in token2id.items()}

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# NER dataset with padding and masks
class NERDataset(Dataset):
    def __init__(self, sentences, labels, max_len, token2id, label2id):
        self.sentences = sentences
        self.labels = labels
        self.max_len = max_len
        self.token2id = token2id
        self.label2id = label2id

    def __len__(self):
        return len(self.sentences)

    def __getitem__(self, idx):
        tokens = [self.token2id.get(t, self.token2id["<unk>"]) for t in self.sentences[idx]]
        tags = [self.label2id[l] for l in self.labels[idx]]
        tokens = tokens[:self.max_len]
        tags = tags[:self.max_len]
        attn_mask = [1] * len(tokens)
        pad_len = self.max_len - len(tokens)
        tokens += [0] * pad_len
        tags += [-100] * pad_len
        attn_mask += [0] * pad_len
        return {
            "input_ids": torch.tensor(tokens),
            "labels": torch.tensor(tags),
            "attention_mask": torch.tensor(attn_mask)
        }

# Prepare NER datasets and loaders
MAX_LEN = 128
train_data = NERDataset(train_sentences, train_labels, MAX_LEN, token2id, label2id)
test_data = NERDataset(test_sentences, test_labels, MAX_LEN, token2id, label2id)
train_loader = DataLoader(train_data, batch_size=16, shuffle=True)
test_loader = DataLoader(test_data, batch_size=16)

class PositionalEncoding(nn.Module):
    # Positional encoding for token positions
    def __init__(self, d_model, max_len=512):
        super().__init__()
        pe = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2) * -(math.log(10000.0) / d_model))
        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)
        self.pe = pe.unsqueeze(0)

    def forward(self, x):
        return x + self.pe[:, :x.size(1)].to(x.device)

class MultiHeadAttention(nn.Module):
    # Standard multi-head self-attention
    def __init__(self, d_model, num_heads, dropout=0.1):
        super().__init__()
        self.d_k = d_model // num_heads
        self.num_heads = num_heads
        self.w_q = nn.Linear(d_model, d_model)
        self.w_k = nn.Linear(d_model, d_model)
        self.w_v = nn.Linear(d_model, d_model)
        self.w_o = nn.Linear(d_model, d_model)
        self.dropout = nn.Dropout(dropout)

    def forward(self, q, k, v, mask=None):
        B = q.size(0)
        q = self.w_q(q).view(B, -1, self.num_heads, self.d_k).transpose(1, 2)
        k = self.w_k(k).view(B, -1, self.num_heads, self.d_k).transpose(1, 2)
        v = self.w_v(v).view(B, -1, self.num_heads, self.d_k).transpose(1, 2)
        scores = q @ k.transpose(-2, -1) / math.sqrt(self.d_k)
        if mask is not None:
            mask = mask.unsqueeze(1).unsqueeze(2)
            scores = scores.masked_fill(mask == 0, -1e9)
        attn = torch.softmax(scores, dim=-1)
        attn = self.dropout(attn)
        context = attn @ v
        context = context.transpose(1, 2).contiguous().view(B, -1, self.num_heads * self.d_k)
        return self.w_o(context)

class PositionwiseFeedForward(nn.Module):
    # Feed-forward block used in transformer
    def __init__(self, d_model, d_ff, dropout=0.1):
        super().__init__()
        self.linear1 = nn.Linear(d_model, d_ff)
        self.linear2 = nn.Linear(d_ff, d_model)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        return self.linear2(self.dropout(torch.relu(self.linear1(x))))

class SublayerConnection(nn.Module):
    # Wraps sublayer with norm, dropout, and residual
    def __init__(self, size, dropout):
        super().__init__()
        self.norm = nn.LayerNorm(size)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x, sublayer):
        return x + self.dropout(sublayer(self.norm(x)))

class EncoderLayer(nn.Module):
    def __init__(self, d_model, heads, d_ff, dropout):
        super().__init__()
        self.attn = MultiHeadAttention(d_model, heads, dropout)
        self.ffn = PositionwiseFeedForward(d_model, d_ff, dropout)
        self.sublayers = nn.ModuleList([SublayerConnection(d_model, dropout) for _ in range(2)])

    def forward(self, x, mask):
        x = self.sublayers[0](x, lambda x: self.attn(x, x, x, mask))
        return self.sublayers[1](x, self.ffn)

# Full transformer-based NER model
class TransformerNER(nn.Module):
    def __init__(self, vocab_size, d_model, heads, d_ff, num_layers, max_len, num_labels):
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, d_model, padding_idx=0)
        self.pe = PositionalEncoding(d_model, max_len)
        self.encoder = nn.ModuleList([EncoderLayer(d_model, heads, d_ff, 0.1) for _ in range(num_layers)])
        self.norm = nn.LayerNorm(d_model)
        self.classifier = nn.Linear(d_model, num_labels)

    def forward(self, input_ids, mask, labels=None):
        x = self.embedding(input_ids)
        x = self.pe(x)
        for layer in self.encoder:
            x = layer(x, mask)
        x = self.norm(x)
        logits = self.classifier(x)
        loss = None
        if labels is not None:
            loss = nn.CrossEntropyLoss(ignore_index=-100)(logits.view(-1, logits.size(-1)), labels.view(-1))
        return {"loss": loss, "logits": logits}

# Model hyperparameters
d_model = 128
num_heads = 4
ff_dim = 512
num_layers = 4
vocab_size = len(token2id)
num_labels = len(label2id)

model = TransformerNER(
    vocab_size=vocab_size,
    d_model=d_model,
    heads=num_heads,
    d_ff=ff_dim,
    num_layers=num_layers,
    max_len=MAX_LEN,
    num_labels=num_labels
).to(device)
optimizer = torch.optim.AdamW(model.parameters(), lr=2e-4)


# Training phase
for epoch in range(15):
    model.train()
    total_loss = 0
    for batch in train_loader:
        input_ids = batch["input_ids"].to(device)
        labels = batch["labels"].to(device)
        mask = batch["attention_mask"].to(device)
        optimizer.zero_grad()
        outputs = model(input_ids, mask, labels)
        loss = outputs["loss"]
        loss.backward()
        torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)
        optimizer.step()
        total_loss += loss.item()
    print(f"Epoch {epoch+1}/15, Loss: {total_loss / len(train_loader):.6f}")

# Predict on test set 
model.eval()
all_preds,all_labels_eval = [], []

with torch.no_grad():
    for batch in test_loader:
        input_ids = batch["input_ids"].to(device)
        labels = batch["labels"].to(device)
        mask = batch["attention_mask"].to(device)
        logits = model(input_ids, mask)["logits"].argmax(-1)

        for p, l, m in zip(logits, labels, mask):
            true_seq = [id2label[i.item()] for i, msk in zip(l, m) if i.item() != -100 and msk.item() == 1]
            pred_seq = [id2label[i.item()] for i, msk, gt in zip(p, m, l) if gt.item() != -100 and msk.item() == 1]
            all_labels_eval.append(true_seq)
            all_preds.append(pred_seq)

# compute evaluation metrics
accuracy = 100 * accuracy_score(all_labels_eval, all_preds)
f1 = 100 * f1_score(all_labels_eval, all_preds, average="weighted")

print(f"\nAccuracy: {accuracy:.4f}%")
print(f"F1_Score: {f1:.4f}%")
print(classification_report(all_labels_eval, all_preds))

Epoch 1/15, Loss: 0.413837
Epoch 2/15, Loss: 0.258707
Epoch 3/15, Loss: 0.194295
Epoch 4/15, Loss: 0.154911
Epoch 5/15, Loss: 0.125964
Epoch 6/15, Loss: 0.104109
Epoch 7/15, Loss: 0.087502
Epoch 8/15, Loss: 0.074660
Epoch 9/15, Loss: 0.065063
Epoch 10/15, Loss: 0.056267
Epoch 11/15, Loss: 0.049808
Epoch 12/15, Loss: 0.043685
Epoch 13/15, Loss: 0.039220
Epoch 14/15, Loss: 0.035655
Epoch 15/15, Loss: 0.033154

Accuracy: 97.8008%
F1_Score: 80.3828%
              precision    recall  f1-score   support

           _       0.79      0.81      0.80     10398
         dat       0.40      0.43      0.42       357
       event       0.81      0.84      0.82       396
         fac       0.79      0.85      0.82       281
         loc       0.86      0.87      0.86      3238
         mon       0.35      0.36      0.36       113
         org       0.76      0.84      0.80      3941
         pct       0.47      0.59      0.52        71
         per       0.63      0.58      0.60       928
        p

Transformer Layers = 6

In [2]:
import torch
import torch.nn as nn
import math
import numpy as np
from collections import Counter
from itertools import chain
import random
from torch.utils.data import Dataset, DataLoader
from seqeval.metrics import classification_report, f1_score, accuracy_score
import os
import warnings
warnings.filterwarnings("ignore")

def load_clean_ner_data(tokens_path, labels_path):
    with open(tokens_path, "r", encoding="utf-8") as token_file, open(labels_path, "r", encoding="utf-8") as label_file:
        token_lines = token_file.readlines()
        label_lines = label_file.readlines()

    sentences, labels = [], []
    
    for t_line, l_line in zip(token_lines, label_lines):
        tokens = t_line.strip().split()
        lbls = l_line.strip().split()
        if len(tokens) == len(lbls) and len(tokens) > 0:
            sentences.append([t.strip() for t in tokens])
            labels.append([l.strip().lower().replace("_", "-") for l in lbls])
    return sentences, labels

# Read and merge ARMAN & PEYMA datasets
arman_sent, arman_lab = load_clean_ner_data("arman-tokens.txt", "arman-labels.txt")
peyma_sent, peyma_lab = load_clean_ner_data("peyma-tokens.txt", "peyma-labels.txt")
all_sentences = arman_sent + peyma_sent
all_labels = arman_lab + peyma_lab

# Combine and shuffle all data, then split into train (%80) and test (%20) sets
combined = list(zip(all_sentences, all_labels))
random.seed(42)
random.shuffle(combined)
split_idx = int(0.8 * len(combined))
train_sentences, train_labels = zip(*combined[:split_idx])
test_sentences, test_labels = zip(*combined[split_idx:])

all_labels_unique = sorted(set(l for seq in (train_labels + test_labels) for l in seq))
label2id = {l: i for i, l in enumerate(all_labels_unique)}
id2label = {i: l for l, i in label2id.items()}

token_counter = Counter(chain(*train_sentences))
token2id = {"<pad>": 0, "<unk>": 1}
for tok, count in token_counter.items():
    if count > 1:
        token2id[tok] = len(token2id)
id2token = {i: t for t, i in token2id.items()}

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# NER dataset with padding and masks
class NERDataset(Dataset):
    def __init__(self, sentences, labels, max_len, token2id, label2id):
        self.sentences = sentences
        self.labels = labels
        self.max_len = max_len
        self.token2id = token2id
        self.label2id = label2id

    def __len__(self):
        return len(self.sentences)

    def __getitem__(self, idx):
        tokens = [self.token2id.get(t, self.token2id["<unk>"]) for t in self.sentences[idx]]
        tags = [self.label2id[l] for l in self.labels[idx]]
        tokens = tokens[:self.max_len]
        tags = tags[:self.max_len]
        attn_mask = [1] * len(tokens)
        pad_len = self.max_len - len(tokens)
        tokens += [0] * pad_len
        tags += [-100] * pad_len
        attn_mask += [0] * pad_len
        return {
            "input_ids": torch.tensor(tokens),
            "labels": torch.tensor(tags),
            "attention_mask": torch.tensor(attn_mask)
        }

# Prepare NER datasets and loaders
MAX_LEN = 128
train_data = NERDataset(train_sentences, train_labels, MAX_LEN, token2id, label2id)
test_data = NERDataset(test_sentences, test_labels, MAX_LEN, token2id, label2id)
train_loader = DataLoader(train_data, batch_size=16, shuffle=True)
test_loader = DataLoader(test_data, batch_size=16)

class PositionalEncoding(nn.Module):
    # Positional encoding for token positions
    def __init__(self, d_model, max_len=512):
        super().__init__()
        pe = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2) * -(math.log(10000.0) / d_model))
        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)
        self.pe = pe.unsqueeze(0)

    def forward(self, x):
        return x + self.pe[:, :x.size(1)].to(x.device)

class MultiHeadAttention(nn.Module):
    # Standard multi-head self-attention
    def __init__(self, d_model, num_heads, dropout=0.1):
        super().__init__()
        self.d_k = d_model // num_heads
        self.num_heads = num_heads
        self.w_q = nn.Linear(d_model, d_model)
        self.w_k = nn.Linear(d_model, d_model)
        self.w_v = nn.Linear(d_model, d_model)
        self.w_o = nn.Linear(d_model, d_model)
        self.dropout = nn.Dropout(dropout)

    def forward(self, q, k, v, mask=None):
        B = q.size(0)
        q = self.w_q(q).view(B, -1, self.num_heads, self.d_k).transpose(1, 2)
        k = self.w_k(k).view(B, -1, self.num_heads, self.d_k).transpose(1, 2)
        v = self.w_v(v).view(B, -1, self.num_heads, self.d_k).transpose(1, 2)
        scores = q @ k.transpose(-2, -1) / math.sqrt(self.d_k)
        if mask is not None:
            mask = mask.unsqueeze(1).unsqueeze(2)
            scores = scores.masked_fill(mask == 0, -1e9)
        attn = torch.softmax(scores, dim=-1)
        attn = self.dropout(attn)
        context = attn @ v
        context = context.transpose(1, 2).contiguous().view(B, -1, self.num_heads * self.d_k)
        return self.w_o(context)

class PositionwiseFeedForward(nn.Module):
    # Feed-forward block used in transformer
    def __init__(self, d_model, d_ff, dropout=0.1):
        super().__init__()
        self.linear1 = nn.Linear(d_model, d_ff)
        self.linear2 = nn.Linear(d_ff, d_model)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        return self.linear2(self.dropout(torch.relu(self.linear1(x))))

class SublayerConnection(nn.Module):
    # Wraps sublayer with norm, dropout, and residual
    def __init__(self, size, dropout):
        super().__init__()
        self.norm = nn.LayerNorm(size)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x, sublayer):
        return x + self.dropout(sublayer(self.norm(x)))

class EncoderLayer(nn.Module):
    def __init__(self, d_model, heads, d_ff, dropout):
        super().__init__()
        self.attn = MultiHeadAttention(d_model, heads, dropout)
        self.ffn = PositionwiseFeedForward(d_model, d_ff, dropout)
        self.sublayers = nn.ModuleList([SublayerConnection(d_model, dropout) for _ in range(2)])

    def forward(self, x, mask):
        x = self.sublayers[0](x, lambda x: self.attn(x, x, x, mask))
        return self.sublayers[1](x, self.ffn)

# Full transformer-based NER model
class TransformerNER(nn.Module):
    def __init__(self, vocab_size, d_model, heads, d_ff, num_layers, max_len, num_labels):
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, d_model, padding_idx=0)
        self.pe = PositionalEncoding(d_model, max_len)
        self.encoder = nn.ModuleList([EncoderLayer(d_model, heads, d_ff, 0.1) for _ in range(num_layers)])
        self.norm = nn.LayerNorm(d_model)
        self.classifier = nn.Linear(d_model, num_labels)

    def forward(self, input_ids, mask, labels=None):
        x = self.embedding(input_ids)
        x = self.pe(x)
        for layer in self.encoder:
            x = layer(x, mask)
        x = self.norm(x)
        logits = self.classifier(x)
        loss = None
        if labels is not None:
            loss = nn.CrossEntropyLoss(ignore_index=-100)(logits.view(-1, logits.size(-1)), labels.view(-1))
        return {"loss": loss, "logits": logits}

# Model hyperparameters
d_model = 128
num_heads = 4
ff_dim = 512
num_layers = 6
vocab_size = len(token2id)
num_labels = len(label2id)

model = TransformerNER(
    vocab_size=vocab_size,
    d_model=d_model,
    heads=num_heads,
    d_ff=ff_dim,
    num_layers=num_layers,
    max_len=MAX_LEN,
    num_labels=num_labels
).to(device)
optimizer = torch.optim.AdamW(model.parameters(), lr=2e-4)


# Training phase
for epoch in range(15):
    model.train()
    total_loss = 0
    for batch in train_loader:
        input_ids = batch["input_ids"].to(device)
        labels = batch["labels"].to(device)
        mask = batch["attention_mask"].to(device)
        optimizer.zero_grad()
        outputs = model(input_ids, mask, labels)
        loss = outputs["loss"]
        loss.backward()
        torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)
        optimizer.step()
        total_loss += loss.item()
    print(f"Epoch {epoch+1}/15, Loss: {total_loss / len(train_loader):.6f}")

# Predict on test set 
model.eval()
all_preds,all_labels_eval = [], []

with torch.no_grad():
    for batch in test_loader:
        input_ids = batch["input_ids"].to(device)
        labels = batch["labels"].to(device)
        mask = batch["attention_mask"].to(device)
        logits = model(input_ids, mask)["logits"].argmax(-1)

        for p, l, m in zip(logits, labels, mask):
            true_seq = [id2label[i.item()] for i, msk in zip(l, m) if i.item() != -100 and msk.item() == 1]
            pred_seq = [id2label[i.item()] for i, msk, gt in zip(p, m, l) if gt.item() != -100 and msk.item() == 1]
            all_labels_eval.append(true_seq)
            all_preds.append(pred_seq)

# compute evaluation metrics
accuracy = 100 * accuracy_score(all_labels_eval, all_preds)
f1 = 100 * f1_score(all_labels_eval, all_preds, average="weighted")

print(f"\nAccuracy: {accuracy:.4f}%")
print(f"F1_Score: {f1:.4f}%")
print(classification_report(all_labels_eval, all_preds))

Epoch 1/15, Loss: 0.400615
Epoch 2/15, Loss: 0.240033
Epoch 3/15, Loss: 0.178400
Epoch 4/15, Loss: 0.138245
Epoch 5/15, Loss: 0.111158
Epoch 6/15, Loss: 0.090183
Epoch 7/15, Loss: 0.074459
Epoch 8/15, Loss: 0.063179
Epoch 9/15, Loss: 0.054119
Epoch 10/15, Loss: 0.046605
Epoch 11/15, Loss: 0.041283
Epoch 12/15, Loss: 0.036731
Epoch 13/15, Loss: 0.032241
Epoch 14/15, Loss: 0.029901
Epoch 15/15, Loss: 0.025882

Accuracy: 97.8385%
F1_Score: 80.8847%
              precision    recall  f1-score   support

           _       0.77      0.82      0.79     10398
         dat       0.43      0.57      0.49       357
       event       0.81      0.91      0.86       396
         fac       0.81      0.95      0.87       281
         loc       0.84      0.89      0.86      3238
         mon       0.38      0.62      0.47       113
         org       0.78      0.84      0.81      3941
         pct       0.43      0.68      0.53        71
         per       0.61      0.70      0.65       928
        p

Transformer Layers = 12

In [3]:
import torch
import torch.nn as nn
import math
import numpy as np
from collections import Counter
from itertools import chain
import random
from torch.utils.data import Dataset, DataLoader
from seqeval.metrics import classification_report, f1_score, accuracy_score
import os
import warnings
warnings.filterwarnings("ignore")

def load_clean_ner_data(tokens_path, labels_path):
    with open(tokens_path, "r", encoding="utf-8") as token_file, open(labels_path, "r", encoding="utf-8") as label_file:
        token_lines = token_file.readlines()
        label_lines = label_file.readlines()

    sentences, labels = [], []
    
    for t_line, l_line in zip(token_lines, label_lines):
        tokens = t_line.strip().split()
        lbls = l_line.strip().split()
        if len(tokens) == len(lbls) and len(tokens) > 0:
            sentences.append([t.strip() for t in tokens])
            labels.append([l.strip().lower().replace("_", "-") for l in lbls])
    return sentences, labels

# Read and merge ARMAN & PEYMA datasets
arman_sent, arman_lab = load_clean_ner_data("arman-tokens.txt", "arman-labels.txt")
peyma_sent, peyma_lab = load_clean_ner_data("peyma-tokens.txt", "peyma-labels.txt")
all_sentences = arman_sent + peyma_sent
all_labels = arman_lab + peyma_lab

# Combine and shuffle all data, then split into train (%80) and test (%20) sets
combined = list(zip(all_sentences, all_labels))
random.seed(42)
random.shuffle(combined)
split_idx = int(0.8 * len(combined))
train_sentences, train_labels = zip(*combined[:split_idx])
test_sentences, test_labels = zip(*combined[split_idx:])

all_labels_unique = sorted(set(l for seq in (train_labels + test_labels) for l in seq))
label2id = {l: i for i, l in enumerate(all_labels_unique)}
id2label = {i: l for l, i in label2id.items()}

token_counter = Counter(chain(*train_sentences))
token2id = {"<pad>": 0, "<unk>": 1}
for tok, count in token_counter.items():
    if count > 1:
        token2id[tok] = len(token2id)
id2token = {i: t for t, i in token2id.items()}

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# NER dataset with padding and masks
class NERDataset(Dataset):
    def __init__(self, sentences, labels, max_len, token2id, label2id):
        self.sentences = sentences
        self.labels = labels
        self.max_len = max_len
        self.token2id = token2id
        self.label2id = label2id

    def __len__(self):
        return len(self.sentences)

    def __getitem__(self, idx):
        tokens = [self.token2id.get(t, self.token2id["<unk>"]) for t in self.sentences[idx]]
        tags = [self.label2id[l] for l in self.labels[idx]]
        tokens = tokens[:self.max_len]
        tags = tags[:self.max_len]
        attn_mask = [1] * len(tokens)
        pad_len = self.max_len - len(tokens)
        tokens += [0] * pad_len
        tags += [-100] * pad_len
        attn_mask += [0] * pad_len
        return {
            "input_ids": torch.tensor(tokens),
            "labels": torch.tensor(tags),
            "attention_mask": torch.tensor(attn_mask)
        }

# Prepare NER datasets and loaders
MAX_LEN = 128
train_data = NERDataset(train_sentences, train_labels, MAX_LEN, token2id, label2id)
test_data = NERDataset(test_sentences, test_labels, MAX_LEN, token2id, label2id)
train_loader = DataLoader(train_data, batch_size=16, shuffle=True)
test_loader = DataLoader(test_data, batch_size=16)

class PositionalEncoding(nn.Module):
    # Positional encoding for token positions
    def __init__(self, d_model, max_len=512):
        super().__init__()
        pe = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2) * -(math.log(10000.0) / d_model))
        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)
        self.pe = pe.unsqueeze(0)

    def forward(self, x):
        return x + self.pe[:, :x.size(1)].to(x.device)

class MultiHeadAttention(nn.Module):
    # Standard multi-head self-attention
    def __init__(self, d_model, num_heads, dropout=0.1):
        super().__init__()
        self.d_k = d_model // num_heads
        self.num_heads = num_heads
        self.w_q = nn.Linear(d_model, d_model)
        self.w_k = nn.Linear(d_model, d_model)
        self.w_v = nn.Linear(d_model, d_model)
        self.w_o = nn.Linear(d_model, d_model)
        self.dropout = nn.Dropout(dropout)

    def forward(self, q, k, v, mask=None):
        B = q.size(0)
        q = self.w_q(q).view(B, -1, self.num_heads, self.d_k).transpose(1, 2)
        k = self.w_k(k).view(B, -1, self.num_heads, self.d_k).transpose(1, 2)
        v = self.w_v(v).view(B, -1, self.num_heads, self.d_k).transpose(1, 2)
        scores = q @ k.transpose(-2, -1) / math.sqrt(self.d_k)
        if mask is not None:
            mask = mask.unsqueeze(1).unsqueeze(2)
            scores = scores.masked_fill(mask == 0, -1e9)
        attn = torch.softmax(scores, dim=-1)
        attn = self.dropout(attn)
        context = attn @ v
        context = context.transpose(1, 2).contiguous().view(B, -1, self.num_heads * self.d_k)
        return self.w_o(context)

class PositionwiseFeedForward(nn.Module):
    # Feed-forward block used in transformer
    def __init__(self, d_model, d_ff, dropout=0.1):
        super().__init__()
        self.linear1 = nn.Linear(d_model, d_ff)
        self.linear2 = nn.Linear(d_ff, d_model)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        return self.linear2(self.dropout(torch.relu(self.linear1(x))))

class SublayerConnection(nn.Module):
    # Wraps sublayer with norm, dropout, and residual
    def __init__(self, size, dropout):
        super().__init__()
        self.norm = nn.LayerNorm(size)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x, sublayer):
        return x + self.dropout(sublayer(self.norm(x)))

class EncoderLayer(nn.Module):
    def __init__(self, d_model, heads, d_ff, dropout):
        super().__init__()
        self.attn = MultiHeadAttention(d_model, heads, dropout)
        self.ffn = PositionwiseFeedForward(d_model, d_ff, dropout)
        self.sublayers = nn.ModuleList([SublayerConnection(d_model, dropout) for _ in range(2)])

    def forward(self, x, mask):
        x = self.sublayers[0](x, lambda x: self.attn(x, x, x, mask))
        return self.sublayers[1](x, self.ffn)

# Full transformer-based NER model
class TransformerNER(nn.Module):
    def __init__(self, vocab_size, d_model, heads, d_ff, num_layers, max_len, num_labels):
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, d_model, padding_idx=0)
        self.pe = PositionalEncoding(d_model, max_len)
        self.encoder = nn.ModuleList([EncoderLayer(d_model, heads, d_ff, 0.1) for _ in range(num_layers)])
        self.norm = nn.LayerNorm(d_model)
        self.classifier = nn.Linear(d_model, num_labels)

    def forward(self, input_ids, mask, labels=None):
        x = self.embedding(input_ids)
        x = self.pe(x)
        for layer in self.encoder:
            x = layer(x, mask)
        x = self.norm(x)
        logits = self.classifier(x)
        loss = None
        if labels is not None:
            loss = nn.CrossEntropyLoss(ignore_index=-100)(logits.view(-1, logits.size(-1)), labels.view(-1))
        return {"loss": loss, "logits": logits}

# Model hyperparameters
d_model = 128
num_heads = 4
ff_dim = 512
num_layers = 12
vocab_size = len(token2id)
num_labels = len(label2id)

model = TransformerNER(
    vocab_size=vocab_size,
    d_model=d_model,
    heads=num_heads,
    d_ff=ff_dim,
    num_layers=num_layers,
    max_len=MAX_LEN,
    num_labels=num_labels
).to(device)
optimizer = torch.optim.AdamW(model.parameters(), lr=2e-4)


# Training phase
for epoch in range(15):
    model.train()
    total_loss = 0
    for batch in train_loader:
        input_ids = batch["input_ids"].to(device)
        labels = batch["labels"].to(device)
        mask = batch["attention_mask"].to(device)
        optimizer.zero_grad()
        outputs = model(input_ids, mask, labels)
        loss = outputs["loss"]
        loss.backward()
        torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)
        optimizer.step()
        total_loss += loss.item()
    print(f"Epoch {epoch+1}/15, Loss: {total_loss / len(train_loader):.6f}")

# Predict on test set 
model.eval()
all_preds,all_labels_eval = [], []

with torch.no_grad():
    for batch in test_loader:
        input_ids = batch["input_ids"].to(device)
        labels = batch["labels"].to(device)
        mask = batch["attention_mask"].to(device)
        logits = model(input_ids, mask)["logits"].argmax(-1)

        for p, l, m in zip(logits, labels, mask):
            true_seq = [id2label[i.item()] for i, msk in zip(l, m) if i.item() != -100 and msk.item() == 1]
            pred_seq = [id2label[i.item()] for i, msk, gt in zip(p, m, l) if gt.item() != -100 and msk.item() == 1]
            all_labels_eval.append(true_seq)
            all_preds.append(pred_seq)

# compute evaluation metrics
accuracy = 100 * accuracy_score(all_labels_eval, all_preds)
f1 = 100 * f1_score(all_labels_eval, all_preds, average="weighted")

print(f"\nAccuracy: {accuracy:.4f}%")
print(f"F1_Score: {f1:.4f}%")
print(classification_report(all_labels_eval, all_preds))

Epoch 1/15, Loss: 0.385114
Epoch 2/15, Loss: 0.228463
Epoch 3/15, Loss: 0.166618
Epoch 4/15, Loss: 0.128085
Epoch 5/15, Loss: 0.100003
Epoch 6/15, Loss: 0.080159
Epoch 7/15, Loss: 0.064681
Epoch 8/15, Loss: 0.054228
Epoch 9/15, Loss: 0.045763
Epoch 10/15, Loss: 0.039530
Epoch 11/15, Loss: 0.033966
Epoch 12/15, Loss: 0.030754
Epoch 13/15, Loss: 0.027464
Epoch 14/15, Loss: 0.024556
Epoch 15/15, Loss: 0.022624

Accuracy: 97.9356%
F1_Score: 81.7282%
              precision    recall  f1-score   support

           _       0.79      0.81      0.80     10398
         dat       0.47      0.49      0.48       357
       event       0.82      0.92      0.87       396
         fac       0.90      0.96      0.93       281
         loc       0.85      0.90      0.87      3238
         mon       0.32      0.39      0.35       113
         org       0.80      0.83      0.81      3941
         pct       0.45      0.63      0.53        71
         per       0.67      0.66      0.66       928
        p

Attention Heads = 2

In [4]:
import torch
import torch.nn as nn
import math
import numpy as np
from collections import Counter
from itertools import chain
import random
from torch.utils.data import Dataset, DataLoader
from seqeval.metrics import classification_report, f1_score, accuracy_score
import os
import warnings
warnings.filterwarnings("ignore")

def load_clean_ner_data(tokens_path, labels_path):
    with open(tokens_path, "r", encoding="utf-8") as token_file, open(labels_path, "r", encoding="utf-8") as label_file:
        token_lines = token_file.readlines()
        label_lines = label_file.readlines()

    sentences, labels = [], []
    
    for t_line, l_line in zip(token_lines, label_lines):
        tokens = t_line.strip().split()
        lbls = l_line.strip().split()
        if len(tokens) == len(lbls) and len(tokens) > 0:
            sentences.append([t.strip() for t in tokens])
            labels.append([l.strip().lower().replace("_", "-") for l in lbls])
    return sentences, labels

# Read and merge ARMAN & PEYMA datasets
arman_sent, arman_lab = load_clean_ner_data("arman-tokens.txt", "arman-labels.txt")
peyma_sent, peyma_lab = load_clean_ner_data("peyma-tokens.txt", "peyma-labels.txt")
all_sentences = arman_sent + peyma_sent
all_labels = arman_lab + peyma_lab

# Combine and shuffle all data, then split into train (%80) and test (%20) sets
combined = list(zip(all_sentences, all_labels))
random.seed(42)
random.shuffle(combined)
split_idx = int(0.8 * len(combined))
train_sentences, train_labels = zip(*combined[:split_idx])
test_sentences, test_labels = zip(*combined[split_idx:])

all_labels_unique = sorted(set(l for seq in (train_labels + test_labels) for l in seq))
label2id = {l: i for i, l in enumerate(all_labels_unique)}
id2label = {i: l for l, i in label2id.items()}

token_counter = Counter(chain(*train_sentences))
token2id = {"<pad>": 0, "<unk>": 1}
for tok, count in token_counter.items():
    if count > 1:
        token2id[tok] = len(token2id)
id2token = {i: t for t, i in token2id.items()}

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# NER dataset with padding and masks
class NERDataset(Dataset):
    def __init__(self, sentences, labels, max_len, token2id, label2id):
        self.sentences = sentences
        self.labels = labels
        self.max_len = max_len
        self.token2id = token2id
        self.label2id = label2id

    def __len__(self):
        return len(self.sentences)

    def __getitem__(self, idx):
        tokens = [self.token2id.get(t, self.token2id["<unk>"]) for t in self.sentences[idx]]
        tags = [self.label2id[l] for l in self.labels[idx]]
        tokens = tokens[:self.max_len]
        tags = tags[:self.max_len]
        attn_mask = [1] * len(tokens)
        pad_len = self.max_len - len(tokens)
        tokens += [0] * pad_len
        tags += [-100] * pad_len
        attn_mask += [0] * pad_len
        return {
            "input_ids": torch.tensor(tokens),
            "labels": torch.tensor(tags),
            "attention_mask": torch.tensor(attn_mask)
        }

# Prepare NER datasets and loaders
MAX_LEN = 128
train_data = NERDataset(train_sentences, train_labels, MAX_LEN, token2id, label2id)
test_data = NERDataset(test_sentences, test_labels, MAX_LEN, token2id, label2id)
train_loader = DataLoader(train_data, batch_size=16, shuffle=True)
test_loader = DataLoader(test_data, batch_size=16)

class PositionalEncoding(nn.Module):
    # Positional encoding for token positions
    def __init__(self, d_model, max_len=512):
        super().__init__()
        pe = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2) * -(math.log(10000.0) / d_model))
        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)
        self.pe = pe.unsqueeze(0)

    def forward(self, x):
        return x + self.pe[:, :x.size(1)].to(x.device)

class MultiHeadAttention(nn.Module):
    # Standard multi-head self-attention
    def __init__(self, d_model, num_heads, dropout=0.1):
        super().__init__()
        self.d_k = d_model // num_heads
        self.num_heads = num_heads
        self.w_q = nn.Linear(d_model, d_model)
        self.w_k = nn.Linear(d_model, d_model)
        self.w_v = nn.Linear(d_model, d_model)
        self.w_o = nn.Linear(d_model, d_model)
        self.dropout = nn.Dropout(dropout)

    def forward(self, q, k, v, mask=None):
        B = q.size(0)
        q = self.w_q(q).view(B, -1, self.num_heads, self.d_k).transpose(1, 2)
        k = self.w_k(k).view(B, -1, self.num_heads, self.d_k).transpose(1, 2)
        v = self.w_v(v).view(B, -1, self.num_heads, self.d_k).transpose(1, 2)
        scores = q @ k.transpose(-2, -1) / math.sqrt(self.d_k)
        if mask is not None:
            mask = mask.unsqueeze(1).unsqueeze(2)
            scores = scores.masked_fill(mask == 0, -1e9)
        attn = torch.softmax(scores, dim=-1)
        attn = self.dropout(attn)
        context = attn @ v
        context = context.transpose(1, 2).contiguous().view(B, -1, self.num_heads * self.d_k)
        return self.w_o(context)

class PositionwiseFeedForward(nn.Module):
    # Feed-forward block used in transformer
    def __init__(self, d_model, d_ff, dropout=0.1):
        super().__init__()
        self.linear1 = nn.Linear(d_model, d_ff)
        self.linear2 = nn.Linear(d_ff, d_model)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        return self.linear2(self.dropout(torch.relu(self.linear1(x))))

class SublayerConnection(nn.Module):
    # Wraps sublayer with norm, dropout, and residual
    def __init__(self, size, dropout):
        super().__init__()
        self.norm = nn.LayerNorm(size)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x, sublayer):
        return x + self.dropout(sublayer(self.norm(x)))

class EncoderLayer(nn.Module):
    def __init__(self, d_model, heads, d_ff, dropout):
        super().__init__()
        self.attn = MultiHeadAttention(d_model, heads, dropout)
        self.ffn = PositionwiseFeedForward(d_model, d_ff, dropout)
        self.sublayers = nn.ModuleList([SublayerConnection(d_model, dropout) for _ in range(2)])

    def forward(self, x, mask):
        x = self.sublayers[0](x, lambda x: self.attn(x, x, x, mask))
        return self.sublayers[1](x, self.ffn)

# Full transformer-based NER model
class TransformerNER(nn.Module):
    def __init__(self, vocab_size, d_model, heads, d_ff, num_layers, max_len, num_labels):
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, d_model, padding_idx=0)
        self.pe = PositionalEncoding(d_model, max_len)
        self.encoder = nn.ModuleList([EncoderLayer(d_model, heads, d_ff, 0.1) for _ in range(num_layers)])
        self.norm = nn.LayerNorm(d_model)
        self.classifier = nn.Linear(d_model, num_labels)

    def forward(self, input_ids, mask, labels=None):
        x = self.embedding(input_ids)
        x = self.pe(x)
        for layer in self.encoder:
            x = layer(x, mask)
        x = self.norm(x)
        logits = self.classifier(x)
        loss = None
        if labels is not None:
            loss = nn.CrossEntropyLoss(ignore_index=-100)(logits.view(-1, logits.size(-1)), labels.view(-1))
        return {"loss": loss, "logits": logits}

# Model hyperparameters
d_model = 128
num_heads = 2
ff_dim = 512
num_layers = 2
vocab_size = len(token2id)
num_labels = len(label2id)

model = TransformerNER(
    vocab_size=vocab_size,
    d_model=d_model,
    heads=num_heads,
    d_ff=ff_dim,
    num_layers=num_layers,
    max_len=MAX_LEN,
    num_labels=num_labels
).to(device)
optimizer = torch.optim.AdamW(model.parameters(), lr=2e-4)


# Training phase
for epoch in range(15):
    model.train()
    total_loss = 0
    for batch in train_loader:
        input_ids = batch["input_ids"].to(device)
        labels = batch["labels"].to(device)
        mask = batch["attention_mask"].to(device)
        optimizer.zero_grad()
        outputs = model(input_ids, mask, labels)
        loss = outputs["loss"]
        loss.backward()
        torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)
        optimizer.step()
        total_loss += loss.item()
    print(f"Epoch {epoch+1}/15, Loss: {total_loss / len(train_loader):.6f}")

# Predict on test set 
model.eval()
all_preds,all_labels_eval = [], []

with torch.no_grad():
    for batch in test_loader:
        input_ids = batch["input_ids"].to(device)
        labels = batch["labels"].to(device)
        mask = batch["attention_mask"].to(device)
        logits = model(input_ids, mask)["logits"].argmax(-1)

        for p, l, m in zip(logits, labels, mask):
            true_seq = [id2label[i.item()] for i, msk in zip(l, m) if i.item() != -100 and msk.item() == 1]
            pred_seq = [id2label[i.item()] for i, msk, gt in zip(p, m, l) if gt.item() != -100 and msk.item() == 1]
            all_labels_eval.append(true_seq)
            all_preds.append(pred_seq)

# compute evaluation metrics
accuracy = 100 * accuracy_score(all_labels_eval, all_preds)
f1 = 100 * f1_score(all_labels_eval, all_preds, average="weighted")

print(f"\nAccuracy: {accuracy:.4f}%")
print(f"F1_Score: {f1:.4f}%")
print(classification_report(all_labels_eval, all_preds))

Epoch 1/15, Loss: 0.445432
Epoch 2/15, Loss: 0.291151
Epoch 3/15, Loss: 0.229309
Epoch 4/15, Loss: 0.190191
Epoch 5/15, Loss: 0.161963
Epoch 6/15, Loss: 0.140347
Epoch 7/15, Loss: 0.123273
Epoch 8/15, Loss: 0.108695
Epoch 9/15, Loss: 0.097365
Epoch 10/15, Loss: 0.087595
Epoch 11/15, Loss: 0.078884
Epoch 12/15, Loss: 0.072299
Epoch 13/15, Loss: 0.064373
Epoch 14/15, Loss: 0.060194
Epoch 15/15, Loss: 0.055486

Accuracy: 97.3479%
F1_Score: 75.8377%
              precision    recall  f1-score   support

           _       0.73      0.78      0.75     10398
         dat       0.39      0.45      0.41       357
       event       0.57      0.81      0.67       396
         fac       0.72      0.90      0.80       281
         loc       0.83      0.83      0.83      3238
         mon       0.30      0.42      0.35       113
         org       0.69      0.80      0.74      3941
         pct       0.54      0.70      0.61        71
         per       0.57      0.58      0.57       928
        p

Attention Heads = 8

In [5]:
import torch
import torch.nn as nn
import math
import numpy as np
from collections import Counter
from itertools import chain
import random
from torch.utils.data import Dataset, DataLoader
from seqeval.metrics import classification_report, f1_score, accuracy_score
import os
import warnings
warnings.filterwarnings("ignore")

def load_clean_ner_data(tokens_path, labels_path):
    with open(tokens_path, "r", encoding="utf-8") as token_file, open(labels_path, "r", encoding="utf-8") as label_file:
        token_lines = token_file.readlines()
        label_lines = label_file.readlines()

    sentences, labels = [], []
    
    for t_line, l_line in zip(token_lines, label_lines):
        tokens = t_line.strip().split()
        lbls = l_line.strip().split()
        if len(tokens) == len(lbls) and len(tokens) > 0:
            sentences.append([t.strip() for t in tokens])
            labels.append([l.strip().lower().replace("_", "-") for l in lbls])
    return sentences, labels

# Read and merge ARMAN & PEYMA datasets
arman_sent, arman_lab = load_clean_ner_data("arman-tokens.txt", "arman-labels.txt")
peyma_sent, peyma_lab = load_clean_ner_data("peyma-tokens.txt", "peyma-labels.txt")
all_sentences = arman_sent + peyma_sent
all_labels = arman_lab + peyma_lab

# Combine and shuffle all data, then split into train (%80) and test (%20) sets
combined = list(zip(all_sentences, all_labels))
random.seed(42)
random.shuffle(combined)
split_idx = int(0.8 * len(combined))
train_sentences, train_labels = zip(*combined[:split_idx])
test_sentences, test_labels = zip(*combined[split_idx:])

all_labels_unique = sorted(set(l for seq in (train_labels + test_labels) for l in seq))
label2id = {l: i for i, l in enumerate(all_labels_unique)}
id2label = {i: l for l, i in label2id.items()}

token_counter = Counter(chain(*train_sentences))
token2id = {"<pad>": 0, "<unk>": 1}
for tok, count in token_counter.items():
    if count > 1:
        token2id[tok] = len(token2id)
id2token = {i: t for t, i in token2id.items()}

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# NER dataset with padding and masks
class NERDataset(Dataset):
    def __init__(self, sentences, labels, max_len, token2id, label2id):
        self.sentences = sentences
        self.labels = labels
        self.max_len = max_len
        self.token2id = token2id
        self.label2id = label2id

    def __len__(self):
        return len(self.sentences)

    def __getitem__(self, idx):
        tokens = [self.token2id.get(t, self.token2id["<unk>"]) for t in self.sentences[idx]]
        tags = [self.label2id[l] for l in self.labels[idx]]
        tokens = tokens[:self.max_len]
        tags = tags[:self.max_len]
        attn_mask = [1] * len(tokens)
        pad_len = self.max_len - len(tokens)
        tokens += [0] * pad_len
        tags += [-100] * pad_len
        attn_mask += [0] * pad_len
        return {
            "input_ids": torch.tensor(tokens),
            "labels": torch.tensor(tags),
            "attention_mask": torch.tensor(attn_mask)
        }

# Prepare NER datasets and loaders
MAX_LEN = 128
train_data = NERDataset(train_sentences, train_labels, MAX_LEN, token2id, label2id)
test_data = NERDataset(test_sentences, test_labels, MAX_LEN, token2id, label2id)
train_loader = DataLoader(train_data, batch_size=16, shuffle=True)
test_loader = DataLoader(test_data, batch_size=16)

class PositionalEncoding(nn.Module):
    # Positional encoding for token positions
    def __init__(self, d_model, max_len=512):
        super().__init__()
        pe = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2) * -(math.log(10000.0) / d_model))
        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)
        self.pe = pe.unsqueeze(0)

    def forward(self, x):
        return x + self.pe[:, :x.size(1)].to(x.device)

class MultiHeadAttention(nn.Module):
    # Standard multi-head self-attention
    def __init__(self, d_model, num_heads, dropout=0.1):
        super().__init__()
        self.d_k = d_model // num_heads
        self.num_heads = num_heads
        self.w_q = nn.Linear(d_model, d_model)
        self.w_k = nn.Linear(d_model, d_model)
        self.w_v = nn.Linear(d_model, d_model)
        self.w_o = nn.Linear(d_model, d_model)
        self.dropout = nn.Dropout(dropout)

    def forward(self, q, k, v, mask=None):
        B = q.size(0)
        q = self.w_q(q).view(B, -1, self.num_heads, self.d_k).transpose(1, 2)
        k = self.w_k(k).view(B, -1, self.num_heads, self.d_k).transpose(1, 2)
        v = self.w_v(v).view(B, -1, self.num_heads, self.d_k).transpose(1, 2)
        scores = q @ k.transpose(-2, -1) / math.sqrt(self.d_k)
        if mask is not None:
            mask = mask.unsqueeze(1).unsqueeze(2)
            scores = scores.masked_fill(mask == 0, -1e9)
        attn = torch.softmax(scores, dim=-1)
        attn = self.dropout(attn)
        context = attn @ v
        context = context.transpose(1, 2).contiguous().view(B, -1, self.num_heads * self.d_k)
        return self.w_o(context)

class PositionwiseFeedForward(nn.Module):
    # Feed-forward block used in transformer
    def __init__(self, d_model, d_ff, dropout=0.1):
        super().__init__()
        self.linear1 = nn.Linear(d_model, d_ff)
        self.linear2 = nn.Linear(d_ff, d_model)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        return self.linear2(self.dropout(torch.relu(self.linear1(x))))

class SublayerConnection(nn.Module):
    # Wraps sublayer with norm, dropout, and residual
    def __init__(self, size, dropout):
        super().__init__()
        self.norm = nn.LayerNorm(size)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x, sublayer):
        return x + self.dropout(sublayer(self.norm(x)))

class EncoderLayer(nn.Module):
    def __init__(self, d_model, heads, d_ff, dropout):
        super().__init__()
        self.attn = MultiHeadAttention(d_model, heads, dropout)
        self.ffn = PositionwiseFeedForward(d_model, d_ff, dropout)
        self.sublayers = nn.ModuleList([SublayerConnection(d_model, dropout) for _ in range(2)])

    def forward(self, x, mask):
        x = self.sublayers[0](x, lambda x: self.attn(x, x, x, mask))
        return self.sublayers[1](x, self.ffn)

# Full transformer-based NER model
class TransformerNER(nn.Module):
    def __init__(self, vocab_size, d_model, heads, d_ff, num_layers, max_len, num_labels):
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, d_model, padding_idx=0)
        self.pe = PositionalEncoding(d_model, max_len)
        self.encoder = nn.ModuleList([EncoderLayer(d_model, heads, d_ff, 0.1) for _ in range(num_layers)])
        self.norm = nn.LayerNorm(d_model)
        self.classifier = nn.Linear(d_model, num_labels)

    def forward(self, input_ids, mask, labels=None):
        x = self.embedding(input_ids)
        x = self.pe(x)
        for layer in self.encoder:
            x = layer(x, mask)
        x = self.norm(x)
        logits = self.classifier(x)
        loss = None
        if labels is not None:
            loss = nn.CrossEntropyLoss(ignore_index=-100)(logits.view(-1, logits.size(-1)), labels.view(-1))
        return {"loss": loss, "logits": logits}

# Model hyperparameters
d_model = 128
num_heads = 8
ff_dim = 512
num_layers = 2
vocab_size = len(token2id)
num_labels = len(label2id)

model = TransformerNER(
    vocab_size=vocab_size,
    d_model=d_model,
    heads=num_heads,
    d_ff=ff_dim,
    num_layers=num_layers,
    max_len=MAX_LEN,
    num_labels=num_labels
).to(device)
optimizer = torch.optim.AdamW(model.parameters(), lr=2e-4)


# Training phase
for epoch in range(15):
    model.train()
    total_loss = 0
    for batch in train_loader:
        input_ids = batch["input_ids"].to(device)
        labels = batch["labels"].to(device)
        mask = batch["attention_mask"].to(device)
        optimizer.zero_grad()
        outputs = model(input_ids, mask, labels)
        loss = outputs["loss"]
        loss.backward()
        torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)
        optimizer.step()
        total_loss += loss.item()
    print(f"Epoch {epoch+1}/15, Loss: {total_loss / len(train_loader):.6f}")

# Predict on test set 
model.eval()
all_preds,all_labels_eval = [], []

with torch.no_grad():
    for batch in test_loader:
        input_ids = batch["input_ids"].to(device)
        labels = batch["labels"].to(device)
        mask = batch["attention_mask"].to(device)
        logits = model(input_ids, mask)["logits"].argmax(-1)

        for p, l, m in zip(logits, labels, mask):
            true_seq = [id2label[i.item()] for i, msk in zip(l, m) if i.item() != -100 and msk.item() == 1]
            pred_seq = [id2label[i.item()] for i, msk, gt in zip(p, m, l) if gt.item() != -100 and msk.item() == 1]
            all_labels_eval.append(true_seq)
            all_preds.append(pred_seq)

# compute evaluation metrics
accuracy = 100 * accuracy_score(all_labels_eval, all_preds)
f1 = 100 * f1_score(all_labels_eval, all_preds, average="weighted")

print(f"\nAccuracy: {accuracy:.4f}%")
print(f"F1_Score: {f1:.4f}%")
print(classification_report(all_labels_eval, all_preds))

Epoch 1/15, Loss: 0.443732
Epoch 2/15, Loss: 0.287109
Epoch 3/15, Loss: 0.225030
Epoch 4/15, Loss: 0.185043
Epoch 5/15, Loss: 0.155803
Epoch 6/15, Loss: 0.132716
Epoch 7/15, Loss: 0.115249
Epoch 8/15, Loss: 0.100076
Epoch 9/15, Loss: 0.087717
Epoch 10/15, Loss: 0.077669
Epoch 11/15, Loss: 0.069572
Epoch 12/15, Loss: 0.062570
Epoch 13/15, Loss: 0.056547
Epoch 14/15, Loss: 0.050791
Epoch 15/15, Loss: 0.046422

Accuracy: 97.4674%
F1_Score: 77.4394%
              precision    recall  f1-score   support

           _       0.74      0.79      0.76     10398
         dat       0.35      0.46      0.40       357
       event       0.72      0.84      0.77       396
         fac       0.79      0.91      0.85       281
         loc       0.82      0.87      0.85      3238
         mon       0.21      0.24      0.22       113
         org       0.72      0.81      0.76      3941
         pct       0.34      0.42      0.38        71
         per       0.59      0.62      0.61       928
        p

Attention Heads = 16

In [6]:
import torch
import torch.nn as nn
import math
import numpy as np
from collections import Counter
from itertools import chain
import random
from torch.utils.data import Dataset, DataLoader
from seqeval.metrics import classification_report, f1_score, accuracy_score
import os
import warnings
warnings.filterwarnings("ignore")

def load_clean_ner_data(tokens_path, labels_path):
    with open(tokens_path, "r", encoding="utf-8") as token_file, open(labels_path, "r", encoding="utf-8") as label_file:
        token_lines = token_file.readlines()
        label_lines = label_file.readlines()

    sentences, labels = [], []
    
    for t_line, l_line in zip(token_lines, label_lines):
        tokens = t_line.strip().split()
        lbls = l_line.strip().split()
        if len(tokens) == len(lbls) and len(tokens) > 0:
            sentences.append([t.strip() for t in tokens])
            labels.append([l.strip().lower().replace("_", "-") for l in lbls])
    return sentences, labels

# Read and merge ARMAN & PEYMA datasets
arman_sent, arman_lab = load_clean_ner_data("arman-tokens.txt", "arman-labels.txt")
peyma_sent, peyma_lab = load_clean_ner_data("peyma-tokens.txt", "peyma-labels.txt")
all_sentences = arman_sent + peyma_sent
all_labels = arman_lab + peyma_lab

# Combine and shuffle all data, then split into train (%80) and test (%20) sets
combined = list(zip(all_sentences, all_labels))
random.seed(42)
random.shuffle(combined)
split_idx = int(0.8 * len(combined))
train_sentences, train_labels = zip(*combined[:split_idx])
test_sentences, test_labels = zip(*combined[split_idx:])

all_labels_unique = sorted(set(l for seq in (train_labels + test_labels) for l in seq))
label2id = {l: i for i, l in enumerate(all_labels_unique)}
id2label = {i: l for l, i in label2id.items()}

token_counter = Counter(chain(*train_sentences))
token2id = {"<pad>": 0, "<unk>": 1}
for tok, count in token_counter.items():
    if count > 1:
        token2id[tok] = len(token2id)
id2token = {i: t for t, i in token2id.items()}

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# NER dataset with padding and masks
class NERDataset(Dataset):
    def __init__(self, sentences, labels, max_len, token2id, label2id):
        self.sentences = sentences
        self.labels = labels
        self.max_len = max_len
        self.token2id = token2id
        self.label2id = label2id

    def __len__(self):
        return len(self.sentences)

    def __getitem__(self, idx):
        tokens = [self.token2id.get(t, self.token2id["<unk>"]) for t in self.sentences[idx]]
        tags = [self.label2id[l] for l in self.labels[idx]]
        tokens = tokens[:self.max_len]
        tags = tags[:self.max_len]
        attn_mask = [1] * len(tokens)
        pad_len = self.max_len - len(tokens)
        tokens += [0] * pad_len
        tags += [-100] * pad_len
        attn_mask += [0] * pad_len
        return {
            "input_ids": torch.tensor(tokens),
            "labels": torch.tensor(tags),
            "attention_mask": torch.tensor(attn_mask)
        }

# Prepare NER datasets and loaders
MAX_LEN = 128
train_data = NERDataset(train_sentences, train_labels, MAX_LEN, token2id, label2id)
test_data = NERDataset(test_sentences, test_labels, MAX_LEN, token2id, label2id)
train_loader = DataLoader(train_data, batch_size=16, shuffle=True)
test_loader = DataLoader(test_data, batch_size=16)

class PositionalEncoding(nn.Module):
    # Positional encoding for token positions
    def __init__(self, d_model, max_len=512):
        super().__init__()
        pe = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2) * -(math.log(10000.0) / d_model))
        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)
        self.pe = pe.unsqueeze(0)

    def forward(self, x):
        return x + self.pe[:, :x.size(1)].to(x.device)

class MultiHeadAttention(nn.Module):
    # Standard multi-head self-attention
    def __init__(self, d_model, num_heads, dropout=0.1):
        super().__init__()
        self.d_k = d_model // num_heads
        self.num_heads = num_heads
        self.w_q = nn.Linear(d_model, d_model)
        self.w_k = nn.Linear(d_model, d_model)
        self.w_v = nn.Linear(d_model, d_model)
        self.w_o = nn.Linear(d_model, d_model)
        self.dropout = nn.Dropout(dropout)

    def forward(self, q, k, v, mask=None):
        B = q.size(0)
        q = self.w_q(q).view(B, -1, self.num_heads, self.d_k).transpose(1, 2)
        k = self.w_k(k).view(B, -1, self.num_heads, self.d_k).transpose(1, 2)
        v = self.w_v(v).view(B, -1, self.num_heads, self.d_k).transpose(1, 2)
        scores = q @ k.transpose(-2, -1) / math.sqrt(self.d_k)
        if mask is not None:
            mask = mask.unsqueeze(1).unsqueeze(2)
            scores = scores.masked_fill(mask == 0, -1e9)
        attn = torch.softmax(scores, dim=-1)
        attn = self.dropout(attn)
        context = attn @ v
        context = context.transpose(1, 2).contiguous().view(B, -1, self.num_heads * self.d_k)
        return self.w_o(context)

class PositionwiseFeedForward(nn.Module):
    # Feed-forward block used in transformer
    def __init__(self, d_model, d_ff, dropout=0.1):
        super().__init__()
        self.linear1 = nn.Linear(d_model, d_ff)
        self.linear2 = nn.Linear(d_ff, d_model)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        return self.linear2(self.dropout(torch.relu(self.linear1(x))))

class SublayerConnection(nn.Module):
    # Wraps sublayer with norm, dropout, and residual
    def __init__(self, size, dropout):
        super().__init__()
        self.norm = nn.LayerNorm(size)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x, sublayer):
        return x + self.dropout(sublayer(self.norm(x)))

class EncoderLayer(nn.Module):
    def __init__(self, d_model, heads, d_ff, dropout):
        super().__init__()
        self.attn = MultiHeadAttention(d_model, heads, dropout)
        self.ffn = PositionwiseFeedForward(d_model, d_ff, dropout)
        self.sublayers = nn.ModuleList([SublayerConnection(d_model, dropout) for _ in range(2)])

    def forward(self, x, mask):
        x = self.sublayers[0](x, lambda x: self.attn(x, x, x, mask))
        return self.sublayers[1](x, self.ffn)

# Full transformer-based NER model
class TransformerNER(nn.Module):
    def __init__(self, vocab_size, d_model, heads, d_ff, num_layers, max_len, num_labels):
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, d_model, padding_idx=0)
        self.pe = PositionalEncoding(d_model, max_len)
        self.encoder = nn.ModuleList([EncoderLayer(d_model, heads, d_ff, 0.1) for _ in range(num_layers)])
        self.norm = nn.LayerNorm(d_model)
        self.classifier = nn.Linear(d_model, num_labels)

    def forward(self, input_ids, mask, labels=None):
        x = self.embedding(input_ids)
        x = self.pe(x)
        for layer in self.encoder:
            x = layer(x, mask)
        x = self.norm(x)
        logits = self.classifier(x)
        loss = None
        if labels is not None:
            loss = nn.CrossEntropyLoss(ignore_index=-100)(logits.view(-1, logits.size(-1)), labels.view(-1))
        return {"loss": loss, "logits": logits}

# Model hyperparameters
d_model = 128
num_heads = 16
ff_dim = 512
num_layers = 2
vocab_size = len(token2id)
num_labels = len(label2id)

model = TransformerNER(
    vocab_size=vocab_size,
    d_model=d_model,
    heads=num_heads,
    d_ff=ff_dim,
    num_layers=num_layers,
    max_len=MAX_LEN,
    num_labels=num_labels
).to(device)
optimizer = torch.optim.AdamW(model.parameters(), lr=2e-4)


# Training phase
for epoch in range(15):
    model.train()
    total_loss = 0
    for batch in train_loader:
        input_ids = batch["input_ids"].to(device)
        labels = batch["labels"].to(device)
        mask = batch["attention_mask"].to(device)
        optimizer.zero_grad()
        outputs = model(input_ids, mask, labels)
        loss = outputs["loss"]
        loss.backward()
        torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)
        optimizer.step()
        total_loss += loss.item()
    print(f"Epoch {epoch+1}/15, Loss: {total_loss / len(train_loader):.6f}")

# Predict on test set 
model.eval()
all_preds,all_labels_eval = [], []

with torch.no_grad():
    for batch in test_loader:
        input_ids = batch["input_ids"].to(device)
        labels = batch["labels"].to(device)
        mask = batch["attention_mask"].to(device)
        logits = model(input_ids, mask)["logits"].argmax(-1)

        for p, l, m in zip(logits, labels, mask):
            true_seq = [id2label[i.item()] for i, msk in zip(l, m) if i.item() != -100 and msk.item() == 1]
            pred_seq = [id2label[i.item()] for i, msk, gt in zip(p, m, l) if gt.item() != -100 and msk.item() == 1]
            all_labels_eval.append(true_seq)
            all_preds.append(pred_seq)

# compute evaluation metrics
accuracy = 100 * accuracy_score(all_labels_eval, all_preds)
f1 = 100 * f1_score(all_labels_eval, all_preds, average="weighted")

print(f"\nAccuracy: {accuracy:.4f}%")
print(f"F1_Score: {f1:.4f}%")
print(classification_report(all_labels_eval, all_preds))

Epoch 1/15, Loss: 0.439810
Epoch 2/15, Loss: 0.285379
Epoch 3/15, Loss: 0.224124
Epoch 4/15, Loss: 0.183414
Epoch 5/15, Loss: 0.153741
Epoch 6/15, Loss: 0.130982
Epoch 7/15, Loss: 0.113381
Epoch 8/15, Loss: 0.097937
Epoch 9/15, Loss: 0.085868
Epoch 10/15, Loss: 0.076247
Epoch 11/15, Loss: 0.068403
Epoch 12/15, Loss: 0.060848
Epoch 13/15, Loss: 0.054310
Epoch 14/15, Loss: 0.049642
Epoch 15/15, Loss: 0.045174

Accuracy: 97.2795%
F1_Score: 75.7856%
              precision    recall  f1-score   support

           _       0.71      0.78      0.74     10398
         dat       0.34      0.44      0.38       357
       event       0.74      0.81      0.78       396
         fac       0.86      0.89      0.87       281
         loc       0.78      0.86      0.82      3238
         mon       0.27      0.39      0.32       113
         org       0.69      0.82      0.75      3941
         pct       0.34      0.46      0.39        71
         per       0.59      0.53      0.56       928
        p

Hidden Size: d_model = 64, ff_dim = 256

In [10]:
import torch
import torch.nn as nn
import math
import numpy as np
from collections import Counter
from itertools import chain
import random
from torch.utils.data import Dataset, DataLoader
from seqeval.metrics import classification_report, f1_score, accuracy_score
import os
import warnings
warnings.filterwarnings("ignore")

def load_clean_ner_data(tokens_path, labels_path):
    with open(tokens_path, "r", encoding="utf-8") as token_file, open(labels_path, "r", encoding="utf-8") as label_file:
        token_lines = token_file.readlines()
        label_lines = label_file.readlines()

    sentences, labels = [], []
    
    for t_line, l_line in zip(token_lines, label_lines):
        tokens = t_line.strip().split()
        lbls = l_line.strip().split()
        if len(tokens) == len(lbls) and len(tokens) > 0:
            sentences.append([t.strip() for t in tokens])
            labels.append([l.strip().lower().replace("_", "-") for l in lbls])
    return sentences, labels

# Read and merge ARMAN & PEYMA datasets
arman_sent, arman_lab = load_clean_ner_data("arman-tokens.txt", "arman-labels.txt")
peyma_sent, peyma_lab = load_clean_ner_data("peyma-tokens.txt", "peyma-labels.txt")
all_sentences = arman_sent + peyma_sent
all_labels = arman_lab + peyma_lab

# Combine and shuffle all data, then split into train (%80) and test (%20) sets
combined = list(zip(all_sentences, all_labels))
random.seed(42)
random.shuffle(combined)
split_idx = int(0.8 * len(combined))
train_sentences, train_labels = zip(*combined[:split_idx])
test_sentences, test_labels = zip(*combined[split_idx:])

all_labels_unique = sorted(set(l for seq in (train_labels + test_labels) for l in seq))
label2id = {l: i for i, l in enumerate(all_labels_unique)}
id2label = {i: l for l, i in label2id.items()}

token_counter = Counter(chain(*train_sentences))
token2id = {"<pad>": 0, "<unk>": 1}
for tok, count in token_counter.items():
    if count > 1:
        token2id[tok] = len(token2id)
id2token = {i: t for t, i in token2id.items()}

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# NER dataset with padding and masks
class NERDataset(Dataset):
    def __init__(self, sentences, labels, max_len, token2id, label2id):
        self.sentences = sentences
        self.labels = labels
        self.max_len = max_len
        self.token2id = token2id
        self.label2id = label2id

    def __len__(self):
        return len(self.sentences)

    def __getitem__(self, idx):
        tokens = [self.token2id.get(t, self.token2id["<unk>"]) for t in self.sentences[idx]]
        tags = [self.label2id[l] for l in self.labels[idx]]
        tokens = tokens[:self.max_len]
        tags = tags[:self.max_len]
        attn_mask = [1] * len(tokens)
        pad_len = self.max_len - len(tokens)
        tokens += [0] * pad_len
        tags += [-100] * pad_len
        attn_mask += [0] * pad_len
        return {
            "input_ids": torch.tensor(tokens),
            "labels": torch.tensor(tags),
            "attention_mask": torch.tensor(attn_mask)
        }

# Prepare NER datasets and loaders
MAX_LEN = 128
train_data = NERDataset(train_sentences, train_labels, MAX_LEN, token2id, label2id)
test_data = NERDataset(test_sentences, test_labels, MAX_LEN, token2id, label2id)
train_loader = DataLoader(train_data, batch_size=16, shuffle=True)
test_loader = DataLoader(test_data, batch_size=16)

class PositionalEncoding(nn.Module):
    # Positional encoding for token positions
    def __init__(self, d_model, max_len=512):
        super().__init__()
        pe = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2) * -(math.log(10000.0) / d_model))
        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)
        self.pe = pe.unsqueeze(0)

    def forward(self, x):
        return x + self.pe[:, :x.size(1)].to(x.device)

class MultiHeadAttention(nn.Module):
    # Standard multi-head self-attention
    def __init__(self, d_model, num_heads, dropout=0.1):
        super().__init__()
        self.d_k = d_model // num_heads
        self.num_heads = num_heads
        self.w_q = nn.Linear(d_model, d_model)
        self.w_k = nn.Linear(d_model, d_model)
        self.w_v = nn.Linear(d_model, d_model)
        self.w_o = nn.Linear(d_model, d_model)
        self.dropout = nn.Dropout(dropout)

    def forward(self, q, k, v, mask=None):
        B = q.size(0)
        q = self.w_q(q).view(B, -1, self.num_heads, self.d_k).transpose(1, 2)
        k = self.w_k(k).view(B, -1, self.num_heads, self.d_k).transpose(1, 2)
        v = self.w_v(v).view(B, -1, self.num_heads, self.d_k).transpose(1, 2)
        scores = q @ k.transpose(-2, -1) / math.sqrt(self.d_k)
        if mask is not None:
            mask = mask.unsqueeze(1).unsqueeze(2)
            scores = scores.masked_fill(mask == 0, -1e9)
        attn = torch.softmax(scores, dim=-1)
        attn = self.dropout(attn)
        context = attn @ v
        context = context.transpose(1, 2).contiguous().view(B, -1, self.num_heads * self.d_k)
        return self.w_o(context)

class PositionwiseFeedForward(nn.Module):
    # Feed-forward block used in transformer
    def __init__(self, d_model, d_ff, dropout=0.1):
        super().__init__()
        self.linear1 = nn.Linear(d_model, d_ff)
        self.linear2 = nn.Linear(d_ff, d_model)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        return self.linear2(self.dropout(torch.relu(self.linear1(x))))

class SublayerConnection(nn.Module):
    # Wraps sublayer with norm, dropout, and residual
    def __init__(self, size, dropout):
        super().__init__()
        self.norm = nn.LayerNorm(size)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x, sublayer):
        return x + self.dropout(sublayer(self.norm(x)))

class EncoderLayer(nn.Module):
    def __init__(self, d_model, heads, d_ff, dropout):
        super().__init__()
        self.attn = MultiHeadAttention(d_model, heads, dropout)
        self.ffn = PositionwiseFeedForward(d_model, d_ff, dropout)
        self.sublayers = nn.ModuleList([SublayerConnection(d_model, dropout) for _ in range(2)])

    def forward(self, x, mask):
        x = self.sublayers[0](x, lambda x: self.attn(x, x, x, mask))
        return self.sublayers[1](x, self.ffn)

# Full transformer-based NER model
class TransformerNER(nn.Module):
    def __init__(self, vocab_size, d_model, heads, d_ff, num_layers, max_len, num_labels):
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, d_model, padding_idx=0)
        self.pe = PositionalEncoding(d_model, max_len)
        self.encoder = nn.ModuleList([EncoderLayer(d_model, heads, d_ff, 0.1) for _ in range(num_layers)])
        self.norm = nn.LayerNorm(d_model)
        self.classifier = nn.Linear(d_model, num_labels)

    def forward(self, input_ids, mask, labels=None):
        x = self.embedding(input_ids)
        x = self.pe(x)
        for layer in self.encoder:
            x = layer(x, mask)
        x = self.norm(x)
        logits = self.classifier(x)
        loss = None
        if labels is not None:
            loss = nn.CrossEntropyLoss(ignore_index=-100)(logits.view(-1, logits.size(-1)), labels.view(-1))
        return {"loss": loss, "logits": logits}

# Model hyperparameters
d_model = 64
num_heads = 4
ff_dim = 256
num_layers = 2
vocab_size = len(token2id)
num_labels = len(label2id)

model = TransformerNER(
    vocab_size=vocab_size,
    d_model=d_model,
    heads=num_heads,
    d_ff=ff_dim,
    num_layers=num_layers,
    max_len=MAX_LEN,
    num_labels=num_labels
).to(device)
optimizer = torch.optim.AdamW(model.parameters(), lr=2e-4)


# Training phase
for epoch in range(15):
    model.train()
    total_loss = 0
    for batch in train_loader:
        input_ids = batch["input_ids"].to(device)
        labels = batch["labels"].to(device)
        mask = batch["attention_mask"].to(device)
        optimizer.zero_grad()
        outputs = model(input_ids, mask, labels)
        loss = outputs["loss"]
        loss.backward()
        torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)
        optimizer.step()
        total_loss += loss.item()
    print(f"Epoch {epoch+1}/15, Loss: {total_loss / len(train_loader):.6f}")

# Predict on test set 
model.eval()
all_preds,all_labels_eval = [], []

with torch.no_grad():
    for batch in test_loader:
        input_ids = batch["input_ids"].to(device)
        labels = batch["labels"].to(device)
        mask = batch["attention_mask"].to(device)
        logits = model(input_ids, mask)["logits"].argmax(-1)

        for p, l, m in zip(logits, labels, mask):
            true_seq = [id2label[i.item()] for i, msk in zip(l, m) if i.item() != -100 and msk.item() == 1]
            pred_seq = [id2label[i.item()] for i, msk, gt in zip(p, m, l) if gt.item() != -100 and msk.item() == 1]
            all_labels_eval.append(true_seq)
            all_preds.append(pred_seq)

# compute evaluation metrics
accuracy = 100 * accuracy_score(all_labels_eval, all_preds)
f1 = 100 * f1_score(all_labels_eval, all_preds, average="weighted")

print(f"\nAccuracy: {accuracy:.4f}%")
print(f"F1_Score: {f1:.4f}%")
print(classification_report(all_labels_eval, all_preds))

Epoch 1/15, Loss: 0.536494
Epoch 2/15, Loss: 0.393015
Epoch 3/15, Loss: 0.336848
Epoch 4/15, Loss: 0.299713
Epoch 5/15, Loss: 0.270997
Epoch 6/15, Loss: 0.250266
Epoch 7/15, Loss: 0.230171
Epoch 8/15, Loss: 0.214150
Epoch 9/15, Loss: 0.200083
Epoch 10/15, Loss: 0.186691
Epoch 11/15, Loss: 0.176203
Epoch 12/15, Loss: 0.165114
Epoch 13/15, Loss: 0.155822
Epoch 14/15, Loss: 0.147902
Epoch 15/15, Loss: 0.139299

Accuracy: 95.0960%
F1_Score: 59.5711%
              precision    recall  f1-score   support

           _       0.58      0.63      0.61     10398
         dat       0.32      0.26      0.28       357
       event       0.23      0.38      0.29       396
         fac       0.31      0.40      0.35       281
         loc       0.67      0.74      0.71      3238
         mon       0.10      0.12      0.11       113
         org       0.50      0.60      0.55      3941
         pct       0.17      0.21      0.19        71
         per       0.49      0.46      0.48       928
        p

Hidden Size: d_model = 192, ff_dim = 768

In [11]:
import torch
import torch.nn as nn
import math
import numpy as np
from collections import Counter
from itertools import chain
import random
from torch.utils.data import Dataset, DataLoader
from seqeval.metrics import classification_report, f1_score, accuracy_score
import os
import warnings
warnings.filterwarnings("ignore")

def load_clean_ner_data(tokens_path, labels_path):
    with open(tokens_path, "r", encoding="utf-8") as token_file, open(labels_path, "r", encoding="utf-8") as label_file:
        token_lines = token_file.readlines()
        label_lines = label_file.readlines()

    sentences, labels = [], []
    
    for t_line, l_line in zip(token_lines, label_lines):
        tokens = t_line.strip().split()
        lbls = l_line.strip().split()
        if len(tokens) == len(lbls) and len(tokens) > 0:
            sentences.append([t.strip() for t in tokens])
            labels.append([l.strip().lower().replace("_", "-") for l in lbls])
    return sentences, labels

# Read and merge ARMAN & PEYMA datasets
arman_sent, arman_lab = load_clean_ner_data("arman-tokens.txt", "arman-labels.txt")
peyma_sent, peyma_lab = load_clean_ner_data("peyma-tokens.txt", "peyma-labels.txt")
all_sentences = arman_sent + peyma_sent
all_labels = arman_lab + peyma_lab

# Combine and shuffle all data, then split into train (%80) and test (%20) sets
combined = list(zip(all_sentences, all_labels))
random.seed(42)
random.shuffle(combined)
split_idx = int(0.8 * len(combined))
train_sentences, train_labels = zip(*combined[:split_idx])
test_sentences, test_labels = zip(*combined[split_idx:])

all_labels_unique = sorted(set(l for seq in (train_labels + test_labels) for l in seq))
label2id = {l: i for i, l in enumerate(all_labels_unique)}
id2label = {i: l for l, i in label2id.items()}

token_counter = Counter(chain(*train_sentences))
token2id = {"<pad>": 0, "<unk>": 1}
for tok, count in token_counter.items():
    if count > 1:
        token2id[tok] = len(token2id)
id2token = {i: t for t, i in token2id.items()}

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# NER dataset with padding and masks
class NERDataset(Dataset):
    def __init__(self, sentences, labels, max_len, token2id, label2id):
        self.sentences = sentences
        self.labels = labels
        self.max_len = max_len
        self.token2id = token2id
        self.label2id = label2id

    def __len__(self):
        return len(self.sentences)

    def __getitem__(self, idx):
        tokens = [self.token2id.get(t, self.token2id["<unk>"]) for t in self.sentences[idx]]
        tags = [self.label2id[l] for l in self.labels[idx]]
        tokens = tokens[:self.max_len]
        tags = tags[:self.max_len]
        attn_mask = [1] * len(tokens)
        pad_len = self.max_len - len(tokens)
        tokens += [0] * pad_len
        tags += [-100] * pad_len
        attn_mask += [0] * pad_len
        return {
            "input_ids": torch.tensor(tokens),
            "labels": torch.tensor(tags),
            "attention_mask": torch.tensor(attn_mask)
        }

# Prepare NER datasets and loaders
MAX_LEN = 128
train_data = NERDataset(train_sentences, train_labels, MAX_LEN, token2id, label2id)
test_data = NERDataset(test_sentences, test_labels, MAX_LEN, token2id, label2id)
train_loader = DataLoader(train_data, batch_size=16, shuffle=True)
test_loader = DataLoader(test_data, batch_size=16)

class PositionalEncoding(nn.Module):
    # Positional encoding for token positions
    def __init__(self, d_model, max_len=512):
        super().__init__()
        pe = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2) * -(math.log(10000.0) / d_model))
        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)
        self.pe = pe.unsqueeze(0)

    def forward(self, x):
        return x + self.pe[:, :x.size(1)].to(x.device)

class MultiHeadAttention(nn.Module):
    # Standard multi-head self-attention
    def __init__(self, d_model, num_heads, dropout=0.1):
        super().__init__()
        self.d_k = d_model // num_heads
        self.num_heads = num_heads
        self.w_q = nn.Linear(d_model, d_model)
        self.w_k = nn.Linear(d_model, d_model)
        self.w_v = nn.Linear(d_model, d_model)
        self.w_o = nn.Linear(d_model, d_model)
        self.dropout = nn.Dropout(dropout)

    def forward(self, q, k, v, mask=None):
        B = q.size(0)
        q = self.w_q(q).view(B, -1, self.num_heads, self.d_k).transpose(1, 2)
        k = self.w_k(k).view(B, -1, self.num_heads, self.d_k).transpose(1, 2)
        v = self.w_v(v).view(B, -1, self.num_heads, self.d_k).transpose(1, 2)
        scores = q @ k.transpose(-2, -1) / math.sqrt(self.d_k)
        if mask is not None:
            mask = mask.unsqueeze(1).unsqueeze(2)
            scores = scores.masked_fill(mask == 0, -1e9)
        attn = torch.softmax(scores, dim=-1)
        attn = self.dropout(attn)
        context = attn @ v
        context = context.transpose(1, 2).contiguous().view(B, -1, self.num_heads * self.d_k)
        return self.w_o(context)

class PositionwiseFeedForward(nn.Module):
    # Feed-forward block used in transformer
    def __init__(self, d_model, d_ff, dropout=0.1):
        super().__init__()
        self.linear1 = nn.Linear(d_model, d_ff)
        self.linear2 = nn.Linear(d_ff, d_model)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        return self.linear2(self.dropout(torch.relu(self.linear1(x))))

class SublayerConnection(nn.Module):
    # Wraps sublayer with norm, dropout, and residual
    def __init__(self, size, dropout):
        super().__init__()
        self.norm = nn.LayerNorm(size)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x, sublayer):
        return x + self.dropout(sublayer(self.norm(x)))

class EncoderLayer(nn.Module):
    def __init__(self, d_model, heads, d_ff, dropout):
        super().__init__()
        self.attn = MultiHeadAttention(d_model, heads, dropout)
        self.ffn = PositionwiseFeedForward(d_model, d_ff, dropout)
        self.sublayers = nn.ModuleList([SublayerConnection(d_model, dropout) for _ in range(2)])

    def forward(self, x, mask):
        x = self.sublayers[0](x, lambda x: self.attn(x, x, x, mask))
        return self.sublayers[1](x, self.ffn)

# Full transformer-based NER model
class TransformerNER(nn.Module):
    def __init__(self, vocab_size, d_model, heads, d_ff, num_layers, max_len, num_labels):
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, d_model, padding_idx=0)
        self.pe = PositionalEncoding(d_model, max_len)
        self.encoder = nn.ModuleList([EncoderLayer(d_model, heads, d_ff, 0.1) for _ in range(num_layers)])
        self.norm = nn.LayerNorm(d_model)
        self.classifier = nn.Linear(d_model, num_labels)

    def forward(self, input_ids, mask, labels=None):
        x = self.embedding(input_ids)
        x = self.pe(x)
        for layer in self.encoder:
            x = layer(x, mask)
        x = self.norm(x)
        logits = self.classifier(x)
        loss = None
        if labels is not None:
            loss = nn.CrossEntropyLoss(ignore_index=-100)(logits.view(-1, logits.size(-1)), labels.view(-1))
        return {"loss": loss, "logits": logits}

# Model hyperparameters
d_model = 192
num_heads = 4
ff_dim = 768
num_layers = 2
vocab_size = len(token2id)
num_labels = len(label2id)

model = TransformerNER(
    vocab_size=vocab_size,
    d_model=d_model,
    heads=num_heads,
    d_ff=ff_dim,
    num_layers=num_layers,
    max_len=MAX_LEN,
    num_labels=num_labels
).to(device)
optimizer = torch.optim.AdamW(model.parameters(), lr=2e-4)


# Training phase
for epoch in range(15):
    model.train()
    total_loss = 0
    for batch in train_loader:
        input_ids = batch["input_ids"].to(device)
        labels = batch["labels"].to(device)
        mask = batch["attention_mask"].to(device)
        optimizer.zero_grad()
        outputs = model(input_ids, mask, labels)
        loss = outputs["loss"]
        loss.backward()
        torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)
        optimizer.step()
        total_loss += loss.item()
    print(f"Epoch {epoch+1}/15, Loss: {total_loss / len(train_loader):.6f}")

# Predict on test set 
model.eval()
all_preds,all_labels_eval = [], []

with torch.no_grad():
    for batch in test_loader:
        input_ids = batch["input_ids"].to(device)
        labels = batch["labels"].to(device)
        mask = batch["attention_mask"].to(device)
        logits = model(input_ids, mask)["logits"].argmax(-1)

        for p, l, m in zip(logits, labels, mask):
            true_seq = [id2label[i.item()] for i, msk in zip(l, m) if i.item() != -100 and msk.item() == 1]
            pred_seq = [id2label[i.item()] for i, msk, gt in zip(p, m, l) if gt.item() != -100 and msk.item() == 1]
            all_labels_eval.append(true_seq)
            all_preds.append(pred_seq)

# compute evaluation metrics
accuracy = 100 * accuracy_score(all_labels_eval, all_preds)
f1 = 100 * f1_score(all_labels_eval, all_preds, average="weighted")

print(f"\nAccuracy: {accuracy:.4f}%")
print(f"F1_Score: {f1:.4f}%")
print(classification_report(all_labels_eval, all_preds))

Epoch 1/15, Loss: 0.390419
Epoch 2/15, Loss: 0.224583
Epoch 3/15, Loss: 0.162898
Epoch 4/15, Loss: 0.125233
Epoch 5/15, Loss: 0.098022
Epoch 6/15, Loss: 0.079726
Epoch 7/15, Loss: 0.065648
Epoch 8/15, Loss: 0.055045
Epoch 9/15, Loss: 0.047043
Epoch 10/15, Loss: 0.040298
Epoch 11/15, Loss: 0.035810
Epoch 12/15, Loss: 0.031402
Epoch 13/15, Loss: 0.028190
Epoch 14/15, Loss: 0.025423
Epoch 15/15, Loss: 0.023305

Accuracy: 97.9275%
F1_Score: 81.1314%
              precision    recall  f1-score   support

           _       0.80      0.81      0.80     10398
         dat       0.40      0.46      0.43       357
       event       0.88      0.89      0.89       396
         fac       0.87      0.94      0.90       281
         loc       0.86      0.89      0.87      3238
         mon       0.34      0.40      0.36       113
         org       0.80      0.81      0.80      3941
         pct       0.51      0.58      0.54        71
         per       0.63      0.61      0.62       928
        p

Hidden Size: d_model = 256, ff_dim = 1024

In [12]:
import torch
import torch.nn as nn
import math
import numpy as np
from collections import Counter
from itertools import chain
import random
from torch.utils.data import Dataset, DataLoader
from seqeval.metrics import classification_report, f1_score, accuracy_score
import os
import warnings
warnings.filterwarnings("ignore")

def load_clean_ner_data(tokens_path, labels_path):
    with open(tokens_path, "r", encoding="utf-8") as token_file, open(labels_path, "r", encoding="utf-8") as label_file:
        token_lines = token_file.readlines()
        label_lines = label_file.readlines()

    sentences, labels = [], []
    
    for t_line, l_line in zip(token_lines, label_lines):
        tokens = t_line.strip().split()
        lbls = l_line.strip().split()
        if len(tokens) == len(lbls) and len(tokens) > 0:
            sentences.append([t.strip() for t in tokens])
            labels.append([l.strip().lower().replace("_", "-") for l in lbls])
    return sentences, labels

# Read and merge ARMAN & PEYMA datasets
arman_sent, arman_lab = load_clean_ner_data("arman-tokens.txt", "arman-labels.txt")
peyma_sent, peyma_lab = load_clean_ner_data("peyma-tokens.txt", "peyma-labels.txt")
all_sentences = arman_sent + peyma_sent
all_labels = arman_lab + peyma_lab

# Combine and shuffle all data, then split into train (%80) and test (%20) sets
combined = list(zip(all_sentences, all_labels))
random.seed(42)
random.shuffle(combined)
split_idx = int(0.8 * len(combined))
train_sentences, train_labels = zip(*combined[:split_idx])
test_sentences, test_labels = zip(*combined[split_idx:])

all_labels_unique = sorted(set(l for seq in (train_labels + test_labels) for l in seq))
label2id = {l: i for i, l in enumerate(all_labels_unique)}
id2label = {i: l for l, i in label2id.items()}

token_counter = Counter(chain(*train_sentences))
token2id = {"<pad>": 0, "<unk>": 1}
for tok, count in token_counter.items():
    if count > 1:
        token2id[tok] = len(token2id)
id2token = {i: t for t, i in token2id.items()}

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# NER dataset with padding and masks
class NERDataset(Dataset):
    def __init__(self, sentences, labels, max_len, token2id, label2id):
        self.sentences = sentences
        self.labels = labels
        self.max_len = max_len
        self.token2id = token2id
        self.label2id = label2id

    def __len__(self):
        return len(self.sentences)

    def __getitem__(self, idx):
        tokens = [self.token2id.get(t, self.token2id["<unk>"]) for t in self.sentences[idx]]
        tags = [self.label2id[l] for l in self.labels[idx]]
        tokens = tokens[:self.max_len]
        tags = tags[:self.max_len]
        attn_mask = [1] * len(tokens)
        pad_len = self.max_len - len(tokens)
        tokens += [0] * pad_len
        tags += [-100] * pad_len
        attn_mask += [0] * pad_len
        return {
            "input_ids": torch.tensor(tokens),
            "labels": torch.tensor(tags),
            "attention_mask": torch.tensor(attn_mask)
        }

# Prepare NER datasets and loaders
MAX_LEN = 128
train_data = NERDataset(train_sentences, train_labels, MAX_LEN, token2id, label2id)
test_data = NERDataset(test_sentences, test_labels, MAX_LEN, token2id, label2id)
train_loader = DataLoader(train_data, batch_size=16, shuffle=True)
test_loader = DataLoader(test_data, batch_size=16)

class PositionalEncoding(nn.Module):
    # Positional encoding for token positions
    def __init__(self, d_model, max_len=512):
        super().__init__()
        pe = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2) * -(math.log(10000.0) / d_model))
        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)
        self.pe = pe.unsqueeze(0)

    def forward(self, x):
        return x + self.pe[:, :x.size(1)].to(x.device)

class MultiHeadAttention(nn.Module):
    # Standard multi-head self-attention
    def __init__(self, d_model, num_heads, dropout=0.1):
        super().__init__()
        self.d_k = d_model // num_heads
        self.num_heads = num_heads
        self.w_q = nn.Linear(d_model, d_model)
        self.w_k = nn.Linear(d_model, d_model)
        self.w_v = nn.Linear(d_model, d_model)
        self.w_o = nn.Linear(d_model, d_model)
        self.dropout = nn.Dropout(dropout)

    def forward(self, q, k, v, mask=None):
        B = q.size(0)
        q = self.w_q(q).view(B, -1, self.num_heads, self.d_k).transpose(1, 2)
        k = self.w_k(k).view(B, -1, self.num_heads, self.d_k).transpose(1, 2)
        v = self.w_v(v).view(B, -1, self.num_heads, self.d_k).transpose(1, 2)
        scores = q @ k.transpose(-2, -1) / math.sqrt(self.d_k)
        if mask is not None:
            mask = mask.unsqueeze(1).unsqueeze(2)
            scores = scores.masked_fill(mask == 0, -1e9)
        attn = torch.softmax(scores, dim=-1)
        attn = self.dropout(attn)
        context = attn @ v
        context = context.transpose(1, 2).contiguous().view(B, -1, self.num_heads * self.d_k)
        return self.w_o(context)

class PositionwiseFeedForward(nn.Module):
    # Feed-forward block used in transformer
    def __init__(self, d_model, d_ff, dropout=0.1):
        super().__init__()
        self.linear1 = nn.Linear(d_model, d_ff)
        self.linear2 = nn.Linear(d_ff, d_model)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        return self.linear2(self.dropout(torch.relu(self.linear1(x))))

class SublayerConnection(nn.Module):
    # Wraps sublayer with norm, dropout, and residual
    def __init__(self, size, dropout):
        super().__init__()
        self.norm = nn.LayerNorm(size)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x, sublayer):
        return x + self.dropout(sublayer(self.norm(x)))

class EncoderLayer(nn.Module):
    def __init__(self, d_model, heads, d_ff, dropout):
        super().__init__()
        self.attn = MultiHeadAttention(d_model, heads, dropout)
        self.ffn = PositionwiseFeedForward(d_model, d_ff, dropout)
        self.sublayers = nn.ModuleList([SublayerConnection(d_model, dropout) for _ in range(2)])

    def forward(self, x, mask):
        x = self.sublayers[0](x, lambda x: self.attn(x, x, x, mask))
        return self.sublayers[1](x, self.ffn)

# Full transformer-based NER model
class TransformerNER(nn.Module):
    def __init__(self, vocab_size, d_model, heads, d_ff, num_layers, max_len, num_labels):
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, d_model, padding_idx=0)
        self.pe = PositionalEncoding(d_model, max_len)
        self.encoder = nn.ModuleList([EncoderLayer(d_model, heads, d_ff, 0.1) for _ in range(num_layers)])
        self.norm = nn.LayerNorm(d_model)
        self.classifier = nn.Linear(d_model, num_labels)

    def forward(self, input_ids, mask, labels=None):
        x = self.embedding(input_ids)
        x = self.pe(x)
        for layer in self.encoder:
            x = layer(x, mask)
        x = self.norm(x)
        logits = self.classifier(x)
        loss = None
        if labels is not None:
            loss = nn.CrossEntropyLoss(ignore_index=-100)(logits.view(-1, logits.size(-1)), labels.view(-1))
        return {"loss": loss, "logits": logits}

# Model hyperparameters
d_model = 256
num_heads = 4
ff_dim = 1024
num_layers = 2
vocab_size = len(token2id)
num_labels = len(label2id)

model = TransformerNER(
    vocab_size=vocab_size,
    d_model=d_model,
    heads=num_heads,
    d_ff=ff_dim,
    num_layers=num_layers,
    max_len=MAX_LEN,
    num_labels=num_labels
).to(device)
optimizer = torch.optim.AdamW(model.parameters(), lr=2e-4)


# Training phase
for epoch in range(15):
    model.train()
    total_loss = 0
    for batch in train_loader:
        input_ids = batch["input_ids"].to(device)
        labels = batch["labels"].to(device)
        mask = batch["attention_mask"].to(device)
        optimizer.zero_grad()
        outputs = model(input_ids, mask, labels)
        loss = outputs["loss"]
        loss.backward()
        torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)
        optimizer.step()
        total_loss += loss.item()
    print(f"Epoch {epoch+1}/15, Loss: {total_loss / len(train_loader):.6f}")

# Predict on test set 
model.eval()
all_preds,all_labels_eval = [], []

with torch.no_grad():
    for batch in test_loader:
        input_ids = batch["input_ids"].to(device)
        labels = batch["labels"].to(device)
        mask = batch["attention_mask"].to(device)
        logits = model(input_ids, mask)["logits"].argmax(-1)

        for p, l, m in zip(logits, labels, mask):
            true_seq = [id2label[i.item()] for i, msk in zip(l, m) if i.item() != -100 and msk.item() == 1]
            pred_seq = [id2label[i.item()] for i, msk, gt in zip(p, m, l) if gt.item() != -100 and msk.item() == 1]
            all_labels_eval.append(true_seq)
            all_preds.append(pred_seq)

# compute evaluation metrics
accuracy = 100 * accuracy_score(all_labels_eval, all_preds)
f1 = 100 * f1_score(all_labels_eval, all_preds, average="weighted")

print(f"\nAccuracy: {accuracy:.4f}%")
print(f"F1_Score: {f1:.4f}%")
print(classification_report(all_labels_eval, all_preds))

Epoch 1/15, Loss: 0.351152
Epoch 2/15, Loss: 0.185150
Epoch 3/15, Loss: 0.124825
Epoch 4/15, Loss: 0.090603
Epoch 5/15, Loss: 0.067479
Epoch 6/15, Loss: 0.052440
Epoch 7/15, Loss: 0.042068
Epoch 8/15, Loss: 0.035619
Epoch 9/15, Loss: 0.029857
Epoch 10/15, Loss: 0.024965
Epoch 11/15, Loss: 0.022335
Epoch 12/15, Loss: 0.020423
Epoch 13/15, Loss: 0.018143
Epoch 14/15, Loss: 0.016360
Epoch 15/15, Loss: 0.015407

Accuracy: 98.1125%
F1_Score: 82.3825%
              precision    recall  f1-score   support

           _       0.80      0.82      0.81     10398
         dat       0.45      0.49      0.47       357
       event       0.85      0.92      0.88       396
         fac       0.95      0.97      0.96       281
         loc       0.87      0.89      0.88      3238
         mon       0.37      0.41      0.38       113
         org       0.81      0.85      0.83      3941
         pct       0.49      0.61      0.54        71
         per       0.64      0.58      0.61       928
        p

Max Sequence Length = 64

In [7]:
import torch
import torch.nn as nn
import math
import numpy as np
from collections import Counter
from itertools import chain
import random
from torch.utils.data import Dataset, DataLoader
from seqeval.metrics import classification_report, f1_score, accuracy_score
import os
import warnings
warnings.filterwarnings("ignore")

def load_clean_ner_data(tokens_path, labels_path):
    with open(tokens_path, "r", encoding="utf-8") as token_file, open(labels_path, "r", encoding="utf-8") as label_file:
        token_lines = token_file.readlines()
        label_lines = label_file.readlines()

    sentences, labels = [], []
    
    for t_line, l_line in zip(token_lines, label_lines):
        tokens = t_line.strip().split()
        lbls = l_line.strip().split()
        if len(tokens) == len(lbls) and len(tokens) > 0:
            sentences.append([t.strip() for t in tokens])
            labels.append([l.strip().lower().replace("_", "-") for l in lbls])
    return sentences, labels

# Read and merge ARMAN & PEYMA datasets
arman_sent, arman_lab = load_clean_ner_data("arman-tokens.txt", "arman-labels.txt")
peyma_sent, peyma_lab = load_clean_ner_data("peyma-tokens.txt", "peyma-labels.txt")
all_sentences = arman_sent + peyma_sent
all_labels = arman_lab + peyma_lab

# Combine and shuffle all data, then split into train (%80) and test (%20) sets
combined = list(zip(all_sentences, all_labels))
random.seed(42)
random.shuffle(combined)
split_idx = int(0.8 * len(combined))
train_sentences, train_labels = zip(*combined[:split_idx])
test_sentences, test_labels = zip(*combined[split_idx:])

all_labels_unique = sorted(set(l for seq in (train_labels + test_labels) for l in seq))
label2id = {l: i for i, l in enumerate(all_labels_unique)}
id2label = {i: l for l, i in label2id.items()}

token_counter = Counter(chain(*train_sentences))
token2id = {"<pad>": 0, "<unk>": 1}
for tok, count in token_counter.items():
    if count > 1:
        token2id[tok] = len(token2id)
id2token = {i: t for t, i in token2id.items()}

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# NER dataset with padding and masks
class NERDataset(Dataset):
    def __init__(self, sentences, labels, max_len, token2id, label2id):
        self.sentences = sentences
        self.labels = labels
        self.max_len = max_len
        self.token2id = token2id
        self.label2id = label2id

    def __len__(self):
        return len(self.sentences)

    def __getitem__(self, idx):
        tokens = [self.token2id.get(t, self.token2id["<unk>"]) for t in self.sentences[idx]]
        tags = [self.label2id[l] for l in self.labels[idx]]
        tokens = tokens[:self.max_len]
        tags = tags[:self.max_len]
        attn_mask = [1] * len(tokens)
        pad_len = self.max_len - len(tokens)
        tokens += [0] * pad_len
        tags += [-100] * pad_len
        attn_mask += [0] * pad_len
        return {
            "input_ids": torch.tensor(tokens),
            "labels": torch.tensor(tags),
            "attention_mask": torch.tensor(attn_mask)
        }

# Prepare NER datasets and loaders
MAX_LEN = 64
train_data = NERDataset(train_sentences, train_labels, MAX_LEN, token2id, label2id)
test_data = NERDataset(test_sentences, test_labels, MAX_LEN, token2id, label2id)
train_loader = DataLoader(train_data, batch_size=16, shuffle=True)
test_loader = DataLoader(test_data, batch_size=16)

class PositionalEncoding(nn.Module):
    # Positional encoding for token positions
    def __init__(self, d_model, max_len=512):
        super().__init__()
        pe = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2) * -(math.log(10000.0) / d_model))
        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)
        self.pe = pe.unsqueeze(0)

    def forward(self, x):
        return x + self.pe[:, :x.size(1)].to(x.device)

class MultiHeadAttention(nn.Module):
    # Standard multi-head self-attention
    def __init__(self, d_model, num_heads, dropout=0.1):
        super().__init__()
        self.d_k = d_model // num_heads
        self.num_heads = num_heads
        self.w_q = nn.Linear(d_model, d_model)
        self.w_k = nn.Linear(d_model, d_model)
        self.w_v = nn.Linear(d_model, d_model)
        self.w_o = nn.Linear(d_model, d_model)
        self.dropout = nn.Dropout(dropout)

    def forward(self, q, k, v, mask=None):
        B = q.size(0)
        q = self.w_q(q).view(B, -1, self.num_heads, self.d_k).transpose(1, 2)
        k = self.w_k(k).view(B, -1, self.num_heads, self.d_k).transpose(1, 2)
        v = self.w_v(v).view(B, -1, self.num_heads, self.d_k).transpose(1, 2)
        scores = q @ k.transpose(-2, -1) / math.sqrt(self.d_k)
        if mask is not None:
            mask = mask.unsqueeze(1).unsqueeze(2)
            scores = scores.masked_fill(mask == 0, -1e9)
        attn = torch.softmax(scores, dim=-1)
        attn = self.dropout(attn)
        context = attn @ v
        context = context.transpose(1, 2).contiguous().view(B, -1, self.num_heads * self.d_k)
        return self.w_o(context)

class PositionwiseFeedForward(nn.Module):
    # Feed-forward block used in transformer
    def __init__(self, d_model, d_ff, dropout=0.1):
        super().__init__()
        self.linear1 = nn.Linear(d_model, d_ff)
        self.linear2 = nn.Linear(d_ff, d_model)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        return self.linear2(self.dropout(torch.relu(self.linear1(x))))

class SublayerConnection(nn.Module):
    # Wraps sublayer with norm, dropout, and residual
    def __init__(self, size, dropout):
        super().__init__()
        self.norm = nn.LayerNorm(size)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x, sublayer):
        return x + self.dropout(sublayer(self.norm(x)))

class EncoderLayer(nn.Module):
    def __init__(self, d_model, heads, d_ff, dropout):
        super().__init__()
        self.attn = MultiHeadAttention(d_model, heads, dropout)
        self.ffn = PositionwiseFeedForward(d_model, d_ff, dropout)
        self.sublayers = nn.ModuleList([SublayerConnection(d_model, dropout) for _ in range(2)])

    def forward(self, x, mask):
        x = self.sublayers[0](x, lambda x: self.attn(x, x, x, mask))
        return self.sublayers[1](x, self.ffn)

# Full transformer-based NER model
class TransformerNER(nn.Module):
    def __init__(self, vocab_size, d_model, heads, d_ff, num_layers, max_len, num_labels):
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, d_model, padding_idx=0)
        self.pe = PositionalEncoding(d_model, max_len)
        self.encoder = nn.ModuleList([EncoderLayer(d_model, heads, d_ff, 0.1) for _ in range(num_layers)])
        self.norm = nn.LayerNorm(d_model)
        self.classifier = nn.Linear(d_model, num_labels)

    def forward(self, input_ids, mask, labels=None):
        x = self.embedding(input_ids)
        x = self.pe(x)
        for layer in self.encoder:
            x = layer(x, mask)
        x = self.norm(x)
        logits = self.classifier(x)
        loss = None
        if labels is not None:
            loss = nn.CrossEntropyLoss(ignore_index=-100)(logits.view(-1, logits.size(-1)), labels.view(-1))
        return {"loss": loss, "logits": logits}

# Model hyperparameters
d_model = 128
num_heads = 4
ff_dim = 512
num_layers = 2
vocab_size = len(token2id)
num_labels = len(label2id)

model = TransformerNER(
    vocab_size=vocab_size,
    d_model=d_model,
    heads=num_heads,
    d_ff=ff_dim,
    num_layers=num_layers,
    max_len=MAX_LEN,
    num_labels=num_labels
).to(device)
optimizer = torch.optim.AdamW(model.parameters(), lr=2e-4)


# Training phase
for epoch in range(15):
    model.train()
    total_loss = 0
    for batch in train_loader:
        input_ids = batch["input_ids"].to(device)
        labels = batch["labels"].to(device)
        mask = batch["attention_mask"].to(device)
        optimizer.zero_grad()
        outputs = model(input_ids, mask, labels)
        loss = outputs["loss"]
        loss.backward()
        torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)
        optimizer.step()
        total_loss += loss.item()
    print(f"Epoch {epoch+1}/15, Loss: {total_loss / len(train_loader):.6f}")

# Predict on test set 
model.eval()
all_preds,all_labels_eval = [], []

with torch.no_grad():
    for batch in test_loader:
        input_ids = batch["input_ids"].to(device)
        labels = batch["labels"].to(device)
        mask = batch["attention_mask"].to(device)
        logits = model(input_ids, mask)["logits"].argmax(-1)

        for p, l, m in zip(logits, labels, mask):
            true_seq = [id2label[i.item()] for i, msk in zip(l, m) if i.item() != -100 and msk.item() == 1]
            pred_seq = [id2label[i.item()] for i, msk, gt in zip(p, m, l) if gt.item() != -100 and msk.item() == 1]
            all_labels_eval.append(true_seq)
            all_preds.append(pred_seq)

# compute evaluation metrics
accuracy = 100 * accuracy_score(all_labels_eval, all_preds)
f1 = 100 * f1_score(all_labels_eval, all_preds, average="weighted")

print(f"\nAccuracy: {accuracy:.4f}%")
print(f"F1_Score: {f1:.4f}%")
print(classification_report(all_labels_eval, all_preds))

Epoch 1/15, Loss: 0.443980
Epoch 2/15, Loss: 0.287881
Epoch 3/15, Loss: 0.224035
Epoch 4/15, Loss: 0.184461
Epoch 5/15, Loss: 0.155134
Epoch 6/15, Loss: 0.133605
Epoch 7/15, Loss: 0.115807
Epoch 8/15, Loss: 0.101513
Epoch 9/15, Loss: 0.089456
Epoch 10/15, Loss: 0.079369
Epoch 11/15, Loss: 0.070601
Epoch 12/15, Loss: 0.063626
Epoch 13/15, Loss: 0.057222
Epoch 14/15, Loss: 0.051651
Epoch 15/15, Loss: 0.047680

Accuracy: 97.5462%
F1_Score: 78.0052%
              precision    recall  f1-score   support

           _       0.76      0.80      0.78     10158
         dat       0.36      0.45      0.40       356
       event       0.75      0.81      0.78       390
         fac       0.74      0.81      0.78       269
         loc       0.79      0.87      0.83      3184
         mon       0.30      0.38      0.33        96
         org       0.74      0.80      0.77      3875
         pct       0.45      0.55      0.49        71
         per       0.61      0.58      0.60       910
        p

Max Sequence Length = 256

In [8]:
import torch
import torch.nn as nn
import math
import numpy as np
from collections import Counter
from itertools import chain
import random
from torch.utils.data import Dataset, DataLoader
from seqeval.metrics import classification_report, f1_score, accuracy_score
import os
import warnings
warnings.filterwarnings("ignore")

def load_clean_ner_data(tokens_path, labels_path):
    with open(tokens_path, "r", encoding="utf-8") as token_file, open(labels_path, "r", encoding="utf-8") as label_file:
        token_lines = token_file.readlines()
        label_lines = label_file.readlines()

    sentences, labels = [], []
    
    for t_line, l_line in zip(token_lines, label_lines):
        tokens = t_line.strip().split()
        lbls = l_line.strip().split()
        if len(tokens) == len(lbls) and len(tokens) > 0:
            sentences.append([t.strip() for t in tokens])
            labels.append([l.strip().lower().replace("_", "-") for l in lbls])
    return sentences, labels

# Read and merge ARMAN & PEYMA datasets
arman_sent, arman_lab = load_clean_ner_data("arman-tokens.txt", "arman-labels.txt")
peyma_sent, peyma_lab = load_clean_ner_data("peyma-tokens.txt", "peyma-labels.txt")
all_sentences = arman_sent + peyma_sent
all_labels = arman_lab + peyma_lab

# Combine and shuffle all data, then split into train (%80) and test (%20) sets
combined = list(zip(all_sentences, all_labels))
random.seed(42)
random.shuffle(combined)
split_idx = int(0.8 * len(combined))
train_sentences, train_labels = zip(*combined[:split_idx])
test_sentences, test_labels = zip(*combined[split_idx:])

all_labels_unique = sorted(set(l for seq in (train_labels + test_labels) for l in seq))
label2id = {l: i for i, l in enumerate(all_labels_unique)}
id2label = {i: l for l, i in label2id.items()}

token_counter = Counter(chain(*train_sentences))
token2id = {"<pad>": 0, "<unk>": 1}
for tok, count in token_counter.items():
    if count > 1:
        token2id[tok] = len(token2id)
id2token = {i: t for t, i in token2id.items()}

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# NER dataset with padding and masks
class NERDataset(Dataset):
    def __init__(self, sentences, labels, max_len, token2id, label2id):
        self.sentences = sentences
        self.labels = labels
        self.max_len = max_len
        self.token2id = token2id
        self.label2id = label2id

    def __len__(self):
        return len(self.sentences)

    def __getitem__(self, idx):
        tokens = [self.token2id.get(t, self.token2id["<unk>"]) for t in self.sentences[idx]]
        tags = [self.label2id[l] for l in self.labels[idx]]
        tokens = tokens[:self.max_len]
        tags = tags[:self.max_len]
        attn_mask = [1] * len(tokens)
        pad_len = self.max_len - len(tokens)
        tokens += [0] * pad_len
        tags += [-100] * pad_len
        attn_mask += [0] * pad_len
        return {
            "input_ids": torch.tensor(tokens),
            "labels": torch.tensor(tags),
            "attention_mask": torch.tensor(attn_mask)
        }

# Prepare NER datasets and loaders
MAX_LEN = 256
train_data = NERDataset(train_sentences, train_labels, MAX_LEN, token2id, label2id)
test_data = NERDataset(test_sentences, test_labels, MAX_LEN, token2id, label2id)
train_loader = DataLoader(train_data, batch_size=16, shuffle=True)
test_loader = DataLoader(test_data, batch_size=16)

class PositionalEncoding(nn.Module):
    # Positional encoding for token positions
    def __init__(self, d_model, max_len=512):
        super().__init__()
        pe = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2) * -(math.log(10000.0) / d_model))
        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)
        self.pe = pe.unsqueeze(0)

    def forward(self, x):
        return x + self.pe[:, :x.size(1)].to(x.device)

class MultiHeadAttention(nn.Module):
    # Standard multi-head self-attention
    def __init__(self, d_model, num_heads, dropout=0.1):
        super().__init__()
        self.d_k = d_model // num_heads
        self.num_heads = num_heads
        self.w_q = nn.Linear(d_model, d_model)
        self.w_k = nn.Linear(d_model, d_model)
        self.w_v = nn.Linear(d_model, d_model)
        self.w_o = nn.Linear(d_model, d_model)
        self.dropout = nn.Dropout(dropout)

    def forward(self, q, k, v, mask=None):
        B = q.size(0)
        q = self.w_q(q).view(B, -1, self.num_heads, self.d_k).transpose(1, 2)
        k = self.w_k(k).view(B, -1, self.num_heads, self.d_k).transpose(1, 2)
        v = self.w_v(v).view(B, -1, self.num_heads, self.d_k).transpose(1, 2)
        scores = q @ k.transpose(-2, -1) / math.sqrt(self.d_k)
        if mask is not None:
            mask = mask.unsqueeze(1).unsqueeze(2)
            scores = scores.masked_fill(mask == 0, -1e9)
        attn = torch.softmax(scores, dim=-1)
        attn = self.dropout(attn)
        context = attn @ v
        context = context.transpose(1, 2).contiguous().view(B, -1, self.num_heads * self.d_k)
        return self.w_o(context)

class PositionwiseFeedForward(nn.Module):
    # Feed-forward block used in transformer
    def __init__(self, d_model, d_ff, dropout=0.1):
        super().__init__()
        self.linear1 = nn.Linear(d_model, d_ff)
        self.linear2 = nn.Linear(d_ff, d_model)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        return self.linear2(self.dropout(torch.relu(self.linear1(x))))

class SublayerConnection(nn.Module):
    # Wraps sublayer with norm, dropout, and residual
    def __init__(self, size, dropout):
        super().__init__()
        self.norm = nn.LayerNorm(size)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x, sublayer):
        return x + self.dropout(sublayer(self.norm(x)))

class EncoderLayer(nn.Module):
    def __init__(self, d_model, heads, d_ff, dropout):
        super().__init__()
        self.attn = MultiHeadAttention(d_model, heads, dropout)
        self.ffn = PositionwiseFeedForward(d_model, d_ff, dropout)
        self.sublayers = nn.ModuleList([SublayerConnection(d_model, dropout) for _ in range(2)])

    def forward(self, x, mask):
        x = self.sublayers[0](x, lambda x: self.attn(x, x, x, mask))
        return self.sublayers[1](x, self.ffn)

# Full transformer-based NER model
class TransformerNER(nn.Module):
    def __init__(self, vocab_size, d_model, heads, d_ff, num_layers, max_len, num_labels):
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, d_model, padding_idx=0)
        self.pe = PositionalEncoding(d_model, max_len)
        self.encoder = nn.ModuleList([EncoderLayer(d_model, heads, d_ff, 0.1) for _ in range(num_layers)])
        self.norm = nn.LayerNorm(d_model)
        self.classifier = nn.Linear(d_model, num_labels)

    def forward(self, input_ids, mask, labels=None):
        x = self.embedding(input_ids)
        x = self.pe(x)
        for layer in self.encoder:
            x = layer(x, mask)
        x = self.norm(x)
        logits = self.classifier(x)
        loss = None
        if labels is not None:
            loss = nn.CrossEntropyLoss(ignore_index=-100)(logits.view(-1, logits.size(-1)), labels.view(-1))
        return {"loss": loss, "logits": logits}

# Model hyperparameters
d_model = 128
num_heads = 4
ff_dim = 512
num_layers = 2
vocab_size = len(token2id)
num_labels = len(label2id)

model = TransformerNER(
    vocab_size=vocab_size,
    d_model=d_model,
    heads=num_heads,
    d_ff=ff_dim,
    num_layers=num_layers,
    max_len=MAX_LEN,
    num_labels=num_labels
).to(device)
optimizer = torch.optim.AdamW(model.parameters(), lr=2e-4)


# Training phase
for epoch in range(15):
    model.train()
    total_loss = 0
    for batch in train_loader:
        input_ids = batch["input_ids"].to(device)
        labels = batch["labels"].to(device)
        mask = batch["attention_mask"].to(device)
        optimizer.zero_grad()
        outputs = model(input_ids, mask, labels)
        loss = outputs["loss"]
        loss.backward()
        torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)
        optimizer.step()
        total_loss += loss.item()
    print(f"Epoch {epoch+1}/15, Loss: {total_loss / len(train_loader):.6f}")

# Predict on test set 
model.eval()
all_preds,all_labels_eval = [], []

with torch.no_grad():
    for batch in test_loader:
        input_ids = batch["input_ids"].to(device)
        labels = batch["labels"].to(device)
        mask = batch["attention_mask"].to(device)
        logits = model(input_ids, mask)["logits"].argmax(-1)

        for p, l, m in zip(logits, labels, mask):
            true_seq = [id2label[i.item()] for i, msk in zip(l, m) if i.item() != -100 and msk.item() == 1]
            pred_seq = [id2label[i.item()] for i, msk, gt in zip(p, m, l) if gt.item() != -100 and msk.item() == 1]
            all_labels_eval.append(true_seq)
            all_preds.append(pred_seq)

# compute evaluation metrics
accuracy = 100 * accuracy_score(all_labels_eval, all_preds)
f1 = 100 * f1_score(all_labels_eval, all_preds, average="weighted")

print(f"\nAccuracy: {accuracy:.4f}%")
print(f"F1_Score: {f1:.4f}%")
print(classification_report(all_labels_eval, all_preds))

Epoch 1/15, Loss: 0.438014
Epoch 2/15, Loss: 0.288725
Epoch 3/15, Loss: 0.227379
Epoch 4/15, Loss: 0.187588
Epoch 5/15, Loss: 0.158676
Epoch 6/15, Loss: 0.137224
Epoch 7/15, Loss: 0.119525
Epoch 8/15, Loss: 0.103801
Epoch 9/15, Loss: 0.092007
Epoch 10/15, Loss: 0.082012
Epoch 11/15, Loss: 0.073142
Epoch 12/15, Loss: 0.066046
Epoch 13/15, Loss: 0.059298
Epoch 14/15, Loss: 0.054300
Epoch 15/15, Loss: 0.049690

Accuracy: 97.4764%
F1_Score: 77.8033%
              precision    recall  f1-score   support

           _       0.75      0.79      0.77     10449
         dat       0.35      0.50      0.41       357
       event       0.73      0.80      0.76       396
         fac       0.79      0.85      0.82       281
         loc       0.82      0.87      0.84      3240
         mon       0.28      0.43      0.34       114
         org       0.75      0.80      0.77      3945
         pct       0.44      0.62      0.51        71
         per       0.57      0.64      0.60       938
        p

Max Sequence Length = 512

In [9]:
import torch
import torch.nn as nn
import math
import numpy as np
from collections import Counter
from itertools import chain
import random
from torch.utils.data import Dataset, DataLoader
from seqeval.metrics import classification_report, f1_score, accuracy_score
import os
import warnings
warnings.filterwarnings("ignore")

def load_clean_ner_data(tokens_path, labels_path):
    with open(tokens_path, "r", encoding="utf-8") as token_file, open(labels_path, "r", encoding="utf-8") as label_file:
        token_lines = token_file.readlines()
        label_lines = label_file.readlines()

    sentences, labels = [], []
    
    for t_line, l_line in zip(token_lines, label_lines):
        tokens = t_line.strip().split()
        lbls = l_line.strip().split()
        if len(tokens) == len(lbls) and len(tokens) > 0:
            sentences.append([t.strip() for t in tokens])
            labels.append([l.strip().lower().replace("_", "-") for l in lbls])
    return sentences, labels

# Read and merge ARMAN & PEYMA datasets
arman_sent, arman_lab = load_clean_ner_data("arman-tokens.txt", "arman-labels.txt")
peyma_sent, peyma_lab = load_clean_ner_data("peyma-tokens.txt", "peyma-labels.txt")
all_sentences = arman_sent + peyma_sent
all_labels = arman_lab + peyma_lab

# Combine and shuffle all data, then split into train (%80) and test (%20) sets
combined = list(zip(all_sentences, all_labels))
random.seed(42)
random.shuffle(combined)
split_idx = int(0.8 * len(combined))
train_sentences, train_labels = zip(*combined[:split_idx])
test_sentences, test_labels = zip(*combined[split_idx:])

all_labels_unique = sorted(set(l for seq in (train_labels + test_labels) for l in seq))
label2id = {l: i for i, l in enumerate(all_labels_unique)}
id2label = {i: l for l, i in label2id.items()}

token_counter = Counter(chain(*train_sentences))
token2id = {"<pad>": 0, "<unk>": 1}
for tok, count in token_counter.items():
    if count > 1:
        token2id[tok] = len(token2id)
id2token = {i: t for t, i in token2id.items()}

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# NER dataset with padding and masks
class NERDataset(Dataset):
    def __init__(self, sentences, labels, max_len, token2id, label2id):
        self.sentences = sentences
        self.labels = labels
        self.max_len = max_len
        self.token2id = token2id
        self.label2id = label2id

    def __len__(self):
        return len(self.sentences)

    def __getitem__(self, idx):
        tokens = [self.token2id.get(t, self.token2id["<unk>"]) for t in self.sentences[idx]]
        tags = [self.label2id[l] for l in self.labels[idx]]
        tokens = tokens[:self.max_len]
        tags = tags[:self.max_len]
        attn_mask = [1] * len(tokens)
        pad_len = self.max_len - len(tokens)
        tokens += [0] * pad_len
        tags += [-100] * pad_len
        attn_mask += [0] * pad_len
        return {
            "input_ids": torch.tensor(tokens),
            "labels": torch.tensor(tags),
            "attention_mask": torch.tensor(attn_mask)
        }

# Prepare NER datasets and loaders
MAX_LEN = 512
train_data = NERDataset(train_sentences, train_labels, MAX_LEN, token2id, label2id)
test_data = NERDataset(test_sentences, test_labels, MAX_LEN, token2id, label2id)
train_loader = DataLoader(train_data, batch_size=16, shuffle=True)
test_loader = DataLoader(test_data, batch_size=16)

class PositionalEncoding(nn.Module):
    # Positional encoding for token positions
    def __init__(self, d_model, max_len=512):
        super().__init__()
        pe = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2) * -(math.log(10000.0) / d_model))
        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)
        self.pe = pe.unsqueeze(0)

    def forward(self, x):
        return x + self.pe[:, :x.size(1)].to(x.device)

class MultiHeadAttention(nn.Module):
    # Standard multi-head self-attention
    def __init__(self, d_model, num_heads, dropout=0.1):
        super().__init__()
        self.d_k = d_model // num_heads
        self.num_heads = num_heads
        self.w_q = nn.Linear(d_model, d_model)
        self.w_k = nn.Linear(d_model, d_model)
        self.w_v = nn.Linear(d_model, d_model)
        self.w_o = nn.Linear(d_model, d_model)
        self.dropout = nn.Dropout(dropout)

    def forward(self, q, k, v, mask=None):
        B = q.size(0)
        q = self.w_q(q).view(B, -1, self.num_heads, self.d_k).transpose(1, 2)
        k = self.w_k(k).view(B, -1, self.num_heads, self.d_k).transpose(1, 2)
        v = self.w_v(v).view(B, -1, self.num_heads, self.d_k).transpose(1, 2)
        scores = q @ k.transpose(-2, -1) / math.sqrt(self.d_k)
        if mask is not None:
            mask = mask.unsqueeze(1).unsqueeze(2)
            scores = scores.masked_fill(mask == 0, -1e9)
        attn = torch.softmax(scores, dim=-1)
        attn = self.dropout(attn)
        context = attn @ v
        context = context.transpose(1, 2).contiguous().view(B, -1, self.num_heads * self.d_k)
        return self.w_o(context)

class PositionwiseFeedForward(nn.Module):
    # Feed-forward block used in transformer
    def __init__(self, d_model, d_ff, dropout=0.1):
        super().__init__()
        self.linear1 = nn.Linear(d_model, d_ff)
        self.linear2 = nn.Linear(d_ff, d_model)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        return self.linear2(self.dropout(torch.relu(self.linear1(x))))

class SublayerConnection(nn.Module):
    # Wraps sublayer with norm, dropout, and residual
    def __init__(self, size, dropout):
        super().__init__()
        self.norm = nn.LayerNorm(size)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x, sublayer):
        return x + self.dropout(sublayer(self.norm(x)))

class EncoderLayer(nn.Module):
    def __init__(self, d_model, heads, d_ff, dropout):
        super().__init__()
        self.attn = MultiHeadAttention(d_model, heads, dropout)
        self.ffn = PositionwiseFeedForward(d_model, d_ff, dropout)
        self.sublayers = nn.ModuleList([SublayerConnection(d_model, dropout) for _ in range(2)])

    def forward(self, x, mask):
        x = self.sublayers[0](x, lambda x: self.attn(x, x, x, mask))
        return self.sublayers[1](x, self.ffn)

# Full transformer-based NER model
class TransformerNER(nn.Module):
    def __init__(self, vocab_size, d_model, heads, d_ff, num_layers, max_len, num_labels):
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, d_model, padding_idx=0)
        self.pe = PositionalEncoding(d_model, max_len)
        self.encoder = nn.ModuleList([EncoderLayer(d_model, heads, d_ff, 0.1) for _ in range(num_layers)])
        self.norm = nn.LayerNorm(d_model)
        self.classifier = nn.Linear(d_model, num_labels)

    def forward(self, input_ids, mask, labels=None):
        x = self.embedding(input_ids)
        x = self.pe(x)
        for layer in self.encoder:
            x = layer(x, mask)
        x = self.norm(x)
        logits = self.classifier(x)
        loss = None
        if labels is not None:
            loss = nn.CrossEntropyLoss(ignore_index=-100)(logits.view(-1, logits.size(-1)), labels.view(-1))
        return {"loss": loss, "logits": logits}

# Model hyperparameters
d_model = 128
num_heads = 4
ff_dim = 512
num_layers = 2
vocab_size = len(token2id)
num_labels = len(label2id)

model = TransformerNER(
    vocab_size=vocab_size,
    d_model=d_model,
    heads=num_heads,
    d_ff=ff_dim,
    num_layers=num_layers,
    max_len=MAX_LEN,
    num_labels=num_labels
).to(device)
optimizer = torch.optim.AdamW(model.parameters(), lr=2e-4)


# Training phase
for epoch in range(15):
    model.train()
    total_loss = 0
    for batch in train_loader:
        input_ids = batch["input_ids"].to(device)
        labels = batch["labels"].to(device)
        mask = batch["attention_mask"].to(device)
        optimizer.zero_grad()
        outputs = model(input_ids, mask, labels)
        loss = outputs["loss"]
        loss.backward()
        torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)
        optimizer.step()
        total_loss += loss.item()
    print(f"Epoch {epoch+1}/15, Loss: {total_loss / len(train_loader):.6f}")

# Predict on test set 
model.eval()
all_preds,all_labels_eval = [], []

with torch.no_grad():
    for batch in test_loader:
        input_ids = batch["input_ids"].to(device)
        labels = batch["labels"].to(device)
        mask = batch["attention_mask"].to(device)
        logits = model(input_ids, mask)["logits"].argmax(-1)

        for p, l, m in zip(logits, labels, mask):
            true_seq = [id2label[i.item()] for i, msk in zip(l, m) if i.item() != -100 and msk.item() == 1]
            pred_seq = [id2label[i.item()] for i, msk, gt in zip(p, m, l) if gt.item() != -100 and msk.item() == 1]
            all_labels_eval.append(true_seq)
            all_preds.append(pred_seq)

# compute evaluation metrics
accuracy = 100 * accuracy_score(all_labels_eval, all_preds)
f1 = 100 * f1_score(all_labels_eval, all_preds, average="weighted")

print(f"\nAccuracy: {accuracy:.4f}%")
print(f"F1_Score: {f1:.4f}%")
print(classification_report(all_labels_eval, all_preds))

Epoch 1/15, Loss: 0.440509
Epoch 2/15, Loss: 0.286984
Epoch 3/15, Loss: 0.224916
Epoch 4/15, Loss: 0.186276
Epoch 5/15, Loss: 0.158530
Epoch 6/15, Loss: 0.135651
Epoch 7/15, Loss: 0.118433
Epoch 8/15, Loss: 0.104799
Epoch 9/15, Loss: 0.092508
Epoch 10/15, Loss: 0.082337
Epoch 11/15, Loss: 0.073490
Epoch 12/15, Loss: 0.066266
Epoch 13/15, Loss: 0.060150
Epoch 14/15, Loss: 0.054817
Epoch 15/15, Loss: 0.050072

Accuracy: 97.3677%
F1_Score: 77.0357%
              precision    recall  f1-score   support

           _       0.75      0.79      0.77     10449
         dat       0.36      0.50      0.42       357
       event       0.69      0.81      0.74       396
         fac       0.72      0.91      0.80       281
         loc       0.81      0.87      0.84      3240
         mon       0.31      0.42      0.36       114
         org       0.73      0.76      0.75      3945
         pct       0.43      0.58      0.49        71
         per       0.58      0.61      0.59       938
        p

# Optimal Transformer NER Model

In [13]:
import torch
import torch.nn as nn
import math
import numpy as np
from collections import Counter
from itertools import chain
import random
from torch.utils.data import Dataset, DataLoader
from seqeval.metrics import classification_report, f1_score, accuracy_score
import os
import warnings
warnings.filterwarnings("ignore")

def load_clean_ner_data(tokens_path, labels_path):
    with open(tokens_path, "r", encoding="utf-8") as token_file, open(labels_path, "r", encoding="utf-8") as label_file:
        token_lines = token_file.readlines()
        label_lines = label_file.readlines()

    sentences, labels = [], []
    
    for t_line, l_line in zip(token_lines, label_lines):
        tokens = t_line.strip().split()
        lbls = l_line.strip().split()
        if len(tokens) == len(lbls) and len(tokens) > 0:
            sentences.append([t.strip() for t in tokens])
            labels.append([l.strip().lower().replace("_", "-") for l in lbls])
    return sentences, labels

# Read and merge ARMAN & PEYMA datasets
arman_sent, arman_lab = load_clean_ner_data("arman-tokens.txt", "arman-labels.txt")
peyma_sent, peyma_lab = load_clean_ner_data("peyma-tokens.txt", "peyma-labels.txt")
all_sentences = arman_sent + peyma_sent
all_labels = arman_lab + peyma_lab

# Combine and shuffle all data, then split into train (%80) and test (%20) sets
combined = list(zip(all_sentences, all_labels))
random.seed(42)
random.shuffle(combined)
split_idx = int(0.8 * len(combined))
train_sentences, train_labels = zip(*combined[:split_idx])
test_sentences, test_labels = zip(*combined[split_idx:])

all_labels_unique = sorted(set(l for seq in (train_labels + test_labels) for l in seq))
label2id = {l: i for i, l in enumerate(all_labels_unique)}
id2label = {i: l for l, i in label2id.items()}

token_counter = Counter(chain(*train_sentences))
token2id = {"<pad>": 0, "<unk>": 1}
for tok, count in token_counter.items():
    if count > 1:
        token2id[tok] = len(token2id)
id2token = {i: t for t, i in token2id.items()}

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# NER dataset with padding and masks
class NERDataset(Dataset):
    def __init__(self, sentences, labels, max_len, token2id, label2id):
        self.sentences = sentences
        self.labels = labels
        self.max_len = max_len
        self.token2id = token2id
        self.label2id = label2id

    def __len__(self):
        return len(self.sentences)

    def __getitem__(self, idx):
        tokens = [self.token2id.get(t, self.token2id["<unk>"]) for t in self.sentences[idx]]
        tags = [self.label2id[l] for l in self.labels[idx]]
        tokens = tokens[:self.max_len]
        tags = tags[:self.max_len]
        attn_mask = [1] * len(tokens)
        pad_len = self.max_len - len(tokens)
        tokens += [0] * pad_len
        tags += [-100] * pad_len
        attn_mask += [0] * pad_len
        return {
            "input_ids": torch.tensor(tokens),
            "labels": torch.tensor(tags),
            "attention_mask": torch.tensor(attn_mask)
        }

# Prepare NER datasets and loaders
MAX_LEN = 64
train_data = NERDataset(train_sentences, train_labels, MAX_LEN, token2id, label2id)
test_data = NERDataset(test_sentences, test_labels, MAX_LEN, token2id, label2id)
train_loader = DataLoader(train_data, batch_size=16, shuffle=True)
test_loader = DataLoader(test_data, batch_size=16)

class PositionalEncoding(nn.Module):
    # Positional encoding for token positions
    def __init__(self, d_model, max_len=512):
        super().__init__()
        pe = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2) * -(math.log(10000.0) / d_model))
        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)
        self.pe = pe.unsqueeze(0)

    def forward(self, x):
        return x + self.pe[:, :x.size(1)].to(x.device)

class MultiHeadAttention(nn.Module):
    # Standard multi-head self-attention
    def __init__(self, d_model, num_heads, dropout=0.1):
        super().__init__()
        self.d_k = d_model // num_heads
        self.num_heads = num_heads
        self.w_q = nn.Linear(d_model, d_model)
        self.w_k = nn.Linear(d_model, d_model)
        self.w_v = nn.Linear(d_model, d_model)
        self.w_o = nn.Linear(d_model, d_model)
        self.dropout = nn.Dropout(dropout)

    def forward(self, q, k, v, mask=None):
        B = q.size(0)
        q = self.w_q(q).view(B, -1, self.num_heads, self.d_k).transpose(1, 2)
        k = self.w_k(k).view(B, -1, self.num_heads, self.d_k).transpose(1, 2)
        v = self.w_v(v).view(B, -1, self.num_heads, self.d_k).transpose(1, 2)
        scores = q @ k.transpose(-2, -1) / math.sqrt(self.d_k)
        if mask is not None:
            mask = mask.unsqueeze(1).unsqueeze(2)
            scores = scores.masked_fill(mask == 0, -1e9)
        attn = torch.softmax(scores, dim=-1)
        attn = self.dropout(attn)
        context = attn @ v
        context = context.transpose(1, 2).contiguous().view(B, -1, self.num_heads * self.d_k)
        return self.w_o(context)

class PositionwiseFeedForward(nn.Module):
    # Feed-forward block used in transformer
    def __init__(self, d_model, d_ff, dropout=0.1):
        super().__init__()
        self.linear1 = nn.Linear(d_model, d_ff)
        self.linear2 = nn.Linear(d_ff, d_model)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        return self.linear2(self.dropout(torch.relu(self.linear1(x))))

class SublayerConnection(nn.Module):
    # Wraps sublayer with norm, dropout, and residual
    def __init__(self, size, dropout):
        super().__init__()
        self.norm = nn.LayerNorm(size)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x, sublayer):
        return x + self.dropout(sublayer(self.norm(x)))

class EncoderLayer(nn.Module):
    def __init__(self, d_model, heads, d_ff, dropout):
        super().__init__()
        self.attn = MultiHeadAttention(d_model, heads, dropout)
        self.ffn = PositionwiseFeedForward(d_model, d_ff, dropout)
        self.sublayers = nn.ModuleList([SublayerConnection(d_model, dropout) for _ in range(2)])

    def forward(self, x, mask):
        x = self.sublayers[0](x, lambda x: self.attn(x, x, x, mask))
        return self.sublayers[1](x, self.ffn)

# Full transformer-based NER model
class TransformerNER(nn.Module):
    def __init__(self, vocab_size, d_model, heads, d_ff, num_layers, max_len, num_labels):
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, d_model, padding_idx=0)
        self.pe = PositionalEncoding(d_model, max_len)
        self.encoder = nn.ModuleList([EncoderLayer(d_model, heads, d_ff, 0.1) for _ in range(num_layers)])
        self.norm = nn.LayerNorm(d_model)
        self.classifier = nn.Linear(d_model, num_labels)

    def forward(self, input_ids, mask, labels=None):
        x = self.embedding(input_ids)
        x = self.pe(x)
        for layer in self.encoder:
            x = layer(x, mask)
        x = self.norm(x)
        logits = self.classifier(x)
        loss = None
        if labels is not None:
            loss = nn.CrossEntropyLoss(ignore_index=-100)(logits.view(-1, logits.size(-1)), labels.view(-1))
        return {"loss": loss, "logits": logits}

# Model hyperparameters
d_model = 256
num_heads = 4
ff_dim = 1024
num_layers = 6
vocab_size = len(token2id)
num_labels = len(label2id)

model = TransformerNER(
    vocab_size=vocab_size,
    d_model=d_model,
    heads=num_heads,
    d_ff=ff_dim,
    num_layers=num_layers,
    max_len=MAX_LEN,
    num_labels=num_labels
).to(device)
optimizer = torch.optim.AdamW(model.parameters(), lr=2e-4)


# Training phase
for epoch in range(15):
    model.train()
    total_loss = 0
    for batch in train_loader:
        input_ids = batch["input_ids"].to(device)
        labels = batch["labels"].to(device)
        mask = batch["attention_mask"].to(device)
        optimizer.zero_grad()
        outputs = model(input_ids, mask, labels)
        loss = outputs["loss"]
        loss.backward()
        torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)
        optimizer.step()
        total_loss += loss.item()
    print(f"Epoch {epoch+1}/15, Loss: {total_loss / len(train_loader):.6f}")

# Predict on test set 
model.eval()
all_preds,all_labels_eval = [], []

with torch.no_grad():
    for batch in test_loader:
        input_ids = batch["input_ids"].to(device)
        labels = batch["labels"].to(device)
        mask = batch["attention_mask"].to(device)
        logits = model(input_ids, mask)["logits"].argmax(-1)

        for p, l, m in zip(logits, labels, mask):
            true_seq = [id2label[i.item()] for i, msk in zip(l, m) if i.item() != -100 and msk.item() == 1]
            pred_seq = [id2label[i.item()] for i, msk, gt in zip(p, m, l) if gt.item() != -100 and msk.item() == 1]
            all_labels_eval.append(true_seq)
            all_preds.append(pred_seq)

# compute evaluation metrics
accuracy = 100 * accuracy_score(all_labels_eval, all_preds)
f1 = 100 * f1_score(all_labels_eval, all_preds, average="weighted")

print(f"\nAccuracy: {accuracy:.4f}%")
print(f"F1_Score: {f1:.4f}%")
print(classification_report(all_labels_eval, all_preds))

Epoch 1/15, Loss: 0.322535
Epoch 2/15, Loss: 0.155475
Epoch 3/15, Loss: 0.097106
Epoch 4/15, Loss: 0.065427
Epoch 5/15, Loss: 0.047040
Epoch 6/15, Loss: 0.036565
Epoch 7/15, Loss: 0.028971
Epoch 8/15, Loss: 0.025113
Epoch 9/15, Loss: 0.020459
Epoch 10/15, Loss: 0.019079
Epoch 11/15, Loss: 0.017134
Epoch 12/15, Loss: 0.014896
Epoch 13/15, Loss: 0.013966
Epoch 14/15, Loss: 0.013130
Epoch 15/15, Loss: 0.012225

Accuracy: 98.2762%
F1_Score: 84.8302%
              precision    recall  f1-score   support

           _       0.82      0.84      0.83     10158
         dat       0.47      0.41      0.44       356
       event       0.95      0.96      0.96       390
         fac       0.93      0.97      0.95       269
         loc       0.88      0.91      0.89      3184
         mon       0.39      0.50      0.44        96
         org       0.83      0.89      0.86      3875
         pct       0.53      0.77      0.63        71
         per       0.71      0.66      0.68       910
        p