## Architecture to Monitor Data Quality Over Time

**Description**: Design a monitoring system in Python that checks and logs data quality metrics (accuracy, completeness) for a dataset over time.

**Steps to follow:**
1. Implement a Scheduled Script:
    - Use the `schedule` library to run the script periodically.
2. Script to Calculate Metrics:
    - For simplicity, use a single function, `calculate_quality_metrics()`, that calculates and logs metrics such as the missing rate or the mismatch rate.
3. Store Logs:
    - Use Python's logging library to save these metrics over time.

In [None]:
# Write your code from here
import schedule
import time
import pandas as pd
import logging
from datetime import datetime
# Route log records of level INFO and above to a file, one timestamped line
# per record.
logging.basicConfig(
    filename='data_quality_monitoring.log',
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
)

# --- Monitoring configuration: replace the placeholders with real values ---

# Dataset whose quality is monitored.
DATA_FILE = 'your_dataset.csv'

# Trusted reference dataset (only needed for the accuracy check).
TRUSTED_DATA_FILE = 'trusted_dataset.csv'

# Unique identifier column used to join the dataset with the trusted data.
ID_COLUMN = 'id'

# Columns included in the completeness check.
COMPLETENESS_COLUMNS = ['column_a', 'column_b', 'column_c']

# Column in the dataset whose values are checked against the trusted data.
ACCURACY_COLUMN = 'data_value'

# Column in the trusted dataset holding the reference values.
TRUSTED_VALUE_COLUMN = 'trusted_value'
def calculate_completeness(df, columns):
    """Calculate the completeness rate for the specified columns.

    Args:
        df: DataFrame to inspect.
        columns: Iterable of column names to check.

    Returns:
        A dict mapping ``'completeness_<col>'`` to the percentage of
        non-null rows in that column, formatted with two decimals and a
        trailing ``%``. The dict is empty when ``df`` has no rows. Columns
        that are not present in ``df`` are skipped and a warning is logged.
    """
    completeness_metrics = {}
    total_rows = len(df)
    if total_rows == 0:
        # No rows: a rate is undefined, so report nothing.
        return completeness_metrics

    for col in columns:
        if col not in df.columns:
            # A misconfigured column must not abort the remaining checks.
            logging.warning(f"Column {col} not found in data. Skipping completeness check.")
            continue
        missing_count = df[col].isnull().sum()
        completeness_rate = ((total_rows - missing_count) / total_rows) * 100
        completeness_metrics[f'completeness_{col}'] = f"{completeness_rate:.2f}%"
    return completeness_metrics
def calculate_accuracy(df, trusted_df, id_col, data_col, trusted_col):
    """Calculate accuracy by comparing a column with a trusted source.

    Rows of ``df`` and ``trusted_df`` are inner-joined on ``id_col`` and the
    value in ``data_col`` is compared with the value in ``trusted_col``.

    Args:
        df: DataFrame holding the data under test.
        trusted_df: DataFrame holding the reference values, or ``None``.
        id_col: Name of the join column present in both frames.
        data_col: Name of the column in ``df`` to verify.
        trusted_col: Name of the reference column in ``trusted_df``.

    Returns:
        A dict with ``'accuracy_<data_col>'`` and
        ``'mismatch_rate_<data_col>'`` as percentages formatted with two
        decimals and a trailing ``%``. The dict is empty when the trusted
        frame is missing or empty, when a required column is absent, or when
        the join produces no rows.
    """
    accuracy_metrics = {}
    if trusted_df is None or trusted_df.empty:
        return accuracy_metrics
    if id_col not in df.columns or data_col not in df.columns:
        return accuracy_metrics
    if id_col not in trusted_df.columns or trusted_col not in trusted_df.columns:
        return accuracy_metrics

    # pandas only applies merge suffixes to column names that overlap, so
    # relying on them breaks whenever data_col and trusted_col differ.
    # Rename both value columns to fixed names before joining instead.
    current_name = '__current__'
    trusted_name = '__trusted__'
    current = df[[id_col, data_col]].rename(columns={data_col: current_name})
    trusted = trusted_df[[id_col, trusted_col]].rename(columns={trusted_col: trusted_name})
    merged_df = pd.merge(current, trusted, on=id_col, how='inner')
    if merged_df.empty:
        return accuracy_metrics

    # NOTE: element-wise != treats NaN as unequal to NaN, so a value missing
    # on both sides is counted as a mismatch.
    mismatch_count = int((merged_df[current_name] != merged_df[trusted_name]).sum())
    total_compared = len(merged_df)
    mismatch_rate = (mismatch_count / total_compared) * 100
    accuracy_metrics[f'accuracy_{data_col}'] = f"{100 - mismatch_rate:.2f}%"
    accuracy_metrics[f'mismatch_rate_{data_col}'] = f"{mismatch_rate:.2f}%"
    return accuracy_metrics

def calculate_quality_metrics():
    """Calculate and log data quality metrics.

    Reads ``DATA_FILE``, logs the completeness of ``COMPLETENESS_COLUMNS``
    and, when ``TRUSTED_DATA_FILE`` is set and readable, logs the accuracy of
    ``ACCURACY_COLUMN`` against ``TRUSTED_VALUE_COLUMN`` joined on
    ``ID_COLUMN``.

    Every failure is logged rather than raised, so a scheduled run never
    propagates an exception to the scheduler loop. Returns ``None``.
    """
    try:
        df = pd.read_csv(DATA_FILE)
        logging.info(f"Data loaded successfully from {DATA_FILE} at {datetime.now()}")

        completeness = calculate_completeness(df, COMPLETENESS_COLUMNS)
        for metric, value in completeness.items():
            logging.info(f"Metric: {metric}, Value: {value}")

        if not TRUSTED_DATA_FILE:
            # No trusted source configured: accuracy cannot be checked.
            return

        # Keep this try limited to the file read so that only genuine
        # loading problems are reported as loading problems; a failure in
        # the accuracy computation falls through to the outer handler.
        try:
            trusted_df = pd.read_csv(TRUSTED_DATA_FILE)
        except FileNotFoundError:
            logging.warning(f"Trusted data file {TRUSTED_DATA_FILE} not found. Skipping accuracy checks.")
            return
        except Exception as e:
            logging.error(f"Error loading trusted data: {e}")
            return
        logging.info(f"Trusted data loaded successfully from {TRUSTED_DATA_FILE}")

        accuracy = calculate_accuracy(df, trusted_df, ID_COLUMN, ACCURACY_COLUMN, TRUSTED_VALUE_COLUMN)
        for metric, value in accuracy.items():
            logging.info(f"Metric: {metric}, Value: {value}")

    except FileNotFoundError:
        logging.error(f"Data file {DATA_FILE} not found.")
    except Exception as e:
        # Top-level boundary of the scheduled job: log and swallow so the
        # next scheduled run still happens.
        logging.error(f"Error calculating data quality metrics: {e}")

# 1. Implement a Scheduled Script:
# Register the metrics job to run once every hour.
schedule.every().hour.do(calculate_quality_metrics)
# Alternative schedules:
# schedule.every().day.at("09:00").do(calculate_quality_metrics)
# schedule.every(15).minutes.do(calculate_quality_metrics)

if __name__ == "__main__":
    logging.info("Data Quality Monitoring system started.")
    try:
        # Poll once per second for jobs that are due.
        while True:
            schedule.run_pending()
            time.sleep(1)
    except KeyboardInterrupt:
        # Ctrl+C / kernel interrupt is the expected way to stop the monitor;
        # record the shutdown instead of ending with a traceback.
        logging.info("Data Quality Monitoring system stopped by user.")


KeyboardInterrupt: 