### Stage 2 – Entity Linker (SNOMED CT)

This notebook performs candidate generation and selection for linking NER spans to SNOMED CT concepts.

Checklist
- [x] Candidate Generation: exact + fuzzy, collect top-N with scores
- [x] Candidate Selection: exact > best-score, optional rules
- [x] Evaluation: link accuracy on predicted spans whose offsets exactly match a gold span
- [x] Output: JSONL candidates and CSV final links

Inputs
- Predicted spans from `A2/ner.ipynb` (`ner_predictions.csv`)
- Notes and gold span annotations for evaluation
- SNOMED dictionary CSV with columns `snomed_id`, `term` (preferred term), and `synonyms` (`|`-separated)

If a SNOMED dictionary file is not present, the notebook falls back to a lexicon built from training annotations (surface form → concept_id).
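
To make the expected dictionary layout concrete, here is a minimal sketch of how one CSV row expands into one `(snomed_id, term)` pair per preferred term and per synonym. The sample rows, the IDs, and the helper name `expand_terms` are placeholders for illustration, not real SNOMED CT content or part of the notebook's loader.

```python
import csv
import io

# Hypothetical sample in the expected layout; IDs and terms are placeholders.
SAMPLE_CSV = """snomed_id,term,synonyms
1001,Example disorder,example disease|ex disorder
1002,Another finding,
"""

def expand_terms(csv_text):
    """Return one (snomed_id, term) pair per preferred term and per synonym."""
    pairs = []
    for row in csv.DictReader(io.StringIO(csv_text)):
        cid = row['snomed_id'].strip()
        pairs.append((cid, row['term'].strip()))
        # An empty synonyms cell simply contributes no extra pairs
        for syn in (row.get('synonyms') or '').split('|'):
            if syn.strip():
                pairs.append((cid, syn.strip()))
    return pairs

pairs = expand_terms(SAMPLE_CSV)
for cid, term in pairs:
    print(cid, term)
```

Every synonym carries the same concept ID as its preferred term, so an exact hit on any surface form links to the same concept.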


In [1]:
import json
import pandas as pd
import numpy as np
from pathlib import Path

DATA_DIR = Path('/Users/aryamanbahl/Desktop/IIITH/M25/NLP-H/Assignments /nlph_assignments-/A2/Assignment 2 Dataset')
PRED_SPANS = DATA_DIR.parent / 'ner_predictions.csv'
TRAIN_NOTES = DATA_DIR / 'training-notes.csv'
TRAIN_ANN = DATA_DIR / 'train-annotations.csv'
TEST_NOTES = DATA_DIR / 'test-notes.csv'
TEST_ANN = DATA_DIR / 'test-annotations.csv'

train_notes = pd.read_csv(TRAIN_NOTES)
train_ann = pd.read_csv(TRAIN_ANN)
test_notes = pd.read_csv(TEST_NOTES)
test_ann = pd.read_csv(TEST_ANN)
preds = pd.read_csv(PRED_SPANS)

print('Loaded:', preds.shape, train_notes.shape, train_ann.shape, test_notes.shape, test_ann.shape)



Loaded: (21264, 4) (204, 2) (51574, 4) (68, 2) (23234, 4)


In [2]:
# SNOMED dictionary loader
# Expected CSV format: snomed_id, term, synonyms (| separated)
SNOMED_CSV = DATA_DIR.parent / 'snomed_dictionary.csv'

def load_snomed_dict(path: Path):
    if path.exists():
        df = pd.read_csv(path)
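        # df.get('synonyms', '') would return a plain str (no .fillna) when the column is missing
        if 'synonyms' not in df.columns:
            df['synonyms'] = ''
        df['synonyms'] = df['synonyms'].fillna('').astype(str)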
        rows = []
        for _, r in df.iterrows():
            cid = str(r['snomed_id'])
            term = str(r['term'])
            rows.append({'snomed_id': cid, 'term': term})
            syns = [s.strip() for s in r['synonyms'].split('|') if s.strip()]
            for s in syns:
                rows.append({'snomed_id': cid, 'term': s})
        dict_df = pd.DataFrame(rows)
        print('Loaded SNOMED dictionary terms:', dict_df.shape)
        return dict_df
    else:
        # Fallback lexicon from training annotations (surface → concept_id)
        notes_map = train_notes.set_index('note_id')['text'].to_dict()
        rows = []
        for _, r in train_ann.iterrows():
            nid = r['note_id']
            s, e = int(r['start']), int(r['end'])
            mention = notes_map.get(nid, '')[s:e].strip()
            if mention:
                rows.append({'snomed_id': str(r['concept_id']), 'term': mention})
        dict_df = pd.DataFrame(rows).drop_duplicates()
        print('Built fallback lexicon terms:', dict_df.shape)
        return dict_df

snomed_terms = load_snomed_dict(SNOMED_CSV)


Built fallback lexicon terms: (12484, 2)


In [3]:
import re
import string
from typing import List, Dict, Tuple

_punct_tbl = str.maketrans('', '', string.punctuation)
_whitespace_re = re.compile(r"\s+")

def normalize(text: str) -> str:
    text = text.lower()
    text = text.translate(_punct_tbl)
    text = _whitespace_re.sub(' ', text).strip()
    return text

def tokenize(text: str) -> List[str]:
    return [t for t in _whitespace_re.split(text) if t]

# Pre-normalize dictionary terms
snomed_terms['norm_term'] = snomed_terms['term'].astype(str).map(normalize)
snomed_terms = snomed_terms.drop_duplicates(['snomed_id', 'norm_term']).reset_index(drop=True)

# Inverted index by surface form for exact lookups
exact_index: Dict[str, List[Tuple[str, str]]] = {}
for _, r in snomed_terms.iterrows():
    exact_index.setdefault(r['norm_term'], []).append((r['snomed_id'], r['term']))
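
A small sketch of what the normalization and the exact index do on toy data. The concept IDs (`C1`, `C2`, `C3`) and terms are made up; `normalize` here is a compact restatement of the function above, assumed to behave the same way.

```python
import re
import string
from collections import defaultdict

_punct = str.maketrans('', '', string.punctuation)

def normalize(text):
    """Lowercase, delete ASCII punctuation, collapse whitespace."""
    return re.sub(r'\s+', ' ', text.lower().translate(_punct)).strip()

# Hypothetical (concept_id, surface form) pairs; IDs are placeholders.
terms = [
    ('C1', 'Heart attack'),
    ('C1', 'heart-attack'),
    ('C2', 'Heart  Attack.'),
    ('C3', 'Type-2 diabetes'),
]

index = defaultdict(list)
for cid, term in terms:
    index[normalize(term)].append((cid, term))

for key, entries in index.items():
    print(repr(key), entries)
```

Two things are worth noticing. Punctuation is deleted rather than replaced by a space, so `heart-attack` becomes the single token `heartattack` and no longer collides with `heart attack`. And one normalized key can map to several concept IDs, which is why the index stores a list and selection needs a tie-breaker.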



In [4]:
def jaccard(a: List[str], b: List[str]) -> float:
    sa, sb = set(a), set(b)
    if not sa and not sb:
        return 0.0
    return len(sa & sb) / max(1, len(sa | sb))

# Levenshtein distance with DP; return similarity in [0,1]
def levenshtein_sim(a: str, b: str) -> float:
    if a == b:
        return 1.0
    if len(a) == 0 or len(b) == 0:
        return 0.0
    n, m = len(a), len(b)
    dp = list(range(m + 1))
    for i in range(1, n + 1):
        prev = dp[0]
        dp[0] = i
        for j in range(1, m + 1):
            cur = dp[j]
            cost = 0 if a[i-1] == b[j-1] else 1
            dp[j] = min(
                dp[j] + 1,      # deletion
                dp[j-1] + 1,    # insertion
                prev + cost     # substitution
            )
            prev = cur
    dist = dp[m]
    return 1.0 - (dist / max(n, m))



In [5]:
from collections import defaultdict

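# Precompute term tuples once; a plain list is far cheaper to scan than
# DataFrame.iterrows(), which was being re-run for every span.
_term_rows = [
    (cid, term, nt, tokenize(nt))
    for cid, term, nt in zip(snomed_terms['snomed_id'], snomed_terms['term'], snomed_terms['norm_term'])
]
_fuzzy_cache: Dict[Tuple[str, int], List[dict]] = {}

def generate_candidates(span_text: str, top_n: int = 5):
    norm = normalize(span_text)
    if not norm:
        return []  # nothing left after normalization, so nothing to match
    # Exact matches on the normalized surface form (all score 1.0)
    if norm in exact_index:
        return [{'snomed_id': cid, 'term': term, 'score': 1.0} for cid, term in exact_index[norm]][:top_n]
    # Repeated surface forms reuse the earlier fuzzy result
    key = (norm, top_n)
    if key in _fuzzy_cache:
        return _fuzzy_cache[key]
    # Fuzzy: combo score = 0.6 * Levenshtein similarity + 0.4 * token Jaccard
    toks_q = tokenize(norm)
    scores = []
    for cid, term, nt, toks_t in _term_rows:
        score = 0.6 * levenshtein_sim(norm, nt) + 0.4 * jaccard(toks_q, toks_t)
        if score > 0:
            scores.append({'snomed_id': cid, 'term': term, 'score': float(score)})
    scores.sort(key=lambda x: -x['score'])
    _fuzzy_cache[key] = scores[:top_n]
    return _fuzzy_cache[key]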
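
As a worked example of the fuzzy score, the sketch below recomputes 0.6 · Levenshtein similarity + 0.4 · token Jaccard for one query against two terms. The helper names and the example strings are illustrative; the functions are compact restatements of the ones defined above.

```python
def levenshtein(a, b):
    """Edit distance using a single rolling row."""
    row = list(range(len(b) + 1))
    for i, ca in enumerate(a, 1):
        prev, row[0] = row[0], i
        for j, cb in enumerate(b, 1):
            cur = row[j]
            row[j] = min(row[j] + 1, row[j - 1] + 1, prev + (ca != cb))
            prev = cur
    return row[-1]

def jaccard(a, b):
    """Token-set overlap; 0.0 when both sides are empty."""
    sa, sb = set(a), set(b)
    return len(sa & sb) / len(sa | sb) if (sa | sb) else 0.0

def combo_score(query, term):
    lev_sim = 1.0 - levenshtein(query, term) / max(len(query), len(term), 1)
    return 0.6 * lev_sim + 0.4 * jaccard(query.split(), term.split())

query = 'chest pains'
score_a = combo_score(query, 'chest pain')       # 0.6 * (1 - 1/11) + 0.4 * (1/3), about 0.679
score_b = combo_score(query, 'chest wall pain')  # more edits and less token overlap
print(round(score_a, 3), round(score_b, 3))
```

A single trailing `s` costs little on the character side (one edit out of eleven) but a lot on the token side, because `pains` and `pain` count as different tokens. The 0.6/0.4 weighting is what keeps such near-misses ranked above longer terms that share only one token.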



In [None]:
def select_best(candidates: List[dict]):
    if not candidates:
        return None
    # exact match has score==1.0 by construction
    candidates = sorted(candidates, key=lambda x: (-x['score'], x['snomed_id']))
    return candidates[0]
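
The tie-breaker compares `snomed_id` values as strings, which is deterministic but not numeric. The sketch below illustrates the difference on made-up candidates; `select_best_numeric` is a hypothetical variant, shown only for contrast, and assumes every ID is a digit string.

```python
def select_best(candidates):
    """Highest score wins; ties go to the smallest snomed_id in string order."""
    if not candidates:
        return None
    return min(candidates, key=lambda c: (-c['score'], c['snomed_id']))

def select_best_numeric(candidates):
    """Hypothetical variant: break ties on the integer value of the ID."""
    if not candidates:
        return None
    return min(candidates, key=lambda c: (-c['score'], int(c['snomed_id'])))

# Placeholder IDs, not real SNOMED CT concepts
cands = [
    {'snomed_id': '9', 'term': 'a', 'score': 1.0},
    {'snomed_id': '10', 'term': 'b', 'score': 1.0},
    {'snomed_id': '2', 'term': 'c', 'score': 0.8},
]

best = select_best(cands)                  # '10' sorts before '9' as a string
best_numeric = select_best_numeric(cands)  # 9 < 10 as integers
print(best['snomed_id'], best_numeric['snomed_id'])
```

Either ordering is arbitrary with respect to linking quality. What matters is that repeated runs pick the same concept.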



In [6]:
# Build text map for span extraction
notes_map_test = test_notes.set_index('note_id')['text'].to_dict()

# Generate candidates for all predicted spans
cand_records = []
for _, r in preds.iterrows():
    nid = r['note_id']
    s, e = int(r['start']), int(r['end'])
    span = notes_map_test.get(nid, '')[s:e]
    cands = generate_candidates(span, top_n=5)
    cand_records.append({
        'note_id': nid,
        'start': s,
        'end': e,
        'span': span,
        'candidates': cands
    })

# Save candidates as JSONL
cand_path = DATA_DIR.parent / 'linker_candidates.jsonl'
with open(cand_path, 'w') as f:
    for rec in cand_records:
        f.write(json.dumps(rec) + '\n')
print('Saved candidates to', cand_path)

# Select best candidate per span
linked_rows = []
for rec in cand_records:
    best = select_best(rec['candidates'])
    linked_rows.append({
        'note_id': rec['note_id'],
        'start': rec['start'],
        'end': rec['end'],
        'span': rec['span'],
        'snomed_id': best['snomed_id'] if best else None,
        'matched_term': best['term'] if best else None,
        'score': best['score'] if best else None
    })

linked_df = pd.DataFrame(linked_rows)
link_path = DATA_DIR.parent / 'linker_links.csv'
linked_df.to_csv(link_path, index=False)
print('Saved links to', link_path)


KeyboardInterrupt: 

In [None]:
# Evaluation: compare linked concept_id to gold if spans exactly match a gold span
# Build gold mapping (test set): (note_id, start, end) -> concept_id

gold_map = {}
for _, r in test_ann.iterrows():
    key = (int(r['note_id']), int(r['start']), int(r['end']))
    gold_map[key] = str(r['concept_id'])

num, correct = 0, 0
errors = []
for _, r in linked_df.iterrows():
    key = (int(r['note_id']), int(r['start']), int(r['end']))
    pred_cid = None if pd.isna(r['snomed_id']) else str(r['snomed_id'])
    if key in gold_map:
        num += 1
        if pred_cid == gold_map[key]:
            correct += 1
        else:
            errors.append({
                'note_id': r['note_id'],
                'start': r['start'],
                'end': r['end'],
                'span': r['span'],
                'pred_snomed_id': pred_cid,
                'gold_concept_id': gold_map[key],
                'matched_term': r['matched_term'],
                'score': r['score']
            })

acc = (correct / num) if num else 0.0
print(f'Exact-span linking accuracy: {acc:.4f}  (N={num})')

# Store detailed errors for inspection
err_path = DATA_DIR.parent / 'linker_errors.jsonl'
with open(err_path, 'w') as f:
    for e in errors:
        f.write(json.dumps(e) + '\n')
print('Saved errors to', err_path)
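
Because accuracy is computed only over predicted spans that line up with a gold span, it says nothing about gold spans the NER stage missed. The toy sketch below computes the same accuracy alongside two complementary figures. The dictionaries and the names `span_recall` and `end_to_end_recall` are illustrative assumptions, not quantities the notebook reports.

```python
# (note_id, start, end) -> concept ID; all values are made up
gold = {(1, 0, 5): 'C1', (1, 10, 18): 'C2', (2, 3, 9): 'C3'}
pred = {(1, 0, 5): 'C1', (1, 10, 18): 'C9', (2, 4, 9): 'C3'}

# Predicted spans whose offsets exactly match a gold span
matched = [k for k in pred if k in gold]
correct = sum(pred[k] == gold[k] for k in matched)

accuracy = correct / len(matched) if matched else 0.0     # what the cell above reports
span_recall = len(matched) / len(gold) if gold else 0.0   # share of gold spans found at all
end_to_end_recall = correct / len(gold) if gold else 0.0  # gold spans found and linked correctly

print(f'accuracy={accuracy:.2f} span_recall={span_recall:.2f} end_to_end={end_to_end_recall:.2f}')
```

Here the span at `(2, 4, 9)` carries the right concept but is off by one character, so it is excluded from the accuracy denominator rather than counted as an error.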


#### Summary

- Candidate Generation: normalized exact lookup + fuzzy scoring (0.6·Levenshtein + 0.4·Jaccard over tokens). Top-5 retained per span.
- Candidate Selection: prefer exact matches (score 1.0), else pick highest score. Deterministic tie-breaker on concept ID.
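- Evaluation: accuracy against gold `concept_id`, computed only over predicted spans whose (note_id, start, end) exactly matches a gold span. Predicted spans with no gold match, and gold spans with no matching prediction, are not counted.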
- Outputs:
  - `linker_candidates.jsonl`: per span list of `{snomed_id, term, score}`.
  - `linker_links.csv`: best link per span with score.
  - `linker_errors.jsonl`: mismatches for analysis.

Notes
- If `snomed_dictionary.csv` is not present, we fall back to a training-derived surface→concept lexicon.
- Improvements: add semantic tag filtering, abbreviation expansion, BM25/TF-IDF retrieval or embedding-based retrieval for better candidate recall, and supervised re-ranking.
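
As a rough illustration of the TF-IDF retrieval idea, the sketch below builds a token inverted index over normalized terms and ranks them by cosine similarity, touching only terms that share at least one token with the query. It is a stdlib-only sketch under assumptions: the function names, the toy terms, and the smoothed IDF formula (one common variant) are not part of this notebook.

```python
import math
from collections import Counter, defaultdict

def build_index(terms):
    """terms: list of (concept_id, normalized term). Returns (postings, idf, norms)."""
    doc_tokens = [t.split() for _, t in terms]
    df = Counter(tok for toks in doc_tokens for tok in set(toks))
    n = len(terms)
    # Smoothed IDF so that no token gets a zero weight
    idf = {tok: math.log((1 + n) / (1 + d)) + 1.0 for tok, d in df.items()}
    postings = defaultdict(list)  # token -> [(term index, tf-idf weight)]
    norms = []
    for i, toks in enumerate(doc_tokens):
        weights = {tok: tf * idf[tok] for tok, tf in Counter(toks).items()}
        norms.append(math.sqrt(sum(w * w for w in weights.values())) or 1.0)
        for tok, w in weights.items():
            postings[tok].append((i, w))
    return postings, idf, norms

def retrieve(query, terms, index, k=3):
    """Return up to k (concept_id, term, cosine) tuples sharing a token with the query."""
    postings, idf, norms = index
    # Query tokens unseen in the dictionary are ignored
    q = {tok: tf * idf[tok] for tok, tf in Counter(query.split()).items() if tok in idf}
    q_norm = math.sqrt(sum(w * w for w in q.values())) or 1.0
    dots = defaultdict(float)
    for tok, qw in q.items():
        for i, w in postings[tok]:
            dots[i] += qw * w
    ranked = sorted(dots.items(), key=lambda kv: (-kv[1] / norms[kv[0]], kv[0]))
    return [(terms[i][0], terms[i][1], dot / (norms[i] * q_norm)) for i, dot in ranked[:k]]

# Placeholder concept IDs and terms
terms = [('C1', 'chest pain'), ('C2', 'chest wall pain'),
         ('C3', 'abdominal pain'), ('C4', 'type 2 diabetes')]
index = build_index(terms)
hits = retrieve('chest pain radiating', terms, index)
for cid, term, score in hits:
    print(cid, term, round(score, 3))
```

A shortlist retrieved this way could then be re-scored with the existing Levenshtein + Jaccard combination, so the expensive character-level comparison runs on a handful of terms per span instead of the whole dictionary.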

