In [22]:
import re
import json
import os
from typing import List, Dict, Any

In [23]:
!pip install pdfplumber



In [24]:
import pdfplumber
import pandas as pd
import numpy as np

In [25]:
!pip install faiss-cpu



In [26]:
import spacy
from sentence_transformers import SentenceTransformer
import faiss

In [27]:
# --------- Config ---------
# Input/output locations and retrieval settings consumed by main().
# NOTE(review): the absolute /content/... paths look like a hosted-notebook upload
# directory — confirm/adjust them before running in another environment.
ICD_XLSX = "/content/ICD_code_Assignment.xlsx"  # ICD reference sheet, passed to load_codes_from_excel()
CPT_XLSX = "/content/cpt_code_assignment.xlsx"  # CPT reference sheet, passed to load_codes_from_excel()
INPUT_PDF = "/content/Input data for Assignment (1).pdf" # PDF that contains the 4 reports
OUTPUT_JSON = "results.json"  # file that main() writes the per-report results to
EMBED_MODEL_NAME = "all-MiniLM-L6-v2"  # model name handed to SentenceTransformer
TOP_K = 5  # number of ICD and of CPT candidates requested per report

In [28]:
def extract_text_from_pdf(path: str) -> str:
    """Return the text of every page of the PDF at `path`, joined with newlines.

    A page for which `extract_text()` yields nothing (None or an empty string)
    contributes an empty string, so it still occupies a slot in the result.
    """
    with pdfplumber.open(path) as document:
        page_texts = [(pg.extract_text() or "") for pg in document.pages]
    return "\n".join(page_texts)

In [29]:
# --------- Preprocessing & simple rule-based entity extraction ---------


# Shared spaCy pipeline; extract_entities() calls it to obtain named entities.
nlp = spacy.load("en_core_web_sm")
# NOTE(review): embed_model is not referenced by any other cell visible in this
# notebook (VectorIndex loads its own SentenceTransformer) — confirm it is still needed.
embed_model = SentenceTransformer(EMBED_MODEL_NAME)


# NOTE(review): both names below are reassigned to plain pattern strings in the cell
# that defines extract_entities(), so these compiled patterns are shadowed once that
# cell has run.
ICD_CODE_REGEX = re.compile(r"\b[A-Z]\d{1,3}(?:\.\d+)?\b")
CPT_CODE_REGEX = re.compile(r"\b\d{4,5}\b")

In [30]:
def preprocess_text(text: str) -> str:
    """Minimal cleaning for clinical notes.

    Normalizes line endings, collapses runs of whitespace inside each line to a
    single space, trims every line and drops lines that end up empty.

    Line breaks are deliberately preserved: the entity extraction step works
    line by line (it splits on '\\n'), so flattening the whole document into a
    single line would make every keyword hit capture the entire report.

    Args:
        text: Raw text, e.g. as extracted from a PDF.

    Returns:
        The cleaned text with one non-empty, trimmed line per '\\n'-separated row.
    """
    # Treat Windows (\r\n) and old-Mac (\r) line endings as plain newlines.
    text = text.replace('\r\n', '\n').replace('\r', '\n')

    cleaned_lines = []
    for line in text.split('\n'):
        line = re.sub(r"\s+", " ", line).strip()
        if line:
            cleaned_lines.append(line)
    return "\n".join(cleaned_lines)

In [31]:
# Define regex patterns for ICD and CPT codes
# ICD: one letter (U excluded), a digit, a digit or A/B, then an optional dot
# followed by up to four more alphanumeric characters (U excluded).
ICD_CODE_REGEX = r"\b[A-TV-Z][0-9][0-9AB]\.?[0-9A-TV-Z]{0,4}\b"
# CPT: any standalone run of exactly five digits.
CPT_CODE_REGEX = r"\b\d{5}\b"

# spaCy entity labels mapped onto our buckets. GPE (countries, cities, states)
# is intentionally NOT treated as anatomy: place names are not body parts and
# would only pollute the retrieval query.
_DIAGNOSIS_LABELS = frozenset({"DISEASE", "PROBLEM", "CONDITION"})
_ANATOMY_LABELS = frozenset({"ANATOMY", "ORGANS"})

# Substrings looked up in the lower-cased line by the keyword heuristics.
_DIAGNOSIS_KEYWORDS = ("diagnosis", "pre-operative diagnosis", "post-operative diagnosis", "impression", "indication")
_PROCEDURE_KEYWORDS = ("procedure", "colonoscopy", "egd", "endoscopy", "biopsy")
_ANATOMY_KEYWORDS = ("rectum", "colon", "esophagus", "stomach", "anus", "duodenum", "cecum")


def _unique_in_order(items):
    """Drop duplicates while keeping the first occurrence of every item."""
    return list(dict.fromkeys(items))


def extract_entities(text: str) -> Dict[str, Any]:
    """Combination of spaCy NER + regex heuristics to extract diagnoses, procedures, and codes.

    Args:
        text: Report text. The keyword heuristics operate per line, so the text
            should still contain its line breaks.

    Returns:
        A dict with the keys "DIAGNOSIS", "PROCEDURE", "ANATOMY",
        "ICD_CODES_IN_TEXT" and "CPT_CODES_IN_TEXT". Every value is a list of
        strings without duplicates, ordered by first appearance so repeated
        runs on the same text give identical output.
    """
    doc = nlp(text)
    diagnoses, procedures, anatomy = [], [], []

    # --- 1) spaCy-based extraction ---
    for ent in doc.ents:
        if ent.label_ in _DIAGNOSIS_LABELS:
            diagnoses.append(ent.text)
        elif ent.label_ in _ANATOMY_LABELS:
            anatomy.append(ent.text)

    # --- 2) Keyword heuristics: a line is kept whole when it mentions a keyword ---
    for raw_line in text.splitlines():
        line = raw_line.strip()
        if not line:
            continue
        lowered = line.lower()

        if any(keyword in lowered for keyword in _DIAGNOSIS_KEYWORDS):
            diagnoses.append(line)
        if any(keyword in lowered for keyword in _PROCEDURE_KEYWORDS):
            procedures.append(line)
        if any(keyword in lowered for keyword in _ANATOMY_KEYWORDS):
            anatomy.append(line)

    # --- 3) Regex-based code extraction + order-preserving de-duplication ---
    # dict.fromkeys (not set) keeps first-seen order, so the output is deterministic.
    return {
        "DIAGNOSIS": _unique_in_order(diagnoses),
        "PROCEDURE": _unique_in_order(procedures),
        "ANATOMY": _unique_in_order(anatomy),
        "ICD_CODES_IN_TEXT": _unique_in_order(re.findall(ICD_CODE_REGEX, text)),
        "CPT_CODES_IN_TEXT": _unique_in_order(re.findall(CPT_CODE_REGEX, text)),
    }


In [32]:
# --------- Load ICD and CPT references and build vector index ---------

def _find_col(df, choices, exclude=()):
    """Return the column of `df` whose name best matches one of `choices`.

    `choices` are tried in priority order (first hint wins), each as a
    case-insensitive substring of the column name. Columns listed in `exclude`
    are never returned. When nothing matches, the first non-excluded column is
    used as a fallback.

    Raises:
        ValueError: if no column is left to choose from.
    """
    candidates = [col for col in df.columns if col not in exclude]
    if not candidates:
        raise ValueError(
            f"No usable column left to match {list(choices)!r}; columns are {list(df.columns)!r}"
        )
    for choice in choices:
        needle = choice.lower()
        for col in candidates:
            if needle in col.lower():
                return col
    return candidates[0]


def _normalize_code_table(df, code_hints):
    """Reduce `df` to exactly two columns named 'code' and 'description'."""
    # Headers may carry stray whitespace or be non-strings (e.g. numeric headers).
    df.columns = [str(c).strip() for c in df.columns]

    code_col = _find_col(df, code_hints)
    # Exclude the code column so the description can never resolve to the same
    # column (which would leave the frame without a 'code' column after rename).
    desc_col = _find_col(df, ['description', 'desc'], exclude=(code_col,))

    return df[[code_col, desc_col]].rename(
        columns={code_col: 'code', desc_col: 'description'}
    )


def load_codes_from_excel(icd_path: str, cpt_path: str):
    """Load the ICD and CPT reference sheets as normalized DataFrames.

    Column names are detected heuristically: the code column is the first one
    whose name contains 'code' (then 'icd' / 'cpt'); the description column is
    the first *other* column whose name contains 'description' (then 'desc').
    If no name matches, the first remaining column is used.

    Args:
        icd_path: Path to the ICD Excel file.
        cpt_path: Path to the CPT Excel file.

    Returns:
        Tuple `(icd_df, cpt_df)`; each frame has exactly the columns
        'code' and 'description'.

    Raises:
        ValueError: if a sheet has fewer than two columns, so that code and
            description cannot be mapped to distinct columns.
    """
    icd_df = _normalize_code_table(pd.read_excel(icd_path), ['code', 'icd'])
    cpt_df = _normalize_code_table(pd.read_excel(cpt_path), ['code', 'cpt'])
    return icd_df, cpt_df


In [33]:
class VectorIndex:
    """Cosine-similarity search over sentence embeddings, backed by FAISS.

    Texts are embedded with a SentenceTransformer model, L2-normalized and
    stored in an inner-product index, so the returned scores are cosine
    similarities.
    """

    def __init__(self, model_name=EMBED_MODEL_NAME):
        """Load the embedding model; the index itself is created by build_index()."""
        self.model = SentenceTransformer(model_name)
        self.index = None  # set by build_index()
        self.meta = []     # meta[i] describes the i-th indexed text
        self.dim = self.model.get_sentence_embedding_dimension()

    def build_index(self, texts: List[str], metas: List[Dict[str, str]]):
        """Embed `texts` and (re)build the index.

        Args:
            texts: Strings to embed and index.
            metas: One metadata dict per text, in the same order.

        Raises:
            ValueError: if `texts` and `metas` differ in length.
        """
        if len(texts) != len(metas):
            raise ValueError(
                f"texts and metas must have the same length (got {len(texts)} and {len(metas)})"
            )

        self.index = faiss.IndexFlatIP(self.dim)
        self.meta = list(metas)
        if not texts:
            # Nothing to embed; leave an empty index so query() returns no hits.
            return

        embeddings = self.model.encode(texts, show_progress_bar=True, convert_to_numpy=True)
        # normalize for cosine similarity
        faiss.normalize_L2(embeddings)
        self.index.add(embeddings)

    def query(self, query_text: str, top_k=5):
        """Return up to `top_k` nearest entries for `query_text`, best first.

        Returns:
            List of dicts `{'meta': <metadata dict>, 'score': <float>}`. The
            list is shorter than `top_k` when the index holds fewer entries.

        Raises:
            RuntimeError: if build_index() has not been called yet.
        """
        if self.index is None:
            raise RuntimeError("VectorIndex.build_index() must be called before query()")

        # Never ask FAISS for more neighbours than exist: it pads missing hits
        # with label -1, which would silently index meta[-1] (the last entry).
        k = min(int(top_k), self.index.ntotal)
        if k <= 0:
            return []

        q_emb = self.model.encode([query_text], convert_to_numpy=True)
        faiss.normalize_L2(q_emb)
        scores, labels = self.index.search(q_emb, k)

        results = []
        for score, label in zip(scores[0], labels[0]):
            if label < 0:  # defensive: FAISS "no result" marker
                continue
            results.append({'meta': self.meta[int(label)], 'score': float(score)})
        return results


In [34]:
# --------- RAG-style retrieval + simple score boosting (no LLM involved) ---------

def _normalize_code(code) -> str:
    """Canonical form used to compare codes: string, trimmed, upper-case."""
    return str(code).strip().upper()


def _boost_and_rank(candidates, codes_in_text):
    """Add 1.0 to candidates whose code literally appears in the text, then sort best-first."""
    explicit_codes = {_normalize_code(code) for code in codes_in_text}
    for candidate in candidates:
        if _normalize_code(candidate['meta']['code']) in explicit_codes:
            candidate['score'] += 1.0
    # sorted() is stable, so equally scored candidates keep their retrieval order.
    return sorted(candidates, key=lambda candidate: candidate['score'], reverse=True)


def rag_predict(entities: Dict[str, Any], icd_index: VectorIndex, cpt_index: VectorIndex, top_k=TOP_K) -> Dict[str, Any]:
    """Retrieve ICD and CPT candidates for one report's extracted entities.

    The query is the ' ; '-joined DIAGNOSIS, PROCEDURE and ANATOMY entries; if
    there are none, the codes found verbatim in the text are used instead.
    Candidates whose code appears verbatim in the text get +1.0 on their
    similarity score before ranking.

    Args:
        entities: Dict with the list-valued keys 'DIAGNOSIS', 'PROCEDURE',
            'ANATOMY', 'ICD_CODES_IN_TEXT' and 'CPT_CODES_IN_TEXT' (missing
            keys are treated as empty).
        icd_index: Index queried for ICD candidates.
        cpt_index: Index queried for CPT candidates.
        top_k: Number of candidates requested from each index.

    Returns:
        Dict with 'query_text', 'icd_candidates' and 'cpt_candidates'; each
        candidate is `{'meta': {...}, 'score': float}`, sorted by descending
        score. Both candidate lists are empty when there is nothing to query with.
    """
    # Build a query from extracted entities (diagnoses + procedures + anatomy)
    query_parts = []
    for key in ('DIAGNOSIS', 'PROCEDURE', 'ANATOMY'):
        query_parts.extend(entities.get(key, []))

    icd_codes_in_text = entities.get('ICD_CODES_IN_TEXT', [])
    cpt_codes_in_text = entities.get('CPT_CODES_IN_TEXT', [])

    if query_parts:
        query_text = ' ; '.join(query_parts)
    else:
        query_text = ' '.join(icd_codes_in_text + cpt_codes_in_text)

    # Embedding an empty string would return arbitrary neighbours that look
    # like predictions; report "no candidates" instead.
    if not query_text.strip():
        return {
            'query_text': query_text,
            'icd_candidates': [],
            'cpt_candidates': []
        }

    # Query both indices, upweight codes that appear directly in the text, rank.
    icd_candidates = _boost_and_rank(icd_index.query(query_text, top_k=top_k), icd_codes_in_text)
    cpt_candidates = _boost_and_rank(cpt_index.query(query_text, top_k=top_k), cpt_codes_in_text)

    return {
        'query_text': query_text,
        'icd_candidates': icd_candidates,
        'cpt_candidates': cpt_candidates
    }


In [35]:
def _split_reports(text: str):
    """Split `text` on 'Report N:' headers.

    Returns:
        List of `(report_id, report_text)` tuples. `report_id` is the number
        taken from the header, so text preceding the first header (title page,
        cover text) neither becomes a report of its own nor shifts the ids.
        When no header is present the whole text is returned as report 1.
    """
    # With a capturing group, re.split yields [preamble, id1, body1, id2, body2, ...].
    pieces = re.split(r"Report (\d+):", text)
    reports = []
    for number, body in zip(pieces[1::2], pieces[2::2]):
        body = body.strip()
        if body:
            reports.append((int(number), body))
    return reports if reports else [(1, text)]


def _build_code_index(codes_df):
    """Build a VectorIndex over '<code> - <description>' strings of `codes_df`."""
    texts = (codes_df['code'].astype(str) + ' - ' + codes_df['description'].astype(str)).tolist()
    metas = [
        {'code': str(code), 'description': description}
        for code, description in zip(codes_df['code'].tolist(), codes_df['description'].tolist())
    ]
    index = VectorIndex(model_name=EMBED_MODEL_NAME)
    index.build_index(texts, metas)
    return index


def main():
    """Run the full pipeline: PDF -> reports -> entities -> ICD/CPT candidates -> JSON file.

    Reads INPUT_PDF, ICD_XLSX and CPT_XLSX, and writes one record per report
    (report_id, entities, prediction) to OUTPUT_JSON.
    """
    # 1) Extract text from PDF
    raw_text = extract_text_from_pdf(INPUT_PDF)
    clean_text = preprocess_text(raw_text)

    # Split into individual reports when the PDF uses 'Report N:' headers
    reports = _split_reports(clean_text)

    # 2) Load references
    icd_df, cpt_df = load_codes_from_excel(ICD_XLSX, CPT_XLSX)

    # 3) Build embeddings and FAISS indices
    icd_index = _build_code_index(icd_df)
    cpt_index = _build_code_index(cpt_df)

    # 4) Process each report
    outputs = []
    for report_id, report_text in reports:
        entities = extract_entities(report_text)
        prediction = rag_predict(entities, icd_index, cpt_index, top_k=TOP_K)
        outputs.append({
            'report_id': report_id,
            'entities': entities,
            'prediction': prediction
        })

    # 5) Save JSON (explicit encoding so the file does not depend on the platform default)
    with open(OUTPUT_JSON, 'w', encoding='utf-8') as f:
        json.dump(outputs, f, indent=2)

    print(f"✅ Saved results to {OUTPUT_JSON}")


if __name__ == '__main__':
    main()


Batches:   0%|          | 0/10 [00:00<?, ?it/s]

Batches:   0%|          | 0/1 [00:00<?, ?it/s]

✅ Saved results to results.json
