## Architecture to Monitor Data Quality Over Time

**Description**: Design a monitoring system in Python that checks and logs data quality metrics (accuracy, completeness) for a dataset over time.

**Steps to follow:**
1. Implement a Scheduled Script:
    - Use the `schedule` library to run the script periodically.
2. Script to Calculate Metrics:
    - For simplicity, use a function `calculate_quality_metrics()` that calculates metrics such as the missing rate or mismatch rate; the scheduled job then logs them.
3. Store Logs:
    - Use Python's logging library to save these metrics over time.

In [None]:
# Install the required libraries
# If you're using Jupyter Notebook, uncomment the line below:
# !pip install pandas schedule

import pandas as pd
import logging
import schedule
import time
import random

# ----------------------------------------
# Step 1: Set up logging
# ----------------------------------------
# Route INFO-and-above records to data_quality.log; every record is written
# as "<timestamp> - <level> - <message>".
logging.basicConfig(
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
    filename='data_quality.log',
)

# ----------------------------------------
# Step 2: Simulated data loading function
# Replace this with pd.read_csv('your_data.csv') for real use
# ----------------------------------------
def load_data():
    """Return a small simulated transactions DataFrame.

    The frame has five rows with gaps planted in ``transaction_id``,
    ``amount`` and ``date``. The status of the last row is drawn at random
    from ``"Failed"`` or ``None`` so that successive runs can report
    different accuracy figures.
    """
    # Drawn once per call: either a valid status or a missing one.
    final_status = random.choice(["Failed", None])

    columns = dict(
        transaction_id=[1001, 1002, None, 1004, None],
        amount=[200, None, 300, 400, 500],
        date=["2024-01-01", "2024-01-02", "2024-01-03", None, "2024-01-05"],
        status=["Completed", "Failed", "Completed", "Completed", final_status],
    )
    return pd.DataFrame(columns)

# ----------------------------------------
# Step 3: Calculate quality metrics
# ----------------------------------------
def calculate_quality_metrics(df, critical_fields=None, valid_statuses=None):
    """Compute completeness and accuracy metrics for a transactions DataFrame.

    Args:
        df: DataFrame containing every column named in ``critical_fields``
            plus a ``status`` column.
        critical_fields: Columns whose share of missing values is reported.
            Defaults to ``transaction_id``, ``amount`` and ``date``.
        valid_statuses: Values of ``status`` that count as accurate.
            Defaults to ``Completed`` and ``Failed``.

    Returns:
        dict mapping ``"<column>_missing_pct"`` to the percentage of null
        values in each critical field, plus ``"status_accuracy_pct"``, the
        percentage of rows whose status is one of ``valid_statuses``. All
        values are rounded to two decimals.

    Raises:
        KeyError: If a critical field or the ``status`` column is absent.
    """
    if critical_fields is None:
        critical_fields = ('transaction_id', 'amount', 'date')
    if valid_statuses is None:
        valid_statuses = ('Completed', 'Failed')

    metrics = {}

    # Completeness: % missing in each critical field
    for col in critical_fields:
        missing_pct = df[col].isnull().mean() * 100
        metrics[f"{col}_missing_pct"] = round(missing_pct, 2)

    # Accuracy: a missing status is not in the default valid set, so it is
    # counted as invalid alongside any unexpected value.
    invalid_status_pct = (~df['status'].isin(list(valid_statuses))).mean() * 100
    metrics['status_accuracy_pct'] = round(100 - invalid_status_pct, 2)

    return metrics

# ----------------------------------------
# Step 4: Scheduled job to run the checks
# ----------------------------------------
def job():
    """Run one monitoring pass: load the data, compute metrics, record them.

    The metrics are joined into a single ``key: value%`` summary separated
    by `` | ``, written to the log at INFO level and echoed to stdout.
    """
    quality = calculate_quality_metrics(load_data())

    parts = []
    for name, pct in quality.items():
        parts.append(f"{name}: {pct}%")
    summary = " | ".join(parts)

    logging.info(f"Data Quality Metrics - {summary}")
    print(f"Logged: {summary}")

# ----------------------------------------
# Step 5: Schedule job to run every minute
# ----------------------------------------
schedule.every(1).minutes.do(job)

print("⏳ Data Quality Monitor is running. Press Ctrl+C to stop.")

# Poll the scheduler once per second. Sleeping between polls keeps the loop
# from spinning a CPU core at full load while it waits for the next run.
while True:
    schedule.run_pending()
    time.sleep(1)
# Install the required libraries
# If you're using Jupyter Notebook, uncomment the line below:
# !pip install pandas schedule

import pandas as pd
import logging
import schedule
import time
import random

# ----------------------------------------
# Step 1: Set up logging
# ----------------------------------------
# Route INFO-and-above records to data_quality.log; every record is written
# as "<timestamp> - <level> - <message>".
logging.basicConfig(
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
    filename='data_quality.log',
)

# ----------------------------------------
# Step 2: Simulated data loading function
# Replace this with pd.read_csv('your_data.csv') for real use
# ----------------------------------------
def load_data():
    """Return a small simulated transactions DataFrame.

    The frame has five rows with gaps planted in ``transaction_id``,
    ``amount`` and ``date``. The status of the last row is drawn at random
    from ``"Failed"`` or ``None`` so that successive runs can report
    different accuracy figures.
    """
    # Drawn once per call: either a valid status or a missing one.
    final_status = random.choice(["Failed", None])

    columns = dict(
        transaction_id=[1001, 1002, None, 1004, None],
        amount=[200, None, 300, 400, 500],
        date=["2024-01-01", "2024-01-02", "2024-01-03", None, "2024-01-05"],
        status=["Completed", "Failed", "Completed", "Completed", final_status],
    )
    return pd.DataFrame(columns)

# ----------------------------------------
# Step 3: Calculate quality metrics
# ----------------------------------------
def calculate_quality_metrics(df, critical_fields=None, valid_statuses=None):
    """Compute completeness and accuracy metrics for a transactions DataFrame.

    Args:
        df: DataFrame containing every column named in ``critical_fields``
            plus a ``status`` column.
        critical_fields: Columns whose share of missing values is reported.
            Defaults to ``transaction_id``, ``amount`` and ``date``.
        valid_statuses: Values of ``status`` that count as accurate.
            Defaults to ``Completed`` and ``Failed``.

    Returns:
        dict mapping ``"<column>_missing_pct"`` to the percentage of null
        values in each critical field, plus ``"status_accuracy_pct"``, the
        percentage of rows whose status is one of ``valid_statuses``. All
        values are rounded to two decimals.

    Raises:
        KeyError: If a critical field or the ``status`` column is absent.
    """
    if critical_fields is None:
        critical_fields = ('transaction_id', 'amount', 'date')
    if valid_statuses is None:
        valid_statuses = ('Completed', 'Failed')

    metrics = {}

    # Completeness: % missing in each critical field
    for col in critical_fields:
        missing_pct = df[col].isnull().mean() * 100
        metrics[f"{col}_missing_pct"] = round(missing_pct, 2)

    # Accuracy: a missing status is not in the default valid set, so it is
    # counted as invalid alongside any unexpected value.
    invalid_status_pct = (~df['status'].isin(list(valid_statuses))).mean() * 100
    metrics['status_accuracy_pct'] = round(100 - invalid_status_pct, 2)

    return metrics

# ----------------------------------------
# Step 4: Scheduled job to run the checks
# ----------------------------------------
def job():
    """Run one monitoring pass: load the data, compute metrics, record them.

    The metrics are joined into a single ``key: value%`` summary separated
    by `` | ``, written to the log at INFO level and echoed to stdout.
    """
    quality = calculate_quality_metrics(load_data())

    parts = []
    for name, pct in quality.items():
        parts.append(f"{name}: {pct}%")
    summary = " | ".join(parts)

    logging.info(f"Data Quality Metrics - {summary}")
    print(f"Logged: {summary}")

# ----------------------------------------
# Step 5: Schedule job to run every minute
# ----------------------------------------
schedule.every(1).minutes.do(job)

print("⏳ Data Quality Monitor is running. Press Ctrl+C to stop.")

# Poll the scheduler once per second. Sleeping between polls keeps the loop
# from spinning a CPU core at full load while it waits for the next run.
while True:
    schedule.run_pending()
    time.sleep(1)

⏳ Data Quality Monitor is running. Press Ctrl+C to stop.
Logged: transaction_id_missing_pct: 40.0% | amount_missing_pct: 20.0% | date_missing_pct: 20.0% | status_accuracy_pct: 80.0%
Logged: transaction_id_missing_pct: 40.0% | amount_missing_pct: 20.0% | date_missing_pct: 20.0% | status_accuracy_pct: 80.0%
Logged: transaction_id_missing_pct: 40.0% | amount_missing_pct: 20.0% | date_missing_pct: 20.0% | status_accuracy_pct: 80.0%
Logged: transaction_id_missing_pct: 40.0% | amount_missing_pct: 20.0% | date_missing_pct: 20.0% | status_accuracy_pct: 100.0%
Logged: transaction_id_missing_pct: 40.0% | amount_missing_pct: 20.0% | date_missing_pct: 20.0% | status_accuracy_pct: 80.0%
