In [7]:
import getpass
import os
import smtplib
from datetime import datetime
from email.mime.text import MIMEText

import pandas as pd
from sqlalchemy import create_engine, text
from sqlalchemy.engine import URL

In [9]:
# --- Database connection ---
# The password is taken from the environment (or prompted for interactively)
# so that no credential is stored in the notebook. URL.create() also escapes
# any special characters the password may contain.
db_password = os.environ.get("NEWS_EVENTS_DB_PASSWORD") or getpass.getpass(
    "MySQL password for root@localhost: "
)
engine = create_engine(
    URL.create(
        drivername="mysql+pymysql",
        username="root",
        password=db_password,
        host="localhost",
        port=3306,
        database="news_events_db",
    )
)

# --- Thresholds ---
# Percentage metrics are expressed as fractions (0.9 == 90%); timeliness is in hours.
thresholds = {
    "completeness": 0.9,     # at least 90% complete
    "validity": 0.9,         # at least 90% valid URLs
    "uniqueness": 0.98,      # at least 98% unique event_ids
    "consistency": 0.9,      # at least 90% consistent records
    "timeliness": 24         # average latency should be < 24 hours
}

# Scale of the *_pct columns returned by the views. vw_completeness reports
# percentages on a 0-100 scale (e.g. 97.57), whereas the thresholds above are
# fractions, so percentage metrics are divided by this value before comparing.
# NOTE(review): assumes vw_validity / vw_uniqueness / vw_consistency use the
# same 0-100 scale as vw_completeness — confirm against the view definitions.
PCT_SCALE = 100.0

# --- Queries for metrics ---
# Each query returns a single row with a single column named "metric".
queries = {
    "completeness": "SELECT AVG(completeness_pct) AS metric FROM vw_completeness",
    "validity": "SELECT AVG(validity_pct) AS metric FROM vw_validity",
    "uniqueness": "SELECT AVG(uniqueness_pct) AS metric FROM vw_uniqueness",
    "consistency": "SELECT AVG(consistency_pct) AS metric FROM vw_consistency",
    "timeliness": "SELECT AVG(avg_latency_hours) AS metric FROM vw_timeliness_by_day"
}


def _build_alert(metric_name, value):
    """Return the alert message for one metric, or None when it is within bounds.

    Args:
        metric_name: Key into ``thresholds`` / ``queries``.
        value: Raw metric as returned by the query (hours for "timeliness",
            a 0-100 percentage for every other metric). May be NULL/NaN when
            the underlying view has no rows.

    Returns:
        A human-readable alert string, or None if no alert is needed.
    """
    limit = thresholds[metric_name]

    # AVG() over an empty view yields NULL; report it instead of crashing on
    # the comparison (None) or silently passing the check (NaN).
    if pd.isna(value):
        return f"⚠️ {metric_name.capitalize()} check returned no data (metric is NULL)"

    if metric_name == "timeliness":
        # Timeliness is "lower is better": alert when latency exceeds the limit.
        if value > limit:
            return f"⚠️ Timeliness issue: average latency = {value:.2f} hrs (threshold = {limit} hrs)"
        return None

    # float() first: the driver may hand back decimal.Decimal, which cannot be
    # divided by a Python float.
    fraction = float(value) / PCT_SCALE
    if fraction < limit:
        return f"⚠️ {metric_name.capitalize()} below threshold: {fraction:.2%} < {limit*100}%"
    return None


# Built once; reused for every metric inserted into the audit log.
insert_audit_row = text("""
                INSERT INTO dq_audit_log (metric_name, metric_value, check_time)
                VALUES (:metric_name, :metric_value, :check_time)
            """)

alerts = []

# --- Main Monitoring Loop ---
for metric_name, query in queries.items():
    df = pd.read_sql(query, engine)
    value = df["metric"].iloc[0]

    # --- Threshold checks ---
    alert = _build_alert(metric_name, value)
    if alert is not None:
        alerts.append(alert)

    # No numeric value to record when the view was empty; the alert above
    # already captures the problem.
    if pd.isna(value):
        continue

    # --- Insert metric log safely ---
    # The raw value is logged unchanged (not rescaled). engine.begin() commits
    # on success and rolls back on error.
    with engine.begin() as conn:
        conn.execute(
            insert_audit_row,
            {
                "metric_name": metric_name,
                "metric_value": float(value),
                "check_time": datetime.now()
            }
        )

In [11]:
# Connectivity smoke test: ask the server which schema this session is using
# and report the outcome. Any failure is printed rather than raised so the
# notebook keeps running.
try:
    with engine.connect() as connection:
        current_db = connection.execute(text("SELECT DATABASE();")).scalar()
        print(f"✅ Successfully connected to database: {current_db}")
except Exception as err:
    print("❌ Connection failed:", err)


✅ Successfully connected to database: news_events_db


In [12]:
# Quick data fetch test: read a few rows from the completeness view to confirm
# that pandas can query through the engine, then show them as plain text.
sample_query = "SELECT * FROM vw_completeness LIMIT 5"
df = pd.read_sql(sample_query, engine)
print(df.head())


      column_name  completeness_pct
0          amount              6.74
1  article_author             14.08
2   article_title            100.00
3    company_name             97.57
4        event_id            100.00
