<a href="https://colab.research.google.com/github/BHARATH077/Automated-Data-Quality-Metadata-Management-Framework/blob/main/Data_quality_framework.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Project Title: Automated Data Quality & Metadata Management Framework

In [7]:
# ===============================
# Step 1: Setup & Sample Data Creation
# ===============================

import os
import pandas as pd
from datetime import datetime

# -------------------------------
# 1. Create Folder Structure
# -------------------------------
os.makedirs("data/raw", exist_ok=True)
os.makedirs("data/processed", exist_ok=True)
os.makedirs("data/metadata", exist_ok=True)
print("‚úÖ Folder structure created.")

# -------------------------------
# 2. Generate Sample Data
# -------------------------------

# Customer Data
customers = pd.DataFrame({
    "customer_id": [1, 2, 3, 4, 5],
    "name": ["Alice", "Bob", "Charlie", "David", "Eva"],
    "email": ["alice@x.com", "bob@x.com", "charlie@x.com", "david@x.com", None],
    "signup_date": ["2023-01-10", "2023-02-15", "2023-02-28", "2023-03-05", "2023-03-20"]
})
customers.to_csv("data/raw/customers.csv", index=False)

# Orders Data
orders = pd.DataFrame({
    "order_id": [101, 102, 103, 104, 105],
    "customer_id": [1, 2, 3, 6, 3],  # note: customer_id=6 doesn‚Äôt exist in customers
    "order_amount": [250, 300, None, 150, 500],
    "order_date": ["2023-03-10", "2023-03-12", "2023-03-15", "2023-03-16", "2023-03-18"]
})
orders.to_csv("data/raw/orders.csv", index=False)

# Products Data
products = pd.DataFrame({
    "product_id": [11, 12, 13, 14, 15],
    "product_name": ["Laptop", "Phone", "Tablet", "Monitor", "Mouse"],
    "price": [1200, 800, 300, 200, 50]
})
products.to_csv("data/raw/products.csv", index=False)

print("‚úÖ Sample CSVs created in data/raw/")

# -------------------------------
# 3. Initialize Metadata Catalog
# -------------------------------

metadata_catalog = pd.DataFrame({
    "table_name": ["customers", "orders", "products"],
    "file_path": [
        "data/raw/customers.csv",
        "data/raw/orders.csv",
        "data/raw/products.csv"
    ],
    "record_count": [len(customers), len(orders), len(products)],
    "last_updated": [datetime.now().strftime("%Y-%m-%d %H:%M:%S")]*3
})

metadata_catalog.to_csv("data/metadata/catalog.csv", index=False)
print("‚úÖ Metadata catalog created at data/metadata/catalog.csv")

# -------------------------------
# 4. Preview Metadata
# -------------------------------
pd.read_csv("data/metadata/catalog.csv")


‚úÖ Folder structure created.
‚úÖ Sample CSVs created in data/raw/
‚úÖ Metadata catalog created at data/metadata/catalog.csv


Unnamed: 0,table_name,file_path,record_count,last_updated
0,customers,data/raw/customers.csv,5,2025-10-17 15:35:45
1,orders,data/raw/orders.csv,5,2025-10-17 15:35:45
2,products,data/raw/products.csv,5,2025-10-17 15:35:45


In [8]:
# ===============================
# Step 2: Schema Validation Framework
# ===============================

import pandas as pd
import os
from datetime import datetime

# -------------------------------
# 1. Define Expected Schemas
# -------------------------------
expected_schemas = {
    "customers": {
        "customer_id": "int64",
        "name": "object",
        "email": "object",
        "signup_date": "object"
    },
    "orders": {
        "order_id": "int64",
        "customer_id": "int64",
        "order_amount": "float64",
        "order_date": "object"
    },
    "products": {
        "product_id": "int64",
        "product_name": "object",
        "price": "int64"
    }
}

# -------------------------------
# 2. Load Metadata Catalog
# -------------------------------
catalog_path = "data/metadata/catalog.csv"
metadata_catalog = pd.read_csv(catalog_path)

validation_results = []

# -------------------------------
# 3. Schema Validation Function
# -------------------------------
def validate_schema(table_name, expected_schema, file_path):
    try:
        df = pd.read_csv(file_path)
        actual_cols = list(df.columns)
        expected_cols = list(expected_schema.keys())

        # Column comparison
        missing_cols = [c for c in expected_cols if c not in actual_cols]
        extra_cols = [c for c in actual_cols if c not in expected_cols]

        # Data type validation
        type_mismatches = {}
        for col, expected_type in expected_schema.items():
            if col in df.columns:
                actual_type = str(df[col].dtype)
                if actual_type != expected_type:
                    type_mismatches[col] = f"{actual_type} != {expected_type}"

        # Status
        status = "PASS" if not missing_cols and not extra_cols and not type_mismatches else "FAIL"

        result = {
            "table_name": table_name,
            "status": status,
            "missing_cols": ",".join(missing_cols) if missing_cols else None,
            "extra_cols": ",".join(extra_cols) if extra_cols else None,
            "type_mismatches": str(type_mismatches) if type_mismatches else None,
            "validated_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        }

        print(f"‚úÖ Schema validation for {table_name}: {status}")
        return result

    except Exception as e:
        print(f"‚ùå Error validating {table_name}: {e}")
        return {
            "table_name": table_name,
            "status": "ERROR",
            "missing_cols": None,
            "extra_cols": None,
            "type_mismatches": None,
            "validated_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
            "error_message": str(e)
        }

# -------------------------------
# 4. Run Validation for All Tables
# -------------------------------
for _, row in metadata_catalog.iterrows():
    tbl = row["table_name"]
    file_path = row["file_path"]
    result = validate_schema(tbl, expected_schemas[tbl], file_path)
    validation_results.append(result)

schema_report = pd.DataFrame(validation_results)

# -------------------------------
# 5. Save Schema Validation Report
# -------------------------------
os.makedirs("data/metadata/reports", exist_ok=True)
schema_report_path = "data/metadata/reports/schema_validation_report.csv"
schema_report.to_csv(schema_report_path, index=False)
print(f"üìÑ Schema validation report saved to {schema_report_path}")

# -------------------------------
# 6. Update Metadata Catalog with Validation Results
# -------------------------------
metadata_catalog["schema_status"] = metadata_catalog["table_name"].map(
    schema_report.set_index("table_name")["status"]
)
metadata_catalog["last_validated"] = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
metadata_catalog.to_csv(catalog_path, index=False)
print("‚úÖ Metadata catalog updated with schema validation results.")

# -------------------------------
# 7. Display Reports
# -------------------------------
print("\n=== SCHEMA VALIDATION REPORT ===")
display(schema_report)

print("\n=== UPDATED METADATA CATALOG ===")
display(metadata_catalog)


‚úÖ Schema validation for customers: PASS
‚úÖ Schema validation for orders: PASS
‚úÖ Schema validation for products: PASS
üìÑ Schema validation report saved to data/metadata/reports/schema_validation_report.csv
‚úÖ Metadata catalog updated with schema validation results.

=== SCHEMA VALIDATION REPORT ===


Unnamed: 0,table_name,status,missing_cols,extra_cols,type_mismatches,validated_at
0,customers,PASS,,,,2025-10-17 15:35:45
1,orders,PASS,,,,2025-10-17 15:35:45
2,products,PASS,,,,2025-10-17 15:35:45



=== UPDATED METADATA CATALOG ===


Unnamed: 0,table_name,file_path,record_count,last_updated,schema_status,last_validated
0,customers,data/raw/customers.csv,5,2025-10-17 15:35:45,PASS,2025-10-17 15:35:45
1,orders,data/raw/orders.csv,5,2025-10-17 15:35:45,PASS,2025-10-17 15:35:45
2,products,data/raw/products.csv,5,2025-10-17 15:35:45,PASS,2025-10-17 15:35:45


In [9]:
# ===============================
# Step 3: Data Quality Validation Framework
# ===============================

import pandas as pd
import os
from datetime import datetime

# -------------------------------
# 1. Load Data & Metadata
# -------------------------------
catalog = pd.read_csv("data/metadata/catalog.csv")

# Helper to load tables as DataFrames
def load_table(table_name):
    path = catalog.loc[catalog["table_name"] == table_name, "file_path"].values[0]
    return pd.read_csv(path)

customers = load_table("customers")
orders = load_table("orders")
products = load_table("products")

# -------------------------------
# 2. Define Data Quality Checks
# -------------------------------

def check_nulls(df, table_name):
    null_counts = df.isnull().sum()
    total_nulls = int(null_counts.sum())
    return {
        "table_name": table_name,
        "check_type": "NULL_CHECK",
        "result": "FAIL" if total_nulls > 0 else "PASS",
        "details": str(null_counts[null_counts > 0].to_dict())
    }

def check_duplicates(df, table_name):
    dup_count = df.duplicated().sum()
    return {
        "table_name": table_name,
        "check_type": "DUPLICATE_CHECK",
        "result": "FAIL" if dup_count > 0 else "PASS",
        "details": f"{dup_count} duplicate rows"
    }

def check_referential_integrity(child_df, parent_df, child_key, parent_key, child_table, parent_table):
    missing_refs = ~child_df[child_key].isin(parent_df[parent_key])
    invalid_count = missing_refs.sum()
    return {
        "table_name": child_table,
        "check_type": "REFERENTIAL_INTEGRITY",
        "result": "FAIL" if invalid_count > 0 else "PASS",
        "details": f"{invalid_count} invalid {child_key} not found in {parent_table}"
    }

def check_negative_values(df, table_name, numeric_columns):
    negatives = {}
    for col in numeric_columns:
        if col in df.columns:
            negatives[col] = int((df[col].fillna(0) < 0).sum())
    total_negatives = sum(negatives.values())
    return {
        "table_name": table_name,
        "check_type": "NEGATIVE_VALUE_CHECK",
        "result": "FAIL" if total_negatives > 0 else "PASS",
        "details": str({k:v for k,v in negatives.items() if v > 0})
    }

# -------------------------------
# 3. Run Data Quality Checks
# -------------------------------
results = []

# Customers
results.append(check_nulls(customers, "customers"))
results.append(check_duplicates(customers, "customers"))

# Orders
results.append(check_nulls(orders, "orders"))
results.append(check_duplicates(orders, "orders"))
results.append(check_referential_integrity(orders, customers, "customer_id", "customer_id", "orders", "customers"))
results.append(check_negative_values(orders, "orders", ["order_amount"]))

# Products
results.append(check_nulls(products, "products"))
results.append(check_duplicates(products, "products"))
results.append(check_negative_values(products, "products", ["price"]))

dq_report = pd.DataFrame(results)

# -------------------------------
# 4. Save Data Quality Report
# -------------------------------
os.makedirs("data/metadata/reports", exist_ok=True)
dq_report_path = "data/metadata/reports/data_quality_report.csv"
dq_report.to_csv(dq_report_path, index=False)
print(f"üìä Data Quality Report saved to {dq_report_path}")

# -------------------------------
# 5. Update Metadata Catalog
# -------------------------------
dq_summary = dq_report.groupby("table_name")["result"].apply(lambda x: "FAIL" if "FAIL" in x.values else "PASS").reset_index()
dq_summary.columns = ["table_name", "data_quality_status"]

catalog = catalog.merge(dq_summary, on="table_name", how="left")
catalog["last_quality_check"] = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
catalog.to_csv("data/metadata/catalog.csv", index=False)

# -------------------------------
# 6. Display Reports
# -------------------------------
print("\n=== DATA QUALITY REPORT ===")
display(dq_report)

print("\n=== UPDATED METADATA CATALOG ===")
display(catalog)


üìä Data Quality Report saved to data/metadata/reports/data_quality_report.csv

=== DATA QUALITY REPORT ===


Unnamed: 0,table_name,check_type,result,details
0,customers,NULL_CHECK,FAIL,{'email': 1}
1,customers,DUPLICATE_CHECK,PASS,0 duplicate rows
2,orders,NULL_CHECK,FAIL,{'order_amount': 1}
3,orders,DUPLICATE_CHECK,PASS,0 duplicate rows
4,orders,REFERENTIAL_INTEGRITY,FAIL,1 invalid customer_id not found in customers
5,orders,NEGATIVE_VALUE_CHECK,PASS,{}
6,products,NULL_CHECK,PASS,{}
7,products,DUPLICATE_CHECK,PASS,0 duplicate rows
8,products,NEGATIVE_VALUE_CHECK,PASS,{}



=== UPDATED METADATA CATALOG ===


Unnamed: 0,table_name,file_path,record_count,last_updated,schema_status,last_validated,data_quality_status,last_quality_check
0,customers,data/raw/customers.csv,5,2025-10-17 15:35:45,PASS,2025-10-17 15:35:45,FAIL,2025-10-17 15:35:46
1,orders,data/raw/orders.csv,5,2025-10-17 15:35:45,PASS,2025-10-17 15:35:45,FAIL,2025-10-17 15:35:46
2,products,data/raw/products.csv,5,2025-10-17 15:35:45,PASS,2025-10-17 15:35:45,PASS,2025-10-17 15:35:46


In [10]:
# ===============================
# Step 4: Metadata Management Framework
# ===============================

import pandas as pd
import os
from datetime import datetime
import hashlib
import json

# -------------------------------
# 1. Load Previous Metadata & Data Quality Results
# -------------------------------
catalog_path = "data/metadata/catalog.csv"
dq_report_path = "data/metadata/reports/data_quality_report.csv"

catalog = pd.read_csv(catalog_path)
dq_report = pd.read_csv(dq_report_path)

# -------------------------------
# 2. Define Helper: Schema & Stats Extractor
# -------------------------------

def extract_metadata(df: pd.DataFrame, table_name: str, file_path: str):
    """Extracts metadata like schema, record count, column stats, and hash for versioning"""
    schema_info = {col: str(df[col].dtype) for col in df.columns}
    record_count = len(df)

    # Create a schema hash to detect structural changes
    schema_hash = hashlib.md5(json.dumps(schema_info, sort_keys=True).encode()).hexdigest()

    metadata = {
        "table_name": table_name,
        "file_path": file_path,
        "record_count": record_count,
        "schema_hash": schema_hash,
        "schema_json": json.dumps(schema_info),
        "last_updated": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    }
    return metadata

# -------------------------------
# 3. Extract Metadata for All Tables
# -------------------------------

def load_table(table_name):
    path = catalog.loc[catalog["table_name"] == table_name, "file_path"].values[0]
    return pd.read_csv(path), path

metadata_records = []

for table_name in catalog["table_name"]:
    df, path = load_table(table_name)
    metadata_records.append(extract_metadata(df, table_name, path))

metadata_df = pd.DataFrame(metadata_records)

# -------------------------------
# 4. Merge Data Quality & Metadata
# -------------------------------

# Summarize DQ results
dq_summary = dq_report.groupby("table_name")["result"].apply(lambda x: "FAIL" if "FAIL" in x.values else "PASS").reset_index()
dq_summary.columns = ["table_name", "dq_status"]

# Merge into metadata
metadata_combined = pd.merge(metadata_df, dq_summary, on="table_name", how="left")

# Add version info
metadata_combined["version"] = datetime.now().strftime("%Y%m%d_%H%M%S")

# -------------------------------
# 5. Save Historical Metadata Catalog
# -------------------------------
os.makedirs("data/metadata/history", exist_ok=True)

version_file = f"data/metadata/history/catalog_{metadata_combined['version'][0]}.csv"
metadata_combined.to_csv(version_file, index=False)

# Update the latest metadata catalog file
metadata_combined.to_csv("data/metadata/metadata_catalog.csv", index=False)

print(f"üìò Metadata catalog updated and versioned at: {version_file}")

# -------------------------------
# 6. Display Updated Metadata
# -------------------------------
print("\n=== CURRENT METADATA CATALOG ===")
display(metadata_combined[["table_name", "record_count", "dq_status", "version", "last_updated"]])


üìò Metadata catalog updated and versioned at: data/metadata/history/catalog_20251017_153546.csv

=== CURRENT METADATA CATALOG ===


Unnamed: 0,table_name,record_count,dq_status,version,last_updated
0,customers,5,FAIL,20251017_153546,2025-10-17 15:35:46
1,orders,5,FAIL,20251017_153546,2025-10-17 15:35:46
2,products,5,PASS,20251017_153546,2025-10-17 15:35:46


In [11]:
# ===============================
# Step 5: Orchestration Framework (Airflow Simulation)
# ===============================

import os
import pandas as pd
from datetime import datetime
import time
import traceback
import networkx as nx

# -------------------------------
# 1. Define Task Functions
# -------------------------------

def task_ingest_data():
    """Simulate data ingestion (re-run of Day 2 mock data creation)"""
    print("üì• Running data ingestion...")
    time.sleep(2)  # simulate processing delay
    return "SUCCESS"

def task_validate_data():
    """Run data quality checks (simulate Day 3)"""
    print("üîç Running data quality validation...")
    time.sleep(2)
    # load DQ report and check pass/fail
    dq_report = pd.read_csv("data/metadata/reports/data_quality_report.csv")
    if "FAIL" in dq_report["result"].values:
        print("‚ö†Ô∏è Some data quality checks failed.")
        return "FAIL_WITH_WARNINGS"
    return "SUCCESS"

def task_update_metadata():
    """Simulate metadata management framework (Day 4)"""
    print("üß≠ Updating metadata catalog...")
    time.sleep(2)
    catalog = pd.read_csv("data/metadata/metadata_catalog.csv")
    catalog["last_orchestrated_run"] = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    catalog.to_csv("data/metadata/metadata_catalog.csv", index=False)
    return "SUCCESS"

# -------------------------------
# 2. Define Orchestration DAG
# -------------------------------

dag = nx.DiGraph()

# Define nodes (tasks)
dag.add_node("ingest_data")
dag.add_node("validate_data")
dag.add_node("update_metadata")

# Define dependencies (edges)
dag.add_edge("ingest_data", "validate_data")
dag.add_edge("validate_data", "update_metadata")

# -------------------------------
# 3. DAG Executor
# -------------------------------

def execute_dag(dag):
    execution_log = []
    print("\nüöÄ Starting DAG execution...\n")

    for task in nx.topological_sort(dag):
        print(f"üü¢ Executing Task: {task}")
        start_time = datetime.now()
        status = "FAILED"
        try:
            if task == "ingest_data":
                status = task_ingest_data()
            elif task == "validate_data":
                status = task_validate_data()
            elif task == "update_metadata":
                status = task_update_metadata()
        except Exception as e:
            print(f"‚ùå Task {task} failed: {e}")
            traceback.print_exc()
            status = "FAILED"
        end_time = datetime.now()

        execution_log.append({
            "task_name": task,
            "status": status,
            "start_time": start_time.strftime("%Y-%m-%d %H:%M:%S"),
            "end_time": end_time.strftime("%Y-%m-%d %H:%M:%S"),
            "duration_sec": (end_time - start_time).total_seconds()
        })

        if status not in ["SUCCESS", "FAIL_WITH_WARNINGS"]:
            print(f"‚õî DAG halted due to failure in task: {task}")
            break

        print(f"‚úÖ Task {task} completed with status: {status}\n")

    print("üéâ DAG Execution Complete!\n")
    return execution_log

# -------------------------------
# 4. Execute the DAG
# -------------------------------

execution_results = execute_dag(dag)

# -------------------------------
# 5. Save DAG Run Metadata
# -------------------------------
os.makedirs("data/orchestration", exist_ok=True)
run_log_path = "data/orchestration/pipeline_runs.csv"

df_run_log = pd.DataFrame(execution_results)
df_run_log["run_id"] = datetime.now().strftime("%Y%m%d_%H%M%S")

if os.path.exists(run_log_path):
    existing_log = pd.read_csv(run_log_path)
    updated_log = pd.concat([existing_log, df_run_log], ignore_index=True)
else:
    updated_log = df_run_log

updated_log.to_csv(run_log_path, index=False)

print(f"üìÑ DAG execution log saved at: {run_log_path}")

# Display DAG log
display(updated_log.tail(10))



üöÄ Starting DAG execution...

üü¢ Executing Task: ingest_data
üì• Running data ingestion...
‚úÖ Task ingest_data completed with status: SUCCESS

üü¢ Executing Task: validate_data
üîç Running data quality validation...
‚ö†Ô∏è Some data quality checks failed.

üü¢ Executing Task: update_metadata
üß≠ Updating metadata catalog...
‚úÖ Task update_metadata completed with status: SUCCESS

üéâ DAG Execution Complete!

üìÑ DAG execution log saved at: data/orchestration/pipeline_runs.csv


Unnamed: 0,task_name,status,start_time,end_time,duration_sec,run_id
0,ingest_data,SUCCESS,2025-10-17 15:25:32,2025-10-17 15:25:34,2.000181,20251017_152538
1,validate_data,FAIL_WITH_WARNINGS,2025-10-17 15:25:34,2025-10-17 15:25:36,2.002751,20251017_152538
2,update_metadata,SUCCESS,2025-10-17 15:25:36,2025-10-17 15:25:38,2.003868,20251017_152538
3,ingest_data,SUCCESS,2025-10-17 15:35:46,2025-10-17 15:35:48,2.000197,20251017_153552
4,validate_data,FAIL_WITH_WARNINGS,2025-10-17 15:35:48,2025-10-17 15:35:50,2.00254,20251017_153552
5,update_metadata,SUCCESS,2025-10-17 15:35:50,2025-10-17 15:35:52,2.004107,20251017_153552


In [12]:
# ===============================
# Step 6: Monitoring & Dashboard
# ===============================

import pandas as pd
import plotly.express as px
import plotly.graph_objects as go

# -------------------------------
# 1. Load DAG Run Log
# -------------------------------
run_log_path = "data/orchestration/pipeline_runs.csv"
dag_runs = pd.read_csv(run_log_path)
print(f"üìÑ Loaded DAG run log: {run_log_path}")

# -------------------------------
# 2. Summary Metrics
# -------------------------------
total_runs = dag_runs['run_id'].nunique()
failed_tasks = dag_runs[dag_runs['status'].str.contains("FAIL")].shape[0]
success_tasks = dag_runs[dag_runs['status'] == "SUCCESS"].shape[0]

print(f"Total DAG runs: {total_runs}")
print(f"Total failed tasks: {failed_tasks}")
print(f"Total successful tasks: {success_tasks}")

# -------------------------------
# 3. Generate Alerts (Simulation)
# -------------------------------
for _, row in dag_runs.iterrows():
    if "FAIL" in row['status']:
        print(f"‚ö†Ô∏è ALERT: Task {row['task_name']} failed in run {row['run_id']}")

# -------------------------------
# 4. Visualize DAG Execution
# -------------------------------

# Task Status Bar Chart
fig_status = px.bar(
    dag_runs,
    x="task_name",
    y="duration_sec",
    color="status",
    title="Task Execution Status & Duration",
    labels={"task_name": "Task Name", "duration_sec": "Duration (seconds)"}
)

fig_status.show()

# DAG Execution Timeline
fig_timeline = px.timeline(
    dag_runs,
    x_start="start_time",
    x_end="end_time",
    y="task_name",
    color="status",
    title="DAG Execution Timeline"
)
fig_timeline.update_yaxes(autorange="reversed")
fig_timeline.show()

# -------------------------------
# 5. Data Quality Status per Table
# -------------------------------
metadata_catalog = pd.read_csv("data/metadata/metadata_catalog.csv")

fig_dq = px.bar(
    metadata_catalog,
    x="table_name",
    y="record_count",
    color="dq_status",
    title="Data Quality Status per Table",
    labels={"dq_status": "Data Quality Status", "record_count": "Record Count"}
)
fig_dq.show()


üìÑ Loaded DAG run log: data/orchestration/pipeline_runs.csv
Total DAG runs: 2
Total failed tasks: 2
Total successful tasks: 4
‚ö†Ô∏è ALERT: Task validate_data failed in run 20251017_152538
‚ö†Ô∏è ALERT: Task validate_data failed in run 20251017_153552
