### Fundamentals of Natural Language Processing
# Negation and Uncertainty Detection using a Machine-Learning Based Approach

*Authors:*

> *Anna Blanco, Agustina Lazzati, Stanislav Bultaskii, Queralt Salvadó*

*Aims:*
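> Re-implement the negation and uncertainty detection pipeline with a deep-learning (DL) approach: a BiLSTM–GRU sequence tagger over pretrained fastText embeddings that labels cue and scope tokens.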

In [None]:
# Import necessary libraries and functions
import json
import spacy
from collections import defaultdict
import re
import pandas as pd
import numpy as np
import pickle

In [None]:
# Load the preprocessed datasets that were serialized to "lstm_data.pkl".
# NOTE(review): pickle.load can execute arbitrary code while deserializing;
# only run this cell on a file that comes from a trusted source.
with open("lstm_data.pkl", "rb") as f:
    data_dict = pickle.load(f)

# Training splits: cue and scope annotations for negation and uncertainty.
lstm_train_data_neg_cue = data_dict["lstm_train_data_neg_cue"]
lstm_train_data_neg_scope = data_dict["lstm_train_data_neg_scope"]
lstm_train_data_unc_cue = data_dict["lstm_train_data_unc_cue"]
lstm_train_data_unc_scope = data_dict["lstm_train_data_unc_scope"]

# Test splits, same layout as the training splits.
lstm_test_data_neg_cue = data_dict["lstm_test_data_neg_cue"]
lstm_test_data_neg_scope = data_dict["lstm_test_data_neg_scope"]
lstm_test_data_unc_cue = data_dict["lstm_test_data_unc_cue"]
lstm_test_data_unc_scope = data_dict["lstm_test_data_unc_scope"]

# Sanity check: per the recorded output, each example is a (tokens, labels) pair.
print(lstm_train_data_neg_cue[2])

(['antecedents', 'alergia', 'a', 'penicilina', 'y', 'cloramfenicol', '.'], [0, 0, 0, 0, 0, 0, 0])


In [None]:
def merge_labels(cue_labels, scope_labels, cue_prefix="CUE", scope_prefix="SCOPE"):
    """Combine parallel cue and scope label sequences into one tag sequence.

    For each position the cue label wins: a non-zero cue label becomes
    ``"<cue_prefix>_<label>"``; otherwise a non-zero scope label becomes
    ``"<scope_prefix>_<label>"``; otherwise the tag is the string ``"0"``.

    Args:
        cue_labels: Per-token cue labels (0 means "no cue").
        scope_labels: Per-token scope labels (0 means "not in scope").
        cue_prefix: Prefix used for cue tags.
        scope_prefix: Prefix used for scope tags.

    Returns:
        A list of string tags. The two inputs are paired with ``zip``, so the
        result is as long as the shorter input.
    """
    def _tag(cue_value, scope_value):
        # Cue takes precedence over scope when both are set.
        if cue_value != 0:
            return cue_prefix + "_" + str(cue_value)
        if scope_value != 0:
            return scope_prefix + "_" + str(scope_value)
        return "0"

    return [_tag(cue_value, scope_value) for cue_value, scope_value in zip(cue_labels, scope_labels)]

In [None]:
def _merge_cue_scope_dataset(cue_data, scope_data, cue_prefix, scope_prefix):
    """Pair up cue and scope examples and merge their labels into one tag sequence.

    Tokens are taken from the cue example; the scope example contributes only
    its labels. Examples are paired positionally with ``zip``.
    """
    merged_examples = []
    for (tokens, cue_labels), (_, scope_labels) in zip(cue_data, scope_data):
        tags = merge_labels(cue_labels, scope_labels, cue_prefix=cue_prefix, scope_prefix=scope_prefix)
        merged_examples.append((tokens, tags))
    return merged_examples


# Merge negation data
lstm_train_data_neg = _merge_cue_scope_dataset(
    lstm_train_data_neg_cue, lstm_train_data_neg_scope, "NEG", "NSCO"
)

lstm_test_data_neg = _merge_cue_scope_dataset(
    lstm_test_data_neg_cue, lstm_test_data_neg_scope, "NEG", "NSCO"
)

# Similarly for uncertainty
lstm_train_data_unc = _merge_cue_scope_dataset(
    lstm_train_data_unc_cue, lstm_train_data_unc_scope, "UNC", "UNSCO"
)

lstm_test_data_unc = _merge_cue_scope_dataset(
    lstm_test_data_unc_cue, lstm_test_data_unc_scope, "UNC", "UNSCO"
)

print(lstm_train_data_neg[3])

(['no', 'habitos', 'toxicos', '.'], ['NEG_1', 'NSCO_1', 'NSCO_1', 'NSCO_1'])


In [None]:
!pip install fasttext

import fasttext

# Download the English fastText model
!wget https://dl.fbaipublicfiles.com/fasttext/vectors-crawl/cc.en.300.bin.gz
# Unzip the downloaded file
!gunzip cc.en.300.bin.gz

--2025-05-27 10:09:37--  https://dl.fbaipublicfiles.com/fasttext/vectors-crawl/cc.en.300.bin.gz
Resolving dl.fbaipublicfiles.com (dl.fbaipublicfiles.com)... 3.163.189.51, 3.163.189.96, 3.163.189.108, ...
Connecting to dl.fbaipublicfiles.com (dl.fbaipublicfiles.com)|3.163.189.51|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 4503593528 (4.2G) [application/octet-stream]
Saving to: ‘cc.en.300.bin.gz’


2025-05-27 10:10:34 (74.3 MB/s) - ‘cc.en.300.bin.gz’ saved [4503593528/4503593528]

gzip: cc.en.300.bin already exists; do you wish to overwrite (y or n)? n
	not overwritten


In [None]:
# Load pretrained FastText model (English, 300-dimensional vectors)
# Expects the unpacked "cc.en.300.bin" file in the current working directory.
# NOTE(review): the tokens shown in this notebook look Spanish/Catalan - confirm
# that the English vectors are the intended choice.
fasttext_model = fasttext.load_model("cc.en.300.bin")

In [None]:
from collections import defaultdict

def build_vocab(sentences):
    """Map every distinct token to a positive integer index.

    Indices are assigned in order of first appearance, starting at 1; index 0
    is never assigned so it stays free for the unknown/padding token.

    Args:
        sentences: Iterable of token sequences.

    Returns:
        A plain ``dict`` from token to index.
    """
    vocab = {}
    for tokens in sentences:
        for token in tokens:
            # setdefault only stores a value the first time a token is seen,
            # so the next free index is always len(vocab) + 1.
            vocab.setdefault(token, len(vocab) + 1)
    return vocab

def build_label_vocab(labels_list):
    """Map every distinct label to an index, numbered in sorted label order.

    Args:
        labels_list: Iterable of label sequences.

    Returns:
        A dict from label to index, where indices 0..n-1 follow ``sorted()``
        order of the distinct labels.
    """
    distinct_labels = {label for sequence in labels_list for label in sequence}
    ordered_labels = sorted(distinct_labels)
    return dict(zip(ordered_labels, range(len(ordered_labels))))

# Split every (tokens, labels) example into parallel sentence / label lists.

# Negation: train and test
all_train_sentences_neg = [tokens for tokens, _ in lstm_train_data_neg]  # list of token lists
all_train_labels_neg = [tags for _, tags in lstm_train_data_neg]  # list of label lists

all_test_sentences_neg = [tokens for tokens, _ in lstm_test_data_neg]
all_test_labels_neg = [tags for _, tags in lstm_test_data_neg]

# Uncertainty: train and test
all_train_sentences_unc = [tokens for tokens, _ in lstm_train_data_unc]
all_train_labels_unc = [tags for _, tags in lstm_train_data_unc]

all_test_sentences_unc = [tokens for tokens, _ in lstm_test_data_unc]
all_test_labels_unc = [tags for _, tags in lstm_test_data_unc]

# Concatenate all four splits so both vocabularies cover every token and tag.
# Note that this includes the test splits.
all_sentences = [
    *all_train_sentences_neg,
    *all_test_sentences_neg,
    *all_train_sentences_unc,
    *all_test_sentences_unc,
]

all_labels = [
    *all_train_labels_neg,
    *all_test_labels_neg,
    *all_train_labels_unc,
    *all_test_labels_unc,
]

# Token -> index and tag -> index lookups shared by both tasks
word2idx = build_vocab(all_sentences)
label2idx = build_label_vocab(all_labels)

print(f"Vocabulary size (words): {len(word2idx)}")
print(f"Number of unique labels: {len(label2idx)}")

# Peek at the first few entries of each mapping
print(f"Example word2idx: {list(word2idx.items())[:10]}")
print(f"Example label2idx: {list(label2idx.items())[:10]}")

Vocabulary size (words): 23359
Number of unique labels: 5
Example word2idx: [(' ', 1), ('nº', 2), ('historia', 3), ('clinica', 4), (':', 5), ('*', 6), ('nºepisodi', 7), ('sexe', 8), ('home', 9), ('data', 10)]
Example label2idx: [('0', 0), ('NEG_1', 1), ('NSCO_1', 2), ('UNC_1', 3), ('UNSCO_1', 4)]


In [None]:
import numpy as np

# Build the (vocab_size, 300) matrix of pretrained vectors, one row per word index.
# word2idx indices start at 1, so row 0 is never written and stays all zeros.
vocab_size = len(word2idx) + 1  # +1 for padding idx=0
embedding_matrix = np.zeros((vocab_size, 300))

for word, idx in word2idx.items():
    try:
        embedding_vector = fasttext_model.get_word_vector(word)
        embedding_matrix[idx] = embedding_vector
    except KeyError:
        # NOTE(review): fastText .bin models presumably build vectors from subwords
        # for any input string, so this branch may never run - confirm. The random
        # fallback is also unseeded, so it would not be reproducible across runs.
        embedding_matrix[idx] = np.random.normal(scale=0.6, size=(300,))

In [None]:
def encode_sentences(sentences, word2idx):
    """Convert token sequences into sequences of vocabulary indices.

    Args:
        sentences: Iterable of token sequences.
        word2idx: Mapping from token to index.

    Returns:
        A list of index lists; tokens missing from ``word2idx`` are encoded as 0.
    """
    return [
        [word2idx.get(token, 0) for token in sentence]
        for sentence in sentences
    ]

def encode_labels(labels, label2idx):
    """Convert label sequences into sequences of label indices.

    Each label is passed through ``str()`` before lookup, so integer labels
    such as ``0`` resolve to the ``"0"`` key.

    Args:
        labels: Iterable of label sequences.
        label2idx: Mapping from label string to index.

    Returns:
        A list of index lists.

    Raises:
        KeyError: If a label (as a string) is missing from ``label2idx``.
    """
    return [
        [label2idx[str(tag)] for tag in sequence]
        for sequence in labels
    ]

# Encode every split with the shared vocabularies: tokens -> word indices,
# tags -> label indices.

# Negation data
X_train_neg = encode_sentences(all_train_sentences_neg, word2idx)
y_train_neg = encode_labels(all_train_labels_neg, label2idx)

X_test_neg = encode_sentences(all_test_sentences_neg, word2idx)
y_test_neg = encode_labels(all_test_labels_neg, label2idx)

# Uncertainty data
X_train_unc = encode_sentences(all_train_sentences_unc, word2idx)
y_train_unc = encode_labels(all_train_labels_unc, label2idx)

X_test_unc = encode_sentences(all_test_sentences_unc, word2idx)
y_test_unc = encode_labels(all_test_labels_unc, label2idx)

# Show an example
print(X_train_neg[2], y_train_neg[2])

[45, 46, 47, 48, 49, 50, 44] [0, 0, 0, 0, 0, 0, 0]


In [None]:
from torch.nn.utils.rnn import pad_sequence
import torch

def pad_sequences(sequences, pad_value=0):
    """Right-pad variable-length index sequences into one 2-D tensor.

    Args:
        sequences: Iterable of index lists.
        pad_value: Value used to fill positions past the end of a sequence.

    Returns:
        A tensor of shape (number of sequences, longest sequence length).
    """
    return pad_sequence(
        list(map(torch.tensor, sequences)),  # one 1-D tensor per sequence
        batch_first=True,
        padding_value=pad_value,
    )

# Each call pads a whole split at once, so every sequence in a split is padded to
# the length of that split's longest sequence.
# Inputs are padded with word index 0. Labels are padded with the index of the '0'
# tag, so padded label positions cannot be told apart from real '0' labels.

# Pad inputs and labels (negation)
X_train_neg_padded = pad_sequences(X_train_neg, pad_value=0)
y_train_neg_padded = pad_sequences(y_train_neg, pad_value=label2idx.get('0', 0))

X_test_neg_padded = pad_sequences(X_test_neg, pad_value=0)
y_test_neg_padded = pad_sequences(y_test_neg, pad_value=label2idx.get('0', 0))

# Pad inputs and labels (uncertainty)
X_train_unc_padded = pad_sequences(X_train_unc, pad_value=0)
y_train_unc_padded = pad_sequences(y_train_unc, pad_value=label2idx.get('0', 0))

X_test_unc_padded = pad_sequences(X_test_unc, pad_value=0)
y_test_unc_padded = pad_sequences(y_test_unc, pad_value=label2idx.get('0', 0))

In [None]:
from torch.utils.data import Dataset, DataLoader

class SequenceTaggingDataset(Dataset):
    """Dataset that serves aligned (input, label) pairs by position.

    ``X`` and ``y`` are stored as given and indexed with the same index; the
    dataset length is the length of ``X``.
    """

    def __init__(self, X, y):
        self.X, self.y = X, y

    def __len__(self):
        """Return the number of examples (taken from ``X``)."""
        return len(self.X)

    def __getitem__(self, idx):
        """Return the ``(X[idx], y[idx])`` pair."""
        features = self.X[idx]
        targets = self.y[idx]
        return features, targets

# Wrap the padded tensors of each split so that they can be batched.
# Create dataset objects
train_dataset_neg = SequenceTaggingDataset(X_train_neg_padded, y_train_neg_padded)
test_dataset_neg = SequenceTaggingDataset(X_test_neg_padded, y_test_neg_padded)

train_dataset_unc = SequenceTaggingDataset(X_train_unc_padded, y_train_unc_padded)
test_dataset_unc = SequenceTaggingDataset(X_test_unc_padded, y_test_unc_padded)

# Create dataloaders
# Training loaders shuffle every epoch; test loaders keep the original order.
# All loaders use batches of 32 examples.
train_loader_neg = DataLoader(train_dataset_neg, batch_size=32, shuffle=True)
test_loader_neg = DataLoader(test_dataset_neg, batch_size=32, shuffle=False)

train_loader_unc = DataLoader(train_dataset_unc, batch_size=32, shuffle=True)
test_loader_unc = DataLoader(test_dataset_unc, batch_size=32, shuffle=False)

In [None]:
import torch.nn as nn
import torch

class BiLSTM(nn.Module):
    """Token tagger: frozen pretrained embeddings -> BiLSTM -> BiGRU -> linear layer.

    Args:
        embedding_matrix: 2-D NumPy array of shape (vocab_size, embedding_dim)
            holding the pretrained vectors copied into the embedding layer.
        hidden_dim_lstm: Hidden size of each LSTM direction.
        hidden_dim_gru: Hidden size of each GRU direction.
        output_dim: Number of tag classes scored per token.
        pad_idx: Index passed to the embedding layer as ``padding_idx``.
    """

    def __init__(self, embedding_matrix, hidden_dim_lstm, hidden_dim_gru, output_dim, pad_idx=0):
        super().__init__()
        num_embeddings, emb_dim = embedding_matrix.shape

        # Embedding table initialised from the pretrained vectors and kept frozen.
        self.embedding = nn.Embedding(num_embeddings, emb_dim, padding_idx=pad_idx)
        pretrained = torch.from_numpy(embedding_matrix)
        self.embedding.weight.data.copy_(pretrained)
        self.embedding.weight.requires_grad = False

        # Both recurrent layers are bidirectional, so each one doubles its feature size.
        self.lstm = nn.LSTM(
            input_size=emb_dim,
            hidden_size=hidden_dim_lstm,
            batch_first=True,
            bidirectional=True,
        )
        self.gru = nn.GRU(
            input_size=2 * hidden_dim_lstm,
            hidden_size=hidden_dim_gru,
            batch_first=True,
            bidirectional=True,
        )

        # Per-token classifier over the concatenated forward/backward GRU states.
        self.fc = nn.Linear(2 * hidden_dim_gru, output_dim)

    def forward(self, x):
        """Score every token.

        Args:
            x: Tensor of word indices, shape (batch_size, seq_len).

        Returns:
            Logits of shape (batch_size, seq_len, output_dim).
        """
        # (batch_size, seq_len) -> (batch_size, seq_len, embedding_dim) -> (.., hidden_dim_lstm*2)
        lstm_out, _ = self.lstm(self.embedding(x))
        # -> (batch_size, seq_len, hidden_dim_gru*2)
        gru_out, _ = self.gru(lstm_out)
        return self.fc(gru_out)

In [None]:
import torch
import torch.optim as optim
import torch.nn as nn

# Parameters (adjust as needed)
hidden_dim_lstm = 128
hidden_dim_gru = 64
output_dim = len(label2idx)  # number of classes/tags
pad_idx = 0  # index of the padding token in the embedding table (word indices start at 1)

# Instantiate model
model = BiLSTM(embedding_matrix, hidden_dim_lstm, hidden_dim_gru, output_dim, pad_idx)

# Move to GPU if available
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = model.to(device)

# Loss and optimizer
# Do NOT pass ignore_index=pad_idx here. pad_idx is a *word* index, but ignore_index
# applies to *label* indices, and label index 0 is the real '0' (outside) tag:
# label2idx maps '0' -> 0 and the label tensors are padded with that same index.
# Ignoring index 0 would therefore drop every '0'-tagged token from the loss, and
# the model would never be trained to predict the outside class.
# Padded positions are now scored as class '0' too. To exclude only the padding,
# pad the labels with a dedicated sentinel (e.g. -100) and ignore that instead.
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

In [None]:
from torch.utils.data import DataLoader, TensorDataset

def train_epoch(model, dataloader, optimizer, criterion, device):
    """Run one optimisation pass over ``dataloader`` and return the mean batch loss.

    Args:
        model: Module mapping an input batch of shape (batch_size, seq_len) to
            logits of shape (batch_size, seq_len, output_dim).
        dataloader: Sized iterable yielding ``(inputs, labels)`` batches.
        optimizer: Optimizer that updates the model parameters.
        criterion: Loss called as ``criterion(flat_logits, flat_labels)``.
        device: Device the batches are moved to before the forward pass.

    Returns:
        The sum of the per-batch losses divided by the number of batches.

    Raises:
        ZeroDivisionError: If ``dataloader`` yields no batches.
    """
    model.train()
    total_loss = 0

    for inputs, labels in dataloader:
        inputs = inputs.to(device)
        labels = labels.to(device)

        optimizer.zero_grad()

        outputs = model(inputs)  # outputs shape: (batch_size, seq_len, output_dim)

        # Flatten batch and sequence dims for the loss. reshape() is used instead
        # of view() so that non-contiguous tensors (e.g. transposed model outputs
        # or sliced label tensors) are handled instead of raising a RuntimeError.
        outputs = outputs.reshape(-1, outputs.shape[-1])  # (batch_size * seq_len, output_dim)
        labels = labels.reshape(-1)                       # (batch_size * seq_len)

        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        total_loss += loss.item()

    return total_loss / len(dataloader)

In [None]:
# Number of full passes over the training data.
num_epochs = 5

# Train the model on the negation training loader and report the mean batch
# loss returned by train_epoch after every epoch.
print("NEGATIONS")
for epoch in range(num_epochs):
    train_loss = train_epoch(model, train_loader_neg, optimizer, criterion, device)
    print(f"Epoch {epoch+1}/{num_epochs} - Loss: {train_loss:.4f}")