## CS310 Natural Language Processing
## Assignment 3 (part 2). Named Entity Recognition with Bi-LSTM

**Total points**: 30 + 20 bonus points

In this assignment, you will train a bidirectional LSTM model on the CoNLL2003 English named entity recognition task set and evaluate its performance.

For the bonus questions, submit them as separate notebook files.

### 0. Import Necessary Libraries

In [1]:
import torch
from torchtext.data.utils import get_tokenizer
from torchtext.vocab import build_vocab_from_iterator
from collections import Counter
from torch.utils.data import DataLoader, Dataset
import io
import torch.nn.functional as F
import torch.nn as nn
from torch.nn.utils.rnn import pack_padded_sequence, pad_packed_sequence, pad_sequence
import torch
from torch.utils.data import DataLoader, Dataset
import numpy as np
from collections import defaultdict
from sklearn.metrics import f1_score

#### (1) (10 points) Data preprocessing.   
#### (a) Load the train, dev, and test data; build vocabularies for words and labels (tags); defined a data loader that return batches.

In [2]:

def load_data_and_build_vocab(file_path):
    sentences, labels = [], []
    word_set, label_set = set(), set()

    with open(file_path, 'r', encoding='utf-8') as file:
        current_sentence, current_labels = [], []
        for line in file:
            line = line.strip()
            if line:
                if line.startswith("-DOCSTART-"):
                    continue
                parts = line.split()
                word, tag = parts[0], parts[-1]
                current_sentence.append(word.lower())
                current_labels.append(tag)
                word_set.add(word.lower())
                label_set.add(tag)
            else:
                if current_sentence and current_labels:
                    sentences.append(current_sentence)
                    labels.append(current_labels)
                    current_sentence, current_labels = [], []
        if current_sentence and current_labels:  # Add the last sentence
            sentences.append(current_sentence)
            labels.append(current_labels)
    
    word_vocab = {word: idx for idx, word in enumerate(sorted(word_set), start=1)}
    label_vocab = {label: idx for idx, label in enumerate(sorted(label_set), start=0)}
    word_vocab['<pad>'] = -1
    return sentences, labels, word_vocab, label_vocab

class NERDataset(Dataset):
    def __init__(self, sentences, labels, word_vocab, label_vocab):
        self.sentences = sentences
        self.labels = labels
        self.word_vocab = word_vocab
        self.label_vocab = label_vocab

    def __len__(self):
        return len(self.sentences)

    def __getitem__(self, idx):
        unk_idx = self.word_vocab.get('<unk>', 0)
        words = [self.word_vocab.get(word, unk_idx) for word in self.sentences[idx]]
        
        tags = [self.label_vocab[label] for label in self.labels[idx]]
        
        min_length = min(len(words), len(tags))
        words = words[:min_length]
        tags = tags[:min_length]
        
        return torch.tensor(words, dtype=torch.long), len(words), torch.tensor(tags, dtype=torch.long)

def pad_sequences(batch):
    sentences,seq_lengths,labels = zip(*batch)
    padded_sentences = pad_sequence(sentences, batch_first=True)
    padded_labels = pad_sequence(labels, batch_first=True)
    
    for i, (sentence, label) in enumerate(zip(sentences, labels)):
        padded_sentences[i, :len(sentence)] = sentence
        padded_labels[i, :len(label)] = label
    
    return padded_sentences,torch.tensor(seq_lengths), padded_labels



train_file_path = "data/train.txt"
dev_file_path = "data/dev.txt"
test_file_path = "data/test.txt"

sentences, labels, word_vocab, label_vocab = load_data_and_build_vocab(train_file_path)
print(sentences[:5])
print(labels[:5])
print(word_vocab)
print(len(word_vocab))
print(label_vocab)
print(len(label_vocab))
dev_sentences, dev_labels, _, _ = load_data_and_build_vocab(dev_file_path)
test_sentences, test_labels, _, _ = load_data_and_build_vocab(test_file_path)

train_dataset = NERDataset(sentences, labels, word_vocab, label_vocab)
dev_dataset = NERDataset(dev_sentences, dev_labels, word_vocab, label_vocab)
test_dataset = NERDataset(test_sentences, test_labels, word_vocab, label_vocab)


train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True, collate_fn=pad_sequences)
dev_loader = DataLoader(dev_dataset, batch_size=32, shuffle=False, collate_fn=pad_sequences)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False, collate_fn=pad_sequences)


[['eu', 'rejects', 'german', 'call', 'to', 'boycott', 'british', 'lamb', '.'], ['peter', 'blackburn'], ['brussels', '1996-08-22'], ['the', 'european', 'commission', 'said', 'on', 'thursday', 'it', 'disagreed', 'with', 'german', 'advice', 'to', 'consumers', 'to', 'shun', 'british', 'lamb', 'until', 'scientists', 'determine', 'whether', 'mad', 'cow', 'disease', 'can', 'be', 'transmitted', 'to', 'sheep', '.'], ['germany', "'s", 'representative', 'to', 'the', 'european', 'union', "'s", 'veterinary', 'committee', 'werner', 'zwingmann', 'said', 'on', 'wednesday', 'consumers', 'should', 'buy', 'sheepmeat', 'from', 'countries', 'other', 'than', 'britain', 'until', 'the', 'scientific', 'advice', 'was', 'clearer', '.']]
[['B-ORG', 'O', 'B-MISC', 'O', 'O', 'O', 'B-MISC', 'O', 'O'], ['B-PER', 'I-PER'], ['B-LOC', 'O'], ['O', 'B-ORG', 'I-ORG', 'O', 'O', 'O', 'O', 'O', 'O', 'B-MISC', 'O', 'O', 'O', 'O', 'O', 'B-MISC', 'O', 'O', 'O', 'O', 'O', 'O', 'O', 'O', 'O', 'O', 'O', 'O', 'O', 'O'], ['B-LOC', 'O

#### (b) Load the pretrained embedding data to initialize the embedding layer in model.

In [10]:
import numpy as np
import torch

def load_glove_embeddings(path, word_to_idx, embedding_dim=100):

    embeddings = np.zeros((len(word_to_idx), embedding_dim))
    
    found_words = 0

    with open(path, 'r', encoding='utf-8') as f:
        for line in f:
            values = line.split()
            word = values[0]
            if word in word_to_idx:
                idx = word_to_idx[word]
                embeddings[idx] = np.asarray(values[1:], dtype='float32')
                found_words += 1

    print(f"Found embeddings for {found_words}/{len(word_to_idx)} words in the vocabulary.")
    return torch.tensor(embeddings)



embeddings_path = 'glove.6B.100d.txt' 
embedding_dim = 100
glove_embeddings = load_glove_embeddings(embeddings_path, word_vocab, embedding_dim)
print(glove_embeddings.size())


Found embeddings for 18415/21010 words in the vocabulary.
torch.Size([21010, 100])


### 2. Build the Model

#### (2) (10 points) Implement the “level 1” sequential classifier model, with bi-LSTM architecture.

In [4]:
class BiLSTMClassifier(nn.Module):
    def __init__(self, vocab_size, embedding_dim, hidden_dim, output_dim, n_layers, bidirectional, dropout, pad_idx):
        super(BiLSTMClassifier, self).__init__()
        self.embedding = nn.Embedding(vocab_size, embedding_dim, padding_idx=pad_idx)
        self.lstm = nn.LSTM(embedding_dim, hidden_dim, num_layers=n_layers, bidirectional=bidirectional, dropout=dropout , batch_first=True)
        self.fc = nn.Linear(hidden_dim * 2 , output_dim)
        self.dropout = nn.Dropout(dropout)

    def forward(self, text, text_lengths):
        embedded = self.dropout(self.embedding(text))
        
        packed_embedded = nn.utils.rnn.pack_padded_sequence(embedded, text_lengths.cpu(), batch_first=True, enforce_sorted=False)
        packed_output, (hidden, cell) = self.lstm(packed_embedded)
        
        output, output_lengths = nn.utils.rnn.pad_packed_sequence(packed_output, batch_first=True)
        
        output = self.dropout(output)
        predictions = self.fc(output)
        
        return predictions


In [5]:
vocab_size = len(word_vocab)  
embedding_dim = 100  
output_dim = len(label_vocab)  
hidden_dim = 256  
n_layers = 2 
bidirectional = True  
dropout = 0.5  
pad_idx = word_vocab['<pad>']  

model = BiLSTMClassifier(vocab_size, embedding_dim, hidden_dim, output_dim, n_layers, bidirectional, dropout, pad_idx)


model.embedding.weight.data.copy_(glove_embeddings)
model.embedding.weight.requires_grad = False  


### 3. Train and Evaluate

In [6]:
def train(model, iterator, optimizer, criterion, device):
    model.train()
    epoch_loss = 0
    for batch in iterator:
        texts, text_lengths, labels = batch
        texts, labels = texts.to(device), labels.to(device)
        
        optimizer.zero_grad()
        predictions = model(texts, text_lengths).squeeze(1)
        predictions = predictions.view(-1, predictions.shape[-1])  
        labels= labels.view(-1)

        loss = criterion(predictions,labels)
        loss.backward()
        optimizer.step()
        
        epoch_loss += loss.item()
        
    return epoch_loss / len(iterator)

from sklearn.metrics import f1_score,precision_score,recall_score
import numpy as np

def evaluate(model, iterator, criterion, device):
    model.eval()
    epoch_loss = 0
    all_predictions = []
    all_labels = []
    
    with torch.no_grad():
        for batch in iterator:
            texts, text_lengths, labels = batch
            texts, labels = texts.to(device), labels.to(device)
            
            predictions = model(texts, text_lengths)
            loss = criterion(predictions.view(-1, predictions.shape[-1]), labels.view(-1))
            epoch_loss += loss.item()
            
            _, preds = torch.max(predictions, dim=2)
            all_predictions.extend(preds.view(-1).cpu().numpy())
            all_labels.extend(labels.view(-1).cpu().numpy())

    all_labels = np.array(all_labels)
    all_predictions = np.array(all_predictions)
    non_pad_elements = all_labels != criterion.ignore_index
    filtered_labels = all_labels[non_pad_elements]
    filtered_predictions = all_predictions[non_pad_elements]

    precision = precision_score(filtered_labels, filtered_predictions, average='macro')
    recall = recall_score(filtered_labels, filtered_predictions, average='macro')
    f1=2*precision*recall/(precision+recall)
    
    return epoch_loss / len(iterator), f1



In [7]:
device = torch.device('cpu')
model = model.to(device)
criterion = torch.nn.CrossEntropyLoss(ignore_index=-1).to(device)

optimizer = torch.optim.Adam(model.parameters())

# 训练和评估
num_epochs = 5
for epoch in range(num_epochs):
    train_loss = train(model, train_loader, optimizer, criterion, device)
    valid_loss, valid_f1 = evaluate(model, dev_loader, criterion, device)
    
    print(f'Epoch: {epoch+1}, Train Loss: {train_loss:.4f}, Val Loss: {valid_loss:.4f}, Val F1: {valid_f1:.4f}')


Epoch: 1, Train Loss: 1.3582, Val Loss: 0.9525, Val F1: 0.7367
Epoch: 2, Train Loss: 0.8817, Val Loss: 0.6232, Val F1: 0.7810
Epoch: 3, Train Loss: 0.5739, Val Loss: 0.4090, Val F1: 0.8200
Epoch: 4, Train Loss: 0.3793, Val Loss: 0.2785, Val F1: 0.8299
Epoch: 5, Train Loss: 0.2610, Val Loss: 0.1979, Val F1: 0.8495


In [8]:
torch.save(model.state_dict(), 'model_state_dict.pth')


In [9]:
model.load_state_dict(torch.load('model_state_dict.pth'))
model.eval() 
test_loss, test_f1 = evaluate(model, test_loader, criterion, device)


print(f"Test Loss: {test_loss}")
print(f"Test F1 Score: {test_f1}")


Test Loss: 0.21526903327968386
Test F1 Score: 0.8186873603989944
