# Proyecto DATAMAD

## 1. Carga de datos

## 1.1 Carga de datos de Idealista

En primer lugar importamos las librerías:

Iniciamos el explorador, en este caso utilizamos Chrome

In [None]:
browser = uc.Chrome()

En esta sección extraemos de la página de idealista  los ids de las viviendas.

In [None]:
import asyncio
import aiohttp
import random
import time
import pandas as pd  # Para exportar los datos a CSV
from bs4 import BeautifulSoup as bs
from selenium.webdriver.common.by import By
import undetected_chromedriver as uc

# Configuración de Selenium con undetected_chromedriver
def get_driver():
    options = uc.ChromeOptions()
    options.add_argument('--headless')  # No mostrar la ventana del navegador
    options.add_argument('--disable-gpu')
    options.add_argument('--no-sandbox')
    # Crea el navegador sin ser detectado
    driver = uc.Chrome(options=options)
    return driver

# Función para extraer los IDs de una página de manera asíncrona
async def fetch_page(session, url):
    # Simulamos un navegador real con el User-Agent adecuado
    headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
        'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
        'Accept-Language': 'en-US,en;q=0.5',
        'Accept-Encoding': 'gzip, deflate, br',
        'Connection': 'keep-alive',
    }

    async with session.get(url, headers=headers) as response:
        content = await response.text()
        return url, response.status, content

# Función para extraer los IDs de cada página HTML
def extract_ids_from_html(html_content):
    soup = bs(html_content, 'lxml')
    try:
        articles = soup.find('main', {'class': 'listing-items'}).find_all('article')
        ids = [article.get('data-element-id') for article in articles if article.get('data-element-id') is not None]
        return ids
    except AttributeError:
        return []

# Función para procesar múltiples URLs en paralelo
async def fetch_urls_in_parallel(url_list):
    ids = []
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_page(session, url) for url in url_list]
        for task in asyncio.as_completed(tasks):
            url, status_code, content = await task
            if status_code == 200:
                page_ids = extract_ids_from_html(content)
                ids.extend(page_ids)
                print(f"Extracted {len(page_ids)} IDs from {url}")
            else:
                print(f"Failed to fetch {url} with status code {status_code}")
    return ids

# Función para guardar los resultados en un archivo CSV
def save_to_csv(ids, filename="ids.csv"):
    # Guardamos los datos en un DataFrame y exportamos
    df = pd.DataFrame(ids, columns=["ID"])
    df.to_csv(filename, mode='a', header=False, index=False)  # Usamos 'a' para agregar al archivo
    print(f"Exported {len(ids)} IDs to {filename}")

# Función para manejar el scraping completo
async def main_scraper():
    # Empezamos con la creación del navegador (Selenium con undetected_chromedriver)
    browser = get_driver()

    # Lista de URLs a scrapear
    x = 1
    url_list = []
    max_pages = 100  # Limitar a 100 páginas (ajustable)

    # Crear las URLs para las páginas
    while True:
        url = f'https://www.idealista.com/venta-viviendas/madrid-madrid/pagina-{x}.htm'
        url_list.append(url)
        x += 1
        if x > max_pages:  # Ajusta el número de páginas a scrapear
            break

    # Usamos Selenium para aceptar cookies si es necesario
    for url in url_list:
        browser.get(url)
        time.sleep(random.randint(10, 12))  # Retardo para parecer humano
        try:
            browser.find_element(By.XPATH, '//*[@id="didomi-notice-agree-button"]').click()
        except:
            pass

    # Ahora, obtenemos las páginas de manera asíncrona
    all_ids = []  # Lista para almacenar todos los IDs
    for i, url in enumerate(url_list):
        ids = await fetch_urls_in_parallel([url])
        if ids:
            all_ids.extend(ids)
            save_to_csv(ids)  # Guardar los IDs de esta página en el CSV
        else:
            print("Ya se han recopilado todos los ids")
            break  # Si no hay IDs en la página, terminamos

    # Cerrar el navegador al final
    browser.quit()

    # Convertir todos los IDs a un DataFrame final y guardar en CSV
    if all_ids:
        final_df = pd.DataFrame(all_ids, columns=["ID"])
        save_to_csv(all_ids)  # Guardar todos los resultados en el CSV

    print(f"Total IDs collected: {len(all_ids)}")
    return final_df

# Ejecutar el scraper en el bucle de eventos actual
if __name__ == "__main__":
    # Para Jupyter o IPython, no usamos asyncio.run(), simplemente usamos await
    df_ids = await main_scraper()

    # Ahora puedes usar df_ids en otras partes del código
    print("DataFrame with extracted IDs:")
    print(df_ids)



En la siguiente sección de código utilizamos los Ids recopilados para extraer el resto de datos de la vivienda

In [1]:
import asyncio
import aiohttp
import pandas as pd
from bs4 import BeautifulSoup as bs
import random
import time

# Configuración de Selenium con undetected_chromedriver
def get_driver():
    from selenium.webdriver.common.by import By
    import undetected_chromedriver as uc
    
    options = uc.ChromeOptions()
    options.add_argument('--headless')  # No mostrar la ventana del navegador
    options.add_argument('--disable-gpu')
    options.add_argument('--no-sandbox')
    options.add_argument("--start-maximized")  # Maximizar la ventana del navegador

    # Usar un User-Agent real para simular que es un navegador real
    options.add_argument('user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36')

    # Otras configuraciones para mejorar la simulación de comportamiento humano
    driver = uc.Chrome(options=options)
    return driver

# Función para extraer los datos de un inmueble utilizando BeautifulSoup
def parsear_inmueble(id_inmueble, html_content):
    print(f"\nProcesando inmueble con ID: {id_inmueble}")
    soup = bs(html_content, 'html.parser')
    
    # Extraer el título del inmueble
    titulo = soup.find('span', {'class': 'main-info__title-main'})
    titulo = titulo.text.strip() if titulo else "N/A"
    
    # Extraer la localización del inmueble
    localizacion = soup.find('span', {'class': 'main-info__title-minor'})
    localizacion = localizacion.text.strip().split(',')[0] if localizacion else "N/A"
    
    # Extraer el precio del inmueble
    precio = soup.find('span', {'class': 'txt-bold'})
    precio = int(precio.text.replace('.', '')) if precio else "N/A"
    
    # Extraer características básicas
    caract_basicas = [li.text.strip() for li in soup.select('div.details-property-feature-one li')]
    
    # Extraer características extras
    caract_extra = [li.text.strip() for li in soup.select('div.details-property-feature-two li')]
    
    # Verificar si los datos clave están vacíos
    if titulo == "N/A" or localizacion == "N/A" or precio == "N/A" or not caract_basicas or not caract_extra:
        print(f"¡Advertencia! El inmueble {id_inmueble} no tiene algunos datos clave.")
        print(f"Título: {titulo}, Localización: {localizacion}, Precio: {precio}")
    
    # Mostrar datos extraídos para cada inmueble
    print(f"Título: {titulo}")
    print(f"Localización: {localizacion}")
    print(f"Precio: {precio}")
    print(f"Características Básicas: {caract_basicas}")
    print(f"Características Extras: {caract_extra}")
    
    return {
        'ID': id_inmueble,
        'Título': titulo,
        'Localización': localizacion,
        'Precio': precio,
        'Características Básicas': caract_basicas,
        'Características Extras': caract_extra
    }

# Función para extraer los datos del inmueble desde su página
async def fetch_page(session, url):
    headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
        'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
        'Accept-Language': 'en-US,en;q=0.5',
        'Accept-Encoding': 'gzip, deflate, br',
        'Connection': 'keep-alive',
    }
    

    async with session.get(url, headers=headers) as response:
        content = await response.text()
        return url, response.status, content

# Función para manejar el scraping completo de inmuebles usando los IDs desde el CSV
async def main_scraper():
    # Leer los IDs de los inmuebles desde el archivo CSV
    ids_df = pd.read_csv('ids.csv')
    # Suponemos que el archivo CSV tiene una columna llamada 'ID' con los IDs de los inmuebles
    ids_inmuebles = ids_df['ID'].tolist()

    # Usamos aiohttp para obtener las páginas de forma asíncrona
    async with aiohttp.ClientSession() as session:
        results = []
        for id_inmueble in ids_inmuebles:
            inmueble_url = f"https://www.idealista.com/inmueble/{id_inmueble}/"
            page_content = await fetch_page(session, inmueble_url)
            _, _, html_content = page_content
            inmueble_data = parsear_inmueble(id_inmueble, html_content)
            results.append(inmueble_data)
    
    # Guardar los resultados en un CSV
    guardar_en_csv(results)

    # Devolver los resultados para usarlos en otras partes del código
    return results

# Función para guardar los resultados en un CSV
def guardar_en_csv(resultados, filename="inmuebles.csv"):
    df_resultados = pd.DataFrame(resultados)
    df_resultados.to_csv(filename, index=False)
    print(f"Datos exportados a {filename}")

# Ejecutar la función principal en un entorno interactivo sin asyncio.run()
if __name__ == "__main__":
    results = await main_scraper()
    print("Datos recopilados:")
    print(results)



Procesando inmueble con ID: 102043012
Título: Piso en venta en Nueva España
Localización: Chamartín
Precio: 1650000
Características Básicas: ['181 m² construidos', '2 habitaciones', '2 baños', 'Plaza de garaje incluida en el precio', 'Segunda mano/buen estado', 'Trastero', 'Construido en 1964', 'Bajo exterior', 'Sin ascensor']
Características Extras: ['Aire acondicionado', 'Consumo:', 'Emisiones:']

Procesando inmueble con ID: 106330563
Título: Dúplex en venta en calle de Montalbán, 11
Localización: Jerónimos
Precio: 2700000
Características Básicas: ['220 m² construidos', '2 habitaciones', '3 baños', 'Plaza de garaje incluida en el precio', 'Segunda mano/buen estado', 'Trastero', 'Orientación sur', 'Construido en 2020', 'Calefacción individual', 'Bajo exterior', 'Con ascensor']
Características Extras: ['Aire acondicionado', 'Piscina', 'En trámite']

Procesando inmueble con ID: 97875752
Título: Chalet adosado en venta en paseo de La Habana
Localización: Nueva España
Precio: 3110000
Car

## 1.2 Carga de datos DATAMAD

Datos de centros sanitarios

In [None]:
# Importamos las librerías necesarias
import csv
import requests
from pyproj import Proj, transform

# URL del archivo CSV
base_url = "https://datos.comunidad.madrid/catalogo/dataset/d8a0a444-adf5-4c04-8999-0eac3de52cb7/resource/2948b4da-8b39-42b7-b667-779a5284f39d/download/centros_servicios_establecimientos_sanitarios.csv"

# Realizamos la petición
result = requests.get(base_url)

# Si la conexión es exitosa, procesamos el CSV
if result.status_code == 200:
    # Convertimos el texto del CSV en una lista de diccionarios, con ';' como delimitador
    csv_data = result.text.splitlines()
    reader = csv.DictReader(csv_data, delimiter=';')

    # Filtramos solo los datos del municipio de Madrid y eliminamos duplicados por 'centro_nro_registro'
    seen_centers = set()
    filtered_data = []
    for row in reader:
        centro_nro_registro = row.get("centro_nro_registro")
        if row.get("municipio_nombre") and row["municipio_nombre"].lower() == "madrid" and centro_nro_registro not in seen_centers:
            seen_centers.add(centro_nro_registro)  # Agregamos el centro para evitar duplicados
            filtered_data.append(row)

    # Configuramos el sistema de coordenadas UTM y WGS84 (latitud/longitud)
    utm_proj = Proj(proj='utm', zone=30, ellps='WGS84')  # UTM Zone 30T (España)
    wgs84_proj = Proj(proj='latlong', datum='WGS84')

    # Convertimos las coordenadas UTM a latitud y longitud
    for row in filtered_data:
        try:
            # Verificamos si las coordenadas UTM están presentes y no están vacías
            if row["localizacion_coordenada_x"] and row["localizacion_coordenada_y"]:
                # Convertimos las coordenadas UTM a float
                utm_x = float(row["localizacion_coordenada_x"])
                utm_y = float(row["localizacion_coordenada_y"])
                
                # Convertimos a latitud y longitud
                lon, lat = transform(utm_proj, wgs84_proj, utm_x, utm_y)
                
                # Añadimos las nuevas coordenadas al diccionario
                row["latitud"] = lat
                row["longitud"] = lon
            else:
                # Si las coordenadas están vacías, asignamos None o dejamos en blanco
                row["latitud"] = None
                row["longitud"] = None

            # Eliminamos la columna 'oferta_asistencial' si existe
            row.pop("oferta_asistencial", None)

        except Exception as e:
            print(f"Error al convertir coordenadas para la fila: {row['centro_nro_registro']}", e)

    # Guardamos los datos filtrados y transformados en un nuevo archivo CSV
    with open("datos_madrid_latlon.csv", mode="w", newline="", encoding="utf-8") as file:
        # Revisamos si 'latitud' y 'longitud' ya están en las claves y los agregamos solo si no están
        fieldnames = list(filtered_data[0].keys())
        if "latitud" not in fieldnames:
            fieldnames.append("latitud")
        if "longitud" not in fieldnames:
            fieldnames.append("longitud")

        writer = csv.DictWriter(file, fieldnames=fieldnames)
        writer.writeheader()
        writer.writerows(filtered_data)
    
    print("Archivo 'CentrosCoordenadas.csv' generado exitosamente.")
else:
    print("Error en la solicitud:", result.status_code)


# 2. Limpieza de datos y coordenadas

Ahora procedemos al tratamiento de los datos. 

In [None]:
import pandas as pd
import re
from fuzzywuzzy import fuzz, process

# Cargar el archivo CSV de inmuebles
df = pd.read_csv("inmuebles.csv", sep=",")

# Convertir 'caracteristicas_basicas' en una lista de Python, si no está ya en ese formato
df['caracteristicas_basicas'] = df['Características Básicas'].apply(eval)

# Función para extraer el número de calle
def extraer_numero_calle(titulo):
    if isinstance(titulo, str):  # Verificamos que el valor sea una cadena
        # Busca un número en la dirección que esté precedido por una coma y un espacio
        match = re.search(r',\s?(\d+)', titulo)
        return int(match.group(1)) if match else None
    return None  # Si no es una cadena, devolver None

# Aplicar la función para crear la columna 'numero_calle'
df['numero_calle'] = df['Título'].apply(extraer_numero_calle)

# Función para extraer los datos de 'caracteristicas_basicas'
def extraer_datos(lista):
    datos = {
        'metros_cuadrados': None,
        'habitaciones': None,
        'baños': None,
        'parcela_m2': None,
        'plaza_garaje': None,
        'estado': None,
        'orientacion': None,
        'año_construccion': None,
        'calefaccion': None,
        'planta': None,
        'ascensor': None,
    }

    for item in lista:
        # Solo procesa elementos que contengan características específicas de la vivienda
        if re.search(r'\bm²\b|\bhabitaciones\b|\bbaños\b|garaje|estado|Orientación|Construido en|Calefacción|Planta|ascensor|Parcela', item):
            if re.search(r'\d+\s?m² construidos', item):
                datos['metros_cuadrados'] = int(re.search(r'\d+', item).group())
            elif 'habitaciones' in item:
                datos['habitaciones'] = int(re.search(r'\d+', item).group())
            elif 'baños' in item:
                datos['baños'] = int(re.search(r'\d+', item).group())
            elif 'Parcela de' in item:
                datos['parcela_m2'] = int(re.search(r'\d+', item.replace('.', '')).group())
            elif 'garaje' in item:
                datos['plaza_garaje'] = True
            elif 'Segunda mano' in item or 'para reformar' in item:
                datos['estado'] = item
            elif 'Orientación' in item:
                datos['orientacion'] = item.split(' ')[-1]
            elif 'Construido en' in item:
                datos['año_construccion'] = int(re.search(r'\d+', item).group())
            elif 'Calefacción' in item:
                datos['calefaccion'] = item.split(': ')[-1]
            elif 'Planta' in item:
                datos['planta'] = item
            elif 'ascensor' in item:
                datos['ascensor'] = True

    return datos

# Aplicar la función y expandir los resultados en el DataFrame
datos_df = df['caracteristicas_basicas'].apply(extraer_datos).apply(pd.Series)

# Combinar los datos originales con las nuevas columnas
df = pd.concat([df, datos_df], axis=1)

# Eliminar columnas innecesarias
df = df.drop(columns=['Características Básicas', 'caracteristicas_basicas', 'Características Extras'])

# Cargar los datos de direcciones vigentes
direcciones_df = pd.read_csv("DireccionesVigentes.csv", encoding="latin1", delimiter=";")

# Limpiar y normalizar nombres de calles en DireccionesVigentes
direcciones_df["VIA_NOMBRE"] = direcciones_df["VIA_NOMBRE"].str.upper().str.strip()

# Eliminar duplicados en direcciones_df para que solo haya un valor de coordenadas por calle
direcciones_df = direcciones_df.drop_duplicates(subset=["VIA_NOMBRE"], keep="first")

# Función mejorada para extraer y normalizar el nombre de la calle del título
def extraer_nombre_calle(titulo):
    match = re.search(r'\b(CALLE|AVENIDA|PASEO|PLAZA|RONDA|CAMINO|CARRER|RUA|CARRERA)\s+(?:DE(L|LA)?\s+)?([\w\s]+)', titulo, re.IGNORECASE)
    if match:
        return match.group(3).upper().strip()  # Captura el nombre limpio de la calle
    return None

# Asegurarse de que todos los valores de 'Título' sean cadenas y manejar valores nulos
df["Título"] = df["Título"].fillna("").astype(str)

# Aplicar la función de extracción
df["CALLE_EXTRAIDA"] = df["Título"].apply(extraer_nombre_calle)

# Usar fuzzy matching para encontrar la mejor coincidencia entre las calles extraídas y las del DataFrame de direcciones
def obtener_mejor_coincidencia(calle_extraida):
    if calle_extraida:
        coincidencia = process.extractOne(calle_extraida, direcciones_df["VIA_NOMBRE"], scorer=fuzz.token_sort_ratio)
        return coincidencia[0] if coincidencia and coincidencia[1] > 80 else None  # Umbral de coincidencia

df['CALLE_COINCIDENTE'] = df['CALLE_EXTRAIDA'].apply(obtener_mejor_coincidencia)

# Hacer el merge de ambas bases de datos en función del nombre de la calle
df_completo = df.merge(direcciones_df[['VIA_NOMBRE', 'LATITUD', 'LONGITUD']],
                       left_on="CALLE_COINCIDENTE", right_on="VIA_NOMBRE", how="left")

# Eliminar la columna temporal y renombrar columnas
df_completo.drop(columns=["CALLE_EXTRAIDA", "CALLE_COINCIDENTE", "VIA_NOMBRE"], inplace=True)

# Filtrar las casas con coordenadas
df_completo = df_completo.dropna(subset=['LATITUD', 'LONGITUD'])

# Guardar el archivo final con todas las características y coordenadas
df_completo.to_csv('inmuebles_con_coordenadas.csv', index=False, sep=';')

print("El archivo final se ha guardado correctamente como 'inmuebles_con_coordenadas.csv'")

# Mostrar el DataFrame final
print(df_completo.head())


Añadimos las coordenadas a las direcciones utilizando DireccionesVigentes.csv

Aquí eliminamos todos aquellos pisos de los cuales no ha sido posible obtener las coordenadas.

In [None]:
import pandas as pd
import re

# Definir una función para convertir coordenadas de DMS a decimal
def dms_a_decimal(dms):
    # Separar grados, minutos, segundos y dirección
    degrees, minutes, seconds, direction = re.split("[°'\" ]+", dms.strip())
    decimal = float(degrees) + float(minutes) / 60 + float(seconds) / 3600
    if direction in ['S', 'W']:
        decimal = -decimal
    return decimal

# Cargar los datos desde un CSV
casas_df = pd.read_csv("casas_con_coordenadas.csv", encoding="utf-8", delimiter=",")

# Imprimir los nombres de las columnas para depuración
print("Columnas disponibles en el DataFrame:", casas_df.columns.tolist())

# Convertir las coordenadas de LATITUD y LONGITUD a decimal
# Asegúrate de que los nombres coincidan exactamente con los nombres de columna
if 'LATITUD' in casas_df.columns and 'LONGITUD' in casas_df.columns:
    casas_df['LATITUD'] = casas_df['LATITUD'].apply(dms_a_decimal)
    casas_df['LONGITUD'] = casas_df['LONGITUD'].apply(dms_a_decimal)
else:
    print("No se encontraron las columnas 'LATITUD' y/o 'LONGITUD' en el DataFrame.")

# Guardar el nuevo archivo CSV con las coordenadas transformadas
casas_df.to_csv("casas_con_coordenadas_transformadas.csv", index=False, sep=';')
print("Archivo 'casas_con_coordenadas_transformadas.csv' creado con éxito.")



# 3. Aplicacion