In [None]:
from pyspark.sql import SparkSession
from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.common.by import By
import time
import os

In [None]:
def init_spark_session(app_name='Web Scraping Correios'):
    """
    Inicializa e configura uma sessão Spark.
    """
    return (SparkSession.builder.appName(app_name)
            .config('spark.sql.repl.eagerEval.enabled', True)
            .config('spark.sql.repl.eagerEval.truncate', 100)
            .config('spark.sql.repl.eagerEval.maxNumRows', 200)
            .getOrCreate())

In [None]:
def init_webdriver():
    """
    Inicializa e configura o WebDriver do Chrome.
    """
    service = Service()
    options = webdriver.ChromeOptions()
    return webdriver.Chrome(service=service, options=options)

In [None]:
def get_select_options_by_value(driver, by_method, selector):
    """
    Obtém as opções de um elemento select na página.
    """
    select_element = driver.find_element(by_method, selector)
    return [option for option in select_element.find_elements(By.TAG_NAME, "option") if option.get_attribute('value')]

In [None]:
def process_state(driver, state, spark):
    """
    Processa um estado específico, coletando os dados de agências dos Correios.
    """
    print(f'Estado {state} iniciou o processamento.')
    time.sleep(2)

    data = []
    municipalities = get_select_options_by_value(driver, By.XPATH, '//*[@id="cmbMunicipio"]')

    for municipality in municipalities:
        time.sleep(1)
        municipality.click()
        municipality = municipality.get_attribute('value')
        time.sleep(2)

        districts = get_select_options_by_value(driver, By.XPATH, '//*[@id="cmbBairro"]')
        for district in districts:
            time.sleep(1)
            district.click()
            district = district.get_attribute('value')
            time.sleep(2)

            addresses = driver.find_elements(By.CLASS_NAME, 'odd')
            for address in addresses:
                data.append(extract_agency_data(address, state, municipality, district))

    # Converte os dados em DataFrame e salva em arquivo CSV
    save_data_to_csv(data, state, spark)
    print(f'Estado {state} finalizado!\n')

In [None]:
def extract_agency_data(address, state, municipality, district):
    """
    Extrai os dados de uma agência, incluindo nome, endereço, número, etc.
    """
    agency_name = get_text_or_default(address, 'nome-agencia')
    agency_address = get_text_or_default(address, 'endereco-agencia')
    number = extract_number_from_address(agency_address)
    google_maps_link, lat, long = get_google_maps_info(address)
    cep = get_cep_from_address(address)

    return {
        "estado": state,
        "municipio": municipality,
        "bairro": district,
        "endereco": agency_address,
        "numero": number,
        "cep": cep,
        "nome_agencia": agency_name,
        "latitude": lat,
        "longitude": long,
        "google_maps_link": google_maps_link
    }

In [None]:
def get_text_or_default(element, class_name, default_value='Vazio'):
    """
    Retorna o texto de um elemento ou um valor padrão caso o texto não exista.
    """
    try:
        return element.find_element(By.CLASS_NAME, class_name).text
    except:
        return default_value

In [None]:
def extract_number_from_address(address):
    """
    Separa o logradouro e o número do endereço.
    """
    try:
        address_split = address.split(', ')
        if len(address_split) == 2:
            return address_split[1]  # Retorna o número
        else:
            return ''  # Caso não haja número
    except:
        return ''

In [None]:
def get_google_maps_info(address):
    """
    Extrai as informações de latitude, longitude e link do Google Maps.
    """
    try:
        tag_a = address.find_elements(By.TAG_NAME, 'a')
        for a in tag_a:
            try:
                google_maps_link = a.get_attribute('href')
                lat_long = google_maps_link.split('daddr=')[1].split(',')
                lat = lat_long[0]
                long = lat_long[1]
                return google_maps_link, lat, long
            except:
                continue
        return 'vazio', 'vazio', 'vazio'
    except:
        return 'vazio', 'vazio', 'vazio'

In [None]:
def get_cep_from_address(address):
    """
    Extrai o CEP da agência, caso disponível.
    """
    try:
        info_button = address.find_element(By.XPATH, './/img[starts-with(@id, "info_")]')
        if 'abre_info' in info_button.get_attribute('src'):
            info_button.click()
        
        time.sleep(2)
        cep = driver.find_element(By.CLASS_NAME, 'mostra_detalhe')
        cep = cep.find_elements(By.CLASS_NAME, 'sombreado')[2].text
        return cep.split('CEP: ')[1]
    except:
        return 'Vazio'

In [None]:
def save_data_to_csv(data, state, spark):
    """
    Converte a lista de dados para DataFrame do Spark e salva em arquivo CSV.
    """
    df = spark.createDataFrame(data).select(
        "estado", "municipio", "bairro", "endereco", "numero", "cep", "nome_agencia", "latitude", "longitude", "google_maps_link"
    )
    
    full_output_dir = f'{output_dir}/{state}'
    
    if not os.path.exists(full_output_dir):
        os.makedirs(full_output_dir)

    df.coalesce(1).write.csv(os.path.join(full_output_dir), mode='overwrite', header=True, encoding='utf-8')
    print(f'DataFrame salvo com sucesso em {full_output_dir}\n')

In [None]:
def main():
    """
    Função principal que executa o scraping para todos os estados.
    """
    # Inicializa as sessões Spark e WebDriver
    spark = init_spark_session()
    driver = init_webdriver()

    # Acessa a URL dos Correios
    url = "https://mais.correios.com.br/app/index.php"
    driver.get(url)

    # Obtém os estados e inicia o processamento
    select_element = driver.find_element(By.XPATH, '//*[@id="cmbEstado"]')
    states = select_element.find_elements(By.TAG_NAME, "option")[1:None]
    
    for state in states:
        state_value = state.get_attribute('value')
        time.sleep(1)
        state.click()
        process_state(driver, state_value, spark)

    print('Processamento finalizado!')
    driver.quit()

## MAIN

In [None]:
# Define o diretório onde os arquivos serão salvos
output_dir = 'data_warehouse/enderecos_correios'

In [None]:
main()