# Sesion Tecnica -- Data Pipelines y DBT
17 Diciembre 2025

# ETL y ELT
![Proceso de ETL vs ELT](https://d2908q01vomqb2.cloudfront.net/b6692ea5df920cad691c20319a6fffd7a4a766b8/2019/12/12/ETLandELTRedshift1.png)
- [Explicacion por AWS](https://aws.amazon.com/es/compare/the-difference-between-etl-and-elt/)

## La utilidad de DBT
"You can use dbt to modularize and centralize your analytics code, while also providing your data team with guardrails typically found in software engineering workflows. Collaborate on data models, version them, and test and document your queries before safely deploying them to production, with monitoring and visibility.

dbt compiles and runs your analytics code against your data platform, enabling you and your team to collaborate on a single source of truth for metrics, insights, and business definitions. This single source of truth, combined with the ability to define tests for your data, reduces errors when logic changes, and alerts you when issues arise.

...

Use dbt to quickly and collaboratively transform data and deploy analytics code following software engineering best practices like version control, modularity, portability, CI/CD, and documentation. This means anyone on the data team comfortable with SQL can safely contribute to production-grade data pipelines."
- [What is DBT?](https://docs.getdbt.com/docs/introduction)

![dbt trabaja al lado de tu sistema de datos](https://docs.getdbt.com/img/docs/cloud-overview.jpg?v=2)

## Prerequisitos Bash

In [1]:
# uv
!brew install uv 

# Sync the project dependencies
!uv sync

[?25l[K[34m⠋[0m JSON API formula.jws.json                        [Downloading  32.1MB/-------]
[K[34m⠋[0m JSON API cask.jws.json                           [Downloading  14.7MB/-------][1F[K[34m⠋[0m JSON API formula.jws.json                        [Downloading  32.1MB/-------]
[K[34m⠋[0m JSON API cask.jws.json                           [Downloading  14.7MB/-------][1F[K[34m⠙[0m JSON API formula.jws.json                        [Downloading  32.1MB/-------]
[K[34m⠙[0m JSON API cask.jws.json                           [Downloading  14.7MB/-------][1F[K[34m⠙[0m JSON API formula.jws.json                        [Downloading  32.1MB/-------]
[K[34m⠙[0m JSON API cask.jws.json                           [Downloading  14.7MB/-------][1F[K[34m⠚[0m JSON API formula.jws.json                        [Downloading  32.1MB/-------]
[K[34m⠚[0m JSON API cask.jws.json                           [Downloading  14.7MB/-------][1F[K[34m⠚[0m JSON API formula.jws.json          

En el terminal desde el proyecto root `iieg_dbt`, iniciar el proyecto de dbt con [duckdb](https://duckdb.org/):
```bash
source .venv/bin/activate
dbt init duckdb_elt
```
y seguir las instrucciones. Por default el base de datos se iniciara en `duckdb_elt/dev.duckdb`

## Nuestro primer fuente de datos: el DOF
Vamos a hacer un scraping de los datos de tablas html en el sitio del [DOF](https://www.dof.gob.mx/nota_detalle_popup.php?codigo=5711612).

In [2]:
# Code inicial para el scraper DOF
from typing import List, Dict, Any, Optional, Tuple
from datetime import datetime

import logging
import pandas as pd
import requests
import warnings

warnings.filterwarnings('ignore')
logger = logging.getLogger(__name__)

class DOFScraper:
    def __init__(self, verify_ssl: bool = False):
        """
        Initialize DOF scraper.

        Args:
            verify_ssl: Whether to verify SSL certificates (default: False for DOF)
        """
        self.verify_ssl = verify_ssl
        self.session = requests.Session()
        self.session.verify = verify_ssl

        # Set headers to mimic browser
        self.session.headers.update({
            'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36',
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
            'Accept-Language': 'es-MX,es;q=0.9,en;q=0.8',
        })

    def fetch_html(self, url: str) -> Tuple[str, Dict[str, Any]]:
        """
        Fetch HTML content from URL.

        Args:
            url: URL to fetch

        Returns:
            Tuple of (html_content, metadata)

        Raises:
            requests.RequestException: If fetch fails
        """
        logger.info(f"Fetching HTML from: {url}")

        try:
            response = self.session.get(url, timeout=30)
            response.raise_for_status()

            # Detect encoding (DOF uses iso-8859-1)
            if response.encoding is None or response.encoding == 'ISO-8859-1':
                response.encoding = 'iso-8859-1'

            html_content = response.text

            # Extract metadata
            metadata = {
                'url': url,
                'fetch_timestamp': datetime.now(),
                'html_size_bytes': len(html_content.encode('utf-8')),
                'status_code': response.status_code,
                'encoding': response.encoding,
            }

            logger.info(f"Successfully fetched {metadata['html_size_bytes']} bytes")
            return html_content, metadata

        except requests.RequestException as e:
            logger.error(f"Failed to fetch URL: {e}")
            raise


In [3]:
# DOF 2024
dof_html, dof_metadata = DOFScraper().fetch_html('https://www.dof.gob.mx/nota_detalle_popup.php?codigo=5711612')
tables = pd.read_html(dof_html, thousands=None, decimal='.', encoding='iso-8859-1', displayed_only=False)

# Nuestro target es ramo 28 en tabla 4
tables[3]

Unnamed: 0,0,1,2,3,4,5,6,7,8,9,10,11,12,13
0,,,,,,,,,,,,,,Anexo 1
1,RAMO GENERAL 28: PARTICIPACIONES A ENTIDADES F...,RAMO GENERAL 28: PARTICIPACIONES A ENTIDADES F...,RAMO GENERAL 28: PARTICIPACIONES A ENTIDADES F...,RAMO GENERAL 28: PARTICIPACIONES A ENTIDADES F...,RAMO GENERAL 28: PARTICIPACIONES A ENTIDADES F...,RAMO GENERAL 28: PARTICIPACIONES A ENTIDADES F...,RAMO GENERAL 28: PARTICIPACIONES A ENTIDADES F...,RAMO GENERAL 28: PARTICIPACIONES A ENTIDADES F...,RAMO GENERAL 28: PARTICIPACIONES A ENTIDADES F...,RAMO GENERAL 28: PARTICIPACIONES A ENTIDADES F...,RAMO GENERAL 28: PARTICIPACIONES A ENTIDADES F...,RAMO GENERAL 28: PARTICIPACIONES A ENTIDADES F...,RAMO GENERAL 28: PARTICIPACIONES A ENTIDADES F...,RAMO GENERAL 28: PARTICIPACIONES A ENTIDADES F...
2,ESTIMACIÓN DEL FONDO GENERAL DE PARTICIPACIONE...,ESTIMACIÓN DEL FONDO GENERAL DE PARTICIPACIONE...,ESTIMACIÓN DEL FONDO GENERAL DE PARTICIPACIONE...,ESTIMACIÓN DEL FONDO GENERAL DE PARTICIPACIONE...,ESTIMACIÓN DEL FONDO GENERAL DE PARTICIPACIONE...,ESTIMACIÓN DEL FONDO GENERAL DE PARTICIPACIONE...,ESTIMACIÓN DEL FONDO GENERAL DE PARTICIPACIONE...,ESTIMACIÓN DEL FONDO GENERAL DE PARTICIPACIONE...,ESTIMACIÓN DEL FONDO GENERAL DE PARTICIPACIONE...,ESTIMACIÓN DEL FONDO GENERAL DE PARTICIPACIONE...,ESTIMACIÓN DEL FONDO GENERAL DE PARTICIPACIONE...,ESTIMACIÓN DEL FONDO GENERAL DE PARTICIPACIONE...,ESTIMACIÓN DEL FONDO GENERAL DE PARTICIPACIONE...,ESTIMACIÓN DEL FONDO GENERAL DE PARTICIPACIONE...
3,(PESOS),(PESOS),(PESOS),(PESOS),(PESOS),(PESOS),(PESOS),(PESOS),(PESOS),(PESOS),(PESOS),(PESOS),(PESOS),(PESOS)
4,,,,,,,,,,,,,,
5,,,,,,,,,,,,,,
6,E N T I D A D E S,ANUAL,ENERO,FEBRERO,MARZO,ABRIL,MAYO,JUNIO,JULIO,AGOSTO,SEPTIEMBRE,OCTUBRE,NOVIEMBRE,DICIEMBRE
7,,,,,,,,,,,,,,
8,,,,,,,,,,,,,,
9,,,,,,,,,,,,,,


Nos damos cuenta de que estos datos estan muy crudos. *Pero en vez de transformarlos ahora, los guardamos tal y como estan como tabla `raw`. dbt se encargara con las etapas de transformacion.*

In [4]:
# A los metadatos les agregamos el HTML crudo
dof_metadata["raw_html"] = dof_html

In [7]:
# Ejemplo de consulta en DuckDB
import duckdb

dof_28_2024 = tables[3]

duckdb.sql("select * from dof_28_2024")

┌────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┬────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┬────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┬────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┬────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┬────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┬────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┬───────────────────────────────────────────────────────────────────────┬────────────────────────────────────────────────────

### El write de los datos crudos

In [8]:
# Conectar a la base de datos DuckDB
con = duckdb.connect('duckdb_elt/dev.duckdb')

# Vamos a crear a dos tablas: una para los metadatos y otra para las tablas raw HTML
con.execute("""
    create table if not exists raw_dof_html_metadata (
        scrape_id integer primary key,
        url varchar,
        fetch_timestamp timestamp,
        html_size_bytes integer,
        status_code integer,
        encoding varchar,
        raw_html varchar
    )
""")

# Insertar metadatos
scrape_id = 1
con.execute("""
    insert into raw_dof_html_metadata
    values (?, ?, ?, ?, ?, ?, ?)
""", [
    scrape_id,
    dof_metadata['url'],
    dof_metadata['fetch_timestamp'],
    dof_metadata['html_size_bytes'],
    dof_metadata['status_code'],
    dof_metadata['encoding'],
    dof_metadata['raw_html']
])

# Crear tabla para las tablas HTML con clave foránea hacia los metadatos
con.execute("""
    create table if not exists raw_dof_html_table (
        scrape_id integer,
        table_index integer,
        data_row_index integer,
        table_data varchar,
        foreign key (scrape_id) references raw_dof_html_metadata (scrape_id)
    )
""")

# Insertar datos de la tabla[3]
table_index = 3
for idx, row in tables[3].iterrows():
    con.execute("""
        insert into raw_dof_html_table
        values (?, ?, ?, ?)
    """, [scrape_id, table_index, idx, row.to_json()])

con.close()

print(f"✓ Created tables and inserted {len(tables[3])} rows from table[{table_index}]")

✓ Created tables and inserted 47 rows from table[3]


## Diseno modular de dbt
![Ejemplo de modular data modeling](https://cdn.sanity.io/images/wl0ndo6t/main/8eee14ea7b11a571b3cc7e872c789d9747551995-1416x1020.png?fit=max&auto=format&w=3840&q=75)
- [Diseno de datos modulares en dbt](https://www.getdbt.com/blog/modular-data-modeling-techniques)

### Nuestra meta: `fct_dof_fondo_general`

#### Flujo de datos
```
raw_dof_html_metadata → stg_dof_html_metadata ↘
                                                int_dof_fondo_general → fct_dof_fondo_general
raw_dof_html_table → stg_dof_html_table       ↗
```

*A estos commands y files los puedes crear tu o procesar los cells abajo*

In [9]:
# Folders para los modelos dbt en el pipeline
! mkdir -p duckdb_elt/models/stg duckdb_elt/models/int duckdb_elt/models/marts

In [10]:
# Crear el modelo staging para metadatos HTML del DOF -- se puede hacer directamente en duckdbt_elt/models/stg/stg_dof_html_metadata.sql
stg_metadata_sql = """
{{ config(materialized='view') }}

/*
 * Modelo Staging: stg_dof_html_metadata
 * 
 * Limpia y estandariza los metadatos de los scrapes HTML del DOF.
 * Excluye el HTML crudo ya que no se necesita en los modelos downstream.
 */

select
    scrape_id,
    url,
    fetch_timestamp,
    html_size_bytes,
    status_code,
    encoding
from {{ source('raw', 'raw_dof_html_metadata') }}
"""

# Escribir el archivo
with open('duckdb_elt/models/stg/stg_dof_html_metadata.sql', 'w') as f:
    f.write(stg_metadata_sql)

print("✓ Creado: duckdb_elt/models/stg/stg_dof_html_metadata.sql")

✓ Creado: duckdb_elt/models/stg/stg_dof_html_metadata.sql


In [11]:
# Crear el modelo staging para las tablas HTML parseadas
stg_table_sql = """
{{ config(materialized='view') }}

/*
 * Modelo Staging: stg_dof_html_table
 * 
 * Parsea los datos JSON de las tablas HTML en columnas estructuradas.
 * Filtra las filas de encabezado, footer, y mantiene solo los datos de entidades.
 * Incluye pk (primary key) para rastreo.
 */

select
    -- Generar primary key único
    row_number() over (order by scrape_id, table_index, data_row_index) as pk,
    scrape_id,
    table_index,
    data_row_index,
    -- Parsear JSON y extraer columnas individuales
    json_extract_string(table_data, '$.0') as entidad,
    json_extract_string(table_data, '$.1') as anual,
    json_extract_string(table_data, '$.2') as enero,
    json_extract_string(table_data, '$.3') as febrero,
    json_extract_string(table_data, '$.4') as marzo,
    json_extract_string(table_data, '$.5') as abril,
    json_extract_string(table_data, '$.6') as mayo,
    json_extract_string(table_data, '$.7') as junio,
    json_extract_string(table_data, '$.8') as julio,
    json_extract_string(table_data, '$.9') as agosto,
    json_extract_string(table_data, '$.10') as septiembre,
    json_extract_string(table_data, '$.11') as octubre,
    json_extract_string(table_data, '$.12') as noviembre,
    json_extract_string(table_data, '$.13') as diciembre
from {{ source('raw', 'raw_dof_html_table') }}
where 
    -- Filtrar filas de encabezado (comienzan en row 12)
    data_row_index >= 12
    -- Filtrar filas con entidad nula
    and json_extract_string(table_data, '$.0') is not null
    -- Filtrar filas de footer/notas (después de Zacatecas en row 43)
    and data_row_index < 44
"""

# Escribir el archivo
with open('duckdb_elt/models/stg/stg_dof_html_table.sql', 'w') as f:
    f.write(stg_table_sql)

print("✓ Creado: duckdb_elt/models/stg/stg_dof_html_table.sql")

✓ Creado: duckdb_elt/models/stg/stg_dof_html_table.sql


In [12]:
# Crear el modelo intermedio que limpia valores numéricos
int_fondo_sql = """
{{ config(materialized='view') }}

/*
 * Modelo Intermedio: int_dof_fondo_general
 * 
 * Limpia los valores numéricos:
 * - Remueve separadores de miles (comas)
 * - Convierte strings a tipos numéricos (decimal)
 * - Agrega el año (2024 basado en el título de la tabla)
 * - Excluye filas de totales
 * - Mantiene pk para rastreo hacia la fuente
 */

with cleaned_data as (
    select
        pk,
        scrape_id,
        trim(entidad) as entidad,
        2024 as year,
        -- Remover comas y convertir a decimal
        cast(replace(anual, ',', '') as decimal(18,2)) as anual,
        cast(replace(enero, ',', '') as decimal(18,2)) as enero,
        cast(replace(febrero, ',', '') as decimal(18,2)) as febrero,
        cast(replace(marzo, ',', '') as decimal(18,2)) as marzo,
        cast(replace(abril, ',', '') as decimal(18,2)) as abril,
        cast(replace(mayo, ',', '') as decimal(18,2)) as mayo,
        cast(replace(junio, ',', '') as decimal(18,2)) as junio,
        cast(replace(julio, ',', '') as decimal(18,2)) as julio,
        cast(replace(agosto, ',', '') as decimal(18,2)) as agosto,
        cast(replace(septiembre, ',', '') as decimal(18,2)) as septiembre,
        cast(replace(octubre, ',', '') as decimal(18,2)) as octubre,
        cast(replace(noviembre, ',', '') as decimal(18,2)) as noviembre,
        cast(replace(diciembre, ',', '') as decimal(18,2)) as diciembre
    from {{ ref('stg_dof_html_table') }}
)

select *
from cleaned_data
where 
    -- Excluir la fila de totales
    entidad != 'T O T A L'
"""

# Escribir el archivo
with open('duckdb_elt/models/int/int_dof_fondo_general.sql', 'w') as f:
    f.write(int_fondo_sql)

print("✓ Creado: duckdb_elt/models/int/int_dof_fondo_general.sql")

✓ Creado: duckdb_elt/models/int/int_dof_fondo_general.sql


In [14]:
# Crear el modelo fact final para el Fondo General de Participaciones
# Versión simplificada: solo montos anuales
fct_fondo_sql = """
{{ config(materialized='table') }}

/*
 * Modelo Fact: fct_dof_fondo_general
 * 
 * Tabla de hechos final con el Fondo General de Participaciones del DOF.
 * Una fila por entidad con el monto anual total.
 * 
 * Columnas:
 *   pk: primary key único
 *   ano: año (2024)
 *   entidad: nombre del estado
 *   ramo: número de ramo (28)
 *   nombre_del_ramo: descripción del ramo
 *   fondo: nombre del fondo
 *   monto: cantidad anual en pesos
 *   source_id: FK a stg_dof_html_table.pk
 */

select
    row_number() over (order by entidad) as pk,
    year as ano,
    entidad,
    28 as ramo,
    'Participaciones a Entidades Federativas y Municipios' as nombre_del_ramo,
    'Fondo General de Participaciones' as fondo,
    anual as monto,
    pk as source_id
from {{ ref('int_dof_fondo_general') }}
order by entidad
"""

# Escribir el archivo
with open('duckdb_elt/models/marts/fct_dof_fondo_general.sql', 'w') as f:
    f.write(fct_fondo_sql)

print("✓ Creado: duckdb_elt/models/marts/fct_dof_fondo_general.sql")

✓ Creado: duckdb_elt/models/marts/fct_dof_fondo_general.sql


In [15]:
# Crear sources.yml con definiciones mínimas
sources_yml = """version: 2

sources:
  - name: raw
    schema: main
    tables:
      - name: raw_dof_html_metadata
      - name: raw_dof_html_table
"""

# Escribir el archivo
with open('duckdb_elt/models/sources.yml', 'w') as f:
    f.write(sources_yml)

print("✓ Creado: duckdb_elt/models/sources.yml")

✓ Creado: duckdb_elt/models/sources.yml


In [16]:
# Ejecutar dbt run para crear todos los modelos
! cd duckdb_elt && dbt run

[0m15:20:54  Running with dbt=1.10.16
[0m15:20:54  Registered adapter: duckdb=1.10.0
[0m15:20:54  Unable to do partial parsing because saved manifest not found. Starting full parse.
[0m15:20:55  Found 6 models, 4 data tests, 2 sources, 468 macros
[0m15:20:55  
[0m15:20:55  Concurrency: 1 threads (target='dev')
[0m15:20:55  
[0m15:20:55  1 of 6 START sql table model main.my_first_dbt_model ........................... [RUN]
[0m15:20:55  1 of 6 OK created sql table model main.my_first_dbt_model ...................... [[32mOK[0m in 0.04s]
[0m15:20:55  2 of 6 START sql view model main.stg_dof_html_metadata ......................... [RUN]
[0m15:20:55  2 of 6 OK created sql view model main.stg_dof_html_metadata .................... [[32mOK[0m in 0.02s]
[0m15:20:55  3 of 6 START sql view model main.stg_dof_html_table ............................ [RUN]
[0m15:20:55  3 of 6 OK created sql view model main.stg_dof_html_table ....................... [[32mOK[0m in 0.01s]
[0m15:20:

In [17]:
# Ver el resultado final: fct_dof_fondo_general
import duckdb
import pandas as pd

# Configurar pandas para mostrar números completos (no notación científica)
pd.options.display.float_format = '{:,.2f}'.format

con = duckdb.connect('duckdb_elt/dev.duckdb')
print("fct_dof_fondo_general - Primeras 15 entidades:")
print("="*100)
result = con.execute("""
    select pk, ano, entidad, ramo, monto, source_id
    from fct_dof_fondo_general 
    limit 15
""").df()
print(result.to_string(index=False))

con.close()
print("\n✓ Pipeline completado exitosamente!")

fct_dof_fondo_general - Primeras 15 entidades:
 pk  ano             entidad  ramo             monto  source_id
  1 2024      Aguascalientes    28  9,668,089,925.00          1
  2 2024     Baja California    28 27,109,173,605.00          2
  3 2024 Baja California Sur    28  6,040,705,796.00          3
  4 2024            Campeche    28  6,808,647,004.00          4
  5 2024             Chiapas    28 38,901,304,249.00          7
  6 2024           Chihuahua    28 28,042,550,950.00          8
  7 2024    Ciudad de México    28 90,361,115,952.00          9
  8 2024            Coahuila    28 21,616,443,135.00          5
  9 2024              Colima    28  5,316,472,837.00          6
 10 2024             Durango    28 11,793,894,443.00         10
 11 2024          Guanajuato    28 41,591,188,346.00         11
 12 2024            Guerrero    28 22,042,413,269.00         12
 13 2024             Hidalgo    28 20,058,464,217.00         13
 14 2024             Jalisco    28 62,048,882,718.00     

## Que puede seguir?
- Establecer tests y reglas automizados via [schema yml files](https://docs.getdbt.com/reference/resource-configs/schema) para el proyecto y los modelos individualmente.
- Integrar al base existente de [postgres](https://docs.getdbt.com/docs/core/connect-data-platform/postgres-setup), por ejemplo con mas fuentes raw via la infrastructura en [Airflow](https://docs.getdbt.com/guides/airflow-and-dbt-cloud?step=1).
- Establecer un area de marts `feature`, que contienen [features stores](https://github.com/fal-ai/dbt_feature_store) para los proyectos de AM/IA.
- Otros ideas? Sugerencias? Dudas?