In [21]:
#!pip install selenium webdriver-manager pandas charset_normalizer
#!pip install openpyxl # For Excel support in pandas
# resumir la salida de las celdas


In [20]:
"""
scrape_dge_dengue_tendencias_full.py (v11 - Robustness and Bug Fixes)

- La función 'select_option_in_selectize' ahora reintenta la selección hasta 3 veces
  si falla, solucionando errores intermitentes de timing (como el de 'ANCASH').
- La lógica de consolidación ahora busca el nombre de la hoja de Excel de forma
  case-insensitive (ignorando mayúsculas/minúsculas), solucionando el error
  'Worksheet not found'.
- Se ha añadido una pausa explícita después de cada descarga para permitir que el
  sistema de archivos se estabilice.
"""

import os
import time
import glob
import logging
from pathlib import Path
import pandas as pd
from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.common.by import By
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver import ActionChains
from webdriver_manager.chrome import ChromeDriverManager

# ---------- CONFIGURACIÓN DEL USUARIO ----------
DOWNLOAD_DIR = r'D:\UNIDAD D\UNIVERSIDAD\2025-2\Modelos Fisiológicos\Modelos_fisio\datasets\CDC_datasets'
BASE_URL = "https://app7.dge.gob.pe/maps2/shiny_observatorio_web/"
WAIT = 30
HEADLESS = False
LOGLEVEL = logging.INFO
# -----------------------------------------------

try:
    os.makedirs(DOWNLOAD_DIR, exist_ok=True)
except OSError as e:
    logging.error(f"Error al crear el directorio de descarga '{DOWNLOAD_DIR}': {e}"); exit()

logging.basicConfig(level=LOGLEVEL, format="%(asctime)s - %(levelname)s - %(message)s")

# --- FUNCIONES DE AYUDA ---

def make_driver(download_dir=DOWNLOAD_DIR, headless=HEADLESS):
    opts = webdriver.ChromeOptions()
    prefs = {"download.default_directory": download_dir, "download.prompt_for_download": False, "download.directory_upgrade": True, "plugins.always_open_xlsx_externally": True}
    opts.add_experimental_option("prefs", prefs)
    opts.add_argument("--disable-blink-features=AutomationControlled"); opts.add_argument("--start-maximized")
    if headless: opts.add_argument("--headless=new"); opts.add_argument("--window-size=1920,1080")
    return webdriver.Chrome(service=Service(ChromeDriverManager().install()), options=opts)

def wait_for_js_idle(driver, pause=0.8, max_tries=60):
    for _ in range(max_tries):
        time.sleep(pause)
        try:
            if not any(o.is_displayed() for o in driver.find_elements(By.CSS_SELECTOR, "div.shiny-load-requests-waiting, div.shiny-busy")): return True
        except Exception: pass
    logging.warning("wait_for_js_idle alcanzó el máximo de intentos."); return False

def ensure_download_completed(download_dir, before_files, timeout=180):
    start = time.time()
    while time.time() - start < timeout:
        new = set(os.listdir(download_dir)) - before_files
        if new and not any(f.endswith((".crdownload", ".tmp")) for f in new):
            time.sleep(2) # Pausa extra para que el sistema de archivos libere el archivo
            return list(new)
        time.sleep(1)
    return []

def normalize_filename(s: str):
    if s is None: return ""
    return "".join(c if c.isalnum() else "_" for c in s).replace("__", "_").strip("_")

def expand_panel(driver, wait, panel_text):
    try:
        xpath = f"//button[contains(@class, 'accordion-button') and .//div[normalize-space()='{panel_text}']]"
        panel_button = wait.until(EC.element_to_be_clickable((By.XPATH, xpath)))
        if panel_button.get_attribute('aria-expanded') == 'false':
            logging.info(f"Expandiendo panel '{panel_text}'...")
            driver.execute_script("arguments[0].scrollIntoView({block: 'center'});", panel_button)
            time.sleep(0.5); panel_button.click(); time.sleep(1.5)
        return True
    except Exception as e:
        logging.error(f"No se pudo expandir el panel '{panel_text}': {e}"); return False

def set_slider_to_full_range(driver, wait):
    logging.info("Ajustando el slider de período al rango completo...")
    try:
        slider_input = wait.until(EC.presence_of_element_located((By.ID, "tendencias-sel_ano")))
        min_val, max_val = slider_input.get_attribute("data-min"), slider_input.get_attribute("data-max")
        script = f"""$("#tendencias-sel_ano").data("ionRangeSlider").update({{ from: {min_val}, to: {max_val} }});"""
        driver.execute_script(script)
        logging.info(f"Slider ajustado a {min_val}-{max_val} con JavaScript."); return True
    except Exception as e:
        logging.error(f"Falló el ajuste del slider: {e}"); return False

def find_interactive_element_for_label(driver, wait, label_text):
    try:
        xpath = f"//label[normalize-space()='{label_text}']/following-sibling::div//input"
        return wait.until(EC.visibility_of_element_located((By.XPATH, xpath)))
    except Exception: logging.error(f"No se pudo encontrar un elemento para la etiqueta '{label_text}'."); return None

## MODIFICADO: Añadido sistema de reintentos para mayor robustez ##
def select_option_in_selectize(driver, input_element, option_text, retries=3):
    if not input_element: return False
    for attempt in range(retries):
        try:
            # La lógica de selección se mantiene, pero ahora dentro de un bucle de reintentos
            parent = input_element.find_element(By.XPATH, "./ancestor::div[contains(@class, 'selectize-input')]")
            driver.execute_script("arguments[0].click();", parent)
            time.sleep(0.5); input_element.send_keys(Keys.CONTROL + "a"); input_element.send_keys(Keys.BACK_SPACE)
            time.sleep(0.3); input_element.send_keys(option_text); time.sleep(1.5) # Aumentada la espera
            option_xpath = f"//div[contains(@class,'selectize-dropdown-content')]//div[normalize-space()='{option_text}']"
            option_to_click = WebDriverWait(driver, 10).until(EC.element_to_be_clickable((By.XPATH, option_xpath)))
            option_to_click.click(); wait_for_js_idle(driver)
            return True # Si tiene éxito, sale de la función
        except Exception as e:
            logging.warning(f"Intento {attempt + 1}/{retries} para seleccionar '{option_text}' falló. Reintentando...")
            ActionChains(driver).send_keys(Keys.ESCAPE).perform() # Intentar cerrar el dropdown para reintentar
            time.sleep(1) # Pausa antes del siguiente intento
    logging.error(f"Error final seleccionando '{option_text}' después de {retries} intentos.")
    return False

def get_options_from_selectize(driver, input_element, exclude=None):
    # (Función sin cambios)
    options = []
    if not input_element: return options
    exclude_lower = [e.lower() for e in (exclude or [])]
    try:
        parent = input_element.find_element(By.XPATH, "./ancestor::div[contains(@class, 'selectize-input')]")
        driver.execute_script("arguments[0].click();", parent); time.sleep(1)
        opts = driver.find_elements(By.XPATH, "//div[contains(@class, 'selectize-dropdown') and not(contains(@style,'display: none'))]//div[@class='option']")
        for o in opts:
            t = o.text.strip()
            if t and t.lower() not in ("seleccione", "todos", "...", "nacional") and t.lower() not in exclude_lower:
                options.append(t)
        ActionChains(driver).send_keys(Keys.ESCAPE).perform()
    except Exception as e:
        logging.error(f"Error obteniendo opciones del selectize: {e}")
    return list(dict.fromkeys(options))

def main():
    driver = make_driver()
    wait = WebDriverWait(driver, WAIT)
    try:
        logging.info("Abriendo la aplicación...")
        driver.get(BASE_URL); wait_for_js_idle(driver)

        logging.info("Ir a pestaña Tendencias...")
        wait.until(EC.element_to_be_clickable((By.XPATH, "//a[@data-value='tendencias']"))).click()
        time.sleep(3); wait_for_js_idle(driver)

        logging.info("Configurando filtros iniciales...")
        select_option_in_selectize(driver, find_interactive_element_for_label(driver, wait, "Enfermedad/episodio"), "Dengue")
        select_option_in_selectize(driver, find_interactive_element_for_label(driver, wait, "Tipo de diagnóstico o forma clínica"), "TOTAL")

        if not expand_panel(driver, wait, "Periodo de interés") or not set_slider_to_full_range(driver, wait): return
        if not expand_panel(driver, wait, "Lugar de interés"): return
        
        dep_input = find_interactive_element_for_label(driver, wait, "Departamento")
        departamentos = get_options_from_selectize(driver, dep_input)
        
        if not departamentos: logging.error("No se pudieron obtener departamentos."); return
        logging.info(f"Se descargará un archivo por cada uno de los {len(departamentos)} departamentos.")

        for dep in departamentos:
            logging.info(f"=== Procesando Departamento: {dep.upper()} ===")
            if not select_option_in_selectize(driver, dep_input, dep):
                logging.warning(f"No se pudo seleccionar {dep}, saltando al siguiente."); continue

            try:
                logging.info("  -> Pulsando 'Click para mostrar'...")
                wait.until(EC.element_to_be_clickable((By.ID, "tendencias-btn_zona"))).click()
                logging.info("  -> Esperando a que se generen los datos...")
                wait_for_js_idle(driver, pause=2, max_tries=90)

                logging.info("  -> Buscando botón 'Descargar base'...")
                download_link = wait.until(EC.element_to_be_clickable((By.ID, "tendencias-down_climasalud")))
                
                before = set(os.listdir(DOWNLOAD_DIR))
                download_link.click()
                
                downloaded = ensure_download_completed(DOWNLOAD_DIR, before)
                if not downloaded:
                    logging.warning("  -> No se detectó archivo descargado (timeout)."); continue
                
                old_path = os.path.join(DOWNLOAD_DIR, downloaded[0])
                new_name = f"dengue_completo_{normalize_filename(dep)}.xlsx"
                new_path = os.path.join(DOWNLOAD_DIR, new_name)

                if os.path.exists(new_path): os.remove(new_path)
                os.rename(old_path, new_path)
                logging.info(f"  -> ¡Éxito! Archivo guardado como: {new_name}")

            except Exception as e:
                logging.error(f"  -> Falló la secuencia de descarga para {dep}: {e}")

        # --- CONSOLIDACIÓN FINAL DESDE ARCHIVOS EXCEL ---
        logging.info("="*50 + "\nPROCESO DE DESCARGA FINALIZADO. CONSOLIDANDO ARCHIVOS...\n" + "="*50)
        
        all_xlsx = glob.glob(os.path.join(DOWNLOAD_DIR, "*.xlsx"))
        if not all_xlsx:
            logging.error("No se encontraron archivos .xlsx para consolidar."); return

        df_list = []
        for f_path in all_xlsx:
            try:
                logging.info(f"Procesando archivo: {os.path.basename(f_path)}")
                xls = pd.ExcelFile(f_path)
                
                ## MODIFICADO: Lógica para encontrar la hoja 'Distrito' sin importar mayúsculas/minúsculas ##
                target_sheet = None
                for sheet_name in xls.sheet_names:
                    if sheet_name.lower() == 'distrito':
                        target_sheet = sheet_name
                        break
                
                if target_sheet:
                    logging.info(f"  -> Encontrada hoja '{target_sheet}'. Leyendo datos...")
                    df = pd.read_excel(xls, sheet_name=target_sheet)
                    df['origen_archivo'] = os.path.basename(f_path)
                    df_list.append(df)
                else:
                    # Si no se encuentra 'distrito', se puede intentar con la última hoja como fallback o simplemente reportar
                    logging.warning(f"  -> No se encontró una hoja llamada 'Distrito' en el archivo. Hojas disponibles: {xls.sheet_names}")

            except Exception as e:
                logging.warning(f"No se pudo procesar el archivo {os.path.basename(f_path)}: {e}")

        if not df_list:
            logging.error("No se pudo leer datos de ningún archivo Excel. No se generará el consolidado."); return

        final_df = pd.concat(df_list, ignore_index=True)
        output_path = os.path.join(Path(DOWNLOAD_DIR).parent, "dengue_consolidado_distrital_final.csv")
        final_df.to_csv(output_path, index=False)

        logging.info("="*50)
        logging.info(f"¡PROCESO COMPLETADO CON ÉXITO!\nArchivo consolidado en: {output_path}")
        logging.info(f"El dataset final tiene {final_df.shape[0]} filas y {final_df.shape[1]} columnas.\n" + "="*50)

    except Exception:
        logging.error("Ocurrió un error fatal en el proceso principal.", exc_info=True)
    finally:
        driver.quit()

if __name__ == "__main__":
    main()

2025-09-16 23:05:18,211 - INFO - Get LATEST chromedriver version for google-chrome
2025-09-16 23:05:18,411 - INFO - Get LATEST chromedriver version for google-chrome
2025-09-16 23:05:18,625 - INFO - Driver [C:\Users\Alvaro\.wdm\drivers\chromedriver\win64\140.0.7339.82\chromedriver-win32/chromedriver.exe] found in cache
2025-09-16 23:05:19,693 - INFO - Abriendo la aplicación...
2025-09-16 23:05:27,786 - INFO - Ir a pestaña Tendencias...
2025-09-16 23:05:31,717 - INFO - Configurando filtros iniciales...
2025-09-16 23:05:41,182 - INFO - Expandiendo panel 'Periodo de interés'...
2025-09-16 23:05:43,257 - INFO - Ajustando el slider de período al rango completo...
2025-09-16 23:05:43,293 - INFO - Slider ajustado a 2017-2025 con JavaScript.
2025-09-16 23:05:43,329 - INFO - Expandiendo panel 'Lugar de interés'...
2025-09-16 23:05:46,794 - INFO - Se descargará un archivo por cada uno de los 24 departamentos.
2025-09-16 23:05:46,795 - INFO - === Procesando Departamento: AMAZONAS ===
2025-09-16 2