# Importanto as libs

In [81]:
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
import time
import re
from bs4 import BeautifulSoup
import pandas as pd
from anticaptchaofficial.recaptchav2proxyless import *
from chave import google_api, search_id
import datetime
import requests
import logging
import cloudscraper
logging.basicConfig(
    level=logging.DEBUG,  # Nível de log DEBUG para capturar tudo
    format="%(asctime)s - %(levelname)s - %(message)s",
    handlers=[
        logging.FileHandler("debug_log.txt"),  # Grava logs em um arquivo
        logging.StreamHandler()  # Exibe logs no console
    ]
)
logger = logging.getLogger(__name__)
import os

# Coleta o horário de inicio da aplicação

In [82]:
start_time = time.time()

# Abrir, ler e transformar informações necessárias para buscar as URLs de acordo com arquivo do cliente

In [83]:
#Ler a lista de sites e urls para a busca no google
municipios = pd.read_excel('[LEVANTAMENTO MUNICÍPIOS].xlsx')
municipio_nomes_uf = municipios.set_index('Nome Municipio')['Sigla UF'].to_dict()

###Testes com grupos menores de dados

# base_teste_municipios = list(municipio_nomes_uf.items())[0:4]
# display(base_teste_municipios)
# display(municipio_nomes_uf)

# Coleta de todas as URLs das secretarias de educação a partir da lista do cliente

In [84]:
#Sites indesejados
def is_not_desired_site(url):
    undesired_domains = [
        "facebook.com", "twitter.com", "instagram.com","glassdoor.com.br","linkedin.com", "tiktok.com", "snapchat.com",
        "econodata.com.br","econodata.com.br","serasaexperian.com.br","diariomunicipal.com.br","wikipedia.org",
        "diariomunicipio.com.br", "validator.w3.org","addtoany.com","whatsapp.com","pege.com.br","sites.google.com"
        ]
    return not any(domain in url for domain in undesired_domains)

#Relacionados a educação
def is_related_secretary_edu(url):
    education_keywords = ["gov.br"]
    return any(keyword in url for keyword in education_keywords)



In [None]:
def buscar_urls_google(query, api_key, search_id):
    url = "https://www.googleapis.com/customsearch/v1"
    params = {
        "q": query,
        "key": api_key,
        "cx": search_id,
        "num": 5,
        "lr": "lang_pt",  # Filtra por páginas em português
        "cr": "countryBR"  # Filtra por páginas do Brasil
    }
    try:
        response = requests.get(url, params=params)
        response.raise_for_status()
        results = response.json()
        logger.debug(f"Resposta completa da API para a query '{query}': {results}")  # Log da resposta completa
        if "items" not in results:
            logger.warning(f"Nenhum resultado encontrado para a query: {query}")
            return []
        return [item["link"] for item in results.get("items", [])]
    except requests.exceptions.RequestException as e:
        logger.error(f"Erro ao buscar URLs no Google: {e}")
        return []
    
sites_secretarias = {}
for municipio_nome, uf in municipio_nomes_uf.items():
    query = f"secretaria de educação de {municipio_nome} - {uf}"
    urls = buscar_urls_google(query, google_api, search_id)
    
    if urls:
        for url in urls:
            if is_not_desired_site(url) and is_related_secretary_edu(url):
                sites_secretarias[municipio_nome] = url
                logger.info(f"URL válida encontrada para {municipio_nome} - {uf}: {url}")
                break
        else:
            logger.warning(f"Nenhum link válido encontrado para {municipio_nome} - {uf}")
    else:
        logger.warning(f"Nenhum resultado encontrado no Google para {municipio_nome} - {uf}")

# Log para verificação das URLs
log_file_dir = "./"
log_file_name = "log_urls_"
log_timestamp = datetime.datetime.now().strftime('%Y%m%d%H%M%S')
log_file_ext = ".txt"
log_file_path = log_file_dir + log_file_name + log_timestamp + log_file_ext

# Abre o arquivo de log no modo de escrita
with open(log_file_path, "w") as log_file:
    log_file.write("Log de Verificação e Registro das URLs\n")
    log_file.write("*" * 5 + "=" * 90 + "*" * 5 + "\n")

    for municipio_nome, site_url in sites_secretarias.items():
        log_entry = f"{municipio_nome}-{uf}: {site_url}\n"
        log_file.write(log_entry)

logger.info(f"Log gravado em {log_file_path}")



# Acessa, busca, coleta e trata os dados das URLs

In [None]:
###Global
search_tags = [
    "p","br","tr","td","strong", "span", "h1", "h2", "h3", "h4", "h5", "h6", "div", "a", "li", "header", "footer","body",
    "tbody","em"
    ]

contact_patterns = [
    "organizac", "secretaria", "informacoes", "contact", "atendimento","recado", "deixe seu recado"
    "administrativo", "contato", "entidade", "conosco", "respostas","institucional"
    ]


def main():
    headers = {
        'User-Agent': ('Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 '
                       '(KHTML, like Gecko) Chrome/115.0.0.0 Safari/537.36'),
        'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.9',
        'Accept-Language': 'en-US,en;q=0.9',
        'Accept-Encoding': 'gzip, deflate, br',
        'Connection': 'keep-alive',
    }
    contact_keywords = [
        "secretaria", "educacao", "sme", "seduc", "semec", "see", "mec", "rede", "escola", "municipal", "estadual", "smec",
        "cme", "telefone", "resposta", "telefones", "telefone(s)", "semed", "sedu", "seceduc", "educasec", "secretario", 
        "sed", "seceduc", "smed", "comec", "contato", "faleconosco", "respostas", "prefeitura", "administracao", "celular", 
        "cel", "mobile", "phone", "educa", "educ", "fone", "Fone", "semeg", "seme", "Telefone", "cacs", "fale", "conosco",
        "atendimento", "E-mail da unidade", "conselho", "SUPERINTENDÊNCIA", "superintendencia", "pop", "tel", "gmail", 
        "hotmail", "yahoo", "bol", "Fale Conosco", "fundeb", "fale conosco", "fax", "gabinete", "gab", "mc", "mc3", 
        "outlook", "secedu", "seceducacao", "mail", "comunicacao", "respostasgabinetesme","adm"
    ]

    contacts = gather_contacts_from_sites(sites_secretarias, headers, contact_keywords)
    
    data = []
    for municipio, site in sites_secretarias.items():
        contact_info = contacts.get(municipio, {})
        email_str = ', '.join(contact_info.get('emails', []))
        phone_str = ', '.join(contact_info.get('phones', []))
        data.append([municipio, site, email_str or "E-mail não encontrado - Verifique manualmente",
                     phone_str or "Telefone não encontrado - Verifique manualmente"])

    df = pd.DataFrame(data, columns=["Município", "Site da Secretaria", "E-mails de Contato", "Telefone de Contato"])

    needs_retry = df[(df["E-mails de Contato"].str.contains("E-mail não encontrado")) | 
                     (df["Telefone de Contato"].str.contains("Telefone não encontrado"))]

    processed_urls = set()  # Inicializa o conjunto fora do loop

    for index, row in needs_retry.iterrows():
        url = row["Site da Secretaria"]
        if url in processed_urls:
            continue  # Evita reprocessamento

        processed_urls.add(url)  # Adiciona a URL ao conjunto de URLs processadas

        content = load_page_content(url, headers)
        if content:
            soup = BeautifulSoup(content, 'html.parser')
            contact_page_urls = find_contact_pages(soup, url, contact_patterns)

            for contact_page_url in contact_page_urls:
                if contact_page_url in processed_urls or contact_page_url == url:
                    continue  # Evita reprocessamento

                processed_urls.add(contact_page_url)
                print(f"Processing URL: {contact_page_url}")
                
                page_content = load_page_content(contact_page_url, headers)
                if page_content:
                    soup = BeautifulSoup(page_content, 'html.parser')
                    new_emails = get_emails_from_page(soup, contact_keywords)
                    new_phones = get_phones_from_page(soup, contact_keywords)
                    
                    if "E-mail não encontrado" in row["E-mails de Contato"]:
                        df.at[index, 'E-mails de Contato'] = ', '.join(new_emails) or "E-mail não encontrado - Verifique manualmente"
                    if "Telefone não encontrado" in row["Telefone de Contato"]:
                        df.at[index, 'Telefone de Contato'] = ', '.join(new_phones) or "Telefone não encontrado - Verifique manualmente"
    
    create_excel(df)

def find_contact_pages(soup, url, contact_patterns):
    found_links = set()
    ignored_strings = [ 
        ###Com as palavras chaves necessarias para o montante do projeto,
        ###os sites abaixo oneram muito tempo para achar um contato e a taxa de retorno é baixissíma, 
        ###por isso removo-os
        "educacao.sp.gov.br",
        "educacao.ba.gov.br",
        "educacao.mg.gov.br"
    ]
    
    for tag in soup.find_all('a', href=True):
        href = tag['href'].strip()
        if href.lower().startswith("javascript:") or href.startswith("#"):
            continue
        #Processa o link completo
        full_url = urllib.parse.urljoin(url, href)
        #Verificar se o link contém qualquer uma das strings ignoradas
        if any(ignored_string in full_url for ignored_string in ignored_strings):
            continue
        if is_not_desired_site(full_url) and is_related_secretary_edu(full_url):
            href_lower = href.lower()
            if any(keyword in href_lower for keyword in contact_patterns):
                found_links.add(full_url)
    
    print("Found links:", found_links)
    return list(found_links)

def create_excel(df):
    excel_filename = "Contatos_secretarias_de_educacao_"
    excel_timestamp = datetime.datetime.now().strftime('%Y%m%d_%H%M%S')
    df.to_excel(excel_filename + excel_timestamp + ".xlsx", index=False)
    print(f"Arquivo salvo como: {excel_filename + excel_timestamp + '.xlsx'}")

###Para deixar o site "limpo" para a coleta de dados
def is_cloudflare_protected(response_text):
    return "Checking your browser" in response_text or "Cloudflare" in response_text

def bypass_cloudflare(driver, url):
    driver.get(url)
    WebDriverWait(driver, 10).until(EC.presence_of_element_located((By.TAG_NAME, "body")))
    return driver.page_source

def is_security_warning_page(driver):
    try:
        WebDriverWait(driver, 2).until(
            EC.presence_of_element_located((By.ID, "proceed-link"))
        )
        return True
    except Exception:
        return False

def disable_undesired_blocks(driver):
    try:
        WebDriverWait(driver, 5).until(EC.element_to_be_clickable((By.ID, "proceed-link"))).click()
    except Exception as e:
        print("Restrição não encontrada ou outro erro ocorreu:", e)

def get_page_content(url):
    driver = None
    try:
        chrome_options = webdriver.ChromeOptions()
        chrome_options.add_argument("--disable-plugins-discovery")
        chrome_options.add_argument("--disable-extensions")
        chrome_options.add_argument("--disable-popup-blocking")
        chrome_options.add_argument("--disable-notifications")
        driver = webdriver.Chrome(options=chrome_options)

        driver.get(url)

        if is_security_warning_page(driver):
            disable_undesired_blocks(driver)

        content_after_blocks_removed = driver.page_source

        if not content_after_blocks_removed:
            print(f"Não foi possível obter o conteúdo da página {url} após desabilitar blocos.")
            return None
        
        if is_cloudflare_protected(content_after_blocks_removed):
            print("Página protegida por Cloudflare detectada. Usando Selenium para bypass.")
            return bypass_cloudflare(driver, url)        

        return content_after_blocks_removed
    finally:
        if driver:
            driver.quit()

def wait_for_page_load_with_cloudflare(url, headers, max_attempts=10, sleep_interval=3):
    scraper = cloudscraper.create_scraper()
    attempts = 0

    while attempts < max_attempts:
        try:
            response = scraper.get(url, headers=headers, timeout=30)
            response.raise_for_status()

            if is_cloudflare_protected(response.text):
                logger.info("Proteção Cloudflare detectada. Tentando pela Selenium.")
                driver = webdriver.Chrome()
                driver.get(url)

                disable_undesired_blocks(driver)

                WebDriverWait(driver, 10).until(EC.presence_of_element_located((By.TAG_NAME, "body")))
                response_text = driver.page_source
                driver.quit()

                return response_text

            logger.info(f"Página carregada com sucesso na tentativa nº {attempts + 1}: {url}")
            return response.text

        except requests.exceptions.HTTPError as e:
            logger.error(f"Erro HTTP ao acessar {url}: {e}")
        except requests.exceptions.ConnectionError as e:
            logger.error(f"Erro de conexão ao acessar {url}: {e}")
        except requests.exceptions.Timeout as e:
            logger.error(f"Timeout ao acessar {url}: {e}")
        except requests.exceptions.RequestException as e:
            logger.error(f"Erro geral na requisição ao acessar {url}: {e}")

        attempts += 1
        logger.info(f"Tentativa nº {attempts}: Página não carregou. Aguardando...")
        time.sleep(sleep_interval)

    logger.error(f"Página não carregou após {max_attempts} tentativas. URL: {url}")
    return None

def load_page_content(url, headers, max_attempts=10, sleep_interval=3):
    attempts = 0

    #Verifica primeiramente se a página é protegida ou dinamicamente carregada
    try:
        initial_response = requests.get(url, headers=headers, timeout=10)
        if is_cloudflare_protected(initial_response.text) or is_dynamically_loaded(initial_response.text):
            logger.info("Página protegida ou carregada dinamicamente. Tentando usar Selenium.")
            return fetch_with_selenium(url)
    except requests.exceptions.RequestException as e:
        logger.error(f"Erro na verificação inicial da página {url}: {e}")

    #Se não for protegida ou dinamicamente carregada, usa cloudscraper    
    scraper = cloudscraper.create_scraper()

    while attempts < max_attempts:
        try:
            response = scraper.get(url, headers=headers, timeout=30)
            response.raise_for_status()

            logger.info(f"Página carregada com sucesso na tentativa nº {attempts + 1}: {url}")
            return response.text

        except requests.exceptions.RequestException as e:
            logger.error(f"Erro ao acessar {url}: {e}")

        attempts += 1
        logger.info(f"Tentativa nº {attempts}: Página não carregou. Aguardando...")
        time.sleep(sleep_interval)

    logger.error(f"Página não carregou após {max_attempts} tentativas. URL: {url}")
    return None

def fetch_with_selenium(url):
    driver = webdriver.Chrome()
    try:
        driver.get(url)
        disable_undesired_blocks(driver)
        WebDriverWait(driver, 10).until(EC.presence_of_element_located((By.TAG_NAME, "body")))
        return driver.page_source
    finally:
        driver.quit()

def is_dynamically_loaded(response_text):
    #Verifica se a página contém muitos scripts, sugerindo carregamento dinâmico
    script_count = response_text.count('<script')
    #Procura por padrões comuns de carregamento dinâmico
    dyn_keywords = [
        'ajax', 'setTimeout(', 'setInterval(', 'XMLHttpRequest',
        'fetch(', 'Vue', 'React', 'Angular', 'app-root', 'ng-app', 'ng-controller'
    ]
    #Conta ocorrências de palavras-chave relacionadas a JS dinâmico
    keyword_presence = any(keyword in response_text for keyword in dyn_keywords)
    #Determina que a página é dinamicamente carregada se houver muitos scripts ou palavras-chave indicando isso
    return script_count > 10 or keyword_presence

###Angariando os contatos
def gather_contacts_from_sites(sites_secretarias, headers, contact_keywords):
    contacts = {}

    for municipio, url in sites_secretarias.items():
        if url:
            try:
                page_content = wait_for_page_load_with_cloudflare(url, headers)
                found_emails, found_phones = set(), set()

                if page_content:
                    soup = BeautifulSoup(page_content, 'html.parser')
                    found_emails = get_emails_from_page(soup, contact_keywords)
                    found_phones = get_phones_from_page(soup, contact_keywords)

                contacts[municipio] = {'emails': list(found_emails), 'phones': list(found_phones)}
            except requests.exceptions.RequestException as e:
                print(f"Erro de rede em {url}: {e}")
                contacts[municipio] = {'emails': [], 'phones': []}
            except Exception as e:
                print(f"Erro ao processar {url}: {str(e)}")
                contacts[municipio] = {'emails': [], 'phones': []}
        else:
            contacts[municipio] = {'emails': [], 'phones': []}
    return contacts

###Para coletar os emails
def get_emails_from_page(soup, contact_keywords):
    emails = set()
    email_pattern = r"[\w.-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,4}"

    for tag in search_tags:
        elements = soup.find_all(tag)
        for element in elements:
            element_text = element.get_text().lower()
            if any(keyword in element_text for keyword in contact_keywords):
                possible_emails = re.findall(email_pattern, element_text)
                for email in possible_emails:
                    sanitized_email = sanitize_email(email, contact_keywords)
                    emails.update(sanitized_email)
                if len(emails) >= len(contact_keywords):
                    break
        if len(emails) >= len(contact_keywords):
            break

    if len(emails) < len(contact_keywords):
        possible_emails = re.findall(email_pattern, soup.get_text().lower())
        for email in possible_emails:
            sanitized_email = sanitize_email(email, contact_keywords)
            emails.update(sanitized_email)

    mailto_links = soup.select('a[href^=mailto]')
    for link in mailto_links:
        email = link['href'].split(':')[1]
        sanitized_email = sanitize_email(email, contact_keywords)
        emails.update(sanitized_email)

    cfemail_elements = soup.select('.__cf_email__')
    for element in cfemail_elements:
        email = decode_email(element['data-cfemail'])
        sanitized_email = sanitize_email(email, contact_keywords)
        emails.update(sanitized_email)

    return emails
    
def decode_email(encoded_string):
    r = int(encoded_string[:2], 16)
    email = ''.join(chr(int(encoded_string[i:i+2], 16) ^ r) for i in range(2, len(encoded_string), 2))
    return email

def sanitize_email(text, contact_keywords):
    # Adiciona um espaço entre números e letras caso estejam grudados
    text = re.sub(r'(\d)([a-zA-Z])', r'\1 \2', text)
    #Atualiza o padrão para números de telefone comuns no Brasil
    phone_pattern = r'\b(\d{2,4}[-.()\s]*){2,3}\d{4}\b'
    #Remove números de telefone
    sanitized_text = re.sub(phone_pattern, '', text)
    #Corrige emails concatenados com "brho" ao final
    sanitized_text = re.sub(r'\b(br)ho\b', r'\1', sanitized_text)
    #Captura todos os e-mails no texto
    email_pattern = r'\b[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,4}\b'
    all_emails = re.findall(email_pattern, sanitized_text)
    #Filtra e-mails válidos que terminam com ".br" ou contêm um ano específico
    valid_emails = [email for email in all_emails if email.endswith('.br') or re.search(r'\d{4}', email)]
    #Cria padrão regex que permite qualquer correspondência de palavras-chave
    contact_pattern = '|'.join(re.escape(word) for word in contact_keywords)
    #Verifica se os emails contêm qualquer uma das palavras-chave
    filtered_emails = [email for email in valid_emails if re.search(contact_pattern, email)] 
    #Retorna todos os e-mails em minúsculas, evitando duplicações
    return set(email.lower().strip() for email in filtered_emails)

###Coletar os telefone e celulares
def get_phones_from_page(soup, contact_keywords):
    phones = set()
    phone_pattern = r'(?:\+?55\s?)?\(?\d{2}\)?[\s.-]?9?\d{4}[-\s./]?\d{4}'
    undesired_patterns = [
        r'\b\d{3}\.\d{3}\.\d{3}-\d{2}\b',  #CPF
        r'\b\d{2}\.\d{3}\.\d{3}/\d{4}-\d{2}\b',  #CNPJ
        r'(?<![\d/.-])\b\d{4}\b(?!/)',  #Ano isolado
        r'\blei?\b|\blegislação\b',  #Exemplo genérico para leis
    ]

    def is_undesired_number(number):
        return any(re.search(pattern, number) for pattern in undesired_patterns)

    found_using_keywords = False

    for tag in search_tags:
        elements = soup.find_all(tag)
        for element in elements:
            element_text = element.get_text().lower()
            if any(keyword in element_text for keyword in contact_keywords):
                found_phones = re.findall(phone_pattern, element_text)
                for phone in found_phones:
                    sanitized_phone = sanitize_phone(phone)
                    if sanitized_phone and not is_undesired_number(sanitized_phone):
                        phones.add(sanitized_phone)
                if phones:
                    found_using_keywords = True
                    break
        if phones:
            break

    if not phones:
        full_text = soup.get_text().lower()
        found_phones = re.findall(phone_pattern, full_text)
        for phone in found_phones:
            sanitized_phone = sanitize_phone(phone)
            if sanitized_phone and not is_undesired_number(sanitized_phone):
                phones.add(sanitized_phone)

    return phones

def sanitize_phone(phone):
    # Remove tudo que não é número
    clean_phone = re.sub(r'\D', '', phone)
    # Verifica se o número tem pelo menos 10 dígitos
    if len(clean_phone) >= 10:
        return clean_phone
    return None
 
if __name__ == "__main__":
    main()

# Limpando a pasta de logs e de arquivos - Últimos 3 dias

In [87]:
#Dias que deseja deixar os logs e arquivos criados pelo main()
dias_para_manter = 3

#Calcula a data limite
data_limite = datetime.datetime.now() - datetime.timedelta(days=dias_para_manter)

def limpar_arquivos_antigos(pasta, prefixos, extensao):
    for arquivo in os.listdir(pasta):
        #Verifica se o arquivo tem a extensão e prefixo desejados
        if arquivo.endswith(extensao) and any(arquivo.startswith(prefixo) for prefixo in prefixos):
            #Obtém o caminho completo do arquivo
            caminho_arquivo = os.path.join(pasta, arquivo)
            #Obtém a data de modificação do arquivo
            data_modificacao = datetime.datetime.fromtimestamp(os.path.getmtime(caminho_arquivo))
            #Verifica se o arquivo é mais antigo que a data limite
            if data_modificacao < data_limite:
                os.remove(caminho_arquivo)
                print(f"Arquivo removido: {arquivo}")

#Caminho para a pasta onde os arquivos são salvos
pasta_raiz = "./"

#Chama a função para limpar arquivos antigos
limpar_arquivos_antigos(pasta_raiz, ["Contatos_secretarias_de_educacao_", "log_urls_"], ".xlsx")
limpar_arquivos_antigos(pasta_raiz, ["log_urls_"], ".txt")

# Contabilizando o tempo que a aplicação demorou para terminar

In [90]:
end_time = time.time()
#Calcula o total de segundos
total_seconds = end_time - start_time
#Converte esse tempo em um objeto timedelta
delta = datetime.timedelta(seconds=total_seconds)
#Formata e converte o timedelta como HH:MM:SS - string
total_time = str(delta).split('.')[0]

print(total_time)

12:13:53
