## Test Suite for ETL Pattern
Purpose of this notebook is to test out the ETL pattern by creating blank tables, inserting data, and ensuring it matches expectations.

### Test Numbering Convention
| Prefix | Category |
|--------|----------|
| **1.x** | Setup & Empty Load Tests |
| **2.x** | Dimension Type 1 Tests |
| **3.x** | Fact Table Tests |
| **4.x** | Dimension Type 2 (SCD2) Tests |
| **5.x** | Edge Cases & Robustness Tests |
| **6.x** | Deletes |

#### Library Setup and Connections

In [None]:
import os
import sys
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import serialization
from snowflake.snowpark import Session

In [None]:
sys.path.append(os.getcwd().rsplit('\\',1)[0])

In [None]:
# Environment variables
SNOWFLAKE_ACCOUNT = os.getenv("SNOWFLAKE_ACCOUNT")
SNOWFLAKE_HOST = os.getenv("SNOWFLAKE_HOST")
SNOWFLAKE_DATABASE = os.getenv("SNOWFLAKE_DATABASE")
SNOWFLAKE_SCHEMA = os.getenv("SNOWFLAKE_SCHEMA")
SNOWFLAKE_USER = os.getenv("SNOWFLAKE_USER")
SNOWFLAKE_ROLE = os.getenv("SNOWFLAKE_ROLE")
SNOWFLAKE_WAREHOUSE = os.getenv("SNOWFLAKE_WAREHOUSE")

In [None]:
def get_session_via_keypair():
    key_path = os.environ.get("SNOWFLAKE_PRIVATE_KEY_PATH")
    key_path_2 = os.environ.get("SNOWFLAKE_PRIVATE_KEY_PATH_2")
    passphrase = os.environ.get("SNOWFLAKE_PRIVATE_KEY_PASSPHRASE")
    password = passphrase.encode() if passphrase else None

    # Load private key
    try:
        with open(key_path, "rb") as key_file:
            private_key = serialization.load_pem_private_key(
                key_file.read(),
                password=password,
                backend=default_backend()
            )
    except:
        with open(key_path_2, "rb") as key_file:
            private_key = serialization.load_pem_private_key(
                key_file.read(),
                password=password,
                backend=default_backend()
            )

    # Convert to bytes format Snowflake expects
    pkb = private_key.private_bytes(
        encoding=serialization.Encoding.DER,
        format=serialization.PrivateFormat.PKCS8,
        encryption_algorithm=serialization.NoEncryption()
    )

    # Create connection parameters
    connection_parameters = {
        "account": SNOWFLAKE_ACCOUNT,
        "user": SNOWFLAKE_USER,
        "role": SNOWFLAKE_ROLE,
        "database": SNOWFLAKE_DATABASE,
        "schema": SNOWFLAKE_SCHEMA,
        "warehouse": SNOWFLAKE_WAREHOUSE,
        "private_key": pkb
    }

    return connection_parameters

In [None]:
def test_connection():
    # Start a Snowflake session, run the query and write results to specified table
    with Session.builder.configs(get_session_via_keypair()).create() as session:
        print("Test started")
        # Print out current session context information.
        database = session.get_current_database()
        schema = session.get_current_schema()
        warehouse = session.get_current_warehouse()
        role = session.get_current_role()
        user = session.get_current_user()
        print(f"Connection succeeded. Current session context: user={user}, database={database}, schema={schema}, warehouse={warehouse}, role={role}")

In [None]:
test_connection()

In [None]:
session = Session.builder.configs(get_session_via_keypair()).create()
session.sql('SELECT CURRENT_USER()').collect()

In [None]:
print(session.sql('USE ROLE unit_test_role').collect())
print(session.sql('USE DATABASE learning_db').collect())


In [None]:
from procs.table_updater import *

#### Table Setup

In [None]:
print(session.sql(f"""
DROP TABLE IF EXISTS learning_db.unit_test.source_employee_profile;
""").collect())

print(session.sql(f"""
DROP TABLE IF EXISTS learning_db.unit_test.source_employee_pay;
""").collect())

print(session.sql(f"""
DROP TABLE IF EXISTS learning_db.unit_test.dim_employee_profile;
""").collect())

print(session.sql(f"""
DROP TABLE IF EXISTS learning_db.unit_test.fact_employee_pay;
""").collect())

print(session.sql(f"""
DROP TABLE IF EXISTS learning_db.unit_test.dim_employee_profile_type_2;
""").collect())

In [None]:
session.sql(f"""
CREATE TABLE learning_db.unit_test.source_employee_profile (
    employee_id BIGINT,
    first_name STRING,
    last_name STRING,
    department STRING
);
""").collect()

In [None]:
session.sql(f"""
CREATE TABLE learning_db.unit_test.source_employee_pay (
    employee_id BIGINT,
    pay_date DATE,
    pay_amount NUMBER(10,2)
);
""").collect()

In [None]:
print(session.sql(f"""
CREATE TABLE learning_db.unit_test.dim_employee_profile (
    dim_employee_profile_key BIGINT AUTOINCREMENT,
    employee_id BIGINT,
    first_name STRING,
    last_name STRING,
    department STRING,
    etl_row_hash_value STRING,
    create_username STRING,
    create_datetime TIMESTAMP_NTZ,
    create_batch_name STRING,
    last_update_username STRING,
    last_update_datetime TIMESTAMP_NTZ,
    last_update_batch_name STRING
);
""").collect())


print(session.sql(f"""
ALTER TABLE learning_db.unit_test.dim_employee_profile
ADD CONSTRAINT pk_dim_employee PRIMARY KEY (employee_id);
""").collect())

In [None]:
print(session.sql(f"""
CREATE TABLE learning_db.unit_test.fact_employee_pay (
    fact_employee_pay_key BIGINT AUTOINCREMENT,
    dim_employee_profile_key BIGINT,
    pay_date DATE,
    pay_amount NUMBER(10,2),
    employee_id BIGINT,
    etl_row_hash_value STRING,
    create_username STRING,
    create_datetime TIMESTAMP_NTZ,
    create_batch_name STRING,
    last_update_username STRING,
    last_update_datetime TIMESTAMP_NTZ,
    last_update_batch_name STRING
);
""").collect())


print(session.sql(f"""
ALTER TABLE learning_db.unit_test.fact_employee_pay
ADD CONSTRAINT pk_fact_employee_pay PRIMARY KEY (employee_id, pay_date);
""").collect())

In [None]:
print(session.sql(f"""
CREATE TABLE learning_db.unit_test.dim_employee_profile_type_2 (
    dim_employee_profile_type_2_key BIGINT AUTOINCREMENT,
    employee_id BIGINT,
    first_name STRING,
    last_name STRING,
    department STRING,
    etl_row_hash_value STRING,
    etl_row_hash_value_2 STRING,
    row_effective_date DATE,
    row_expiration_date DATE,
    current_row_flag INT,
    create_username STRING,
    create_datetime TIMESTAMP_NTZ,
    create_batch_name STRING,
    last_update_username STRING,
    last_update_datetime TIMESTAMP_NTZ,
    last_update_batch_name STRING
);
""").collect())


print(session.sql(f"""
ALTER TABLE learning_db.unit_test.dim_employee_profile_type_2
ADD CONSTRAINT pk_dim_employee PRIMARY KEY (employee_id);
""").collect())

#### Setup ETL Views

In [None]:
session.sql(f"""
CREATE OR REPLACE VIEW learning_db.unit_test.vw_dim_employee_profile AS
SELECT
    employee_id,
    first_name,
    last_name,
    department,
    SHA1(CONCAT_WS('|',
        COALESCE(CAST(first_name as STRING), '|'),
        COALESCE(CAST(last_name as STRING), '|'),
        COALESCE(CAST(department as STRING), '|')
    )) AS etl_row_hash_value --all but natural composite key
FROM learning_db.unit_test.source_employee_profile
""").collect()

In [None]:
session.sql("""
SELECT * FROM learning_db.unit_test.vw_dim_employee_profile
""").to_pandas()

In [None]:
session.sql(f"""
CREATE OR REPLACE VIEW learning_db.unit_test.vw_fact_employee_pay AS
SELECT
    e.dim_employee_profile_key,
    p.pay_date,
    p.pay_amount,
    p.employee_id,
    SHA1(CONCAT_WS('|',
        COALESCE(CAST(p.pay_amount as STRING), '|')
    )) AS etl_row_hash_value --all but natural composite key
FROM learning_db.unit_test.source_employee_pay p
INNER JOIN learning_db.unit_test.dim_employee_profile e
    ON p.employee_id = e.employee_id
""").collect()

In [None]:
session.sql("""
SELECT * FROM learning_db.unit_test.vw_fact_employee_pay
""").to_pandas()

In [None]:
session.sql(f"""
CREATE OR REPLACE VIEW learning_db.unit_test.vw_dim_employee_profile_type_2 AS
SELECT
    employee_id,
    first_name,
    last_name,
    department,
    SHA1(CONCAT_WS('|',
        COALESCE(CAST(first_name as STRING), '|'),
        COALESCE(CAST(last_name as STRING), '|')
    )) AS etl_row_hash_value, --Type 1 changes
    SHA1(CONCAT_WS('|',
        COALESCE(CAST(department as STRING), '|')
    )) AS etl_row_hash_value_2 --Type 2 changes
FROM learning_db.unit_test.source_employee_profile
""").collect()

In [None]:
session.sql("""
SELECT * FROM learning_db.unit_test.vw_dim_employee_profile_type_2
""").to_pandas()

In [None]:
def clear_all_tables_pre_test():
    print(session.sql('TRUNCATE TABLE learning_db.unit_test.source_employee_profile').collect())
    print(session.sql('TRUNCATE TABLE learning_db.unit_test.source_employee_pay').collect())
    print(session.sql('TRUNCATE TABLE learning_db.unit_test.dim_employee_profile').collect())
    print(session.sql('TRUNCATE TABLE learning_db.unit_test.fact_employee_pay').collect())

In [None]:
# ============================================
# TEST RESULTS TRACKER
# ============================================
test_results = []

def record_test(test_id: str, test_name: str, passed: bool, details: str = ""):
    """Record a test result"""
    status = "PASS" if passed else "FAIL"
    test_results.append({
        "test_id": test_id,
        "test_name": test_name,
        "passed": passed,
        "status": status,
        "details": details
    })
    print(f"[{status}] {test_id}: {test_name}" + (f" - {details}" if details else ""))

def run_test(test_id: str, test_name: str, condition: bool, fail_msg: str = ""):
    """Run a test assertion and record result"""
    if not condition:
        record_test(test_id=test_id, test_name=test_name, passed=False, details=fail_msg)
        raise Exception(f"[{test_id}] {test_name}: {fail_msg}")
    record_test(test_id=test_id, test_name=test_name, passed=True)

def show_table(table_name: str, label: str = "", where_clause: str = ""):
    """Display table contents with optional label and filter"""
    if label:
        print(f"\n{'='*60}\n{label}\n{'='*60}")
    query = f"SELECT * FROM {table_name}"
    if where_clause:
        query += f" WHERE {where_clause}"
    return session.sql(query).to_pandas()

In [None]:
def perform_full_class_run(session: Session, table_name: str, type_1_column_names: str = None):
    batch_id = f"{table_name}_{datetime.now().strftime('%Y%m%d_%H%M%S')}_{str(uuid.uuid4())[:8]}"
    table_updater = TableUpdater(
        session, 
        table_name, 
        batch_id,
        schema_name='unit_test',
        etl_schema_name='unit_test',
        src_schema_name='unit_test',
        type_1_column_names=type_1_column_names
    )
    table_updater.identify_upserts()
    if table_updater.table_type == 'dim_type_2':
        table_updater.process_type2_expirations()
    table_updater.process_table_updates()
    table_updater.process_table_inserts()
    if table_updater.table_type == 'dim_type_2':
        table_updater.process_type1_historical_updates()

---
## 1. Setup & Empty Load Tests

### Test 1.1: Empty Load (No Source Data)

**Testing:** ETL behavior when source tables are empty

**Change:** Run ETL with truncated (empty) source tables

**Expected Behavior:**
- No errors during execution
- Updates staging table remains empty (0 rows)
- Target dimension table remains empty (0 rows)

In [None]:
clear_all_tables_pre_test()

In [None]:
perform_full_class_run(session, 'dim_employee_profile')

In [None]:
update_table_record_count = session.sql('SELECT COUNT(*) as record_count FROM learning_db.unit_test.dim_employee_profile_updates').collect()[0].RECORD_COUNT
run_test(test_id="1.1a", test_name="Empty load - updates table empty", 
         condition=update_table_record_count == 0, 
         fail_msg=f"Expected 0 rows, got {update_table_record_count}")

In [None]:
dim_table_record_count = session.sql('SELECT COUNT(*) as record_count FROM learning_db.unit_test.dim_employee_profile').collect()[0].RECORD_COUNT
run_test(test_id="1.1b", test_name="Empty load - dim table empty", 
         condition=dim_table_record_count == 0, 
         fail_msg=f"Expected 0 rows, got {dim_table_record_count}")

---
## 2. Dimension Type 1 Tests

### Test 2.1: Insert New Records

**Testing:** Initial load of dimension records

**Change:** Insert 2 new employee records into source table, run ETL

**Expected Behavior:**
- 2 rows inserted into dimension table
- Audit columns populated (create_username, create_datetime, create_batch_name)
- Rerun produces no duplicates (idempotent)

In [None]:
clear_all_tables_pre_test()

In [None]:
# Insert source data
session.sql(f"""
INSERT INTO learning_db.unit_test.source_employee_profile (employee_id, first_name, last_name, department)
VALUES
    (1, 'Jack', 'Smith', 'Finance'),
    (2, 'Jill', 'Smith', 'Merchandising')
""").collect()

print("SOURCE DATA (AFTER INSERT):")
show_table("learning_db.unit_test.source_employee_profile")

In [None]:
perform_full_class_run(session, 'dim_employee_profile')

In [None]:
# Verify initial insert worked - 2 rows should be in the dim table
dim_table_record_count = session.sql('SELECT COUNT(*) as record_count FROM learning_db.unit_test.dim_employee_profile').collect()[0].RECORD_COUNT
run_test(test_id="2.1a", test_name="Insert - correct row count", 
         condition=dim_table_record_count == 2,
         fail_msg=f"Expected 2 rows, got {dim_table_record_count}")

# Verify 2 inserts were identified
insert_count = session.sql("SELECT COUNT(*) as cnt FROM learning_db.unit_test.dim_employee_profile_updates WHERE insert_update_indicator = 'insert'").collect()[0].CNT
run_test(test_id="2.1b", test_name="Insert - staging table shows 2 inserts", 
         condition=insert_count == 2,
         fail_msg=f"Expected 2 inserts in staging, got {insert_count}")

print("DIM TABLE (AFTER INITIAL INSERT):")
show_table("learning_db.unit_test.dim_employee_profile")


### Test 2.2: Update Existing Records

**Testing:** Type 1 dimension update behavior

**Change:** Update employee 1's department from 'Finance' to 'HR' in source, run ETL

**Expected Behavior:**
- Still 2 rows in dim table (no duplicate)
- Employee 1 has department = 'HR' 
- `last_update_*` columns changed, `create_*` columns remain original
- `last_update_batch_name` differs from `create_batch_name`


In [None]:
print("DIM TABLE (BEFORE UPDATE):")
display(show_table("learning_db.unit_test.dim_employee_profile"))

# Update source record - this should trigger an UPDATE in the dim table (not an insert)
session.sql("UPDATE learning_db.unit_test.source_employee_profile SET department = 'HR' WHERE employee_id = 1").collect()

print("\nSOURCE DATA (AFTER UPDATE):")
show_table("learning_db.unit_test.source_employee_profile")

In [None]:
perform_full_class_run(session, 'dim_employee_profile')

In [None]:
# Verify still 2 rows (no duplicate created)
dim_table_record_count = session.sql('SELECT COUNT(*) as record_count FROM learning_db.unit_test.dim_employee_profile').collect()[0].RECORD_COUNT
run_test(test_id="2.2a", test_name="Update - no duplicate rows", 
         condition=dim_table_record_count == 2,
         fail_msg=f"Expected 2 rows, got {dim_table_record_count}")

In [None]:
# Verify the update was applied
updated_dept = session.sql("SELECT department FROM learning_db.unit_test.dim_employee_profile WHERE employee_id = 1").collect()[0].DEPARTMENT
run_test(test_id="2.2b", test_name="Update - department value changed", 
         condition=updated_dept == 'HR',
         fail_msg=f"Expected HR, got {updated_dept}")

In [None]:
# Verify audit columns: create_* should be original, last_update_* should be new
result = session.sql("""
    SELECT 
        create_username, create_datetime, create_batch_name,
        last_update_username, last_update_datetime, last_update_batch_name
    FROM learning_db.unit_test.dim_employee_profile 
    WHERE employee_id = 1
""").collect()[0]

run_test(test_id="2.2c", test_name="Update - audit batch names differ", 
         condition=result.CREATE_BATCH_NAME != result.LAST_UPDATE_BATCH_NAME,
         fail_msg="CREATE_BATCH_NAME should differ from LAST_UPDATE_BATCH_NAME after update")

run_test(test_id="2.2d", test_name="Update - audit timestamps correct", 
         condition=result.CREATE_DATETIME <= result.LAST_UPDATE_DATETIME,
         fail_msg="CREATE_DATETIME should be earlier than or equal to LAST_UPDATE_DATETIME")


In [None]:
print("DIM TABLE (AFTER UPDATE):")
print("MANUAL CHECK: Verify employee 1 has department='HR' and last_update_* differs from create_*")
show_table("learning_db.unit_test.dim_employee_profile")


In [None]:
# This verifies idempotency after update - row count should still be 2
dim_table_record_count = session.sql('SELECT COUNT(*) as record_count FROM learning_db.unit_test.dim_employee_profile').collect()[0].RECORD_COUNT
run_test(test_id="2.2e", test_name="Idempotency after update - row count unchanged", 
         condition=dim_table_record_count == 2,
         fail_msg=f"Expected 2 rows, got {dim_table_record_count}")

In [None]:
# Perform another run to ensure no duplicates
perform_full_class_run(session, 'dim_employee_profile')

In [None]:
update_table_record_count = session.sql('SELECT COUNT(*) as record_count FROM learning_db.unit_test.dim_employee_profile_updates').collect()[0].RECORD_COUNT
run_test(test_id="2.2f", test_name="Idempotency after update - no changes detected on rerun", 
         condition=update_table_record_count == 0,
         fail_msg=f"Expected 0 updates, got {update_table_record_count}")

In [None]:
dim_table_record_count = session.sql('SELECT COUNT(*) as record_count FROM learning_db.unit_test.dim_employee_profile').collect()[0].RECORD_COUNT
run_test(test_id="2.2g", test_name="Idempotency after update - no duplicates on rerun", 
         condition=dim_table_record_count == 2,
         fail_msg=f"Expected 2 rows after rerun, got {dim_table_record_count}")

---
## 3. Fact Table Tests

### Test 3.1: Insert New Fact Records

**Testing:** Fact table inserts with foreign key validation

**Change:** Insert 3 pay records (2 valid, 1 with non-existent employee_id 999)

**Expected Behavior:**
- Only 2 rows inserted (employee 999 filtered out by FK join)
- Audit columns populated correctly
- Rerun produces no duplicates

In [None]:
# Insert source pay data (note: employee 999 doesn't exist in dim - should not insert)
session.sql(f"""
INSERT INTO learning_db.unit_test.source_employee_pay (employee_id, pay_date, pay_amount)
VALUES
    (1, '2025-09-30', 100.00),
    (2, '2025-10-30', 200.00),
    (999, '2025-11-30', 200.00) --This should not insert into fact due to foreign key mismatch.
""").collect()

print("SOURCE PAY DATA (AFTER INSERT):")
show_table("learning_db.unit_test.source_employee_pay")

In [None]:
perform_full_class_run(session, 'fact_employee_pay')

In [None]:
print("FACT TABLE (AFTER INITIAL INSERT):")
show_table("learning_db.unit_test.fact_employee_pay")

In [None]:
fact_table_record_count = session.sql('SELECT COUNT(*) as record_count FROM learning_db.unit_test.fact_employee_pay').collect()[0].RECORD_COUNT
run_test(test_id="3.1a", test_name="Fact insert - correct row count (FK filtered)", 
         condition=fact_table_record_count == 2,
         fail_msg=f"Expected 2 rows (employee 999 excluded), got {fact_table_record_count}")

In [None]:
# Perform another run to ensure no duplicates
perform_full_class_run(session, 'fact_employee_pay')

In [None]:
fact_table_record_count = session.sql('SELECT COUNT(*) as record_count FROM learning_db.unit_test.fact_employee_pay').collect()[0].RECORD_COUNT
run_test(test_id="3.1b", test_name="Fact insert - idempotent (no duplicates on rerun)", 
         condition=fact_table_record_count == 2,
         fail_msg=f"Expected 2 rows after rerun, got {fact_table_record_count}")

### Test 3.2: Update Existing Fact Records (Composite Key)

**Testing:** Fact table updates with composite natural key (employee_id + pay_date)

**Change:** Update employee 1's pay_amount from 100.00 to 150.00

**Expected Behavior:**
- Still 2 rows in fact table (no duplicate)
- Employee 1's pay_amount = 150.00
- Audit columns: `last_update_*` differs from `create_*`


In [None]:
print("FACT TABLE (BEFORE UPDATE):")
display(show_table("learning_db.unit_test.fact_employee_pay"))

# Update source pay record - this should trigger an UPDATE in the fact table
session.sql("UPDATE learning_db.unit_test.source_employee_pay SET pay_amount = 150.00 WHERE employee_id = 1").collect()

print("\nSOURCE PAY DATA (AFTER UPDATE):")
show_table("learning_db.unit_test.source_employee_pay")

In [None]:
perform_full_class_run(session, 'fact_employee_pay')

In [None]:
# Verify still 2 rows (no duplicate created)
fact_table_record_count = session.sql('SELECT COUNT(*) as record_count FROM learning_db.unit_test.fact_employee_pay').collect()[0].RECORD_COUNT
run_test(test_id="3.2a", test_name="Fact update - no duplicate rows", 
         condition=fact_table_record_count == 2,
         fail_msg=f"Expected 2 rows, got {fact_table_record_count}")


In [None]:
# Verify the update was applied (composite natural key test - employee_id + pay_date)
updated_pay = session.sql("SELECT pay_amount FROM learning_db.unit_test.fact_employee_pay WHERE employee_id = 1 AND pay_date = '2025-09-30'").collect()[0].PAY_AMOUNT
run_test(test_id="3.2b", test_name="Fact update - composite key matched correctly", 
         condition=float(updated_pay) == 150.00,
         fail_msg=f"Expected 150.00, got {updated_pay}")

In [None]:
# Verify audit columns for fact update
result = session.sql("""
    SELECT 
        create_batch_name, last_update_batch_name,
        create_datetime, last_update_datetime
    FROM learning_db.unit_test.fact_employee_pay 
    WHERE employee_id = 1 AND pay_date = '2025-09-30'
""").collect()[0]

run_test(test_id="3.2c", test_name="Fact update - audit batch names differ", 
         condition=result.CREATE_BATCH_NAME != result.LAST_UPDATE_BATCH_NAME,
         fail_msg="CREATE_BATCH_NAME should differ from LAST_UPDATE_BATCH_NAME")


In [None]:
print("FACT TABLE (AFTER UPDATE):")
print("MANUAL CHECK: Verify employee 1 has pay_amount=150.00 and last_update_* differs from create_*")
show_table("learning_db.unit_test.fact_employee_pay")


---
## 4. Dimension Type 2 (SCD2) Tests

### Test 4.1: Type 1 Change (No Historical Rows)

**Testing:** Type 1 change behavior when no historical rows exist

**Change:** Update employee 1's first_name from 'Jack' to 'Jackson' (Type 1 column)

**Expected Behavior:**
- Still 2 rows total (both current)
- Employee 1's first_name = 'Jackson'
- No historical rows to update (Type 1 historical update has no effect)


In [None]:
# First, truncate the Type 2 dim and do initial insert
session.sql('TRUNCATE TABLE learning_db.unit_test.dim_employee_profile_type_2').collect()

# Reset source data for employee 1 back to original
session.sql("UPDATE learning_db.unit_test.source_employee_profile SET department = 'Finance', first_name = 'Jack' WHERE employee_id = 1").collect()

# Initial load
perform_full_class_run(session, 'dim_employee_profile_type_2', type_1_column_names='first_name,last_name')


In [None]:
# Now make a Type 1 change (first_name) - no historical rows exist yet
session.sql("UPDATE learning_db.unit_test.source_employee_profile SET first_name = 'Jackson' WHERE employee_id = 1").collect()

perform_full_class_run(session, 'dim_employee_profile_type_2', type_1_column_names='first_name,last_name')


In [None]:
# Verify still 2 rows (no historical rows, just 2 current)
dim_table_record_count = session.sql('SELECT COUNT(*) as record_count FROM learning_db.unit_test.dim_employee_profile_type_2').collect()[0].RECORD_COUNT
run_test(test_id="4.1a", test_name="Type 1 change - no historical row created", 
         condition=dim_table_record_count == 2,
         fail_msg=f"Expected 2 rows, got {dim_table_record_count}")

In [None]:
# Verify the Type 1 update was applied to the current row
updated_name = session.sql("SELECT first_name FROM learning_db.unit_test.dim_employee_profile_type_2 WHERE employee_id = 1 AND current_row_flag = 1").collect()[0].FIRST_NAME
run_test(test_id="4.1b", test_name="Type 1 change - value updated", 
         condition=updated_name == 'Jackson',
         fail_msg=f"Expected 'Jackson', got '{updated_name}'")


In [None]:
print("TYPE 2 DIM TABLE (AFTER TYPE 1 CHANGE):")
print("MANUAL CHECK: Verify 2 rows total, employee 1 has first_name='Jackson', all current_row_flag=1")
show_table("learning_db.unit_test.dim_employee_profile_type_2")


### Test 4.2: Type 2 Initial Load & Idempotency

**Testing:** Type 2 dimension initial load and idempotency

**Change:** Run ETL on existing data (no source changes)

**Expected Behavior:**
- 2 rows present (from Test 4.1)
- Rerun produces no duplicates (idempotent)

In [None]:
perform_full_class_run(session, 'dim_employee_profile_type_2', type_1_column_names='first_name,last_name')

In [None]:
print("TYPE 2 DIM TABLE (CURRENT STATE):")
show_table("learning_db.unit_test.dim_employee_profile_type_2")

In [None]:
dim_table_record_count = session.sql('SELECT COUNT(*) as record_count FROM learning_db.unit_test.dim_employee_profile_type_2').collect()[0].RECORD_COUNT
run_test(test_id="4.2a", test_name="Type 2 initial load - correct row count", 
         condition=dim_table_record_count == 2,
         fail_msg=f"Expected 2 rows, got {dim_table_record_count}")

#### Idempotency Check (Rerun with No Changes)

In [None]:
# Test no duplicates
perform_full_class_run(session, 'dim_employee_profile_type_2', type_1_column_names='first_name,last_name')

In [None]:
dim_table_record_count = session.sql('SELECT COUNT(*) as record_count FROM learning_db.unit_test.dim_employee_profile_type_2').collect()[0].RECORD_COUNT
run_test(test_id="4.2b", test_name="Type 2 idempotency - no duplicates on rerun", 
         condition=dim_table_record_count == 2,
         fail_msg=f"Expected 2 rows after rerun, got {dim_table_record_count}")

### Test 4.3: Type 2 Change (Department Update)

**Testing:** Type 2 SCD behavior when tracked column changes

**Change:** Update employee 1's department from 'Finance' to 'Accounting' (Type 2 column)

**Expected Behavior:**
- 3 rows total (1 historical for employee 1, 1 current for employee 1, 1 for employee 2)
- Old row: current_row_flag = 0, row_expiration_date set
- New row: current_row_flag = 1, row_expiration_date = '9999-12-31'

In [None]:
print("SOURCE DATA (BEFORE TYPE 2 CHANGE):")
show_table("learning_db.unit_test.source_employee_profile")

In [None]:
print("TYPE 2 DIM TABLE (BEFORE TYPE 2 CHANGE):")
show_table("learning_db.unit_test.dim_employee_profile_type_2")

In [None]:
session.sql("UPDATE learning_db.unit_test.source_employee_profile SET department = 'Accounting' WHERE department = 'Finance'").collect()

In [None]:
perform_full_class_run(session, 'dim_employee_profile_type_2', type_1_column_names='first_name,last_name')

In [None]:
print("TYPE 2 DIM TABLE (AFTER TYPE 2 CHANGE):")
print("MANUAL CHECK: Verify new row created, old row expired")
show_table("learning_db.unit_test.dim_employee_profile_type_2")

In [None]:
dim_table_record_count = session.sql('SELECT COUNT(*) as record_count FROM learning_db.unit_test.dim_employee_profile_type_2').collect()[0].RECORD_COUNT
run_test(test_id="4.3a", test_name="Type 2 change - new row created (3 total)", 
         condition=dim_table_record_count == 3,
         fail_msg=f"Expected 3 rows, got {dim_table_record_count}")

In [None]:
dim_table_record_count = session.sql('SELECT COUNT(*) as record_count FROM learning_db.unit_test.dim_employee_profile_type_2 WHERE current_row_flag = 1').collect()[0].RECORD_COUNT
run_test(test_id="4.3b", test_name="Type 2 change - only 2 current rows", 
         condition=dim_table_record_count == 2,
         fail_msg=f"Expected 2 current rows, got {dim_table_record_count}")

### Test 4.4: Type 1 Change with Historical Rows

**Testing:** Type 1 retroactive update to historical rows

**Change:** Update employee 1's first_name from 'Jack' to 'John' (Type 1 column)

**Expected Behavior:**
- All rows for employee 1 updated with first_name = 'John'
- Historical rows (current_row_flag = 0) also updated
- No new rows created (Type 1 doesn't create history)

In [None]:
session.sql("UPDATE learning_db.unit_test.source_employee_profile SET first_name = 'John' WHERE first_name = 'Jackson'").collect()

In [None]:
perform_full_class_run(session, 'dim_employee_profile_type_2', type_1_column_names='first_name,last_name')

In [None]:
print("TYPE 2 DIM TABLE (AFTER TYPE 1 HISTORICAL UPDATE):")
print("MANUAL CHECK: Verify ALL rows have first_name='John' (Type 1 applied to history)")
show_table("learning_db.unit_test.dim_employee_profile_type_2")

In [None]:
dim_table_record_count = session.sql("SELECT COUNT(*) as record_count FROM learning_db.unit_test.dim_employee_profile_type_2 WHERE first_name = 'John'").collect()[0].RECORD_COUNT
run_test(test_id="4.4a", test_name="Type 1 historical update - all rows updated", 
         condition=dim_table_record_count == 2,
         fail_msg=f"Expected 2 rows with first_name='John', got {dim_table_record_count}")

### Test 4.5: Simultaneous Type 1 + Type 2 Change

**Testing:** Concurrent Type 1 and Type 2 changes in same batch

**Change:** Update employee 1's last_name to 'Johnson' (Type 1) AND department to 'Marketing' (Type 2)

**Expected Behavior:**
- New row inserted for employee 1 (Type 2 triggers new row)
- Old row expired (current_row_flag = 0)
- ALL historical rows updated with last_name = 'Johnson' (Type 1)
- Date ranges don't overlap


In [None]:
# Simultaneous Type 1 (last_name) + Type 2 (department) change
session.sql("UPDATE learning_db.unit_test.source_employee_profile SET last_name = 'Johnson', department = 'Marketing' WHERE employee_id = 1").collect()

perform_full_class_run(session, 'dim_employee_profile_type_2', type_1_column_names='first_name,last_name')

In [None]:
# Count rows for employee 1 - should be 4 total (2 historical + 1 current for employee 1, plus 1 for employee 2)
# Actually after the prior tests: employee 1 has had 2 dept changes, so should have multiple rows
employee_1_rows = session.sql("SELECT COUNT(*) as record_count FROM learning_db.unit_test.dim_employee_profile_type_2 WHERE employee_id = 1").collect()[0].RECORD_COUNT
print(f'Employee 1 has {employee_1_rows} rows')


In [None]:
# Verify ALL rows for employee 1 have last_name = 'Johnson' (Type 1 change applied to historical rows)
johnson_rows = session.sql("SELECT COUNT(*) as record_count FROM learning_db.unit_test.dim_employee_profile_type_2 WHERE employee_id = 1 AND last_name = 'Johnson'").collect()[0].RECORD_COUNT
run_test(test_id="4.5a", test_name="Simultaneous change - Type 1 applied to all rows", 
         condition=johnson_rows == employee_1_rows,
         fail_msg=f"Expected {employee_1_rows} rows with 'Johnson', got {johnson_rows}")


In [None]:
# Verify current row has the new department (Type 2 change)
current_dept = session.sql("SELECT department FROM learning_db.unit_test.dim_employee_profile_type_2 WHERE employee_id = 1 AND current_row_flag = 1").collect()[0].DEPARTMENT
run_test(test_id="4.5b", test_name="Simultaneous change - Type 2 created new current row", 
         condition=current_dept == 'Marketing',
         fail_msg=f"Expected 'Marketing', got '{current_dept}'")


In [None]:
print("TYPE 2 DIM TABLE (AFTER SIMULTANEOUS TYPE 1 + TYPE 2 CHANGE):")
print("MANUAL CHECK:")
print("  1. Employee 1 has multiple rows with different departments")
print("  2. ALL rows have last_name='Johnson' (Type 1 applied to history)")
print("  3. Date ranges don't overlap")
print("  4. Only 1 row has current_row_flag=1 per employee")
session.sql('SELECT * FROM learning_db.unit_test.dim_employee_profile_type_2 ORDER BY employee_id, row_effective_date').to_pandas()


In [None]:
# AUTOMATED: Verify date ranges don't overlap for any employee
overlap_check = session.sql("""
    SELECT *
    FROM learning_db.unit_test.dim_employee_profile_type_2 a
    JOIN learning_db.unit_test.dim_employee_profile_type_2 b
        ON a.employee_id = b.employee_id
        AND a.dim_employee_profile_type_2_key < b.dim_employee_profile_type_2_key
        AND a.row_effective_date <= b.row_expiration_date
        AND b.row_effective_date <= a.row_expiration_date
        AND a.row_effective_date < a.row_expiration_date
        AND b.row_effective_date < b.row_expiration_date
""").collect()
run_test(test_id="4.5c", test_name="Date ranges don't overlap", 
         condition=len(overlap_check) == 0,
         fail_msg=f"Overlapping date ranges found: {overlap_check}")

# AUTOMATED: Verify current rows have row_expiration_date = '9999-12-31'
bad_current_rows = session.sql("""
    SELECT employee_id, row_expiration_date 
    FROM learning_db.unit_test.dim_employee_profile_type_2 
    WHERE current_row_flag = 1 AND row_expiration_date != '9999-12-31'
""").collect()
run_test(test_id="4.5d", test_name="Current rows have expiration date 9999-12-31", 
         condition=len(bad_current_rows) == 0,
         fail_msg=f"Current rows with wrong expiration: {bad_current_rows}")

# AUTOMATED: Verify only one current row per natural key
duplicate_current = session.sql("""
    SELECT employee_id, COUNT(*) as cnt 
    FROM learning_db.unit_test.dim_employee_profile_type_2 
    WHERE current_row_flag = 1 
    GROUP BY employee_id 
    HAVING COUNT(*) > 1
""").collect()
run_test(test_id="4.5e", test_name="Single current row per natural key", 
         condition=len(duplicate_current) == 0,
         fail_msg=f"Multiple current rows found: {duplicate_current}")


### Test 4.6: Multiple Type 2 Changes (Historical Chain)

**Testing:** Successive Type 2 changes create proper historical chains

**Change:** Update employee 1's department to 'Legal' (another Type 2 change)

**Expected Behavior:**
- Additional historical row created for employee 1
- Date ranges do NOT overlap
- Only one current row per natural key


In [None]:
# Another Type 2 change - this creates another historical row
session.sql("UPDATE learning_db.unit_test.source_employee_profile SET department = 'Executive' WHERE employee_id = 1").collect()

perform_full_class_run(session, 'dim_employee_profile_type_2', type_1_column_names='first_name,last_name')


In [None]:
# Count historical rows for employee 1 - should be 3+ now
employee_1_hist = session.sql("SELECT COUNT(*) as record_count FROM learning_db.unit_test.dim_employee_profile_type_2 WHERE employee_id = 1 AND current_row_flag = 0").collect()[0].RECORD_COUNT
run_test(test_id="4.6a", test_name="Multiple Type 2 changes - historical chain created", 
         condition=employee_1_hist >= 2,
         fail_msg=f"Expected at least 2 historical rows, got {employee_1_hist}")


In [None]:
# Verify only 1 current row per natural key
current_rows = session.sql("SELECT COUNT(*) as record_count FROM learning_db.unit_test.dim_employee_profile_type_2 WHERE employee_id = 1 AND current_row_flag = 1").collect()[0].RECORD_COUNT
run_test(test_id="4.6b", test_name="Multiple Type 2 changes - single current row", 
         condition=current_rows == 1,
         fail_msg=f"Expected 1 current row, got {current_rows}")


In [None]:
print("TYPE 2 DIM TABLE (HISTORICAL CHAIN FOR EMPLOYEE 1):")
print("MANUAL CHECK: Verify date ranges don't overlap")
print("  - Each row's row_expiration_date < next row's row_effective_date")
print("  - Current row has row_expiration_date = '9999-12-31'")
session.sql("""
    SELECT employee_id, department, row_effective_date, row_expiration_date, current_row_flag
    FROM learning_db.unit_test.dim_employee_profile_type_2 
    WHERE employee_id = 1
    ORDER BY row_effective_date
""").to_pandas()


---
## 5. Edge Cases & Robustness Tests

### Test 5.1: NULL and Edge Case Handling

**Testing:** NULL and empty string handling in hash calculations

**Change:** Insert 3 records with NULL first_name, empty string first_name, and NULL last_name

**Expected Behavior:**
- All 3 records inserted successfully
- Hash values are non-NULL (COALESCE handles NULL/empty)
- No errors during ETL processing


In [None]:
# Insert records with NULL values and empty strings
session.sql("""
INSERT INTO learning_db.unit_test.source_employee_profile (employee_id, first_name, last_name, department)
VALUES
    (99, NULL, 'NullFirst', 'TestDept'),
    (98, '', 'EmptyFirst', 'TestDept'),
    (97, 'Normal', NULL, 'TestDept')
""").collect()

print("SOURCE DATA (WITH NULL/EMPTY VALUES):")
display(show_table(table_name="learning_db.unit_test.source_employee_profile", where_clause="employee_id IN (97, 98, 99)"))

# Should not raise an error
perform_full_class_run(session, 'dim_employee_profile_type_2', type_1_column_names='first_name,last_name')
record_test(test_id="5.1a", test_name="NULL handling - no errors during processing", passed=True)


In [None]:
# Verify NULL value records were inserted
null_records = session.sql("SELECT COUNT(*) as record_count FROM learning_db.unit_test.dim_employee_profile_type_2 WHERE employee_id IN (97, 98, 99)").collect()[0].RECORD_COUNT
run_test(test_id="5.1b", test_name="NULL handling - records inserted", 
         condition=null_records == 3,
         fail_msg=f"Expected 3 records, got {null_records}")


In [None]:
print("TYPE 2 DIM TABLE (NULL/EMPTY VALUE RECORDS):")
print("MANUAL CHECK: Verify NULL/empty values have valid (non-NULL) hash values")
session.sql("""
    SELECT employee_id, first_name, last_name, department, etl_row_hash_value
    FROM learning_db.unit_test.dim_employee_profile_type_2 
    WHERE employee_id IN (97, 98, 99)
""").to_pandas()


### Test 5.2: Idempotency Verification (3x Runs)

**Testing:** ETL idempotency - repeated runs with no source changes

**Change:** Run ETL 3 times with no source data changes

**Expected Behavior:**
- Row count remains constant after each run
- No new inserts or updates occur
- Proves ETL is idempotent


In [None]:
# AUTOMATED: Verify NULL/empty source values still produce valid (non-NULL) hash values
null_hash_check = session.sql("""
    SELECT employee_id, etl_row_hash_value 
    FROM learning_db.unit_test.dim_employee_profile_type_2 
    WHERE employee_id IN (97, 98, 99) AND etl_row_hash_value IS NULL
""").collect()
run_test(test_id="5.1c", test_name="NULL handling - hash values not null", 
         condition=len(null_hash_check) == 0,
         fail_msg=f"NULL hash values found: {null_hash_check}")


In [None]:
# Get initial row count
initial_count = session.sql('SELECT COUNT(*) as record_count FROM learning_db.unit_test.dim_employee_profile_type_2').collect()[0].RECORD_COUNT
print(f'Initial row count: {initial_count}')

# Run the same load 3 times with no source changes
idempotent = True
for i in range(3):
    perform_full_class_run(session, 'dim_employee_profile_type_2', type_1_column_names='first_name,last_name')
    current_count = session.sql('SELECT COUNT(*) as record_count FROM learning_db.unit_test.dim_employee_profile_type_2').collect()[0].RECORD_COUNT
    if current_count != initial_count:
        idempotent = False
        break
    print(f'  Run {i+1}: Row count unchanged at {current_count}')

run_test(test_id="5.2a", test_name="Idempotency - 3 identical runs produce no changes", 
         condition=idempotent,
         fail_msg=f"Expected {initial_count} rows, got {current_count}")


### Test 6.0: Deletes

**Testing:** Deletes, try delete a row from a source and enable it to delete from the fact.

**Change:** Delete a row from source and see if it can be deleted from the fact.

**Expected Behavior:**
- Row deleted from fact. Check pre and post row count and see if deleted as expected.


In [None]:
print(session.sql('delete from learning_db.unit_test.source_employee_pay where employee_id <> 2').collect())

In [None]:
display(show_table("learning_db.unit_test.source_employee_pay"))

In [None]:
display(show_table("learning_db.unit_test.fact_employee_pay"))

In [None]:
batch_id = f"fact_employee_pay_{datetime.now().strftime('%Y%m%d_%H%M%S')}_{str(uuid.uuid4())[:8]}"
table_updater = TableUpdater(
    session, 
    'fact_employee_pay', 
    batch_id,
    schema_name='unit_test',
    etl_schema_name='unit_test',
    src_schema_name='unit_test'
)

In [None]:
table_updater.process_table_deletes()

In [None]:
display(show_table("learning_db.unit_test.fact_employee_pay"))

In [None]:
# Verify the record was deleted from fact table
final_count = session.sql('SELECT COUNT(*) as record_count FROM learning_db.unit_test.fact_employee_pay').collect()[0].RECORD_COUNT
run_test(test_id="6.0b", test_name="Delete test - record removed from fact table", 
         condition=final_count == 1,
         fail_msg=f"Expected 1 rows after delete, got {final_count}")

---
## Test Summary

### Test Categories
| Category | Tests | Description |
|----------|-------|-------------|
| **1.x** | Setup & Empty Load | ETL handles empty tables gracefully |
| **2.x** | Dimension Type 1 | Insert and update dimension records |
| **3.x** | Fact Tables | Insert and update fact records (composite keys) |
| **4.x** | Dimension Type 2 (SCD2) | Type 1/2 changes, historical chains, date ranges |
| **5.x** | Edge Cases | NULL handling, idempotency |
| **6.x** | Deletes | Delete row from fact |

### Manual Inspection Points
- Audit columns (`create_*` vs `last_update_*`) are set correctly
- Type 2 date ranges don't overlap
- NULL/empty values produce valid hash values


In [None]:
# ============================================
# FINAL TEST RESULTS
# ============================================
import pandas as pd

passed = sum(1 for t in test_results if t['passed'])
failed = sum(1 for t in test_results if not t['passed'])
total = len(test_results)

print("=" * 70)
if failed == 0:
    print(f"TEST RESULTS: {passed}/{total} PASSED")
else:
    print(f"TEST RESULTS: {passed}/{total} PASSED ({failed} FAILED)")
print("=" * 70)

# Display results as table
results_df = pd.DataFrame(test_results)[['test_id', 'test_name', 'status', 'details']]
results_df.columns = ['Test ID', 'Test Name', 'Result', 'Details']
display(results_df)

if failed > 0:
    print("\n** SOME TESTS FAILED - Review the details above **")
else:
    print("\nALL TESTS PASSED")
