In [1]:
from selenium import webdriver
import time
import pandas as pd
import os
import csv
import logging
from dotenv import load_dotenv
import datetime

from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.common.by import By
from selenium.webdriver.support import expected_conditions as EC

from selenium.webdriver.chrome.options import Options
from selenium.webdriver.chrome.service import Service
from chromedriver_py import binary_path


In [2]:
def create_logfile():
    """Configure root logging to write to a per-day file under ``./log``.

    The file is named after today's date (``DD-Mon-YY.log``) and is opened
    with ``filemode='w'``, so each call starts the day's log from scratch.
    ``force=True`` replaces handlers already attached to the root logger,
    which keeps re-running the notebook cell from stacking handlers.

    Returns:
        The ``logging`` module itself; callers use it as their logger object.
    """
    log_dir = "./log"
    # logging's FileHandler does not create missing parent directories, so a
    # fresh checkout would otherwise fail with FileNotFoundError.
    os.makedirs(log_dir, exist_ok=True)

    date_time = datetime.datetime.today().strftime('%d-%b-%y')
    logfile = f"{log_dir}/{date_time}.log"
    logging.basicConfig(
        filename=logfile,
        filemode='w',
        level=logging.INFO,
        format='%(asctime)s - %(levelname)s - %(message)s',
        datefmt='%d-%b-%y %H:%M:%S',
        force=True,
    )
    logging.info(f'Log file {logfile} created')
    return logging


In [3]:
def create_file(file, logging):
    """Create a fresh daily CSV at ``file`` containing only the header row.

    Any existing file at that path is deleted first so a re-run starts clean.
    Missing parent directories are created.

    Args:
        file: Path of the CSV file to (re)create.
        logging: Logger-like object exposing ``info`` (the notebook passes
            the ``logging`` module).
    """
    # delete existing file if re-running
    logging.info("Checking if current daily csv exists...")
    if os.path.exists(file):
        os.remove(file)
        logging.info(f"{file} deleted")
    else:
        logging.info(f"{file} ain't exist")

    # open() does not create intermediate directories (e.g. "data/").
    parent_dir = os.path.dirname(file)
    if parent_dir:
        os.makedirs(parent_dir, exist_ok=True)

    # create file and add header
    logging.info("Creating daily csv file...")
    header = ['date_time', 'search_keyword', 'search_count', 'job_id', 'job_title', 'company', 'location', 'update_time', 'applicants', 'job_time', 'job_position', 'company_size', 'company_industry', 'job_details']
    # newline='' stops the csv module's "\r\n" terminator being translated
    # again on Windows (which yields blank rows); utf-8 matches the encoding
    # used when rows are appended later.
    with open(file, 'w', newline='', encoding='utf-8') as f:
        csv.writer(f).writerow(header)
        logging.info(f"{file} created")

In [4]:
def login(loggin):
    """Start Chrome, sign in to LinkedIn and return the signed-in driver.

    Credentials are read from the ``LINKEDIN_USERNAME`` and
    ``LINKEDIN_PASSWORD`` environment variables after ``load_dotenv()`` has
    been called.

    Args:
        loggin: Logger-like object exposing ``info``/``debug`` (the notebook
            passes the ``logging`` module). The parameter name is kept as-is
            for backward compatibility; when a falsy value is passed the
            ``logging`` module is used.

    Returns:
        The Chrome WebDriver instance used for the sign-in. The caller owns
        it and is responsible for calling ``quit()``.

    Raises:
        RuntimeError: If either credential is missing.
        Exception: Any error raised while loading the page or submitting the
            form is re-raised after the browser has been closed.
    """
    log = loggin or logging
    url_login = "https://www.linkedin.com/"

    # pulls login information from file called '.env'
    load_dotenv()
    LINKEDIN_USERNAME = os.getenv('LINKEDIN_USERNAME')
    LINKEDIN_PASSWORD = os.getenv('LINKEDIN_PASSWORD')
    # Fail before launching a browser; otherwise send_keys(None) fails later
    # with an unrelated-looking error and leaves Chrome running.
    if not LINKEDIN_USERNAME or not LINKEDIN_PASSWORD:
        raise RuntimeError("LINKEDIN_USERNAME and LINKEDIN_PASSWORD must be set (environment or .env file)")

    # setup chrome to run headless
    chrome_options = Options()
    chrome_options.add_argument("--headless")
    chrome_options.add_argument("--window-size=1920,1080")

    # login to LinkedIn account
    log.info(f"Logging in to LinkedIn as {LINKEDIN_USERNAME}...")

    service = Service(executable_path=binary_path)
    # Pass the options configured above. Previously a blank ChromeOptions()
    # was passed instead, so the headless/window-size settings never applied.
    wd = webdriver.Chrome(service=service, options=chrome_options)
    try:
        wd.get(url_login)

        time.sleep(30)

        # fill in username and password then click login button
        wd.find_element(By.ID, "session_key").send_keys(LINKEDIN_USERNAME)
        wd.find_element(By.ID, "session_password").send_keys(LINKEDIN_PASSWORD)
        wd.find_element(By.XPATH, "//button[@data-id='sign-in-form__submit-btn']").click()

        # random confirm account information pop up that may come up
        try:
            wd.find_element(By.XPATH, "//button[@class='primary-action-new']").click()
        except Exception:
            # Best effort: the pop-up is optional, so its absence is not an
            # error. Exception (not a bare except) lets Ctrl-C through.
            log.debug("No account-confirmation pop-up to dismiss")
    except BaseException:
        # Do not leak the browser process when sign-in fails or is interrupted.
        wd.quit()
        raise

    log.info("Log in complete. Scraping data...")

    return wd

In [5]:
def page_search(wd, search_location, search_keyword, search_remote, search_posted, search_page, search_count, file, logging):
    """Scrape one page of LinkedIn job search results and append it to ``file``.

    Loads the search URL for the given offset, reads the total result count,
    collects the job ids listed on the page, then opens each job in turn and
    appends one CSV row per job.

    Args:
        wd: Selenium WebDriver that is already signed in.
        search_location: URL-encoded location string used in the query.
        search_keyword: URL-encoded keyword string used in the query.
        search_remote: Value sent as the ``f_WT`` query parameter.
        search_posted: Value sent as the ``f_TPR`` query parameter.
        search_page: Result offset sent as the ``start`` query parameter.
        search_count: Ignored on input; it is re-read from the page. Kept in
            the signature for compatibility with existing callers.
        file: Path of the CSV file rows are appended to.
        logging: Logger-like object exposing ``info``.

    Returns:
        Tuple ``(next_search_page, search_count, url_search)`` where
        ``next_search_page`` is ``search_page + 25``.

    Raises:
        ValueError: If no number can be read from the results subtitle.
        Exception: Selenium errors from loading the page or locating the
            results subtitle propagate to the caller.
    """
    # wait time for events in seconds
    page_wait = 30
    click_wait = 5
    async_wait = 5

    # when retrying, number of attempts
    attempts = 3

    # number of results per page; also the step between offsets
    page_size = 25

    # https://www.linkedin.com/jobs/search/?
    # geoId=105790653&keywords=data%20analyst&
    # location=Hanoi%2C%20Hanoi%2C%20Vietnam&
    # refresh=true

    # search for jobs
    # The "&" before "start" was missing, which produced "refresh=truestart=N"
    # and meant the offset was never sent as its own parameter.
    url_search = (
        f"https://www.linkedin.com/jobs/search/?f_TPR={search_posted}&f_WT={search_remote}"
        f"&geoId=105790653&keywords={search_keyword}&location={search_location}"
        f"&refresh=true&start={search_page}"
    )
    wd.get(url_search)
    time.sleep(page_wait)  # add sleep so don't get caught

    # find the number of results (ex. "1,245 results")
    subtitle = wd.find_element(By.XPATH, "//div[@class='jobs-search-results-list__subtitle']").text
    # Keep only the digits of the first token so separators or a trailing
    # "+" do not break the conversion.
    digits = ''.join(ch for ch in subtitle.split(' ')[0] if ch.isdigit())
    if not digits:
        raise ValueError(f"Could not read a result count from {subtitle!r}")
    search_count = int(digits)

    page_number = search_page // page_size + 1
    # Ceiling division: 30 results span 2 pages (round() reported 1).
    page_total = -(-search_count // page_size)
    logging.info(f"Loading page {page_number} of {page_total} for {search_keyword}'s {search_count} results...")

    # get jobs from the search result
    result_ids = []  # stays empty if the list never loads, instead of being unbound
    for _ in range(attempts):
        try:
            items = wd.find_element(By.XPATH, "//ul[@class='scaffold-layout__list-container']").find_elements(By.TAG_NAME, "li")
            candidate_ids = [item.get_attribute('data-occludable-job-id') for item in items]
            result_ids = [job_id for job_id in candidate_ids if job_id]
            break
        except Exception:
            time.sleep(click_wait)  # wait for page to load

    # Human-readable forms of the URL-encoded search terms for the CSV.
    # location comes from the search parameters, so it no longer depends on
    # whether the company element could be read.
    location = search_location.replace('%2C', ',').replace('%20', ' ')
    keyword_label = search_keyword.replace("%20", " ")

    top_card = "//div[@class='jobs-unified-top-card__content--two-pane']/div[@class='jobs-unified-top-card__primary-description']/div"
    insights = "//div[@class='mt5 mb2']"

    # loop through each job on the page
    for job_id in result_ids:
        try:
            # select a job and start extracting information
            wd.find_element(By.XPATH, f"//div[@data-job-id={job_id}]").click()
        except Exception:
            # exception likely due to the job being deleted; go to the next id
            continue

        # get job title
        job_title = _retry_text(wd, "//h2[@class='t-24 t-bold jobs-unified-top-card__job-title']", attempts, click_wait)

        wd.refresh()  # refresh page to get new HTML

        # get company name
        company = _retry_text(wd, f"{top_card}/a", attempts, click_wait)

        # get job's update time; if empty, try again in another way
        update_time = _retry_text(wd, f"{top_card}/span[@class='tvm__text tvm__text--neutral'][1]/span[2]", attempts, click_wait)
        if update_time == '':
            update_time = _retry_text(wd, f"{top_card}/span[@class='tvm__text tvm__text--neutral'][1]/span", attempts, click_wait)

        # get number of applicants (first token); if empty, try again in another way
        applicants = _retry_text(wd, f"{top_card}/span[@class='tvm__text tvm__text--positive']/strong", attempts, click_wait).split(" ")[0]
        if applicants == '':
            applicants = _retry_text(wd, f"{top_card}/span[@class='tvm__text tvm__text--neutral'][3]", attempts, click_wait).split(" ")[0]

        # Due to (slow) ASYNCHRONOUS updates, need wait times to get job info
        job_time = ''
        job_position = ''
        for _ in range(attempts):
            try:
                # 1 - make sure HTML element is loaded
                WebDriverWait(wd, 10).until(EC.presence_of_element_located((By.XPATH, insights)))
                # 2 - make sure text is loaded
                job_infor = wd.find_element(By.XPATH, f"{insights}/ul/li[1]/span").text
                if " · " in job_infor:
                    info_parts = job_infor.split(" · ")
                    job_time = info_parts[0]
                    job_position = info_parts[1]
                else:
                    job_time = job_infor
                    job_position = ''
                break
            except Exception:
                # error means page didn't load so try again
                time.sleep(async_wait)

        # get company details and separate into size and industry
        company_size = ''  # blanks are acceptable here: not important info
        company_industry = ''
        job_details = ''
        for _ in range(attempts):
            try:
                company_details = wd.find_element(By.XPATH, f"{insights}/ul/li[2]/span").text
                if " · " in company_details:
                    detail_parts = company_details.split(" · ")
                    company_size = detail_parts[0].replace(" employees", '')
                    company_industry = detail_parts[1]
                else:
                    company_size = company_details.replace(" employees", '')
                    company_industry = ''
                # A failure here retries the whole step, but the size and
                # industry read above are kept even if this never succeeds.
                job_details = wd.find_element(By.XPATH, f"{insights}/ul/li[5]/button/a").text
                break
            except Exception:
                time.sleep(click_wait)

        # append one line to the file per job, so finished rows survive a later failure
        date_time = datetime.datetime.now().strftime("%d%b%Y-%H:%M:%S")
        row = [date_time, keyword_label, search_count, job_id, job_title, company, location, update_time, applicants, job_time, job_position, company_size, company_industry, job_details]
        # newline='' avoids blank rows on Windows
        with open(file, "a", newline='', encoding="utf-8") as f:
            csv.writer(f).writerow(row)

    logging.info(f"Page {page_number} of {page_total} loaded for {keyword_label}")

    return search_page + page_size, search_count, url_search


def _retry_text(wd, xpath, attempts, wait):
    """Return the text of the first element matching ``xpath``, or '' on failure.

    Makes up to ``attempts`` lookups and sleeps ``wait`` seconds after every
    failed one (including the last).
    """
    for _ in range(attempts):
        try:
            return wd.find_element(By.XPATH, xpath).text
        except Exception:
            time.sleep(wait)
    return ''

In [6]:
# create logging file
logging = create_logfile()

# create daily csv file
date = datetime.date.today().strftime('%d-%b-%y')
file = f"data/{date}.csv"
create_file(file, logging)

# login to linkedin and assign webdriver to variable
wd = login(logging)

# URL search terms focusing on what type of skills are required for Data Analyst & Data Scientist
#search_keywords = ['Data Analyst', 'Data Scientist', 'Data Engineer']
search_keywords = ['Data Engineer']
search_location = "Hanoi%2C%20Hanoi%2C%20Vietnam"
# Sent as the f_WT query parameter.
# NOTE(review): described as the "remote" filter; confirm "1" is the code LinkedIn uses for remote.
search_remote = "1"
# Sent as the f_TPR query parameter.
# NOTE(review): the old comment said "past 24 hours", but if the number is in
# seconds, 2592000 is 30 days (24 hours would be 86400); confirm the intended window.
search_posted = "r2592000"

# Counting Exceptions
exception_first = 0
exception_second = 0

try:
    for search_keyword in search_keywords:
        search_keyword = search_keyword.lower().replace(" ", "%20")

        # Loop through each page and write results to csv
        search_page = 0 # start on page 1
        search_count = 1 # initiate search count until looks on page
        # page_search only hands back the URL when it succeeds. Start from None
        # so the error handlers below can log it even if the very first call
        # fails (it used to be unbound there and raised NameError). After a
        # later failure it holds the URL of the last page that succeeded.
        url_search = None
        while (search_page < search_count) and (search_page != 1000):
            # Search each page and return location after each completion
            try:
                search_page, search_count, url_search = page_search(wd, search_location, search_keyword, search_remote, search_posted, search_page, search_count, file, logging)
            except Exception as e:
                logging.error(f'(1) FIRST exception for {search_keyword} on {search_page} of {search_count}, retrying...')
                logging.error(f'Current URL: {url_search}')
                logging.error(e)
                logging.exception('Traceback ->')
                exception_first += 1
                time.sleep(5)
                try:
                    search_page, search_count, url_search = page_search(wd, search_location, search_keyword, search_remote, search_posted, search_page, search_count, file, logging)
                    logging.warning(f'Solved Exception for {search_keyword} on {search_page} of {search_count}')
                except Exception as e:
                    logging.error(f'(2) SECOND exception remains for {search_keyword}. Skipping to next page...')
                    logging.error(f'Current URL: {url_search}')
                    logging.error(e)
                    logging.exception('Traceback ->')
                    search_page += 25 # skip to next page to avoid entry
                    exception_second += 1
                    logging.error(f'Skipping to next page for {search_keyword}, on {search_page} of {search_count}...')
finally:
    # close browser, even when the run is interrupted or crashes
    wd.quit()

logging.info(f'LinkedIn data scraping complete with {exception_first} first and {exception_second} second exceptions')
logging.info(f'Regard all further alarms...')