**Código para la descarga de Índices de Consumo del CIS**

Este código usa un archivo Excel descargado de esta url https://www.cis.es/catalogo-estudios/resultados-definidos/icc para recorrer uno a uno todos los informes de la serie temporal y descargar su información en formato CSV (o en formato SAV, para luego transformarlos en CSV)

In [17]:
import os
import time
import zipfile
import pandas as pd
import traceback
import pyreadstat  # Asegúrate de tener instalado: pip install pyreadstat
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from webdriver_manager.chrome import ChromeDriverManager
from selenium.webdriver.common.action_chains import ActionChains

def convert_sav_to_csv(sav_file_path, csv_file_path):
    try:
        # Leer el archivo SAV utilizando pyreadstat
        df, meta = pyreadstat.read_sav(sav_file_path)
        # Guardar en formato CSV
        df.to_csv(csv_file_path, index=False, encoding='utf-8')
        print(f"Archivo SAV convertido con éxito: {csv_file_path}")
    except Exception as e:
        print(f"Error al convertir el archivo SAV: {e}")

# CONFIGURACIÓN
EXCEL_PATH = r"C:\Users\emilo\Downloads\ICC - CIS (1).xlsx"  # Ruta del Excel guía
DOWNLOAD_DIR = r"C:\Users\emilo\Downloads\descargas_cis"         # Carpeta para los ZIP descargados
EXTRACTED_DIR = r"C:\Users\emilo\Downloads\csv_extraidos"          # Carpeta para los CSV extraídos
EMAIL = "emilio.cobos@bde.es"                                     # Tu email

# Crear carpetas si no existen
os.makedirs(DOWNLOAD_DIR, exist_ok=True)
os.makedirs(EXTRACTED_DIR, exist_ok=True)

# CONFIGURAR CHROME PARA DESCARGAS AUTOMÁTICAS
options = Options()
# Sin headless para ver la interacción (comenta la siguiente línea si quieres verlo)
# options.add_argument("--headless")
options.add_argument("--no-sandbox")
options.add_argument("--disable-dev-shm-usage")
prefs = {
    "download.default_directory": DOWNLOAD_DIR,
    "download.prompt_for_download": False,
    "download.directory_upgrade": True,
    "safebrowsing.enabled": True
}
options.add_experimental_option("prefs", prefs)

# Iniciar WebDriver
service = Service(ChromeDriverManager().install())
driver = webdriver.Chrome(service=service, options=options)

# LEER EXCEL
df = pd.read_excel(EXCEL_PATH)
df.columns = df.columns.str.strip().str.upper()

if "FECHA" not in df.columns or "CÓDIGO ESTUDIO" not in df.columns:
    raise ValueError("El Excel no tiene las columnas esperadas ('FECHA', 'CÓDIGO ESTUDIO')")

# Convertir la columna FECHA a formato YYYYMM
df["FECHA"] = pd.to_datetime(df["FECHA"], format="%d-%m-%Y", errors="coerce").dt.strftime("%Y%m")

# PROCESAR CADA REGISTRO
for index, row in df.iterrows():
    codigo = row["CÓDIGO ESTUDIO"]
    fecha = row["FECHA"]
    zip_filename = f"MD{codigo}.zip"
    zip_path = os.path.join(DOWNLOAD_DIR, zip_filename)

    # Construir la URL de descarga
    url_descarga = f"https://www.cis.es/descarga-fichero-datos?anio={fecha[:4]}&codEstudio={codigo}&fichero=MD{codigo}.zip"
    print(f"\nProcesando código {codigo}, fecha {fecha}")
    driver.get(url_descarga)
    time.sleep(2)  # Espera a que cargue la página

    try:
        # Rellenar el campo Email
        email_input = WebDriverWait(driver, 15).until(
            EC.presence_of_element_located((By.CSS_SELECTOR, "input[placeholder='email']"))
        )
        email_input.clear()
        email_input.send_keys(EMAIL)
        print("Campo email completado.")

        # Rellenar el campo Confirmar email (usando XPath para buscar 'confirmationField' en el id)
        confirm_email_input = WebDriverWait(driver, 15).until(
            EC.presence_of_element_located((By.XPATH, "//input[contains(@id, 'confirmationField')]"))
        )
        confirm_email_input.clear()
        confirm_email_input.send_keys(EMAIL)
        print("Campo confirmar email completado.")

        # Localizar y clicar el checkbox (a través del span asociado, usando un XPath insensible a mayúsculas)
        checkbox_label = WebDriverWait(driver, 15).until(
            EC.element_to_be_clickable((By.XPATH, "//span[contains(@class, 'custom-control-label-text') and contains(translate(.,'ABCDEFGHIJKLMNOPQRSTUVWXYZ','abcdefghijklmnopqrstuvwxyz'),'conoce y acepta los terminos')]"))
        )
        driver.execute_script("arguments[0].scrollIntoView({block: 'center'});", checkbox_label)
        time.sleep(1)
        driver.execute_script("arguments[0].click();", checkbox_label)
        print("Checkbox clicado.")

        # Localizar el botón de Enviar
        submit_button = WebDriverWait(driver, 15).until(
            EC.element_to_be_clickable((By.ID, "ddm-form-submit"))
        )
        driver.execute_script("arguments[0].scrollIntoView({block: 'center'});", submit_button)
        time.sleep(1)
        submit_button.click()
        print("Botón Enviar clicado.")

        # Esperar 5 segundos para que se produzca la redirección a la página de descarga
        time.sleep(5)

        # Esperar a que aparezca el título de la página de descarga
        descarga_header = WebDriverWait(driver, 15).until(
            EC.presence_of_element_located((By.ID, "tituloArchivo"))
        )
        print("Página de descarga cargada.")

        # Localizar el botón "Descargar"
        descargar_button = WebDriverWait(driver, 15).until(
            EC.element_to_be_clickable((By.XPATH, "//a[contains(@class, 'btn btn-sm btn-primary') and contains(.,'Descargar')]"))
        )
        driver.execute_script("arguments[0].scrollIntoView({block: 'center'});", descargar_button)
        time.sleep(1)
        descargar_button.click()
        print("Botón Descargar clicado.")

        # Esperar 5 segundos para dar tiempo a que se inicie la descarga
        time.sleep(5)

        # Esperar a que el ZIP se descargue correctamente (verificando que no existan archivos temporales)
        download_wait = 0
        max_wait = 60  # segundos máximos de espera
        while download_wait < max_wait:
            if os.path.exists(zip_path) and not any(fname.endswith(".crdownload") for fname in os.listdir(DOWNLOAD_DIR)):
                break
            time.sleep(2)
            download_wait += 2

        if not os.path.exists(zip_path):
            raise Exception(f"El archivo {zip_filename} no se descargó en el tiempo esperado.")
        print("Archivo ZIP descargado.")

        # Extraer el archivo CSV (nativo) o, si no se encuentra, buscar un archivo SAV y convertirlo a CSV
        extracted = False
        with zipfile.ZipFile(zip_path, "r") as z:
            # Buscar primero un archivo que termine en _num.csv
            for file in z.namelist():
                if file.endswith("_num.csv"):
                    extracted_path = os.path.join(EXTRACTED_DIR, f"{fecha}_num.csv")
                    with open(extracted_path, "wb") as f:
                        f.write(z.read(file))
                    print(f"Extraído CSV nativo: {extracted_path}")
                    extracted = True
                    break
            # Si no se encontró, buscar un archivo que termine en .sav y convertirlo
            if not extracted:
                for file in z.namelist():
                    if file.endswith(".sav"):
                        sav_temp_path = os.path.join(DOWNLOAD_DIR, file)
                        with open(sav_temp_path, "wb") as f:
                            f.write(z.read(file))
                        extracted_path = os.path.join(EXTRACTED_DIR, f"{fecha}_num.csv")
                        convert_sav_to_csv(sav_temp_path, extracted_path)
                        extracted = True
                        # Eliminar el archivo SAV temporal
                        os.remove(sav_temp_path)
                        break
            if not extracted:
                raise Exception("No se encontró el archivo CSV o SAV dentro del ZIP.")

        print(f"Archivo procesado y convertido para código {codigo} (fecha {fecha}).")

        # Eliminar el ZIP tras extraer (y convertir) el archivo
        os.remove(zip_path)
        print(f"Archivo ZIP {zip_filename} eliminado.")

    except Exception as e:
        print(f"Error en formulario para código {codigo}: {e}")
        print(traceback.format_exc())
        if os.path.exists(zip_path):
            os.remove(zip_path)
        continue  # No pasa al siguiente registro hasta que se complete exitosamente

driver.quit()
print("Proceso completado.")



Procesando código 3491, fecha 202412
Campo email completado.
Campo confirmar email completado.
Checkbox clicado.
Botón Enviar clicado.
Página de descarga cargada.
Botón Descargar clicado.
Archivo ZIP descargado.
Extraído CSV nativo: C:\Users\emilo\Downloads\csv_extraidos\202412_num.csv
Archivo procesado y convertido para código 3491 (fecha 202412).
Archivo ZIP MD3491.zip eliminado.

Procesando código 3488, fecha 202411
Campo email completado.
Campo confirmar email completado.
Checkbox clicado.
Botón Enviar clicado.
Página de descarga cargada.
Botón Descargar clicado.
Archivo ZIP descargado.
Extraído CSV nativo: C:\Users\emilo\Downloads\csv_extraidos\202411_num.csv
Archivo procesado y convertido para código 3488 (fecha 202411).
Archivo ZIP MD3488.zip eliminado.

Procesando código 3483, fecha 202410
Campo email completado.
Campo confirmar email completado.
Checkbox clicado.
Botón Enviar clicado.
Página de descarga cargada.
Botón Descargar clicado.
Archivo ZIP descargado.
Extraído CSV na

Este código recorre la misma serie extrayendo los archivos en formato DA y convirtiéndolos a formato RAW

In [26]:
import os
import time
import zipfile
import pandas as pd
import traceback
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from webdriver_manager.chrome import ChromeDriverManager
from selenium.webdriver.common.action_chains import ActionChains

# CONFIGURACIÓN
EXCEL_PATH = r"C:\Users\emilo\Downloads\ICC - CIS (1).xlsx"  # Ruta del Excel guía
DOWNLOAD_DIR = r"C:\Users\emilo\Downloads\descargas_cis"         # Carpeta para los ZIP descargados
EXTRACTED_DIR = r"C:\Users\emilo\Downloads\csv_extraidos"          # Carpeta para los archivos RAW (se usan la misma carpeta que antes)
EMAIL = "emilio.cobos@bde.es"                                     # Tu email

# Crear carpetas si no existen
os.makedirs(DOWNLOAD_DIR, exist_ok=True)
os.makedirs(EXTRACTED_DIR, exist_ok=True)

# CONFIGURAR CHROME PARA DESCARGAS AUTOMÁTICAS
options = Options()
# Sin headless para ver la interacción
# options.add_argument("--headless")
options.add_argument("--no-sandbox")
options.add_argument("--disable-dev-shm-usage")
prefs = {
    "download.default_directory": DOWNLOAD_DIR,
    "download.prompt_for_download": False,
    "download.directory_upgrade": True,
    "safebrowsing.enabled": True
}
options.add_experimental_option("prefs", prefs)

# Iniciar WebDriver
service = Service(ChromeDriverManager().install())
driver = webdriver.Chrome(service=service, options=options)

# LEER EXCEL
df = pd.read_excel(EXCEL_PATH)
df.columns = df.columns.str.strip().str.upper()

if "FECHA" not in df.columns or "CÓDIGO ESTUDIO" not in df.columns:
    raise ValueError("El Excel no tiene las columnas esperadas ('FECHA', 'CÓDIGO ESTUDIO')")

# Convertir la columna FECHA a formato YYYYMM
df["FECHA"] = pd.to_datetime(df["FECHA"], format="%d-%m-%Y", errors="coerce").dt.strftime("%Y%m")

# PROCESAR CADA REGISTRO
for index, row in df.iterrows():
    codigo = row["CÓDIGO ESTUDIO"]
    fecha = row["FECHA"]
    zip_filename = f"MD{codigo}.zip"
    zip_path = os.path.join(DOWNLOAD_DIR, zip_filename)

    # Construir la URL de descarga
    url_descarga = f"https://www.cis.es/descarga-fichero-datos?anio={fecha[:4]}&codEstudio={codigo}&fichero=MD{codigo}.zip"
    print(f"\nProcesando código {codigo}, fecha {fecha}")
    driver.get(url_descarga)
    time.sleep(2)  # Espera a que cargue la página

    try:
        # Rellenar el campo Email
        email_input = WebDriverWait(driver, 15).until(
            EC.presence_of_element_located((By.CSS_SELECTOR, "input[placeholder='email']"))
        )
        email_input.clear()
        email_input.send_keys(EMAIL)
        print("Campo email completado.")

        # Rellenar el campo Confirmar email (usando XPath para buscar 'confirmationField' en el id)
        confirm_email_input = WebDriverWait(driver, 15).until(
            EC.presence_of_element_located((By.XPATH, "//input[contains(@id, 'confirmationField')]"))
        )
        confirm_email_input.clear()
        confirm_email_input.send_keys(EMAIL)
        print("Campo confirmar email completado.")

        # Localizar y clicar el checkbox (usando el span asociado, insensible a mayúsculas)
        checkbox_label = WebDriverWait(driver, 15).until(
            EC.element_to_be_clickable((By.XPATH, "//span[contains(@class, 'custom-control-label-text') and contains(translate(.,'ABCDEFGHIJKLMNOPQRSTUVWXYZ','abcdefghijklmnopqrstuvwxyz'),'conoce y acepta los terminos')]"))
        )
        driver.execute_script("arguments[0].scrollIntoView({block: 'center'});", checkbox_label)
        time.sleep(1)
        driver.execute_script("arguments[0].click();", checkbox_label)
        print("Checkbox clicado.")

        # Localizar el botón de Enviar
        submit_button = WebDriverWait(driver, 15).until(
            EC.element_to_be_clickable((By.ID, "ddm-form-submit"))
        )
        driver.execute_script("arguments[0].scrollIntoView({block: 'center'});", submit_button)
        time.sleep(1)
        submit_button.click()
        print("Botón Enviar clicado.")

        # Esperar 5 segundos para que se produzca la redirección a la página de descarga
        time.sleep(5)

        # Esperar a que aparezca el título de la página de descarga
        descarga_header = WebDriverWait(driver, 15).until(
            EC.presence_of_element_located((By.ID, "tituloArchivo"))
        )
        print("Página de descarga cargada.")

        # Localizar el botón "Descargar"
        descargar_button = WebDriverWait(driver, 15).until(
            EC.element_to_be_clickable((By.XPATH, "//a[contains(@class, 'btn btn-sm btn-primary') and contains(.,'Descargar')]"))
        )
        driver.execute_script("arguments[0].scrollIntoView({block: 'center'});", descargar_button)
        time.sleep(1)
        descargar_button.click()
        print("Botón Descargar clicado.")

        # Esperar 5 segundos para dar tiempo a que se inicie la descarga
        time.sleep(5)

        # Esperar a que el ZIP se descargue correctamente (verificando que no existan archivos temporales)
        download_wait = 0
        max_wait = 60  # segundos máximos de espera
        while download_wait < max_wait:
            if os.path.exists(zip_path) and not any(fname.endswith(".crdownload") for fname in os.listdir(DOWNLOAD_DIR)):
                break
            time.sleep(2)
            download_wait += 2

        if not os.path.exists(zip_path):
            raise Exception(f"El archivo {zip_filename} no se descargó en el tiempo esperado.")
        print("Archivo ZIP descargado.")

        # EXTRAER: Buscar archivos cuyo nombre empieza por "DA"
        extracted = False
        with zipfile.ZipFile(zip_path, "r") as z:
            for file in z.namelist():
                # Extraer el nombre del archivo sin ruta
                base_file = os.path.basename(file)
                if base_file.startswith("DA"):
                    # Se guarda en la carpeta de salida con el nombre basado en la fecha y la etiqueta RAW
                    extracted_path = os.path.join(EXTRACTED_DIR, f"{fecha}_raw.raw")
                    with open(extracted_path, "wb") as f:
                        f.write(z.read(file))
                    print(f"Extraído RAW: {extracted_path}")
                    extracted = True
                    break
            if not extracted:
                raise Exception("No se encontró ningún archivo que empiece por 'DA' dentro del ZIP.")

        print(f"Archivo procesado para código {codigo} (fecha {fecha}).")

        # Eliminar el ZIP tras extraer el archivo RAW
        os.remove(zip_path)
        print(f"Archivo ZIP {zip_filename} eliminado.")

    except Exception as e:
        print(f"Error en formulario para código {codigo}: {e}")
        print(traceback.format_exc())
        if os.path.exists(zip_path):
            os.remove(zip_path)
        continue  # No pasa al siguiente registro hasta que se complete exitosamente

driver.quit()
print("Proceso completado.")



Procesando código 3488, fecha 202411
Campo email completado.
Campo confirmar email completado.
Checkbox clicado.
Botón Enviar clicado.
Página de descarga cargada.
Botón Descargar clicado.
Archivo ZIP descargado.
Extraído RAW: C:\Users\emilo\Downloads\csv_extraidos\202411_raw.raw
Archivo procesado para código 3488 (fecha 202411).
Archivo ZIP MD3488.zip eliminado.

Procesando código 3483, fecha 202410
Campo email completado.
Campo confirmar email completado.
Checkbox clicado.
Botón Enviar clicado.
Página de descarga cargada.
Botón Descargar clicado.
Archivo ZIP descargado.
Extraído RAW: C:\Users\emilo\Downloads\csv_extraidos\202410_raw.raw
Archivo procesado para código 3483 (fecha 202410).
Archivo ZIP MD3483.zip eliminado.

Procesando código 3477, fecha 202409
Campo email completado.
Campo confirmar email completado.
Checkbox clicado.
Botón Enviar clicado.
Página de descarga cargada.
Botón Descargar clicado.
Archivo ZIP descargado.
Extraído RAW: C:\Users\emilo\Downloads\csv_extraidos\20