In [0]:
# Install the pinned dependencies for this notebook (run only once per environment).
# The install output notes that the Python process may need a restart
# (%restart_python or dbutils.library.restartPython()) before the updated packages are used.
%pip install evidently==0.7.19 pyyaml==6.0.2 pandas==2.2.3 numpy==2.2.1 azure-storage-file-datalake==12.20.0 azure-identity==1.19.0 plotly==5.24.1

print("Packages installed successfully")

[43mNote: you may need to restart the kernel using %restart_python or dbutils.library.restartPython() to use updated packages.[0m
Packages installed successfully


In [0]:
# Import required libraries
import sys
import os
from pathlib import Path
import pandas as pd
import warnings
warnings.filterwarnings('ignore')

# PySpark imports
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Make the project's `utils` package importable.
# The project root can be overridden with the DRIFT_PROJECT_ROOT environment
# variable; the default keeps the original Databricks workspace location.
# For Databricks, you can alternatively upload utils as a library or use %run.
PROJECT_ROOT = os.environ.get(
    'DRIFT_PROJECT_ROOT',
    '/Workspace/Users/ashu.009kamboj@gmail.com/data-drift-evidently-ai',
)
# The parent of the working directory covers local testing.
for candidate_path in (PROJECT_ROOT, str(Path.cwd().parent)):
    # Skip entries that are already present so re-running this cell
    # does not keep growing sys.path.
    if candidate_path not in sys.path:
        sys.path.append(candidate_path)

# Import custom utilities
from utils import ConfigManager, DriftDetector, ReportManager, DataLoader, setup_logger

print("✓ All libraries imported successfully")

âœ“ All libraries imported successfully


## Initialize Spark Session

In [0]:
# Get or create Spark session
# In Databricks, 'spark' is already available
try:
    # Referencing the name raises NameError when no session has been injected.
    spark
    print("✓ Using existing Spark session from Databricks")
except NameError:
    # Create Spark session (for local testing)
    spark = (
        SparkSession.builder
        .appName("DataDriftDetection")
        .config("spark.sql.adaptive.enabled", "true")
        .config("spark.sql.adaptive.coalescePartitions.enabled", "true")
        .getOrCreate()
    )
    print("✓ Created new Spark session")

# Display Spark configuration.
# The version is printed here so that both branches above report it and the
# header is never left empty.
# NOTE(review): the sparkContext-based details (application name, master,
# default parallelism) were commented out in the original cell, presumably
# because sparkContext is not accessible in this environment - confirm before
# re-adding them.
print("\nSpark Configuration:")
print(f"  Spark version: {spark.version}")

âœ“ Using existing Spark session from Databricks
  Spark version: 4.0.0

Spark Configuration:


## Configuration and Setup

In [0]:
# Configuration file path (adjust as needed).
# This is a relative path, so it is resolved against the notebook's working directory.
CONFIG_PATH = '../config/drift_config.yaml'

# For Databricks, use workspace path
# CONFIG_PATH = '/Workspace/Repos/<your-repo>/data-drift-evidently-ai/config/drift_config.yaml'

# Initialize configuration manager and print a short overview of what was loaded.
try:
    config = ConfigManager(CONFIG_PATH)
    print("✓ Configuration loaded successfully")
    print(f"  - Catalog: {config.get_catalog_name()}")
    print(f"  - Schema: {config.get_schema_name()}")
    print(f"  - Tables to monitor: {len(config.get_tables())}")

    # Display statistical tests configuration
    print("\nStatistical Tests:")
    print(f"  - Numerical: {', '.join(config.get_statistical_tests('numerical'))}")
    print(f"  - Categorical: {', '.join(config.get_statistical_tests('categorical'))}")
except Exception as e:
    # Report the problem in the cell output, then re-raise so the notebook
    # stops here instead of continuing without a usable configuration.
    print(f"✗ Error loading configuration: {e}")
    raise

âœ“ Configuration loaded successfully
  - Catalog: data_catalog
  - Schema: outputs
  - Tables to monitor: 3

Statistical Tests:
  - Numerical: ks, wasserstein
  - Categorical: chisquare, jensenshannon


In [0]:
# Setup logging
_log_level = config.get_log_level()

# ADLS settings are handed to the logger only when ADLS output is switched on.
if config.is_adls_output_enabled():
    _adls_settings = config.get_adls_config()
else:
    _adls_settings = None

logger = setup_logger(
    log_level=_log_level,
    logger_name='data_drift_pyspark',
    adls_config=_adls_settings
)

# Emit the run banner followed by the basic run context.
_banner = "="*80
for _message in (
    _banner,
    "Data Drift Detection - PySpark Version",
    _banner,
    f"Configuration loaded from: {CONFIG_PATH}",
    f"Spark Version: {spark.version}",
):
    logger.info(_message)

INFO - Data Drift Detection - PySpark Version
INFO - Configuration loaded from: ../config/drift_config.yaml
INFO - Spark Version: 4.0.0


## Initialize Components

In [0]:
# Build the three collaborators used by the processing loop. All of them are
# constructed from the shared configuration object.

# Initialize drift detector
drift_detector = DriftDetector(config)
logger.info("✓ Drift detector initialized")

# Initialize report manager
report_manager = ReportManager(config)
logger.info("✓ Report manager initialized")

# Initialize data loader with Spark session
data_loader = DataLoader(config, spark=spark)
logger.info("✓ Data loader initialized with Spark session")

print("\n✓ All components initialized successfully")
print("\nReady to process tables using PySpark for large-scale drift detection")

INFO - âœ“ Drift detector initialized
INFO - âœ“ Report manager initialized
INFO - âœ“ Data loader initialized with Spark session

âœ“ All components initialized successfully

Ready to process tables using PySpark for large-scale drift detection


## Helper Functions for PySpark Operations

In [0]:
def get_table_statistics(spark_df, table_name):
    """
    Get basic statistics for a Spark DataFrame.

    Columns are classified by the base name of their Spark type string, so
    parameterised types such as ``decimal(10,2)`` are recognised as numerical.
    Columns whose type is neither numerical nor categorical (e.g. timestamps)
    are listed in ``columns`` but appear in neither classification.

    Args:
        spark_df: Spark DataFrame (must provide ``count()``, ``columns`` and ``dtypes``)
        table_name: Name of the table

    Returns:
        dict: Statistics dictionary with the keys ``table_name``, ``row_count``,
            ``column_count``, ``columns``, ``size_mb`` (always ``None``),
            ``numerical_columns``, ``categorical_columns``, ``numerical_count``
            and ``categorical_count``.
    """
    numerical_types = {'tinyint', 'smallint', 'int', 'bigint', 'float', 'double', 'decimal'}
    categorical_types = {'string', 'boolean'}

    numerical_cols = []
    categorical_cols = []
    # Iterate over the (name, type) pairs directly instead of building a dict,
    # so duplicate column names are not silently collapsed.
    for column_name, dtype in spark_df.dtypes:
        # Strip type parameters: 'decimal(10,2)' -> 'decimal'.
        base_type = dtype.split('(', 1)[0].strip()
        if base_type in numerical_types:
            numerical_cols.append(column_name)
        elif base_type in categorical_types:
            categorical_cols.append(column_name)

    stats = {
        'table_name': table_name,
        'row_count': spark_df.count(),  # triggers a Spark job
        'column_count': len(spark_df.columns),
        'columns': spark_df.columns,
        'size_mb': None,  # Can be calculated if needed
        'numerical_columns': numerical_cols,
        'categorical_columns': categorical_cols,
        'numerical_count': len(numerical_cols),
        'categorical_count': len(categorical_cols),
    }

    return stats

def optimize_spark_df_for_drift(spark_df, sampling_config):
    """
    Return a (possibly) down-sampled Spark DataFrame for drift detection.

    Behaviour by configuration:
      * ``enabled`` falsy or missing: the input is returned unchanged.
      * ``method == 'fraction'`` (the default): sample without replacement
        using ``fraction`` (default 0.1).
      * any other ``method``: treated as fixed-size sampling. The row count is
        computed and, only when it exceeds ``fixed_size`` (default 10000),
        the frame is sampled with fraction ``fixed_size / row_count``;
        otherwise the input is returned unchanged.

    ``random_seed`` (default 42) is passed as the sampling seed.

    Args:
        spark_df: Input Spark DataFrame
        sampling_config: Sampling configuration dictionary

    Returns:
        The sampled Spark DataFrame, or ``spark_df`` itself when no sampling applies
    """
    # Guard clause: sampling switched off.
    if not sampling_config.get('enabled', False):
        return spark_df

    strategy = sampling_config.get('method', 'fraction')
    seed = sampling_config.get('random_seed', 42)

    if strategy != 'fraction':
        # Fixed-size mode: translate the target row count into a fraction.
        row_total = spark_df.count()
        target_rows = sampling_config.get('fixed_size', 10000)
        if not row_total > target_rows:
            # Already small enough - nothing to do.
            return spark_df
        ratio = target_rows / row_total
        reduced_df = spark_df.sample(withReplacement=False, fraction=ratio, seed=seed)
        logger.info(f"Applied sampling for fixed size: {target_rows} from {row_total} rows")
        return reduced_df

    ratio = sampling_config.get('fraction', 0.1)
    reduced_df = spark_df.sample(withReplacement=False, fraction=ratio, seed=seed)
    logger.info(f"Applied sampling with fraction: {ratio}")
    return reduced_df

print("✓ Helper functions defined")

âœ“ Helper functions defined


## Process Each Table with PySpark

Loop through the configured tables: load the reference and current versions of each table with Spark, then run Evidently AI drift detection on the resulting pandas DataFrames

In [0]:
# Get list of tables from configuration
tables = config.get_tables()
# One dict per table: drift metrics on success, or {'table_name', 'error'} on failure.
results_summary = []

# Get sampling configuration
sampling_config = config.get_sampling_config()
# The flag does not change between tables, so evaluate it once; `or {}` keeps
# the lookup safe if no sampling section is configured.
sampling_enabled = bool((sampling_config or {}).get('enabled', False))

print(f"\nProcessing {len(tables)} tables for drift detection using PySpark...")
print("="*80)

for idx, table_config in enumerate(tables, 1):
    table_name = table_config['name']
    # 'all' (the default) means "do not restrict the analysed columns".
    columns = table_config.get('columns', 'all')

    logger.info(f"\n{'='*80}")
    logger.info(f"Processing table {idx}/{len(tables)}: {table_name}")
    logger.info(f"{'='*80}")

    try:
        # Load data versions using PySpark
        logger.info(f"Loading data for table: {table_name}")
        reference_data, current_data = data_loader.load_table_versions(
            table_name=table_name,
            use_spark=True  # Using PySpark for distributed loading
        )

        logger.info(f"Data loaded - Reference: {len(reference_data)} rows, Current: {len(current_data)} rows")

        # Note: Data is already converted to Pandas by DataLoader
        # This is necessary because Evidently AI works with Pandas DataFrames

        # Apply sampling if configured (already handled in DataLoader, but can be applied again if needed)
        # NOTE(review): if DataLoader already sampled, this samples the sample - confirm this is intended.
        if sampling_enabled:
            logger.info("Applying additional sampling for drift detection...")
            reference_data = drift_detector.apply_sampling(reference_data)
            current_data = drift_detector.apply_sampling(current_data)

        # Detect drift
        logger.info("Running drift detection with Evidently AI...")
        report, drift_summary = drift_detector.detect_drift(
            reference_data=reference_data,
            current_data=current_data,
            table_name=table_name,
            columns=columns if columns != 'all' else None,
        )

        # Column names are not guaranteed to be strings; normalise once so
        # the str.join calls below cannot fail.
        drifted_names = [str(name) for name in drift_summary['drifted_columns']]

        # Log drift results
        if drift_summary['dataset_drift']:
            logger.warning(f"⚠️  DRIFT DETECTED in {table_name}")
            logger.warning(f"   Drifted columns ({drift_summary['num_drifted_columns']}): {drift_summary['drifted_columns']}")
        else:
            logger.info(f"✓ No significant drift detected in {table_name}")

        # NOTE(review): in the recorded run, drift_share (75.00%) does not equal
        # num_drifted_columns / num_columns (6/9); both values come from
        # DriftDetector - confirm which denominator it uses.
        logger.info(f"   Total columns analyzed: {drift_summary['num_columns']}")
        logger.info(f"   Numerical columns: {drift_summary['num_numerical_columns']}")
        logger.info(f"   Categorical columns: {drift_summary['num_categorical_columns']}")
        logger.info(f"   Drifted columns: {drift_summary['num_drifted_columns']}")
        logger.info(f"   Drift share: {drift_summary['drift_share']:.2%}")

        # Save reports
        logger.info(f"Saving reports for {table_name}...")
        saved_paths = report_manager.save_reports(
            report=report,
            drift_summary=drift_summary,
            table_name=table_name
        )

        for report_type, path in saved_paths.items():
            logger.info(f"   {report_type}: {path}")

        # Add to summary
        results_summary.append({
            'table_name': table_name,
            'total_columns': drift_summary['num_columns'],
            'numerical_columns': drift_summary['num_numerical_columns'],
            'categorical_columns': drift_summary['num_categorical_columns'],
            'drifted_columns': drift_summary['num_drifted_columns'],
            'drift_share': drift_summary['drift_share'],
            'dataset_drift': drift_summary['dataset_drift'],
            'drifted_column_names': ', '.join(drifted_names),
            'report_paths': saved_paths
        })

        print(f"\n✓ Completed: {table_name}")
        if drift_summary['dataset_drift']:
            print("  ⚠️  DRIFT DETECTED")
            print(f"     - Drifted columns: {drift_summary['num_drifted_columns']}/{drift_summary['num_columns']}")
            print(f"     - Drift share: {drift_summary['drift_share']:.2%}")
            # Show at most five names to keep the cell output compact.
            print(f"     - Columns: {', '.join(drifted_names[:5])}" +
                  ("..." if len(drifted_names) > 5 else ""))
        else:
            print("  ✓ No significant drift detected")

    except Exception as e:
        # Best effort by design: a failing table is recorded (with traceback in
        # the log) and the loop continues with the remaining tables.
        logger.error(f"✗ Error processing table {table_name}: {e}", exc_info=True)
        print(f"\n✗ Error processing {table_name}: {e}")
        results_summary.append({
            'table_name': table_name,
            'error': str(e)
        })

    print("-"*80)

logger.info("\n" + "="*80)
logger.info("PySpark drift detection completed for all tables")
logger.info("="*80)


Processing 3 tables for drift detection using PySpark...
INFO - 
INFO - Processing table 1/3: customer_data
INFO - Loading data for table: customer_data
Comparing UC versions: 2 (reference) vs 3 (current)
INFO - Data loaded - Reference: 10000 rows, Current: 10000 rows
INFO - Running drift detection with Evidently AI...
INFO -    Total columns analyzed: 9
INFO -    Numerical columns: 4
INFO -    Categorical columns: 5
INFO -    Drifted columns: 6
INFO -    Drift share: 75.00%
INFO - Saving reports for customer_data...
INFO -    local_html: /Workspace/Users/ashu.009kamboj@gmail.com/data-drift-evidently-ai/reports/html/customer_data_20260120_165722_drift_report.html
INFO -    local_json: /Workspace/Users/ashu.009kamboj@gmail.com/data-drift-evidently-ai/reports/json/customer_data_20260120_165722_drift_report.json

âœ“ Completed: customer_data
  âš ï¸  DRIFT DETECTED
     - Drifted columns: 6/9
     - Drift share: 75.00%
     - Columns: income, age, credit_score, data_version, account_typ

## Detailed Summary Report

Display comprehensive drift detection results

In [0]:
# Create summary DataFrame (one row per processed table)
summary_df = pd.DataFrame(results_summary)

print("\n" + "="*80)
print("DRIFT DETECTION SUMMARY (PySpark Version)")
print("="*80)

# Only failed tables carry an 'error' entry, so the column is missing when
# every table succeeded or when nothing was processed at all. Adding it makes
# the success/failure split below uniform and safe for an empty result list.
if 'error' not in summary_df.columns:
    summary_df['error'] = None

# Show tables with errors
error_tables = summary_df[summary_df['error'].notna()]
if not error_tables.empty:
    print("\n⚠️  Tables with errors:")
    for _, row in error_tables.iterrows():
        print(f"  - {row['table_name']}: {row['error']}")
    print()

# Show tables without errors
success_df = summary_df[summary_df['error'].isna()]

if not success_df.empty:
    print(f"\n✓ Successfully processed {len(success_df)} tables using PySpark\n")

    # Rows from failed tables pad the metric columns with NaN, which turns the
    # counts into floats and the drift flag into objects. Restore proper types
    # so counts print as 9 rather than 9.0 and the flag can be used as a mask.
    count_columns = ['total_columns', 'numerical_columns',
                     'categorical_columns', 'drifted_columns']
    success_df = success_df.astype({name: 'Int64' for name in count_columns})
    success_df = success_df.assign(dataset_drift=success_df['dataset_drift'].astype(bool))

    # Display summary table
    display_df = pd.DataFrame({
        'Table': success_df['table_name'],
        'Total Cols': success_df['total_columns'],
        'Numerical': success_df['numerical_columns'],
        'Categorical': success_df['categorical_columns'],
        'Drifted': success_df['drifted_columns'],
        'Drift %': success_df['drift_share'].map('{:.2%}'.format),
        'Status': success_df['dataset_drift'].map({True: '⚠️ DRIFT', False: '✓ OK'}),
    })

    print(display_df.to_string(index=False))

    # Show drifted columns for tables with drift
    drifted_tables = success_df[success_df['dataset_drift']]
    if not drifted_tables.empty:
        print(f"\n{'='*80}")
        print("Drifted Column Details:")
        print(f"{'='*80}")
        for _, row in drifted_tables.iterrows():
            print(f"\n{row['table_name']}:")
            print(f"  Drifted columns: {row['drifted_column_names']}")

    # Overall statistics, computed from the data rather than from the
    # display labels so that changing a label cannot break the counts.
    total_drifted = int(success_df['dataset_drift'].sum())
    total_ok = len(success_df) - total_drifted
    total_cols = int(success_df['total_columns'].sum())
    total_drifted_cols = int(success_df['drifted_columns'].sum())

    print(f"\n{'='*80}")
    print("Overall Statistics:")
    print(f"  - Total tables processed: {len(display_df)}")
    print(f"  - Tables with drift: {total_drifted}")
    print(f"  - Tables without drift: {total_ok}")
    print(f"  - Total columns analyzed: {total_cols}")
    print(f"  - Total drifted columns: {total_drifted_cols}")
    if total_cols > 0:
        print(f"  - Overall drift rate: {(total_drifted_cols/total_cols)*100:.2f}%")
    print(f"{'='*80}\n")


DRIFT DETECTION SUMMARY (PySpark Version)

âœ“ Successfully processed 3 tables using PySpark

        Table  Total Cols  Numerical  Categorical  Drifted Drift %   Status
customer_data           9          4            5        6  75.00% âš ï¸ DRIFT
product_sales           4          2            2        3  75.00% âš ï¸ DRIFT
user_behavior           9          3            6        5  62.50% âš ï¸ DRIFT

Drifted Column Details:

customer_data:
  Drifted columns: income, age, credit_score, data_version, account_type, region

product_sales:
  Drifted columns: revenue, price, product_category

user_behavior:
  Drifted columns: session_duration_minutes, pages_viewed, device_type, data_version, traffic_source

Overall Statistics:
  - Total tables processed: 3
  - Tables with drift: 3
  - Tables without drift: 0
  - Total columns analyzed: 22
  - Total drifted columns: 14
  - Overall drift rate: 63.64%



## Access Reports

Information on accessing generated reports

In [0]:
# Print where the generated reports can be found.
print("\n" + "="*80)
print("Report Locations:")
print("="*80)

# Local reports are saved by default for testing
print("\n📁 Local Reports:")
print("   Path: reports")
print("   - HTML reports: reports/html/")
print("   - JSON reports: reports/json/")

# ADLS details are shown only when ADLS output is enabled in the configuration.
if config.is_adls_output_enabled():
    adls_config = config.get_adls_config()
    print("\n☁️  ADLS Reports:")
    print(f"   Container: {adls_config['container']}")
    print(f"   Base Path: {adls_config['base_path']}")
    print(f"   Storage Account: {adls_config['storage_account']}")

print("\n" + "="*80)
print("\nTo view reports:")
print("  1. HTML Reports (local): Open in web browser for interactive visualization")
print("  2. JSON Reports (local): Use for programmatic analysis or monitoring integration")
print("  3. ADLS (if enabled): Access via abfss path in your storage account")
print("="*80)


Report Locations:

📁 Local Reports:
   Path: reports
   - HTML reports: reports/html/
   - JSON reports: reports/json/


To view reports:
  1. HTML Reports (local): Open in web browser for interactive visualization
  2. JSON Reports (local): Use for programmatic analysis or monitoring integration
  3. ADLS (if enabled): Access via abfss path in your storage account


## Notes and Next Steps

### PySpark Advantages
- **Scalability**: Handles large datasets efficiently using distributed processing
- **Performance**: Leverages Spark's in-memory computation and optimization
- **Integration**: Seamless integration with Databricks Unity Catalog
- **Resource Management**: Better memory management for large-scale operations

### Key Differences from Python Version
1. **Data Loading**: Uses Spark for distributed data loading
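2. **Processing**: Spark handles data loading (and optional sampling) in parallel; the drift tests themselves run on pandas DataFrames, because Evidently AI works with pandas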
3. **Memory**: More efficient memory usage for large datasets
4. **Sampling**: Can apply distributed sampling before conversion to Pandas

### Next Steps
1. **Review Reports**: Analyze detailed drift reports for affected tables
2. **Investigate Causes**: Identify root causes of detected drift
3. **Tune Configuration**: Adjust thresholds and tests based on results
4. **Schedule Jobs**: Set up Databricks jobs for automated monitoring
5. **Set Alerts**: Configure alerting based on drift detection results
6. **Scale Up**: Increase cluster size for larger datasets if needed

### Production Tips
- Use Databricks job scheduling for regular drift monitoring
- Enable ADLS output for centralized report storage
- Configure appropriate sampling for very large tables (>100M rows)
- Use Delta Lake time travel for precise version comparison
- Set up downstream alerts and notifications
- Monitor Spark job performance and optimize as needed