In [None]:
import pandas as pd
# Load the raw lead-scoring CSV into a DataFrame.
# NOTE(review): this is an absolute, machine-specific Windows path, so the cell
# only runs where that exact directory exists — consider a path relative to a
# configurable data directory.
df = pd.read_csv(r'C:\Users\Minfy.DESKTOP-3E50D5N\Desktop\final_capstone\raw_data\Lead Scoring.csv')
# Print the column names, non-null counts and dtypes of the loaded frame.
df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 9240 entries, 0 to 9239
Data columns (total 37 columns):
 #   Column                                         Non-Null Count  Dtype  
---  ------                                         --------------  -----  
 0   Prospect ID                                    9240 non-null   object 
 1   Lead Number                                    9240 non-null   int64  
 2   Lead Origin                                    9240 non-null   object 
 3   Lead Source                                    9204 non-null   object 
 4   Do Not Email                                   9240 non-null   object 
 5   Do Not Call                                    9240 non-null   object 
 6   Converted                                      9240 non-null   int64  
 7   TotalVisits                                    9103 non-null   float64
 8   Total Time Spent on Website                    9240 non-null   int64  
 9   Page Views Per Visit                           9103 

In [1]:
import pandas as pd
import numpy as np
from evidently import ColumnMapping
from evidently.report import Report
from evidently.metric_suite import MetricSuite
from evidently.metrics import (
    DataDriftTable, 
    DatasetDriftMetric, 
    ColumnDriftMetric,
    DatasetSummaryMetric,
    ColumnSummaryMetric
)
from evidently.test_suite import TestSuite
from evidently.tests import (
    TestNumberOfDriftedColumns,
    TestColumnDrift,
    TestShareOfDriftedColumns
)
from typing import Dict, List, Optional, Any
import warnings
warnings.filterwarnings('ignore')

class EvidentlyDataDriftDetector:
    """
    Data drift detector built on Evidently's ``Report`` / ``TestSuite`` API.

    Compares a reference (baseline) DataFrame with a current DataFrame, runs
    dataset-level and per-column drift metrics plus pass/fail drift tests, and
    returns the outcome as a plain dictionary (see ``detect_drift``).

    Notes:
        * ``confidence_level`` and ``categorical_threshold`` are stored on the
          instance but are not consumed by any method of this class.
        * NOTE(review): ``ColumnMapping``, ``Report`` and ``TestSuite`` belong to
          Evidently's legacy interface; confirm that the imports at the top of
          the notebook match the installed Evidently version.
    """

    def __init__(self,
                 drift_threshold: float = 0.5,
                 confidence_level: float = 0.95,
                 categorical_threshold: float = 0.2):
        """
        Initialize the Evidently Data Drift Detector.

        Args:
            drift_threshold (float): Share of drifted columns (0-1) used both as
                the dataset-level drift threshold of the report and as the upper
                bound of the share-of-drifted-columns test.
            confidence_level (float): Stored for reference only (currently unused).
            categorical_threshold (float): Stored for reference only (currently unused).
        """
        self.drift_threshold = drift_threshold
        self.confidence_level = confidence_level
        self.categorical_threshold = categorical_threshold
        self.column_mapping = None
        self.drift_report = None
        self.test_suite = None

    def setup_column_mapping(self,
                           numerical_features: Optional[List[str]] = None,
                           categorical_features: Optional[List[str]] = None,
                           target_column: Optional[str] = None,
                           prediction_column: Optional[str] = None) -> None:
        """
        Set up column mapping for Evidently to understand data types.

        Calling this before ``detect_drift`` disables automatic column-type
        detection, because ``detect_drift`` only auto-detects when no mapping
        has been configured.

        Args:
            numerical_features (List[str], optional): List of numerical column names
            categorical_features (List[str], optional): List of categorical column names
            target_column (str, optional): Name of target column
            prediction_column (str, optional): Name of prediction column
        """
        self.column_mapping = ColumnMapping(
            numerical_features=numerical_features,
            categorical_features=categorical_features,
            target=target_column,
            prediction=prediction_column
        )
        print("Column mapping configured successfully")

    def detect_drift(self,
                    reference_data: pd.DataFrame,
                    current_data: pd.DataFrame,
                    save_report: bool = True,
                    report_path: str = "drift_report.html") -> Dict[str, Any]:
        """
        Main method to detect data drift between reference and current datasets.

        Args:
            reference_data (pd.DataFrame): Reference/baseline dataset
            current_data (pd.DataFrame): Current dataset to compare against reference
            save_report (bool): Whether to save HTML report
            report_path (str): Path to save the HTML report

        Returns:
            Dict with the keys ``dataset_drift``, ``column_drift``, ``summary``
            and ``test_results`` (see ``_extract_drift_results``).

        Raises:
            ValueError: If either dataset is empty or they share no column.
        """
        # Validate input data
        self._validate_data(reference_data, current_data)

        # Auto-detect column types only when the caller did not configure a mapping
        if self.column_mapping is None:
            self._auto_detect_column_types(reference_data)

        # Generate drift report
        self._generate_drift_report(reference_data, current_data)

        # Run drift tests
        self._run_drift_tests(reference_data, current_data)

        # Extract and return results
        results = self._extract_drift_results()

        # Save HTML report if requested
        if save_report:
            self.save_html_report(report_path)
            print(f"Drift report saved to: {report_path}")

        return results

    def _validate_data(self, reference_data: pd.DataFrame, current_data: pd.DataFrame) -> None:
        """
        Validate input datasets for basic requirements.

        Args:
            reference_data (pd.DataFrame): Reference dataset
            current_data (pd.DataFrame): Current dataset

        Raises:
            ValueError: If either dataset is empty or they have no column in common
        """
        if reference_data.empty or current_data.empty:
            raise ValueError("Both datasets must contain data")

        common_columns = set(reference_data.columns) & set(current_data.columns)
        if len(common_columns) == 0:
            raise ValueError("Datasets must have at least one common column")

        print("✓ Validation passed")
        print(f"  - Common columns: {len(common_columns)}")
        print(f"  - Reference data shape: {reference_data.shape}")
        print(f"  - Current data shape: {current_data.shape}")

    def _auto_detect_column_types(self, data: pd.DataFrame) -> None:
        """
        Automatically detect column types for Evidently mapping.

        Every column with a numeric pandas dtype is registered as numerical;
        every other column is registered as categorical.

        Args:
            data (pd.DataFrame): Dataset to analyze for column types
        """
        numerical_features = []
        categorical_features = []

        for column in data.columns:
            if pd.api.types.is_numeric_dtype(data[column]):
                numerical_features.append(column)
            else:
                categorical_features.append(column)

        # Empty lists are passed as None so Evidently sees "not specified"
        self.column_mapping = ColumnMapping(
            numerical_features=numerical_features if numerical_features else None,
            categorical_features=categorical_features if categorical_features else None
        )

        print("✓ Auto-detected column types:")
        print(f"  - Numerical: {len(numerical_features)} columns")
        print(f"  - Categorical: {len(categorical_features)} columns")

    def _generate_drift_report(self, reference_data: pd.DataFrame, current_data: pd.DataFrame) -> None:
        """
        Build and run the Evidently drift report, storing it in ``self.drift_report``.

        The report contains the dataset-level drift metric (using
        ``self.drift_threshold`` as the drifted-column share that flags the
        dataset), the drift table, the dataset summary, and a drift + summary
        metric for every column present in both datasets.

        Args:
            reference_data (pd.DataFrame): Reference dataset
            current_data (pd.DataFrame): Current dataset
        """
        # Assemble the complete metric list before constructing the report so
        # the report object never has to be mutated after creation.
        metrics = [
            DatasetDriftMetric(drift_share=self.drift_threshold),
            DataDriftTable(),
            DatasetSummaryMetric(),
        ]

        # Per-column metrics only for columns that exist in both datasets
        for column in reference_data.columns:
            if column in current_data.columns:
                metrics.append(ColumnDriftMetric(column_name=column))
                metrics.append(ColumnSummaryMetric(column_name=column))

        self.drift_report = Report(metrics=metrics)

        # Run the report
        self.drift_report.run(
            reference_data=reference_data,
            current_data=current_data,
            column_mapping=self.column_mapping
        )

        print("✓ Drift report generated successfully")

    def _run_drift_tests(self, reference_data: pd.DataFrame, current_data: pd.DataFrame) -> None:
        """
        Run drift tests to get pass/fail results, storing the suite in ``self.test_suite``.

        Args:
            reference_data (pd.DataFrame): Reference dataset
            current_data (pd.DataFrame): Current dataset
        """
        # Dataset-level tests; the share test passes while the share of drifted
        # columns stays below self.drift_threshold.
        tests = [
            TestNumberOfDriftedColumns(),
            TestShareOfDriftedColumns(lt=self.drift_threshold),
        ]

        # Add individual column drift tests for columns present in both datasets
        for column in reference_data.columns:
            if column in current_data.columns:
                tests.append(TestColumnDrift(column_name=column))

        self.test_suite = TestSuite(tests=tests)

        # Run the tests
        self.test_suite.run(
            reference_data=reference_data,
            current_data=current_data,
            column_mapping=self.column_mapping
        )

        print("✓ Drift tests completed")

    def _extract_drift_results(self) -> Dict[str, Any]:
        """
        Extract drift detection results from the Evidently report and tests.

        Results are read from the dictionary export of the report
        (``Report.as_dict()``), i.e. a ``{"metrics": [{"metric": <name>,
        "result": {...}}, ...]}`` structure. Fields missing from that export are
        reported as ``None`` instead of raising.

        Returns:
            Dict with four keys:
              * ``dataset_drift``: ``drift_detected``, ``drift_score`` (share of
                drifted columns), ``number_of_drifted_columns``, ``total_columns``
              * ``column_drift``: per column ``drift_detected``, ``drift_score``,
                ``stattest_name``, ``threshold``, ``p_value``
              * ``summary``: row/column counts of both datasets
              * ``test_results``: passed/failed/total counts and raw test entries

        Raises:
            ValueError: If no report has been generated yet.
        """
        if self.drift_report is None:
            raise ValueError("No drift report available. Run detect_drift() first.")

        results = {
            'dataset_drift': {},
            'column_drift': {},
            'summary': {},
            'test_results': {}
        }

        # Index metric results by metric name. Per-column metrics share one name
        # and are not needed here, so keeping the first occurrence is enough.
        metric_results: Dict[Any, Dict[str, Any]] = {}
        for entry in self.drift_report.as_dict().get('metrics', []):
            metric_results.setdefault(entry.get('metric'), entry.get('result') or {})

        # Extract dataset-level drift
        dataset_drift = metric_results.get('DatasetDriftMetric')
        if dataset_drift:
            results['dataset_drift'] = {
                'drift_detected': dataset_drift.get('dataset_drift'),
                'drift_score': dataset_drift.get('share_of_drifted_columns'),
                'number_of_drifted_columns': dataset_drift.get('number_of_drifted_columns'),
                'total_columns': dataset_drift.get('number_of_columns')
            }

        # Extract column-level drift
        drift_table = metric_results.get('DataDriftTable')
        if drift_table:
            for column_name, drift_info in (drift_table.get('drift_by_columns') or {}).items():
                results['column_drift'][column_name] = {
                    'drift_detected': drift_info.get('drift_detected'),
                    'drift_score': drift_info.get('drift_score'),
                    'stattest_name': drift_info.get('stattest_name'),
                    'threshold': drift_info.get('stattest_threshold'),
                    # Kept for backward compatibility of the result schema;
                    # None unless the export carries an explicit p_value field.
                    'p_value': drift_info.get('p_value')
                }

        # Extract dataset summary
        dataset_summary = metric_results.get('DatasetSummaryMetric')
        if dataset_summary:
            reference_summary = dataset_summary.get('reference') or {}
            current_summary = dataset_summary.get('current') or {}
            results['summary'] = {
                'reference_data_rows': reference_summary.get('number_of_rows'),
                'current_data_rows': current_summary.get('number_of_rows'),
                'reference_data_columns': reference_summary.get('number_of_columns'),
                'current_data_columns': current_summary.get('number_of_columns'),
            }

        # Extract test results
        if self.test_suite is not None:
            tests = self.test_suite.as_dict().get('tests', [])
            results['test_results'] = {
                'tests_passed': sum(1 for test in tests if test.get('status') == 'SUCCESS'),
                'tests_failed': sum(1 for test in tests if test.get('status') == 'FAIL'),
                'total_tests': len(tests),
                'detailed_results': tests
            }

        return results

    def save_html_report(self, file_path: str = "drift_report.html") -> None:
        """
        Save the drift report as an HTML file.

        Args:
            file_path (str): Path where to save the HTML report

        Raises:
            ValueError: If no report has been generated yet.
        """
        if self.drift_report is None:
            raise ValueError("No drift report available. Run detect_drift() first.")

        self.drift_report.save_html(file_path)
        print(f"✓ HTML report saved to: {file_path}")

    @staticmethod
    def _format_score(score: Any) -> str:
        """Format a drift score with four decimals, or 'N/A' when it is not numeric."""
        try:
            return f"{float(score):.4f}"
        except (TypeError, ValueError):
            return 'N/A'

    def print_drift_summary(self, results: Dict[str, Any]) -> None:
        """
        Print a formatted summary of drift detection results.

        Missing or non-numeric drift scores are printed as ``N/A`` rather than
        raising a formatting error.

        Args:
            results (Dict): Results from detect_drift method
        """
        print("\n" + "="*60)
        print("                 DRIFT DETECTION SUMMARY")
        print("="*60)

        # Dataset-level summary
        if 'dataset_drift' in results:
            dataset_drift = results['dataset_drift']
            print("\n📊 DATASET-LEVEL DRIFT:")
            print(f"  • Drift Detected: {'🚨 YES' if dataset_drift.get('drift_detected', False) else '✅ NO'}")
            print(f"  • Drift Score: {self._format_score(dataset_drift.get('drift_score'))}")
            print(f"  • Drifted Columns: {dataset_drift.get('number_of_drifted_columns', 0)}/{dataset_drift.get('total_columns', 0)}")

        # Column-level summary
        if 'column_drift' in results:
            print("\n📋 COLUMN-LEVEL DRIFT:")
            drifted_columns = []
            stable_columns = []

            for column, drift_info in results['column_drift'].items():
                # str() so that non-string column labels can be joined below
                if drift_info.get('drift_detected', False):
                    drifted_columns.append(str(column))
                else:
                    stable_columns.append(str(column))

            print(f"  • Drifted Columns ({len(drifted_columns)}): {', '.join(drifted_columns) if drifted_columns else 'None'}")
            print(f"  • Stable Columns ({len(stable_columns)}): {', '.join(stable_columns) if stable_columns else 'None'}")

        # Test results summary
        if 'test_results' in results:
            test_results = results['test_results']
            print("\n🧪 TEST RESULTS:")
            print(f"  • Tests Passed: {test_results.get('tests_passed', 0)}")
            print(f"  • Tests Failed: {test_results.get('tests_failed', 0)}")
            print(f"  • Total Tests: {test_results.get('total_tests', 0)}")

        print("\n" + "="*60)


# Example usage and data loading section
def load_sample_data():
    """
    Build two synthetic DataFrames for demonstrating drift detection.

    The reference frame has 1000 rows (global NumPy seed 42) and the current
    frame has 800 rows (global NumPy seed 123). Both share the columns
    feature1, feature2, feature3, feature4 and target; the current frame is
    drawn with shifted parameters for feature1/feature4 and an extra category
    'D' in feature3. Swap this function out for real data loading.

    Returns:
        Tuple ``(reference_data, current_data)`` of pandas DataFrames.
    """
    print("\n📁 SAMPLE DATA GENERATION")
    print("Replace this section with your actual data loading code")

    def _synthesize(seed, n_rows, f1_mean, f1_std, categories, f4_scale):
        # Re-seed the global NumPy generator, then draw the columns in a fixed
        # order so the generated values are reproducible.
        np.random.seed(seed)
        columns = {}
        columns['feature1'] = np.random.normal(f1_mean, f1_std, n_rows)
        columns['feature2'] = np.random.normal(5, 2, n_rows)
        columns['feature3'] = np.random.choice(categories, n_rows)
        columns['feature4'] = np.random.exponential(f4_scale, n_rows)
        columns['target'] = np.random.choice([0, 1], n_rows)
        return pd.DataFrame(columns)

    # Baseline sample
    reference_data = _synthesize(42, 1000, 0, 1, ['A', 'B', 'C'], 2)

    # Current sample: shifted feature1 mean/std, new category 'D' in feature3,
    # different exponential scale for feature4; feature2 is left unchanged.
    current_data = _synthesize(123, 800, 0.5, 1.2, ['A', 'B', 'C', 'D'], 1.5)

    return reference_data, current_data


def main():
    """
    Run the end-to-end drift detection demo.

    Loads the sample datasets, creates an EvidentlyDataDriftDetector, runs
    detect_drift (writing the HTML report to "data_drift_report.html") and
    prints the summary.

    Returns:
        Tuple ``(results, detector)``: the results dictionary returned by
        detect_drift and the detector instance that produced it.
    """
    def _step_banner(title):
        # Framed section header printed before each step
        print("\n" + "="*50)
        print(title)
        print("="*50)

    print("🚀 STARTING DATA DRIFT DETECTION")

    # ---- Step 1: load the data ------------------------------------------
    _step_banner("STEP 1: DATA LOADING")

    # TODO: swap the sample generator for real data, e.g.
    #   reference_data = pd.read_csv('path/to/your/reference_data.csv')
    #   current_data = pd.read_csv('path/to/your/current_data.csv')
    reference_data, current_data = load_sample_data()

    print(f"✓ Reference data loaded: {reference_data.shape}")
    print(f"✓ Current data loaded: {current_data.shape}")

    # ---- Step 2: create the detector ------------------------------------
    _step_banner("STEP 2: INITIALIZING DRIFT DETECTOR")

    detector_settings = {
        'drift_threshold': 0.5,  # tune to your needs
        'confidence_level': 0.95,
        'categorical_threshold': 0.2,
    }
    detector = EvidentlyDataDriftDetector(**detector_settings)

    # Column types are auto-detected unless an explicit mapping is given, e.g.
    #   detector.setup_column_mapping(
    #       numerical_features=['feature1', 'feature2', 'feature4'],
    #       categorical_features=['feature3'],
    #       target_column='target'
    #   )

    # ---- Step 3: run the drift detection --------------------------------
    _step_banner("STEP 3: DETECTING DRIFT")

    results = detector.detect_drift(
        reference_data=reference_data,
        current_data=current_data,
        save_report=True,
        report_path="data_drift_report.html"
    )

    # ---- Step 4: show the outcome ---------------------------------------
    _step_banner("STEP 4: RESULTS")

    detector.print_drift_summary(results)

    # The full detail stays available to the caller through the return value
    print("\n📄 Detailed results available in 'results' dictionary")
    print("📊 Interactive HTML report saved as 'data_drift_report.html'")

    return results, detector


if __name__ == "__main__":
    # Run the drift detection system
    results, detector = main()

    # Access specific results from the returned dictionary
    print("\n🔍 ACCESSING SPECIFIC RESULTS:")
    print(f"Dataset drift detected: {results['dataset_drift'].get('drift_detected', 'N/A')}")
    print(f"Number of drifted columns: {results['dataset_drift'].get('number_of_drifted_columns', 'N/A')}")

    # List every column flagged as drifted together with its drift score
    for column_name, column_drift in results['column_drift'].items():
        if column_drift.get('drift_detected', False):
            # A missing or non-numeric score cannot take the ':.4f' format spec,
            # so it is converted first and falls back to the text 'N/A'.
            try:
                score_text = f"{float(column_drift.get('drift_score')):.4f}"
            except (TypeError, ValueError):
                score_text = 'N/A'
            print(f"Column '{column_name}' has drift (score: {score_text})")

ImportError: cannot import name 'ColumnMapping' from 'evidently' (c:\Users\Minfy.DESKTOP-3E50D5N\Desktop\final_capstone\venv\Lib\site-packages\evidently\__init__.py)