In [23]:
import warnings
warnings.filterwarnings("ignore")

In [1]:
from dotenv import load_dotenv
load_dotenv();

In [2]:
import pandas as pd
from source_file_utils import get_source_file_names

In [19]:
from functools import wraps
import re
from typing import Any, Callable, Dict, List, Mapping, Optional

import numpy as np
import pandas as pd
from pydantic import BaseModel, Field, field_validator


# ---------------------------
# Exceptions
# ---------------------------

class AnalyzerError(RuntimeError):
    """Base class for analyzer errors."""


class ConfigurationError(AnalyzerError):
    """Raised for invalid analyzer configuration."""


# ---------------------------
# Configuration & Reports
# ---------------------------

class ForeignKeySpec(BaseModel):
    """Declare a foreign key relationship for referential integrity checking."""
    child_table: str = Field(..., description="Table with the foreign key.")
    child_column: str = Field(..., description="Column in child that references a parent key.")
    parent_table: str = Field(..., description="Referenced table.")
    parent_column: str = Field(..., description="Primary/unique key column in the parent table.")


class AnalyzerConfig(BaseModel):
    """Configuration for the DataQualityAnalyzer."""
    sample_rows: Optional[int] = Field(
        None,
        description="Row count to sample per table for expensive checks. None = full table."
    )
    duplicate_key_hints: List[str] = Field(
        default_factory=lambda: ["id", "code", "number", "email", "sku", "name"],
        description="Substrings used to discover likely identifier-like columns."
    )
    string_format_checks: List[str] = Field(
        default_factory=lambda: ["date", "int_like", "float_like", "email", "phone", "uuid", "iso_datetime", "zip"],
        description="String format classifiers to run."
    )
    outlier_method: str = Field(
        default="iqr",
        description="Numeric outlier method: 'iqr' or 'zscore'."
    )
    z_threshold: float = Field(default=3.5, ge=0)
    iqr_multiplier: float = Field(default=1.5, ge=0)
    max_top_values: int = Field(default=10, ge=1)
    fk_specs: List[ForeignKeySpec] = Field(default_factory=list)

    @field_validator("outlier_method")
    def _validate_outlier_method(cls, v: str) -> str:
        v = v.lower()
        if v not in {"iqr", "zscore"}:
            raise ConfigurationError("outlier_method must be 'iqr' or 'zscore'")
        return v


class ColumnProfile(BaseModel):
    column: str
    dtype: str
    non_null: int
    null_pct: float
    unique: int
    constant: bool
    avg_len: Optional[float] = None
    pct_empty_str: Optional[float] = None
    pct_whitespace_str: Optional[float] = None
    min_val: Optional[float] = None
    max_val: Optional[float] = None
    example_values: List[str] = Field(default_factory=list)


class ColumnIssues(BaseModel):
    column: str
    has_mixed_types: bool = False
    detected_formats: Dict[str, float] = Field(default_factory=dict)  # format -> coverage %
    case_variants: Optional[List[str]] = None  # for likely categories


class DuplicateReport(BaseModel):
    exact_row_duplicates: int
    near_text_duplicates: Dict[str, int]  # column -> count (normalized by stripping/punct/case)
    candidate_key_dupes: Dict[str, int]   # column -> count of duplicate keys


class OutlierReport(BaseModel):
    method: str
    counts_by_column: Dict[str, int]  # numeric column -> outlier count


class FKIssue(BaseModel):
    spec: ForeignKeySpec
    missing_fk_count: int


class TableReport(BaseModel):
    table: str
    rows: int
    cols: int
    column_profiles: List[ColumnProfile]
    column_issues: List[ColumnIssues]
    duplicates: DuplicateReport
    outliers: OutlierReport
    notes: List[str] = Field(default_factory=list)


class AnalysisResult(BaseModel):
    summary: pd.DataFrame  # table-level summary
    tables: Dict[str, TableReport]
    fk_issues: List[FKIssue] = Field(default_factory=list)
    model_config = {'arbitrary_types_allowed': True}

    def to_excel(self, path: str) -> None:
        """Write the full analysis as a multi-sheet Excel workbook."""
        with pd.ExcelWriter(path, engine="xlsxwriter") as xw:
            self.summary.to_excel(xw, sheet_name="summary", index=False)
            for tname, trep in self.tables.items():
                # profiles
                pd.DataFrame([p.model_dump() for p in trep.column_profiles]).to_excel(
                    xw, sheet_name=f"{tname[:25]}_profiles", index=False
                )
                # issues
                pd.DataFrame([i.model_dump() for i in trep.column_issues]).to_excel(
                    xw, sheet_name=f"{tname[:25]}_issues", index=False
                )
                # duplicates
                dup_df = pd.DataFrame(
                    {
                        "metric": ["exact_row_duplicates"],
                        "value": [trep.duplicates.exact_row_duplicates],
                    }
                )
                if trep.duplicates.near_text_duplicates:
                    dup_df = pd.concat(
                        [
                            dup_df,
                            pd.DataFrame(
                                [
                                    {"metric": f"near_text_dupes:{c}", "value": v}
                                    for c, v in trep.duplicates.near_text_duplicates.items()
                                ]
                            ),
                        ],
                        ignore_index=True,
                    )
                if trep.duplicates.candidate_key_dupes:
                    dup_df = pd.concat(
                        [
                            dup_df,
                            pd.DataFrame(
                                [
                                    {"metric": f"candidate_key_dupes:{c}", "value": v}
                                    for c, v in trep.duplicates.candidate_key_dupes.items()
                                ]
                            ),
                        ],
                        ignore_index=True,
                    )
                dup_df.to_excel(xw, sheet_name=f"{tname[:25]}_dupes", index=False)

                # outliers
                out_df = pd.DataFrame(
                    [{"column": c, "outlier_count": n, "method": trep.outliers.method}
                     for c, n in trep.outliers.counts_by_column.items()]
                )
                out_df.to_excel(xw, sheet_name=f"{tname[:25]}_outliers", index=False)

            if self.fk_issues:
                pd.DataFrame(
                    [
                        {
                            "child_table": i.spec.child_table,
                            "child_column": i.spec.child_column,
                            "parent_table": i.spec.parent_table,
                            "parent_column": i.spec.parent_column,
                            "missing_fk_count": i.missing_fk_count,
                        }
                        for i in self.fk_issues
                    ]
                ).to_excel(xw, sheet_name="fk_issues", index=False)


# ---------------------------
# Check registry (pluggable)
# ---------------------------

CheckFn = Callable[[str, pd.DataFrame, AnalyzerConfig], Dict[str, Any]]
_CHECKS: List[CheckFn] = []


def register_check(fn: CheckFn) -> CheckFn:
    """Decorator to register a table-level check producing a dict payload."""
    @wraps(fn)
    def _wrapped(table: str, df: pd.DataFrame, cfg: AnalyzerConfig) -> Dict[str, Any]:
        return fn(table, df, cfg)
    _CHECKS.append(_wrapped)
    return _wrapped


# ---------------------------
# Helpers
# ---------------------------

_STRIP_PUNCT_RE = re.compile(r"[^\w\s]", flags=re.UNICODE)


def _maybe_sample(df: pd.DataFrame, n: Optional[int]) -> pd.DataFrame:
    if n is None or len(df) <= n:
        return df
    return df.sample(n=n, random_state=42)


def _try_parse_datetime(s: pd.Series) -> pd.Series:
    return pd.to_datetime(s, errors="coerce")


def _is_int_like(s: pd.Series) -> pd.Series:
    return s.str.fullmatch(r"[+-]?\d+", na=False)


def _is_float_like(s: pd.Series) -> pd.Series:
    return s.str.fullmatch(r"[+-]?(\d+(\.\d+)?|\.\d+)", na=False)


def _is_email(s: pd.Series) -> pd.Series:
    return s.str.fullmatch(r"[^@\s]+@[^@\s]+\.[^@\s]+", na=False)


def _is_phone(s: pd.Series) -> pd.Series:
    return s.str.fullmatch(r"(\+?\d{1,3}[\s.-]?)?\(?\d{2,4}\)?[\s.-]?\d{3,4}[\s.-]?\d{3,4}", na=False)


def _is_uuid(s: pd.Series) -> pd.Series:
    return s.str.fullmatch(r"[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[1-5][0-9a-fA-F]{3}-[89abAB][0-9a-fA-F]{3}-[0-9a-fA-F]{12}", na=False)


def _is_iso_datetime(s: pd.Series) -> pd.Series:
    return s.str.fullmatch(r"\d{4}-\d{2}-\d{2}(T|\s)\d{2}:\d{2}(:\d{2})?(\.\d+)?(Z|[+-]\d{2}:\d{2})?", na=False)


def _is_zip(s: pd.Series) -> pd.Series:
    return s.str.fullmatch(r"\d{5}(-\d{4})?", na=False)


_FORMATTERS: Dict[str, Callable[[pd.Series], pd.Series]] = {
    "date": lambda s: _try_parse_datetime(s).notna(),
    "int_like": _is_int_like,
    "float_like": _is_float_like,
    "email": _is_email,
    "phone": _is_phone,
    "uuid": _is_uuid,
    "iso_datetime": _is_iso_datetime,
    "zip": _is_zip,
}


def _normalize_text_for_dupe(s: pd.Series) -> pd.Series:
    s2 = s.astype(str).str.lower().str.strip()
    s2 = s2.str.replace(_STRIP_PUNCT_RE, "", regex=True).str.replace(r"\s+", " ", regex=True)
    return s2


def _iqr_outliers(x: pd.Series, k: float = 1.5) -> pd.Series:
    q1 = x.quantile(0.25)
    q3 = x.quantile(0.75)
    iqr = q3 - q1
    low, high = q1 - k * iqr, q3 + k * iqr
    return (x < low) | (x > high)


def _zscore_outliers(x: pd.Series, z: float = 3.5) -> pd.Series:
    mu = x.mean()
    sigma = x.std(ddof=0)
    if sigma == 0 or np.isnan(sigma):
        return pd.Series(False, index=x.index)
    return (np.abs((x - mu) / sigma) > z)


def _top_examples(s: pd.Series, k: int) -> List[str]:
    vc = s.dropna().astype(str).head(10000).value_counts(dropna=False)
    return [str(v) for v in vc.head(k).index.tolist()]


def _likely_identifier_columns(df: pd.DataFrame, hints: List[str]) -> List[str]:
    cols = []
    for c in df.columns:
        lc = c.lower()
        if any(h in lc for h in hints):
            cols.append(c)
    return cols


# ---------------------------
# Checks
# ---------------------------

@register_check
def profile_columns(table: str, df: pd.DataFrame, cfg: AnalyzerConfig) -> Dict[str, Any]:
    """
    Column-level profiling: dtype, missingness, unique counts, constant columns,
    numeric min/max, string length stats, empty/whitespace ratio, examples.
    """
    profiles: List[ColumnProfile] = []
    sample = _maybe_sample(df, cfg.sample_rows)

    for c in df.columns:
        s = sample[c]
        dtype = str(df[c].dtype)
        non_null = int(s.notna().sum())
        null_pct = float((s.isna().mean() * 100.0))
        unique = int(s.nunique(dropna=True))
        constant = unique <= 1

        avg_len = pct_empty = pct_ws = None
        min_val = max_val = None

        if pd.api.types.is_numeric_dtype(df[c]):
            min_val = float(pd.to_numeric(s, errors="coerce").min(skipna=True)) if non_null else None
            max_val = float(pd.to_numeric(s, errors="coerce").max(skipna=True)) if non_null else None
        elif pd.api.types.is_string_dtype(df[c]) or df[c].dtype == "object":
            s_str = s.dropna().astype(str)
            if not s_str.empty:
                lengths = s_str.str.len()
                avg_len = float(lengths.mean())
                pct_empty = float((s_str == "").mean() * 100.0)
                pct_ws = float((s_str.str.fullmatch(r"\s*").mean()) * 100.0)

        profiles.append(
            ColumnProfile(
                column=c,
                dtype=dtype,
                non_null=non_null,
                null_pct=round(null_pct, 3),
                unique=unique,
                constant=constant,
                avg_len=avg_len,
                pct_empty_str=pct_empty,
                pct_whitespace_str=pct_ws,
                min_val=min_val,
                max_val=max_val,
                example_values=_top_examples(s, cfg.max_top_values),
            )
        )
    return {"column_profiles": profiles}


@register_check
def detect_type_and_format_issues(table: str, df: pd.DataFrame, cfg: AnalyzerConfig) -> Dict[str, Any]:
    """
    Detect mixed types in object-like columns and estimate format coverage (emails, phones, dates, etc.).
    Also report case variants for likely categorical fields.
    """
    issues: List[ColumnIssues] = []
    sample = _maybe_sample(df, cfg.sample_rows)

    for c in df.columns:
        s = sample[c]
        col_issue = ColumnIssues(column=c)

        # Mixed types detection
        if df[c].dtype == "object":
            pytypes = s.dropna().map(type)
            if not pytypes.empty:
                counts = pytypes.value_counts()
                col_issue.has_mixed_types = counts.shape[0] > 1

        # String format checks
        if pd.api.types.is_string_dtype(df[c]) or df[c].dtype == "object":
            s_str = s.dropna().astype(str)
            detected: Dict[str, float] = {}
            for name in cfg.string_format_checks:
                fn = _FORMATTERS.get(name)
                if fn and not s_str.empty:
                    m = fn(s_str)
                    detected[name] = round(float(m.mean() * 100.0), 2)
            col_issue.detected_formats = detected

            # Case-variant categories (heuristic): low-cardinality textual columns
            nunique = s_str.nunique(dropna=True)
            if 1 < nunique <= 50:
                lower_map = s_str.str.strip().str.lower().value_counts()
                if lower_map.shape[0] < nunique:
                    # same tokens differing by case/space exist
                    col_issue.case_variants = _top_examples(s_str, cfg.max_top_values)

        issues.append(col_issue)

    return {"column_issues": issues}


@register_check
def duplicate_scans(table: str, df: pd.DataFrame, cfg: AnalyzerConfig) -> Dict[str, Any]:
    """
    Exact row duplicates, normalized near-duplicates for text columns,
    and duplicate counts for likely identifier-like columns.
    """
    sample = _maybe_sample(df, cfg.sample_rows)
    # Exact row duplicates
    exact_dupes = int(sample.duplicated().sum())

    # Near text duplicates per column (case/space/punct normalized)
    near_text_dupes: Dict[str, int] = {}
    for c in sample.columns:
        if pd.api.types.is_string_dtype(df[c]) or df[c].dtype == "object":
            s_norm = _normalize_text_for_dupe(sample[c].fillna(""))
            dups = int(s_norm.duplicated().sum())
            if dups > 0:
                near_text_dupes[c] = dups

    # Candidate key dupes
    candidate_key_dupes: Dict[str, int] = {}
    for c in _likely_identifier_columns(df, cfg.duplicate_key_hints):
        dups = int(sample[c].duplicated(keep=False).sum()) if c in sample.columns else 0
        if dups > 0:
            candidate_key_dupes[c] = dups

    return {
        "duplicates": DuplicateReport(
            exact_row_duplicates=exact_dupes,
            near_text_duplicates=near_text_dupes,
            candidate_key_dupes=candidate_key_dupes,
        )
    }


@register_check
def numeric_outliers(table: str, df: pd.DataFrame, cfg: AnalyzerConfig) -> Dict[str, Any]:
    """
    Count numeric outliers per column using configured method.
    """
    sample = _maybe_sample(df, cfg.sample_rows)
    numeric_cols = [c for c in df.columns if pd.api.types.is_numeric_dtype(df[c])]
    counts: Dict[str, int] = {}
    for c in numeric_cols:
        x = pd.to_numeric(sample[c], errors="coerce").dropna()
        if x.empty:
            counts[c] = 0
            continue
        if cfg.outlier_method == "iqr":
            m = _iqr_outliers(x, cfg.iqr_multiplier)
        else:
            m = _zscore_outliers(x, cfg.z_threshold)
        counts[c] = int(m.sum())

    return {"outliers": OutlierReport(method=cfg.outlier_method, counts_by_column=counts)}


# ---------------------------
# Analyzer
# ---------------------------

class DataQualityAnalyzer:
    """Execute a suite of data-quality checks against pandas DataFrames."""

    def __init__(self, config: Optional[AnalyzerConfig] = None) -> None:
        self.config = config or AnalyzerConfig()

    def analyze(
        self,
        tables: Mapping[str, pd.DataFrame],
    ) -> AnalysisResult:
        """Run all registered checks on each table and build an aggregate report.

        Args:
            tables: Mapping of table name -> DataFrame.

        Returns:
            AnalysisResult containing table-level reports, summary, and FK issues (if any).
        """
        table_reports: Dict[str, TableReport] = {}
        summary_rows: List[Dict[str, Any]] = []

        # Per-table checks
        for tname, df in tables.items():
            payload: Dict[str, Any] = {}
            for check in _CHECKS:
                payload.update(check(tname, df, self.config))

            profiles: List[ColumnProfile] = payload["column_profiles"]
            issues: List[ColumnIssues] = payload["column_issues"]
            dupes: DuplicateReport = payload["duplicates"]
            outliers: OutlierReport = payload["outliers"]

            report = TableReport(
                table=tname,
                rows=int(df.shape[0]),
                cols=int(df.shape[1]),
                column_profiles=profiles,
                column_issues=issues,
                duplicates=dupes,
                outliers=outliers,
            )
            table_reports[tname] = report

            summary_rows.append(
                {
                    "table": tname,
                    "rows": report.rows,
                    "cols": report.cols,
                    "exact_row_dupes": dupes.exact_row_duplicates,
                    "columns_with_near_text_dupes": len(dupes.near_text_duplicates),
                    "columns_with_candidate_key_dupes": len(dupes.candidate_key_dupes),
                    "numeric_columns_with_outliers": sum(1 for v in outliers.counts_by_column.values() if v > 0),
                    "avg_null_pct": np.mean([p.null_pct for p in profiles]) if profiles else np.nan,
                    "constant_columns": sum(1 for p in profiles if p.constant),
                }
            )

        # Optional FK checks across tables
        fk_issues: List[FKIssue] = []
        if self.config.fk_specs:
            for spec in self.config.fk_specs:
                if spec.child_table not in tables or spec.parent_table not in tables:
                    continue
                child = tables[spec.child_table]
                parent = tables[spec.parent_table]
                if spec.child_column not in child.columns or spec.parent_column not in parent.columns:
                    continue
                child_keys = pd.Series(child[spec.child_column]).dropna().unique()
                parent_keys = pd.Series(parent[spec.parent_column]).dropna().unique()
                missing = ~pd.Series(child[spec.child_column]).isin(parent_keys)
                missing_count = int(missing.sum())
                if missing_count > 0:
                    fk_issues.append(FKIssue(spec=spec, missing_fk_count=missing_count))

        summary_df = pd.DataFrame(summary_rows).sort_values(["rows", "table"], ascending=[False, True], ignore_index=True)
        return AnalysisResult(summary=summary_df, tables=table_reports, fk_issues=fk_issues)

In [3]:
source_file_names = get_source_file_names()
print(f"{len(source_file_names)} source files retrieved.")

4 source files retrieved.


In [29]:
i = 3
df = pd.read_excel(source_file_names[i])

In [30]:
# Create config with defaults — no foreign keys
cfg = AnalyzerConfig(
    sample_rows=None,        # None = run checks on all rows
    outlier_method="iqr",    # can be "iqr" or "zscore"
)

# Initialize analyzer
analyzer = DataQualityAnalyzer(cfg)

# Analyze the single table (name it anything you like)
result = analyzer.analyze({"source_data": df})

# Quick summary in console
print(result.summary)

# Optional: Save full report to Excel with multiple sheets
result.to_excel(f"dq_report{i+1}.xlsx")

         table  rows  cols  exact_row_dupes  columns_with_near_text_dupes  \
0  source_data  5347    60               10                            41   

   columns_with_candidate_key_dupes  numeric_columns_with_outliers  \
0                                13                              1   

   avg_null_pct  constant_columns  
0     67.854917                21  
