In [7]:
# -*- coding: utf-8 -*-
"""
Wide ETL Municipios (Madrid < 50k):
- Google Places (Nearby Search): recuento + rating medio ponderado + total reseñas
- Google Air Quality API: condición actual
Salida: CSV wide (1 fila por municipio)
"""

import os
import time
import json
import hashlib
import logging
from typing import Dict, List, Any, Tuple

import pandas as pd
import requests
from dotenv import load_dotenv

In [8]:
# =============== CONFIGURATION ===============
# Populate os.environ from a local .env file (if one exists).
load_dotenv()

# --- API keys ---
# Places key: GOOGLE_PLACES_API_KEY, falling back to the generic API_KEY.
GOOGLE_PLACES_API_KEY = os.getenv("GOOGLE_PLACES_API_KEY") or os.getenv("API_KEY")
# Air Quality key: its own variable when set, otherwise reuse the Places key.
GOOGLE_AIR_API_KEY = os.getenv("GOOGLE_AIR_API_KEY") or GOOGLE_PLACES_API_KEY

# --- Input / output CSV files (relative to the working directory) ---
INPUT_CSV = os.path.join("../raw_data", "municipios_madrid_menores_50000.csv")
OUT_WIDE = os.path.join("../raw_data", "municipios_google_data.csv")

# --- On-disk JSON cache of raw API responses ---
CACHE_DIR = "cache"
CACHE_PLACES_SEARCH = os.path.join(CACHE_DIR, "places_search")
CACHE_AIR = os.path.join(CACHE_DIR, "air_quality")

# Create every directory the pipeline writes into (no-op when already present).
os.makedirs(CACHE_PLACES_SEARCH, exist_ok=True)
os.makedirs(CACHE_AIR, exist_ok=True)
os.makedirs(os.path.dirname(OUT_WIDE), exist_ok=True)

# --- Request parameters ---
RADIUS_METERS = 7000        # Nearby Search radius
MAX_PAGES = 1               # Nearby Search pages per type (one page = up to 20 results), to limit quota use
SLEEP_BETWEEN_REQS = 1.2    # pause between consecutive API calls, in seconds
SLEEP_NEXT_PAGE = 2.2       # pause before using a next_page_token, in seconds
TIMEOUT = 30                # HTTP timeout passed to each request, in seconds
MAX_RETRIES = 3             # attempts for transient failures (5xx / connection errors)

# --- Column names expected in the input CSV ---
COL_ID = "cod_municipio"
COL_NAME = "municipio"
COL_LAT = "latitud"
COL_LON = "longitud"
COL_POP = "poblacion"

# --- Google place categories (extended) ---
# Maps each output column prefix to the Nearby Search `type` values queried for
# it; results of all types in one category are merged and deduplicated.
# NOTE(review): "clinic", "preschool", "kindergarten", "vocational_school" and
# "bus_stop" do not appear in the legacy Places API table of supported types,
# and "grocery_or_supermarket" is only documented as a type that may be
# *returned*. Nearby Search may therefore yield nothing for them — TODO confirm.
PLACE_TYPES: Dict[str, List[str]] = {
    # 1. Basic, everyday services
    "g_supermercados": ["supermarket", "grocery_or_supermarket"],
    "g_conveniencia": ["convenience_store"],
    "g_farmacias": ["pharmacy"],
    "g_bancos": ["bank"],
    "g_cajeros": ["atm"],
    "g_gasolineras": ["gas_station"],

    # 2. Health care
    "g_hospitales_clinicas": ["hospital", "clinic"],
    "g_medicos_familia": ["doctor"],

    # 3. Education and training
    "g_escuelas_infantiles": ["preschool", "kindergarten"],
    "g_colegios_institutos": ["primary_school", "secondary_school", "school"],
    "g_universidad_fp": ["university", "vocational_school"],

    # 4. Transport (previously sourced from OSM, mapped to Google place types)
    "g_paradas_bus": ["bus_station", "bus_stop"],
    "g_estaciones_principales": ["train_station", "subway_station", "transit_station"],
    "g_aparcamientos": ["parking"],

    # 5. Leisure and culture
    "g_restaurantes": ["restaurant"],
    "g_cafeterias": ["cafe"],
    "g_bares": ["bar"],
    "g_cines": ["movie_theater"],
    "g_gimnasios": ["gym"],
    "g_parques": ["park"],  # closest available proxy for natural surroundings

    # 6. Retail
    "g_centros_comerciales": ["shopping_mall"],

    # 7. Safety and administration (previously sourced from OSM, mapped to Google place types)
    "g_comisarias": ["police"],
    "g_bomberos": ["fire_station"],
    "g_ayuntamientos": ["city_hall"],
    # Google's place type for courts is "courthouse"; "court" is not a valid type
    # and never matches anything.
    "g_juzgados": ["courthouse"],

    # NOTE: forests and hiking trails have no Google place type; "park" is the
    # only indicator of natural surroundings that is kept.
}


# =============== LOGGING ===============
# Timestamped INFO-level messages to follow the progress of the long-running ETL.
logging.basicConfig(format="%(asctime)s [%(levelname)s] %(message)s", level=logging.INFO)

# =============== HELPERS ===============
class GoogleAPIError(Exception):
    """Base class for errors raised while calling a Google API."""


class GoogleAPICriticalError(GoogleAPIError):
    """Unrecoverable API error (e.g. HTTP 401/403): stop using that API for the rest of the run."""

def _hash_key(*parts: str) -> str:
    """Return the hex MD5 digest of *parts* joined with "||" (used as a cache file name)."""
    digest = hashlib.md5()
    digest.update("||".join(parts).encode("utf-8"))
    return digest.hexdigest()

def _cache_read(path: str):
    """
    Load a cached JSON document.

    Args:
        path: cache file path.

    Returns:
        The decoded JSON value, or ``None`` when *path* does not exist or its
        content cannot be decoded (e.g. a file truncated by an interrupted run).
        A corrupt entry is thus treated as a cache miss, so the caller re-fetches
        and overwrites it instead of failing on every rerun.
    """
    try:
        with open(path, "r", encoding="utf-8") as f:
            return json.load(f)
    except FileNotFoundError:
        return None
    except ValueError as e:
        # json.JSONDecodeError and UnicodeDecodeError are both ValueError subclasses.
        logging.warning(f"Cache corrupto en {path}; se ignorará ({e}).")
        return None

def _cache_write(path: str, data):
    """
    Store *data* as UTF-8 JSON (non-ASCII characters kept verbatim) at *path*.

    The document is written to a sibling ``.tmp`` file and then moved into place
    with ``os.replace``. An interruption or a serialization error therefore never
    leaves a truncated cache entry at *path*.

    Raises:
        TypeError: if *data* is not JSON-serializable (the existing file at
            *path*, if any, is left untouched).
    """
    tmp_path = f"{path}.tmp"
    try:
        with open(tmp_path, "w", encoding="utf-8") as f:
            json.dump(data, f, ensure_ascii=False)
        os.replace(tmp_path, path)
    except BaseException:
        # Do not leave a half-written temporary file behind.
        if os.path.exists(tmp_path):
            os.remove(tmp_path)
        raise

def _http_request(method: str, url: str, params: Dict[str, Any] = None, json_body: Dict[str, Any] = None, timeout: int = TIMEOUT) -> requests.Response:
    """
    Send an HTTP request, retrying transient failures with exponential backoff.

    Args:
        method: "GET" (``params`` sent as query string) or "POST" (``json_body``
            sent as a JSON payload).
        url: target URL.
        params: query parameters for GET requests.
        json_body: JSON payload for POST requests.
        timeout: per-attempt timeout in seconds.

    Returns:
        The ``requests.Response``. A 200 response is returned immediately. Other
        non-retryable statuses (e.g. 400, 404) are returned as well, so the
        caller can inspect them.

    Raises:
        ValueError: for an unsupported ``method``.
        GoogleAPICriticalError: on HTTP 401/403 (bad key / missing permission);
            these are never retried.
        GoogleAPIError: when all ``MAX_RETRIES`` attempts ended in a 5xx or 429
            response or in a connection error.
    """
    if method not in ("GET", "POST"):
        raise ValueError("Método HTTP no soportado.")

    # Drop the query string from the final error message: some callers put the
    # API key in the URL, and the message ends up in the logs.
    safe_url = url.split("?", 1)[0]
    last_exc = None

    for attempt in range(MAX_RETRIES):
        try:
            if method == "GET":
                r = requests.get(url, params=params, timeout=timeout)
            else:
                r = requests.post(url, json=json_body, timeout=timeout)
        except requests.exceptions.RequestException as e:
            # Connection error, timeout, etc.: treated as transient.
            last_exc = e
            logging.warning(f"Error de conexión. Reintento {attempt+1}/{MAX_RETRIES}. Error: {e}")
        else:
            if r.status_code in (401, 403):
                # Key/permission problem: retrying cannot help.
                raise GoogleAPICriticalError(f"HTTP {r.status_code}. Revisa la clave API: {r.text[:180]}")
            if r.status_code == 200:
                return r
            if r.status_code < 500 and r.status_code != 429:
                # Other non-retryable statuses (4xx, 3xx...): let the caller decide.
                return r
            # 5xx or 429 (rate limited): transient, retry.
            last_exc = None
            logging.warning(f"Error HTTP {r.status_code}. Reintento {attempt+1}/{MAX_RETRIES}.")

        # Back off before the next attempt, but not after the last one.
        if attempt < MAX_RETRIES - 1:
            time.sleep(2 ** attempt)

    raise GoogleAPIError(f"Fallo persistente tras {MAX_RETRIES} intentos para {safe_url}.") from last_exc

# Implementaciones que usan _http_request
def _http_get(url: str, params: Dict[str, Any] = None, timeout: int = TIMEOUT) -> requests.Response:
    """GET *url* with query *params* through the retrying ``_http_request`` helper."""
    return _http_request(method="GET", url=url, params=params, timeout=timeout)

def _http_post(url: str, json_body: Dict[str, Any], timeout: int = TIMEOUT) -> requests.Response:
    """POST *json_body* as JSON to *url* through the retrying ``_http_request`` helper."""
    return _http_request(method="POST", url=url, json_body=json_body, timeout=timeout)

# =============== GOOGLE PLACES (Nearby Search) ===============
def places_nearby_search(lat: float, lon: float, place_type: str, max_pages: int = MAX_PAGES) -> List[Dict[str, Any]]:
    """
    Run a (legacy) Google Places Nearby Search for one place type, following pagination.

    Args:
        lat, lon: centre of the search circle.
        place_type: value of the ``type`` filter (e.g. "pharmacy").
        max_pages: maximum number of result pages to request.

    Returns:
        The concatenated ``results`` of every page fetched (possibly empty).

    Raises:
        GoogleAPICriticalError: HTTP 401/403 (from ``_http_request``) or a JSON
            ``REQUEST_DENIED`` status (bad key, API not enabled). The legacy Places
            web service reports the latter with HTTP 200, so the body must be checked.
        GoogleAPIError: a non-200 HTTP response, or a quota/server JSON status
            (``OVER_QUERY_LIMIT``, ``UNKNOWN_ERROR``). Raising instead of returning
            ``[]`` keeps the caller from caching a failed lookup as "no places".
    """
    url = "https://maps.googleapis.com/maps/api/place/nearbysearch/json"
    all_results: List[Dict[str, Any]] = []
    next_page_token = None

    for _ in range(max_pages):
        if next_page_token:
            # A freshly issued page token needs a moment to become valid, and it
            # cannot be combined with other parameters except the key. Waiting here
            # (not after the previous page) avoids a pointless sleep when no further
            # page is going to be requested.
            time.sleep(SLEEP_NEXT_PAGE)
            params = {"key": GOOGLE_PLACES_API_KEY, "pagetoken": next_page_token}
        else:
            params = {
                "key": GOOGLE_PLACES_API_KEY,
                "location": f"{lat},{lon}",
                "radius": RADIUS_METERS,
                "type": place_type,
            }

        r = _http_get(url, params)
        if r.status_code != 200:
            # 401/403 were already raised by _http_request; anything else is a failed
            # lookup (checked before parsing, as the body may not be JSON).
            raise GoogleAPIError(f"NearbySearch HTTP {r.status_code} para {place_type}: {r.text[:180]}")

        data = r.json()
        status = data.get("status")
        if status == "REQUEST_DENIED":
            raise GoogleAPICriticalError(f"NearbySearch REQUEST_DENIED: {data.get('error_message')}")
        if status in ("OVER_QUERY_LIMIT", "UNKNOWN_ERROR"):
            raise GoogleAPIError(f"NearbySearch {status} para {place_type}: {data.get('error_message')}")
        if status not in ("OK", "ZERO_RESULTS"):
            # e.g. INVALID_REQUEST: keep whatever was collected so far.
            logging.warning(f"NearbySearch status {status} para {place_type}: {data.get('error_message')}")
            break

        all_results.extend(data.get("results", []))
        next_page_token = data.get("next_page_token")
        if not next_page_token:
            break

    return all_results

def fetch_places_for_category(lat: float, lon: float, cat_key: str, subtypes: List[str]) -> Dict[str, Any]:
    """
    Query Nearby Search for every subtype of one category and summarise the results.

    The raw results of each subtype are cached on disk. The cache key combines the
    coordinates rounded to 5 decimals, RADIUS_METERS, the subtype and MAX_PAGES.
    Results from all subtypes are merged and deduplicated by ``place_id``; when a
    place appears more than once, the copy with the most reviews is kept.

    Args:
        lat, lon: search centre.
        cat_key: output column prefix (e.g. "g_farmacias").
        subtypes: Google place types queried for this category.

    Returns:
        A dict with three keys:
        - ``{cat_key}_count``: number of unique places.
        - ``{cat_key}_reviews``: sum of ``user_ratings_total`` over places that
          have a rating.
        - ``{cat_key}_rating_wavg``: rating averaged with the review counts as
          weights, rounded to 3 decimals (``None`` when there are no reviews).

    Raises:
        Whatever ``places_nearby_search`` raises on a cache miss. Nothing is cached
        for that subtype in that case.
    """
    all_results: List[Dict[str, Any]] = []

    # 1. Collect raw results per subtype, using the on-disk cache when possible.
    for subtype in subtypes:
        cache_key = _hash_key("nearby", f"{lat:.5f}", f"{lon:.5f}", str(RADIUS_METERS), subtype, str(MAX_PAGES))
        cache_path = os.path.join(CACHE_PLACES_SEARCH, f"{cache_key}.json")
        data = _cache_read(cache_path)

        if data is None:
            data = places_nearby_search(lat, lon, subtype, MAX_PAGES)
            _cache_write(cache_path, data)
            # Throttle only real API calls: cache hits never touch the API, so
            # sleeping for them would only slow down reruns.
            time.sleep(SLEEP_BETWEEN_REQS)

        all_results.extend(data)

    # 2. Deduplicate by place_id, keeping the record with the most reviews.
    dedup: Dict[str, Dict[str, Any]] = {}
    for place in all_results:
        pid = place.get("place_id")
        if not pid:
            continue
        kept = dedup.get(pid)
        if kept is None or (place.get("user_ratings_total") or 0) > (kept.get("user_ratings_total") or 0):
            dedup[pid] = place

    clean = list(dedup.values())

    # 3. Aggregate metrics: rating weighted by number of reviews.
    total_reviews = 0
    weighted_sum = 0.0
    for place in clean:
        rating = place.get("rating")
        n_reviews = place.get("user_ratings_total") or 0
        if rating is not None:
            weighted_sum += rating * n_reviews
            total_reviews += n_reviews

    weighted_avg = (weighted_sum / total_reviews) if total_reviews > 0 else None

    return {
        f"{cat_key}_count": len(clean),
        f"{cat_key}_reviews": int(total_reviews),
        f"{cat_key}_rating_wavg": round(weighted_avg, 3) if weighted_avg is not None else None,
    }

# =============== GOOGLE AIR QUALITY (Actual) ===============
def air_quality_current(lat: float, lon: float) -> Dict[str, Any]:
    """
    Look up current air-quality conditions for one point (Google Air Quality API).

    The raw JSON response is cached on disk, keyed by the coordinates rounded to
    5 decimals. Once cached, a point is not requested again, so the stored
    "current" values are those of the first successful request.

    Returns:
        A flat dict with numeric fields only:
        - ``aq_aqi``: the ``aqi`` of the first entry in ``indexes``.
        - ``aq_<code>_value``: the concentration of each pollutant that reports one.
        ``{}`` is returned when the API answers with a non-200 status or the payload
        is not a JSON object. If parsing fails part-way, a warning is logged and the
        fields collected so far are returned.

    Raises:
        Whatever ``_http_post`` raises (e.g. ``GoogleAPICriticalError``). Only a
        cache miss issues a request.
    """
    key = _hash_key("air_current", f"{lat:.5f}", f"{lon:.5f}")
    cache_file = os.path.join(CACHE_AIR, key + ".json")
    data = _cache_read(cache_file)

    if data is None:
        request_url = f"https://airquality.googleapis.com/v1/currentConditions:lookup?key={GOOGLE_AIR_API_KEY}"
        request_body = {
            "location": {"latitude": lat, "longitude": lon},
            # Ask for pollutant concentration values in addition to the index.
            "extraComputations": ["POLLUTANT_CONCENTRATION"],
            "languageCode": "es",
        }
        response = _http_post(request_url, request_body)
        if response.status_code != 200:
            logging.warning(f"AirQuality JSON ERROR {response.status_code}: {response.json().get('error', {}).get('message')}")
            return {}
        data = response.json()
        _cache_write(cache_file, data)
        time.sleep(SLEEP_BETWEEN_REQS)

    # Guard against unexpected payloads (e.g. a cache file holding a list).
    if not isinstance(data, dict):
        logging.warning(f"AirQuality API / Cache para {lat},{lon} devolvió un tipo inesperado: {type(data)}. Saltando parsing.")
        return {}

    flat: Dict[str, Any] = {}
    try:
        indexes = data.get("indexes", [])
        if indexes:
            flat["aq_aqi"] = indexes[0].get("aqi")

        for pollutant in data.get("pollutants", []):
            name = (pollutant.get("code") or pollutant.get("displayName") or "").lower()
            value = (pollutant.get("concentration") or {}).get("value")
            if name and value is not None:
                flat[f"aq_{name}_value"] = value
    except Exception as e:
        logging.warning(f"Parse AirQuality error: {e}")

    return flat

# =============== PIPELINE PRINCIPAL (wide) ===============
def process_municipio(row: pd.Series, api_status: Dict[str, bool]) -> Dict[str, Any]:
    """
    Build the wide output record for one municipality.

    Args:
        row: input CSV row with the COL_ID, COL_NAME, COL_LAT, COL_LON columns
            (and optionally COL_POP).
        api_status: mutable flags ``{"places": bool, "air": bool}``. A flag is set
            to False after a GoogleAPICriticalError, so later municipalities skip
            that API entirely.

    Returns:
        A dict with the identifying columns, followed by the Places metrics of every
        category in PLACE_TYPES order, followed by the Air Quality fields.
        A category whose lookup fails gets ``{cat_key}_count = None``.
    """
    code = row[COL_ID]
    label = row[COL_NAME]
    latitude = float(row[COL_LAT])
    longitude = float(row[COL_LON])
    population = row.get(COL_POP, None)

    logging.info(f"Municipio: {label} ({code})")

    record: Dict[str, Any] = {
        COL_ID: code,
        COL_NAME: label,
        COL_LAT: latitude,
        COL_LON: longitude,
        COL_POP: population,
    }

    # --- Google Places: counts, review-weighted rating and total reviews ---
    if api_status["places"]:
        for cat_key, subtypes in PLACE_TYPES.items():
            try:
                record.update(fetch_places_for_category(latitude, longitude, cat_key, subtypes))
            except GoogleAPICriticalError as err:
                logging.critical(f"FATAL: {err}. Deshabilitando Google Places para el resto.")
                api_status["places"] = False
                record[f"{cat_key}_count"] = None
                break  # remaining categories are skipped
            except Exception as err:
                logging.warning(f"Error Google Places en {label} / {cat_key} (no crítico): {err}")
                record[f"{cat_key}_count"] = None

    # --- Google Air Quality: current conditions ---
    if not api_status["air"]:
        return record

    try:
        record.update(air_quality_current(latitude, longitude))
    except GoogleAPICriticalError as err:
        logging.critical(f"FATAL: {err}. Deshabilitando Google Air Quality para el resto.")
        api_status["air"] = False
    except Exception as err:
        logging.warning(f"Error AirQuality en {label} (no crítico): {err}")

    return record

def main():
    """
    Read the municipality CSV, enrich every row with Google data and write the wide CSV.

    The output CSV is rewritten after every municipality, so an interrupted run
    keeps the rows processed so far. Raw API responses are also cached on disk.

    Raises:
        SystemExit: if the Places API key is missing or the input CSV lacks a
            required column.
    """
    # Validate API keys.
    if not GOOGLE_PLACES_API_KEY:
        raise SystemExit("Falta GOOGLE_PLACES_API_KEY en .env")
    # GOOGLE_AIR_API_KEY already falls back to the Places key, so testing it would
    # never trigger; check the environment to see whether a dedicated key exists.
    if not os.getenv("GOOGLE_AIR_API_KEY"):
        logging.warning("No se encontró GOOGLE_AIR_API_KEY; intentaré usar la misma de Places.")

    # Load and validate the input CSV.
    df = pd.read_csv(INPUT_CSV)
    required_cols = {COL_ID, COL_NAME, COL_LAT, COL_LON}
    missing = required_cols - set(df.columns)
    if missing:
        raise SystemExit(f"Faltan columnas en {INPUT_CSV}: {missing}")

    # Per-API enable flags, switched off by process_municipio on 401/403-type errors.
    api_status = {"places": True, "air": True}

    rows: List[Dict[str, Any]] = []
    total = len(df)
    for position, (_, row) in enumerate(df.iterrows(), start=1):
        logging.info(f"[{position}/{total}] {row[COL_NAME]}")
        rows.append(process_municipio(row, api_status))

        # Incremental save so progress survives an interruption.
        pd.DataFrame(rows).to_csv(OUT_WIDE, index=False)

    logging.info(f"OK. CSV wide: {OUT_WIDE}")


if __name__ == "__main__":
    main()

2025-11-18 23:21:06,132 [INFO] [1/155] Acebeda (La)
2025-11-18 23:21:06,133 [INFO] Municipio: Acebeda (La) (14)
2025-11-18 23:22:23,111 [INFO] [2/155] Ajalvir
2025-11-18 23:22:23,113 [INFO] Municipio: Ajalvir (29)
2025-11-18 23:24:14,841 [INFO] [3/155] Alameda del Valle
2025-11-18 23:24:14,842 [INFO] Municipio: Alameda del Valle (35)
2025-11-18 23:25:38,170 [INFO] [4/155] Álamo (El)
2025-11-18 23:25:38,171 [INFO] Municipio: Álamo (El) (40)
2025-11-18 23:27:12,079 [INFO] [5/155] Aldea del Fresno
2025-11-18 23:27:12,080 [INFO] Municipio: Aldea del Fresno (88)
2025-11-18 23:28:36,201 [INFO] [6/155] Algete
2025-11-18 23:28:36,202 [INFO] Municipio: Algete (91)
2025-11-18 23:30:18,229 [INFO] [7/155] Alpedrete
2025-11-18 23:30:18,229 [INFO] Municipio: Alpedrete (105)
2025-11-18 23:32:28,590 [INFO] [8/155] Ambite
2025-11-18 23:32:28,591 [INFO] Municipio: Ambite (112)
2025-11-18 23:33:53,478 [INFO] [9/155] Anchuelo
2025-11-18 23:33:53,479 [INFO] Municipio: Anchuelo (127)
2025-11-18 23:35:21,593