
# DBT + Airflow para **Silver (Dev)** en Databricks  
**Objetivo:** levantar un entorno paralelo de *silver desarrollo* (vía **dbt**) sin tocar *silver producción*, y orquestarlo con **Airflow**.

> Este notebook es una guía ejecutable para que el equipo configure dbt con Databricks (adapter oficial) y corra *pipelines* de *silver dev* en paralelo a *silver prod*. Incluye ejemplos de `profiles.yml`, `dbt_project.yml`, un modelo incremental y un DAG de Airflow.



## 1) Requisitos previos (resumen ejecutivo)
- **Databricks** con Unity Catalog habilitado (ideal) o al menos un Cluster/SQL Warehouse accesible.
- **Adapter dbt-databricks** (recomendado) o **dbt-spark** (si no usas Databricks SQL).  
- **GitHub** (código) + **CI/CD runner** o máquina donde correr `dbt` (puede ser un Job en Databricks o un servidor con Airflow).
- **Esquemas aislados**: `silver` (prod) y `silver_dev` (desarrollo).  
  - Alternativa: usar `schema: silver__{{ env_var('DBT_ENV_NAME', 'dev') }}` para esquemas por entorno.
- **Credenciales**: Token personal o Service Principal de Databricks, host del workspace y (si aplica) catalog/database.



## 2) Instalación del adapter de dbt
En el entorno donde correrá dbt (local, runner, contenedor o VM):

```bash
# Opción 1: Databricks (recomendada)
pip install dbt-databricks

# Opción 2: Spark genérico
pip install dbt-spark[PyHive]
```



## 3) `profiles.yml` (dos *targets*: `prod` y `dev`)
Crea `~/.dbt/profiles.yml` (o usa variable `DBT_PROFILES_DIR`) similar a esto.  
**Nota:** abajo también te dejo un archivo descargable `profiles.yml.example`.

```yaml
databricks_project:
  target: dev
  outputs:
    prod:
      type: databricks
      catalog: main                    # Ajusta a tu catalog (Unity Catalog)
      schema: silver                   # Esquema producción
      host: https://<tu-workspace>.cloud.databricks.com
      http_path: /sql/1.0/warehouses/<WAREHOUSE_ID>   # Si usas SQL Warehouse
      token: "{{ env_var('DATABRICKS_TOKEN') }}"
      threads: 8

    dev:
      type: databricks
      catalog: main
      schema: silver_dev               # Esquema desarrollo
      host: https://<tu-workspace>.cloud.databricks.com
      http_path: /sql/1.0/warehouses/<WAREHOUSE_ID>
      token: "{{ env_var('DATABRICKS_TOKEN') }}"
      threads: 8
```



## 4) `dbt_project.yml`
Proyecto mínimo para aislar *silver dev* y *silver prod* por target.  
Archivo de ejemplo también disponible como `dbt_project.yml.example`:

```yaml
name: databricks_silver
version: 1.0.0
config-version: 2

profile: databricks_project

model-paths: ["models"]
seed-paths: ["seeds"]
test-paths: ["tests"]
macro-paths: ["macros"]

models:
  databricks_silver:
    +materialized: incremental
    +on_schema_change: append_new_columns
    +file_format: delta
    +tags: ["silver"]

    # Puedes sobreescribir por subcarpetas (silver/, gold/, etc.)
    silver:
      +materialized: incremental
      +schema: "{{ target.schema }}"   # respeta el schema de cada target (prod/dev)
```



## 5) Estructura de carpetas sugerida
```
dbt_project_root/
├─ models/
│  └─ silver/
│     ├─ stg_eventos.sql
│     └─ usuarios_incremental.sql
├─ seeds/
├─ macros/
├─ dbt_project.yml
└─ profiles.yml  (o en ~/.dbt/profiles.yml)
```



## 6) Modelo incremental de ejemplo (Delta + deduplicación)
Guarda como `models/silver/usuarios_incremental.sql` (también dejo el archivo listo para descargar).

```sql
{{ config(
    materialized='incremental',
    unique_key='user_id',
    incremental_strategy='merge'
) }}

with fuente as (
    -- Ejemplo: tabla BRONZE ya creada por tus notebooks de Databricks
    select
        cast(user_id as string)         as user_id,
        lower(trim(user_name))          as user_name,
        cast(event_ts as timestamp)     as event_ts
    from {{ source('bronze', 'events_raw') }}
), dedup as (
    select *
    from (
      select
        *,
        row_number() over (partition by user_id order by event_ts desc) as rn
      from fuente
    ) x
    where rn = 1
)

select user_id, user_name, event_ts
from dedup

{% if is_incremental() %}
  -- En modo incremental, trae solo nuevos/actualizados
  where event_ts > coalesce((select max(event_ts) from {{ this }}), '1900-01-01')
{% endif %}
```
> **Nota:** Define tus *sources* en `models/sources.yml` apuntando a tus tablas/volúmenes reales.



## 7) Comandos clave de dbt
```bash
# Inicializa proyecto (si es nuevo)
dbt init databricks_silver

# Prueba conexión
dbt debug --target dev

# Documentación local (opcional)
dbt docs generate && dbt docs serve

# Correr en DEV (silver_dev)
dbt run --select tag:silver --target dev
dbt test --select tag:silver --target dev

# Correr en PROD (silver)
dbt run --select tag:silver --target prod
dbt test --select tag:silver --target prod
```



## 8) Orquestación con Airflow (BashOperator)
El DAG que te dejo (`dags/airflow_dag_dbt_silver.py`) corre `dbt run/test` con *targets* `dev` y `prod`.  
Requisitos en el *worker*: Python con `dbt-databricks` instalado y variables de entorno (`DATABRICKS_TOKEN`, etc.).


In [None]:

# (Opcional) Verifica variables de entorno requeridas para dbt-databricks
import os
vars_needed = ["DATABRICKS_TOKEN"]
missing = [v for v in vars_needed if not os.environ.get(v)]
print("Variables faltantes:", missing if missing else "OK (todas presentes)")



## 9) Buenas prácticas para “Silver paralelo”
- Mantén **aislamiento por esquema** (`silver` vs `silver_dev`) o por `catalog` si aplica UC.  
- Usa **tags** (`tag:silver`) para seleccionar modelos en jobs y CI/CD.  
- Aplica **pruebas dbt** (unique, not_null, relationships) antes de promover a *prod*.  
- En Databricks, prioriza **formato Delta** y `ZORDER`/`OPTIMIZE` en tablas pesadas.  
- Versiona tus `sources` y modelos con *pull requests* y *code reviews*.
