In [1]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset
from sklearn.metrics import classification_report, confusion_matrix, accuracy_score, f1_score
import seaborn as sns
import matplotlib.pyplot as plt
from datasets import Dataset as HFDataset
import numpy as np


# Preprocessing utilities
class SequenceDataset(Dataset):
    """Map-style dataset yielding fixed-length (token ids, tag ids) tensor pairs.

    Args:
        sentences: indexable collection of token sequences.
        tags: indexable collection of tag sequences, parallel to ``sentences``.
        word2idx: token -> index mapping; must contain "<UNK>" and "<PAD>".
        tag2idx: tag -> index mapping; must contain "<PAD>".
        max_len: length every returned sequence is truncated or padded to.
    """

    def __init__(self, sentences, tags, word2idx, tag2idx, max_len):
        self.sentences, self.tags = sentences, tags
        self.word2idx, self.tag2idx = word2idx, tag2idx
        self.max_len = max_len

    def __len__(self):
        """Number of sentences in the dataset."""
        return len(self.sentences)

    @staticmethod
    def _fit_to_length(ids, pad_id, length):
        """Return a copy of ``ids`` cut to ``length`` and right-padded with ``pad_id``."""
        fitted = ids[:length]
        # A non-positive repeat count yields an empty list, so over-long
        # sequences are truncated without any padding being appended.
        fitted.extend([pad_id] * (length - len(ids)))
        return fitted

    def __getitem__(self, idx):
        """Return ``(token_ids, tag_ids)`` as two ``torch.long`` tensors of size ``max_len``.

        Unknown tokens fall back to the "<UNK>" index; an unknown tag raises
        ``KeyError``.
        """
        # Convert symbols to indices first, then normalise the length.
        token_ids = [
            self.word2idx.get(token, self.word2idx["<UNK>"])
            for token in self.sentences[idx]
        ]
        tag_ids = [self.tag2idx[label] for label in self.tags[idx]]

        token_ids = self._fit_to_length(token_ids, self.word2idx["<PAD>"], self.max_len)
        tag_ids = self._fit_to_length(tag_ids, self.tag2idx["<PAD>"], self.max_len)

        return (
            torch.tensor(token_ids, dtype=torch.long),
            torch.tensor(tag_ids, dtype=torch.long),
        )


# BIO Rule Enforcement
def enforce_bio_rules(tags):
    """Return a copy of ``tags`` in which every span-opening 'I' is rewritten to 'B'.

    An 'I' counts as span-opening when the tag before it in the *input*
    sequence is neither 'B' nor 'I'; the start of the sequence is treated as
    if it were preceded by 'O'. All other tags are passed through unchanged.
    """
    sequence = list(tags)
    # Pair each tag with its predecessor; the first tag is paired with 'O'.
    predecessors = ['O'] + sequence[:-1]
    return [
        'B' if current == 'I' and before not in {'B', 'I'} else current
        for before, current in zip(predecessors, sequence)
    ]


# Model Definition
class LSTMTagger(nn.Module):
    """Embedding -> single-layer LSTM -> linear projection, one score vector per token.

    Args:
        vocab_size: number of rows in the embedding table.
        tagset_size: number of output classes per token.
        embedding_dim: width of each token embedding.
        hidden_dim: LSTM hidden state size.

    Index 0 is the embedding's ``padding_idx``, so its vector is initialised
    to zeros and receives no gradient.
    """

    def __init__(self, vocab_size, tagset_size, embedding_dim, hidden_dim):
        super().__init__()
        self.embedding = nn.Embedding(
            num_embeddings=vocab_size,
            embedding_dim=embedding_dim,
            padding_idx=0,
        )
        # batch_first=True: inputs and outputs are laid out (batch, seq, feature).
        self.lstm = nn.LSTM(
            input_size=embedding_dim,
            hidden_size=hidden_dim,
            batch_first=True,
        )
        self.fc = nn.Linear(in_features=hidden_dim, out_features=tagset_size)

    def forward(self, sentences):
        """Return per-token logits of shape (batch, seq, tagset_size).

        ``sentences`` is a (batch, seq) tensor of integer token indices.
        """
        hidden_states, _ = self.lstm(self.embedding(sentences))
        return self.fc(hidden_states)


# Preprocessing function
def preprocess_data(dataset, max_len):
    """Extract sentences and tags from ``dataset`` and build index vocabularies.

    Args:
        dataset: mapping-like object exposing ``dataset["tokens"]`` (token
            sequences) and ``dataset["tags"]`` (parallel tag sequences).
        max_len: currently unused; kept so existing callers keep working.
            Truncation/padding is done later by ``SequenceDataset``.

    Returns:
        ``(sentences, tags, word2idx, tag2idx)`` where ``tags`` has been passed
        through ``enforce_bio_rules``, ``word2idx`` reserves 0 for "<PAD>" and
        1 for "<UNK>", and ``tag2idx`` reserves 0 for "<PAD>".

    Indices are assigned in order of first appearance. The previous
    implementation enumerated a ``set``, whose iteration order for strings
    changes between interpreter runs (hash randomisation), so the same data
    could yield different vocabularies on every kernel restart.
    """
    sentences = dataset["tokens"]

    # Enforce BIO rules on the data
    tags = [enforce_bio_rules(tag_seq) for tag_seq in dataset["tags"]]

    # Special symbols are inserted first so they always own the lowest
    # indices; setdefault leaves them untouched if the data contains them.
    word2idx = {"<PAD>": 0, "<UNK>": 1}
    for sentence in sentences:
        for word in sentence:
            word2idx.setdefault(word, len(word2idx))

    tag2idx = {"<PAD>": 0}
    for tag_seq in tags:
        for tag in tag_seq:
            tag2idx.setdefault(tag, len(tag2idx))

    return sentences, tags, word2idx, tag2idx


# Training function
def train_model(model, train_loader, optimizer, criterion, device):
    """Run one optimisation pass over ``train_loader`` and return the mean batch loss.

    Args:
        model: module mapping a batch of token indices to per-token logits.
        train_loader: sized iterable of ``(sentences, tags)`` tensor pairs.
        optimizer: optimiser bound to ``model``'s parameters.
        criterion: loss taking ``(N, classes)`` logits and ``(N,)`` targets.
        device: device each batch is moved to before the forward pass.

    Returns:
        Sum of per-batch losses divided by ``len(train_loader)``.
    """
    model.train()
    batch_losses = []
    for batch_tokens, batch_tags in train_loader:
        batch_tokens = batch_tokens.to(device)
        batch_tags = batch_tags.to(device)

        optimizer.zero_grad()
        logits = model(batch_tokens)
        # Collapse batch and sequence axes so every token is one sample.
        flat_logits = logits.view(-1, logits.shape[-1])
        flat_targets = batch_tags.view(-1)

        batch_loss = criterion(flat_logits, flat_targets)
        batch_loss.backward()
        optimizer.step()
        batch_losses.append(batch_loss.item())

    return sum(batch_losses) / len(train_loader)


# Evaluation function
def evaluate_model(model, test_loader, tag2idx, idx2tag, device):
    """Score ``model`` on ``test_loader``, print metrics and plot a confusion matrix.

    Positions whose gold tag is the "<PAD>" index are excluded. Prints a
    classification report, accuracy and weighted F1, then shows a heatmap
    of the confusion matrix over every non-padding tag in ``tag2idx``.

    Args:
        model: module mapping a batch of token indices to per-token logits.
        test_loader: iterable of ``(sentences, tags)`` tensor pairs.
        tag2idx: tag -> index mapping; must contain "<PAD>".
        idx2tag: index -> tag lookup used to decode gold and predicted indices.
        device: device each batch is moved to before the forward pass.
    """
    model.eval()
    pad_index = tag2idx["<PAD>"]
    y_true, y_pred = [], []

    with torch.no_grad():
        for batch_tokens, batch_tags in test_loader:
            batch_tokens = batch_tokens.to(device)
            batch_tags = batch_tags.to(device)
            best_indices = torch.argmax(model(batch_tokens), dim=-1)

            # Boolean indexing keeps row-major order, so gold and predicted
            # labels stay aligned token by token.
            real_positions = batch_tags != pad_index
            gold = batch_tags[real_positions].cpu().tolist()
            guessed = best_indices[real_positions].cpu().tolist()
            y_true.extend(idx2tag[index] for index in gold)
            y_pred.extend(idx2tag[index] for index in guessed)

    # Classification report
    print("Classification Report:")
    print(classification_report(y_true, y_pred))

    # Accuracy and F1 Score
    accuracy = accuracy_score(y_true, y_pred)
    f1 = f1_score(y_true, y_pred, average="weighted")
    print(f"Accuracy: {accuracy:.4f}")
    print(f"F1 Score: {f1:.4f}")

    # Confusion matrix over every real (non-padding) tag
    labels = [tag for tag in tag2idx if tag != "<PAD>"]
    matrix = confusion_matrix(y_true, y_pred, labels=labels)

    # Plot confusion matrix
    _, ax = plt.subplots(figsize=(12, 8))
    sns.heatmap(
        matrix,
        annot=True,
        fmt='d',
        xticklabels=labels,
        yticklabels=labels,
        cmap="Blues",
        ax=ax,
    )
    ax.set_xlabel("Predicted")
    ax.set_ylabel("True")
    ax.set_title("Confusion Matrix")
    plt.show()


In [2]:


# Function to predict tags for a new sentence
def predict_tags(tokens):
    """Tag one tokenised sentence with the notebook's trained model.

    Reads the notebook-level ``model``, ``word2idx``, ``idx2tag``, ``MAX_LEN``
    and ``device``. Unknown tokens fall back to "<UNK>"; the input is
    right-padded to ``MAX_LEN``; predictions for padding positions are
    dropped; the result is passed through ``enforce_bio_rules``.

    Returns:
        List of ``(token, tag)`` pairs, one per input token.
    """
    model.eval()

    token_ids = [word2idx.get(token, word2idx["<UNK>"]) for token in tokens]
    # A non-positive repeat count adds nothing, so long inputs are left as is.
    token_ids.extend([word2idx["<PAD>"]] * (MAX_LEN - len(token_ids)))
    batch = torch.tensor([token_ids], dtype=torch.long).to(device)

    with torch.no_grad():
        best_indices = torch.argmax(model(batch), dim=-1).cpu().numpy()[0]

    # Keep only the positions that correspond to real input tokens.
    raw_tags = [idx2tag[index] for index in best_indices[:len(tokens)]]

    # Enforce BIO rules
    return list(zip(tokens, enforce_bio_rules(raw_tags)))


# BIO Map
# Reference label scheme only. The indices used for training come from
# preprocess_data(), which reserves index 0 for "<PAD>", so this map must NOT
# be used to decode model outputs.
BIO_MAP = {'O': 0, 'B': 1, 'I': 2}

# Dataset preparation
# NOTE: `train_df` and `test_df` are not defined anywhere in this notebook;
# they must be created by an earlier cell (each needs "tokens" and "tags"
# columns) or this cell raises NameError on a fresh kernel.
train_hugging_face_dat = HFDataset.from_dict({
    "tokens": train_df["tokens"],
    "tags": train_df["tags"]
})

test_hugging_face_dat = HFDataset.from_dict({
    "tokens": test_df["tokens"],
    "tags": test_df["tags"]
})

MAX_LEN = 50
BATCH_SIZE = 32
EMBEDDING_DIM = 100
HIDDEN_DIM = 128
EPOCHS = 10
SEED = 42

# Seed PyTorch's global RNG so weight initialisation and DataLoader shuffling
# are repeatable across kernel restarts.
torch.manual_seed(SEED)

train_sentences, train_tags, word2idx, tag2idx = preprocess_data(train_hugging_face_dat, MAX_LEN)
# Only the test sentences/tags are kept; the test set is encoded with the
# training vocabularies so indices mean the same thing in both splits.
test_sentences, test_tags, _, _ = preprocess_data(test_hugging_face_dat, MAX_LEN)

# Decode with the exact inverse of the mapping used for training. Building
# this from BIO_MAP (as before) mislabels every index, because tag2idx shifts
# the real tags up by one to make room for "<PAD>" at 0.
idx2tag = {idx: tag for tag, idx in tag2idx.items()}

train_dataset = SequenceDataset(train_sentences, train_tags, word2idx, tag2idx, MAX_LEN)
test_dataset = SequenceDataset(test_sentences, test_tags, word2idx, tag2idx, MAX_LEN)

train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=BATCH_SIZE, shuffle=False)

# Model initialization
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = LSTMTagger(len(word2idx), len(tag2idx), EMBEDDING_DIM, HIDDEN_DIM).to(device)
# Padding positions contribute nothing to the loss.
criterion = nn.CrossEntropyLoss(ignore_index=tag2idx["<PAD>"])
optimizer = optim.Adam(model.parameters(), lr=0.001)

# Training
for epoch in range(EPOCHS):
    train_loss = train_model(model, train_loader, optimizer, criterion, device)
    print(f"Epoch {epoch + 1}/{EPOCHS}, Loss: {train_loss:.4f}")

# Evaluation
evaluate_model(model, test_loader, tag2idx, idx2tag, device)

NameError: name 'train_df' is not defined

In [None]:


# Example: tag a single hand-written sentence and print one "token: tag" line per token
tokens = ["This", "is", "natural", "language", "processing"]
predicted_tags = predict_tags(tokens)

for pair in predicted_tags:
    word, tag = pair
    print(f"{word}: {tag}")
