# Site Data Integrity Triage & Remediation (Company-Agnostic)

This notebook is a **data integrity workflow** designed to:
- Identify *missing* and *inconsistent* records in a site inventory dataset
- Reconcile those gaps by searching other available datasets (where applicable)
- Produce a **prioritized work queue** for field/operations teams to validate and update data that cannot be reliably inferred

> This is a refactored, sanitized version of an operational “cleanup notebook.”  
> It is intentionally structured to be easy to follow and reuse with different datasets.

## Core idea

Automation is used to:
1. **Detect** where the data is incomplete or inconsistent  
2. **Locate** candidate values from other sources when possible  
3. **Route** unresolved items to humans (e.g., field techs) with clear, targeted work instructions


In [None]:
# --- Configuration (edit these) ---
from pathlib import Path

# Where inputs are read from and outputs are written to (relative paths).
DATA_DIR = Path("./data")
OUTPUT_DIR = Path("./output")
# Create the output folder now so the export cell can write into it.
OUTPUT_DIR.mkdir(parents=True, exist_ok=True)

# Primary dataset: the site inventory (read_table dispatches .csv / .parquet / Excel).
SITE_INVENTORY_FILE = DATA_DIR / "site_inventory.xlsx"

# Optional supporting datasets, keyed by a short name. The reconciliation cell
# checks `.exists()` before reading the ones it uses.
SUPPORTING_FILES = {
    "access_hours":         DATA_DIR / "access_hours.xlsx",
    "meter_numbers":        DATA_DIR / "meter_numbers.xlsx",
    "small_cell_inventory": DATA_DIR / "small_cell_inventory.xlsx",
    "assignments":          DATA_DIR / "assignments.xlsx",
}

# Logical name -> physical column header in your files. Edit the right-hand
# side to match your data; code elsewhere refers only to the logical names.
COLUMN_MAP = {
    # identity / location
    "site_id":        "SITE_ID",
    "market":         "MARKET",
    "site_type":      "SITE_TYPE",
    "address":        "ADDRESS",
    "city":           "CITY",
    "state":          "STATE",
    "zip":            "ZIP",
    # integrity-critical operational fields
    "access_hours":   "ACCESS_HOURS",
    "meter_number":   "METER_NUMBER",
    "power_system":   "POWER_SYSTEM",
    "fop_assignment": "FOPS_ASSIGNMENT",
    # GPS
    "lat":            "LAT",
    "lon":            "LON",
}

# Integrity rules, expressed as lists of logical column names.
REQUIRED_FIELDS = ["site_id", "state"]
GEO_REQUIRED_FIELDS = ["lat", "lon"]  # only meaningful if the inventory is expected to carry GPS

# Upper bound on rows emitted per build_work_queue call.
MAX_WORK_ITEMS_PER_CATEGORY = 500


## Imports

In [None]:
import pandas as pd
import numpy as np

# Wide notebook display: show up to 200 columns within a 160-character line.
pd.options.display.max_columns = 200
pd.options.display.width = 160


## Helper functions

In [None]:
# --- IO helpers ---
def read_table(path: str | 'Path') -> pd.DataFrame:
    path = str(path)
    if path.lower().endswith(".csv"):
        return pd.read_csv(path)
    if path.lower().endswith(".parquet"):
        return pd.read_parquet(path)
    # default to excel
    return pd.read_excel(path)

def standardize_columns(df: pd.DataFrame) -> pd.DataFrame:
    """Return a copy of *df* whose column labels are strings without surrounding whitespace.

    The input frame is not modified.
    """
    cleaned_labels = []
    for label in df.columns:
        cleaned_labels.append(str(label).strip())
    result = df.copy()
    result.columns = cleaned_labels
    return result

def get_col(name: str) -> str:
    """Translate a logical column name into its physical header via COLUMN_MAP.

    Raises:
        KeyError: if *name* is not a key of COLUMN_MAP (message names the key).
    """
    try:
        return COLUMN_MAP[name]
    except KeyError:
        raise KeyError(f"COLUMN_MAP missing key: {name}") from None


In [None]:
# --- Data integrity helpers ---
def missing_summary(df: pd.DataFrame, logical_cols: list[str]) -> pd.DataFrame:
    """Tabulate missing values for each requested logical column present in *df*.

    Logical names are resolved through ``get_col``. Columns that are not in *df*
    are omitted from the summary. A value counts as missing when it is NaN/None
    or a whitespace-only string. This is the same rule ``find_missing_required``
    and ``issue_counts`` use, so the audit totals agree with the per-issue counts.

    Returns:
        DataFrame with columns ``column``, ``missing_count`` and ``missing_pct``
        (fraction of rows, 0..1), sorted by ``missing_count`` descending.
    """
    # Resolve each logical name once, keeping only columns the frame actually has.
    present = [col for col in (get_col(c) for c in logical_cols) if col in df.columns]
    blank = {
        col: df[col].isna() | (df[col].astype(str).str.strip() == "")
        for col in present
    }
    out = pd.DataFrame({
        "column": present,
        "missing_count": [int(blank[col].sum()) for col in present],
        "missing_pct": [blank[col].mean() for col in present],
    }).sort_values("missing_count", ascending=False)
    return out

def find_missing_required(df: pd.DataFrame, required_logical_cols: list[str]) -> pd.DataFrame:
    """Return a copy of the rows of *df* that are missing at least one required field.

    Logical names are resolved through ``get_col``. Required columns absent from
    *df* are ignored. A value is missing when it is NaN/None or a whitespace-only
    string. If none of the required columns exist, the result is an empty frame
    with *df*'s columns.
    """
    cols = [get_col(c) for c in required_logical_cols]
    # Start from an all-False mask aligned to df's index. A bare scalar `False`
    # would make `df.loc[False]` a row-label lookup (KeyError or the wrong row,
    # depending on pandas version) when no required column is present.
    missing_mask = pd.Series(False, index=df.index)
    for c in cols:
        if c in df.columns:
            missing_mask = missing_mask | df[c].isna() | (df[c].astype(str).str.strip() == "")
    return df.loc[missing_mask].copy()

def find_duplicates(df: pd.DataFrame, key_logical_col: str) -> pd.DataFrame:
    """Return every row whose key value occurs more than once, sorted by that key.

    *key_logical_col* is resolved through ``get_col``. An empty DataFrame is
    returned when the key column is not in *df*.

    NOTE(review): ``DataFrame.duplicated`` treats NaN keys as equal, so rows with
    a missing key are also reported here when there are two or more of them.
    """
    key = get_col(key_logical_col)
    if key in df.columns:
        repeated = df.duplicated(subset=[key], keep=False)
        return df.loc[repeated].sort_values(key)
    return pd.DataFrame()

def enforce_types(df: pd.DataFrame) -> pd.DataFrame:
    """Return a copy of *df* with the lat/lon columns coerced to numeric.

    Values that cannot be parsed become NaN (``errors="coerce"``). Columns that
    are not present are left alone. The input frame is not modified.
    """
    typed = df.copy()
    numeric_cols = [get_col(k) for k in ("lat", "lon")]
    for col in numeric_cols:
        if col not in typed.columns:
            continue
        typed[col] = pd.to_numeric(typed[col], errors="coerce")
    return typed


In [None]:
# --- Reconciliation utilities ---
def left_join_fill(primary: pd.DataFrame, secondary: pd.DataFrame, on: list[str], fill_map: dict[str, str]) -> pd.DataFrame:
    """Fill blank values in *primary* with values looked up from *secondary*.

    fill_map maps: primary_col -> secondary_col

    Rows are matched on the key columns in *on*. A primary value is blank when it
    is NaN/None or a whitespace-only string. Non-blank primary values are never
    overwritten.

    Guarantees:
    - The result has exactly *primary*'s rows, in the same order and with the
      same index. *secondary* is reduced to one candidate per key (its first
      non-blank value) before joining, so duplicate keys cannot multiply rows.
    - Only columns named in *fill_map* are touched. Other *secondary* columns are
      not carried into the result.
    - If a primary column does not exist yet, it is created from *secondary*.
    - Entries whose secondary column is absent (or is itself a key) are skipped.

    Raises:
        KeyError: if a key column in *on* is missing from either frame.
    """
    keys = list(on)
    missing_keys = [k for k in keys if k not in secondary.columns]
    if missing_keys:
        raise KeyError(f"secondary is missing join key column(s): {missing_keys}")

    def _is_blank(values: pd.Series) -> pd.Series:
        return values.isna() | (values.astype(str).str.strip() == "")

    out = primary.copy()
    for primary_col, secondary_col in fill_map.items():
        if secondary_col not in secondary.columns or secondary_col in keys:
            continue

        # One usable candidate per key: drop blank values and null keys, then keep the first.
        candidates = secondary[keys + [secondary_col]]
        candidates = candidates[~_is_blank(candidates[secondary_col])]
        candidates = candidates.dropna(subset=keys).drop_duplicates(subset=keys, keep="first")

        # Rename to a private column name. When primary and secondary share the
        # value column's name (the usual case), merge would otherwise suffix one
        # of them and the lookup would read the primary column back onto itself.
        tmp_col = "__left_join_fill_src__"
        while tmp_col in out.columns:
            tmp_col += "_"
        candidates = candidates.rename(columns={secondary_col: tmp_col})

        # Left merge against unique keys preserves the left row count and order,
        # so the looked-up values line up positionally with `out`.
        looked_up = out[keys].merge(candidates, on=keys, how="left")[tmp_col].to_numpy()

        if primary_col in out.columns:
            current = out[primary_col]
            out[primary_col] = current.where(~_is_blank(current), looked_up)
        else:
            out[primary_col] = looked_up
    return out

def build_work_queue(df: pd.DataFrame, category: str, reason: str, site_id_col: str, extra_cols: list[str] | None = None) -> pd.DataFrame:
    """Produce a minimal work queue for operations/field validation."""
    out = pd.DataFrame({
        "category": category,
        "site_id": df[site_id_col].astype(str),
        "reason": reason,
    })
    if extra_cols:
        for c in extra_cols:
            if c in df.columns:
                out[c] = df[c]
    return out.drop_duplicates().head(MAX_WORK_ITEMS_PER_CATEGORY)


## Load data

In [None]:
# --- Load primary dataset ---
# Read -> normalise headers -> coerce lat/lon to numeric, in that order.
site_df = enforce_types(standardize_columns(read_table(SITE_INVENTORY_FILE)))

site_id_col = get_col("site_id")  # physical name of the site key, reused by later cells
print("Rows:", len(site_df))
site_df.head()


## Audit integrity

In [None]:
# --- Integrity audit (quick) ---
# Missing-value tables for the required fields, then the GPS fields.
for _heading, _logical_fields in (
    ("Missing required fields:", REQUIRED_FIELDS),
    ("\nMissing GPS fields (if expected):", GEO_REQUIRED_FIELDS),
):
    print(_heading)
    display(missing_summary(site_df, _logical_fields))

# `dupes` is reused by the export cell.
print("\nDuplicate site IDs:")
dupes = find_duplicates(site_df, "site_id")
display(dupes.head(25))


## Identify issue categories

In [None]:
# --- Identify issue categories ---
def _blank_rows(frame: pd.DataFrame, col: str) -> pd.DataFrame:
    """Copy of rows where *col* is NaN or whitespace-only; empty DataFrame if *col* is absent."""
    if col not in frame.columns:
        return pd.DataFrame()
    values = frame[col]
    return frame[values.isna() | (values.astype(str).str.strip() == "")].copy()


missing_required = find_missing_required(site_df, REQUIRED_FIELDS)
missing_geo = find_missing_required(site_df, GEO_REQUIRED_FIELDS)

# Physical column names; also used by the reconciliation and work-queue cells.
access_col = get_col("access_hours")
meter_col = get_col("meter_number")
power_col = get_col("power_system")

issues = {
    "missing_required": missing_required,
    "missing_geo": missing_geo,
    "missing_access_hours": _blank_rows(site_df, access_col),
    "missing_meter_number": _blank_rows(site_df, meter_col),
    "unknown_power_system": _blank_rows(site_df, power_col),
}

pd.Series({name: len(rows) for name, rows in issues.items()}).sort_values(ascending=False)


## Reconcile from supporting sources

In [None]:
# --- OPTIONAL: Reconcile from supporting datasets (fill what can be safely filled) ---
site_reconciled = site_df.copy()

# Each entry: (key into SUPPORTING_FILES, logical column to fill).
# A supporting file must use the same physical column names as the primary
# inventory (resolved through COLUMN_MAP) for both the site key and the value.
reconcile_sources = [
    ("access_hours", "access_hours"),
    ("meter_numbers", "meter_number"),
]
supporting_dfs = {}  # loaded supporting tables, kept for inspection

for source_key, logical_col in reconcile_sources:
    source_path = SUPPORTING_FILES.get(source_key)
    # Supporting files are optional: skip any that are unconfigured or absent.
    if not source_path or not source_path.exists():
        continue
    source_df = standardize_columns(read_table(source_path))
    supporting_dfs[source_key] = source_df
    value_col = get_col(logical_col)
    if site_id_col not in source_df.columns:
        print(f"WARNING: {source_key} file missing key column: {site_id_col}")
        continue
    if value_col not in source_df.columns:
        # Without this check the file would load but contribute nothing, with no indication why.
        print(f"WARNING: {source_key} file missing value column: {value_col}")
        continue
    site_reconciled = left_join_fill(
        site_reconciled,
        source_df,
        on=[site_id_col],
        fill_map={value_col: value_col},
    )

# Compare issue counts after reconciliation
def issue_counts(df: pd.DataFrame) -> pd.Series:
    """Count blank (NaN or whitespace-only) values for each tracked field in *df*.

    Uses the physical column names `access_col`, `meter_col` and `power_col`
    defined in the issue-category cell. Fields whose column is absent from *df*
    are omitted. The result is sorted with the largest count first.
    """
    tracked = (
        ("missing_access_hours", access_col),
        ("missing_meter_number", meter_col),
        ("unknown_power_system", power_col),
    )
    counts = {}
    for label, col in tracked:
        if col not in df.columns:
            continue
        values = df[col]
        counts[label] = int((values.isna() | (values.astype(str).str.strip() == "")).sum())
    return pd.Series(counts).sort_values(ascending=False)

# Show remaining blank counts on the raw inventory, then on the reconciled one.
for _label, _frame in (
    ("Before reconciliation:", site_df),
    ("After reconciliation:", site_reconciled),
):
    print(_label)
    display(issue_counts(_frame))


## Create technician / operations work queue

In [None]:
# --- Generate work queues (for humans) ---
work_items = []

# Location/context columns copied onto every work item so the site can be found.
queue_context_cols = [get_col(k) for k in ("market", "site_type", "address", "city", "state", "zip")]


def _blank_mask(series: pd.Series) -> pd.Series:
    """True where *series* is NaN/None or a whitespace-only string."""
    return series.isna() | (series.astype(str).str.strip() == "")


def _queue_if_any(rows: pd.DataFrame, category: str, reason: str) -> None:
    """Append a work-queue slice for *rows* to `work_items` when *rows* is non-empty."""
    if len(rows):
        work_items.append(
            build_work_queue(
                rows,
                category=category,
                reason=reason,
                site_id_col=site_id_col,
                extra_cols=queue_context_cols,
            )
        )


# Power system (example of a field that typically needs technician verification)
if power_col in site_reconciled.columns:
    _queue_if_any(
        site_reconciled[_blank_mask(site_reconciled[power_col])],
        category="field_verification",
        reason="Power system must be verified by technician and entered in the system of record.",
    )

# Missing GPS (if expected) should be fixed at the source
lat_col, lon_col = get_col("lat"), get_col("lon")
if lat_col in site_reconciled.columns and lon_col in site_reconciled.columns:
    _queue_if_any(
        site_reconciled[site_reconciled[lat_col].isna() | site_reconciled[lon_col].isna()],
        category="data_fix",
        reason="Missing LAT/LON. Verify GPS and update the system of record.",
    )

# Values still blank after reconciliation could not be filled from any supporting
# dataset, so they are routed to people for manual lookup.
for _logical_col, _reason in (
    ("access_hours", "Access hours still missing after reconciliation. Verify access hours and update the system of record."),
    ("meter_number", "Meter number still missing after reconciliation. Verify the meter number and update the system of record."),
):
    _col = get_col(_logical_col)
    if _col in site_reconciled.columns:
        _queue_if_any(
            site_reconciled[_blank_mask(site_reconciled[_col])],
            category="data_fix",
            reason=_reason,
        )

work_queue_df = (
    pd.concat(work_items, ignore_index=True)
    if work_items
    else pd.DataFrame(columns=["category", "site_id", "reason"])
)
work_queue_df.head(50)


## Export

In [None]:
# --- Export outputs ---
reconciled_out = OUTPUT_DIR / "site_inventory_reconciled.xlsx"
workqueue_out = OUTPUT_DIR / "work_queue.xlsx"
audit_out = OUTPUT_DIR / "audit_summary.xlsx"

# Single-sheet workbooks: the reconciled inventory and the human work queue.
for _table, _destination in ((site_reconciled, reconciled_out), (work_queue_df, workqueue_out)):
    _table.to_excel(_destination, index=False)

# Audit workbook: pre-reconciliation missing-value summary plus the duplicate-ID rows.
audit_fields = REQUIRED_FIELDS + GEO_REQUIRED_FIELDS + ["access_hours", "meter_number", "power_system"]
with pd.ExcelWriter(audit_out) as writer:
    missing_summary(site_df, audit_fields).to_excel(writer, sheet_name="missing_summary", index=False)
    dupes.to_excel(writer, sheet_name="duplicate_site_ids", index=False)

for _written in (reconciled_out, workqueue_out, audit_out):
    print("Wrote:", _written)
