# Provincial Billing Code Crosswalk Pipeline

**Purpose:** Programmatically match Alberta physician billing codes to Ontario equivalents using:
1. Semantic embedding similarity (sentence-transformers)
2. Fee ratio validation
3. Category/service type alignment
4. Multi-signal confidence scoring

**Author:** HelpSeeker Technologies  
**Date:** 2025-01

## 1. Setup and Installation

In [14]:
from google.colab import files

print("Upload 1/3: Alberta Excel file")
f1 = files.upload()

print("\nUpload 2/3: Ontario fee file (.001)")
f2 = files.upload()

print("\nUpload 3/3: Ontario PDF")
f3 = files.upload()

print("\n✓ All files uploaded!")
!ls -la *.xlsx *.pdf *001*

Upload 1/3: Alberta Excel file


Saving merged_output_AB (2).xlsx to merged_output_AB (2).xlsx

Upload 2/3: Ontario fee file (.001)


Saving OCTSOB2025.001 to OCTSOB2025.001

Upload 3/3: Ontario PDF


Saving moh-schedule-benefit-2024-03-04.pdf to moh-schedule-benefit-2024-03-04.pdf

✓ All files uploaded!
-rw-r--r-- 1 root root  190769 Jan 19 22:33 'merged_output_AB (2).xlsx'
-rw-r--r-- 1 root root  190769 Jan 19 22:31  merged_output_AB.xlsx
-rw-r--r-- 1 root root 8426591 Jan 19 22:33  moh-schedule-benefit-2024-03-04.pdf
-rw-r--r-- 1 root root  568029 Jan 19 22:33  OCTSOB2025.001


In [15]:
# Install required packages (run once)
!pip install sentence-transformers pandas openpyxl scikit-learn pdfplumber -q

In [2]:
import pandas as pd
import numpy as np
import pdfplumber
import re
from sentence_transformers import SentenceTransformer
from sklearn.metrics.pairwise import cosine_similarity
from typing import Dict, List, Tuple, Optional
from dataclasses import dataclass
import warnings
warnings.filterwarnings('ignore')

print("✓ All packages loaded successfully")



✓ All packages loaded successfully


## 2. Configuration

In [27]:
@dataclass
class Config:
    """Configuration for the crosswalk pipeline"""

    # File paths - UPDATE THESE FOR YOUR FILES
    alberta_file: str = "merged_output_AB__2_.xlsx"
    ontario_fee_file: str = "OCTSOB2025.001 2"  # Fixed-width fee data
    ontario_pdf_file: str = "moh-schedule-benefit-2024-03-04.pdf"  # For descriptions

    # Embedding model - medical domain works best
    embedding_model: str = "all-MiniLM-L6-v2"  # Fast, good quality
    # Alternative: "pritamdeka/S-PubMedBert-MS-MARCO" for medical domain

    # Matching thresholds
    min_similarity_threshold: float = 0.4  # Minimum cosine similarity to consider
    high_confidence_threshold: float = 0.75  # Above this = high confidence match

    # Fee validation
    fee_ratio_tolerance: float = 0.5  # Allow 50% fee variance before flagging

    # Output
    output_file: str = "AB_ON_Crosswalk_Validated.xlsx"
    top_k_matches: int = 5  # Number of candidate matches to return per code

config = Config()
print(f"Configuration loaded:\n{config}")

Configuration loaded:
Config(alberta_file='merged_output_AB__2_.xlsx', ontario_fee_file='OCTSOB2025.001 2', ontario_pdf_file='moh-schedule-benefit-2024-03-04.pdf', embedding_model='all-MiniLM-L6-v2', min_similarity_threshold=0.4, high_confidence_threshold=0.75, fee_ratio_tolerance=0.5, output_file='AB_ON_Crosswalk_Validated.xlsx', top_k_matches=5)


## 3. Data Loading Functions

In [29]:
def load_alberta_codes(filepath: str) -> pd.DataFrame:
    """
    Load Alberta billing codes from Excel file.
    Returns deduplicated codes with descriptions and fees.
    """
    df = pd.read_excel(filepath)

    # Get unique codes with their core attributes
    codes = df[['BILLING_CODE', 'DESCRIPTION', 'BASE_RATE', 'CATEGORY']].drop_duplicates(
        subset=['BILLING_CODE']
    ).reset_index(drop=True)

    codes.columns = ['code', 'description', 'fee', 'category']
    codes['province'] = 'AB'

    # Clean descriptions
    codes['description_clean'] = codes['description'].str.lower().str.strip()

    print(f"✓ Loaded {len(codes)} Alberta codes")
    return codes


def parse_ontario_fee_file(filepath: str) -> pd.DataFrame:
    """
    Parse Ontario fixed-width fee schedule file.

    File structure (75 chars per line):
    - Chars 0-4: Fee code (4 chars)
    - Chars 4-12: Start date YYYYMMDD
    - Chars 12-20: End date YYYYMMDD (99999999 = active)
    - Chars 20-30: Primary fee (in cents, divide by 100)
    - Chars 30-40: Secondary field (H component for diagnostics)
    - Chars 40-50: Tertiary field
    - etc.
    """
    records = []

    with open(filepath, 'r') as f:
        for line in f:
            line = line.strip()
            if len(line) >= 30:
                code = line[0:4]
                start_date = line[4:12]
                end_date = line[12:20]

                # Primary fee field (divide by 1000 based on analysis)
                try:
                    fee_raw = int(line[20:30])
                    fee = fee_raw / 1000  # Convert to dollars
                except:
                    fee = 0

                # Technical component (H) for diagnostic codes
                try:
                    h_component = int(line[30:40]) / 1000 if len(line) >= 40 else 0
                except:
                    h_component = 0

                records.append({
                    'code': code,
                    'start_date': start_date,
                    'end_date': end_date,
                    'fee': fee,
                    'fee_h': h_component,
                    'is_active': end_date == '99999999'
                })

    df = pd.DataFrame(records)

    # Filter to active codes only
    active = df[df['is_active']].copy()

    print(f"✓ Parsed {len(active)} active Ontario codes from fee file")
    return active


def extract_ontario_descriptions_from_pdf(filepath: str,
                                          page_ranges: List[Tuple[int, int]] = None) -> Dict[str, str]:
    """
    Extract code descriptions from Ontario Schedule of Benefits PDF.

    Args:
        filepath: Path to PDF
        page_ranges: List of (start, end) page tuples to scan. If None, scans key sections.

    Returns:
        Dict mapping code -> description
    """
    if page_ranges is None:
        # Default ranges covering main sections
        page_ranges = [
            (125, 220),   # Consultations and Visits (A-codes)
            (300, 400),   # Diagnostic Radiology (X-codes)
            (380, 420),   # Diagnostic Ultrasound (J-codes)
            (80, 100),    # Special Visit Premiums
        ]

    code_descriptions = {}

    # Pattern to match fee codes with descriptions
    # Matches: CODE description....... fee
    pattern = re.compile(r'([A-Z]\d{3})\s+([A-Za-z][^\d\.]{5,80})\.{2,}')

    with pdfplumber.open(filepath) as pdf:
        total_pages = len(pdf.pages)

        for start, end in page_ranges:
            end = min(end, total_pages)

            for i in range(start, end):
                try:
                    text = pdf.pages[i].extract_text()
                    if text:
                        matches = pattern.findall(text)
                        for code, desc in matches:
                            desc = desc.strip()
                            desc = re.sub(r'\s+', ' ', desc)  # Normalize whitespace
                            if code not in code_descriptions:
                                code_descriptions[code] = desc
                except Exception as e:
                    continue

    print(f"✓ Extracted {len(code_descriptions)} Ontario code descriptions from PDF")
    return code_descriptions

In [30]:
def build_ontario_reference(fee_file: str, pdf_file: str) -> pd.DataFrame:
    """
    Build complete Ontario reference table by joining fee data with PDF descriptions.
    """
    # Load fee data
    fees_df = parse_ontario_fee_file(fee_file)

    # Extract descriptions from PDF
    descriptions = extract_ontario_descriptions_from_pdf(pdf_file)

    # Join
    fees_df['description'] = fees_df['code'].map(descriptions)
    fees_df['province'] = 'ON'

    # Infer category from code prefix
    def infer_category(code):
        prefix = code[0]
        category_map = {
            'A': 'Consultation/Assessment',
            'C': 'Hospital Services',
            'K': 'Assessment/Premium',
            'J': 'Diagnostic Procedures',
            'X': 'Diagnostic Radiology',
            'G': 'Diagnostic/Therapeutic',
            'Q': 'Premium/Special',
            'B': 'Premium/Home Visit',
            'W': 'Long-term Care',
            'E': 'Add-on/Premium',
        }
        return category_map.get(prefix, 'Other')

    fees_df['category'] = fees_df['code'].apply(infer_category)

    # Clean descriptions
    fees_df['description_clean'] = fees_df['description'].fillna('').str.lower().str.strip()

    # Filter to codes with descriptions for matching
    with_desc = fees_df[fees_df['description'].notna()].copy()

    print(f"✓ Built Ontario reference: {len(with_desc)} codes with descriptions")
    return with_desc

## 4. Embedding-Based Matching

In [19]:
class SemanticMatcher:
    """
    Semantic matching using sentence embeddings.
    """

    def __init__(self, model_name: str = "all-MiniLM-L6-v2"):
        print(f"Loading embedding model: {model_name}...")
        self.model = SentenceTransformer(model_name)
        print("✓ Model loaded")

        self.reference_embeddings = None
        self.reference_df = None

    def index_reference(self, reference_df: pd.DataFrame, text_column: str = 'description_clean'):
        """
        Pre-compute embeddings for reference (Ontario) codes.
        """
        self.reference_df = reference_df.copy()

        # Get descriptions
        descriptions = reference_df[text_column].fillna('').tolist()

        # Compute embeddings
        print(f"Computing embeddings for {len(descriptions)} reference codes...")
        self.reference_embeddings = self.model.encode(descriptions, show_progress_bar=True)
        print("✓ Reference embeddings computed")

    def find_matches(self,
                     query_description: str,
                     top_k: int = 5,
                     min_similarity: float = 0.3) -> List[Dict]:
        """
        Find top-k matching Ontario codes for a given Alberta description.

        Returns list of dicts with code, description, similarity, fee.
        """
        if self.reference_embeddings is None:
            raise ValueError("Must call index_reference() first")

        # Embed query
        query_embedding = self.model.encode([query_description.lower()])

        # Compute similarities
        similarities = cosine_similarity(query_embedding, self.reference_embeddings)[0]

        # Get top-k indices
        top_indices = np.argsort(similarities)[::-1][:top_k]

        matches = []
        for idx in top_indices:
            sim = similarities[idx]
            if sim >= min_similarity:
                row = self.reference_df.iloc[idx]
                matches.append({
                    'on_code': row['code'],
                    'on_description': row['description'],
                    'on_fee': row['fee'],
                    'on_category': row['category'],
                    'similarity': float(sim)
                })

        return matches

## 5. Multi-Signal Validation

In [20]:
def compute_fee_similarity(ab_fee: float, on_fee: float) -> float:
    """
    Compute fee similarity score (0-1).
    Returns 1.0 if fees are identical, decreasing as ratio diverges.
    """
    if ab_fee == 0 or on_fee == 0:
        return 0.5  # Neutral if either is zero (premium codes, etc.)

    ratio = min(ab_fee, on_fee) / max(ab_fee, on_fee)
    return ratio


def compute_category_match(ab_category: str, on_category: str) -> float:
    """
    Score category alignment (0-1).
    """
    # Mapping Alberta categories to Ontario
    ab_to_on_map = {
        'V': ['Consultation/Assessment', 'Hospital Services', 'Premium/Special'],
        'T': ['Diagnostic Radiology', 'Diagnostic Procedures', 'Diagnostic/Therapeutic'],
    }

    expected = ab_to_on_map.get(ab_category, [])

    if on_category in expected:
        return 1.0
    elif any(word in on_category.lower() for word in ['consult', 'assess', 'visit']):
        return 0.7
    else:
        return 0.3


def compute_confidence_score(semantic_sim: float,
                              fee_sim: float,
                              category_match: float,
                              weights: Tuple[float, float, float] = (0.6, 0.25, 0.15)) -> float:
    """
    Compute weighted confidence score from multiple signals.

    Default weights prioritize semantic similarity.
    """
    w_sem, w_fee, w_cat = weights

    score = (w_sem * semantic_sim +
             w_fee * fee_sim +
             w_cat * category_match)

    return round(score, 4)


def classify_confidence(score: float) -> str:
    """
    Classify confidence score into High/Medium/Low.
    """
    if score >= 0.75:
        return 'High'
    elif score >= 0.55:
        return 'Medium'
    else:
        return 'Low'

## 6. Main Crosswalk Pipeline

In [21]:
def run_crosswalk_pipeline(config: Config) -> pd.DataFrame:
    """
    Main pipeline to generate validated crosswalk.

    Returns DataFrame with Alberta codes matched to Ontario candidates.
    """
    print("="*60)
    print("BILLING CODE CROSSWALK PIPELINE")
    print("="*60)

    # 1. Load data
    print("\n[1/4] Loading data...")
    ab_codes = load_alberta_codes(config.alberta_file)
    on_reference = build_ontario_reference(config.ontario_fee_file, config.ontario_pdf_file)

    # 2. Initialize matcher
    print("\n[2/4] Initializing semantic matcher...")
    matcher = SemanticMatcher(config.embedding_model)
    matcher.index_reference(on_reference)

    # 3. Find matches for each Alberta code
    print("\n[3/4] Finding matches...")
    results = []

    for _, ab_row in ab_codes.iterrows():
        ab_code = ab_row['code']
        ab_desc = ab_row['description']
        ab_fee = ab_row['fee']
        ab_cat = ab_row['category']

        # Get candidate matches
        matches = matcher.find_matches(
            ab_desc,
            top_k=config.top_k_matches,
            min_similarity=config.min_similarity_threshold
        )

        for rank, match in enumerate(matches, 1):
            # Compute validation signals
            fee_sim = compute_fee_similarity(ab_fee, match['on_fee'])
            cat_match = compute_category_match(ab_cat, match['on_category'])
            confidence = compute_confidence_score(
                match['similarity'], fee_sim, cat_match
            )

            results.append({
                'ab_code': ab_code,
                'ab_description': ab_desc,
                'ab_fee': ab_fee,
                'ab_category': ab_cat,
                'match_rank': rank,
                'on_code': match['on_code'],
                'on_description': match['on_description'],
                'on_fee': match['on_fee'],
                'on_category': match['on_category'],
                'semantic_similarity': round(match['similarity'], 4),
                'fee_similarity': round(fee_sim, 4),
                'category_match': round(cat_match, 4),
                'confidence_score': confidence,
                'confidence_level': classify_confidence(confidence),
                'fee_ratio': round(match['on_fee'] / ab_fee, 2) if ab_fee > 0 else None,
            })

    results_df = pd.DataFrame(results)

    # 4. Add QA flags
    print("\n[4/4] Adding QA flags...")

    def add_qa_flags(row):
        flags = []

        # Fee discrepancy flag
        if row['fee_ratio'] and (row['fee_ratio'] < 0.5 or row['fee_ratio'] > 2.0):
            flags.append(f"FEE_DISCREPANCY ({row['fee_ratio']}x)")

        # Low semantic similarity
        if row['semantic_similarity'] < 0.5:
            flags.append("LOW_SEMANTIC_MATCH")

        # Category mismatch
        if row['category_match'] < 0.5:
            flags.append("CATEGORY_MISMATCH")

        return '; '.join(flags) if flags else '✓'

    results_df['qa_flags'] = results_df.apply(add_qa_flags, axis=1)

    print(f"\n✓ Pipeline complete: {len(results_df)} match candidates generated")
    print(f"  - High confidence: {len(results_df[results_df['confidence_level']=='High'])}")
    print(f"  - Medium confidence: {len(results_df[results_df['confidence_level']=='Medium'])}")
    print(f"  - Low confidence: {len(results_df[results_df['confidence_level']=='Low'])}")

    return results_df

## 7. Export Functions

In [22]:
def export_crosswalk(results_df: pd.DataFrame, output_file: str):
    """
    Export results to formatted Excel file.
    """
    from openpyxl import Workbook
    from openpyxl.styles import Font, PatternFill, Alignment, Border, Side
    from openpyxl.utils.dataframe import dataframe_to_rows

    # Create workbook with multiple sheets
    wb = Workbook()

    # Sheet 1: Best matches only (rank 1)
    ws1 = wb.active
    ws1.title = "Best Matches"

    best_matches = results_df[results_df['match_rank'] == 1].copy()

    # Headers
    headers = ['AB Code', 'AB Description', 'AB Fee', 'ON Code', 'ON Description',
               'ON Fee', 'Semantic Sim', 'Fee Ratio', 'Confidence', 'QA Flags']

    for col, header in enumerate(headers, 1):
        cell = ws1.cell(row=1, column=col, value=header)
        cell.font = Font(bold=True)
        cell.fill = PatternFill(start_color="0B1F33", fill_type="solid")
        cell.font = Font(bold=True, color="FFFFFF")

    # Data
    for row_idx, row in enumerate(best_matches.itertuples(), 2):
        ws1.cell(row=row_idx, column=1, value=row.ab_code)
        ws1.cell(row=row_idx, column=2, value=row.ab_description)
        ws1.cell(row=row_idx, column=3, value=row.ab_fee)
        ws1.cell(row=row_idx, column=4, value=row.on_code)
        ws1.cell(row=row_idx, column=5, value=row.on_description)
        ws1.cell(row=row_idx, column=6, value=row.on_fee)
        ws1.cell(row=row_idx, column=7, value=row.semantic_similarity)
        ws1.cell(row=row_idx, column=8, value=row.fee_ratio)
        ws1.cell(row=row_idx, column=9, value=row.confidence_level)
        ws1.cell(row=row_idx, column=10, value=row.qa_flags)

        # Color code confidence
        conf_cell = ws1.cell(row=row_idx, column=9)
        if row.confidence_level == 'High':
            conf_cell.font = Font(color="0FB9B1", bold=True)
        elif row.confidence_level == 'Medium':
            conf_cell.font = Font(color="D97706", bold=True)
        else:
            conf_cell.font = Font(color="DC2626", bold=True)

    # Sheet 2: All candidates
    ws2 = wb.create_sheet("All Candidates")
    for r_idx, row in enumerate(dataframe_to_rows(results_df, index=False, header=True), 1):
        for c_idx, value in enumerate(row, 1):
            ws2.cell(row=r_idx, column=c_idx, value=value)

    # Sheet 3: Summary stats
    ws3 = wb.create_sheet("Summary")
    ws3['A1'] = "Crosswalk Summary Statistics"
    ws3['A1'].font = Font(bold=True, size=14)

    stats = [
        ("", ""),
        ("Total Alberta Codes", len(best_matches)),
        ("High Confidence Matches", len(best_matches[best_matches['confidence_level']=='High'])),
        ("Medium Confidence Matches", len(best_matches[best_matches['confidence_level']=='Medium'])),
        ("Low Confidence Matches", len(best_matches[best_matches['confidence_level']=='Low'])),
        ("", ""),
        ("Average Semantic Similarity", round(best_matches['semantic_similarity'].mean(), 3)),
        ("Average Fee Ratio (ON/AB)", round(best_matches['fee_ratio'].mean(), 3)),
        ("", ""),
        ("Codes with QA Flags", len(best_matches[best_matches['qa_flags'] != '✓'])),
    ]

    for row_idx, (label, value) in enumerate(stats, 3):
        ws3.cell(row=row_idx, column=1, value=label)
        ws3.cell(row=row_idx, column=2, value=value)

    # Save
    wb.save(output_file)
    print(f"\n✓ Exported to {output_file}")

## 8. Run the Pipeline

In [26]:
# Upload your files to Colab first, then run:

# Option 1: Upload files manually via Colab UI
# from google.colab import files
# uploaded = files.upload()

# Option 2: Mount Google Drive
# from google.colab import drive
# drive.mount('/content/drive')
# config.alberta_file = '/content/drive/MyDrive/path/to/merged_output_AB__2_.xlsx'
# config.ontario_fee_file = '/content/drive/MyDrive/path/to/OCTSOB2025.001 2'
# config.ontario_pdf_file = '/content/drive/MyDrive/path/to/moh-schedule-benefit-2024-03-04.pdf'

In [25]:
# Run the pipeline
results = run_crosswalk_pipeline(config)

BILLING CODE CROSSWALK PIPELINE

[1/4] Loading data...


FileNotFoundError: [Errno 2] No such file or directory: 'merged_output_AB__2_.xlsx'

In [None]:
# View best matches
best_matches = results[results['match_rank'] == 1]
display(best_matches[['ab_code', 'ab_description', 'ab_fee', 'on_code', 'on_description',
                       'on_fee', 'semantic_similarity', 'confidence_level', 'qa_flags']])

In [None]:
# Export results
export_crosswalk(results, config.output_file)

# Download file (Colab)
# from google.colab import files
# files.download(config.output_file)

## 9. Manual Review Helper

Use this to inspect specific codes that need attention.

In [None]:
def review_code(ab_code: str, results_df: pd.DataFrame):
    """
    Display all candidate matches for a specific Alberta code.
    """
    matches = results_df[results_df['ab_code'] == ab_code].copy()

    if len(matches) == 0:
        print(f"No matches found for {ab_code}")
        return

    print(f"\n{'='*60}")
    print(f"ALBERTA CODE: {ab_code}")
    print(f"Description: {matches.iloc[0]['ab_description']}")
    print(f"Fee: ${matches.iloc[0]['ab_fee']}")
    print(f"{'='*60}")

    print(f"\nTop {len(matches)} Ontario Candidates:\n")

    for _, row in matches.iterrows():
        print(f"  [{row['match_rank']}] {row['on_code']}: {row['on_description']}")
        print(f"      Fee: ${row['on_fee']:.2f} | Ratio: {row['fee_ratio']}x")
        print(f"      Semantic: {row['semantic_similarity']:.3f} | Confidence: {row['confidence_level']}")
        print(f"      Flags: {row['qa_flags']}")
        print()

# Example usage:
# review_code('03.03A', results)

In [None]:
def get_flagged_codes(results_df: pd.DataFrame) -> pd.DataFrame:
    """
    Return all codes that have QA flags for manual review.
    """
    best = results_df[results_df['match_rank'] == 1]
    flagged = best[best['qa_flags'] != '✓']

    print(f"Codes requiring review: {len(flagged)}")
    return flagged[['ab_code', 'ab_description', 'on_code', 'on_description',
                    'confidence_level', 'qa_flags']]

# Example usage:
# flagged = get_flagged_codes(results)
# display(flagged)

## 10. Validation Against Known Mappings

If you have expert-validated mappings, use this to measure accuracy.

In [None]:
def validate_against_ground_truth(results_df: pd.DataFrame,
                                   ground_truth: Dict[str, str]) -> Dict:
    """
    Validate pipeline results against known correct mappings.

    Args:
        results_df: Pipeline output
        ground_truth: Dict mapping ab_code -> correct_on_code

    Returns:
        Dict with accuracy metrics
    """
    best_matches = results_df[results_df['match_rank'] == 1].copy()

    correct = 0
    total = 0
    errors = []

    for ab_code, correct_on in ground_truth.items():
        match = best_matches[best_matches['ab_code'] == ab_code]

        if len(match) == 0:
            errors.append((ab_code, 'NO_MATCH', correct_on))
            total += 1
            continue

        predicted_on = match.iloc[0]['on_code']

        # Check if correct (allowing for composite codes)
        if correct_on in predicted_on or predicted_on in correct_on:
            correct += 1
        else:
            errors.append((ab_code, predicted_on, correct_on))

        total += 1

    accuracy = correct / total if total > 0 else 0

    return {
        'accuracy': accuracy,
        'correct': correct,
        'total': total,
        'errors': errors
    }

# Example: Define known correct mappings for validation
# ground_truth = {
#     '03.03A': 'A001',
#     '03.04A': 'A007',
#     '03.08A': 'A005',
#     'X310': 'J135',
# }
#
# validation = validate_against_ground_truth(results, ground_truth)
# print(f"Accuracy: {validation['accuracy']:.1%}")
# print(f"Errors: {validation['errors']}")

---

## Notes for Production Use

1. **Embedding Model**: Consider using a medical-domain model like `pritamdeka/S-PubMedBert-MS-MARCO` for better clinical terminology matching.

2. **Fee Data**: The Ontario fee file format may change between releases. Verify the field positions.

3. **PDF Extraction**: The regex patterns may need adjustment for different SOB PDF versions.

4. **Confidence Thresholds**: Tune based on your tolerance for false positives vs. false negatives.

5. **Human Review**: Always have a domain expert review Low and Medium confidence matches.

6. **Version Control**: Track which versions of the AB and ON fee schedules were used.