# Test Suite: Bronze Layer Audit Logging Infrastructure

**Purpose:** Validate the structure and configuration of the `bronze.load_log` observability table.

**Scope:**
- Extension requirements (pgcrypto for UUID generation)
- Schema and table existence
- Column definitions (names, types, nullability, defaults)
- Primary key and sequence configuration
- All 9 performance indexes
- CHECK constraints (status, phase validation)
- Data insertion and retrieval functionality

**Testing Strategy:**
- Extension validation (pgcrypto availability)
- Structural validation (11 columns with correct types)
- Index coverage (9 indexes for query optimization)
- Constraint enforcement (2 CHECK constraints for data quality)
- Integration testing (INSERT, query, cleanup)
- Default value verification (sequence, clock_timestamp)

**Prerequisites:**
- PostgreSQL server running
- sql_retail_analytics_warehouse database exists
- bronze schema exists (created by setup/create_schemas.sql)
- `scripts/bronze/ddl_bronze_log.sql` has been executed
- Connection credentials available
- Required packages: psycopg2, pytest, ipytest, pandas
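A quick pre-flight check for the package prerequisite can be sketched as follows. The helper name `missing_packages` is hypothetical (it is not part of the project); it only relies on the standard library's `importlib.util.find_spec`, and it assumes each package's import name matches the name listed above.

```python
import importlib.util

# Import names for the packages listed in the prerequisites.
REQUIRED_PACKAGES = ["psycopg2", "pytest", "ipytest", "pandas"]


def missing_packages(names):
    """Return the names that cannot be found on the current import path.

    Hypothetical helper for illustration; it does not import the packages,
    it only asks the import system whether they can be located.
    """
    return [name for name in names if importlib.util.find_spec(name) is None]


missing = missing_packages(REQUIRED_PACKAGES)
if missing:
    print("Missing packages:", ", ".join(missing))
else:
    print("All required packages are importable")
```

Running this before the setup cell gives a clearer message than an `ImportError` raised halfway through the imports.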

## Setup: Import Dependencies & Configure Connection

In [1]:
import os
import psycopg2
from psycopg2 import sql
import pytest
import ipytest
import pandas as pd

# Configure ipytest for notebook usage
ipytest.autoconfig()

# Database connection parameters
DB_CONFIG = {
    'host': 'localhost',
    'database': 'sql_retail_analytics_warehouse',
    'user': 'postgres',
    'password': os.getenv('POSTGRES_PASSWORD', 'your_password_here')
}

print("✅ Dependencies imported successfully")

✅ Dependencies imported successfully


## Fixture: Database Connection

In [2]:
@pytest.fixture(scope='module')
def db_connection():
    """Create a database connection for tests."""
    conn = psycopg2.connect(**DB_CONFIG)
    conn.autocommit = True
    yield conn
    conn.close()

@pytest.fixture(scope='module')
def db_cursor(db_connection):
    """Create a cursor for executing queries."""
    cursor = db_connection.cursor()
    yield cursor
    cursor.close()

print("✅ Fixtures defined")

✅ Fixtures defined


## Test Suite 1: Extension and Schema Validation

**Tests in this suite:**
1. `test_pgcrypto_extension_exists` - Validates pgcrypto extension is installed
2. `test_bronze_schema_exists` - Validates bronze schema exists

**How these tests work:**

**Test 1: pgcrypto Extension Availability**
- Queries `pg_extension` system catalog for 'pgcrypto' extension
- ✅ **Success:** COUNT(*) = 1 (extension installed and active)
- ❌ **Failure:** COUNT(*) = 0 (extension missing, needs CREATE EXTENSION)
- **Purpose:** Ensures UUID generation capability is available
- **Why required:** `gen_random_uuid()` function used for generating batch run_id values
- **When created:** DDL script creates with `CREATE EXTENSION IF NOT EXISTS pgcrypto`
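- **Privilege note:** Creating the extension requires superuser rights on older servers; on PostgreSQL 13+, where pgcrypto is marked as trusted, a role with CREATE privilege on the database is sufficient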

**Test 2: Bronze Schema Existence**
- Queries `information_schema.schemata` for 'bronze' schema
- ✅ **Success:** COUNT(*) = 1 (schema exists)
- ❌ **Failure:** COUNT(*) = 0 (schema missing, run setup/create_schemas.sql first)
- **Purpose:** Validates medallion architecture bronze layer exists
- **Prerequisite:** bronze schema should be created before running bronze DDL scripts

**🔧 Extension Background:**

**pgcrypto Extension:**
- **Purpose:** Provides cryptographic functions including UUID generation
- **Key function:** `gen_random_uuid()` - generates version 4 (random) UUIDs
- **Alternative:** PostgreSQL 13+ has built-in `gen_random_uuid()` without extension
- **Backwards compatibility:** pgcrypto works on PostgreSQL 9.4+
- **Use in bronze.load_log:** Every ETL batch gets unique run_id for grouping log entries
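The version split described above can be expressed as a small check. This is a minimal sketch, not part of the test suite: `needs_pgcrypto` is a hypothetical helper, and it assumes the caller passes PostgreSQL's integer version number (the format psycopg2 exposes as `connection.server_version`).

```python
def needs_pgcrypto(server_version_num: int) -> bool:
    """True when gen_random_uuid() is only available through pgcrypto.

    server_version_num uses PostgreSQL's integer format, for example
    120005 for 12.5 and 130002 for 13.2.
    """
    return server_version_num < 130000


print(needs_pgcrypto(120005))  # True
print(needs_pgcrypto(130002))  # False
```

Note that `test_pgcrypto_extension_exists` checks for the extension regardless of server version, which matches the DDL script creating it unconditionally.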

**Why UUID over SERIAL for run_id:**
- **Uniqueness:** Globally unique across databases, servers, time
- **Distribution:** Random values prevent hot spots in indexes
- **Mergeability:** Can combine logs from multiple environments without conflicts
- **Security:** Non-sequential (doesn't leak batch count information)
- **Standard:** RFC 4122 compliance for interoperability

In [3]:
%%ipytest -vv

def test_pgcrypto_extension_exists(db_cursor):
    """Verify pgcrypto extension is installed (required for gen_random_uuid())."""
    db_cursor.execute("""
        SELECT COUNT(*) 
        FROM pg_extension 
        WHERE extname = 'pgcrypto'
    """)
    count = db_cursor.fetchone()[0]
    assert count == 1, "pgcrypto extension must be installed"

def test_bronze_schema_exists(db_cursor):
    """Verify bronze schema exists."""
    db_cursor.execute("""
        SELECT COUNT(*) 
        FROM information_schema.schemata 
        WHERE schema_name = 'bronze'
    """)
    count = db_cursor.fetchone()[0]
    assert count == 1, "bronze schema must exist"

platform win32 -- Python 3.12.4, pytest-8.4.2, pluggy-1.6.0 -- c:\Users\Laurent\Studies\sql-ultimate-course\Udemy-SQL-Data-Warehouse-Project\.venv\Scripts\python.exe
cachedir: .pytest_cache
rootdir: c:\Users\Laurent\Studies\sql-ultimate-course\Udemy-SQL-Data-Warehouse-Project\tests\tests_bronze
plugins: anyio-4.11.0, nbmake-1.5.5
collecting ... collected 2 items

t_a9c6325c3b224f11b58997d527c0ace9.py::test_pgcrypto_extension_exists PASSED                 [ 50%]
t_a9c6325c3b224f11b58997d527c0ace9.py::test_bronze_schema_exists PASSED                      [100%]



## Test Suite 2: Table Existence and Basic Structure

**Tests in this suite:**
1. `test_load_log_table_exists` - Validates bronze.load_log table exists
2. `test_load_log_column_count` - Validates table has exactly 11 columns

**How these tests work:**

**Test 1: Table Existence**
- Queries `information_schema.tables` for bronze.load_log
- Filters by schema='bronze' AND table_name='load_log'
- ✅ **Success:** COUNT(*) = 1 (table exists and is unique)
- ❌ **Failure:** COUNT(*) = 0 (table missing); a count above 1 is not expected, since a schema cannot contain two tables with the same name
- **Purpose:** Confirms DDL script executed successfully
- **When created:** `CREATE TABLE IF NOT EXISTS bronze.load_log` in ddl_bronze_log.sql

**Test 2: Column Count Validation**
- Queries `information_schema.columns` for all columns in bronze.load_log
- ✅ **Success:** COUNT(*) = 11 (complete column set)
- ❌ **Failure:** COUNT(*) ≠ 11 (columns missing or extras added)
- **Purpose:** Quick structural integrity check before detailed validation
- **Complete column list:**
  1. id (BIGSERIAL PRIMARY KEY)
  2. run_id (UUID - batch identifier)
  3. phase (TEXT - operation type)
  4. table_name (TEXT - target table)
  5. file_path (TEXT - source file)
  6. status (TEXT - OK/ERROR)
  7. rows_loaded (BIGINT - count)
  8. started_at (TIMESTAMPTZ - start time)
  9. finished_at (TIMESTAMPTZ - end time)
  10. duration_sec (INTEGER - elapsed seconds)
  11. message (TEXT - details/errors)
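A bare count only says that something is off, not what. As a complement, the sketch below compares column names as sets so a failure can name the missing or extra columns. `diff_columns` is a hypothetical helper; the expected names are taken from the list above, and the `actual` input is assumed to come from `information_schema.columns`.

```python
# Column names from the list above, in ordinal order.
EXPECTED_COLUMNS = [
    "id", "run_id", "phase", "table_name", "file_path", "status",
    "rows_loaded", "started_at", "finished_at", "duration_sec", "message",
]


def diff_columns(actual, expected=EXPECTED_COLUMNS):
    """Return (missing, unexpected) column names, each sorted alphabetically."""
    actual_set, expected_set = set(actual), set(expected)
    missing = sorted(expected_set - actual_set)
    unexpected = sorted(actual_set - expected_set)
    return missing, unexpected


# Example: 'message' was dropped and a 'notes' column was added by mistake.
actual = [c for c in EXPECTED_COLUMNS if c != "message"] + ["notes"]
print(diff_columns(actual))  # (['message'], ['notes'])
```

In this example the column count is still 11, so a count-only check would pass while the set comparison reports the drift.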

**📊 bronze.load_log Purpose:**

**Observability Requirements:**
- **What:** Tracks every ETL operation across all bronze layer loads
- **When:** Logs written during COPY operations from CSV to bronze tables
- **Why:** Debugging, auditing, performance monitoring, compliance
- **How:** Append-only log (never UPDATE or DELETE in production)

**Log Entry Types:**
- **START:** Batch begins (single entry per run_id)
- **TRUNCATE:** Table truncated before load
- **COPY:** CSV data loaded into table (logs rows_loaded)
- **SEPARATOR:** Milestone marker between operations
- **FINISH:** Batch completes successfully
- **ERROR:** Operation failed (captures SQLERRM message)

**Operational Benefits:**
- **Debugging:** Identify which file/table caused failures
- **Performance:** Track duration_sec to find slow operations
- **Auditing:** Complete history of all data loads
- **Monitoring:** Query for ERROR status to trigger alerts
- **Compliance:** Immutable log for regulatory requirements

In [4]:
%%ipytest -vv

def test_load_log_table_exists(db_cursor):
    """Verify bronze.load_log table exists."""
    db_cursor.execute("""
        SELECT COUNT(*) 
        FROM information_schema.tables 
        WHERE table_schema = 'bronze' 
          AND table_name = 'load_log'
    """)
    count = db_cursor.fetchone()[0]
    assert count == 1, "bronze.load_log table must exist"

def test_load_log_column_count(db_cursor):
    """Verify bronze.load_log has expected number of columns."""
    db_cursor.execute("""
        SELECT COUNT(*) 
        FROM information_schema.columns 
        WHERE table_schema = 'bronze' 
          AND table_name = 'load_log'
    """)
    count = db_cursor.fetchone()[0]
    assert count == 11, "bronze.load_log should have 11 columns"

collecting ... collected 2 items

t_a9c6325c3b224f11b58997d527c0ace9.py::test_load_log_table_exists PASSED                     [ 50%]
t_a9c6325c3b224f11b58997d527c0ace9.py::test_load_log_column_count PASSED                     [100%]



## Test Suite 3: Column Definitions

**Tests in this suite:**
1. `test_load_log_column_definitions` - Validates all 11 columns with correct types and nullability
2. `test_load_log_primary_key` - Validates id column is the primary key

**How these tests work:**

**Test 1: Column Definition Validation**
- Queries `information_schema.columns` for all column metadata
- Builds dictionary: `{column_name: {type, nullable}}` from query results
- Compares against expected definitions for all 11 columns
- ✅ **Success:** All columns exist with correct types and nullability
- ❌ **Failure:** Missing columns, type mismatches, or incorrect NULL constraints

**Detailed Column Specifications:**

**Identity & Grouping:**
- **id** (bigint, NOT NULL)
  - Auto-incrementing surrogate key via BIGSERIAL
  - Primary key for unique row identification
  - Supports roughly 9.2 quintillion values before the sequence is exhausted
  
- **run_id** (uuid, NOT NULL)
  - Groups all log entries for a single batch execution
  - Generated via `gen_random_uuid()` at batch start
  - All operations in same batch share same run_id
  - Enables batch-level queries and analysis

**Operation Metadata:**
- **phase** (text, NOT NULL)
  - Identifies operation type: START, TRUNCATE, COPY, SEPARATOR, FINISH, ERROR
  - CHECK constraint enforces valid values
  - Used for filtering logs by operation type
  
- **table_name** (text, nullable)
  - Schema-qualified table name (e.g., 'bronze.erp_cust_az12')
  - NULL for batch-level events (START, FINISH)
  - Populated for table-specific operations (TRUNCATE, COPY)
  
- **file_path** (text, nullable)
  - Absolute path to source CSV file (server-side path)
  - NULL for non-file operations (TRUNCATE, SEPARATOR)
  - Critical for data lineage and reloading

**Status & Metrics:**
- **status** (text, NOT NULL)
  - Operation outcome: 'OK' or 'ERROR'
  - CHECK constraint enforces valid values
  - Primary field for error detection queries
  
- **rows_loaded** (bigint, nullable)
  - Number of rows successfully copied
  - NULL for non-COPY operations
  - Validates expected row counts from source files

**Timing:**
- **started_at** (timestamp with time zone, NOT NULL)
  - Operation start timestamp with timezone awareness
  - Defaults to `clock_timestamp()` (actual clock time, not transaction time)
  - Used for time-range queries and chronological ordering
  
- **finished_at** (timestamp with time zone, nullable)
  - Operation completion timestamp
  - NULL for in-progress or failed operations
  - Used to calculate duration_sec
  
- **duration_sec** (integer, nullable)
  - Elapsed time in seconds (finished_at - started_at)
  - NULL if operation incomplete
  - Performance analysis and slow operation detection

**Details:**
- **message** (text, nullable)
  - Free-form descriptive text for successful operations
  - PostgreSQL SQLERRM content for failures
  - Human-readable context for debugging

**Test 2: Primary Key Validation**
- Queries `pg_index` and `pg_attribute` for primary key columns
- ✅ **Success:** Primary key is exactly 'id' column
- ❌ **Failure:** No PK, wrong column, or composite PK
- **Purpose:** Ensures unique row identification and prevents duplicates

**🎯 Design Rationale:**

**TEXT vs VARCHAR:**
- All string columns use TEXT (unbounded length)
- No performance difference in PostgreSQL
- Avoids arbitrary length limits
- Simpler schema (no need to specify lengths)

**TIMESTAMPTZ over TIMESTAMP:**
- Timezone-aware timestamps prevent ambiguity
- Handles daylight saving time correctly
- Stores in UTC, displays in session timezone
- Critical for distributed systems

**BIGINT for row counts:**
- Supports tables with billions of rows
- Future-proof for data growth
- INTEGER would limit to 2.1 billion rows

In [5]:
%%ipytest -vv

def test_load_log_column_definitions(db_cursor):
    """Verify all required columns exist with correct data types."""
    db_cursor.execute("""
        SELECT 
            column_name,
            data_type,
            is_nullable
        FROM information_schema.columns
        WHERE table_schema = 'bronze'
          AND table_name = 'load_log'
        ORDER BY ordinal_position
    """)
    
    columns = db_cursor.fetchall()
    column_dict = {col[0]: {'type': col[1], 'nullable': col[2]} for col in columns}
    
    # Expected column definitions
    expected_columns = {
        'id': {'type': 'bigint', 'nullable': 'NO'},
        'run_id': {'type': 'uuid', 'nullable': 'NO'},
        'phase': {'type': 'text', 'nullable': 'NO'},
        'table_name': {'type': 'text', 'nullable': 'YES'},
        'file_path': {'type': 'text', 'nullable': 'YES'},
        'status': {'type': 'text', 'nullable': 'NO'},
        'rows_loaded': {'type': 'bigint', 'nullable': 'YES'},
        'started_at': {'type': 'timestamp with time zone', 'nullable': 'NO'},
        'finished_at': {'type': 'timestamp with time zone', 'nullable': 'YES'},
        'duration_sec': {'type': 'integer', 'nullable': 'YES'},
        'message': {'type': 'text', 'nullable': 'YES'}
    }
    
    # Verify each expected column
    for col_name, expected in expected_columns.items():
        assert col_name in column_dict, f"Column '{col_name}' must exist"
        assert column_dict[col_name]['type'] == expected['type'], \
            f"Column '{col_name}' should be {expected['type']}, got {column_dict[col_name]['type']}"
        assert column_dict[col_name]['nullable'] == expected['nullable'], \
            f"Column '{col_name}' nullable mismatch"

def test_load_log_primary_key(db_cursor):
    """Verify id column is the primary key."""
    db_cursor.execute("""
        SELECT a.attname
        FROM pg_index i
        JOIN pg_attribute a ON a.attrelid = i.indrelid AND a.attnum = ANY(i.indkey)
        WHERE i.indrelid = 'bronze.load_log'::regclass
          AND i.indisprimary
    """)
    pk_columns = [row[0] for row in db_cursor.fetchall()]
    assert pk_columns == ['id'], "Primary key should be 'id' column only"

collecting ... collected 2 items

t_a9c6325c3b224f11b58997d527c0ace9.py::test_load_log_column_definitions PASSED               [ 50%]
t_a9c6325c3b224f11b58997d527c0ace9.py::test_load_log_primary_key PASSED                      [100%]



## Test Suite 4: Index Validation

**Tests in this suite:**
1. `test_load_log_indexes_exist` - Validates all 9 indexes are created correctly

**How this test works:**

**Index Coverage Validation:**
- Queries `pg_indexes` system catalog for all indexes on bronze.load_log
- ✅ **Success:** All 9 expected indexes exist (plus 1 automatic PK index = 10 total)
- ❌ **Failure:** Missing indexes or unexpected index count
- **Purpose:** Ensures query performance optimization infrastructure is in place

**Expected Indexes (9 total):**

1. **idx_load_log_run_id** - Index on `run_id` column
   - **Query pattern:** Retrieve all log entries for a specific batch
   - **Use case:** `WHERE run_id = '...'` - batch debugging and analysis
   - **Cardinality:** High (one UUID per batch execution)
   
2. **idx_load_log_phase** - Index on `phase` column
   - **Query pattern:** Filter by operation type
   - **Use case:** `WHERE phase = 'ERROR'` - find all failures
   - **Cardinality:** Low (6 distinct values: START, TRUNCATE, COPY, SEPARATOR, FINISH, ERROR)
   
3. **idx_load_log_table_name** - Index on `table_name` column
   - **Query pattern:** Per-table load history
   - **Use case:** `WHERE table_name = 'bronze.erp_cust_az12'` - table-specific drilldowns
   - **Cardinality:** Medium (one per bronze table)
   
4. **idx_load_log_file_path** - Index on `file_path` column
   - **Query pattern:** Source file provenance and lineage
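   - **Use case:** `WHERE file_path LIKE '%cust_info.csv'` - track file reloads (note: a leading-wildcard pattern like this cannot use a plain B-tree index; exact matches on the full path can)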
   - **Cardinality:** High (varies with data refresh frequency)
   
5. **idx_load_log_status** - Index on `status` column
   - **Query pattern:** Error detection and health monitoring
   - **Use case:** `WHERE status = 'ERROR'` - quick error scans for alerting
   - **Cardinality:** Very low (2 values: OK, ERROR)
   
6. **idx_load_log_rows_loaded** - Index on `rows_loaded` column
   - **Query pattern:** Volume analysis and anomaly detection
   - **Use case:** `WHERE rows_loaded > 1000000` - heavy vs light loads
   - **Cardinality:** High (continuous numeric values)
   
7. **idx_load_log_started_at** - Index on `started_at` column
   - **Query pattern:** Time-range queries and chronological analysis
   - **Use case:** `WHERE started_at >= '2025-01-01'` - monthly/daily reports
   - **Cardinality:** Very high (timestamp precision to microseconds)
   
8. **idx_load_log_finished_at** - Index on `finished_at` column
   - **Query pattern:** Completion time analysis
   - **Use case:** `WHERE finished_at BETWEEN ... AND ...` - SLA monitoring
   - **Cardinality:** Very high (timestamp precision)
   
9. **idx_load_log_duration** - Index on `duration_sec` column
   - **Query pattern:** Performance profiling and optimization
   - **Use case:** `WHERE duration_sec > 300 ORDER BY duration_sec DESC` - slow operations
   - **Cardinality:** Medium (seconds precision, limited range)
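The test in this suite checks index names only. A possible extension, sketched here under assumptions, is to confirm that each index targets the column listed above by parsing the `indexdef` column of `pg_indexes`. The helper `indexed_columns` is hypothetical, the sample string is an assumed example of how a simple B-tree definition is rendered, and the regex handles plain column lists only (no expression or partial indexes).

```python
import re

# Index name -> indexed column, taken from the list above.
EXPECTED_INDEX_COLUMNS = {
    "idx_load_log_run_id": "run_id",
    "idx_load_log_phase": "phase",
    "idx_load_log_table_name": "table_name",
    "idx_load_log_file_path": "file_path",
    "idx_load_log_status": "status",
    "idx_load_log_rows_loaded": "rows_loaded",
    "idx_load_log_started_at": "started_at",
    "idx_load_log_finished_at": "finished_at",
    "idx_load_log_duration": "duration_sec",
}


def indexed_columns(indexdef):
    """Return the column names from a simple 'USING method (col, ...)' clause."""
    match = re.search(r"USING\s+\w+\s+\(([^)]*)\)", indexdef)
    if match is None:
        return []
    return [col.strip() for col in match.group(1).split(",")]


# Assumed sample of a pg_indexes.indexdef value for a single-column index.
sample = "CREATE INDEX idx_load_log_run_id ON bronze.load_log USING btree (run_id)"
print(indexed_columns(sample))  # ['run_id']
```

With rows of `(indexname, indexdef)` fetched from `pg_indexes`, each index could then be compared against `EXPECTED_INDEX_COLUMNS[indexname]`.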

**Plus Primary Key Index:**
- Automatically created on `id` column
- Usually named `load_log_pkey`
- B-tree index for unique constraint enforcement

**📈 Index Strategy Rationale:**

**Why 9 Indexes on an Audit Table?**

**Query Patterns Supported:**
- **Batch analysis:** run_id index groups all operations for a batch
- **Error monitoring:** status index enables fast `WHERE status = 'ERROR'` scans
- **Performance profiling:** duration_sec index finds slow operations
- **Table-specific history:** table_name index for per-table debugging
- **Time-series analysis:** started_at/finished_at for trend analysis

**Performance vs Storage Trade-offs:**
- **Write cost:** Every INSERT must update all indexes (acceptable for low-volume logs)
- **Storage cost:** Each index adds storage overhead; the exact amount depends on column width and row count
- **Read benefit:** Selective queries can avoid full table scans, which matters more as the log grows
- **Append-only pattern:** No UPDATE/DELETE means no dead index entries to clean up, so index bloat stays low

**Why NOT Index message Column:**
- Free-form text (high cardinality, low query value)
- Full-text search would require GIN index (expensive)
- Typically queried after narrowing by other columns

**Monitoring Queries Enabled:**
```sql
-- Find recent errors (status + started_at indexes)
WHERE status = 'ERROR' AND started_at > NOW() - INTERVAL '1 day'

-- Slow operations by table (table_name + duration_sec indexes)
WHERE table_name = 'bronze.crm_sales_details' AND duration_sec > 60

-- Batch timeline (run_id + started_at indexes)
WHERE run_id = '...' ORDER BY started_at
```

In [6]:
%%ipytest -vv

def test_load_log_indexes_exist(db_cursor):
    """Verify all expected indexes are created."""
    db_cursor.execute("""
        SELECT indexname
        FROM pg_indexes
        WHERE schemaname = 'bronze'
          AND tablename = 'load_log'
        ORDER BY indexname
    """)
    
    indexes = [row[0] for row in db_cursor.fetchall()]
    
    # Expected indexes (excluding primary key index)
    expected_indexes = [
        'idx_load_log_duration',
        'idx_load_log_file_path',
        'idx_load_log_finished_at',
        'idx_load_log_phase',
        'idx_load_log_rows_loaded',
        'idx_load_log_run_id',
        'idx_load_log_started_at',
        'idx_load_log_status',
        'idx_load_log_table_name'
    ]
    
    for idx_name in expected_indexes:
        assert idx_name in indexes, f"Index '{idx_name}' must exist"
    
    # Verify we have at least 10 indexes (9 explicit + 1 PK)
    assert len(indexes) >= 10, f"Expected at least 10 indexes, found {len(indexes)}"

collecting ... collected 1 item

t_a9c6325c3b224f11b58997d527c0ace9.py::test_load_log_indexes_exist PASSED                    [100%]



## Test Suite 5: Constraint Validation

**Tests in this suite:**
1. `test_load_log_check_constraints` - Validates CHECK constraints on status and phase columns

**How this test works:**

**CHECK Constraint Validation:**
- Queries `pg_constraint` system catalog for CHECK constraints
- Uses `pg_get_constraintdef()` to retrieve constraint definitions
- ✅ **Success:** Both CHECK constraints exist with correct value lists
- ❌ **Failure:** Constraints missing or incomplete value definitions
- **Purpose:** Ensures data quality rules are enforced at database level
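Substring checks such as `'OK' in definition` pass even when the constraint allows extra values. A stricter variant, sketched below, extracts the quoted literals and compares them as a set. `allowed_values` is a hypothetical helper, and the sample string is an assumed example of how `pg_get_constraintdef()` may render an `IN (...)` list; the regex does not handle literals containing escaped quotes.

```python
import re


def allowed_values(constraint_def):
    """Collect the single-quoted literals in a CHECK constraint definition."""
    return set(re.findall(r"'([^']*)'", constraint_def))


# Assumed rendering of the status constraint; the real output may differ in form.
sample = "CHECK ((status = ANY (ARRAY['OK'::text, 'ERROR'::text])))"
print(allowed_values(sample) == {"OK", "ERROR"})  # True
```

An exact set comparison fails both when a value is missing and when an unexpected value has been added.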

**Constraint 1: status CHECK Constraint**

**Constraint name:** `load_log_status_chk`

**Definition:** `CHECK (status IN ('OK', 'ERROR'))`

**Allowed values:**
- **'OK'** - Operation completed successfully
- **'ERROR'** - Operation failed (error details in message column)

**Why only 2 values:**
- Binary outcome model: success or failure
- No intermediate states (operations are atomic)
- KISS principle: simple status model is easier to query and monitor

**Creation note:** `NOT VALID` on creation
- Constraint is defined but not immediately validated against existing data
- Allows fast DDL execution (no table scan)
- Future rows are validated; old rows can be fixed lazily
- Can validate later with: `ALTER TABLE ... VALIDATE CONSTRAINT load_log_status_chk`

**Constraint 2: phase CHECK Constraint**

**Constraint name:** `load_log_phase_chk`

**Definition:** `CHECK (phase IN ('START','TRUNCATE','COPY','SEPARATOR','FINISH','ERROR'))`

**Allowed values:**
- **'START'** - Batch execution begins (first log entry for run_id)
- **'TRUNCATE'** - Table truncated before loading
- **'COPY'** - Data copied from CSV to table (primary operation)
- **'SEPARATOR'** - Milestone marker between logical sections
- **'FINISH'** - Batch execution completes successfully (last log entry)
- **'ERROR'** - Critical failure (batch aborted)

**Phase Flow:**
```
Normal batch: START → TRUNCATE → COPY → COPY → ... → FINISH
Failed batch: START → TRUNCATE → COPY → ERROR
```
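The two flows above suggest a simple ordering rule that could be checked against the phases logged for one `run_id`. The sketch below is an assumption inferred from the diagram, not a rule enforced by the database: a batch starts with START, ends with FINISH or ERROR, and contains only TRUNCATE, COPY, or SEPARATOR in between. `is_valid_phase_flow` is a hypothetical helper.

```python
# Phases allowed between the first and last entry of a batch (assumed rule).
MIDDLE_PHASES = {"TRUNCATE", "COPY", "SEPARATOR"}


def is_valid_phase_flow(phases):
    """Check that an ordered list of phases matches the flows shown above."""
    if len(phases) < 2 or phases[0] != "START":
        return False
    if phases[-1] not in ("FINISH", "ERROR"):
        return False
    return all(phase in MIDDLE_PHASES for phase in phases[1:-1])


print(is_valid_phase_flow(["START", "TRUNCATE", "COPY", "COPY", "FINISH"]))  # True
print(is_valid_phase_flow(["START", "TRUNCATE", "COPY", "ERROR"]))           # True
print(is_valid_phase_flow(["TRUNCATE", "COPY", "FINISH"]))                   # False
```

The input would typically be the `phase` values for one batch ordered by `started_at`.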

**Why these phases:**
- **START:** Marks batch boundary, generates run_id
- **TRUNCATE:** Documents destructive operation (important for auditing)
- **COPY:** Core data load operation (most frequent phase)
- **SEPARATOR:** Visual grouping in logs (optional, organizational)
- **FINISH:** Confirms successful completion
- **ERROR:** Captures failure point for debugging

**Creation note:** `NOT VALID` on creation (same rationale as status constraint)

**🔒 CHECK Constraint Benefits:**

**Data Quality Enforcement:**
- **Prevents typos:** Can't insert `phase = 'COYP'` (typo) or `status = 'FAIL'` (wrong term)
- **Schema documentation:** Constraint lists valid values in database metadata
- **Query optimization:** For validated constraints, the planner can in some cases rule out conditions that contradict the constraint
- **Type safety:** Complements TEXT type with controlled vocabulary

**NOT VALID Strategy:**
- **Fast deployments:** No table scan on constraint creation
- **Flexible validation:** Can fix existing data before validating
- **Safe evolution:** Add new phases without breaking old data
- **Production-friendly:** Skips the full-table scan, so the lock taken while adding the constraint is held only briefly

**Why NOT Use ENUM Type:**
- **Flexibility:** Changing an ENUM requires ALTER TYPE, and existing values cannot be dropped from it
- **Text compatibility:** No casting needed in queries
- **Cross-database:** TEXT + CHECK is more portable
- **Simplicity:** Easier to modify constraint than enum definition

**Validation Queries Enabled:**
```sql
-- All errors (status constraint ensures only valid values)
SELECT * FROM bronze.load_log WHERE status = 'ERROR'

-- Find batch failures (phase constraint ensures valid phase names)
SELECT * FROM bronze.load_log WHERE phase = 'ERROR'

-- Invalid values are rejected at INSERT time
INSERT INTO bronze.load_log (run_id, phase, status) 
VALUES (gen_random_uuid(), 'INVALID', 'OK')  -- ❌ Fails: phase check
```

In [7]:
%%ipytest -vv

def test_load_log_check_constraints(db_cursor):
    """Verify CHECK constraints on status and phase columns."""
    db_cursor.execute("""
        SELECT conname, pg_get_constraintdef(oid) AS definition
        FROM pg_constraint
        WHERE conrelid = 'bronze.load_log'::regclass
          AND contype = 'c'
        ORDER BY conname
    """)
    
    constraints = {row[0]: row[1] for row in db_cursor.fetchall()}
    
    # Verify status constraint
    assert 'load_log_status_chk' in constraints, "status CHECK constraint must exist"
    assert 'OK' in constraints['load_log_status_chk'], "status constraint should include 'OK'"
    assert 'ERROR' in constraints['load_log_status_chk'], "status constraint should include 'ERROR'"
    
    # Verify phase constraint
    assert 'load_log_phase_chk' in constraints, "phase CHECK constraint must exist"
    phases = ['START', 'TRUNCATE', 'COPY', 'SEPARATOR', 'FINISH', 'ERROR']
    for phase in phases:
        assert phase in constraints['load_log_phase_chk'], \
            f"phase constraint should include '{phase}'"

collecting ... collected 1 item

t_a9c6325c3b224f11b58997d527c0ace9.py::test_load_log_check_constraints PASSED                [100%]



## Test Suite 6: Default Values and Sequences

**Tests in this suite:**
1. `test_load_log_id_sequence` - Validates id column auto-increment via sequence
2. `test_load_log_started_at_default` - Validates started_at default to clock_timestamp()

**How these tests work:**

**Test 1: Auto-Increment Sequence**
- Queries `information_schema.columns` for id column's default value
- ✅ **Success:** Default contains 'nextval' (sequence-based auto-increment)
- ❌ **Failure:** No default or non-sequence default
- **Purpose:** Confirms BIGSERIAL creates sequence for primary key generation

**BIGSERIAL Mechanics:**

**What BIGSERIAL Does:**
```sql
-- BIGSERIAL is syntactic sugar for:
id BIGINT NOT NULL DEFAULT nextval('bronze.load_log_id_seq')
-- Plus: CREATE SEQUENCE bronze.load_log_id_seq OWNED BY bronze.load_log.id
```

**Sequence Properties:**
- **Name:** Typically `tablename_columnname_seq` (e.g., `load_log_id_seq`)
- **Range:** 1 to 9,223,372,036,854,775,807 (9.2 quintillion)
- **Auto-increment:** Each INSERT gets next value automatically
- **Ownership:** Sequence is dropped if column is dropped
- **Cache:** PostgreSQL caches 1 value by default (configurable)

**Why BIGSERIAL over SERIAL:**
- **SERIAL:** INTEGER range (2.1 billion max)
- **BIGSERIAL:** BIGINT range (9.2 quintillion max)
- **Audit table growth:** Can accumulate billions of log entries over years
- **No exhaustion concerns:** Effectively unlimited for any realistic workload
- **Minimal overhead:** BIGINT is 8 bytes vs INTEGER's 4 bytes (negligible)

**Test 2: Timestamp Default Validation**
- Queries `information_schema.columns` for started_at default value
- ✅ **Success:** Default contains 'clock_timestamp'
- ❌ **Failure:** No default or wrong function
- **Purpose:** Ensures operation start time is captured automatically

**clock_timestamp() vs now() vs CURRENT_TIMESTAMP:**

**clock_timestamp():**
- **Returns:** Actual current time when function is called
- **Changes:** During transaction execution (real-time clock)
- **Use case:** Measure elapsed time within a transaction
- **Example:** Multiple calls in same transaction return different times

**now() / CURRENT_TIMESTAMP:**
- **Returns:** Transaction start time (frozen)
- **Changes:** Same value throughout entire transaction
- **Use case:** Consistent timestamp across all statements in transaction
- **Example:** All rows in same transaction get identical timestamp

**Why clock_timestamp() for started_at:**
```sql
-- Example showing the difference:
BEGIN;
  SELECT now();              -- Returns: 2025-10-30 10:00:00
  SELECT pg_sleep(5);        -- Wait 5 seconds
  SELECT now();              -- Returns: 2025-10-30 10:00:00 (same!)
  SELECT clock_timestamp();  -- Returns: 2025-10-30 10:00:05 (updated!)
COMMIT;
```

**Audit Log Requirements:**
- Need to measure actual elapsed time between operations
- Each log entry should capture real-world time, not transaction time
- duration_sec calculation requires real timestamps
- Batch may have multiple operations; each needs distinct timestamp

**🕐 Timing Precision Benefits:**

**Accurate Duration Calculation:**
```sql
-- With clock_timestamp():
duration_sec = EXTRACT(EPOCH FROM (finished_at - started_at))
-- Timestamps have microsecond resolution; duration_sec itself stores whole seconds (INTEGER)

-- With now():
duration_sec = 0  -- All operations in same transaction show 0 duration!
```
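The same calculation can be mirrored in Python when post-processing log rows. This is a minimal sketch: `duration_seconds` is a hypothetical helper, and truncating to whole seconds with `int()` is an assumption, since the source does not show how the loader converts the interval to INTEGER.

```python
from datetime import datetime, timedelta, timezone


def duration_seconds(started_at, finished_at):
    """Whole seconds elapsed, or None while the operation is unfinished."""
    if finished_at is None:
        return None
    return int((finished_at - started_at).total_seconds())


started = datetime(2025, 10, 30, 10, 0, 0, tzinfo=timezone.utc)
finished = started + timedelta(seconds=5, milliseconds=400)

print(duration_seconds(started, finished))  # 5
print(duration_seconds(started, None))      # None
```

Returning `None` for an unfinished operation matches the nullable `finished_at` and `duration_sec` columns.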

**Performance Profiling:**
- Identify slow COPY operations (large files)
- Compare truncate vs copy durations
- Find bottlenecks in ETL pipeline
- Track improvements over time

**Real-World Scenarios:**
```sql
-- Illustrative batch with 2 table loads (started_at per log entry)
START     - started_at: 10:00:00.000
TRUNCATE  - started_at: 10:00:00.015  (15 ms after START)
COPY      - started_at: 10:00:00.030  (15 ms after TRUNCATE)
TRUNCATE  - started_at: 10:00:05.500  (about 5.5 s after the first COPY began)
COPY      - started_at: 10:00:05.520  (20 ms after TRUNCATE)
FINISH    - started_at: 10:00:10.800  (total: 10.8 seconds)
```

**Timezone Awareness:**
- TIMESTAMPTZ stores in UTC internally
- Displays in session's timezone
- Handles daylight saving time correctly
- Critical for distributed systems and reporting
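The "one instant, many displays" behaviour has a direct analogue in Python's timezone-aware datetimes. The sketch below uses a fixed UTC-4 offset purely for illustration; a fixed offset does not model daylight saving time, which a real session timezone would.

```python
from datetime import datetime, timedelta, timezone

# One instant, stored in UTC.
utc_instant = datetime(2025, 10, 30, 10, 0, 0, tzinfo=timezone.utc)

# Illustrative fixed offset (UTC-4); not a full timezone definition.
utc_minus_4 = timezone(timedelta(hours=-4))
local_view = utc_instant.astimezone(utc_minus_4)

print(local_view.isoformat())     # 2025-10-30T06:00:00-04:00
print(local_view == utc_instant)  # True
```

The displayed wall-clock time changes with the offset, but both values compare equal because they denote the same instant.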

In [8]:
%%ipytest -vv

def test_load_log_id_sequence(db_cursor):
    """Verify id column uses a sequence (BIGSERIAL)."""
    db_cursor.execute("""
        SELECT column_default
        FROM information_schema.columns
        WHERE table_schema = 'bronze'
          AND table_name = 'load_log'
          AND column_name = 'id'
    """)
    
    default_value = db_cursor.fetchone()[0]
    assert default_value is not None, "id column should have a default value"
    assert 'nextval' in default_value, "id should use a sequence"

def test_load_log_started_at_default(db_cursor):
    """Verify started_at has clock_timestamp() default."""
    db_cursor.execute("""
        SELECT column_default
        FROM information_schema.columns
        WHERE table_schema = 'bronze'
          AND table_name = 'load_log'
          AND column_name = 'started_at'
    """)
    
    default_value = db_cursor.fetchone()[0]
    assert default_value is not None, "started_at should have a default value"
    assert 'clock_timestamp' in default_value.lower(), "started_at should default to clock_timestamp()"

platform win32 -- Python 3.12.4, pytest-8.4.2, pluggy-1.6.0 -- c:\Users\Laurent\Studies\sql-ultimate-course\Udemy-SQL-Data-Warehouse-Project\.venv\Scripts\python.exe
cachedir: .pytest_cache
rootdir: c:\Users\Laurent\Studies\sql-ultimate-course\Udemy-SQL-Data-Warehouse-Project\tests\tests_bronze
plugins: anyio-4.11.0, nbmake-1.5.5
collecting ... collected 2 items

t_a9c6325c3b224f11b58997d527c0ace9.py::test_load_log_id_sequence PASSED                      [ 50%]
t_a9c6325c3b224f11b58997d527c0ace9.py::test_load_log_started_at_default PASSED               [100%]



## Test Suite 7: Integration Test - Insert and Verify

**Tests in this suite:**
1. `test_load_log_insert_and_query` - End-to-end test of INSERT, query, and cleanup

**How this test works:**

**Full Integration Workflow:**

**Step 1: UUID Adapter Registration**
```python
from psycopg2.extras import register_uuid
register_uuid()
```
- Registers PostgreSQL UUID type adapter with psycopg2
- Enables passing Python `uuid.UUID` objects directly in queries
- Without this: `ProgrammingError: can't adapt type 'UUID'`
- Alternative: Convert to string `str(test_run_id)` but loses type safety

**Step 2: Generate Unique run_id**
```python
test_run_id = uuid.uuid4()
```
- Creates version 4 (random) UUID
- Format: `'xxxxxxxx-xxxx-4xxx-yxxx-xxxxxxxxxxxx'` (32 hex digits + 4 hyphens)
- Uniqueness: 2^122 possible values (1 in 5.3 undecillion)
- Matches type of run_id column in bronze.load_log
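
The format and uniqueness claims above can be checked directly with the standard-library `uuid` module. The snippet is a small sketch; the variable names are illustrative.

```python
import uuid

run_id = uuid.uuid4()
text = str(run_id)

print(run_id.version)    # 4
print(len(text))         # 36 (32 hex digits + 4 hyphens)
print(len(run_id.hex))   # 32
print(text[14])          # 4 (the version digit in the third group)
print(text[19] in "89ab")  # True (the variant digit, shown as 'y' in the format above)

# 6 of the 128 bits are fixed (4 version bits + 2 variant bits),
# which leaves 122 random bits.
print(f"{2**122:.2e}")   # 5.32e+36
```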

**Step 3: Insert Test Record**
```sql
INSERT INTO bronze.load_log (run_id, phase, status, message) 
VALUES (%s, 'START', 'OK', 'Test run from pytest')
RETURNING id, run_id, phase, status
```
- Inserts a minimal valid log entry (the core columns run_id, phase, status, plus a short message)
- Relies on defaults:
  - `id`: Auto-generated via sequence (BIGSERIAL)
  - `started_at`: Auto-populated via clock_timestamp()
- `RETURNING` clause captures inserted values for verification
- Parameterized query (safe from SQL injection)

**Step 4: Verify Insertion**
- Fetches returned row with `db_cursor.fetchone()`
- Asserts returned values match expected:
  - Row exists (not None)
  - run_id matches generated UUID
  - phase is 'START'
  - status is 'OK'
- ✅ **Success:** All assertions pass
- ❌ **Failure:** Any assertion fails or INSERT error

**Step 5: Cleanup Test Data**
```sql
DELETE FROM bronze.load_log WHERE run_id = %s
```
- Removes test record to keep log clean
- Uses run_id for precise deletion (no side effects)
- Prevents test pollution in production-like environments
- Good practice: tests should be idempotent and self-cleaning
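
One way to guarantee the cleanup step runs even when an assertion fails is to wrap the insert and delete in a context manager. The following is a minimal sketch under stated assumptions: `temporary_log_entry` is a hypothetical helper, `cursor` is any DB-API cursor such as the `db_cursor` fixture, and passing a `uuid.UUID` as `run_id` still requires `register_uuid()` as described in Step 1.

```python
from contextlib import contextmanager


@contextmanager
def temporary_log_entry(cursor, run_id):
    """Insert a test row, yield the returned tuple, and always delete it afterwards."""
    cursor.execute(
        """
        INSERT INTO bronze.load_log (run_id, phase, status, message)
        VALUES (%s, 'START', 'OK', 'Test run from pytest')
        RETURNING id, run_id, phase, status
        """,
        (run_id,),
    )
    try:
        yield cursor.fetchone()
    finally:
        # Runs even if an assertion inside the with-block raises.
        cursor.execute(
            "DELETE FROM bronze.load_log WHERE run_id = %s",
            (run_id,),
        )


# Intended usage inside a test (needs a live cursor, so it is not executed here):
#     with temporary_log_entry(db_cursor, uuid.uuid4()) as row:
#         assert row[2] == 'START'
```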

**🧪 Integration Test Value:**

**What This Test Validates:**
- **Table writability:** Can INSERT rows successfully
- **Sequence functionality:** id is generated by the sequence without being supplied
- **Default values:** started_at is left to its default (the stored value itself is not asserted)
- **Constraint enforcement:** CHECK constraints allow valid values
- **UUID handling:** psycopg2 adapter works correctly
- **Query functionality:** Can retrieve inserted data
- **Cleanup capability:** Can delete test data

**What This Test Catches:**
- **Permission issues:** User lacks INSERT privilege
- **Type mismatches:** UUID adapter not registered
- **Constraint violations:** Invalid phase/status values
- **Default failures:** clock_timestamp() not working
- **Sequence errors:** BIGSERIAL sequence broken
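
The test as written returns `id` but does not assert on it, and it does not read `started_at` back. If stricter checks on the sequence and the default are wanted, something along these lines could be applied to a row obtained with `RETURNING id, started_at`. This is a sketch only: `check_defaults` is a hypothetical helper, the 60-second tolerance is an arbitrary choice that assumes client and server clocks roughly agree, and the sample row stands in for `db_cursor.fetchone()`.

```python
from datetime import datetime, timezone


def check_defaults(row):
    """Validate an (id, started_at) tuple from RETURNING id, started_at."""
    row_id, started_at = row
    assert isinstance(row_id, int) and row_id > 0, "id should come from the sequence"
    assert isinstance(started_at, datetime), "started_at should be populated"
    assert started_at.tzinfo is not None, "TIMESTAMPTZ should come back timezone-aware"
    age = datetime.now(timezone.utc) - started_at
    assert abs(age.total_seconds()) < 60, "started_at should be close to the current time"


# Stand-in for db_cursor.fetchone(); no database is contacted here.
sample_row = (1, datetime.now(timezone.utc))
check_defaults(sample_row)
```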

**Why Minimal Column Set:**
- Tests essential functionality only
- Doesn't require nullable columns (table_name, file_path, etc.)
- Faster execution (fewer columns to populate)
- Focuses on core requirements (run_id, phase, status)
- Leaves optional fields for real ETL operations

**Production Usage Pattern:**
```sql
-- Real ETL batch would create multiple entries, all sharing one run_id.
-- %(run_id)s is a psycopg2 named placeholder (PostgreSQL has no @variable syntax).
INSERT INTO bronze.load_log (run_id, phase, status, message)
VALUES (%(run_id)s, 'START', 'OK', 'Starting bronze layer refresh');

INSERT INTO bronze.load_log (run_id, phase, table_name, status)
VALUES (%(run_id)s, 'TRUNCATE', 'bronze.crm_cust_info', 'OK');

INSERT INTO bronze.load_log (run_id, phase, table_name, file_path, rows_loaded, status)
VALUES (%(run_id)s, 'COPY', 'bronze.crm_cust_info', '/data/cust_info.csv', 15234, 'OK');

INSERT INTO bronze.load_log (run_id, phase, status, message)
VALUES (%(run_id)s, 'FINISH', 'OK', 'Batch completed successfully');
```
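
From Python, the same batch could be assembled with a small helper that builds one parameterized INSERT per entry. This is a sketch, not the project's ETL code: `build_log_insert` and `ALLOWED_COLUMNS` are hypothetical names, the column list is taken from the statements above, and the statements are only printed here rather than executed.

```python
import uuid

# Columns a caller may set explicitly; id and started_at are left to their defaults.
ALLOWED_COLUMNS = (
    "run_id", "phase", "table_name", "file_path", "rows_loaded", "status", "message",
)


def build_log_insert(**values):
    """Return (sql, params) for one bronze.load_log row using only the given columns."""
    unknown = set(values) - set(ALLOWED_COLUMNS)
    if unknown:
        raise ValueError(f"Unknown load_log columns: {sorted(unknown)}")
    # Column names come from the whitelist; values are always bound as parameters.
    columns = [c for c in ALLOWED_COLUMNS if c in values]
    placeholders = ", ".join(["%s"] * len(columns))
    sql = f"INSERT INTO bronze.load_log ({', '.join(columns)}) VALUES ({placeholders})"
    return sql, tuple(values[c] for c in columns)


run_id = uuid.uuid4()  # one run_id shared by every entry of the batch
batch = [
    build_log_insert(run_id=run_id, phase="START", status="OK",
                     message="Starting bronze layer refresh"),
    build_log_insert(run_id=run_id, phase="TRUNCATE",
                     table_name="bronze.crm_cust_info", status="OK"),
    build_log_insert(run_id=run_id, phase="COPY", table_name="bronze.crm_cust_info",
                     file_path="/data/cust_info.csv", rows_loaded=15234, status="OK"),
    build_log_insert(run_id=run_id, phase="FINISH", status="OK",
                     message="Batch completed successfully"),
]

for sql, params in batch:
    print(sql)
    # With a live connection: cursor.execute(sql, params)
```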

**Self-Cleaning Tests:**
- Production audit tables should never be cleaned
- Test environments need cleanup to avoid clutter
- run_id-based deletion is safe (no accidental deletions)
- Alternative: Use separate test schema or database
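
For the separate-database alternative, the connection settings could be made switchable through environment variables so the same notebook can target a scratch database. The sketch below is an assumption about how that might look: only `POSTGRES_PASSWORD` appears in the notebook's setup cell, while `POSTGRES_HOST`, `POSTGRES_USER`, `POSTGRES_TEST_DB`, and the helper `build_db_config` are hypothetical.

```python
import os


def build_db_config(env=None):
    """Build psycopg2 connection kwargs, letting a test database override the default."""
    env = os.environ if env is None else env
    return {
        "host": env.get("POSTGRES_HOST", "localhost"),
        # Hypothetical override; falls back to the warehouse used in this notebook.
        "database": env.get("POSTGRES_TEST_DB", "sql_retail_analytics_warehouse"),
        "user": env.get("POSTGRES_USER", "postgres"),
        "password": env.get("POSTGRES_PASSWORD", "your_password_here"),
    }


# Passing an explicit mapping keeps the example independent of the real environment.
print(build_db_config({})["database"])
print(build_db_config({"POSTGRES_TEST_DB": "warehouse_test"})["database"])
```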

In [10]:
%%ipytest -vv

def test_load_log_insert_and_query(db_cursor):
    """Test inserting a record and querying it back."""
    import uuid
    from psycopg2.extras import register_uuid
    
    # Register UUID type adapter for psycopg2
    register_uuid()
    
    # Generate a unique run_id for this test
    test_run_id = uuid.uuid4()
    
    # Insert a test record
    db_cursor.execute("""
        INSERT INTO bronze.load_log (
            run_id, phase, status, message
        ) VALUES (
            %s, 'START', 'OK', 'Test run from pytest'
        )
        RETURNING id, run_id, phase, status
    """, (test_run_id,))
    
    try:
        result = db_cursor.fetchone()
        assert result is not None, "Insert should return a row"
        assert result[1] == test_run_id, "run_id should match"
        assert result[2] == 'START', "phase should be START"
        assert result[3] == 'OK', "status should be OK"
    finally:
        # Clean up test data even if an assertion above fails
        db_cursor.execute("""
            DELETE FROM bronze.load_log
            WHERE run_id = %s
        """, (test_run_id,))
    
    print(f"✅ Successfully inserted and deleted test record with run_id: {test_run_id}")

platform win32 -- Python 3.12.4, pytest-8.4.2, pluggy-1.6.0 -- c:\Users\Laurent\Studies\sql-ultimate-course\Udemy-SQL-Data-Warehouse-Project\.venv\Scripts\python.exe
cachedir: .pytest_cache
rootdir: c:\Users\Laurent\Studies\sql-ultimate-course\Udemy-SQL-Data-Warehouse-Project\tests\tests_bronze
plugins: anyio-4.11.0, nbmake-1.5.5
collecting ... collected 1 item

t_a9c6325c3b224f11b58997d527c0ace9.py::test_load_log_insert_and_query PASSED                 [100%]

## Summary: Run All Tests

In [12]:
# Run all tests in this notebook
ipytest.run('-vv')

platform win32 -- Python 3.12.4, pytest-8.4.2, pluggy-1.6.0 -- c:\Users\Laurent\Studies\sql-ultimate-course\Udemy-SQL-Data-Warehouse-Project\.venv\Scripts\python.exe
cachedir: .pytest_cache
rootdir: c:\Users\Laurent\Studies\sql-ultimate-course\Udemy-SQL-Data-Warehouse-Project\tests\tests_bronze
plugins: anyio-4.11.0, nbmake-1.5.5
collecting ... collected 1 item

t_a9c6325c3b224f11b58997d527c0ace9.py::test_load_log_insert_and_query PASSED                 [100%]

<ExitCode.OK: 0>

## Manual Inspection Queries

Use these queries to manually inspect the table structure:

In [None]:
# Connect and display table structure
conn = psycopg2.connect(**DB_CONFIG)

# View all columns
df_columns = pd.read_sql("""
    SELECT 
        column_name,
        data_type,
        is_nullable,
        column_default
    FROM information_schema.columns
    WHERE table_schema = 'bronze'
      AND table_name = 'load_log'
    ORDER BY ordinal_position
""", conn)

print("\n📊 bronze.load_log Columns:")
display(df_columns)

# View all indexes
df_indexes = pd.read_sql("""
    SELECT 
        indexname,
        indexdef
    FROM pg_indexes
    WHERE schemaname = 'bronze'
      AND tablename = 'load_log'
    ORDER BY indexname
""", conn)

print("\n📇 bronze.load_log Indexes:")
display(df_indexes)

# View constraints
df_constraints = pd.read_sql("""
    SELECT 
        conname AS constraint_name,
        contype AS constraint_type,
        pg_get_constraintdef(oid) AS definition
    FROM pg_constraint
    WHERE conrelid = 'bronze.load_log'::regclass
    ORDER BY conname
""", conn)

print("\n🔒 bronze.load_log Constraints:")
display(df_constraints)

conn.close()