In [1]:
import numpy as np
import pandas as pd
from scipy import stats
import warnings

# Suppress potential warnings from SciPy KS test with identical data
warnings.filterwarnings("ignore", message="p-value may be inaccurate")

# --- Introduction ---
# This script demonstrates how to monitor feature distribution changes
# using Python to detect potential data drift.
# We will use both basic statistics and the Kolmogorov-Smirnov (KS) test.
#
# We will:
# 1. Create sample "original" and "drifted" DataFrames.
# 2. Define a function to compare statistics (mean, std dev) for numerical features.
# 3. Define a function to perform the KS test for numerical features.
# 4. Define a function to compare category frequencies for categorical features (a simple proportion-difference threshold here; a Chi-squared test would be the more rigorous choice, since the KS test is for continuous data).
# 5. Iterate through features and apply relevant checks.
# 6. Report features exhibiting potential drift based on thresholds/p-values.

# --- 1. Create Sample DataFrames ---
# Create a sample original DataFrame
# Seed NumPy's global RNG so the "original" sample is reproducible.
np.random.seed(42)
# NOTE: the entries below draw from the global RNG in dict-literal order, so
# reordering, adding, or removing an entry changes every later column.
original_data = {
    'numerical_feature_1': np.random.normal(loc=10, scale=2, size=500),
    'numerical_feature_2': np.random.beta(a=5, b=1, size=500) * 100, # Beta distribution
    'categorical_feature_1': np.random.choice(['A', 'B', 'C'], size=500, p=[0.5, 0.3, 0.2]),
    'categorical_feature_2': np.random.choice(['X', 'Y'], size=500, p=[0.7, 0.3]),
    'target': np.random.rand(500) # Example target, not used for feature drift
}
original_df = pd.DataFrame(original_data)

print("--- Original DataFrame Head ---")
print(original_df.head())
print("\n")

# Create a sample drifted DataFrame - introduce changes
# Re-seeding resets the global RNG; the same ordering caveat applies here.
np.random.seed(100) # Different seed for drifted data
drifted_data = {
    'numerical_feature_1': np.random.normal(loc=11.5, scale=2.5, size=500), # Shifted mean, increased std dev
    'numerical_feature_2': np.random.beta(a=3, b=1.5, size=500) * 100, # Changed beta parameters
    'categorical_feature_1': np.random.choice(['A', 'B', 'C'], size=500, p=[0.3, 0.4, 0.3]), # Changed proportions
    'categorical_feature_2': np.random.choice(['X', 'Y'], size=500, p=[0.6, 0.4]), # Changed proportions
    'target': np.random.rand(500) # Example target
}
# Same column names as original_df, which the monitoring loop relies on.
drifted_df = pd.DataFrame(drifted_data)

print("--- Drifted DataFrame Head ---")
print(drifted_df.head())
print("\n")

# --- 2. Function to Compare Basic Statistics ---
def compare_basic_stats(original_series, drifted_series, threshold_mean=0.5, threshold_std=0.3):
    """
    Compares mean and standard deviation between two numerical series.

    The thresholds are absolute (not relative) differences, so they are
    sensitive to the scale of the feature being compared.

    Args:
        original_series (pd.Series): The original data series.
        drifted_series (pd.Series): The drifted data series.
        threshold_mean (float): Absolute difference in mean to flag as potential drift.
        threshold_std (float): Absolute difference in std dev to flag as potential drift.

    Returns:
        dict: The per-series mean and std dev, their absolute differences, and
        two built-in ``bool`` flags ('mean_drift_detected',
        'std_drift_detected') that are True when the matching threshold is
        strictly exceeded.
    """
    mean_orig = original_series.mean()
    std_orig = original_series.std()
    mean_drift = drifted_series.mean()
    std_drift = drifted_series.std()

    mean_diff = abs(mean_drift - mean_orig)
    std_diff = abs(std_drift - std_orig)

    report = {
        'mean_original': mean_orig,
        'std_original': std_orig,
        'mean_drifted': mean_drift,
        'std_drifted': std_drift,
        'mean_difference': mean_diff,
        'std_difference': std_diff,
        # Comparing NumPy scalars yields numpy.bool_, which json.dumps cannot
        # serialize; cast so the report can be dumped as-is.
        'mean_drift_detected': bool(mean_diff > threshold_mean),
        'std_drift_detected': bool(std_diff > threshold_std),
    }
    return report

# --- 3. Function to Perform KS Test ---
def perform_ks_test(original_series, drifted_series, alpha=0.05):
    """
    Performs the two-sample Kolmogorov-Smirnov test.

    Args:
        original_series (pd.Series): The original data series.
        drifted_series (pd.Series): The drifted data series.
        alpha (float): Significance level for the test.

    Returns:
        dict: Either ``{'error': ...}`` when a series has fewer than two
        non-null values, or a report with 'ks_statistic' and 'p_value' (plain
        floats), 'drift_detected_ks' (built-in bool, True when
        ``p_value < alpha``) and the 'alpha' that was used.
    """
    # Drop NaNs before testing; the comparison should only see real values.
    original_clean = original_series.dropna()
    drifted_clean = drifted_series.dropna()

    if len(original_clean) < 2 or len(drifted_clean) < 2:
        return {'error': 'Not enough non-null data for KS test'}

    ks_statistic, p_value = stats.ks_2samp(original_clean, drifted_clean)

    # SciPy returns NumPy scalars; convert them so the report holds plain
    # Python values (numpy.bool_ in particular is not JSON-serializable).
    report = {
        'ks_statistic': float(ks_statistic),
        'p_value': float(p_value),
        'drift_detected_ks': bool(p_value < alpha),
        'alpha': alpha,
    }
    return report

# --- 4. Function to Compare Frequencies (Conceptual for Categorical) ---
# Note: The KS test is for continuous data. For categorical data, a
# Chi-squared test is more appropriate. The function below is a simplified
# stand-in that compares category proportions against a threshold; a more
# rigorous version would use scipy.stats.chi2_contingency.
def compare_categorical_frequencies(original_series, drifted_series, alpha=0.05,
                                    threshold_proportion_diff=0.10):
    """
    Compares category proportions between two categorical series.

    This is a simple threshold check, not a statistical test: a category is
    flagged when its share of the data changes by more than
    ``threshold_proportion_diff``. A Chi-squared test
    (scipy.stats.chi2_contingency) or a distance metric such as Total
    Variation Distance would be more robust.

    Args:
        original_series (pd.Series): The original data series.
        drifted_series (pd.Series): The drifted data series.
        alpha (float): Currently unused; kept so the signature stays
            compatible with a future significance-test implementation.
        threshold_proportion_diff (float): Absolute change in a category's
            proportion above which the category is flagged (default 0.10).

    Returns:
        dict: The per-series proportions, the absolute per-category
        differences, whether any difference exceeded the threshold, the
        offending categories, and the threshold that was applied.
    """
    # Normalized counts; value_counts skips missing values by default.
    orig_counts = original_series.value_counts(normalize=True)
    drift_counts = drifted_series.value_counts(normalize=True)

    # Align both sides on the union of categories so a category present in
    # only one series is compared against a proportion of 0.
    all_categories = orig_counts.index.union(drift_counts.index)
    orig_aligned = orig_counts.reindex(all_categories, fill_value=0)
    drift_aligned = drift_counts.reindex(all_categories, fill_value=0)

    proportion_diffs = (orig_aligned - drift_aligned).abs()
    significant_changes = proportion_diffs[proportion_diffs > threshold_proportion_diff]
    drift_found = not significant_changes.empty

    report = {
        'original_proportions': orig_counts.to_dict(),
        'drifted_proportions': drift_counts.to_dict(),
        'proportion_differences': proportion_diffs.to_dict(),
        'significant_proportion_changes_detected': drift_found,
        'significant_changes_details': significant_changes.to_dict() if drift_found else {},
        'threshold_proportion_diff': threshold_proportion_diff,
    }
    return report


# --- 5. Iterate Through Features and Apply Checks ---
print("--- Monitoring Features for Data Drift ---")

# Per-feature results, keyed by column name and then by the check that ran.
drift_report = {}
alpha_ks = 0.05 # Significance level for KS test
basic_stats_mean_threshold = 0.5 # Threshold for mean difference
basic_stats_std_threshold = 0.3  # Threshold for std dev difference

# Assume both dataframes have the same columns for simplicity in this example
for col in original_df.columns:
    print(f"\nChecking feature: '{col}'")
    drift_report[col] = {}
    col_dtype = original_df[col].dtype

    if pd.api.types.is_numeric_dtype(col_dtype):
        print("  Type: Numerical")
        # Perform basic statistics comparison
        stats_report = compare_basic_stats(
            original_df[col],
            drifted_df[col],
            threshold_mean=basic_stats_mean_threshold,
            threshold_std=basic_stats_std_threshold
        )
        drift_report[col]['basic_stats'] = stats_report
        print(f"  Basic Stats Diff (Mean: {stats_report['mean_difference']:.2f}, Std Dev: {stats_report['std_difference']:.2f})")
        if stats_report['mean_drift_detected'] or stats_report['std_drift_detected']:
            print("  Potential drift detected based on basic stats thresholds.")

        # Perform KS test
        ks_report = perform_ks_test(original_df[col], drifted_df[col], alpha=alpha_ks)
        drift_report[col]['ks_test'] = ks_report
        if 'error' in ks_report:
            print(f"  KS Test Error: {ks_report['error']}")
        else:
            print(f"  KS Test (Statistic: {ks_report['ks_statistic']:.4f}, P-value: {ks_report['p_value']:.4f})")
            if ks_report['drift_detected_ks']:
                print(f"  Significant drift detected based on KS test (p < {alpha_ks}).")
            else:
                print(f"  No significant drift detected based on KS test (p >= {alpha_ks}).")

    # is_categorical_dtype is deprecated in recent pandas, so test the dtype
    # class directly. The string-dtype check keeps text columns on this
    # branch when pandas stores them with a dedicated string dtype instead
    # of object.
    elif (
        pd.api.types.is_object_dtype(col_dtype)
        or pd.api.types.is_string_dtype(col_dtype)
        or isinstance(col_dtype, pd.CategoricalDtype)
    ):
        print("  Type: Categorical")
        # Perform categorical frequency comparison (simple threshold check)
        freq_report = compare_categorical_frequencies(original_df[col], drifted_df[col])
        drift_report[col]['categorical_freq'] = freq_report
        print("  Comparing category frequencies...")
        if freq_report['significant_proportion_changes_detected']:
            print("  Potential drift detected based on significant category proportion changes.")
            print(f"    Details: {freq_report['significant_changes_details']}")
        else:
            print("  No significant category proportion changes detected (based on simple threshold).")

    else:
        print(f"  Type: {col_dtype} - Monitoring not implemented for this type.")
        drift_report[col]['status'] = 'Monitoring not implemented for this type'


# --- 6. Report Features Exhibiting Potential Drift ---
print("\n--- Summary of Features with Potential Drift ---")

# A feature counts as drifted as soon as any one of its checks raised a flag.
features_with_drift = []
for col, report in drift_report.items():
    basic = report.get('basic_stats', {})
    ks = report.get('ks_test', {})
    categorical = report.get('categorical_freq', {})

    is_drifted = bool(
        basic.get('mean_drift_detected')
        or basic.get('std_drift_detected')
        or ks.get('drift_detected_ks')
        or categorical.get('significant_proportion_changes_detected')
    )
    if is_drifted:
        features_with_drift.append(col)

if not features_with_drift:
    print("No significant data drift detected in any feature based on the checks performed.")
else:
    print("The following features show potential signs of data drift:")
    for feature in features_with_drift:
        print(f"- {feature}")

# You can inspect the full drift_report dictionary for detailed results per feature
# print("\n--- Full Drift Report ---")
# import json
# print(json.dumps(drift_report, indent=2))

# --- Conclusion ---
# This script provides a framework for monitoring data drift by comparing
# feature distributions using basic statistics and the KS test.
# For a production system, you would integrate this logic into your monitoring
# pipeline and potentially use more sophisticated libraries like Evidently,
# Deepchecks, or Fiddler for comprehensive drift detection and reporting.






--- Original DataFrame Head ---
   numerical_feature_1  numerical_feature_2 categorical_feature_1  \
0            10.993428            61.801447                     A   
1             9.723471            97.395434                     B   
2            11.295377            91.010028                     C   
3            13.046060            61.914344                     A   
4             9.531693            54.163188                     B   

  categorical_feature_2    target  
0                     Y  0.464239  
1                     X  0.722738  
2                     X  0.656729  
3                     X  0.708766  
4                     X  0.008364  


--- Drifted DataFrame Head ---
   numerical_feature_1  numerical_feature_2 categorical_feature_1  \
0             7.125586            44.768947                     C   
1            12.356701            96.674478                     A   
2            14.382590            90.422237                     C   
3            10.868910      

In [2]:
import pandas as pd
import numpy as np

# --- Introduction ---
# This script demonstrates setting up basic automated data validation
# using pure Python and the pandas library.
# It covers checks for missing values, data types, value ranges, and allowed values.
#
# We will:
# 1. Create a sample DataFrame with some data quality issues.
# 2. Define validation functions for different types of checks.
# 3. Apply these validation functions to the DataFrame.
# 4. Report the validation results.

# --- 1. Create a Sample DataFrame ---
print("--- Creating Sample DataFrame ---")
# Ten products with three deliberately planted problems: one missing
# Category, one negative Price, and one unparseable ReleaseDate.
data = dict(
    ProductID=list(range(101, 111)),
    ProductName=[
        'Laptop', 'Keyboard', 'Mouse', 'Monitor', 'Webcam',
        'Printer', 'Speaker', 'Headphones', 'Microphone', 'Router',
    ],
    # Row 7 has no category (missing value).
    Category=['Electronics'] * 6 + ['Audio', np.nan, 'Audio', 'Network'],
    # Row 4 carries a negative price.
    Price=[1200.50, 75.00, 25.99, 300.00, -50.00, 250.00, 150.00, 99.50, 70.00, 80.00],
    StockQuantity=[10, 50, 0, 15, 25, 5, 12, 30, 8, 20],
    # The last entry is not a valid date string.
    ReleaseDate=[
        '2023-01-10', '2023-01-11', '2023-01-11', '2023-01-12', '2023-01-12',
        '2023-01-13', '2023-01-13', '2023-01-14', '2023-01-14', 'InvalidDate',
    ],
    IsActive=[True, False, True, True, False, True, True, False, True, True],
)
df = pd.DataFrame(data)

print(df)
print("\n")

# --- 2. Define Validation Functions ---

def check_missing_values(df, column_name, is_required=True):
    """Report missing values in a required column.

    Returns a message giving the number of nulls when the column is required
    and contains at least one; returns None otherwise (optional columns are
    allowed to have missing values).
    """
    missing_mask = df[column_name].isnull()
    if not is_required or not missing_mask.any():
        return None
    return f"Column '{column_name}': Contains {missing_mask.sum()} missing values, but is required."

def check_data_type(df, column_name, expected_dtype):
    """Verify that a column's dtype matches the expected one.

    An exact match passes. Two differing dtypes also pass when both are
    numeric, which makes this a deliberately lenient check. Returns a
    message describing the mismatch, or None when the column passes.
    """
    observed = df[column_name].dtype
    dtypes_match = not (observed != expected_dtype)
    if dtypes_match:
        return None

    # Numeric-to-numeric differences (e.g. int vs float) are tolerated.
    is_numeric = pd.api.types.is_numeric_dtype
    if is_numeric(observed) and is_numeric(expected_dtype):
        return None

    return f"Column '{column_name}': Incorrect data type. Expected '{expected_dtype}', got '{observed}'."

def check_value_range(df, column_name, min_value=None, max_value=None):
    """Check that a numeric column stays within optional lower/upper bounds.

    Missing values are ignored. Returns a message for a non-numeric column,
    a message listing how many values fall outside each supplied bound, or
    None when there is nothing to report.
    """
    column = df[column_name]
    if not pd.api.types.is_numeric_dtype(column.dtype):
        return f"Column '{column_name}': Cannot perform range check on non-numeric type '{column.dtype}'."

    present = column.dropna()
    problems = []

    if min_value is not None:
        below = (present < min_value).sum()
        if below:
            problems.append(f"Contains {below} values below the minimum ({min_value}).")

    if max_value is not None:
        above = (present > max_value).sum()
        if above:
            problems.append(f"Contains {above} values above the maximum ({max_value}).")

    if not problems:
        return None
    return f"Column '{column_name}': Value range violations: {', '.join(problems)}"

def check_allowed_values(df, column_name, allowed_list):
    """Check that every non-null value in a column is one of the allowed values.

    Returns a message listing the distinct offending values, or None when
    all non-null values are allowed.
    """
    non_null = df[column_name].dropna()
    unexpected = non_null[~non_null.isin(allowed_list)].unique()
    if len(unexpected) == 0:
        return None
    return f"Column '{column_name}': Contains invalid values: {list(unexpected)}"

# --- 3. Apply Validation ---
print("--- Applying Validation Checks ---")

# Messages produced by failed checks, in the order the checks ran.
validation_issues = []

# Each entry is (check_function, column_name, *extra_args); the extra args
# are forwarded positionally to the check function after (df, column_name).
checks_to_run = [
    # ProductID: required integer
    (check_missing_values, 'ProductID', True),
    (check_data_type, 'ProductID', 'int64'),
    # ProductName: required text
    (check_missing_values, 'ProductName', True),
    (check_data_type, 'ProductName', 'object'),
    # Category: required text
    (check_missing_values, 'Category', True),
    (check_data_type, 'Category', 'object'),
    # Price: required float, must be >= 0
    (check_missing_values, 'Price', True),
    (check_data_type, 'Price', 'float64'),
    (check_value_range, 'Price', 0.0, None),
    # StockQuantity: required integer, must be >= 0
    (check_missing_values, 'StockQuantity', True),
    (check_data_type, 'StockQuantity', 'int64'),
    (check_value_range, 'StockQuantity', 0, None),
    # ReleaseDate: required; only the column dtype is checked here.
    # Validating the date format itself needs dedicated logic
    # (e.g. pd.to_datetime with an explicit format), for example:
    # (check_date_format, 'ReleaseDate', '%Y-%m-%d'),
    (check_missing_values, 'ReleaseDate', True),
    (check_data_type, 'ReleaseDate', 'object'),
    # IsActive: required boolean
    (check_missing_values, 'IsActive', True),
    (check_data_type, 'IsActive', 'bool'),
    # Category must come from the allowed set
    (check_allowed_values, 'Category', ['Electronics', 'Audio', 'Accessories', 'Software', 'Network']),
]

# Run checks
for check_func, col_name, *args in checks_to_run:
    if col_name in df.columns:
        issue = check_func(df, col_name, *args)
        if issue:
            validation_issues.append(issue)
    elif args and args[0] is True:
        # The column is absent: report it once, via the entry whose first
        # extra argument is True (the "required" check); other entries for
        # the same column are skipped silently.
        validation_issues.append(f"Required Column Missing: Column '{col_name}' is defined in checks but not found in DataFrame.")

# --- 4. Report Validation Results ---
print("--- Data Validation Report ---")

# Either list every collected issue or confirm that none were found.
if validation_issues:
    print("Validation Failed: The following data quality issues were detected:")
    for issue in validation_issues:
        print(f"- {issue}")
else:
    print("Validation Successful: No issues found based on defined checks.")

# --- Conclusion ---
# This script provides a basic framework for automated data validation
# using pandas. You can extend this by adding more validation functions
# and defining comprehensive checks_to_run for your specific dataset.
# For more complex scenarios, dedicated data validation libraries are recommended.





--- Creating Sample DataFrame ---
   ProductID ProductName     Category    Price  StockQuantity  ReleaseDate  \
0        101      Laptop  Electronics  1200.50             10   2023-01-10   
1        102    Keyboard  Electronics    75.00             50   2023-01-11   
2        103       Mouse  Electronics    25.99              0   2023-01-11   
3        104     Monitor  Electronics   300.00             15   2023-01-12   
4        105      Webcam  Electronics   -50.00             25   2023-01-12   
5        106     Printer  Electronics   250.00              5   2023-01-13   
6        107     Speaker        Audio   150.00             12   2023-01-13   
7        108  Headphones          NaN    99.50             30   2023-01-14   
8        109  Microphone        Audio    70.00              8   2023-01-14   
9        110      Router      Network    80.00             20  InvalidDate   

   IsActive  
0      True  
1     False  
2      True  
3      True  
4     False  
5      True  
6      Tr