In [1]:
import time
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader

In [2]:
# Định nghĩa các hằng số
PAD_TOKEN = "<PAD>"
UNK_TOKEN = "<UNK>"
PAD_IDX = 0  # Chỉ số cho padding
UNK_IDX = 1  # Chỉ số cho UNK


def load_conllu(file_path):
    """Đọc dữ liệu từ file .conllu, trả về danh sách các câu."""
    sentences = []
    current_sentence = []

    with open(file_path, 'r', encoding='utf-8') as f:
        for line in f:
            line = line.strip()
            if not line:
                # Dòng trống -> kết thúc câu
                if current_sentence:
                    sentences.append(current_sentence)
                    current_sentence = []
            elif not line.startswith('#'):
                parts = line.split('\t')
                if parts and parts[0].isdigit():  # Đảm bảo là dòng chứa token (parts[0] là ID)
                    word = parts[1]
                    upos_tag = parts[3]
                    current_sentence.append((word, upos_tag))

    if current_sentence:
        sentences.append(current_sentence)

    return sentences


# Tải Dữ liệu
train_data = load_conllu('/home/dangth2004/Programming/Natural-Language-Processing/data/UD_English-EWT/en_ewt-ud-train.conllu')
dev_data = load_conllu('/home/dangth2004/Programming/Natural-Language-Processing/data/UD_English-EWT/en_ewt-ud-dev.conllu')

print(f"Số câu tập Train: {len(train_data)}")
print(f"Ví dụ câu đầu tiên: {train_data[0][:5]}...")

Số câu tập Train: 12544
Ví dụ câu đầu tiên: [('Al', 'PROPN'), ('-', 'PUNCT'), ('Zaman', 'PROPN'), (':', 'PUNCT'), ('American', 'ADJ')]...


In [3]:
def build_vocab(data):
    """Xây dựng từ điển word_to_ix và tag_to_ix."""
    word_to_ix = {PAD_TOKEN: PAD_IDX, UNK_TOKEN: UNK_IDX}
    tag_to_ix = {PAD_TOKEN: PAD_IDX}  # Padding tag cũng dùng 0

    all_words = set()
    all_tags = set()
    for sentence in data:
        for word, tag in sentence:
            all_words.add(word)
            all_tags.add(tag)

    for word in sorted(list(all_words)):
        if word not in word_to_ix:
            word_to_ix[word] = len(word_to_ix)

    for tag in sorted(list(all_tags)):
        if tag not in tag_to_ix:
            tag_to_ix[tag] = len(tag_to_ix)

    return word_to_ix, tag_to_ix


word_to_ix, tag_to_ix = build_vocab(train_data)
ix_to_tag = {v: k for k, v in tag_to_ix.items()}

print(f"Kích thước Từ điển Từ (Word Vocab): {len(word_to_ix)}")
print(f"Kích thước Từ điển Nhãn (Tag Vocab): {len(tag_to_ix)}")

Kích thước Từ điển Từ (Word Vocab): 19675
Kích thước Từ điển Nhãn (Tag Vocab): 18


In [4]:
class POSDataset(Dataset):
    def __init__(self, data, word_to_ix, tag_to_ix):
        self.data = data
        self.word_to_ix = word_to_ix
        self.tag_to_ix = tag_to_ix

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        sentence = self.data[idx]

        word_indices = [self.word_to_ix.get(word, UNK_IDX) for word, tag in sentence]
        tag_indices = [self.tag_to_ix[tag] for word, tag in sentence]

        return torch.tensor(word_indices, dtype=torch.long), torch.tensor(tag_indices, dtype=torch.long)

In [5]:
def collate_fn(batch):
    """
    Hàm đệm (pad) các câu và nhãn trong cùng một batch.
    Sử dụng torch.nn.utils.rnn.pad_sequence.
    """
    # Tách X (sentence_indices) và Y (tag_indices)
    sentences, tags = zip(*batch)

    # Đệm các tensor X (sentences) và Y (tags)
    # padding_value=PAD_IDX (0)
    sentences_padded = nn.utils.rnn.pad_sequence(sentences, batch_first=True, padding_value=PAD_IDX)
    tags_padded = nn.utils.rnn.pad_sequence(tags, batch_first=True, padding_value=PAD_IDX)

    return sentences_padded, tags_padded


BATCH_SIZE = 32

train_dataset = POSDataset(train_data, word_to_ix, tag_to_ix)
dev_dataset = POSDataset(dev_data, word_to_ix, tag_to_ix)

train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True, collate_fn=collate_fn)
dev_loader = DataLoader(dev_dataset, batch_size=BATCH_SIZE, shuffle=False, collate_fn=collate_fn)

print(f"\nSố lượng batch trong Train Loader: {len(train_loader)}")


Số lượng batch trong Train Loader: 392


In [6]:
class SimpleRNNForTokenClassification(nn.Module):
    def __init__(self, vocab_size, embedding_dim, hidden_dim, output_dim):
        super(SimpleRNNForTokenClassification, self).__init__()

        self.embedding = nn.Embedding(
            num_embeddings=vocab_size,
            embedding_dim=embedding_dim,
            padding_idx=PAD_IDX  # PyTorch sẽ bỏ qua chỉ số này khi tính embedding
        )

        self.rnn = nn.RNN(
            input_size=embedding_dim,
            hidden_size=hidden_dim,
            batch_first=True  # Input có dạng (batch_size, seq_len, features)
        )

        self.fc = nn.Linear(hidden_dim, output_dim)

    def forward(self, text):
        embedded = self.embedding(text)
        rnn_output, hidden_state = self.rnn(embedded)
        batch_size, seq_len, hidden_dim = rnn_output.shape

        rnn_output_reshaped = rnn_output.reshape(-1, hidden_dim)
        prediction = self.fc(rnn_output_reshaped)

        return prediction

In [7]:
# Config
VOCAB_SIZE = len(word_to_ix)
EMBEDDING_DIM = 100
HIDDEN_DIM = 128
OUTPUT_DIM = len(tag_to_ix)
NUM_EPOCHS = 10
LEARNING_RATE = 1e-3

# Khởi tạo mô hình và device
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = SimpleRNNForTokenClassification(VOCAB_SIZE, EMBEDDING_DIM, HIDDEN_DIM, OUTPUT_DIM).to(device)
optimizer = torch.optim.Adam(model.parameters(), lr=LEARNING_RATE)

# Sử dụng PAD_IDX (0) để CrossEntropyLoss bỏ qua các token đệm
criterion = nn.CrossEntropyLoss(ignore_index=PAD_IDX).to(device)

print(model)

SimpleRNNForTokenClassification(
  (embedding): Embedding(19675, 100, padding_idx=0)
  (rnn): RNN(100, 128, batch_first=True)
  (fc): Linear(in_features=128, out_features=18, bias=True)
)


In [8]:
def evaluate(model, data_loader, device):
    """Tính độ chính xác trên tập dữ liệu dev/test."""
    model.eval()
    total_acc = 0

    with torch.no_grad():  # Tắt tính toán gradient
        for X_batch, y_batch in data_loader:
            X_batch, y_batch = X_batch.to(device), y_batch.to(device)

            predictions = model(X_batch)
            total_acc += calculate_accuracy(predictions.cpu(), y_batch.cpu())

    return total_acc / len(data_loader)

In [9]:
def calculate_accuracy(y_pred, y_true):
    """Tính độ chính xác, bỏ qua các token đệm."""
    y_true_flat = y_true.view(-1)
    max_preds = y_pred.argmax(dim=1)

    # Tạo mask cho các token không phải PAD
    non_pad_elements = (y_true_flat != PAD_IDX).nonzero(as_tuple=False)

    # Chỉ tính toán trên các phần tử không phải PAD
    correct = (max_preds[non_pad_elements] == y_true_flat[non_pad_elements]).sum()
    total = non_pad_elements.size(0)

    return correct.item() / total if total > 0 else 0.0


for epoch in range(NUM_EPOCHS):
    model.train()
    epoch_loss = 0
    epoch_acc = 0

    start_time = time.perf_counter()
    for X_batch, y_batch in train_loader:
        # Chuyển dữ liệu sang device
        X_batch, y_batch = X_batch.to(device), y_batch.to(device)
        optimizer.zero_grad()
        predictions = model(X_batch)
        loss = criterion(predictions, y_batch.view(-1))

        loss.backward()
        optimizer.step()

        epoch_loss += loss.item()
        epoch_acc += calculate_accuracy(predictions.detach().cpu(), y_batch.detach().cpu())

    end_time = time.perf_counter()
    train_time = end_time - start_time

    avg_train_loss = epoch_loss / len(train_loader)
    avg_train_acc = epoch_acc / len(train_loader)
    avg_dev_acc = evaluate(model, dev_loader, device)

    print(
        f'Epoch: {epoch + 1:02} | Train Loss: {avg_train_loss:.3f} | Train Acc: {avg_train_acc:.3f} | Dev Acc: {avg_dev_acc:.3f} | Training time: {train_time:.2f}s')

Epoch: 01 | Train Loss: 1.125 | Train Acc: 0.658 | Dev Acc: 0.747 | Training time: 1.37s
Epoch: 02 | Train Loss: 0.623 | Train Acc: 0.802 | Dev Acc: 0.801 | Training time: 1.13s
Epoch: 03 | Train Loss: 0.468 | Train Acc: 0.851 | Dev Acc: 0.825 | Training time: 1.13s
Epoch: 04 | Train Loss: 0.371 | Train Acc: 0.882 | Dev Acc: 0.839 | Training time: 1.11s
Epoch: 05 | Train Loss: 0.303 | Train Acc: 0.903 | Dev Acc: 0.848 | Training time: 1.06s
Epoch: 06 | Train Loss: 0.252 | Train Acc: 0.920 | Dev Acc: 0.854 | Training time: 1.11s
Epoch: 07 | Train Loss: 0.211 | Train Acc: 0.933 | Dev Acc: 0.856 | Training time: 1.10s
Epoch: 08 | Train Loss: 0.178 | Train Acc: 0.943 | Dev Acc: 0.855 | Training time: 1.09s
Epoch: 09 | Train Loss: 0.150 | Train Acc: 0.952 | Dev Acc: 0.857 | Training time: 1.16s
Epoch: 10 | Train Loss: 0.128 | Train Acc: 0.960 | Dev Acc: 0.860 | Training time: 1.14s


In [10]:
def predict_sentence(sentence_str, model, word_to_ix, ix_to_tag, device):
    """Nhận vào câu dạng chuỗi, trả về (từ, nhãn dự đoán)."""
    model.eval()
    tokens = sentence_str.lower().split()
    indices = [word_to_ix.get(token, UNK_IDX) for token in tokens]
    input_tensor = torch.tensor([indices], dtype=torch.long).to(device)

    with torch.no_grad():
        predictions = model(input_tensor)  # (seq_len, output_dim) vì batch_size=1
        predicted_tag_indices = predictions.argmax(dim=1).cpu().numpy()

    # Chuyển chỉ số thành nhãn
    predicted_tags = [ix_to_tag[idx] for idx in predicted_tag_indices]
    print(predicted_tags)
    return list(zip(tokens, predicted_tags))


sentence = "i love natural language processing"
prediction_result = predict_sentence(sentence, model, word_to_ix, ix_to_tag, device)

['PRON', 'VERB', 'ADJ', 'NOUN', 'NOUN']
