In [28]:
import re
import os
import json
import pandas as pd
from typing import List, Dict

from dotenv import load_dotenv
from langchain.prompts import PromptTemplate
from langchain_community.document_loaders import PyPDFLoader
from langchain_openai import ChatOpenAI
from pydantic import BaseModel, Field

load_dotenv()


True

In [2]:
DIR = "SOAP_notes"

In [None]:
def load_pdf(file_path):
    loader = PyPDFLoader(file_path)
    documents = loader.load()

    full_text = "\n".join([doc.page_content for doc in documents])
    return full_text


text_content = load_pdf(f"{DIR}/68750.pdf")

In [None]:
def deidentify_text(text):
    removed = {}

    # Patterns for PHI
    patterns = {
        "patient_name": r"(Patient:|^)[ \t]*([A-Z]+, [A-Za-z]+)",
        "dob": r"DOB: ?(\d{2}/\d{2}/\d{4})",
        "account_number": r"Acc(?:ount)? No\.?[: ]*(\d+)",
        "provider_name": r"Provider: ([A-Za-z .,'-]+, MD)",
        "phone": r"Phone: ?([\d-]{10,})",
        "address": r"Address: ([\w\d ,.-]+TX-\d{5})",
        "fax": r"Fax: ?([\d-]{10,})",
        "signed_by": r"Electronically signed by ([A-Za-z .,'-]+) on",
        "signature_date": r"on (\d{2}/\d{2}/\d{4}) at",
        "dos": r"DOS: ?(\d{2}/\d{2}/\d{4})",
    }

    # Remove and collect PHI
    for key, pattern in patterns.items():
        matches = re.findall(pattern, text, re.MULTILINE)
        if matches:
            # If the match is a tuple, get the last group
            if isinstance(matches[0], tuple):
                matches = [m[-1] for m in matches]
            removed[key] = matches
            text = re.sub(pattern, lambda m: m.group(0).replace(m.group(1), ""), text)

    # Remove any remaining names (doctor or patient) in the format: LAST, First
    name_pattern = r"\b([A-Z]+, [A-Za-z]+)\b"
    names = re.findall(name_pattern, text)
    if names:
        removed.setdefault("names", []).extend(names)
        text = re.sub(name_pattern, "", text)

    # Remove any remaining dates in MM/DD/YYYY format
    date_pattern = r"\b(\d{2}/\d{2}/\d{4})\b"
    dates = re.findall(date_pattern, text)
    if dates:
        removed.setdefault("dates", []).extend(dates)
        text = re.sub(date_pattern, "", text)

    # Remove any remaining phone numbers
    phone_pattern = r"\b\d{3}[-.]\d{3}[-.]\d{4}\b"
    phones = re.findall(phone_pattern, text)
    if phones:
        removed.setdefault("phones", []).extend(phones)
        text = re.sub(phone_pattern, "", text)

    return text, removed


In [5]:
deidentified_text, removed_dict = deidentify_text(text_content)
print("De-identified text:\n", deidentified_text)
print("\nRemoved PHI:\n", removed_dict)

De-identified text:
  DOB:  (17 yo M) Acc No.  DOS: 
 
 
Account Number: 28536 Provider: 
DOB:    Age: 17 Y   Sex: Male Date: 
Phone: 
Address: 
Subjective:
Chief Complaints:
   1. Well & Sick: HURT SHOULDER left side, 3 weeks ago while boxing. 2. Well Child Examination - EPSDT - 15 to
17 years (Male):NM. 3. Immunization follow-up.
HPI:
   Interval History: 
       Lives with: parents . 
       Family support: yes, partner involved with care . 
       Primary care giver: mother . 
       Interim Illness: none . 
       Accidents: none . 
       Sleep: sleeps through the night , ( ) hours per night , ( ) hours nap time during the day , no problems
reported . 
       Sees/Hears: well - as reported by parent , eyes straight always . 
       Early childhood intervention programs: no . 
       Vaccine reactions: none . 
       Emergency room visits: none . 
       Home remedies: none . 
       Review previous/interim laboratory studies: all laboratory results within normal limits , normal l

In [6]:
def extract_icd10_codes(text):
    """
    Extracts ICD10 codes and their descriptions from a text section.
    Returns a dict: {ICD10_code: description}
    """
    icd_pattern = r"([A-Z][0-9][0-9A-Z]\.[0-9A-Z]+|[A-Z][0-9][0-9A-Z]+)"  # e.g., Z00.129, Z68.52, Z23
    result = {}
    for line in text.splitlines():
        if line.strip().lower().startswith("plan"):
            break
        match = re.search(icd_pattern, line)
        if match:
            code = match.group(0)
            # Description is everything before the code
            desc = line.split(code)[0].strip(" .-:")
            result[code] = desc
    return result


icd_dict = extract_icd10_codes(deidentified_text)
print(icd_dict)

{'Z00.129': '1. Encounter for well child visit at 17 years of age', 'Z68.52': '2. BMI,pediatric 5% - <85%', 'Z71.82': '3. Exercise counseling', 'Z71.3': '4. Dietary counseling and surveillance', 'Z23': '5. Encounter for immunization', 'M25.512': '6. Left shoulder pain, unspecified chronicity'}


In [None]:
def extract_procedure_codes(note_text: str) -> dict:
    """
    Extracts CPT procedure codes and their descriptions
    from an encounter note section starting with 'Procedure Codes:'.
    Returns a dict {CPT: Description}.
    """
    results = {}

    # Find the "Procedure Codes:" section
    match = re.search(r"Procedure Codes:\s*(.+)", note_text, re.IGNORECASE | re.DOTALL)
    if not match:
        return results  # No procedure codes found

    # Get everything after "Procedure Codes:" until next header (like "Units:" or newline block)
    proc_section = match.group(1).split("Units:")[0].strip()

    # Split on commas
    codes = [c.strip() for c in proc_section.split(",") if c.strip()]

    for code_entry in codes:
        # Match CPT code at start (numbers or alphanumeric like G8431, J0696 etc.)
        m = re.match(r"([A-Z]?\d{4,5})\s+(.+)", code_entry)
        if m:
            code = m.group(1).strip()
            desc = m.group(2).strip()
            results[code] = desc

    return results


print(extract_procedure_codes(deidentified_text))


{'G8431': 'CLIN DEPRESSION SCREEN DOC', 'G9902': 'Pt scrn tbco and id as user', '96160': 'PT-', '90619': 'MENACWY-TT VACCINE IM', '90460': 'IMADM ANY ROUTE 1ST VAC/TOX'}


In [8]:
llm = ChatOpenAI(model="gpt-4o")

In [None]:
with open("cpt_reference.json", "r") as f:
    cpt_data = json.load(f)


def flatten_cpt(data, parent_keys=[]):
    records = []
    for key, value in data.items():
        if isinstance(value, dict):
            records.extend(flatten_cpt(value, parent_keys + [key]))
        else:
            records.append(
                {
                    "cpt_code": key,
                    "description": value,
                    "category": " > ".join(parent_keys),
                }
            )
    return records


cpt_records = flatten_cpt(cpt_data)

valid_cpts = set()
for record in cpt_records:
    valid_cpts.add(record["cpt_code"])

In [None]:
class CptOutput(BaseModel):
    cpt_list: List[str] = Field(
        ..., description="List of CPT codes supported by encounter note"
    )

In [None]:
em_prompt = PromptTemplate(
    input_variables=["note"],
    template="""
You are a certified professional medical coder specializing in Evaluation & Management (E/M) coding.

Task: Select **exactly ONE CPT code** from the list below based on the encounter note.

E/M Reference (Established Patients):
- 99211: Minimal, nurse-only or very limited, no physician involvement.
- 99212: Expanded problem-focused, straightforward, very minor issues.
- 99213: Low complexity, 1 stable chronic illness or minor acute issue, limited exam/ROS, minimal management.
- 99214: Moderate complexity, detailed history & exam, multiple new problems, prescription medications, labs/imaging/referrals ordered, or management of chronic illness with exacerbation.
- 99215: High complexity, extensive history & exam, life-threatening or severe acute illness, high-risk management.

E/M Reference (New Patients):
- 99201–99204 (parallel rules, higher documentation requirements).

Rules:
- If explicitly "new patient" → use new patient codes. Otherwise assume established.
- Weigh history, exam, and medical decision-making (MDM).
- Multiple new diagnoses + prescription medications + labs/referrals = usually 99214.
- Always output exactly one CPT code.

Encounter Note:
{note}

Your output must be strictly this format:
{{<the chosen code>}}
"""
)


In [None]:
class em_output(BaseModel):
    cpt_code : int = Field(..., description="The selected CPT code for the E/M encounter")

def em_coding_llm(note: str):
    chain = em_prompt | llm.with_structured_output(em_output)
    result = chain.invoke({"note" : note})

    return result.cpt_code

In [58]:
def pipeline(filepath: str, valid_cpts=valid_cpts):
    text_content = load_pdf(filepath)
    deidentified_text, removed = deidentify_text(text_content)

    cpt_structured_llm = llm.with_structured_output(CptOutput)

    prompt = PromptTemplate(
        input_variables=["note", "cpt_reference"],
        template="""
    You are a certified professional medical coder. Carefully review the following encounter note and identify all relevant CPT/HCPCS codes.

    Instructions:
    - Match services in the note to codes only from the provided reference list.
    - Consider ALL categories in the reference (Preventive Medicine, Immunization Products, Immunization Administration, Screenings/Assessments, Diagnostics/Labs, Procedures/Treatments, Medications/Injections, Nutrition Counseling, Administrative/Misc).
    - Capture every applicable code; do not skip valid ones.
    - If multiple codes overlap, choose the one that is the general default used across most payers (not restricted or program-specific).
    - Only output codes that are directly and fully supported by documentation.
    - Do not hallucinate codes that are not present in the provided reference.

    Encounter Note:
    {note}

    CPT/HCPCS Reference List:
    {cpt_reference}
    
    Important: Only include a CPT/HCPCS if the documentation supports the billing requirements.
    - Do NOT include depression related codes unless depression screening was positive with a documented follow-up plan.
    - Do NOT include tobacoo realted codes unless patient was identified as a tobacco user.
    """,
    )

    chain = prompt | cpt_structured_llm

    response = chain.invoke({"note": deidentified_text, "cpt_reference": cpt_data})
    filtered = [c for c in response.cpt_list if c in valid_cpts]

    em_code = em_coding_llm(text_content)
    filtered.append(em_code)
    
    return filtered

In [61]:
results = []

for note in os.listdir(DIR):
    filename = note.split(".")[0]
    
    if note.lower().endswith(".pdf"):
        result = pipeline(
            os.path.join(DIR, note),
        )
        results.append({"file": filename, "CPT list": result})


In [62]:
results

[{'file': '68241', 'CPT list': ['96372', 99214]},
 {'file': '68398',
  'CPT list': ['99393', '90619', '90715', '90460', '90461', 'S9470', 99213]},
 {'file': '68493', 'CPT list': ['99394', '96160', 99213]},
 {'file': '68595', 'CPT list': ['99051', 99213]},
 {'file': '68708', 'CPT list': ['87807', 99213]},
 {'file': '68750', 'CPT list': ['99394', '90619', '90460', '96160', 99214]},
 {'file': '68799',
  'CPT list': ['99393',
   '90619',
   '90620',
   '90651',
   '90715',
   '90460',
   '90461',
   99211]},
 {'file': '68903', 'CPT list': ['99394', '96160', 99214]},
 {'file': '68947',
  'CPT list': ['99391',
   '90700',
   '90460',
   '90461',
   '90713',
   '90648',
   '90671',
   '90744',
   '90680',
   99213]},
 {'file': '69131', 'CPT list': ['81002', 99214]},
 {'file': '69133',
  'CPT list': ['99391', '90697', '90460', '90461', '90677', '90680', 99214]},
 {'file': '69253', 'CPT list': ['99394', 99212]},
 {'file': '69262', 'CPT list': ['87804', '87880', 99213]},
 {'file': '69265',
  'CP

In [63]:
df = pd.DataFrame(results)

actual_cpts = pd.read_excel("Required Solution.xlsx", usecols=["Claim No", "CPT Code"])
actual_grouped = actual_cpts.groupby("Claim No")["CPT Code"].apply(list).reset_index()
actual_grouped.rename(columns={"CPT Code": "Actual CPT list"}, inplace=True)

df["file"] = df["file"].astype(int)
merged = pd.merge(df, actual_grouped, left_on="file", right_on="Claim No", how="inner")


def compare_cpts(row):
    predicted = set(map(str, row["CPT list"]))
    actual = set(map(str, row["Actual CPT list"]))
    missing = list(actual - predicted)
    same = list(actual & predicted)
    additional = list(predicted - actual)
    return pd.Series(
        [same, missing, additional], index=["Same", "Missing", "Additional"]
    )


merged[["Same", "Missing", "Additional"]] = merged.apply(compare_cpts, axis=1)


merged[["Claim No", "Same", "Missing", "Additional"]]

final_df = pd.DataFrame()
final_df = pd.concat([final_df, merged], axis=0, ignore_index=True)

In [64]:
final_df

Unnamed: 0,file,CPT list,Claim No,Actual CPT list,Same,Missing,Additional
0,68241,"[96372, 99214]",68241,[99214],[99214],[],[96372]
1,68398,"[99393, 90619, 90715, 90460, 90461, S9470, 99213]",68398,"[90460, 90460, 90461, 90619, 90715, 99393]","[90461, 90715, 99393, 90619, 90460]",[],"[S9470, 99213]"
2,68493,"[99394, 96160, 99213]",68493,"[96160, G9903, 99394]","[99394, 96160]",[G9903],[99213]
3,68595,"[99051, 99213]",68595,"[99051, 99213]","[99051, 99213]",[],[]
4,68708,"[87807, 99213]",68708,"[87807, 99213]","[87807, 99213]",[],[]
5,68750,"[99394, 90619, 90460, 96160, 99214]",68750,"[90460, 90619, 96160, 99213, 99394]","[99394, 90619, 90460, 96160]",[99213],[99214]
6,68799,"[99393, 90619, 90620, 90651, 90715, 90460, 904...",68799,"[90460, 90460, 90461, 90619, 90620, 90651, 907...","[90461, 90715, 90651, 90619, 90620, 99211, 90460]",[],[99393]
7,68903,"[99394, 96160, 99214]",68903,"[96160, G9903, 99213, 99384]",[96160],"[99384, G9903, 99213]","[99394, 99214]"
8,68947,"[99391, 90700, 90460, 90461, 90713, 90648, 906...",68947,"[90460, 90460, 90461, 90648, 90671, 90680, 907...","[90461, 90671, 99213, 90648, 90680, 90744, 993...",[Cards],[]
9,69131,"[81002, 99214]",69131,"[81002, 99214]","[81002, 99214]",[],[]
