In [20]:
# generate_synthetic_procurement_data.py
# Creates 4 CSVs with synthetic procurement data and intentionally "bad" data.
# Outputs to: ./data/raw/

from __future__ import annotations

import os
import random
from datetime import datetime, timedelta
from typing import List, Dict

import numpy as np
import pandas as pd


def ensure_dir(path: str) -> None:
    os.makedirs(path, exist_ok=True)


def random_date(start: datetime, end: datetime) -> datetime:
    """Random datetime between start and end."""
    delta = end - start
    seconds = random.randint(0, int(delta.total_seconds()))
    return start + timedelta(seconds=seconds)


def main(
    out_dir: str = "data/raw",
    seed: int = 42,
    n_suppliers: int = 120,
    n_parts: int = 350,
    n_pos: int = 900,
    n_lines: int = 3200,
) -> None:
    random.seed(seed)
    np.random.seed(seed)

    ensure_dir(out_dir)

    # ---------- Reference values ----------
    countries = ["US", "CA", "MX", "GB", "DE", "FR", "IN", "CN", "JP", "BR", "AU"]
    supplier_statuses = ["ACTIVE", "INACTIVE", "BLOCKED"]
    part_types = ["RAW", "FINISHED", "PACKAGING", "SERVICE", "MRO"]
    uoms = ["EA", "KG", "LB", "L", "M", "BOX"]
    po_statuses = ["OPEN", "APPROVED", "CLOSED", "CANCELLED"]

    # "Good" currency list + some wrong ones weâ€™ll inject later
    good_currencies = ["USD", "EUR", "GBP", "CAD", "JPY", "INR", "AUD"]
    bad_currency_codes = ["US", "EURO", "usd", "XXX", "", None]

    # ---------- suppliers.csv ----------
    supplier_ids = np.arange(1000, 1000 + n_suppliers)

    suppliers = pd.DataFrame({
        "supplier_id": supplier_ids,
        "supplier_name": [f"Supplier {i}" for i in supplier_ids],
        "status": np.random.choice(supplier_statuses, size=n_suppliers, p=[0.75, 0.18, 0.07]),
        "country": np.random.choice(countries, size=n_suppliers),
    })

    # Seed bad data: nulls, duplicates, invalid values
    # 1) Null supplier_name
    suppliers.loc[np.random.choice(suppliers.index, size=3, replace=False), "supplier_name"] = None
    # 2) Duplicate supplier_id rows (exact duplicates)
    dup_rows = suppliers.sample(2, random_state=seed)
    suppliers = pd.concat([suppliers, dup_rows], ignore_index=True)
    # 3) Invalid status
    suppliers.loc[np.random.choice(suppliers.index, size=2, replace=False), "status"] = "ACTIV"  # typo
    # 4) Invalid country code length
    suppliers.loc[np.random.choice(suppliers.index, size=2, replace=False), "country"] = "UNITED_STATES"

    suppliers.to_csv(os.path.join(out_dir, "suppliers.csv"), index=False)

    # ---------- parts.csv ----------
    part_ids = np.arange(20000, 20000 + n_parts)

    parts = pd.DataFrame({
        "part_id": part_ids,
        "part_name": [f"Part {pid}" for pid in part_ids],
        "part_type": np.random.choice(part_types, size=n_parts),
        "uom": np.random.choice(uoms, size=n_parts),
        "is_active": np.random.choice([True, False], size=n_parts, p=[0.9, 0.1]),
    })

    # Seed bad data
    # 1) Null uom
    parts.loc[np.random.choice(parts.index, size=4, replace=False), "uom"] = None
    # 2) Duplicate part_id
    parts = pd.concat([parts, parts.sample(3, random_state=seed)], ignore_index=True)
    # 3) Invalid UOM
    parts.loc[np.random.choice(parts.index, size=3, replace=False), "uom"] = "EACH"
    # 4) Invalid is_active (string)
    parts.loc[np.random.choice(parts.index, size=2, replace=False), "is_active"] = "yes"

    parts.to_csv(os.path.join(out_dir, "parts.csv"), index=False)

    # ---------- purchase_orders.csv ----------
    po_ids = np.arange(500000, 500000 + n_pos)

    start = datetime.now() - timedelta(days=365 * 2)
    end = datetime.now()

    purchase_orders = pd.DataFrame({
        "po_id": po_ids,
        "supplier_id": np.random.choice(supplier_ids, size=n_pos),
        "po_date": [random_date(start, end).date().isoformat() for _ in range(n_pos)],
        "currency": np.random.choice(good_currencies, size=n_pos, p=[0.55, 0.12, 0.08, 0.1, 0.05, 0.07, 0.03]),
        "total_amount": np.round(np.random.gamma(shape=2.0, scale=1200.0, size=n_pos), 2),
        "status": np.random.choice(po_statuses, size=n_pos, p=[0.25, 0.45, 0.25, 0.05]),
    })

    # Seed bad data
    # 1) Null currency
    purchase_orders.loc[np.random.choice(purchase_orders.index, size=5, replace=False), "currency"] = None
    # 2) Bad currency codes
    purchase_orders.loc[np.random.choice(purchase_orders.index, size=6, replace=False), "currency"] = np.random.choice(
        bad_currency_codes, size=6
    )
    # 3) Negative total_amount
    purchase_orders.loc[np.random.choice(purchase_orders.index, size=4, replace=False), "total_amount"] *= -1
    # 4) Duplicate po_id rows
    purchase_orders = pd.concat([purchase_orders, purchase_orders.sample(3, random_state=seed)], ignore_index=True)
    # 5) Supplier_id that does NOT exist (referential integrity break)
    purchase_orders.loc[np.random.choice(purchase_orders.index, size=3, replace=False), "supplier_id"] = 999999

    purchase_orders.to_csv(os.path.join(out_dir, "purchase_orders.csv"), index=False)

    # ---------- po_lines.csv ----------
    # Make lines by sampling po_id + part_id, then computing amounts.
    po_line_ids = np.arange(9000000, 9000000 + n_lines)

    po_lines = pd.DataFrame({
        "po_line_id": po_line_ids,
        "po_id": np.random.choice(po_ids, size=n_lines),
        "part_id": np.random.choice(part_ids, size=n_lines),
        "qty": np.random.randint(1, 50, size=n_lines).astype(float),
        "unit_price": np.round(np.random.lognormal(mean=3.0, sigma=0.6, size=n_lines), 2),
    })
    po_lines["line_amount"] = np.round(po_lines["qty"] * po_lines["unit_price"], 2)

    # Seed bad data
    # 1) Null qty and unit_price
    po_lines.loc[np.random.choice(po_lines.index, size=6, replace=False), "qty"] = np.nan
    po_lines.loc[np.random.choice(po_lines.index, size=6, replace=False), "unit_price"] = np.nan
    # 2) Negative qty
    po_lines.loc[np.random.choice(po_lines.index, size=6, replace=False), "qty"] = -np.random.randint(1, 20, size=6)
    # 3) Negative unit_price
    po_lines.loc[np.random.choice(po_lines.index, size=4, replace=False), "unit_price"] *= -1
    # 4) Incorrect line_amount (doesn't match qty * unit_price)
    idx_bad_amount = np.random.choice(po_lines.index, size=8, replace=False)
    po_lines.loc[idx_bad_amount, "line_amount"] = np.round(
        po_lines.loc[idx_bad_amount, "line_amount"] * np.random.uniform(0.3, 1.8, size=8), 2
    )
    # 5) Duplicate po_line_id
    po_lines = pd.concat([po_lines, po_lines.sample(5, random_state=seed)], ignore_index=True)
    # 6) po_id that does NOT exist
    po_lines.loc[np.random.choice(po_lines.index, size=4, replace=False), "po_id"] = 123456789
    # 7) part_id that does NOT exist
    po_lines.loc[np.random.choice(po_lines.index, size=4, replace=False), "part_id"] = 88888888

    po_lines.to_csv(os.path.join(out_dir, "po_lines.csv"), index=False)

    print(f"Done. Wrote CSVs to: {os.path.abspath(out_dir)}")
    print("Files:")
    for f in ["suppliers.csv", "parts.csv", "purchase_orders.csv", "po_lines.csv"]:
        path = os.path.join(out_dir, f)
        print(f" - {f}: {os.path.getsize(path):,} bytes")


if __name__ == "__main__":
    main()


Done. Wrote CSVs to: c:\Users\Bhaskar\Git\Automation\data\raw
Files:
 - suppliers.csv: 3,734 bytes
 - parts.csv: 11,635 bytes
 - purchase_orders.csv: 39,068 bytes
 - po_lines.csv: 126,310 bytes


  parts.loc[np.random.choice(parts.index, size=2, replace=False), "is_active"] = "yes"


In [21]:
from __future__ import annotations
import pandas as pd
from pathlib import Path

def load_csvs(data_dir: str | Path) -> dict[str, pd.DataFrame]:
    data_dir = Path(data_dir)
    dfs = {}
    for name in ["suppliers", "parts", "purchase_orders", "po_lines"]:
        path = data_dir / f"{name}.csv"
        if not path.exists():
            raise FileNotFoundError(f"Missing file: {path}")
        dfs[name] = pd.read_csv(path)
    return dfs


In [22]:
from __future__ import annotations
from dataclasses import dataclass, asdict
from typing import Any
import pandas as pd

@dataclass
class CheckResult:
    check_name: str
    table: str
    severity: str  # "CRITICAL" | "WARN"
    passed: bool
    failed_count: int
    sample_failures: list[dict[str, Any]]

def results_to_json(results: list[CheckResult]) -> list[dict[str, Any]]:
    return [asdict(r) for r in results]

def results_to_dataframe(results: list[CheckResult]) -> pd.DataFrame:
    return pd.DataFrame([asdict(r) for r in results])


In [23]:
from __future__ import annotations
from typing import Any, Iterable
import pandas as pd
import numpy as np

def _sample_rows(df: pd.DataFrame, mask: pd.Series, cols: list[str], n: int = 5) -> list[dict[str, Any]]:
    if mask is None or mask.sum() == 0:
        return []
    sample = df.loc[mask, cols].head(n)
    return sample.to_dict(orient="records")

def check_required_columns(df: pd.DataFrame, table: str, required: Iterable[str], severity: str = "CRITICAL") -> CheckResult:
    required = list(required)
    missing = [c for c in required if c not in df.columns]
    passed = len(missing) == 0
    return CheckResult(
        check_name="required_columns",
        table=table,
        severity=severity,
        passed=passed,
        failed_count=len(missing),
        sample_failures=[{"missing_column": c} for c in missing][:5],
    )

def check_primary_key_unique(df: pd.DataFrame, table: str, pk: str, severity: str = "CRITICAL") -> CheckResult:
    if pk not in df.columns:
        return CheckResult("primary_key_unique", table, severity, False, 1, [{"error": f"pk column missing: {pk}"}])
    dup_mask = df[pk].duplicated(keep=False) & df[pk].notna()
    failed_count = int(dup_mask.sum())
    return CheckResult(
        check_name="primary_key_unique",
        table=table,
        severity=severity,
        passed=failed_count == 0,
        failed_count=failed_count,
        sample_failures=_sample_rows(df, dup_mask, [pk]),
    )

def check_not_null(df: pd.DataFrame, table: str, col: str, severity: str = "CRITICAL") -> CheckResult:
    if col not in df.columns:
        return CheckResult("not_null", table, severity, False, 1, [{"error": f"column missing: {col}"}])
    null_mask = df[col].isna()
    failed_count = int(null_mask.sum())
    return CheckResult(
        check_name=f"not_null:{col}",
        table=table,
        severity=severity,
        passed=failed_count == 0,
        failed_count=failed_count,
        sample_failures=_sample_rows(df, null_mask, [col]),
    )

def check_allowed_values(df: pd.DataFrame, table: str, col: str, allowed: set[Any], severity: str = "CRITICAL") -> CheckResult:
    if col not in df.columns:
        return CheckResult("allowed_values", table, severity, False, 1, [{"error": f"column missing: {col}"}])
    bad_mask = df[col].notna() & ~df[col].isin(list(allowed))
    failed_count = int(bad_mask.sum())
    return CheckResult(
        check_name=f"allowed_values:{col}",
        table=table,
        severity=severity,
        passed=failed_count == 0,
        failed_count=failed_count,
        sample_failures=_sample_rows(df, bad_mask, [col]),
    )

def check_numeric_min(df: pd.DataFrame, table: str, col: str, min_value: float, severity: str = "CRITICAL") -> CheckResult:
    if col not in df.columns:
        return CheckResult("numeric_min", table, severity, False, 1, [{"error": f"column missing: {col}"}])
    series = pd.to_numeric(df[col], errors="coerce")
    bad_mask = series.notna() & (series < min_value)
    failed_count = int(bad_mask.sum())
    return CheckResult(
        check_name=f"numeric_min:{col}",
        table=table,
        severity=severity,
        passed=failed_count == 0,
        failed_count=failed_count,
        sample_failures=_sample_rows(df.assign(**{col: series}), bad_mask, [col]),
    )

def check_fk_exists(child: pd.DataFrame, parent: pd.DataFrame, table: str, fk_col: str, parent_key: str, severity: str = "CRITICAL") -> CheckResult:
    if fk_col not in child.columns:
        return CheckResult("fk_exists", table, severity, False, 1, [{"error": f"fk column missing: {fk_col}"}])
    if parent_key not in parent.columns:
        return CheckResult("fk_exists", table, severity, False, 1, [{"error": f"parent key missing: {parent_key}"}])

    parent_keys = set(parent[parent_key].dropna().unique().tolist())
    bad_mask = child[fk_col].notna() & ~child[fk_col].isin(list(parent_keys))
    failed_count = int(bad_mask.sum())
    return CheckResult(
        check_name=f"fk_exists:{fk_col}->{parent_key}",
        table=table,
        severity=severity,
        passed=failed_count == 0,
        failed_count=failed_count,
        sample_failures=_sample_rows(child, bad_mask, [fk_col]),
    )

def check_line_amount_math(po_lines: pd.DataFrame, severity: str = "CRITICAL", tolerance: float = 0.01) -> CheckResult:
    table = "po_lines"
    needed = ["qty", "unit_price", "line_amount"]
    for c in needed:
        if c not in po_lines.columns:
            return CheckResult("line_amount_math", table, severity, False, 1, [{"error": f"missing column: {c}"}])

    qty = pd.to_numeric(po_lines["qty"], errors="coerce")
    unit_price = pd.to_numeric(po_lines["unit_price"], errors="coerce")
    line_amount = pd.to_numeric(po_lines["line_amount"], errors="coerce")

    expected = qty * unit_price
    diff = (line_amount - expected).abs()

    bad_mask = diff.notna() & (diff > tolerance)
    failed_count = int(bad_mask.sum())

    tmp = po_lines.copy()
    tmp["_expected"] = expected
    tmp["_diff"] = diff

    return CheckResult(
        check_name="line_amount_math",
        table=table,
        severity=severity,
        passed=failed_count == 0,
        failed_count=failed_count,
        sample_failures=_sample_rows(tmp, bad_mask, ["qty", "unit_price", "line_amount", "_expected", "_diff"]),
    )

def check_po_totals_reconcile(purchase_orders: pd.DataFrame, po_lines: pd.DataFrame, severity: str = "WARN", tolerance: float = 0.05) -> CheckResult:
    table = "purchase_orders"
    needed_po = ["po_id", "total_amount"]
    needed_lines = ["po_id", "line_amount"]
    for c in needed_po:
        if c not in purchase_orders.columns:
            return CheckResult("po_totals_reconcile", table, severity, False, 1, [{"error": f"missing purchase_orders.{c}"}])
    for c in needed_lines:
        if c not in po_lines.columns:
            return CheckResult("po_totals_reconcile", table, severity, False, 1, [{"error": f"missing po_lines.{c}"}])

    lines_amount = pd.to_numeric(po_lines["line_amount"], errors="coerce")
    po_lines2 = po_lines.copy()
    po_lines2["line_amount"] = lines_amount

    sums = po_lines2.groupby("po_id", dropna=False)["line_amount"].sum().reset_index()
    merged = purchase_orders[["po_id", "total_amount"]].merge(sums, on="po_id", how="left", suffixes=("_po", "_lines"))

    merged["total_amount"] = pd.to_numeric(merged["total_amount"], errors="coerce")
    merged["line_amount"] = merged["line_amount"].fillna(0)

    merged["_diff"] = (merged["total_amount"] - merged["line_amount"]).abs()
    bad_mask = merged["_diff"].notna() & (merged["_diff"] > tolerance)

    failed_count = int(bad_mask.sum())
    return CheckResult(
        check_name="po_totals_reconcile",
        table=table,
        severity=severity,
        passed=failed_count == 0,
        failed_count=failed_count,
        sample_failures=merged.loc[bad_mask, ["po_id", "total_amount", "line_amount", "_diff"]].head(5).to_dict(orient="records"),
    )


In [24]:
from __future__ import annotations
from pathlib import Path
import json
import logging

def run_all_checks(data_dir: str | Path) -> list:
    dfs = load_csvs(data_dir)
    suppliers = dfs["suppliers"]
    parts = dfs["parts"]
    purchase_orders = dfs["purchase_orders"]
    po_lines = dfs["po_lines"]

    results = []

    # Schema
    results.append(check_required_columns(suppliers, "suppliers", ["supplier_id", "supplier_name", "status", "country"]))
    results.append(check_required_columns(parts, "parts", ["part_id", "part_name", "part_type", "uom", "is_active"]))
    results.append(check_required_columns(purchase_orders, "purchase_orders", ["po_id", "supplier_id", "po_date", "currency", "total_amount", "status"]))
    results.append(check_required_columns(po_lines, "po_lines", ["po_line_id", "po_id", "part_id", "qty", "unit_price", "line_amount"]))

    # PK
    results.append(check_primary_key_unique(suppliers, "suppliers", "supplier_id"))
    results.append(check_primary_key_unique(parts, "parts", "part_id"))
    results.append(check_primary_key_unique(purchase_orders, "purchase_orders", "po_id"))
    results.append(check_primary_key_unique(po_lines, "po_lines", "po_line_id"))

    # Not null IDs
    results.append(check_not_null(suppliers, "suppliers", "supplier_id"))
    results.append(check_not_null(parts, "parts", "part_id"))
    results.append(check_not_null(purchase_orders, "purchase_orders", "po_id"))
    results.append(check_not_null(po_lines, "po_lines", "po_line_id"))
    results.append(check_not_null(purchase_orders, "purchase_orders", "supplier_id"))
    results.append(check_not_null(po_lines, "po_lines", "po_id"))
    results.append(check_not_null(po_lines, "po_lines", "part_id"))

    # Allowed values
    results.append(check_allowed_values(suppliers, "suppliers", "status", {"Active", "Inactive"}, severity="WARN"))
    results.append(check_allowed_values(parts, "parts", "is_active", {0, 1, "0", "1"}, severity="WARN"))
    results.append(check_allowed_values(purchase_orders, "purchase_orders", "status", {"Open", "Closed", "Cancelled"}, severity="WARN"))
    results.append(check_allowed_values(purchase_orders, "purchase_orders", "currency", {"USD", "EUR", "GBP", "JPY", "CAD", "AUD"}, severity="WARN"))

    # Numeric checks
    results.append(check_numeric_min(po_lines, "po_lines", "qty", 0.000001))
    results.append(check_numeric_min(po_lines, "po_lines", "unit_price", 0.0))
    results.append(check_numeric_min(purchase_orders, "purchase_orders", "total_amount", 0.0, severity="WARN"))

    # FK checks
    results.append(check_fk_exists(purchase_orders, suppliers, "purchase_orders", "supplier_id", "supplier_id"))
    results.append(check_fk_exists(po_lines, purchase_orders, "po_lines", "po_id", "po_id"))
    results.append(check_fk_exists(po_lines, parts, "po_lines", "part_id", "part_id"))

    # Math + reconcile
    results.append(check_line_amount_math(po_lines))
    results.append(check_po_totals_reconcile(purchase_orders, po_lines, severity="WARN"))

    return results

def write_reports(results: list, out_dir: str | Path) -> dict:
    out_dir = Path(out_dir)
    out_dir.mkdir(parents=True, exist_ok=True)

    json_path = out_dir / "validation_report.json"
    csv_path = out_dir / "validation_report.csv"

    payload = results_to_json(results)
    json_path.write_text(json.dumps(payload, indent=2), encoding="utf-8")

    df = results_to_dataframe(results)
    df.to_csv(csv_path, index=False)

    summary = {
        "total_checks": len(results),
        "passed": sum(1 for r in results if r.passed),
        "failed": sum(1 for r in results if not r.passed),
        "critical_failed": sum(1 for r in results if (not r.passed and r.severity == "CRITICAL")),
        "json_report": str(json_path),
        "csv_report": str(csv_path),
    }
    return summary

def exit_code(results: list) -> int:
    critical_failed = any((not r.passed) and (r.severity == "CRITICAL") for r in results)
    return 2 if critical_failed else 0

def setup_logging(log_dir: str | Path) -> None:
    log_dir = Path(log_dir)
    log_dir.mkdir(parents=True, exist_ok=True)
    log_path = log_dir / "validation.log"

    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s %(levelname)s %(message)s",
        handlers=[logging.FileHandler(log_path), logging.StreamHandler()],
    )


In [26]:
from __future__ import annotations
from pathlib import Path
import json

# For notebook execution, call functions directly with default parameters
# instead of using argparse (which conflicts with Jupyter kernel args)

data_dir = "data/raw"
out_dir = "reports"
log_dir = "logs"

setup_logging(log_dir)

results = run_all_checks(data_dir)
summary = write_reports(results, out_dir)

print(json.dumps(summary, indent=2))


{
  "total_checks": 27,
  "passed": 11,
  "failed": 16,
  "critical_failed": 10,
  "json_report": "reports\\validation_report.json",
  "csv_report": "reports\\validation_report.csv"
}


In [27]:
# Generate synthetic test data first
main(out_dir="data")

Done. Wrote CSVs to: c:\Users\Bhaskar\Git\Automation\data
Files:
 - suppliers.csv: 3,734 bytes
 - parts.csv: 11,635 bytes
 - purchase_orders.csv: 39,068 bytes
 - po_lines.csv: 126,310 bytes


  parts.loc[np.random.choice(parts.index, size=2, replace=False), "is_active"] = "yes"
