# Unified ETL Pipeline - ConnectWise PSA

This notebook provides a complete ETL pipeline for ConnectWise data using the unified framework.
It supports both full refresh and incremental processing modes.

## Features:
- Bronze layer: API extraction with validation
- Silver layer: Schema transformation and flattening
- Gold layer: Business logic and fact tables
- Incremental processing: Only process changed records
- Dimension generation: Create dimension tables from enums

## 1. Install Dependencies

Install the unified ETL packages from wheel files:

In [None]:
# Install the unified ETL wheels from the lakehouse Files area.
# `%pip` is an IPython magic, so this cell only runs inside a notebook kernel.
# NOTE(review): core is installed before the ConnectWise package, presumably
# because the latter depends on it — confirm against the wheel metadata.
%pip install /lakehouse/default/Files/unified_etl_core-1.0.0-py3-none-any.whl
%pip install /lakehouse/default/Files/unified_etl_connectwise-1.0.0-py3-none-any.whl

## 2. Configuration and Setup

In [None]:
import os
import logging
from datetime import datetime, timedelta
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

# Send INFO-and-above log records to the notebook output with a timestamped format.
logging.basicConfig(
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
logger = logging.getLogger(__name__)

# Reuse the session that is already active, or build one when none exists.
spark = SparkSession.getActiveSession() or SparkSession.builder.getOrCreate()
print(f"Spark version: {spark.version}")

# --- Run parameters ---------------------------------------------------------
# True  -> incremental run (only recent / changed records)
# False -> full refresh
INCREMENTAL_MODE = True
# Look-back window, in days, used by the incremental Bronze extraction.
DAYS_TO_REFRESH = 30
# Layers executed by this run; remove an entry to skip that layer.
PROCESS_LAYERS = ["bronze", "silver", "gold"]

# --- Entities ---------------------------------------------------------------
ENTITIES = [
    "Agreement",
    "TimeEntry",
    "ExpenseEntry",
    "ProductItem",
    "PostedInvoice",
    "UnpostedInvoice",
]

# ConnectWise API path for each entity name.
ENTITY_ENDPOINTS = {
    "Agreement": "/finance/agreements",
    "TimeEntry": "/time/entries",
    "ExpenseEntry": "/expense/entries",
    "ProductItem": "/procurement/products",
    "PostedInvoice": "/finance/invoices/posted",
    "UnpostedInvoice": "/finance/invoices",
}

# Echo the effective configuration, one setting per line.
print(
    "Configuration:",
    f"  Mode: {'Incremental' if INCREMENTAL_MODE else 'Full Refresh'}",
    f"  Days to refresh: {DAYS_TO_REFRESH}",
    f"  Layers: {PROCESS_LAYERS}",
    f"  Entities: {len(ENTITIES)}",
    sep="\n",
)

## 3. Check Existing Tables

In [None]:
def check_existing_tables():
    """Report which tables exist in the bronze, silver and gold schemas.

    For every schema the function lists its tables with ``SHOW TABLES IN``,
    counts the rows of each one, and prints a per-schema summary.

    Returns:
        dict: ``{schema: {table_name: row_count_or_None}}``. ``None`` marks a
        table whose row count could not be computed. A schema that cannot be
        listed maps to an empty dict.
    """
    table_info = {}

    for schema in ["bronze", "silver", "gold"]:
        try:
            tables = spark.sql(f"SHOW TABLES IN {schema}").collect()
            schema_tables = {}

            for row in tables:
                # SHOW TABLES also returns session-scoped temp views for every
                # schema; they are not tables of this schema, so skip them.
                if getattr(row, "isTemporary", False):
                    continue

                table_name = row.tableName
                full_name = f"{schema}.{table_name}"
                try:
                    count = spark.sql(f"SELECT COUNT(*) FROM {full_name}").collect()[0][0]
                    schema_tables[table_name] = count
                except Exception:
                    # Best effort: one unreadable table must not hide the rest.
                    # `Exception` (not a bare except) lets interrupts propagate.
                    schema_tables[table_name] = None

            table_info[schema] = schema_tables
            print(f"\n{schema.upper()} schema: {len(schema_tables)} tables")
            for table, count in sorted(schema_tables.items()):
                if count is not None:
                    print(f"  {table}: {count:,} rows")
                else:
                    print(f"  {table}: <error counting>")
        except Exception as e:
            print(f"\n{schema.upper()} schema: Not found or error - {str(e)}")
            table_info[schema] = {}

    return table_info

# Snapshot the table inventory before the pipeline runs. The result is kept in
# `existing_tables`; none of the cells shown here read it again.
print("=== Existing Table Status ===")
existing_tables = check_existing_tables()

## 4. Bronze Layer - Extract from ConnectWise API

In [None]:
from unified_etl_connectwise import ConnectWiseClient
from unified_etl_connectwise.api_utils import build_condition_string

def extract_bronze_data(incremental=True, days_back=30):
    """Extract every entity in ``ENTITY_ENDPOINTS`` from the ConnectWise API.

    Args:
        incremental: When true, only records on or after ``now - days_back``
            are requested. TimeEntry and ExpenseEntry are filtered on the
            entered date; all other entities are filtered on ``lastUpdated``.
        days_back: Size of the incremental look-back window, in days.

    Returns:
        dict: ``{entity_name: {"df", "count", "success"[, "error"]}}``. A failed
        entity has ``df=None``, ``count=0``, ``success=False`` and the error text.
        One failing entity does not stop the others.
    """
    client = ConnectWiseClient()
    results = {}

    # Take one timestamp for the whole run so that every entity extracted by
    # this call carries the same `_etl_batch_id`.
    run_started = datetime.now()
    batch_id = run_started.strftime("%Y%m%d_%H%M%S")

    # Calculate date threshold for incremental
    since_date = None
    if incremental:
        since_date = (run_started - timedelta(days=days_back)).strftime("%Y-%m-%d")
        print(f"\nIncremental mode: Fetching data since {since_date}")
    else:
        print("\nFull refresh mode: Fetching all data")

    for entity_name, endpoint in ENTITY_ENDPOINTS.items():
        print(f"\nExtracting {entity_name} from {endpoint}...")

        try:
            # Build conditions based on entity type and mode
            conditions = None
            order_by = "id"

            if incremental:
                if entity_name in ["TimeEntry", "ExpenseEntry"]:
                    conditions = build_condition_string(date_entered_gte=since_date)
                    order_by = "dateEntered desc"
                else:
                    conditions = f"(lastUpdated>=[{since_date}])"
                    order_by = "lastUpdated desc"

            # Extract data
            df = client.extract(
                endpoint=endpoint,
                conditions=conditions,
                order_by=order_by,
                page_size=1000
            )

            # Add ETL metadata
            df = df.withColumn("_etl_timestamp", F.current_timestamp())
            df = df.withColumn("_etl_source", F.lit("connectwise"))
            df = df.withColumn("_etl_batch_id", F.lit(batch_id))

            record_count = df.count()
            results[entity_name] = {
                "df": df,
                "count": record_count,
                "success": True
            }
            print(f"  ✅ Extracted {record_count:,} records")

        except Exception as e:
            # Record the failure and keep going with the remaining entities.
            results[entity_name] = {
                "df": None,
                "count": 0,
                "success": False,
                "error": str(e)
            }
            print(f"  ❌ Failed: {str(e)}")

    return results

# Run the Bronze extraction unless the layer was excluded from this run.
if "bronze" not in PROCESS_LAYERS:
    print("\n⏭️  Skipping Bronze layer")
else:
    print("\n=== BRONZE LAYER PROCESSING ===")
    bronze_results = extract_bronze_data(
        incremental=INCREMENTAL_MODE,
        days_back=DAYS_TO_REFRESH,
    )

    # Only successful extractions contribute to the total.
    print("\nBronze Extraction Summary:")
    total_records = sum(
        outcome["count"] for outcome in bronze_results.values() if outcome["success"]
    )
    print(f"  Total records extracted: {total_records:,}")

## 5. Write Bronze Data with MERGE

In [None]:
def _quote_identifier(name):
    """Return *name* as a backtick-delimited Spark SQL identifier."""
    return "`" + name.replace("`", "``") + "`"


def write_bronze_data(results, incremental=True):
    """Write extracted DataFrames to ``bronze.bronze_cw_<entity>`` tables.

    Entities whose extraction failed or returned no rows are skipped. Incoming
    rows are de-duplicated on ``id``. When ``incremental`` is true and the
    target table already exists the rows are MERGEd on ``id``; otherwise the
    table is created or overwritten.

    Args:
        results: Mapping produced by ``extract_bronze_data``.
        incremental: Use MERGE against an existing table instead of overwrite.

    Returns:
        dict: ``{entity_name: {"mode", "records_processed", "final_count"}}``
        for written entities, or ``{entity_name: {"error": str}}`` on failure.
    """
    write_summary = {}

    for entity_name, result in results.items():
        if not result["success"] or result["count"] == 0:
            continue

        bronze_table = f"bronze.bronze_cw_{entity_name.lower()}"
        df = result["df"]

        try:
            # Check if table exists
            table_exists = spark.catalog.tableExists(bronze_table)

            # Deduplicate incoming data. Every count() is a separate Spark
            # job, so count once and reuse the extraction-time row count.
            df_deduped = df.dropDuplicates(["id"])
            deduped_count = df_deduped.count()

            if incremental and table_exists:
                duplicates_removed = result["count"] - deduped_count
                if duplicates_removed > 0:
                    print(f"\n{entity_name}: Removed {duplicates_removed} duplicates")

                # Use MERGE for incremental updates
                temp_view = f"temp_{entity_name.lower()}_updates"
                df_deduped.createOrReplaceTempView(temp_view)

                try:
                    # Quote column names so that names containing dots or
                    # other special characters still form valid SQL.
                    merge_key = _quote_identifier("id")
                    quoted_cols = [_quote_identifier(col) for col in df.columns]
                    update_expr = ", ".join(
                        f"target.{col} = source.{col}"
                        for col in quoted_cols
                        if col != merge_key
                    )
                    insert_cols = ", ".join(quoted_cols)
                    insert_values = ", ".join(f"source.{col}" for col in quoted_cols)

                    merge_sql = f"""
                    MERGE INTO {bronze_table} AS target
                    USING {temp_view} AS source
                    ON target.{merge_key} = source.{merge_key}
                    WHEN MATCHED THEN
                        UPDATE SET {update_expr}
                    WHEN NOT MATCHED THEN
                        INSERT ({insert_cols}) VALUES ({insert_values})
                    """

                    spark.sql(merge_sql)
                finally:
                    # Drop the view even when the MERGE fails.
                    spark.catalog.dropTempView(temp_view)

                # Get final count
                final_count = spark.sql(f"SELECT COUNT(*) FROM {bronze_table}").collect()[0][0]
                write_summary[entity_name] = {
                    "mode": "merge",
                    "records_processed": deduped_count,
                    "final_count": final_count
                }
                print(f"  ✅ Merged {deduped_count} records into {bronze_table} (total: {final_count:,})")

            else:
                # Full overwrite or new table
                df_deduped.write.mode("overwrite").saveAsTable(bronze_table)

                write_summary[entity_name] = {
                    "mode": "overwrite",
                    "records_processed": deduped_count,
                    "final_count": deduped_count
                }
                print(f"  ✅ Wrote {deduped_count} records to {bronze_table}")

        except Exception as e:
            write_summary[entity_name] = {"error": str(e)}
            print(f"  ❌ Failed to write {entity_name}: {str(e)}")

    return write_summary

# Persist the extracted data, provided the extraction cell ran in this session.
if "bronze" in PROCESS_LAYERS and 'bronze_results' in locals():
    print("\n=== Writing Bronze Data ===")
    bronze_write_summary = write_bronze_data(
        bronze_results,
        incremental=INCREMENTAL_MODE,
    )

    # Entities written without an error are handed on to the Silver step.
    bronze_updated_entities = [
        written_name
        for written_name, write_outcome in bronze_write_summary.items()
        if "error" not in write_outcome
    ]
    print(f"\nBronze updated entities: {bronze_updated_entities}")

## 6. Silver Layer - Transform and Validate

In [None]:
from unified_etl_connectwise.config import SILVER_CONFIG
from unified_etl_core.silver import apply_silver_transformations
import unified_etl_connectwise.models.models as cw_models

def process_silver_layer(entities_to_process=None, incremental=True):
    """Process Bronze to Silver transformations.

    For each entity the Bronze table is read (all rows, or only rows whose
    ``_etl_timestamp`` is newer than the newest one in the Silver table when
    running incrementally), passed through ``apply_silver_transformations``,
    and then MERGEd into or written over the Silver table.

    Args:
        entities_to_process: Entity names to process. ``None`` means every
            entity in ``SILVER_CONFIG["entities"]``.
        incremental: Process only new Bronze rows and MERGE them when the
            Silver table already exists.

    Returns:
        tuple: ``(silver_summary, silver_changes)``. ``silver_summary`` maps an
        entity to ``{"success", "records_processed", "columns"}`` or
        ``{"success": False, "error"}``. ``silver_changes`` maps an entity to a
        DataFrame of the distinct ``id`` values that were picked up.
    """

    # Model mapping
    model_mapping = {
        "Agreement": cw_models.Agreement,
        "TimeEntry": cw_models.TimeEntry,
        "ExpenseEntry": cw_models.ExpenseEntry,
        "UnpostedInvoice": cw_models.Invoice,
        "PostedInvoice": cw_models.Invoice,
        "ProductItem": cw_models.ProductItem,
    }

    # If no specific entities provided, process all configured ones
    if entities_to_process is None:
        entities_to_process = list(SILVER_CONFIG["entities"].keys())

    silver_summary = {}
    silver_changes = {}  # Track what changed for Gold processing

    for entity_name in entities_to_process:
        if entity_name not in SILVER_CONFIG["entities"]:
            print(f"\n⚠️  {entity_name} not configured for Silver processing")
            continue

        entity_config = SILVER_CONFIG["entities"][entity_name]
        bronze_table = entity_config["bronze_table"]
        silver_table = entity_config["silver_table"]

        # Check if we're using schema prefix
        if not bronze_table.startswith("bronze."):
            bronze_table = f"bronze.{bronze_table}"
        if not silver_table.startswith("silver."):
            silver_table = f"silver.{silver_table}"

        print(f"\nProcessing {entity_name}: {bronze_table} → {silver_table}")

        # Resolve the model before touching any table. An entity without a
        # model is never written, so it must not be reported as changed and
        # there is no point scanning its Bronze table.
        model_class = model_mapping.get(entity_name)
        if not model_class:
            print(f"  ⚠️  No model class found for {entity_name}")
            continue

        try:
            # The same decision drives both the read filter and the write mode.
            merge_into_silver = incremental and spark.catalog.tableExists(silver_table)

            # Read Bronze data
            if merge_into_silver:
                # Get last refresh timestamp from Silver table
                last_refresh = spark.sql(f"""
                    SELECT MAX(_etl_timestamp) as last_refresh 
                    FROM {silver_table}
                """).collect()[0][0]

                if last_refresh:
                    # Only process new/changed records
                    bronze_df = spark.sql(f"""
                        SELECT * FROM {bronze_table}
                        WHERE _etl_timestamp > '{last_refresh}'
                    """)
                    print(f"  Incremental: Processing records since {last_refresh}")
                else:
                    bronze_df = spark.table(bronze_table)
                    print("  No timestamp found, processing all records")
            else:
                # Full refresh
                bronze_df = spark.table(bronze_table)
                print("  Full refresh: Processing all records")

            record_count = bronze_df.count()
            if record_count == 0:
                print("  No new records to process")
                continue

            print(f"  Found {record_count:,} records to process")

            # Track changed IDs for Gold processing
            silver_changes[entity_name] = bronze_df.select("id").distinct()

            # Apply Silver transformations
            silver_df = apply_silver_transformations(
                df=bronze_df,
                entity_config=entity_config,
                model_class=model_class,
            )

            # Write to Silver
            if merge_into_silver:
                # Use MERGE for incremental
                temp_view = f"temp_silver_{entity_name.lower()}"
                silver_df.createOrReplaceTempView(temp_view)

                try:
                    business_keys = entity_config.get("business_keys", ["id"])
                    merge_conditions = " AND ".join(
                        f"target.{key} = source.{key}" for key in business_keys
                    )

                    merge_sql = f"""
                    MERGE INTO {silver_table} AS target
                    USING {temp_view} AS source
                    ON {merge_conditions}
                    WHEN MATCHED THEN UPDATE SET *
                    WHEN NOT MATCHED THEN INSERT *
                    """

                    spark.sql(merge_sql)
                finally:
                    # Drop the view even when the MERGE fails.
                    spark.catalog.dropTempView(temp_view)

                final_count = spark.sql(f"SELECT COUNT(*) FROM {silver_table}").collect()[0][0]
                print(f"  ✅ Merged {record_count} records into {silver_table} (total: {final_count:,})")
            else:
                # Full overwrite
                silver_df.write.mode("overwrite").option("mergeSchema", "true").saveAsTable(silver_table)
                print(f"  ✅ Wrote {silver_df.count()} records to {silver_table}")

            silver_summary[entity_name] = {
                "success": True,
                "records_processed": record_count,
                "columns": len(silver_df.columns)
            }

        except Exception as e:
            silver_summary[entity_name] = {"success": False, "error": str(e)}
            print(f"  ❌ Failed: {str(e)}")

    return silver_summary, silver_changes

# Run the Silver step, or define an empty change set when it is skipped so the
# Gold cell can still reference `silver_changes`.
if "silver" not in PROCESS_LAYERS:
    print("\n⏭️  Skipping Silver layer")
    silver_changes = {}
else:
    print("\n=== SILVER LAYER PROCESSING ===")

    # Incremental runs limit Silver to the entities Bronze just wrote;
    # otherwise None asks for every configured entity.
    entities_for_silver = (
        bronze_updated_entities
        if 'bronze_updated_entities' in locals() and INCREMENTAL_MODE
        else None
    )

    silver_summary, silver_changes = process_silver_layer(
        entities_to_process=entities_for_silver,
        incremental=INCREMENTAL_MODE,
    )

    # One status line per processed entity.
    print("\nSilver Processing Summary:")
    for entity, summary in silver_summary.items():
        if not summary.get("success"):
            print(f"  {entity}: ❌ {summary.get('error', 'Unknown error')}")
        else:
            print(f"  {entity}: ✅ {summary['records_processed']} records, {summary['columns']} columns")

## 7. Gold Layer - Create Fact Tables

In [None]:
from unified_etl_connectwise.transforms import (
    create_agreement_period_fact,
    create_expense_entry_fact,
    create_invoice_line_fact,
    create_time_entry_fact,
)

def process_gold_layer(silver_changes=None, incremental=True):
    """Create Gold fact tables with business logic.

    Four facts are built from the Silver tables: time entry, invoice line,
    agreement period and expense entry. Each is built only when
    ``silver_changes`` is empty/``None`` or names one of its source entities.
    The time entry fact is MERGEd when running incrementally against an
    existing table; the other facts are always overwritten.

    Args:
        silver_changes: Mapping of entity name to a DataFrame of changed ids,
            as returned by ``process_silver_layer``. A falsy value (``None``
            or an empty dict) rebuilds every fact.
        incremental: Allow MERGE into the existing time entry fact table.

    Returns:
        dict: ``{fact_name: {"success": True, "count": int}}`` or
        ``{fact_name: {"success": False, "error": str}}`` per attempted fact.
    """
    gold_summary = {}

    # Time Entry Facts
    if not silver_changes or "TimeEntry" in silver_changes or "Agreement" in silver_changes:
        print("\n📊 Creating Time Entry Facts...")
        temp_views = []  # views registered here, dropped in `finally`
        try:
            # Read required tables
            time_entry_df = spark.table("silver.silver_cw_timeentry")
            agreement_df = spark.table("silver.silver_cw_agreement") if spark.catalog.tableExists("silver.silver_cw_agreement") else None
            member_df = spark.table("silver.silver_cw_member") if spark.catalog.tableExists("silver.silver_cw_member") else None

            merge_into_existing = incremental and spark.catalog.tableExists("gold.gold_fact_time_entry")

            # Narrow the input to changed time entries only when the result is
            # merged into an existing table (an overwrite of a subset would
            # leave a partial fact table) and agreements did not change (an
            # agreement change can affect time entries that did not change).
            narrow_to_changed = (
                merge_into_existing
                and bool(silver_changes)
                and "TimeEntry" in silver_changes
                and "Agreement" not in silver_changes
            )
            if narrow_to_changed:
                changed_ids = silver_changes["TimeEntry"]
                changed_ids.createOrReplaceTempView("temp_changed_time_ids")
                temp_views.append("temp_changed_time_ids")
                time_entry_df = spark.sql("""
                    SELECT t.* FROM silver.silver_cw_timeentry t
                    JOIN temp_changed_time_ids c ON t.id = c.id
                """)
                print(f"  Processing {time_entry_df.count()} changed time entries")

            # Create fact table
            fact_df = create_time_entry_fact(
                spark=spark,
                time_entry_df=time_entry_df,
                agreement_df=agreement_df,
                member_df=member_df,
            )

            # Write fact table
            if merge_into_existing:
                # MERGE for incremental
                fact_df.createOrReplaceTempView("temp_fact_time_entry")
                temp_views.append("temp_fact_time_entry")
                spark.sql("""
                    MERGE INTO gold.gold_fact_time_entry AS target
                    USING temp_fact_time_entry AS source
                    ON target.timeEntryId = source.timeEntryId
                    WHEN MATCHED THEN UPDATE SET *
                    WHEN NOT MATCHED THEN INSERT *
                """)
                final_count = spark.sql("SELECT COUNT(*) FROM gold.gold_fact_time_entry").collect()[0][0]
                # Count once: each count() re-evaluates the whole fact.
                fact_count = fact_df.count()
                print(f"  ✅ Time Entry Facts: Merged {fact_count} records (total: {final_count:,})")
            else:
                fact_df.write.mode("overwrite").saveAsTable("gold.gold_fact_time_entry")
                fact_count = fact_df.count()
                print(f"  ✅ Time Entry Facts: Created with {fact_count:,} records")

            gold_summary["time_entry_fact"] = {"success": True, "count": fact_count}

        except Exception as e:
            gold_summary["time_entry_fact"] = {"success": False, "error": str(e)}
            print(f"  ❌ Failed: {str(e)}")
        finally:
            # The views are only needed while the fact is evaluated.
            for view_name in temp_views:
                spark.catalog.dropTempView(view_name)

    # Invoice Line Facts (includes uninvoiced billable work)
    if not silver_changes or any(e in silver_changes for e in ["TimeEntry", "UnpostedInvoice", "PostedInvoice", "ProductItem"]):
        print("\n💰 Creating Invoice Line Facts...")
        try:
            # Read required tables
            invoice_df = spark.table("silver.silver_cw_invoice")
            time_entry_df = spark.table("silver.silver_cw_timeentry") if spark.catalog.tableExists("silver.silver_cw_timeentry") else None
            product_df = spark.table("silver.silver_cw_productitem") if spark.catalog.tableExists("silver.silver_cw_productitem") else None
            agreement_df = spark.table("silver.silver_cw_agreement") if spark.catalog.tableExists("silver.silver_cw_agreement") else None

            # Create fact table (now includes uninvoiced entries)
            fact_df = create_invoice_line_fact(
                spark=spark,
                invoice_df=invoice_df,
                time_entry_df=time_entry_df,
                product_df=product_df,
                agreement_df=agreement_df,
            )

            # Write fact table
            fact_df.write.mode("overwrite").saveAsTable("gold.gold_fact_invoice_line")
            fact_count = fact_df.count()
            print(f"  ✅ Invoice Line Facts: Created with {fact_count:,} records (includes uninvoiced)")

            gold_summary["invoice_line_fact"] = {"success": True, "count": fact_count}

        except Exception as e:
            gold_summary["invoice_line_fact"] = {"success": False, "error": str(e)}
            print(f"  ❌ Failed: {str(e)}")

    # Agreement Period Facts
    if not silver_changes or "Agreement" in silver_changes:
        print("\n📅 Creating Agreement Period Facts...")
        try:
            agreement_df = spark.table("silver.silver_cw_agreement")

            # Create monthly period facts
            fact_df = create_agreement_period_fact(
                spark=spark,
                agreement_df=agreement_df,
                config={"start_date": "2020-01-01", "frequency": "month"}
            )

            # Write fact table
            fact_df.write.mode("overwrite").saveAsTable("gold.gold_fact_agreement_period")
            fact_count = fact_df.count()
            print(f"  ✅ Agreement Period Facts: Created with {fact_count:,} records")

            gold_summary["agreement_period_fact"] = {"success": True, "count": fact_count}

        except Exception as e:
            gold_summary["agreement_period_fact"] = {"success": False, "error": str(e)}
            print(f"  ❌ Failed: {str(e)}")

    # Expense Entry Facts
    if not silver_changes or "ExpenseEntry" in silver_changes:
        print("\n💳 Creating Expense Entry Facts...")
        try:
            expense_df = spark.table("silver.silver_cw_expenseentry")
            agreement_df = spark.table("silver.silver_cw_agreement") if spark.catalog.tableExists("silver.silver_cw_agreement") else None

            # Create fact table
            fact_df = create_expense_entry_fact(
                spark=spark,
                expense_df=expense_df,
                agreement_df=agreement_df,
            )

            # Write fact table
            fact_df.write.mode("overwrite").saveAsTable("gold.gold_fact_expense_entry")
            fact_count = fact_df.count()
            print(f"  ✅ Expense Entry Facts: Created with {fact_count:,} records")

            gold_summary["expense_entry_fact"] = {"success": True, "count": fact_count}

        except Exception as e:
            gold_summary["expense_entry_fact"] = {"success": False, "error": str(e)}
            print(f"  ❌ Failed: {str(e)}")

    return gold_summary

# Build the Gold facts unless the layer was excluded from this run.
if "gold" not in PROCESS_LAYERS:
    print("\n⏭️  Skipping Gold layer")
else:
    print("\n=== GOLD LAYER PROCESSING ===")
    # A full refresh passes no change set, so every fact is rebuilt.
    gold_summary = process_gold_layer(
        silver_changes=silver_changes if INCREMENTAL_MODE else None,
        incremental=INCREMENTAL_MODE,
    )

    # One status line per fact that was attempted.
    print("\nGold Processing Summary:")
    for fact_name, summary in gold_summary.items():
        if not summary.get("success"):
            print(f"  {fact_name}: ❌ {summary.get('error', 'Unknown error')}")
        else:
            print(f"  {fact_name}: ✅ {summary['count']:,} records")

## 8. Generate Dimensions

In [None]:
from unified_etl_core.date_utils import generate_date_dimension
from unified_etl_core.dimensions import create_dimension_from_column

def generate_dimensions():
    """Build the Gold dimension tables.

    The date dimension ``gold.gold_dim_date`` is created only when it does not
    exist yet. Every column-based dimension listed below is rebuilt with
    ``create_dimension_from_column`` and written over ``gold.<dimension_name>``.

    Returns:
        dict: ``{dimension_name: row_count}``, or ``"error"`` in place of the
        count for a dimension that failed.
    """

    # Date dimension: created once, never refreshed by this function.
    date_dim_missing = not spark.catalog.tableExists("gold.gold_dim_date")
    if date_dim_missing:
        print("\n📅 Creating Date Dimension...")
        try:
            date_dim = generate_date_dimension(
                spark=spark,
                start_date="2020-01-01",
                end_date="2030-12-31",
                fiscal_year_start_month=7,
            )
            date_dim.write.mode("overwrite").saveAsTable("gold.gold_dim_date")
            print(f"  ✅ Created dim_date with {date_dim.count():,} records")
        except Exception as exc:
            print(f"  ❌ Failed: {str(exc)}")

    # (source table, source column, dimension name), in processing order.
    dimension_specs = (
        # From TimeEntry
        ("silver_cw_timeentry", "billableOption", "dim_billable_option"),
        ("silver_cw_timeentry", "status", "dim_time_status"),
        ("silver_cw_timeentry", "chargeToType", "dim_charge_type"),
        # From Agreement
        ("silver_cw_agreement", "agreementStatus", "dim_agreement_status"),
        ("silver_cw_agreement", "billCycleIdentifier", "dim_bill_cycle"),
        # From Invoice
        ("silver_cw_invoice", "statusName", "dim_invoice_status"),
        # From ExpenseEntry
        ("silver_cw_expenseentry", "typeName", "dim_expense_type"),
    )

    print("\n🔷 Generating Dimension Tables...")
    dimension_summary = {}

    for source_table, column_name, dimension_name in dimension_specs:
        target_table = f"gold.{dimension_name}"
        print(f"\nCreating {dimension_name}...")

        try:
            dim_df = create_dimension_from_column(
                spark=spark,
                source_table=source_table,
                column_name=column_name,
                dimension_name=dimension_name,
                include_counts=True,
            )
            dim_df.write.mode("overwrite").saveAsTable(target_table)

            unique_values = dim_df.count()
            dimension_summary[dimension_name] = unique_values
            print(f"  ✅ Created with {unique_values} unique values")
        except Exception as exc:
            # One failed dimension must not stop the rest.
            dimension_summary[dimension_name] = "error"
            print(f"  ❌ Failed: {str(exc)}")

    return dimension_summary

# Dimensions belong to the Gold layer, so they follow the same switch.
if "gold" not in PROCESS_LAYERS:
    print("\n⏭️  Skipping dimension generation")
else:
    print("\n=== DIMENSION GENERATION ===")
    dimension_summary = generate_dimensions()

## 9. Final Summary and Data Quality Checks

In [None]:
print("\n" + "=" * 80)
print("📊 ETL PIPELINE COMPLETE - FINAL SUMMARY")
print("=" * 80)

# Check final table counts
final_tables = check_existing_tables()

# Calculate totals (tables whose count failed are left out of the sum)
for schema, tables in final_tables.items():
    if tables:
        total_rows = sum(count for count in tables.values() if count is not None)
        print(f"\n{schema.upper()}: {len(tables)} tables, {total_rows:,} total rows")

# Data quality checks
print("\n🔍 Data Quality Checks:")

# Check for uninvoiced revenue: fact lines that carry no invoice id
if spark.catalog.tableExists("gold.gold_fact_invoice_line"):
    uninvoiced_revenue = spark.sql("""
        SELECT 
            COUNT(*) as uninvoiced_lines,
            SUM(lineAmount) as uninvoiced_amount
        FROM gold.gold_fact_invoice_line
        WHERE invoiceId IS NULL
    """).collect()[0]

    print(f"\n💰 Uninvoiced Revenue:")
    print(f"  Lines: {uninvoiced_revenue['uninvoiced_lines']:,}")
    # SUM() is NULL when no line matches, so fall back to a literal zero.
    print(f"  Amount: ${uninvoiced_revenue['uninvoiced_amount']:,.2f}" if uninvoiced_revenue['uninvoiced_amount'] else "  Amount: $0.00")

# Check MRR trends: the three most recent months with active periods
if spark.catalog.tableExists("gold.gold_fact_agreement_period"):
    mrr_trend = spark.sql("""
        SELECT 
            DATE_FORMAT(period_start, 'yyyy-MM') as month,
            SUM(monthly_revenue) as mrr,
            COUNT(DISTINCT id) as active_agreements
        FROM gold.gold_fact_agreement_period
        WHERE is_active_period = true
        GROUP BY DATE_FORMAT(period_start, 'yyyy-MM')
        ORDER BY month DESC
        LIMIT 3
    """).collect()

    print("\n📈 Recent MRR Trends:")
    for row in mrr_trend:
        # SUM() is NULL when every monthly_revenue in the month is NULL;
        # formatting None with ",.2f" would raise TypeError.
        mrr = row['mrr'] if row['mrr'] is not None else 0
        print(f"  {row['month']}: ${mrr:,.2f} ({row['active_agreements']} agreements)")

# Save refresh timestamp in the Spark session configuration
refresh_timestamp = datetime.now().isoformat()
spark.conf.set("spark.unified_etl.last_refresh", refresh_timestamp)
print(f"\n⏰ Refresh completed at: {refresh_timestamp}")
print(f"Mode: {'Incremental' if INCREMENTAL_MODE else 'Full Refresh'}")

print("\n✅ Pipeline execution complete!")