# Preview ETL — Análisis exploratorio y diseño del pipeline

Este cuaderno documenta el **análisis exploratorio** de los datos provenientes de NewsAPI y cómo se diseñó la **ETL (Extract → Transform → Load)** del proyecto. 

**Objetivos:**
- Validar la **extracción** desde la API con queries reales.
- Entender cómo vienen los **datos crudos** (nulos, duplicados, formatos, columnas).
- Aplicar las **transformaciones** usadas en el proyecto (normalización, limpieza, filtros, deduplicación).
- Simular la **carga** a base de datos y revisar la política de *upsert* por `url_hash`.

> **Nota:** Este notebook está pensado como documento demostrativo. No publica credenciales ni las requiere para ejecutarse en modo "preview".


## 1. Parámetros y configuración
Leemos variables de entorno (sin exponerlas en el cuaderno) y preparamos imports. No importamos el módulo de `settings` para evitar fallas si faltan secrets; en su lugar usamos `os.getenv` de forma segura.


In [None]:
import os, sys, json, pandas as pd
from datetime import datetime, timedelta, timezone

# Asegurar que el proyecto esté en el path
PROJECT_ROOT = '.'
if PROJECT_ROOT not in sys.path:
    sys.path.append(PROJECT_ROOT)

# Lectura segura de variables de entorno
NEWSAPI_KEY = os.getenv('NEWSAPI_KEY')
API_URL = os.getenv('API_URL', 'https://newsapi.org/v2/everything')
DATABASE_URL = os.getenv('DATABASE_URL')  # puede ser None durante el preview

print('API_URL =', API_URL)
print('NEWSAPI_KEY presente =', bool(NEWSAPI_KEY))
print('DATABASE_URL presente =', bool(DATABASE_URL))

## 2. Construcción de la query (desde BBDD)
En el pipeline de producción, las *keywords* se obtienen desde BBDD mediante `build_q_from_db(engine)`. Aquí intentamos usarla si hay `DATABASE_URL`; si no, usamos un **fallback** estático para la demo.


In [None]:
queries = None
try:
    from src.repositories.db import init_engine
    from src.utils.query_builder import build_q_from_db
    
    if DATABASE_URL:
        engine = init_engine(DATABASE_URL)
        queries = build_q_from_db(engine)
        print('Queries desde BBDD:', queries)
    else:
        print('DATABASE_URL no disponible: usaremos fallback local de queries.')
except Exception as e:
    print('No se pudo construir queries desde BBDD:', str(e))

if not queries:
    # Fallback para la exploración local
    queries = '(artificial intelligence OR machine learning) AND (marketing OR growth)'
    print('Queries (fallback):', queries)

## 3. Extracción de datos (Extract)
Usamos el servicio `fetch_ai_marketing_news(api_url, params)` que centraliza el acceso a la API y normaliza la respuesta a un `DataFrame` (cuando hay artículos). 

**Campos típicos de la respuesta cruda** (a través de `pd.json_normalize`):
- `author`, `title`, `description`, `url`, `urlToImage`, `publishedAt`, `content`
- `source.id` → `source_id`, `source.name` → `source_name`

> Para evitar exponer la API key, esta celda **no se ejecutará automáticamente** aquí. Puedes activar la extracción definiendo `DO_EXTRACT=True` y asegurándote de tener `NEWSAPI_KEY` en tu entorno.


In [None]:
from typing import Optional, Tuple
from pprint import pprint

DO_EXTRACT = False  # cambia a True si quieres probar la llamada real (requiere NEWSAPI_KEY)

df_raw: Optional[pd.DataFrame] = None
meta: dict = {}

if DO_EXTRACT:
    from src.services.fetch_service import fetch_ai_marketing_news
    
    now = datetime.now(timezone.utc)
    days_back = 7
    frm = (now - timedelta(days=days_back)).isoformat(timespec='seconds')
    to  = now.isoformat(timespec='seconds')
    
    params = {
        'apiKey': NEWSAPI_KEY,
        'q': queries,
        'page': 1,
        'pageSize': 100,
        'sortBy': 'publishedAt',
        'from': frm,
        'to': to,
        'language': 'en'
    }
    
    df_raw, meta = fetch_ai_marketing_news(api_url=API_URL, params=params)
    print('Metadatos de la respuesta:')
    pprint(meta)
    
    if df_raw is not None and not df_raw.empty:
        display(df_raw.head(3))
        print('\nColumnas crudas:', list(df_raw.columns))
    else:
        print('Sin artículos en la respuesta o DataFrame vacío.')
else:
    print('Extracción desactivada (DO_EXTRACT=False).')

## 4. Transformación de datos (Transform)
Aplicamos las transformaciones del proyecto:
- `clean_raw_data`: normaliza columnas, rellena valores por defecto, convierte `publishedAt` a UTC y genera `url_hash` a partir de la URL normalizada.
- `filter_by_min_length`: descarta artículos con contenido demasiado corto (según `content` + `"[+XXXX chars]"`).

Mostramos comparación **antes vs después** y el esquema resultante.


In [None]:
from src.services.clean_service import clean_raw_data, filter_by_min_length

# Para poder ejecutar la sección sin API, sintetizamos un pequeño df_raw si no hay extracción
if 'df_raw' not in globals() or df_raw is None:
    df_raw = pd.DataFrame([
        {
            'author': 'Alice', 'title': 'AI boosts marketing', 'description': 'Short desc',
            'url': 'https://example.com/article?utm_source=test', 'urlToImage': None,
            'publishedAt': datetime.now(timezone.utc).isoformat(), 'content': 'Some content [+1200 chars]',
            'source_id': 'ex', 'source_name': 'Example Source'
        },
        {
            'author': None, 'title': 'ML in growth', 'description': 'Desc 2',
            'url': 'https://example.com/article', 'urlToImage': 'https://example.com/img.jpg',
            'publishedAt': datetime.now(timezone.utc).isoformat(), 'content': 'Tiny',
            'source_id': None, 'source_name': 'Example Source'
        }
    ])

n_before = 0 if df_raw is None else len(df_raw)
df_clean = clean_raw_data(df_raw)
df_clean = filter_by_min_length(df_clean, min_total_chars=800)
n_after = 0 if df_clean is None else len(df_clean)

print(f'Registros antes: {n_before}  |  después de limpieza+filtro: {n_after}')
display(df_clean.head(3))
print('\nColumnas finales:', list(df_clean.columns))
print('\nDTypes:')
print(df_clean.dtypes)

## 5. Carga (Load) — *Upsert* en BBDD
La carga en producción se hace con `upsert_news_bulk(engine, df)`, que realiza un **upsert** por `url_hash` para evitar duplicados.

En este cuaderno lo dejamos **opcional** para no requerir una BBDD real. Si defines `DO_LOAD=True` y tienes `DATABASE_URL`, se hará una inserción de prueba.


In [None]:
DO_LOAD = False  # cambia a True si quieres probar la inserción en BBDD
inserted = 0

if DO_LOAD and DATABASE_URL:
    try:
        from src.repositories.db import init_engine
        from src.repositories.news import upsert_news_bulk
        engine = init_engine(DATABASE_URL)
        inserted = upsert_news_bulk(engine, df_clean)
        print('Filas insertadas/actualizadas:', inserted)
    except Exception as e:
        print('Fallo en carga:', str(e))
else:
    print('Carga desactivada o DATABASE_URL no disponible.')

## 6. Métricas rápidas y validaciones
- Duplicados por `url_hash` tras limpieza: deberían ser 0.
- Porcentaje de registros filtrados por longitud mínima.


In [None]:
if df_clean is not None and not df_clean.empty:
    dup = df_clean['url_hash'].duplicated().sum()
    pct_filtered = None
    if 'n_before' in globals() and n_before:
        pct_filtered = round((n_before - n_after) / n_before * 100, 2)
    print('Duplicados por url_hash:', dup)
    print('Filtrados por longitud (%):', pct_filtered)
else:
    print('No hay datos limpios para calcular métricas.')

## 7. Conclusiones y siguientes pasos
**Lo aprendido / validado:**
- La API devuelve campos suficientes para construir un `news` unificado.
- La normalización corrige `author` vacíos, convierte fechas a UTC y deduplica por `url_hash`.
- El filtro por longitud reduce ruido de *snippets* demasiado breves.

**Mejoras posibles:**
- Afinar el *scoring* de relevancia (ej. ponderar por fuente o presencia de palabras clave).
- Añadir validaciones de URL (status 200 de `url_to_image`) si la latencia lo permite.
- Enriquecer con NER/sentiment si el caso de uso lo requiere.

**Pruebas reproducibles:**
- El bloque de *Extract* puede activarse con `DO_EXTRACT=True` si hay `NEWSAPI_KEY`.
- El bloque de *Load* puede activarse con `DO_LOAD=True` si hay `DATABASE_URL`.

_Generado/actualizado: 2025-08-09 07:51 UTC_