# Simulate Late-Arriving Data

This notebook simulates late-arriving orders by adding new data to source tables for specific dates.

## Purpose:
Demonstrates the value of insert_overwrite by showing how to efficiently refresh only affected dates.

## Flow:
1. Run dbt initially (creates mart tables with all data)
2. Run this notebook (adds late orders to randomly selected dates in source)
3. Run dbt with --full-refresh (insert_overwrite refreshes only those dates in mart)

## Parameters:
- `catalog`: Target Unity Catalog (default: main)
- `schema`: Schema name (default: your_schema)

## Auto-calculation:
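The notebook automatically detects the date range from the `orders_partitioned` source table and randomly selects up to 3 dates, skipping yesterday and any later dates (today and very recent days may still be receiving transactions).
Each selected date receives a random number of late-arriving orders (between 20 and 100) to simulate realistic scenarios.
This ensures the demo works regardless of when you run it and produces a different test scenario on every run.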

In [None]:
# Get parameters
dbutils.widgets.text("catalog", "main", "Catalog Name")
dbutils.widgets.text("schema", "your_schema", "Schema Name")

catalog = dbutils.widgets.get("catalog")
schema = dbutils.widgets.get("schema")

# Pick the target dates and how many late orders each one receives.
#
# Reads MIN/MAX(order_date) from orders_partitioned, randomly chooses up to
# NUM_DATES_TO_SELECT distinct dates in that range (skipping yesterday onward),
# and draws a late-order count in [MIN_LATE_ORDERS, MAX_LATE_ORDERS] per date.
# Globals used by later cells: catalog, schema, min_date, max_date, days_span,
# target_dates (list of 'YYYY-MM-DD' strings), orders_to_add_per_date (dict).
import random
from datetime import date, timedelta

NUM_DATES_TO_SELECT = 3
MIN_LATE_ORDERS = 20
MAX_LATE_ORDERS = 100

print("🔄 Auto-calculating target dates based on actual data range...")

# Get the actual date range from source table
date_range_result = spark.sql(f"""
    SELECT
        MIN(order_date) as min_date,
        MAX(order_date) as max_date,
        DATEDIFF(MAX(order_date), MIN(order_date)) as days_span
    FROM {catalog}.{schema}.orders_partitioned
""").collect()[0]

min_date = date_range_result['min_date']
max_date = date_range_result['max_date']
days_span = date_range_result['days_span']

# MIN/MAX over an empty table are NULL; fail with a clear message instead of a
# TypeError in the date arithmetic below.
if min_date is None or max_date is None:
    raise ValueError(
        f"{catalog}.{schema}.orders_partitioned has no order_date values; "
        "populate the source tables before running this notebook."
    )

print(f"  Data range: {min_date} to {max_date} ({days_span} days)")

# Exclude every date from yesterday onward (today plus the day before may still
# have ongoing transactions). If max_date is already older than yesterday,
# nothing is excluded.
today = date.today()
days_to_exclude = max(0, (max_date - today).days + 2)
available_days = days_span - days_to_exclude + 1
if available_days < 1:
    # The whole range is "recent": fall back to the earliest date so the demo
    # can still run.
    print("  ⚠️  All dates in range are recent; falling back to the earliest date")
    available_days = 1

# Unique random day offsets from min_date, sorted for cleaner output.
num_dates_to_select = min(NUM_DATES_TO_SELECT, available_days)
random_offsets = sorted(random.sample(range(available_days), num_dates_to_select))

selected_dates = [min_date + timedelta(days=offset) for offset in random_offsets]
target_dates = [str(d) for d in selected_dates]

print(f"  ✅ Randomly selected {num_dates_to_select} dates: {', '.join(target_dates)}")
print(f"     (offsets: {', '.join(str(o) for o in random_offsets)} days from start)")
print(f"     (excluded last {days_to_exclude} days to avoid today and recent dates)")

# Each date gets a random number of NEW orders; the generation cell appends them
# on top of the existing rows to simulate late-arriving data.
orders_to_add_per_date = {
    d: random.randint(MIN_LATE_ORDERS, MAX_LATE_ORDERS) for d in target_dates
}

print(f"\nCatalog: {catalog}")
print(f"Schema: {schema}")
print(f"Target Dates: {target_dates}")
print("New orders to add per date (late arrivals):")
# Loop variable is deliberately NOT named `date`: a module-level loop would
# rebind the imported datetime.date class to a string.
for target_date, count in orders_to_add_per_date.items():
    print(f"  {target_date}: +{count} orders (will be appended)")
total_orders = sum(orders_to_add_per_date.values())
print(f"Total new orders to add: {total_orders}")

In [None]:
# Get current counts for target dates BEFORE adding data.
# `F` is imported here because the data-generation cell below also uses it.
from pyspark.sql import functions as F

print("\n" + "="*80)
print("BEFORE: Current counts for target dates")
print("="*80)


def _count_source_rows(table, order_date):
    """Return the number of rows in `table` whose order_date equals `order_date`."""
    return spark.sql(f"""
        SELECT COUNT(*) as cnt
        FROM {catalog}.{schema}.{table}
        WHERE order_date = '{order_date}'
    """).collect()[0]['cnt']


def _sum_mart_orders(table, order_date):
    """Return SUM(total_orders) in mart `table` for `order_date`, or 0 if no rows.

    Summing (instead of reading only the first row) gives the right answer whether
    the mart holds one row per date or one row per customer per date.
    """
    total = spark.sql(f"""
        SELECT SUM(total_orders) as total_orders
        FROM {catalog}.{schema}.{table}
        WHERE order_date = '{order_date}'
    """).collect()[0]['total_orders']
    return total or 0  # SUM over zero rows is NULL


for target_date in target_dates:
    count_partitioned_src = _count_source_rows('orders_partitioned', target_date)
    count_liquid_src = _count_source_rows('orders_liquid', target_date)
    mart_part_orders = _sum_mart_orders('orders_mart_partitioned', target_date)
    mart_liq_orders = _sum_mart_orders('orders_mart_liquid', target_date)

    new_orders_to_add = orders_to_add_per_date[target_date]
    expected_new_total = count_partitioned_src + new_orders_to_add

    print(f"\n{target_date}:")
    print("  Source Tables (CURRENT):")
    print(f"    orders_partitioned: {count_partitioned_src:,} orders")
    print(f"    orders_liquid:      {count_liquid_src:,} orders")
    print("  Mart Tables (aggregated):")
    print(f"    orders_mart_partitioned: {mart_part_orders:,} total_orders")
    print(f"    orders_mart_liquid:      {mart_liq_orders:,} total_orders")
    # Actually verify the baseline instead of assuming it.
    if count_partitioned_src == mart_part_orders and count_liquid_src == mart_liq_orders:
        print(f"  ✅ In Sync: Source and Mart both show {count_partitioned_src:,} orders")
    else:
        print(
            "  ⚠️  Already out of sync before the append "
            f"(partitioned: {count_partitioned_src:,} vs {mart_part_orders:,}, "
            f"liquid: {count_liquid_src:,} vs {mart_liq_orders:,}) - run dbt first"
        )
    print(f"  📝 Will append {new_orders_to_add} new orders (new total: {expected_new_total:,})")

In [None]:
# Generate late-arriving orders for each target date and append them to BOTH
# source tables (orders_partitioned and orders_liquid).
import uuid

from pyspark.sql import functions as F

print("\nGenerating late-arriving orders (appending to existing data)...")

# Short token unique to this run, embedded in order_id so re-running the
# simulation (which may pick the same date again) never duplicates order_ids.
run_tag = uuid.uuid4().hex[:6]

for target_date in target_dates:
    # Random count chosen for this specific date
    num_orders = orders_to_add_per_date[target_date]

    # One uniform draw per categorical attribute. Every branch of a when()
    # chain must test the SAME draw so the cumulative thresholds below yield
    # the intended mix (e.g. status 60/15/12/8/5 %). Calling F.rand() in each
    # branch would make every later bucket much rarer than intended.
    draws = (
        spark.range(num_orders)
        .withColumn('_r_customer', F.rand())
        .withColumn('_r_product', F.rand())
        .withColumn('_r_quantity', F.rand())
        .withColumn('_r_price', F.rand())
        .withColumn('_r_status', F.rand())
    )
    r_customer = F.col('_r_customer')
    r_product = F.col('_r_product')
    r_quantity = F.col('_r_quantity')
    r_price = F.col('_r_price')
    r_status = F.col('_r_status')

    # Column order matters: insertInto resolves columns by POSITION, so this
    # select must match the column order of both source tables.
    generated = draws.select(
        F.concat(
            F.lit('ORD-LATE-'),
            F.lit(target_date),
            F.lit('-'),
            F.lit(run_tag),
            F.lit('-'),
            F.lpad(F.col('id').cast('string'), 5, '0'),
        ).alias('order_id'),

        F.concat(
            F.lit('CUST-'),
            F.lpad(
                F.when(r_customer < 0.3, (F.rand() * 50 + 1).cast('int'))
                 .when(r_customer < 0.6, (F.rand() * 150 + 51).cast('int'))
                 .otherwise((F.rand() * 300 + 201).cast('int')).cast('string'),
                5, '0',
            ),
        ).alias('customer_id'),

        F.lit(target_date).cast('date').alias('order_date'),

        F.concat(
            F.lit('PROD-'),
            F.lpad(
                F.when(r_product < 0.4, (F.rand() * 20 + 1).cast('int'))
                 .when(r_product < 0.7, (F.rand() * 30 + 21).cast('int'))
                 .otherwise((F.rand() * 50 + 51).cast('int')).cast('string'),
                4, '0',
            ),
        ).alias('product_id'),

        F.when(r_quantity < 0.5, F.lit(1))
         .when(r_quantity < 0.8, (F.rand() * 2 + 2).cast('int'))
         .otherwise((F.rand() * 7 + 4).cast('int')).alias('quantity'),

        F.when(r_price < 0.4, F.round(9.99 + F.rand() * 40, 2))
         .when(r_price < 0.7, F.round(50 + F.rand() * 100, 2))
         .otherwise(F.round(150 + F.rand() * 350, 2)).alias('unit_price'),

        # Left NULL as in the original generator; presumably derived downstream
        # (quantity * unit_price) - TODO confirm.
        F.lit(None).cast('decimal(10,2)').alias('total_amount'),

        F.when(r_status < 0.60, F.lit('delivered'))
         .when(r_status < 0.75, F.lit('shipped'))
         .when(r_status < 0.87, F.lit('confirmed'))
         .when(r_status < 0.95, F.lit('pending'))
         .otherwise(F.lit('cancelled')).alias('status'),

        F.current_timestamp().alias('created_at'),
    )

    # rand() and current_timestamp() are re-evaluated every time a lazy
    # DataFrame is executed, so writing `generated` twice would give the two
    # source tables different attributes for the same order_ids. Materialize
    # once (20-100 rows per date here) and write the identical rows to both.
    df = spark.createDataFrame(generated.collect(), schema=generated.schema)

    # Append to both source tables (adds NEW orders to existing data)
    df.write.mode('append').insertInto(f"{catalog}.{schema}.orders_partitioned")
    df.write.mode('append').insertInto(f"{catalog}.{schema}.orders_liquid")

    print(f"  ✓ Appended {num_orders} new orders for {target_date}")

In [None]:
# Get counts AFTER adding data: the sources should now exceed the (not yet
# rebuilt) marts by exactly the number of appended orders.
print("\n" + "="*80)
print("AFTER: Updated counts for target dates")
print("="*80)


def _count_source_rows(table, order_date):
    """Return the number of rows in `table` whose order_date equals `order_date`."""
    return spark.sql(f"""
        SELECT COUNT(*) as cnt
        FROM {catalog}.{schema}.{table}
        WHERE order_date = '{order_date}'
    """).collect()[0]['cnt']


def _sum_mart_orders(table, order_date):
    """Return SUM(total_orders) in mart `table` for `order_date`, or 0 if no rows.

    Summing (instead of reading only the first row) gives the right answer whether
    the mart holds one row per date or one row per customer per date.
    """
    total = spark.sql(f"""
        SELECT SUM(total_orders) as total_orders
        FROM {catalog}.{schema}.{table}
        WHERE order_date = '{order_date}'
    """).collect()[0]['total_orders']
    return total or 0  # SUM over zero rows is NULL


for target_date in target_dates:
    count_partitioned_src = _count_source_rows('orders_partitioned', target_date)
    count_liquid_src = _count_source_rows('orders_liquid', target_date)
    # Mart tables have not been refreshed yet, so they should still show OLD counts
    mart_part_orders = _sum_mart_orders('orders_mart_partitioned', target_date)
    mart_liq_orders = _sum_mart_orders('orders_mart_liquid', target_date)

    diff_partitioned = count_partitioned_src - mart_part_orders
    diff_liquid = count_liquid_src - mart_liq_orders
    added_count = orders_to_add_per_date[target_date]

    print(f"\n{target_date}:")
    print("  Source Tables (AFTER APPEND):")
    print(f"    orders_partitioned: {count_partitioned_src:,} orders (+{added_count})")
    print(f"    orders_liquid:      {count_liquid_src:,} orders (+{added_count})")
    print("  Mart Tables (STILL OLD):")
    print(f"    orders_mart_partitioned: {mart_part_orders:,} total_orders")
    print(f"    orders_mart_liquid:      {mart_liq_orders:,} total_orders")
    # Report the gap that is actually observed rather than assuming one exists.
    if diff_partitioned == 0 and diff_liquid == 0:
        print("  ❓ Unexpected: Source and Mart still match after the append")
    else:
        print(f"  ⚠️  OUT OF SYNC: Source has {diff_partitioned:,} more orders than Mart!")
        if diff_liquid != diff_partitioned:
            print(f"      (orders_liquid vs orders_mart_liquid differ by {diff_liquid:,})")

In [None]:
# Display summary of the simulation and the expected effect of the next dbt run.
print("\n" + "="*80)
print("✅ LATE-ARRIVING DATA SIMULATION COMPLETE")
print("="*80)

total_new_orders = sum(orders_to_add_per_date.values())
num_target_dates = len(target_dates)

print(f"\nAppended {total_new_orders} new orders to source tables")
print(f"Target dates: {', '.join(target_dates)}")
print("Orders appended per date (random delta):")
for target_date in target_dates:
    print(f"  {target_date}: +{orders_to_add_per_date[target_date]} orders")

print("\n⚠️  CRITICAL: Source and Mart tables are now OUT OF SYNC!")
print("\n  Source Tables:")
print("    ✅ Have the NEW orders (appended on top of existing)")
for target_date in target_dates:
    print(f"       {target_date}: +{orders_to_add_per_date[target_date]} orders")

print("\n  Mart Tables (orders_mart_partitioned, orders_mart_liquid):")
print("    ❌ Still show OLD aggregations")
print("    ❌ Missing the appended orders in their totals")
print("    ❌ Revenue, customer counts, and other metrics are STALE")
print("\n💡 This demonstrates a real-world scenario:")
print("   Late-arriving data from upstream systems (delayed feeds, corrections, etc.)")
print("   New orders are APPENDED to existing source data, creating sync gap with marts")

# NOTE(review): dbt documents --full-refresh as rebuilding incremental models from
# scratch unless the model sets `full_refresh: false`; confirm the claims printed
# below match the actual model configuration.
print("\n✨ This notebook is part of the complete dbt_workflow job")
print("   The workflow automatically continues to:")
print("   - Run dbt build --full-refresh (fast! bypasses information_schema queries)")
print("   - insert_overwrite auto-detects which partitions to overwrite")
print(f"   - Only {num_target_dates} affected partitions are refreshed")
print("   - Verify synchronization")
print("\n🔑 KEY INSIGHT:")
print("  --full-refresh with insert_overwrite does NOT replace all data!")
print("  It just skips incremental checks, while insert_overwrite remains data-driven")
print("  and only overwrites partitions with data in the result.")
print("\n📚 REFERENCES:")
print("  - dbt insert_overwrite: https://docs.getdbt.com/docs/build/incremental-strategy#insert_overwrite")
print("  - Databricks Dynamic Partition Overwrites:")
print("    https://docs.databricks.com/aws/en/delta/selective-overwrite#dynamic-partition-overwrites-with-replace-using")

# Count the mart's order_date partitions instead of assuming a fixed ~40, so
# the efficiency figures match whatever date range the data actually covers.
total_partitions = spark.sql(f"""
    SELECT COUNT(DISTINCT order_date) AS n
    FROM {catalog}.{schema}.orders_mart_partitioned
""").collect()[0]['n']

print("\n📊 Efficiency gain:")
print(f"  - WITHOUT insert_overwrite: Replace entire table (~{total_partitions} partitions)")
print(f"  - WITH dynamic overwrite:   Replace only {num_target_dates} partitions (automatic!)")
if total_partitions:
    savings_pct = 100 * (1 - num_target_dates / total_partitions)
    print(f"  - Savings: {savings_pct:.1f}% of partitions unchanged!")
else:
    print("  - Savings: n/a (mart has no partitions yet)")
print("="*80)