In [None]:
import os
import re
from datetime import datetime, timedelta, timezone
from time import sleep

import pandas as pd
import pyperclip
import requests
from selenium import webdriver
from selenium.common.exceptions import TimeoutException, NoSuchElementException
from selenium.webdriver.common.action_chains import ActionChains
from selenium.webdriver.common.by import By
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.wait import WebDriverWait
# Chrome launch options; `opt` is handed to webdriver.Chrome() when the browser is started.
opt = webdriver.ChromeOptions()
# Ask Chrome to open with a maximized window.
opt.add_argument("--start-maximized")
# NOTE(review): presumably this removes chromedriver's default "disable-popup-blocking"
# switch so Chrome keeps its pop-up blocker enabled — confirm against the chromedriver docs.
opt.add_experimental_option("excludeSwitches", ["disable-popup-blocking"])
# Optional switches, currently disabled:
# opt.add_argument('--headless')
# opt.add_argument('--disable-gpu')
# opt.add_argument("--lang=en")

In [None]:
def check_internet_connection(url='http://www.google.com', timeout=50):
    """Check for an active internet connection with a single HTTP GET.

    Args:
        url: address to probe; defaults to Google's home page.
        timeout: seconds passed to ``requests.get`` as its timeout.

    Returns:
        True when the request completes, False when it fails with any
        ``requests`` exception (connection error, timeout, ...).
    """
    print("checking for active internet .....")
    try:
        requests.get(url, timeout=timeout)
    except requests.RequestException:
        # RequestException is the common base class: a read timeout is a
        # requests.Timeout, which is NOT a requests.ConnectionError, and would
        # otherwise escape as an unhandled exception.
        print("Internet connection is not available.")
        return False
    print("Internet connection is available.")
    return True

In [None]:
def login(login_username, login_password, base_link, driver):
    """
    Sign in to LinkedIn through the login form served at ``base_link``.

    Types the username and password into the form, submits it with ENTER and
    then waits for the element labelled "Side Bar" to appear.

    Returns True when that element shows up, False when any of the waits
    (30 seconds each) times out.
    """
    def _wait_for(xpath):
        # Block until an element matching `xpath` is present in the DOM.
        return WebDriverWait(driver, 30).until(
            EC.presence_of_element_located((By.XPATH, xpath))
        )

    driver.get(base_link)
    print("logging in please wait ....")
    try:
        _wait_for('.//input[@id="username"]').send_keys(login_username)
        password_box = _wait_for('.//input[@id="password"]')
        password_box.send_keys(login_password)
        password_box.send_keys(Keys.ENTER)
        # The side bar is only waited for, as the sign that login went through.
        _wait_for('.//div[@aria-label="Side Bar"]')
    except TimeoutException:
        print("opps! login failed ...")
        return False
    print("logging successfull")
    return True

In [None]:
def get_post_id(url):
    """
    Extract the numeric post ID from a LinkedIn URL.

    Only the part of ``url`` after the last "posts/" is searched (the whole
    string when "posts/" does not occur). The last standalone 19-digit number
    found there is returned as an int.

    Raises:
        IndexError: when no standalone 19-digit number is present, e.g. for an
            empty string or a URL that is not a post link.
    """
    url_tail = url.split("posts/")[-1]
    # "-" is a non-word character, so the word boundaries match on the whole
    # tail exactly as they would on its "-"-separated pieces.
    candidates = re.findall(r'\b\d{19}\b', url_tail)
    if not candidates:
        raise IndexError(f"no 19-digit LinkedIn post id found in URL: {url!r}")
    return int(candidates[-1])

In [None]:
def extract_unix_timestamp(post_id):
    """
    Derive a Unix timestamp from a post ID.

    The ID is written out in binary, its leading 41 binary digits are read
    back as an integer, and that integer is divided by 1000.
    """
    # Binary text of the ID without any prefix, cut down to its first 41 digits.
    leading_bits = f"{int(post_id):b}"[:41]
    return int(leading_bits, 2) / 1000
    


In [None]:
def unix_timestamp_to_string(timestamp):
    """
    Format a Unix timestamp (seconds) as a "YYYY-MM-DD HH:MM" string in UTC.
    """
    # datetime.utcfromtimestamp() is deprecated since Python 3.12; converting
    # with an explicit UTC timezone yields the same wall-clock fields.
    date_object = datetime.fromtimestamp(timestamp, tz=timezone.utc)
    return date_object.strftime("%Y-%m-%d %H:%M")

In [None]:
def date_difference_calculater(given_date_str):
    """Tell whether a post date lies more than 4 years (4 * 365 days) in the past.

    Args:
        given_date_str: date in '%Y-%m-%d %H:%M' format, interpreted as UTC.

    Returns:
        True when the date is older than the cut-off, otherwise False.
    """
    given_date = datetime.strptime(given_date_str, '%Y-%m-%d %H:%M')

    # Post dates in this notebook are formatted in UTC, so the reference "now"
    # must be UTC as well (made naive to compare with the parsed date) rather
    # than the machine's local wall-clock time.
    current_date = datetime.now(timezone.utc).replace(tzinfo=None)

    # Cut-off: 4 years approximated as 4 * 365 days before now.
    date_4_years_ago = current_date - timedelta(days=4 * 365)

    return given_date < date_4_years_ago



In [None]:
def get_date(url):
    """
    Return the date of a LinkedIn post, decoded from its URL.

    Pipeline: URL -> post ID -> Unix timestamp -> formatted date string.
    """
    return unix_timestamp_to_string(
        extract_unix_timestamp(
            get_post_id(url)
        )
    )

In [None]:
def scroll_post_to_center_of_screen(driver, post_element):
    """Scroll the page so the post sits in the middle of the viewport, making it clickable."""
    half_viewport = driver.execute_script("return window.innerHeight;") / 2
    top_of_element = post_element.location['y']
    half_element = post_element.size['height'] / 2
    # Offset that puts the element's vertical midpoint at the viewport's midpoint.
    scroll_y = top_of_element - half_viewport + half_element
    driver.execute_script(f"window.scrollTo(0, {scroll_y});")
    # Pause after scrolling before the caller interacts with the element.
    sleep(2)

In [None]:
def get_each_post_details(driver, each_post):
    """Get the link, LinkedIn's own age label and the decoded date of one post.

    Opens the post's drop-down menu, clicks its second entry (which is
    expected to copy the post link to the clipboard), reads the link back from
    the clipboard and decodes the post date from it.

    Args:
        driver: Selenium webdriver showing the page that contains the post.
        each_post: element wrapping a single post.

    Returns:
        dict with the keys "post_url", "linkedin_time" and "time_stamp",
        or False when any step fails.
    """
    try:
        # click to open post menu
        post_header = each_post.find_element(by=By.XPATH, value='.//div[@class="relative"]')
        scroll_post_to_center_of_screen(driver, post_header)
        post_header.find_element(
            by=By.XPATH,
            value='.//div[@class="artdeco-dropdown artdeco-dropdown--placement-bottom artdeco-dropdown--justification-right ember-view"]',
        ).click()
        post_menu = WebDriverWait(driver, 10).until(
            EC.presence_of_element_located((By.XPATH, './/div[@class="artdeco-dropdown__content-inner"]'))
        )

        # Empty the clipboard first: if the copy fails, we must not pick up the
        # link of a previously processed post that is still sitting there.
        pyperclip.copy("")
        post_menu.find_elements(by=By.XPATH, value='.//li')[1].click()

        # The copied link may not be on the clipboard the instant click()
        # returns, so poll for a few seconds instead of reading it once.
        post_url = ""
        for _ in range(10):
            post_url = pyperclip.paste()
            if post_url:
                break
            sleep(0.5)
        if not post_url:
            print("error in loading post \npost link was not copied to the clipboard")
            return False

        # First word of the sub-description link text (LinkedIn's relative age label).
        linked_in_time = each_post.find_element(
            by=By.XPATH,
            value='.//a[@class="app-aware-link  update-components-actor__sub-description-link"]',
        ).text.split(" ")[0]
        date_time_stamp = get_date(post_url)
        return {"post_url": post_url,
                "linkedin_time": linked_in_time,
                "time_stamp": date_time_stamp
                }
    except TimeoutException:
        print("error in loading post menu")
        return False
    except Exception as e:
        # Best effort: one broken post must not abort the whole scrape.
        print(f"error in loading post \n{e}")
        return False

In [None]:
def load_profile_posts(driver, profile_link):
    """Open the recent-activity (posts) page of a profile.

    Args:
        driver: Selenium webdriver to navigate with.
        profile_link: profile URL, with or without a trailing "/".

    Returns:
        True when the activity container appears within 30 seconds,
        False when the wait times out.
    """
    print(f"loading posts for \n{profile_link} ")
    # Strip trailing slashes so a link such as ".../in/name/" does not turn
    # into ".../in/name//recent-activity/all/".
    driver.get(f"{profile_link.rstrip('/')}/recent-activity/all/")
    try:
        WebDriverWait(driver, 30).until(
            EC.presence_of_element_located((By.XPATH, './/div[@class="pv-recent-activity-detail__core-rail"]')))
        print("posts page loaded successfully")
        return True
    except TimeoutException:
        print("error in loading post page")
        return False



In [None]:
def scroll_n_get_posts(driver, len_prevs_results, secs_to_wait=60):
    """Scroll to the bottom of the page and wait for more posts to load.

    Args:
        driver: Selenium webdriver showing the activity page.
        len_prevs_results: number of posts already accounted for by the caller.
        secs_to_wait: seconds to wait for the loader to disappear, and then
            again (polling once per second) for new posts to show up.

    Returns:
        True as soon as more than ``len_prevs_results`` posts are on the page;
        False when the loader never disappears or no new post arrives in time.
    """
    driver.execute_script("window.scrollTo(0, document.body.scrollHeight);")
    sleep(2)

    # Wait until the small loading spinner is gone (or was never shown).
    try:
        WebDriverWait(driver, secs_to_wait).until(
            EC.invisibility_of_element_located((By.XPATH, './/div[@class="artdeco-loader artdeco-loader--small ember-view"]'))
        )
    except TimeoutException:
        # Only the timeout is treated as "no more results"; a bare except here
        # would also swallow KeyboardInterrupt during the wait.
        print("Timed out waiting for the loading more results.\n\nEnding Scrolling")
        return False

    posts_xpath = './/li[@class="profile-creator-shared-feed-update__container"]'
    for _ in range(secs_to_wait):
        sleep(1)
        if len(driver.find_elements(by=By.XPATH, value=posts_xpath)) > len_prevs_results:
            return True

    print("Ending Scrolling")
    return False

In [None]:
def save_results(result_dicts, data_folder_path, output_file_name):
    """Write the result dicts as a CSV file inside the output folder.

    Spaces and commas in ``output_file_name`` are replaced by underscores and
    ".csv" is appended; the DataFrame index is not written.
    """
    safe_name = output_file_name.replace(" ", "_").replace(",", "_")
    csv_path = f'{data_folder_path}/{safe_name}.csv'
    pd.DataFrame(result_dicts).to_csv(csv_path, index=False)

In [None]:
def scrape_pofile_posts(driver,profile_link,all_profile_posts_dicts):
    """Scrape the posts of one profile together with their details.

    Loads the profile's activity page, then scrolls repeatedly, reading every
    newly loaded post once. Stops when scrolling yields nothing new or when a
    post older than 4 years is met.

    Args:
        driver: logged-in Selenium webdriver.
        profile_link: URL of the profile to scrape.
        all_profile_posts_dicts: list the result dicts are appended to
            (modified in place).

    Returns:
        The same ``all_profile_posts_dicts`` list.

    Note:
        The accumulated results are written to disk after every scraped post,
        using the notebook-level ``data_folder_path`` and ``output_file_name``.
    """
    if not load_profile_posts(driver, profile_link):
        # Nothing to scroll through when the activity page never appeared.
        print("skipping profile, posts page did not load")
        return all_profile_posts_dicts

    print("scolling n getting posts url")
    posts_xpath = './/li[@class="profile-creator-shared-feed-update__container"]'
    # Number of post elements already handled, successful or not. Counting only
    # the successful ones would re-read earlier posts after a failure and could
    # loop forever on a post that keeps failing.
    processed_count = 0
    while True:
        continue_scolling_flag = scroll_n_get_posts(driver, processed_count)

        all_posts = driver.find_elements(by=By.XPATH, value=posts_xpath)
        for each_post in all_posts[processed_count:]:
            processed_count += 1

            each_result_dict = get_each_post_details(driver, each_post)
            if not each_result_dict:
                continue

            each_result_dict["profile_link"] = profile_link
            if date_difference_calculater(each_result_dict["time_stamp"]):
                continue_scolling_flag = False
                print("post older than 4 years, stopping scrapping")
                break

            all_profile_posts_dicts.append(each_result_dict)
            # Saved after every post so an interrupted run keeps its progress.
            save_results(all_profile_posts_dicts, data_folder_path, output_file_name)
        if not continue_scolling_flag:
            break
    return all_profile_posts_dicts

# main 


In [None]:
# Output location: results are written to <data_folder_path>/<output_file_name>.csv
data_folder_path = "data/extra/linkedin"
output_file_name = "metadata_links"

# Create the output folder on first use.
if not os.path.exists(data_folder_path):
    os.makedirs(data_folder_path)
    print(f'new data folder {data_folder_path} created success')

In [49]:
# Load the LinkedIn login credentials from a local JSON file; later cells read
# the "user_name" and "password" keys from `creds`.
# NOTE(review): do not display `creds` in a cell output — it would expose the password.
import json
with open("config/creds.json",'r') as f:
    creds=json.load(f)

{'user_name': '<redacted>', 'password': '<redacted>'}


In [None]:
# Credentials taken from the `creds` dict.
login_username=creds['user_name']
login_password=creds['password']
# LinkedIn sign-in page that login() opens before typing the credentials.
base_link="https://www.linkedin.com/login?fromSignIn=true&trk=guest_homepage-basic_nav-header-signin"


In [None]:
# Profiles to scrape, processed in order; load_profile_posts() appends
# "/recent-activity/all/" to each link to reach the posts page.
profile_links_list=[
                    "https://www.linkedin.com/in/hassan-qaiser-3686b0169/",
                    "https://www.linkedin.com/in/qualman/",
                    ]

In [None]:

# Fail fast: without a connection `driver` would never be created and the
# following cells would only fail later with a NameError.
if not check_internet_connection():
    raise ConnectionError("No internet connection; cannot start the LinkedIn scraper.")

driver = webdriver.Chrome(options=opt)
if not login(login_username, login_password, base_link, driver):
    # The browser is left open so a captcha / verification step can be
    # completed by hand before the scraping cell is run.
    print("login did not complete; finish it manually in the opened browser window before scraping")
        

In [None]:
# Scrape every profile in turn. scrape_pofile_posts() appends to the list it is
# given and returns it, so the results of all profiles accumulate in one list.
all_profile_posts_dicts=[]
for profile_link in profile_links_list:
    all_profile_posts_dicts=scrape_pofile_posts(driver,profile_link,all_profile_posts_dicts)

In [None]:
# quit() ends the whole WebDriver session (all windows and the driver process);
# close() would only close the current window and leave the session running.
driver.quit()
print("congrats! scrapping completed")