In [1]:
import os
from collections import defaultdict

def aggregate_ann_entities(base_dir):
    ann_dir = os.path.join(base_dir, 'original')
    entities = defaultdict(set)
    
    if not os.path.exists(ann_dir):
        print(f" Error: Directory 'original' not found at {ann_dir}")
        return None

    allowed_labels = {"ADR", "Drug", "Disease", "Symptom"}  

    for file in os.listdir(ann_dir):
        if file.endswith('.ann'):
            full_path = os.path.join(ann_dir, file)
            with open(full_path, 'r', encoding='utf-8') as stream:
                for line in stream:
                    if line.startswith('#') or not line.strip():
                        continue
                    parts = line.strip().split('\t')
                    if len(parts) >= 3:
                        cat = parts[1].split()[0]
                        text_val = parts[2]
                        if cat in allowed_labels:   
                            entities[cat].add(text_val.lower())
    return entities

results = aggregate_ann_entities('.')
if results is not None:
    print("Distinct Entity Counts")
    print("=" * 28)
    for k, val in sorted(results.items()):
        print(f"Label: {k:<10} | Distinct Entities: {len(val)}")


Distinct Entity Counts
Label: ADR        | Distinct Entities: 3400
Label: Disease    | Distinct Entities: 164
Label: Drug       | Distinct Entities: 323
Label: Symptom    | Distinct Entities: 148


In [2]:
import os
from transformers import pipeline

#  Load pretrained biomedical NER model 
model_id = "d4data/biomedical-ner-all"
ner_pipeline = pipeline("token-classification", model=model_id, aggregation_strategy="simple")

# Map labels 
translation = {
    'Sign_symptom': 'Symptom',
    'Disease_disorder': 'Disease',
    'Adverse_drug_event': 'ADR',
    'Medication': 'Drug'
}

#  BIO tagging function 
def convert_to_bio(text, predictions):
    bio_output = []
    # Create a list of 'O' for each token (initialize as Outside)
    tokens = text.split()
    labels = ["O"] * len(tokens)

    for pred in predictions:
        label = pred["label"]
        start, end, entity_text = pred["start"], pred["end"], pred["text"]

        # find matching tokens
        for i, tok in enumerate(tokens):
            # crude alignment by checking substring presence
            if entity_text.lower() in tok.lower():
                if labels[i] == "O":  # only overwrite if still O
                    labels[i] = f"B-{label}"
                else:
                    labels[i] = f"I-{label}"

    # Combine tokens with BIO labels
    for t, l in zip(tokens, labels):
        bio_output.append(f"{t}\t{l}")
    return "\n".join(bio_output)


#  Run prediction on sample file 
sample_txt = 'ARTHROTEC.1.txt'
text_folder = os.path.join('.', 'text')
text_path = os.path.join(text_folder, sample_txt)

with open(text_path, 'r', encoding='utf-8') as file:
    sample_data = file.read()

raw_results = ner_pipeline(sample_data)

# Map results 
mapped_predictions = []
for item in raw_results:
    new_label = translation.get(item['entity_group'])
    if new_label:
        pred = {
            "label": new_label,
            "start": item['start'],
            "end": item['end'],
            "text": item['word']
        }
        mapped_predictions.append(pred)

#  Show first 10 predictions 
print("\n Sample Predictions (first 10):")
print("=" * 40)
for i, p in enumerate(mapped_predictions[:10], start=1):
    print(f"T{i}\t{p['label']} {p['start']} {p['end']}\t{p['text']}")

# Convert predictions to BIO format 
print("\n BIO Output:")
print("=" * 40)
bio_format = convert_to_bio(sample_data, mapped_predictions)
print(bio_format[:500])  # show first 500 chars


Device set to use cpu



 Sample Predictions (first 10):
T1	Symptom 13 15	dr
T2	Symptom 15 19	##owsy
T3	Symptom 36 43	blurred
T4	Disease 179 188	arthritis
T5	Symptom 412 417	pains

 BIO Output:
I	O
feel	O
a	O
bit	O
drowsy	B-Symptom
&	O
have	O
a	O
little	O
blurred	B-Symptom
vision,	O
so	O
far	O
no	O
gastric	O
problems.	O
I've	O
been	O
on	O
Arthrotec	O
50	O
for	O
over	O
10	O
years	O
on	O
and	O
off,	O
only	O
taking	O
it	O
when	O
I	O
needed	O
it.	O
Due	O
to	O
my	O
arthritis	B-Disease
getting	O
progressively	O
worse,	O
to	O
the	O
point	O
where	O
I	O
am	O
in	O
tears	O
with	O
the	O
agony,	O
gp's	O
started	O
me	O
on	O
75	O
twice	O
a	O
day	O
and	O
I	O
have	O
to	O
take	O
it.	O
every	O
day	O
for	


In [3]:
import os
from collections import defaultdict

print(" Task 3 & 4")

def load_ground_truth(file_path):
    gt_set = set()
    if not os.path.exists(file_path):
        print(f"Warning: Ground truth file missing at {file_path}")
        return gt_set
    with open(file_path, 'r', encoding='utf-8') as f:
        for line in f:
            if line.startswith('T'):
                parts = line.strip().split('\t')
                if len(parts) > 1:
                    details = parts[1].split()
                    if len(details) >= 3:
                        label = details[0]
                        start, end = int(details[1]), int(details[-1])
                        gt_set.add((label, start, end))
    return gt_set

def compute_metrics(preds, truths):
    # Convert predictions into set for exact matching
    pred_set = {(p['label'], p['start'], p['end']) for p in preds}

    tp = len(pred_set.intersection(truths))
    fp = len(pred_set - truths)
    fn = len(truths - pred_set)
    
    precision_val = tp / (tp + fp) if (tp + fp) > 0 else 0
    recall_val = tp / (tp + fn) if (tp + fn) > 0 else 0
    f1_val = 2 * precision_val * recall_val / (precision_val + recall_val) if (precision_val + recall_val) > 0 else 0
    
    return {"precision": precision_val, "recall": recall_val, "f1_score": f1_val}

def compute_metrics_per_label(preds, truths):
    # Group ground truths by label
    truth_by_label = defaultdict(set)
    for t in truths:
        truth_by_label[t[0]].add(t)

    # Group predictions by label
    pred_by_label = defaultdict(set)
    for p in preds:
        pred_by_label[p['label']].add((p['label'], p['start'], p['end']))

    results = {}
    for label in set(list(truth_by_label.keys()) + list(pred_by_label.keys())):
        tp = len(pred_by_label[label].intersection(truth_by_label[label]))
        fp = len(pred_by_label[label] - truth_by_label[label])
        fn = len(truth_by_label[label] - pred_by_label[label])
        
        prec = tp / (tp + fp) if (tp + fp) > 0 else 0
        rec = tp / (tp + fn) if (tp + fn) > 0 else 0
        f1 = 2 * prec * rec / (prec + rec) if (prec + rec) > 0 else 0
        results[label] = {"precision": prec, "recall": rec, "f1_score": f1}
    return results


# Run on single file 
gt_filename = 'ARTHROTEC.1.ann'
ann_directory = os.path.join('.', 'original')
gt_file_path = os.path.join(ann_directory, gt_filename)
ground_truth = load_ground_truth(gt_file_path)

# Overall metrics
metrics = compute_metrics(mapped_predictions, ground_truth)
print(f"\n Performance for '{gt_filename}' (All Entities):")
print(f"  - Precision: {metrics['precision']:.2%}")
print(f"  - Recall:    {metrics['recall']:.2%}")
print(f"  - F1-Score:  {metrics['f1_score']:.2%}")

# Per-label metrics
print("\n Per-Label Performance:")
metrics_per_label = compute_metrics_per_label(mapped_predictions, ground_truth)
for lbl, vals in metrics_per_label.items():
    print(f"  {lbl:<8} | P: {vals['precision']:.2%}  R: {vals['recall']:.2%}  F1: {vals['f1_score']:.2%}")


 Task 3 & 4

 Performance for 'ARTHROTEC.1.ann' (All Entities):
  - Precision: 40.00%
  - Recall:    25.00%
  - F1-Score:  30.77%

 Per-Label Performance:
  Disease  | P: 100.00%  R: 100.00%  F1: 100.00%
  Symptom  | P: 25.00%  R: 50.00%  F1: 33.33%
  Drug     | P: 0.00%  R: 0.00%  F1: 0.00%
  ADR      | P: 0.00%  R: 0.00%  F1: 0.00%


In [4]:
import os
import random
from tqdm import tqdm

print("\n Running Task 5: Performance on 50 Random Posts ")

def parse_ground_truth(filepath):
    """Reads a .ann file and returns a set of (label, start, end) tuples."""
    ground_truths = set()
    if not os.path.exists(filepath):
        print(f"Warning: File {filepath} not found.")
        return ground_truths
    with open(filepath, 'r', encoding='utf-8') as f:
        for line in f:
            if line.startswith('T'):
                parts = line.strip().split('\t')
                if len(parts) > 1:
                    label_info = parts[1].split()
                    if len(label_info) >= 3:
                        label = label_info[0]
                        start = int(label_info[1])
                        end = int(label_info[-1])
                        ground_truths.add((label, start, end))
    return ground_truths

# Directories 
text_dir = os.path.join('.', 'text')
original_dir = os.path.join('.', 'original')

# Retrieve the list of .txt files
all_files = [f for f in os.listdir(text_dir) if f.endswith('.txt')]
random.seed(42)  # for reproducibility
sample_size = min(50, len(all_files))
selected_files = random.sample(all_files, sample_size)

total_tp, total_fp, total_fn = 0, 0, 0

for filename in tqdm(selected_files, desc="Evaluating Files"):
    # Read the current text file
    file_path = os.path.join(text_dir, filename)
    with open(file_path, 'r', encoding='utf-8') as f:
        text = f.read()
    
    # Get model predictions using ner_pipeline (from Task 2)
    predictions = ner_pipeline(text)
    
    # Format predictions into a set of (label, start, end) tuples
    formatted_preds = set()
    for entity in predictions:
        entity_label = translation.get(entity['entity_group'])
        if entity_label:
            formatted_preds.add((entity_label, entity['start'], entity['end']))
    
    # Obtain ground truth from the corresponding .ann file
    ann_filename = filename.replace('.txt', '.ann')
    gt_filepath = os.path.join(original_dir, ann_filename)
    ground_truths = parse_ground_truth(gt_filepath)
    
    # Update true positives, false positives, and false negatives
    total_tp += len(formatted_preds.intersection(ground_truths))
    total_fp += len(formatted_preds - ground_truths)
    total_fn += len(ground_truths - formatted_preds)

# Calculate micro-averaged metrics
overall_precision = total_tp / (total_tp + total_fp) if (total_tp + total_fp) > 0 else 0
overall_recall = total_tp / (total_tp + total_fn) if (total_tp + total_fn) > 0 else 0
overall_f1 = (2 * overall_precision * overall_recall / (overall_precision + overall_recall)
              if (overall_precision + overall_recall) > 0 else 0)

print(f"\n Overall Performance on {sample_size} Random Files:")
print(f"  - Precision: {overall_precision:.2%}")
print(f"  - Recall:    {overall_recall:.2%}")
print(f"  - F1-Score:  {overall_f1:.2%}")


 Running Task 5: Performance on 50 Random Posts 


Evaluating Files: 100%|██████████| 50/50 [00:09<00:00,  5.34it/s]


 Overall Performance on 50 Random Files:
  - Precision: 7.20%
  - Recall:    5.28%
  - F1-Score:  6.09%





In [5]:
import os
from fuzzywuzzy import fuzz
from sentence_transformers import SentenceTransformer, util

print("\n Running Task 6: Combining Data and Entity Matching ")

# Check that mapped_predictions is available
try:
    _ = mapped_predictions
except NameError:
    raise NameError("mapped_predictions is not defined.")

# Define file paths
sample_filename_ann = 'ARTHROTEC.1.ann'
sct_dir = os.path.join('cadec', 'sct')
original_dir = os.path.join('cadec', 'original')
sct_file = os.path.join(sct_dir, sample_filename_ann)
original_file = os.path.join(original_dir, sample_filename_ann)

def create_combined_data_structure(original_ann_path, sct_ann_path):
    
    original_data = {}
    if os.path.exists(original_ann_path):
        with open(original_ann_path, 'r', encoding='utf-8') as f:
            for line in f:
                if line.startswith('T'):
                    parts = line.strip().split('\t')
                    original_data[parts[0]] = {
                        'label_type': parts[1].split()[0],
                        'text_segment': parts[2]
                    }

    combined_list = []
    if os.path.exists(sct_ann_path):
        with open(sct_ann_path, 'r', encoding='utf-8') as f:
            for line in f:
                if line.startswith('T'):
                    parts = line.strip().split('\t')
                    t_id = parts[0]
                    if t_id in original_data:
                        try:
                            info = parts[1].split('"')
                            code = info[0].split()[-1]
                            description = info[1]
                            entry = original_data[t_id]
                            entry['snomed_code'] = code
                            entry['snomed_description'] = description
                            combined_list.append(entry)
                        except IndexError:
                            continue  # Skip malformed SCT lines
    return combined_list

# Merge data
combined_data = create_combined_data_structure(original_file, sct_file)

# Ground truth ADRs (with SNOMED mappings)
ground_truth_adrs = [d for d in combined_data if d['label_type'] == 'ADR']

# Model predictions ADRs
model_predicted_adrs = [p for p in mapped_predictions if p['label'] == 'ADR']

# Guard checks
if not model_predicted_adrs:
    print("Model found no ADRs in the sample text. Cannot perform matching for this file.")
elif not ground_truth_adrs:
    print("No ground truth ADRs with SNOMED codes found in the sample file. Cannot perform matching.")
else:
    snomed_descriptions = [d['snomed_description'].strip().lower() for d in ground_truth_adrs if 'snomed_description' in d]
    if not snomed_descriptions:
        print("Ground truth ADRs exist but no valid SNOMED descriptions found.")
    else:
        print("Loading embedding model (this may take a moment)...")
        embedding_model = SentenceTransformer('all-MiniLM-L6-v2')

        # Encode all descriptions at once
        description_embeddings = embedding_model.encode(snomed_descriptions, convert_to_tensor=True)

        # Batch encode predictions
        pred_texts = [p['text'].strip().lower() for p in model_predicted_adrs]
        pred_embeddings = embedding_model.encode(pred_texts, convert_to_tensor=True)

        print("Models ready. Performing matching...")

        # Iterate over predictions and compare
        for i, adr_pred in enumerate(model_predicted_adrs):
            pred_text = pred_texts[i]
            print(f"\n--- Matching for Predicted ADR: '{adr_pred['text']}' ---")

            # a) Fuzzy String Matching
            fuzzy_scores = [fuzz.token_set_ratio(pred_text, desc) for desc in snomed_descriptions]
            best_fuzzy_idx = int(max(range(len(fuzzy_scores)), key=fuzzy_scores.__getitem__))
            best_fuzzy_match = ground_truth_adrs[best_fuzzy_idx]

            # b) Embedding Matching
            cosine_scores = util.cos_sim(pred_embeddings[i], description_embeddings)[0]
            best_embedding_idx = int(cosine_scores.argmax())
            best_embedding_match = ground_truth_adrs[best_embedding_idx]

            print("  String Match Result:    '{}' (Score: {})".format(
                best_fuzzy_match["snomed_description"], fuzzy_scores[best_fuzzy_idx]))
            print("  Embedding Match Result: '{}' (Score: {:.2f})".format(
                best_embedding_match["snomed_description"], float(cosine_scores[best_embedding_idx])))

            if best_fuzzy_match['snomed_code'] == best_embedding_match['snomed_code']:
                print("  Comparison: Both methods agree.")
            else:
                print("  Comparison: Methods disagree. Embedding match is often semantically superior.")



 Running Task 6: Combining Data and Entity Matching 
Model found no ADRs in the sample text. Cannot perform matching for this file.
