In [None]:
#corrected
# Part 1: Detecting Data Drift in AI/ML Models
# Objective: Understand data drift, how it affects machine learning models, and techniques to monitor it.
# Task 1: Understanding Data Drift: Study a historical dataset used in training a simple linear regression model and
# compare it with recent unseen data to detect drift.
# Task 2: Monitoring Distribution Changes: Write the code to identify features that exhibit statistical distribution differences.
# Task 3: Visualizing Data Drift: Use visualization techniques to illustrate data drift.
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import seaborn as sns
from scipy.stats import ks_2samp
from sklearn.linear_model import LinearRegression
from sklearn.metrics import mean_squared_error

# --- Task 1: fit on historical data, then score the model on drifted data ---
# Seed the global NumPy RNG so the four random draws below are reproducible.
# The order of the draws must not change, or the generated data changes too.
np.random.seed(42)

# Historical data: one feature, uniform on [0, 10); target is 2.5 * x plus
# Gaussian noise scaled by 2.
X_train = 10 * np.random.rand(100, 1)
y_train = X_train.squeeze() * 2.5 + 2 * np.random.randn(100)

model = LinearRegression().fit(X_train, y_train)

# Recent data: same relationship, but the feature now spans [0, 15).
X_test_drifted = 15 * np.random.rand(100, 1)
y_test_drifted = X_test_drifted.squeeze() * 2.5 + 2 * np.random.randn(100)

y_pred_drifted = model.predict(X_test_drifted)
mse_drifted = mean_squared_error(y_test_drifted, y_pred_drifted)
print(f"Mean Squared Error on Drifted Data: {mse_drifted:.2f}")

# --- Task 2: two-sample Kolmogorov-Smirnov test on the feature --------------
ks_stat, ks_p_value = ks_2samp(X_train.squeeze(), X_test_drifted.squeeze())
print(f"KS Statistic: {ks_stat:.4f}, P-Value: {ks_p_value:.4f}")

# --- Task 3: overlay the two feature distributions --------------------------
plt.figure(figsize=(10, 6))
for _values, _colour, _label in (
    (X_train.squeeze(), 'blue', 'Training Data'),
    (X_test_drifted.squeeze(), 'red', 'Drifted Test Data'),
):
    sns.histplot(_values, color=_colour, label=_label, kde=True, stat='density')
plt.title('Feature Distribution Comparison')
plt.xlabel('Feature Value')
plt.ylabel('Density')
plt.legend()
plt.show()

In [None]:
# Part 2: Automating Data Quality Checks
# Objective: Use Python and data quality frameworks to automate validation.

# Task 1: Setting Up Automated Validation with Python

# Task 2: Introduction to Great Expectations: Install the great_expectations package and set up a basic project.

# Task 3: Creating Expectations with Great Expectations: Use Great Expectations to define data validation expectations for a dataset.

# data_validator.py
import pandas as pd
import logging
import json
from datetime import datetime
import os

# Configure the root logger: INFO and above, "time - LEVEL - message" layout.
_LOG_FORMAT = '%(asctime)s - %(levelname)s - %(message)s'
logging.basicConfig(format=_LOG_FORMAT, level=logging.INFO)

# Module-scoped logger used by the DataValidator methods.
logger = logging.getLogger(__name__)

class DataValidator:
    """Simple data validator for CSV files"""
    
    def __init__(self, filepath):
        """Initialize with path to CSV file"""
        self.filepath = filepath
        self.df = None
        self.validation_results = {
            "timestamp": datetime.now().isoformat(),
            "file": os.path.basename(filepath),
            "checks": [],
            "overall_status": "PASSED"
        }
    
    def load_data(self):
        """Load the CSV file"""
        try:
            self.df = pd.read_csv(self.filepath)
            logger.info(f"Successfully loaded {self.filepath}")
            return True
        except Exception as e:
            logger.error(f"Failed to load {self.filepath}: {str(e)}")
            self.validation_results["overall_status"] = "FAILED"
            self.validation_results["checks"].append({
                "name": "file_loading",
                "status": "FAILED",
                "message": f"Could not load file: {str(e)}"
            })
            return False
    
    def check_missing_values(self, column, threshold=0.0):
        """Check if missing values in a column are below threshold"""
        check_name = f"missing_values_{column}"
        
        if self.df is None:
            logger.error("Data not loaded. Call load_data() first.")
            return False
        
        if column not in self.df.columns:
            logger.warning(f"Column {column} not found in dataset")
            self.validation_results["checks"].append({
                "name": check_name,
                "status": "FAILED",
                "message": f"Column {column} not found in dataset"
            })
            self.validation_results["overall_status"] = "FAILED"
            return False
        
        missing_rate = self.df[column].isna().mean()
        passed = missing_rate <= threshold
        
        self.validation_results["checks"].append({
            "name": check_name,
            "status": "PASSED" if passed else "FAILED",
            "message": f"Missing rate: {missing_rate:.2%} (threshold: {threshold:.2%})",
            "details": {
                "missing_count": int(self.df[column].isna().sum()),
                "total_count": len(self.df),
                "missing_rate": float(missing_rate)
            }
        })
        
        if not passed:
            self.validation_results["overall_status"] = "FAILED"
        
        logger.info(f"Check {check_name}: {'PASSED' if passed else 'FAILED'}")
        return passed
    
    def check_value_range(self, column, min_val=None, max_val=None):
        """Check if values in a column are within specified range"""
        check_name = f"value_range_{column}"
        
        if self.df is None:
            logger.error("Data not loaded. Call load_data() first.")
            return False
        
        if column not in self.df.columns:
            logger.warning(f"Column {column} not found in dataset")
            self.validation_results["checks"].append({
                "name": check_name,
                "status": "FAILED",
                "message": f"Column {column} not found in dataset"
            })
            self.validation_results["overall_status"] = "FAILED"
            return False
        
        # Get non-null values
        values = self.df[column].dropna()
        
        # Check minimum value
        min_check_passed = True
        if min_val is not None:
            min_check_passed = values.min() >= min_val
        
        # Check maximum value
        max_check_passed = True
        if max_val is not None:
            max_check_passed = values.max() <= max_val
        
        passed = min_check_passed and max_check_passed
        
        self.validation_results["checks"].append({
            "name": check_name,
            "status": "PASSED" if passed else "FAILED",
            "message": f"Range check: {'PASSED' if passed else 'FAILED'}",
            "details": {
                "min_value": float(values.min()),
                "max_value": float(values.max()),
                "min_threshold": min_val,
                "max_threshold": max_val
            }
        })
        
        if not passed:
            self.validation_results["overall_status"] = "FAILED"
        
        logger.info(f"Check {check_name}: {'PASSED' if passed else 'FAILED'}")
        return passed
    
    def check_unique_values(self, column, expected_unique_count=None, min_unique_rate=None):
        """Check uniqueness of values in a column"""
        check_name = f"unique_values_{column}"
        
        if self.df is None:
            logger.error("Data not loaded. Call load_data() first.")
            return False
        
        if column not in self.df.columns:
            logger.warning(f"Column {column} not found in dataset")
            self.validation_results["checks"].append({
                "name": check_name,
                "status": "FAILED",
                "message": f"Column {column} not found in dataset"
            })
            self.validation_results["overall_status"] = "FAILED"
            return False
        
        # Get non-null values
        values = self.df[column].dropna()
        unique_count = values.nunique()
        unique_rate = unique_count / len(values) if len(values) > 0 else 0
        
        # Check if uniqueness meets criteria
        count_check_passed = True
        if expected_unique_count is not None:
            count_check_passed = unique_count == expected_unique_count
        
        rate_check_passed = True
        if min_unique_rate is not None:
            rate_check_passed = unique_rate >= min_unique_rate
        
        passed = count_check_passed and rate_check_passed
        
        self.validation_results["checks"].append({
            "name": check_name,
            "status": "PASSED" if passed else "FAILED",
            "message": f"Uniqueness check: {'PASSED' if passed else 'FAILED'}",
            "details": {
                "unique_count": int(unique_count),
                "total_count": int(len(values)),
                "unique_rate": float(unique_rate),
                "expected_unique_count": expected_unique_count,
                "min_unique_rate": min_unique_rate
            }
        })
        
        if not passed:
            self.validation_results["overall_status"] = "FAILED"
        
        logger.info(f"Check {check_name}: {'PASSED' if passed else 'FAILED'}")
        return passed
    
    def save_results(self, output_file="validation_results.json"):
        """Save validation results to a JSON file"""
        try:
            with open(output_file, 'w') as f:
                json.dump(self.validation_results, f, indent=2)
            logger.info(f"Validation results saved to {output_file}")
            return True
        except Exception as e:
            logger.error(f"Failed to save validation results: {str(e)}")
            return False
    
    def run_all_checks(self, checks_config):
        """Run all checks specified in config"""
        if not self.load_data():
            return False
        
        for check in checks_config:
            check_type = check["type"]
            
            if check_type == "missing_values":
                self.check_missing_values(
                    column=check["column"],
                    threshold=check.get("threshold", 0.0)
                )
            elif check_type == "value_range":
                self.check_value_range(
                    column=check["column"],
                    min_val=check.get("min_val"),
                    max_val=check.get("max_val")
                )
            elif check_type == "unique_values":
                self.check_unique_values(
                    column=check["column"],
                    expected_unique_count=check.get("expected_unique_count"),
                    min_unique_rate=check.get("min_unique_rate")
                )
        
        logger.info(f"All checks completed. Overall status: {self.validation_results['overall_status']}")
        return self.validation_results["overall_status"] == "PASSED"
    
    