<a href="https://colab.research.google.com/github/harishnemade100/Big_data_Project_Using_Kafka/blob/main/Adding_Feature_Reading_Pdf.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [851]:
!pip install pdfplumber



In [852]:
import pdfplumber
from typing import List, Optional
from collections import defaultdict
import re
from pydantic import BaseModel

# Read PDF file

In [854]:
def read_pdf_pages(pdf_path):
    pages = []

    with pdfplumber.open(pdf_path) as pdf:
        for idx, page in enumerate(pdf.pages, start=1):
            text = page.extract_text() or ""
            pages.append({
                "page_no": idx,
                "text": text
            })

    return pages

In [855]:
pages = read_pdf_pages(pdf_path)

In [856]:
pages

[{'page_no': 1,
  'text': 'TAX INVOICE\nDate 15/10/2025 Page 1 of 2\nInsurance Company Name:\nVat Registration No. 200000763900002\nALLIANZ WORLDWIDE CARE GOLD CARD\nBAHRAIN SPECIALIST HOSPITAL\nFinancial Code : 00001946 Building: 2743, Road/Street: 2442,Block: 324,\nMANAMA, Bahrain\nCustomer Name : ALLIANZ WORLDWIDE CARE LIMITED\nInvoice No : AOP102502658\nInsurance Company Address : Invoice Date : 08/10/2025\nO2a, Orient Building, Al Badia Business park, dubai festival city, Customer Code : ALLIA100\nP.O.Box 27966, Dubai, UAE\nCustomer Type : INSURANCE\nCard /Policy No. : P003904429\nCustomer Hierarchy :\nMember/Certificate No. : P003904429\nALLIANZ WORLDWIDE CARE LIMITED-->ALLIANZ WORLDWIDE\nCARE GOLD CARD Insurance Auth. No. : NPR\nDate Of Visit : 08/10/2025\nBilling Currency : BD\nPatient ID : 327169 Patient Name : DIAN JACOBUS HANEKOM CPR No. : 591027801\nDoctor Name : DR. AKRAM HASANI OP No : 1987139 / 1987139\nDate Code Description Qty. Unit Price Amount VAT Rate Amount Patient

# Chunks with Line

In [857]:
def chunk_pages_with_lines(pages):
    """
    Returns line-level chunks WITH page numbers preserved
    """
    chunks = []

    for page in pages:
        page_no = page["page_no"]
        text = page.get("text", "")

        for line in text.splitlines():
            line = line.strip()
            if not line:
                continue

            chunks.append({
                "page_no": page_no,
                "line": line
            })

    return chunks

In [858]:
chunks = chunk_pages_with_lines(pages)

# Section Anchors

In [859]:
SECTION_ANCHORS = {

    # PATIENT / POLICY
    "patient_policy_details": [
        "Patient ID",
        "Patient Name",
        "CPR No",
        "Card /Policy No",
        "Policy No",
        "Gender",
        "Age",
        "Date of Birth",
    ],

    # INVOICE (FINANCIAL SOURCE OF TRUTH)
    "invoice_details": [
        "TAX INVOICE",
        "Invoice No",
        "Invoice Date",
        "Customer Type",
        "Billing Currency",
        "Vat Registration No",
        "Financial Code",
    ],

    # VISIT / ENCOUNTER
    "visit_details": [
        "Date Of Visit",
        "Consultation Date",
        "Service Date",
        "Doctor Name",
        "Physician Name",
        "OP No",
        "OP/IP No",
        "Emergency Case",
    ],

    # SERVICE TABLE (BILLABLE LINES)
    "service_details": [
        "Date Code Description Qty. Unit Price Amount VAT Rate Amount Patient Co Part",
        "Item Code Item Description",
        "ORDERDATE SERVICE AVAILED",
    ],

    # PRICE / PATIENT
    "price_details_invoice": [
        "Co-Payment Amt",
        "Deductible Amt",
        "Balance From The Sponsor",
        "Amount Received From The Patient",
    ],

    # DIAGNOSIS (RULE-2 CORE)
    "diagnosis_details": [
        "DIAGNOSIS",
        "ICD CODE",
        "ICD Name",
        "Principal Code",
        "2nd Code",
        "3rd Code",
        "4th Code",
        "Diagnosis Type",
    ],

    # DOCTOR / PROVIDER
    "doctor_details": [
        "Docotr Licence No.",
        "TREATING DOCTOR",
        "Physician Specialty",
        "Hospital",
    ],

    # APPROVAL / AUTHORIZATION (RULE-6)
    "approval_details": [
        "AUTO-AUTHORIZED",
        "TPA Name",
        "Approval No",
        "Insurance Auth. No",
        "Claim No",
        "Authorization Letter",
    ],

    # INSURANCE CARD (SECONDARY EVIDENCE)

    "insurance_card_details": [
        "INSURANCE CARD",
        "On Behalf of the Payer",
        "Sub Insurer",
        "TPA Name",
        "Front Back",
        "Sequence No",
    ],


    #  RULE-2 CLINICAL EVIDENCE (CORE)

    # LABORATORY REPORT
    "LAB": [
        "LABORATORY INVESTIGATION REPORT",
        "LAB REPORT",
        "PATHOLOGY REPORT",
        "CLINICAL PATHOLOGY",
        "BIOCHEMISTRY",
        "HEMATOLOGY",
        "MICROBIOLOGY",
        "TEST RESULT",
        "REFERENCE RANGE",
        "SPECIMEN",
    ],

    # RADIOLOGY REPORT
    "RAD": [
        "Radiology Investigation Report",
        "RADIOLOGY INVESTIGATION REPORT",
        "RADIOLOGY REPORT",
        "IMAGING REPORT",
        "FINDINGS",
        "IMPRESSION",
        "CONCLUSION",
        "XRAY",
        "X-RAY",
        "CT",
        "CT SCAN",
        "MRI",
        "ULTRASOUND",
    ],

    # CARDIOLOGY REPORT
    "CARDIO": [
        "ECHOCARDIOGRAM REPORT",
        "ELECTROCARDIOGRAM",
        "ECG REPORT",
        "EKG",
        "STRESS TEST",
        "CARDIOLOGY REPORT",
    ],

    # PROCEDURE / OPERATIVE NOTE
    "PROCEDURE": [
        "PROCEDURE NOTE",
        "OPERATIVE NOTE",
        "OPERATIVE REPORT",
        "SURGICAL SUMMARY",
        "INTERVENTION",
        "TECHNIQUE",
        "ADMINISTERED",
        "INSERTION",
    ],
}

In [860]:
def chunk_by_section(lines, SECTION_ANCHORS, debug=False):
    """
    lines: List[{"page_no": int, "line": str}]
    """
    chunks = defaultdict(list)
    current_section = "unknown"

    for idx, item in enumerate(lines):
        text = item["line"].strip()
        page_no = item["page_no"]

        section_found = None

        for section, anchors in SECTION_ANCHORS.items():
            for anchor in anchors:
                if anchor in text:
                    section_found = section
                    break
            if section_found:
                break

        if section_found:
            if debug:
                print(
                    f"[DEBUG] Line {idx} | Page {page_no} "
                    f"→ SECTION CHANGE: {current_section} → {section_found}"
                )
                print(f"        Trigger Line: {text}")
            current_section = section_found

        # Preserve BOTH text and page number
        chunks[current_section].append({
            "page_no": page_no,
            "text": text
        })

    return chunks

In [861]:
section_chunks = chunk_by_section(chunks, SECTION_ANCHORS)

# Visit Details

In [862]:
from pydantic import BaseModel
from typing import Optional

class VisitDetailsModel(BaseModel):
    date_of_visit: str = None
    service_date: str = None
    doctor_name: str = None
    physician_name: str = None
    physician_specialty: str = None
    op_no: str = None
    op_ip_no: str = None
    emergency_case: str = None

In [863]:
VISIT_DETAILS_REGEX = {
    "Date Of Visit": r"Date Of Visit\s*[:\-]\s*([0-9]{2}/[0-9]{2}/[0-9]{4})",
    "Service Date": r"Service Date\s*[:\-]\s*([0-9]{1,2}/[A-Za-z]{3}/[0-9]{4})",
    "Doctor Name": r"Doctor Name\s*[:\-]\s*([A-Za-z .]+?)(?:\s+OP No|$)",
    "Physician Name": r"Physician Name\s*[:\-]\s*([A-Za-z .]+?)(?:\s+Family of Benefits|$)",
    "Physician Specialty": r"Family of Benefits\s*[:\-]?\s*([A-Za-z]+)",
    "OP No": r"OP No\s*[:\-]\s*([\d/ ]+)",
    "OP/IP No": r"OP/IP No\s*[:\-]\s*([\d/ ]+)",
    "Emergency Case": r"Emergency Case\??\s*(Yes|No|þ)?",
}

In [864]:
def extract_visit_details(ocr_data) -> VisitDetailsModel:
    visit_info = {}

    for item in ocr_data:
        text = item.get("text", "").strip()
        for field, pattern in VISIT_DETAILS_REGEX.items():
            match = re.search(pattern, text)
            if match:
                key = field.lower().replace(" ", "_").replace("/", "_")
                visit_info[key] = match.group(1).strip()

    return VisitDetailsModel(**visit_info)

In [865]:
visit_details = extract_visit_details(section_chunks["visit_details"])

In [866]:
visit_details

VisitDetailsModel(date_of_visit='08/10/2025', service_date=None, doctor_name='DR. AKRAM HASANI', physician_name=None, physician_specialty=None, op_no='1987139 / 1987139', op_ip_no='', emergency_case='Yes')

In [867]:
section_chunks['approval_details']

[{'page_no': 1, 'text': 'CARE GOLD CARD Insurance Auth. No. : NPR'},
 {'page_no': 2, 'text': 'CARE GOLD CARD Insurance Auth. No. : NPR'},
 {'page_no': 3, 'text': 'Approval No. : NPR UCAF1.0 Claim No. : 1964790'},
 {'page_no': 3,
  'text': 'Provider Name: BAHRAIN SPECIALIST TPA Name: ALLIANZ WORLDWIDE CARE Patient: DIAN JACOBUS HANEKOM'},
 {'page_no': 3, 'text': 'HOSPITAL LIMITED'},
 {'page_no': 3,
  'text': 'For Insurance Company Use Only: Approved Not Approved Approval No.'},
 {'page_no': 3,
  'text': 'Comments : (include approved days/services, if different from the requested)'},
 {'page_no': 3, 'text': 'Insurance Officer Signature: Date:'},
 {'page_no': 3,
  'text': 'Clause: Note that this is an Electornic generated UCAF 1.0 hence signature is not required unless requested.'},
 {'page_no': 4, 'text': 'BAHRAIN SPECIALIST HOSPITAL'}]

## services Details

In [868]:
from pydantic import BaseModel
from typing import Optional, List

# Visit / Invoice Dates
class VisitDatesModel(BaseModel):
    invoice_date: Optional[str] = None
    treatment_start_date: Optional[str] = None  # maps to "Date Of Visit"


# Individual Service Line
class ServiceLineModel(BaseModel):
    page_no: int
    category: Optional[str] = None
    date: Optional[str] = None
    service_code: Optional[str] = None
    description: Optional[str] = None
    qty: Optional[float] = None
    unit_price: Optional[float] = None
    amount_excl_vat: Optional[float] = None
    vat_rate: Optional[float] = None
    amount_inc_vat: Optional[float] = None
    patient_part: Optional[float] = None
    co_part: Optional[float] = None

# All Services
class ServicesModel(BaseModel):
    services: List[ServiceLineModel] = []

In [869]:
import re

SERVICE_LINE_REGEX = re.compile(
    r"""
    (?P<date>\d{2}[/-]\d{2}[/-]\d{4})\s+
    (?P<code>[A-Z0-9]+)\s+
    (?P<desc>[A-Z0-9\-\( Thor Heyerdahl\)\/?&\s]+?)\s+  # Added '&' to allowed characters
    (?P<qty>\d+(\.\d+)?)\s+
    (?P<unit_price>\d+\.\d{3})\s+
    (?P<amt_excl>\d+\.\d{3})\s+
    (?P<vat>\d+\s*%)\s+
    (?P<amt_inc>\d+\.\d{3})\s+
    (?P<patient>\d+\.\d{3})\s+
    (?P<copart>\d+\.\d{3})
    """,
    re.VERBOSE
)

CATEGORY_HEADER_REGEX = re.compile(
    r"""
    ^(
        Dental\s+\d+|
        Pharmacy|
        Consultation\s+Fees\s+\d+|
        OB\s*/\s*GYN\s+\d+|
        Radiology\s+\d+|
        Laboratory\s+\d+|
        Lab\s+\d+
    )
    """,
    re.IGNORECASE | re.VERBOSE
)

TOTAL_LINE_REGEX = re.compile(r"^TOTAL|^Total|\bTOTALS\b", re.IGNORECASE)

In [870]:
def is_description_continuation(text):
    if any(char.isdigit() for char in text):
        return False
    if "TOTAL" in text.upper():
        return False
    if len(text.strip()) < 3:
        return False
    return text.isupper()

In [871]:
def extract_services(service_chunks) -> ServicesModel:
    """
    Extract services from OCR chunks.
    Input: list of dicts with 'text' and 'page_no'
    Output: ServicesModel containing list of ServiceLineModel
    """
    services_list = []
    current_category = None
    last_service = None

    for item in service_chunks:
        text = item["text"].strip()
        page_no = item["page_no"]

        # 1️⃣ Detect category header
        cat_match = CATEGORY_HEADER_REGEX.search(text)
        if cat_match:
            current_category = cat_match.group(1).strip()
            last_service = None
            continue

        # 2️⃣ Skip totals / headers
        if (
            TOTAL_LINE_REGEX.search(text)
            or "Date Code Description" in text
            or "Item Code Item Description" in text
            or text in {"VAT VAT", "excl. of Inc. of Part"}
        ):
            continue

        # 3️⃣ Match service line
        match = SERVICE_LINE_REGEX.search(text)
        if match:
            data = match.groupdict()
            last_service = ServiceLineModel(
                page_no=page_no,
                category=current_category,
                date=data.get("date"),
                service_code=data.get("code"),
                description=data.get("desc").strip() if data.get("desc") else None,
                qty=float(data.get("qty", 0)),
                unit_price=float(data.get("unit_price", 0)),
                amount_excl_vat=float(data.get("amt_excl", 0)),
                vat_rate=float(data.get("vat", "0").replace("%", "").strip()),
                amount_inc_vat=float(data.get("amt_inc", 0)),
                patient_part=float(data.get("patient", 0)),
                co_part=float(data.get("copart", 0)),
            )
            services_list.append(last_service)
            continue

        # 4️⃣ Attach continuation description
        if last_service and last_service.page_no == page_no:
            if is_description_continuation(text):
                last_service.description += " " + text.strip()

    return ServicesModel(services=services_list)

In [872]:
services_data = extract_services(section_chunks["service_details"])

In [873]:
services_data

ServicesModel(services=[ServiceLineModel(page_no=1, category='Consultation Fees 350', date='08/10/2025', service_code='99301', description='SPECIALIST CONSULT - INITIAL', qty=1.0, unit_price=15.0, amount_excl_vat=15.0, vat_rate=0.0, amount_inc_vat=15.0, patient_part=5.3, co_part=9.7), ServiceLineModel(page_no=1, category='Radiology 801', date='08/10/2025', service_code='76076', description='SPINE AND HIP (DEXA)', qty=1.0, unit_price=40.0, amount_excl_vat=40.0, vat_rate=0.0, amount_inc_vat=40.0, patient_part=0.0, co_part=40.0)])

# Invoice Price Details

In [874]:
class InvoicePriceDetailsModel(BaseModel):
    co_payment_amt: Optional[float] = None
    deductible_amt: Optional[float] = None
    amount_received_from_patient: Optional[float] = None
    balance_from_sponsor: Optional[float] = None
    vat_amount_due_from_sponsor: Optional[float] = None

In [875]:
PRICE_REGEX = {
    "co_payment_amt": r"Co-?Payment\s*Amt\s*[:\-]?\s*([\d]+\.\d{3})",
    "deductible_amt": r"Deductible\s*Amt\s*[:\-]?\s*([\d]+\.\d{3})",
    "amount_received_from_patient": r"Amount\s*Received\s*From\s*The\s*Patient\s*([\d]+\.\d{3})",
    "balance_from_sponsor": r"Balance\s*From\s*The\s*Sponsor(?:\s*\(.*?\))?\s*[:\-]?\s*([\d]+\.\d{3})",
    "vat_amount_due_from_sponsor": r"VAT\s*Amount\s*due\s*from\s*Sponsor\s*[:\-]?\s*([\d]+\.\d{3})",
}

In [876]:
def extract_invoice_price_details(
    invoice_prices_details
) -> InvoicePriceDetailsModel:
    raw_data = {}

    for row in invoice_prices_details:
        text = row.get("text", "")
        for field, pattern in PRICE_REGEX.items():
            if field not in raw_data:
                match = re.search(pattern, text, re.IGNORECASE)
                if match:
                    raw_data[field] = match.group(1)  # STRING on purpose


    return InvoicePriceDetailsModel.model_validate(raw_data)

In [877]:
invoice_price_details = extract_invoice_price_details(section_chunks["price_details_invoice"])

In [878]:
invoice_price_details

InvoicePriceDetailsModel(co_payment_amt=5.3, deductible_amt=0.0, amount_received_from_patient=5.3, balance_from_sponsor=49.7, vat_amount_due_from_sponsor=0.0)

In [879]:
VISIT_DETAILS_REGEX = {
    "Date Of Visit": r"Date Of Visit\s*[:\-]\s*(\d{2}/\d{2}/\d{4})",
    "Consultation Date": r"Consultation Date\s*[:\-]\s*([\d/]+)",
    "Service Date": r"Service Date\s*[:\-]\s*([\d/]+)",
    "Doctor Name": r"Doctor Name\s*[:\-]\s*([A-Z .]+)",
    "Physician Name": r"Physician Name\s*[:\-]\s*([A-Za-z .]+)",
    "OP No": r"OP No\s*[:\-]\s*([\d/ ]+)",
    "OP/IP No": r"OP/IP No\s*[:\-]\s*([\d/ ]+)",
    "Emergency Case": r"Emergency Case\??\s*(Yes|No|þ)?"
}

# Patient Details AND Policy

In [880]:
class PatientPolicyMasterModel(BaseModel):
    card_policy_no: Optional[str] = None
    card_policy_no_pages: List[int] = []
    policy_no: Optional[str] = None
    policy_no_pages: List[int] = []
    patient_id: Optional[str] = None
    patient_id_pages: List[int] = []
    patient_name: Optional[str] = None
    patient_name_pages: List[int] = []
    cpr_no: Optional[str] = None
    cpr_no_pages: List[int] = []
    gender: Optional[str] = None
    gender_pages: List[int] = []
    age: Optional[int] = None
    age_pages: List[int] = []
    date_of_birth: Optional[str] = None
    date_of_birth_pages: List[int] = []

In [881]:
PATIENT_FIELDS_REGEX = {
    "card_policy_no": r"Card\s*/?Policy\s*No\.?\s*[:\-]?\s*(\S+)",
    "policy_no": r"Policy\s*No\.?\s*[:\-]?\s*(\S+)",
    "patient_id": r"Patient\s*ID\s*[:\-]?\s*(\S+)",
    "patient_name": r"Patient\s*Name\s*[:\-]?\s*(.+?)(?:\s*CPR|OP|$)",
    "cpr_no": r"CPR\s*No\.?\s*[:\-]?\s*(\S+)",
    "gender": r"Gender\s*[:\-]?\s*(Male|Female)",
    "age": r"Age\s*[:\-]?\s*(\d+)\s*Y",
    "date_of_birth": r"(?:Date\s*Of\s*Birth|DOB)\s*[:\-]?\s*(\d{2}/\d{2}/\d{4})"
}

In [882]:
def extract_patient_policy_data(section_chunks: list[dict]) -> PatientPolicyMasterModel:
    temp_data = defaultdict(list)  # store values along with page numbers

    for chunk in section_chunks:
        page_no = chunk.get("page_no")
        text = chunk.get("text", "")

        for field, regex in PATIENT_FIELDS_REGEX.items():
            match = re.search(regex, text, flags=re.IGNORECASE)
            if match:
                value = match.group(1).strip()

                # Convert age to integer
                if field == "age":
                    try:
                        value = int(value)
                    except:
                        continue

                # Append value and page
                temp_data[field].append((value, page_no))

    master = PatientPolicyMasterModel()

    for field in PATIENT_FIELDS_REGEX.keys():
        if temp_data.get(field):
            # Pick first unique value
            values = [v for v, _ in temp_data[field]]
            pages = [p for _, p in temp_data[field]]
            unique_values = list(dict.fromkeys(values))  # remove duplicates, keep order
            unique_pages = list(dict.fromkeys(pages))

            setattr(master, field, unique_values[0])
            setattr(master, f"{field}_pages", unique_pages)

    return master


In [883]:
patient_data = extract_patient_policy_data(section_chunks["patient_policy_details"])

In [884]:
patient_data

PatientPolicyMasterModel(card_policy_no='P003904429', card_policy_no_pages=[1, 2], policy_no='P003904429', policy_no_pages=[1, 2, 3], patient_id='327169', patient_id_pages=[1, 2, 5, 6, 7, 8], patient_name='DIAN JACOBUS HANEKOM', patient_name_pages=[1, 2, 4], cpr_no='591027801', cpr_no_pages=[1, 2, 5, 6], gender='Male', gender_pages=[3, 5, 6, 7, 8], age=65, age_pages=[3], date_of_birth='19/10/1959', date_of_birth_pages=[5, 6, 7, 8])

# Invoice Extraction Function

In [885]:
class InvoiceMasterModel(BaseModel):
    tax_invoice: Optional[str] = None
    tax_invoice_pages: List[int] = []
    vat_registration_no: Optional[str] = None
    vat_registration_no_pages: List[int] = []
    financial_code: Optional[str] = None
    financial_code_pages: List[int] = []
    invoice_no: Optional[str] = None
    invoice_no_pages: List[int] = []
    invoice_date: Optional[str] = None
    invoice_date_pages: List[int] = []
    customer_type: Optional[str] = None
    customer_type_pages: List[int] = []
    billing_currency: Optional[str] = None
    billing_currency_pages: List[int] = []

In [886]:
INVOICE_PATTERN = {
    "tax_invoice": r"(TAX\s*INVOICE)",
    "vat_registration_no": r"Vat\s*Registration\s*No\.?\s*[:\-]?\s*(\S+)",
    "financial_code": r"Financial\s*Code\s*[:\-]?\s*(\S+)",
    "invoice_no": r"Invoice\s*No\s*[:\-]?\s*(\S+)",
    "invoice_date": r"Invoice\s*Date\s*[:\-]?\s*(\d{2}/\d{2}/\d{4})",
    "customer_type": r"Customer\s*Type\s*[:\-]?\s*(\S+)",
    "billing_currency": r"Billing\s*Currency\s*[:\-]?\s*(\S+)",
}

In [887]:
def extract_invoice_data(section_chunks: List[dict]) -> InvoiceMasterModel:
    temp_data = defaultdict(list)

    for chunk in section_chunks:
        page_no = chunk.get("page_no")
        text = chunk.get("text", "")

        for field, regex in INVOICE_PATTERN.items():
            match = re.search(regex, text, flags=re.IGNORECASE)
            if match:
                # Use group(1) if exists, else group(0)
                try:
                    value = match.group(1).strip()
                except IndexError:
                    value = match.group(0).strip()

                temp_data[field].append((value, page_no))

    # Prepare Master Model
    master = InvoiceMasterModel()

    for field in INVOICE_PATTERN.keys():
        if temp_data.get(field):
            values = [v for v, _ in temp_data[field]]
            pages = [p for _, p in temp_data[field]]

            # Keep only unique values & pages
            unique_values = list(dict.fromkeys(values))
            unique_pages = list(dict.fromkeys(pages))

            setattr(master, field, unique_values[0])
            setattr(master, f"{field}_pages", unique_pages)

    return master

In [888]:
invoice_data = extract_invoice_data(section_chunks["invoice_details"])

In [889]:
invoice_data

InvoiceMasterModel(tax_invoice='TAX INVOICE', tax_invoice_pages=[1, 2], vat_registration_no='200000763900002', vat_registration_no_pages=[1, 2], financial_code='00001946', financial_code_pages=[1, 2], invoice_no='AOP102502658', invoice_no_pages=[1, 2], invoice_date='08/10/2025', invoice_date_pages=[1, 2], customer_type='INSURANCE', customer_type_pages=[1, 2], billing_currency='BD', billing_currency_pages=[1, 2])

# Diagnosis Details

In [890]:
class DiagnosisModel(BaseModel):
    page_no: int
    principal_code: Optional[str] = None
    second_code: Optional[str] = None
    third_code: Optional[str] = None
    fourth_code: Optional[str] = None
    diagnosis_label: Optional[str] = None
    icd_name: Optional[str] = None
    diagnosis_type: Optional[str] = None

In [891]:
DIAGNOSIS_PATTERN = {
    "principal_code": r"Principal Code\s*:\s*([A-Z0-9\.]+)",
    "second_code": r"2nd Code\s*:\s*([A-Z0-9]*)",
    "third_code": r"3rd Code\s*:\s*([A-Z0-9]*)",
    "fourth_code": r"4th Code\s*:\s*([A-Z0-9]*)",
    "diagnosis_label": r"\bDIAGNOSIS\b",
    "icd_name": r"ICD Name\s*:\s*([^\n]+)",
    "diagnosis_type": r"Diagnosis Type\s*:\s*(\w+)",
    "principal_code_alt": r"ICD CODE\s*:\s*([A-Z0-9\.]+)",
}


In [892]:
def extract_diagnosis_data(section_chunks):
    diagnosis_map = {}
    last_principal_code = None  # track the last found code

    for chunk in section_chunks:
        page_no = chunk.get("page_no")
        text = chunk.get("text", "").strip()
        temp_info = {}

        # Extract all fields
        for field, pattern in DIAGNOSIS_PATTERN.items():
            match = re.search(pattern, text, flags=re.IGNORECASE)
            if match:
                value = match.group(1).strip() if match.groups() else match.group(0).strip()

                # Normalize placeholder codes
                if field in ["second_code", "third_code", "fourth_code"]:
                    if not value or value.lower() in ["2nd", "3rd", "4th"]:
                        value = None

                # Handle alternative principal_code
                if field == "principal_code_alt":
                    field = "principal_code"
                    if "principal_code" in temp_info:
                        continue

                temp_info[field] = value

        # If a principal code is found, update last_principal_code
        if "principal_code" in temp_info:
            last_principal_code = temp_info["principal_code"]

        # Merge data to the last known principal_code if present
        if last_principal_code:
            if last_principal_code not in diagnosis_map:
                diagnosis_map[last_principal_code] = {"page_no": page_no}
            existing = diagnosis_map[last_principal_code]

            for k, v in temp_info.items():
                if v and not existing.get(k):
                    existing[k] = v
            # Keep earliest page_no
            existing["page_no"] = min(existing.get("page_no", page_no), page_no)

    # Convert to DiagnosisModel list and sort
    result = [DiagnosisModel(**data) for data in diagnosis_map.values()]
    result.sort(key=lambda x: x.page_no)
    return result

In [893]:
diagnosis_data = extract_diagnosis_data(section_chunks['diagnosis_details'])

In [894]:
diagnosis_data

[DiagnosisModel(page_no=3, principal_code='M16.9', second_code=None, third_code=None, fourth_code=None, diagnosis_label='DIAGNOSIS', icd_name='Coxarthrosis, unspecified Diagnosis Type: Provisional', diagnosis_type='Provisional')]

# Doctor Details

In [895]:
class DoctorProviderMasterModel(BaseModel):
    hospital: Optional[str] = None
    hospital_pages: Optional[List[int]] = None

    provider_name: Optional[str] = None
    provider_name_pages: Optional[List[int]] = None

    treating_doctor: Optional[str] = None
    treating_doctor_pages: Optional[List[int]] = None

    physician_specialty: Optional[str] = None
    physician_specialty_pages: Optional[List[int]] = None


In [896]:
DOCTOR_PATTERN = {
    # Matches 'Bahrain Specialist Hospital' or similar
    "hospital": r"([A-Z][A-Za-z\s]+Specialist Hospital)",

    # Anchor for the doctor label
    "treating_doctor": r"TREATING\s+DOCTOR\s*:",

    # Capture hospital name but stop at 'Username:'
    "provider_name": r"Provider\s+Name\s*:\s*([^:\n]+?)(?=\s+Username|$)",

    # Added 'Dentist' and 'Dental' to the specialty list
    "physician_specialty": r"Physician\s+Specialty\s*:\s*([\w\s]+)"
}

In [897]:
def extract_doctor_master(section_chunks: List[dict]) -> DoctorProviderMasterModel:
    temp_data = defaultdict(lambda: {"value": None, "pages": []})

    # We use a set for pages to avoid duplicates like [1, 1, 1]
    for i, chunk in enumerate(section_chunks):
        page_no = chunk.get("page_no")
        text = chunk.get("text", "").strip()

        # --- 1. TREATING DOCTOR (Multi-Chunk Logic) ---
        if re.search(DOCTOR_PATTERN["treating_doctor"], text, re.IGNORECASE):
            if i + 1 < len(section_chunks):
                raw_name = section_chunks[i + 1]["text"].strip()
                # Clean the "DR. DR." prefix here
                clean_name = re.sub(r"^(DR\.\s*)+", "", raw_name, flags=re.IGNORECASE).strip()
                if clean_name:
                    temp_data["treating_doctor"]["value"] = clean_name
                    temp_data["treating_doctor"]["pages"].append(page_no)

        # --- 2. PHYSICIAN SPECIALTY ---
        specialty_match = re.search(DOCTOR_PATTERN["physician_specialty"], text, re.IGNORECASE)
        if specialty_match:
            value = specialty_match.group(1).strip()
            temp_data["physician_specialty"]["value"] = value
            temp_data["physician_specialty"]["pages"].append(page_no)

        # --- 3. PROVIDER NAME ---
        provider_match = re.search(DOCTOR_PATTERN["provider_name"], text, re.IGNORECASE)
        if provider_match:
            value = provider_match.group(1).strip()
            temp_data["provider_name"]["value"] = value
            temp_data["provider_name"]["pages"].append(page_no)

        # --- 4. HOSPITAL ---
        # Note: We check if the chunk text CONTAINS the hospital name
        hospital_match = re.search(DOCTOR_PATTERN["hospital"], text)
        if hospital_match:
            value = hospital_match.group(1).strip()
            temp_data["hospital"]["value"] = value
            temp_data["hospital"]["pages"].append(page_no)

    # Build master model
    return DoctorProviderMasterModel(
        hospital=temp_data["hospital"]["value"],
        hospital_pages=list(set(temp_data["hospital"]["pages"])) or None,
        provider_name=temp_data["provider_name"]["value"],
        provider_name_pages=list(set(temp_data["provider_name"]["pages"])) or None,
        treating_doctor=temp_data["treating_doctor"]["value"],
        treating_doctor_pages=list(set(temp_data["treating_doctor"]["pages"])) or None,
        physician_specialty=temp_data["physician_specialty"]["value"],
        physician_specialty_pages=list(set(temp_data["physician_specialty"]["pages"])) or None,
    )

In [898]:
doctor_data = extract_doctor_master(section_chunks["doctor_details"])

In [899]:
doctor_data

DoctorProviderMasterModel(hospital='Payment by cheque or Money Order payable to Bahrain Specialist Hospital', hospital_pages=[1, 2], provider_name=None, provider_name_pages=None, treating_doctor='AKRAM HASANI', treating_doctor_pages=[8], physician_specialty=None, physician_specialty_pages=None)

# Approval Details

In [900]:
KNOWN_TPAS = {
    "NEXTCARE": "NEXtCARE Network",
    "NEXtCARE": "NEXtCARE Network",
    "NEXtCARE NETWORK": "NEXtCARE Network",
    "MEDNET": "MedNet",
    "GLOBEMED": "GlobeMed",
    "GLOBE MED": "GlobeMed",
    "NAS": "National Health Services (NAS)",
    "SAICO": "SAICO TPA",
    "SAICO TPA": "SAICO TPA",
}

In [901]:

class ApprovalMasterModel(BaseModel):
    auto_authorized_claim: str | None = None
    auto_authorized_claim_pages: List[int] = []

    approval_no: str | None = None
    approval_no_pages: List[int] = []

    tpa_name: str | None = None
    tpa_name_pages: List[int] = []

    insurance_auth_no: str | None = None
    insurance_auth_no_pages: List[int] = []

    claim_no: str | None = None
    claim_no_pages: List[int] = []

    authorization_letter: str | None = None
    authorization_letter_pages: List[int] = []

    is_approval_copy: bool = False
    detected_tpa: str | None = None

In [902]:
APPROVAL_PATTERN = {
    "AUTO-AUTHORIZED": r"AUTO[\s-]?AUTHORIZED\s*CLAIM\s*FORM\s*\u2116?\s*[:\-]?\s*([A-Z0-9/]+)",
    "TPA Name": r"TPA Name\s*[:\-]?\s*(.+?)\s+Patient",
    "Approval No": r'Approval\s*No\.?\s*[:\-]?\s*([A-Z0-9\.]+)',
    "Insurance Auth. No": r"Insurance Auth\\. No\\.?\\s*:?[\\s]*([\\w\\-\\.]+)?",
    "Claim No": r"Claim\s*No\.?\s*[:\-]?\s*([\w\.\-\/]+)",
    "Authorization Letter": r"Authorization Letter[:]?[\\s\\S]*"
}

In [903]:
KNOWN_TPAS = {
    "NEXTCARE": "NEXtCARE Network",
    "MEDNET": "MedNet",
    "GLOBEMED": "GlobeMed",
    "NAS": "National Health Services (NAS)",
    "SAICO": "SAICO TPA",
}

APPROVAL_INDICATORS = [
    "APPROVAL NO",
    "AUTHORIZATION NO",
    "AUTO-AUTHORIZED",
    "INSURANCE AUTH",
]

In [904]:
def extract_approval_master(section_chunks: List[dict]) -> ApprovalMasterModel:
    temp_data = defaultdict(lambda: {"value": None, "pages": []})
    full_text_parts = []

    # Iterate over all text chunks
    for chunk in section_chunks:
        page_no = chunk["page_no"]
        text = chunk["text"].strip()
        full_text_parts.append(text)

        for field, pattern in APPROVAL_PATTERN.items():
            match = re.search(pattern, text, re.IGNORECASE)
            if match:
                value = None
                if match.groups():
                    captured_value = match.group(1)
                    if captured_value is not None:
                        value = captured_value.strip()
                else:
                    value = match.group(0).strip()

                if value is not None:
                    if not temp_data[field]["value"]:
                        temp_data[field]["value"] = value

                    if page_no not in temp_data[field]["pages"]:
                        temp_data[field]["pages"].append(page_no)

    # Build model
    model = ApprovalMasterModel(
        auto_authorized_claim=temp_data["AUTO-AUTHORIZED"]["value"],
        auto_authorized_claim_pages=temp_data["AUTO-AUTHORIZED"]["pages"],
        tpa_name=temp_data["TPA Name"]["value"],
        tpa_name_pages=temp_data["TPA Name"]["pages"],
        approval_no=temp_data["Approval No"]["value"],
        approval_no_pages=temp_data["Approval No"]["pages"],
        insurance_auth_no=temp_data["Insurance Auth. No"]["value"],
        insurance_auth_no_pages=temp_data["Insurance Auth. No"]["pages"],
        claim_no=temp_data["Claim No"]["value"],
        claim_no_pages=temp_data["Claim No"]["pages"],
        authorization_letter=temp_data["Authorization Letter"]["value"],
        authorization_letter_pages=temp_data["Authorization Letter"]["pages"],
    )


    # APPROVAL COPY DECISION LOGIC
    full_text = " ".join(full_text_parts).upper()

    detected_tpa = None
    if model.tpa_name:
        tpa_upper = model.tpa_name.upper()
        for key, official_name in KNOWN_TPAS.items():
            if key in tpa_upper:
                detected_tpa = official_name
                break

    has_approval_indicator = any(
        indicator in full_text for indicator in APPROVAL_INDICATORS
    )

    is_approval_copy = False

    # Golden Rule
    if detected_tpa and has_approval_indicator:
        is_approval_copy = True
    elif model.auto_authorized_claim:
        is_approval_copy = True

    model.is_approval_copy = is_approval_copy
    model.detected_tpa = detected_tpa

    return model

In [905]:
approval_data = extract_approval_master(section_chunks["approval_details"])

In [906]:
approval_data

ApprovalMasterModel(auto_authorized_claim=None, auto_authorized_claim_pages=[], approval_no='NPR', approval_no_pages=[3], tpa_name='ALLIANZ WORLDWIDE CARE', tpa_name_pages=[3], insurance_auth_no=None, insurance_auth_no_pages=[], claim_no='1964790', claim_no_pages=[3], authorization_letter=None, authorization_letter_pages=[], is_approval_copy=False, detected_tpa=None)

# Insurece Details

In [907]:

INSURANCE_CARD_PATTERN = {
    "Provider Name": r"Provider Name\s*[:\-]?\s*(.+?)\s+TPA Name",
    "TPA Name": r"TPA Name\s*[:\-]?\s*(.+?)\s+Patient",
    "Patient": r"Patient\s*[:\-]?\s*(.+?)(?:Patient File No|$)",
    "Patient File No": r"Patient File No\s*[:\-]?\s*(\d+)",
    "Sub Insurer": r"Sub Insurer\s*[:\-]?\s*(.+?)\s+ID No",
    "ID No": r"ID No\s*[:\-]?\s*(\d+)",
    "Card No": r"Card No\s*[:\-]?\s*(\S+)",
    "Date of Visit": r"Date of Visit\s*[:\-]?\s*(\d{2}/\d{2}/\d{4})",
    "INSURANCE CARD": r"INSURANCE CARD",
}

In [908]:
section_chunks['insurance_card_details']

[{'page_no': 3,
  'text': 'Patient File No: 327169 Sub Insurer: ALLIANZ WORLDWIDE CARE ID No: 591027801'},
 {'page_no': 3, 'text': 'LIMITED'},
 {'page_no': 3,
  'text': 'Date of Visit : 08/10/2025 Policy Name: Card No: P003904429'},
 {'page_no': 4, 'text': 'INSURANCE CARD'},
 {'page_no': 4, 'text': 'Reg. No. : 327169 Sequence No. : 16'},
 {'page_no': 4, 'text': 'Front Back'},
 {'page_no': 4,
  'text': '15/10/2025 9:39:14AM “ This is a computer generated printout and no signature / stamp is required.”'},
 {'page_no': 5, 'text': 'Sequence No : 16'},
 {'page_no': 6, 'text': 'Sequence No : 16'}]

In [909]:
class InsuranceCardModel(BaseModel):
    provider_name: str | None = None
    provider_name_pages: List[int] = []

    tpa_name: str | None = None
    tpa_name_pages: List[int] = []

    patient: str | None = None
    patient_pages: List[int] = []

    sub_insurer: str | None = None
    sub_insurer_pages: List[int] = []

    card_no: str | None = None
    card_no_pages: List[int] = []

    insurance_card: str | None = None
    insurance_card_pages: List[int] = []


In [910]:
def extract_insurance_card_master(text_chunks) -> InsuranceCardModel:
    """
    Extract insurance card details from OCR/text chunks and return a single InsuranceCardModel.
    Tracks pages for each field.
    """
    temp_data = defaultdict(lambda: {"value": None, "pages": []})

    for page_no, texts in group_by_page(text_chunks).items():
        combined_text = " ".join(texts)

        # Iterate over all regex patterns
        for field, pattern in INSURANCE_CARD_PATTERN.items():
            match = re.search(pattern, combined_text, re.IGNORECASE)
            if match:
                # For fields with capturing groups
                if match.groups():
                    value = match.group(1).strip()
                else:
                    # For flags like INSURANCE CARD / Front Back
                    value = field

                # Store value and page
                if temp_data[field]["value"] is None:
                    temp_data[field]["value"] = value
                elif temp_data[field]["value"] != value:
                    # Keep unique value
                    temp_data[field]["value"] += f" | {value}"
                temp_data[field]["pages"].append(page_no)

    # Build the Pydantic model
    model = InsuranceCardModel(
        provider_name=temp_data["Provider Name"]["value"],
        provider_name_pages=temp_data["Provider Name"]["pages"],

        tpa_name=temp_data["TPA Name"]["value"],
        tpa_name_pages=temp_data["TPA Name"]["pages"],

        patient=temp_data["Patient"]["value"],
        patient_pages=temp_data["Patient"]["pages"],

        sub_insurer=temp_data["Sub Insurer"]["value"],
        sub_insurer_pages=temp_data["Sub Insurer"]["pages"],

        card_no=temp_data["Card No"]["value"],
        card_no_pages=temp_data["Card No"]["pages"],

        insurance_card=temp_data["INSURANCE CARD"]["value"],
        insurance_card_pages=temp_data["INSURANCE CARD"]["pages"],
    )

    return model

# Helper function to group text by page
def group_by_page(text_chunks):
    pages = defaultdict(list)
    for chunk in text_chunks:
        page_no = chunk["page_no"]
        text = chunk["text"].strip()
        pages[page_no].append(text)
    return pages

In [911]:
insurance_card_data = extract_insurance_card_master(section_chunks["insurance_card_details"])

In [912]:
insurance_card_data

InsuranceCardModel(provider_name=None, provider_name_pages=[], tpa_name=None, tpa_name_pages=[], patient='File No: 327169 Sub Insurer: ALLIANZ WORLDWIDE CARE ID No: 591027801 LIMITED Date of Visit : 08/10/2025 Policy Name: Card No: P003904429', patient_pages=[3], sub_insurer='ALLIANZ WORLDWIDE CARE', sub_insurer_pages=[3], card_no='P003904429', card_no_pages=[3], insurance_card='INSURANCE CARD', insurance_card_pages=[4])

# Clincal Details

# Excel Data Read and Validation

In [913]:
import numpy as np
import pandas as pd

In [914]:
def excel_reader(excel_path: str) -> pd.DataFrame:
      try:
          df = pd.read_excel(excel_path)
          df.columns = df.columns.str.strip().str.upper()
          return df
      except Exception as e:
          raise RuntimeError(f"Failed to read Excel file: {e}")

In [915]:
REQUIRED_COLUMNS = {
        'ID',
        'CUSTOMER_CODE',
        'CPR',
        'POLICY_NUMBER',
        'PATIENTREGNO',
        'INVOICENO',
        'SERVICEDESCRIPTION',
        'APPROVAL_NO'
    }

In [916]:
def excel_validate(
    df: pd.DataFrame
    ) -> bool:
    missing = REQUIRED_COLUMNS - set(df.columns)

    if missing:
        raise ValueError(
            f"Excel missing required columns: {missing}"
        )

    return True


In [917]:
excel_data = excel_reader("/content/EDI Scrub 8-14 Oct 2025 15102025.xlsx")
excel_validate(excel_data)

True

In [918]:
class FullClaimModel(BaseModel):
    patient_policy: Optional[PatientPolicyMasterModel] = None
    invoice: Optional[InvoiceMasterModel] = None
    services: Optional[ServicesModel] = None
    diagnosis_list: List[DiagnosisModel] = []
    approval_data: Optional[ApprovalMasterModel] = None
    insurance_card: Optional[InsuranceCardModel] = None
    doctor_provider: Optional[DoctorProviderMasterModel] = None

In [919]:
approval_data = extract_approval_master(section_chunks.get("approval_details", {}))

In [920]:
def extract_full_claim(section_chunks) -> FullClaimModel:

    patient_policy = extract_patient_policy_data(section_chunks.get('patient_policy_details', {}))
    invoice = extract_invoice_data(section_chunks.get('invoice_details', {})) # Call to the correct invoice extraction
    service = extract_services(section_chunks.get('service_details', {}))
    diagnosis_list = extract_diagnosis_data(section_chunks.get('diagnosis_details', []))
    doctor_provider = extract_doctor_master(section_chunks.get("doctor_details", {}))
    approval_data = extract_approval_master(section_chunks.get("approval_details", {}))
    insurance_card = extract_insurance_card_master(section_chunks.get("insurance_card_details", {}))


    # Wrap all into the full claim model
    full_claim = FullClaimModel(
        patient_policy=patient_policy,
        invoice=invoice,
        services=service ,
        diagnosis_list=diagnosis_list,
        approval_data = approval_data,
        insurance_card=insurance_card,
        doctor_provider=doctor_provider,




           # Can add services extraction if you have a function for it
    )

    return full_claim


In [921]:
full_claim_details = extract_full_claim(section_chunks)

In [922]:
full_claim_details

FullClaimModel(patient_policy=PatientPolicyMasterModel(card_policy_no='P003904429', card_policy_no_pages=[1, 2], policy_no='P003904429', policy_no_pages=[1, 2, 3], patient_id='327169', patient_id_pages=[1, 2, 5, 6, 7, 8], patient_name='DIAN JACOBUS HANEKOM', patient_name_pages=[1, 2, 4], cpr_no='591027801', cpr_no_pages=[1, 2, 5, 6], gender='Male', gender_pages=[3, 5, 6, 7, 8], age=65, age_pages=[3], date_of_birth='19/10/1959', date_of_birth_pages=[5, 6, 7, 8]), invoice=InvoiceMasterModel(tax_invoice='TAX INVOICE', tax_invoice_pages=[1, 2], vat_registration_no='200000763900002', vat_registration_no_pages=[1, 2], financial_code='00001946', financial_code_pages=[1, 2], invoice_no='AOP102502658', invoice_no_pages=[1, 2], invoice_date='08/10/2025', invoice_date_pages=[1, 2], customer_type='INSURANCE', customer_type_pages=[1, 2], billing_currency='BD', billing_currency_pages=[1, 2]), services=ServicesModel(services=[ServiceLineModel(page_no=1, category='Consultation Fees 350', date='08/10/2

In [923]:
inv_no = full_claim_details.invoice.invoice_no
inv_no

'AOP102502658'

# match the invoice with master data

In [924]:
def filter_excel_by_invoice(
            df: pd.DataFrame,
            invoice_no: str
            )-> pd.DataFrame:

        if 'INVOICENO' not in df.columns:
            raise ValueError("Excel missing INVOICENO column")

        df['INVOICENO'] = df['INVOICENO'].astype(str).str.strip()
        invoice_no = str(invoice_no).strip()

        return df[df['INVOICENO'] == invoice_no]

In [925]:
filter_data = filter_excel_by_invoice(excel_data, inv_no)

In [926]:
filter_data.columns

Index(['ID', 'CUSTOMER_CODE', 'CCATEGORY_NAME', 'CPR', 'POLICY_NUMBER',
       'TREATMENTSTARTDATE', 'TREATMENTENDDATE', 'INVOICENO', 'APPROVAL_NO',
       'PATIENTREGNO', 'AGE', 'GENDER', 'CLAIMTYPE', 'BENEFIT',
       'DIAGNOSISCODED1', 'DIAGNOSISCODED2', 'DIAGNOSISCODED3',
       'DIAGNOSISCODED4', 'DOCTORLICENSENO', 'DOCTORSPECIALITY', 'DOCTORNAME',
       'SERVICE_TYPE', 'SERVICE_CODE', 'NHRA CODE', 'NOOFSERVICE', 'PRICE',
       'GROSS_AMT', 'DISCOUNT_AMOUNT', 'REJECTED', 'DEDUCTABLE', 'CO_INS',
       'NET_AMT', 'REJECTION_CODE', 'INTERNAL_REMARKS', 'EXTERNAL_REMARKS',
       'CREATEDBY', 'CREATEDDATE', 'SERVICEDESCRIPTION'],
      dtype='object')

In [927]:
filter_data

Unnamed: 0,ID,CUSTOMER_CODE,CCATEGORY_NAME,CPR,POLICY_NUMBER,TREATMENTSTARTDATE,TREATMENTENDDATE,INVOICENO,APPROVAL_NO,PATIENTREGNO,...,REJECTED,DEDUCTABLE,CO_INS,NET_AMT,REJECTION_CODE,INTERNAL_REMARKS,EXTERNAL_REMARKS,CREATEDBY,CREATEDDATE,SERVICEDESCRIPTION
1,1,CIGNA100,Amentum Dyncorp US Expat Plan BHR,791342093,210774139-01,2025-10-14,,AOP102502658,0034107397,322356,...,,0.0,0.0,200.0,,,,,,EXCISION OF SEBACEOUS CYST
53,13,ALLIA100,ALLIANZ WORLDWIDE CARE GOLD CARD,591027801,P003904429,2025-10-08,,AOP102502658,NPR,327169,...,,0.0,0.0,40.0,,,,,,SPINE AND HIP (DEXA)
54,13,ALLIA100,ALLIANZ WORLDWIDE CARE GOLD CARD,591027801,P003904429,2025-10-08,,AOP102502658,NPR,327169,...,,0.0,5.3,9.7,,,,,,SPECIALIST CONSULT - INITIAL


In [928]:
CLAIM_BASIC_FIELDS = [
    "INVOICENO", "CPR", "CUSTOMER_CODE", "APPROVAL_NO", "SERVICE_CODE",
    "SERVICE_TYPE", "DIAGNOSISCODED1", "SERVICEDESCRIPTION",
    "GROSS_AMT", "DEDUCTABLE", "CO_INS", "NET_AMT"
]

# Type of CheckActionRule AppliedData MatchingComparing Service_Code and Gross_Amt between Excel and PDF.
*   Rule 1 (Technical)Clinical ValidationVerifying the doctor's written notes (ICD Name) exist to support the treatment.Rule 9 (Clinical)




# RULE 1 (INV CHECK): If service in claim data is missing from PDF Invoice, mark Deficiency-Technical

In [929]:
def adjudicate_rule1(filter_row, full_claim_details):
    line_errors = []
    FLOAT_TOLERANCE = 0.001

    # Extract Master Data from PDF Models
    invoice = full_claim_details.invoice
    pdf_services = full_claim_details.services.services
    approval_details = full_claim_details.approval_data

    pdf_invoice_no = str(getattr(invoice, "invoice_no", "")).strip().upper()

    # Extract Data from Excel Row
    excel_invoice_no = str(filter_row.get("INVOICENO", "")).strip().upper()
    excel_code = str(filter_row.get("SERVICE_CODE", "")).strip()
    excel_amt = float(filter_row.get("GROSS_AMT", 0) or 0)
    excel_approval = str(filter_row.get("APPROVAL_NO", "")).strip().upper()

    # --- 1.1 Invoice Number Match ---
    if excel_invoice_no != pdf_invoice_no:
        line_errors.append(f"Rule 1: Invoice mismatch (Excel={excel_invoice_no}, PDF={pdf_invoice_no})")

    # --- 1.2 Service Code Presence ---
    pdf_service = next((s for s in pdf_services if str(s.service_code).strip() == excel_code), None)

    if not pdf_service:
        line_errors.append(f"Rule 1: Service code {excel_code} missing in PDF invoice")
    else:
        # --- 1.3 Gross Amount Match ---
        pdf_amt = float(pdf_service.amount_inc_vat or 0)
        if abs(excel_amt - pdf_amt) > FLOAT_TOLERANCE:
            line_errors.append(f"Rule 1: Gross amount mismatch (Excel={excel_amt}, PDF={pdf_amt})")

    # --- 1.4 Approval Match (Integrated Rule 8) ---
    pdf_auth_no = str(getattr(approval_details, "approval_no", "")).strip().upper()
    # Logic: Only flag mismatch if a valid Auth exists on PDF
    if pdf_auth_no and pdf_auth_no not in ["", "NPR", "NONE", "UCAF1.0"]:
        if pdf_auth_no != excel_approval:
            line_errors.append(f"Rule 8: Approval mismatch (Excel={excel_approval}, PDF={pdf_auth_no})")

    outcome = "Deficiency-Technical" if line_errors else "Approved"

    return {
        "ReviewOutcome": outcome,
        "Remarks": "; ".join(line_errors),
        "PDF_Service_Object": pdf_service # Useful for passing category to Rule 9
    }

# Rule 9 (DIAGNOSIS CHECK)
# Before looking at labs or prices, you must verify the medical necessity.

Action: Check if a diagnosis description/definition is present in the PDF medical reports.

Requirement: Diagnosis is COMPULSORY.

Outcome: If missing, mark Deficiency with the remark "Diagnosis description/definition is compulsory."

In [930]:
# Constants for Benefit Category Identification (Bahrain Market Standards)
MATERNITY_PREFIXES = ('O00', 'O01', 'O02', 'O03', 'O04', 'O05', 'O06', 'O07', 'O08', 'O09',
                      'O10', 'O20', 'O30', 'O40', 'O60', 'O70', 'O80', 'O90', 'O99',
                      'Z34', 'Z36', 'Z37', 'Z38')

DENTAL_PREFIXES = ('K00', 'K01', 'K02', 'K03', 'K04', 'K05')

# Services exempt from Approval Copy validation in Maternity/Dental
# 99301 is the Specialist Consultation - Initial
EXEMPT_APPROVAL_SERVICES = ('99301',)

In [931]:
def rule9_diagnosis(excel_row, full_claim_details):
    """
    Production-level Adjudicator for Rule 9 (DIAGNOSIS & SPECIAL VALIDATION).
    Strictly follows Bahrain Health Insurance requirements.
    """
    line_remarks = []
    missing_docs = []

    # Extracting sub-models from the wrapper object
    # Using getattr or .get depending on how your full_claim_details is structured
    approval_details = getattr(full_claim_details, "approval_data", None)
    diagnosis_list = getattr(full_claim_details, "diagnosis_list", [])

    # 1. Extract Data with Null Safety
    excel_code = str(excel_row.get("SERVICE_CODE", "")).strip()
    excel_diag = str(excel_row.get("DIAGNOSISCODED1", "")).strip().upper()
    pdf_auth_no = str(getattr(approval_details, "approval_no", "")).strip().upper()

    # 2. Category Lookup (Standard Bahrain Prefixes)
    benefit_category = "GENERAL"
    if excel_diag.startswith(MATERNITY_PREFIXES):
        benefit_category = "MATERNITY"
    elif excel_diag.startswith(DENTAL_PREFIXES):
        benefit_category = "DENTAL"

    # 3. Rule 9: COMPULSORY Diagnosis Description Check
    # Requirement: The PDF MUST contain a text-based medical definition (icd_name).
    pdf_diag_entry = next((d for d in diagnosis_list), None)
    diag_description = getattr(pdf_diag_entry, "icd_name", "") if pdf_diag_entry else ""

    # Check if the medical description is missing or non-substantive
    if not diag_description or len(str(diag_description).strip()) < 5:
        line_remarks.append(f"Rule 9: Diagnosis description/definition is COMPULSORY (Ref: {excel_diag})")

    # 4. Special Validation: MAT/DEN Approval Copy Logic
    # EXEMPTION: 99301 (Specialist Consultation) does not require prior approval
    if benefit_category in ["MATERNITY", "DENTAL"]:
        if excel_code in EXEMPT_APPROVAL_SERVICES:
            # We acknowledge it's MAT/DEN but it is exempt from the Auth check
            line_remarks.append(f"Consultation {excel_code} is EXEMPT from Approval requirement.")
        else:
            # For procedures, labs, or pharmacy in MAT/DEN, Approval is MANDATORY
            is_valid_auth = pdf_auth_no and pdf_auth_no not in ["", "NPR", "NONE", "UCAF1.0"]
            if not is_valid_auth:
                line_remarks.append(f"Special Validation: Approval Copy/No is MANDATORY for {benefit_category}")
                missing_docs.append("Approval Copy")

    # 5. Rule 6: Page Reference Generation
    page_no = getattr(pdf_diag_entry, "page_no", "N/A")
    page_prefix = f"(PDF Page {page_no}) "

    # 6. Final Outcome Logic (The Hierarchy)
    # Priority 1: Clinical (Missing Diagnosis text) -> Deficiency-Clinical
    # Priority 2: Administrative (Missing Approval Copy) -> Deficiency
    # Priority 3: All checks pass -> Approved

    outcome = "Approved"
    if any("Rule 9" in r for r in line_remarks):
        outcome = "Deficiency-Clinical"
    elif any("MANDATORY" in r for r in line_remarks):
        outcome = "Deficiency"

    return {
        "ReviewOutcome": outcome,
        "Remarks": page_prefix + "; ".join(line_remarks) if line_remarks else "Approved",
        "BenefitCategory": benefit_category,
        "MissingDocuments": ", ".join(missing_docs) if missing_docs else "None"
    }

## Rule 2: LAB CHECK	If the service is a test, check the report pages for results.

In [1]:
# These services DO NOT require clinical result reports
NOT_MANDATORY_CLINCAL_SERVICES = [
        "CONSULT", "VISIT", "OFFICE CALL",      # Consultations
        "SUPPLY", "CONSUMABLE", "CATHETER",     # Medical Supplies
        "INJECTION", "DRESSING", "VITAL SIGN",  # Routine Nursing
        "FILE CHARGE", "ADMIN", "REPORT FEE",   # Administrative
        "PHARMACY", "TABLET", "SYRUP"           # Medications
    ]

In [934]:
def rule2_clinical_evidence_check(excel_row, full_claim_details):
    """
    Rule 2: Clinical Evidence Verification.
    This version integrates the 'get_report_pages' logic directly inside the function.
    """
    service_code = str(excel_row.get("SERVICE_CODE", ""))
    service_desc = str(excel_row.get("SERVICEDESCRIPTION", "")).upper()
    service_type_edi = str(excel_row.get("SERVICE_TYPE", "")).upper()

    # 1. TPA EXEMPTION GATE (Bahrain Standard)
    # These services DO NOT require clinical result reports

    is_exempt = (
        any(kw in service_desc for kw in NOT_MANDATORY_CLINCAL_SERVICES) or
        service_type_edi in ["PHARMACY", "MEDICAL SUPPLIES"] or
        service_code in ["99301", "99213", "99201"] # Common Consult Codes
    )

    if is_exempt:
        return {
            "ReviewOutcome": "SKIPPED",
            "Remarks": f"Rule 2: Clinical report not required for service type: {service_desc}."
        }

   # 2. IDENTIFY DIAGNOSTIC CATEGORY
    service_cat = ""
    if any(k in service_desc for k in ["LAB", "PATHOLOGY", "BLOOD", "URINE"]):
        service_cat = "LAB"
    elif any(k in service_desc for k in ["X-RAY", "CT", "MRI", "ULTRASOUND", "DEXA", "SCAN"]):
        service_cat = "RAD"
    elif any(k in service_desc for k in ["ECG", "EKG", "ECHO", "CARDIOLOGY"]):
        service_cat = "CARDIO"

    # If it's not a diagnostic category and not exempt, we skip it
    # (e.g., General Procedures often covered by Rule 8)
    if not service_cat:
        return {"ReviewOutcome": "SKIPPED", "Remarks": "Rule 2: Not a targeted diagnostic report type."}

    # 3. DEFINE ANCHORS
    anchors_map = {
        "LAB": ["LABORATORY", "RESULT", "UNITS", "REFERENCE RANGE"],
        "RAD": ["FINDINGS", "IMPRESSION", "RADIOLOGY REPORT", "CLINICAL INDICATION"],
        "CARDIO": ["SINUS RHYTHM", "VENTRICLE", "INTERPRETATION", "EJECTION FRACTION"]
    }
    target_anchors = anchors_map.get(service_cat, [])

    # 4. SEARCH FOR REPORT PAGES
    found_pages = []
    seen_pages = set()

    for chunk in getattr(full_claim_details, "chunks", []):
        line_text = chunk["line"].upper()
        page_no = chunk["page_no"]

        if any(anchor in line_text for anchor in target_anchors):
            if page_no not in seen_pages:
                found_pages.append({"page_no": page_no, "text": line_text})
                seen_pages.add(page_no)

    # 5. FINAL DECISION
    if not found_pages:
        return {
            "ReviewOutcome": "Deficiency-Clinical",
            "Remarks": f"Rule 2: No {service_cat} clinical report/result found for {service_code}."
        }

    # Substance check to ensure it's a Report and not just an Order/Request
    combined_text = " ".join([p["text"] for p in found_pages])
    if service_cat == "RAD" and not any(k in combined_text for k in ["FINDINGS", "IMPRESSION", "RESULT"]):
        return {
            "ReviewOutcome": "Deficiency-Clinical",
            "Remarks": f"Rule 2: RAD Request found (Page {found_pages[0]['page_no']}), but clinical findings/results are missing."
        }

    return {
        "ReviewOutcome": "Approved",
        "Remarks": f"Rule 2: {service_cat} Evidence verified (Ref: Page {found_pages[0]['page_no']})",
        "EvidencePage": found_pages[0]['page_no']
    }

# Rule 8
Rule 8: Content Match
* "I have a number in Excel AND a PDF in hand."
* The specific string of digits/letters.
* "Approval Number Mismatch."
* (AUTH MATCH): Verify Approval Copy services match Invoice exactly

In [935]:
def adjudicate_rule_8(filter_data, full_claim_details):
    """
    Rule 8 (AUTH MATCH): Verify Excel Approval No matches PDF Approval No.
    """
    excel_auth = str(filter_data.get('APPROVAL_NO', "")).strip().upper()
    # The number extracted from the PDF Approval Copy
    pdf_auth_no = str(getattr(full_claim_details.approval_data, "approval_no", "")).strip().upper()

    # Logic: Only run if there's an actual number to compare (Skip if NPR)
    if excel_auth not in ["NPR", "NONE", "", "0", "UCAF1.0"]:

        # Check 1: Number Mismatch
        if excel_auth != pdf_auth_no:
            return {
                "ServiceCode": filter_data.get("SERVICE_CODE"),
                "ReviewOutcome": "Deficiency-Authorization", # Correct Category
                "Remarks": f"Rule 8: Authorization Match Failure. EDI No: {excel_auth} does not match PDF Copy No: {pdf_auth_no}.",
                "MissingDoc": "Corrected Authorization Copy"
            }

        # Check 2: Service Match (As per your Rule 8 requirement)
        # Verify the service code in Excel exists in the list of approved services in the PDF
        approved_services = getattr(full_claim_details.approval_data, "approved_service_codes", [])
        service_code = str(filter_data.get("SERVICE_CODE")).strip()

        if approved_services and service_code not in approved_services:
            return {
                "ServiceCode": service_code,
                "ReviewOutcome": "Deficiency-Authorization",
                "Remarks": f"Rule 8: Service {service_code} not found in the attached Approval Copy list.",
                "MissingDoc": "None"
            }

    return {
        "ServiceCode": filter_data.get("SERVICE_CODE"),
        "ReviewOutcome": "Approved",
        "Remarks": "Rule 8 Verified: Authorization matches service.",
        "MissingDoc": "None"
    }

# Rule 5

The primary goal of Rule 5 is to prevent Financial Leakage. It validates that

the hospital has correctly calculated and collected the patient’s share.

* Reconciliation: The sum of CO_INS and DEDUCTABLE in your Excel must equal the patient_part or "Patient Share" found on the physical invoice.

* Rounding Standards: In Bahrain, calculations must be precise to three decimal places (e.g., 0.319 BD).

* Mandatory Co-pay: If the diagnosis is specialized (Maternity/Dental), Rule 5 checks if the mandatory patient share was applied.

In [936]:
import numpy as np

def adjudicate_rule_5(excel_row, full_claim_details, global_pdf_share):
    """
    Final Rule 5 Logic for Bahrain Market.

    Args:
        excel_row: The current row from the hospital's EDI/Excel.
        full_claim_details: The Pydantic model containing extracted PDF data.
        global_pdf_share: The TOTAL patient share found on the PDF Invoice header/footer.
    """
    # --- 1. Identify Service & Category ---
    s_code = str(excel_row.get("SERVICE_CODE", "")).strip()
    diag_list = getattr(full_claim_details, "diagnosis_list", [])
    principal_diag = str(diag_list[0].principal_code).upper() if diag_list else ""

    # Benefit Categories mapping
    is_maternity = principal_diag.startswith(('O', 'Z34', 'Z36', 'Z37', 'Z38'))
    is_dental = principal_diag.startswith(('K00', 'K01', 'K02', 'K03', 'K04', 'K05'))
    benefit_cat = "MATERNITY" if is_maternity else ("DENTAL" if is_dental else "GENERAL")

    # --- 2. Extract Claimed Share (Excel) ---
    excel_co_ins = float(excel_row.get('CO_INS', 0))
    excel_deduct = float(excel_row.get('DEDUCTABLE', 0))
    # Requirement: Total Claimed = Co-insurance + Deductible
    excel_share_claimed = round(excel_co_ins + excel_deduct, 3)

    # --- 3. Extract Evidenced Share (PDF Line Level) ---
    pdf_match = next((s for s in full_claim_details.services.services if str(s.service_code) == s_code), None)
    pdf_line_share = round(float(pdf_match.patient_part), 3) if pdf_match and hasattr(pdf_match, 'patient_part') else 0.0

    # --- 4. Decision Logic Waterfall ---
    review_outcome = "Approved"
    remarks = "Rule 5 Verified: Financials reconciled."

    # A. GLOBAL OVERRIDE: If Invoice says patient paid, but Excel says 0
    if global_pdf_share > 0 and excel_share_claimed <= 0:
        review_outcome = "Deficiency-Financial"
        remarks = f"Rule 5: Invoice shows Patient Share of {global_pdf_share} BD, but line {s_code} claims 0.000 BD."

    # B. MATERNITY/DENTAL MANDATORY CHECK (Exempting 99301)
    elif (is_maternity or is_dental) and excel_share_claimed <= 0 and s_code != '99301':
        review_outcome = "Deficiency-Financial"
        remarks = f"Rule 5: Mandatory Patient Share missing for {benefit_cat} benefit (Code 99301 exempt)."

    # C. LINE-LEVEL VARIANCE CHECK
    elif not np.isclose(excel_share_claimed, pdf_line_share, atol=0.001):
        review_outcome = "Deficiency-Financial"
        remarks = f"Rule 5: Variance detected. Excel: {excel_share_claimed:.3f} BD vs Invoice: {pdf_line_share:.3f} BD."

    # --- 5. Return Output Fields as per Requirement ---
    return {
        "ReviewOutcome": review_outcome,
        "Remarks": remarks,
        "BenefitCategory": benefit_cat,
        "PatientShareExpected": excel_share_claimed, # What the EDI claims
        "PatientShareInvoice": pdf_line_share,       # What the PDF shows
        "PatientShareVariance": round(abs(excel_share_claimed - pdf_line_share), 3)
    }

## **Rule** 7
* Prefix Dependency: It looks for XI (Inpatient) and XP (Specialized) before making a decision.

* No "NPR" for XI: If a hospital bills a surgery on an XI invoice and puts "NPR," this function will catch it and flag it as Deficiency-Authorization.

* Audit Trail: It ensures the physical PDF "Authorization Copy" is checked, not just the text in the Excel.

In [937]:
def adjudicate_rule_7(row, full_claim_details, high_value_flag):
    """
    Rule 7: Mandatory Authorization & Evidence.
    Logic: Triggers on Specialized Prefixes (XI/XP) OR High Invoice Value (>100 BD).
    """
    # 1. Identity the Invoice Type
    invoice_no = str(getattr(full_claim_details.invoice, "invoice_no", "")).upper().strip()
    is_specialized = invoice_no.startswith(('XI', 'XP'))

    # 2. Check for Physical Evidence
    # We check if a physical letter was detected by the OCR engine
    has_physical_copy = getattr(full_claim_details.approval_data, "authorization_letter", False)

    # 3. Get the Approval Number from EDI
    raw_auth = row.get("APPROVAL_NO")
    excel_auth = str(raw_auth).strip().upper() if raw_auth else ""

    # 4. Define placeholders that are illegal for high-value/specialized claims
    invalid_placeholders = ["", "NPR", "N", "UCAF 1.0", "UCAF1.0", "PENDING", "NAN"]

    outcome = "Approved"
    remarks = []

    # --- THE WATERFALL LOGIC ---

    # TRIGGER 1: Specialized Prefix Scrutiny
    if is_specialized:
        if excel_auth in invalid_placeholders:
            outcome = "Deficiency-Authorization"
            remarks.append(f"Rule 7: Specialized Prefix ({invoice_no}) requires valid Auth; '{excel_auth}' not allowed.")
        elif not has_physical_copy:
            outcome = "Deficiency-Authorization"
            remarks.append("Rule 7: Physical Authorization Letter missing for specialized invoice.")

    # TRIGGER 2: Total Invoice High-Value Scrutiny (>100 BD)
    # This uses the flag calculated BEFORE the loop starts
    elif high_value_flag:
        if excel_auth in invalid_placeholders:
            outcome = "Deficiency-Authorization"
            remarks.append(f"Rule 7: High-Value Claim (>100 BD) cannot use placeholder '{excel_auth}'.")
        elif not has_physical_copy:
            outcome = "Deficiency-Authorization"
            remarks.append("Rule 7: High-Value Claim requires physical Authorization Letter attachment.")

    return {
        "ReviewOutcome": outcome,
        "Remarks": " | ".join(remarks) if remarks else "Rule 7 Verified.",
        "IsSpecialized": is_specialized,
        "IsHighValue": high_value_flag
    }

# Rule 4 (AUTH COPY): Requirement Fulfillment.
Verifies that a physical 'Approval Copy' exists if an Auth No is claimed.

In [938]:
def adjudicate_rule_4(row, full_claim_details):
    """
    Rule 4 (AUTH COPY): Requirement Fulfillment.
    Verifies that a physical 'Approval Copy' exists if an Auth No is claimed.
    """
    excel_auth = str(row.get("APPROVAL_NO", "")).strip().upper()

    # 2. Get Model Evidence (ApprovalMasterModel)
    # is_approval_copy is the PRIMARY flag for Rule 4
    is_copy_attached = getattr(full_claim_details.approval_data, "is_approval_copy", False)

    # 3. Check for specific TPA Auth Letter identification
    # In your model, if authorization_letter is None, evidence is missing.
    auth_letter_ref = getattr(full_claim_details.approval_data, "authorization_letter", None)

    # Combined logic for Evidence Presence
    has_physical_evidence = is_copy_attached or (auth_letter_ref is not None)

    # Define placeholders that count as "No Auth Claimed"
    placeholders = ["", "NPR", "NONE", "0", "UCAF1.0", "UCAF 1.0"]

    outcome = "Approved"
    remarks = ""

    # CASE A: Hospital claims to have an Auth No, but no paper is attached
    if excel_auth not in placeholders:
        if not has_physical_evidence:
            outcome = "Deficiency-Authorization"
            remarks = f"Rule 4: Authorization Number {excel_auth} provided in EDI, but physical Approval Copy is missing from PDF attachments."

    # CASE B: Placeholder 'UCAF1.0' used (Audit Warning)
    elif excel_auth == "UCAF1.0":
        # Even if UCAF is used, Rule 4 requires evidence for many TPAs
        if not has_physical_evidence:
            outcome = "Deficiency-Authorization"
            remarks = "Rule 4: Placeholder 'UCAF1.0' detected. Actual TPA Approval Letter is missing."

    return {
        "ReviewOutcome": outcome,
        "Remarks": remarks if remarks else "Rule 4 Passed: Physical evidence confirmed.",
        "MissingDoc": "Approval Letter" if outcome != "Approved" else "None"
    }

# Rule 3 (HEALTH 360): OCR E-card verification.
Triggers if TPA or Insurance Name is Health 360.

In [939]:
def adjudicate_rule_3(row, full_claim_details):

    # 1. Identify Health 360 from either TPA Name or Insurance Company Name
    tpa_name = str(getattr(full_claim_details.approval_data, "tpa_name", "")).upper()
    insurance_name = str(getattr(full_claim_details.invoice, "insurance_company", "")).upper()

    is_health360 = any(kw in tpa_name or kw in insurance_name for kw in ["HEALTH 360", "HEALTH360", "H360"])

    if not is_health360:
        return {"ReviewOutcome": "Approved", "Remarks": "Rule 3: Not a Health 360 claim."}

    # 2. Extract Data from E-Card (Model) and Invoice (Header)
    card_cpr = str(getattr(full_claim_details.card_data, "cpr_no", "")).strip()
    inv_cpr = str(getattr(full_claim_details.invoice, "patient_id", "")).strip()

    card_policy = str(getattr(full_claim_details.card_data, "policy_no", "")).strip()
    inv_policy = str(getattr(full_claim_details.invoice, "policy_no", "")).strip()


    # 3. Validation Logic
    outcome = "Approved"
    remarks = []

    # Check A: CPR Identity
    if card_cpr and inv_cpr and card_cpr != inv_cpr:
        outcome = "Deficiency-Technical"
        remarks.append(f"Rule 3: CPR Mismatch [Card: {card_cpr} vs Inv: {inv_cpr}]")

    # Check B: Policy Integrity
    if card_policy and inv_policy and card_policy != inv_policy:
        outcome = "Deficiency-Technical"
        remarks.append(f"Rule 3: Policy No Mismatch [Card: {card_policy} vs Inv: {inv_policy}]")


    return {
        "ReviewOutcome": outcome,
        "Remarks": " | ".join(remarks) if remarks else "Rule 3: Health 360 Eligibility Confirmed.",
        "IsHealth360": True
    }

## **Rule 6**
remark tells the auditor exactly which PDF page justifies the decision.

In [940]:
def get_page_ref(page_data):
    """Utility to turn [1, 2] or 1 into '1, 2' or '1'."""
    if isinstance(page_data, list):
        return ", ".join(map(str, sorted(list(set(page_data)))))
    return str(page_data) if page_data else "N/A"

Rule,If it Fails...,Impact on the Waterfall
* Rule 1,FAIL,"SKIP Rule 5. Continue to Rule 9, 2, 8, 7."
* Rule 9,FAIL,"CONTINUE. Even if the diagnosis is wrong, we must check for reports and auth."
* Rule 8,FAIL,CONTINUE. An unauthorized service still needs a price and financial check.
* Rule 5,FAIL,CONTINUE. This is the final math check; we proceed to Rule 6.
* Rule 6,N/A,ALWAYS RUN. This is the audit trail (Page Ref) for whatever outcome we reached.

# Run the Rules

In [2]:
import pandas as pd

def process_claim_adjudication(excel_df, full_claim_details):
    """
    Core Adjudication Engine with Waterfall Stops for Multi-Service Claims.
    """
    final_adjudicated_report = []

    total_invoice_amt = excel_df['GROSS_AMT'].sum() if 'GROSS_AMT' in excel_df else 0.0
    high_value_flag = total_invoice_amt > 100.000

    # 1. Capture the "Source of Truth" Global Share from PDF
    # In your FullClaimModel, 'invoice.patient_share' usually contains the total share printed on the PDF
    global_pdf_share = getattr(full_claim_details.invoice, "patient_share", 0.0)

    # 1. Global PDF Data Extraction (Header Level)
    # Checking for physical evidence presence once at the start
    is_copy_attached = getattr(full_claim_details.approval_data, "is_approval_copy", False)

    # 2. Iterate through the Excel Rows (Each Service Line)
    for index, row in excel_df.iterrows():
        service_code = str(row.get("SERVICE_CODE", "")).strip()
        service_desc = str(row.get("SERVICEDESCRIPTION", "")).upper()

        # --- GATE 1: TECHNICAL, ELIGIBILITY & CLINICAL (The Foundation) ---
        r1_result = adjudicate_rule1(row, full_claim_details)      # Invoice Check
        r9_result = rule9_diagnosis(row, full_claim_details)      # Compulsory Diagnosis
        r3_result = adjudicate_rule_3(row, full_claim_details)    # Health 360 / E-Card
        r2_result = rule2_clinical_evidence_check(row, full_claim_details) # Clinical Evidence

        # --- GATE 2: AUTHORIZATION WATERFALL (Rule 4 -> Rule 8) ---
        # Rule 4: Verify physical document exists
        r4_result = adjudicate_rule_4(row, full_claim_details)

        # WATERFALL STOP: Rule 8 (Match) runs ONLY if Rule 4 (Paper) passes
        if r4_result["ReviewOutcome"] == "Approved":
            r8_result = adjudicate_rule_8(row, full_claim_details)
        else:
            r8_result = {
                "ReviewOutcome": "SKIPPED",
                "Remarks": "Rule 8 Skipped: No Approval Copy available to match services."
            }

        # Rule 7: Prefix (XI/XP) & High-Value Scrutiny
        r7_result = adjudicate_rule_7(row, full_claim_details, high_value_flag)

        # --- GATE 3: FINANCIAL WATERFALL (Rule 1 -> Rule 5) ---
        # WATERFALL STOP: Rule 5 (Math) runs ONLY if Rule 1 (Price) is confirmed
        if r1_result["ReviewOutcome"] == "Approved":
            r5_result = adjudicate_rule_5(row, full_claim_details, global_pdf_share)
        else:
            r5_result = {
                "ReviewOutcome": "SKIPPED",
                "Remarks": "Rule 5 Skipped: Financials cannot be verified due to Rule 1 price mismatch."
            }

        # --- STEP 4: MERGE & PRIORITY LOGIC ---
        # Collect all active rules for this line item
        all_results = [r1_result, r9_result, r3_result, r2_result, r4_result, r8_result, r7_result, r5_result]
        current_outcomes = [res["ReviewOutcome"] for res in all_results]

        # Determine Final Outcome based on Bahrain Priority:
        # Clinical > Authorization > Financial > Technical
        if "Deficiency-Clinical" in current_outcomes:
            final_outcome = "Deficiency-Clinical"
        elif "Deficiency-Authorization" in current_outcomes:
            final_outcome = "Deficiency-Authorization"
        elif "Deficiency-Financial" in current_outcomes:
            final_outcome = "Deficiency-Financial"
        elif "Deficiency-Technical" in current_outcomes:
            final_outcome = "Deficiency-Technical"
        else:
            final_outcome = "Approved"

        # --- EXTRACT MISSING DOCUMENTS ---
        missing_docs_list = []
        if r4_result["ReviewOutcome"] != "Approved": missing_docs_list.append("Physical Approval Copy")
        if r2_result["ReviewOutcome"] != "Approved" and row.get("CATEGORY") == "LAB": missing_docs_list.append("Lab Report")
        if r9_result["ReviewOutcome"] != "Approved": missing_docs_list.append("Diagnosis/ICD-10 Description")
        if r7_result["ReviewOutcome"] != "Approved": missing_docs_list.append("Valid Authorization Letter")

        # Step 5: Final Remarks Formatting (Traceability & Page Referencing)
        service_page = row.get("page_no", "N/A")
        # Combine only actual deficiencies/remarks
        final_remarks_list = [res["Remarks"] for res in all_results if res["ReviewOutcome"] not in ["Approved", "SKIPPED", "PASS"]]
        remarks_str = "; ".join(final_remarks_list) if final_remarks_list else "Approved"
        remarks_with_page = f"{remarks_str} [Ref: Page {service_page}]"

        # --- NEW: SAFE RETRIEVAL FOR FINANCIAL FIELDS ---
        # Using .get(key, default) prevents the KeyError if Rule 5 was skipped
        expected = r5_result.get("PatientShareExpected", 0.0)
        invoice_val = r5_result.get("PatientShareInvoice", 0.0)
        variance = r5_result.get("PatientShareVariance", 0.0)

        # Step 6: Create the JSON-style dictionary for the report
        adjudicated_line = {
            "ServiceCode": service_code,
            "Description": service_desc,
            "ReviewOutcome": final_outcome,
            "Remarks": remarks_with_page,
            "BenefitCategory": r9_result.get("BenefitCategory", "GENERAL"),
            "MissingDocuments": ", ".join(missing_docs_list) if missing_docs_list else "None",
            "PatientShareExpected": expected,
            "PatientShareInvoice": invoice_val,
            "PatientShareVariance": variance,
            "ValidationStatus": {
                "Rule1": r1_result["ReviewOutcome"],
                "Rule9": r9_result["ReviewOutcome"],
                "Rule3": r3_result["ReviewOutcome"],
                "Rule2": r2_result["ReviewOutcome"],
                "Rule4": r4_result["ReviewOutcome"],
                "Rule8": r8_result["ReviewOutcome"],
                "Rule7": r7_result["ReviewOutcome"],
                "Rule5": r5_result["ReviewOutcome"]
            }
        }

        final_adjudicated_report.append(adjudicated_line)

    # 3. Return final result as a DataFrame
    return pd.DataFrame(final_adjudicated_report)

In [942]:
ouput = process_claim_adjudication(filter_data, full_claim_details)

In [943]:
ouput

Unnamed: 0,ServiceCode,Description,ReviewOutcome,Remarks,BenefitCategory,MissingDocuments,PatientShareExpected,PatientShareInvoice,PatientShareVariance,ValidationStatus
0,64737,EXCISION OF SEBACEOUS CYST,Deficiency-Authorization,Rule 1: Service code 64737 missing in PDF invo...,GENERAL,Physical Approval Copy,0.0,0.0,0.0,"{'Rule1': 'Deficiency-Technical', 'Rule9': 'Ap..."
1,76076,SPINE AND HIP (DEXA),Deficiency-Clinical,Rule 2: No RAD report found for service 76076....,GENERAL,,0.0,0.0,0.0,"{'Rule1': 'Approved', 'Rule9': 'Approved', 'Ru..."
2,99301,SPECIALIST CONSULT - INITIAL,Approved,Approved [Ref: Page N/A],GENERAL,,5.3,5.3,0.0,"{'Rule1': 'Approved', 'Rule9': 'Approved', 'Ru..."
