# Propuesta ETL – Visitas Web

## Metodología
El proceso ETL propuesto seguirá un enfoque DataOps, con fases que delimitan las fases de desarrollar, probar y operar los flujos de datos de manera controlada y confiable:

| Fase   | Actividades clave | Alcance |
|--------|-----------------|---------|
| Plan   | Definir requerimientos, KPIs y diseño del pipeline, incluyendo arquitectura, puntos de control, lineamientos y políticas de seguridad y respaldo. | Dentro del alcance |
| Develop | Construir los flujos ETL de manera modular a nivel de código | De forma conceptual se describe la estructura de los directorios y los componentes principales del sistema |
| Test   | Ejecutar pruebas unitarias y de datos para garantizar la calidad y el cumplimiento de reglas de negocio. | De forma conceptual se describe el entorno y el entregable de la etapa |
| Deploy | Promocionar el repositorio de código con el pipeline final y documentación completa. | De forma conceptual se describe el proceso |
| Operate | Monitorear la ejecución diaria y asegurar la observabilidad. | De forma conceptual se describe el proceso |

---

## Contexto y Necesidad
Actualmente, los datos de visitas web se almacenan en archivos planos de forma diaria en un directorio predeterminado dentro del servidor SFTP remoto.  
Cada archivo TXT almacena eventos diarios de interacción de usuarios en un sitio web, donde cada registro corresponde a un usuario identificado de forma única a través de su email.  

Los datos se estructuran en las siguientes categorías conceptuales:
- Estado de validez o acceso
- Tiempos de interacción
- Métricas de engagement
- Detalles técnicos y contexto de acceso
- Identificadores internos

Estos datos permiten analizar el comportamiento y nivel de interacción de los usuarios con fines de monitoreo, segmentación y análisis de engagement.

---

## Objetivo del Proyecto
Diseñar y construir un proceso ETL diario que:
- Descargue los archivos `.txt` de visitas web desde el servidor SFTP.
- Valide su estructura y consistencia antes de iniciar el procesamiento.
- Cargue los registros válidos en las tablas correspondientes de la base de datos.
- Almacene los registros inválidos en tablas específicas de errores.
- Ejecute puntos de control en etapas críticas del flujo para garantizar trazabilidad y recuperación ante fallos.
- Genere respaldo diario de los archivos procesados en el servidor de aplicaciones ETL.
- Registre el linaje de los datos y documente cada checkpoint.
- Permita monitoreo y alertas automáticas.

---

## Requerimientos Funcionales

### Validación de estructura y layout
Se realizó un perfilamiento de los datos para comprender la naturaleza de estos y así poder extraer los patrones en las distintas columnas. Con esta información se definió el layout del archivo config/layout.json, que servirá como guía para crear los schemas de Pydantic y todas las reglas de validación y transformación necesarias.  

| Campo            | Tipo esperado | Obligatorio | Validaciones clave |
|-----------------|---------------|------------|------------------|
| email           | string        | ✅         | Formato válido de correo electrónico |
| jk              | string        | ✅         | Debe existir la columna, valor puede ser vacío |
| Badmail         | string        | ✅         | Valores esperados: HARD y vacío |
| Baja            | string        | ✅         | Valores esperados: SI y vacío |
| Fecha envío     | datetime      | ✅         | Formato dd/mm/yyyy hh:mm |
| Fecha open      | datetime      | ✅         | Formato dd/mm/yyyy hh:mm (Puede contener -) |
| Opens           | integer       | ✅         | ≥ 0 |
| Opens virales   | integer       | ✅         | ≥ 0 |
| Fecha click     | datetime      | ✅         | Formato dd/mm/yyyy hh:mm (Puede contener -) |
| Clicks          | integer       | ✅         | ≥ 0 |
| Clicks virales  | integer       | ✅         | ≥ 0 |
| Links           | string        | ✅         | Puede contener - |
| IPs             | string        | ✅         | Formato n.n.n.n, cada n entre 0 y 255 (Puede contener -) |
| Navegadores     | string        | ✅         | Puede estar vacío o contener - |
| Plataformas     | string        | ✅         | Puede estar vacío |

---

### Definición de formato del archivo
- Separador: coma (,)
- Codificación: UTF-8
- Cabecera: incluida
- Delimitadores de texto: ninguno o comillas dobles (")
- Fin de línea: `\n`

---

## Extracción
- Conexión vía SFTP al servidor `8.8.8.8`, directorio `/home/vinkOS/archivosVisitas`.
- Descarga diaria de todos los archivos `.txt` no procesados previamente.
- Verificación de duplicados mediante bitácora diaria.
- Checksum de cada archivo para garantizar integridad.
- Reintentos automáticos 3 veces ante fallo de conexión.
- Registro en bitácora en el orquetador: nombre de archivo, tamaño, fecha de descarga.

---

## Transformación

### Validación de layout y columnas
- 15 columnas esperadas con nombres exactos.

### Validación de contenido
- Se aplican las reglas y formatos definidos en la tabla del layout, asegurando consistencia en tipos de datos, obligatoriedad y valores permitidos.
- Control de versiones de esquemas mediante Pydantic.
- Punto de control: staging contiene solo registros válidos; errores en tabla errores.

### Políticas de transformación

#### 1. Política de Nulos y Manejo de Ausencias (-)
| Campo(s) afectado(s) | Valor Origen | Acción de Transformación | Justificación |
|----------------------|--------------|-------------------------|---------------|
| Contadores (Opens, Opens virales, Clicks, Clicks virales) | - | Se coerciona a 0 (cero) y se convierte a INTEGER | Garantiza que las métricas numéricas se calculen correctamente con valores ≥ 0 |
| Fechas opcionales (Fecha open, Fecha click) | - | Se convierte a NULL y se aplica formato DATETIME | Valor estándar para datos no presentes en campos de fecha opcionales |
| Campos de texto o bandera (Badmail, Baja, Links, IPs, Navegadores, Plataformas, jk) | - | Se convierte a NULL | Normalización de valores ausentes para la carga en estadística |
| Campos obligatorios (email, Fecha envío) | - o formato inválido | Rechazo del registro completo y envío a la tabla errores | Registro no procesable por incumplir estructura o formato mínimo |

#### 2. Política de Normalización y Formato
- Email: conversión a minúsculas.
- Fechas: dd/mm/yyyy HH:mm → YYYY-MM-DD HH:MM:SS.
- IPS: formato de ip n.n.n.n
- Banderas: normalización de valores categóricos.

#### 3. Política de Unicidad y Precedencia
- Criterio de duplicado: mismo email y mismos valores en todos los campos.
- Regla de resolución: conservar registro con Fecha envío más reciente.

#### 4. Política de Integridad de Negocio (Consistencia Temporal)
- `Fecha open` ≥ `Fecha envío`.
- `Fecha click` ≥ `Fecha open` y ≤ `Fecha envío`.
- Violaciones → registro en tabla de errores.

---

## Carga
- Destino final: MySQL (`visitante`, `estadística`, `errores`).
- Estrategia:
  - `visitante` y `estadística`: incremental.
  - `errores`: append.
- Validaciones post-carga: registros cargados coinciden con staging.
- Backup en zip: `/home/etl/visitas/bckp`.
- Eliminación de archivos originales tras backup.

---

## Operación y Monitoreo
- DAG en Airflow con logging detallado.
- Métricas y alertas con Prometheus/Grafana.
- Linaje de datos documentado con OpenLineage.
- Alertas automáticas vía correo o Slack en caso de fallo crítico.

---

## Requisitos Regulatorios
- Emails cifrados en tránsito y reposo (AES256).
- Archivos originales conservados 6 meses.
- Documentación de linaje completo para auditoría.

---

## Calidad de Datos
- Validación de layout, columnas, emails, fechas y valores numéricos.
- Eliminación de duplicados dentro de cada archivo.
- Registro de errores en tabla `errores`.
- Bitácora diaria y mensual en el orquetador.

---

## Seguridad y Privacidad
- Acceso controlado a SFTP y MySQL mediante credenciales.
- Cifrado TLS/AES256.

---

## Integración de Datos

### Clave de Integración Principal
- Email → clave única para determinar si crear o actualizar registro.

### Tablas de destino

#### 1. Tabla `visitante`


| Campo DB | Campo Origen | Lógica de Mapeo y Actualización |
|----------|--------------|--------------------------------|
| email | email | Clave única, minúsculas |
| fechaPrimeraVisita | Fecha envío | Inserción inicial |
| fechaUltimaVisita | Fecha envío | Actualización con registro procesado |
| visitasTotales | Conteo interno | +1 por registro procesado |
| visitasAnioActual | Conteo interno | +1, reinicia si cambia año |
| visitasMesActual | Conteo interno | +1, reinicia si cambia mes |

#### 2. Tabla `estadística`
Cada registro válido se inserta como nueva fila.


La tabla de `estadística`, los nombres originales del TXT contienen **acentos, espacios y mezcla de mayúsculas y minúsculas**, por lo que no cumplen con un estándar SQL. Para el ETL se asume que la tabla destino en SQL sigue un **formato `snake_case`**, y se realiza el mapeo de los campos del TXT a la tabla SQL como se indica a continuación:

| Campo TXT      | Campo SQL      |
| -------------- | -------------- |
| email          | email          |
| jyv            | jyv            |
| Badmail        | badmail        |
| Baja           | baja           |
| Fecha envío    | fecha_envio    |
| Fecha open     | fecha_open     |
| Opens          | opens          |
| Opens virales  | opens_virales  |
| Fecha click    | fecha_click    |
| Clicks         | clicks         |
| Clicks virales | clicks_virales |
| Links          | links          |
| IPs            | ips            |
| Navegadores    | navegadores    |
| Plataformas    | plataformas    |

#### 3. Tabla `errores`
- Agregar registros con información de origen como nuevo registro (append).

---

## Archivado y Linaje
- Backup diario zip: `/home/etl/visitas/bckp`.
- Bitácora mensual con archivos procesados y errores.
- Linaje documentado: archivo origen → transformaciones → tabla destino.

---

## Latencia
- ETL diario ejecutado fuera de horario pico.
- Tiempo estimado: < 1 hora desde extracción hasta carga y backup.

---

## KPIs
- Archivos procesados vs recibidos.
- Registros válidos vs registros con errores.
- Tiempo de ejecución por etapa.
- Alertas generadas y resueltas.

---

## Licencias heredadas
- No se exigen tecnologías específicas.

---

## Validaciones y Checkpoints ETL

### Checkpoint 1: Extracción y Validación de Fuente Cruda
- Etapa: Extracción
- Transformaciones: Ninguna
- Validaciones:
  - Completitud y linaje: existencia, nombre, extensión, tamaño >0, hash distinto.
  - Exactitud: columnas correctas (15), nombre y orden, layout versionado.
  - Archivos con error → `/errores/layout`
- Herramientas: Pandas, Pandera, Pydantic.

### Checkpoint 2: Limpieza, Conversión y Reglas de Negocio
- Etapa: Transformación (Staging)
- Limpieza:
  - Guion (-) → NULL
  - Conversión string → INTEGER/DATETIME
  - Email a minúsculas
  - Encoding UTF-8
- Reglas de negocio:
  - Deduplicación por email (fecha envío más reciente)
  - Normalización flags categóricos
  - Cálculo métricas incrementales
- Validaciones:
  - Registro a registro: Pydantic
  - Dataset completo: Pandera
  - Great Expectations para métricas
- Integridad:
  - Consistencia temporal
  - Reglas de negocio, valores permitidos, unicidad

### Checkpoint 3: Carga, Post-Carga y Operación
- Etapa: Carga
- Pre-Carga:
  - Validar el esquema con el de la DB
  - Ajustar fechas al formato SQL
- Validaciones:
  - Conteos registros válidos vs errores
  - Verificación de integridad de staging
  - Backup antes de carga
- Post-Carga y Operación:
  - Comparar conteos staging vs destino
  - Checksum final


# Diseño



## **Arquitectura**

<img src="../images/Arquitectura.jpg" alt="Descripción de la imagen">

---

## **Stack tecnologico**


* **Archivos TXT en servidor SFTP:** fuente de datos cruda.
* **Python:** motor de transformación y carga de datos.
* **Pandas:** manipulación y limpieza de datos.
* **Pydantic:** validación de esquema y tipos de datos.
* **Great Expectations:** pruebas de calidad y consistencia de datos.
* **Airflow:** orquestador del pipeline ETL.
* **Prometheus:** recopilación de métricas del proceso.
* **Grafana:** visualización de métricas y dashboards.
* **Sentry:** monitoreo de errores y alertas.
* **Slack:** notificaciones automáticas.
* **OpenLineage:** documentación y seguimiento del linaje de datos.

---




### **Etapa de Desarrollo**

En esta etapa realizamos las siguientes actividades:

* Se implementa de manera modular el pipeline ETL utilizando **Python** y **Airflow**.
* Se crea el **repositorio en GitHub** con la estructura base del proyecto:

  * `/dags`: definición de flujos y orquestación.
  * `/modules`: módulos de transformación y carga.
  * `/schemas`: validación con **Pydantic**.
  * `/expectations`: reglas y controles con **Great Expectations**.
  * `/integrations`: conexión con **Sentry**, **Slack** y **OpenLineage**.
  * `/configs`: variables de entorno y configuraciones generales.
* Se generan **DAGs y checkpoints** para validar integridad, consistencia y completitud del flujo.
* Se configuran las integraciones con **Sentry** para la gestión de errores y con **Slack** para notificaciones automáticas.
* Se habilita **OpenLineage** para registrar el linaje de datos y visualizarlo en **Marqués**.
* Se definen las **variables de entorno**, configuraciones de ejecución y contenedores de los servicios del pipeline.

---

### **Etapa de Pruebas**

En esta etapa realizamos las siguientes actividades:


* Se desarrollan **pruebas unitarias** sobre cada módulo en Python para garantizar su correcto funcionamiento.
* Se ejecutan **pruebas de datos** con **Pydantic** y **Great Expectations** para validar estructura, tipos de datos y cumplimiento de reglas de negocio.
* Se comprueba la **coherencia temporal**, detección de duplicados y consistencia general de los registros.
* Se simulan **errores controlados** para validar alertas y notificaciones en **Sentry** y **Slack**.
* Se generan **métricas de calidad y estabilidad** que permiten asegurar la confiabilidad del pipeline antes del paso a producción.

---

### **Etapa de Despliegue y Operación**

* Se define la **estrategia de despliegue** utilizando **Google Cloud Platform (GCP)**.
* Los servicios se ejecutan en **contenedores Docker**, garantizando portabilidad y aislamiento.
* El **aprovisionamiento** de recursos, redes y bases de datos se automatiza con **Terraform**.
* Se establece un flujo de **integración y despliegue continuo (CI/CD)** mediante **GitHub Actions**, incluyendo pruebas automáticas, construcción de imágenes y promoción entre entornos.
* El **monitoreo de ejecución** se gestiona con **Prometheus** y **Grafana**, mostrando métricas de desempeño, uso de recursos y cumplimiento de tiempos de carga.
* Las **alertas de incidentes** se canalizan a **Sentry** y **Slack**, con clasificación por severidad y prioridad de atención.
* Se registra el **linaje de datos** con **OpenLineage**, consultable en **Marqués**, para facilitar la trazabilidad y auditoría de los flujos.
* Se implementan **respaldos automáticos** de logs y artefactos críticos para asegurar la recuperación y continuidad del servicio.

---

### **Paso a Producción**

Para el paso a producción se debe realizar las siguientes actividades:

* Se documenta el proceso completo del pipeline, sus dependencias, puntos de control y variables configurables.
* Se definen los **criterios de aceptación** que deben cumplirse antes de liberar el flujo, incluyendo resultados de pruebas unitarias, métricas de calidad y estabilidad de los DAGs.
* Se valida el cumplimiento de **políticas de seguridad, acceso y respaldo**, asegurando que los datos sensibles estén encriptados y los permisos correctamente asignados.
* Se crea el plan de despriegus con su politica de rollback
* Se ejecuta un **despliegue controlado en ambiente de staging**, validando logs, métricas y comportamiento de las alertas antes de su promoción final a producción.

---

### **Consideraciones para el equipo de Operación**

Se valida con el equipo de operaciones que tenga acceso y visibilidad a las herramientas necesarias para monitorear el pipeline y atender incidentes.

* El equipo operativo cuenta con acceso a los **tableros de Prometheus y Grafana** para visualizar métricas de ejecución, latencia, volumen de datos y fallos en los DAGs.
* Las **alertas y notificaciones** se reciben directamente en los canales designados de **Slack**, con detalle del DAG afectado, hora del incidente y log asociado.
* **Sentry** centraliza los errores y excepciones del sistema, indicando el módulo afectado, tipo de fallo y nivel de severidad.
* **Marqués** provee visibilidad del linaje de datos, permitiendo rastrear la procedencia, transformaciones y destino final de cada conjunto de datos.
* El equipo operativo dispone de una **guía de acción ante incidentes**, que incluye pasos para:

  * Verificar estado del DAG y último checkpoint ejecutado.
  * Analizar logs en Airflow y Sentry.
  * Escalar el incidente al desarrollador responsable si requiere ajuste de código o reconfiguración.
* Se realizan **revisiones periódicas** de métricas, alertas y tendencias de fallos para identificar oportunidades de mejora en la operación.

---