This notebook lets you step through the various LinkedIn Selenium functions, intervening at whatever step is needed. I am not including the proxy-related code here, because all of the ProtonVPN proxies are 'flagged' and could contribute to my accounts being flagged.

In [None]:
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.chrome.service import Service
from webdriver_manager.chrome import ChromeDriverManager
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.common.action_chains import ActionChains
from selenium.common.exceptions import NoSuchElementException
from selenium.webdriver.common.keys import Keys
from selenium_stealth import stealth
import re
import os
import glob
import pandas as pd
from time import sleep
import random

functions

In [None]:
def create_driver(agent="user-agent=Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/117.0.0.0 Safari/537.36"):
    """Start a maximized Chrome session patched with selenium-stealth.

    Args:
        agent: Chrome command-line argument carrying the user agent, in the
            form ``"user-agent=<UA string>"``.

    Returns:
        The configured driver, or ``None`` if anything went wrong while
        starting it (the error is printed rather than raised).
    """
    # Fingerprint values reported to pages by selenium-stealth.
    stealth_profile = {
        "languages": ["en-US", "en"],
        "vendor": "Google Inc.",
        "platform": "MacIntel",
        "webgl_vendor": "Apple Inc.",
        "renderer": "Apple GPU",
        "fix_hairline": True,
    }

    try:
        print('driver started')
        opts = Options()
        opts.add_argument(agent)

        chromedriver_path = ChromeDriverManager().install()
        browser = webdriver.Chrome(service=Service(chromedriver_path), options=opts)

        # Max size for consistency with element names.
        browser.maximize_window()
        stealth(browser, **stealth_profile)
    except Exception as err:
        print(f"Error: {err}")
        return None

    return browser

def login_linkedin(driver, username, password):
    """Open the LinkedIn login page, fill in the credentials and submit.

    Each field is followed by a random pause of up to three seconds. This
    function only submits the form; it does not deal with any verification
    or captcha page LinkedIn may show afterwards.

    Args:
        driver: Selenium driver to operate.
        username: Text typed into the ``username`` field.
        password: Text typed into the ``password`` field.
    """
    driver.get('https://www.linkedin.com/login')
    wait = WebDriverWait(driver, 30)

    # Fill the two inputs in order, pausing a little after each one.
    for field_id, text in (('username', username), ('password', password)):
        field = wait.until(EC.element_to_be_clickable((By.ID, field_id)))
        field.send_keys(text)
        sleep(random.random()*3)

    submit = wait.until(EC.element_to_be_clickable((By.XPATH, '//button[@type="submit"]')))

    # Move the mouse onto the button before clicking to avoid detection.
    ActionChains(driver).move_to_element(submit).click().perform()



def get_verification_code_from_file(download_dir='/Users/adamhunter/Downloads'):
    """Return the 6-digit code found in the newest ``pgp*.txt`` download.

    Args:
        download_dir: Directory searched for ``pgp*.txt`` files.

    Returns:
        The first six-digit run that follows ``Subject:`` on the same line,
        as a string, or ``None`` when there is no matching file or no code
        in the newest file (a message is printed in both cases).
    """
    candidates = glob.glob(os.path.join(download_dir, 'pgp*.txt'))
    if not candidates:
        # max() on an empty list would raise a bare ValueError.
        print(f"No pgp*.txt files found in {download_dir}.")
        return None

    # Newest file by ctime is taken to be the download that just finished.
    latest_file = max(candidates, key=os.path.getctime)

    # Decode tolerantly: only the ASCII Subject line matters, and a stray
    # byte elsewhere in the headers should not abort the lookup.
    with open(latest_file, 'r', encoding='utf-8', errors='replace') as f:
        contents = f.read()

    # '.' does not cross newlines, so the digits must sit on the Subject line.
    match = re.search(r'Subject:.*?(\d{6})', contents)
    if match is None:
        print("No verification code found in email.")
        return None

    return match.group(1)

def grab_verification(driver, username, password):
    """Fetch a LinkedIn email PIN from ProtonMail and submit it.

    Opens ProtonMail in a new tab, logs in, downloads the headers of the
    first email in the inbox, switches back to the original tab, reads the
    code with ``get_verification_code_from_file()`` and types it into the
    LinkedIn verification form.

    Args:
        driver: Selenium driver currently showing the LinkedIn PIN page.
        username: ProtonMail username.
        password: ProtonMail password.

    Raises:
        RuntimeError: If no verification code could be read from the
            downloaded headers.
    """
    wait = WebDriverWait(driver, 30)

    # Remember the LinkedIn tab so we can return to it.
    original_tab = driver.current_window_handle

    # Open a new tab and switch to it (it's always the last one).
    driver.execute_script("window.open('');")
    driver.switch_to.window(driver.window_handles[-1])

    try:
        driver.get('https://mail.protonmail.com/login')

        username_field = wait.until(EC.element_to_be_clickable((By.ID, 'username')))
        sleep(random.random()*2)
        username_field.send_keys(username)

        password_field = wait.until(EC.element_to_be_clickable((By.ID, 'password')))
        sleep(random.random()*3)
        password_field.send_keys(password)

        login_button = wait.until(EC.element_to_be_clickable((By.CSS_SELECTOR, '.button-large')))
        sleep(random.random()*3)
        login_button.click()
        print('clicked login to email')

        # First email in the inbox.
        first_email = wait.until(EC.element_to_be_clickable((By.CSS_SELECTOR, '.item-container-wrapper:nth-child(1) .item-subject .max-w100')))
        first_email.click()

        # 'More options' -> 'View headers' -> 'Download'.
        more_options_button = wait.until(EC.element_to_be_clickable((By.CSS_SELECTOR, '.button:nth-child(11)')))
        more_options_button.click()

        view_headers_button = wait.until(EC.element_to_be_clickable((By.CSS_SELECTOR, '.dropdown-item:nth-child(10) .flex-item-fluid')))
        view_headers_button.click()

        download_button = wait.until(EC.element_to_be_clickable((By.CSS_SELECTOR, '.button-solid-norm:nth-child(2)')))
        download_button.click()
    finally:
        # Always return to the LinkedIn tab, even if an email step timed out,
        # so later cells do not operate on the wrong tab.
        driver.switch_to.window(original_tab)

    # Give the browser a moment to finish writing the download.
    sleep(3)

    verification_code = get_verification_code_from_file()
    if verification_code is None:
        # send_keys(None) would fail with an unhelpful error deep in Selenium.
        raise RuntimeError("No verification code could be read from the downloaded email headers.")

    verification_field = wait.until(EC.element_to_be_clickable((By.ID, 'input__email_verification_pin')))
    sleep(random.random()*3)
    verification_field.send_keys(verification_code)
    sleep(random.random())

    verify_button = wait.until(EC.element_to_be_clickable((By.ID, 'email-pin-submit-button')))

    # Move the mouse onto the button before clicking to avoid detection.
    ActionChains(driver).move_to_element(verify_button).click().perform()

from selenium.webdriver.common.action_chains import ActionChains

def collect_group_or_network_links(driver, scraped_profiles, target_count=10, max_idle_rounds=10):
    """Scroll the current page and collect unseen LinkedIn profile URLs.

    The page is scrolled to the top and then to the bottom on every round
    (which triggers the infinite scroll), and every ``<a>`` whose href starts
    with ``https://www.linkedin.com/in/`` is reduced to its bare profile URL.

    Args:
        driver: Selenium driver already showing a group or network page.
        scraped_profiles: Iterable of profile URLs to exclude.
        target_count: Stop once at least this many new URLs are collected.
        max_idle_rounds: Give up after this many consecutive scroll rounds
            that produced no new URL, so a page with fewer than
            ``target_count`` profiles cannot loop forever.

    Returns:
        A list of unique profile URLs not present in ``scraped_profiles``.
    """
    # Stop the slug at '/', '?' or '#' so tracking query strings do not make
    # the same profile look like a different URL.
    profile_pattern = re.compile(r"https://www\.linkedin\.com/in/[^/?#]+")
    already_scraped = set(scraped_profiles)

    collected_links = set()
    idle_rounds = 0

    while len(collected_links) < target_count:
        print(len(collected_links))

        # Scroll to the top, then to the bottom, pausing 1-3 seconds each time.
        driver.execute_script("window.scrollTo(0, 0);")
        sleep(1 + 2*random.random())
        driver.execute_script("window.scrollTo(0, document.body.scrollHeight);")
        sleep(1 + 2*random.random())

        anchors = WebDriverWait(driver, 30).until(EC.presence_of_all_elements_located((By.TAG_NAME, "a")))

        found = set()
        for anchor in anchors:
            href = anchor.get_attribute('href')
            if not href:
                # Anchors without an href report None.
                continue
            matched = profile_pattern.match(href)
            if matched:
                found.add(matched.group(0))

        count_before = len(collected_links)
        collected_links.update(found - already_scraped)

        if len(collected_links) > count_before:
            idle_rounds = 0
        else:
            idle_rounds += 1
            if idle_rounds >= max_idle_rounds:
                print(f"No new links after {idle_rounds} scroll rounds; stopping early.")
                break

    # Switch to the current tab
    driver.switch_to.window(driver.window_handles[-1])

    return list(collected_links)

def collect_search_links(driver, scraped_profiles, page_number=2, target_count=100,
                         page_file='../reference/page_number.txt', max_empty_pages=3):
    """Page through a fixed LinkedIn people search and collect profile URLs.

    Progress is persisted: the next page to visit is read from ``page_file``
    at the start and written back to the same file after every page.

    Args:
        driver: Logged-in Selenium driver.
        scraped_profiles: Iterable of profile URLs to exclude.
        page_number: Starting page, used only when ``page_file`` is missing
            or does not contain an integer.
        target_count: Stop once at least this many new URLs are collected.
        page_file: File holding the next search page number.
        max_empty_pages: Give up after this many consecutive pages that
            contain no profile links at all (end of the search results).

    Returns:
        A list of unique profile URLs not present in ``scraped_profiles``.
    """
    # Stop the slug at '/', '?' or '#' so query strings such as
    # '?miniProfileUrn=...' are not kept as part of the profile URL.
    profile_pattern = re.compile(r"https://www\.linkedin\.com/in/[^/?#]+")
    already_scraped = set(scraped_profiles)
    collected_links = set()

    # Resume from the saved page if there is one; otherwise use the argument.
    try:
        with open(page_file, 'r') as f:
            page_number = int(f.read())
    except (FileNotFoundError, ValueError):
        print(f"Could not read a page number from {page_file}; starting at page {page_number}.")
    print(page_number)

    empty_pages = 0
    while len(collected_links) < target_count:
        search_url = f"https://www.linkedin.com/search/results/PEOPLE/?geoUrn=%5B%22103644278%22%5D&keywords=data%20analyst&network=%5B%22F%22%2C%22S%22%2C%22O%22%5D&origin=FACETED_SEARCH&page={page_number}&sid=m0%3A"
        driver.get(search_url)

        # Wait for the page to load and collect all links.
        anchors = WebDriverWait(driver, 30).until(EC.presence_of_all_elements_located((By.TAG_NAME, "a")))

        found = set()
        for anchor in anchors:
            href = anchor.get_attribute('href')
            if not href:
                # Anchors without an href report None.
                continue
            matched = profile_pattern.match(href)
            if matched:
                found.add(matched.group(0))

        collected_links.update(found - already_scraped)

        # Persist progress to the same file it is read from, so the next run
        # really resumes where this one stopped.
        page_number += 1
        with open(page_file, 'w') as f:
            f.write(str(page_number))

        if found:
            empty_pages = 0
        else:
            empty_pages += 1
            if empty_pages >= max_empty_pages:
                print(f"{empty_pages} consecutive pages without profile links; stopping early.")
                break

        sleep(10+20*random.random())

    driver.switch_to.window(driver.window_handles[-1])

    return list(collected_links)


def scrape_all(driver, member_links, num_to_scrape=None):
    """Visit profile URLs and save each page's HTML source to disk.

    For every link the page source is written to
    ``../data/<slug>_page_source.txt`` and the link is appended to
    ``../reference/scraped_profiles.txt``. If a page title contains
    "sign in", the session has hit a login wall: the loop stops at once and
    that page is neither saved nor recorded as scraped.

    Args:
        driver: Logged-in Selenium driver.
        member_links: Profile URLs to visit; blank entries are ignored.
        num_to_scrape: Maximum number of (non-blank) links to visit, or
            ``None`` to visit all of them.
    """
    # Drop blank entries (e.g. the trailing '' left by split('\n')) before
    # applying the limit, so they neither reach driver.get nor use up quota.
    links = [link.strip() for link in member_links if link and link.strip()]
    if num_to_scrape is not None:
        links = links[:num_to_scrape]

    for member_link in links:
        try:
            driver.get(member_link)
            print('scraping'+member_link)
            sleep(10+20*random.random())

            # Check for the login wall BEFORE saving anything, otherwise the
            # sign-in page is stored as the profile and the profile is
            # marked as done.
            if 'sign in' in driver.title.lower():
                print('sign in page detected, stopping')
                break

            page_source = driver.page_source
            # The last path segment of the URL names the output file.
            profile_name = member_link.rstrip('/').split('/')[-1]
            with open(f'../data/{profile_name}_page_source.txt', 'w', encoding='utf-8') as f:
                f.write(page_source)

            with open('../reference/scraped_profiles.txt', 'a', encoding='utf-8') as f:
                f.write(member_link + '\n')

            sleep(1+3*random.random())
        except Exception as e:
            # Best effort: report the failure and move on to the next link.
            print(e)
            continue

    # Switch to the current tab
    driver.switch_to.window(driver.window_handles[-1])


Create your driver

In [None]:
# Launch Chrome with the default user agent and open the LinkedIn login page.
# NOTE: create_driver() prints the error and returns None when Chrome fails
# to start, in which case the .get() call below raises AttributeError.
driver = create_driver()
driver.get('https://www.linkedin.com/login')

Attempt auto login, or log in manually and skip the login cell below. Add your username and password here. This account should be a real, trusted account, to be used for pulling links from an application-acceptance-based LinkedIn group. If you want to use an all-accepting LinkedIn group, then just use a bot account for this part too.

In [None]:
import os

# Credentials for the trusted account are taken from the environment;
# a variable that is not set yields None.
trusted_username = os.getenv('ACTUAL_LINKED_IN_ACC')
trusted_password = os.getenv('ACTUAL_LINKED_IN_PASS')

In [None]:
# Submit the LinkedIn login form with the trusted account's credentials.
# Skip this cell if you already logged in by hand in the opened window.
login_linkedin(driver, trusted_username, trusted_password)

Use the opened tab to navigate to the group page or 'My Network' page. `collect_group_or_network_links` will gather profile URLs up to a chosen limit, checking against the list of profiles already scraped. (The cell below calls `collect_search_links` instead, which pages through people-search results on its own and needs no manual navigation.) You could probably set this amount pretty high, like 1,000, and maybe not get in trouble, since all that will happen from LinkedIn's perspective is a login followed by scrolling down a huge list in a group. This step requires the driver window to be in focus, possibly because it executes JS commands to scroll to the top and to the bottom. This doesn't seem like a huge issue in general, since the links gathered in this step take 20x longer to actually scrape.

In [None]:
# Load the profiles that were already scraped as a set: membership tests are
# O(1) and blank lines are dropped. The file is only created once something
# has been scraped, so a missing file simply means "nothing scraped yet".
try:
    with open('../reference/scraped_profiles.txt', 'r') as file:
        scraped_profiles = {line.strip() for line in file if line.strip()}
except FileNotFoundError:
    scraped_profiles = set()

# Gather new profile URLs from the people search, skipping known profiles.
collected_links = collect_search_links(driver, scraped_profiles, target_count=30)

# Queue the new links for the scraping phase, one URL per line.
with open('../reference/links_to_scrape.txt', 'a') as file:
    file.writelines(f"{link}\n" for link in collected_links)

Only quit here if you are going to a different account for the profile scraping phase.

In [None]:
# driver.quit()

Now, with a list of profiles, accounts, and user-agent strings, pick a random account and do some scraping. In a fully fledged version this would iterate through all of the accounts rather than picking one at random.

In [None]:
# Load the account roster.
accounts_df = pd.read_csv('../reference/accounts.csv')

# Keep only the rows flagged as active (acc_status == 1).
active_accounts = accounts_df.loc[accounts_df['acc_status'] == 1]

# Draw one active account at random and take it as a row.
random_account = active_accounts.sample(n=1).iloc[0]

# Pull out the fields the following cells need.
bot_username, bot_password, user_agent = (
    random_account[column] for column in ('username', 'password', 'user_agent')
)

In [None]:
# Start a fresh browser for the chosen bot account and log it in.
# NOTE(review): create_driver passes this value straight to Chrome as a
# command-line argument, so the CSV presumably stores it in the
# "user-agent=<UA string>" form — confirm against accounts.csv.
driver = create_driver(user_agent)
login_linkedin(driver, bot_username, bot_password)
# HTML fragment looked for in the page to detect a restricted account.
restriction_text = '</h1><p>We\'ve restricted your account until '
# NOTE(review): page_source is read immediately after the sign-in click;
# the post-login page may not have loaded yet — confirm or re-run the check.
if restriction_text in driver.page_source:
    # Dump the whole page so the restriction notice can be read.
    print(driver.page_source)


The following cell will work if the bot accounts were created with ProtonMail, using the same password as their associated LinkedIn account. At some point I should probably generate a random list of human-sounding emails and passwords.

In [None]:
# Handle the two challenge pages LinkedIn may show after login.
if "Let's do a quick verification" in driver.page_source:
    # Email PIN challenge: fetch the code from ProtonMail (same credentials
    # as the LinkedIn account) and submit it.
    grab_verification(driver, bot_username, bot_password)
elif "Let's do a quick security check" in driver.page_source: #captcha page
    driver.switch_to.window(driver.current_window_handle) #brings page to focus for you
    # Pause 30 seconds so the captcha can be solved by hand.
    sleep(30)

Pass a number to `scrape_all` to limit the number of links scraped, or omit it to scrape all of them.

In [None]:
# Profiles that are already done, as a set for O(1) membership tests.
# The file only exists once something has been scraped.
try:
    with open('../reference/scraped_profiles.txt', 'r') as file:
        scraped_profiles = {line.strip() for line in file if line.strip()}
except FileNotFoundError:
    scraped_profiles = set()

# Queued links, ignoring blank lines.
with open('../reference/links_to_scrape.txt', 'r') as file:
    queued_links = [line.strip() for line in file if line.strip()]

# dict.fromkeys removes duplicates while keeping the queue order (a plain
# set would shuffle it); then drop everything that was already scraped.
links_to_scrape = [link for link in dict.fromkeys(queued_links) if link not in scraped_profiles]

print(len(links_to_scrape))

# Rewrite the queue file with the cleaned-up list.
with open('../reference/links_to_scrape.txt', 'w') as file:
    file.writelines(f"{link}\n" for link in links_to_scrape)

# Use the cleaned list directly; re-reading the file with split('\n') would
# add a trailing empty string that ends up being passed to the scraper.
collected_links = list(links_to_scrape)

In [None]:
# Scrape at most 200 of the queued profile links with the logged-in driver.
scrape_all(driver, collected_links,200)