In [1]:
# Install required packages
!pip install transformers seqeval pandas seaborn scikit-learn

# Unzip the CADEC zip file (adjust the path if necessary)
!unzip /content/cadec.zip -d /content/cadec

[1;30;43mStreaming output truncated to the last 5000 lines.[0m
  inflating: /content/cadec/cadec/meddra/LIPITOR.95.ann  
  inflating: /content/cadec/__MACOSX/cadec/meddra/._LIPITOR.95.ann  
  inflating: /content/cadec/cadec/meddra/LIPITOR.969.ann  
  inflating: /content/cadec/__MACOSX/cadec/meddra/._LIPITOR.969.ann  
  inflating: /content/cadec/cadec/meddra/LIPITOR.81.ann  
  inflating: /content/cadec/__MACOSX/cadec/meddra/._LIPITOR.81.ann  
  inflating: /content/cadec/cadec/meddra/LIPITOR.941.ann  
  inflating: /content/cadec/__MACOSX/cadec/meddra/._LIPITOR.941.ann  
  inflating: /content/cadec/cadec/meddra/LIPITOR.799.ann  
  inflating: /content/cadec/__MACOSX/cadec/meddra/._LIPITOR.799.ann  
  inflating: /content/cadec/cadec/meddra/LIPITOR.955.ann  
  inflating: /content/cadec/__MACOSX/cadec/meddra/._LIPITOR.955.ann  
  inflating: /content/cadec/cadec/meddra/LIPITOR.1000.ann  
  inflating: /content/cadec/__MACOSX/cadec/meddra/._LIPITOR.1000.ann  
  inflating: /content/cadec/cadec/

In [None]:
import os
import random
import json
import numpy as np
import torch
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from collections import Counter
from transformers import AutoTokenizer, AutoModelForTokenClassification, TrainingArguments, Trainer
from seqeval.metrics import precision_score, recall_score, f1_score
from sklearn.metrics import confusion_matrix

# Seed Python, NumPy and PyTorch with one shared value so shuffles and
# weight initialisation are repeatable from a fresh kernel.
SEED = 42
random.seed(SEED)
np.random.seed(SEED)
torch.manual_seed(SEED)

# 1. Data Identification and Sample Printing

def list_ann_files(cadec_folder):
    """
    Scans the CADEC folder for subdirectories: 'original', 'meddra', and 'sct'.
    Returns a dictionary mapping each annotation type to its list of .ann file paths.

    Args:
        cadec_folder: Directory expected to contain the three annotation sub-folders.

    Returns:
        dict[str, list[str]]: annotation type -> alphabetically sorted .ann paths.
        A missing sub-folder is reported on stdout and mapped to an empty list.
    """
    annotation_types = ['original', 'meddra', 'sct']
    ann_files = {}
    for ann_type in annotation_types:
        folder_path = os.path.join(cadec_folder, ann_type)
        if not os.path.isdir(folder_path):
            print(f"[!] Directory not found for {ann_type}: {folder_path}")
            ann_files[ann_type] = []
            continue
        # os.listdir returns entries in arbitrary order; sort so the files
        # sampled and processed downstream are the same on every run.
        files = sorted(
            os.path.join(folder_path, name)
            for name in os.listdir(folder_path)
            if name.endswith('.ann')
        )
        ann_files[ann_type] = files
        print(f"Found {len(files)} .ann files in folder '{ann_type}'.")
    return ann_files

def print_sample_ann(file_path, num_lines=3):
    """
    Opens a .ann file and prints the first num_lines non-empty lines.

    A header with the file's base name is printed first and a blank line last.
    Any error raised while opening or reading the file is reported on stdout
    instead of being propagated.

    Args:
        file_path: Path of the .ann file to preview.
        num_lines: Maximum number of non-empty lines to print; zero or a
            negative value prints only the header.
    """
    print(f"--- {os.path.basename(file_path)} ---")
    try:
        with open(file_path, "r", encoding="utf-8", errors="replace") as f:
            count = 0
            for line in f:
                # Check the budget before printing so num_lines <= 0 prints nothing.
                if count >= num_lines:
                    break
                line = line.strip()
                if line:
                    print(line)
                    count += 1
    except Exception as e:
        print(f"Error reading {file_path}: {e}")
    print()

# Define CADEC folder
# Root of the extracted corpus; list_ann_files looks for the 'original',
# 'meddra' and 'sct' sub-folders directly beneath it.
CADEC_FOLDER = "/content/cadec/cadec"
ann_files_dict = list_ann_files(CADEC_FOLDER)
# Sanity check: preview the first non-empty lines of up to three files
# from each annotation type.
for ann_type, files in ann_files_dict.items():
    print(f"\n--- Samples from '{ann_type}' folder ---")
    for f in files[:3]:
        print_sample_ann(f)

# 2. Parsing Functions with Two Preprocessing Strategies

def _parse_ann_header(line, override_label=None):
    """
    Parses the parts of a tab-separated .ann line shared by both strategies.

    Two layouts of the middle (info) field are understood:
      * "<label> <start> <end>[;<start> <end>...]"
      * "<code> | <name> | ... | <start> <end>[;<start> <end>...]"
    In the pipe layout the offsets are read from the LAST pipe-separated field,
    so a line that lists more than one code/name pair before the offsets is
    still parsed rather than silently dropped.

    Returns:
        None if the line is malformed, otherwise a tuple
        (ann_id, label, ann_text, spans) where spans is a list of (start, end)
        integer pairs. Sub-spans that are not exactly two integers are skipped
        as a whole.
    """
    parts = line.strip().split("\t")
    if len(parts) < 3:
        return None
    ann_id, ann_info, ann_text = parts[0], parts[1], parts[2]

    if " | " in ann_info:
        info_parts = ann_info.split(" | ")
        if len(info_parts) < 3:
            return None
        label_type = info_parts[1].strip()
        offsets_str = info_parts[-1].strip()
    else:
        info_parts = ann_info.split()
        if len(info_parts) < 3:
            return None
        label_type = info_parts[0]
        offsets_str = " ".join(info_parts[1:])

    if override_label is not None:
        label_type = override_label

    spans = []
    for chunk in offsets_str.split(";"):
        bounds = chunk.split()
        if len(bounds) != 2:
            continue
        try:
            # Convert both bounds before storing either, so a half-valid
            # sub-span can never contribute only its start.
            span = (int(bounds[0]), int(bounds[1]))
        except ValueError:
            continue
        spans.append(span)
    return ann_id, label_type, ann_text, spans


def parse_and_split_ann_line(line, override_label=None):
    """
    Splits a multi-span .ann line into individual annotation dictionaries.

    Args:
        line: One raw line of a .ann file.
        override_label: If not None, used as the label of every returned
            annotation instead of the label found in the line.

    Returns:
        A list with one dict per valid sub-span, each with keys "id"
        ("<ann_id>_<k>", k counting valid sub-spans from 1), "label", "start",
        "end" and "text" (the full annotation text, repeated for every
        sub-span). Returns [] for malformed lines.
    """
    parsed = _parse_ann_header(line, override_label)
    if parsed is None:
        return []
    ann_id, label_type, ann_text, spans = parsed
    return [
        {
            "id": f"{ann_id}_{position}",
            "label": label_type,
            "start": start,
            "end": end,
            "text": ann_text
        }
        for position, (start, end) in enumerate(spans, start=1)
    ]


def merge_ann_line(line, override_label=None):
    """
    Merges multi-span annotations by taking the minimum start and maximum end.
    Returns a single annotation dictionary.

    Args:
        line: One raw line of a .ann file.
        override_label: If not None, used as the label instead of the label
            found in the line.

    Returns:
        A one-element list holding a dict with keys "id", "label", "start",
        "end" and "text", or [] if the line is malformed or has no valid
        sub-span.
    """
    parsed = _parse_ann_header(line, override_label)
    if parsed is None:
        return []
    ann_id, label_type, ann_text, spans = parsed
    if not spans:
        return []
    return [{
        "id": ann_id,
        "label": label_type,
        "start": min(start for start, _ in spans),
        "end": max(end for _, end in spans),
        "text": ann_text
    }]

def parse_brat_files_strategy(text_folder, ann_folder, target_label=None, strategy="split"):
    """
    Reads .txt and .ann files and applies the specified preprocessing strategy.

    Args:
        text_folder: Directory holding the "<name>.txt" documents.
        ann_folder: Directory holding the matching "<name>.ann" files.
        target_label: Forwarded as override_label to the line parser; when not
            None every parsed annotation receives this label.
        strategy: "merge" collapses a multi-span annotation into one span via
            merge_ann_line; any other value uses parse_and_split_ann_line.

    Returns:
        A list of dicts {"id", "text", "annotations"}, one per .ann file that
        has a matching .txt file, ordered by .ann file name.
    """
    parse_line = merge_ann_line if strategy == "merge" else parse_and_split_ann_line
    text_files = set(os.listdir(text_folder))
    # os.listdir order is arbitrary; sorting keeps the document order (and
    # therefore any seeded shuffle applied later) identical across machines.
    ann_files = sorted(name for name in os.listdir(ann_folder) if name.endswith(".ann"))

    data_entries = []
    for ann_file in ann_files:
        base_name = os.path.splitext(ann_file)[0]
        txt_file = base_name + ".txt"
        if txt_file not in text_files:
            # No source text for this annotation file: nothing to align against.
            continue
        txt_path = os.path.join(text_folder, txt_file)
        ann_path = os.path.join(ann_folder, ann_file)
        with open(txt_path, "r", encoding="utf-8", errors="replace") as f:
            text_content = f.read()
        annotations = []
        with open(ann_path, "r", encoding="utf-8", errors="replace") as f:
            for line in f:
                # Only lines whose id starts with "T" are parsed; everything
                # else in the file is ignored.
                if not line.startswith("T"):
                    continue
                annotations.extend(parse_line(line, override_label=target_label))
        data_entries.append({
            "id": base_name,
            "text": text_content,
            "annotations": annotations
        })
    return data_entries

# 3. Preprocessing: Tokenization & Label Alignment

def create_label_map(entity_types=None):
    """
    Builds the tag -> integer id mapping for BIO tagging.

    "O" always gets id 0; each entity type then contributes "B-<type>" followed
    by "I-<type>", in the order given. Defaults to the single type "ADR".
    """
    if entity_types is None:
        entity_types = ["ADR"]
    ordered_tags = ["O"] + [
        f"{prefix}-{etype}" for etype in entity_types for prefix in ("B", "I")
    ]
    return dict(zip(ordered_tags, range(len(ordered_tags))))

def char_labeling(text, annotations, entity_types):
    """
    Produces one BIO tag per character of `text`.

    Each annotation whose label is in `entity_types` and whose span satisfies
    0 <= start < end <= len(text) marks its first character "B-<label>" and the
    rest "I-<label>". Other annotations are skipped. Annotations are applied in
    order, so a later span overwrites tags written by an earlier one.
    """
    text_len = len(text)
    tags = ["O"] * text_len
    for ann in annotations:
        kind, lo, hi = ann["label"], ann["start"], ann["end"]
        if not (kind in entity_types and 0 <= lo < hi <= text_len):
            continue
        tags[lo:hi] = [f"B-{kind}"] + [f"I-{kind}"] * (hi - lo - 1)
    return tags

def tokenize_and_align_labels(data_entries, model_name, label2id, entity_types, max_length=128, ignore_index=-100):
    """
    Tokenizes every document and derives one label id per token.

    The fast tokenizer for `model_name` is loaded on each call. Each text is
    padded/truncated to `max_length`. A token whose offset pair is zero-width
    (presumably special and padding tokens) receives `ignore_index`; any other
    token receives the id of the character-level tag at its first character,
    falling back to the id of "O" when that tag is not in `label2id`.

    Returns:
        dict with parallel lists under "input_ids", "attention_mask", "labels".
    """
    tokenizer = AutoTokenizer.from_pretrained(model_name, use_fast=True)
    collected = {"input_ids": [], "attention_mask": [], "labels": []}
    for entry in data_entries:
        doc_text = entry["text"]
        doc_annotations = entry["annotations"]
        char_tags = char_labeling(doc_text, doc_annotations, entity_types)
        encoding = tokenizer(
            doc_text,
            max_length=max_length,
            padding="max_length",
            truncation=True,
            return_offsets_mapping=True,
        )
        token_labels = [
            ignore_index
            if tok_start == tok_end
            else label2id.get(char_tags[tok_start], label2id["O"])
            for (tok_start, tok_end) in encoding["offset_mapping"]
        ]
        collected["input_ids"].append(encoding["input_ids"])
        collected["attention_mask"].append(encoding["attention_mask"])
        collected["labels"].append(token_labels)
    return collected

class CadecDataset(torch.utils.data.Dataset):
    """
    Torch dataset over pre-tokenized encodings.

    `encodings` is a mapping with parallel sequences under "input_ids",
    "attention_mask" and "labels"; item `idx` is returned as a dict of tensors
    built from the idx-th element of each.
    """

    # Keys copied into every item, in the order they appear in the result.
    _FIELDS = ("input_ids", "attention_mask", "labels")

    def __init__(self, encodings):
        self.encodings = encodings

    def __len__(self):
        # The number of examples is taken from the input_ids list.
        return len(self.encodings["input_ids"])

    def __getitem__(self, idx):
        return {
            field: torch.tensor(self.encodings[field][idx])
            for field in self._FIELDS
        }

# 4. Training & Evaluation Functions

def build_id2label(label2id):
    """Inverts a label -> id mapping into an id -> label mapping."""
    return dict(zip(label2id.values(), label2id.keys()))

def compute_metrics_factory(label2id):
    """
    Returns a `compute_metrics(p)` callable bound to the given label map.

    The callable takes the arg-max of `p.predictions` along axis 2, drops every
    position whose gold id in `p.label_ids` is -100, converts the remaining ids
    to tag strings and returns seqeval precision, recall and F1 in a dict with
    keys "precision", "recall" and "f1".
    """
    id2label = build_id2label(label2id)

    def compute_metrics(p):
        predicted_ids = np.argmax(p.predictions, axis=2)
        gold_sequences, pred_sequences = [], []
        for pred_row, gold_row in zip(predicted_ids, p.label_ids):
            # Keep only positions that carry a real gold label.
            kept = [(gold, pred) for pred, gold in zip(pred_row, gold_row) if gold != -100]
            gold_sequences.append([id2label[gold] for gold, _ in kept])
            pred_sequences.append([id2label[pred] for _, pred in kept])
        return {
            "precision": precision_score(gold_sequences, pred_sequences),
            "recall": recall_score(gold_sequences, pred_sequences),
            "f1": f1_score(gold_sequences, pred_sequences),
        }

    return compute_metrics

def train_ner_model(train_dataset, dev_dataset, label2id, model_name="bert-base-uncased",
                    output_dir="./model_out", epochs=3, batch_size=8, ignore_mismatched_sizes=False):
    """
    Fine-tunes a token-classification model and returns the fitted Trainer.

    Args:
        train_dataset: Dataset used for training.
        dev_dataset: Dataset evaluated at the end of every epoch; its "f1"
            selects the checkpoint reloaded when training finishes.
        label2id: Mapping tag -> integer id; its size sets the number of labels.
        model_name: Hugging Face checkpoint to start from.
        output_dir: Where checkpoints and logs (under "<output_dir>/logs") go.
        epochs: Number of training epochs.
        batch_size: Per-device batch size for both training and evaluation.
        ignore_mismatched_sizes: Forwarded to `from_pretrained`.

    Returns:
        The Trainer after `train()` has completed.
    """
    import inspect  # local import: only needed to probe the installed transformers API

    num_labels = len(label2id)
    # Store the tag names in the model config so saved checkpoints are
    # self-describing instead of exposing generic LABEL_<n> names.
    id2label = {label_id: name for name, label_id in label2id.items()}
    model = AutoModelForTokenClassification.from_pretrained(
        model_name,
        num_labels=num_labels,
        id2label=id2label,
        label2id=label2id,
        ignore_mismatched_sizes=ignore_mismatched_sizes
    )

    # `evaluation_strategy` was renamed to `eval_strategy`; newer releases
    # reject the old keyword. Pick whichever name the installed version accepts.
    accepted_params = inspect.signature(TrainingArguments.__init__).parameters
    eval_strategy_kwarg = "eval_strategy" if "eval_strategy" in accepted_params else "evaluation_strategy"

    training_args = TrainingArguments(
        output_dir=output_dir,
        save_strategy="epoch",
        num_train_epochs=epochs,
        per_device_train_batch_size=batch_size,
        per_device_eval_batch_size=batch_size,
        logging_steps=10,
        logging_dir=f"{output_dir}/logs",
        save_total_limit=1,
        load_best_model_at_end=True,
        metric_for_best_model="f1",
        greater_is_better=True,
        **{eval_strategy_kwarg: "epoch"}
    )
    compute_metrics = compute_metrics_factory(label2id)
    trainer = Trainer(
        model=model,
        args=training_args,
        train_dataset=train_dataset,
        eval_dataset=dev_dataset,
        compute_metrics=compute_metrics
    )
    trainer.train()
    return trainer

# 5. Extended Experiment Pipeline

def split_data(data, train_ratio=0.7, dev_ratio=0.15, seed=None):
    """
    Shuffles the documents and splits them into train, dev and test lists.

    The caller's sequence is left untouched: a copy is shuffled.

    Args:
        data: Sequence of items to split.
        train_ratio: Fraction of items placed in the training split.
        dev_ratio: Fraction of items placed in the dev split; the remainder
            forms the test split.
        seed: If None, the module-level `random` generator is used (so the
            result depends on its current state). Otherwise a dedicated
            generator seeded with this value is used, which makes the split
            reproducible and independent of any other random calls.

    Returns:
        (train, dev, test) as three lists.
    """
    shuffled = list(data)
    if seed is None:
        random.shuffle(shuffled)
    else:
        random.Random(seed).shuffle(shuffled)
    n = len(shuffled)
    train_end = int(train_ratio * n)
    dev_end = train_end + int(dev_ratio * n)
    return shuffled[:train_end], shuffled[train_end:dev_end], shuffled[dev_end:]

def run_extended_experiments(seed=42):
    """
    Runs experiments across annotation schemes (original, meddra, sct),
    preprocessing strategies (split and merge), and models.

    For every (annotation set, strategy) pair the corpus is parsed, ordered by
    document id and split after re-seeding `random`, so each configuration is
    trained and evaluated on the same documents. Every model in the grid is
    then fine-tuned and evaluated on the dev and test splits.

    Args:
        seed: Value passed to `random.seed` immediately before each split.

    Returns:
        dict mapping "<ann_name>_<strategy>_<model_key>" to
        {"dev": metrics, "test": metrics}.

    Side effects:
        Writes "extended_results.json" after every finished run (and once more
        at the end), creates one output directory per run, and prints progress.
    """
    import gc  # local import: used only to release finished models below

    results_path = "extended_results.json"

    # Define folder paths for text and annotation sets
    text_folder = os.path.join(CADEC_FOLDER, "text")
    annotation_sets = {
        "original": os.path.join(CADEC_FOLDER, "original"),
        "meddra": os.path.join(CADEC_FOLDER, "meddra"),
        "sct": os.path.join(CADEC_FOLDER, "sct")
    }
    # NOTE(review): target_label is forwarded as override_label, so every parsed
    # annotation line is relabelled "ADR" whatever label the file gives it.
    # Confirm this is intended for sets whose lines carry their own entity type.
    target_label = "ADR"
    strategies = ["split", "merge"]
    # model list
    models = {
        "BERT": "bert-base-uncased",
        "BioBERT": "dmis-lab/biobert-v1.1",
        "ClinicalBERT": "emilyalsentzer/Bio_ClinicalBERT",
        "SpanBERT_finetuned": "abhibisht89/spanbert-large-cased-finetuned-ade_corpus_v2",
        "SpanBERT_normal": "SpanBERT/spanbert-large-cased",
        "RoBERTa": "roberta-base"
    }
    # Checkpoints loaded with ignore_mismatched_sizes=True.
    mismatched_head_models = ("SpanBERT_finetuned", "SpanBERT_normal")

    entity_types = [target_label]
    label2id = create_label_map(entity_types)
    results = {}

    for ann_name, ann_folder in annotation_sets.items():
        print(f"\n=== Loading {ann_name} annotations ===")
        for strategy in strategies:
            print(f"\n--- Processing {ann_name} with {strategy} strategy ---")
            data_entries = parse_brat_files_strategy(text_folder, ann_folder, target_label=target_label, strategy=strategy)
            print(f"Parsed {len(data_entries)} documents.")
            # Fix the document order and the RNG state before shuffling, so the
            # split does not depend on directory listing order or on random
            # numbers consumed by earlier training runs.
            data_entries.sort(key=lambda entry: entry["id"])
            random.seed(seed)
            train_data, dev_data, test_data = split_data(data_entries)
            print(f"Train: {len(train_data)}, Dev: {len(dev_data)}, Test: {len(test_data)}")

            for model_key, model_ckpt in models.items():
                print(f"\n--- Training {model_key} on {ann_name} ({strategy} strategy) ---")
                # Tokenization is repeated per model because each checkpoint
                # loads its own tokenizer.
                train_enc = tokenize_and_align_labels(train_data, model_ckpt, label2id, entity_types)
                dev_enc = tokenize_and_align_labels(dev_data, model_ckpt, label2id, entity_types)
                test_enc = tokenize_and_align_labels(test_data, model_ckpt, label2id, entity_types)

                train_ds = CadecDataset(train_enc)
                dev_ds = CadecDataset(dev_enc)
                test_ds = CadecDataset(test_enc)

                out_dir = f"./{ann_name}_{model_key}_{strategy}_results"
                ignore_flag = model_key in mismatched_head_models

                trainer = train_ner_model(
                    train_dataset=train_ds,
                    dev_dataset=dev_ds,
                    label2id=label2id,
                    model_name=model_ckpt,
                    output_dir=out_dir,
                    epochs=3,
                    batch_size=8,
                    ignore_mismatched_sizes=ignore_flag
                )

                dev_metrics = trainer.evaluate(dev_ds)
                test_metrics = trainer.evaluate(test_ds)
                key = f"{ann_name}_{strategy}_{model_key}"
                results[key] = {"dev": dev_metrics, "test": test_metrics}

                # Persist after every run so a crash late in the grid does not
                # discard the runs that already finished.
                with open(results_path, "w") as f:
                    json.dump(results, f, indent=2)

                # Drop the finished model before the next one is loaded to keep
                # memory use bounded across the grid.
                del trainer
                gc.collect()
                if torch.cuda.is_available():
                    torch.cuda.empty_cache()

    with open(results_path, "w") as f:
        json.dump(results, f, indent=2)

    print("\n=== Extended Experiment Results ===")
    for k, v in results.items():
        print(f"{k} => {v}")
    return results

# Run the whole experiment grid. This trains every model for every annotation
# set and strategy, writes extended_results.json, and keeps the returned
# metrics dictionary in ext_results.
ext_results = run_extended_experiments()

