In [2]:
import os
import requests
import zipfile
import pandas as pd
from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
import time
import random
import json
from datetime import datetime
import getpass

In [3]:
import pandas as pd
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.common.exceptions import TimeoutException, NoSuchElementException
from selenium.webdriver.common.keys import Keys
from bs4 import BeautifulSoup
import time
import random
import json
import os
from datetime import datetime
import re
import getpass

In [10]:
class WasteManagementScraper:
    def __init__(self, username, password, output_dir="waste_management_data"):
        """Initialize the Waste Management LinkedIn scraper"""
        self.username = username
        self.password = password
        self.output_dir = output_dir
        self.driver = None
        self.wait_time_range = (50, 60)
        
        # Create output directory
        os.makedirs(output_dir, exist_ok=True)

        # Download ChromeDriver and Chrome Headless Shell
        self.download_chromedriver()
        self.download_chrome_headless_shell()

    def download_file(self, url, dest):
        """Download a file from a URL to a specified destination."""
        response = requests.get(url)
        with open(dest, 'wb') as file:
            file.write(response.content)

    def download_chromedriver(self):
        """Download ChromeDriver."""
        chromedriver_url = "https://storage.googleapis.com/chrome-for-testing-public/131.0.6778.85/mac-x64/chromedriver-mac-x64.zip"
        dest = "chromedriver.zip"
        self.download_file(chromedriver_url, dest)
        
        # Unzip the downloaded file
        with zipfile.ZipFile(dest, 'r') as zip_ref:
            zip_ref.extractall(".")
        os.remove(dest)  # Remove the zip file after extraction
        os.chmod("chromedriver", 0o755)  # Make it executable

    def download_chrome_headless_shell(self):
        """Download Chrome Headless Shell."""
        headless_shell_url = "https://storage.googleapis.com/chrome-for-testing-public/131.0.6778.85/mac-x64/chrome-headless-shell-mac-x64.zip"
        dest = "chrome-headless-shell.zip"
        self.download_file(headless_shell_url, dest)
        
        # Unzip the downloaded file
        with zipfile.ZipFile(dest, 'r') as zip_ref:
            zip_ref.extractall(".")
        os.remove(dest)  # Remove the zip file after extraction
        os.chmod("chrome-headless-shell", 0o755)  # Make it executable

    def setup_driver(self):
        """Configure and return ChromeDriver with appropriate settings for local setup."""
        chrome_options = webdriver.ChromeOptions()
        
        # Set headless mode
        chrome_options.binary_location = os.path.abspath("chrome-headless-shell")  # Use the headless shell
        chrome_options.add_argument('--headless')  # Run in headless mode
        chrome_options.add_argument('--disable-gpu')
        chrome_options.add_argument('--disable-extensions')
        chrome_options.add_argument('--ignore-certificate-errors')
        chrome_options.add_argument('--ignore-ssl-errors')
        chrome_options.add_argument("--enable-javascript")
        chrome_options.add_argument("--window-size=1920,1080")
        
        # Update this path to the actual location of chromedriver
        chromedriver_path = "/Users/sammy/Desktop/Fitsol/Market_assignment/chromedriver-mac-x64/chromedriver"
        
        if not os.path.exists(chromedriver_path):
            raise FileNotFoundError(f"ChromeDriver not found at {chromedriver_path}")
        
        service = Service(chromedriver_path)
        
        # Initialize driver
        driver = webdriver.Chrome(service=service, options=chrome_options)
        
        # Set timeouts
        driver.set_page_load_timeout(60)
        driver.implicitly_wait(20)
        
        # Test the driver
        driver.get('https://www.google.com')
        print("Driver setup successful!")
        print(f"Using Chrome version: {driver.capabilities['browserVersion']}")
        
        return driver

    def verify_setup(self):
      """Verify that the Chrome setup is working"""
      try:
          print("Verifying Chrome setup...")
          print("Chrome binary location:", self.driver.capabilities['chrome']['chromedriverVersion'])
          print("Current URL:", self.driver.current_url)
          return True
      except Exception as e:
          print(f"Setup verification failed: {str(e)}")
          return False      

    def login(self):
        """Log into LinkedIn with 2FA handling"""
        try:
            print("Initiating login process...")
            self.driver = self.setup_driver()
            
            # Test if driver is working
            if not self.verify_setup():
              print("Chrome setup verification failed!")
              return False
        
            print("Chrome setup verified. Proceeding with login...")
            
            # Proceed with LinkedIn login
            print("Navigating to LinkedIn...")
            self.driver.get('https://www.linkedin.com/login')
            self.random_wait()
            
            # Enter username
            username_field = WebDriverWait(self.driver, 20).until(
                EC.presence_of_element_located((By.ID, "username"))
            )
            username_field.send_keys(self.username)
            self.random_wait(2, 4)
            
            # Enter password
            password_field = self.driver.find_element(By.ID, "password")
            password_field.send_keys(self.password)
            self.random_wait(2, 4)
            
            # Click sign in
            sign_in_button = self.driver.find_element(By.CSS_SELECTOR, "button[type='submit']")
            sign_in_button.click()
            
            # Handle 2FA
            try:
                print("Waiting for 2FA prompt...")
                pin_field = WebDriverWait(self.driver, 10).until(
                    EC.presence_of_element_located((By.CSS_SELECTOR, "input[name='pin']"))
                )
                
                verification_code = input("\nPlease enter the 6-digit verification code sent to your email/phone: ")
                pin_field.send_keys(verification_code)
                self.random_wait(1, 2)
                
                verify_button = self.driver.find_element(By.CSS_SELECTOR, "button[type='submit']")
                verify_button.click()
                
            except TimeoutException:
                print("No 2FA prompt found - checking if login was successful...")
            
            # Final check for successful login
            try:
                WebDriverWait(self.driver, 30).until(
                    EC.presence_of_element_located((By.ID, "global-nav"))
                )
                print("Successfully logged in!")
                self.random_wait()
                return True
                
            except TimeoutException:
                print("Login failed - could not verify successful login")
                # Save page source for debugging
                with open('login_page.html', 'w', encoding='utf-8') as f:
                    f.write(self.driver.page_source)
                return False
                
        except Exception as e:
            print(f"Login failed: {str(e)}")
            if self.driver:
                print("Current URL:", self.driver.current_url)
                with open('error_page.html', 'w', encoding='utf-8') as f:
                    f.write(self.driver.page_source)
            return False

    def random_wait(self, min_time=None, max_time=None):
        """Wait for a random amount of time"""
        if min_time is None:
            min_time = self.wait_time_range[0]
        if max_time is None:
            max_time = self.wait_time_range[1]
        time.sleep(random.uniform(min_time, max_time))

    def classify_waste_category(self, text):
        """Classify the waste management requirement into categories"""
        text = text.lower()
        categories = []
        
        for category, keywords in self.category_keywords.items():
            if any(keyword.lower() in text for keyword in keywords):
                categories.append(category)
        
        if not categories:
            # Check for hazardous waste subcategories
            hazardous_keywords = {
                'chemical': 'G1',
                'pesticide': 'G2',
                'asbestos': 'G3',
                'sludge': 'G4',
                'contaminated soil': 'G5'
            }
            
            for keyword, subcode in hazardous_keywords.items():
                if keyword in text:
                    categories.append(f"G-{subcode}")
            
            # Check for non-hazardous waste subcategories
            non_hazardous_keywords = {
                'food': 'H1',
                'paper': 'H2',
                'textile': 'H3',
                'glass': 'H4',
                'wood': 'H5',
                'rubber': 'H6'
            }
            
            for keyword, subcode in non_hazardous_keywords.items():
                if keyword in text:
                    categories.append(f"H-{subcode}")
        
        return categories if categories else ['Unclassified']

    def search_posts(self, search_term):
        """Search for posts using a specific term"""
        try:
            # Navigate to LinkedIn search
            search_url = f"https://www.linkedin.com/search/results/content/?keywords={search_term}&origin=GLOBAL_SEARCH_HEADER"
            self.driver.get(search_url)
            self.random_wait()
            
            # Scroll and collect posts
            posts_data = []
            last_height = self.driver.execute_script("return document.body.scrollHeight")
            
            while len(posts_data) < 100:  # Limit to 100 posts per search term
                # Scroll down
                self.driver.execute_script("window.scrollTo(0, document.body.scrollHeight);")
                self.random_wait(3, 5)
                
                # Get posts
                posts = self.driver.find_elements(By.CSS_SELECTOR, ".feed-shared-update-v2")
                
                for post in posts:
                    try:
                        post_data = self.extract_post_data(post)
                        if post_data and self.is_relevant_post(post_data['content']):
                            posts_data.append(post_data)
                    except Exception as e:
                        print(f"Error extracting post data: {str(e)}")
                        continue
                
                # Check if scrolled to bottom
                new_height = self.driver.execute_script("return document.body.scrollHeight")
                if new_height == last_height:
                    break
                last_height = new_height
            
            return posts_data
            
        except Exception as e:
            print(f"Error searching posts: {str(e)}")
            return []

    def extract_post_data(self, post_element):
        """Extract required data from a post"""
        try:
            # Extract post content
            content = post_element.find_element(By.CSS_SELECTOR, ".feed-shared-update-v2__description").text
            
            # Extract poster information
            poster_info = post_element.find_element(By.CSS_SELECTOR, ".feed-shared-actor__title")
            poster_name = poster_info.text
            
            # Try to get designation and company
            try:
                designation_company = post_element.find_element(By.CSS_SELECTOR, ".feed-shared-actor__description").text
                designation, company = self.parse_designation_company(designation_company)
            except:
                designation, company = "", ""
            
            # Get post date
            post_date = post_element.find_element(By.CSS_SELECTOR, "time").get_attribute("datetime")
            
            # Classify waste category
            categories = self.classify_waste_category(content)
            
            return {
                'content': content,
                'poster_name': poster_name,
                'designation': designation,
                'company': company,
                'post_date': post_date,
                'categories': categories
            }
            
        except Exception as e:
            print(f"Error extracting post data: {str(e)}")
            return None

    def parse_designation_company(self, text):
        """Parse designation and company from LinkedIn description"""
        parts = text.split(' at ')
        if len(parts) > 1:
            return parts[0].strip(), parts[1].strip()
        return text.strip(), ""

    def is_relevant_post(self, content):
        """Check if the post is relevant to waste management requirements"""
        relevant_keywords = [
            'requirement', 'needed', 'looking for', 'seeking',
            'waste', 'disposal', 'recycling', 'management'
        ]
        content_lower = content.lower()
        return any(keyword in content_lower for keyword in relevant_keywords)

    def save_data(self, data, filename=None):
        """Save scraped data to CSV and JSON"""
        if filename is None:
            timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
            filename = f'waste_management_data_{timestamp}'
        
        # Save as JSON
        json_path = os.path.join(self.output_dir, f'{filename}.json')
        with open(json_path, 'w', encoding='utf-8') as f:
            json.dump(data, f, ensure_ascii=False, indent=2)
        
        # Save as CSV
        df = pd.DataFrame(data)
        csv_path = os.path.join(self.output_dir, f'{filename}.csv')
        df.to_csv(csv_path, index=False, encoding='utf-8')
        
        print(f"Data saved to {json_path} and {csv_path}")

    def scrape_waste_management_requirements(self):
        """Main function to scrape waste management requirements"""
        try:
            if not self.login():
                return
            
            all_posts = []
            
            for search_term in self.search_terms:
                print(f"Searching for: {search_term}")
                posts = self.search_posts(search_term)
                all_posts.extend(posts)
                self.random_wait()
            
            # Remove duplicates based on content
            unique_posts = {post['content']: post for post in all_posts}.values()
            
            # Save data
            self.save_data(list(unique_posts))
            
            return list(unique_posts)
            
        finally:
            if self.driver:
                self.driver.quit()

In [11]:
if __name__ == "__main__":
    try:
        # Get credentials securely
        username = "21f3001088@ds.study.iitm.ac.in"
        password = "Ritesh@200212"
        
        # Initialize and run scraper
        scraper = WasteManagementScraper(username, password)
        data = scraper.scrape_waste_management_requirements()
        
    except Exception as e:
        print(f"Execution failed: {str(e)}")

Execution failed: [Errno 2] No such file or directory: 'chromedriver'


In [None]:
# 21f3001088@ds.study.iitm.ac.in
# Ritesh@200212