# Fase 1: Ingesta y Normalización de Datos para Sistema RAG

## 1. Introducción
Este notebook aborda la primera etapa del flujo RAG (Retrieval-Augmented Generation). El objetivo es analizar la estructura de los datos y construir la base de conocimiento que permitirá al sistema verificar afirmaciones posteriormente.

## 2. Objetivos Técnicos
De acuerdo con las necesidades de estructuración de datos no estructurados, realizaremos las siguientes tareas:

1.  **Auditoría de Estructura de Datos:** Muestreo y análisis de los archivos JSON para determinar el esquema de los datos (detectando la prevalencia del estándar **JSON-LD** frente a estructuras planas).
2.  **Definición de Estrategia de Extracción:** Implementación de lógica condicional para extraer el contenido más rico disponible:
    * Prioridad al cuerpo de la noticia (`articleBody`).
    * Uso de descripciones (`description`) para aquellas noticias que no tengan cuerpo explicito.
3.  **Filtrado de Calidad (Data Cleaning):** Detección y descarte automático de registros corruptos o noticias vacías (sin contenido textual útil) para optimizar el almacenamiento.
4.  **Normalización del Texto:** Decodificación de entidades HTML y limpieza de caracteres especiales.
5.  **Indexación Masiva:** Carga eficiente en **Elasticsearch**.

In [1]:
import json
import gzip
import html
import time
from elasticsearch import Elasticsearch, helpers
from tqdm import tqdm 

# --- CONFIGURACIÓN DEL SERVIDOR ---
# Conectamos al puerto 9250 donde hemos desplegado Elasticsearch
ES_HOST = "http://localhost:9250"
INDEX_NAME = "noticias_tfg"
FILE_PATH = "/home/javierruiz/news-2024-2025.jsonl.gz"

print(f"Conectando a Elasticsearch en {ES_HOST}...")
es = Elasticsearch(ES_HOST) # Conexión a Elasticsearch

if es.ping(): # Hacemos ping para verificar la conexión
    print(f"Conexión exitosa Versión: {es.info()['version']['number']}")
else:
    print("Error: No se puede conectar a Elasticsearch.")

Conectando a Elasticsearch en http://localhost:9250...
Conexión exitosa Versión: 8.12.0


In [None]:
import gzip
import json

FILE_PATH = "/home/javierruiz/news-2024-2025.jsonl.gz"

print("AUDITORÍA DE DATOS: Estructura de Fuentes y Cuerpos")
print("\n")

contadores = {
    # FUENTES
    "source_jsonld_dict": 0, # Lo que esperamos 
    "source_otros": 0,       # Cualquier otra cosa rara
    
    # CUERPOS
    "body_en_jsonld": 0,      # Texto largo en jsonld
    "body_en_root": 0,        # Texto largo en la raíz
    "desc_en_jsonld": 0,      # No hay cuerpo, pero hay descripción en jsonld
    "VACIO_TOTAL": 0          # Noticias sin texto ni descripción
}

with gzip.open(FILE_PATH, 'rt', encoding='utf-8') as f:
    for i, line in enumerate(f):
        try:
            doc = json.loads(line)
            jsonld = doc.get("jsonld", {})
            if jsonld is None: jsonld = {}
            
            # --- 1. AUDITORÍA DE FUENTE ---
            pub = jsonld.get("publisher")
            if isinstance(pub, dict) and "name" in pub:
                contadores["source_jsonld_dict"] += 1
            else:
                contadores["source_otros"] += 1

            # --- 2. AUDITORÍA DE CUERPO ---
            # Vamos a ver dónde encontramos texto por orden de preferencia
            has_body_jsonld = bool(jsonld.get("articleBody"))
            has_body_root = bool(doc.get("articleBody"))
            has_desc_jsonld = bool(jsonld.get("description"))
            
            if has_body_jsonld:
                contadores["body_en_jsonld"] += 1
            elif has_body_root:
                contadores["body_en_root"] += 1
            elif has_desc_jsonld:
                contadores["desc_en_jsonld"] += 1
            else:
                contadores["VACIO_TOTAL"] += 1

        except Exception:
            continue

print("\n RESULTADOS DEL ANÁLISIS:")
print(f"Total analizado: {i} noticias")
print("-" * 30)
print("FUENTES:")
print(f"JsonLD Dict (Caso A): {contadores['source_jsonld_dict']} ({(contadores['source_jsonld_dict']/i)*100:.1f}%)")
print(f"Otros formatos:       {contadores['source_otros']}")
print("-" * 30)
print("CUERPOS (CONTENIDO):")
print(f"Body en JsonLD: {contadores['body_en_jsonld']}")
print(f"Desc en JsonLD: {contadores['desc_en_jsonld']}")
print(f"Body en Root: {contadores['body_en_root']}")
print(f"VACÍOS: {contadores['VACIO_TOTAL']}")

AUDITORÍA DE DATOS: Estructura de Fuentes y Cuerpos



 RESULTADOS DEL ANÁLISIS:
Total analizado: 804083 noticias
------------------------------
FUENTES:
JsonLD Dict (Caso A): 502715 (62.5%)
Otros formatos:       19101
------------------------------
CUERPOS (CONTENIDO):
Body en JsonLD: 288748
Desc en JsonLD: 202208
Body en Root: 0
VACÍOS: 30860


In [5]:
import gzip
import json
import html
from elasticsearch import helpers
from tqdm import tqdm

def clean_text(text):
    """
    Normaliza el texto: decodifica HTML y elimina espacios extra.
    """
    if not text:
        return ""
    text = html.unescape(text)
    return text.strip()

def generate_actions():
    """
    Generador: Basado en auditoría de datos para coger todo lo útil y descartar lo que no aporta. (no buscamos en la raíz, solo en JsonLD, y priorizamos articleBody sobre description)
    """
    total_lines = 804084 
    
    with gzip.open(FILE_PATH, 'rt', encoding='utf-8') as f:
        for line in tqdm(f, total=total_lines, desc="Ingesta Optimizada"):
            try:
                # 1. Parseo básico
                doc = json.loads(line)
                
                # 2. Filtro Maestro: Si no hay jsonld, no hay noticia válida (según auditoría)
                jsonld = doc.get("jsonld")
                if not jsonld: 
                    continue 
                
                # 3. Extracción de CONTENIDO 
                # Prioridad: articleBody (Noticia estándar) > description (Video/Galería)
                raw_body = jsonld.get("articleBody")
                if not raw_body:
                    raw_body = jsonld.get("description")
                
                # Si tras esto no hay cuerpo, es una de las 30k vacías -> LA DESCARTAMOS
                if not raw_body:
                    continue

                # 4. Extracción de FUENTE 
                publisher = jsonld.get("publisher", {})
                
                if isinstance(publisher, dict):
                    source_name = publisher.get("name", "Desconocido")
                else:
                    source_name = "Desconocido"
                
                # 5. Metadatos (Títulos y Fechas sí suelen estar en la raíz del objeto JSON)
                title = doc.get("headline", "")
                date = doc.get("published_date", "")
                url = doc.get("url", "")
                
                # --- CREACIÓN DEL DOCUMENTO ---
                processed_doc = {
                    "_index": INDEX_NAME,
                    "_source": {
                        "title": clean_text(title),
                        "body": clean_text(raw_body),
                        "date": date,
                        "url": url,
                        "source": clean_text(source_name)
                    }
                }
                
                yield processed_doc 

            except json.JSONDecodeError:
                continue 
            except Exception:
                continue

In [None]:
# --- CELDA DE VERIFICACIÓN ---
print("Comprobando extracción de datos en la primera noticia válida...")

# Usamos el generador para sacar solo el primer elemento
try:
    first_doc = next(generate_actions())
    print("\nAsí se guardará la primera noticia:")
    print(json.dumps(first_doc["_source"], indent=2, ensure_ascii=False))
    
    if first_doc["_source"]["source"] == "Desconocido":
        print("\nOJO: La fuente sigue saliendo 'Desconocido'. Revisa el código.")
    else:
        print(f"\nFuente detectada: {first_doc['_source']['source']}")
        print("El cuerpo de la noticia empieza por:", first_doc["_source"]["body"][:100], "...")

except StopIteration:
    print("No se pudieron leer noticias.")

Comprobando extracción de datos en la primera noticia válida...


Ingesta Optimizada:   0%|          | 4/804084 [00:00<17:37, 760.22it/s]


Así se guardará la primera noticia:
{
  "title": "Lara Hernández y Carlos Martín: “Ni Iglesias ni Belarra ni Sumar, la gente pondrá las condiciones para la unidad”",
  "body": "Lara Hernández (Madrid, 1986) y Carlos Martín Urriza (Madrid, 1968) acaban de salir elegidos como nuevos coordinadores generales de Movimiento Sumar, el proyecto que puso en marcha como proyecto amplio la vicepresidenta segunda del Gobierno, Yolanda Díaz, y que este fin de semana ha culminado su transformación en una formación tradicional. Ambos rehúyen la idea de colocar a este partido como “uno más” en el cosmos de la izquierda y prefieren definirlo como un movimiento que apela a la sociedad civil, a las organizaciones sociales y también al resto de fuerzas políticas. No creen que esa mutación orgánica haya terminado con la hipótesis inicial del lanzamiento de Sumar. Una de las principales tareas que se han puesto como objetivo será volver a coser a la coalición y recomponer las relaciones con Podemos, ahora 




In [None]:
# 1. REINICIO DEL ÍNDICE
if es.indices.exists(index=INDEX_NAME):
    es.indices.delete(index=INDEX_NAME)
    print(f" Índice anterior '{INDEX_NAME}' eliminado.")

# 2. CONFIGURACIÓN 
settings = {
    "mappings": { # los mappings definen la estructura de los documentos y cómo se indexan
        "properties": {
            "title": {"type": "text", "analyzer": "spanish"},
            "body":  {"type": "text", "analyzer": "spanish"},
            "date":  {"type": "date", "ignore_malformed": True}, 
            "url":   {"type": "keyword", "ignore_above": 256}, 
            "source":{"type": "keyword"}
        }
    }
}

es.indices.create(index=INDEX_NAME, body=settings) # Creamos el índice con la configuración definida
print(f"Índice '{INDEX_NAME}' creado con tolerancia a fallos.")

# 3. EJECUCIÓN MASIVA ROBUSTA
print("Iniciando proceso de ingesta masiva...")
start_time = time.time()

# helper.bulk es una función que nos permite enviar documentos a Elasticsearch de forma eficiente.
# raise_on_error=False permite que el proceso siga aunque haya datos sucios
# stats_only=False nos permite ver que ha fallado exactamente si hay errores
success, errors = helpers.bulk(
    es, 
    generate_actions(), 
    raise_on_error=False, 
    stats_only=False
)

duration = time.time() - start_time
failed_count = len(errors)

print(f"\nPROCESO FINALIZADO en {duration/60:.2f} minutos.")
print(f"Éxitos: {success}")
print(f"Fallos: {failed_count}")

# 4. DIAGNÓSTICO DE ERRORES 
if failed_count > 0:
    print("\nEjemplo del primer error encontrado:")
    print(errors[0])
    print("\n(Estos documentos se han ignorado, el resto está guardado correctamente)")

 Índice anterior 'noticias_tfg' eliminado.
Índice 'noticias_tfg' creado con tolerancia a fallos.
Iniciando proceso de ingesta masiva...


Ingesta Optimizada: 100%|██████████| 804084/804084 [01:53<00:00, 7095.76it/s] 


PROCESO FINALIZADO en 1.89 minutos.
Éxitos: 490956
Fallos: 0



