# Patronage Pipeline Runner
This notebook runs the Patronage data pipeline. Pipeline logic is implemented in the `patronage_modularized` Python package; the notebook provides a simple run interface and validation utilities.

## Quick Start (Recommended Run Order)
1. **Cell 4 — Imports & Setup**: prepares imports and reloads pipeline modules
2. **Cell 5 — (Optional) Logging Verbosity**: set `VERBOSE_LOGGING = True` for detailed logs
3. **Cell 6 — Automatic Mode Detection**: selects `update` vs `rebuild` based on the target Delta table state
4. **Cell 7 — Execute Pipeline**: runs `orchestrator.run_pipeline(...)` using the selected mode

## Processing Modes (Auto-detected)
- **rebuild**: Full historical rebuild (used when the target table is missing or empty)
- **update**: Incremental processing when the target table already contains data

## What happens during UPDATE (high level)
- Processes new Caregiver (CG) files
- Processes new Service-Connected Disability (SCD) files
- Integrates PT indicator updates from the PT Delta table into SCD records (the PT seed is loaded only during REBUILD)
- Applies SCD Type 2 merge logic to preserve history (audit trail via active/expired records)
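
A minimal pure-Python sketch of the SCD Type 2 pattern, using the audit columns documented in the column-mapping table (`RecordStatus`, `RecordChangeStatus`, `change_log`, `sentToDoD`, `RecordLastUpdated`). It works on plain dicts rather than Delta tables. The business key, the short list of tracked fields, and the function name `scd2_merge` are hypothetical illustrations, not the actual merge code in `patronage_modularized`.

```python
from datetime import datetime

# Illustrative subset of tracked business fields (assumption; the real merge tracks more).
TRACKED_FIELDS = ("SC_Combined_Disability_Percentage", "PT_Indicator")


def scd2_merge(active_rows, incoming_rows, key_fields, event_ts):
    """Return (rows_to_expire, rows_to_insert) for a Type 2 merge.

    key_fields is a hypothetical business key; event_ts stands in for the
    incoming batch's SDP_Event_Created_Timestamp.
    """
    active_by_key = {tuple(r[k] for k in key_fields): r for r in active_rows}
    to_expire, to_insert = [], []

    for new in incoming_rows:
        key = tuple(new[k] for k in key_fields)
        old = active_by_key.get(key)

        if old is None:
            # Unseen key: insert as the first active version.
            to_insert.append(dict(new, RecordStatus=True, sentToDoD=False,
                                  RecordLastUpdated=None,
                                  change_log="New Record",
                                  RecordChangeStatus="New Record"))
            continue

        # Log old→new for every tracked field that changed.
        changes = [f"{f}: {old.get(f)}→{new.get(f)}"
                   for f in TRACKED_FIELDS if old.get(f) != new.get(f)]
        if not changes:
            continue  # No business change: the current active row stays as-is.

        # Expire the previous version (it stays in the table as history)...
        to_expire.append(dict(old, RecordStatus=False, sentToDoD=True,
                              RecordChangeStatus="Expired Record"))
        # ...and insert the new version as the active row.
        to_insert.append(dict(new, RecordStatus=True, sentToDoD=False,
                              RecordLastUpdated=event_ts,
                              change_log="; ".join(changes),
                              RecordChangeStatus="Updated Record"))
    return to_expire, to_insert


# Illustrative rows: P001 changes 50% -> 70%, P005 is new.
active = [{"participant_id": "P001", "Batch_CD": "SCD",
           "SC_Combined_Disability_Percentage": "050", "PT_Indicator": "N"}]
incoming = [
    {"participant_id": "P001", "Batch_CD": "SCD",
     "SC_Combined_Disability_Percentage": "070", "PT_Indicator": "N"},
    {"participant_id": "P005", "Batch_CD": "SCD",
     "SC_Combined_Disability_Percentage": "020", "PT_Indicator": "N"},
]
expired, inserted = scd2_merge(active, incoming, ("participant_id", "Batch_CD"),
                               datetime(2025, 1, 15))
for row in expired + inserted:
    print(row["participant_id"], row["RecordChangeStatus"], row.get("change_log"))
```

Rows whose tracked fields are unchanged produce neither an expiration nor an insert, which keeps the history free of no-op versions.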

## Notes
- **Cell 2** documents column mapping and data lineage (schema reference); **Cell 3** renders the pipeline flow diagram.

# Patronage - Column Mapping

## Delta Table Schema Documentation

This table documents the complete data lineage for every column in the Patronage Delta table, including source systems, original column names, and any transformations applied.

| Delta Table Column Name | Source | Original Column Name | Description of Transformation | Sent to DMDC |
|------------------------|--------|---------------------|------------------------------|---------------|
| **edipi** | Identity Correlations Delta Table | edipi | Direct mapping from ICN correlation lookup | **Yes** |
| **ICN** | Multiple Sources | Caregiver_ICN__c (CG)<br/>ICN (Identity Correlations)<br/>ICN (from participant_id lookup) | **CG Source**: Truncated to first 10 characters from Caregiver_ICN__c<br/>**SCD Source**: Mapped from participant_id via identity correlations<br/>**Seed File**: Truncated to first 10 characters | No |
| **Veteran_ICN** | Caregiver Sources Only | Veteran_ICN__c | **CG Source**: Truncated to first 10 characters<br/>**SCD/PAI Sources**: Set to NULL | No |
| **participant_id** | Multiple Sources | participant_id (Identity Correlations)<br/>PTCPNT_ID (SCD)<br/>PTCPNT_VET_ID (PAI) | **SCD Source**: Direct mapping from PTCPNT_ID<br/>**PAI Source**: Direct mapping from PTCPNT_VET_ID<br/>**CG Source**: Retrieved via ICN lookup from identity correlations | No |
| **Batch_CD** | System Generated | N/A | **CG Records**: Hard-coded as "CG"<br/>**SCD Records**: Hard-coded as "SCD"<br/>**PAI Records**: Hard-coded as "SCD" (processed as SCD updates) | **Yes** |
| **Applicant_Type** | Caregiver Sources Only | Applicant_Type__c | **CG Source**: Direct mapping<br/>**SCD/PAI Sources**: Set to NULL | No |
| **Caregiver_Status** | Caregiver Sources Only | Caregiver_Status__c | **CG Source**: Direct mapping<br/>**SCD/PAI Sources**: Set to NULL | No |
| **SC_Combined_Disability_Percentage** | SCD Sources Only | CMBNED_DEGREE_DSBLTY | **SCD Source**: Zero-padded to 3 digits, empty strings converted to "000"<br/>**CG/PAI Sources**: Set to NULL | **Yes** |
| **PT_Indicator** | PAI Sources + Default | PT_35_FLAG (PAI)<br/>target_PT_Indicator (existing records) | **PAI Source**: Direct mapping from PT_35_FLAG<br/>**SCD Records**: Defaults to "N" for new records, preserves existing values<br/>**CG Records**: Set to NULL | **Yes** |
| **Individual_Unemployability** | Not Currently Populated | N/A | Set to NULL for all sources (placeholder for future implementation) | **Yes** |
| **Status_Begin_Date** | Multiple Sources | Dispositioned_Date__c (CG)<br/>DSBL_DTR_DT (SCD)<br/>target_Status_Begin_Date (existing) | **CG Source**: Date formatted from Dispositioned_Date__c to YYYYMMDD<br/>**SCD Source**: Uses existing Status_Begin_Date or DSBL_DTR_DT if new record<br/>**Date Format**: Converted from MMddyyyy to yyyyMMdd | **Yes** |
| **Status_Last_Update** | Multiple Sources | DSBL_DTR_DT (SCD)<br/>N/A (CG) | **SCD Source**: Direct mapping from DSBL_DTR_DT<br/>**CG Source**: Set to NULL | **Yes** |
| **Status_Termination_Date** | Caregiver Sources Only | Benefits_End_Date__c | **CG Source**: Date formatted from Benefits_End_Date__c to YYYYMMDD<br/>**SCD/PAI Sources**: Set to NULL | **Yes** |
| **SDP_Event_Created_Timestamp** | File Metadata | _metadata.file_modification_time<br/>CreatedDate (seed) | **All File Sources**: Extracted from file modification timestamp<br/>**Seed File**: Uses configured start datetime<br/>**PAI Delta Table**: Uses current datetime | No |
| **filename** | File Metadata + System | _metadata.file_name<br/>Path (seed)<br/>Generated (PAI) | **File Sources**: Extracted from file metadata<br/>**Seed File**: Full file path<br/>**PAI Delta Updates**: Generated description with timestamp | No |
| **RecordLastUpdated** | System Generated | N/A | **New Records**: Set to NULL<br/>**Updated Records**: Set to SDP_Event_Created_Timestamp during merge | No |
| **RecordStatus** | System Generated | N/A | **Active Records**: Set to TRUE<br/>**Expired Records**: Set to FALSE during SCD Type 2 updates | No |
| **sentToDoD** | System Generated | N/A | **New Records**: Set to FALSE<br/>**Expired Records**: Set to TRUE during updates | No |
| **change_log** | System Generated | N/A | **New Records**: "New Record"<br/>**Updated Records**: Detailed log of field changes with old→new values | No |
| **RecordChangeStatus** | System Generated | N/A | **New Records**: "New Record"<br/>**Updated Records**: "Updated Record"<br/>**Expired Records**: "Expired Record" | No |
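
Several per-column rules in the table reduce to small string transforms. The sketch below illustrates three of them in plain Python: 10-character ICN truncation, 3-digit zero padding with empty strings mapped to `"000"`, and `MMddyyyy` to `yyyyMMdd` date conversion. The helper names are hypothetical. The handling of NULL and surrounding whitespace is an assumption, and this is not the package's actual Spark code.

```python
from datetime import datetime


def standardize_icn(raw):
    # CG and seed-file ICNs: keep only the first 10 characters.
    return None if raw is None else raw[:10]


def pad_disability_pct(raw):
    # SCD CMBNED_DEGREE_DSBLTY: empty string -> "000", otherwise zero-pad to 3 digits.
    if raw is None:
        return None  # assumption: NULL inputs stay NULL
    raw = raw.strip()  # assumption: surrounding whitespace is ignored
    return "000" if raw == "" else raw.zfill(3)


def mmddyyyy_to_yyyymmdd(raw):
    # Date standardization: MMddyyyy string -> yyyyMMdd string.
    if not raw:
        return None
    return datetime.strptime(raw, "%m%d%Y").strftime("%Y%m%d")


print(standardize_icn("1012345678V123456"))
print(pad_disability_pct("7"), pad_disability_pct(""), pad_disability_pct("100"))
print(mmddyyyy_to_yyyymmdd("03152024"))
```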

## Data Source Details

### Primary Data Sources:
1. **Caregiver Events (CG)**: CARMA system CSV files (`caregiverevent*.csv`)
2. **Service-Connected Disability (SCD)**: VADIR CSV files (`CPIDODIEX_*.csv`)
3. **PT Indicator Legacy (PAI)**: Text files (`WRTS*.txt`)
4. **PT Indicator Modern (PAI)**: VBA Delta table (`DW_ADHOC_RECURR.DOD_PATRONAGE_SCD_PT`)
5. **Identity Correlations**: MVI Delta table mapping ICNs to EDIPIs and participant IDs
6. **Seed Data**: Initial caregiver population CSV file

### Key Transformation Patterns:
- **ICN Standardization**: All ICNs truncated to 10 characters for consistency
- **Date Standardization**: All dates converted to YYYYMMDD string format
- **Null Handling**: Explicit NULL assignment for irrelevant fields per source type
- **Change Detection**: xxhash64 used for efficient change identification
- **Deduplication**: Window functions ensure latest record per unique key combination
- **Audit Trail**: Complete change tracking with before/after values
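
The change-detection and deduplication patterns can be sketched in plain Python. Spark's `xxhash64` is not in the Python standard library, so this sketch substitutes `hashlib.blake2b` (truncated to 8 bytes, i.e. 64 bits) purely as a stand-in row fingerprint. The key and field names are illustrative. Ranking "latest" by `SDP_Event_Created_Timestamp` is an assumption about how the pipeline's window functions order rows.

```python
import hashlib
from datetime import datetime


def row_fingerprint(row, fields):
    """Stand-in for Spark's xxhash64: a 64-bit digest over the tracked columns."""
    # \x1f separates values; \x00 marks NULL so None and "" hash differently.
    payload = "\x1f".join("\x00" if row.get(f) is None else str(row[f]) for f in fields)
    return hashlib.blake2b(payload.encode("utf-8"), digest_size=8).hexdigest()


def has_changed(old, new, fields):
    # Compare one hash per row instead of every column.
    return row_fingerprint(old, fields) != row_fingerprint(new, fields)


def latest_per_key(rows, key_fields, order_field="SDP_Event_Created_Timestamp"):
    """Keep the newest row per key, like row_number() = 1 over a descending window."""
    latest = {}
    for row in rows:
        key = tuple(row[k] for k in key_fields)
        if key not in latest or row[order_field] > latest[key][order_field]:
            latest[key] = row
    return list(latest.values())


FIELDS = ("SC_Combined_Disability_Percentage", "PT_Indicator")
rows = [
    {"participant_id": "P005", "SC_Combined_Disability_Percentage": "020",
     "PT_Indicator": "N", "SDP_Event_Created_Timestamp": datetime(2025, 1, 1)},
    {"participant_id": "P005", "SC_Combined_Disability_Percentage": "030",
     "PT_Indicator": "N", "SDP_Event_Created_Timestamp": datetime(2025, 1, 2)},
]
deduped = latest_per_key(rows, ("participant_id",))
print(deduped[0]["SC_Combined_Disability_Percentage"], has_changed(rows[0], rows[1], FIELDS))
```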

In [0]:
from IPython.display import HTML, display

mermaid_diagram_clean = r"""
flowchart TD
  subgraph "MVI Sources"
    SRC1["MVI Person<br/>SMVIPerson<br/>MVIPersonICN, ICNStatus"]
    SRC2["MVI Site Assoc<br/>SMVIPersonSiteAssociation<br/>TreatingFacilityPersonIdentifier, CorrelationModifiedDateTime"]
    SRC3["MVI Institution<br/>NDim.MVIInstitution<br/>MVIInstitutionSID, InstitutionCode"]
  end

  subgraph "Identity Correlation"
    ID1["Filter + Latest-per-ICN<br/>ActiveMergedIdentifier ∈ {Active, NULL}<br/>Latest person per ICN (calc_IngestionTimestamp)"]
    ID2["Join PSA + Institution<br/>Pivot by InstitutionCode"]
    ID3["Quarantine duplicates<br/>Ambiguous mappings"]
    ID4["Identity lookup columns used downstream<br/>ICN, participant_id, edipi"]
  end

  subgraph "Identity Storage"
    IST1["Identity Lookup Delta<br/>IDENTITY_TABLE_NAME"]
    IST2["Duplicates Delta<br/>DUP_IDENTITY_TABLE_NAME"]
  end

  subgraph "Mode Detection (Notebook Runner)"
    MD1["Target table check<br/>Delta exists? + rowcount > 0"]
    MD2{"Mode"}
    MD3["UPDATE<br/>End boundary: today 00:00 UTC"]
    MD4["REBUILD<br/>End boundary: prev month end (UTC)"]
  end

  subgraph "Discovery"
    DISC["Per-batch discovery (CG, SCD)<br/>Start: checkpoint (UPDATE) or begin_date (REBUILD)<br/>Select files where start < mod_time < end"]
  end

  subgraph "CG Processing"
    CG0["CG seed file<br/>(only if REBUILD and target empty)"]
    CG1["Transform + dedupe"]
    CG2["Join identity lookup on ICN"]
    CG3["Batch_CD='CG'"]
  end

  subgraph "SCD + PT Processing"
    S0["For each SCD file discovered"]
    S1["Normalize + integrate PT<br/>PT delta always; PT seed only on REBUILD"]
    S2["Join identity lookup; drop null ICN"]
    S3["Batch_CD='SCD'"]
  end

  subgraph "SCD Type 2 Merge"
    MRG0["Load active target<br/>patronage_unified (RecordStatus=True)"]
    MRG1["Change detection + business rules"]
    MRG2["Prepare inserts + expirations"]
    MRG3["Delta merge into patronage_unified<br/>Full history via RecordStatus"]
  end

  subgraph "Scheduled Tasks"
    SCH1["EDIPI backfill<br/>Last Friday of month (UTC)"]
    SCH2["DMDC export (Wed/Fri)<br/>Window: last&nbsp;checkpoint&nbsp;→&nbsp;today&nbsp;00:00&nbsp;UTC"]
  end

  %% Flow
  SRC1 & SRC2 & SRC3 --> ID1 --> ID2 --> ID4 --> IST1
  ID2 --> ID3 --> IST2

  IST1 --> MD1 --> MD2
  MD2 -->|UPDATE| MD3 --> DISC
  MD2 -->|REBUILD| MD4 --> DISC

  DISC -->|CG files| CG0 --> CG1 --> CG2 --> CG3 --> MRG0
  DISC -->|SCD files| S0 --> S1 --> S2 --> S3 --> MRG0

  MRG0 --> MRG1 --> MRG2 --> MRG3 --> SCH1
  MRG3 --> SCH2

  %% Styling (reuse existing palette for consistency)
  classDef mviSource fill:#e3f2fd,stroke:#1976d2,stroke-width:2px
  classDef identityEngine fill:#f3e5f5,stroke:#7b1fa2,stroke-width:2px
  classDef storage fill:#e8f5e8,stroke:#388e3c,stroke-width:2px
  classDef modeDetection fill:#fff3e0,stroke:#f57c00,stroke-width:2px
  classDef processing fill:#e0f2f1,stroke:#00796b,stroke-width:2px
  classDef merge fill:#f9fbe7,stroke:#827717,stroke-width:2px
  classDef scheduled fill:#fff8e1,stroke:#ff8f00,stroke-width:2px

  class SRC1,SRC2,SRC3 mviSource
  class ID1,ID2,ID3,ID4 identityEngine
  class IST1,IST2 storage
  class MD1,MD2,MD3,MD4 modeDetection
  class DISC,CG0,CG1,CG2,CG3,S0,S1,S2,S3 processing
  class MRG0,MRG1,MRG2,MRG3 merge
  class SCH1,SCH2 scheduled
"""

display(HTML(f"""
<div class=\"mermaid\">
{mermaid_diagram_clean}
</div>
<script src=\"https://cdn.jsdelivr.net/npm/mermaid/dist/mermaid.min.js\"></script>
<script>
  mermaid.initialize({{ startOnLoad: true, theme: 'default', flowchart: {{ useMaxWidth: true, htmlLabels: true }} }});
</script>
"""))

In [0]:
import importlib
from databricks.sdk.runtime import dbutils
from delta.tables import DeltaTable
from pyspark.sql.functions import lit
import sys

# IMPORTANT: sys.path must point to the PARENT folder of the package
# so Python can resolve `import patronage_modularized`.
notebook_path = dbutils.notebook.entry_point.getDbutils().notebook().getContext().notebookPath().get()
parts = notebook_path.strip("/").split("/")
if parts[0] == "Workspace":
    base = "/" + "/".join(parts[:4])
else:
    base = "/Workspace/" + "/".join(parts[:3])

if base not in sys.path:
    sys.path.insert(0, base)

# In the modularized pipeline, logging + constants live in config.
from patronage_modularized import config as pipeline
import patronage_modularized.orchestrator as orchestrator

importlib.reload(pipeline)
importlib.reload(orchestrator)

pipeline.log_message('Pipeline modules imported and reloaded.')
pipeline.log_message(f'Target Table Path: {pipeline.PATRONAGE_TABLE_PATH}')
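
The `base` derivation in the cell above keeps the leading path segments so that the folder containing `patronage_modularized` ends up on `sys.path`. The same logic is shown below as a pure function (the name `package_parent` is hypothetical), with a check that both spellings of a notebook path resolve to the same folder. The logic assumes the package's parent is always the third segment below `/Workspace`, for example a Repos checkout root. That may not hold for every workspace layout, and the example paths are illustrative.

```python
def package_parent(notebook_path: str) -> str:
    """Mirror of the sys.path logic: keep the first three segments under /Workspace."""
    parts = notebook_path.strip("/").split("/")
    if parts[0] == "Workspace":
        return "/" + "/".join(parts[:4])
    return "/Workspace/" + "/".join(parts[:3])


# Both spellings of the same (illustrative) Repos notebook resolve to the same folder.
print(package_parent("/Repos/user@example.com/patronage/notebooks/runner"))
print(package_parent("/Workspace/Repos/user@example.com/patronage/notebooks/runner"))
```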


In [0]:
# Set this to True for detailed, verbose logging, or False for summary logging.
VERBOSE_LOGGING = False

# Set the verbosity in the pipeline module
pipeline.LOGGING_VERBOSE = VERBOSE_LOGGING

pipeline.log_message(f"Logging verbosity set to: {'DETAILED' if VERBOSE_LOGGING else 'SUMMARY'}")


In [0]:
# =============================================================================
# AUTOMATIC MODE DETECTION & INITIALIZATION
# =============================================================================

pipeline.log_message("Analyzing current table state to determine processing mode...")

try:
    # Check if the target Delta table exists and is not empty.
    # limit(1) stops after the first row instead of counting the whole table.
    if DeltaTable.isDeltaTable(spark, pipeline.PATRONAGE_TABLE_PATH) and spark.read.format("delta").load(pipeline.PATRONAGE_TABLE_PATH).limit(1).count() > 0:
        processing_mode = "update"
        pipeline.log_message(f"Target table '{pipeline.PATRONAGE_TABLE_NAME}' found with data.")
    else:
        processing_mode = "rebuild"
        pipeline.log_message(f"Target table '{pipeline.PATRONAGE_TABLE_NAME}' is missing or empty.")

except Exception as e:
    # If any error occurs (e.g., path not found), default to rebuild.
    processing_mode = "rebuild"
    pipeline.log_message(f"Could not access target table. Defaulting to REBUILD mode. Error: {str(e)}")

pipeline.log_message(f"Selected Processing Mode: {processing_mode.upper()}")

In [0]:
# Execute the Pipeline
# Run the pipeline with the determined mode and logging setting
try:
    pipeline.log_message("Starting the VA Patronage Pipeline...")
    orchestrator.run_pipeline(processing_mode, verbose_logging=VERBOSE_LOGGING)
    pipeline.log_message("Completed processing VA Patronage Pipeline...")
except Exception as e:
    pipeline.log_message(f"Pipeline error: {str(e)}")
    raise  # Re-raise to see full traceback


## File Processing Reconciliation

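This section checks that every source file in blob storage has been processed and recorded in the Delta table. By default it compares files modified between the start of the current month (00:00 UTC) and the end of yesterday (UTC), so files that arrived today are excluded. On the first day of a month the default window is empty; pass explicit `start_date` and `end_date` values (`YYYY-MM-DD`) to `reconcile_file_processing` to check the previous month.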
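
The reconciliation reduces to three steps: a filename-rule filter, a date-window filter, and a set difference (the left anti join). Below is a minimal pure-Python sketch of that logic. It assumes blob listings arrive as `(path, mod_time_utc)` pairs and that the SCD prefix `CPIDODIEX_` (taken from the data-source list) stands in for `PIPELINE_CONFIG['SCD']['matching_characters']`, whose actual value is not shown in this notebook. The `dbfs:/mnt/cg/...` paths are hypothetical.

```python
from datetime import datetime, timezone


def matches_batch(batch_cd, name, scd_prefix="CPIDODIEX_"):
    # Same rules as reconcile_file_processing: SCD by prefix, CG by "caregiver" + ".csv".
    lowered = name.lower()
    if batch_cd == "SCD":
        return name.startswith(scd_prefix)
    if batch_cd == "CG":
        return lowered.endswith(".csv") and "caregiver" in lowered
    return False


def find_missing(batch_cd, blob_listing, processed_paths, start, end):
    """Blob files in [start, end] absent from processed_paths, oldest first."""
    candidates = [
        (path, mod_time)
        for path, mod_time in blob_listing
        if matches_batch(batch_cd, path.rsplit("/", 1)[-1]) and start <= mod_time <= end
    ]
    processed = set(processed_paths)
    return sorted((c for c in candidates if c[0] not in processed), key=lambda c: c[1])


utc = timezone.utc
listing = [
    ("dbfs:/mnt/cg/caregiverevent_0301.csv", datetime(2025, 3, 1, 6, tzinfo=utc)),
    ("dbfs:/mnt/cg/caregiverevent_0302.csv", datetime(2025, 3, 2, 6, tzinfo=utc)),
    ("dbfs:/mnt/cg/readme.txt", datetime(2025, 3, 2, 7, tzinfo=utc)),
]
missing = find_missing(
    "CG", listing, {"dbfs:/mnt/cg/caregiverevent_0301.csv"},
    start=datetime(2025, 3, 1, tzinfo=utc),
    end=datetime(2025, 3, 31, 23, 59, 59, 999999, tzinfo=utc),
)
print(missing)
```

As in the notebook cell, the join key is the full blob path, so the comparison only works if the Delta `filename` column stores paths in the same form that `dbutils.fs.ls` returns.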

In [0]:
from datetime import datetime, timezone
from dateutil.relativedelta import relativedelta
from pyspark.sql.functions import col, lit, to_timestamp

def reconcile_file_processing(batch_cd, start_date=None, end_date=None, show_details=True):
    """
    Reconcile blob storage files with Delta table processed files.
    
    Parameters:
        batch_cd: 'CG' or 'SCD'
        start_date: Start date for comparison (default: start of current month)
        end_date: End date for comparison (default: yesterday 23:59:59)
        show_details: Display missing files or just summary
    
    Returns:
        Dict with reconciliation results
    """
    pipeline.log_message(f"FILE RECONCILIATION REPORT: {batch_cd}")
    
    # Calculate default date range
    if not start_date:
        # Start of current month
        now = datetime.now(timezone.utc)
        start_date = now.replace(day=1, hour=0, minute=0, second=0, microsecond=0)
    else:
        start_date = datetime.strptime(start_date, "%Y-%m-%d").replace(tzinfo=timezone.utc)
    
    if not end_date:
        # Yesterday end of day
        now = datetime.now(timezone.utc)
        end_date = (now - relativedelta(days=1)).replace(hour=23, minute=59, second=59, microsecond=999999)
        
    else:
        end_date = datetime.strptime(end_date, "%Y-%m-%d").replace(hour=23, minute=59, second=59, microsecond=999999, tzinfo=timezone.utc)
    
    pipeline.log_message(f"Date Range: {start_date.strftime('%Y-%m-%d')} to {end_date.strftime('%Y-%m-%d')}")
        
    # Get configuration from pipeline
    source_config = pipeline.PIPELINE_CONFIG[batch_cd]
    source_path = source_config['path']
    
    # 1. Get all files from blob storage in date range
    try:
        file_list = dbutils.fs.ls(source_path)
        
        blob_files = []
        for file_info in file_list:
            # Convert millisecond timestamp to UTC datetime
            mod_time_utc = datetime.fromtimestamp(file_info.modificationTime / 1000, tz=timezone.utc)
            
            # Apply filename matching logic
            is_match = (
                (batch_cd == 'SCD' and file_info.name.startswith(source_config['matching_characters'])) or
                (batch_cd == 'CG' and file_info.name.lower().endswith('.csv') and 'caregiver' in file_info.name.lower())
            )
            
            # Filter by date range
            if is_match and start_date <= mod_time_utc <= end_date:
                blob_files.append({
                    'path': file_info.path,
                    'modification_time': mod_time_utc
                })
        
        pipeline.log_message(f"Found {len(blob_files):,} files in blob storage ({source_path})")
        
    except Exception as e:
        pipeline.log_message(f"Error accessing blob storage: {str(e)}")
        return None
    
    # 2. Get processed files from Delta table
    try:
        delta_table = spark.table(pipeline.PATRONAGE_TABLE_NAME)
        
        # Filter by Batch_CD and date range
        processed_files_df = delta_table.filter(
            (col("Batch_CD") == batch_cd) &
            (col("SDP_Event_Created_Timestamp") >= lit(start_date)) &
            (col("SDP_Event_Created_Timestamp") <= lit(end_date))
        ).select(
            col("filename"),
            col("SDP_Event_Created_Timestamp")
        ).distinct()
        
        processed_count = processed_files_df.count()
        pipeline.log_message(f"Found {processed_count:,} distinct files in Delta table")
        
    except Exception as e:
        pipeline.log_message(f"Error querying Delta table: {str(e)}")
        return None
    
    # 3. Find missing files (in blob but not in Delta)
    if len(blob_files) == 0:
        pipeline.log_message("\nWarning: No files found in blob storage for specified date range")
        return {'blob_count': 0, 'delta_count': processed_count, 'missing_count': 0, 'missing_files': None}
    
    # Create DataFrame from blob files
    blob_df = spark.createDataFrame(blob_files)
    
    # Left anti join to find files in blob but not in Delta
    missing_files_df = blob_df.alias("blob").join(
        processed_files_df.alias("delta"),
        col("blob.path") == col("delta.filename"),
        "left_anti"
    ).select(
        col("blob.path").alias("filename"),
        col("blob.modification_time")
    ).orderBy("modification_time")
    
    missing_count = missing_files_df.count()
    
    # 4. Display results
    pipeline.log_message(f"RECONCILIATION SUMMARY:")
    pipeline.log_message(f"Blob Storage Files:    {len(blob_files):,}")
    pipeline.log_message(f"Delta Table Records:   {processed_count:,}")
    pipeline.log_message(f"Missing Files:         {missing_count:,}")
    
    if missing_count > 0:
        pipeline.log_message(f"\nWARNING: {missing_count} file(s) in blob storage not found in Delta table!")
        
        if show_details:
            pipeline.log_message(f"\nMissing File Details:")
            missing_files_df.show(truncate=False)
            
        pipeline.log_message(f"Recommendation: Run pipeline in UPDATE mode to process missing files.")
    else:
        pipeline.log_message(f"SUCCESS: All blob storage files for {batch_cd.upper()} have been processed!")
    
    pipeline.log_message(f"{'='*70}\n")
    
    return {
        'blob_count': len(blob_files),
        'delta_count': processed_count,
        'missing_count': missing_count,
        'missing_files': missing_files_df if missing_count > 0 else None
    }

# Execute reconciliation for CG and SCD
pipeline.log_message("="*70)
pipeline.log_message("  VA PATRONAGE FILE PROCESSING RECONCILIATION")
pipeline.log_message("="*70)

cg_results = reconcile_file_processing('CG', show_details=True)
scd_results = reconcile_file_processing('SCD', show_details=True)

# Overall summary
if cg_results and scd_results:
    total_missing = cg_results['missing_count'] + scd_results['missing_count']
    
    pipeline.log_message(f"OVERALL RECONCILIATION SUMMARY")

    pipeline.log_message(f"CG Missing Files:   {cg_results['missing_count']:,}")
    pipeline.log_message(f"SCD Missing Files:  {scd_results['missing_count']:,}")
    pipeline.log_message(f"Total Missing:      {total_missing:,}")
    
    if total_missing == 0:
        pipeline.log_message(f"All files processed successfully! No action required.")
    else:
        pipeline.log_message(f"\nAction required: {total_missing} file(s) need processing")
    
    pipeline.log_message(f"{'='*70}\n")

In [0]:
# =============================================================================
# PATRONAGE TABLE DATA ANALYSIS - Quick Analytics Dashboard
# =============================================================================

# Import required modules
from datetime import datetime

print("PATRONAGE TABLE DATA ANALYSIS")
print("=" * 60)

# Get the table name from the pipeline module
table_name = pipeline.PATRONAGE_TABLE_NAME

try:
    print(f"Analyzing table: {table_name}")
    print(f"Analysis timestamp: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
    print("-" * 60)
    
    # 1. Total Active records for both CG and SCD
    print("ACTIVE RECORDS BY BATCH TYPE")
    active_by_batch = spark.sql(f"""
        SELECT Batch_CD, 
               COUNT(*) as Active_Records
        FROM {table_name}
        WHERE RecordStatus = true
        GROUP BY Batch_CD
        ORDER BY Batch_CD
    """)
    active_by_batch.display()
    
    # 2. Total Null ICN for CG
    print("NULL ICN ANALYSIS FOR CG RECORDS")
    cg_null_icn = spark.sql(f"""
        SELECT 
            COUNT(*) as Total_CG_Active_Records,
            SUM(CASE WHEN ICN IS NULL THEN 1 ELSE 0 END) as Null_ICN_Count,
            SUM(CASE WHEN ICN IS NOT NULL THEN 1 ELSE 0 END) as Valid_ICN_Count,
            ROUND((SUM(CASE WHEN ICN IS NULL THEN 1 ELSE 0 END) * 100.0 / COUNT(*)), 2) as Null_ICN_Percentage
        FROM {table_name}
        WHERE Batch_CD = 'CG' AND RecordStatus = true
    """)
    cg_null_icn.display()
    
    # 3. PT_Indicator distribution ('Y' vs 'N') across active SCD records
    print("PT INDICATOR DISTRIBUTION (ACTIVE RECORDS ONLY)")
    pt_indicator_analysis = spark.sql(f"""
        SELECT PT_Indicator,
               COUNT(*) as Record_Count,
               ROUND((COUNT(*) * 100.0 / SUM(COUNT(*)) OVER ()), 2) as Percentage
        FROM {table_name}
        WHERE RecordStatus = true AND Batch_CD = 'SCD'
        GROUP BY PT_Indicator
        ORDER BY PT_Indicator
    """)
    pt_indicator_analysis.display()

    # 4. Null participant_id analysis for SCD
    print("NULL participant_id ANALYSIS FOR SCD RECORDS")
    scd_null_participant_id = spark.sql(f"""
        SELECT
            COUNT(*) as Total_SCD_Active_Records,
            SUM(CASE WHEN participant_id IS NULL THEN 1 ELSE 0 END) as Null_participant_id_Count,
            SUM(CASE WHEN participant_id IS NOT NULL THEN 1 ELSE 0 END) as Valid_participant_id_Count,
            ROUND((SUM(CASE WHEN participant_id IS NULL THEN 1 ELSE 0 END) * 100.0 / COUNT(*)), 2) as Null_participant_id_Percentage
        FROM {table_name}
        WHERE Batch_CD = 'SCD' AND RecordStatus = true
    """)
    scd_null_participant_id.display()
    
    # 5. Active records per SDP_Event_Created_Timestamp 
    print("PROCESSING DATES BY RECORD COUNT")
    records_by_timestamp = spark.sql(f"""
        SELECT DATE(SDP_Event_Created_Timestamp) as Processing_Date,
               COUNT(*) as Record_Count
        FROM {table_name}
        WHERE RecordStatus = true
        GROUP BY DATE(SDP_Event_Created_Timestamp)
        ORDER BY Processing_Date DESC, Record_Count DESC
    """)
    records_by_timestamp.display()
    
    # 6. Active Record count for each Applicant_Type
    print("ACTIVE RECORDS BY APPLICANT TYPE")
    records_by_applicant_type = spark.sql(f"""
        SELECT 
            COALESCE(Applicant_Type, 'NULL/Not Specified') as Applicant_Type,
            COUNT(*) as Record_Count,
            ROUND((COUNT(*) * 100.0 / SUM(COUNT(*)) OVER ()), 2) as Percentage
        FROM {table_name}
        WHERE RecordStatus = true AND Batch_CD = 'CG'
        GROUP BY Applicant_Type
        ORDER BY Record_Count DESC
    """)
    records_by_applicant_type.display()
    
    # 7. Active Record count for each Caregiver_Status
    print("ACTIVE RECORDS BY CAREGIVER STATUS")
    records_by_caregiver_status = spark.sql(f"""
        SELECT 
            COALESCE(Caregiver_Status, 'NULL/Not Specified') as Caregiver_Status,
            COUNT(*) as Record_Count,
            ROUND((COUNT(*) * 100.0 / SUM(COUNT(*)) OVER ()), 2) as Percentage
        FROM {table_name}
        WHERE RecordStatus = true AND Batch_CD = 'CG'
        GROUP BY Caregiver_Status
        ORDER BY Record_Count DESC
    """)
    records_by_caregiver_status.display()
    
    # 8. Record count for each RecordStatus 
    print("RECORD STATUS DISTRIBUTION")
    records_by_status = spark.sql(f"""
        SELECT RecordStatus,
               COUNT(*) as Record_Count,
               ROUND((COUNT(*) * 100.0 / SUM(COUNT(*)) OVER ()), 2) as Percentage
        FROM {table_name}
        GROUP BY RecordStatus
        ORDER BY RecordStatus DESC
    """)
    records_by_status.display()
    
    # 9. Record counts per filename 
    print("FILES BY RECORD COUNT")
    records_by_filename = spark.sql(f"""
        SELECT 
            CASE 
                WHEN filename LIKE '%/%' THEN REVERSE(SPLIT(REVERSE(filename), '/')[0])
                ELSE filename
            END as File_Name,
            COUNT(*) as Record_Count,
            MIN(SDP_Event_Created_Timestamp) as Processing_Timestamp
        FROM {table_name}
        GROUP BY filename
        ORDER BY Processing_Timestamp DESC
    """)
    records_by_filename.display(truncate=False)
    
    # 10. Overall Summary Statistics
    print("\nOVERALL SUMMARY STATISTICS")
    summary_stats = spark.sql(f"""
        SELECT 
            COUNT(*) as Total_Records,
            SUM(CASE WHEN RecordChangeStatus = 'New Record' THEN 1 ELSE 0 END) as New_Records,
            SUM(CASE WHEN RecordChangeStatus = 'Updated Record' THEN 1 ELSE 0 END) as Updated_Records,
            SUM(CASE WHEN RecordChangeStatus = 'Expired Record' THEN 1 ELSE 0 END) as Expired_Records,
            COUNT(DISTINCT filename) as Total_Files_Processed,
            COUNT(DISTINCT Batch_CD) as Batch_Types,
            MIN(SDP_Event_Created_Timestamp) as Earliest_Record,
            MAX(SDP_Event_Created_Timestamp) as Latest_Record
        FROM {table_name}
    """)
    summary_stats.display(truncate=False)
    
    # 11. Batch Code Processing Timeline
    print("BATCH CODE PROCESSING TIMELINE")
    batch_timeline_stats = spark.sql(f"""
        SELECT
            Batch_CD as Batch_Type,
            MIN(SDP_Event_Created_Timestamp) as Earliest_Record,
            MAX(SDP_Event_Created_Timestamp) as Latest_Record,
            COUNT(*) as Total_Records,
            COUNT(DISTINCT filename) as Files_Processed
        FROM {table_name}
        GROUP BY Batch_CD
        ORDER BY Batch_CD
    """)
    batch_timeline_stats.display()
    
    print("-" * 60)
    print("Data analysis completed successfully!")
    
except Exception as e:
    print(f"Error during data analysis: {str(e)}")
    print("Please ensure the table exists and is accessible.")

In [0]:
from pyspark.sql import functions as F
# Get the table name from the pipeline module
table_name = pipeline.PATRONAGE_TABLE_NAME

def _get_latest_timestamp(df, batch_cd=None):
    if batch_cd:
        df = df.filter(F.col("Batch_CD") == batch_cd)
    ts_row = (
        df.select(F.max("SDP_Event_Created_Timestamp").alias("max_ts"))
          .filter(F.col("max_ts").isNotNull())
          .collect()
    )
    return ts_row[0]["max_ts"] if ts_row else None

def compare_latest(batch_cd: str):
    batch_cd = batch_cd.upper()
    if batch_cd not in ("SCD", "CG"):
        raise ValueError("batch_cd must be 'SCD' or 'CG'")

    patronage_base = spark.table(table_name).filter(
        (F.col("RecordStatus") == True) & (F.col("Batch_CD") == batch_cd) & F.col("edipi").isNotNull()
    )
    staging_path = (
        "delta.`/mnt/Patronage/SCD_Staging`"
        if batch_cd == "SCD"
        else "delta.`/mnt/Patronage/Caregivers_Staging_New`"
    )
    staging_base = spark.sql(f"SELECT * FROM {staging_path}").filter(F.col("edipi").isNotNull())

    latest_ts = _get_latest_timestamp(patronage_base)
    latest_stg_ts = _get_latest_timestamp(staging_base)

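    # Align to the newer of the two timestamps so the comparison uses the freshest load either table has.
    candidates = [ts for ts in (latest_ts, latest_stg_ts) if ts is not None]
    if not candidates:
        raise ValueError(f"No SDP_Event_Created_Timestamp found for batch {batch_cd} in Patronage or staging")
    compare_ts = max(candidates)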
    ts_lit = F.lit(compare_ts)

    pat_df = patronage_base.filter(F.col("SDP_Event_Created_Timestamp") == ts_lit).select("edipi").dropDuplicates()
    stg_df = staging_base.filter(F.col("SDP_Event_Created_Timestamp") == ts_lit).select("edipi").dropDuplicates()

    in_pat_not_stg = pat_df.join(stg_df, "edipi", "left_anti")
    in_stg_not_pat = stg_df.join(pat_df, "edipi", "left_anti")

    counts = {
        "batch_cd": batch_cd,
        "timestamp_compared": compare_ts,
        "patronage_count": pat_df.count(),
        "staging_count": stg_df.count(),
        "in_patronage_only": in_pat_not_stg.count(),
        "in_staging_only": in_stg_not_pat.count(),
    }

    print(f"\nBatch {batch_cd} @ {compare_ts}")
    for k, v in counts.items():
        if k not in ("batch_cd", "timestamp_compared"):
            print(f"{k}: {v:,}")

    return {
        "counts": counts,
        "patronage_only": in_pat_not_stg,
        "staging_only": in_stg_not_pat,
    }

# Run for both batches
result_scd = compare_latest("SCD")
result_cg  = compare_latest("CG")

# Optional: inspect EDIPIs
result_scd["patronage_only"].display()
result_scd["staging_only"].display()

In [0]:

from pyspark.sql import functions as F
from patronage_modularized import config as pipeline

# --- Caregivers ---
cg_staging_tbl = "delta.`/mnt/Patronage/Caregivers_Staging_New`"
cg_patronage_tbl = pipeline.PATRONAGE_TABLE_NAME


cg_staging_df = spark.sql(f"""
SELECT 'Staging CG' as table_name, SDP_Event_Created_Timestamp as dates, COUNT(*) as count, 
       ROW_NUMBER() OVER (ORDER BY SDP_Event_Created_Timestamp DESC) as rn
FROM {cg_staging_tbl}
WHERE Batch_CD = 'CG'
GROUP BY SDP_Event_Created_Timestamp
ORDER BY SDP_Event_Created_Timestamp DESC
""").filter("rn <= 10")

# Single DF for patronage
cg_patronage_df = spark.sql(f"""
SELECT 'Patronage CG' as table_name, SDP_Event_Created_Timestamp as dates, COUNT(*) as count, 
       ROW_NUMBER() OVER (ORDER BY SDP_Event_Created_Timestamp DESC) as rn
FROM {cg_patronage_tbl}
WHERE Batch_CD = 'CG'
GROUP BY SDP_Event_Created_Timestamp
ORDER BY SDP_Event_Created_Timestamp DESC
""").filter("rn <= 10")

# Join side by side
cg_combined_df = (
    cg_staging_df.alias("s")
    .join(cg_patronage_df.alias("p"), "rn", "outer")
    .select(
        F.col("s.table_name").alias("Staging Table Name"),
        F.col("s.dates").alias("Staging Dates"),
        F.col("s.count").alias("Staging Count"),
        F.col("p.table_name").alias("Patronage Table Name"),
        F.col("p.dates").alias("Patronage Dates"),
        F.col("p.count").alias("Patronage Count")
    )
)

pipeline.log_message("Caregivers Comparison")
cg_combined_df.display()

# --- SCD ---
scd_staging_tbl = "delta.`/mnt/Patronage/SCD_Staging`"
scd_patronage_tbl = pipeline.PATRONAGE_TABLE_NAME

# Single DF for staging
scd_staging_df = spark.sql(f"""
SELECT 'Staging SCD' as table_name, SDP_Event_Created_Timestamp as dates, COUNT(*) as count, 
       ROW_NUMBER() OVER (ORDER BY SDP_Event_Created_Timestamp DESC) as rn
FROM {scd_staging_tbl}
WHERE Batch_CD = 'SCD'
GROUP BY SDP_Event_Created_Timestamp
ORDER BY SDP_Event_Created_Timestamp DESC
""").filter("rn <= 10")

# Single DF for patronage
scd_patronage_df = spark.sql(f"""
SELECT 'Patronage SCD' as table_name, SDP_Event_Created_Timestamp as dates, COUNT(*) as count, 
       ROW_NUMBER() OVER (ORDER BY SDP_Event_Created_Timestamp DESC) as rn
FROM {scd_patronage_tbl}
WHERE Batch_CD = 'SCD'
GROUP BY SDP_Event_Created_Timestamp
ORDER BY SDP_Event_Created_Timestamp DESC
""").filter("rn <= 10")

# Join side by side
scd_combined_df = (
    scd_staging_df.alias("s")
    .join(scd_patronage_df.alias("p"), "rn", "outer")
    .select(
        F.col("s.table_name").alias("Staging Table Name"),
        F.col("s.dates").alias("Staging Dates"),
        F.col("s.count").alias("Staging Count"),
        F.col("p.table_name").alias("Patronage Table Name"),
        F.col("p.dates").alias("Patronage Dates"),
        F.col("p.count").alias("Patronage Count")
    )
)

pipeline.log_message("SCD Comparison")
scd_combined_df.display()
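The comparisons above pair staging and patronage rows by rank (`rn`). If one table is missing a batch timestamp, every later row shifts by one and the counts no longer line up. A minimal plain-Python sketch of an alternative that aligns on the timestamp itself, assuming the top-10 rows have been collected into `{timestamp: count}` dicts; `compare_batch_counts` is a hypothetical helper, not part of `patronage_modularized`.

```python
from typing import Dict, List, Optional, Tuple

# (timestamp, staging_count, patronage_count, status)
Row = Tuple[str, Optional[int], Optional[int], str]


def compare_batch_counts(staging: Dict[str, int], patronage: Dict[str, int]) -> List[Row]:
    """Align two {timestamp: row_count} maps on the timestamp key.

    Timestamps are assumed to be ISO-8601 strings in one consistent format,
    so sorting them as strings also sorts them chronologically.
    """
    rows: List[Row] = []
    # Union of keys behaves like a FULL OUTER JOIN on the timestamp column.
    for ts in sorted(set(staging) | set(patronage), reverse=True):
        s = staging.get(ts)
        p = patronage.get(ts)
        if s is None:
            status = "MISSING_IN_STAGING"
        elif p is None:
            status = "MISSING_IN_PATRONAGE"
        elif s == p:
            status = "MATCH"
        else:
            status = "MISMATCH"
        rows.append((ts, s, p, status))
    return rows


# Invented sample counts for illustration.
for row in compare_batch_counts(
    {"2025-12-27T09:31": 120, "2025-12-26T09:30": 98},
    {"2025-12-27T09:31": 118},
):
    print(row)
```

In PySpark the same idea amounts to joining the staging and patronage DataFrames on the `dates` column instead of `rn`, so a missing batch shows up as a one-sided row rather than shifting every row below it.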

In [0]:
# from pyspark.sql.types import StructType, StructField, StringType, BooleanType, TimestampType, DateType
# from datetime import datetime

# import patronage_modularized.transforms as transforms
# import patronage_modularized.state as state
# from patronage_modularized import config as pipeline

# # 1. Define Schemas
# # -----------------
# scd_schema = StructType([
#     StructField("PTCPNT_ID", StringType(), True),
#     StructField("CMBNED_DEGREE_DSBLTY", StringType(), True),
#     StructField("DSBL_DTR_DT", StringType(), True),
#     StructField("_metadata", StructType([
#         StructField("file_path", StringType(), True),
#         StructField("file_modification_time", TimestampType(), True)
#     ]), True)
# ])

# target_schema = StructType([
#     StructField("participant_id", StringType(), True),
#     StructField("SC_Combined_Disability_Percentage", StringType(), True),
#     StructField("PT_Indicator", StringType(), True),
#     StructField("Status_Begin_Date", StringType(), True),
#     StructField("Status_Last_Update", StringType(), True),
#     StructField("RecordStatus", BooleanType(), True),
#     StructField("Batch_CD", StringType(), True),
#     StructField("ICN", StringType(), True) # Needed for identity join
# ])

# pt_schema = StructType([
#     StructField("participant_id", StringType(), True),
#     StructField("pt_indicator_value", StringType(), True)
# ])

# identity_schema = StructType([
#     StructField("participant_id", StringType(), True),
#     StructField("ICN", StringType(), True)
# ])

# # 2. Create Dummy Data
# # --------------------

# # A. Source Data (Daily File)
# # - P001: Update SC % (50 -> 70). (Will also get PT update from PT Data) -> Scenario: Both Update
# # - P005: New Record (SC=20). (Not in PT Data) -> Scenario: New Record SC Only
# # - P005: Duplicate Record (to test dedupe)
# # - P006: Update SC % (30 -> 40). (Not in PT Data) -> Scenario: Update Disability% Only
# # - P007: No SC Change (10 -> 10). (Will get PT update from PT Data) -> Scenario: Update PT_Indicator Only (Direct)
# source_data = [
#     ("P001", "70", "11012025", ("dummy_file.csv", datetime.now())),
#     ("P005", "20", "11012025", ("dummy_file.csv", datetime.now())),
#     ("P005", "20", "11012025", ("dummy_file.csv", datetime.now())),
#     ("P006", "40", "11012025", ("dummy_file.csv", datetime.now())),
#     ("P007", "10", "11012025", ("dummy_file.csv", datetime.now()))
# ]
# dummy_source_df = spark.createDataFrame(source_data, scd_schema)

# # B. Target Data (Existing Delta Table)
# # - P001: Active, SC=050, PT=N (Will be updated by Source SC + PT Data)
# # - P002: Active, SC=030, PT=N (Will be backfilled by PT Data) -> Scenario: Update (Backfill) PT_Indicator
# # - P003: Active, SC=010, PT=N (No Change)
# # - P004: Active, SC=100, PT=Y (No Change)
# # - P006: Active, SC=030, PT=N (Will be updated by Source SC only)
# # - P007: Active, SC=010, PT=N (Will be updated by PT Data via Direct path)
# target_data = [
#     ("P001", "050", "N", "20240101", "20240101", True, "SCD", "ICN001"),
#     ("P002", "030", "N", "20240101", "20240101", True, "SCD", "ICN002"),
#     ("P003", "010", "N", "20240101", "20240101", True, "SCD", "ICN003"),
#     ("P004", "100", "Y", "20240101", "20240101", True, "SCD", "ICN004"),
#     ("P006", "030", "N", "20240101", "20240101", True, "SCD", "ICN006"),
#     ("P007", "010", "N", "20240101", "20240101", True, "SCD", "ICN007"),
# ]
# dummy_target_df = spark.createDataFrame(target_data, target_schema)

# # C. PT Data (Reference Table)
# # - P001: Y (Triggers PT update for P001, combined with Source SC update)
# # - P002: Y (Triggers Backfill for P002)
# # - P004: Y (Matches existing P004)
# # - P007: Y (Triggers PT update for P007 via Direct path)
# pt_data = [
#     ("P001", "Y"),
#     ("P002", "Y"),
#     ("P004", "Y"),
#     ("P007", "Y")
# ]
# dummy_pt_df = spark.createDataFrame(pt_data, pt_schema)

# # D. Identity Data (Mocked identity_lookup_table)
# # Maps participant_id to ICN
# identity_data = [
#     ("P001", "ICN001"),
#     ("P002", "ICN002"),
#     ("P003", "ICN003"),
#     ("P004", "ICN004"),
#     ("P005", "ICN005"),
#     ("P006", "ICN006"),
#     ("P007", "ICN007")
# ]
# dummy_identity_df = spark.createDataFrame(identity_data, identity_schema)

# print("Dummy Data Created:")
# print(f"Source Records:   {dummy_source_df.count()} (Includes 1 duplicate)")
# print(f"Target Records:   {dummy_target_df.count()}")

# # 3. Mock the Environment
# # -----------------------
# # Register the dummy target as a temp view (hypothetical view name)
# dummy_target_view = "dummy_patronage_target"
# dummy_target_df.createOrReplaceTempView(dummy_target_view)

# # Override the pipeline's table name variable to point to our dummy view
# original_table_name = pipeline.PATRONAGE_TABLE_NAME
# pipeline.PATRONAGE_TABLE_NAME = dummy_target_view
# transforms.PATRONAGE_TABLE_NAME = dummy_target_view

# # Inject the dummy identity DataFrame into the pipeline's shared state.
# # Without this, the transform fails with AttributeError: 'NoneType' object has no attribute '_jdf',
# # because the module-level variable it reads is 'identity_lookup_table' (not 'filtered_identity').
# state.identity_lookup_table = dummy_identity_df

# print(f"Environment Mocked. Pipeline now points to '{dummy_target_view}' instead of '{original_table_name}'")
# print("Injected dummy_identity_df into state.identity_lookup_table")
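The scenario comments in the cell above, together with the expected values in the assertions that follow, imply a simple per-participant decision rule. A pure-Python toy model of that rule can help reason about new test cases before running Spark. It is inferred from the expected outcomes only; `expected_change` is a hypothetical helper, not the logic inside `transforms.transform_scd_data`. It assumes source rows are already deduplicated and that SC percentages are zero-padded to three digits, as the expected `"070"` for a source value of `"70"` suggests.

```python
from typing import Optional, Tuple

Pair = Tuple[str, str]  # (SC_Combined_Disability_Percentage, PT_Indicator)


def expected_change(
    source_sc: Optional[str],
    target: Optional[Pair],
    pt_value: Optional[str],
) -> Optional[Pair]:
    """Return the (SC, PT) pair expected in the change set, or None if unchanged."""
    # Source SC wins (zero-padded, e.g. "70" -> "070"); otherwise keep the target's SC.
    if source_sc is not None:
        sc = source_sc.zfill(3)
    elif target is not None:
        sc = target[0]
    else:
        return None  # assumption: no SC source and no existing record -> nothing emitted
    # PT reference data wins; otherwise keep the target's flag; default "N" for new records.
    if pt_value is not None:
        pt = pt_value
    elif target is not None:
        pt = target[1]
    else:
        pt = "N"
    # Emit new records, and existing records whose SC or PT actually changed.
    if target is None or (sc, pt) != target:
        return (sc, pt)
    return None


# (source SC, active target record, PT reference value) per participant,
# mirroring the dummy data defined above.
scenarios = {
    "P001": ("70", ("050", "N"), "Y"),
    "P002": (None, ("030", "N"), "Y"),
    "P003": (None, ("010", "N"), None),
    "P004": (None, ("100", "Y"), "Y"),
    "P005": ("20", None, None),
    "P006": ("40", ("030", "N"), None),
    "P007": ("10", ("010", "N"), "Y"),
}
for pid, args in scenarios.items():
    print(pid, expected_change(*args))
```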

In [0]:
# # 4. Execute Transformation Logic
# # -------------------------------
# print("Running transform_scd_data with dummy inputs...")

# # We pass 'update' mode to trigger the standard logic
# # Note: transform_scd_data returns the DataFrame of *changes* (records to be upserted/expired)
# result_df = transforms.transform_scd_data(dummy_source_df, dummy_pt_df, 'update')

# # Cache result for analysis
# result_df.cache()
# result_count = result_df.count()

# print(f"Transformation Complete. Found {result_count} records to process.")
# result_df.select("participant_id", "SC_Combined_Disability_Percentage", "target_SC_Combined_Disability_Percentage", "PT_Indicator", "target_PT_Indicator", "target_RecordStatus").display()

In [0]:
# # 5. Assertions and Validation
# # ----------------------------
# print("Validating Results...")

# # Collect results to a list of rows for easy checking
# results = {row['participant_id']: row for row in result_df.collect()}

# # Assertion 1: Count Check
# # Expected: 5 records
# # - P001 (Update SC + PT)
# # - P005 (New - Deduplicated)
# # - P002 (Backfill PT)
# # - P006 (Update SC Only)
# # - P007 (Update PT Only - Direct)
# assert result_count == 5, f"Expected 5 records, found {result_count}"
# print("Assertion Passed: Record count is 5.")

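# # Assertion 2: Deduplication Check
# # P005 appeared twice in source and should appear exactly once in the result.
# # The `results` dict silently collapses duplicate keys, so count rows on the DataFrame instead.
# p005_rows = result_df.filter("participant_id = 'P005'").count()
# assert p005_rows == 1, f"Expected P005 once after deduplication, found {p005_rows}"
# # P005 has no matching target record, so it should be flagged as new (target_RecordStatus is null)
# assert results["P005"]["target_RecordStatus"] is None, "P005 should be identified as a New Record"
# print("Assertion Passed: Deduplication worked (P005 present once).")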

# # Assertion 3: Update Logic Check (Scenario: Update Both SC + PT)
# # P001 should have SC=070 (Source) and PT=Y (PT Data)
# p001 = results["P001"]
# assert p001["SC_Combined_Disability_Percentage"] == "070", f"P001 SC% mismatch. Expected 070, got {p001['SC_Combined_Disability_Percentage']}"
# assert p001["PT_Indicator"] == "Y", f"P001 PT mismatch. Expected Y, got {p001['PT_Indicator']}"
# print("Assertion Passed: Simultaneous Update (SC + PT) worked for P001.")

# # Assertion 4: Backfill Logic Check (Scenario: Update Backfill PT)
# # P002 should have SC=030 (Target) and PT=Y (PT Data)
# p002 = results["P002"]
# assert p002["SC_Combined_Disability_Percentage"] == "030", f"P002 SC% mismatch. Expected 030, got {p002['SC_Combined_Disability_Percentage']}"
# assert p002["PT_Indicator"] == "Y", f"P002 PT mismatch. Expected Y, got {p002['PT_Indicator']}"
# print("Assertion Passed: Backfill logic worked for P002.")

# # Assertion 5: No False Positives
# # P003 (No change) and P004 (No change) should NOT be in results
# assert "P003" not in results, "P003 should not be in results (No change)"
# assert "P004" not in results, "P004 should not be in results (No change)"
# print("Assertion Passed: Unchanged records correctly ignored.")

# # Assertion 6: New Record Logic Check (Scenario: Insert New Record)
# # P005 should have SC=020 (Source) and PT=N (Default)
# p005 = results["P005"]
# assert p005["SC_Combined_Disability_Percentage"] == "020", f"P005 SC% mismatch. Expected 020, got {p005['SC_Combined_Disability_Percentage']}"
# assert p005["PT_Indicator"] == "N", f"P005 PT mismatch. Expected N, got {p005['PT_Indicator']}"
# print("Assertion Passed: New Record (SC only) worked for P005.")

# # Assertion 7: Update SC Only Logic Check (Scenario: Update Disability% Only)
# # P006 should have SC=040 (Source) and PT=N (Target/Default)
# p006 = results["P006"]
# assert p006["SC_Combined_Disability_Percentage"] == "040", f"P006 SC% mismatch. Expected 040, got {p006['SC_Combined_Disability_Percentage']}"
# assert p006["PT_Indicator"] == "N", f"P006 PT mismatch. Expected N, got {p006['PT_Indicator']}"
# print("Assertion Passed: Update SC Only worked for P006.")

# # Assertion 8: Update PT Only (Direct) Logic Check (Scenario: Update PT_Indicator Only)
# # P007 should have SC=010 (Source/Target) and PT=Y (PT Data)
# p007 = results["P007"]
# assert p007["SC_Combined_Disability_Percentage"] == "010", f"P007 SC% mismatch. Expected 010, got {p007['SC_Combined_Disability_Percentage']}"
# assert p007["PT_Indicator"] == "Y", f"P007 PT mismatch. Expected Y, got {p007['PT_Indicator']}"
# print("Assertion Passed: Update PT Only (Direct) worked for P007.")

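# # Cleanup
# # Restore the original table name in both modules and release the cached result.
# # state.identity_lookup_table still holds the dummy data; rerun the Imports & Setup cell to reload the modules.
# pipeline.PATRONAGE_TABLE_NAME = original_table_name
# transforms.PATRONAGE_TABLE_NAME = original_table_name
# result_df.unpersist()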
# print("\nTest Complete. Environment restored.")
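The test overrides module globals in step 3 and restores them only at the very end, so a failing assertion leaves the pipeline pointing at the dummy view. A context manager with a `finally` block guarantees the originals come back. A minimal sketch; `override_attrs` is a hypothetical helper, not part of `patronage_modularized`.

```python
import contextlib
from typing import Any, Iterator, List, Tuple


@contextlib.contextmanager
def override_attrs(*overrides: Tuple[Any, str, Any]) -> Iterator[None]:
    """Temporarily set attributes on objects or modules, restoring them on exit.

    Each override is (target, attribute_name, new_value). Originals are
    restored in a finally block, so they come back even if an assertion fails.
    """
    missing = object()  # sentinel for attributes that did not exist before
    saved: List[Tuple[Any, str, Any]] = []
    try:
        for target, name, value in overrides:
            original = getattr(target, name, missing)
            setattr(target, name, value)
            saved.append((target, name, original))
        yield
    finally:
        # Unwind in reverse order so repeated overrides of one attribute restore correctly.
        for target, name, original in reversed(saved):
            if original is missing:
                delattr(target, name)
            else:
                setattr(target, name, original)
```

In the test, steps 4 and 5 could run inside `with override_attrs((pipeline, "PATRONAGE_TABLE_NAME", dummy_target_view), (transforms, "PATRONAGE_TABLE_NAME", dummy_target_view), (state, "identity_lookup_table", dummy_identity_df)):` in place of the manual cleanup. Keep the Spark actions (`count`, `collect`) inside the block, since DataFrames are evaluated lazily.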

In [0]:
# import pyspark.sql.functions as F
# from patronage_modularized import config as pipeline
# pipeline.log_message("\n" + "=" * 70)
# pipeline.log_message("DMDC FILE COMPARISON")
# pipeline.log_message("=" * 70)

# file_name = 'PATRONAGE_20251226.txt' # 'BACKFILL_EDIPI_PATRONAGE_20251226.txt'
# # File paths
# staging_path = f"/mnt/ci-patronage/dmdc_extracts/test/combined_export/{file_name}"
# patronage_path = f"/mnt/ci-patronage/dmdc_extracts/combined_export/{file_name}" # LAGGED_EDIPI_PATRONAGE_20251226.txt 

# pipeline.log_message(f"\nPatronage: {patronage_path}")
# pipeline.log_message(f"Staging: {staging_path}")

# try:
#     # Read both files as text
#     patronage_raw = spark.read.text(patronage_path)
#     staging_raw = spark.read.text(staging_path)
    
#     # Parse fixed-width format (42 chars total)
#     # EDIPI (chars 1-10) | Batch_CD (11-13) | Disability % (14-16) | 
#     # Status_Begin_Date (17-24) | PT (25) | Unemployability (26) | 
#     # Status_Last_Update (27-34) | Status_Termination_Date (35-42)
    
#     query_columns = (
#         F.trim(F.substring(F.col("value"), 1, 10)).alias("edipi"),
#         F.trim(F.substring(F.col("value"), 11, 3)).alias("batch_cd"),
#         F.trim(F.substring(F.col("value"), 14, 3)).alias("disability_pct"),
#         F.trim(F.substring(F.col("value"), 17, 8)).alias("status_begin_date"),
#         F.trim(F.substring(F.col("value"), 25, 1)).alias("pt_indicator"),
#         F.trim(F.substring(F.col("value"), 26, 1)).alias("unemployability"),
#         F.trim(F.substring(F.col("value"), 27, 8)).alias("status_last_update"),
#         F.trim(F.substring(F.col("value"), 35, 8)).alias("status_termination_date"),
#         F.col("value").alias("full_record")
#     )

#     filter_condition = (F.col("edipi").isNotNull() & (F.col("edipi") != ""))
#     patronage = (
#         patronage_raw
#         .select(*query_columns)
#         .filter(filter_condition)
#     )
    
#     staging = (
#         staging_raw
#         .select(*query_columns)
#         .filter(filter_condition)
#     )
    
#     patronage_count = patronage.count()
#     staging_count = staging.count()
    
#     pipeline.log_message(f"\nFile Counts:")
#     pipeline.log_message(f"  Patronage: {patronage_count:,} records")
#     pipeline.log_message(f"  Staging: {staging_count:,} records")
#     pipeline.log_message(f"  Difference: {abs(patronage_count - staging_count):,} records")
    
#     # Compare by EDIPI
#     # EDIPIs in patronage but not in staging
#     only_in_patronage = patronage.join(staging, "edipi", "left_anti")
    
#     # EDIPIs in staging but not in patronage
#     only_in_staging = staging.join(patronage, "edipi", "left_anti")
    
#     count_only_patronage = only_in_patronage.count()
#     count_only_staging = only_in_staging.count()
    
#     pipeline.log_message(f"\nEDIPI Comparison:")
#     pipeline.log_message(f"  Only in Patronage: {count_only_patronage:,} EDIPIs")
#     pipeline.log_message(f"  Only in Staging: {count_only_staging:,} EDIPIs")
    
#     if count_only_patronage > 0:
#         pipeline.log_message(f"\n--- {count_only_patronage:,} EDIPIs ONLY in Patronage ---")
#         only_in_patronage.select("*").display()
    
#     if count_only_staging > 0:
#         pipeline.log_message(f"\n--- {count_only_staging:,} EDIPIs ONLY in Staging ---")
#         only_in_staging.select("*").display()
    
#     if count_only_patronage == 0 and count_only_staging == 0:
#         pipeline.log_message(f"\nPerfect match! Both files have identical EDIPIs.")
#     else:
#         pipeline.log_message(f"\nMismatch: {count_only_patronage + count_only_staging:,} EDIPIs differ")
    
#     pipeline.log_message("=" * 70)
    
# except Exception as e:
#     pipeline.log_message(f"ERROR: {str(e)}", level='ERROR')
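When a mismatch shows up, it is often quicker to inspect a single line than to rerun Spark. A minimal pure-Python sketch of the same 42-character layout the cell above parses; `parse_dmdc_line` and `DMDC_LAYOUT` are hypothetical names, and the offsets are copied from the layout comment in that cell rather than from a DMDC specification.

```python
from typing import Dict, List, Tuple

# (field name, 1-based start column, width), mirroring the layout comment above.
DMDC_LAYOUT: List[Tuple[str, int, int]] = [
    ("edipi", 1, 10),
    ("batch_cd", 11, 3),
    ("disability_pct", 14, 3),
    ("status_begin_date", 17, 8),
    ("pt_indicator", 25, 1),
    ("unemployability", 26, 1),
    ("status_last_update", 27, 8),
    ("status_termination_date", 35, 8),
]
RECORD_WIDTH = 42


def parse_dmdc_line(line: str) -> Dict[str, str]:
    """Slice one fixed-width record into trimmed fields.

    Spark's substring() is 1-based, so `start - 1` converts to a Python index.
    Short lines are right-padded, so missing trailing fields come back as "".
    """
    padded = line.rstrip("\r\n").ljust(RECORD_WIDTH)
    return {
        name: padded[start - 1 : start - 1 + width].strip()
        for name, start, width in DMDC_LAYOUT
    }


# Invented sample record: no termination date, so the last 8 columns are blank.
sample = "1234567890SCD07020240101YN20251226" + " " * 8
print(parse_dmdc_line(sample))
```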

## Trace Participant IDs from Source to Destination

Use `trace_participant_id(...)` to locate a participant ID across: raw SCD file → identity lookup → duplicates → SCD staging → `patronage_unified`.

In [0]:
# from pyspark.sql.functions import col
# from patronage_modularized import config as pipeline


# def trace_participant_id(
#     file_name: str,
#     participant_ids,
#     *,
#     sep: str = ",",
#     base_dir: str = "dbfs:/mnt/ci-vadir-shared",
#     scd_staging_delta_path: str = "/mnt/Patronage/SCD_Staging",
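# ) -> None:
#     """Display rows matching participant_ids at each stage: raw SCD file,
#     identity lookup, duplicate correlations, SCD staging, and patronage_unified."""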
#     ids = [str(x) for x in participant_ids]
#     raw_path = file_name if file_name.startswith("dbfs:") else f"{base_dir.rstrip('/')}/{file_name.lstrip('/')}"

#     raw_df = (
#         spark.read.option("header", "true")
#         .option("sep", sep)
#         .option("inferSchema", "false")
#         .csv(raw_path)
#     )
#     raw_hits = raw_df.filter(col("PTCPNT_ID").cast("string").isin(ids))
#     print(f"Raw SCD file: {raw_path}")
#     print(f"Raw matches (PTCPNT_ID in {ids}): {raw_hits.count()}")
#     display(raw_hits)

#     identity_df = spark.read.format("delta").load(pipeline.IDENTITY_TABLE_PATH)
#     identity_hits = identity_df.filter(col("participant_id").cast("string").isin(ids))
#     print(f"\nIdentity lookup matches (participant_id in {ids}): {identity_hits.count()}")
#     display(identity_hits)

#     dup_df = spark.read.format("delta").load(pipeline.DUP_IDENTITY_TABLE_PATH)
#     dup_hits = dup_df.filter(col("TreatingFacilityPersonIdentifier").cast("string").isin(ids))
#     print(f"\nDuplicate correlations matches (TreatingFacilityPersonIdentifier in {ids}): {dup_hits.count()}")
#     display(dup_hits)

#     staging_df = spark.read.format("delta").load(scd_staging_delta_path)
#     staging_hits = staging_df.filter(col("participant_id").cast("string").isin(ids))
#     print(f"\nSCD staging matches (participant_id in {ids}): {staging_hits.count()}")
#     display(staging_hits)

#     patronage_df = spark.table("patronage_unified")
#     patronage_hits = patronage_df.filter(col("participant_id").cast("string").isin(ids))
#     print(f"\nPatronage unified matches (participant_id in {ids}): {patronage_hits.count()}")
#     display(patronage_hits)

# # Usage
# trace_participant_id("CPIDODIEX_20251227_spool.csv", ["67186394", "4744852"])
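`trace_participant_id` displays each stage separately. When tracing many IDs, it can help to reduce the output to the first stage where each ID disappears. A plain-Python sketch, assuming the matching IDs from each stage have been collected into sets of strings (for example by collecting the ID column of each stage's filtered DataFrame); `first_missing_stage` is a hypothetical helper. Consider leaving the duplicate-correlations table out of the ordered stages if, as its name suggests, it only lists IDs with duplicate correlations.

```python
from typing import Dict, Iterable, Optional, Set


def first_missing_stage(
    participant_ids: Iterable[object], stages: Dict[str, Set[str]]
) -> Dict[str, Optional[str]]:
    """Report, per ID, the first stage (in dict insertion order) that lacks it.

    `stages` maps stage name -> set of IDs found there, ordered from source
    to destination. A value of None means the ID reached every stage.
    """
    result: Dict[str, Optional[str]] = {}
    for pid in participant_ids:
        key = str(pid)  # normalize like .cast("string") in the trace function
        result[key] = next((name for name, ids in stages.items() if key not in ids), None)
    return result


# Synthetic IDs for illustration only.
stages = {
    "raw_scd_file": {"111", "222"},
    "identity_lookup": {"111"},
    "scd_staging": {"111"},
    "patronage_unified": {"111"},
}
print(first_missing_stage(["111", "222"], stages))
```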


In [0]:
# %sql
# WITH staging AS (
#     SELECT 
#         edipi, participant_id, PT_Indicator, SC_Combined_Disability_Percentage
#     FROM delta.`/mnt/Patronage/SCD_Staging`
#     WHERE SDP_Event_Created_Timestamp = '2025-12-27T09:31:00.000+00:00'
#         AND Batch_CD = 'SCD'
# ),
# patronage AS (
#     SELECT 
#         edipi, participant_id, PT_Indicator, SC_Combined_Disability_Percentage
#     FROM patronage_unified
#     WHERE SDP_Event_Created_Timestamp = '2025-12-27T09:31:00.000+00:00'
#         AND Batch_CD = 'SCD'
#         AND RecordStatus = TRUE
# )
# SELECT 
#     COALESCE(s.participant_id, p.participant_id) as participant_id,
#     s.PT_Indicator as staging_PT,
#     p.PT_Indicator as patronage_PT,
#     CASE WHEN s.PT_Indicator <=> p.PT_Indicator THEN 'MATCH' ELSE 'MISMATCH' END as PT_Status,
#     s.SC_Combined_Disability_Percentage as staging_Disability_Pct,
#     p.SC_Combined_Disability_Percentage as patronage_Disability_Pct,
#     CASE WHEN s.SC_Combined_Disability_Percentage <=> p.SC_Combined_Disability_Percentage THEN 'MATCH' ELSE 'MISMATCH' END as Disability_Status
# FROM staging s
# FULL OUTER JOIN patronage p
#     ON s.participant_id = p.participant_id
# -- <=> is null-safe equality: participants present on only one side are reported as mismatches
# WHERE NOT (s.PT_Indicator <=> p.PT_Indicator)
#    OR NOT (s.SC_Combined_Disability_Percentage <=> p.SC_Combined_Disability_Percentage)
# ORDER BY COALESCE(s.edipi, p.edipi)
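This comparison depends on SQL's three-valued logic. After a `FULL OUTER JOIN`, a participant present on only one side has `NULL` in the other side's columns. With plain `!=`, the predicate evaluates to `NULL`, and `WHERE` keeps only rows whose predicate is `TRUE`, so one-sided participants silently drop out instead of being reported. Spark SQL's null-safe equality operator `<=>` avoids this. A small Python model of the two operators (the function names are illustrative only):

```python
from typing import Optional


def sql_ne(a: Optional[str], b: Optional[str]) -> Optional[bool]:
    """SQL `a != b`: unknown (None) if either side is NULL."""
    if a is None or b is None:
        return None
    return a != b


def null_safe_eq(a: Optional[str], b: Optional[str]) -> bool:
    """SQL `a <=> b`: NULL <=> NULL is true, NULL <=> value is false."""
    if a is None or b is None:
        return a is None and b is None
    return a == b


def where_keeps(predicate: Optional[bool]) -> bool:
    """WHERE keeps a row only when the predicate is TRUE (not NULL or FALSE)."""
    return predicate is True


# A participant present only in staging: the patronage PT_Indicator is NULL.
print(where_keeps(sql_ne("Y", None)))            # dropped by `!=`
print(where_keeps(not null_safe_eq("Y", None)))  # kept by NOT (a <=> b)
```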

In [0]:

# df = spark.sql("DESCRIBE DETAIL delta.`/mnt/ci-vba-edw-2/DeltaTables/DW_ADHOC_RECURR.DOD_PATRONAGE_SCD_PT/`")
# display(df.select(df.lastModified))

## Pipeline Log Dashboard — Core Queries

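The queries below read from the pipeline log table (`pipeline.PATRONAGE_PIPELINE_LOG_TABLE_NAME`) and cover the latest runs, recent failures, input watermarks with file discovery counts, and DMDC export, EDIPI backfill, and backup status.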

In [None]:
# =============================================================================
# PIPELINE LOG DASHBOARD — CORE QUERIES
# =============================================================================
# Dashboard-ready queries for Patronage pipeline logging
log_table = pipeline.PATRONAGE_PIPELINE_LOG_TABLE_NAME

# 1) Latest runs overview
spark.sql(f"""
    SELECT
        run_timestamp_utc,
        processing_mode,
        status,
        duration_seconds
    FROM {log_table}
    ORDER BY run_timestamp_utc DESC
    LIMIT 30
""").display()

# 2) Recent failures (with error messages for troubleshooting)
spark.sql(f"""
    SELECT
        run_timestamp_utc,
        processing_mode,
        status,
        error_message
    FROM {log_table}
    WHERE status = 'FAILED'
    ORDER BY run_timestamp_utc DESC
    LIMIT 30
""").display()

# 3) Input watermarks + file discovery summary
spark.sql(f"""
    SELECT
        run_timestamp_utc,
        get_json_object(input_watermarks, '$.SCD_last_processed_ts') AS scd_last_processed_ts,
        get_json_object(input_watermarks, '$.CG_last_processed_ts') AS cg_last_processed_ts,
        get_json_object(file_discovery_detail, '$.total_scd_files') AS total_scd_files,
        get_json_object(file_discovery_detail, '$.total_cg_files') AS total_cg_files
    FROM {log_table}
    ORDER BY run_timestamp_utc DESC
    LIMIT 30
""").display()

# 4) DMDC + EDIPI backfill + backup status
spark.sql(f"""
    SELECT
        run_timestamp_utc,
        get_json_object(dmdc_export_stats, '$.record_count') AS dmdc_record_count,
        get_json_object(dmdc_export_stats, '$.filename') AS dmdc_filename,
        get_json_object(edipi_backfill_stats, '$.record_count') AS edipi_record_count,
        get_json_object(edipi_backfill_stats, '$.filename') AS edipi_filename,
        get_json_object(backup_stats, '$.status') AS backup_status
    FROM {log_table}
    ORDER BY run_timestamp_utc DESC
    LIMIT 30
""").display()
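The JSON-valued log columns are read with `get_json_object`, which returns strings; that is why the daily aggregates `CAST` the extracted values before summing, and why the `triggered` checks compare against the string `'true'`. To inspect a single log payload outside Spark, a rough Python stand-in for simple dotted paths can help. `json_path_get` is a hypothetical helper, not a Spark API, and it only approximates `get_json_object` for object paths like `$.SCD.active` (no array indexing or wildcards).

```python
import json
from typing import Any, Optional


def json_path_get(payload: Optional[str], path: str) -> Optional[str]:
    """Return the value at a dotted '$.a.b' path as a string, or None.

    Scalars are stringified (booleans as 'true'/'false'), objects and arrays
    are re-serialized as compact JSON, and missing paths, JSON nulls, or
    invalid JSON yield None.
    """
    if payload is None or not path.startswith("$"):
        return None
    try:
        node: Any = json.loads(payload)
    except json.JSONDecodeError:
        return None
    for key in [k for k in path[1:].split(".") if k]:
        if not isinstance(node, dict) or key not in node:
            return None
        node = node[key]
    if node is None:
        return None
    if isinstance(node, (dict, list)):
        return json.dumps(node, separators=(",", ":"))
    if isinstance(node, bool):
        return "true" if node else "false"
    return str(node)


# Invented payload shaped like output_row_counts_by_batch.
doc = '{"SCD": {"active": 10, "new": 2}, "triggered": true}'
print(json_path_get(doc, "$.SCD.active"), json_path_get(doc, "$.triggered"))
```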

## Pipeline Log Dashboard — Extended 360° Views

Run any section below to populate dashboard visuals and trend monitoring.

In [None]:
# =============================================================================
# PIPELINE LOG DASHBOARD — EXTENDED 360° VIEWS
# =============================================================================
# Extended dashboard queries for Patronage pipeline logging
log_table = pipeline.PATRONAGE_PIPELINE_LOG_TABLE_NAME

# 5) Run volume + success rate (last 90 days)
spark.sql(f"""
    SELECT
        DATE(run_timestamp_utc) AS run_date,
        COUNT(*) AS total_runs,
        SUM(CASE WHEN status = 'SUCCESS' THEN 1 ELSE 0 END) AS success_runs,
        SUM(CASE WHEN status = 'FAILED' THEN 1 ELSE 0 END) AS failed_runs
    FROM {log_table}
    WHERE run_timestamp_utc >= date_sub(current_date(), 90)
    GROUP BY DATE(run_timestamp_utc)
    ORDER BY run_date DESC
""").display()

# 6) Processing mode split + duration statistics (all runs)
spark.sql(f"""
    SELECT
        processing_mode,
        COUNT(*) AS runs,
        ROUND(AVG(duration_seconds), 2) AS avg_duration_sec,
        ROUND(MAX(duration_seconds), 2) AS max_duration_sec
    FROM {log_table}
    GROUP BY processing_mode
    ORDER BY runs DESC
""").display()

# 7) Raw vs processed rows (SCD/CG) over time
spark.sql(f"""
    SELECT
        run_timestamp_utc,
        get_json_object(records_processed_detail, '$.raw_scd_rows') AS raw_scd_rows,
        get_json_object(records_processed_detail, '$.processed_scd_rows') AS processed_scd_rows,
        get_json_object(records_processed_detail, '$.raw_cg_rows') AS raw_cg_rows,
        get_json_object(records_processed_detail, '$.processed_cg_rows') AS processed_cg_rows
    FROM {log_table}
    ORDER BY run_timestamp_utc DESC
    LIMIT 100
""").display()

# 8) Active/New/Expired output counts by batch
spark.sql(f"""
    SELECT
        run_timestamp_utc,
        get_json_object(output_row_counts_by_batch, '$.SCD.active') AS scd_active,
        get_json_object(output_row_counts_by_batch, '$.SCD.new') AS scd_new,
        get_json_object(output_row_counts_by_batch, '$.SCD.expired') AS scd_expired,
        get_json_object(output_row_counts_by_batch, '$.CG.active') AS cg_active,
        get_json_object(output_row_counts_by_batch, '$.CG.new') AS cg_new,
        get_json_object(output_row_counts_by_batch, '$.CG.expired') AS cg_expired
    FROM {log_table}
    ORDER BY run_timestamp_utc DESC
    LIMIT 100
""").display()

# 9) Scheduling events overview (DMDC, EDIPI, Backups)
spark.sql(f"""
    SELECT
        run_timestamp_utc,
        get_json_object(scheduled_tasks_detail, '$.dmdc_export_triggered') AS dmdc_triggered,
        get_json_object(scheduled_tasks_detail, '$.edipi_backfill_triggered') AS edipi_triggered,
        get_json_object(scheduled_tasks_detail, '$.backup_triggered') AS backup_triggered,
        get_json_object(dmdc_export_stats, '$.record_count') AS dmdc_records,
        get_json_object(edipi_backfill_stats, '$.record_count') AS edipi_records,
        get_json_object(backup_stats, '$.status') AS backup_status
    FROM {log_table}
    ORDER BY run_timestamp_utc DESC
    LIMIT 100
""").display()

# 10) File discovery volumes vs watermarks
spark.sql(f"""
    SELECT
        run_timestamp_utc,
        get_json_object(input_watermarks, '$.SCD_last_processed_ts') AS scd_watermark,
        get_json_object(input_watermarks, '$.CG_last_processed_ts') AS cg_watermark,
        get_json_object(file_discovery_detail, '$.total_scd_files') AS scd_files,
        get_json_object(file_discovery_detail, '$.total_cg_files') AS cg_files
    FROM {log_table}
    ORDER BY run_timestamp_utc DESC
    LIMIT 100
""").display()

# 11) Runtime outliers (top 20 longest runs)
spark.sql(f"""
    SELECT
        run_timestamp_utc,
        processing_mode,
        status,
        duration_seconds,
        error_message
    FROM {log_table}
    ORDER BY duration_seconds DESC
    LIMIT 20
""").display()
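Query 5 returns raw success and failure counts per day. If a dashboard tile needs a success rate and an alert flag, the arithmetic is simple. A plain-Python sketch over collected `(run_date, status)` pairs; `daily_success_rates` and its 90% default threshold are hypothetical choices, not values defined by the pipeline.

```python
from collections import defaultdict
from datetime import date
from typing import Dict, Iterable, List, Tuple


def daily_success_rates(
    runs: Iterable[Tuple[date, str]], min_rate: float = 0.9
) -> List[Tuple[date, int, float, bool]]:
    """Roll (run_date, status) pairs up to per-day success rates.

    Returns (run_date, total_runs, success_rate, below_threshold) tuples,
    newest day first. Any status other than 'SUCCESS' counts as a non-success.
    """
    totals: Dict[date, int] = defaultdict(int)
    successes: Dict[date, int] = defaultdict(int)
    for run_date, status in runs:
        totals[run_date] += 1
        if status == "SUCCESS":
            successes[run_date] += 1
    out: List[Tuple[date, int, float, bool]] = []
    for day in sorted(totals, reverse=True):
        rate = successes[day] / totals[day]
        out.append((day, totals[day], round(rate, 4), rate < min_rate))
    return out


# Invented sample runs.
sample_runs = [
    (date(2025, 12, 27), "SUCCESS"),
    (date(2025, 12, 27), "FAILED"),
    (date(2025, 12, 26), "SUCCESS"),
]
for row in daily_success_rates(sample_runs):
    print(row)
```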

## Pipeline Log Dashboard — Daily Aggregates & Operational Health

These queries provide daily rollups and last-known statuses for exports, backfills, and backups.

In [None]:
# =============================================================================
# PIPELINE LOG DASHBOARD — DAILY AGGREGATES & HEALTH
# =============================================================================
# Daily aggregates and operational health queries
log_table = pipeline.PATRONAGE_PIPELINE_LOG_TABLE_NAME

# A) Daily run health (success/fail + avg duration)
spark.sql(f"""
    SELECT
        DATE(run_timestamp_utc) AS run_date,
        COUNT(*) AS total_runs,
        SUM(CASE WHEN status = 'SUCCESS' THEN 1 ELSE 0 END) AS success_runs,
        SUM(CASE WHEN status = 'FAILED' THEN 1 ELSE 0 END) AS failed_runs,
        ROUND(AVG(duration_seconds), 2) AS avg_duration_sec
    FROM {log_table}
    GROUP BY DATE(run_timestamp_utc)
    ORDER BY run_date DESC
""").display()

# B) Daily raw vs processed row totals
spark.sql(f"""
    SELECT
        DATE(run_timestamp_utc) AS run_date,
        SUM(CAST(get_json_object(records_processed_detail, '$.raw_scd_rows') AS BIGINT)) AS raw_scd_rows,
        SUM(CAST(get_json_object(records_processed_detail, '$.processed_scd_rows') AS BIGINT)) AS processed_scd_rows,
        SUM(CAST(get_json_object(records_processed_detail, '$.raw_cg_rows') AS BIGINT)) AS raw_cg_rows,
        SUM(CAST(get_json_object(records_processed_detail, '$.processed_cg_rows') AS BIGINT)) AS processed_cg_rows
    FROM {log_table}
    GROUP BY DATE(run_timestamp_utc)
    ORDER BY run_date DESC
""").display()

# C) Latest successful pipeline run (timestamp + mode)
spark.sql(f"""
    SELECT
        run_timestamp_utc,
        processing_mode,
        duration_seconds
    FROM {log_table}
    WHERE status = 'SUCCESS'
    ORDER BY run_timestamp_utc DESC
    LIMIT 1
""").display()

# D) Latest DMDC export run (if triggered)
spark.sql(f"""
    SELECT
        run_timestamp_utc,
        get_json_object(dmdc_export_stats, '$.record_count') AS record_count,
        get_json_object(dmdc_export_stats, '$.filename') AS filename
    FROM {log_table}
    WHERE get_json_object(dmdc_export_stats, '$.triggered') = 'true'
    ORDER BY run_timestamp_utc DESC
    LIMIT 1
""").display()

# E) Latest EDIPI backfill run (if triggered)
spark.sql(f"""
    SELECT
        run_timestamp_utc,
        get_json_object(edipi_backfill_stats, '$.record_count') AS record_count,
        get_json_object(edipi_backfill_stats, '$.filename') AS filename
    FROM {log_table}
    WHERE get_json_object(edipi_backfill_stats, '$.triggered') = 'true'
    ORDER BY run_timestamp_utc DESC
    LIMIT 1
""").display()

# F) Latest monthly backup run (if triggered)
spark.sql(f"""
    SELECT
        run_timestamp_utc,
        get_json_object(backup_stats, '$.status') AS backup_status
    FROM {log_table}
    WHERE get_json_object(backup_stats, '$.triggered') = 'true'
    ORDER BY run_timestamp_utc DESC
    LIMIT 1
""").display()

# G) Last 10 runs that discovered SCD or CG files (potential backlog indicator)
spark.sql(f"""
    SELECT
        run_timestamp_utc,
        get_json_object(file_discovery_detail, '$.total_scd_files') AS total_scd_files,
        get_json_object(file_discovery_detail, '$.total_cg_files') AS total_cg_files
    FROM {log_table}
    WHERE CAST(get_json_object(file_discovery_detail, '$.total_scd_files') AS INT) > 0
       OR CAST(get_json_object(file_discovery_detail, '$.total_cg_files') AS INT) > 0
    ORDER BY run_timestamp_utc DESC
    LIMIT 10
""").display()
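Query C returns the latest successful run. To turn it into a freshness alert, compare that timestamp against a maximum age. A minimal sketch; `is_stale` is a hypothetical helper, the 26-hour default (a daily schedule plus slack) is an assumption, and naive timestamps are treated as UTC to match the `run_timestamp_utc` column name. Check the session time zone before relying on that for collected values.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional


def is_stale(
    last_success_utc: Optional[datetime],
    max_age: timedelta = timedelta(hours=26),
    now: Optional[datetime] = None,
) -> bool:
    """True when there is no successful run, or the latest one is older than max_age."""
    if last_success_utc is None:
        return True
    # Treat naive timestamps as UTC so they can be compared with aware ones.
    if last_success_utc.tzinfo is None:
        last_success_utc = last_success_utc.replace(tzinfo=timezone.utc)
    current = now if now is not None else datetime.now(timezone.utc)
    if current.tzinfo is None:
        current = current.replace(tzinfo=timezone.utc)
    return current - last_success_utc > max_age


# Invented timestamps for illustration.
now = datetime(2025, 12, 28, 12, 0, tzinfo=timezone.utc)
print(is_stale(datetime(2025, 12, 28, 0, 0), now=now))
print(is_stale(datetime(2025, 12, 26, 0, 0), now=now))
```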