# Saskatchewan Tariff Extraction Pipeline

**Phase 1**: Extract tariff codes with L1/L2/L3/L4 hierarchy  
**Phase 2**: GPT enrichment for metadata extraction

**Features:**
- Extracts physician billing codes from Saskatchewan's Payment Schedule
- Hierarchical categorization (Section → Category → Subcategory → Sub-subcategory, i.e. L1–L4)
- AI-powered enrichment: parent codes, add-ons, restrictions, exclusions
- Checkpointing for crash recovery during long runs

**Key differences from Manitoba:**
- Code format: alphanumeric (70A, 153A) vs 4-digit
- Fee format: `$30.00` (Saskatchewan) vs `...30.00` (Manitoba)
- Section structure: Section A-Y vs body systems

---

In [None]:
!pip install openai -q
import re
import json
import time
import pandas as pd
from typing import Optional, List, Tuple, Dict
from openai import OpenAI
from google.colab import files, userdata
import getpass
import os

pd.set_option('display.max_columns', None)
pd.set_option('display.max_colwidth', 100)
print("✓ Imports ready")

In [None]:
# Load the marked-up Saskatchewan payment schedule into memory
print("Upload the marked-up SK payment schedule text file (sk_marked.txt)")
uploaded = files.upload()
# Only the first uploaded file is used; its key doubles as the path to open
SOURCE_FILE = list(uploaded)[0]
with open(SOURCE_FILE, mode='r', encoding='utf-8') as f:
    RAW_TEXT = f.read()
# Line-indexed view of the file, shared by Phase 1 and Phase 2
LINES = RAW_TEXT.split('\n')
print(f"✓ {len(LINES):,} lines loaded")

---
## Configuration & Constants
---

In [None]:
# =============================================================================
# CONFIGURATION CONSTANTS - MODIFIED FOR SASKATCHEWAN
# =============================================================================

# Extraction settings - SK uses "S E C T I O N  A" format
CONTENT_START_MARKER = '«L1:S E C T I O N'
CONTENT_START_MIN_LINE = 1000
HIERARCHY_LOOKAHEAD_LINES = 5

# Output settings
MAX_DESCRIPTION_LENGTH = 500
MAX_NOTES_LENGTH = 1000
MAX_CONTEXT_LENGTH = 3500

# GPT settings
GPT_MODEL = "gpt-4o"
GPT_TEMPERATURE = 0
API_CALL_DELAY_SECONDS = 0.1
COST_PER_ENTRY_ESTIMATE = 0.005
TIME_PER_ENTRY_SECONDS = 0.5

# Progress reporting
PROGRESS_REPORT_INTERVAL = 25

# Checkpointing settings
CHECKPOINT_INTERVAL = 100  # Save checkpoint every N entries
CHECKPOINT_FILE = 'SK_phase2_checkpoint.json'

# Fee parsing - SK uses dollar amounts like $30.00
# No unit value detection for SK (different format)

# Section code mappings - SK uses Section letters A-Y
# SK sections are explicitly named (Section A, B, C...) not body systems
#
# Each entry is (regex, section_letter). get_section_code() first tries to read
# the letter directly from the heading and only consults this table as a
# fallback, against the LOWERCASED heading text - so patterns must stay
# lowercase. The table is scanned top to bottom and the first match wins, so
# order matters where keywords could overlap.
# NOTE(review): there is no entry for a section "U" - confirm against the
# schedule whether that is intentional.
SECTION_PATTERNS = [
    (r'section\s*a\s*[–-]|general\s+services', 'A'),
    (r'section\s*b\s*[–-]|general\s+practice', 'B'),
    (r'section\s*c\s*[–-]|internal\s+medicine', 'C'),
    (r'section\s*d\s*[–-]|pediatric', 'D'),
    (r'section\s*e\s*[–-]|dermatology', 'E'),
    (r'section\s*f\s*[–-]|neurology', 'F'),
    (r'section\s*g\s*[–-]|psychiatry', 'G'),
    (r'section\s*h\s*[–-]|anesthesia', 'H'),
    (r'section\s*i\s*[–-]|obstetrics', 'I'),
    (r'section\s*j\s*[–-]|ophthalmology', 'J'),
    (r'section\s*k\s*[–-]|otolaryngology', 'K'),
    (r'section\s*l\s*[–-]|general\s+surgery', 'L'),
    (r'section\s*m\s*[–-]|cardiac|cardiovascular', 'M'),
    (r'section\s*n\s*[–-]|plastic', 'N'),
    (r'section\s*o\s*[–-]|neurosurgery', 'O'),
    (r'section\s*p\s*[–-]|orthopedic', 'P'),
    (r'section\s*q\s*[–-]|thoracic', 'Q'),
    (r'section\s*r\s*[–-]|urology', 'R'),
    (r'section\s*s\s*[–-]|physical\s+medicine', 'S'),
    (r'section\s*t\s*[–-]|emergency', 'T'),
    (r'section\s*v\s*[–-]|laboratory', 'V'),
    (r'section\s*w\s*[–-]|diagnostic\s+radiology', 'W'),
    (r'section\s*x\s*[–-]|nuclear|radiation', 'X'),
    (r'section\s*y\s*[–-]|pathology', 'Y'),
]

# Add-on detection patterns
# is_add_on_fee() runs each pattern with re.search over the lowercased block
# text and returns True on the first hit, so patterns must stay lowercase.
# NOTE(review): r'\badd\b' already matches any text containing "add to" or
# "add-on", and r'\badditional\b' already covers "each additional" /
# "per additional"; the longer patterns are redundant. The bare word "add" is
# also a broad trigger - Phase 2 re-evaluates is_add_on via GPT.
ADD_ON_PATTERNS = [
    r'\badd\b', r'\badd-on\b', r'\badditional\b', r'\bsupplement\b',
    r'\beach additional\b', r'\bper additional\b', r'\badd to\b',
]

# Final output column order
OUTPUT_COLUMNS = [
    'tariff_code', 'tariff_code_display', 'parent_code',
    'section_code', 'section_name', 'specialty_code', 'specialty_name',
    'category', 'subcategory', 'subsubcategory',
    'description', 'notes',
    'fee_specialist', 'fee_gp',
    'is_add_on', 'add_on_to',
    'age_restriction', 'setting_restriction', 'exclusions',
    'is_provisional', 'is_asterisked', 'is_by_report',
    'is_cross_reference', 'cross_reference_to',
    'applicable_rules', 'time_requirement_minutes'
]

print("✓ Configuration constants defined")

---
## Phase 1: Extraction with L1/L2/L3/L4 Hierarchy
---

In [None]:
# =============================================================================
# TEXT CLEANING UTILITIES
# =============================================================================

def clean_text(text: str) -> str:
    """Return *text* with dash variants unified and whitespace collapsed.

    Em dash, en dash, minus sign and the Unicode hyphen all become an ASCII
    '-'; non-breaking spaces become plain spaces; every run of whitespace
    (including newlines) is squeezed to a single space and the ends trimmed.
    """
    for dash_variant in ('—', '–', '−', '‐'):
        text = text.replace(dash_variant, '-')
    text = text.replace('\u00a0', ' ')
    collapsed = re.sub(r'\s+', ' ', text)
    return collapsed.strip()


def to_title_case(text: str) -> str:
    """Convert heading/description text to title case.

    Handles the schedule's letter-spaced headings ("S E C T I O N  A - ...")
    and keeps small connecting words lowercase when they appear mid-phrase.

    Args:
        text: The value to convert. Missing values (None/NaN) and blank
            strings are returned unchanged.

    Returns:
        The title-cased string, or the input itself when it is missing/blank.
    """
    if pd.isna(text) or not str(text).strip():
        return text
    text = str(text).strip()

    # A letter-spaced heading starts with a run of three or more single
    # capitals. Requiring a run (not just "X Y...") keeps an ordinary leading
    # article intact: "A BRIEF VISIT" must not become "ABRIEF VISIT".
    if re.match(r'(?:[A-Z]\s+){2,}[A-Z]\b', text):
        # Rejoin "S E C T I O N" on its own so the section letter that follows
        # stays a separate token even after whitespace has been collapsed.
        text = re.sub(r'^S\s+E\s+C\s+T\s+I\s+O\s+N\b', 'SECTION', text)
        # Any other spaced-out word: letters exactly one whitespace apart
        # belong together; a wider gap is treated as a word break.
        text = re.sub(
            r'\b(?:[A-Z]\s){2,}[A-Z]\b',
            lambda m: re.sub(r'\s', '', m.group(0)),
            text,
        )
        text = re.sub(r'\s*,\s*', ', ', text)
        text = re.sub(r'\s+', ' ', text)

    text = text.title()

    # str.title() treats an apostrophe as a word break ("Physician'S");
    # lower the stray single capital that follows one.
    text = re.sub(
        r"(?<=[A-Za-z])(['’])([A-Z])\b",
        lambda m: m.group(1) + m.group(2).lower(),
        text,
    )

    # Keep small words lowercase mid-phrase. The lookbehind protects the
    # section letter in "Section A" from being read as the article "a".
    text = re.sub(
        r'(?<=\s)(?<!Section\s)(And|Or|The|A|An|Of|In|For|To|By)(?=\s)',
        lambda m: m.group(1).lower(),
        text,
    )

    return text


print("✓ Text cleaning utilities defined")

In [None]:
# =============================================================================
# SECTION AND SPECIALTY LOOKUPS - MODIFIED FOR SK
# =============================================================================

def get_section_code(l1_text: str) -> str:
    """Return the section letter (A-Y) for an L1 heading, or '' if unknown."""
    lowered = l1_text.lower()

    # Preferred route: read the letter straight out of the heading, e.g.
    # "S E C T I O N  A – General Services" (letters may or may not be spaced).
    direct = re.search(r's\s*e\s*c\s*t\s*i\s*o\s*n\s+([a-y])\s*[–-]', lowered)
    if direct is not None:
        return direct.group(1).upper()

    # Otherwise fall back on the keyword table; the first hit wins.
    return next(
        (code for pattern, code in SECTION_PATTERNS if re.search(pattern, lowered)),
        '',
    )


def get_specialty_info(l1_text: str) -> Tuple[str, str]:
    """Split an L1 heading into (section letter, section name).

    Expects the SK heading shape "S E C T I O N  A – General Services";
    returns ('', '') when the heading does not follow it.
    """
    heading = re.search(
        r's\s*e\s*c\s*t\s*i\s*o\s*n\s+([a-y])\s*[–-]\s*(.+)',
        l1_text,
        re.IGNORECASE,
    )
    if heading is None:
        return '', ''
    letter, name = heading.groups()
    return letter.upper(), name.strip()


print("✓ Section and specialty lookups defined")

In [None]:
# =============================================================================
# FEE AND CONTENT EXTRACTION - MODIFIED FOR SK
# =============================================================================

def find_content_start(lines: List[str]) -> int:
    """Return the index of the line where Section A (the main content) begins.

    Two signals are checked per line, in this order:
      1. the literal CONTENT_START_MARKER, accepted only past
         CONTENT_START_MIN_LINE;
      2. an «L1:» Section A heading (spacing/case tolerant), accepted only
         past line 100.
    Returns 0 when neither is found, i.e. extraction starts at the top.
    """
    section_a_heading = re.compile(
        r'«L1:S\s*E\s*C\s*T\s*I\s*O\s*N\s+A', re.IGNORECASE
    )
    for line_no, content in enumerate(lines):
        if line_no > CONTENT_START_MIN_LINE and CONTENT_START_MARKER in content:
            return line_no
        if line_no > 100 and section_a_heading.search(content):
            return line_no
    return 0


def parse_fee(val: str) -> float:
    """Turn a fee string such as "$1,234.50" into a float.

    Dollar signs and thousands separators are dropped before conversion;
    anything that still is not a number raises ValueError from float().
    """
    stripped_symbols = val.translate({ord(','): None, ord('$'): None})
    return float(stripped_symbols.strip())


def extract_fee_from_block(block: str) -> dict:
    """Pull fee details out of one code block (SK layout).

    SK prints fees as dollar amounts: either a pair "$30.00 $30.00"
    (Specialist, then GP) or a single amount that applies to both.

    Returns a dict with keys 'fee_specialist', 'fee_gp' (floats or None)
    and 'is_by_report' (True when the block mentions "By Report").
    """
    # Every $X.XX amount in the block, thousands separators allowed
    amounts = re.findall(r'\$([\d,]+\.\d{2})', block)
    by_report = re.search(r'By Report', block, re.IGNORECASE) is not None

    if not amounts:
        specialist = gp = None
    elif len(amounts) == 1:
        # A lone amount is used for both columns
        specialist = parse_fee(amounts[0])
        gp = parse_fee(amounts[0])
    else:
        # With several amounts, the last two are taken as Specialist and GP
        specialist = parse_fee(amounts[-2])
        gp = parse_fee(amounts[-1])

    return {'fee_specialist': specialist, 'fee_gp': gp, 'is_by_report': by_report}


def extract_description(block: str, code: str) -> str:
    """Build the cleaned description text for one code block.

    Drops the «CODE:…» marker, page furniture and everything from the first
    "Note(s):" line onward, then strips the leading code, fee amounts and a
    trailing classification token. The result is capped at
    MAX_DESCRIPTION_LENGTH characters.
    """
    notes_start = re.compile(r'^\s*Notes?:', re.IGNORECASE)
    page_furniture = re.compile(r'^\s*April 1,|^Page\s+\d+|^\f', re.IGNORECASE)
    running_title = re.compile(r'^Payment Schedule for Insured', re.IGNORECASE)

    without_marker = re.sub(r'«CODE:~?\d{1,4}[A-Z]\*?»', '', block)

    kept = []
    for raw_line in without_marker.split('\n'):
        if notes_start.match(raw_line):
            # Notes are extracted separately; the description ends here
            break
        if page_furniture.match(raw_line) or running_title.match(raw_line):
            continue
        if raw_line.strip():
            kept.append(raw_line)

    desc = ' '.join(kept)
    # Leading code, with its optional ~ prefix and * suffix
    desc = re.sub(r'^\s*~?' + re.escape(code) + r'\*?\s*', '', desc)
    # Dollar amounts anywhere in the text
    desc = re.sub(r'\s*\$[\d,]+\.\d{2}', '', desc)
    # Trailing classification token made of D/L/M/H letters and digits
    desc = re.sub(r'\s+[DLMH0-9]+\s*$', '', desc)

    return clean_text(desc)[:MAX_DESCRIPTION_LENGTH]


def extract_notes(block: str) -> str:
    """Return the text following "Note:"/"Notes:" in a block, or ''.

    The capture runs to the next « marker or the end of the block, is
    normalised with clean_text and capped at MAX_NOTES_LENGTH characters.
    """
    found = re.search(r'Notes?:\s*(.+?)(?=«|$)', block, re.DOTALL | re.IGNORECASE)
    if found is None:
        return ''
    return clean_text(found.group(1))[:MAX_NOTES_LENGTH]


def check_cross_reference(block: str) -> Tuple[bool, str]:
    """Detect a "See Section X" / "See General Services" style cross-reference.

    Args:
        block: Raw text of one code block.

    Returns:
        (True, target) when a cross-reference is found, where target is the
        upper-cased section letter or the matched section name as written;
        (False, '') otherwise.
    """
    # The letter must be a standalone token: whitespace after "Section" and a
    # word boundary after the letter. Without both, case-insensitive matching
    # reads "See Sections ..." or "See Section on ..." as a reference to
    # section "S"/"O", which would also admit fee-less entries downstream.
    by_letter = re.search(r'\bSee\s+Section\s+([A-Y])\b', block, re.IGNORECASE)
    if by_letter:
        return True, by_letter.group(1).upper()

    by_name = re.search(r'\bSee\s+(General\s+Services|Laboratory)', block, re.IGNORECASE)
    if by_name:
        return True, by_name.group(1)

    return False, ''


def extract_rules(block: str) -> str:
    """Collect the rule references ("Rule 5", "Rules 3 to 7") cited in a block.

    Args:
        block: Raw text of one code block.

    Returns:
        The distinct references joined with ', ' in numeric order, e.g.
        "2 to 4, 10"; '' when the block cites no rules.
    """
    found = set()
    for match in re.finditer(r'Rules?\s+(\d+(?:\s+to\s+\d+)?)', block, re.IGNORECASE):
        # Normalise spacing/case so "3  TO 5" and "3 to 5" count once
        found.add(' '.join(match.group(1).lower().split()))

    # Order by the numbers themselves; a plain string sort puts "10" before "2"
    ordered = sorted(found, key=lambda ref: [int(n) for n in re.findall(r'\d+', ref)])
    return ', '.join(ordered)


def extract_time_requirement(block: str) -> Optional[int]:
    """Return the first "<N> minute(s)" figure found in a block, else None."""
    first_duration = re.search(r'(\d+)\s*minutes?', block, re.IGNORECASE)
    if first_duration is None:
        return None
    return int(first_duration.group(1))


def is_add_on_fee(block: str, description: str) -> bool:
    """Report whether the block or description text matches an add-on pattern.

    Matching is case-insensitive: the combined text is lowercased before the
    ADD_ON_PATTERNS regexes are tried.
    """
    haystack = ' '.join((block, description)).lower()
    return any(re.search(pattern, haystack) for pattern in ADD_ON_PATTERNS)


print("✓ Fee and content extraction functions defined")

In [None]:
# =============================================================================
# HIERARCHY EXTRACTION
# =============================================================================

def extract_hierarchy_text(lines: List[str], start_idx: int, fallback_text: str) -> str:
    """Find the human-readable heading that belongs to a hierarchy marker.

    Scans the lines after the marker at ``start_idx`` for the first one that
    is non-blank and is not itself a « marker, and returns it cleaned, with
    anything from the first '$' onward (fee amounts) removed. When no such
    line exists in the window, the cleaned ``fallback_text`` is returned.

    NOTE: the window ends before index start_idx + HIERARCHY_LOOKAHEAD_LINES,
    so HIERARCHY_LOOKAHEAD_LINES - 1 lines are inspected.
    """
    window_end = min(start_idx + HIERARCHY_LOOKAHEAD_LINES, len(lines))
    for candidate in lines[start_idx + 1:window_end]:
        stripped = candidate.strip()
        if not stripped or stripped.startswith(('«', '\f')):
            continue
        return clean_text(stripped.split('$')[0])
    return clean_text(fallback_text)


class HierarchyTracker:
    """Holds the L1-L4 headings in effect at the current point of the scan.

    Setting a level blanks every level beneath it, so a new section never
    inherits the previous section's category or subcategory.
    """

    def __init__(self):
        self.l1 = self.l2 = self.l3 = self.l4 = ""

    def set_l1(self, value: str):
        """Start a new L1; L2, L3 and L4 are cleared."""
        self.l1, self.l2, self.l3, self.l4 = value, "", "", ""

    def set_l2(self, value: str):
        """Start a new L2; L3 and L4 are cleared."""
        self.l2, self.l3, self.l4 = value, "", ""

    def set_l3(self, value: str):
        """Start a new L3; L4 is cleared."""
        self.l3, self.l4 = value, ""

    def set_l4(self, value: str):
        """Start a new L4; no other level is touched."""
        self.l4 = value

    def get_current(self) -> Tuple[str, str, str, str]:
        """Snapshot of the hierarchy as (l1, l2, l3, l4)."""
        return (self.l1, self.l2, self.l3, self.l4)


print("✓ Hierarchy extraction functions defined")

In [None]:
# =============================================================================
# MAIN EXTRACTION FUNCTION - MODIFIED FOR SK CODE FORMAT
# =============================================================================

def run_extraction(lines: List[str]) -> pd.DataFrame:
    """Main extraction function with L1/L2/L3/L4 hierarchy - SK version.

    Walks the marked-up lines once, starting at the index returned by
    find_content_start(). «L1:»-«L4:» markers update the running hierarchy;
    each «CODE:» marker opens a block that runs until the next marker of any
    kind. A block becomes an output row only if it has a fee, is "By Report",
    or contains a cross-reference.

    Args:
        lines: The source file split into lines.

    Returns:
        A DataFrame with one row per accepted code block. When nothing is
        accepted the DataFrame has no rows and no columns.
    """
    entries = []
    start_idx = find_content_start(lines)
    print(f"Content starts at line {start_idx}")

    hierarchy = HierarchyTracker()

    # Regex patterns for hierarchy and code markers
    # Each pattern is paired with the tracker setter for its level; the
    # setters also clear the deeper levels.
    hierarchy_patterns = [
        (r'«L1:(.+?)»', hierarchy.set_l1),
        (r'«L2:(.+?)»', hierarchy.set_l2),
        (r'«L3:(.+?)»', hierarchy.set_l3),
        (r'«L4:(.+?)»', hierarchy.set_l4),
    ]
    
    # SK code pattern: digits followed by a letter (e.g., 70A, 153A, 5B)
    # Optional ~ prefix for provisional, optional * suffix for asterisked
    code_pattern = re.compile(r'«CODE:(~)?(\d{1,4}[A-Z])(\*)?»')
    # Any marker (code or hierarchy) terminates the current code block
    block_end_pattern = re.compile(r'«CODE:|«L1:|«L2:|«L3:|«L4:')

    i = start_idx
    while i < len(lines):
        line = lines[i]

        # Check for hierarchy markers
        # Only the first matching level on a line is applied.
        hierarchy_matched = False
        for pattern, setter in hierarchy_patterns:
            match = re.search(pattern, line)
            if match:
                # Prefer the readable heading from the following lines; the
                # text inside the tag is only the fallback.
                readable = extract_hierarchy_text(lines, i, match.group(1))
                setter(readable)
                hierarchy_matched = True
                break

        if hierarchy_matched:
            # A line carrying a hierarchy marker is not examined for a CODE
            # marker.
            i += 1
            continue

        # Check for CODE marker
        code_match = code_pattern.search(line)
        if code_match:
            is_provisional_in_tag = code_match.group(1) is not None
            code = code_match.group(2)  # e.g., "70A", "153A"
            is_asterisked_in_tag = code_match.group(3) is not None
            block_start = i

            # Collect block lines until next marker
            block_lines = [line]
            j = i + 1
            while j < len(lines):
                next_line = lines[j]
                if block_end_pattern.search(next_line):
                    break
                block_lines.append(next_line)
                j += 1

            block = '\n'.join(block_lines)
            fee_info = extract_fee_from_block(block)
            is_xref, xref_to = check_cross_reference(block)

            # Only include entries with fee, by-report, or cross-reference
            if fee_info['fee_specialist'] is not None or fee_info['is_by_report'] or is_xref:
                # Flags come from the tag itself or from a marker character
                # anywhere in the block text (@/# for provisional, * for
                # asterisked).
                is_provisional = is_provisional_in_tag or bool(re.search(r'[@#]', block))
                is_asterisked = is_asterisked_in_tag or bool(re.search(r'\*', block))

                l1, l2, l3, l4 = hierarchy.get_current()
                section_code = get_section_code(l1)
                specialty_code, specialty_name = get_specialty_info(l1)
                description = extract_description(block, code)

                # Display form re-attaches the ~ prefix and * suffix
                display = code
                if is_provisional:
                    display = '~' + display
                if is_asterisked:
                    display = display + '*'

                entries.append({
                    'tariff_code': code,
                    'tariff_code_display': display,
                    'parent_code': None,
                    'section_code': section_code,
                    'section_name': l1,
                    'specialty_code': specialty_code,
                    'specialty_name': specialty_name,
                    'category': l2,
                    'subcategory': l3,
                    'subsubcategory': l4,
                    'description': description,
                    'notes': extract_notes(block),
                    'fee_specialist': fee_info['fee_specialist'],
                    'fee_gp': fee_info['fee_gp'],
                    'is_add_on': is_add_on_fee(block, description),
                    'is_provisional': is_provisional,
                    'is_asterisked': is_asterisked,
                    'is_by_report': fee_info['is_by_report'],
                    'is_cross_reference': is_xref,
                    'cross_reference_to': xref_to,
                    'applicable_rules': extract_rules(block),
                    'time_requirement_minutes': extract_time_requirement(block),
                    # 1-based line number of the CODE marker in the source
                    'source_line': block_start + 1,
                })

            # Resume at the marker that ended this block (or at end of file)
            i = j
            continue

        i += 1

    return pd.DataFrame(entries)


print("✓ Main extraction function defined")

In [None]:
# =============================================================================
# RUN PHASE 1 EXTRACTION
# =============================================================================

print("=" * 60)
print("PHASE 1: EXTRACTION")
print("=" * 60)

df = run_extraction(LINES)

# When nothing qualifies, run_extraction() returns a DataFrame with no rows
# and no columns, so the summary below would die with a KeyError (or a
# division by zero) far from the real cause. Fail here with a usable message.
if df.empty:
    raise ValueError(
        "Phase 1 extracted no entries - check that the uploaded file contains "
        "«CODE:…» markers and that the content start line was detected correctly."
    )

# Summary statistics for a quick sanity check of the extraction
specialist_fee_count = df['fee_specialist'].notna().sum()

print(f"\n✓ Extracted {len(df):,} entries")
print(f"  Unique codes: {df['tariff_code'].nunique():,}")
print(f"  Has specialist fee: {specialist_fee_count:,} ({100*specialist_fee_count/len(df):.1f}%)")
print(f"  By Report: {df['is_by_report'].sum():,}")
print(f"  Add-on fees: {df['is_add_on'].sum():,}")
print(f"\nHierarchy coverage:")
print(f"  Has L1 (section_name): {(df['section_name'] != '').sum():,}")
print(f"  Has L2 (category): {(df['category'] != '').sum():,}")
print(f"  Has L3 (subcategory): {(df['subcategory'] != '').sum():,}")
print(f"  Has L4 (subsubcategory): {(df['subsubcategory'] != '').sum():,}")

In [None]:
# Print the hierarchy, description and fees of the first 20 entries
print("Sample hierarchy:")
sample = df.head(20)
for _, entry in sample.iterrows():
    # Empty fields are shown as '(none)'; long ones are clipped for display
    l1_preview = entry['section_name'][:50] if entry['section_name'] else '(none)'
    l2_preview = entry['category'][:40] if entry['category'] else '(none)'
    l3_preview = entry['subcategory'][:40] if entry['subcategory'] else '(none)'
    desc_preview = entry['description'][:40] if entry['description'] else '(none)'

    print(f"\n{entry['tariff_code']}:")
    print(f"  L1: {l1_preview}")
    print(f"  L2: {l2_preview}")
    print(f"  L3: {l3_preview}")
    print(f"  Desc: {desc_preview}")
    print(f"  Fee: ${entry['fee_specialist']} / ${entry['fee_gp']}")

In [None]:
# Spot-check a few codes against their expected specialist fee
print("\nKnown code validation (SK):")
KNOWN_CODES = [
    ('70A', 30.00),
    ('71A', 55.00),
    ('74A', 140.00),
    ('153A', 30.40),
]

for code, expected_fee in KNOWN_CODES:
    matches = df[df['tariff_code'] == code]
    if len(matches) == 0:
        print(f"  {code}: NOT FOUND")
        continue

    # Only the first occurrence of the code is compared
    actual = matches.iloc[0]['fee_specialist']
    if actual and abs(actual - expected_fee) < 0.01:
        status = '✓'
    else:
        status = f'✗ got {actual}'
    print(f"  {code}: {expected_fee} -> {status}")

In [None]:
# Title-case the free-text columns in place
TEXT_COLUMNS = ['section_name', 'category', 'subcategory', 'subsubcategory', 'description']

for col in (name for name in TEXT_COLUMNS if name in df.columns):
    df[col] = df[col].apply(to_title_case)

print("✓ Data cleaning applied (title case)")
print(f"\nSample cleaned data:")
sample = df[['tariff_code', 'section_name', 'category', 'subcategory', 'description']].head(10)
for _, entry in sample.iterrows():
    # Clip both fields to 30 characters for a compact preview
    category_display = entry['category'][:30] if entry['category'] else '(none)'
    description_display = entry['description'][:30] if entry['description'] else ''
    print(f"  {entry['tariff_code']}: {category_display} > {description_display}")

In [None]:
# =============================================================================
# EXPORT UTILITIES
# =============================================================================

def prefix_for_excel(value, prefix="'"):
    """Prepend *prefix* so Excel keeps the value as text.

    Missing values (None/NaN) and empty strings are passed through untouched.
    """
    is_blank = pd.isna(value) or value == '' or value is None
    return value if is_blank else f"{prefix}{value}"


def clean_tariff_code(x):
    """Normalise one tariff code (SK alphanumeric form) to trimmed upper case.

    Missing values and empty strings come back as None.
    """
    is_blank = pd.isna(x) or x == '' or x is None
    return None if is_blank else str(x).strip().upper()


def clean_tariff_code_list(x):
    """Clean and format a comma-separated list of tariff codes.

    Args:
        x: A string such as "70a, 71b", or a missing value.

    Returns:
        The codes trimmed, upper-cased and joined with ', '. Returns None
        when the input is missing/empty or contains no codes at all.
    """
    if pd.isna(x) or x == '' or x is None:
        return None
    # Skip empty tokens left by stray or doubled commas ("70A, ,71B", "70A,")
    codes = [token.strip().upper() for token in str(x).split(',') if token.strip()]
    return ', '.join(codes) if codes else None


print("✓ Export utilities defined")

In [None]:
# Write the Phase 1 result to CSV and hand it to the browser for download
PHASE1_FILE = 'sk_tariffs_phase1.csv'

# Work on a copy so the Excel-safe prefixes never leak into `df`
df_phase1 = df.copy()
df_phase1['tariff_code'] = df_phase1['tariff_code'].apply(prefix_for_excel)
df_phase1['specialty_code'] = df_phase1['specialty_code'].apply(
    lambda code: code if code == '' else prefix_for_excel(code)
)

df_phase1.to_csv(PHASE1_FILE, index=False, encoding='utf-8')
print(f"✓ Phase 1 export saved: {PHASE1_FILE}")
print(f"  {len(df)} rows, {len(df.columns)} columns")
files.download(PHASE1_FILE)

---
## Phase 2: GPT Full Review

GPT reviews ALL entries for:
- Description completion (if L1+L2+L3+L4+desc insufficient)
- Parent code relationships
- Add-on fee detection (is_add_on, add_on_to)
- Age restrictions
- Setting restrictions
- Exclusions

---

In [None]:
# Setup OpenAI (secure API key input - key will NOT be displayed)
# Prefer the Colab secret; fall back to a hidden interactive prompt when the
# secret cannot be read.
try:
    OPENAI_API_KEY = userdata.get('OPENAI_API_KEY')
    print("✓ API key loaded from Colab secrets")
except Exception:
    # `except Exception` rather than a bare `except:` so that interrupting
    # the cell (KeyboardInterrupt) or a SystemExit is not swallowed and
    # turned into a key prompt.
    OPENAI_API_KEY = getpass.getpass("Enter OpenAI API key (input hidden): ")
    print("✓ API key entered")

client = OpenAI(api_key=OPENAI_API_KEY)
print("✓ OpenAI client ready")

In [None]:
# Rough cost/time projection for reviewing every extracted entry with GPT
total_entries = len(df)
estimated_cost = total_entries * COST_PER_ENTRY_ESTIMATE
estimated_minutes = total_entries * TIME_PER_ENTRY_SECONDS / 60

for summary_line in (
    f"Total entries to review: {total_entries}",
    f"Estimated cost: ~${estimated_cost:.2f}",
    f"Estimated time: ~{estimated_minutes:.0f} minutes",
):
    print(summary_line)

In [None]:
# =============================================================================
# GPT REVIEW SYSTEM PROMPT - MODIFIED FOR SASKATCHEWAN
# =============================================================================

# System message sent with every Phase 2 request (see evaluate_entry).
# The JSON keys listed at the end of the prompt are the ones
# parse_gpt_response() reads; change the two together.
SYSTEM_PROMPT = '''You are an expert medical billing coder reviewing Saskatchewan's Payment Schedule for Insured Services.

IMPORTANT: Saskatchewan uses ALPHANUMERIC codes (e.g., 70A, 153A, 5B) not 4-digit codes.

For EACH code, extract the following:

═══════════════════════════════════════════════════════════════
1. DESCRIPTION COMPLETION
═══════════════════════════════════════════════════════════════
L1, L2, L3, L4 are ALREADY stored in separate columns.
Do NOT repeat them in the description.

If a description seems incomplete:
- Set parent_code to link it to its parent
- KEEP the description SHORT as-is
- Users can follow the parent_code chain to get full meaning

═══════════════════════════════════════════════════════════════
2. PARENT_CODE
═══════════════════════════════════════════════════════════════

A child code is a code whose description is incomplete on its own.
Look at INDENTATION in the source context to identify relationships.

SET parent_code TO THE IMMEDIATE PARENT:
    70A     Telephone call from SGI...              → parent_code: null
    71A     Written letter requested by SGI...      → parent_code: null
    74A     Examination and Report for SGI...       → parent_code: null
    
For procedure variants:
    100A    Collection of blood from donor          → parent_code: null  
    101A    Phlebotomy for therapeutic reason       → parent_code: null (different procedure)
    
For tiered codes:
    80A     Third Party Counselling - first 15 min  → parent_code: null
    81A     Third Party Counselling - next 15 min   → parent_code: "80A"

The parent_code must be an alphanumeric code VISIBLE in the nearby context.

═══════════════════════════════════════════════════════════════
3. ADD-ON DETECTION
═══════════════════════════════════════════════════════════════
is_add_on = true ONLY when THIS CODE is explicitly a supplement
that gets billed ON TOP OF another service.

DEFINITE ADD-ONS (is_add_on: true):
- Description contains "add to" or "supplement"
- Notes say THIS code "may be claimed in addition to" other codes

NOT ADD-ONS (is_add_on: false):
- Regular visit/examination codes
- "Each additional X" patterns - these are VARIANTS, use parent_code

═══════════════════════════════════════════════════════════════
4. AGE RESTRICTION (only if EXPLICITLY stated)
═══════════════════════════════════════════════════════════════
Look for: "under 18", "child", "pediatric", "over 65", "elderly", "newborn"
Return the exact restriction text. null if not mentioned.

═══════════════════════════════════════════════════════════════
5. SETTING RESTRICTION (only if EXPLICITLY stated)
═══════════════════════════════════════════════════════════════
Look for: "hospital only", "office", "home", "ICU", "Emergency"
Return the exact restriction text. null if not mentioned.

═══════════════════════════════════════════════════════════════
6. EXCLUSIONS (only if EXPLICITLY stated)
═══════════════════════════════════════════════════════════════
Look for: "cannot be claimed with", "may not be claimed with", "excludes"
Return the exact exclusion text. null if not mentioned.

═══════════════════════════════════════════════════════════════
SK-SPECIFIC NOTES
═══════════════════════════════════════════════════════════════
- Codes marked with @ or # require entitlement/approval → is_provisional=true
- Codes with * after fee → is_asterisked=true (age supplement applies)
- Fee format: $XX.XX $XX.XX (Specialist, GP)

Return JSON:
{
  "needs_completion": true/false,
  "description": "...",
  "parent_code": "XXA" or null,
  "is_add_on": true/false,
  "add_on_to": "XXA, YYB" or null,
  "age_restriction": "..." or null,
  "setting_restriction": "..." or null,
  "exclusions": "..." or null
}
'''

print("✓ System prompt defined")

In [None]:
# =============================================================================
# GPT EVALUATION FUNCTIONS - MODIFIED FOR SK CODE FORMAT
# =============================================================================

def get_source_context(source_line: int, lines: List[str], before: int = 30, after: int = 5,
                       max_length: Optional[int] = None) -> str:
    """Get surrounding context from source file for GPT review.

    Args:
        source_line: 1-based line number of the entry's CODE marker (the
            'source_line' value recorded during Phase 1). Missing or < 1
            yields a placeholder string.
        lines: The source file split into lines.
        before: Number of lines to include ahead of the entry.
        after: Number of lines to include after the entry's first line.
        max_length: Character budget for the returned text; defaults to
            MAX_CONTEXT_LENGTH.

    Returns:
        The context window with form feeds turned into newlines, at most
        ``max_length`` characters long.
    """
    if pd.isna(source_line) or source_line < 1:
        return "(no source context available)"
    limit = MAX_CONTEXT_LENGTH if max_length is None else max_length

    source_line = int(source_line)
    target = source_line - 1  # 0-based index of the entry's own line
    start = max(0, target - before)
    end = min(len(lines), source_line + after)

    def render(first: int) -> str:
        return '\n'.join(lines[first:end]).replace('\f', '\n').strip()

    # A plain head-truncation keeps the oldest lines and can cut off the
    # entry being reviewed when the preceding lines are long. Shed leading
    # context lines first so the entry itself always stays in the window.
    context = render(start)
    while len(context) > limit and start < target:
        start += 1
        context = render(start)

    # Still too long: the entry and its trailing lines alone exceed the budget
    return context[:limit]


def build_user_message(row: pd.Series, context: str) -> str:
    """Compose the per-entry user prompt for the GPT review.

    The message lists the code, its four hierarchy levels, the description
    and notes, followed by the raw source context in a fenced block. Empty
    category/subcategory/subsubcategory/notes values are shown as "(none)".
    """
    def shown(value):
        # Falsy values (empty strings) are rendered as a placeholder
        return value if value else '(none)'

    code = row['tariff_code']
    section = row['section_name']
    category = shown(row['category'])
    subcategory = shown(row['subcategory'])
    subsubcategory = shown(row['subsubcategory'])
    description = row['description']
    notes = shown(row['notes'])

    message_lines = [
        f"Code: {code}",
        "",
        "HIERARCHY:",
        f"  L1 (section): {section}",
        f"  L2 (category): {category}",
        f"  L3 (subcategory): {subcategory}",
        f"  L4 (subsubcategory): {subsubcategory}",
        "",
        f'Description: "{description}"',
        f'Notes: "{notes}"',
        "",
        "Source context:",
        "```",
        f"{context}",
        "```",
    ]
    return "\n".join(message_lines)


def parse_gpt_response(result: Dict, original_description: str) -> Dict:
    """Parse and clean GPT response - SK version.

    Args:
        result: The decoded JSON object returned by the model. Anything that
            is not a dict is treated as an empty response.
        original_description: Phase 1 description, used whenever the model
            supplies no usable description.

    Returns:
        A dict with the keys needs_completion, description, parent_code,
        is_add_on, add_on_to, age_restriction, setting_restriction and
        exclusions. Codes that do not look like SK codes are discarded.
    """
    # Valid JSON is not necessarily an object (it could be a list or string)
    if not isinstance(result, dict):
        result = {}

    # Clean description. The model may send `"description": null` or an empty
    # string; fall back to the Phase 1 text instead of failing, which would
    # otherwise throw away every other field of the response.
    desc = result.get('description')
    if not isinstance(desc, str) or not desc.strip():
        desc = original_description
    if isinstance(desc, str):
        desc = clean_text(desc)
        if len(desc) > MAX_DESCRIPTION_LENGTH:
            desc = desc[:MAX_DESCRIPTION_LENGTH - 3] + '...'

    # SK code shape: 1-4 digits followed by one letter (e.g. 70A)
    sk_code = re.compile(r'^\d{1,4}[A-Z]$')

    # Clean parent_code
    parent = result.get('parent_code')
    if parent:
        parent = str(parent).strip().upper()
        if not sk_code.match(parent):
            parent = None

    # Clean add_on_to: keep only the well-formed codes of the list
    add_on_to = result.get('add_on_to')
    if add_on_to:
        candidates = [c.strip() for c in str(add_on_to).strip().upper().split(',')]
        valid = [c for c in candidates if sk_code.match(c)]
        add_on_to = ', '.join(valid) if valid else None

    return {
        # bool() keeps these flags boolean even if the model sends null
        'needs_completion': bool(result.get('needs_completion', False)),
        'description': desc,
        'parent_code': parent,
        'is_add_on': bool(result.get('is_add_on', False)),
        'add_on_to': add_on_to,
        'age_restriction': result.get('age_restriction'),
        'setting_restriction': result.get('setting_restriction'),
        'exclusions': result.get('exclusions')
    }


def get_empty_result(original_description: str) -> Dict:
    """Build the neutral result used when a GPT review fails.

    The Phase 1 description is kept; both flags are False and every other
    field is None.
    """
    neutral = {'needs_completion': False, 'description': original_description}
    neutral.update(
        parent_code=None,
        is_add_on=False,
        add_on_to=None,
        age_restriction=None,
        setting_restriction=None,
        exclusions=None,
    )
    return neutral


def evaluate_entry(row: pd.Series, lines: List[str]) -> Dict:
    """Run one entry through the GPT review and return the cleaned result.

    The prompt is built from the row plus its source context. Any failure
    while calling the API or decoding/parsing the reply is printed and
    answered with the neutral result from get_empty_result(), so one bad
    entry does not stop the run.
    """
    prompt = build_user_message(row, get_source_context(row['source_line'], lines))

    try:
        conversation = [
            {"role": "system", "content": SYSTEM_PROMPT},
            {"role": "user", "content": prompt}
        ]
        completion = client.chat.completions.create(
            model=GPT_MODEL,
            messages=conversation,
            temperature=GPT_TEMPERATURE,
            response_format={"type": "json_object"}
        )
        payload = json.loads(completion.choices[0].message.content)
        return parse_gpt_response(payload, row['description'])
    except Exception as e:
        print(f"  Error on {row['tariff_code']}: {e}")
        return get_empty_result(row['description'])


print("✓ GPT evaluation functions defined")

In [None]:
# =============================================================================
# CHECKPOINTING FUNCTIONS
# =============================================================================

def save_checkpoint(results: Dict, stats: Dict, last_processed_idx: int, checkpoint_file: str = CHECKPOINT_FILE):
    """Save current progress to a checkpoint file.

    Args:
        results: Per-entry results gathered so far; keys are stringified
            because JSON object keys must be strings.
        stats: Running counters for the progress report.
        last_processed_idx: Position of the last entry that was processed.
        checkpoint_file: Destination path.
    """
    checkpoint_data = {
        'results': {str(k): v for k, v in results.items()},
        'stats': stats,
        'last_processed_idx': last_processed_idx,
        'timestamp': time.strftime('%Y-%m-%d %H:%M:%S')
    }
    # Write to a sibling temp file and swap it in. Writing the checkpoint in
    # place would leave a truncated, unreadable file if the session dies
    # mid-write - losing the progress the checkpoint exists to protect.
    temp_file = f"{checkpoint_file}.tmp"
    with open(temp_file, 'w', encoding='utf-8') as f:
        json.dump(checkpoint_data, f, indent=2)
        f.flush()
        os.fsync(f.fileno())
    os.replace(temp_file, checkpoint_file)
    print(f"  [Checkpoint saved: {last_processed_idx + 1} entries]")


def load_checkpoint(checkpoint_file: str = CHECKPOINT_FILE) -> Tuple[Dict, Dict, int]:
    """Restore Phase 2 progress from disk, or start from scratch.

    Returns (results, stats, last_processed_idx). When the file is absent or
    cannot be read, results is empty, every counter is 0 and the index is -1.
    """
    def fresh_state() -> Tuple[Dict, Dict, int]:
        counters = {'completed': 0, 'parents': 0, 'add_ons': 0, 'age_restricted': 0, 'setting_restricted': 0, 'has_exclusions': 0}
        return {}, counters, -1

    if not os.path.exists(checkpoint_file):
        return fresh_state()

    try:
        with open(checkpoint_file, 'r', encoding='utf-8') as handle:
            saved = json.load(handle)

        # JSON stores keys as strings; convert them back to integers
        results = {int(key): value for key, value in saved['results'].items()}
        stats = saved['stats']
        last_processed_idx = saved['last_processed_idx']

        print(f"✓ Loaded checkpoint from {saved['timestamp']}")
        print(f"  Resuming from entry {last_processed_idx + 1}")
        print(f"  Stats so far: completions={stats['completed']}, parents={stats['parents']}, add-ons={stats['add_ons']}")

        return results, stats, last_processed_idx
    except Exception as e:
        # An unreadable or malformed checkpoint is reported, then ignored
        print(f"Warning: Could not load checkpoint ({e}). Starting fresh.")
        return fresh_state()


def clear_checkpoint(checkpoint_file: str = CHECKPOINT_FILE):
    """Delete the checkpoint file, if present, and report the removal.

    Does nothing (and prints nothing) when ``checkpoint_file`` does not exist.
    """
    if not os.path.exists(checkpoint_file):
        return

    os.remove(checkpoint_file)
    print("✓ Checkpoint file removed")


print("✓ Checkpointing functions defined")

In [None]:
# =============================================================================
# TEST CONFIGURATION
# =============================================================================

TEST_MODE = False  # False = process every entry; True = only the first TEST_LIMIT entries
TEST_LIMIT = 50

# process_df is always a copy: either the first TEST_LIMIT rows or all of df.
process_df = (df.head(TEST_LIMIT) if TEST_MODE else df).copy()

run_banner = (
    f"✓ TEST MODE: First {len(process_df)} entries"
    if TEST_MODE
    else f"FULL RUN: Processing all {len(process_df)} entries"
)
print(run_banner)

# Rough projections based on the per-entry estimates from the configuration.
entry_count = len(process_df)
print(f"\nEstimated time: ~{entry_count * TIME_PER_ENTRY_SECONDS / 60:.1f} minutes")
print(f"Estimated cost: ~${entry_count * COST_PER_ENTRY_ESTIMATE:.2f}")

In [None]:
# =============================================================================
# RUN PHASE 2 GPT REVIEW (with checkpointing)
# =============================================================================

print("\n" + "=" * 60)
print("PHASE 2: GPT FULL REVIEW")
print("=" * 60 + "\n")

# Resume from a previous run when a checkpoint exists; otherwise this yields
# empty results, zeroed counters and last_processed_idx == -1.
results, stats, last_processed_idx = load_checkpoint()

entries_to_process = list(enumerate(process_df.iterrows()))
start_position = last_processed_idx + 1

if start_position > 0:
    print(f"Skipping {start_position} already-processed entries...")

# Position of the most recent entry whose result AND counters are both
# recorded. Used to save progress if the loop is interrupted between the
# periodic checkpoints.
last_completed_position = last_processed_idx

# Result field -> counter incremented when that field is truthy.
stat_for_result_field = (
    ('needs_completion', 'completed'),
    ('parent_code', 'parents'),
    ('is_add_on', 'add_ons'),
    ('age_restriction', 'age_restricted'),
    ('setting_restriction', 'setting_restricted'),
    ('exclusions', 'has_exclusions'),
)

try:
    for i, (idx, row) in entries_to_process[start_position:]:
        if (i + 1) % PROGRESS_REPORT_INTERVAL == 0:
            print(f"  {i+1}/{len(process_df)} - completions:{stats['completed']}, parents:{stats['parents']}, add-ons:{stats['add_ons']}")

        result = evaluate_entry(row, LINES)
        results[idx] = result  # keyed by DataFrame index label, not position

        for result_field, stat_name in stat_for_result_field:
            if result[result_field]:
                stats[stat_name] += 1

        # Mark the entry as done before the pause below, so an interrupt
        # during the sleep does not cause it to be re-processed on resume.
        last_completed_position = i

        if (i + 1) % CHECKPOINT_INTERVAL == 0:
            save_checkpoint(results, stats, i)

        time.sleep(API_CALL_DELAY_SECONDS)
except BaseException:
    # Interrupt or crash: persist everything finished since the last periodic
    # checkpoint, then let the original exception propagate unchanged.
    if last_completed_position > last_processed_idx:
        save_checkpoint(results, stats, last_completed_position)
    raise

save_checkpoint(results, stats, len(process_df) - 1)

print(f"\n✓ Phase 2 complete!")
print(f"  Descriptions completed: {stats['completed']}")
print(f"  Parent codes: {stats['parents']}")
print(f"  Add-on fees: {stats['add_ons']}")
print(f"  Age restricted: {stats['age_restricted']}")
print(f"  Setting restricted: {stats['setting_restricted']}")
print(f"  Has exclusions: {stats['has_exclusions']}")

In [None]:
# =============================================================================
# APPLY GPT RESULTS TO DATAFRAME
# =============================================================================

# Keep the pre-GPT description alongside the (possibly rewritten) one.
df['_desc_original'] = df['description']

# Columns that are filled only when the GPT result has a truthy value for them.
optional_result_columns = ('add_on_to', 'age_restriction', 'setting_restriction', 'exclusions')
for column in optional_result_columns:
    df[column] = None

for idx, result in results.items():
    # The description is replaced only when GPT flagged it as needing completion.
    if result['needs_completion']:
        df.loc[idx, 'description'] = result['description']

    if result['parent_code']:
        df.loc[idx, 'parent_code'] = result['parent_code']

    # The add-on flag is always written, whatever its value.
    df.loc[idx, 'is_add_on'] = result['is_add_on']

    for column in optional_result_columns:
        value = result[column]
        if value:
            df.loc[idx, column] = value

print(f"Applied {stats['completed']} description updates")
print(f"Applied {stats['parents']} parent codes")
print(f"Applied {stats['add_ons']} add-on flags")

In [None]:
# =============================================================================
# FINAL CLEANUP AND COLUMN ORDERING
# =============================================================================

# Working columns that must not appear in the exported dataset.
drop_cols = ['_desc_original']
present_drop_cols = [c for c in drop_cols if c in df.columns]
df_final = df.drop(columns=present_drop_cols, errors='ignore')

# Keep only the columns named in OUTPUT_COLUMNS that exist, in that order.
ordered_columns = [c for c in OUTPUT_COLUMNS if c in df_final.columns]
df_final = df_final[ordered_columns]

row_count, column_count = len(df_final), len(df_final.columns)
print(f"Final dataset: {row_count} rows x {column_count} columns")

In [None]:
# =============================================================================
# FINAL SUMMARY
# =============================================================================

banner_rule = "=" * 60
print("\n" + banner_rule)
print("FINAL SUMMARY - SASKATCHEWAN")
print(banner_rule)

total_entries = len(df_final)
print(f"\nTotal entries: {total_entries:,}")
print(f"Unique codes: {df_final['tariff_code'].nunique():,}")

print("\nFee coverage:")
specialist_fee_count = df_final['fee_specialist'].notna().sum()
specialist_fee_pct = 100 * specialist_fee_count / total_entries
print(f"  Has specialist fee: {specialist_fee_count:,} ({specialist_fee_pct:.1f}%)")
print(f"  By Report: {df_final['is_by_report'].sum():,}")

# A hierarchy level counts as present when its column is not the empty string.
print("\nHierarchy:")
for level_label, level_column in (
    ('category (L2)', 'category'),
    ('subcategory (L3)', 'subcategory'),
    ('subsubcategory (L4)', 'subsubcategory'),
):
    print(f"  Has {level_label}: {(df_final[level_column] != '').sum():,}")

print("\nGPT Review:")
print(f"  Entries reviewed: {len(results)}")
print(f"  Descriptions completed: {stats['completed']}")
print(f"  Parent codes: {df_final['parent_code'].notna().sum():,}")
print(f"  Add-on fees: {df_final['is_add_on'].sum():,}")
# The remaining metadata columns are reported as non-null counts.
for metadata_label, metadata_column in (
    ('Age restrictions', 'age_restriction'),
    ('Setting restrictions', 'setting_restriction'),
    ('Exclusions', 'exclusions'),
):
    print(f"  {metadata_label}: {df_final[metadata_column].notna().sum():,}")

In [None]:
# =============================================================================
# EXPORT FINAL OUTPUT
# =============================================================================

OUTPUT_FILE = 'sk_tariffs_enriched.csv'

# Export from a copy so df_final keeps its unformatted values.
df_export = df_final.copy()

# Column -> function applied to every value of that column, in this order.
for export_column, transform in (
    ('parent_code', clean_tariff_code),
    ('add_on_to', clean_tariff_code_list),
    ('tariff_code', prefix_for_excel),
):
    df_export[export_column] = df_export[export_column].apply(transform)

# Missing or empty specialty codes are passed through untouched.
df_export['specialty_code'] = df_export['specialty_code'].apply(
    lambda x: x if pd.isna(x) or x == '' else prefix_for_excel(x)
)

df_export.to_csv(OUTPUT_FILE, index=False, encoding='utf-8')
print(f"✓ Saved {OUTPUT_FILE}")

# The checkpoint is removed only after the CSV has been written.
clear_checkpoint()

files.download(OUTPUT_FILE)