In [15]:
# Install necessary libraries
!pip install transformers datasets scikit-learn matplotlib torch nltk

# Import required libraries
import numpy as np
import torch
import torch.nn as nn
from torch.nn.utils.rnn import pad_sequence
import random
from torch.utils.data import DataLoader, Dataset, Subset, TensorDataset
from torch.nn.utils.rnn import pad_sequence
from torch.optim import Adam
from transformers import BertModel, BertTokenizer, BertForTokenClassification, Trainer, TrainingArguments, DataCollatorForTokenClassification, get_linear_schedule_with_warmup
from datasets import load_dataset
from torch.optim import Adam, AdamW
from sklearn.model_selection import train_test_split
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity
from sklearn.metrics import accuracy_score, precision_recall_fscore_support
import matplotlib.pyplot as plt
from nltk.corpus import wordnet
from torch.cuda.amp import autocast, GradScaler
import json
import re
from sklearn.tree import DecisionTreeClassifier
import copy
import warnings
from sklearn.exceptions import UndefinedMetricWarning

# Set device to CPU
device = torch.device("cpu")




**Baseline For Snip Dataset**

In [16]:
# Suppress UndefinedMetricWarning
warnings.filterwarnings("ignore", category=UndefinedMetricWarning)
warnings.filterwarnings("ignore", category=UndefinedMetricWarning)


# Set device to CPU
device = torch.device("cpu")

# Load datasets
snips_dataset = load_dataset("snips_built_in_intents")

# Split the train dataset into train and validation sets
train_dataset = snips_dataset["train"]
train_indices, val_indices = train_test_split(list(range(len(train_dataset))), test_size=0.2, random_state=42)

# Create custom datasets for training and validation
train_split = train_dataset.select(train_indices)
val_split = train_dataset.select(val_indices)

# Initialize BERT model for embeddings
bert_model = BertModel.from_pretrained("bert-base-uncased")
bert_model.to(device)
bert_model.eval()  # Set BERT to evaluation mode

class SnipsDataset(Dataset):
    def __init__(self, dataset, tokenizer, max_len=128):
        self.dataset = dataset
        self.tokenizer = tokenizer
        self.max_len = max_len

    def __len__(self):
        return len(self.dataset)

    def __getitem__(self, index):
        data = self.dataset[index]
        input_text = data["text"]
        label = data["label"]

        inputs = self.tokenizer(
            input_text,
            max_length=self.max_len,
            padding='max_length',
            truncation=True,
            return_tensors="pt"
        )

        input_ids = inputs["input_ids"].squeeze()
        attention_mask = inputs["attention_mask"].squeeze()

        with torch.no_grad():
            embeddings = bert_model(input_ids.unsqueeze(0).to(device), attention_mask=attention_mask.unsqueeze(0).to(device)).last_hidden_state.squeeze(0)

        return {
            "embeddings": embeddings,
            "label": torch.tensor(label, dtype=torch.long),
            "text": input_text
        }

def custom_collate_fn(batch):
    embeddings = torch.stack([item['embeddings'] for item in batch])
    labels = torch.stack([item['label'] for item in batch])
    texts = [item['text'] for item in batch]

    return {
        'embeddings': embeddings,
        'label': labels,
        'text': texts
    }

def create_data_loader(dataset, tokenizer, batch_size=16):
    ds = SnipsDataset(dataset, tokenizer)
    return DataLoader(ds, batch_size=batch_size, shuffle=True, collate_fn=custom_collate_fn)

snips_tokenizer = BertTokenizer.from_pretrained("bert-base-uncased")
train_data_loader = create_data_loader(train_split, snips_tokenizer)
val_data_loader = create_data_loader(val_split, snips_tokenizer)

# 1. Rule-Based Baseline
def rule_based_response(input_text):
    rules = {
        "weather": "It looks like it will be sunny.",
        "location": "Your current location is being shared.",
        "restaurant": "Per Se is quite crowded right now.",
        "itinerary": "Here is the fastest itinerary for your trip.",
        "photos": "Here are some photos of Mondrian Soho."
    }
    for keyword, response in rules.items():
        if keyword in input_text.lower():
            return response
    return "I'm sorry, I don't understand your request."

# Map rule-based responses to numerical labels
rule_based_mapping = {
    "It looks like it will be sunny.": 0,
    "Your current location is being shared.": 1,
    "Per Se is quite crowded right now.": 2,
    "Here is the fastest itinerary for your trip.": 3,
    "Here are some photos of Mondrian Soho.": 4,
    "I'm sorry, I don't understand your request.": 5  # Assuming 5 is an appropriate label for unrecognized intents
}

def evaluate_rule_based(data_loader):
    all_preds = []
    all_labels = []
    all_texts = []

    for d in data_loader:
        texts = d["text"]
        labels = d["label"].detach().cpu().numpy().flatten()

        preds = [rule_based_mapping.get(rule_based_response(text), 5) for text in texts]  # Convert to numerical labels
        all_preds.extend(preds)
        all_labels.extend(labels)
        all_texts.extend(texts)

    return all_texts, all_preds, all_labels

# 2. Retrieval-Based Baseline
def prepare_tfidf_vectorizer(train_texts):
    vectorizer = TfidfVectorizer().fit(train_texts)
    return vectorizer

def retrieval_based_response(input_text, vectorizer, train_texts, train_labels):
    input_vector = vectorizer.transform([input_text])
    train_vectors = vectorizer.transform(train_texts)

    cosine_similarities = cosine_similarity(input_vector, train_vectors).flatten()
    most_similar_idx = cosine_similarities.argmax()

    return train_labels[most_similar_idx]

def evaluate_retrieval_based(data_loader, vectorizer, train_texts, train_labels):
    all_preds = []
    all_labels = []
    all_texts = []

    for d in data_loader:
        texts = d["text"]
        labels = d["label"].detach().cpu().numpy().flatten()

        preds = [retrieval_based_response(text, vectorizer, train_texts, train_labels) for text in texts]
        all_preds.extend(preds)
        all_labels.extend(labels)
        all_texts.extend(texts)

    return all_texts, all_preds, all_labels

# Prepare vectorizer and evaluate (assuming you have train_split and test_data_loader)
train_texts = [example["text"] for example in train_split]
train_labels = [example["label"] for example in train_split]
vectorizer = prepare_tfidf_vectorizer(train_texts)

# 3. LSTM-Based Seq2Seq Model
class LSTMSeq2Seq(nn.Module):
    def __init__(self, input_dim, hidden_dim, output_dim, num_layers=1):
        super(LSTMSeq2Seq, self).__init__()
        self.encoder = nn.LSTM(input_dim, hidden_dim, num_layers, batch_first=True)
        self.decoder = nn.LSTM(hidden_dim, hidden_dim, num_layers, batch_first=True)
        self.fc = nn.Linear(hidden_dim, output_dim)

    def forward(self, x):
        # Assuming x is of shape (batch_size, seq_length, input_dim)
        batch_size, seq_length, _ = x.size()
        _, (hidden, _) = self.encoder(x)
        hidden = hidden.repeat(seq_length, 1, 1).transpose(0, 1)  # Repeat hidden state for each time step
        output, _ = self.decoder(hidden)
        output = self.fc(output[:, -1, :])  # Use the last output for classification
        return output

input_dim = 768  # Size of input embeddings from BERT
hidden_dim = 256  # Size of hidden layer
output_dim = len(snips_dataset["train"].unique("label"))  # Number of output classes

lstm_model = LSTMSeq2Seq(input_dim, hidden_dim, output_dim).to(device)
criterion = nn.CrossEntropyLoss()
optimizer = Adam(lstm_model.parameters(), lr=1e-3)

def train_lstm_model(model, data_loader, criterion, optimizer, device):
    model.train()
    total_loss = 0
    all_preds = []
    all_labels = []
    for batch in data_loader:
        embeddings, labels = batch["embeddings"].to(device), batch["label"].to(device)
        optimizer.zero_grad()
        outputs = model(embeddings)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()
        total_loss += loss.item()
        preds = torch.argmax(outputs, dim=-1)
        all_preds.extend(preds.cpu().numpy())
        all_labels.extend(labels.cpu().numpy())
    avg_loss = total_loss / len(data_loader)
    accuracy = accuracy_score(all_labels, all_preds)
    precision, recall, f1, _ = precision_recall_fscore_support(all_labels, all_preds, average='weighted')
    return avg_loss, accuracy, precision, recall, f1

def evaluate_lstm_model(model, data_loader, criterion, device):
    model.eval()
    total_loss = 0
    all_preds = []
    all_labels = []
    with torch.no_grad():
        for batch in data_loader:
            embeddings, labels = batch["embeddings"].to(device), batch["label"].to(device)
            outputs = model(embeddings)
            loss = criterion(outputs, labels)
            total_loss += loss.item()
            preds = torch.argmax(outputs, dim=-1)
            all_preds.extend(preds.cpu().numpy())
            all_labels.extend(labels.cpu().numpy())
    avg_loss = total_loss / len(data_loader)
    accuracy = accuracy_score(all_labels, all_preds)
    precision, recall, f1, _ = precision_recall_fscore_support(all_labels, all_preds, average='weighted')
    return avg_loss, accuracy, precision, recall, f1

# Early stopping class
class EarlyStopping:
    def __init__(self, patience=3, min_delta=0.01):
        self.patience = patience
        self.min_delta = min_delta
        self.best_loss = None
        self.counter = 0
        self.early_stop = False

    def __call__(self, val_loss):
        if self.best_loss is None:
            self.best_loss = val_loss
        elif val_loss < self.best_loss - self.min_delta:
            self.best_loss = val_loss
            self.counter = 0
        else:
            self.counter += 1
            if self.counter >= self.patience:
                self.early_stop = True

train_data_loader_lstm = train_data_loader
val_data_loader_lstm = val_data_loader

# Training loop for LSTM
print("Training LSTM-Based Seq2Seq Model")
EPOCHS = 20
early_stopping = EarlyStopping(patience=3, min_delta=0.01)
for epoch in range(EPOCHS):
    train_loss, train_accuracy, train_precision, train_recall, train_f1 = train_lstm_model(lstm_model, train_data_loader_lstm, criterion, optimizer, device)
    val_loss, val_accuracy, val_precision, val_recall, val_f1 = evaluate_lstm_model(lstm_model, val_data_loader_lstm, criterion, device)
    print(f"Epoch {epoch+1}, Train Loss: {train_loss:.4f}, Train Accuracy: {train_accuracy:.4f}, Train Precision: {train_precision:.4f}, Train Recall: {train_recall:.4f}, Train F1: {train_f1:.4f}")
    print(f"Val Loss: {val_loss:.4f}, Val Accuracy: {val_accuracy:.4f}, Val Precision: {val_precision:.4f}, Val Recall: {val_recall:.4f}, Val F1: {val_f1:.4f}")
    early_stopping(val_loss)
    if early_stopping.early_stop:
        print("Early stopping")
        break

# Evaluate on the test set
test_data_loader_lstm = val_data_loader  # Using val_data_loader as test data_loader

test_loss, test_accuracy, test_precision, test_recall, test_f1 = evaluate_lstm_model(lstm_model, test_data_loader_lstm, criterion, device)
print(f"LSTM-Based Seq2Seq Model - Test Loss: {test_loss:.4f}, Test Accuracy: {test_accuracy:.4f}, Test Precision: {test_precision:.4f}, Test Recall: {test_recall:.4f}, Test F1: {test_f1:.4f}")

# Evaluate rule-based baseline
print("\nEvaluating Rule-Based Baseline")
texts, preds, labels = evaluate_rule_based(val_data_loader)
print("Rule-Based Baseline Results:")
accuracy = accuracy_score(labels, preds)
precision, recall, f1, _ = precision_recall_fscore_support(labels, preds, average='weighted')
print(f"Accuracy: {accuracy:.4f}, Precision: {precision:.4f}, Recall: {recall:.4f}, F1: {f1:.4f}")
for i in range(5):
    print(f"Text: {texts[i]}")
    print(f"Prediction: {preds[i]}")
    print(f"Label: {labels[i]}")
    print()

# Evaluate retrieval-based baseline
print("\nEvaluating Retrieval-Based Baseline")
texts, preds, labels = evaluate_retrieval_based(val_data_loader, vectorizer, train_texts, train_labels)
print("Retrieval-Based Baseline Results:")
accuracy = accuracy_score(labels, preds)
precision, recall, f1, _ = precision_recall_fscore_support(labels, preds, average='weighted')
print(f"Accuracy: {accuracy:.4f}, Precision: {precision:.4f}, Recall: {recall:.4f}, F1: {f1:.4f}")
for i in range(5):
    print(f"Text: {texts[i]}")
    print(f"Prediction: {preds[i]}")
    print(f"Label: {labels[i]}")
    print()


Downloading builder script:   0%|          | 0.00/4.95k [00:00<?, ?B/s]

Downloading readme:   0%|          | 0.00/6.56k [00:00<?, ?B/s]

The repository for snips_built_in_intents contains custom code which must be executed to correctly load the dataset. You can inspect the repository content at https://hf.co/datasets/snips_built_in_intents.
You can avoid this prompt in future by passing the argument `trust_remote_code=True`.

Do you wish to run the custom code? [y/N] y


Downloading data:   0%|          | 0.00/172k [00:00<?, ?B/s]

Generating train split:   0%|          | 0/328 [00:00<?, ? examples/s]

Training LSTM-Based Seq2Seq Model
Epoch 1, Train Loss: 1.9116, Train Accuracy: 0.3244, Train Precision: 0.2418, Train Recall: 0.3244, Train F1: 0.2560
Val Loss: 1.6958, Val Accuracy: 0.5303, Val Precision: 0.4916, Val Recall: 0.5303, Val F1: 0.4791
Epoch 2, Train Loss: 1.3131, Train Accuracy: 0.5344, Train Precision: 0.5612, Train Recall: 0.5344, Train F1: 0.4707
Val Loss: 0.9695, Val Accuracy: 0.7273, Val Precision: 0.6657, Val Recall: 0.7273, Val F1: 0.6841
Epoch 3, Train Loss: 0.8402, Train Accuracy: 0.7595, Train Precision: 0.7575, Train Recall: 0.7595, Train F1: 0.7550
Val Loss: 1.0943, Val Accuracy: 0.6667, Val Precision: 0.6350, Val Recall: 0.6667, Val F1: 0.6239
Epoch 4, Train Loss: 0.6436, Train Accuracy: 0.7519, Train Precision: 0.7294, Train Recall: 0.7519, Train F1: 0.7199
Val Loss: 0.6581, Val Accuracy: 0.7727, Val Precision: 0.8708, Val Recall: 0.7727, Val F1: 0.7771
Epoch 5, Train Loss: 0.4721, Train Accuracy: 0.8206, Train Precision: 0.8290, Train Recall: 0.8206, Train 

Baseline for MultiWOZ Dataset

In [17]:


# Load the MultiWOZ dataset
multiwoz_dataset = load_dataset("multi_woz_v22")

# Prepare data for the decision tree model
class SimpleDataset(Dataset):
    def __init__(self, dataset, tokenizer, act2idx, max_len=128):
        self.dataset = dataset
        self.tokenizer = tokenizer
        self.act2idx = act2idx
        self.max_len = max_len
        self.data = self._prepare_data()

    def _prepare_data(self):
        data = []
        print("Starting to prepare data...")
        total_turns = sum(len(dialogue["turns"]["utterance"]) for dialogue in self.dataset)
        for dialogue in self.dataset:
            for turn_id, utterance in enumerate(dialogue["turns"]["utterance"]):
                dialogue_acts = []
                if "dialogue_acts" in dialogue["turns"] and len(dialogue["turns"]["dialogue_acts"]) > turn_id:
                    turn_dialogue_acts = dialogue["turns"]["dialogue_acts"][turn_id]["dialog_act"]
                    if "act_type" in turn_dialogue_acts and len(turn_dialogue_acts["act_type"]) > 0:
                        for act_type in turn_dialogue_acts["act_type"]:
                            dialogue_acts.append(act_type)

                if dialogue_acts:
                    label = self.act2idx.get(dialogue_acts[0], -1)
                else:
                    label = -1

                if label != -1:
                    data.append({"utterance": utterance, "label": label})
        print("Data preparation completed.")
        return data

    def __len__(self):
        return len(self.data)

    def __getitem__(self, index):
        data = self.data[index]
        input_text = data["utterance"]
        label = data["label"]

        inputs = self.tokenizer(
            input_text,
            max_length=self.max_len,
            padding='max_length',
            truncation=True,
            return_tensors="pt"
        )

        input_ids = inputs["input_ids"].squeeze()
        attention_mask = inputs["attention_mask"].squeeze()

        return {
            "input_ids": input_ids,
            "attention_mask": attention_mask,
            "label": torch.tensor(label, dtype=torch.long),
            "text": input_text
        }

def collate_fn(batch):
    input_ids = torch.nn.utils.rnn.pad_sequence([item['input_ids'] for item in batch], batch_first=True, padding_value=0)
    attention_masks = torch.nn.utils.rnn.pad_sequence([item['attention_mask'] for item in batch], batch_first=True, padding_value=0)
    labels = torch.tensor([item['label'] for item in batch])
    texts = [item['text'] for item in batch]
    return {'input_ids': input_ids, 'attention_mask': attention_masks, 'label': labels, 'text': texts}

# Create data loaders
all_dialogue_acts = set()
for dialogue in multiwoz_dataset["train"]:
    if "turns" in dialogue:
        for turn_id in range(len(dialogue["turns"]["utterance"])):
            if "dialogue_acts" in dialogue["turns"] and len(dialogue["turns"]["dialogue_acts"]) > turn_id:
                turn_dialogue_acts = dialogue["turns"]["dialogue_acts"][turn_id]["dialog_act"]
                if "act_type" in turn_dialogue_acts and len(turn_dialogue_acts["act_type"]) > 0:
                    for act_type in turn_dialogue_acts["act_type"]:
                        all_dialogue_acts.add(act_type)
act2idx = {act: idx for idx, act in enumerate(all_dialogue_acts)}

tokenizer = BertTokenizer.from_pretrained("bert-base-uncased")
bert_model = BertModel.from_pretrained("bert-base-uncased")

# Subset the dataset for quick testing
train_dataset_full = SimpleDataset(multiwoz_dataset["train"], tokenizer, act2idx)
val_dataset_full = SimpleDataset(multiwoz_dataset["validation"], tokenizer, act2idx)
test_dataset_full = SimpleDataset(multiwoz_dataset["test"], tokenizer, act2idx)

train_subset = Subset(train_dataset_full, range(500))  # Increased subset size
val_subset = Subset(val_dataset_full, range(200))      # Increased subset size
test_subset = Subset(test_dataset_full, range(200))    # Increased subset size

train_loader = DataLoader(train_subset, batch_size=32, shuffle=True, collate_fn=collate_fn)
val_loader = DataLoader(val_subset, batch_size=32, shuffle=False, collate_fn=collate_fn)
test_loader = DataLoader(test_subset, batch_size=32, shuffle=False, collate_fn=collate_fn)

# Train and evaluate decision tree model
def train_evaluate_decision_tree(train_loader, val_loader, test_loader):
    # Prepare data for the decision tree
    train_texts = []
    train_labels = []
    for batch in train_loader:
        train_texts.extend(batch["text"])
        train_labels.extend(batch["label"].numpy())

    val_texts = []
    val_labels = []
    for batch in val_loader:
        val_texts.extend(batch["text"])
        val_labels.extend(batch["label"].numpy())

    test_texts = []
    test_labels = []
    for batch in test_loader:
        test_texts.extend(batch["text"])
        test_labels.extend(batch["label"].numpy())

    # Vectorize the text data
    vectorizer = TfidfVectorizer()
    X_train = vectorizer.fit_transform(train_texts)
    X_val = vectorizer.transform(val_texts)
    X_test = vectorizer.transform(test_texts)

    # Train the decision tree model
    clf = DecisionTreeClassifier()
    clf.fit(X_train, train_labels)

    # Evaluate on train set
    train_preds = clf.predict(X_train)
    train_accuracy = accuracy_score(train_labels, train_preds)
    train_precision, train_recall, train_f1, _ = precision_recall_fscore_support(train_labels, train_preds, average='weighted', zero_division=0)

    print(f"Decision Tree Model Training Results:\nAccuracy={train_accuracy:.4f}, Precision={train_precision:.4f}, Recall={train_recall:.4f}, F1={train_f1:.4f}")

    # Evaluate on validation set
    val_preds = clf.predict(X_val)
    val_accuracy = accuracy_score(val_labels, val_preds)
    val_precision, val_recall, val_f1, _ = precision_recall_fscore_support(val_labels, val_preds, average='weighted', zero_division=0)

    print(f"Decision Tree Model Validation Results:\nAccuracy={val_accuracy:.4f}, Precision={val_precision:.4f}, Recall={val_recall:.4f}, F1={val_f1:.4f}")

    # Evaluate on test set
    test_preds = clf.predict(X_test)
    test_accuracy = accuracy_score(test_labels, test_preds)
    test_precision, test_recall, test_f1, _ = precision_recall_fscore_support(test_labels, test_preds, average='weighted', zero_division=0)

    print(f"Decision Tree Model Test Results:\nAccuracy={test_accuracy:.4f}, Precision={test_precision:.4f}, Recall={test_recall:.4f}, F1={test_f1:.4f}")

    return clf, vectorizer

# Implement the FNN model for DST and DAR
class FeedforwardNN(nn.Module):
    def __init__(self, input_dim, hidden_dim, output_dim):
        super(FeedforwardNN, self).__init__()
        self.fc1 = nn.Linear(input_dim, hidden_dim)
        self.relu = nn.ReLU()
        self.fc2 = nn.Linear(hidden_dim, output_dim)

    def forward(self, x):
        out = self.fc1(x)
        out = self.relu(out)
        out = self.fc2(out)
        return out

# Define training loop for FNN model
def train_nn_model(model, data_loader, optimizer, criterion, device):
    model.train()
    total_loss = 0
    all_labels = []
    all_preds = []
    for batch in data_loader:
        inputs, attention_mask, labels = batch['input_ids'].to(device), batch['attention_mask'].to(device), batch['label'].to(device)

        # Get BERT embeddings
        with torch.no_grad():
            embeddings = bert_model(input_ids=inputs, attention_mask=attention_mask).last_hidden_state[:, 0, :]

        optimizer.zero_grad()
        outputs = model(embeddings)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()
        total_loss += loss.item()

        preds = outputs.argmax(dim=1)
        all_labels.extend(labels.cpu().numpy())
        all_preds.extend(preds.cpu().numpy())

    avg_loss = total_loss / len(data_loader)
    accuracy = accuracy_score(all_labels, all_preds)
    precision, recall, f1, _ = precision_recall_fscore_support(all_labels, all_preds, average='weighted', zero_division=0)
    return avg_loss, accuracy, precision, recall, f1

# Define evaluation function for FNN model
def evaluate_nn_model(model, data_loader, criterion, device):
    model.eval()
    total_loss = 0
    all_labels = []
    all_preds = []
    with torch.no_grad():
        for batch in data_loader:
            inputs, attention_mask, labels = batch['input_ids'].to(device), batch['attention_mask'].to(device), batch['label'].to(device)

            # Get BERT embeddings
            embeddings = bert_model(input_ids=inputs, attention_mask=attention_mask).last_hidden_state[:, 0, :]

            outputs = model(embeddings)
            loss = criterion(outputs, labels)
            total_loss += loss.item()

            preds = outputs.argmax(dim=1)
            all_labels.extend(labels.cpu().numpy())
            all_preds.extend(preds.cpu().numpy())

    avg_loss = total_loss / len(data_loader)
    accuracy = accuracy_score(all_labels, all_preds)
    precision, recall, f1, _ = precision_recall_fscore_support(all_labels, all_preds, average='weighted', zero_division=0)
    return avg_loss, accuracy, precision, recall, f1

# Early stopping class
class EarlyStopping:
    def __init__(self, patience=3, delta=0):
        self.patience = patience
        self.delta = delta
        self.best_score = None
        self.early_stop = False
        self.counter = 0

    def __call__(self, val_loss, model):
        score = -val_loss
        if self.best_score is None:
            self.best_score = score
            self.save_checkpoint(val_loss, model)
        elif score < self.best_score + self.delta:
            self.counter += 1
            if self.counter >= self.patience:
                self.early_stop = True
        else:
            self.best_score = score
            self.save_checkpoint(val_loss, model)
            self.counter = 0

    def save_checkpoint(self, val_loss, model):
        # Save the model here
        pass

# Training example
input_dim = 768  # BERT embedding size
hidden_dim = 256  # Reduced hidden size for the feedforward network
output_dim = len(act2idx)  # Number of dialogue acts
nn_model = FeedforwardNN(input_dim, hidden_dim, output_dim)

optimizer = optim.Adam(nn_model.parameters(), lr=5e-4)  # Adjusted learning rate
criterion = nn.CrossEntropyLoss()

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
nn_model = nn_model.to(device)
bert_model = bert_model.to(device)
epochs = 20
early_stopping = EarlyStopping(patience=3)

for epoch in range(epochs):
    print(f'Epoch {epoch+1}')

    # Training phase
    train_loss, train_acc, train_precision, train_recall, train_f1 = train_nn_model(nn_model, train_loader, optimizer, criterion, device)
    print(f'Feedforward Neural Network Model Training Results:\nTrain Loss: {train_loss:.4f}, Train Accuracy: {train_acc:.4f}, Train Precision: {train_precision:.4f}, Train Recall: {train_recall:.4f}, Train F1: {train_f1:.4f}')

    # Evaluation phase
    val_loss, val_acc, val_precision, val_recall, val_f1 = evaluate_nn_model(nn_model, val_loader, criterion, device)
    print(f'Feedforward Neural Network Model Validation Results:\nVal Loss: {val_loss:.4f}, Val Accuracy: {val_acc:.4f}, Val Precision: {val_precision:.4f}, Val Recall: {val_recall:.4f}, Val F1: {val_f1:.4f}')

    early_stopping(val_loss, nn_model)
    if early_stopping.early_stop:
        print("Early stopping")
        break

# Final evaluation on test set
test_loss, test_acc, test_precision, test_recall, test_f1 = evaluate_nn_model(nn_model, test_loader, criterion, device)
print(f"Feedforward Neural Network Model Test Results:\nTest Loss: {test_loss:.4f}, Test Accuracy: {test_acc:.4f}, Test Precision: {test_precision:.4f}, Test Recall: {test_recall:.4f}, Test F1: {test_f1:.4f}\n")

# Train and evaluate the decision tree model
clf, vectorizer = train_evaluate_decision_tree(train_loader, val_loader, test_loader)


Starting to prepare data...
Data preparation completed.
Starting to prepare data...
Data preparation completed.
Starting to prepare data...
Data preparation completed.
Epoch 1
Feedforward Neural Network Model Training Results:
Train Loss: 3.0916, Train Accuracy: 0.1500, Train Precision: 0.0653, Train Recall: 0.1500, Train F1: 0.0526
Feedforward Neural Network Model Validation Results:
Val Loss: 2.6001, Val Accuracy: 0.2050, Val Precision: 0.0420, Val Recall: 0.2050, Val F1: 0.0698
Epoch 2
Feedforward Neural Network Model Training Results:
Train Loss: 2.5124, Train Accuracy: 0.2940, Train Precision: 0.3231, Train Recall: 0.2940, Train F1: 0.1964
Feedforward Neural Network Model Validation Results:
Val Loss: 2.3539, Val Accuracy: 0.4450, Val Precision: 0.3424, Val Recall: 0.4450, Val F1: 0.3419
Epoch 3
Feedforward Neural Network Model Training Results:
Train Loss: 2.2492, Train Accuracy: 0.4320, Train Precision: 0.3839, Train Recall: 0.4320, Train F1: 0.3450
Feedforward Neural Network Mo

**Baseline for CoNLL-2012 Dataset**

In [20]:

warnings.filterwarnings("ignore", message="User provided device_type of 'cuda', but CUDA is not available. Disabling")


# Load the CoNLL-2012 dataset and use a subset for quick testing
from datasets import load_dataset
conll_dataset = load_dataset("conll2012_ontonotesv5", "english_v4")

def get_subset(dataset, num_samples=10):
    subset = dataset.select(range(min(num_samples, len(dataset))))
    return subset

# Use smaller subsets for quick testing
conll_dataset['train'] = get_subset(conll_dataset['train'], num_samples=100)
conll_dataset['validation'] = get_subset(conll_dataset['validation'], num_samples=50)
conll_dataset['test'] = get_subset(conll_dataset['test'], num_samples=50)

class RuleBasedCoreferenceResolver:
    def resolve(self, sentences):
        resolved = []
        for sentence in sentences:
            words = sentence['words']
            resolution = []
            for word in words:
                if self.is_pronoun(word):
                    antecedent = self.find_nearest_antecedent(word, words)
                    resolution.append(antecedent)
                else:
                    resolution.append(word)
            resolved.append(resolution)
        return resolved

    def is_pronoun(self, word):
        pronouns = {"he", "she", "it", "they", "him", "her", "them"}
        return word.lower() in pronouns

    def find_nearest_antecedent(self, pronoun, words):
        for i in range(len(words) - 1, -1, -1):
            if self.is_noun(words[i]):
                return words[i]
        return pronoun

    def is_noun(self, word):
        return word[0].isupper()

    def evaluate(self, dataset):
        all_labels = []
        all_preds = []
        for item in dataset:
            sentences = item['sentences']
            for sentence in sentences:
                words = sentence['words']
                coref_spans = sentence['coref_spans']
                labels = self.get_labels(len(words), coref_spans)
                preds = self.resolve([{'words': words}])
                all_labels.extend(labels)
                all_preds.extend([1 if p != w else 0 for p, w in zip(preds[0], words)])
        accuracy = accuracy_score(all_labels, all_preds)
        precision, recall, f1, _ = precision_recall_fscore_support(all_labels, all_preds, average='weighted')
        return accuracy, precision, recall, f1

    def get_labels(self, length, coref_spans):
        labels = [0] * length
        for span in coref_spans:
            for i in range(span[1], span[2] + 1):
                labels[i] = 1
        return labels

# Initialize the resolver
rule_based_resolver = RuleBasedCoreferenceResolver()

# Evaluate the resolver on train, validation, and test sets
train_accuracy, train_precision, train_recall, train_f1 = rule_based_resolver.evaluate(conll_dataset['train'])
val_accuracy, val_precision, val_recall, val_f1 = rule_based_resolver.evaluate(conll_dataset['validation'])
test_accuracy, test_precision, test_recall, test_f1 = rule_based_resolver.evaluate(conll_dataset['test'])

print(f"Rule-Based System - Train Accuracy: {train_accuracy:.4f}, Precision: {train_precision:.4f}, Recall: {train_recall:.4f}, F1: {train_f1:.4f}")
print(f"Rule-Based System - Validation Accuracy: {val_accuracy:.4f}, Precision: {val_precision:.4f}, Recall: {val_recall:.4f}, F1: {val_f1:.4f}")
print(f"Rule-Based System - Test Accuracy: {test_accuracy:.4f}, Precision: {test_precision:.4f}, Recall: {test_recall:.4f}, F1: {test_f1:.4f}")

# Retrieval-Based Model
from transformers import AutoTokenizer, AutoModel
from sklearn.metrics.pairwise import cosine_similarity

class RetrievalBasedModel:
    def __init__(self, model_name="bert-base-uncased"):
        self.tokenizer = AutoTokenizer.from_pretrained(model_name)
        self.model = AutoModel.from_pretrained(model_name)

    def encode(self, texts):
        inputs = self.tokenizer(texts, return_tensors='pt', padding=True, truncation=True)
        with torch.no_grad():
            outputs = self.model(**inputs)
        embeddings = outputs.last_hidden_state.mean(dim=1)  # Mean pooling
        return embeddings

    def find_antecedent(self, pronoun, context):
        context_embeddings = self.encode(context)
        pronoun_embedding = self.encode([pronoun])

        similarities = cosine_similarity(pronoun_embedding.detach().numpy(), context_embeddings.detach().numpy())
        best_match_index = similarities.argmax()

        return context[best_match_index]

    def resolve(self, sentences):
        resolved = []
        for sentence in sentences:
            words = sentence['words']
            resolution = []
            for word in words:
                if self.is_pronoun(word):
                    antecedent = self.find_antecedent(word, words)
                    resolution.append(antecedent)
                else:
                    resolution.append(word)
            resolved.append(resolution)
        return resolved

    def is_pronoun(self, word):
        pronouns = {"he", "she", "it", "they", "him", "her", "them"}
        return word.lower() in pronouns

    def evaluate(self, dataset):
        all_labels = []
        all_preds = []
        for item in dataset:
            sentences = item['sentences']
            for sentence in sentences:
                words = sentence['words']
                coref_spans = sentence['coref_spans']
                labels = self.get_labels(len(words), coref_spans)
                preds = self.resolve([{'words': words}])
                all_labels.extend(labels)
                all_preds.extend([1 if p != w else 0 for p, w in zip(preds[0], words)])
        accuracy = accuracy_score(all_labels, all_preds)
        precision, recall, f1, _ = precision_recall_fscore_support(all_labels, all_preds, average='weighted')
        return accuracy, precision, recall, f1

    def get_labels(self, length, coref_spans):
        labels = [0] * length
        for span in coref_spans:
            for i in range(span[1], span[2] + 1):
                labels[i] = 1
        return labels

# Initialize the retrieval-based model
retrieval_model = RetrievalBasedModel()

# Evaluate the retrieval-based model on train, validation, and test sets
train_accuracy, train_precision, train_recall, train_f1 = retrieval_model.evaluate(conll_dataset['train'])
val_accuracy, val_precision, val_recall, val_f1 = retrieval_model.evaluate(conll_dataset['validation'])
test_accuracy, test_precision, test_recall, test_f1 = retrieval_model.evaluate(conll_dataset['test'])

print(f"Retrieval-Based Model - Train Accuracy: {train_accuracy:.4f}, Precision: {train_precision:.4f}, Recall: {train_recall:.4f}, F1: {train_f1:.4f}")
print(f"Retrieval-Based Model - Validation Accuracy: {val_accuracy:.4f}, Precision: {val_precision:.4f}, Recall: {val_recall:.4f}, F1: {val_f1:.4f}")
print(f"Retrieval-Based Model - Test Accuracy: {test_accuracy:.4f}, Precision: {test_precision:.4f}, Recall: {test_recall:.4f}, F1: {test_f1:.4f}")

# Fine-Tuned BERT Model

class CoreferenceDataset(Dataset):
    def __init__(self, tokenizer, dataset, max_len=128):
        self.tokenizer = tokenizer
        self.dataset = dataset
        self.max_len = max_len

    def __len__(self):
        return len(self.dataset)

    def __getitem__(self, idx):
        item = self.dataset[idx]
        tokens = []
        labels = []

        for sentence in item['sentences']:
            words = sentence['words']
            coref_spans = sentence.get('coref_spans', [])
            tokens.extend(words)
            token_labels = [0] * len(words)
            for span in coref_spans:
                for i in range(span[1], span[2] + 1):
                    token_labels[i] = 1
            labels.extend(token_labels)

        inputs = self.tokenizer(tokens, is_split_into_words=True, padding='max_length', truncation=True, max_length=self.max_len, return_tensors='pt')
        labels = torch.tensor(labels[:self.max_len], dtype=torch.long)
        if len(labels) < self.max_len:
            labels = torch.cat([labels, torch.zeros(self.max_len - len(labels), dtype=torch.long)])

        return {'input_ids': inputs['input_ids'].squeeze(), 'attention_mask': inputs['attention_mask'].squeeze(), 'labels': labels}

def custom_collate_fn(batch):
    input_ids = pad_sequence([item['input_ids'] for item in batch], batch_first=True, padding_value=0)
    attention_mask = pad_sequence([item['attention_mask'] for item in batch], batch_first=True, padding_value=0)
    labels = pad_sequence([item['labels'] for item in batch], batch_first=True, padding_value=-100)

    return {'input_ids': input_ids, 'attention_mask': attention_mask, 'labels': labels}

# Initialize tokenizer and model
tokenizer = BertTokenizer.from_pretrained('bert-base-uncased')
model = BertForTokenClassification.from_pretrained('bert-base-uncased', num_labels=2)

# Prepare datasets
train_dataset = CoreferenceDataset(tokenizer, conll_dataset['train'])
val_dataset = CoreferenceDataset(tokenizer, conll_dataset['validation'])
test_dataset = CoreferenceDataset(tokenizer, conll_dataset['test'])

# DataLoader
train_loader = DataLoader(train_dataset, batch_size=16, shuffle=True, collate_fn=custom_collate_fn)
val_loader = DataLoader(val_dataset, batch_size=16, collate_fn=custom_collate_fn)
test_loader = DataLoader(test_dataset, batch_size=16, collate_fn=custom_collate_fn)

# Optimizer and scheduler
optimizer = AdamW(model.parameters(), lr=2e-5)
total_steps = len(train_loader) * 3
scheduler = get_linear_schedule_with_warmup(optimizer, num_warmup_steps=0, num_training_steps=total_steps)

# Device
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)

# Mixed Precision Training
scaler = torch.cuda.amp.GradScaler()

def train_epoch(model, data_loader, optimizer, scheduler, device, scaler):
    model.train()
    losses = []
    all_labels = []
    all_preds = []

    for batch in data_loader:
        input_ids = batch['input_ids'].to(device)
        attention_mask = batch['attention_mask'].to(device)
        labels = batch['labels'].to(device)

        optimizer.zero_grad()

        with torch.cuda.amp.autocast():
            outputs = model(input_ids=input_ids, attention_mask=attention_mask, labels=labels)
            loss = outputs.loss
            logits = outputs.logits

        scaler.scale(loss).backward()
        scaler.step(optimizer)
        scaler.update()
        scheduler.step()

        preds = torch.argmax(logits, dim=2)
        active_labels = labels.view(-1)
        active_preds = preds.view(-1)

        active_labels = active_labels[active_labels != -100]
        active_preds = active_preds[active_labels != -100]

        all_labels.extend(active_labels.cpu().numpy())
        all_preds.extend(active_preds.cpu().numpy())
        losses.append(loss.item())

    avg_loss = np.mean(losses)
    accuracy = accuracy_score(all_labels, all_preds)
    precision, recall, f1, _ = precision_recall_fscore_support(all_labels, all_preds, average='weighted')
    return avg_loss, accuracy, precision, recall, f1

def eval_model(model, data_loader, device):
    model.eval()
    losses = []
    all_labels = []
    all_preds = []

    with torch.no_grad():
        for batch in data_loader:
            input_ids = batch['input_ids'].to(device)
            attention_mask = batch['attention_mask'].to(device)
            labels = batch['labels'].to(device)

            outputs = model(input_ids=input_ids, attention_mask=attention_mask, labels=labels)
            loss = outputs.loss
            logits = outputs.logits

            preds = torch.argmax(logits, dim=2)
            active_labels = labels.view(-1)
            active_preds = preds.view(-1)

            active_labels = active_labels[active_labels != -100]
            active_preds = active_preds[active_labels != -100]

            all_labels.extend(active_labels.cpu().numpy())
            all_preds.extend(active_preds.cpu().numpy())
            losses.append(loss.item())

    avg_loss = np.mean(losses)
    accuracy = accuracy_score(all_labels, all_preds)
    precision, recall, f1, _ = precision_recall_fscore_support(all_labels, all_preds, average='weighted')
    return avg_loss, accuracy, precision, recall, f1

# Custom training loop with early stopping
num_epochs = 20
patience = 3  # Number of epochs with no improvement after which training will be stopped
best_val_loss = float('inf')
epochs_no_improve = 0

for epoch in range(num_epochs):

    print(f'Epoch {epoch + 1}/{num_epochs}')
    print('-' * 10)

    train_loss, train_accuracy, train_precision, train_recall, train_f1 = train_epoch(model, train_loader, optimizer, scheduler, device, scaler)
    print(f'Train Loss: {train_loss:.4f}, Train Accuracy: {train_accuracy:.4f}, Train Precision: {train_precision:.4f}, Train Recall: {train_recall:.4f}, Train F1: {train_f1:.4f}')

    val_loss, val_accuracy, val_precision, val_recall, val_f1 = eval_model(model, val_loader, device)
    print(f'Val Loss: {val_loss:.4f}, Val Accuracy: {val_accuracy:.4f}, Val Precision: {val_precision:.4f}, Val Recall: {val_recall:.4f}, Val F1: {val_f1:.4f}')

    # Check early stopping criteria
    if val_loss < best_val_loss:
        best_val_loss = val_loss
        epochs_no_improve = 0
    else:
        epochs_no_improve += 1
        if epochs_no_improve >= patience:
            print("Early stopping triggered")
            break

# Evaluate the model on the test set after training
test_loss, test_accuracy, test_precision, test_recall, test_f1 = eval_model(model, test_loader, device)
print(f'Test Loss: {test_loss:.4f}, Test Accuracy: {test_accuracy:.4f}, Test Precision: {test_precision:.4f}, Test Recall: {test_recall:.4f}, Test F1: {test_f1:.4f}')


Rule-Based System - Train Accuracy: 0.7796, Precision: 0.7873, Recall: 0.7796, F1: 0.7035
Rule-Based System - Validation Accuracy: 0.7664, Precision: 0.7751, Recall: 0.7664, F1: 0.6856
Rule-Based System - Test Accuracy: 0.7772, Precision: 0.7869, Recall: 0.7772, F1: 0.6988
Retrieval-Based Model - Train Accuracy: 0.7647, Precision: 0.7994, Recall: 0.7647, F1: 0.6634
Retrieval-Based Model - Validation Accuracy: 0.7524, Precision: 0.8138, Recall: 0.7524, F1: 0.6469
Retrieval-Based Model - Test Accuracy: 0.7633, Precision: 0.7798, Recall: 0.7633, F1: 0.6616


Some weights of BertForTokenClassification were not initialized from the model checkpoint at bert-base-uncased and are newly initialized: ['classifier.bias', 'classifier.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


Epoch 1/20
----------
Train Loss: 0.5630, Train Accuracy: 0.7413, Train Precision: 0.6620, Train Recall: 0.7413, Train F1: 0.6870
Val Loss: 0.6038, Val Accuracy: 0.7542, Val Precision: 0.5688, Val Recall: 0.7542, Val F1: 0.6485
Epoch 2/20
----------
Train Loss: 0.5129, Train Accuracy: 0.7774, Train Precision: 0.6045, Train Recall: 0.7774, Train F1: 0.6801
Val Loss: 0.5633, Val Accuracy: 0.7542, Val Precision: 0.6919, Val Recall: 0.7542, Val F1: 0.6491
Epoch 3/20
----------
Train Loss: 0.4963, Train Accuracy: 0.7780, Train Precision: 0.7573, Train Recall: 0.7780, Train F1: 0.6824
Val Loss: 0.5559, Val Accuracy: 0.7542, Val Precision: 0.6920, Val Recall: 0.7542, Val F1: 0.6500
Epoch 4/20
----------
Train Loss: 0.4994, Train Accuracy: 0.7770, Train Precision: 0.6909, Train Recall: 0.7770, Train F1: 0.6817
Val Loss: 0.5559, Val Accuracy: 0.7542, Val Precision: 0.6920, Val Recall: 0.7542, Val F1: 0.6500
Epoch 5/20
----------
Train Loss: 0.4844, Train Accuracy: 0.7776, Train Precision: 0.721