In [1]:
# In[1] - Environment Setup

# Uncomment and run these if the packages are not yet installed:
# !pip install torch torchvision torchaudio
# !pip install transformers
# !pip install scispacy
# !pip install https://s3-us-west-2.amazonaws.com/ai2-s2-scispacy/releases/en_ner_bc5cdr_md-0.5.0.tar.gz
# !pip install torchcrf
# !pip install seqeval

import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from torchcrf import CRF
# from datasets import Dataset

from transformers import AutoModel, AutoTokenizer
import spacy
from seqeval.metrics import f1_score, precision_score, recall_score, classification_report

import json
import numpy as np
import random


In [2]:
# In[2] - GPU Check

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print("Using device:", device)


Using device: cuda


In [3]:
# In[3] - Load SciSpaCy Model for Knowledge Features

try:
    nlp = spacy.load("en_ner_bc5cdr_md")
    print("Loaded SciSpaCy model: en_ner_bc5cdr_md")
except Exception as e:
    nlp = None
    print("Could not load SciSpaCy model. Error:", e)




Loaded SciSpaCy model: en_ner_bc5cdr_md


  deserializers["tokenizer"] = lambda p: self.tokenizer.from_disk(  # type: ignore[union-attr]


In [4]:
# In[4] - Define a Simple Knowledge Feature Extraction Function

def get_knowledge_features(tokens):
    """
    For each token in the sentence, return a binary feature (0 or 1)
    indicating whether it is part of a recognized entity according to SciSpaCy.
    
    This is a simple placeholder; you can extend it to incorporate UMLS concept IDs
    or richer biomedical features.
    """
    if nlp is None:
        return [0] * len(tokens)
    
    text = " ".join(tokens)
    doc = nlp(text)
    feats = [0] * len(tokens)
    for ent in doc.ents:
        # Mark each token within the entity span as 1
        for i in range(ent.start, ent.end):
            if i < len(feats):
                feats[i] = 1
    return feats


In [5]:

# def load_jsonl(filepath):
#     data = []
#     with open(filepath, 'r', encoding='utf-8') as file:
#         for line in file:
#             data.append(json.loads(line.strip()))
#     return Dataset.from_list(data)

# # Load datasets
# train_dataset = load_jsonl('/media/smartdragon/WORK/6th Semester/22AIE315 - Natural Language Processing/Project/Batch 1/combined_train_1.jsonl')
# dev_dataset = load_jsonl('/media/smartdragon/WORK/6th Semester/22AIE315 - Natural Language Processing/Project/Batch 1/combined_dev_1.jsonl')
# test_dataset = load_jsonl('/media/smartdragon/WORK/6th Semester/22AIE315 - Natural Language Processing/Project/Batch 1/combined_test_1.jsonl')

# print(f"Train examples: {len(train_dataset)}")
# print(f"Dev examples: {len(dev_dataset)}")
# print(f"Test examples: {len(test_dataset)}")


In [6]:
# In[5] - Load Label Mapping from JSON

# Update the file path if needed.
with open("/media/smartdragon/WORK/6th Semester/22AIE315 - Natural Language Processing/Project/Hybrid Model/label2id.json", "r", encoding="utf-8") as f:
    label2id = json.load(f)

id2label = {v: k for k, v in label2id.items()}
print("Loaded label mapping with", len(label2id), "labels.")


Loaded label mapping with 3 labels.


In [7]:
# In[6] - Define the NER Dataset Class

class NERDataset(Dataset):
    def __init__(self, file_path, tokenizer, label2id, max_length=128):
        self.tokenizer = tokenizer
        self.label2id = label2id
        self.max_length = max_length
        self.samples = []  # renamed to avoid conflict with any built-in properties
        
        with open(file_path, "r", encoding="utf-8") as f:
            # Each line is a JSON object like: {"tokens": [...], "tags": [...]}
            for line in f:
                item = json.loads(line)
                self.samples.append(item)
                
    def __len__(self):
        return len(self.samples)
    
    def __getitem__(self, idx):
        item = self.samples[idx]
        tokens = item["tokens"]
        tags = item["tags"]
        
        # Get knowledge features (a list of 0/1 values)
        knowledge_feats = get_knowledge_features(tokens)
        
        # Tokenize using the Hugging Face tokenizer
        encoding = self.tokenizer(
            tokens,
            is_split_into_words=True,
            return_tensors="pt",
            padding='max_length',
            truncation=True,
            max_length=self.max_length,
            return_offsets_mapping=True
        )
        input_ids = encoding['input_ids'].squeeze(0)         # (max_length,)
        attention_mask = encoding['attention_mask'].squeeze(0)   # (max_length,)
        offset_mapping = encoding['offset_mapping'].squeeze(0)   # (max_length, 2)
        
        ner_ids = []
        knowledge_ids = []
        current_word_idx = 0
        current_label = self.label2id["O"]  # default label

        # Align the labels with the subword tokens:
        for i, offsets in enumerate(offset_mapping):
            # Check if the token is the first subword of a word.
            if offsets[0] == 0 and offsets[1] != 0:
                if current_word_idx < len(tags):
                    current_label = self.label2id.get(tags[current_word_idx], self.label2id["O"])
                    ner_ids.append(current_label)
                    knowledge_ids.append(knowledge_feats[current_word_idx])
                else:
                    ner_ids.append(self.label2id["O"])
                    knowledge_ids.append(0)
                current_word_idx += 1
            else:
                # For subsequent subwords, replicate the label of the first subword.
                ner_ids.append(current_label)
                knowledge_ids.append(knowledge_feats[current_word_idx-1] if current_word_idx > 0 else 0)
        
        # Truncate lists if they exceed max_length
        ner_ids = ner_ids[:self.max_length]
        knowledge_ids = knowledge_ids[:self.max_length]
        
        ner_ids = torch.tensor(ner_ids, dtype=torch.long)
        knowledge_ids = torch.tensor(knowledge_ids, dtype=torch.float)
        
        return {
            'input_ids': input_ids,
            'attention_mask': attention_mask,
            'ner_labels': ner_ids,
            'knowledge_feats': knowledge_ids
        }



In [None]:
# In[7] - Instantiate Tokenizer and Create DataLoaders

# Update the model name if you prefer PubMedBERT; here we use a Bio/clinical model.
model_name = "allenai/scibert_scivocab_uncased"
tokenizer = AutoTokenizer.from_pretrained(model_name)

# Update file paths to your converted JSONL files.
train_file = "/media/smartdragon/WORK/6th Semester/22AIE315 - Natural Language Processing/Project/Batch 1/combined_train_1.jsonl"
dev_file   = "/media/smartdragon/WORK/6th Semester/22AIE315 - Natural Language Processing/Project/Batch 1/combined_dev_1.jsonl"
test_file  = "/media/smartdragon/WORK/6th Semester/22AIE315 - Natural Language Processing/Project/Batch 1/combined_test_1.jsonl"

train_dataset = NERDataset(train_file, tokenizer, label2id, max_length=4)
dev_dataset   = NERDataset(dev_file, tokenizer, label2id, max_length=4)
test_dataset  = NERDataset(test_file, tokenizer, label2id, max_length=4)

train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True)
dev_loader   = DataLoader(dev_dataset, batch_size=64, shuffle=False)
test_loader  = DataLoader(test_dataset, batch_size=64, shuffle=False)

print("Datasets sizes:", len(train_dataset), len(dev_dataset), len(test_dataset))


Datasets sizes: 153823 58785 99976


In [9]:
# In[8] - Define the Hybrid NER Model (Transformer + Optional BiLSTM + CRF)

class HybridNERModel(nn.Module):
    def __init__(self, 
                 transformer_name=model_name, 
                 hidden_dim=128,
                 num_ner_labels=len(label2id),
                 knowledge_feature_dim=1,
                 use_bilstm=True):
        super(HybridNERModel, self).__init__()
        
        # Transformer backbone
        self.transformer = AutoModel.from_pretrained(transformer_name)
        transformer_hidden_size = self.transformer.config.hidden_size
        
        # Combine transformer output with knowledge features
        self.feature_dim = transformer_hidden_size + knowledge_feature_dim
        
        # BiLSTM layer (optional)
        self.use_bilstm = use_bilstm
        self.hidden_dim = hidden_dim
        if self.use_bilstm:
            self.bilstm = nn.LSTM(
                input_size=self.feature_dim,
                hidden_size=self.hidden_dim,
                batch_first=True,
                bidirectional=True
            )
            lstm_out_dim = self.hidden_dim * 2
        else:
            lstm_out_dim = self.feature_dim
        
        # CRF layer for NER
        self.num_ner_labels = num_ner_labels
        self.ner_classifier = nn.Linear(lstm_out_dim, self.num_ner_labels)
        self.crf = CRF(self.num_ner_labels)
    
    def forward(self, input_ids, attention_mask, knowledge_features, labels_ner=None):
        outputs = self.transformer(input_ids, attention_mask=attention_mask)
        last_hidden_state = outputs.last_hidden_state  # (B, T, H)
        
        # Ensure knowledge_features has shape (B, T, 1)
        if len(knowledge_features.shape) == 2:
            knowledge_features = knowledge_features.unsqueeze(-1)
        
        combined_input = torch.cat([last_hidden_state, knowledge_features], dim=-1)
        
        if self.use_bilstm:
            lstm_out, _ = self.bilstm(combined_input)  # (B, T, 2*hidden_dim)
        else:
            lstm_out = combined_input
        
        emissions = self.ner_classifier(lstm_out)  # (B, T, num_ner_labels)
        
        ner_loss = None
        if labels_ner is not None:
            # CRF expects emissions of shape (T, B, num_labels), so transpose
            emissions_t = emissions.transpose(0, 1)
            labels_t = labels_ner.transpose(0, 1)
            mask_t = attention_mask.bool().transpose(0, 1)
            ner_loss = -1 * self.crf(emissions_t, labels_t, mask=mask_t)
        
        return emissions, ner_loss
    
    def decode(self, emissions, attention_mask):
        # Transpose from (B, T, num_labels) to (T, B, num_labels)
        emissions_t = emissions.transpose(0, 1)
        # Transpose mask from (B, T) to (T, B)
        mask_t = attention_mask.bool().transpose(0, 1)

        # Use the CRF's decode method
        pred_sequences = self.crf.decode(emissions_t, mask=mask_t)
        return pred_sequences




In [10]:
# In[9] - Define Training and Evaluation Functions

from tqdm import tqdm

def train_one_epoch(model, dataloader, optimizer):
    model.train()
    total_loss = 0.0
    
    # Counters for token-level accuracy
    total_correct = 0
    total_tokens = 0
    
    for batch in tqdm(dataloader, desc="Training", leave=False):
        input_ids = batch['input_ids'].to(device)
        attention_mask = batch['attention_mask'].to(device)
        ner_labels = batch['ner_labels'].to(device)
        knowledge_feats = batch['knowledge_feats'].to(device)
        
        optimizer.zero_grad()
        emissions, ner_loss = model(input_ids, attention_mask, knowledge_feats, labels_ner=ner_labels)
        
        # Some CRF implementations return a vector per batch element.
        # Ensure we reduce to a scalar:
        if ner_loss is None:
            continue
        ner_loss = ner_loss.mean()
        
        ner_loss.backward()
        optimizer.step()
        
        total_loss += ner_loss.item()
        
        # --- Compute token-level accuracy ---
        with torch.no_grad():
            # CRF decode => list of predicted label sequences
            pred_sequences = model.decode(emissions, attention_mask)
            
            # Compare predictions to gold labels
            for preds, golds, mask in zip(pred_sequences, ner_labels, attention_mask):
                valid_len = mask.sum().item()  # number of real tokens
                preds = preds[:valid_len]      # slice predictions to valid length
                golds = golds[:valid_len]      # slice gold labels
                
                # Convert preds to a tensor (so we can do a direct == compare)
                preds_tensor = torch.tensor(preds, device=golds.device)
                
                # Count how many are correct
                correct = (preds_tensor == golds).sum().item()
                total_correct += correct
                total_tokens += valid_len
    
    avg_loss = total_loss / len(dataloader)
    accuracy = total_correct / total_tokens if total_tokens > 0 else 0.0
    return avg_loss, accuracy


def evaluate(model, dataloader, id2label):
    model.eval()
    all_preds = []
    all_true = []
    with torch.no_grad():
        for batch in dataloader:
            input_ids = batch['input_ids'].to(device)
            attention_mask = batch['attention_mask'].to(device)
            ner_labels = batch['ner_labels'].to(device)
            knowledge_feats = batch['knowledge_feats'].to(device)
            
            emissions, _ = model(input_ids, attention_mask, knowledge_feats, labels_ner=None)
            pred_sequences = model.decode(emissions, attention_mask)
            
            for preds, golds, mask in zip(pred_sequences, ner_labels, attention_mask):
                valid_len = mask.sum().item()
                preds = preds[:valid_len]
                golds = golds[:valid_len].cpu().numpy()
                pred_labels = [id2label[p] for p in preds]
                gold_labels = [id2label[g] if g != -100 else "O" for g in golds]
                all_preds.append(pred_labels)
                all_true.append(gold_labels)
    
    print("SeqEval Classification Report:")
    print(classification_report(all_true, all_preds))
    p = precision_score(all_true, all_preds)
    r = recall_score(all_true, all_preds)
    f1 = f1_score(all_true, all_preds)
    print(f"Precision: {p:.4f}, Recall: {r:.4f}, F1: {f1:.4f}")



In [11]:
# In[10] - Instantiate Model and Run Training Loop

model = HybridNERModel(
    transformer_name=model_name,
    hidden_dim=128,
    num_ner_labels=len(label2id),
    knowledge_feature_dim=1,
    use_bilstm=True
).to(device)

optimizer = optim.AdamW(model.parameters(), lr=2e-5)
num_epochs = 1

for epoch in range(num_epochs):
    train_loss, train_acc = train_one_epoch(model, train_loader, optimizer)
    print(f"Epoch {epoch+1}, Train Loss: {train_loss:.4f}, Train Accuracy: {train_acc:.4f}")
    print("Evaluation on Dev Set:")
    evaluate(model, dev_loader, id2label)


                                                             

Epoch 1, Train Loss: 14.1038, Train Accuracy: 0.9711
Epoch 1, Train Loss: 14.1038
Evaluation on Dev Set:
SeqEval Classification Report:
              precision    recall  f1-score   support

     Anatomy       0.59      0.61      0.60      6015

   micro avg       0.59      0.61      0.60      6015
   macro avg       0.59      0.61      0.60      6015
weighted avg       0.59      0.61      0.60      6015

Precision: 0.5922, Recall: 0.6148, F1: 0.6033


In [12]:
# In[11] - Final Evaluation on Test Set

print("Final Evaluation on Test Set:")
evaluate(model, test_loader, id2label)


Final Evaluation on Test Set:
SeqEval Classification Report:
              precision    recall  f1-score   support

     Anatomy       0.65      0.62      0.63     13118

   micro avg       0.65      0.62      0.63     13118
   macro avg       0.65      0.62      0.63     13118
weighted avg       0.65      0.62      0.63     13118

Precision: 0.6496, Recall: 0.6199, F1: 0.6344


In [13]:
# Save the model's state_dict and optimizer's state_dict
torch.save({
    'epoch': epoch,
    'model_state_dict': model.state_dict(),
    'optimizer_state_dict': optimizer.state_dict(),
    'loss': train_loss,
}, "hybrid_ner_model_1.pt")