# T5.1 Panel de control con PowerBI
## Ralphy Núñez Mercado
Hai que facer un proxecto con PowerBI que teña os seguintes elementosPower BI:

Dous informes (e vista para móbiles e ordenador por cada informe).
Alomenos 2 gráficas ou elementos gráficos ordinarios de PowerBI.
1 Mapa (aínda que sexa con outros datos sen relación).
Dúas orixes de datos, por exemplo un excel e un JSON.
Relacións entre os datos (alomenos algunha que teña sentido).
Un obxecto visual de Python.
Un orixe de datos cun script de Python.
Emprega varios orixes de datos (diferentes) para comparar.

Na práctica debe verse algo de:

Scrapping
PANDAS
Spark-HDFS <-> PowerBi

E ademáis que quede bonito o deseño do informe :)

Deberase facer con PowerBI Desktop e publicarase a app.powerbi.com coa conta @fernandowirtz.com.
Subirase a esta tarefa o arquivo .pbix. Se é moi grande para subir, darase unha ligazón ao arquivo en OneDrive (coa conta @fernandowirtz.com).
Compartirase a ligazón ao informe publicado en app.powerbi.com tamén co profe (pode poñerse no texto da tarefa ou engadir ao membro da organización co email: scj@fernandowirtz.com)

## ⬇️ Instalar las librerías

In [None]:
!conda install pip -y || true
!conda install -c conda-forge selenium -y || true
!pip install webdriver_manager || true
!conda install pandas -y || true
!conda install sqlalchemy -y || true
!pip install pyodbc -y || true
!pip install pypdf -y || true
!pip install geopy 

^C


## 🦎 Instalar gecko Driver

In [None]:
from webdriver_manager.firefox import GeckoDriverManager
GeckoDriverManager().install()

'C:\\Users\\ralphy.nunezmercado\\.wdm\\drivers\\geckodriver\\win64\\v0.36.0\\geckodriver.exe'

## ⬇️ Importar las librerías

In [51]:
import time
import random
import re
import csv
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.firefox.service import Service as FirefoxService
from webdriver_manager.firefox import GeckoDriverManager
import pandas as pd
from pypdf import PdfReader, PdfWriter
from geopy.geocoders import Nominatim
from geopy.extra.rate_limiter import RateLimiter
import tabula.io

## Scraping de AutoScout24

In [None]:
# Función para cerrar ventanas emergentes (modales) si aparecen en la página
def cerrar_overlays(driver):
    overlays_xpaths = [
        "//button[contains(@class, 'modal-box__header__close')]",
        "//button[contains(@class, 'close')]",
        "//button[contains(@aria-label, 'Cerrar')]",
        "//button[contains(@aria-label, 'Close')]",
        "//button[contains(@id, 'save-search-list-button')]"
    ]
    for xpath in overlays_xpaths:
        try:
            close_btn = driver.find_element(By.XPATH, xpath)
            if close_btn.is_displayed():
                close_btn.click()
                time.sleep(1)
        except Exception:
            pass

# Limpia el valor textual de datos no válidos o irrelevantes
def limpiar_valor(valor):
    if not valor:
        return ''
    valor = valor.strip()
    textos_invalidos = [
        '- Tipo de combustible', '- Transmisión', '- (Año)', '- Año', '- Kilometraje',
        '- Ubicación', '- Código postal', 'unknown', 'desconocido', 'n/a', 'no disponible'
    ]
    if valor.startswith('-'):
        return ''
    if valor.lower() in [t.lower() for t in textos_invalidos]:
        return ''
    return valor

# Extrae el código postal y la localidad de un texto de dirección
def extraer_ubicacion_cp(texto):
    if not texto:
        return '', ''
    match_cp = re.search(r'ES-(\d{4,5})', texto)
    codigo_postal = match_cp.group(1).zfill(5) if match_cp else ''
    ubicacion = ''
    if match_cp:
        resto = texto[match_cp.end():].strip()
        resto = re.sub(r'^[\s\W_]+', '', resto)
        if resto:
            ubicacion = resto.upper()
    return ubicacion, codigo_postal

# Configura Firefox con un user-agent específico
opciones = webdriver.FirefoxOptions()
opciones.set_preference("general.useragent.override", 
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:124.0) Gecko/20100101 Firefox/124.0")

# Inicia el navegador con geckodriver y opciones
driver = webdriver.Firefox(service=FirefoxService(GeckoDriverManager().install()), options=opciones)
wait = WebDriverWait(driver, 40)

# Abre la página principal de búsqueda de coches
url = "https://www.autoscout24.es/lst?sort=standard&desc=0&ustate=N%2CU&atype=C&cy=E&source=homepage_search-mask"
driver.get(url)

# Acepta cookies si aparece el banner
try:
    btn_cookies = wait.until(
        EC.element_to_be_clickable((By.XPATH, "//button[contains(., 'Aceptar') or contains(., 'Aceptar todas') or contains(., 'Aceptar todo')]"))
    )
    btn_cookies.click()
    print("Cookies aceptadas")
    time.sleep(1)
except Exception:
    print("No apareció banner de cookies o ya estaba aceptado")

datos_coches = []

# Muestra todas las marcas disponibles
wait.until(EC.element_to_be_clickable((By.ID, "make-input-primary-filter"))).click()
time.sleep(1)
wait.until(EC.presence_of_element_located((By.XPATH, '//ul[@id="make-input-primary-filter-suggestions"]')))
sugerencias = driver.find_elements(By.XPATH, '//ul[@id="make-input-primary-filter-suggestions"]/li[@role="option"]')

# Extrae el nombre de todas las marcas
marcas = []
for sug in sugerencias:
    marca = sug.text.strip()
    if marca and not "divider" in sug.get_attribute("class"):
        marcas.append(marca)

print(f"Marcas encontradas: {len(marcas)}")

# Itera por cada marca encontrada
for idx, marca_nombre in enumerate(marcas):
    print(f"\nScrapeando la marca: {marca_nombre} ({idx+1}/{len(marcas)})")
    driver.get(url)
    wait.until(EC.element_to_be_clickable((By.ID, "make-input-primary-filter"))).click()
    time.sleep(1)
    wait.until(EC.presence_of_element_located((By.XPATH, '//ul[@id="make-input-primary-filter-suggestions"]')))
    marca_encontrada = False

    # Selecciona la marca actual en el filtro
    try:
        sugerencias_actuales = driver.find_elements(By.XPATH, '//ul[@id="make-input-primary-filter-suggestions"]/li[@role="option"]')
        for sug in sugerencias_actuales:
            if sug.text.strip().lower() == marca_nombre.lower():
                driver.execute_script("arguments[0].scrollIntoView({block: 'center'});", sug)
                time.sleep(0.5)
                driver.execute_script("arguments[0].click();", sug)
                time.sleep(1)
                # Cierra modal si aparece
                for _ in range(4):
                    if driver.find_elements(By.XPATH, "//button[contains(@class, 'modal-box__header__close')]"):
                        cerrar_overlays(driver)
                        break
                    time.sleep(0.5)
                marca_encontrada = True
                break
        if not marca_encontrada:
            print(f"Marca no seleccionada: {marca_nombre}")
            continue
    except Exception as e:
        print(f"Marca no seleccionada: {marca_nombre} - {e}")
        continue

    # Espera que se carguen los resultados
    try:
        wait.until(EC.presence_of_all_elements_located((By.XPATH, '//article[@data-testid="list-item"]')))
    except Exception:
        print("No se hay coches para esta marca.")
        continue

    # Scraping de hasta 20 páginas por marca
    pagina = 1
    while pagina <= 20:
        print(f"Scrapeando página {pagina} de {marca_nombre}...")
        try:
            wait.until(EC.presence_of_all_elements_located((By.XPATH, '//article[@data-testid="list-item"]')))
        except Exception:
            print("No se encontraron coches en la página, saliendo del bucle.")
            break

        coches = driver.find_elements(By.XPATH, '//article[@data-testid="list-item"]')

        # Extrae información de cada coche
        for articulo in coches:
            try:
                marca = limpiar_valor(articulo.get_attribute('data-make') or marca_nombre)
                modelo = limpiar_valor(articulo.get_attribute('data-model') or '')
                precio = limpiar_valor(articulo.get_attribute('data-price') or '')

                # Kilometraje
                try:
                    km_text = articulo.find_element(By.XPATH, './/*[@data-testid="VehicleDetails-mileage_road"]').text
                    km = limpiar_valor(re.sub(r'[^\d]', '', km_text))
                except:
                    km = ''

                # Año
                try:
                    year_text = articulo.find_element(By.XPATH, './/*[@data-testid="VehicleDetails-calendar"]').text
                    year = year_text.split('/')[-1] if '/' in year_text else year_text
                    year = limpiar_valor(year)
                except:
                    year = ''

                # Combustible
                try:
                    combustible = articulo.find_element(By.XPATH, './/*[@data-testid="VehicleDetails-gas_pump"]').text
                    combustible = limpiar_valor(combustible)
                except:
                    combustible = ''

                # Caballos (CV)
                try:
                    cv_text = articulo.find_element(By.XPATH, './/*[@data-testid="VehicleDetails-speedometer"]').text
                    match = re.search(r'\((\d+)\s*CV\)', cv_text)
                    cv = match.group(1) if match else ''
                    cv = limpiar_valor(cv)
                except:
                    cv = ''

                # Ubicación (profesional o particular)
                ubicacion_raw = ''
                try:
                    ubicacion_raw = articulo.find_element(By.XPATH, './/*[@data-testid="sellerinfo-address"]').text
                except:
                    pass
                if not ubicacion_raw:
                    try:
                        ubicacion_raw = articulo.find_element(By.XPATH, ".//*[contains(@class, 'SellerInfo_private')]").text
                    except:
                        pass

                if ubicacion_raw:
                    ubicacion, codigo_postal = extraer_ubicacion_cp(ubicacion_raw)
                else:
                    ubicacion = ''
                    codigo_postal = ''

                # Transmisión
                try:
                    transmision = articulo.find_element(By.XPATH, './/*[@data-testid="VehicleDetails-transmission"]').text
                    transmision = limpiar_valor(transmision)
                except:
                    transmision = ''

                # Guardar datos
                datos_coches.append([
                    marca.capitalize(), modelo, precio, km, year,
                    combustible, cv, ubicacion, transmision, codigo_postal
                ])
            except Exception as e:
                print(f"Error extrayendo un coche: {e}")
                continue

        # Pasa a la siguiente página si existe
        try:
            paginacion = driver.find_element(By.XPATH, '//*[@data-testid="listpage-pagination"]')
            next_btn_li = paginacion.find_element(By.XPATH, ".//li[contains(@class, 'prev-next') and not(contains(@class, 'previous'))]")
            next_btn = next_btn_li.find_element(By.TAG_NAME, "button")
            if next_btn.get_attribute("aria-disabled") == "true":
                print("No hay más páginas para esta marca.")
                break

            ultimo_articulo = coches[-1]
            next_btn.click()
            pagina += 1
            wait.until(EC.staleness_of(ultimo_articulo))
            wait.until(EC.presence_of_all_elements_located((By.XPATH, '//article[@data-testid="list-item"]')))
            time.sleep(random.uniform(2, 4))
        except Exception as e:
            print("No hay más páginas o no se encontró el botón de siguiente para esta marca.")
            break

# Guarda todos los datos extraídos en un archivo CSV
with open('coches.csv', 'w', newline='', encoding='utf-8') as f:
    writer = csv.writer(f)
    writer.writerow([
        'Marca', 'Modelo', 'Precio', 'KM', 'Año',
        'Tipo_de_combustible', 'CV', 'Ubicacion', 'Transmision', 'Codigo_postal'
    ])
    writer.writerows(datos_coches)

print(f"¡Datos guardados en coches.csv! Total coches: {len(datos_coches)}")
driver.quit()


Cookies aceptadas
Marcas encontradas: 269

Scrapeando la marca: Mercedes-Benz (1/269)
Scrapeando página 1 de Mercedes-Benz...
Scrapeando página 2 de Mercedes-Benz...
Scrapeando página 3 de Mercedes-Benz...
Scrapeando página 4 de Mercedes-Benz...
Scrapeando página 5 de Mercedes-Benz...
Scrapeando página 6 de Mercedes-Benz...
Scrapeando página 7 de Mercedes-Benz...
Scrapeando página 8 de Mercedes-Benz...
Scrapeando página 9 de Mercedes-Benz...
Scrapeando página 10 de Mercedes-Benz...
Scrapeando página 11 de Mercedes-Benz...
Scrapeando página 12 de Mercedes-Benz...
Scrapeando página 13 de Mercedes-Benz...
Scrapeando página 14 de Mercedes-Benz...
Scrapeando página 15 de Mercedes-Benz...
Scrapeando página 16 de Mercedes-Benz...
Scrapeando página 17 de Mercedes-Benz...
Scrapeando página 18 de Mercedes-Benz...
Scrapeando página 19 de Mercedes-Benz...
Scrapeando página 20 de Mercedes-Benz...
No hay más páginas para esta marca.

Scrapeando la marca: BMW (2/269)
Scrapeando página 1 de BMW...
Scr

### Asignar a cada coche su comunidad autónoma mediante el código postal

In [None]:
# Diccionario que asigna los dos primeros dígitos del código postal a su comunidad autónoma correspondiente
cp_to_comunidad = {
    '01': 'País Vasco',
    '02': 'Andalucía',
    '03': 'Comunidad Valenciana',
    '04': 'Andalucía',
    '05': 'Andalucía',
    '06': 'Extremadura',
    '07': 'Baleares',
    '08': 'Cataluña',
    '09': 'Castilla y León',
    '10': 'Extremadura',
    '11': 'Andalucía',
    '12': 'Comunidad Valenciana',
    '13': 'Castilla-La Mancha',
    '14': 'Andalucía',
    '15': 'Galicia',
    '16': 'Castilla-La Mancha',
    '17': 'Cataluña',
    '18': 'Andalucía',
    '19': 'Castilla-La Mancha',
    '20': 'País Vasco',
    '21': 'Andalucía',
    '22': 'Aragón',
    '23': 'Andalucía',
    '24': 'Castilla y León',
    '25': 'Cataluña',
    '26': 'La Rioja',
    '27': 'Galicia',
    '28': 'Madrid',
    '29': 'Andalucía',
    '30': 'Región de Murcia',
    '31': 'Navarra',
    '32': 'Galicia',
    '33': 'Asturias',
    '34': 'Castilla y León',
    '35': 'Canarias',
    '36': 'Galicia',
    '37': 'Castilla y León',
    '38': 'Canarias',
    '39': 'Cantabria',
    '40': 'Castilla y León',
    '41': 'Andalucía',
    '42': 'Castilla y León',
    '43': 'Cataluña',
    '44': 'Aragón',
    '45': 'Castilla-La Mancha',
    '46': 'Comunidad Valenciana',
    '47': 'Castilla y León',
    '48': 'País Vasco',
    '49': 'Castilla y León',
    '50': 'Aragón',
    '51': 'Ceuta',
    '52': 'Melilla'
}

# Función que devuelve la comunidad autónoma a partir de un código postal
def obtener_comunidad(codigo_postal):
    if pd.isna(codigo_postal):
        return "Desconocida"  # Si el valor es nulo o NaN
    codigo_postal = str(codigo_postal).zfill(5)  # Asegura que tenga 5 dígitos (añade ceros a la izquierda si es necesario)
    prefijo = codigo_postal[:2]  # Extrae los dos primeros dígitos
    return cp_to_comunidad.get(prefijo, "Desconocida")  # Devuelve la comunidad o "Desconocida" si no se encuentra

# Carga el archivo 'coches.csv' y se asegura de que la columna 'Codigo_postal' se lea como texto (no como número)
df = pd.read_csv("coches.csv", dtype={"Codigo_postal": str})

# Convierte la columna 'Año' a tipo entero, permitiendo valores nulos (usa 'Int64', que permite nulos)
df['Año'] = df['Año'].astype(float).astype('Int64')

# Aplica la función obtener_comunidad a cada código postal, y guarda el resultado en una nueva columna
df['Comunidad_autonoma'] = df['Codigo_postal'].apply(obtener_comunidad)

# Guarda el DataFrame actualizado en el mismo archivo CSV
df.to_csv("coches.csv", index=False)

print("Archivo generado: coches.csv")


Archivo generado: coches.csv


## Emisiones de PDF a JSON

### · Eliminar páginas sin tablas del pdf

Para poder leer el pdf necesitaremos la librería pypdf despues simplemente leemos el pdf, saltamos la primera página y escribimos las dos siguientes

In [52]:
reader = PdfReader(".\\EMISIONES DE GEI POR COMUNIDADES AUTÓNOMAS Ed. 2024.pdf")
writer = PdfWriter()

# Eliminar la primera página (índice 0)
for i, page in enumerate(reader.pages):
    if i != 0:  # saltamos la primera página
        writer.add_page(page)

with open("Emisiones_sin_primera_pagina.pdf", "wb") as f:
    writer.write(f)

### · Extracción de las dos tablas del pdf de emisiones

Con Tabula extraemos las dos tablas que hay en el PDF y con cada una haremos un CSV independiente. Esto es porque las tablas tienen una forma peculiar que dificulta la extracción de datos fácilmente con Tabula, así que lo que haremos es tener dos CSV, formatear cada uno de ellos, después juntarlos y tener un solo CSV. Al final, convertiremos el CSV a formato long para que sea más fácil la manipulación de los datos en Power BI.

In [None]:
from tabula.io import convert_into
from tabula.io import read_pdf

pdf = ".\\Emisiones_sin_primera_pagina.pdf"

# Extraer tabla de la página 1
tables_page1 = tabula.io.read_pdf(pdf, pages=1, multiple_tables=False, lattice=True)
tables_page1[0].to_csv("tabla_pagina_1.csv", index=False)

# Extraer tabla de la página 2
tables_page2 = tabula.io.read_pdf(pdf, pages=2, multiple_tables=False, lattice=True)
tables_page2[0].to_csv("tabla_pagina_2.csv", index=False)

In [None]:
def formatear(df):
    df.rename(columns={df.columns[0]: "Comunidad Autónoma"}, inplace=True)
    df["Comunidad Autónoma"] = df["Comunidad Autónoma"].str.strip().str.upper()
    df = df.loc[:, ~df.columns.str.contains('^Unnamed')]
    df.columns = ["Comunidad Autónoma"] + [str(int(float(c))) for c in df.columns[1:]]

    for col in df.columns[1:]:
        df[col] = df[col].astype(str).str.replace('.', '', regex=False).astype(int)

    df.set_index("Comunidad Autónoma", inplace=True)
    return df

# Carga los dos CSV (ajusta las rutas a tus archivos)
df1 = pd.read_csv("tabla_pagina_1.csv", skiprows=1)  # Salta la fila de encabezados duplicados
df2 = pd.read_csv("tabla_pagina_2.csv", skiprows=1)

df1 = formatear(df1)
df2 = formatear(df2)


# Unir horizontalmente por comunidad autónoma
df_final = pd.concat([df1, df2], axis=1)

# Ordena las columnas por año en orden cronológico
df_final = df_final.reindex(sorted(df_final.columns, key=int), axis=1)

df_final.reset_index(inplace=True)

df_final.to_csv("emisiones_co2.csv", index=False)

### · Limpiar datos y pasar el csv a long format

Aquí usaramos melt para tener un csv más utilizable en PowerBi y además eliminaremos la fila de Total de España ya que nos es irrelevante y podemos calcularlo facilmente en PowerBI

In [None]:
df = pd.read_csv('.\\emisiones_co2.csv')

# Eliminar la fila de TOTAL DE ESPAÑA
df = df[df['Comunidad Autónoma'] != 'TOTAL DE ESPAÑA']

# Convertir al formato largo
df_long = pd.melt(
    df,
    id_vars=['Comunidad Autónoma'],
    var_name='Año',
    value_name='CO2'
)

# Ordenar por Comunidad Autónoma y Año
df_long = df_long.sort_values(by=['Comunidad Autónoma', 'Año'])

df_long.to_csv('emisiones_co2.csv', index=False)


### · Añadir coordenadas mediante el nombre de la comunidad autónoma

Para poder hacer esto nos ayudaremos de geopy y crearemos dos nuevas columnas con la latitud y la longitud

In [None]:
df = pd.read_csv("emisiones_co2.csv")

# Obtener la lista de comunidades diferentes
comunidades = df["Comunidad Autónoma"].unique()

# Inicializar el geolocalizador
geolocator = Nominatim(user_agent="geoapi_ejemplo")
geocode = RateLimiter(geolocator.geocode, min_delay_seconds=1)

# Crear un diccionario con coordenadas
coordenadas = {}
for comunidad in comunidades:
    location = geocode(f"{comunidad}, España")
    if location:
        coordenadas[comunidad] = (location.latitude, location.longitude)
    else:
        coordenadas[comunidad] = (None, None)
        print(f"No se encontraron coordenadas para: {comunidad}")

# Añadir columnas de latitud y longitud al DataFrame
df["lat"] = df["Comunidad Autónoma"].map(lambda x: coordenadas[x][0])
df["lon"] = df["Comunidad Autónoma"].map(lambda x: coordenadas[x][1])

df.to_csv("emisiones_co2.csv", index=False)

### · Convertir fichero CSV a Json

In [None]:
df = pd.read_csv('emisiones_co2.csv')

df.to_json('emisiones_co2.json', orient='records', lines=True, force_ascii=False)