# **Modelando el Presupuesto de Subvenciones: Una Experiencia Pr√°ctica con Apache Spark**

En este *notebook* vamos a trabajar con el dataset abierto del [**Sistema Nacional de Publicidad de Subvenciones y Ayudas P√∫blicas (BDNS)**](https://www.pap.hacienda.gob.es/bdnstrans/GE/es/inicio), que recoge cientos de miles de convocatorias publicadas en Espa√±a.

Nuestro objetivo es **aprender a manejar datos a gran escala con Apache Spark** y, a partir de ellos, construir un modelo de *machine learning* que nos permita **predecir el rango de presupuesto de una convocatoria** en funci√≥n de sus caracter√≠sticas principales.

El cuaderno sigue una progresi√≥n guiada:
1. Volcado de datos de subvenciones.
2. Preparaci√≥n del entorno y primeros pasos con Apache Spark.
3. Parquet: Transformaci√≥n de los ficheros originales a un formato optimizado.
4. Exploraci√≥n y visualizaci√≥n de los datos.
5. Modelado: Construcci√≥n de un *pipeline* para entrenar y evaluar un clasificador.  
6. Conclusiones.

Este *notebook* combina tanto aspectos t√©cnicos (uso de Spark, transformaci√≥n de datos, entrenamiento de modelos) como pr√°cticos (aplicar estas t√©cnicas a un caso real de inter√©s p√∫blico). La idea es que puedas seguirlo paso a paso y familiarizarte con el flujo completo de un proyecto de *data science* en Spark.


## VOLCADO DE DATOS DE SUBVENCIONES

En este primer apartado vamos a **descargar autom√°ticamente el dataset de subvenciones** desde [la API](https://www.pap.hacienda.gob.es/bdnstrans/GE/es/inicio) del portal del Sistema Nacional de Publicidad de Subvenciones (BDNS).

Se trata de una extracci√≥n directa a trav√©s de la API p√∫blica del portal, que nos permite obtener tanto el √≠ndice de convocatorias como el detalle de cada una de ellas. Aquellos que quieran profundizar m√°s en el funcionamiento de esta API pueden apoyarse en la documentaci√≥n oficial disponible en [este portal](https://www.infosubvenciones.es/bdnstrans/doc/swagger).

![BDNS - Portal Swagger](https://github.com/Admindatosgobes/Laboratorio-de-Datos/blob/main/Data%20Science/Modelando%20el%20Presupuesto%20de%20Subvenciones%3A%20Una%20Experiencia%20Pr√°ctica%20con%20Apache%20Spark/Imagenes/bdns_swagger.png?raw=true)

Algunas consideraciones importantes:

- En esta fase **no utilizamos Spark**, ya que Spark no es la herramienta adecuada para realizar llamadas REST masivas. Aqu√≠ nos apoyamos en Python ‚Äúnormal‚Äù para gestionar la descarga de datos.  
- Debido al **alto volumen de informaci√≥n** (centenares de miles de convocatorias), este proceso puede resultar lento. Si quieres **saltarte este paso**, no hay problema: hemos dejado ya los datos procesados en el repositorio de GitHub. M√°s adelante explicamos c√≥mo cargarlos directamente desde all√≠ para seguir con el ejercicio. Puedes continuar [aqu√≠](#descarga-ficheros-parquet-desde-github) 

Con este apartado dejamos preparado el dataset en bruto que luego transformaremos a formatos m√°s eficientes y analizaremos con Spark.

In [None]:
#@title Funciones auxiliares (click para mostrar/ocultar)
# ============================================================
# DESCARGA REANUDABLE DE CONVOCATORIAS (√çNDICE + DETALLE ROTATIVO ~100 MB)
# Versi√≥n para Jupyter/Colab separada en:
#  - Celda 1: configuraci√≥n + funciones
#  - Celda 2: descarga del √çNDICE
#  - Celda 3: descarga del DETALLE
# ============================================================

import os, sys, json, math, time, random, glob, asyncio
from pathlib import Path
from typing import Iterable

import requests
import aiohttp
from aiohttp import ClientOSError, ServerDisconnectedError, ClientResponseError, ContentTypeError

# ------------------------------------------------------------
# RUTAS Y ARCHIVOS
# ------------------------------------------------------------
CARPETA_SALIDA       = Path("data/bdns")
CARPETA_SALIDA.mkdir(parents=True, exist_ok=True)

# √çndice (un solo fichero NDJSON + ficheros de progreso)
FICHERO_INDICE       = CARPETA_SALIDA / "convocatorias_indice.ndjson"
PROGRESO_PAGINAS     = CARPETA_SALIDA / "progreso_paginas.txt"  # √∫ltima p√°gina descargada (0-based)
PROGRESO_META        = CARPETA_SALIDA / "meta_indice.json"      # totalPages, totalElements, filtros‚Ä¶

# Detalle (archivos rotativos NDJSON de ~100MB)
PREFIJO_DETALLE      = "convocatorias_detalle_"
EXT_DETALLE          = ".ndjson"
PATRON_DETALLE       = str(CARPETA_SALIDA / f"{PREFIJO_DETALLE}*{EXT_DETALLE}")

# ------------------------------------------------------------
# API Y PAR√ÅMETROS GENERALES
# ------------------------------------------------------------
BASE_API             = "https://www.infosubvenciones.es/bdnstrans/api"
URL_INDICE           = f"{BASE_API}/convocatorias/busqueda"   # GET con paginaci√≥n
URL_DETALLE          = f"{BASE_API}/convocatorias"            # GET ?numConv=...
CABECERAS            = {"accept": "application/json", "user-agent": "bdns-descarga-educativa/1.0"}

# √çndice (sync/requests)
TAM_PAGINA           = 1000
REINTENTOS_SYNC      = 5
TIMEOUT_SYNC_SEG     = 60
ESPERA_BASE          = 1.2            # backoff base (1.2, 2.4, 4.8, ‚Ä¶)

# Filtros del √≠ndice (AJUSTA si quieres acotar)
FILTROS_BUSQUEDA = {
    "order": "numeroConvocatoria",
    "direccion": "asc",
}

# L√≠mite opcional de p√°ginas para pruebas (None = todas)
LIMITE_PAGINAS_DESCARGA = None

# Bandera para saltar la descarga del √≠ndice si ya lo tienes
SALTAR_DESCARGA_INDICE  = False

# Detalle (async/aiohttp)
MAX_CONCURRENCIA   = 10      # m√°s alto = m√°s r√°pido pero m√°s riesgo de errores por l√≠mites del servidor
REINTENTOS_ASYNC   = 6
TIMEOUTS_ASYNC     = aiohttp.ClientTimeout(total=120, connect=30, sock_read=90)
LOG_CADA           = 200     # progreso cada N detalles
MICRO_PAUSA_S      = 0.02    # pausa corta entre llamadas para suavizar r√°fagas

# Rotaci√≥n de ficheros de detalle (~100 MB por fichero)
MAX_FICHERO_MB     = 100
MAX_FICHERO_BYTES  = MAX_FICHERO_MB * 1024 * 1024

# L√≠mite opcional de IDs para pruebas (None = todos)
LIMITE_IDS_DETALLE = None


# ============================================================
# UTILIDADES COMUNES
# ============================================================
def extraer_numconv(obj: dict):
    """Devuelve el identificador de convocatoria desde un objeto JSON (√≠ndice o detalle)."""
    return (
        obj.get("numConv")
        or obj.get("numeroConvocatoria")
        or obj.get("numeroConv")
        or obj.get("id")
    )

# ---------------- √çndice: red robusta y progreso ----------------
def get_json_robusto(url: str, params: dict | None = None, intento: int = 1):
    """GET (requests) con reintentos y backoff exponencial + jitter ante errores transitorios."""
    try:
        r = requests.get(url, params=params, headers=CABECERAS, timeout=TIMEOUT_SYNC_SEG)
        if r.status_code in (429, 500, 502, 503, 504) and intento <= REINTENTOS_SYNC:
            espera = ESPERA_BASE * (2 ** (intento - 1)) * (1.0 + 0.25 * random.random())
            time.sleep(espera)
            return get_json_robusto(url, params, intento + 1)
        r.raise_for_status()
        return r.json()
    except Exception:
        if intento <= REINTENTOS_SYNC:
            espera = ESPERA_BASE * (2 ** (intento - 1)) * (1.0 + 0.25 * random.random())
            time.sleep(espera)
            return get_json_robusto(url, params, intento + 1)
        raise

def guardar_progreso_pagina(n_pagina: int):
    PROGRESO_PAGINAS.write_text(str(n_pagina), encoding="utf-8")

def cargar_progreso_pagina() -> int:
    if PROGRESO_PAGINAS.exists():
        try:
            return int(PROGRESO_PAGINAS.read_text(encoding="utf-8").strip())
        except Exception:
            return 0
    return 0

def guardar_meta_indice(meta: dict):
    PROGRESO_META.write_text(json.dumps(meta, ensure_ascii=False, indent=2), encoding="utf-8")

def leer_meta_indice() -> dict | None:
    if PROGRESO_META.exists():
        try:
            return json.loads(PROGRESO_META.read_text(encoding="utf-8"))
        except Exception:
            return None
    return None

def descargar_indice_reanudable():
    """
    Descarga el √çNDICE paginado a NDJSON (reanudable):
    - Si no hay meta: descarga p√°gina 0, escribe contenido y guarda totalPages/totalElements.
    - Reanuda desde la √∫ltima p√°gina guardada (progreso_paginas.txt).
    - Escribe siempre en APPEND sobre el NDJSON del √≠ndice.
    """
    if SALTAR_DESCARGA_INDICE and FICHERO_INDICE.exists():
        print("üü° Omitiendo descarga del √≠ndice (SALTAR_DESCARGA_INDICE=True y existe el fichero).")
        return

    meta = leer_meta_indice()
    if meta is None:
        params_0 = {"page": 0, "pageSize": TAM_PAGINA, **FILTROS_BUSQUEDA}
        primera = get_json_robusto(URL_INDICE, params=params_0)
        content = primera.get("content", primera if isinstance(primera, list) else [])
        total_elements = primera.get("totalElements") or len(content)
        total_pages = primera.get("totalPages") or math.ceil(total_elements / TAM_PAGINA)

        with FICHERO_INDICE.open("a", encoding="utf-8") as f:
            for it in content:
                f.write(json.dumps(it, ensure_ascii=False) + "\n")

        meta = {
            "totalElements": total_elements,
            "totalPages": total_pages,
            "pageSize": TAM_PAGINA,
            "filtros": FILTROS_BUSQUEDA,
        }
        guardar_meta_indice(meta)
        guardar_progreso_pagina(0)
        print(f"üü¢ √çndice: descargada p√°gina 0 | totalPages={total_pages} | totalElements={total_elements}")
    else:
        print(f"‚ÑπÔ∏è Meta √≠ndice existente: totalPages={meta.get('totalPages')} | pageSize={meta.get('pageSize')}")

    total_pages = meta["totalPages"]
    if LIMITE_PAGINAS_DESCARGA is not None:
        total_pages = min(total_pages, LIMITE_PAGINAS_DESCARGA)

    start_page = cargar_progreso_pagina() + 1
    if start_page >= total_pages:
        print("‚úÖ √çndice ya estaba completo. Nada que descargar.")
        return

    for page in range(start_page, total_pages):
        params = {"page": page, "pageSize": TAM_PAGINA, **FILTROS_BUSQUEDA}
        bloque = get_json_robusto(URL_INDICE, params=params)
        content = bloque.get("content", bloque if isinstance(bloque, list) else [])
        if not content:
            print(f"‚ö†Ô∏è P√°gina {page} sin contenido. Detengo √≠ndice aqu√≠.")
            guardar_progreso_pagina(page)
            break

        with FICHERO_INDICE.open("a", encoding="utf-8") as f:
            for it in content:
                f.write(json.dumps(it, ensure_ascii=False) + "\n")

        guardar_progreso_pagina(page)
        if page % 10 == 0:
            print(f"üíæ √çndice: guardada p√°gina {page}/{total_pages-1}")

    print("‚úÖ Descarga del √≠ndice completada (reanudable).")

def leer_ids_desde_indice(ruta: Path) -> list[str]:
    """Lee el √≠ndice NDJSON y devuelve la lista de numConv como strings."""
    ids = []
    if not ruta.exists():
        raise FileNotFoundError(f"No existe el √≠ndice: {ruta}")
    with ruta.open("r", encoding="utf-8") as f:
        for linea in f:
            try:
                obj = json.loads(linea)
                k = extraer_numconv(obj)
                if k is not None:
                    ids.append(str(k).strip())
            except Exception:
                continue
    return ids

def leer_ids_procesados_rotativos(patron_archivos: str) -> set[str]:
    """Escanea TODOS los ficheros de detalle rotativos y devuelve un set de numConv ya guardados."""
    procesados = set()
    for ruta in sorted(glob.glob(patron_archivos)):
        try:
            with open(ruta, "r", encoding="utf-8") as f:
                for linea in f:
                    try:
                        obj = json.loads(linea)
                        k = extraer_numconv(obj)
                        if k is not None:
                            procesados.add(str(k).strip())
                    except Exception:
                        continue
        except FileNotFoundError:
            continue
    return procesados

# ---------------- Detalle: escritor rotativo ----------------
def obtener_siguiente_indice_archivo() -> tuple[int, int, Path]:
    """Devuelve (indice_actual, tama√±o_bytes, ruta_actual) del √∫ltimo fichero de detalle; si no hay, empieza en 1."""
    archivos = sorted(glob.glob(PATRON_DETALLE))
    if not archivos:
        idx = 1
        ruta = CARPETA_SALIDA / f"{PREFIJO_DETALLE}{idx:04d}{EXT_DETALLE}"
        return idx, 0, ruta
    ultimo = archivos[-1]
    idx = int(Path(ultimo).stem.replace(PREFIJO_DETALLE, ""))
    tam = os.path.getsize(ultimo)
    return idx, tam, Path(ultimo)

class EscritorRotativoNDJSON:
    """Escritor NDJSON que rota de fichero al superar MAX_FICHERO_BYTES."""
    def __init__(self, carpeta: Path, prefijo: str, ext: str, max_bytes: int):
        self.carpeta = carpeta
        self.prefijo = prefijo
        self.ext = ext
        self.max_bytes = max_bytes
        self.idx, self.tam, self.ruta = obtener_siguiente_indice_archivo()
        self.f = open(self.ruta, "a", encoding="utf-8")  # append (reanuda)

    def _rotar(self):
        try:
            self.f.close()
        except Exception:
            pass
        self.idx += 1
        self.ruta = self.carpeta / f"{self.prefijo}{self.idx:04d}{self.ext}"
        self.f = open(self.ruta, "a", encoding="utf-8")
        self.tam = 0
        print(f"üóÇÔ∏è  Rotando a nuevo fichero: {self.ruta.name}")

    def escribir(self, obj: dict):
        linea = json.dumps(obj, ensure_ascii=False) + "\n"
        bytes_linea = len(linea.encode("utf-8"))
        if self.tam + bytes_linea > self.max_bytes:
            self._rotar()
        self.f.write(linea)
        self.tam += bytes_linea

    def cerrar(self):
        try:
            self.f.close()
        except Exception:
            pass

# ---------------- Detalle: red as√≠ncrona ----------------
async def fetch_json_async(session: aiohttp.ClientSession, params: dict, intento: int = 1):
    """GET JSON con reintentos/backoff + jitter ante 429/5xx y errores t√≠picos de red."""
    try:
        async with session.get(URL_DETALLE, params=params) as r:
            if r.status in (429, 500, 502, 503, 504) and intento <= REINTENTOS_ASYNC:
                espera = ESPERA_BASE * (2 ** (intento - 1)) * (1 + 0.25 * random.random())
                await asyncio.sleep(espera)
                return await fetch_json_async(session, params, intento + 1)
            r.raise_for_status()
            return await r.json()
    except (asyncio.TimeoutError, ClientOSError, ServerDisconnectedError, ContentTypeError, ClientResponseError):
        if intento <= REINTENTOS_ASYNC:
            espera = ESPERA_BASE * (2 ** (intento - 1)) * (1 + 0.25 * random.random())
            await asyncio.sleep(espera)
            return await fetch_json_async(session, params, intento + 1)
        raise

async def productor_detalles(session, sem, cola, ids: Iterable[str]):
    """Lanza peticiones de detalle respetando el sem√°foro; encola objetos normalizados (objeto/lista)."""
    async def tarea(num_conv: str):
        async with sem:
            datos = await fetch_json_async(session, {"numConv": num_conv})
            if isinstance(datos, list):
                for d in datos:
                    await cola.put(d)
            else:
                await cola.put(datos)
            await asyncio.sleep(MICRO_PAUSA_S)  # peque√±a pausa para suavizar r√°fagas

    await asyncio.gather(*(tarea(i) for i in ids))
    await cola.put(None)  # se√±al de fin

async def consumidor_escritor(cola, escritor_rot: EscritorRotativoNDJSON, cada_n: int = LOG_CADA):
    """Consume objetos de la cola y los escribe en NDJSON con rotaci√≥n por tama√±o."""
    cont = 0
    try:
        while True:
            item = await cola.get()
            if item is None:
                break
            try:
                escritor_rot.escribir(item)
                cont += 1
                if cont % cada_n == 0:
                    print(
                        f"üíæ Detalles escritos en esta sesi√≥n: {cont} | "
                        f"Archivo actual: {escritor_rot.ruta.name} ({escritor_rot.tam/1024/1024:.1f} MB)"
                    )
            except Exception:
                continue
    finally:
        escritor_rot.cerrar()
        print(
            f"‚úÖ Sesi√≥n de escritura cerrada. Total nuevos: {cont} | "
            f"√öltimo fichero: {escritor_rot.ruta.name}"
        )

# ============================================================
# PREPARACI√ìN DE IDS Y MAIN DETALLE
# ============================================================
def preparar_ids(descargar_indice: bool = True):
    """
    Prepara IDs para descarga de detalle.
    - Si descargar_indice=True: ejecuta primero la descarga/reanudaci√≥n del √≠ndice.
    - Devuelve (todos_ids, ya_procesados, pendientes).
    """
    if descargar_indice:
        print("‚ñ∂Ô∏è Paso 1/3 ‚Äî Descargando (o reanudando) el √çNDICE‚Ä¶")
        descargar_indice_reanudable()
    else:
        print("‚ñ∂Ô∏è Paso 1/3 ‚Äî Usando √≠ndice ya existente (no se descarga).")

    print("‚ñ∂Ô∏è Paso 2/3 ‚Äî Leyendo IDs desde el √≠ndice‚Ä¶")
    todos = leer_ids_desde_indice(FICHERO_INDICE)

    print("‚ñ∂Ô∏è Paso 3/3 ‚Äî Detectando IDs ya guardados en detalle (archivos rotativos)‚Ä¶")
    ya = leer_ids_procesados_rotativos(PATRON_DETALLE)

    pendientes = [i for i in todos if i not in ya]
    if LIMITE_IDS_DETALLE is not None:
        pendientes = pendientes[:LIMITE_IDS_DETALLE]

    print(f"üßæ IDs totales en √≠ndice: {len(todos)} | Ya guardados: {len(ya)} | Pendientes: {len(pendientes)}")
    return todos, ya, pendientes

async def descargar_detalle_async():
    """
    Descarga as√≠ncrona del DETALLE usando el √≠ndice existente.
    NO vuelve a descargar el √≠ndice (solo lo lee).
    """
    # Preparar IDs (sin volver a descargar √≠ndice)
    _, _, pendientes = preparar_ids(descargar_indice=False)
    if not pendientes:
        print("‚úÖ No hay pendientes de detalle. Nada que hacer.")
        return

    # Sesi√≥n HTTP + estructuras de concurrencia
    connector = aiohttp.TCPConnector(
        limit=MAX_CONCURRENCIA * 4,
        limit_per_host=max(6, MAX_CONCURRENCIA // 2),
        enable_cleanup_closed=True,
        ttl_dns_cache=300,
    )
    sem = asyncio.Semaphore(MAX_CONCURRENCIA)
    cola = asyncio.Queue(maxsize=MAX_CONCURRENCIA * 4)
    escritor_rot = EscritorRotativoNDJSON(CARPETA_SALIDA, PREFIJO_DETALLE, EXT_DETALLE, MAX_FICHERO_BYTES)

    async with aiohttp.ClientSession(timeout=TIMEOUTS_ASYNC, connector=connector, headers=CABECERAS) as session:
        prod = asyncio.create_task(productor_detalles(session, sem, cola, pendientes))
        cons = asyncio.create_task(consumidor_escritor(cola, escritor_rot))
        await asyncio.gather(prod, cons)

    print("üéâ Descarga de DETALLE completada (o reanudada).")


In [None]:
# ============================================
# PASO 1: DESCARGA DEL √çNDICE DE CONVOCATORIAS BDNS
# ============================================
#
# Esta celda descarga el listado de convocatorias con sus √≠ndices pero sin su detalle.
# 
# ¬øQu√© hace esta funci√≥n?
#   - Descarga el √≠ndice de convocatorias desde la API BDNS,
#   - Guarda las p√°ginas en formato NDJSON,
#   - Mantiene ficheros de progreso para poder "reanudar"
#
# Archivos generados en carpeta_salida:
#   - convocatorias_indice.ndjson          ‚Üí el √≠ndice completo l√≠nea a l√≠nea
#   - meta_indice.json                     ‚Üí metadatos: totalPages, filters, etc.
#   - progreso_paginas.txt                 ‚Üí √∫ltima p√°gina descargada
#
# IMPORTANTE:
#   No descarga los detalles; eso se hace en la siguiente celda.
# ============================================

try:
    descargar_indice_reanudable()
    print("üéâ √çndice listo.")
    print(f"   √çndice  ‚Üí {FICHERO_INDICE}")
except Exception as e:
    print("‚ùå Error durante la descarga del √≠ndice:", repr(e))


In [None]:
# ======================================================
# PASO 2: DESCARGA DETALLADA DE CONVOCATORIAS (ASYNC)
# ======================================================
#
# El detalle es m√°s voluminoso (cientos de miles de entradas).
# Por eso est√° implementado de forma as√≠ncrona y con concurrencia.
#
# ¬øQu√© hace descargar_detalle_async?
#   1. Lee el √≠ndice que descargaste en la celda anterior.
#   2. Detecta qu√© IDs ya est√°n descargados (evita duplicados).
#   3. Descarga los detalles en paralelo (hasta MAX_CONCURRENCIA).
#   4. Los guarda en ficheros NDJSON rotativos (~100 MB cada uno).
#   5. Si el proceso se interrumpe, se puede reanudar sin perder nada.
#
# Archivos generados (carpeta_salida):
#   - convocatorias_detalle_0001.ndjson
#   - convocatorias_detalle_0002.ndjson
#   - ...
#
# ======================================================

import nest_asyncio, asyncio
nest_asyncio.apply()

try:
    await descargar_detalle_async()
    print(f"   √çndice  ‚Üí {FICHERO_INDICE}")
    print(f"   Detalle ‚Üí {PATRON_DETALLE}")
except Exception as e:
    print("‚ùå Error durante la descarga del detalle:", repr(e))
    print("‚ÑπÔ∏è Si relanzas esta celda, reanudar√° desde donde se qued√≥ (sin duplicar detalle).")


En este apartado hemos completado la descarga del dataset de subvenciones desde la API p√∫blica del BDNS.

Primero obtuvimos el √≠ndice de convocatorias y, a partir de sus identificadores, recuperamos el detalle completo de cada una de ellas por lotes en diferentes ficheros [**NDJSON**](https://docs.mulesoft.com/dataweave/latest/dataweave-formats-ndjson).

Con este paso ya disponemos de la materia prima necesaria para comenzar a trabajar con Spark y transformar los datos a un formato m√°s eficiente.


## PREPARACI√ìN DEL ENTORNO Y PRIMEROS PASOS CON APACHE SPARK



Para poder trabajar con **Apache Spark en Google Colab**, lo primero que necesitamos es instalar la librer√≠a **PySpark**. Esta librer√≠a nos proporciona la interfaz en Python y, adem√°s, incluye los binarios de Spark, por lo que no hace falta descargar nada m√°s desde la p√°gina oficial.

Con un simple **`pip install`** dejaremos listo el entorno para comenzar a trabajar.

In [38]:
!pip install -q pyspark

Spark est√° desarrollado en **Java/Scala**, por lo que requiere tener instalada una m√°quina virtual Java en el entorno. En Colab no viene preinstalado, de modo que debemos instalarlo nosotros. Usamos la versi√≥n **OpenJDK 11**, en su variante ‚Äúheadless‚Äù, que es m√°s ligera porque no incluye componentes gr√°ficos que aqu√≠ no necesitamos.

Una vez que Java est√° instalado, necesitamos indicarle a Spark en qu√© ruta se encuentra. Esto se hace configurando la variable de entorno **`JAVA_HOME`** en Python. En Colab, OpenJDK 11 se instala en la ruta **`/usr/lib/jvm/java-11-openjdk-amd64`**, por lo que apuntamos ah√≠ la variable para que Spark pueda ejecutarse correctamente.

In [2]:
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
import os

# Configuramos la variable de entorno JAVA_HOME para que Spark sepa d√≥nde est√° instalado Java
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"

En Spark, todo comienza creando una **`SparkSession`**. La sesi√≥n es el punto de entrada a la mayor√≠a de funcionalidades de Spark y nos permite crear, transformar y analizar datos.

Cuando construimos la sesi√≥n, podemos darle un nombre a la aplicaci√≥n con **`.appName("ColabSubvenciones")`**. Este nombre se ver√° en los logs y sirve √∫nicamente como identificador descriptivo.

La opci√≥n **`.master("local[*]")`** indica que Spark se ejecutar√° en modo local, es decir, utilizando los recursos del propio ordenador (o de la m√°quina virtual de Colab en nuestro caso). El par√°metro **`[*]`** significa que Spark aprovechar√° todos los n√∫cleos de CPU disponibles, lo que permite paralelizar operaciones.

In [4]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("ColabSubvenciones") \
    .master("local[*]") \
    .getOrCreate()


Una vez creada la sesi√≥n, podemos probar su funcionamiento creando un **DataFrame** de ejemplo. Aqu√≠ usamos una peque√±a lista de tuplas con dos columnas (id y val). Esto no es todav√≠a un dataset de subvenciones, sino un simple test para verificar que Spark est√° funcionando correctamente.

In [5]:
df = spark.createDataFrame([(1,"a"),(2,"b")], ["id","val"])
df.show()

+---+---+
| id|val|
+---+---+
|  1|  a|
|  2|  b|
+---+---+



                                                                                

Es una buena pr√°ctica detener Spark cuando no lo vamos a usar m√°s, especialmente en entornos limitados como Colab.

In [12]:
spark.stop()

## PARQUET: TRANSFORMACI√ìN DE LOS FICHEROS ORIGINALES A UN FORMATO OPTIMIZADO

En este apartado vamos a transformar los datos descargados en bruto (NDJSON) a [**Parquet**](https://parquet.apache.org/), un formato de almacenamiento columnar ampliamente utilizado en entornos de *big data*.  

A diferencia de JSON, que guarda la informaci√≥n fila a fila y resulta poco eficiente para consultas anal√≠ticas, Parquet organiza los datos por columnas, lo que permite:  
- **Mayor compresi√≥n** y, por tanto, menor tama√±o en disco.  
- **Acceso selectivo** a las columnas necesarias, reduciendo tiempo de lectura y consumo de memoria.  
- Integraci√≥n nativa y muy optimizada con **Apache Spark**.  

El objetivo de esta secci√≥n es dejar nuestros datos de subvenciones preparados en un formato m√°s **ligero, r√°pido y pr√°ctico**, tanto para la exploraci√≥n como para la construcci√≥n de modelos de *machine learning* en las siguientes fases del ejercicio.


En Spark es habitual importar el m√≥dulo `functions` bajo el alias `F`. Este m√≥dulo incluye una gran colecci√≥n de funciones listas para usar en expresiones de DataFrame como transformaciones de columnas, funciones de agregaci√≥n o manejo de fechas y cadenas.

Adem√°s, iniciamos una nueva *SparkSession*.

In [2]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder \
    .appName("ColabSubvenciones") \
    .master("local[*]") \
    .getOrCreate()

### TRANSFORMACI√ìN FICHEROS NDJSON

En caso de que hayas decidido desarrollar el ejercicio completo, descargando y procesando los datos desde BNDS, debes continuar por aqu√≠.

A continuaci√≥n, cargamos en Spark todos los ficheros NDJSON de detalle y mostramos su esquema para entender qu√© columnas y tipos de datos contiene el dataset.

In [14]:
# Leemos todos los ficheros NDJSON de la carpeta data/bdns
df_subv = spark.read.json("/content/data/bdns/convocatorias_detalle*.ndjson")

# Y Comprobamos el esquema de datos para ver qu√© columnas tenemos
df_subv.printSchema()



root
 |-- abierto: boolean (nullable = true)
 |-- advertencia: string (nullable = true)
 |-- anuncios: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- cve: string (nullable = true)
 |    |    |-- datPublicacion: string (nullable = true)
 |    |    |-- desDiarioOficial: string (nullable = true)
 |    |    |-- numAnuncio: long (nullable = true)
 |    |    |-- texto: string (nullable = true)
 |    |    |-- textoLeng: string (nullable = true)
 |    |    |-- titulo: string (nullable = true)
 |    |    |-- tituloLeng: string (nullable = true)
 |    |    |-- url: string (nullable = true)
 |-- ayudaEstado: string (nullable = true)
 |-- codigoBDNS: string (nullable = true)
 |-- descripcion: string (nullable = true)
 |-- descripcionBasesReguladoras: string (nullable = true)
 |-- descripcionFinalidad: string (nullable = true)
 |-- descripcionLeng: string (nullable = true)
 |-- documentos: array (nullable = true)
 |    |-- element: struct (containsNull = true

                                                                                

Guardamos ahora el dataset en formato Parquet.

In [None]:
# Guardamos el dataset completo en formato Parquet. Usamos 'overwrite' para que, si ya existe, lo reemplace
df_subv.write.mode("overwrite").parquet("/content/data/bdns_parquet")

### DESCARGA FICHEROS PARQUET DESDE GITHUB



En caso de que hayas decidido saltar la descarga de los datos desde la BDNS, debes continuar por aqu√≠.

En primer lugar crearemos la carpeta `data/bdns_parquet` en nuestro entorno de trabajo, donde almacenaremos los ficheros.

In [16]:
import os
os.makedirs("/content/data/bdns_parquet", exist_ok=True)

Los datos necesarios est√°n disponibles en la carpeta [*Datos*](https://github.com/Admindatosgobes/Laboratorio-de-Datos/tree/main/Data%20Science/Modelando%20el%20Presupuesto%20de%20Subvenciones%3A%20Una%20Experiencia%20Pr%C3%A1ctica%20con%20Apache%20Spark/Datos) de nuestro GitHub de trabajo.

Si no quieres ejecutar la descarga autom√°tica desde la API, puedes traer los datos ya transformados en **Parquet** directamente desde GitHub. Los pasos son:

1. **Instalar Git en tu ordenador**: Sigue las instrucciones oficiales en la [p√°gina de Git](https://git-scm.com/book/en/v2/Getting-Started-Installing-Git).

2. **Clonar el repositorio** en tu ordenador. Esto crear√° una carpeta local con todo el repositorio.
   ```bash
   git clone https://github.com/Admindatosgobes/Laboratorio-de-Datos.git
   ```

3. **Localizar la carpeta de datos**: Dentro del repositorio clonado, navega hasta la carpeta de Datos:
    ```bash
    Data Science/Modelando el Presupuesto de Subvenciones: Una Experiencia Pr√°ctica con Apache Spark/Datos
    ```
4. **Abrir la carpeta data/bdns_parquet en Colab** en el explorador de archivos de la izquierda. La creamos anteriormente.

5. **Arrastrar los ficheros Parquet** desde tu ordenador (la carpeta Datos) a la carpeta data/bdns_parquet en Colab.

6. **Comprobar que Spark puede leerlos** en Colab:

In [18]:
df_subv = spark.read.parquet("/content/data/bdns_parquet")
df_subv.printSchema()

root
 |-- abierto: boolean (nullable = true)
 |-- advertencia: string (nullable = true)
 |-- anuncios: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- cve: string (nullable = true)
 |    |    |-- datPublicacion: string (nullable = true)
 |    |    |-- desDiarioOficial: string (nullable = true)
 |    |    |-- numAnuncio: long (nullable = true)
 |    |    |-- texto: string (nullable = true)
 |    |    |-- textoLeng: string (nullable = true)
 |    |    |-- titulo: string (nullable = true)
 |    |    |-- tituloLeng: string (nullable = true)
 |    |    |-- url: string (nullable = true)
 |-- ayudaEstado: string (nullable = true)
 |-- codigoBDNS: string (nullable = true)
 |-- descripcion: string (nullable = true)
 |-- descripcionBasesReguladoras: string (nullable = true)
 |-- descripcionFinalidad: string (nullable = true)
 |-- descripcionLeng: string (nullable = true)
 |-- documentos: array (nullable = true)
 |    |-- element: struct (containsNull = true

In [17]:
df_subv.show(5, truncate=True)

+-------+--------------------+--------+-----------+----------+------------+---------------------------+--------------------+---------------+----------+-----------------+--------------------+--------------+------+------+--------------------+-----+---------+--------------------+----------------+--------+----------+----------------------+--------+-----------------+---------------+--------------------+--------------------+--------------------+--------------------+--------------+-------------------+
|abierto|         advertencia|anuncios|ayudaEstado|codigoBDNS| descripcion|descripcionBasesReguladoras|descripcionFinalidad|descripcionLeng|documentos|fechaFinSolicitud|fechaInicioSolicitud|fechaRecepcion|fondos|    id|        instrumentos|  mrr|objetivos|              organo|presupuestoTotal|regiones|reglamento|sePublicaDiarioOficial|sectores|sectoresProductos|sedeElectronica|             textFin|          textInicio|    tipoConvocatoria|  tiposBeneficiarios|urlAyudaEstado|urlBasesReguladoras|


Si esto funciona, ¬°ya tienes los datos listos para continuar con el ejercicio!

## EXPLORACI√ìN Y VISUALIZACI√ìN DE LOS DATOS

Ahora que tenemos el dataset en Parquet, podemos cargarlo de forma mucho m√°s r√°pida y comenzar a explorarlo.
El objetivo de esta fase es obtener una primera idea de las caracter√≠sticas de los datos y de su calidad:

* Cu√°ntos registros tiene el dataset.

* Qu√© columnas contiene y de qu√© tipo son.

* Distribuciones b√°sicas de algunas variables num√©ricas y categ√≥ricas.

* Presencia de valores nulos o campos vac√≠os.

Este tipo de exploraci√≥n inicial es fundamental en cualquier proyecto de an√°lisis o machine learning.

#### **Primeros pasos con el dataset**

Antes de avanzar en la exploraci√≥n de datos, importamos **`plotly.express`**, librer√≠a de visualizaci√≥n para crear gr√°ficos interactivos de forma sencilla y con poco c√≥digo.

Como Spark no est√° pensado para graficar directamente, haremos las agregaciones en Spark y convertiremos el resultado a **`pandas`** (con muestras cuando convenga) para dibujar las gr√°ficas.

In [6]:
!pip install -q plotly

import plotly.express as px
import plotly.graph_objects as goimport
import pandas as pd
import numpy as np

Despu√©s cargamos el dataset y observamos una peque√±a muestra de sus datos.

In [3]:
# Leemos el dataset en formato Parquet
df_subv = spark.read.parquet("/content/data/bdns_parquet")

# Vemos un par de filas de ejemplo
df_subv.show(10, truncate=True)

25/11/02 18:59:43 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


+-------+--------------------+--------------------+-----------+----------+--------------------+---------------------------+--------------------+---------------+--------------------+-----------------+--------------------+--------------+------+------+--------------------+-----+---------+--------------------+----------------+--------------------+----------+----------------------+--------------------+-----------------+--------------------+-------------------+--------------------+--------------------+--------------------+--------------+--------------------+
|abierto|         advertencia|            anuncios|ayudaEstado|codigoBDNS|         descripcion|descripcionBasesReguladoras|descripcionFinalidad|descripcionLeng|          documentos|fechaFinSolicitud|fechaInicioSolicitud|fechaRecepcion|fondos|    id|        instrumentos|  mrr|objetivos|              organo|presupuestoTotal|            regiones|reglamento|sePublicaDiarioOficial|            sectores|sectoresProductos|     sedeElectronica|  

Ya en una primera revisi√≥n del dataset se aprecian varios aspectos relevantes:
* **Campos prescindibles**: existen columnas que no aportan valor para los an√°lisis previstos (por ejemplo, advertencia).

* **Diversidad de tipos de datos**: encontramos booleanos, num√©ricos, categ√≥ricos, arrays con m√∫ltiples valores, estructuras anidadas y textos descriptivos extensos.

* **Heterogeneidad en la calidad de la informaci√≥n**: hay campos bien definidos y consistentes junto a otros con baja completitud contienen cadenas "NULL" o listas vac√≠as, lo cual requerir√° un tratamiento espec√≠fico.

* **Potencial para diferentes enfoques de an√°lisis**: los datos estructurados (n√∫meros, booleanos, categor√≠as) se prestan a modelos estad√≠sticos y agregaciones, mientras que los textos libres abren la puerta a t√©cnicas de NLP o miner√≠a de texto.

#### **¬øCu√°ntos registros tiene el dataset?**

Un primer paso b√°sico es **contar cu√°ntos registros** tiene el dataset.

In [4]:
# N√∫mero total de registros
print("N√∫mero de convocatorias:", df_subv.count())

N√∫mero de convocatorias: 578360


Disponemos de datos de cerca de **580.000 convocatorias** de subvenciones.

#### **¬øCu√°les son los tipos de convocatoria y la finalidad de las subvenciones?**

Analizando la anterior tabla, observamos dos columnas con variables categ√≥ricas, `descripcionFinalidad` y `tipoConvocatoria`. A continuaci√≥n, analizamos las categor√≠as existentes en cada una de ellas.

In [24]:
# Todas las categor√≠as √∫nicas de 'tipoConvocatoria'
df_subv.select("tipoConvocatoria").distinct().show(truncate=False)

# Todas las categor√≠as √∫nicas de 'descripcionFinalidad'
df_subv.select("descripcionFinalidad").distinct().show(truncate=False)

+-----------------------------------+
|tipoConvocatoria                   |
+-----------------------------------+
|Concurrencia competitiva - can√≥nica|
|Concesi√≥n directa - instrumental   |
|Concesi√≥n directa - can√≥nica       |
+-----------------------------------+

+-------------------------------------------------------+
|descripcionFinalidad                                   |
+-------------------------------------------------------+
|Seguridad Ciudadana e Instituciones Penitenciarias     |
|Industria y Energ√≠a                                    |
|Otras actuaciones de car√°cter econ√≥mico                |
|Defensa                                                |
|Infraestructuras                                       |
|Otras Prestaciones econ√≥micas                          |
|Acceso a la vivienda y fomento de la edificaci√≥n       |
|Servicios Sociales y Promoci√≥n Social                  |
|Comercio, Turismo y Pymes                              |
|Desempleo                 

A contiuaci√≥n, analizamos visualmente el n√∫mero de convocatorias por cada uno de sus tipos.

In [7]:
# 1) Contar convocatorias por tipo
df_tipo = (
    df_subv.groupBy("tipoConvocatoria")
      .count()
      .orderBy(F.desc("count"))
)

# 2) Pasar a Pandas para graficar
df_tipo_pd = df_tipo.toPandas()

# 3) Crear gr√°fico de barras vertical
fig = px.bar(
    df_tipo_pd,
    x="tipoConvocatoria",
    y="count",
    title="N√∫mero de convocatorias por tipo",
    labels={"tipoConvocatoria": "Tipo de convocatoria", "count": "N√∫mero de convocatorias"},
    color="tipoConvocatoria",
    color_discrete_sequence=px.colors.sequential.Agsunset_r[::-1]
)

fig.show()

Podemos ver c√≥mo *Concesi√≥n Directa* es la tipolog√≠a m√°s presente.

Ahora, analizamos qu√© cantidad de convocatorias se publican en funci√≥n de su finalidad.

In [8]:
# 1) Contar ocurrencias por categor√≠a
df_finalidad = (
    df_subv.groupBy("descripcionFinalidad")
      .count()
      .orderBy(F.desc("count"))
)

# 2) Pasar a Pandas (Top N + resto agrupado en "OTROS")
N = 8  # n√∫mero de categor√≠as a mostrar expl√≠citamente
df_finalidad_pd = df_finalidad.limit(N).toPandas()

# Calcular la suma del resto
total_count = df_subv.count()
top_count = df_finalidad_pd["count"].sum()
otros_count = total_count - top_count

# A√±adir fila "OTROS" si corresponde
if otros_count > 0:
    df_otros = pd.DataFrame([{"descripcionFinalidad": "OTROS", "count": otros_count}])
    df_finalidad_pd = pd.concat([df_finalidad_pd, df_otros], ignore_index=True)

# 3) Crear gr√°fico de pastel con Plotly
fig = px.pie(
    df_finalidad_pd,
    values="count",
    names="descripcionFinalidad",
    title="Distribuci√≥n de convocatorias por finalidad",
    hole=0.3,  # donut,
    color_discrete_sequence=px.colors.sequential.Agsunset_r[::-1]
)
fig.show()

Podemos ver que *Cultura* es la finalidad que tiene m√°s convocatorias publicadas. No obstante, no es s√≥lo relevante el n√∫mero de convocatorias sino el presupuesto de las mismas.

Vemos a continuaci√≥n que finalidades reciben mayor presupuesto.

In [9]:
# 1) Sumar presupuesto por finalidad
df_presu_finalidad = (
    df_subv.groupBy("descripcionFinalidad")
      .agg(F.sum("presupuestoTotal").alias("importe_total"))
      .orderBy(F.desc("importe_total"))
)

# 2) Pasar a Pandas (limitamos a Top N para que no sea ilegible)
N = 20  # n√∫mero de finalidades a mostrar
df_presu_finalidad_pd = df_presu_finalidad.limit(N).toPandas()

# 3) Treemap: cada rect√°ngulo es proporcional al importe_total
fig = px.treemap(
    df_presu_finalidad_pd,
    path=["descripcionFinalidad"],
    values="importe_total",
    title="Presupuesto total por finalidad",
    color_discrete_sequence=px.colors.sequential.Agsunset_r[::-1]
)

fig.show()

Podemos ver c√≥mo *Agricultura, Pesta y Alimentaci√≥n* es la finalidad que recibe mayor presupuesto y, mientras que *Cultura* era la que ten√≠aun mayor n√∫mero de convocatorias, a nivel presupuestario no est√° entre las principales, por lo que deben ser convocatorias de menor importe.

#### **¬øQu√© √≥rganos convocan m√°s subvenciones?**


Calculamos el n√∫mero de convocatorias por √≥rgano, nos quedamos con el Top 10 para entender qu√© tipo de √≥rganos convocantes aparecen.

In [10]:
# Distribuci√≥n por √≥rgano convocante
df_subv.groupBy("organo.nivel1").count().orderBy("count", ascending=False).show(10, truncate=False)
df_subv.groupBy("organo.nivel2").count().orderBy("count", ascending=False).show(10, truncate=False)
df_subv.groupBy("organo.nivel3").count().orderBy("count", ascending=False).show(10, truncate=False)

                                                                                

+-------------------------------+-----+
|nivel1                         |count|
+-------------------------------+-----+
|ESTADO                         |60088|
|CANARIAS                       |28615|
|OTROS                          |13692|
|CATALU√ëA                       |12103|
|GALICIA                        |10720|
|COMUNITAT VALENCIANA           |9871 |
|EXTREMADURA                    |9193 |
|CABILDO INSULAR DE GRAN CANARIA|7221 |
|PRINCIPADO DE ASTURIAS         |7082 |
|COMUNIDAD FORAL DE NAVARRA     |6164 |
+-------------------------------+-----+
only showing top 10 rows


                                                                                

+------------------------------------------+-----+
|nivel2                                    |count|
+------------------------------------------+-----+
|MINISTERIO DE¬†ECONOM√çA, COMERCIO Y EMPRESA|26215|
|MINISTERIO DE HACIENDA                    |16125|
|SERVICIO CANARIO DE EMPLEO                |12049|
|CABILDO INSULAR DE GRAN CANARIA           |5407 |
|DIPUTACI√ìN PROVINCIAL DE GIRONA           |5104 |
|DIPUTACI√ìN PROVINCIAL DE ALACANT/ALICANTE |4311 |
|DIPUTACI√ìN PROVINCIAL DE C√ÅDIZ            |3987 |
|DIPUTACI√ìN PROVINCIAL DE CIUDAD REAL      |3739 |
|DIPUTACI√ìN PROVINCIAL DE JA√âN             |3448 |
|DIPUTACI√ìN PROVINCIAL DE HUESCA           |3247 |
+------------------------------------------+-----+
only showing top 10 rows
+--------------------------------------------------------------------------------+------+
|nivel3                                                                          |count |
+----------------------------------------------------------------------

Podemos hacernos una primera idea de la utilizaci√≥n de cada uno de los niveles y de la cantidad de convocatorias lanzadas por cada tipo.

#### **¬øQu√© tipos de beneficiario reciben m√°s subvenciones?**

In [11]:
# Distribuci√≥n por tipo de beneficiario
df_subv.groupBy("tiposBeneficiarios.descripcion").count().orderBy("count", ascending=False).show(10, truncate=False)

+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+------+
|descripcion                                                                                                                                                                                    |count |
+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+------+
|[PERSONAS JUR√çDICAS QUE NO DESARROLLAN ACTIVIDAD ECON√ìMICA]                                                                                                                                    |352538|
|[PERSONAS F√çSICAS QUE NO DESARROLLAN ACTIVIDAD ECON√ìMICA]                                                                                                                                      

                                                                                

Podemos observar c√≥mo, cuando agrupamos directamente por la columna `tiposBeneficiarios.descripcion`, en realidad estamos agrupando por una lista completa de beneficiarios. Por eso aparecen combinaciones largas como si fueran una √∫nica categor√≠a.

Si lo que queremos es contar cu√°ntas veces aparece cada beneficiario individual, necesitamos primero aplanar la lista con la funci√≥n *explode*. De esta forma cada elemento de la lista pasa a ocupar su propia fila, y ya podemos agrupar correctamente por la descripci√≥n de cada beneficiario.

In [12]:
# 1) Aplanamos la lista de tiposBeneficiarios y nos quedamos con la descripci√≥n
df_benef = (df_subv
    .select("id", F.explode_outer("tiposBeneficiarios").alias("tb"))   # una fila por elemento de la lista
    .select("id", F.trim(F.col("tb.descripcion")).alias("benef_desc"))  # nos quedamos con la descripci√≥n
    .filter(F.col("benef_desc").isNotNull())
)

# 2) Normalizamos may√∫sculas/min√∫sculas/espacios (Opcional pero recomendado)
df_benef = df_benef.withColumn("benef_desc_norm", F.upper(F.regexp_replace("benef_desc", r"\s+", " ")))

(df_benef
 .select("id", "benef_desc_norm")
 .dropDuplicates(["id", "benef_desc_norm"]) # Evita doble conteo si la misma categor√≠a aparece repetida en la misma convocatoria
 .groupBy("benef_desc_norm").count()
 .orderBy(F.col("count").desc())
 .show(100, truncate=False))




+-----------------------------------------------------------+------+
|benef_desc_norm                                            |count |
+-----------------------------------------------------------+------+
|PERSONAS JUR√çDICAS QUE NO DESARROLLAN ACTIVIDAD ECON√ìMICA  |381029|
|PERSONAS F√çSICAS QUE NO DESARROLLAN ACTIVIDAD ECON√ìMICA    |110239|
|PYME Y PERSONAS F√çSICAS QUE DESARROLLAN ACTIVIDAD ECON√ìMICA|85622 |
|SIN INFORMACION ESPECIFICA                                 |45583 |
|GRAN EMPRESA                                               |20416 |
+-----------------------------------------------------------+------+



                                                                                

In [13]:
# Explode de tipos de beneficiario
df_benef = df_subv.withColumn("beneficiario", F.explode_outer("tiposBeneficiarios"))

importe_por_benef = (
    df_benef.groupBy(F.col("beneficiario.descripcion").alias("tipo_beneficiario"))
            .agg(
                F.sum("presupuestoTotal").alias("importe_total"),
                F.avg("presupuestoTotal").alias("importe_medio"),
                F.count("*").alias("num_convocatorias")
            )
            .orderBy(F.col("importe_total").desc())
            .limit(15)
            .toPandas()
)

fig = px.bar(
    importe_por_benef.sort_values("importe_total"),
    x="importe_total", y="tipo_beneficiario",
    orientation="h",
    title="Importe total de subvenciones por tipo de beneficiario (Top 15)",
    labels={"importe_total":"Importe total (‚Ç¨)", "tipo_beneficiario":"Tipo de beneficiario"}
)

fig.show()

Podemos ver c√≥mo, aunque el volumen de convocatorias es mucho mayor para `PERSONAS JUR√çDICAS QUE NO DESARROLLAN ACTIVIDAD ECON√ìMICA`, en presupuesto total ganan relevancia las categor√≠as `PYME Y PERSONAS F√çSICAS QUE DESARROLLAN ACTIVIDAD ECON√ìMICA` y `GRAN EMPRESA`.

#### **¬øQu√© variables est√°n m√°s completas y cu√°les tienen m√°s valores ausentes?**

Un aspecto clave de la calidad de datos es la **presencia de nulos o campos vac√≠os**. Si una columna tiene demasiados valores ausentes, quiz√° no nos sirva como predictor en un modelo.

In [14]:
# Calculamos n¬∫ de nulos por columna
nulos = df_subv.select([
    F.count(F.when(F.col(c).isNull(), c)).alias(c)
    for c in df_subv.columns
])

# Pasamos a formato largo (columna, nulos)
nulos_vertical = nulos.selectExpr(
    "stack({0}, {1}) as (columna, nulos)".format(
        len(df_subv.columns),
        ",".join([f"'{c}', `{c}`" for c in df_subv.columns])
    )
)

# Ordenamos: descendente por nulos, luego alfab√©tico
nulos_vertical = nulos_vertical.orderBy(F.desc("nulos"), F.asc("columna"))

# Mostrar todas las filas y sin truncar
nulos_vertical.show(nulos_vertical.count(), truncate=False)

                                                                                

+---------------------------+------+
|columna                    |nulos |
+---------------------------+------+
|ayudaEstado                |570926|
|urlAyudaEstado             |570926|
|reglamento                 |548909|
|descripcionLeng            |507673|
|sedeElectronica            |411246|
|textFin                    |356523|
|textInicio                 |330564|
|fechaInicioSolicitud       |226288|
|fechaFinSolicitud          |205676|
|urlBasesReguladoras        |45721 |
|abierto                    |0     |
|advertencia                |0     |
|anuncios                   |0     |
|codigoBDNS                 |0     |
|descripcion                |0     |
|descripcionBasesReguladoras|0     |
|descripcionFinalidad       |0     |
|documentos                 |0     |
|fechaRecepcion             |0     |
|fondos                     |0     |
|id                         |0     |
|instrumentos               |0     |
|mrr                        |0     |
|objetivos                  |0     |
|

                                                                                

Podemos observar como algunos campos como `sectores` no est√°n siendo interpretados correctamente, ya que contienen listas vac√≠as pero en muchas ocasiones est√°n vac√≠as. Ajustamos el c√≥digo para incluir este tipo de consideraciones.

In [17]:
# Funci√≥n para generar expresi√≥n de "nulo" seg√∫n tipo
def null_expr(col, dtype):
    if dtype == "string":
        # es nulo si es null de verdad o si es la string "NULL"
        return ((F.col(col).isNull()) | (F.col(col) == "NULL"))
    elif dtype.startswith("array"):
        # es nulo si es null de verdad o si tiene longitud 0
        return ((F.col(col).isNull()) | (F.size(F.col(col)) == 0))
    elif dtype.startswith("struct"):
        # es nulo si el struct completo es null
        return F.col(col).isNull()
    else:
        # por defecto solo null
        return F.col(col).isNull()

# Construimos lista de conteos
exprs = []
for field in df_subv.schema.fields:
    colname = field.name
    dtype = str(field.dataType).lower()
    exprs.append(
        F.count(F.when(null_expr(colname, dtype), colname)).alias(colname)
    )

# Ejecutar conteo
nulos_df = df_subv.agg(*exprs)

# Mostrar resultado transpuesto (columna, cantidad)
nulos_long = nulos_df.selectExpr("stack({0}, {1}) as (columna, nulos)".format(
    len(nulos_df.columns),
    ",".join([f"'{c}', {c}" for c in nulos_df.columns])
)).orderBy(F.desc("nulos"), F.asc("columna"))

# Ordenamos: m√°s nulos primero, luego nombre
nulos_long.orderBy(F.desc("nulos"), F.asc("columna")).show(nulos_long.count(), truncate=False)


                                                                                

+---------------------------+------+
|columna                    |nulos |
+---------------------------+------+
|sectoresProductos          |577516|
|objetivos                  |572824|
|ayudaEstado                |570926|
|urlAyudaEstado             |570926|
|fondos                     |552586|
|reglamento                 |548909|
|descripcionLeng            |507673|
|anuncios                   |480181|
|sedeElectronica            |411246|
|textFin                    |356523|
|textInicio                 |330564|
|fechaInicioSolicitud       |226288|
|fechaFinSolicitud          |205676|
|regiones                   |46649 |
|urlBasesReguladoras        |45721 |
|documentos                 |45562 |
|sectores                   |44503 |
|tiposBeneficiarios         |6     |
|abierto                    |0     |
|advertencia                |0     |
|codigoBDNS                 |0     |
|descripcion                |0     |
|descripcionBasesReguladoras|0     |
|descripcionFinalidad       |0     |
|

Ahora representamos gr√°ficamente los campos con nulos para una interpretaci√≥n m√°s visual.

In [18]:
# Filtramos y ordenamos (mismos criterios: nulos desc, columna asc)
nulos_con_valor = (
    nulos_long
    .filter(F.col("nulos") > 0)
    .orderBy(F.desc("nulos"), F.asc("columna"))
)

# Pasamos a Pandas para graficar con Plotly
pdf = nulos_con_valor.toPandas()

if pdf.empty:
    print("No hay columnas con valores nulos.")
else:
    fig = px.bar(
        pdf,
        x="nulos",
        y="columna",
        orientation="h",
        title="Columnas con valores nulos",
        labels={"nulos": "N¬∫ de nulos", "columna": "Columna"},
        color="columna",
        color_discrete_sequence=px.colors.sequential.Agsunset_r[::-1]
    )
    # Mantener orden por total (ascendente en el eje Y para que la mayor quede arriba)
    fig.update_layout(height=600, yaxis={'categoryorder': 'total ascending'})
    fig.show()


#### **¬øC√≥mo se distribuyen los presupuestos?**

En primer lugar, vamos a analizar la ocurrencia de casos de presupuestos a priori raros para convocatorias de subvenciones, como convocatorias con presupuesto negativo o cero.

In [19]:
# N¬∫ de filas con presupuesto < 0
neg_rows = (df_subv
    .where(F.col("presupuestoTotal").isNotNull() & (F.col("presupuestoTotal") < 0))
    .count()
)
print("N¬∫ Convocatorias con presupuesto negativo:", neg_rows)

# N¬∫ de filas con presupuesto = 0
neg_rows = (df_subv
    .where(F.col("presupuestoTotal").isNotNull() & (F.col("presupuestoTotal") == 0))
    .count()
)
print("N¬∫ Convocatorias con presupuesto cero:", neg_rows)

N¬∫ Convocatorias con presupuesto negativo: 0
N¬∫ Convocatorias con presupuesto cero: 65562


Podemos ver que no hay convocatorias con presupuesto negativo, pero s√≠ un volumen importante con presupuesto cero. Para este ejercicio, las eliminaremos ya que pueden tener alg√∫n tipo de consideraci√≥n especial.

In [20]:
df_subv_ppto = df_subv.where(F.col("presupuestoTotal").isNotNull() & (F.col("presupuestoTotal") != 0))

A continuaci√≥n, analizamos m√©tricas estad√≠sticas la variable num√©rica **`presupuestoTotal`**.

In [21]:
# Resumen estad√≠stico de las columnas num√©ricas
df_subv_ppto.describe(["presupuestoTotal"]).show()

+-------+--------------------+
|summary|    presupuestoTotal|
+-------+--------------------+
|  count|              512798|
|   mean|  2168685.7037451197|
| stddev|2.7175059280790484E8|
|    min|                0.01|
|    max|             9.95E10|
+-------+--------------------+



Podemos observar c√≥mo la media es aproximadamente 2,17 M‚Ç¨.

No obstante, prevemos que puede haber unas pocas grandes ayudas y muchas peque√±as por lo que la media no sea suficientemente representativa. Utilizamos la mediana para profundizar algo m√°s en el estudio.

In [22]:
# Mediana (percentil 0.5) de la columna presupuestoTotal
median = df_subv_ppto.approxQuantile("presupuestoTotal", [0.5], 0.01)[0]
print("Mediana:", median)

Mediana: 15145.2


La mediana, ~15.000 ‚Ç¨, queda muy por debajo de la media, lo que revela una **distribuci√≥n fuertemente asim√©trica a la derecha**: unas pocas convocatorias concentran presupuestos muy altos y elevan la media, mientras la mayor√≠a presentan importes significativamente menores.

Para profundizar a√∫n m√°s en la distribuci√≥n de esta variable, procedemos a representarla gr√°ficamente. Lo haremos sobre una muestra de la poblaci√≥n total, un 20% de los datos.

In [23]:
# Tomamos una muestra del 20% para agilizar
sample_pdf = (
    df_subv_ppto.select(F.col("presupuestoTotal").cast("double").alias("presupuestoTotal"))
      .filter(F.col("presupuestoTotal").isNotNull() & (F.col("presupuestoTotal") >= 0))
      .sample(fraction=0.20, seed=42)
      .toPandas()
)

# Histograma en escala lineal
fig_lin = px.histogram(
    sample_pdf, x="presupuestoTotal",
    nbins=100,
    title="Distribuci√≥n de presupuestos (muestra 20%, escala lineal)",
    labels={"presupuestoTotal":"Presupuesto (EUR)"}
)
fig_lin.update_layout(bargap=0.05)
fig_lin.show()

Podemos ver, como ya anticip√°bamos, una distribuci√≥n fuertemente asim√©trica a la derecha. Aparecen muy pocas convocatorias por encima de los 500 M‚Ç¨ seg√∫n los bins que se han generado autom√°ticamente para representar la distribuci√≥n.

Vamos a contar cu√°ntas ayudas realmente est√°n por encima de esta cifra y mostrar algunas de ellas.

In [24]:
df_subv_ppto_gt500 = df_subv_ppto.where(F.col("presupuestoTotal").isNotNull() & (F.col("presupuestoTotal") > 500e6))
print("N¬∫ Convocatorias con presupuesto >500m‚Ç¨:", df_subv_ppto_gt500.count())
df_subv_ppto_gt500.show()

N¬∫ Convocatorias con presupuesto >500m‚Ç¨: 196
+-------+--------------------+--------------------+-----------+----------+--------------------+---------------------------+--------------------+--------------------+--------------------+-----------------+--------------------+--------------+--------------------+------+--------------------+-----+--------------------+--------------------+----------------+--------------------+--------------------+----------------------+--------------------+-----------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|abierto|         advertencia|            anuncios|ayudaEstado|codigoBDNS|         descripcion|descripcionBasesReguladoras|descripcionFinalidad|     descripcionLeng|          documentos|fechaFinSolicitud|fechaInicioSolicitud|fechaRecepcion|              fondos|    id|        instrumentos|  mrr|           objetivos|              organo|presupues

Tras haber entendido la casu√≠stica de la asignaci√≥n de presupuesto de convocatorias de subvenciones, vemos como solo algunas muy puntuales tienen presupuestos tan elevados.

Por ello, procedemos ahora a eliminar los *outliers*, es decir aquellos registros con valores at√≠pico o fuera del patr√≥n.

In [25]:
# Nombre de la columna num√©rica que vamos a evaluar con IQR
colname = "presupuestoTotal"

# 1) Obtenemos Q1 (p25) y Q3 (p75) con percentiles aproximados
#    El 0.01 es el error relativo permitido (‚âà1%); approxQuantile es eficiente a escala
q1, q3 = df_subv_ppto.approxQuantile(colname, [0.25, 0.75], 0.01)

# 2) Calculamos el rango intercuart√≠lico (IQR = Q3 - Q1), que mide la ‚Äúmasa central‚Äù
iqr = q3 - q1

# 3) Elegimos el factor k de la regla de Tukey (1.5 est√°ndar; 3.0 = m√°s laxo con extremos)
k = 1.5  # 3.0 si quieres ser menos agresivo

# 4) Definimos los umbrales: cualquier valor fuera de [low, high] se considera outlier
low, high = q1 - k*iqr, q3 + k*iqr

# 5) Filtramos filas qued√°ndonos con valores dentro del rango ‚Äúno at√≠pico‚Äù
df_iqr = df_subv_ppto.where((F.col(colname) >= low) & (F.col(colname) <= high))

In [26]:
# Tomamos una muestra del 20% para agilizar
sample_pdf = (
    df_iqr.select(F.col("presupuestoTotal").alias("presupuestoTotal"))
      .sample(fraction=0.20, seed=42)
      .toPandas()
)

# Histograma en escala lineal
fig_lin = px.histogram(
    sample_pdf, x="presupuestoTotal",
    nbins=100,
    title="Distribuci√≥n de presupuestos (muestra 20%, escala lineal)",
    labels={"presupuestoTotal":"Presupuesto (EUR)"}
)
fig_lin.update_layout(bargap=0.05)
fig_lin.show()

Gracias a las medidas adoptadas, hemos podido ahora entender mucho mejor la distribuci√≥n de los presupuestos de las convocatorias.

In [27]:
spark.stop()

## MODELADO: CONSTRUCCI√ìN DE UN PIPELINE PARA ENTRENAR Y EVALUAR UN CLASIFICADOR

En esta fase vamos a construir un modelo capaz de predecir el rango de presupuesto de una convocatoria a partir de sus caracter√≠sticas principales (√≥rgano convocante, regi√≥n, tipo de beneficiario, tipo de convocatoria, a√±o...).

El objetivo es aprovechar estas caracter√≠sticas para predecir el rango de presupuesto de nuevas convocatorias, de manera que sea posible anticipar si se trata de ayudas peque√±as, medianas o de gran volumen. Esto permitir√≠a a analistas y responsables de gesti√≥n obtener una visi√≥n m√°s clara de c√≥mo se distribuyen los fondos p√∫blicos y qu√© factores influyen en esa distribuci√≥n.

Trabajaremos con Apache Spark ML montando un *pipeline* que incluya preparaci√≥n de variables, ensamblado de rasgos y entrenamiento de un clasificador.

#### **Preparaci√≥n b√°sica del dataset para el modelado**

Gracias al an√°lisis exploratorio anterior podemos ahora limpiar el dataset para facilitar el modelado: selecci√≥n de variables categ√≥ricas, eliminaci√≥n de columnas no consistentes o eliminaci√≥n de *outliers* ser√°n algunas de las medidas a implementar.

In [28]:
# 1) Preparamos la sesi√≥n Spark
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder \
    .appName("ColabSubvenciones") \
    .master("local[*]") \
    .getOrCreate()

In [29]:
# 2) Cargamos de nuevo el dataset desde los ficheros parquet
df_ml = spark.read.parquet("data/bdns_parquet")

# 3) Creamos una columna a√±o a partir de fechaRecepcion
df_ml = df_ml.withColumn("anio", F.year("fechaRecepcion"))

# 4) Filtrado presupuestos: nos quedamos con filas con presupuesto informado, >0 y eliminamos outliers
colPpto = "presupuestoTotal"
# Filtro b√°sico
df_ml = df_ml.where(F.col(colPpto).isNotNull() & (F.col(colPpto) > 0))

# Filtro outliers
q1, q3 = df_ml.approxQuantile(colPpto, [0.25, 0.75], 0.01)
iqr = q3 - q1
k = 1.5
low, high = q1 - k*iqr, q3 + k*iqr
df_ml = df_ml.where((F.col(colPpto) >= low) & (F.col(colPpto) <= high))

# 5) Expandimos los niveles de √≥rgano a columnas espec√≠ficas
df_ml = (df_ml
      .withColumn("organo_n1", F.col("organo.nivel2"))
      .withColumn("organo_n2", F.col("organo.nivel2"))
      .withColumn("organo_n3", F.col("organo.nivel3")))

# 6) Seleccionamos las columnas que utilizaremos en nuestro an√°lisis
df_ml= df_ml.select("presupuestoTotal",
          "organo_n1",
          "organo_n2",
          "organo_n3",
          "anio",
          "descripcionFinalidad",
          "tipoConvocatoria")

# 7) Aseguramos que no haya nulos
df_ml = df_ml.fillna({
    "organo_n1": "SIN_INFO",
    "organo_n2": "SIN_INFO",
    "organo_n3": "SIN_INFO",
    "descripcionFinalidad": "SIN_INFO",
    "tipoConvocatoria": "SIN_INFO"
})

df_ml.show(5, truncate=False)

+----------------+------------------------------------------+------------------------------------------+---------+----+-------------------------------------+-----------------------------------+
|presupuestoTotal|organo_n1                                 |organo_n2                                 |organo_n3|anio|descripcionFinalidad                 |tipoConvocatoria                   |
+----------------+------------------------------------------+------------------------------------------+---------+----+-------------------------------------+-----------------------------------+
|1170.0          |AYUNTAMIENTO DE ZAS                       |AYUNTAMIENTO DE ZAS                       |SIN_INFO |2020|Cultura                              |Concurrencia competitiva - can√≥nica|
|44900.0         |AYUNTAMIENTO DE POBLA DE FARNALS, LA      |AYUNTAMIENTO DE POBLA DE FARNALS, LA      |SIN_INFO |2020|Servicios Sociales y Promoci√≥n Social|Concurrencia competitiva - can√≥nica|
|3500.0          |AYUNTAMIE

Para facilitar la interpretaci√≥n, convertimos el presupuesto en tres clases: *bajo, medio y alto*. Esto nos permite aplicar clasificadores de Spark ML y evaluar con m√©tricas est√°ndar.

In [30]:
# Umbrales configurables (ajusta si lo deseas)
t_bajo  = 20_000     # ‚â§ 20k ‚Üí "bajo"
t_medio = 150_000   # (20k, 150k] ‚Üí "medio"; >150k ‚Üí "alto"

def presupuesto_a_clase(p):
    if p <= t_bajo:
        return "bajo"
    elif p <= t_medio:
        return "medio"
    else:
        return "alto"

# UDF para etiquetar
from pyspark.sql.types import StringType
from pyspark.sql.functions import udf
udf_clase = udf(presupuesto_a_clase, StringType())

df_umb = df_ml.withColumn("presupuesto_clase", udf_clase(F.col("presupuestoTotal")))

# Balance de clases
df_umb.groupBy("presupuesto_clase").count().orderBy("count", ascending=False).show()

[Stage 4:>                                                          (0 + 8) / 9]

+-----------------+------+
|presupuesto_clase| count|
+-----------------+------+
|             bajo|283849|
|            medio|143100|
|             alto|  1373|
+-----------------+------+



                                                                                

#### **Construcci√≥n del *pipeline* de modelado**

En Spark, un *Pipeline* permite encadenar de forma estructurada todas las etapas de un flujo de *machine learning*, desde la preparaci√≥n de los datos hasta el entrenamiento y la predicci√≥n. Cada paso (como indexar variables, escalar datos o entrenar un modelo) se define como una etapa dentro del pipeline, lo que facilita reproducir el proceso completo y aplicarlo f√°cilmente sobre nuevos conjuntos de datos.

Usaremos como predictores las siguientes caracter√≠sticas de nuestro dataset: `√≥rgano_n1`, `√≥rgano_n2`, `√≥rgano_n3`, `descripcionFinalidad`, `tipoConvocatoria` y `anio`. Adicionalmente, dividimos en entrenamiento y prueba con una proporci√≥n 80/20, de modo que el modelo pueda aprender sobre el 80% de los datos y evaluarse objetivamente con el 20% restante

In [31]:
cols_cat = ["organo_n1", "organo_n2", "organo_n3", "descripcionFinalidad", "tipoConvocatoria"]
cols_num = ["anio"]

df_model = df_umb.select(cols_cat + cols_num + ["presupuesto_clase"])

train, test = df_model.randomSplit([0.8, 0.2], seed=42)
train.count(), test.count()

                                                                                

(343078, 85244)

Construimos ahora un *pipeline* que agrupa todas las etapas del flujo de preparaci√≥n y modelado para simplificar la ejecuci√≥n y garantizar reproducibilidad.
Incluye:
1. `StringIndexer`, para convertir categor√≠as a √≠ndices. Permite transformar columnas con valores categ√≥ricos (por ejemplo, ‚ÄúEspa√±a‚Äù, ‚ÄúFrancia‚Äù, ‚ÄúItalia‚Äù) en n√∫meros enteros (0, 1, 2). Esto facilita que las siguientes etapas del pipeline puedan trabajar con esos valores de manera eficiente.

2. `OneHotEncoder`, herramienta que convierte variables categ√≥ricas en vectores binarios, donde cada categor√≠a se representa con un valor 1 en su posici√≥n correspondiente y 0 en las dem√°s. De esta forma, el modelo puede interpretar correctamente las categor√≠as sin asumir un orden num√©rico entre ellas. De esta forma, el modelo puede interpretar correctamente las categor√≠as sin asumir un orden num√©rico entre ellas, evitando errores de interpretaci√≥n (por ejemplo, que el modelo piense que ‚ÄúItalia=2‚Äù es mayor que ‚ÄúEspa√±a=0‚Äù).

3. `VectorAssembler`, para unir todas las columnas en un vector de caracter√≠sticas. En Spark ML, todos los modelos (como RandomForestClassifier, LogisticRegression, etc.) esperan recibir una sola columna llamada normalmente features, que contiene un vector con todos los valores num√©ricos que describen cada fila. El VectorAssembler es la herramienta que une en un solo vector todas las columnas que queremos usar como variables de entrada del modelo.

4. `RandomForestClassifier`, un modelo de clasificaci√≥n basado en m√∫ltiples √°rboles de decisi√≥n entrenados de forma aleatoria, elegido por su robustez y buena capacidad de generalizaci√≥n sin un gran ajuste de hiperpar√°metros.

In [34]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml import Pipeline

# Indexadores para cada categ√≥rica + label indexer
indexers = [StringIndexer(inputCol=c, outputCol=f"{c}_idx", handleInvalid="keep") for c in cols_cat]
label_indexer = StringIndexer(inputCol="presupuesto_clase", outputCol="label", handleInvalid="keep")

# One-Hot sobre los √≠ndices categ√≥ricos
encoder = OneHotEncoder(
    inputCols=[f"{c}_idx" for c in cols_cat],
    outputCols=[f"{c}_oh"  for c in cols_cat]
)

# Ensamblado del vector de features (OHE + num√©ricas)
assembler = VectorAssembler(
    inputCols=[f"{c}_oh" for c in cols_cat] + cols_num,
    outputCol="features"
)

# Clasificador
rf = RandomForestClassifier(
    featuresCol="features",
    labelCol="label",
    numTrees=200,
    maxDepth=15,
    seed=42
)

pipeline = Pipeline(stages=indexers + [label_indexer, encoder, assembler, rf])

Entrenamos el pipeline sobre el conjunto de entrenamiento y generamos predicciones en test para evaluar el rendimiento del modelo.

In [35]:
model = pipeline.fit(train)
pred = model.transform(test)

Usaremos accuracy y F1 como m√©tricas globales y mostraremos la matriz de confusi√≥n para ver d√≥nde comete m√°s errores. Esto nos ayuda a entender si el modelo confunde ‚Äúmedio‚Äù con ‚Äúalto‚Äù, etc.

Evaluaremos el rendimiento del modelo utilizando **accuracy** y **F1-score** como m√©tricas globales.

- **Accuracy** mide el porcentaje total de aciertos del modelo, ofreciendo una visi√≥n general de su desempe√±o.

- **F1-score** combina precisi√≥n y exhaustividad en una sola m√©trica. Es especialmente √∫til cuando las clases est√°n desequilibradas, porque un modelo que acierta mucho en la clase mayoritaria puede tener buena accuracy pero un F1 bajo si falla en las clases minoritarias.

Adem√°s, mostraremos la **matriz de confusi√≥n**, que detalla cu√°ntas observaciones de cada clase fueron clasificadas correctamente o confundidas con otras. Esto nos permite identificar patrones de error, por ejemplo, si el modelo tiende a confundir la clase ‚Äúmedio‚Äù con ‚Äúalto‚Äù.

In [None]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import StringIndexer

# Creamos dos evaluadores para medir el rendimiento global del modelo:
# - Accuracy: proporci√≥n total de predicciones correctas.
# - F1-score: m√©trica combinada que equilibra precisi√≥n y recall.
acc_eval = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
f1_eval  = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="f1")

# Calculamos ambas m√©tricas sobre el DataFrame 'pred', que contiene
# las predicciones generadas por el modelo (model.transform(test)).
accuracy = acc_eval.evaluate(pred)
f1       = f1_eval.evaluate(pred)
print(f"Accuracy: {accuracy:.3f}  |  F1: {f1:.3f}")

# Matriz de confusi√≥n:
# Agrupamos por la etiqueta real ('label') y la predicha ('prediction'),
# y contamos cu√°ntas veces se da cada combinaci√≥n.
# Esto nos permite ver en qu√© clases acierta m√°s y en cu√°les se confunde.
cm = (pred.groupBy("label","prediction").count()
          .orderBy("label","prediction"))
cm.show()

Accuracy: 0.802  |  F1: 0.732
+-----+----------+-----+
|label|prediction|count|
+-----+----------+-----+
|  0.0|       0.0|27672|
|  0.0|       1.0|   65|
|  0.0|       2.0|   71|
|  1.0|       0.0| 4818|
|  1.0|       1.0|  276|
|  1.0|       2.0|  107|
|  2.0|       0.0| 1801|
|  2.0|       1.0|   79|
|  2.0|       2.0|  247|
+-----+----------+-----+


El modelo **acierta en torno al 80% de los casos**, lo que est√° bastante bien, aunque el **F1-score (0,73) muestra que no todas las clases se predicen igual de bien**.

Si miramos la matriz de confusi√≥n, vemos que la clase 0 se identifica correctamente casi siempre, pero las clases 1 y 2 se confunden con frecuencia con la 0.
Esto significa que el modelo tiende a predecir la clase mayoritaria, lo que hace subir la accuracy, pero a costa de fallar m√°s en las clases minoritarias.

#### **An√°lisis de la importancia de las caracter√≠sticas**

En los modelos de bosque aleatorio es posible conocer qu√© variables han tenido mayor peso en las decisiones del modelo, es decir, su importancia relativa. Esto nos permitir√° visualizar y analizar qu√© variables del dataset han sido m√°s determinantes en las decisiones del modelo, proporcionando informaci√≥n valiosa sobre los factores que m√°s influyen en la clasificaci√≥n.

Cuando trabajamos con variables categ√≥ricas que han sido transformadas mediante OneHotEncoding, cada categor√≠a de una misma variable se convierte en una columna independiente. Esto hace que la importancia se reparta entre varias columnas que en realidad pertenecen a una misma variable original.
Para obtener una interpretaci√≥n m√°s clara y √∫til, agrupamos esas columnas codificadas seg√∫n la variable de origen y sumamos su importancia total.De esta forma podemos ver, por ejemplo, qu√© variables del conjunto original ‚Äîcomo tipo de convocatoria o √≥rgano convocante‚Äî son realmente las que m√°s influyen en la predicci√≥n del modelo, independientemente de cu√°ntas categor√≠as internas tuvieran.

In [37]:
from pyspark.ml.feature import OneHotEncoderModel, VectorAssembler
from pyspark.ml.classification import RandomForestClassificationModel

# ============================================
# 1) Localizamos en el pipeline entrenado:
#    - el modelo de Random Forest (para leer importancias),
#    - el OneHotEncoder (para saber cu√°ntas columnas gener√≥ por variable),
#    - y el VectorAssembler (para conocer el orden exacto de las columnas en 'features').
# ============================================
rf_model = next(stg for stg in model.stages if isinstance(stg, RandomForestClassificationModel))
ohe_model = next(stg for stg in model.stages if isinstance(stg, OneHotEncoderModel))
assembler = next(stg for stg in model.stages if isinstance(stg, VectorAssembler))

# Importancias a nivel de cada posici√≥n del vector 'features'
# (mismo orden que genera el VectorAssembler)
importances = rf_model.featureImportances.toArray()

# ============================================
# 2) Mapa de salidas del OneHotEncoder ‚Üí n¬∫ de categor√≠as
#    (antes de aplicar dropLast). Esto nos dice cu√°ntas columnas
#    produce cada columna categ√≥rica codificada.
# ============================================
ohe_cat_sizes = dict(zip(ohe_model.getOutputCols(), ohe_model.categorySizes))

# Por defecto, OneHotEncoder suele usar dropLast=True:
# - Si hay K categor√≠as, genera K-1 columnas (una se "cae" para evitar colinealidad).
drop_last = ohe_model.getDropLast() 

# ============================================
# 3) Orden real de entrada al VectorAssembler
#    Este orden define c√≥mo se apilan las columnas en el vector 'features'.
# ============================================
assembler_inputs = assembler.getInputCols()

# ============================================
# 4) Construimos "spans" (rangos) dentro de 'features' para cada entrada del assembler.
#    - Para variables num√©ricas: tama√±o 1.
#    - Para variables OHE: tama√±o K-1 si dropLast=True, o K si dropLast=False.
#    - Si una variable categ√≥rica tiene solo 1 categor√≠a y dropLast=True ‚Üí tama√±o efectivo 0.
#    Guardamos tambi√©n el nombre de cada bloque para poder agregar luego.
# ============================================

spans = []  
names = []  
pos = 0

for col in assembler_inputs:
    if col in ohe_cat_sizes:
        # Bloque categ√≥rico codificado (One-Hot)
        raw_size = ohe_cat_sizes[col] # Bloque categ√≥rico codificado (One-Hot)
        eff_size = raw_size - 1 if drop_last else raw_size # Bloque categ√≥rico codificado (One-Hot)
        eff_size = max(eff_size, 0)  # Bloque categ√≥rico codificado (One-Hot)
    else:
        # Num√©rica (o vector ya escalar). Asumimos tama√±o 1.
        eff_size = 1

    spans.append((pos, pos + eff_size)) # rango semiabierto [inicio, fin)
    names.append(col)                    # nombre de la entrada del assembler
    pos += eff_size                      # avanzamos el cursor

# Chequeo de consistencia: la suma de longitudes debe igualar
# la dimensi√≥n de 'importances' (longitud de 'features').
if pos != len(importances):
    raise ValueError(
        f"Dimensi√≥n de 'features' ({pos}) no coincide con importances ({len(importances)}). "
        "Posibles causas: columnas num√©ricas que no son escalares, entradas vectoriales adicionales, "
        "o cambios en el assembler. Revisa assembler.getInputCols()."
    )

# ============================================
# 5) Sumamos importancias por bloque
#    - Cada bloque corresponde a una entrada del assembler:
#      * si era OHE, suma todas sus columnas codificadas;
#      * si era num√©rica, toma su √∫nica posici√≥n.
# ============================================
agg = [] # lista de (nombre_de_entrada_assembler, importancia_agrupada)
for (name, (s, e)) in zip(names, spans):
    # Sumamos importancias del rango [s:e). Si el rango es vac√≠o (e==s), ponemos 0.0
    score = float(np.sum(importances[s:e])) if e > s else 0.0
    agg.append((name, score))

# ============================================
# 6) Normalizamos nombres para agrupar por variable original
#    Si tus columnas OHE tienen un sufijo como "_oh" (ej. "tipoConvocatoria_oh"),
#    lo eliminamos para colapsar todas las categor√≠as bajo el mismo nombre base.
#    * Ajusta esta funci√≥n si usas otra convenci√≥n de nombres *
# ============================================
def base_name(n):
    return n[:-3] if n.endswith("_oh") else n # elimina sufijo "_oh"

agg_by_var = {} # dict nombre_variable_base -> importancia_total

for name, score in agg:
    base = base_name(name)
    agg_by_var[base] = agg_by_var.get(base, 0.0) + score

# ============================================
# 7) Mostramos las variables ordenadas por importancia descendente
#    (interpretaci√≥n a nivel de variable original)
# ============================================
agg_sorted = sorted(agg_by_var.items(), key=lambda x: x[1], reverse=True)
for name, score in agg_sorted:
    print(f"{name:20s} -> {score:.4f}")


organo_n2            -> 0.3454
organo_n1            -> 0.3193
descripcionFinalidad -> 0.1491
tipoConvocatoria     -> 0.1058
organo_n3            -> 0.0740
anio                 -> 0.0064


El an√°lisis de importancia de variables muestra qu√© campos han tenido m√°s peso en las decisiones del modelo de bosque aleatorio.

En este caso, las variables relacionadas con el √≥rgano convocante (organo_n1, organo_n2, organo_n3) son las que m√°s influyen en la predicci√≥n, especialmente organo_n2 y organo_n1, que juntas explican m√°s del 65 % de la importancia total del modelo.
Esto sugiere que la estructura organizativa o el nivel del √≥rgano tiene una fuerte relaci√≥n con la variable objetivo.

La finalidad de la convocatoria (descripcionFinalidad) y el tipo de convocatoria tambi√©n aportan informaci√≥n relevante, aunque en menor medida.
Por el contrario, el a√±o (anio) apenas contribuye al modelo, lo que indica que la variable temporal no tiene un peso significativo en la predicci√≥n.

En conjunto, el modelo parece basarse sobre todo en qui√©n convoca, m√°s que en cu√°ndo o para qu√© se convoca.

Guardamos el PipelineModel para poder reutilizarlo sin reentrenar, y dejamos preparado el conjunto de datos transformado si queremos inspeccionar features.

In [None]:
# Guardar modelo entrenado
model_path = "/content/models/presupuesto_rf_pipeline"
model.write().overwrite().save(model_path)
print(f"Modelo guardado en: {model_path}")

# (Opcional) guardar predicciones
pred.write.mode("overwrite").parquet("/content/data/predicciones_presupuesto")

Modelo guardado en: models/presupuesto_rf_pipeline


## CONCLUSIONES

A lo largo de este recorrido hemos pasado por todas las fases clave de un proyecto de an√°lisis y modelado con Spark.
Comenzamos con el volcado de datos de subvenciones, entendiendo su estructura y volumen, y configuramos el entorno de trabajo en Apache Spark para aprovechar su capacidad de procesar grandes conjuntos de datos de forma distribuida.

Posteriormente, transformamos los ficheros originales al formato Parquet, optimizando el almacenamiento y la velocidad de lectura, lo que nos permiti√≥ trabajar de forma m√°s eficiente en las etapas siguientes.
Realizamos una exploraci√≥n inicial de los datos para identificar patrones, relaciones y posibles problemas de calidad, complement√°ndola con visualizaciones que facilitaron la interpretaci√≥n.

A continuaci√≥n, construimos un pipeline completo de modelado, desde el preprocesamiento de variables hasta el entrenamiento y evaluaci√≥n de un clasificador, siguiendo una metodolog√≠a reproducible y escalable.
Finalmente, analizamos los resultados del modelo, evaluando su rendimiento y examinando la importancia de las variables para comprender qu√© factores influyen m√°s en las predicciones.

En conjunto, este ejercicio ha mostrado el flujo de trabajo completo de un proyecto de Machine Learning en Spark, desde los datos brutos hasta la interpretaci√≥n final de un modelo funcional y explicable.