In [86]:
import queue
import os
from bs4 import BeautifulSoup
from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from webdriver_manager.chrome import ChromeDriverManager
from selenium.webdriver.chrome.options import Options
from selenium.webdriver import ChromeOptions
from time import sleep
import pandas as pd
import requests
import random
import numpy as np
from concurrent.futures import ThreadPoolExecutor, as_completed
import matplotlib.pyplot as plt
from pymongo import MongoClient
from dotenv import load_dotenv
from geopy.geocoders import Nominatim
from geopy.exc import GeocoderTimedOut

pd.options.display.max_colwidth = None

In [77]:
def extract_listing_data(url):
    
    options = ChromeOptions()
    
    # Inicializa o WebDriver do Chrome
    driver = webdriver.Chrome(options=options)
    
    # Acessa a página da listagem de imóveis
    driver.get(url)
    sleep(random.randint(2,5))  # Aguarda o carregamento da página (ajuste conforme necessário)
    
    # Obtém o conteúdo da página
    page_source = driver.page_source
    
    # Fecha o WebDriver do Chrome
    driver.quit()

    # Analisa o conteúdo da página com BeautifulSoup
    soup = BeautifulSoup(page_source, 'html.parser')  
    
    # Extrai os dados da página
    try:
        title = soup.find('h1', class_='title__title js-title-view')\
            .get_text(strip=True)
    except AttributeError:
        title = None
    
    try:
        address = soup.find('p', class_='title__address js-address')\
            .get_text(strip=True)
    except AttributeError:
        address = None
    
    try:
        area = soup.find('li', class_='features__item features__item--area js-area')\
            .get_text(strip=True)\
            .replace('m²', '')
    except AttributeError:
        area = None
    
    try:
        bedrooms = soup.find('li', class_='features__item features__item--bedroom js-bedrooms')\
            .get_text(strip=True)\
            .strip()\
            .replace('quartos', '')\
            .replace('quarto', '')\
            .replace("Não informadoSolicitar", '')\
            .strip()
    except AttributeError:
        bedrooms = None
    
    try:
        bathrooms = soup.find('li', class_='features__item features__item--bathroom js-bathrooms')\
            .get_text(strip=True)
    except AttributeError:
        bathrooms = None
    
    try:
        parking = soup.find('li', class_='features__item features__item--parking js-parking')\
            .get_text(strip=True)\
            .strip()\
            .replace('vagas', '')\
            .replace('vaga', '')\
            .strip()
    except AttributeError:
        parking = None
    
    try:
        price = soup.find('h3', class_='price__price-info js-price-sale')\
            .get_text(strip=True)\
            .strip()\
            .replace('R$', '')\
            .replace('/Mês', '')\
            .replace('.', '')\
            .replace(',', '.')\
            .strip()
    except AttributeError:
        price = None
    
    try:
        condominium = soup.find('span', class_='price__list-value condominium js-condominium')\
            .get_text(strip=True)\
            .strip()\
            .replace('R$', '')\
            .replace('.', '')\
            .replace(',', '.')\
            .strip()
    except AttributeError:
        condominium = None
    
    try:
        total_rent = soup.find('span', class_='price__list-value rent-condominium js-total-rental-price')\
            .get_text(strip=True)\
            .strip()\
            .replace('R$', '')\
            .replace('.', '')\
            .replace(',', '.')\
            .strip()
    except AttributeError:
        total_rent = None
    
    try:
        iptu = soup.find('span', class_='price__list-value iptu js-iptu')\
            .get_text(strip=True)\
            .strip()\
            .replace('R$', '')\
            .replace('.', '')\
            .replace(',', '.')\
            .strip()
    except AttributeError:
        iptu = None

    def get_neighborhood(address: str):
        if(len(address.split(',')) > 2):
            return address.split('-')[1].split(',')[0].strip()
        else:
            return address.split(',')[0].strip()
    
    def get_city(address: str):
        return address.split(',')[-1].strip()
    
    def geocode_address(address):
        geolocator = Nominatim(user_agent="geoapiExercises")
        try:
            location = geolocator.geocode(address)
            if location:
                return location.latitude, location.longitude
            else:
                return None, None
        except GeocoderTimedOut:
            return geocode_address(address)
        
    (latitude, longitude) = geocode_address(address)
    
    return {
        "title": title,
        "address": address,
        "neighborhood": get_neighborhood(address),
        "city": get_city(address),
        "latitude": latitude,
        "longitude": longitude,
        "area": area,
        "bedrooms": bedrooms,
        "bathrooms": bathrooms,
        "parking": parking,
        "price/mouth": price,
        "condominium": condominium,
        "total_rent": total_rent,
        "iptu": iptu,
        "url": url,
    }

In [71]:
# Função para acessar a URL inicial e obter as listagens
def get_html(url):
    chrome_options = Options()
    driver = webdriver.Chrome(options=chrome_options)
    driver.get(url)
    sleep(3)
    html = driver.page_source
    driver.quit()
    soup = BeautifulSoup(html, 'html.parser')
    return soup


In [72]:
def extraction(urls, root_url: str):
    data = []
    with ThreadPoolExecutor(max_workers=6) as executor:
        future_to_url = {executor.submit(extract_listing_data, root_url + url): url for url in urls}
        for future in as_completed(future_to_url):
            url = future_to_url[future]
            try:
                rent_data = future.result()
                if rent_data:
                    data.append(rent_data)
            except Exception as e:
                print(f"Erro ao processar {url}: {e}")
        
    return data

In [78]:

root_url = "https://www.vivareal.com.br/"
url_text = '?pagina='
url_page = 1
first_page = True
next_page = True
all_data = []
q = queue.Queue()

while (next_page == True):
    if (first_page == True):
        url = f"{root_url}/aluguel/ceara/fortaleza/#onde=Brasil,Cear%C3%A1,Fortaleza,,,,,,BR%3ECeara%3ENULL%3EFortaleza,,,"
        first_page = False
    else:        
        url = f"https://www.vivareal.com.br/aluguel/ceara/fortaleza/?pagina={url_page}#onde=Brasil,Cear%C3%A1,Fortaleza,,,,,,BR%3ECeara%3ENULL%3EFortaleza,,,"
        print(f"{url}")

    # Obter a resposta da URL inicial
    html = get_html(url)
    cards = html.find('div', {'class': 'results-list js-results-list'})\
        .find_all('a', {'class':'property-card__content-link js-card-title'})
    
    urls = [card['href'] for card in cards]

    page_data = extraction(urls, root_url)
    all_data.extend(page_data)

    if url_page >= 20:
        next_page = False
        print("Última página alcançada.")
        break

    url_page = url_page + 1


# Criar DataFrame com os dados coletados
df = pd.DataFrame(all_data)
# print(df)


https://www.vivareal.com.br/aluguel/ceara/fortaleza/?pagina=2#onde=Brasil,Cear%C3%A1,Fortaleza,,,,,,BR%3ECeara%3ENULL%3EFortaleza,,,
Última página alcançada.


In [80]:
import re

def extract_bathrooms(text):
    # Procurar por padrões numéricos na string
    matches = re.findall(r'\d+', text)
    if matches:
        return int(matches[0])  # Retorna o primeiro número encontrado como inteiro
    else:
        return None  # Caso não encontre nenhum número, retorna None

# Aplicar a função para extrair o número de banheiros
df["bathrooms"] = df["bathrooms"].apply(extract_bathrooms).astype("Int64")


In [83]:
df["bedrooms"] = df["bedrooms"].replace("Não informadoSolicitar", np.nan)

In [94]:
# Função para geocodificar o endereço e obter as coordenadas
def geocode_address(address):
    geolocator = Nominatim(user_agent="geoapiExercises")
    try:
        location = geolocator.geocode(address)
        if location:
            return location.latitude, location.longitude
        else:
            return None, None
    except GeocoderTimedOut:
        return geocode_address(address)

In [90]:
if 'neighborhood' not in df.columns:
    df['neighborhood'] = df.apply(lambda row: row.address.split('-')[1].split(',')[0].strip(), axis=1)
if 'city' not in df.columns:
    df['city'] = df.apply(lambda row: row.address.split(',')[-1].strip(), axis=1)

# Converter para tipos numéricos
df["price/mouth"] = pd.to_numeric(df["price/mouth"], errors="coerce")
df["condominium"] = pd.to_numeric(df["condominium"], errors="coerce")
df["total_rent"] = pd.to_numeric(df["total_rent"], errors="coerce")

In [95]:

# Aplica a função na coluna 'address' e cria duas novas colunas 'latitude' e 'longitude'
df[['latitude', 'longitude']] = df['address'].apply(lambda x: pd.Series(geocode_address(x)))

# from geopy.geocoders import Nominatim

# # Criar um objeto geocoder do Nominatim
# geolocator = Nominatim(user_agent="my_app")

# # Exemplo de endereço em Fortaleza, Ceará
# address = "Rua do Anjo Branco, 1131, Fortaleza, Ceará"

# location = geolocator.geocode(address)

# print(location.latitude)
# print(location.longitude)

In [98]:
df.address

0                 Rua Tibúrcio Cavalcanti, 451 - Meireles, Fortaleza - CE
1              Rua Governador João Carlos, 458 - Serrinha, Fortaleza - CE
2                                   Rua E, 64 - São Bento, Fortaleza - CE
3                Rua Jaime Benévolo, 801 - José Bonifácio, Fortaleza - CE
4           Rua Coronel João de Oliveira, 355 - Messejana, Fortaleza - CE
                                     ...                                 
67                        Rua Frei Orlando, 302 - Montese, Fortaleza - CE
68                       Rua Princesa Isabel, 27 - Centro, Fortaleza - CE
69                                         Henrique Jorge, Fortaleza - CE
70    Avenida Desembargador Gonzaga, 532 - Parque Iracema, Fortaleza - CE
71                    Avenida dos Flamboyants, 450 - Cocó, Fortaleza - CE
Name: address, Length: 72, dtype: object

In [None]:
# Contar o número de listagens em cada bairro
neighborhood_counts = df['neighborhood'].value_counts()

# Plotar o gráfico de barras
plt.figure(figsize=(12, 8))
neighborhood_counts.plot(kind='bar')
plt.title('Número de lugares para alugar em cada bairro')
plt.xlabel('Bairro')
plt.ylabel('Número de listagens')
plt.xticks(rotation=45, ha='right')
plt.tight_layout()
plt.show()

In [99]:
# Carregar variáveis de ambiente do arquivo .env
load_dotenv(dotenv_path='CREDENTIALS.env')

# Obter credenciais do MongoDB das variáveis de ambiente
mongo_user = os.getenv("MONGO_USER")
mongo_pass = os.getenv("MONGO_PASS")
mongo_cluster = os.getenv("MONGO_CLUSTER")

try:
    # Conecte-se ao MongoDB
    mongo_uri = f"mongodb+srv://{mongo_user}:{mongo_pass}@{mongo_cluster}.0tyg61m.mongodb.net/?retryWrites=true&w=majority"
    print(f"{mongo_uri}\n")
    client = MongoClient(mongo_uri)

    # Selecionar o banco de dados e a coleção
    db = client.RealEstateData
    collection = db.rental
    # Converter o DataFrame para uma lista de dicionários
    data_dict = df.to_dict("records")

    try:
        # Inserir dados na coleção
        collection.insert_many(data_dict)
        print("Dados inseridos com sucesso!")
    except Exception as e:        
        print(f"Falha ao inserir dados no MongoDB: {e}")

except Exception as e:
    print(f"Falha ao conectar ao MongoDB: {e}")

mongodb+srv://robson_sampaio:Senha.123@penelope.0tyg61m.mongodb.net/?retryWrites=true&w=majority

Dados inseridos com sucesso!
