In [141]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [142]:
import torch
from torch import nn
from torch.nn import functional as F
from torch.utils.data import Dataset,DataLoader
import numpy as np
import torch.optim as optim
import pandas as pd
import math

In [193]:
import os
os.environ["CUDA_LAUNCH_BLOCKING"] = "1"


In [205]:
# device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
device = torch.device("cpu")
class CustomDataset(Dataset):
    def __init__(self, filepath):
        self.sentences, self.ner_tags = self.load_data(filepath)

    def load_data(self, filepath):
        sentences, ner_tags = [], []
        sentence, ner_tag = [], []
        with open(filepath, "r", encoding="utf-8") as f:
            for line in f:
                line = line.strip()
                if line:
                    parts = line.split("\t")
                    if len(parts) >= 3:
                        word, _, ner = parts
                        sentence.append(word)
                        ner_tag.append(ner)
                else:
                    if sentence:
                        sentences.append(sentence)
                        ner_tags.append(ner_tag)
                        sentence, ner_tag = [], []
            # In case file does not end with a newline
            if sentence:
                sentences.append(sentence)
                ner_tags.append(ner_tag)
        return sentences, ner_tags

    def __len__(self):
        return len(self.sentences)

    def __getitem__(self, idx):
        word_ids = [self.word2idx.get(w, self.word2idx["<UNK>"]) for w in self.sentences[idx]]
        tag_ids = [self.tag2idx.get(t, self.tag2idx["<UNK>"]) for t in self.ner_tags[idx]]
        return word_ids, tag_ids

def collate_fn(batch):
    sentences, tags = zip(*batch)
    max_len = max(len(s) for s in sentences)

    pad_token = 0  # for both word and tag
    padded_sentences = [s + [pad_token] * (max_len - len(s)) for s in sentences]
    padded_tags = [t + [pad_token] * (max_len - len(t)) for t in tags]
    attention_masks = [[1]*len(s) + [0]*(max_len - len(s)) for s in sentences]

    return (
        torch.tensor(padded_sentences, dtype=torch.long),
        torch.tensor(padded_tags, dtype=torch.long),
        torch.tensor(attention_masks, dtype=torch.bool),
    )



In [206]:
train_path = "/content/drive/MyDrive/Datasets/train_v5.conll"
val_path = "/content/drive/MyDrive/Datasets/val_v5.conll"
test_path = "/content/drive/MyDrive/Datasets/test_v5.conll"

Load Dataset

In [172]:
#Force reinstall compatible versions
# !pip install gensim
# !pip install numpy==1.24.3 --force-reinstall
# !pip install pytorch-crf

In [197]:
from gensim.models import KeyedVectors
fasttext_model = KeyedVectors.load_word2vec_format('/content/drive/MyDrive/Datasets/cc.my.300.vec', binary=False)
# https://fasttext.cc/docs/en/crawl-vectors.html choose Burmese choose text .vec file

In [207]:
train_data = CustomDataset(train_path)
val_data = CustomDataset(val_path)
test_data = CustomDataset(test_path)

In [208]:
# Create vocabulary and tag-to-index mappings
vocab = {"<PAD>": 0, "<UNK>": 1}
ner_tag_to_ix = {"<PAD>": 0, "<UNK>": 1}  # Start with <PAD> and <UNK>

# Build vocab and tag mappings
for dataset in [train_data, val_data, test_data]:
    for sentence, ner_tags in zip(dataset.sentences, dataset.ner_tags):
        for word in sentence:
            if word not in vocab:
                vocab[word] = len(vocab)
        for ner_tag in ner_tags:
            if ner_tag not in ner_tag_to_ix:
                ner_tag_to_ix[ner_tag] = len(ner_tag_to_ix)

embedding_dim = 300  # FastText embedding dimension
embedding_matrix = []
for word in vocab:
    if word in fasttext_model:
        embedding_matrix.append(fasttext_model[word])
    elif word == "<PAD>":
        embedding_matrix.append(np.zeros(embedding_dim))
    else:
        embedding_matrix.append(np.random.normal(scale=0.6, size=(embedding_dim,)))

fasttext_embeddings = torch.tensor(embedding_matrix, dtype=torch.float32)
ix_to_ner_tag = {v: k for k, v in ner_tag_to_ix.items()}

In [200]:
for dataset in [train_data, val_data, test_data]:
    dataset.word2idx = vocab
    dataset.tag2idx = ner_tag_to_ix


In [210]:
hidden_dim = 256
vocab_size = len(vocab)
num_ner_tags = len(ner_tag_to_ix)
print(vocab_size)
print(num_ner_tags)

19304
27


In [211]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import math
from torchcrf import CRF

class PositionalEncoding(nn.Module):
    def __init__(self, max_len, d_model):
        super().__init__()
        pe = torch.zeros(max_len, d_model)
        position = torch.arange(max_len).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2, dtype=torch.float) * -(math.log(10000.0) / d_model))
        pe[:, 0::2] = torch.sin(position.float() * div_term)
        pe[:, 1::2] = torch.cos(position.float() * div_term)
        self.register_buffer('pe', pe)

    def forward(self, x):
        # x: (batch_size, seq_len, d_model)
        x = x + self.pe[:x.size(1)].unsqueeze(0)  # Fix: broadcast shape correctly
        return x

class FeedForward(nn.Module):
    def __init__(self, dim, expension_factor, dropout):
        super(FeedForward, self).__init__()
        hidden_dim = dim * expension_factor
        self.fc1 = nn.Linear(dim, hidden_dim)
        self.fc2 = nn.Linear(hidden_dim, dim)
        self.dropout1 = nn.Dropout(dropout)
        self.dropout2 = nn.Dropout(dropout)

    def forward(self, x):
        x = self.dropout1(F.gelu(self.fc1(x)))
        return self.dropout2(self.fc2(x))

class Fourier(nn.Module):
    def __init__(self, dropout=0.3):
        super(Fourier, self).__init__()
        self.dropout = nn.Dropout(dropout)
        self.act = nn.ReLU()

    def forward(self, x):
        x = x.to(dtype=torch.float64)
        x = torch.fft.fft(x, dim=-1)
        x = torch.fft.fft(x, dim=1)
        x = self.act(x.real)
        x = x.to(dtype=torch.float32)
        x = self.dropout(x)
        return x

class FNetBlock(nn.Module):
    def __init__(self, dim, expension_factor, dropout):
        super(FNetBlock, self).__init__()
        self.fourier = Fourier(dropout)
        self.ffn = FeedForward(dim, expension_factor, dropout)
        self.norm1 = nn.LayerNorm(dim)
        self.norm2 = nn.LayerNorm(dim)

    def forward(self, x):
        residual = x
        x = self.fourier(x)
        x = self.norm1(x + residual)
        residual = x
        x = self.ffn(x)
        return self.norm2(x + residual)

class FNetNER(nn.Module):
    def __init__(self, embedding_matrix, num_tags, expension_factor=4, dropout=0.3, num_layers=2):
        super(FNetNER, self).__init__()
        dim = embedding_matrix.shape[1]
        self.embedding = nn.Embedding.from_pretrained(embedding_matrix, freeze=False)
        self.embedding_dropout = nn.Dropout(dropout)
        self.pos_en = PositionalEncoding(256, d_model=dim)

        # Stack multiple FNet blocks
        self.encoder = nn.Sequential(*[
            FNetBlock(dim, expension_factor, dropout) for _ in range(num_layers)
        ])

        self.classifier = nn.Linear(dim, num_tags)
        self.crf = CRF(num_tags, batch_first=True)

    def forward(self, x, mask=None):
        x = self.embedding(x)                  # [batch_size, seq_len, dim]
        x = self.embedding_dropout(x)
        x = self.pos_en(x)
        x = self.encoder(x)                    # Stacked FNet blocks
        emissions = self.classifier(x)         # [batch_size, seq_len, num_tags]
        return emissions

    def loss(self, emissions, tags, mask):
        return -self.crf(emissions, tags, mask=mask, reduction='mean')

    def decode(self, emissions, mask):
        return self.crf.decode(emissions, mask=mask)

In [154]:
# from sklearn.utils.class_weight import compute_class_weight
# labels = []
# for tag_seq in train_data.ner_tags:
#     labels.extend(tag_seq)


# label_ids = [tag2idx[tag] for tag in labels]
# present_classes = np.unique(label_ids)
# present_weights = compute_class_weight(class_weight='balanced', classes=present_classes, y=label_ids)

# full_weights = np.ones(len(tag2idx))
# for i, cls in enumerate(present_classes):
#     full_weights[cls] = present_weights[i]

# weights = torch.tensor(full_weights, dtype=torch.float).to(device)

In [212]:
model = FNetNER(embedding_matrix=fasttext_embeddings, num_tags=len(ner_tag_to_ix)).to(device)
optimizer = optim.Adam(model.parameters(), lr =3e-4,weight_decay=1e-5)

In [213]:
#Train process
num_epochs = 10

for epoch in range(num_epochs):
    total_loss = 0
    model.train()

    for sentences, tags, attention_mask in train_data:
        sentences = sentences.to(device)
        tags = tags.to(device)
        attention_mask = attention_mask.to(device)
        attention_mask = attention_mask.bool()

        optimizer.zero_grad()

        emissions = model(sentences)  # [B, L, num_tags]
        loss = model.loss(emissions, tags, attention_mask)

        loss.backward()
        optimizer.step()

        total_loss += loss.item()


    val_loss = 0
    model.eval()
    with torch.no_grad():
      for sentences,tags,attention_mask in val_data:
          sentences = sentences.to(device)
          tags = tags.to(device)
          attention_mask = attention_mask.to(device)
          attention_mask = attention_mask.bool()

          emissions = model(sentences)
          loss = model.loss(emissions,tags,attention_mask)
          val_loss += loss.item()

    avg_val_loss = val_loss / len(val_data)
    avg_train_loss = total_loss / len(train_data)
    print(f"Epoch {epoch+1}/{num_epochs} - Train Loss: {avg_train_loss:.4f} - Val Loss: {avg_val_loss:.4f}")


AttributeError: 'CustomDataset' object has no attribute 'word2idx'

In [None]:
# Save only the model's parameters
torch.save(model.state_dict(), "ner_model.pth")

In [None]:
def evaluate_crf(model, dataloader, device, idx2tag):
    model.eval()
    all_preds = []
    all_labels = []

    with torch.no_grad():
        for sentences, tags, attention_mask in dataloader:
            sentences = sentences.to(device)
            tags = tags.to(device)
            attention_mask = attention_mask.to(device)

            emissions = model(sentences)                      # [batch_size, seq_len, num_tags]
            mask = attention_mask.bool()
            pred_seq = model.decode(emissions, mask)         # List[List[int]]

            for i in range(len(tags)):
                seq_len = mask[i].sum().item()
                pred_tags = [idx2tag[idx] for idx in pred_seq[i][:seq_len]]
                true_tags = [idx2tag[idx.item()] for idx in tags[i][:seq_len]]
                all_preds.extend(pred_tags)
                all_labels.extend(true_tags)

    return all_preds, all_labels

In [None]:
from sklearn.metrics import classification_report

test_preds, test_labels = evaluate_crf(model, test_load, device, idx2tag)
print(classification_report(test_labels, test_preds))


In [None]:
# Test case
test_sentence = ["ကြက်", "ခြေ", "နီ", "မှ", "ပြော", "ရေး", "ဆို", "ခွင့်", "ရှိ", "သူ", "MattCochrane", "သည်။"]

word_ids = [word2idx.get(word, word2idx["<UNK>"]) for word in test_sentence]
input_tensor = torch.tensor([word_ids], dtype=torch.long).to(device)  # Shape: [1, seq_len]
mask = torch.ones_like(input_tensor, dtype=torch.bool).to(device)

# Predict using CRF
model.eval()
with torch.no_grad():
    emissions = model(input_tensor)  # returns emissions
    best_paths = model.decode(emissions, mask)  # List of list: [ [tag_id, tag_id, ...] ]
    predicted_ids = best_paths[0]  # since batch size is 1

# Invert the tag2idx dictionary
idx2tag = {idx: tag for tag, idx in tag2idx.items()}
predicted_tags = [idx2tag[idx] for idx in predicted_ids]

# Print results
for word, tag in zip(test_sentence, predicted_tags):
    print(f"{word}\t{tag}")


END