# W04A — Data Contract v1 + Calidad mínima (checks sí o sí)

**Objetivo:** formalizar un *contrato de datos* (mínimo viable) e implementar *checks* de calidad
que eviten problemas downstream (JOINs que inflan filas, métricas incoherentes, etc.).

**Recordemos que:**
- Un JOIN puede inflar filas si la clave no es única.
- Convierte esa lección en **política**: *antes* de transformar o unir, validamos.

**DDIA (Kleppmann):**
- Cap. 2: modelos de datos + “lo que prometemos” a consumidores.
- Cap. 4: evolución de esquema / compatibilidad: el contrato versiona expectativas.

In [None]:
import os
os.chdir("..")
os.getcwd()

In [None]:
# Setup común (cross-platform)
from pathlib import Path
import duckdb, json
from datetime import datetime, timezone

PROJECT_ROOT = Path(".").resolve()
RAW_DIR = PROJECT_ROOT / "data" / "raw"
DB_PATH = PROJECT_ROOT / "data" / "exoplanets.duckdb"
DOCS_DIR = PROJECT_ROOT / "docs"
ART_DIR = PROJECT_ROOT / "artifacts"

RAW_DIR.mkdir(parents=True, exist_ok=True)
DOCS_DIR.mkdir(parents=True, exist_ok=True)
ART_DIR.mkdir(parents=True, exist_ok=True)

raw_csv = RAW_DIR / "pscomppars.csv"

con = duckdb.connect(str(DB_PATH))

def sql_quote(s: str) -> str:
    return "'" + s.replace("'", "''") + "'"

# Bronze: raw_ps (schema-on-read)
if not raw_csv.exists():
    raise FileNotFoundError(
        f"No encuentro {raw_csv}. "
        "Asegúrate de tener el CSV en data/raw/pscomppars.csv (de W01/W02)."
    )

raw_csv_abs = raw_csv.resolve()
con.execute(
    f"CREATE OR REPLACE VIEW raw_ps AS SELECT * FROM read_csv_auto({sql_quote(str(raw_csv_abs))})"
)

# Ver columnas disponibles
con.sql("DESCRIBE raw_ps").show()

## 1) ¿Qué es un “data contract” (mínimo viable)?

Un **contrato de datos** es el *acuerdo explícito* entre productor y consumidor sobre:
- **Esquema** (nombres, tipos aproximados, significado)
- **Claves / granularidad** (qué identifica una fila)
- **Reglas de calidad** (nulos permitidos, rangos plausibles, unicidad, consistencia)
- **Versionado** (v1, v2…) y cómo cambian las expectativas

> En este curso lo tratamos como una “interfaz” entre etapas **Bronze → Silver → Gold**.

| Check                  | Tipo         | Qué valida                                                |
|------------------------|--------------|------------------------------------------------------------|
| pk_unique_pl_name      | uniqueness   | Que no existan duplicados en `pl_name`                    |
| nulls_pl_name          | completeness | Que `pl_name` no tenga valores nulos                      |
| nulls_hostname         | completeness | Que `hostname` no tenga valores nulos                     |
| disc_year_range        | validity     | Que el año de descubrimiento esté dentro de un rango válido |

El threshold es la cantidad máxima de errores que aceptas antes de decir “esto está mal”.

In [None]:
# DEMO (lectura guiada): contrato v1 (muy corto) como JSON para versionar expectativas
contract_v1 = {
  "dataset": "nasa_exoplanets_pscomppars",
  "version": "1.0.0",
  "table_bronze": "raw_ps",
  "grain": "1 fila ~ 1 planeta (pl_name) en catálogo",
  "primary_key_candidate": ["pl_name"],
  "required_columns": ["pl_name", "hostname"],
  "quality_checks": [
    {"name": "pk_unique_pl_name", "type": "uniqueness", "threshold": 0, "metric": "n_duplicates"},
    {"name": "nulls_pl_name", "type": "completeness", "threshold": 0, "metric": "n_nulls"},
    {"name": "nulls_hostname", "type": "completeness", "threshold": 0, "metric": "n_nulls"},
    {"name": "disc_year_range", "type": "validity", "threshold": 0, "metric": "n_out_of_range"},
  ],
  "notes": "Contrato v1: checks mínimos para no romper JOINs/agrupaciones."
}

(DOCS_DIR / "data_contract_v1.json").write_text(json.dumps(contract_v1, indent=2), encoding="utf-8")
print("Escribí docs/data_contract_v1.json")

### Un contract más robusto sería

```json
contract_v2 = {
  "dataset": "nasa_exoplanets_pscomppars",
  "version": "1.0.0",
  "owner": "data-engineering@nasa.gov",
  "domain": "astronomia.exoplanetas",
  "table_bronze": "raw_ps",
  "grain": "1 fila = 1 planeta único identificado por pl_name",
  
  "schema_expected": {
    "pl_name": "string",
    "hostname": "string",
    "disc_year": "integer",
    "pl_orbper": "float",
    "pl_rade": "float"
  },

  "primary_key_candidate": ["pl_name"],
  "required_columns": ["pl_name", "hostname", "disc_year"],

  "quality_checks": [
    {
      "name": "pk_unique_pl_name",
      "type": "uniqueness",
      "metric": "n_duplicates",
      "threshold": 0,
      "severity": "critical"
    },
    {
      "name": "nulls_pl_name",
      "type": "completeness",
      "metric": "n_nulls",
      "threshold": 0,
      "severity": "critical"
    },
    {
      "name": "nulls_hostname",
      "type": "completeness",
      "metric": "n_nulls",
      "threshold": 0,
      "severity": "high"
    },
    {
      "name": "disc_year_range",
      "type": "validity",
      "metric": "n_out_of_range",
      "params": {"min": 1988, "max": 2025},
      "threshold": 0,
      "severity": "medium"
    },
    {
      "name": "orbital_period_positive",
      "type": "validity",
      "metric": "n_invalid",
      "rule": "pl_orbper > 0",
      "threshold": 0,
      "severity": "medium"
    }
  ],

  "business_rules": [
    "Cada planeta debe estar asociado a un único hostname.",
    "disc_year debe ser mayor o igual al año del primer exoplaneta detectado (1988)."
  ],

  "refresh_policy": {
    "frequency": "daily",
    "expected_arrival_time": "03:00 UTC",
    "late_data_tolerance_minutes": 120
  },

  "notes": "Contrato v2: incluye esquema, severidad, reglas de negocio y políticas de refresco."
}


## 2) Checks mínimos (la idea, antes del SQL)

**Completeness** (nulos en columnas clave):  
- `nulls_pl_name` = `COUNT(*) - COUNT(pl_name)`  
- `nulls_hostname` = `COUNT(*) - COUNT(hostname)`

**Uniqueness** (clave no debe duplicarse):  
- duplicados por `pl_name` → `GROUP BY pl_name HAVING COUNT(*)>1`

**Validity** (rangos plausibles):  
- `disc_year` debería estar en un rango razonable (ej. 1980–2026).

In [None]:
# DEMO (docente): 3 checks con SQL básico

# 1) Nulos en columnas clave 
con.sql("""
SELECT
  COUNT(*) AS total_rows,
  COUNT(*) - COUNT(pl_name)  AS nulls_pl_name,
  COUNT(*) - COUNT(hostname) AS nulls_hostname
FROM raw_ps
""").show()

# 2) Duplicados por pl_name (si existen, alerta)
con.sql("""
SELECT pl_name, COUNT(*) AS c
FROM raw_ps
WHERE pl_name IS NOT NULL
GROUP BY pl_name
HAVING COUNT(*) > 1
ORDER BY c DESC
LIMIT 10
""").show()

# 3) Rangos plausibles de disc_year (ajusta si deseas)
con.sql("""
SELECT COUNT(*) AS n_out_of_range
FROM raw_ps
WHERE disc_year IS NOT NULL
  AND (disc_year < 1980 OR disc_year > 2026)
""").show()

## TU TURNO (en clase)

Vas a construir un mini-reporte de calidad para **12 columnas** (tú puedes ajustar la lista),
y luego lo guardas como tabla `quality_w03a` y como CSV en `artifacts/`.

### TU TURNO 1 — Nulos en 12 columnas clave

In [None]:
picked = [
  "pl_name","hostname","disc_year","discoverymethod",
  "pl_orbper","pl_rade","pl_bmasse",
  "sy_dist","ra","dec",
  "sy_snum","sy_pnum"
]

parts = [
  f"SELECT '{c}' AS col, COUNT(*) - COUNT({c}) AS nulls FROM raw_ps"
  for c in picked
]
query = " UNION ALL ".join(parts) + " ORDER BY nulls DESC, col ASC"
con.sql(query).show()

### TU TURNO 2 — Un check de rango (elige 1)

In [None]:
# TODO: elige UNO y deja solo uno activo

# A) pl_rade (radio) en rango didáctico
query = """
SELECT COUNT(*) AS n_bad_pl_rade
FROM raw_ps
WHERE pl_rade IS NOT NULL
  AND (pl_rade <= 0 OR pl_rade > 30)
"""
con.sql(query).show()

# B) sy_dist (distancia) en rango didáctico
# query = """
# SELECT COUNT(*) AS n_bad_sy_dist
# FROM raw_ps
# WHERE sy_dist IS NOT NULL
#   AND (sy_dist <= 0 OR sy_dist > 20000)
# """
# con.sql(query).show()

# C) disc_year plausible
# query = """
# SELECT COUNT(*) AS n_bad_disc_year
# FROM raw_ps
# WHERE disc_year IS NOT NULL
#   AND (disc_year < 1980 OR disc_year > 2026)
# """
# con.sql(query).show()

# W04B — Construyendo **Silver** (schema estable) + primeras vistas **Gold**

**Objetivo:** pasar de Bronze-lite (`raw_ps`) a un **Silver** con:
- columnas oficiales (Contract v1),
- llaves y granularidad explícitas,
- reglas simples de limpieza,
- dimensiones con **1 fila por clave** (para JOINs sanos),
y luego crear 2 vistas **Gold** de ejemplo.

In [None]:
import os
os.chdir("..")
os.getcwd()

In [None]:
# Setup (cross-platform)
from pathlib import Path
import duckdb, json
from datetime import datetime, timezone

PROJECT_ROOT = Path(".").resolve()
RAW_DIR = PROJECT_ROOT / "data" / "raw"
DB_PATH = PROJECT_ROOT / "data" / "exoplanets.duckdb"
DOCS_DIR = PROJECT_ROOT / "docs"
ART_DIR  = PROJECT_ROOT / "artifacts"

RAW_DIR.mkdir(parents=True, exist_ok=True)
DOCS_DIR.mkdir(parents=True, exist_ok=True)
ART_DIR.mkdir(parents=True, exist_ok=True)

raw_csv = RAW_DIR / "pscomppars.csv"
con = duckdb.connect(str(DB_PATH))

def sql_quote(s: str) -> str:
    return "'" + s.replace("'", "''") + "'"

if not raw_csv.exists():
    raise FileNotFoundError(
        f"No encuentro {raw_csv}. Asegúrate de tenerlo en data/raw/pscomppars.csv (W01/W02)."
    )

raw_csv_abs = raw_csv.resolve()
con.execute(f'''
CREATE OR REPLACE VIEW raw_ps AS
SELECT * FROM read_csv_auto({sql_quote(str(raw_csv_abs))})
''')

con.sql("DESCRIBE raw_ps").show()

## 1) Diseño Silver (mínimo viable)

**Contract v1 (Core 16 columnas):**
`pl_name, hostname, discoverymethod, disc_year, sy_snum, sy_pnum, sy_dist, ra, dec, pl_orbper, pl_rade, pl_bmasse, pl_eqt, st_teff, st_rad, st_mass`

**Reglas Silver (hoy):**
- `pl_name` y `hostname` no nulos
- `disc_year` en [1980, 2026] si no es nulo
- rangos didácticos:
  - `pl_rade` > 0 y ≤ 30 si no es nulo
  - `pl_bmasse` > 0 si no es nulo

In [None]:
# TU TURNO 1: construir silver_planet (igual que el docente, pero tú decides 1 regla extra)
# Elige UNA regla extra:
# - (pl_orbper IS NULL OR pl_orbper > 0)
# - (sy_dist  IS NULL OR sy_dist  > 0)
# - (pl_eqt   IS NULL OR pl_eqt   > 0)

con.execute("DROP TABLE IF EXISTS silver_planet")

con.execute('''
-- TODO: crea silver_planet seleccionando las 16 columnas (Core) y aplicando reglas
SELECT 1;
''')

# Validación:
# con.sql("SELECT COUNT(*) AS n_rows, COUNT(DISTINCT pl_name) AS n_pl FROM silver_planet").show()

## 2) Dimensiones (1 fila por clave)

Estrategia “mesurada” (sin window functions):
- `GROUP BY hostname`
- para cada columna: `MAX(col)` para consolidar una fila por hostname

In [None]:
# TU TURNO 2: crea dim_host_full (1 fila por hostname)
# Debe contener: hostname + 3 columnas de tu elección entre:
# sy_dist, ra, dec, st_teff, st_rad, st_mass

con.execute("DROP TABLE IF EXISTS dim_host_full")
con.execute('''
-- TODO: CREATE TABLE dim_host_full AS SELECT hostname, MAX(...) ... GROUP BY hostname
SELECT 1;
''')

# Validación:
# con.sql("SELECT COUNT(*) AS n_rows, COUNT(DISTINCT hostname) AS n_keys FROM dim_host_full").show()

## 3) Fact table (grain: 1 fila ≈ 1 planeta)

Creamos `fact_planet` desde Silver usando `SELECT DISTINCT`.
Si aparecen duplicados por `pl_name`, se documenta como issue de calidad.

In [None]:
# TU TURNO 3: crea fact_planet desde silver_planet (usa DISTINCT)
con.execute("DROP TABLE IF EXISTS fact_planet")
con.execute('''
-- TODO: CREATE TABLE fact_planet AS SELECT DISTINCT ...
SELECT 1;
''')

# Validación:
# con.sql("SELECT COUNT(*) AS n_rows, COUNT(DISTINCT pl_name) AS n_pl FROM fact_planet").show()

## 4) JOINs sanos + 2 vistas Gold
- JOIN sano: `COUNT(join)` ≈ `COUNT(fact)` (no inflar).
- Gold: `gold_by_method` y `gold_by_host`.

In [None]:
# TU TURNO 4: crea UNA vista Gold (elige A o B)

# A) gold_by_method
# con.execute("DROP VIEW IF EXISTS gold_by_method")
# con.execute('''
# -- TODO: CREATE VIEW gold_by_method AS ...
# ''')
# con.sql("SELECT * FROM gold_by_method LIMIT 10").show()

# B) gold_by_host
# con.execute("DROP VIEW IF EXISTS gold_by_host")
# con.execute('''
# -- TODO: CREATE VIEW gold_by_host AS ...
# ''')
# con.sql("SELECT * FROM gold_by_host LIMIT 10").show()

## 5) Contract Silver v1 + trazabilidad

Escribimos `docs/data_contract_silver_v1.json` describiendo tablas Silver/Gold.
Si cambias columnas o reglas: incrementa versión.

In [None]:
contract_silver_v1 = {
  "dataset": "nasa_exoplanets_pscomppars",
  "version": "1.0.0",
  "bronze": {"table": "raw_ps", "note": "Bronze-lite (Core 16 columnas)"},
  "silver": {
    "tables": ["silver_planet", "dim_host_full", "fact_planet"],
    "grain_fact": "1 fila ≈ 1 planeta (pl_name)",
    "keys": {"fact_planet": ["pl_name"], "dim_host_full": ["hostname"]},
    "core_columns": [
      "pl_name","hostname","discoverymethod","disc_year",
      "sy_snum","sy_pnum","sy_dist","ra","dec",
      "pl_orbper","pl_rade","pl_bmasse","pl_eqt",
      "st_teff","st_rad","st_mass"
    ],
    "quality_minimum": [
      "pl_name NOT NULL",
      "hostname NOT NULL",
      "disc_year in [1980,2026] if not null",
      "pl_rade in (0,30] if not null",
      "dim_host_full: hostname unique"
    ]
  },
  "gold": {"views": ["gold_by_method", "gold_by_host"]},
  "notes": "Si cambias columnas/reglas, incrementa version."
}

(DOCS_DIR / "data_contract_silver_v1.json").write_text(
    json.dumps(contract_silver_v1, indent=2), encoding="utf-8"
)
print("Guardé docs/data_contract_silver_v1.json")

## Reflexión (bitácora)
- ¿Qué evidencia mínima te convence de que tu JOIN es sano?
- ¿Qué trade-off hay entre “limpiar mucho” vs “no perder datos”?

### TU TURNO 3 — Materializa un reporte (tabla) y expórtalo

In [None]:
# TODO: materializa quality_w03a con 3 checks mínimos (2 de nulos + 1 de rango)
# Explica en el reporte con detalle esta solución
# Pistas:
# - CREATE TABLE quality_w03a AS
# SELECT {sql_quote(run_ts)} AS run_ts, 'nulls_pl_name' AS check_name,
#        (COUNT(*) - COUNT(pl_name))::BIGINT AS metric_value
# FROM ... 
#  ...
# - Cada SELECT devuelve: run_ts, check_name, metric_value
# - Reusa: COUNT(*) - COUNT(col) para nulos

run_ts = datetime.now(timezone.utc).isoformat()

sql = f"""
-- TODO: reemplaza esto por tu CREATE TABLE + UNION ALL
SELECT 1;
"""

# con.execute("DROP TABLE IF EXISTS quality_w03a")
# con.execute(sql)
# con.sql("SELECT * FROM quality_w03a").show()

# Exporta evidencia (cuando ya exista quality_w03a)
# out_csv = ART_DIR / f"w03a_quality_{run_ts.replace(':','-')}.csv"
# con.execute(f"COPY (SELECT * FROM quality_w03a) TO {sql_quote(str(out_csv))} (HEADER, DELIMITER ',')")
# print("Exporté:", out_csv)

In [None]:
try:
    con.close()
    print("DuckDB connection closed.")
except NameError:
    print("No connection named 'con' in this notebook.")

## Para entregar (W04)
### En clase (evidencia mínima)
1) En `docs/w03a_quality_report.md` pega:
   - Output de **TU TURNO 1** (tabla de nulos en 12 columnas).
   - Output de **TU TURNO 2** (1 check de rango) + 2–3 líneas de interpretación.
2) En `docs/decisions_log.md`: 1 entrada corta:
   - “Qué 12 columnas escogí y por qué” + evidencia (tabla de nulos o conteos).
3) `docs/w03b_silver_report.md` con outputs:
   - `DESCRIBE silver_planet` + conteos (rows/distinct pl_name/hostname)
   - `dim_host_full`: `n_rows` vs `n_keys`
   - `n_fact` vs `n_join` (JOIN sano)
4) `docs/decisions_log.md`: 1 decisión:
   - “Reglas Silver aplicadas + evidencia”.

### Tarea (para la próxima clase)
1) Termina **TU TURNO 3**:
   - Explica en `docs/w03a_quality_report.md` con detalle el querry
   - Crear `quality_w03a` (3 checks mínimos) y exportar 1 CSV a `artifacts/`.
2) Completa TU TURNO 4 (una vista Gold) si no la hiciste.
3) Exporta 1 vista Gold a CSV en `artifacts/` (COPY ... TO).
4) (Opcional) si agregaste una regla extra, documéntala como versión `v1.0.1`.

## Reflexión (bitácora)
- ¿Qué check te sorprendió más y por qué?
- ¿Qué consecuencias tendría para W02B (JOINs) que `pl_name` o `hostname` tuviera muchos nulos/duplicados?