In [None]:
import re
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
from sklearn.model_selection import train_test_split
from torch.nn.utils.rnn import pad_sequence
import numpy as np
from collections import Counter

# Efficiently extract data by processing line-by-line
def extract_data(file_path):
    """Read sentences and their label rows from a text file, line by line.

    A stripped line that starts like a sentence (capital letter followed by
    text up to a '.', '!' or '?') opens a new record. A stripped line that
    starts with one of the known tag names is split on whitespace and stored
    as one label row of the record currently being built. Any other line is
    ignored.

    Label rows seen before the first sentence are kept and end up attached to
    the first sentence.

    Args:
        file_path: Path of the UTF-8 text file to parse.

    Returns:
        A tuple ``(sentences, labels)``: ``sentences`` is a list of sentence
        strings and ``labels`` a parallel list whose items are lists of label
        rows (each row a list of tag strings).
    """
    starts_sentence = re.compile(r"([A-Z][^.!?]*[.!?])").match
    starts_label_row = re.compile(r"(ARG1|REL|ARG2|NONE|LOC|TIME)(\s+\w+)*").match

    collected_sentences, collected_labels = [], []
    current_sentence = None
    pending_rows = []

    with open(file_path, 'r', encoding='utf-8') as handle:
        for raw_line in handle:
            text = raw_line.strip()

            if starts_sentence(text):
                # A new sentence closes the previous record, if there is one.
                if current_sentence:
                    collected_sentences.append(current_sentence)
                    collected_labels.append(pending_rows)
                    pending_rows = []
                current_sentence = text
                continue

            if starts_label_row(text):
                pending_rows.append(text.split())

    # Flush the record that was still open when the file ended.
    if current_sentence:
        collected_sentences.append(current_sentence)
        collected_labels.append(pending_rows)

    return collected_sentences, collected_labels

# Vocabulary building and encoding
def build_vocab(sentences, labels):
    """Build word and tag index mappings in first-seen order.

    Args:
        sentences: Iterable of sentence strings; words are obtained with
            ``str.split()``.
        labels: Iterable of label sets, each a list of label rows (lists of
            tag strings).

    Returns:
        A tuple ``(word2idx, tag2idx)``. ``word2idx`` always maps ``'<PAD>'``
        to 0 and numbers the remaining words from 1; ``tag2idx`` numbers tags
        from 0.
    """
    vocab = {'<PAD>': 0}  # index 0 is reserved for padding
    for text in sentences:
        for token in text.split():
            # setdefault only inserts unseen tokens, so ids stay dense.
            vocab.setdefault(token, len(vocab))

    tag_ids = {}
    for label_set in labels:
        for row in label_set:
            for tag in row:
                tag_ids.setdefault(tag, len(tag_ids))

    return vocab, tag_ids

def encode_sentence(sentence, word2idx):
    """Convert a sentence into a list of word indices.

    Args:
        sentence: Sentence string; split on whitespace.
        word2idx: Mapping from word to integer index.

    Returns:
        List of indices, one per word.

    Raises:
        KeyError: If a word is missing from ``word2idx``.
    """
    indices = []
    for token in sentence.split():
        indices.append(word2idx[token])
    return indices

def encode_labels(label_set, tag2idx):
    """Flatten every label row of a label set into one list of tag indices.

    Args:
        label_set: List of label rows (lists of tag strings).
        tag2idx: Mapping from tag string to integer index.

    Returns:
        One flat list holding the indices of all rows, in order.

    Raises:
        KeyError: If a tag is missing from ``tag2idx``.
    """
    flat = []
    for row in label_set:
        for tag in row:
            flat.append(tag2idx[tag])
    return flat

# Padding function
def pad_and_convert_to_tensor(sequences, padding_value=0):
    """Turn variable-length sequences into one right-padded, batch-first tensor.

    Args:
        sequences: Iterable of sequences accepted by ``torch.tensor``.
        padding_value: Value written into the padded positions.

    Returns:
        Tensor of shape ``(len(sequences), longest_sequence)``.
    """
    as_tensors = list(map(torch.tensor, sequences))
    return pad_sequence(as_tensors, batch_first=True, padding_value=padding_value)

# Truncate sequences function
def truncate_sequences(sequences, max_len):
    """Return copies of the sequences cut to at most ``max_len`` items.

    Args:
        sequences: Iterable of sliceable sequences.
        max_len: Slice bound applied to each sequence (``seq[:max_len]``).

    Returns:
        List with one truncated sequence per input sequence.
    """
    clipped = []
    for seq in sequences:
        clipped.append(seq[:max_len])
    return clipped

# Function to calculate class distribution in labels
def calculate_label_distribution(labels, tag2idx):
    """Count how often each tag occurs across encoded label sequences.

    Args:
        labels: Iterable of encoded label sequences (tag indices).
        tag2idx: Mapping from tag name to tag index.

    Returns:
        Dict keyed by tag name whose value is the number of occurrences of
        that tag's index in ``labels`` (0 when it never occurs).
    """
    occurrences = Counter()
    for encoded_seq in labels:
        occurrences.update(encoded_seq)

    distribution = {}
    for tag_name, tag_index in tag2idx.items():
        distribution[tag_name] = occurrences.get(tag_index, 0)
    return distribution

# Function to balance the dataset by undersampling frequent labels and/or oversampling minority labels
def balance_dataset(sentences, labels, label_distribution, target_count_per_class):
    """Select sentences until every label class has reached a target count.

    Sentences are visited in order. A sentence is kept when at least one of
    its labels belongs to a class that has not yet reached
    ``target_count_per_class``; once kept, every label occurrence in it is
    added to the running per-class counts.

    Each sentence is added at most once. (Previously it was appended once per
    under-quota label token, which duplicated the same example many times.)

    Args:
        sentences: Sequence of sentence strings.
        labels: Parallel sequence of label sets (lists of label rows, each a
            list of labels).
        label_distribution: Mapping whose keys are the known label classes;
            only the keys are used, to initialise the counters.
        target_count_per_class: Number of occurrences each class should reach
            before it stops pulling in further sentences.

    Returns:
        A tuple ``(balanced_sentences, balanced_labels)`` of parallel lists.
    """
    class_counts = {label: 0 for label in label_distribution}
    balanced_sentences = []
    balanced_labels = []

    for sentence, label_set in zip(sentences, labels):
        sentence_tags = [tag for label_seq in label_set for tag in label_seq]

        # .get() tolerates labels that are absent from label_distribution
        # instead of raising KeyError.
        still_needed = any(
            class_counts.get(tag, 0) < target_count_per_class
            for tag in sentence_tags
        )
        if not still_needed:
            continue

        balanced_sentences.append(sentence)
        balanced_labels.append(label_set)
        for tag in sentence_tags:
            class_counts[tag] = class_counts.get(tag, 0) + 1

    return balanced_sentences, balanced_labels

# Dataloader preparation
def prepare_dataloaders(encoded_sentences, encoded_labels, batch_size=32):
    """Pad the encoded data and wrap an 80/20 random split in DataLoaders.

    Sentences are padded with the module-level ``word2idx['<PAD>']`` index and
    labels with -1. The two tensors are padded independently, so their second
    dimensions can differ. Shapes and types of both are printed.

    Args:
        encoded_sentences: List of word-index sequences.
        encoded_labels: List of tag-index sequences, parallel to the sentences.
        batch_size: Batch size used by both loaders.

    Returns:
        A tuple ``(train_loader, val_loader)``; only the training loader
        shuffles its batches.
    """
    sentence_tensor = pad_and_convert_to_tensor(encoded_sentences, padding_value=word2idx['<PAD>'])
    label_tensor = pad_and_convert_to_tensor(encoded_labels, padding_value=-1)

    print("Padded Sentences: Shape =", sentence_tensor.shape, ", Type =", type(sentence_tensor))
    print("Padded Labels: Shape =", label_tensor.shape, ", Type =", type(label_tensor))

    # Shuffle the example indices once and cut them at the 80% mark.
    n_examples = len(sentence_tensor)
    order = list(range(n_examples))
    cutoff = int(np.floor(0.8 * n_examples))
    np.random.shuffle(order)
    train_idx = order[:cutoff]
    val_idx = order[cutoff:]

    train_loader = DataLoader(
        TensorDataset(sentence_tensor[train_idx], label_tensor[train_idx]),
        batch_size=batch_size,
        shuffle=True,
    )
    val_loader = DataLoader(
        TensorDataset(sentence_tensor[val_idx], label_tensor[val_idx]),
        batch_size=batch_size,
        shuffle=False,
    )
    return train_loader, val_loader

# Define the BiLSTM-CRF model
class BiLSTM_CRF(nn.Module):
    """BiLSTM encoder followed by a linear-chain CRF over tag sequences.

    ``transitions[i, j]`` scores moving from tag ``i`` to tag ``j``;
    ``start_transitions`` / ``end_transitions`` score the first / last tag.

    Masks are expected to be right-padded: ones for real tokens followed by
    zeros for padding, with the first timestep valid. Padded timesteps are
    skipped consistently in the gold score, the partition function and Viterbi
    decoding, so the loss of a sequence does not depend on how much padding
    follows it.

    Args:
        vocab_size: Number of rows of the embedding table.
        tagset_size: Number of distinct tags.
        embedding_dim: Size of the word embeddings.
        hidden_dim: Total BiLSTM output size (split across both directions).
    """

    def __init__(self, vocab_size, tagset_size, embedding_dim=100, hidden_dim=256):
        super(BiLSTM_CRF, self).__init__()
        lstm_hidden = hidden_dim // 2
        self.embedding = nn.Embedding(vocab_size, embedding_dim)
        self.lstm = nn.LSTM(embedding_dim, lstm_hidden, num_layers=3, bidirectional=True, batch_first=True)
        # The BiLSTM emits 2 * lstm_hidden features; sizing the projection from
        # that keeps the layers compatible even when hidden_dim is odd.
        self.fc = nn.Linear(2 * lstm_hidden, tagset_size)
        self.transitions = nn.Parameter(torch.randn(tagset_size, tagset_size))

        self.start_transitions = nn.Parameter(torch.randn(tagset_size))
        self.end_transitions = nn.Parameter(torch.randn(tagset_size))

    def forward(self, sentences, labels=None, mask=None):
        """Return the CRF loss when ``labels`` is given, else the decoded tags.

        Args:
            sentences: Long tensor of word indices, ``(batch, seq_len)``.
            labels: Optional long tensor of gold tag indices. Padded slots may
                hold any value as long as the mask is zero there.
            mask: Optional ``(batch, seq_len)`` tensor, non-zero for real
                tokens. ``None`` treats every position as valid.

        Returns:
            With ``labels``: scalar negative log-likelihood summed over the
            batch. Without: long tensor ``(batch, seq_len)`` of best tags;
            padded positions repeat the last valid tag.
        """
        embeds = self.embedding(sentences)
        lstm_out, _ = self.lstm(embeds)
        emissions = self.fc(lstm_out)  # (batch, seq_len, num_tags)
        mask = self._prepare_mask(emissions, mask)

        if labels is not None:
            return self._compute_loss(emissions, labels, mask)
        return self._viterbi_decode(emissions, mask)

    @staticmethod
    def _prepare_mask(emissions, mask):
        """Return a float mask of shape ``(batch, seq_len)`` aligned to emissions.

        ``None`` becomes an all-ones mask. A mask with more columns than the
        emissions is cut to the emission length; a shorter one is an error.
        """
        seq_len = emissions.size(1)
        if mask is None:
            return emissions.new_ones(emissions.size(0), seq_len)
        if mask.size(1) < seq_len:
            raise ValueError(
                f"mask covers {mask.size(1)} timesteps but emissions have {seq_len}"
            )
        return mask[:, :seq_len].to(dtype=emissions.dtype, device=emissions.device)

    def _compute_loss(self, emissions, labels, mask):
        """Negative log-likelihood of the gold paths, summed over the batch."""
        gold_score = self._compute_score(emissions, labels, mask)
        partition_score = self._compute_log_partition(emissions, mask)
        return partition_score - gold_score

    def _compute_score(self, emissions, labels, mask):
        """Score of the gold tag paths, summed over the batch."""
        mask = self._prepare_mask(emissions, mask)
        batch_size, seq_len = emissions.size(0), emissions.size(1)
        if labels.size(1) < seq_len:
            raise ValueError(
                f"labels cover {labels.size(1)} timesteps but emissions have {seq_len}"
            )
        batch_idx = torch.arange(batch_size, device=emissions.device)

        # Padded slots are never scored, but they are still used as indices,
        # so replace whatever padding value they hold with a valid tag id.
        tags = labels[:, :seq_len].masked_fill(mask == 0, 0)

        score = self.start_transitions[tags[:, 0]] + emissions[batch_idx, 0, tags[:, 0]]
        for t in range(1, seq_len):
            step = self.transitions[tags[:, t - 1], tags[:, t]] + emissions[batch_idx, t, tags[:, t]]
            score = score + step * mask[:, t]

        # The end transition belongs to the last *valid* token of each row,
        # not to the last column of the padded batch.
        positions = torch.arange(seq_len, device=emissions.device)
        last_idx = ((mask > 0).long() * positions).max(dim=1).values
        score = score + self.end_transitions[tags[batch_idx, last_idx]]

        return score.sum()

    def _compute_log_partition(self, emissions, mask):
        """Log partition function (forward algorithm), summed over the batch."""
        mask = self._prepare_mask(emissions, mask)
        seq_len = emissions.size(1)

        alpha = self.start_transitions + emissions[:, 0]  # (batch, num_tags)

        for t in range(1, seq_len):
            # (batch, prev, 1) + (prev, next) + (batch, 1, next)
            scores = alpha.unsqueeze(2) + self.transitions + emissions[:, t].unsqueeze(1)
            next_alpha = torch.logsumexp(scores, dim=1)
            # Hold alpha unchanged on padded steps so padding adds nothing.
            keep = mask[:, t].unsqueeze(1) > 0
            alpha = torch.where(keep, next_alpha, alpha)

        return torch.logsumexp(alpha + self.end_transitions, dim=1).sum()

    def _viterbi_decode(self, emissions, mask):
        """Best tag path per sequence as a long tensor ``(batch, seq_len)``."""
        mask = self._prepare_mask(emissions, mask)
        batch_size, seq_len, num_tags = emissions.shape
        batch_idx = torch.arange(batch_size, device=emissions.device)
        # Backpointer used on padded steps: every tag points to itself, so
        # backtracking passes straight through the padding.
        identity = torch.arange(num_tags, device=emissions.device).unsqueeze(0).expand(batch_size, num_tags)

        viterbi_scores = self.start_transitions + emissions[:, 0]
        backpointers = []

        for t in range(1, seq_len):
            candidates = viterbi_scores.unsqueeze(2) + self.transitions  # (batch, prev, next)
            best_scores, best_prev = candidates.max(dim=1)
            best_scores = best_scores + emissions[:, t]

            keep = mask[:, t].unsqueeze(1) > 0
            viterbi_scores = torch.where(keep, best_scores, viterbi_scores)
            backpointers.append(torch.where(keep, best_prev, identity))

        best_last_tag = torch.argmax(viterbi_scores + self.end_transitions, dim=1)
        best_path = [best_last_tag]

        for backpointer in reversed(backpointers):
            best_last_tag = backpointer[batch_idx, best_last_tag]
            best_path.append(best_last_tag)

        best_path.reverse()
        return torch.stack(best_path, dim=1)

# Define training and evaluation functions
def train_model(model, train_loader, val_loader, epochs=10):
    """Train the model with Adam and report validation accuracy each epoch.

    Args:
        model: Module called as ``model(sentences, labels=..., mask=...)`` and
            returning a scalar loss.
        train_loader: Iterable of ``(sentences, labels)`` batches.
        val_loader: Iterable passed to ``evaluate_model`` after every epoch.
        epochs: Number of passes over ``train_loader``.
    """
    # Labels are padded with -1 by prepare_dataloaders. The mask must be built
    # from that value: comparing against the word PAD index (0) would mark the
    # padding as real tokens and hide every occurrence of tag 0.
    label_pad_value = -1

    optimizer = optim.Adam(model.parameters(), lr=0.0005)
    for epoch in range(epochs):
        model.train()
        total_loss = 0.0
        for sentences, labels in train_loader:
            optimizer.zero_grad()
            mask = (labels != label_pad_value).float()
            loss = model(sentences, labels=labels, mask=mask)
            loss.backward()
            optimizer.step()
            total_loss += loss.item()

        # Guard against an empty loader instead of dividing by zero.
        num_batches = len(train_loader)
        avg_train_loss = total_loss / num_batches if num_batches > 0 else 0.0
        print(f"Epoch {epoch+1}/{epochs}, Loss: {avg_train_loss:.4f}")
        model.eval()
        evaluate_model(model, val_loader)

def evaluate_model(model, val_loader):
    """Compute and print token-level accuracy over the validation batches.

    Args:
        model: Object with ``eval()`` that, called as
            ``model(sentences, mask=mask)``, returns predicted tag indices
            with one row per sentence.
        val_loader: Iterable of ``(sentences, labels)`` batches whose labels
            are right-padded with -1.

    Returns:
        Accuracy over all non-padded label positions (0 when there are none).
    """
    # Labels are padded with -1 by prepare_dataloaders. Comparing against the
    # word PAD index (0) would count padding as tokens and skip tag 0.
    label_pad_value = -1

    model.eval()
    total_correct = 0
    total_labels = 0
    with torch.no_grad():
        for sentences, labels in val_loader:
            mask = (labels != label_pad_value).float()
            predictions = model(sentences, mask=mask)  # best tag path per row

            for pred, true, m in zip(predictions, labels, mask):
                # With right padding the valid tokens are the first m.sum() slots.
                valid_length = int(m.sum().item())
                if valid_length > 0:
                    total_correct += (pred[:valid_length] == true[:valid_length]).float().sum().item()
                    total_labels += valid_length

    accuracy = total_correct / total_labels if total_labels > 0 else 0
    print(f"Validation Accuracy: {accuracy:.4f}")
    return accuracy

# Prepare the data
# Everything below runs at module level, in order; the names bound here
# (word2idx, idx2tag, model, ...) are read as globals by functions in this file.
file_path = r"C:\Users\Kaushik Kadium\Downloads\original_cleaned"  
# Update the path as necessary
sentences, labels = extract_data(file_path)
print(f"Extracted {len(sentences)} sentences and {len(labels)} label sets")

# Select only a subset of sentences for training (modify as needed)
#sentences = sentences[:3000]
#labels = labels[:3000]

# Build vocabularies and encode sentences and labels
word2idx, tag2idx = build_vocab(sentences, labels)
idx2word = {v: k for k, v in word2idx.items()}  # For decoding sentences later
idx2tag = {v: k for k, v in tag2idx.items()}    # For decoding labels later
# First encoding pass over the full dataset. Only encoded_labels from this pass
# is consumed (by calculate_label_distribution); both lists are rebuilt from
# the balanced data further down.
encoded_sentences = [encode_sentence(s, word2idx) for s in sentences]
encoded_labels = [encode_labels(l, tag2idx) for l in labels]

# Truncate and pad sequences
# Only truncation happens here; padding is done inside prepare_dataloaders.
MAX_SEQ_LEN = 379
encoded_sentences = truncate_sequences(encoded_sentences, MAX_SEQ_LEN)
encoded_labels = truncate_sequences(encoded_labels, MAX_SEQ_LEN)

# Balance dataset based on label distribution
# label_distribution is keyed by tag name and counts occurrences in the
# truncated encoded labels.
label_distribution = calculate_label_distribution(encoded_labels, tag2idx)
# NOTE(review): if any tag has a count of 0 here, the target becomes 0 and
# balance_dataset keeps nothing — confirm against the data.
target_count_per_class = min(label_distribution.values())  # Target the minimum count class for balance

# balance_dataset works on the raw (un-encoded) sentences and label sets.
balanced_sentences, balanced_labels = balance_dataset(sentences, labels, label_distribution, target_count_per_class)

# Encode and truncate balanced data
encoded_sentences = [encode_sentence(s, word2idx) for s in balanced_sentences]
encoded_labels = [encode_labels(l, tag2idx) for l in balanced_labels]
encoded_sentences = truncate_sequences(encoded_sentences, MAX_SEQ_LEN)
encoded_labels = truncate_sequences(encoded_labels, MAX_SEQ_LEN)

# Prepare data loaders
train_loader, val_loader = prepare_dataloaders(encoded_sentences, encoded_labels)
print("Data preprocessing done!")

# Create the model
# Sizes come from the vocabularies built on the full (unbalanced) dataset.
vocab_size = len(word2idx)
tagset_size = len(tag2idx)
model = BiLSTM_CRF(vocab_size, tagset_size)

# Train the model
train_model(model, train_loader, val_loader, epochs=10)

In [None]:
import torch
# Function to load test sentences from a file
def load_test_sentences(file_path):
    """Read test sentences: the text before the first tab of every line.

    Each line is stripped of surrounding whitespace first. Blank lines are
    kept and produce empty strings.

    Args:
        file_path: Path of the UTF-8 test file.

    Returns:
        List with one sentence string per line of the file.
    """
    with open(file_path, 'r', encoding='utf-8') as f:
        return [raw_line.strip().partition('\t')[0] for raw_line in f]

# Prediction Code for CaRB
def generate_predictions_for_carb(model, test_sentences, word2idx, idx2word, tag_lookup=None):
    """Tag each test sentence and format the extractions as CaRB output lines.

    Every whitespace token keeps its position, so the i-th predicted tag is
    always paired with the i-th word of the sentence. Out-of-vocabulary words
    are fed to the model with the ``'<PAD>'`` index (the vocabulary has no
    dedicated unknown token) but are still treated as real tokens: they are
    not masked out and they appear in the output under their original
    spelling.

    Args:
        model: Object with ``eval()`` that, called as
            ``model(encoded, mask=mask)``, returns a ``(1, seq_len)`` tensor
            of tag indices.
        test_sentences: Iterable of sentence strings.
        word2idx: Mapping from word to index; must contain ``'<PAD>'``.
        idx2word: Unused; kept so existing callers keep working.
        tag_lookup: Mapping from tag index to tag name. Defaults to the
            module-level ``idx2tag``.

    Returns:
        List of tab-separated lines
        ``sentence, "1.00", relation, arg1, arg2[, other words]`` for every
        sentence in which a relation and both arguments were found.
    """
    if tag_lookup is None:
        tag_lookup = idx2tag  # module-level mapping built alongside tag2idx

    pad_idx = word2idx['<PAD>']
    model.eval()
    all_predictions = []

    with torch.no_grad():
        for sentence in test_sentences:
            words = sentence.split()
            if not words:
                # Nothing to tag; an empty tensor would break the model call.
                continue

            encoded_sentence = torch.tensor(
                [word2idx.get(word, pad_idx) for word in words]
            ).unsqueeze(0)

            # A single un-padded sentence: every position is a real token,
            # including out-of-vocabulary words.
            mask = torch.ones(encoded_sentence.shape, dtype=torch.float)

            predictions = model(encoded_sentence, mask=mask).squeeze(0).tolist()

            relation, arg1, arg2, additional_args = [], [], [], []

            # zip stops at the shorter of the two, should their lengths differ.
            for word, label in zip(words, predictions):
                tag = tag_lookup.get(label, 'NONE')

                if tag == "REL":
                    relation.append(word)
                elif tag == "ARG1":
                    arg1.append(word)
                elif tag == "ARG2":
                    arg2.append(word)
                else:
                    additional_args.append(word)

            # Skip if no valid extraction was found
            if relation and arg1 and arg2:
                formatted_line = f"{sentence}\t1.00\t{' '.join(relation)}\t{' '.join(arg1)}\t{' '.join(arg2)}"
                if additional_args:
                    formatted_line += f"\t{' '.join(additional_args)}"

                all_predictions.append(formatted_line)

    return all_predictions

# Test Sentences Prediction
# Uses `model`, `word2idx` and `idx2word` bound at module level by the
# training code earlier in this file.
test_file_path = r"C:\Users\Kaushik Kadium\Documents\Python Scripts\CaRB\data\test.txt"  # Path to your test file
test_sentences = load_test_sentences(test_file_path)

# Generate predictions for CaRB
test_predictions = generate_predictions_for_carb(model, test_sentences, word2idx, idx2word)

# Save the predictions to a file in the format expected by CaRB
# The path is relative, so the file is created in the current working directory.
# NOTE(review): the analysis code later reads an absolute ...\CaRB\your_output.txt
# path — confirm both refer to the same file.
output_file_path = r"your_output.txt"
with open(output_file_path, 'w', encoding='utf-8') as f:
    for line in test_predictions:
        f.write(line + '\n')

print(f"Predictions saved to {output_file_path}")


In [None]:
import csv

def load_ground_truth(file_path):
    """Load the ground truth extractions from the gold test file.

    The file is read as plain tab-separated text. Quote characters are
    ordinary data: CSV quoting is disabled so a sentence containing ``"`` is
    neither altered nor allowed to swallow the following columns or lines.

    Args:
        file_path: Path of the UTF-8, tab-separated gold file. Columns used:
            0 = sentence, 2 = relation, 3 = first argument, 4 = second
            argument.

    Returns:
        List of dicts with keys ``'sentence'`` (str) and ``'relation'``,
        ``'arg1'``, ``'arg2'`` (lists of whitespace-separated tokens). Rows
        with fewer than five columns are reported on stdout and skipped.
    """
    ground_truths = []
    # newline='' lets the csv module handle line endings itself.
    with open(file_path, 'r', encoding='utf-8', newline='') as f:
        reader = csv.reader(f, delimiter='\t', quoting=csv.QUOTE_NONE)
        for row in reader:
            if len(row) >= 5:
                ground_truths.append({
                    'sentence': row[0],
                    'relation': row[2].split(),
                    'arg1': row[3].split(),
                    'arg2': row[4].split()
                })
            else:
                print(f"Skipping invalid row (less than 5 columns): {row}")
    return ground_truths

def load_model_predictions(file_path):
    """Load the model's predicted extractions from the generated output file.

    The file is read as plain tab-separated text. Quote characters are
    ordinary data: CSV quoting is disabled so a sentence containing ``"`` is
    neither altered nor allowed to swallow the following columns or lines.

    Args:
        file_path: Path of the UTF-8, tab-separated prediction file. Columns
            used: 0 = sentence, 2 = relation, 3 = first argument, 4 = second
            argument.

    Returns:
        List of dicts with keys ``'sentence'`` (str) and ``'relation'``,
        ``'arg1'``, ``'arg2'`` (lists of whitespace-separated tokens). Rows
        with fewer than five columns are reported on stdout and skipped.
    """
    predictions = []
    # newline='' lets the csv module handle line endings itself.
    with open(file_path, 'r', encoding='utf-8', newline='') as f:
        reader = csv.reader(f, delimiter='\t', quoting=csv.QUOTE_NONE)
        for row in reader:
            if len(row) >= 5:
                predictions.append({
                    'sentence': row[0],
                    'relation': row[2].split(),
                    'arg1': row[3].split(),
                    'arg2': row[4].split()
                })
            else:
                print(f"Skipping invalid row (less than 5 columns): {row}")
    return predictions

def error_analysis(predictions, ground_truths):
    """Perform error analysis between predictions and ground truth extractions.

    Each ground-truth extraction is compared with the first prediction made
    for the same sentence (exact string match). Relations and arguments are
    compared as sets of tokens. One sentence can fall into several categories.

    Args:
        predictions: List of dicts with keys ``'sentence'``, ``'relation'``,
            ``'arg1'``, ``'arg2'``; further ``argN`` keys, if present, are
            counted for the n-ary check.
        ground_truths: List of dicts with the same keys.

    Returns:
        A 5-tuple:
        ``incorrect_extractions`` (category -> count),
        ``missed_extractions`` (category -> count),
        ``mismatched_sentences`` (list of ``{'sentence', 'error_type'}``
        dicts, one per ground-truth item with at least one error;
        ``error_type`` joins the category names with ``"; "``),
        ``incorrect_sentences`` (category -> example sentences),
        ``missed_sentences`` (category -> example sentences).
    """
    incorrect_extractions = {
        "Correct relation phrase, incorrect arguments": 0,
        "Non-contiguous relation phrase": 0,
        "Overspecified relation phrase": 0,
        "N-ary relations (more than two arguments)": 0,
        "Imperative verb used in relation": 0,
        "Other errors": 0
    }

    missed_extractions = {
        "Could not identify correct arguments": 0,
        "Relation filtered out by lexical constraint": 0,
        "POS/chunking error": 0,
        "Identified a more specific relation": 0,
        "Other errors": 0
    }

    mismatched_sentences = []  # One entry per ground-truth item with errors
    incorrect_sentences = {key: [] for key in incorrect_extractions}
    missed_sentences = {key: [] for key in missed_extractions}

    imperative_verbs = {"please", "do", "go", "stop", "give", "help"}

    # Index predictions once; setdefault keeps the first prediction seen for
    # each sentence, which is what a front-to-back scan would have returned.
    prediction_by_sentence = {}
    for pred in predictions:
        prediction_by_sentence.setdefault(pred['sentence'], pred)

    def count_arguments(extraction):
        """Number of non-empty ``argN`` fields in an extraction dict."""
        return sum(
            1 for key, value in extraction.items()
            if key.startswith('arg') and key[3:].isdigit() and value
        )

    def record(counts, examples, category, sentence, error_types):
        """Count one error and remember its sentence and category."""
        counts[category] += 1
        examples[category].append(sentence)
        if category not in error_types:
            error_types.append(category)

    for truth in ground_truths:
        sentence = truth['sentence']
        error_types = []
        matching_pred = prediction_by_sentence.get(sentence)

        if matching_pred is None:
            # No prediction made for this sentence (missed entirely)
            record(missed_extractions, missed_sentences,
                   "Relation filtered out by lexical constraint", sentence, error_types)
            mismatched_sentences.append({"sentence": sentence, "error_type": "; ".join(error_types)})
            continue

        pred_relation = set(matching_pred['relation'])
        pred_arg1 = set(matching_pred['arg1'])
        pred_arg2 = set(matching_pred['arg2'])

        truth_relation = set(truth['relation'])
        truth_arg1 = set(truth['arg1'])
        truth_arg2 = set(truth['arg2'])

        # N-ary check: can only trigger when an extraction carries argument
        # fields beyond arg1/arg2.
        if count_arguments(truth) > 2 or count_arguments(matching_pred) > 2:
            record(incorrect_extractions, incorrect_sentences,
                   "N-ary relations (more than two arguments)", sentence, error_types)

        # Check for imperative verbs in relations
        if pred_relation & imperative_verbs:
            record(incorrect_extractions, incorrect_sentences,
                   "Imperative verb used in relation", sentence, error_types)

        # Incorrect extraction cases
        if pred_relation == truth_relation:
            if pred_arg1 != truth_arg1 or pred_arg2 != truth_arg2:
                record(incorrect_extractions, incorrect_sentences,
                       "Correct relation phrase, incorrect arguments", sentence, error_types)
        elif pred_relation & truth_relation:
            record(incorrect_extractions, incorrect_sentences,
                   "Non-contiguous relation phrase", sentence, error_types)
        elif len(pred_relation) > len(truth_relation):
            record(incorrect_extractions, incorrect_sentences,
                   "Overspecified relation phrase", sentence, error_types)
        else:
            record(incorrect_extractions, incorrect_sentences,
                   "Other errors", sentence, error_types)

        # Missed extraction cases
        if not pred_relation and truth_relation:
            record(missed_extractions, missed_sentences,
                   "Relation filtered out by lexical constraint", sentence, error_types)
        # Each missing argument is counted separately, so a sentence missing
        # both arguments is counted (and listed) twice.
        if not pred_arg1 and truth_arg1:
            record(missed_extractions, missed_sentences,
                   "Could not identify correct arguments", sentence, error_types)
        if not pred_arg2 and truth_arg2:
            record(missed_extractions, missed_sentences,
                   "Could not identify correct arguments", sentence, error_types)

        # Check if predicted relation is more specific than ground truth relation
        if pred_relation and truth_relation and len(pred_relation) > len(truth_relation):
            record(missed_extractions, missed_sentences,
                   "Identified a more specific relation", sentence, error_types)

        # Report the sentence once, with every category it fell into.
        if error_types:
            mismatched_sentences.append({"sentence": sentence, "error_type": "; ".join(error_types)})

    return incorrect_extractions, missed_extractions, mismatched_sentences, incorrect_sentences, missed_sentences

# Load ground truth and predictions
ground_truth_file = r"C:\Users\Kaushik Kadium\Documents\Python Scripts\CaRB\data\gold\test.tsv"
prediction_file = r"C:\Users\Kaushik Kadium\Documents\Python Scripts\CaRB\your_output.txt"

ground_truths = load_ground_truth(ground_truth_file)
predictions = load_model_predictions(prediction_file)

# Run the error analysis
incorrect_extractions, missed_extractions, mismatched_sentences, incorrect_sentences, missed_sentences = error_analysis(predictions, ground_truths)

# Print analysis results
print("Incorrect Extractions:")
for key, value in incorrect_extractions.items():
    print(f"{key}: {value}")

print("\nMissed Extractions:")
for key, value in missed_extractions.items():
    print(f"{key}: {value}")

# Print 5 example sentences for each error type in incorrect extractions
# NOTE: the loop variable `sentences` rebinds the module-level name `sentences`
# that the training code assigned from extract_data.
print("\nExample Sentences for Incorrect Extractions:")
for error_type, sentences in incorrect_sentences.items():
    print(f"\n{error_type} (Showing up to 5 examples):")
    for sentence in sentences[:5]:
        print(f"- {sentence}")

# Print 5 example sentences for each error type in missed extractions
print("\nExample Sentences for Missed Extractions:")
for error_type, sentences in missed_sentences.items():
    print(f"\n{error_type} (Showing up to 5 examples):")
    for sentence in sentences[:5]:
        print(f"- {sentence}")

# Print mismatched sentences for analysis
# Each entry is a dict with 'sentence' and 'error_type' keys.
print("\nMismatched Sentences:")
for mismatch in mismatched_sentences:
    print(f"Sentence: {mismatch['sentence']}, Error Type: {mismatch['error_type']}")
