In [1]:
import torch
import torch.nn as nn
import torch.optim as optim
import torch.utils.data
from sklearn.metrics import f1_score
from gensim.test.utils import common_texts
from gensim.models import Word2Vec
import gensim.downloader
import pandas as pd
from sklearn.model_selection import train_test_split
from seqeval.metrics import f1_score as seqeval_f1_score
from tqdm import tqdm
import time
import os
import multiprocessing
import numpy as np
import codecs
import re
import torch.nn.functional as F

In [2]:
num_cores = multiprocessing.cpu_count()
print(f"Number of CPU cores available: {num_cores}")

Number of CPU cores available: 16


In [3]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

In [4]:
pretrained_model = gensim.downloader.load('word2vec-google-news-300')

In [5]:
train_file = "eng.train"
dev_file = "eng.testa"
test_file = "eng.testb"

In [6]:
def zero_digits(s):
    """
    Replace every digit in a string by a zero.
    """
    return re.sub('\d', '0', s)

def load_sentences(path, zeros):
    """
    Load sentences. A line must contain at least a word and its tag.
    Sentences are separated by empty lines.
    """
    sentences = []
    sentence = []
    for line in codecs.open(path, 'r', 'utf8'):
        line = zero_digits(line.rstrip()) if zeros else line.rstrip()
        if not line:
            if len(sentence) > 0:
                if 'DOCSTART' not in sentence[0][0]:
                    sentences.append(sentence)
                sentence = []
        else:
            word = line.split()
            assert len(word) >= 2
            sentence.append(word)
    if len(sentence) > 0:
        if 'DOCSTART' not in sentence[0][0]:
            sentences.append(sentence)
    return sentences

In [7]:
train_sentences = load_sentences(train_file, True)
test_sentences = load_sentences(test_file, True)
dev_sentences = load_sentences(dev_file, True)

In [8]:
def create_dico(item_list):
    """
    Create a dictionary of items from a list of list of items.
    """
    assert type(item_list) is list
    dico = {}
    dico['<PAD>'] = 1
    dico['<OOV>'] = 1
    for items in item_list:
        for item in items:
            if item not in dico:
                dico[item] = 1
            else:
                dico[item] += 1
    
    return dico
def create_mapping(dico):
    """
    Create a mapping (item to ID / ID to item) from a dictionary.
    Items are ordered by decreasing frequency.
    """
    sorted_items = sorted(dico.items(), key=lambda x: (-x[1], x[0]))
    id_to_item = {i: v[0] for i, v in enumerate(sorted_items)}
    item_to_id = {v: k for k, v in id_to_item.items()}
    return item_to_id, id_to_item
def char_mapping(sentences):
    """
    Create a dictionary and mapping of characters, sorted by frequency.
    """
    chars = ["".join([w[0] for w in s]) for s in sentences]
    dico = create_dico(chars)
    char_to_id, id_to_char = create_mapping(dico)
    return dico, char_to_id, id_to_char

dict_chars, char_to_id, id_to_char = char_mapping(train_sentences)
label_to_index = {"O": 0, "B-PER": 1, "I-PER": 2, "B-LOC": 3, "I-LOC": 4, "B-ORG": 5, "I-ORG": 6, "B-MISC": 7, "I-MISC": 8}
word_to_index = pretrained_model.key_to_index
OOV_INDEX = 3000000

In [9]:
class CoNLL2003Dataset(torch.utils.data.Dataset):
    def __init__(self, sentences_list, word_to_index, char_to_id, label_to_index, max_sequence_length, max_word_length):
        super(CoNLL2003Dataset, self).__init__()

        self.sentences = []  # List to store sentences
        self.labels = []     # List to store labels
        self.char_indices = []  # List to store character-level data

        for sent in sentences_list:
            words = []
            labels = []
            char_data = []  # Store character-level data
            for word in sent:
                words.append(word[0])
                labels.append(word[-1])
                # Convert word to character indices (replace with your method to map characters to indices)
                char_indices = [char_to_id.get(char,75) for char in word[0]]
                char_data.append(char_indices)
            words_index = [int(word_to_index.get(word, OOV_INDEX)) for word in words]
            labels_index = [int(label_to_index[label]) for label in labels]

            # Ensure that char_data is of length max_word_length
            for i in range(len(char_data)):
                if len(char_data[i]) < max_word_length:
                    pad_length = max_word_length - len(char_data[i])
                    pad_left = pad_length // 2
                    pad_right = pad_length - pad_left
                    char_data[i] = [76] * pad_left + char_data[i] + [76] * pad_right
                elif len(char_data[i]) > max_word_length:
                    char_data[i] = char_data[i][:max_word_length]
            if len(words_index) < max_sequence_length:
                for i in range(len(words_index), max_sequence_length):
                    words_index.append(0)
                    labels_index.append(0)
                    char_data.append([76] * max_word_length)
            elif len(words_index) > max_sequence_length:
                words_index = words_index[:max_sequence_length]
                labels_index = labels_index[:max_sequence_length]
                char_data = char_data[:max_sequence_length]
            self.sentences.append(torch.LongTensor(words_index))
            self.labels.append(torch.LongTensor(labels_index))
            self.char_indices.append(torch.LongTensor(char_data))

    def __len__(self):
        return len(self.sentences)

    def __getitem__(self, index):
        return self.sentences[index], self.labels[index], self.char_indices[index]
train_dataset = CoNLL2003Dataset(train_sentences, word_to_index,char_to_id, label_to_index,100,15)
dev_dataset = CoNLL2003Dataset(dev_sentences, word_to_index,char_to_id, label_to_index,100,15)
test_dataset = CoNLL2003Dataset(test_sentences, word_to_index,char_to_id, label_to_index,100,15)

In [10]:
pretrained_embeddings =pretrained_model.vectors
pretrained_embeddings = np.append(pretrained_embeddings, [[0] *300], axis=0)
pretrained_embeddings = torch.FloatTensor(pretrained_embeddings)

In [11]:
class CharCNN(nn.Module):
    def __init__(self, char_vocab_size, char_embedding_dim, char_cnn_output_dim, kernel_sizes):
        super(CharCNN, self).__init__()

        self.char_embedding = nn.Embedding(char_vocab_size, char_embedding_dim)

        # Define convolutional layers with different kernel sizes
        self.conv_layers = nn.ModuleList([nn.Conv2d(1, char_cnn_output_dim, (k, char_embedding_dim)) for k in kernel_sizes])

    def forward(self, char_sequences):
        batch_size, sequence_length, word_length = char_sequences.size()

        # Embed characters
        char_embeddings = self.char_embedding(char_sequences)

        # Reshape the embeddings to have 1 channel
        char_embeddings = char_embeddings.view(batch_size * sequence_length, 1, word_length, -1)

        # Apply convolutional layers
        conv_outputs = [conv(char_embeddings).squeeze(3) for conv in self.conv_layers]

        # Max-pooling over time
        pooled_outputs = [nn.functional.max_pool1d(conv, conv.size(2)).squeeze(2) for conv in conv_outputs]

        # Concatenate the outputs from different kernel sizes
        char_cnn_output = torch.cat(pooled_outputs, 1)

        # Reshape the output to be 2D (batch_size, char_cnn_output_dim)
        char_cnn_output = char_cnn_output.view(batch_size, sequence_length, -1)

        return char_cnn_output

In [12]:
# Define the NERModel class
class NERModel(nn.Module):
    def __init__(self, embedding_dim, final_vector_dim, pretrained_embeddings):
        super(NERModel, self).__init__()
        self.embedding_layer = nn.Embedding.from_pretrained(pretrained_embeddings, freeze=True)
        self.bilstm = nn.LSTM(embedding_dim, hidden_dim, num_layers=num_layers, batch_first=True, bidirectional=True)
        self.fc = nn.Linear(2 * hidden_dim, final_vector_dim)
        
    def forward(self, inputs):
        embeddings = self.embedding_layer(inputs)
        lstm_out, _ = self.bilstm(embeddings)
        final_vector_representations = self.fc(lstm_out)

        return final_vector_representations

In [13]:
char_vocab_size = len(dict_chars.keys())  # Adjust based on your character vocabulary size
char_embedding_dim = 15

# Hyperparameters
batch_size = 4
num_epochs = 10
learning_rate = 0.001
patience = 10
batch_size = 64
hidden_dim = 512  # Hidden dimension for the BiLSTM layer
num_layers = 1  # Number of BiLSTM layers


num_filters = 64
kernel_sizes = [3, 4, 5]
char_cnn_output_dim = num_filters * len(kernel_sizes)

# Mini-batch data loaders
train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
dev_loader = torch.utils.data.DataLoader(dev_dataset, batch_size=batch_size )
test_loader = torch.utils.data.DataLoader(test_dataset, batch_size=batch_size)

char_cnn = CharCNN(char_vocab_size, char_embedding_dim, num_filters, kernel_sizes)
model = NERModel(embedding_dim=300, pretrained_embeddings=pretrained_embeddings,final_vector_dim=len(label_to_index.keys())).to(device)

optimizer = optim.Adam(model.parameters(), lr=learning_rate)

# Early stopping setup
best_dev_loss = 100
best_model_state = model.state_dict()

In [14]:
def evaluate_model(model, dataloader, device):
    model.eval()
    predictions = []
    labels = []
    losses = []

    with torch.no_grad():
        for inputs, true_labels, char_embeddings in dataloader:
            inputs, true_labels, char_embeddings = inputs.to(device), true_labels.to(device), char_embeddings.to(device)
            outputs = model(inputs)
            loss = nn.functional.cross_entropy(outputs.view(-1, outputs.size(-1)), true_labels.view(-1))
            losses.append(loss.item())
            _, predicted = torch.max(outputs, 2)  # Use max to get predicted labels
            predictions.extend(predicted.tolist())
            labels.extend(true_labels.tolist())

        converted_predictions = []
        converted_labels = []
        for sent in predictions:
            converted_sent = [list(label_to_index.keys())[list(label_to_index.values()).index(int(i))] for i in sent]
            converted_predictions.append(converted_sent)

        for sent in labels:
            converted_sent = [list(label_to_index.keys())[list(label_to_index.values()).index(int(i))] for i in sent]
            converted_labels.append(converted_sent)

        f1 = seqeval_f1_score(converted_labels, converted_predictions)
        loss = sum(losses) / len(losses)

    return loss, f1

for epoch in range(num_epochs):
    model.train()
    progress_bar = tqdm(train_loader, desc=f'Epoch {epoch + 1}', unit='batch', leave=False)
    losses = []

    for inputs, labels, char_embeddings in progress_bar:
        inputs, labels, char_embeddings = inputs.to(device), labels.to(device), char_embeddings.to(device)
        optimizer.zero_grad()
        outputs = model(inputs)
        loss = nn.functional.cross_entropy(outputs.view(-1, outputs.size(-1)), labels.view(-1))
        losses.append(loss.item())
        loss.backward()
        optimizer.step()
        progress_bar.set_postfix(loss=loss.item())

    train_loss = sum(losses) / len(losses)

    dev_loss, dev_f1 = evaluate_model(model, dev_loader, device)

    print(f'Epoch {epoch + 1}, Train Loss: {train_loss:.4f}, Val Loss: {dev_loss:.4f}, Val F1: {dev_f1:.4f}')

    if dev_loss < best_dev_loss:
        best_dev_loss = dev_loss
        torch.save(model, 'best_model_cnn_bilstm.pth')


                                                                                                                       

Epoch 1, Train Loss: 0.1081, Val Loss: 0.0251, Val F1: 0.7202


                                                                                                                       

Epoch 2, Train Loss: 0.0195, Val Loss: 0.0191, Val F1: 0.7879


                                                                                                                       

Epoch 3, Train Loss: 0.0155, Val Loss: 0.0162, Val F1: 0.8210


                                                                                                                       

Epoch 4, Train Loss: 0.0131, Val Loss: 0.0139, Val F1: 0.8505


                                                                                                                       

Epoch 5, Train Loss: 0.0113, Val Loss: 0.0136, Val F1: 0.8542


                                                                                                                       

Epoch 6, Train Loss: 0.0100, Val Loss: 0.0126, Val F1: 0.8698


                                                                                                                       

Epoch 7, Train Loss: 0.0088, Val Loss: 0.0126, Val F1: 0.8694


                                                                                                                       

Epoch 8, Train Loss: 0.0078, Val Loss: 0.0123, Val F1: 0.8769


                                                                                                                       

Epoch 9, Train Loss: 0.0068, Val Loss: 0.0123, Val F1: 0.8792


                                                                                                                       

Epoch 10, Train Loss: 0.0056, Val Loss: 0.0128, Val F1: 0.8753


In [15]:
model = torch.load('best_model_cnn_bilstm.pth')
dev_f1, dev_loss = evaluate_model(model, dev_loader, device)
test_f1, test_loss = evaluate_model(model, test_loader, device)
print(f'Test F1: {test_f1:.4f}, Test Loss: {test_loss:.4f}, Val Loss: {dev_loss:.4f}, Val F1: {dev_f1:.4f}')

Test F1: 0.0169, Test Loss: 0.8202, Val Loss: 0.8769, Val F1: 0.0123
