In [6]:
import os
import re
import zipfile
from datetime import datetime
from typing import Annotated, Any, Dict, List

import numpy as np
import pandas as pd
from dotenv import load_dotenv
from langchain.prompts import PromptTemplate
from langchain_community.document_loaders import PyPDFLoader
from langchain_google_genai import ChatGoogleGenerativeAI
from pydantic import BaseModel, Field
from sentence_transformers import CrossEncoder, SentenceTransformer
from tqdm import tqdm

load_dotenv()

True

In [7]:
# Gemini API key taken from the process environment; the value is None when the variable is not set.
GOOGLE_API_KEY = os.environ.get('GOOGLE_API_KEY')

In [8]:
# Only confirm that a key was loaded. Displaying the variable itself writes the secret
# into the saved notebook output; any key that was ever displayed here must be rotated.
GOOGLE_API_KEY is not None

True

In [9]:
# Chat model client shared by the notebook; it is wrapped with structured-output
# schemas further down (CPT_Output and ICD_Output).
llm = ChatGoogleGenerativeAI(
    model = "gemini-1.5-flash",
    google_api_key= GOOGLE_API_KEY
)

In [10]:
def load_pdf(file_path):
    """Load a PDF with PyPDFLoader and return the page texts joined by newlines."""
    pages = PyPDFLoader(file_path).load()
    return "\n".join(page.page_content for page in pages)

def deidentify_and_strip(text: str) -> str:
    """Drop blank lines and every line that matches one of the PHI/boilerplate rules.

    Each remaining line is returned stripped of surrounding whitespace, and the
    kept lines are joined with newlines. A line is removed as a whole as soon as
    any rule below is found anywhere in it.
    """
    # (pattern, flags) pairs, checked in order against the stripped line.
    drop_rules = (
        (r"(Patient|Clinician|Participants|Supervisor?):", re.IGNORECASE),
        (r"DOB\s*[:\-]?\s*\d{1,2}[\/\-]\d{1,2}[\/\-]\d{2,4}", 0),
        (r"\b\d{1,2}[\/\-]\d{1,2}[\/\-]\d{2,4}\b", 0),  # numeric dates
        (r"\b\d{1,2}:\d{2}\s?(?:AM|PM|am|pm|ᴘᴍ|ᴀᴍ)\b", 0),  # clock times
        (r"Date and Time:", re.IGNORECASE),
        (r"(Location|Clinic|Hospital|Center|LLC|LLP|PC)\b", 0),
        (r"License\s*[:\-]?\s*[A-Z]*\s*\d+", re.IGNORECASE),
        (r"http\S+|www\.\S+", 0),
        (r"Page\s+\d+\s+of\s+\d+", re.IGNORECASE),  # "Page 1 of 2" style footers
    )

    kept = []
    for raw_line in text.splitlines():
        candidate = raw_line.strip()
        # Empty lines are skipped; everything that survives all rules is kept.
        if candidate and not any(
            re.search(pattern, candidate, flags) for pattern, flags in drop_rules
        ):
            kept.append(candidate)

    return "\n".join(kept)


In [None]:
# Accepted input layouts, tried in this order (so ambiguous numeric dates are read month-first).
_DATE_FORMATS = (
    "%m/%d/%Y", "%m-%d-%Y", "%Y-%m-%d", "%d-%m-%Y", "%d/%m/%Y",
    "%B %d, %Y", "%b %d, %Y",
)

# The longest supported date layout ("August 20, 2025") spans three whitespace-separated tokens.
_MAX_DATE_TOKENS = 3


def _try_parse_date(date_str: str):
    """Return ``date_str`` reformatted as DD-MM-YYYY, or None when no known layout matches."""
    cleaned = date_str.strip()
    for fmt in _DATE_FORMATS:
        try:
            return datetime.strptime(cleaned, fmt).strftime("%d-%m-%Y")
        except ValueError:
            continue
    return None


def format_date(date_str: str) -> str:
    """Normalize a date string to DD-MM-YYYY.

    Returns the input unchanged (not stripped) when it matches none of the
    supported layouts.
    """
    parsed = _try_parse_date(date_str)
    return date_str if parsed is None else parsed


def split_date_time(datetime_str: str):
    """Split "<date> <time...>" into ``(formatted_date, time_text)``.

    The date may span up to three tokens, so month-name dates such as
    "August 20, 2025 2:30 PM" are recognized as a whole instead of being cut
    after the first word. ``time_text`` is "" when nothing follows the date.
    When no leading date can be parsed, the text is split on the first space
    and the first part is returned as-is.
    """
    text = datetime_str.strip()
    tokens = text.split()

    for count in range(1, min(_MAX_DATE_TOKENS, len(tokens)) + 1):
        parsed = _try_parse_date(" ".join(tokens[:count]))
        if parsed is None:
            continue
        if count == 1:
            # Keep the remainder verbatim (only outer whitespace removed).
            remainder = text[len(tokens[0]):].strip()
        else:
            remainder = " ".join(tokens[count:])
        return parsed, remainder

    # No recognizable date prefix: fall back to a plain first-space split.
    parts = text.split(" ", 1)
    if len(parts) == 2:
        return format_date(parts[0]), parts[1].strip()
    return format_date(text), ""


In [None]:
def get_phi(note_text: str) -> dict:
    """Extract header fields and diagnosis codes from the raw (non-deidentified) note text.

    Only fields that are actually found are added to the result. Possible keys:
    "Clinician", "Supervisor", "Patient", "DOB", "Date", "Time", "Duration",
    "Service Code" and "Diagnosis Codes" (a sorted list of unique codes).
    A note without a diagnosis header simply has no "Diagnosis Codes" key.
    """
    details = {}

    # Clinician
    clinician_match = re.search(r"Clinician:\s*(.+)", note_text)
    if clinician_match:
        details["Clinician"] = clinician_match.group(1).strip()

    # Supervisor
    supervisor_match = re.search(r"Supervisor:\s*(.+)", note_text)
    if supervisor_match:
        details["Supervisor"] = supervisor_match.group(1).strip()

    # Patient & DOB (expects "Patient: <name>, DOB <date>" on one line)
    patient_match = re.search(r"Patient:\s*([^,]+),\s*DOB\s*([^\n]+)", note_text)
    if patient_match:
        details["Patient"] = patient_match.group(1).strip()
        details["DOB"] = format_date(patient_match.group(2).strip())

    # Date & Time
    datetime_match = re.search(r"Date and Time:\s*([^\n]+)", note_text)
    if datetime_match:
        date_str, time_str = split_date_time(datetime_match.group(1))
        details["Date"] = date_str
        if time_str:
            details["Time"] = time_str

    # Duration
    duration_match = re.search(r"Duration:\s*([^\n]+)", note_text)
    if duration_match:
        details["Duration"] = duration_match.group(1).strip()

    # Service Code
    service_code_match = re.search(r"Service Code:\s*([A-Z0-9]+)", note_text)
    if service_code_match:
        details["Service Code"] = service_code_match.group(1).strip()

    # Diagnosis (ICD-10 extraction). Everything below is skipped when the note
    # has no diagnosis header, instead of searching/iterating undefined values.
    section_start = re.search(r"(Diagnosis|Diagnoses|Dx)[:\-]?", note_text, re.IGNORECASE)
    if section_start:
        diagnosis_section = note_text[section_start.start():]

        # Cut the section at the first following plan/treatment style heading, if any.
        stop_match = re.search(
            r"(Plan|Treatment|Intervention|Procedure|Assessment)[:\-]?",
            diagnosis_section,
            re.IGNORECASE,
        )
        if stop_match:
            diagnosis_section = diagnosis_section[:stop_match.start()]

        icd_pattern = r"\b([A-TV-Z][0-9]{2}(?:\.[0-9A-Z]{1,4})?)"
        cleaned_codes = set()
        for code in re.findall(icd_pattern, diagnosis_section):
            # PDF text can fuse the code with its description ("F43.25Adjustment"),
            # which makes the pattern pick up one trailing capital letter; drop it.
            if re.match(r".*[A-Z]$", code):
                code = code[:-1]
            cleaned_codes.add(code)

        if cleaned_codes:
            # Sorted so the result is stable across runs (set order is not).
            details["Diagnosis Codes"] = sorted(cleaned_codes)

    return details

In [None]:
# Load the first sample note and display its de-identified text for a manual check.
soap_text_1 = load_pdf("soap_note_1.pdf")
deidentify_note_1 = deidentify_and_strip(soap_text_1)

deidentify_note_1

In [11]:
# Prompt text for CPT prediction. `{soap_note}` is the only template variable;
# the doubled braces around the JSON example are literal braces after formatting.
cpt_prediction_prompt = """You are a medical coding assistant. Your task is to assign the correct CPT code(s) for the clinical note.

Rules:
- Only choose from the Allowed CPT list provided.  
- Base your selection strictly on the documented services performed, not on assumptions.  
- If multiple CPT codes are supported by the documentation, return them all.  
- If two codes are mutually exclusive, select the one most consistent with time/duration or note details.  
- Do not invent codes not in the Allowed CPTs list.  


Allowed CPTs:
- 90791: Psychiatric diagnostic evaluation
- 90832: Psychotherapy, 30 minutes with patient
- 90837: Psychotherapy, 60 minutes with patient
- H0004: Behavioral health counseling and therapy
- 96130: Psychological testing evaluation services, first hour
- 96131: Psychological testing evaluation services, each additional hour

Examples:
Note: "Patient presented for initial psychiatric diagnostic interview..." → CPT: 90791
Note: "Session lasted 60 minutes, focused on psychotherapy..." → CPT: 90837
Note: "Behavioral therapy session lasted 15 minutes..." → CPT: H0004

Now classify the following clinical note and return the result in the specified JSON format:
{soap_note}

Return the result in this JSON format:
{{
  "CPT": [ "code1", "code2" ]
}}
"""

In [12]:
# Structured-output schema for the CPT prediction call.
class CPT_Output(BaseModel):
    # Every list item must be exactly 5 characters long (min_length == max_length == 5).
    CPT: List[Annotated[str, Field(min_length=5, max_length=5, description="CPT code descrbing the chart note")]]

# LLM handle bound to the CPT_Output schema; used by the CPT prediction cells below.
structured_llm = llm.with_structured_output(CPT_Output)

In [13]:
# Template object for the CPT prompt; `soap_note` is its single input variable.
prompt = PromptTemplate(
    input_variables=["soap_note"],
    template=cpt_prediction_prompt
)

In [None]:
# Load and de-identify the second sample note, then display it.
deidentify_note_2 = deidentify_and_strip(load_pdf("soap_note_2.pdf"))
deidentify_note_2

In [None]:
# One-off CPT prediction for the second sample note, using the module-level template.
soap_note_prompt = prompt.format(soap_note=deidentify_note_2)

response = structured_llm.invoke(soap_note_prompt)

print(response)

In [14]:
def predict_cpt_code(soap_note: str):
    """Fill the CPT prompt with ``soap_note`` and return the ``CPT`` list of the structured LLM response."""
    rendered_prompt = PromptTemplate(
        template=cpt_prediction_prompt,
        input_variables=["soap_note"],
    ).format(soap_note=soap_note)

    return structured_llm.invoke(rendered_prompt).CPT


In [15]:
# CPT -> ICD-10 lookup table; the columns read later are "CPT", "CPT Description",
# "ICD-10 Code" and "ICD-10 Description".
cpt_icd_mapping_df = pd.read_excel("Expanded_CPT_to_ICD_mapping.xlsx")
cpt_icd_mapping_df

Unnamed: 0,CPT,CPT Description,ICD-10 Code,ICD-10 Description
0,90832,"Psychotherapy, 30 minutes",F32.0,"Major depressive disorder, single episode, mild"
1,90832,"Psychotherapy, 30 minutes",F32.1,"Major depressive disorder, single episode, mod..."
2,90832,"Psychotherapy, 30 minutes",F32.2,"Major depressive disorder, single episode, sev..."
3,90832,"Psychotherapy, 30 minutes",F32.3,"Major depressive disorder, single episode, sev..."
4,90832,"Psychotherapy, 30 minutes",F32.4,"Major depressive disorder, single episode, in ..."
...,...,...,...,...
237,96131,"Psychological testing, evaluation (additional ...",F81.9,"Developmental disorder of scholastic skills, u..."
238,96131,"Psychological testing, evaluation (additional ...",F84.0,Autistic disorder
239,96131,"Psychological testing, evaluation (additional ...",F84.9,"Pervasive developmental disorder, unspecified"
240,96131,"Psychological testing, evaluation (additional ...",F02.0,Dementia in Alzheimer's disease with early onset


In [16]:
def get_cpt_mapping(df):
    """Build a CPT -> ICD lookup from the mapping table passed in as ``df``.

    ``df`` must have the columns "CPT", "CPT Description", "ICD-10 Code" and
    "ICD-10 Description". All values are converted to stripped strings.

    Returns a dict of the form::

        {cpt: {"description": <first CPT description seen>,
               "applicable_icds": [{icd_code: icd_description}, ...]}}
    """
    cpt_mapping = {}

    # Iterate the frame that was passed in (not a notebook global), column-wise.
    rows = zip(df["CPT"], df["CPT Description"], df["ICD-10 Code"], df["ICD-10 Description"])
    for cpt, cpt_desc, icd, icd_desc in rows:
        cpt = str(cpt).strip()

        entry = cpt_mapping.setdefault(cpt, {
            "description": str(cpt_desc).strip(),
            "applicable_icds": [],
        })
        # One single-key dict per ICD row, in table order.
        entry["applicable_icds"].append({str(icd).strip(): str(icd_desc).strip()})

    return cpt_mapping

# Module-level lookup read by get_icd_candidates.
cpt_mapping = get_cpt_mapping(cpt_icd_mapping_df)


In [17]:
def get_icd_candidates(predicted_cpt_codes: list[str]) -> list[dict]:
    """Collect the ICD entries mapped to each predicted CPT code.

    CPT codes missing from ``cpt_mapping`` are ignored. Each result item is a
    dict with the keys "icd" and "description"; items follow the order of the
    input codes and of the mapping entries.
    """
    return [
        {"icd": code, "description": text}
        for cpt in predicted_cpt_codes
        if cpt in cpt_mapping
        for entry in cpt_mapping[cpt]["applicable_icds"]
        for code, text in entry.items()
    ]



# Sample candidate list for CPTs 90837 and 90832, reused by later demo cells.
icd_candidadtes = get_icd_candidates(["90837", "90832"])

In [None]:
# Display the sample ICD candidates.
icd_candidadtes

In [18]:
# Bi-encoder used for all note / ICD description embeddings.
embedder = SentenceTransformer("abhinand/MedEmbed-large-v0.1")

def embed_texts(texts: List[str]) -> np.ndarray:
    """Encode ``texts`` with ``embedder`` and return normalized embeddings as a NumPy array."""
    encode_options = dict(
        convert_to_numpy=True,
        batch_size=32,
        normalize_embeddings=True,  # normalized vectors make the dot product a cosine similarity
        show_progress_bar=False,
    )
    return embedder.encode(texts, **encode_options)


In [19]:
def build_icd_embedding_store(mapping_df, embed_fn):
    """
    Precompute embeddings for the distinct ICD "code: description" texts in ``mapping_df``.

    Keys are built from the stripped string form of the "ICD-10 Code" and
    "ICD-10 Description" columns, i.e. the same normalization get_cpt_mapping
    applies, so candidate texts built from that mapping resolve to a store key.

    ``embed_fn`` receives the list of distinct keys in a single call and must
    return one embedding per key, in the same order.

    Returns: dict[str, embedding], in first-seen order; {} for an empty table.
    """
    keys = []
    seen = set()
    for code, description in zip(mapping_df["ICD-10 Code"], mapping_df["ICD-10 Description"]):
        key = f"{str(code).strip()}: {str(description).strip()}"
        if key not in seen:
            seen.add(key)
            keys.append(key)

    if not keys:
        return {}

    # One batched call instead of one embedding call per table row.
    return dict(zip(keys, embed_fn(keys)))

# Build globally (only once, e.g. app startup); rerank_icd_candidates looks
# embeddings up in this dict by "code: description" text.
ICD_STORE = build_icd_embedding_store(cpt_icd_mapping_df, embed_texts)


In [20]:
# Loaded cross-encoders keyed by model name, so repeated calls reuse one instance
# instead of reloading the model on every note.
_CROSS_ENCODERS: Dict[str, Any] = {}


def _get_cross_encoder(model_name: str = "cross-encoder/ms-marco-MiniLM-L-6-v2"):
    """Return the CrossEncoder for ``model_name``, loading it on first use only."""
    if model_name not in _CROSS_ENCODERS:
        _CROSS_ENCODERS[model_name] = CrossEncoder(model_name)
    return _CROSS_ENCODERS[model_name]


def rerank_icd_candidates(note_text: str,
                          icd_candidates: List[Dict[str, str]],
                          top_k: int = 5,
                          rerank_with_cross_encoder: bool = True) -> List[Dict[str, Any]]:
    """
    Re-rank ICD candidates against a note:
      1. Fast filter with embeddings (bi-encoder + ICD_STORE lookup), keeping
         the ``top_k * 3`` most similar candidates.
      2. Optional cross-encoder rerank of that shortlist.
      3. Keep the best entry per ICD code and return at most ``top_k`` of them.

    Candidates may be ``{"icd": ..., "description": ...}`` dicts or single-key
    ``{code: description}`` dicts. Each returned dict has "icd", "description",
    "score" (embedding similarity) and, when step 2 ran, "cross_score"; the
    list is sorted best-first by "cross_score" if present, else by "score".
    Returns [] for an empty candidate list or a non-positive ``top_k``.
    """
    # Checked before any model is touched, so trivial calls stay cheap.
    if not icd_candidates or top_k <= 0:
        return []

    # Normalize candidates and drop exact repeats (the same ICD is typically
    # listed once per predicted CPT), so repeats cannot crowd out the shortlist.
    normalized_icds = []
    seen_pairs = set()
    for c in icd_candidates:
        if "icd" in c and "description" in c:
            icd, desc = c["icd"], c["description"]
        else:
            icd, desc = next(iter(c.items()))
        if (icd, desc) not in seen_pairs:
            seen_pairs.add((icd, desc))
            normalized_icds.append({"icd": icd, "description": desc})

    # Step 1: bi-encoder filter. Precomputed vectors come from ICD_STORE; texts
    # that are not in the store are embedded on the fly instead of raising KeyError.
    note_emb = embed_texts([note_text])[0]
    icd_texts = [f'{c["icd"]}: {c["description"]}' for c in normalized_icds]
    missing = list(dict.fromkeys(t for t in icd_texts if t not in ICD_STORE))
    fallback = dict(zip(missing, embed_texts(missing))) if missing else {}
    icd_embs = np.array([
        ICD_STORE[t] if t in ICD_STORE else fallback[t] for t in icd_texts
    ])

    sims = note_emb @ icd_embs.T
    idxs = np.argsort(-sims)[: min(top_k * 3, len(normalized_icds))]

    preselected = [
        {
            "icd": normalized_icds[i]["icd"],
            "description": normalized_icds[i]["description"],
            "score": float(sims[i]),
        }
        for i in idxs
    ]

    # Step 2: cross-encoder rerank (model is only loaded when this step runs)
    if rerank_with_cross_encoder and preselected:
        pairs = [(note_text, f"{p['icd']}: {p['description']}") for p in preselected]
        cross_scores = _get_cross_encoder().predict(pairs)

        for p, cs in zip(preselected, cross_scores):
            p["cross_score"] = float(cs)

    ranked = sorted(preselected, key=lambda x: -x.get("cross_score", x["score"]))

    # Step 3: deduplicate by ICD *before* truncating. The list is sorted
    # best-first, so the first entry seen for a code is its best-scoring one.
    unique_ranked = {}
    for r in ranked:
        unique_ranked.setdefault(r["icd"], r)

    return list(unique_ranked.values())[:top_k]


In [None]:
# Rank the sample candidates against the first de-identified note and display them.
ranked = rerank_icd_candidates(deidentify_note_1, icd_candidadtes)

ranked

In [None]:
# Same ranking for the second note, with candidates taken from CPT 90791.
ranked_2 = rerank_icd_candidates(deidentify_note_2, get_icd_candidates(["90791"]))

In [None]:
# Display the ranking for the second note.
ranked_2

In [21]:
# Structured-output schema for the final ICD-10 selection call.
class ICD_Output(BaseModel):
    # Falls back to an empty list when the field is not provided (default_factory=list).
    ICD10: List[str] = Field(
        default_factory=list,
        description="Final ICD-10 codes selected from the allowed list; return 1-4 most relevant."
    )


In [22]:
# Wrap the existing llm so responses are returned as ICD_Output
icd_structured_llm = llm.with_structured_output(ICD_Output)

# Prompt for the final ICD choice. Template variables: `note`, `cpts`, `allowed_icds`;
# the doubled braces in the schema example are literal braces after formatting.
icd_selection_prompt = PromptTemplate(
    input_variables=["note", "cpts", "allowed_icds"],
    template=(
        "You are a medical coding assistant. Choose the most appropriate ICD-10 codes "
        "ONLY from the allowed list below, based on the clinical note and CPT context. "
        "Prefer diagnoses that are explicitly supported by the note. Do not guess.\n\n"
        "Clinical note:\n{note}\n\n"
        "CPT context (predicted): {cpts}\n\n"
        "Allowed ICD-10 candidates (code — description):\n{allowed_icds}\n\n"
        "Return JSON matching this schema:\n"
        "{{\"ICD10\": [\"code1\", \"code2\"]}}\n"
        "Constraints:\n"
        "- Pick the fewest codes that fully represent the encounter (typically 1-4).\n"
        "- Do not include screening codes unless the note is purely screening.\n"
        "- Do not include historical/resolved problems unless clearly treated/assessed today.\n"
    )
)


In [23]:
def select_icds_for_note(note_text: str,
                         predicted_cpts: List[str],
                         icd_candidates: List[Dict[str, str]],
                         top_k: int = 15) -> Dict[str, Any]:
    """
    Rerank ICD candidates, then ask the LLM to pick the final ICDs from them.

    Returns ``{"ranked": [...], "final": [...]}`` where "ranked" is the output
    of rerank_icd_candidates and "final" is the list of codes chosen by the
    LLM, restricted to codes that were actually offered in "ranked" and with
    repeats removed. When nothing is ranked, the LLM is not called and both
    lists are empty.
    """
    # Step 1: re-rank by semantic similarity
    ranked = rerank_icd_candidates(note_text, icd_candidates, top_k=top_k)

    if not ranked:
        return {"ranked": [], "final": []}

    # Prepare allowed list text for the prompt (top-K only)
    allowed_lines = [f"- {r['icd']} — {r['description']} (score: {r['score']:.3f})" for r in ranked]
    allowed_icds_block = "\n".join(allowed_lines)

    # Step 2: LLM final selection (structured)
    prompt_str = icd_selection_prompt.format(
        note=note_text,
        cpts=", ".join(predicted_cpts),
        allowed_icds=allowed_icds_block
    )
    final = icd_structured_llm.invoke(prompt_str)

    # NOTE(review): guards against the structured call yielding no parsed object;
    # confirm against the LangChain version in use.
    proposed = final.ICD10 if final is not None else []

    # Step 3: the prompt asks for codes from the allowed list only, but that is
    # not enforced by the schema, so enforce it here and keep the LLM's order.
    allowed_codes = {r["icd"] for r in ranked}
    final_codes = []
    for code in proposed:
        code = code.strip()
        if code in allowed_codes and code not in final_codes:
            final_codes.append(code)

    return {
        "ranked": ranked,            # list of dicts with score
        "final": final_codes         # validated list of codes selected by the LLM
    }


In [None]:
# Demo: final ICD selection for the first note with CPT 90837 as context.
final_selection = select_icds_for_note(deidentify_note_1, ["90837"], icd_candidadtes)

In [None]:
# Codes chosen by the LLM for the demo note.
final_selection["final"]

In [103]:
def generate_coding_from_note(file_path: str,
                  top_k: int = 5) -> Dict[str, Any]:
    """
    Run the whole note -> CPT -> ICD pipeline for one PDF.

    The raw PDF text feeds get_phi; only the de-identified text is sent to the
    CPT prediction and ICD selection steps. ``top_k`` is forwarded to
    select_icds_for_note.

    Returns the fields found by get_phi plus the keys "CPTs", "Ranked ICDs"
    and "Final ICDs".
    """
    raw_text = load_pdf(file_path)
    scrubbed_text = deidentify_and_strip(raw_text)

    # Header fields are read from the raw text, before any model call is made.
    header_fields = get_phi(raw_text)

    cpt_codes = predict_cpt_code(scrubbed_text)
    selection = select_icds_for_note(
        note_text=scrubbed_text,
        predicted_cpts=cpt_codes,
        icd_candidates=get_icd_candidates(cpt_codes),
        top_k=top_k,
    )

    return {
        **header_fields,
        "CPTs": cpt_codes,
        "Ranked ICDs": selection["ranked"],
        "Final ICDs": selection["final"],
    }


In [104]:
# Run the full pipeline on one note and show the result dict as a single-row DataFrame.
result = generate_coding_from_note("soap_note_1.pdf")

df = pd.DataFrame([result])

df

Unnamed: 0,Clinician,Patient,DOB,Date,Time,Duration,Service Code,Diagnosis Codes,CPTs,Ranked ICDs,Final ICDs
0,"Summer Taylor, LCSW",Tonya Ranae Critz,04-11-1964,11-08-2025,12:00 ᴘᴍ - 1:00 ᴘᴍ,60 minutes,90837,[F43.21],[90837],"[{'icd': 'F43.21', 'description': 'Adjustment ...",[F43.21]


In [None]:
# Run the pipeline on the three sample notes and print the predicted codes.
files = ["soap_note_1.pdf", "soap_note_2.pdf", "soap_note_3.pdf"]

for file in files:

  # The second parameter of generate_coding_from_note is `top_k`, so the mapping
  # DataFrame must not be passed; result keys are "CPTs" / "Final ICDs".
  result = generate_coding_from_note(file)

  print(f"\nPredicted CPTs for {file}:")
  for cpt in result["CPTs"]:
      print(f" - {cpt}")

  print(f"\nFinal ICDs for {file}:")
  for icd in result["Final ICDs"]:
      print(f" - {icd}")


In [None]:
def unzip_folder(zip_path, extract_to):
    """Extract every member of the archive at ``zip_path`` into ``extract_to``, creating that directory if needed."""
    os.makedirs(extract_to, exist_ok=True)
    archive = zipfile.ZipFile(zip_path, 'r')
    with archive:
        archive.extractall(extract_to)

# Extract the notes archive; the later batch cell reads "/content/soap_notes_dir".
unzip_folder("soap_notes_dir.zip", "/content")

In [None]:
# Batch run over the extracted notes; one record per PDF.
predictions = []

for file in tqdm(os.listdir("/content/soap_notes_dir"), desc="Processing SOAP Notes"):
  if file.endswith(".pdf"):
    # generate_coding_from_note takes (file_path, top_k); its result keys are
    # "CPTs" and "Final ICDs".
    result = generate_coding_from_note(f"/content/soap_notes_dir/{file}")
    predictions.append({
        "file": file,
        "cpts": result["CPTs"],
        "final_icds": result["Final ICDs"]
    })


In [None]:
# Tabulate the batch predictions (columns: file, cpts, final_icds).
predictions_df = pd.DataFrame(predictions)
predictions_df

In [25]:
# Headings every note is checked for, mapped to the regex used to detect each one.
required_sections = {
    "Interventions Used": r"\bInterventions\s+Used\b",
    # "Risk Assessment", or a bare "Assessment" optionally followed by
    # words, slashes, hyphens, or spaces
    "Risk Assessment": r"\b(Risk\s+Assessment|Assessment(\s*[/\-\w\s]+)*)\b",
    "Current Mental Status": r"\bCurrent\s+Mental\s+Status\b"
}

def check_note(note_text: str, filename: str):
    """Report which required sections are absent from ``note_text``.

    Matching is case-insensitive. The result dict has "filename", "status"
    ("RED" when at least one section is missing, otherwise "OK") and
    "missing_sections" (section names in ``required_sections`` order).
    """
    absent = [
        heading
        for heading, regex in required_sections.items()
        if re.search(regex, note_text, flags=re.IGNORECASE) is None
    ]

    return {
        "filename": filename,
        "status": "RED" if absent else "OK",
        "missing_sections": absent,
    }


In [None]:
# Collect the check_note reports of every PDF that is missing a required section.
flagged_notes = []


for file in tqdm(os.listdir("flag_red_notes_dir")):
    if file.endswith(".pdf"):
        # load_pdf opens the file itself; no separate (text-mode) file handle is needed.
        # The raw text is checked on purpose: de-identification drops whole lines.
        text_content = load_pdf(f"flag_red_notes_dir/{file}")

        result = check_note(text_content, file)
        if result["status"] == "RED":
            flagged_notes.append(result)

In [None]:
# Display the reports of all flagged notes.
flagged_notes

In [None]:
# File names of the flagged notes; the next batch run skips these.
flagged_files = [n["filename"] for n in flagged_notes]
flagged_files

In [None]:
# Batch run over the notes that were not flagged as incomplete.
predictions = []

folder_name = "flag_red_notes_dir"

for file in tqdm(os.listdir(folder_name), desc="Processing SOAP Notes"):
  if file.endswith(".pdf") and file not in flagged_files:
    # generate_coding_from_note takes (file_path, top_k); its result keys are
    # "CPTs" and "Final ICDs".
    result = generate_coding_from_note(f"{folder_name}/{file}")
    predictions.append({
        "file": file,
        "cpts": result["CPTs"],
        "final_icds": result["Final ICDs"]
    })


In [None]:
# Tabulate the predictions for the unflagged notes.
predictions_df = pd.DataFrame(predictions)
predictions_df

In [None]:
# Inspect one note by hand. The path is joined with os.path.join so the cell
# also works on systems where "\" is not a path separator.
text = load_pdf(os.path.join("flag_red_notes_dir", "TN_Note-for-AM-8-20-2025_created-8-27-2025_58397489.pdf"))
clean = deidentify_and_strip(text)

print(clean)

In [None]:
# CPT prediction for the de-identified note held in `clean`.
cpt = predict_cpt_code(clean)
cpt

In [None]:
# Rebuild and display the predictions table from the current `predictions` list.
predictions_df = pd.DataFrame(predictions)
predictions_df

In [None]:
# Load the raw text of one note. Built with os.path.join: the previous literal
# contained "\T" in a non-raw string (an invalid escape sequence) and only
# resolved on Windows.
pdf = os.path.join("flag_red_notes_dir", "TN_Note-for-IH-8-21-2025_created-8-27-2025_58397573.pdf")
text = load_pdf(pdf)

In [28]:
# Load the raw text of one note; os.path.join keeps the path portable across
# operating systems.
pdf = os.path.join("flag_red_notes_dir", "TN_Note-for-WK-8-20-2025_created-8-27-2025_58397504.pdf")
text = load_pdf(pdf)

In [None]:
# Display the most recently computed de-identified text.
clean

In [96]:
# Print the raw text extracted from the PDF loaded into `text`.
print(text)

Diagnosis
F43.25Adjustment Disorder, With mixed disturbance of emotions and conduct
Current Mental Status
Orientation: X3: Oriented to Person, Place, and Time
General Appearance: Appropriate
Dress: Appropriate
Motor Activity: Unremarkable
Interview Behavior: Appropriate
Speech: Normal
Mood: Euthymic
Affect: Congruent
Insight: Fair
Judgment/Impulse Control: Fair
Memory: Intact
Attention/Concentration: Good
Thought Process: Unremarkable
Thought Content: Appropriate
Perception: Unremarkable
Functional Status: Moderately Impaired
Risk Assessment
Patient denies all areas of risk. No contrary clinical indications present.
Subjective Report and Symptom Description
This writer met with Weslie for an individual therapy session in the therapy office. Interactive complexity was utilized on this date to
address focus and attention through coregulation. He reported positive experiences in school.
Interventions Used
 dummylink
Progress Note
Robertson Counseling LLC
Clinician: Taylor Keller, LPCC
Pat

In [97]:
# Extract the header fields and diagnosis codes from the raw note text.
phi = get_phi(text)

In [98]:
# Display the extracted fields.
phi

{'Clinician': 'Taylor Keller, LPCC',
 'Patient': 'Weslie Knudson',
 'DOB': '12-11-2015',
 'Date': '20-08-2025',
 'Time': '2:30 ᴘᴍ - 3:26 ᴘᴍ',
 'Duration': '56 minutes',
 'Service Code': 'H0004',
 'Diagnosis Codes': ['F43.25']}