In [6]:
import time
import datetime
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.by import By
import os
import csv
import logging
from dotenv import load_dotenv
import random
def create_logfile():
    """Configure root logging to write to a new timestamped file under ``log/``.

    The file name is the current local time formatted as
    ``YYYY-MM-DD_HH.MM.SS.log``. Any previously installed root handlers are
    replaced (``force=True``), so calling this again starts a fresh log file.

    Returns:
        The ``logging`` module itself, which the rest of the script passes
        around and calls ``.info`` / ``.error`` on.
    """
    date_time = datetime.datetime.today().strftime('%Y-%m-%d_%H.%M.%S')
    logfile = f"log/{date_time}.log"
    # The file handler does not create missing directories, so make sure the
    # target folder exists before logging is configured.
    os.makedirs(os.path.dirname(logfile), exist_ok=True)
    logging.basicConfig(filename=logfile, filemode='w', level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s', datefmt='%d-%m-%Y %H:%M:%S', force=True)
    logging.info(f'Log file {logfile} created')

    return logging

def create_file(file, logging):
    """Create (or truncate) the daily CSV file and write its header row.

    Args:
        file: Path of the CSV file to create. Missing parent directories are
            created first.
        logging: Logger-like object exposing ``info`` (the script passes the
            ``logging`` module).
    """
    logging.info("Creating daily csv file...")
    header = ['date_time', 'search_keyword', 'search_count', 'job_id', 'update_time', 'job_title', 'company', 'location', 'job_description']
    # open() does not create intermediate folders; a bare file name has an
    # empty dirname and needs nothing created.
    parent = os.path.dirname(file)
    if parent:
        os.makedirs(parent, exist_ok=True)
    with open(file, 'w', newline='', encoding='utf-8') as f:
        csv.writer(f).writerow(header)
    logging.info(f"{file} created")

def login(logging):
    """Start Chrome, sign in to LinkedIn and return the live webdriver.

    Credentials are read from the environment after loading ``.env``:

        LINKEDIN_USERNAME=email@gmail.com
        LINKEDIN_PASSWORD=password

    Args:
        logging: Logger-like object exposing ``info`` (the script passes the
            ``logging`` module).

    Returns:
        The Chrome webdriver instance used for the login.

    Raises:
        ValueError: If either credential is missing or empty.
        Exception: Any error raised while driving the login page is
            re-raised after the browser has been closed.
    """
    url_login = "https://pl.linkedin.com/"
    load_dotenv()

    username = os.getenv('LINKEDIN_USERNAME')
    password = os.getenv('LINKEDIN_PASSWORD')
    # Fail early with a clear message instead of an opaque error from
    # send_keys(None) after the browser has already been launched.
    if not username or not password:
        raise ValueError("LINKEDIN_USERNAME and LINKEDIN_PASSWORD must be set (e.g. in .env)")

    # start chrome
    chrome_options = Options()
    chrome_options.add_argument("--start-maximized")

    # login to LinkedIn
    logging.info(f"Logging in to LinkedIn as {username}...")
    try:
        wd = webdriver.Chrome(executable_path='./chromedriver', options=chrome_options)
    except TypeError:
        # Newer Selenium releases no longer accept ``executable_path``; the
        # driver location is passed through a Service object instead.
        from selenium.webdriver.chrome.service import Service
        wd = webdriver.Chrome(service=Service('./chromedriver'), options=chrome_options)

    try:
        wd.get(url_login)
        wd.find_element(By.ID, "session_key").send_keys(username)
        wd.find_element(By.ID, "session_password").send_keys(password)

        time.sleep(2)
        wd.find_element(By.XPATH, "//button[@type = 'submit']").click()
        # Fixed pause after submitting the form before scraping starts.
        time.sleep(15)
    except Exception:
        # Do not leave an orphaned browser behind when the login flow breaks.
        wd.quit()
        raise
    logging.info("Log in complete. Scraping data...")

    return wd
def _parse_search_count(text):
    """Return the integer made of all decimal digits found in ``text``.

    The results label groups thousands with spaces (e.g. "1 234 wyniki"),
    so the digits are joined regardless of how many groups there are.
    Raises ValueError when ``text`` contains no digits.
    """
    return int(''.join(ch for ch in text if ch.isdecimal()))


def _find_texts(wd, xpaths, attempts, wait):
    """Return the ``.text`` of the first match of every XPath, all-or-nothing.

    The whole group is retried up to ``attempts`` times, sleeping ``wait``
    seconds after each failure; if it never succeeds, a list of empty strings
    of the same length is returned.
    """
    for _ in range(attempts):
        try:
            return [wd.find_element(By.XPATH, xpath).text for xpath in xpaths]
        except Exception:
            time.sleep(wait)
    return [''] * len(xpaths)


def page_search(wd, search_keyword, search_page, search_count, file, logging):
    """Scrape one page of LinkedIn job search results and append it to ``file``.

    Args:
        wd: Selenium webdriver with an already logged-in session.
        search_keyword: Search term as it goes into the URL (spaces already
            encoded as ``%20`` by the caller).
        search_page: Value of the ``start`` URL parameter (offset of the first
            result on the page; pages advance in steps of 25).
        search_count: Ignored on input; it is re-read from the page.
        file: Path of the CSV file rows are appended to.
        logging: Logger-like object exposing ``info`` and ``warning``.

    Returns:
        Tuple ``(next_search_page, search_count, url_search)`` where
        ``next_search_page`` is ``search_page + 25``, ``search_count`` is the
        result count read from the page (``999999999`` when it could not be
        read) and ``url_search`` is the URL that was loaded.
    """
    # wait time for events in seconds
    page_wait = 20 + random.uniform(-5, 5)
    click_wait = 5
    attempts = 3  # times to retry when trouble with scraping element
    results_xpath = "//ul[@class='scaffold-layout__list-container']/li"

    url_search = f"https://www.linkedin.com/jobs/search/?keywords={search_keyword}&start={search_page}"
    wd.get(url_search)
    time.sleep(page_wait)

    try:
        # The label reads like "1 234 wyniki"; keep only the number and drop
        # the trailing word ("wyniki"/"wyników").
        search_count = _parse_search_count(wd.find_element(By.XPATH, '//small').text)
    except Exception:
        # Unknown total: use a huge sentinel so the caller keeps paging.
        search_count = 999999999

    current_page = round(search_page / 25) + 1
    total_pages = -(-search_count // 25)  # ceiling division: a partial last page still counts
    logging.info(f"Loading page {current_page} of {total_pages} for {search_keyword}'s {search_count} results...")

    # get all the job_id's for xpath for current page to click each element
    result_ids = []  # stays empty if every attempt fails, so the page yields no rows
    for _ in range(attempts):
        try:
            items = wd.find_elements(By.XPATH, results_xpath)
            candidate_ids = [item.get_attribute('data-occludable-job-id') for item in items]
            # get_attribute returns None when the attribute is absent
            result_ids = [job_id for job_id in candidate_ids if job_id]
            break
        except Exception:
            time.sleep(click_wait)

    display_keyword = search_keyword.replace("%20", " ")
    list_jobs = []
    for job_id in result_ids:
        # Jitter around the base wait; the base itself is not modified so the
        # delay cannot drift (or become negative) over many jobs.
        time.sleep(click_wait + random.uniform(-1, 1))

        item_xpath = f"{results_xpath}[@data-occludable-job-id='{job_id}']"
        clicked = False
        for _ in range(attempts):
            try:
                wd.find_element(By.XPATH, item_xpath).click()
                time.sleep(0.2)
                clicked = True
                break  # one successful click is enough
            except Exception:
                time.sleep(click_wait)
        if not clicked:
            # The detail pane would still show the previous job, so scraping
            # it now would store that job's data under this job_id.
            logging.warning(f"Could not open job {job_id}, skipping it")
            continue

        (job_title,) = _find_texts(
            wd, ['//h2[@class="t-24 t-bold jobs-unified-top-card__job-title"]'], attempts, click_wait)
        company, location = _find_texts(
            wd,
            ['//a[contains(@class, "ember-view t-black t-normal")]',
             '//span[contains(@class, "jobs-unified-top-card__bullet")]'],
            attempts, click_wait)
        (update_time,) = _find_texts(
            wd, ['//span[contains(@class, "jobs-unified-top-card__posted-date")]'], attempts, click_wait)
        (job_description,) = _find_texts(
            wd, ['//div[@id="job-details"]/span'], attempts, click_wait)

        date_time = datetime.datetime.now().strftime("%Y-%m-%d-%H:%M:%S")
        row = [date_time, display_keyword, search_count, job_id, update_time, job_title, company, location, job_description]
        # Commas and newlines are flattened to spaces in text fields (existing
        # output format; non-string fields such as the count are left as is).
        list_jobs.append([
            value.replace(",", " ").replace("\n", " ") if isinstance(value, str) else value
            for value in row
        ])

    # append all rows of this page to the file
    with open(file, "a", newline='', encoding='utf-8') as f:
        csv.writer(f).writerows(list_jobs)

    logging.info(f"Page {current_page} of {total_pages} loaded for {display_keyword}")

    return search_page + 25, search_count, url_search


In [7]:
# Driver script: set up logging and the output CSV, log in once, then page
# through the search results of every keyword, retrying a failing page once
# before skipping it.

# create logging file
logging = create_logfile()

# create csv file
date = datetime.date.today().strftime('%Y-%m-%d')
file = f"output/{date}.csv"
create_file(file, logging)

# login to linkedin and assign webdriver to variable
wd = login(logging)

# URL search terms focusing on what type of skills are required for Data Analyst & Data Scientist
search_keywords = ['Data Analyst', 'Data Scientist', 'Data Engineer']

# Counting Exceptions
exception_first = 0
exception_second = 0

try:
    for search_keyword in search_keywords:
        search_keyword = search_keyword.lower().replace(" ", "%20")

        # Loop through each page and write results to csv
        search_page = 0
        search_count = 1
        while (search_page < search_count) and (search_page != 1000):
            # Known before the call so the error handlers can always report
            # it, even when the very first page_search call raises (the name
            # would otherwise be unbound inside the handler).
            url_search = f"https://www.linkedin.com/jobs/search/?keywords={search_keyword}&start={search_page}"
            try:
                search_page, search_count, url_search = page_search(wd, search_keyword, search_page, search_count, file, logging)
            except Exception as e:
                logging.error(f'(1) FIRST exception for {search_keyword} on {search_page} of {search_count}, retrying...')
                logging.error(f'Current URL: {url_search}')
                logging.error(e)
                logging.exception('Traceback ->')
                exception_first += 1
                time.sleep(5)
                try:
                    search_page, search_count, url_search = page_search(wd, search_keyword, search_page, search_count, file, logging)
                    logging.warning(f'Solved Exception for {search_keyword} on {search_page} of {search_count}')
                except Exception as e:
                    logging.error(f'(2) SECOND exception remains for {search_keyword}. Skipping to next page...')
                    logging.error(f'Current URL: {url_search}')
                    logging.error(e)
                    logging.exception('Traceback ->')
                    search_page += 25 # skip to next page to avoid entry
                    exception_second += 1
                    logging.error(f'Skipping to next page for {search_keyword}, on {search_page} of {search_count}...')
finally:
    # Always release the browser, including when the run is interrupted.
    wd.quit()


logging.info(f'LinkedIn data scraping complete with {exception_first} first and {exception_second} second exceptions')
logging.info(f'Regard all further alarms...')

  # NOTE(review): this line looks like an orphaned fragment (possibly a leftover
  # notebook cell): it is indented at top level and `chrome_options` is only
  # defined locally inside login() in the visible code — confirm whether it
  # should be removed.
  wd = webdriver.Chrome(executable_path='./chromedriver', options=chrome_options)
