In [5]:
from selenium import webdriver
from webdriver_manager.chrome import ChromeDriverManager
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.common.action_chains import ActionChains
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.common.exceptions import NoSuchElementException
from selenium.common.exceptions import TimeoutException
from selenium.common.exceptions import StaleElementReferenceException
from selenium.common.exceptions import WebDriverException
from selenium.webdriver.support.ui import Select
import pandas as pd
from bs4 import BeautifulSoup
import requests
import os




class Navegador:
    def __init__(self):
        # Configurar opções do Chrome
        options = Options()
        options.add_argument("--enable-automation")
        options.add_argument("--start-maximized")
        options.add_argument("--disable-notifications")
        options.add_argument("--disable-popup-blocking")
        options.add_argument("--kiosk-printing")

        #add plugin
        options.add_extension('./solver.crx')
        
        self.servico = Service(ChromeDriverManager().install())
        
        
        # Inicializar o WebDriver do Chrome com as opções configuradas
        #self.driver = webdriver.Remote(command_executor="http://localhost:4444/wd/hub", options=options)
        self.driver = webdriver.Chrome(service=self.servico, options=options)
        self.wait = WebDriverWait(self.driver, 15)
        self.by = By
        self.locator = {
            "XPATH": By.XPATH,
            "ID": By.ID,
            "CLASS_NAME": By.CLASS_NAME,
            "LINK_TEXT": By.LINK_TEXT,
            "NAME": By.NAME,
            "PARTIAL_LINK_TEXT": By.PARTIAL_LINK_TEXT,
            "TAG_NAME": By.TAG_NAME,
            "CSS_SELECTOR": By.CSS_SELECTOR
        }        

    def get_session_id (self):
        return self.driver.session_id

    def disable_alert(self):
        self.driver.switch_to.alert.dismiss()

    def element_get_text(self, element, tag):
        if element in self.locator:
            try:
                # Aguardar até que o elemento seja visível e, em seguida, retornar seu texto
                element_text = self.wait.until(EC.visibility_of_element_located((self.locator[element], tag)))
                return element_text
            except TimeoutException:
                print("Elemento não encontrado")   
                  
    def get_elements(self, element, tag):
        if element in self.locator:
            try:
                # Aguardar até que o elemento seja visível e, em seguida, retornar seu texto
                elements = self.wait.until(EC.visibility_of_all_elements_located((self.locator[element], tag)))
                return elements
            except TimeoutException:
                print("Elemento não encontrado")

    def get(self, url):
        # await asyncio.sleep(0)
        self.driver.get(url)
    def close(self):
    #  await asyncio.sleep(0)
        self.driver.quit()   

    def close_session(self, session_id):
        grid_url = "https://grid.consium.com.br/wd/hub"
        session_url = f"{grid_url}/session/{session_id}"
        response = requests.delete(session_url)
        if response.status_code == 200:
            print("Sessão fechada com sucesso!")
        else:
            print("Falha ao fechar a sessão.")

        return response    
    # Funcao para digitar no elemento           
    def sendkeys(self, element, tag, keys):
    #  await asyncio.sleep(0)
        if element in self.locator:
            try:
                self.wait.until(EC.presence_of_element_located((self.locator[element], tag))).send_keys(keys)
            except TimeoutException:
                print("Elemento não encontrado")
                
    # Funcao para clicar no elemento                
    def click(self, element, tag):
    #  await asyncio.sleep(0)
        if element in self.locator:
            try:
                self.wait.until(EC.visibility_of_element_located((self.locator[element], tag))).click()
            except TimeoutException:    
                print("Elemento não encontrado")


    def get_table_element(self, element, tag):
        try:
            # Obter o conteúdo HTML da tag <tbody>
            html_content = self.wait.until(EC.visibility_of_element_located((self.locator[element], tag))).get_attribute('innerHTML')
            # Extrair dados da tabela e transforma em dataframe
            data = self.table_to_dataframe(html_content)
            qtd_linhas = len(data)
            return data, qtd_linhas
        except TimeoutException:
            print("Elemento não encontrado")

    def table_to_dataframe(self, html_content):

        soup = BeautifulSoup(html_content, 'html.parser')

        # Encontra a tabela desejada (selecionando-a pela classe, id ou outras características)
        table = soup.find('table')

        # Verifica se a tabela foi encontrada
        if table:
            # Inicializa uma lista para armazenar os dados da tabela
            table_data = []
            # Itera sobre as linhas da tabela (<tr>)
            for row in table.find_all('tr'):
                # Inicializa uma lista para armazenar os dados de uma linha
                row_data = []
                # Itera sobre as células da linha (<td>)
                for cell in row.find_all(['td']):
                    # Adiciona o texto da célula à lista de dados da linha
                    value = cell.text.strip()
                    # Verifica se o valor não está vazio
                    if value:
                        row_data.append(value)
                    else:
                        row_data.append(None)
                    # Verifica se a célula contém uma tag de âncora (hiperlink)
                    link = cell.find('a')
                    if link:
                        # Se houver uma tag de âncora, adiciona o link (href) à lista de dados da linha
                        row_data.append(link.get('href'))
                    else:
                        row_data.append(None)
                # Adiciona os dados da linha à lista de dados da tabela
                if row_data:
                    table_data.append(row_data)

            # Imprime os dados da tabela
            
            df = pd.DataFrame(table_data)
            df.to_excel('arquivo.xlsx', index=False)

            return df 
        

                   

In [6]:
import sqlite3
import json

def salvar_ou_atualizar_perfil_em_banco(perfil, nome_arquivo_db="profiles.db"):
    """
    Salva ou atualiza um único perfil no banco de dados SQLite.

    :param perfil: Dicionário representando o perfil a ser salvo ou atualizado.
    :param nome_arquivo_db: Nome do arquivo do banco de dados SQLite.
    """
    # Conexão com o banco de dados
    conn = sqlite3.connect(nome_arquivo_db)
    cursor = conn.cursor()

    # Certificar-se de que a tabela existe
    cursor.execute('''
    CREATE TABLE IF NOT EXISTS profile (
        link TEXT PRIMARY KEY,
        nome TEXT,
        skills TEXT,
        sobre TEXT,
        cargo TEXT,
        experiencia TEXT,
        educacao TEXT,
        certificacoes TEXT,
        contato_email TEXT,
        contato_telefone TEXT,
        contato_linkedin TEXT,
        contato_github TEXT,
        foto TEXT
    );
    ''')

    link = perfil.get("link", "")
    
    # Verificar se o link já existe na tabela
    cursor.execute("SELECT link FROM profile WHERE link = ?", (link,))
    if cursor.fetchone() is not None:
        # Atualizar o registro existente
        cursor.execute('''
        UPDATE profile
        SET nome = ?, skills = ?, sobre = ?, cargo = ?, experiencia = ?, 
            educacao = ?, certificacoes = ?, contato_email = ?, contato_telefone = ?, 
            contato_linkedin = ?, contato_github = ?, foto = ?
        WHERE link = ?
        ''', (
            perfil.get("nome", ""),
            perfil.get("skills", ""),
            perfil.get("sobre", ""),
            perfil.get("cargo", ""),
            json.dumps(perfil.get("experiencia", [])),
            json.dumps(perfil.get("educacao", [])),
            json.dumps(perfil.get("certificacoes", [])),
            perfil.get("contato", {}).get("email", ""),
            perfil.get("contato", {}).get("telefone", ""),
            perfil.get("contato", {}).get("linkedin", ""),
            perfil.get("contato", {}).get("github", ""),
            perfil.get("foto", ""),
            link
        ))
        print(f"Registro com o link '{link}' atualizado com sucesso.")
    else:
        # Inserir um novo registro
        cursor.execute('''
        INSERT INTO profile (
            link, nome, skills, sobre, cargo, experiencia, educacao, certificacoes,
            contato_email, contato_telefone, contato_linkedin, contato_github, foto
        ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
        ''', (
            link,
            perfil.get("nome", ""),
            perfil.get("skills", ""),
            perfil.get("sobre", ""),
            perfil.get("cargo", ""),
            json.dumps(perfil.get("experiencia", [])),
            json.dumps(perfil.get("educacao", [])),
            json.dumps(perfil.get("certificacoes", [])),
            perfil.get("contato", {}).get("email", ""),
            perfil.get("contato", {}).get("telefone", ""),
            perfil.get("contato", {}).get("linkedin", ""),
            perfil.get("contato", {}).get("github", ""),
            perfil.get("foto", "")
        ))
        print(f"Registro com o link '{link}' salvo com sucesso.")

    # Confirmar transações e fechar conexão
    conn.commit()
    conn.close()
    print(f"Processamento concluído. Banco de dados atualizado: {nome_arquivo_db}!")

In [7]:
def is_page_exists(navegador):
    try:
        # Aguarda até 10 segundos para o elemento de erro aparecer
        error_element = WebDriverWait(navegador.driver, 4).until(EC.presence_of_element_located((By.CSS_SELECTOR, 'h2.artdeco-empty-state__headline')))
        
        # Verifica se o texto "Esta página não existe" está no elemento
        if 'Esta página não existe' in error_element.text:
            # Se necessário, você pode adicionar um return ou outro comportamento
            # para interromper ou redirecionar o fluxo do seu código aqui.
            return False
        else:
            return True

    except TimeoutException:
        # Se o elemento não for encontrado dentro do tempo, segue normalmente
        return True



In [8]:
def is_page_is_not_found(navegador):
    try:
        # Espera até que o elemento com texto 'Página não encontrada' apareça
        navegador.wait.until(EC.presence_of_element_located((By.ID, 'i18n_pt_BR')))
    except TimeoutException:
        # Se o tempo expirar e não encontrar o texto, retorna False
        return False
    return True  # Se encontrar o texto, retorna True


In [9]:
def gerar_query(cargos = [], habilidades = [], bancos_dados = [], ferramentas = [], localizacoes = [], empresa=None):
    # Criar a parte da query para os cargos
    cargos_query = " OR ".join([f'"{cargo}"' for cargo in cargos])
    
    # Criar a parte da query para as habilidades
    habilidades_query = " OR ".join([f'"{habilidade}"' for habilidade in habilidades])
    
    # Criar a parte da query para os bancos de dados
    bancos_dados_query = " OR ".join([f'"{banco}"' for banco in bancos_dados])
    
    # Criar a parte da query para as ferramentas
    ferramentas_query = " OR ".join([f'"{ferramenta}"' for ferramenta in ferramentas])
    
    # Criar a parte da query para as localizações
    localizacoes_query = " OR ".join([f'"{localizacao}"' for localizacao in localizacoes])
    
    # Adicionar a empresa, se fornecida
    empresa_query = f' "{empresa}"' if empresa else ""
    
    # Montar a query final
    query = (f'site:linkedin.com/in/ ({cargos_query}) ({habilidades_query}) '
             f'({bancos_dados_query}) ({ferramentas_query}) ({localizacoes_query}){empresa_query}')
        
    return query

# Exemplo de uso da função
cargos = ["Backend Developer", "Backend Engineer", "Desenvolvedor Backend", "Engenheiro de Software", "Software Engineer"]
habilidades = ["Python", "Django", "Flask", "FastAPI", "REST API", "GraphQL", "Microservices"]
bancos_dados = ["SQL", "NoSQL", "PostgreSQL", "MySQL", "MongoDB"]
ferramentas = ["Docker", "Kubernetes", "Git", "GitHub", "CI/CD", "DevOps"]
localizacoes = ["Brasil", "Brazil", "Remoto", "Remote"]



"""query = gerar_query(cargos, habilidades, bancos_dados, ferramentas, localizacoes)
google_query = 'https://www.google.com.br/search?q=' + query
print(google_query)"""


"query = gerar_query(cargos, habilidades, bancos_dados, ferramentas, localizacoes)\ngoogle_query = 'https://www.google.com.br/search?q=' + query\nprint(google_query)"

In [10]:

def wait_for_captcha(navegador):

    try:
        # Aguarda até o elemento estar presente
        WebDriverWait(navegador.driver, 120).until(EC.presence_of_element_located((By.CLASS_NAME, "HZVG1b.Tg7LZd")))

        print("Elemento encontrado!")
    except TimeoutException:
        print("Elemento não encontrado dentro do tempo especificado.")
        # Fecha o navegador em caso de erro
        navegador.quit()
        # Interrompe a execução do código
        raise SystemExit("Execução encerrada devido a erro.")

In [39]:
def get_google_results(navegador, max_candidates):
    # Determina o número de páginas a serem processadas
    max_candidates = round(max_candidates / 5)
    perfis = []  # Lista para armazenar todos os perfis

    for i in range(max_candidates):
        # Busca elementos na página atual
        google_results = navegador.driver.find_elements(By.XPATH, '//span[@jscontroller="msmzHf"]')
        print(f"Processando candidato {i+1} de {max_candidates}")

        for result in google_results:

            try:
                # Tentando obter o link
                try:
                    link = result.find_element(By.TAG_NAME, "a").get_attribute("href")

                    link.click()

                    # Normaliza o prefixo para remover qualquer idioma ou região desnecessários
                    for prefix in ["/pt", "/en", "/es", "/fr", "/de"]:  # Adicione outros idiomas, se necessário
                        link = link.replace(prefix, "")

                    # Remove o prefixo "https://br." para uniformizar os links
                    link = link.replace("https://br.", "https://")

                    # Remove qualquer prefixo antes de linkedin.com (cm., ke., etc.)
                    link = link.split('linkedin.com', 1)[-1]  # Mantém a parte após 'linkedin.com'
                    link = "https://linkedin.com" + link  # Adiciona o prefixo padrão 'https://linkedin.com'

                except:
                    link = "Link não encontrado"


                # Adicionando os dados à lista de perfis
                perfis.append({
                    "link": link,
                    "nome": "",
                    "skills": "",
                    "sobre": "",
                    "cargo": "",
                    "experiencia": [],
                    "educacao": [],
                    "certificacoes": [],
                    "contato": {
                        "email": "",
                        "telefone": "",
                        "linkedin": "",
                        "github": ""
                    }
                })
            except Exception as e:
                print(f"Erro ao processar candidato: {e}")
        
        # Avança para a próxima página
        navegador.click("ID", "pnnext")

    return perfis


In [35]:
navegador = Navegador()

max_interactions = 5



In [77]:
google_results = navegador.driver.find_elements(By.XPATH, '//span[@jscontroller="msmzHf"]')

# Encontrar o link dentro do primeiro elemento encontrado
link_elemento = google_results[0].find_element(By.XPATH, './/a')

link_elemento.click()


In [78]:
icon_element = navegador.driver.find_element(By.CLASS_NAME, 'contextual-sign-in-modal__modal-dismiss-icon')

icon_element.click()

ElementNotInteractableException: Message: element not interactable
  (Session info: chrome=131.0.6778.204)
Stacktrace:
#0 0x614524af01fa <unknown>
#1 0x61452460066d <unknown>
#2 0x61452465066c <unknown>
#3 0x6145246441f5 <unknown>
#4 0x614524673582 <unknown>
#5 0x614524643b38 <unknown>
#6 0x61452467374e <unknown>
#7 0x614524692007 <unknown>
#8 0x614524673323 <unknown>
#9 0x614524641de0 <unknown>
#10 0x614524642dbe <unknown>
#11 0x614524abc12b <unknown>
#12 0x614524ac00c7 <unknown>
#13 0x614524aa96cc <unknown>
#14 0x614524ac0c47 <unknown>
#15 0x614524a8e67f <unknown>
#16 0x614524adf288 <unknown>
#17 0x614524adf450 <unknown>
#18 0x614524aef076 <unknown>
#19 0x78d2eb69ca94 <unknown>
#20 0x78d2eb729c3c <unknown>


In [44]:
google_result = get_google_results(navegador, max_interactions)




Processando candidato 1 de 1


StaleElementReferenceException: Message: stale element reference: stale element not found
  (Session info: chrome=131.0.6778.204); For documentation on this error, please visit: https://www.selenium.dev/documentation/webdriver/troubleshooting/errors#stale-element-reference-exception
Stacktrace:
#0 0x614524af01fa <unknown>
#1 0x614524600810 <unknown>
#2 0x6145246120cb <unknown>
#3 0x614524610e45 <unknown>
#4 0x614524606e39 <unknown>
#5 0x6145246051a0 <unknown>
#6 0x614524608a68 <unknown>
#7 0x614524608af3 <unknown>
#8 0x61452464f165 <unknown>
#9 0x61452464f7a1 <unknown>
#10 0x614524643c46 <unknown>
#11 0x6145246735ad <unknown>
#12 0x614524643b38 <unknown>
#13 0x61452467374e <unknown>
#14 0x614524692007 <unknown>
#15 0x614524673323 <unknown>
#16 0x614524641de0 <unknown>
#17 0x614524642dbe <unknown>
#18 0x614524abc12b <unknown>
#19 0x614524ac00c7 <unknown>
#20 0x614524aa96cc <unknown>
#21 0x614524ac0c47 <unknown>
#22 0x614524a8e67f <unknown>
#23 0x614524adf288 <unknown>
#24 0x614524adf450 <unknown>
#25 0x614524aef076 <unknown>
#26 0x78d2eb69ca94 <unknown>
#27 0x78d2eb729c3c <unknown>


In [32]:
def get_linkedin_profile(**kwargs):

    cookie = {
    "name": "li_at",
    "value": os.environ["LINKEDIN_COOKIE"],
    "domain": ".linkedin.com"
}

    navegador = Navegador()


    navegador.get('https://www.linkedin.com/')
    navegador.driver.add_cookie(cookie)

    navegador.get('https://www.google.com.br/')

    query = gerar_query(cargos = kwargs.get("cargos", []), 
                habilidades = kwargs.get("habilidades", []), 
                bancos_dados = kwargs.get("bancos_dados", []), 
                ferramentas = kwargs.get("ferramentas", []), 
                localizacoes = kwargs.get("localizacoes", [])
                )
    print(query)    
    navegador.get(f'https://www.google.com.br/search?q={query}')

    wait_for_captcha(navegador)

    max_interactions = kwargs.get("max_interactions", 5)

    #google_result = get_google_results(navegador, max_interactions)

    #navegador.close()
    

    return google_result

    

In [13]:

def get_certifications(navegador, profile_url):
    # pega certificacoes

    import time
    import json

    certification_url = profile_url + "/details/certifications/"

    navegador.get(certification_url)
    if not is_page_exists(navegador):
        return []
    # tratar caso nao tenha certificacoes
    try:
        navegador.wait.until(EC.presence_of_element_located((By.CSS_SELECTOR, '.t-20.t-bold.ph3.pt3.pb2')))


        navegador.driver.execute_script("window.scrollTo(0, document.body.scrollHeight);")
        time.sleep(2)

        html = navegador.driver.page_source

        soup = BeautifulSoup(html, 'html.parser')

        certification_sections = soup.select('.pvs-list__paged-list-item')

        certifications_data = []

        # Iterando pelas seções de certificações
        for section in certification_sections:
            try:
                # Extraindo informações específicas
                certification_name = section.select_one('.mr1.hoverable-link-text.t-bold span[aria-hidden="true"]').get_text(strip=True) if section.select_one('.mr1.hoverable-link-text.t-bold span[aria-hidden="true"]') else "N/A"
                issuer = section.select_one('.t-14.t-normal span[aria-hidden="true"]').get_text(strip=True) if section.select_one('.t-14.t-normal span[aria-hidden="true"]') else "N/A"
                issue_date = section.select_one('.pvs-entity__caption-wrapper span[aria-hidden="true"]').get_text(strip=True) if section.select_one('.pvs-entity__caption-wrapper span[aria-hidden="true"]') else "N/A"
                credential_id = section.select_one('.t-14.t-normal.t-black--light span[aria-hidden="true"]').get_text(strip=True) if section.select_one('.t-14.t-normal.t-black--light span[aria-hidden="true"]') else "N/A"
                school_url = section.select_one('a.optional-action-target-wrapper[href]')['href'] if section.select_one('a.optional-action-target-wrapper[href]') else "N/A"
                credential_url = section.select_one('a.artdeco-button[href]')['href'] if section.select_one('a.artdeco-button[href]') else "N/A"
                logo_element = soup.select_one('img[src*="company-logo_100_100"]')
                logo_url = logo_element.get('src') if logo_element else "N/A"
                
                # Adicionando ao JSON
                certifications_data.append({
                    "certification_name": certification_name,
                    "issuer": issuer,
                    "issue_date": issue_date,
                    "credential_id": credential_id,
                    "credential_url": credential_url,
                    "school_url": school_url,
                    "school_logo_url": logo_url
                })
            except Exception as e:
                print(f"Erro ao processar uma seção: {e}")

        # Salvando como JSON
        """with open("certifications.json", "w", encoding="utf-8") as f:
            json.dump(certifications_data, f, ensure_ascii=False, indent=4)"""
        
        return certifications_data

    except TimeoutException:
        return [{
            "certification_name": "N/A",
            "issuer": "N/A",
            "issue_date": "N/A",
            "credential_id": "N/A",
            "credential_url": "N/A",
            "school_url": "N/A"
        }]
    print("Dados de certificações extraídos e salvos em 'certifications.json'.")


In [14]:
def get_education(navegador, profile_url):
    # pega educacao
    import time
    import json

    education_url = profile_url + "/details/education/"
    #class t-20 t-bold ph3 pt3 pb2
    navegador.get(education_url)
    if not is_page_exists(navegador):
        return []
    try:
        navegador.wait.until(EC.presence_of_element_located((By.CSS_SELECTOR, '.t-20.t-bold.ph3.pt3.pb2')))

        navegador.driver.execute_script("window.scrollTo(0, document.body.scrollHeight);")
        time.sleep(2)

        html = navegador.driver.page_source

        soup = BeautifulSoup(html, 'html.parser')

        education_sections = soup.select('.pvs-list__paged-list-item')

        education_data = []

        for section in education_sections:
            try:
                # Extraindo informações específicas
                institution = section.select_one('.mr1.hoverable-link-text.t-bold span[aria-hidden="true"]').get_text(strip=True) if section.select_one('.mr1.hoverable-link-text.t-bold span[aria-hidden="true"]') else "N/A"
                degree = section.select_one('.t-14.t-normal span[aria-hidden="true"]').get_text(strip=True) if section.select_one('.t-14.t-normal span[aria-hidden="true"]') else "N/A"
                dates = section.select_one('.pvs-entity__caption-wrapper span[aria-hidden="true"]').get_text(strip=True) if section.select_one('.pvs-entity__caption-wrapper span[aria-hidden="true"]') else "N/A"
                logo_element = soup.select_one('img[src*="company-logo_100_100"]')
                logo_url = logo_element.get('src') if logo_element else "N/A"


                # Adicionando ao JSON
                education_data.append({
                    "institution": institution,
                    "degree": degree,
                    "dates": dates,
                    "school_logo_url": logo_url
                })
            except Exception as e:
                print(f"Erro ao processar uma seção: {e}")

        # Salvando como JSON
        """with open("education.json", "w", encoding="utf-8") as f:
            json.dump(education_data, f, ensure_ascii=False, indent=4)"""
            
        return education_data
    except TimeoutException:
        return [{
            "institution": "N/A",
            "degree": "N/A",
            "dates": "N/A"
        }]


In [15]:
def get_experiences(navegador, profile_url):

    import json
    import time
    experience_url = profile_url + "/details/experience/"

    navegador.get(experience_url)
    if not is_page_exists(navegador):
        return []
    
    try:
    #aguarda a pagina carregar
        navegador.wait.until(EC.presence_of_element_located((By.CLASS_NAME, 'pvs-list__paged-list-item')))
        #scroll para baixo demorando 2 segundos
        navegador.driver.execute_script("window.scrollTo(0, document.body.scrollHeight);")
        time.sleep(2)

        # Extraindo o HTML renderizado
        html = navegador.driver.page_source

        # Parseando com BeautifulSoup
        soup = BeautifulSoup(html, 'html.parser')

        # Buscando as seções de experiência
        experience_sections = soup.select('.pvs-list__paged-list-item')  # Substitua pelo seletor correto

        experiences = []

        for section in experience_sections:
            try:
                job_title = section.select_one('.mr1.t-bold').get_text(strip=True) if section.select_one('.mr1.t-bold') else "N/A"
                company = section.select_one('.t-14.t-normal').get_text(strip=True) if section.select_one('.t-14.t-normal') else "N/A"
                duration = section.select_one('.pvs-entity__caption-wrapper').get_text(strip=True) if section.select_one('.pvs-entity__caption-wrapper') else "N/A"
                #classe das competencias display-flex align-items-center t-14 t-normal t-black
                description = section.select_one('.display-flex.align-items-center.t-14.t-normal.t-black')  # Substitua com o seletor correto
                description_text = description.get_text(strip=True) if description else "N/A"

                #pega a imagem da empresa
                logo_element = soup.select_one('img[src*="company-logo_100_100"]')
                logo_url = logo_element.get('src') if logo_element else "N/A"

                competencies_section = section.select_one('.display-flex.align-items-center.t-14.t-normal.t-black')
                
                # Adicionando ao JSON
                experiences.append({
                    "job_title": job_title,
                    "company": company,
                    "duration": duration,
                    "description": description_text,
                    "company_logo_url": logo_url
                })
            except Exception as e:
                print(f"Erro ao processar uma seção: {e}")

        # Salvando como JSON
        """with open("experiences.json", "w", encoding="utf-8") as f:
            json.dump(experiences, f, ensure_ascii=False, indent=4)"""
        
        return experiences
    except TimeoutException:
        return [{
            "job_title": "N/A",
            "company": "N/A",
            "duration": "N/A",
            "description": "N/A"
        }]
# Fechando o navegador.driver


In [16]:
def update_linkedin_profile(linkedin_profile, update_profile=False):

    import dotenv
    import sqlite3
    dotenv.load_dotenv()

    cookie = {
        "name": "li_at",
        "value": os.environ["LINKEDIN_VISITOR_ID"],
        "domain": ".linkedin.com"
    }

    navegador = Navegador()

    navegador.get('https://www.linkedin.com/feed/')

    navegador.driver.add_cookie(cookie) 

    linkedin_prof = linkedin_profile

    for profile in linkedin_prof:

        #verifica se o perfil ja existe no banco de dados
        conn = sqlite3.connect("profiles.db")
        cursor = conn.cursor()
        cursor.execute("SELECT link FROM profile WHERE link = ?", (profile['link'],))
        if cursor.fetchone() is not None and update_profile == False:
            continue

        if profile['link'] == "Link não encontrado":
            continue
        print(f"Coletando dados do perfil numero {linkedin_prof.index(profile)} do total de {len(linkedin_prof)}")
        profile_url = profile['link']

        #carregar o perfil
        navegador.get(profile_url)
        
        if not is_page_exists(navegador):
            #pula o perfil
            continue

        if is_page_is_not_found(navegador):
            #pula o perfil
            continue

        soup = BeautifulSoup(navegador.driver.page_source, 'html.parser')
        #scrola a pagina e aguarda a pagina carregar
        navegador.driver.execute_script("window.scrollTo(0, document.body.scrollHeight);")
        try:
            # Aumentando o tempo de espera para 20 segundos
            name_element = WebDriverWait(navegador.driver, 20).until(
                EC.visibility_of_element_located((By.CSS_SELECTOR, 'h1.inline.t-24.v-align-middle.break-words'))
            )
            name = name_element.text.strip() if name_element else "N/A"
        except TimeoutException:
            print("Nome não encontrado. Pode ser uma página de erro.")
            name = "N/A"

        # Verifique se name_element é um objeto BeautifulSoup e use get_text apenas se não for None
        

        try:

            photo_element = soup.select_one('img.pv-top-card-profile-picture__image--show.evi-image')
            photo_url = photo_element.get('src') if photo_element else "N/A"

            headline = soup.find('div', {'class': 'text-body-medium break-words'})
            headline = headline.get_text().strip()

            about = soup.find('div', {'class': 'display-flex ph5 pv3'}) if soup.find('div', {'class': 'display-flex ph5 pv3'}) else "N/A"
        except TimeoutException:
            headline = "N/A"
            about = "N/A"
            photo_url = "N/A"
        if about == "N/A":
            about = "N/A"
        else:
            about = about.get_text().strip()

        #atualiza o perfil
        profile['nome'] = name
        profile['skills'] = headline
        profile['sobre'] = about
        profile['foto'] = photo_url
        #coleta de experiencia
        print('-'*100)
        print(f"coletando experiencia, educacao e certificacoes de {name}")
        experiences = get_experiences(navegador, profile_url)
        profile['experiencia'] = experiences
        #coleta de educacao
        education = get_education(navegador, profile_url)
        profile['educacao'] = education
        #coleta de certificacoes
        certifications = get_certifications(navegador, profile_url)
        profile['certificacoes'] = certifications
        
        print(f"dados de {name} coletados com sucesso")
        print('-'*100)

        salvar_perfil_unico(profile)

    navegador.close()
    
    return linkedin_prof


In [17]:
import sqlite3
import json

# Conexão com o banco de dados (ou criação do arquivo se não existir)
conn = sqlite3.connect("profiles.db")
cursor = conn.cursor()

# Criação da tabela
cursor.execute('''
CREATE TABLE IF NOT EXISTS profile (
    link TEXT,
    nome TEXT,
    skills TEXT,
    sobre TEXT,
    cargo TEXT,
    experiencia TEXT,
    educacao TEXT,
    certificacoes TEXT,
    contato_email TEXT,
    contato_telefone TEXT,
    contato_linkedin TEXT,
    contato_github TEXT,
    foto TEXT
);
''')

print("Tabela criada com sucesso!")

# Dados do JSON fornecido
data = {
    "link": "https://linkedin.com/in/joaopedroliveira/en",
    "nome": "Joao Pedro Oliveira",
    "skills": "Data Engineer | CI/CD | Python | Docker | Terraform | AWS Certified",
    "sobre": "Data Engineer with extensive experience in consulting and product companies, specializing in developing and managing complex data infrastructures, ETLs, and public cloud implementations (AWS).Technical Skills: - Proficient in Python, R, SQL, DBT, and tools like Docker and Terraform. - Expertise in developing ETL pipelines for both streaming and batch data. - Extensive experience in building interactive dashboards for stakeholder presentations using tools like Metabase. - Contributor to open-source software as a developer with the Elixir programming language.Experienced with Linux systems and certified as an AWS Cloud Practitioner with two years of experience managing data infrastructure in a regulated financial company. Certification:  AWS Cloud Practitioner (09/2021).Open Source Contributions: Contributor to the Explorer library (gh: elixir-nx/explorer), which adds DataFrames functionality to Elixir.Language Proficiency: Native/Fluent Portuguese Advanced English Basic FrenchAcademic Background: Graduate Research Assistant in the Political Science Department at Emory University (USA). Research Assistant at the Getúlio Vargas Foundation (FGV/EPGE). Data Intern in the Economics Department at PUC-Rio. Research Assistant at the Institute for Applied Economic Research (IPEA). Bachelor's degree in International Relations from PUC-Rio.",
    "cargo": "",
    "experiencia": [],
    "educacao": [],
    "certificacoes": [],
    "contato": {
        "email": "",
        "telefone": "",
        "linkedin": "",
        "github": ""
    },
    "foto": "https://media.licdn.com/dms/image/v2/C4E03AQH3t65XHMHQ4g/profile-displayphoto-shrink_200_200/profile-displayphoto-shrink_200_200/0/1642634082175?e=1743033600&v=beta&t=Om6aH4hQNx5b57xzDdEoTJbfQsesuMtUmunfYARS3xE"
}

# Inserção de dados
cursor.execute('''
INSERT INTO profile (
    link, nome, skills, sobre, cargo, experiencia, educacao, certificacoes,
    contato_email, contato_telefone, contato_linkedin, contato_github, foto
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
''', (
    data["link"],
    data["nome"],
    data["skills"],
    data["sobre"],
    data["cargo"],
    json.dumps(data["experiencia"]),  # Serializando lista como JSON
    json.dumps(data["educacao"]),  # Serializando lista como JSON
    json.dumps(data["certificacoes"]),  # Serializando lista como JSON
    data["contato"]["email"],
    data["contato"]["telefone"],
    data["contato"]["linkedin"],
    data["contato"]["github"],
    data["foto"]
))

# Confirmação e encerramento
conn.commit()
print("Dados inseridos com sucesso!")
conn.close()


Tabela criada com sucesso!
Dados inseridos com sucesso!


In [18]:
import sqlite3
import json

def salvar_ou_atualizar_perfis_em_banco(dados, nome_arquivo_db="profiles.db"):
    """
    Salva ou atualiza uma lista de perfis no banco de dados SQLite.

    :param dados: Lista de dicionários representando os perfis.
    :param nome_arquivo_db: Nome do arquivo do banco de dados SQLite.
    """
    # Conexão com o banco de dados
    conn = sqlite3.connect(nome_arquivo_db)
    cursor = conn.cursor()

    # Certificar-se de que a tabela existe
    cursor.execute('''
    CREATE TABLE IF NOT EXISTS profile (
        link TEXT PRIMARY KEY,
        nome TEXT,
        skills TEXT,
        sobre TEXT,
        cargo TEXT,
        experiencia TEXT,
        educacao TEXT,
        certificacoes TEXT,
        contato_email TEXT,
        contato_telefone TEXT,
        contato_linkedin TEXT,
        contato_github TEXT,
        foto TEXT
    );
    ''')

    for perfil in dados:
        link = perfil.get("link", "")
        
        # Verificar se o link já existe na tabela
        cursor.execute("SELECT link FROM profile WHERE link = ?", (link,))
        if cursor.fetchone() is not None:
            # Atualizar o registro existente
            cursor.execute('''
            UPDATE profile
            SET nome = ?, skills = ?, sobre = ?, cargo = ?, experiencia = ?, 
                educacao = ?, certificacoes = ?, contato_email = ?, contato_telefone = ?, 
                contato_linkedin = ?, contato_github = ?, foto = ?
            WHERE link = ?
            ''', (
                perfil.get("nome", ""),
                perfil.get("skills", ""),
                perfil.get("sobre", ""),
                perfil.get("cargo", ""),
                json.dumps(perfil.get("experiencia", [])),  # Serializar lista como JSON
                json.dumps(perfil.get("educacao", [])),  # Serializar lista como JSON
                json.dumps(perfil.get("certificacoes", [])),  # Serializar lista como JSON
                perfil.get("contato", {}).get("email", ""),
                perfil.get("contato", {}).get("telefone", ""),
                perfil.get("contato", {}).get("linkedin", ""),
                perfil.get("contato", {}).get("github", ""),
                perfil.get("foto", ""),
                link
            ))
            print(f"Registro com o link '{link}' atualizado com sucesso.")
        else:
            # Inserir um novo registro
            cursor.execute('''
            INSERT INTO profile (
                link, nome, skills, sobre, cargo, experiencia, educacao, certificacoes,
                contato_email, contato_telefone, contato_linkedin, contato_github, foto
            ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            ''', (
                link,
                perfil.get("nome", ""),
                perfil.get("skills", ""),
                perfil.get("sobre", ""),
                perfil.get("cargo", ""),
                json.dumps(perfil.get("experiencia", [])),  # Serializar lista como JSON
                json.dumps(perfil.get("educacao", [])),  # Serializar lista como JSON
                json.dumps(perfil.get("certificacoes", [])),  # Serializar lista como JSON
                perfil.get("contato", {}).get("email", ""),
                perfil.get("contato", {}).get("telefone", ""),
                perfil.get("contato", {}).get("linkedin", ""),
                perfil.get("contato", {}).get("github", ""),
                perfil.get("foto", "")
            ))
            print(f"Registro com o link '{link}' salvo com sucesso.")

    # Confirmar transações e fechar conexão
    conn.commit()
    conn.close()
    print(f"Processamento concluído. Banco de dados atualizado: {nome_arquivo_db}!")


- Colocar no update profile para que salve o usuario no BUBBLE sempre que atualizar o dado
- Lembrar que deve ser passado o job_bubble_id para que salve o usuario no BUBBLE
- Avaliar como esse dado vai entrar no BUBBLE se vai utilizar o BD existente ou criar uma nova estrutura de tabelas para receber os dados


In [19]:
def get_google_profiles(cargos, habilidades, ferramentas, localizacoes, max_interactions, job_bubble_id=None):

    linkedin_profile = []
    update_profile = True

    linkedin_profile = get_linkedin_profile(
        cargos=cargos,
        habilidades=habilidades,
        ferramentas=ferramentas,
        localizacoes=localizacoes,
        max_interactions=max_interactions
    )

    total_profiles = update_linkedin_profile(linkedin_profile, update_profile)

    salvar_ou_atualizar_perfis_em_banco(total_profiles)

    return total_profiles




In [33]:
google_profile = get_google_profiles(
    cargos=["Devops"],
    habilidades=["Natural Language Processing", "NLP", "Python", "Machine Learning", "AI", "TensorFlow", "PyTorch", "Deep Learning", "Data Analysis"],
    ferramentas=["Git", "Docker", "AWS", "Google Cloud"],
    localizacoes=["Brasil"],
    max_interactions=5,
    job_bubble_id=1
)


site:linkedin.com/in/ ("Devops") ("Natural Language Processing" OR "NLP" OR "Python" OR "Machine Learning" OR "AI" OR "TensorFlow" OR "PyTorch" OR "Deep Learning" OR "Data Analysis") () ("Git" OR "Docker" OR "AWS" OR "Google Cloud") ("Brasil")
https://www.google.com.br/search?q=site:linkedin.com/in/ ("Devops") ("Natural Language Processing" OR "NLP" OR "Python" OR "Machine Learning" OR "AI" OR "TensorFlow" OR "PyTorch" OR "Deep Learning" OR "Data Analysis") () ("Git" OR "Docker" OR "AWS" OR "Google Cloud") ("Brasil")


NoSuchWindowException: Message: no such window: target window already closed
from unknown error: web view not found
  (Session info: chrome=131.0.6778.204)
Stacktrace:
#0 0x5785202351fa <unknown>
#1 0x57851fd45810 <unknown>
#2 0x57851fd1b48f <unknown>
#3 0x57851fdc09bd <unknown>
#4 0x57851fdd69bc <unknown>
#5 0x57851fdb8323 <unknown>
#6 0x57851fd86de0 <unknown>
#7 0x57851fd87dbe <unknown>
#8 0x57852020112b <unknown>
#9 0x5785202050c7 <unknown>
#10 0x5785201ee6cc <unknown>
#11 0x578520205c47 <unknown>
#12 0x5785201d367f <unknown>
#13 0x578520224288 <unknown>
#14 0x578520224450 <unknown>
#15 0x578520234076 <unknown>
#16 0x74549449ca94 <unknown>
#17 0x745494529c3c <unknown>


In [61]:
#salva o json
with open("linkedin_profile.json", "w", encoding="utf-8") as f:
    json.dump(google_profile, f, ensure_ascii=False, indent=4)


In [21]:
import sqlite3

def limpar_banco_de_dados(nome_arquivo_db="profiles.db"):
    """
    Remove todos os registros de todas as tabelas no banco de dados SQLite.

    :param nome_arquivo_db: Nome do arquivo do banco de dados SQLite.
    """
    # Conexão com o banco de dados
    conn = sqlite3.connect(nome_arquivo_db)
    cursor = conn.cursor()

    # Obter todas as tabelas do banco de dados
    cursor.execute("SELECT name FROM sqlite_master WHERE type='table';")
    tabelas = cursor.fetchall()

    if not tabelas:
        print("Nenhuma tabela encontrada no banco de dados.")
    else:
        for tabela in tabelas:
            nome_tabela = tabela[0]
            # Limpar a tabela
            cursor.execute(f"DELETE FROM {nome_tabela};")
            print(f"Tabela '{nome_tabela}' limpa com sucesso.")

        # Confirmar alterações
        conn.commit()

    # Fechar a conexão
    conn.close()
    print(f"Banco de dados '{nome_arquivo_db}' foi completamente limpo.")

limpar_banco_de_dados()

Tabela 'profile' limpa com sucesso.
Banco de dados 'profiles.db' foi completamente limpo.


In [3]:
conn = sqlite3.connect("profiles.db")
cursor = conn.cursor()

cursor.execute("SELECT * FROM profile")
result = cursor.fetchall()

print(result)

conn.close()


[('https://cm.linkedin.com/in/pone-fredy-411504187', 'Pone Fredy', 'Bubble.io Expert | MVP | SaaS | Landing Page', 'N/A', '', '{"job_title": "N/A", "company": "N/A", "duration": "N/A", "description": "N/A"}', '{"institution": "N/A", "degree": "N/A", "dates": "N/A"}', '{"certification_name": "N/A", "issuer": "N/A", "issue_date": "N/A", "credential_id": "N/A", "credential_url": "N/A", "school_url": "N/A"}', '', '', '', '', 'https://media.licdn.com/dms/image/v2/D4E03AQF9ESFXsrf9wg/profile-displayphoto-shrink_200_200/profile-displayphoto-shrink_200_200/0/1699542156796?e=1743033600&v=beta&t=YIRDfMmAYPzyY1Jqt6ZdG8aLFzjG9rT06sEdybmJWcg'), ('https://linkedin.com/in/david-rocha-nocode-developer-full-stack-b04806136', 'David Rocha - Nocode Developer Full Stack', 'Bubble.io Full Stack Developer, N8N, Supabase, Apis', 'N/A', '', '[{"job_title": "Bubble.io - Full StackBubble.io - Full Stack", "company": "FreelanceFreelance", "duration": "out de 2020 - o momento \\u00b7 4 anos 4 meses", "description

In [30]:
import requests

headers = {
    'Authorization': 'Bearer SEU_TOKEN_AQUI',
    'LinkedIn-Version': '202402'
}

response = requests.get(
    'https://api.linkedin.com/v2/people/(vanityName=anndregoliveira)',
    headers=headers
)

response

<Response [401]>