# Silver C - Delta Live Tables

This notebook implements the Silver C stage using Delta Live Tables (DLT).
Silver C performs deduplication and filtering on Silver B tables, representing the final Silver layer stage.

## Key Features:
- Ingests Silver B delta tables
- Performs deduplication based on business key columns
- Applies filtering rules and data quality scoring
- Creates materialized Silver C tables for each source/lob/domain combination
- Uses DLT decorators for automatic table creation and dependency management
- Deduplication, quality scoring, and metadata columns are active; quality filtering is implemented but currently disabled pending future enhancements

## Setup and Configuration

In [None]:
# Import required libraries
import dlt
from pyspark.sql import DataFrame, SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, TimestampType
from pyspark.sql.window import Window
from typing import Dict, List, Any, Optional
import logging

# Import pipeline modules
from utils.config_loader import config_loader
import silver_pipeline_stages as stages

# Initialize logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Load configuration
config = config_loader.load_config()
source_combinations = config_loader.get_source_combinations()

# Get active Spark session
spark = SparkSession.getActiveSession()

print(f"Silver C DLT Pipeline - Configuration loaded")
print(f"Processing {len(source_combinations)} source combinations")
print(f"Silver schema: {config.silver_schema}")

## Deduplication and Filtering Configuration

In [None]:
# Configuration for deduplication and filtering logic
# These can be made configurable through business rules tables in the future

DEDUPLICATION_CONFIG = {
    # Domain-specific key columns for deduplication
    "pharmacy": ["patient_id", "ndc", "service_date"],
    "medical": ["patient_id", "procedure_code", "service_date"],  
    "member": ["member_id", "effective_date"]
}

# Default key columns if domain-specific not found
DEFAULT_DEDUP_KEYS = ["patient_id", "service_date"]

# Quality score thresholds (for future enhancement)
QUALITY_THRESHOLDS = {
    "min_quality_score": 0.7,
    "completeness_threshold": 0.8
}

print(f"Deduplication configuration loaded:")
for domain, keys in DEDUPLICATION_CONFIG.items():
    print(f"  {domain}: {keys}")
print(f"Default deduplication keys: {DEFAULT_DEDUP_KEYS}")

## Helper Functions for Silver C Processing

In [None]:
def get_deduplication_keys(domain: str) -> List[str]:
    """
    Get deduplication key columns for a specific domain
    
    Args:
        domain: Domain (pharmacy, medical, member)
        
    Returns:
        List of column names to use for deduplication
    """
    return DEDUPLICATION_CONFIG.get(domain, DEFAULT_DEDUP_KEYS)


def apply_deduplication(df: DataFrame, dedup_keys: List[str]) -> DataFrame:
    """
    Apply deduplication logic to DataFrame
    
    Args:
        df: Input DataFrame
        dedup_keys: List of columns to use for deduplication
        
    Returns:
        Deduplicated DataFrame
    """
    if not dedup_keys:
        logger.info("No deduplication keys provided, returning original DataFrame")
        return df
    
    # Check which dedup keys exist in the DataFrame
    available_keys = [key for key in dedup_keys if key in df.columns]
    
    if not available_keys:
        logger.warning(f"None of the deduplication keys {dedup_keys} found in DataFrame columns: {df.columns}")
        return df
    
    logger.info(f"Applying deduplication using keys: {available_keys}")
    
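    # Add row number for deduplication. Ordering by a constant means the row kept
    # within each key group is arbitrary; order by a real column for a deterministic choice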
    window_spec = Window.partitionBy(*available_keys).orderBy(F.lit(1))
    
    deduplicated_df = (
        df.withColumn("_row_num", F.row_number().over(window_spec))
          .filter(F.col("_row_num") == 1)
          .drop("_row_num")
    )
    
    original_count = df.count()
    deduplicated_count = deduplicated_df.count()
    duplicates_removed = original_count - deduplicated_count
    
    logger.info(f"Deduplication complete: {original_count} -> {deduplicated_count} rows (removed {duplicates_removed} duplicates)")
    
    return deduplicated_df


def add_quality_score(df: DataFrame) -> DataFrame:
    """
    Add data quality score to DataFrame (placeholder for future enhancement)
    
    Args:
        df: Input DataFrame
        
    Returns:
        DataFrame with quality score column added
    """
    logger.info("Adding quality score column (placeholder implementation)")
    
    # Placeholder implementation - calculate completeness score
    # In production, this would be more sophisticated
    total_columns = len(df.columns)
    
    # Count non-null values per row and calculate completeness ratio
    non_null_count = sum([F.when(F.col(col).isNotNull(), 1).otherwise(0) for col in df.columns])
    
    quality_df = df.withColumn(
        "quality_score",
        (non_null_count / total_columns).cast("decimal(3,2)")
    )
    
    logger.info("Quality score column added based on field completeness")
    return quality_df


def apply_quality_filtering(df: DataFrame, min_quality_score: float = 0.7) -> DataFrame:
    """
    Apply quality-based filtering (placeholder for future enhancement)
    
    Args:
        df: Input DataFrame with quality_score column
        min_quality_score: Minimum quality score threshold
        
    Returns:
        Filtered DataFrame
    """
    if "quality_score" not in df.columns:
        logger.warning("No quality_score column found, skipping quality filtering")
        return df
    
    logger.info(f"Applying quality filtering with minimum score: {min_quality_score}")
    
    original_count = df.count()
    filtered_df = df.filter(F.col("quality_score") >= min_quality_score)
    filtered_count = filtered_df.count()
    
    filtered_out = original_count - filtered_count
    logger.info(f"Quality filtering complete: {original_count} -> {filtered_count} rows (filtered out {filtered_out} low-quality rows)")
    
    return filtered_df
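
The window in `apply_deduplication` orders by a constant, so which duplicate survives is not defined. The following is a minimal plain-Python sketch of the same keep-one-row-per-key idea with an explicit tie-breaker. The `ingest_ts` field, the helper name, and the sample rows are hypothetical and are not part of the pipeline.

```python
from typing import Any, Dict, List, Tuple

Row = Dict[str, Any]

def dedupe_keep_latest(rows: List[Row], keys: List[str], order_by: str) -> List[Row]:
    """Keep one row per key tuple: the row with the largest order_by value."""
    best: Dict[Tuple[Any, ...], Row] = {}
    for row in rows:
        key = tuple(row[k] for k in keys)
        current = best.get(key)
        # Replace the stored row only when the new one sorts later
        if current is None or row[order_by] > current[order_by]:
            best[key] = row
    return list(best.values())

# Hypothetical sample data: the first two rows share the same business key
rows = [
    {"patient_id": "p1", "service_date": "2024-01-05", "ingest_ts": 1, "amount": 10},
    {"patient_id": "p1", "service_date": "2024-01-05", "ingest_ts": 2, "amount": 12},
    {"patient_id": "p2", "service_date": "2024-01-06", "ingest_ts": 1, "amount": 7},
]
deduped = dedupe_keep_latest(rows, ["patient_id", "service_date"], "ingest_ts")
```

In Spark, the comparable change would be to order the window by a real column (for example a descending load timestamp, if Silver B carries one) instead of `F.lit(1)`.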

## Main Silver C Processing Function

In [None]:
def create_silver_c_table(source: str, lob: str, domain: str) -> DataFrame:
    """
    Create Silver C table for a specific source/lob/domain combination
    
    Args:
        source: Source identifier
        lob: Line of business
        domain: Domain (pharmacy, medical, member)
        
    Returns:
        Processed Silver C DataFrame
    """
    logger.info(f"Creating Silver C table for {source}/{lob}/{domain}")
    
    # Read Silver B table using DLT
    silver_b_table_name = f"{source}_{lob}_{domain}_silver_b"
    
    try:
        # Use DLT read to get the Silver B table
        silver_b_df = dlt.read(silver_b_table_name)
        logger.info(f"Successfully read Silver B table: {silver_b_table_name}")
        
        # Get row count for logging
        row_count = silver_b_df.count()
        logger.info(f"Silver B table contains {row_count} rows")
        
    except Exception as e:
        logger.error(f"Failed to read Silver B table {silver_b_table_name}: {e}")
        # Return empty DataFrame if Silver B doesn't exist
        empty_schema = StructType([StructField("placeholder", StringType(), True)])
        return spark.createDataFrame([], empty_schema)
    
    # Start Silver C processing
    result_df = silver_b_df
    
    # Step 1: Apply deduplication
    dedup_keys = get_deduplication_keys(domain)
    result_df = apply_deduplication(result_df, dedup_keys)
    
    # Step 2: Add quality score (placeholder for future enhancement)
    result_df = add_quality_score(result_df)
    
    # Step 3: Apply quality filtering (placeholder for future enhancement)
    min_quality_score = QUALITY_THRESHOLDS.get("min_quality_score", 0.7)
    # Commented out for now - enable when quality scoring is fully implemented
    # result_df = apply_quality_filtering(result_df, min_quality_score)
    
    # Step 4: Add processing metadata
    result_df = result_df.withColumn("processed_timestamp", F.current_timestamp())
    result_df = result_df.withColumn("pipeline_stage", F.lit("silver_c"))
    
    final_count = result_df.count()
    logger.info(f"Silver C processing complete - {final_count} rows, columns: {result_df.columns}")
    
    return result_df

## Data Quality Expectations for Silver C

In [None]:
def apply_silver_c_quality_checks(df: DataFrame, source: str, lob: str, domain: str) -> DataFrame:
    """
    Apply data quality checks specific to Silver C stage
    
    Args:
        df: Silver C DataFrame
        source: Source identifier
        lob: Line of business
        domain: Domain
        
    Returns:
        DataFrame with quality checks applied
    """
    if not config.validation_enabled:
        logger.info("Data quality validation is disabled")
        return df
    
    logger.info(f"Applying Silver C quality checks for {source}/{lob}/{domain}")
    
    # Apply standard required field validation
    validated_df = stages.validate_required_fields(df, config, source, lob, domain)
    
    # Additional Silver C specific validations
    
    # Check for duplicate records (should be minimal after deduplication)
    dedup_keys = get_deduplication_keys(domain)
    available_keys = [key for key in dedup_keys if key in validated_df.columns]
    
    if available_keys:
        duplicate_count = (
            validated_df.groupBy(*available_keys)
                       .count()
                       .filter(F.col("count") > 1)
                       .count()
        )
        
        if duplicate_count > 0:
            logger.warning(f"Found {duplicate_count} duplicate groups after deduplication for {source}/{lob}/{domain}")
        else:
            logger.info(f"No duplicates found after deduplication for {source}/{lob}/{domain}")
    
    # Check quality score distribution if present
    if "quality_score" in validated_df.columns:
        quality_stats = validated_df.select(
            F.avg("quality_score").alias("avg_quality"),
            F.min("quality_score").alias("min_quality"),
            F.max("quality_score").alias("max_quality")
        ).collect()[0]
        
        # Aggregates are None for an empty DataFrame, and None cannot be formatted with :.3f
        if quality_stats["avg_quality"] is None:
            logger.warning(f"No quality scores available for {source}/{lob}/{domain}")
        else:
            logger.info(f"Quality score statistics for {source}/{lob}/{domain}: "
                       f"avg={quality_stats['avg_quality']:.3f}, "
                       f"min={quality_stats['min_quality']:.3f}, "
                       f"max={quality_stats['max_quality']:.3f}")
    
    logger.info(f"Quality checks complete - {validated_df.count()} rows passed validation")
    return validated_df


def validate_silver_c_completeness(df: DataFrame, source: str, lob: str, domain: str) -> bool:
    """
    Validate that Silver C processing was completed successfully
    
    Args:
        df: Processed Silver C DataFrame
        source: Source identifier
        lob: Line of business
        domain: Domain
        
    Returns:
        Boolean indicating if processing was successful
    """
    try:
        # Check if DataFrame has data
        row_count = df.count()
        if row_count == 0:
            logger.warning(f"No data in Silver C table for {source}/{lob}/{domain}")
            return False
        
        # Check if required metadata columns are present
        required_metadata = ["processed_timestamp", "pipeline_stage"]
        missing_metadata = [col for col in required_metadata if col not in df.columns]
        
        if missing_metadata:
            logger.warning(f"Missing metadata columns in Silver C for {source}/{lob}/{domain}: {missing_metadata}")
            return False
        
        logger.info(f"Silver C completeness validation passed for {source}/{lob}/{domain} - {row_count} rows")
        return True
        
    except Exception as e:
        logger.error(f"Silver C completeness validation failed for {source}/{lob}/{domain}: {e}")
        return False

## DLT Table Definitions

Dynamic generation of DLT tables for each source/lob/domain combination

In [None]:
# Generate DLT table definitions for each source/lob/domain combination
for source, lob, domain in source_combinations:
    
    # Create a closure to capture the current values of source, lob, domain
    def make_silver_c_table(src, lb, dom):
        
        @dlt.table(
            name=f"{src}_{lb}_{dom}_silver_c",
            comment=f"Silver C stage for {src}/{lb}/{dom} - Final Silver layer with deduplication and quality filtering",
            table_properties={
                "quality": "silver",
                "layer": "silver_c",
                "source": src,
                "lob": lb,
                "domain": dom,
                "deduplication_keys": ",".join(get_deduplication_keys(dom))
            }
        )
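        # expect_all_or_drop takes a dict of {expectation name: SQL constraint}.
        # The constraint below is an assumed placeholder; replace it with the real rule.
        @dlt.expect_all_or_drop({"valid_silver_c_data": "pipeline_stage = 'silver_c'"})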
        def silver_c_table():
            """
            Create Silver C table with deduplication and filtering from Silver B
            """
            # Create Silver C DataFrame
            silver_c_df = create_silver_c_table(src, lb, dom)
            
            # Validate processing completeness
            if not validate_silver_c_completeness(silver_c_df, src, lb, dom):
                logger.warning(f"Silver C completeness validation failed for {src}/{lb}/{dom}")
            
            # Apply quality checks
            validated_df = apply_silver_c_quality_checks(silver_c_df, src, lb, dom)
            
            return validated_df
        
        return silver_c_table
    
    # Create the table function and add it to the global namespace
    table_func = make_silver_c_table(source, lob, domain)
    globals()[f"{source}_{lob}_{domain}_silver_c"] = table_func
    
    dedup_keys = get_deduplication_keys(domain)
    print(f"Created DLT table definition: {source}_{lob}_{domain}_silver_c (dedup keys: {dedup_keys})")

print(f"\nTotal Silver C tables defined: {len(source_combinations)}")
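
The factory function `make_silver_c_table` exists because Python closures look up loop variables when they are called, not when they are defined. A minimal sketch of the difference, using hypothetical source/lob/domain values and no DLT dependency:

```python
combos = [("src1", "retail", "pharmacy"), ("src1", "retail", "medical")]

# Naive version: each function reads source/lob/domain when it is called,
# which happens after the loop has finished, so all see the last combination
naive = []
for source, lob, domain in combos:
    def table_name_naive():
        return f"{source}_{lob}_{domain}_silver_c"
    naive.append(table_name_naive)

# Factory version: the arguments are bound once per call to the factory
def make_table_name(src, lb, dom):
    def table_name():
        return f"{src}_{lb}_{dom}_silver_c"
    return table_name

bound = [make_table_name(*combo) for combo in combos]

naive_names = [fn() for fn in naive]
bound_names = [fn() for fn in bound]
```

Here `naive_names` contains the `medical` table name twice, while `bound_names` contains one name per combination, which is the behavior the table definitions above rely on.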

## Advanced DLT Expectations for Final Data Quality

In [None]:
# Advanced DLT expectations for Silver C final data quality
# These ensure the final Silver layer meets high-quality standards

def create_final_quality_expectations() -> Dict[str, str]:
    """
    Build advanced DLT expectations for Silver C final quality validation
    
    Returns:
        Dict mapping expectation name to a SQL constraint, suitable for
        @dlt.expect_all(...) on a table definition
    """
    expectations = {
        # Expect reasonable quality scores
        "acceptable_quality_scores": "quality_score >= 0.5",
        # Expect processing within the last 24 hours
        "recent_processing_timestamp": "processed_timestamp >= current_timestamp() - INTERVAL 24 HOURS",
        # Expect pipeline stage to be correctly set
        "correct_pipeline_stage": "pipeline_stage = 'silver_c'",
    }
    
    # Duplicate detection on business keys is an aggregate check, not a row-level
    # constraint, so it stays in apply_silver_c_quality_checks rather than here
    
    logger.info("Advanced final quality expectations defined")
    return expectations

# To enable, add the decorator below under @dlt.table in make_silver_c_table:
# @dlt.expect_all(create_final_quality_expectations())

print("Advanced final quality expectations available but not enabled by default")
print("Add @dlt.expect_all(create_final_quality_expectations()) to a table definition to enable")

## Pipeline Monitoring and Logging

In [None]:
# Log pipeline configuration for monitoring
logger.info("="*60)
logger.info("SILVER C DLT PIPELINE CONFIGURATION")
logger.info("="*60)
logger.info(f"Pipeline stage: Silver C (Deduplication & Filtering)")
logger.info(f"Silver schema: {config.silver_schema}")
logger.info(f"Validation enabled: {config.validation_enabled}")
logger.info(f"Source combinations: {len(source_combinations)}")
logger.info(f"Quality thresholds: {QUALITY_THRESHOLDS}")

logger.info("\nDeduplication configuration:")
for domain, keys in DEDUPLICATION_CONFIG.items():
    logger.info(f"  {domain}: {keys}")

logger.info("\nTable dependencies:")
for i, (source, lob, domain) in enumerate(source_combinations, 1):
    dedup_keys = get_deduplication_keys(domain)
    logger.info(f"  {i}. {source}/{lob}/{domain}:")
    logger.info(f"     Input:  {source}_{lob}_{domain}_silver_b")
    logger.info(f"     Output: {source}_{lob}_{domain}_silver_c")
    logger.info(f"     Dedup:  {dedup_keys}")

logger.info("="*60)
logger.info("SILVER C DLT PIPELINE READY")
logger.info("="*60)

## Usage Instructions

To use this notebook in Databricks DLT:

1. **Create a new DLT Pipeline** in the Databricks workspace
2. **Set the source** to this notebook (`silver_c_dlt.ipynb`)
3. **Configure pipeline settings**:
   - Target schema: `silver` (or your configured silver schema)
   - Pipeline mode: `Triggered` for batch processing
   - Cluster configuration: Based on your data volume
4. **Ensure dependencies**:
   - Silver B tables must exist (from Silver B DLT pipeline)
   - Utils modules must be available in the workspace
5. **Pipeline Dependencies**:
   - This pipeline should run AFTER the Silver B pipeline
   - Set up proper scheduling or triggering to ensure Silver B tables are available

**Input Tables**: This pipeline reads from:
- `{silver_schema}.{source}_{lob}_{domain}_silver_b`

**Output Tables**: This pipeline creates:
- `{silver_schema}.{source}_{lob}_{domain}_silver_c`

**Dependencies**: This pipeline depends on:
- Silver B tables: `{silver_schema}.{source}_{lob}_{domain}_silver_b`
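
As a small illustration of the naming convention above, the input and output table names for one combination can be derived as follows. The helper name and the sample values (`silver`, `acme`, `commercial`) are hypothetical.

```python
from typing import Dict

def silver_table_names(silver_schema: str, source: str, lob: str, domain: str) -> Dict[str, str]:
    """Build the fully qualified Silver B input and Silver C output names."""
    base = f"{source}_{lob}_{domain}"
    return {
        "input": f"{silver_schema}.{base}_silver_b",
        "output": f"{silver_schema}.{base}_silver_c",
    }

names = silver_table_names("silver", "acme", "commercial", "pharmacy")
```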

**Processing Logic**:
1. **Deduplication**: Removes duplicate records based on domain-specific business keys
2. **Quality Scoring**: Adds quality scores based on data completeness (placeholder)
3. **Quality Filtering**: Filters out low-quality records (placeholder, currently disabled)
4. **Metadata Addition**: Adds processing timestamp and pipeline stage information
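
To make the quality scoring step concrete, here is a plain-Python sketch of the completeness ratio for a single record, mirroring the non-null count divided by column count used in `add_quality_score`. The sample rows are hypothetical, and the half-up rounding to two decimals is an assumption about how the `decimal(3,2)` cast behaves.

```python
from decimal import Decimal, ROUND_HALF_UP
from typing import Any, Dict

def completeness_score(row: Dict[str, Any]) -> Decimal:
    """Fraction of fields that are not None, rounded to two decimal places."""
    non_null = sum(1 for value in row.values() if value is not None)
    ratio = Decimal(non_null) / Decimal(len(row))
    return ratio.quantize(Decimal("0.01"), rounding=ROUND_HALF_UP)

# Hypothetical record: 3 of 4 fields are populated
row = {"patient_id": "p1", "ndc": None, "service_date": "2024-01-05", "quantity": 30}
score = completeness_score(row)

# Compare against the configured min_quality_score of 0.7
passes = score >= Decimal("0.7")

# A record with 2 of 3 fields populated falls below the threshold
low_score = completeness_score({"a": 1, "b": None, "c": 2})
```

With the 0.7 threshold, the first record would be kept and the second would be dropped once quality filtering is enabled. In the notebook, the score is computed before the metadata columns are added, so those columns do not count toward the ratio.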

**Domain-Specific Deduplication Keys**:
- **Pharmacy**: patient_id, ndc, service_date
- **Medical**: patient_id, procedure_code, service_date
- **Member**: member_id, effective_date
- **Default**: patient_id, service_date
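
The key selection can be sketched in plain Python: look up the domain, fall back to the default keys, then keep only keys present in the table's columns, as `get_deduplication_keys` and `apply_deduplication` do together. The combined helper `resolve_dedup_keys`, the `dental` domain, and the column lists are hypothetical.

```python
from typing import Dict, List

DEDUPLICATION_CONFIG: Dict[str, List[str]] = {
    "pharmacy": ["patient_id", "ndc", "service_date"],
    "medical": ["patient_id", "procedure_code", "service_date"],
    "member": ["member_id", "effective_date"],
}
DEFAULT_DEDUP_KEYS = ["patient_id", "service_date"]

def resolve_dedup_keys(domain: str, columns: List[str]) -> List[str]:
    """Return the configured keys for a domain, restricted to existing columns."""
    configured = DEDUPLICATION_CONFIG.get(domain, DEFAULT_DEDUP_KEYS)
    return [key for key in configured if key in columns]

# Known domain with all key columns present
pharmacy_keys = resolve_dedup_keys("pharmacy", ["patient_id", "ndc", "service_date", "quantity"])

# Unknown domain falls back to the default keys
dental_keys = resolve_dedup_keys("dental", ["patient_id", "service_date", "tooth"])

# Known domain with a key column missing: the effective key silently narrows
partial_keys = resolve_dedup_keys("medical", ["patient_id", "service_date"])
```

The last case is worth watching: when `procedure_code` is absent, medical records are deduplicated on `patient_id` and `service_date` alone, which may remove rows that are distinct on the full key.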

**Future Enhancements**:
- Advanced quality scoring algorithms
- Configurable deduplication rules through business rules tables
- Data lineage tracking
- Performance optimization for large datasets

**Monitoring**:
- Check DLT pipeline logs for deduplication statistics
- Monitor quality score distributions
- Validate that duplicate counts are minimal after processing