# 📘 verificacao_qualidade
### **Objetivo:**
- Avaliar integridade, completude e consistência dos dados (PK/FK, nulos, duplicatas, domínios, coerência temporal, outliers) e consolidar um relatório final de qualidade.

## Setup

In [1]:
from pathlib import Path
import json, re, math
import numpy as np
import pandas as pd

pd.set_option("display.max_columns", 150)

# Paths (ajuste se necessário)
ROOT = Path("..")  # este notebook está em lab/
DATA = ROOT / "data" / "processed"
LAB_DATA = ROOT / "lab" / "data" / "processed"

FILES = {
    "deputados": DATA / "deputados.json",
    "freq_eventos": DATA / "freq_eventos.csv",
    "eventos_sessoes": DATA / "eventos_sessoes.csv",
    "votos_deputados": DATA / "votos_deputados.csv",
    "presencas": LAB_DATA / "presencas.csv",  # pode estar ausente
    "cod_situacao": DATA / "cod_situacao_deputados.json",  # catálogo de situações
    "remuneracoes": DATA / "remuneracoes.csv",  # tem coluna 'situacao' no seu 01
}


## Utilitários

In [2]:
def exists(p: Path) -> bool:
    try:
        return p.exists() and p.is_file()
    except Exception:
        return False

def read_json_records(path: Path) -> pd.DataFrame | None:
    if not exists(path): 
        return None
    try:
        return pd.read_json(path, orient="records")
    except ValueError:
        with open(path, "r", encoding="utf-8") as f:
            data = json.load(f)
        return pd.json_normalize(data)

def read_csv(path: Path, **kwargs) -> pd.DataFrame | None:
    if not exists(path): 
        return None
    try:
        return pd.read_csv(path, **kwargs)
    except Exception as e:
        print(f"[WARN] Falha ao ler CSV {path}: {e}")
        return None

def coerce_int(series, allow_na=True):
    s = pd.to_numeric(series, errors="coerce")
    return s.astype("Int64") if allow_na else s.astype("int64", copy=False)

def percent(x, tot):
    return round((x / tot * 100), 2) if tot else np.nan


## Carregar dados + tipagem mínima

In [3]:
df_deps   = read_json_records(FILES["deputados"])
df_freq   = read_csv(FILES["freq_eventos"])
df_evt    = read_csv(FILES["eventos_sessoes"])
df_votos  = read_csv(FILES["votos_deputados"])
df_pres   = read_csv(FILES["presencas"])
df_sit    = read_json_records(FILES["cod_situacao"])
df_rem    = read_csv(FILES["remuneracoes"])

# Tipagem chaves
if df_deps is not None and "id" in df_deps.columns:
    df_deps["id"] = coerce_int(df_deps["id"])

if df_freq is not None:
    if "id_deputado" in df_freq.columns:
        df_freq["id_deputado"] = coerce_int(df_freq["id_deputado"])
    if "num_eventos" in df_freq.columns:
        df_freq["num_eventos"] = pd.to_numeric(df_freq["num_eventos"], errors="coerce")

if df_evt is not None:
    # normalizar possíveis variações do id do evento
    cand_evt = [c for c in df_evt.columns if c.lower() in {"id_evento","idevento","evento_id","id"}]
    if cand_evt:
        df_evt.rename(columns={cand_evt[0]:"id_evento"}, inplace=True)
        df_evt["id_evento"] = coerce_int(df_evt["id_evento"])
    # datas
    for c in ("dataHoraInicio","dataHoraFim"):
        if c in df_evt.columns:
            df_evt[c] = pd.to_datetime(df_evt[c], errors="coerce")

if df_pres is not None:
    for cc in ("id_evento","id_deputado"):
        if cc in df_pres.columns:
            df_pres[cc] = coerce_int(df_pres[cc])
    if "ano_origem" in df_pres.columns:
        df_pres["ano_origem"] = coerce_int(df_pres["ano_origem"])

# votos: detectar a coluna de id_deputado
v_id_dep_col = None
if df_votos is not None:
    cand = [c for c in df_votos.columns 
            if ("deput" in c.lower() and "id" in c.lower()) 
            or c.lower() in {"iddeputado","id_deputado","idparlamentar","idecadastro"}]
    if cand:
        v_id_dep_col = cand[0]
        df_votos[v_id_dep_col] = coerce_int(df_votos[v_id_dep_col])


[WARN] Falha ao ler CSV ../data/processed/eventos_sessoes.csv: No columns to parse from file


# Checks

## Existência de datasets

In [4]:
exist_checks = pd.DataFrame([
    {"dataset": k, "path": str(v), "existe": exists(v)}
    for k, v in FILES.items()
])
exist_checks


Unnamed: 0,dataset,path,existe
0,deputados,../data/processed/deputados.json,True
1,freq_eventos,../data/processed/freq_eventos.csv,True
2,eventos_sessoes,../data/processed/eventos_sessoes.csv,True
3,votos_deputados,../data/processed/votos_deputados.csv,True
4,presencas,../lab/data/processed/presencas.csv,True
5,cod_situacao,../data/processed/cod_situacao_deputados.json,True
6,remuneracoes,../data/processed/remuneracoes.csv,True


## Esquema mínimo esperado (PK, colunas-chave)

In [5]:
# Esquema mínimo (flexível) — marca presença/ausência
expected = {
    "deputados": {"id","nome","siglaPartido","siglaUf"},
    "freq_eventos": {"id_deputado","num_eventos"},
    "eventos_sessoes": {"id_evento","dataHoraInicio","dataHoraFim"},
    "votos_deputados": set(),  # variável; tentamos detectar id_deputado
    "presencas": {"id_evento","id_deputado","tipo_presenca"},
    "remuneracoes": {"id_deputado"},  # situacao é útil, mas não obrigatório
    "cod_situacao": {"sigla","nome"},  # catálogo
}

dfs = {
    "deputados": df_deps, "freq_eventos": df_freq, "eventos_sessoes": df_evt,
    "votos_deputados": df_votos, "presencas": df_pres, "remuneracoes": df_rem,
    "cod_situacao": df_sit
}

rows = []
for name, df in dfs.items():
    if df is None:
        rows.append({"dataset": name, "ok": False, "faltando": list(expected[name]) if name in expected else []})
        continue
    cols = set(df.columns)
    exp = expected.get(name, set())
    faltando = sorted(list(exp - cols))
    rows.append({"dataset": name, "ok": len(faltando)==0, "faltando": faltando})

schema_check = pd.DataFrame(rows)
schema_check


Unnamed: 0,dataset,ok,faltando
0,deputados,True,[]
1,freq_eventos,True,[]
2,eventos_sessoes,False,"[dataHoraInicio, id_evento, dataHoraFim]"
3,votos_deputados,True,[]
4,presencas,True,[]
5,remuneracoes,True,[]
6,cod_situacao,True,[]


## Duplicatas em chaves primárias

In [6]:
rows = []

# deputados.id
if df_deps is not None and "id" in df_deps.columns:
    dup = int(df_deps.duplicated(subset=["id"]).sum())
    rows.append({"dataset":"deputados", "coluna":"id", "duplicatas": dup})

# eventos.id_evento
if df_evt is not None and "id_evento" in df_evt.columns:
    dup = int(df_evt.duplicated(subset=["id_evento"]).sum())
    rows.append({"dataset":"eventos_sessoes", "coluna":"id_evento", "duplicatas": dup})

# presenças (chave composta razoável)
if df_pres is not None and all(c in df_pres.columns for c in ["id_evento","id_deputado","tipo_presenca"]):
    dup = int(df_pres.duplicated(subset=["id_evento","id_deputado","tipo_presenca"]).sum())
    rows.append({"dataset":"presencas", "coluna":"(id_evento,id_deputado,tipo_presenca)", "duplicatas": dup})

dup_check = pd.DataFrame(rows)
dup_check


Unnamed: 0,dataset,coluna,duplicatas
0,deputados,id,0
1,presencas,"(id_evento,id_deputado,tipo_presenca)",0


## Completude (nulos) nas colunas principais

In [7]:
def null_report(df: pd.DataFrame, cols: list[str], dataset: str) -> pd.DataFrame:
    out = []
    for c in cols:
        if c not in df.columns: 
            out.append({"dataset": dataset, "coluna": c, "nulos": np.nan, "pct_nulos": np.nan})
            continue
        n = int(df[c].isna().sum())
        out.append({"dataset": dataset, "coluna": c, "nulos": n, "pct_nulos": percent(n, len(df))})
    return pd.DataFrame(out)

null_checks = []
if df_deps is not None:
    null_checks.append(null_report(df_deps, ["id","nome","siglaPartido","siglaUf","email"], "deputados"))
if df_freq is not None:
    null_checks.append(null_report(df_freq, ["id_deputado","num_eventos"], "freq_eventos"))
if df_evt is not None:
    null_checks.append(null_report(df_evt, ["id_evento","dataHoraInicio","dataHoraFim"], "eventos_sessoes"))
if df_pres is not None:
    null_checks.append(null_report(df_pres, ["id_evento","id_deputado","tipo_presenca"], "presencas"))
if df_votos is not None and v_id_dep_col is not None:
    null_checks.append(null_report(df_votos, [v_id_dep_col], "votos_deputados"))
if df_rem is not None:
    null_checks.append(null_report(df_rem, ["id_deputado","situacao"], "remuneracoes"))

null_checks_df = pd.concat(null_checks, ignore_index=True) if null_checks else pd.DataFrame()
null_checks_df


Unnamed: 0,dataset,coluna,nulos,pct_nulos
0,deputados,id,0,0.0
1,deputados,nome,0,0.0
2,deputados,siglaPartido,0,0.0
3,deputados,siglaUf,0,0.0
4,deputados,email,0,0.0
5,freq_eventos,id_deputado,0,0.0
6,freq_eventos,num_eventos,0,0.0
7,presencas,id_evento,0,0.0
8,presencas,id_deputado,0,0.0
9,presencas,tipo_presenca,564081,100.0


## Integridade referencial (FK ⊆ PK)

In [8]:
ref_rows = []

# freq_eventos.id_deputado ⊆ deputados.id
if (df_freq is not None) and (df_deps is not None) and "id_deputado" in df_freq.columns and "id" in df_deps.columns:
    left = df_freq["id_deputado"].dropna().unique()
    right = set(df_deps["id"].dropna().unique())
    out = [x for x in left if x not in right]
    ref_rows.append({"fk":"freq_eventos.id_deputado", "pk":"deputados.id", "qtde_fk_nao_encontrada": len(out)})

# presencas.id_deputado ⊆ deputados.id
if (df_pres is not None) and (df_deps is not None) and "id_deputado" in df_pres.columns and "id" in df_deps.columns:
    left = df_pres["id_deputado"].dropna().unique()
    right = set(df_deps["id"].dropna().unique())
    out = [x for x in left if x not in right]
    ref_rows.append({"fk":"presencas.id_deputado", "pk":"deputados.id", "qtde_fk_nao_encontrada": len(out)})

# presencas.id_evento ⊆ eventos.id_evento
if (df_pres is not None) and (df_evt is not None) and "id_evento" in df_pres.columns and "id_evento" in df_evt.columns:
    left = df_pres["id_evento"].dropna().unique()
    right = set(df_evt["id_evento"].dropna().unique())
    out = [x for x in left if x not in right]
    ref_rows.append({"fk":"presencas.id_evento", "pk":"eventos_sessoes.id_evento", "qtde_fk_nao_encontrada": len(out)})

# votos.id_deputado ⊆ deputados.id
if (df_votos is not None) and (df_deps is not None) and (v_id_dep_col is not None) and "id" in df_deps.columns:
    left = df_votos[v_id_dep_col].dropna().unique()
    right = set(df_deps["id"].dropna().unique())
    out = [x for x in left if x not in right]
    ref_rows.append({"fk":f"votos_deputados.{v_id_dep_col}", "pk":"deputados.id", "qtde_fk_nao_encontrada": len(out)})

ref_check = pd.DataFrame(ref_rows)
ref_check


Unnamed: 0,fk,pk,qtde_fk_nao_encontrada
0,freq_eventos.id_deputado,deputados.id,0
1,presencas.id_deputado,deputados.id,411
2,votos_deputados.id_deputado,deputados.id,289


## Coerência temporal dos eventos

In [9]:
temporal_rows = []
if df_evt is not None and all(c in df_evt.columns for c in ["dataHoraInicio","dataHoraFim"]):
    n_total = len(df_evt)
    # fim >= inicio
    incoerentes = int((df_evt["dataHoraFim"] < df_evt["dataHoraInicio"]).sum(skipna=True))
    temporal_rows.append({"checagem":"dataFim >= dataInicio (eventos)", 
                          "inconsistencias": incoerentes, 
                          "pct": percent(incoerentes, n_total)})

    # janela 2020-01-01 -> hoje (tolerante com NaT)
    jan_2020 = pd.Timestamp("2020-01-01")
    hoje = pd.Timestamp.today().normalize()
    fora_janela = int(((df_evt["dataHoraInicio"] < jan_2020) | (df_evt["dataHoraInicio"] > hoje)).sum(skipna=True))
    temporal_rows.append({"checagem":"eventos fora da janela [2020..hoje] (dataInicio)", 
                          "inconsistencias": fora_janela, 
                          "pct": percent(fora_janela, n_total)})

temporal_check = pd.DataFrame(temporal_rows) if temporal_rows else pd.DataFrame()
temporal_check


## Domínios / Catálogos (situação x remunerações; tipo_presenca)

In [10]:
domain_rows = []

# Situações: catálogo vs. remuneracoes.situacao (se existir)
sit_validas = set()
if df_sit is not None and "sigla" in df_sit.columns:
    sit_validas = set(df_sit["sigla"].dropna().astype(str).str.strip().unique())

if df_rem is not None and "situacao" in df_rem.columns and sit_validas:
    # normaliza alfabético (pode já estar por extenso em df_rem; então aqui só reportamos cobertura da interseção)
    rem_vals = set(df_rem["situacao"].dropna().astype(str).str.strip().unique())
    inter = rem_vals.intersection(sit_validas)
    domain_rows.append({
        "checagem":"remuneracoes.situacao em catálogo cod_situacao.sigla",
        "total_distintos_remuneracoes": len(rem_vals),
        "no_catalogo": len(inter),
        "pct_no_catalogo": percent(len(inter), len(rem_vals))
    })

# tipo_presenca — apenas panorama (catálogo livre, mas reporta n_mais_comuns)
if df_pres is not None and "tipo_presenca" in df_pres.columns:
    top = (df_pres["tipo_presenca"].fillna("<NA>").value_counts().head(10).to_dict())
    domain_rows.append({"checagem":"top10 tipo_presenca (amostra)", **top})

domain_check = pd.DataFrame(domain_rows)
domain_check


Unnamed: 0,checagem,total_distintos_remuneracoes,no_catalogo,pct_no_catalogo,<NA>
0,remuneracoes.situacao em catálogo cod_situacao...,1.0,0.0,0.0,
1,top10 tipo_presenca (amostra),,,,564081.0


## Outliers em num_eventos (IQR)

In [11]:
outlier_rows = []
if df_freq is not None and "num_eventos" in df_freq.columns:
    s = df_freq["num_eventos"].dropna().astype(float)
    if len(s):
        q1, q3 = s.quantile(0.25), s.quantile(0.75)
        iqr = q3 - q1
        low_thr  = q1 - 1.5*iqr
        high_thr = q3 + 1.5*iqr
        n_low  = int((s < low_thr).sum())
        n_high = int((s > high_thr).sum())
        outlier_rows.append({
            "checagem":"outliers num_eventos (IQR 1.5x)",
            "lim_inf": round(low_thr,2),
            "lim_sup": round(high_thr,2),
            "n_outliers_baixos": n_low,
            "n_outliers_altos": n_high,
            "pct_outliers": percent(n_low+n_high, len(s))
        })

outliers_check = pd.DataFrame(outlier_rows)
outliers_check


Unnamed: 0,checagem,lim_inf,lim_sup,n_outliers_baixos,n_outliers_altos,pct_outliers
0,outliers num_eventos (IQR 1.5x),-203.0,1941.0,0,9,1.75


## Recomendações automáticas de limpeza (heurísticas)

In [12]:
recs = []

# duplicatas
if not dup_check.empty:
    for _, r in dup_check.iterrows():
        if r["duplicatas"] > 0:
            recs.append(f"Remover duplicatas em {r['dataset']} -> {r['coluna']} (encontradas {r['duplicatas']}).")

# referencial
if not ref_check.empty:
    for _, r in ref_check.iterrows():
        if r["qtde_fk_nao_encontrada"] > 0:
            recs.append(f"Ajustar integridade: {r['fk']} com {r['qtde_fk_nao_encontrada']} valores sem correspondente em {r['pk']}.")

# temporal
if not temporal_check.empty:
    for _, r in temporal_check.iterrows():
        if r["inconsistencias"] > 0:
            recs.append(f"Rever eventos com {r['checagem']}: {r['inconsistencias']} inconsistências.")

# nulos críticos
if not null_checks_df.empty:
    crit_cols = [("deputados","id"),("freq_eventos","id_deputado"),("eventos_sessoes","id_evento")]
    for ds, col in crit_cols:
        sub = null_checks_df[(null_checks_df["dataset"]==ds) & (null_checks_df["coluna"]==col)]
        if len(sub) and float(sub.iloc[0]["nulos"])>0:
            recs.append(f"Preencher/filtrar nulos em {ds}.{col} (chave crítica).")

# outliers
if not outliers_check.empty:
    n_out = int(outliers_check["n_outliers_altos"].fillna(0).sum() + outliers_check["n_outliers_baixos"].fillna(0).sum())
    if n_out > 0:
        recs.append("Investigar outliers em num_eventos (IQR); verificar mandato parcial/ausências prolongadas.")

recs = pd.DataFrame({"recomendacao": recs})
recs if len(recs) else pd.DataFrame({"recomendacao":["Nenhuma recomendação automática gerada. Dados parecem consistentes nas checagens aplicadas."]})


Unnamed: 0,recomendacao
0,Ajustar integridade: presencas.id_deputado com...
1,Ajustar integridade: votos_deputados.id_deputa...
2,Investigar outliers em num_eventos (IQR); veri...


## Scorecard de Qualidade (PASS/FAIL por regra)

In [13]:
checks = []

# Existência
for _, r in exist_checks.iterrows():
    checks.append({"check":"existencia", "dataset": r["dataset"], "descricao":"arquivo existe", "valor": bool(r["existe"]), "status": "PASS" if r["existe"] else "FAIL"})

# Esquema (faltando colunas)
for _, r in schema_check.iterrows():
    status = "PASS" if r["ok"] else "WARN"  # WARN para esquemas mínimos (flexível)
    checks.append({"check":"esquema_min", "dataset": r["dataset"], "descricao":"colunas mínimas presentes", "valor": r["ok"], "status": status, "detalhe": f"faltando={r['faltando']}"})

# Duplicatas
for _, r in dup_check.iterrows():
    status = "PASS" if r["duplicatas"]==0 else "FAIL"
    checks.append({"check":"duplicatas", "dataset": r["dataset"], "descricao": f"unicidade de {r['coluna']}", "valor": int(r["duplicatas"]), "status": status})

# Nulos críticos
if not null_checks_df.empty:
    for _, r in null_checks_df.iterrows():
        # define críticas: ID e chaves
        crit = (r["coluna"] in {"id","id_evento","id_deputado"})
        status = "PASS"
        if crit and pd.notna(r["nulos"]) and int(r["nulos"])>0:
            status = "FAIL"
        checks.append({"check":"nulos", "dataset": r["dataset"], "descricao": f"nulos em {r['coluna']}", "valor": r["pct_nulos"], "status": status})

# FK
for _, r in ref_check.iterrows():
    status = "PASS" if r["qtde_fk_nao_encontrada"]==0 else "FAIL"
    checks.append({"check":"fk", "dataset": r["fk"].split(".")[0], "descricao": f"{r['fk']} ⊆ {r['pk']}", "valor": int(r["qtde_fk_nao_encontrada"]), "status": status})

# Temporal
for _, r in temporal_check.iterrows():
    status = "PASS" if r["inconsistencias"]==0 else "FAIL"
    checks.append({"check":"temporal", "dataset": "eventos_sessoes", "descricao": r["checagem"], "valor": r["inconsistencias"], "status": status})

# Domínios (apenas informativo)
for _, r in domain_check.iterrows():
    checks.append({"check":"dominio", "dataset":"catalogos", "descricao": r["checagem"], "valor": 1, "status": "INFO"})

# Outliers
for _, r in outliers_check.iterrows():
    status = "PASS" if (r["n_outliers_baixos"] + r["n_outliers_altos"])==0 else "WARN"
    checks.append({"check":"outliers", "dataset":"freq_eventos", "descricao":"IQR 1.5x num_eventos", "valor": r["pct_outliers"], "status": status})

quality_report = pd.DataFrame(checks)[["check","dataset","descricao","valor","status"]].sort_values(["status","check","dataset"])
quality_report


Unnamed: 0,check,dataset,descricao,valor,status
30,fk,presencas,presencas.id_deputado ⊆ deputados.id,411,FAIL
31,fk,votos_deputados,votos_deputados.id_deputado ⊆ deputados.id,289,FAIL
32,dominio,catalogos,remuneracoes.situacao em catálogo cod_situacao...,1,INFO
33,dominio,catalogos,top10 tipo_presenca (amostra),1,INFO
14,duplicatas,deputados,unicidade de id,0,PASS
15,duplicatas,presencas,"unicidade de (id_evento,id_deputado,tipo_prese...",0,PASS
13,esquema_min,cod_situacao,colunas mínimas presentes,True,PASS
7,esquema_min,deputados,colunas mínimas presentes,True,PASS
8,esquema_min,freq_eventos,colunas mínimas presentes,True,PASS
11,esquema_min,presencas,colunas mínimas presentes,True,PASS


## Resumo Executivo de Qualidade

In [14]:
# KPIs
kpis = []

# cobertura freq_eventos
cov_freq = np.nan
if (df_freq is not None) and (df_deps is not None) and "id" in df_deps.columns and "id_deputado" in df_freq.columns:
    deps_com_freq = df_freq["id_deputado"].nunique()
    deps_total    = df_deps["id"].nunique()
    cov_freq = percent(deps_com_freq, deps_total)
    kpis.append({"indicador":"Cobertura freq_eventos (% deps)", "valor": cov_freq})

# duplicatas críticas
for _, r in dup_check.iterrows():
    if "id" in r["coluna"] or "id_evento" in r["coluna"]:
        kpis.append({"indicador": f"Duplicatas {r['dataset']}.{r['coluna']}", "valor": int(r["duplicatas"])})

# nulos críticos (id)
crit_nulls = null_checks_df[null_checks_df["coluna"].isin(["id","id_evento","id_deputado"])] if not null_checks_df.empty else pd.DataFrame()
if not crit_nulls.empty:
    for _, r in crit_nulls.iterrows():
        kpis.append({"indicador": f"Nulos {r['dataset']}.{r['coluna']} (%)", "valor": r["pct_nulos"]})

# fks quebradas
for _, r in ref_check.iterrows():
    kpis.append({"indicador": f"FK sem correspondência: {r['fk']}→{r['pk']}", "valor": int(r["qtde_fk_nao_encontrada"])})

# eventos incoerentes temporalmente
if not temporal_check.empty:
    inco = int(temporal_check["inconsistencias"].sum())
    kpis.append({"indicador":"Eventos temporalmente incoerentes (soma checagens)", "valor": inco})

# outliers num_eventos
if not outliers_check.empty:
    kpis.append({"indicador":"Outliers em num_eventos (%)", "valor": float(outliers_check["pct_outliers"].iloc[0])})

resumo_qualidade = pd.DataFrame(kpis)
resumo_qualidade


Unnamed: 0,indicador,valor
0,Cobertura freq_eventos (% deps),100.0
1,Duplicatas deputados.id,0.0
2,"Duplicatas presencas.(id_evento,id_deputado,ti...",0.0
3,Nulos deputados.id (%),0.0
4,Nulos freq_eventos.id_deputado (%),0.0
5,Nulos presencas.id_evento (%),0.0
6,Nulos presencas.id_deputado (%),0.0
7,Nulos votos_deputados.id_deputado (%),0.0
8,Nulos remuneracoes.id_deputado (%),0.0
9,FK sem correspondência: freq_eventos.id_deputa...,0.0
