# CDC Pipeline Unit Tests Runner

This notebook executes unit tests for the CDC pipeline using **dbx_test** and **pytest**.

## Features
- **pytest fixtures**: Reusable test setup components
- **pytest.mark.parametrize**: Data-driven testing with multiple parameter sets
- **dbx_test integration**: Execute tests directly from Databricks notebooks

## Test Coverage
- `01-CDC-CDF-simple-pipeline.ipynb`: Deduplication, merge logic, CDF processing
- `02-CDC-CDF-full-multi-tables.ipynb`: Multi-table processing, concurrent operations


## 1. Install Required Packages


In [0]:
%pip install dbx-test pytest pytest-html --quiet


In [0]:
# Restart Python to pick up new packages
dbutils.library.restartPython()
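Because `--quiet` hides pip's output, you may want to confirm after the restart which package versions the notebook can actually see. The sketch below uses only the standard library's `importlib.metadata`. `installed_versions` is a hypothetical helper name.

```python
from importlib import metadata
from typing import Dict, Iterable, Optional

def installed_versions(packages: Iterable[str]) -> Dict[str, Optional[str]]:
    """Return {package: version}, using None for packages that are not installed."""
    versions: Dict[str, Optional[str]] = {}
    for name in packages:
        try:
            versions[name] = metadata.version(name)
        except metadata.PackageNotFoundError:
            # Not installed in the current Python environment
            versions[name] = None
    return versions
```

For example, `print(installed_versions(["pytest", "pytest-html", "dbx-test"]))`. A `None` value means that package is missing from the notebook environment.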


## 2. Set Up the Test Environment


In [0]:
import sys
import os
import pytest
from pathlib import Path

# Get the notebook's directory path
notebook_path = dbutils.notebook.entry_point.getDbutils().notebook().getContext().notebookPath().get()
workspace_path = "/Workspace" + notebook_path.rsplit("/", 1)[0]

# Add tests directory to Python path
if workspace_path not in sys.path:
    sys.path.insert(0, workspace_path)

print(f"Test directory: {workspace_path}")
print(f"Python path updated: {workspace_path in sys.path}")
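Databricks' guidance for running pytest from a notebook disables two kinds of writes: bytecode files and pytest's cache. The reason is that the workspace files next to the tests may not accept `__pycache__` or `.pytest_cache` writes. The sketch below applies both settings. `BASE_PYTEST_ARGS` is a hypothetical name that the other cells do not use.

```python
import sys

# Stop Python (and pytest's assertion rewriting) from writing __pycache__/*.pyc files
sys.dont_write_bytecode = True

# Arguments that disable pytest's cache plugin, so no .pytest_cache directory is created.
# BASE_PYTEST_ARGS is a hypothetical name; append it to any pytest.main() argument list.
BASE_PYTEST_ARGS = ["-p", "no:cacheprovider"]

print(f"Bytecode writing disabled: {sys.dont_write_bytecode}")
print(f"Suggested extra pytest args: {BASE_PYTEST_ARGS}")
```

To apply the cache setting, add the list to an invocation, for example `pytest.main([workspace_path, "-v", *BASE_PYTEST_ARGS])`.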


## 3. Define Test Configuration


In [0]:
# Test configuration
TEST_CONFIG = {
    "verbose": True,
    "show_locals": True,
    "capture": "no",  # Show print statements during tests
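    "markers": None,  # None runs all tests; or set a marker expression such as "not slow"
    "test_files": [   # Reference list; the full run in section 4 collects every test in workspace_path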
        "test_simple_pipeline.py",
        "test_multi_tables.py"
    ]
}

print("Test Configuration:")
for key, value in TEST_CONFIG.items():
    print(f"  {key}: {value}")
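The runner below does not read `TEST_CONFIG["test_files"]`; instead it collects every test under `workspace_path`. To restrict a run to exactly those modules, you could turn the list into explicit paths and pass them to `pytest.main` together, since pytest accepts several paths in one argument list. `build_test_targets` is a hypothetical helper.

```python
import os
from typing import List

def build_test_targets(test_dir: str, test_files: List[str]) -> List[str]:
    """Return full paths for the configured test modules, warning about missing ones."""
    targets = []
    for name in test_files:
        path = os.path.join(test_dir, name)
        if os.path.exists(path):
            targets.append(path)
        else:
            print(f"⚠️ Configured test file not found: {path}")
    return targets
```

For example, run `targets = build_test_targets(workspace_path, TEST_CONFIG["test_files"])` and then `pytest.main([*targets, "-v"])`. Check that `targets` is non-empty first. With no paths, pytest falls back to collecting from the current directory.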


## 4. Run All Tests


In [0]:
from typing import Optional

def run_pytest_tests(test_path: str, verbose: bool = True,
                     show_locals: bool = True, markers: Optional[str] = None,
                     capture: Optional[str] = None) -> int:
    """
    Execute pytest tests and return the exit code.
    
    Args:
        test_path: Path to a test file or directory, or a pytest node ID
        verbose: Enable verbose output (-v)
        show_locals: Use short tracebacks that include local variables
        markers: Pytest marker expression to filter tests (-m)
        capture: Output capture mode for --capture (e.g. "no" to show print output)
        
    Returns:
        pytest exit code (0 = success, non-zero = failure)
    """
    args = [test_path]
    
    if verbose:
        args.append("-v")
    
    if show_locals:
        args.append("--tb=short")
        args.append("--showlocals")
    
    if markers:
        args.extend(["-m", markers])
    
    # Honour the capture setting from TEST_CONFIG (e.g. --capture=no)
    if capture:
        args.append(f"--capture={capture}")
    
    # Add color output
    args.append("--color=yes")
    
    # Ignore DeprecationWarnings for cleaner output
    args.append("-W")
    args.append("ignore::DeprecationWarning")
    
    print(f"\n{'='*60}")
    print(f"Running: pytest {' '.join(args)}")
    print(f"{'='*60}\n")
    
    return pytest.main(args)


In [0]:
# Run all tests
exit_code = run_pytest_tests(
    test_path=workspace_path,
    verbose=TEST_CONFIG["verbose"],
    show_locals=TEST_CONFIG["show_locals"],
    markers=TEST_CONFIG["markers"],
    capture=TEST_CONFIG["capture"]
)

print(f"\n{'='*60}")
if exit_code == 0:
    print("✅ All tests PASSED!")
else:
    print(f"❌ Tests FAILED with exit code: {exit_code}")
print(f"{'='*60}")
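A non-zero exit code does not always mean a test failed. pytest documents separate exit codes for:

- failures
- interruptions
- internal errors
- usage errors
- the case where nothing was collected (for example, a wrong `workspace_path`)

A small lookup can make the pass/fail summary more informative. `describe_exit_code` is a hypothetical helper; the meanings follow pytest's documented exit codes.

```python
# Meanings of pytest's documented exit codes (the values of pytest.ExitCode)
PYTEST_EXIT_CODES = {
    0: "all tests passed",
    1: "tests were collected and run but some failed",
    2: "test execution was interrupted (e.g. by the user or by collection errors)",
    3: "internal error during test execution",
    4: "pytest command-line usage error",
    5: "no tests were collected",
}

def describe_exit_code(code: int) -> str:
    """Return a human-readable explanation of a pytest exit code."""
    return PYTEST_EXIT_CODES.get(int(code), f"unknown exit code {code}")
```

For example, `print(describe_exit_code(exit_code))` after the full run.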


## 5. Run Specific Test Modules


In [0]:
# Run only Simple Pipeline tests
print("Running Simple Pipeline Tests...")
simple_pipeline_result = run_pytest_tests(
    test_path=f"{workspace_path}/test_simple_pipeline.py",
    verbose=True
)


In [0]:
# Run only Multi-Table tests
print("Running Multi-Table Tests...")
multi_table_result = run_pytest_tests(
    test_path=f"{workspace_path}/test_multi_tables.py",
    verbose=True
)


## 6. Run Specific Test Classes


In [0]:
# Run specific test class - Deduplication tests
print("Running Deduplication Tests Only...")
dedup_result = run_pytest_tests(
    test_path=f"{workspace_path}/test_simple_pipeline.py::TestDeduplication",
    verbose=True
)


In [0]:
# Run specific test class - Merge Logic tests
print("Running Merge Logic Tests Only...")
merge_result = run_pytest_tests(
    test_path=f"{workspace_path}/test_simple_pipeline.py::TestMergeLogic",
    verbose=True
)


## 7. Run Parameterized Tests


In [0]:
# Run every parameter set of one parameterized test
print("Running Parameterized Deduplication Test (all parameter sets)...")
param_result = run_pytest_tests(
    test_path=f"{workspace_path}/test_simple_pipeline.py::TestDeduplication::test_deduplication_keeps_single_record",
    verbose=True
)
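The cell above runs every parameter set of the test. To target a single set, append its ID in square brackets to the node ID. For plain integer or ASCII string parameters, pytest's auto-generated ID joins the values with `-`, so `(3, 1)` becomes `3-1`. The hypothetical helper `param_node_id` below builds such IDs. It assumes simple values and no custom `ids=`.

```python
from typing import Sequence, Union

def param_node_id(test_node: str, params: Sequence[Union[int, str]]) -> str:
    """Build a node ID for one parameter set, assuming pytest's default IDs for int/str values."""
    # Default IDs for ints and plain strings are their str() values joined with "-"
    param_id = "-".join(str(p) for p in params)
    return f"{test_node}[{param_id}]"
```

For example, `run_pytest_tests(test_path=param_node_id(f"{workspace_path}/test_simple_pipeline.py::TestDeduplication::test_deduplication_keeps_single_record", (3, 1)))`.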


## 8. Generate Test Report


In [0]:
def generate_test_report(test_path: str, report_path: str = "/tmp/test_report.html") -> int:
    """
    Generate HTML test report using pytest-html.
    
    Args:
        test_path: Path to test file or directory
        report_path: Output path for HTML report
        
    Returns:
        pytest exit code
    """
    args = [
        test_path,
        "-v",
        "--tb=short",
        f"--html={report_path}",
        "--self-contained-html",
        "-W", "ignore::DeprecationWarning"
    ]
    
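    print(f"Generating test report at: {report_path}")
    exit_code = pytest.main(args)
    
    # pytest-html writes the report at the end of the session, including when tests fail
    if exit_code in (0, 1):
        outcome = "all tests passed" if exit_code == 0 else "some tests failed"
        print(f"\n✅ Report generated: {report_path} ({outcome})")
    else:
        print(f"\n⚠️ pytest exited with code {exit_code}; the report may be incomplete or missing")
    
    return exit_code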

# Generate report
report_exit_code = generate_test_report(workspace_path)
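`/tmp` lives on the driver's local disk, so the report is lost when the cluster terminates. One option is to copy it to persistent storage, such as a Unity Catalog volume, under a timestamped name. `persist_report` and the destination directory are hypothetical; substitute a path your workspace can write to.

```python
import os
import shutil
from datetime import datetime

def persist_report(report_path: str, dest_dir: str) -> str:
    """Copy a report into dest_dir with a timestamp suffix and return the new path."""
    if not os.path.exists(report_path):
        raise FileNotFoundError(f"No report found at {report_path}")
    os.makedirs(dest_dir, exist_ok=True)
    stamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    base, ext = os.path.splitext(os.path.basename(report_path))
    dest_path = os.path.join(dest_dir, f"{base}_{stamp}{ext}")
    shutil.copy2(report_path, dest_path)  # copy2 also preserves file timestamps
    return dest_path
```

For example, `persist_report("/tmp/test_report.html", "/Volumes/<catalog>/<schema>/<volume>/test_reports")`, where the angle-bracket parts are placeholders.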


## 9. Test Module and Class Overview


In [0]:
# Static overview of test modules and classes (update this list when tests change)
print("Test Modules and Classes:")
print("=" * 60)
print()
print("test_simple_pipeline.py:")
print("  - TestDeduplication: Tests for CDC data deduplication")
print("  - TestMergeLogic: Tests for MERGE INTO operations")
print("  - TestCDFProcessing: Tests for Change Data Feed")
print("  - TestDataCleaning: Tests for data transformations")
print("  - TestEndToEndPipeline: Integration tests")
print("  - TestSchemaHandling: Schema validation tests")
print()
print("test_multi_tables.py:")
print("  - TestMultiTableDeduplication: Multi-table dedup tests")
print("  - TestColumnMapping: Column mapping tests")
print("  - TestMultiTableMergeLogic: Multi-table merge tests")
print("  - TestSchemaValidation: Schema validation tests")
print("  - TestBatchProcessing: Batch processing tests")
print("  - TestConcurrentProcessing: Concurrency tests")
print("  - TestErrorHandling: Error handling tests")
print("  - TestDataIntegrity: Data integrity tests")
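The listing above is maintained by hand, so it can drift from the actual test files. To get real per-class counts, one option is to have pytest also write a JUnit XML file with its built-in `--junitxml=<path>` option, then summarize that file with the standard library. The sketch below assumes pytest's JUnit layout: each `<testcase>` carries a `classname` such as `module.Class`, and failures, errors and skips appear as child elements. `summarize_junit_xml` is a hypothetical helper.

```python
import xml.etree.ElementTree as ET
from collections import Counter, defaultdict
from typing import Dict

def summarize_junit_xml(xml_path: str) -> Dict[str, Counter]:
    """Count test outcomes per test class from a pytest JUnit XML report."""
    root = ET.parse(xml_path).getroot()
    summary: Dict[str, Counter] = defaultdict(Counter)
    # iter() finds <testcase> elements whether the root is <testsuites> or <testsuite>
    for case in root.iter("testcase"):
        class_name = case.get("classname", "<unknown>")
        if case.find("failure") is not None:
            outcome = "failed"
        elif case.find("error") is not None:
            outcome = "error"
        elif case.find("skipped") is not None:
            outcome = "skipped"
        else:
            outcome = "passed"
        summary[class_name][outcome] += 1
    return dict(summary)
```

For example, run `pytest.main([workspace_path, "--junitxml=/tmp/test_results.xml"])`, then print `summarize_junit_xml("/tmp/test_results.xml")`.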


## 10. dbx_test Integration for Remote Execution


In [0]:
# dbx_test integration for running tests on Databricks clusters
try:
    from dbx_test import dbx_test
    
    # Placeholder dbx_test settings (not passed to any call in this notebook)
    DBX_TEST_CONFIG = {
        "cluster_id": None,  # Set your cluster ID here
        "test_path": workspace_path,
        "timeout": 600,  # 10 minutes
    }
    
    print("dbx_test is available for remote test execution.")
    print("To run tests remotely, configure DBX_TEST_CONFIG with your cluster_id.")
    print()
    print("Example usage:")
    print('  !dbx_test run --cluster-id <your-cluster-id> --test-path tests/')
    
except ImportError:
    print("dbx_test not available. Install with: %pip install dbx-test")


## 11. Custom Test Execution with Filters


In [0]:
def run_filtered_tests(test_path: str, keyword: str = None, 
                       exclude_keyword: str = None) -> int:
    """
    Run tests with keyword filtering.
    
    Args:
        test_path: Path to test directory
        keyword: Only run tests matching this keyword (-k)
        exclude_keyword: Exclude tests matching this keyword
        
    Returns:
        pytest exit code
    """
    args = [test_path, "-v", "--tb=short"]
    
    if keyword and exclude_keyword:
        args.extend(["-k", f"{keyword} and not {exclude_keyword}"])
    elif keyword:
        args.extend(["-k", keyword])
    elif exclude_keyword:
        args.extend(["-k", f"not {exclude_keyword}"])
    
    return pytest.main(args)

# Example: Run only tests with 'dedup' in the name
# run_filtered_tests(workspace_path, keyword="dedup")

# Example: Run all tests except 'concurrent'
# run_filtered_tests(workspace_path, exclude_keyword="concurrent")
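`run_filtered_tests` accepts one include keyword and one exclude keyword. pytest's `-k` expressions also support `or` and parentheses, so several keywords can be combined in one expression. The hypothetical `build_k_expression` helper below builds such an expression. It assumes each keyword is a simple name without spaces or operators.

```python
from typing import Iterable, Optional

def build_k_expression(include: Iterable[str] = (), exclude: Iterable[str] = ()) -> Optional[str]:
    """Combine keywords into a pytest -k expression such as "(a or b) and not (c)"."""
    include, exclude = list(include), list(exclude)
    parts = []
    if include:
        parts.append("(" + " or ".join(include) + ")")
    if exclude:
        parts.append("not (" + " or ".join(exclude) + ")")
    # None means "apply no -k filter"
    return " and ".join(parts) if parts else None
```

For example, `expr = build_k_expression(["dedup", "merge"], ["concurrent"])` followed by `pytest.main([workspace_path, "-v", "-k", expr])`.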


## 12. Quick Test Validation


In [0]:
# Quick validation - run a subset of fast tests
print("Quick Test Validation - Running fast tests only...")
print("=" * 60)

# Map each test module to the test node IDs to run from it
quick_tests = {
    "test_simple_pipeline.py": [
        "TestDeduplication::test_deduplication_keeps_single_record",
        "TestMergeLogic::test_insert_new_record",
    ],
    "test_multi_tables.py": [
        "TestMultiTableDeduplication::test_deduplication_per_table",
    ],
}

for test_file, tests in quick_tests.items():
    for test in tests:
        result = pytest.main([f"{workspace_path}/{test_file}::{test}", "-v", "--tb=line"])
        status = "✅" if result == 0 else "❌"
        print(f"{status} {test}")
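Each `pytest.main` call in the loop above repeats test collection. pytest also documents a caveat about calling `pytest.main` repeatedly in one process: because of Python's import cache, edits to the test files are not picked up between calls.

An alternative is a single invocation with all node IDs, plus a small plugin object that records each test's outcome. `OutcomeRecorder` and `run_and_record` are hypothetical names. `pytest_runtest_logreport` is a standard pytest hook, and `pytest.main` accepts plugin objects through its `plugins` argument.

```python
from typing import Dict, List

class OutcomeRecorder:
    """Minimal pytest plugin that records one outcome per test node ID."""

    def __init__(self) -> None:
        self.outcomes: Dict[str, str] = {}

    def pytest_runtest_logreport(self, report) -> None:
        # pytest calls this hook for the setup, call and teardown phases of each test
        if report.when == "call":
            self.outcomes[report.nodeid] = report.outcome
        elif report.outcome == "failed":
            # pytest reports a failing setup or teardown as an error
            self.outcomes[report.nodeid] = "error"
        elif report.outcome == "skipped":
            # Tests skipped during setup never reach the call phase
            self.outcomes[report.nodeid] = "skipped"

def run_and_record(node_ids: List[str]) -> Dict[str, str]:
    """Run all node IDs in one pytest session and return {node_id: outcome}."""
    import pytest  # imported lazily so OutcomeRecorder itself has no pytest dependency

    recorder = OutcomeRecorder()
    pytest.main([*node_ids, "-q", "--tb=line"], plugins=[recorder])
    return recorder.outcomes
```

`report.nodeid` is relative to pytest's rootdir, not the absolute path you passed in. When matching results back to `quick_tests`, compare on the `Class::test` suffix.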


---

## Test Documentation

### Fixtures Available (from conftest.py)

| Fixture | Scope | Description |
|---------|-------|-------------|
| `spark` | session | SparkSession for testing |
| `cdc_schema` | module | Schema for CDC input data |
| `silver_schema` | module | Schema for Silver layer |
| `gold_schema` | module | Schema for Gold layer |
| `base_timestamp` | function | Base timestamp for test data |
| `sample_cdc_data` | function | Sample CDC data |
| `sample_cdc_with_updates` | function | CDC data with updates |
| `sample_cdc_with_deletes` | function | CDC data with deletes |
| `sample_cdc_with_duplicates` | function | CDC data for dedup testing |
| `create_test_table` | function | Factory for creating test tables |

### Parametrized Tests

Many tests use `@pytest.mark.parametrize` for data-driven testing:

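```python
import pytest

class TestDeduplication:
    @pytest.mark.parametrize("num_duplicates,expected_count", [
        (1, 1),   # Single record
        (3, 1),   # Three duplicates
        (5, 1),   # Five duplicates
    ])
    def test_deduplication_keeps_single_record(self, num_duplicates, expected_count):
        # Fixtures such as `spark` would also go in the signature; the body is elided.
        # Each tuple becomes one test case, with IDs [1-1], [3-1] and [5-1].
        ...
```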
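By default, pytest derives parameter-set IDs from the values, which yields node IDs such as `[3-1]`. Wrapping each set in `pytest.param(..., id=...)` gives it a readable name that can be used when selecting tests. The self-contained sketch below shows this. The `deduplicate_latest` helper and the plain-Python records are hypothetical stand-ins for the Spark-based logic in the real tests.

```python
import pytest

def deduplicate_latest(records):
    """Keep the record with the latest 'ts' for each 'id' (hypothetical stand-in for Spark dedup)."""
    latest = {}
    for rec in records:
        if rec["id"] not in latest or rec["ts"] > latest[rec["id"]]["ts"]:
            latest[rec["id"]] = rec
    return list(latest.values())

@pytest.mark.parametrize("num_duplicates,expected_count", [
    pytest.param(1, 1, id="single_record"),
    pytest.param(3, 1, id="three_duplicates"),
    pytest.param(5, 1, id="five_duplicates"),
])
def test_deduplication_keeps_single_record(num_duplicates, expected_count):
    # num_duplicates versions of the same key, each with a later timestamp
    records = [{"id": 1, "ts": i, "name": f"v{i}"} for i in range(num_duplicates)]
    result = deduplicate_latest(records)
    assert len(result) == expected_count
    assert result[0]["ts"] == num_duplicates - 1  # the latest version wins
```

With these IDs, one set can be selected as `test_deduplication_keeps_single_record[three_duplicates]` instead of `[3-1]`.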

### Running Specific Parameter Sets

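```python
# Run a single parameter combination; [3-1] is the auto-generated ID for (3, 1)
pytest.main([
    f"{workspace_path}/test_simple_pipeline.py::TestDeduplication::test_deduplication_keeps_single_record[3-1]",
    "-v",
])
```

From a terminal, the equivalent is `pytest "tests/test_simple_pipeline.py::TestDeduplication::test_deduplication_keeps_single_record[3-1]" -v`. Quoting the node ID stops shells such as zsh from treating `[3-1]` as a glob pattern.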
