In [1]:
import requests
from dotenv import load_dotenv
import os
import pandas as pd
load_dotenv()

import sqlite3

from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from webdriver_manager.chrome import ChromeDriverManager
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.common.exceptions import TimeoutException
from bs4 import BeautifulSoup
from requests.exceptions import Timeout, RequestException
import time
import json
import re
import random
import threading

In [3]:
# Firm attributes to collect. Each name is used verbatim as a column name in
# the SQL statements further down, so entries must match the table schemas.
fields = [
    "Registered_Address",
    "CEO",
    "Establishment_Year",
    "Number_Of_Employees",
    "Revenue_Size",
    "Website",
    "NAICS_Code",
    "SIC_Code",
    "Status",
]

Class for Selenium web scraping

In [2]:
class SeleniumExtractionError(Exception):
    """Raised when page text cannot be extracted through Selenium.

    Carries a human-readable message describing the failure (for example a
    lost DevTools connection or any other unexpected driver error).
    """

class WebScraper():
    """Collect page text for the top hits of a web-search result.

    Every page is fetched twice -- once through a headless Chrome session
    driven by Selenium and once through a plain ``requests`` GET -- and the
    longer of the two extracted texts is kept (see ``scrape_webpage``).
    """

    def __init__(self) -> None:
        """Configure headless Chrome, start the driver and set HTTP defaults."""
        # Set up Chrome driver with webdriver manager
        self.options = webdriver.ChromeOptions()
        self.options.add_argument('--headless')  # Run headless for no browser window
        self.options.add_argument('--disable-gpu')  # Disable GPU acceleration
        # self.options.add_argument('--no-sandbox')  # Required for some Linux environments
        self.options.add_argument('--disable-extensions')
        self.options.add_argument('--disable-plugins')
        self.options.add_argument('--disable-images')  # Prevent loading images to save bandwidth
        self.options.add_argument('--disable-browser-side-navigation')
        self.options.add_argument('--mute-audio')
        # 'eager' lets driver.get() return before sub-resources finish loading.
        self.options.page_load_strategy = 'eager'

        # Selenium time limits in seconds. Kept as attributes so that log
        # messages always report the values actually in force.
        self.page_load_timeout = 4
        self.body_wait_timeout = 5

        # Automatically download and use ChromeDriver
        self.driver = self._create_driver()

        # Parameters for requests
        self.requests_headers = {
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36"
        }
        self.requests_timeout = 4

        self.n_webpages_to_scrape = 5
        self.webpages = {}

    def _create_driver(self):
        """Start a new Chrome driver using ``self.options`` and return it."""
        driver = webdriver.Chrome(
            service=Service(ChromeDriverManager().install()),
            options=self.options,
        )
        driver.set_page_load_timeout(self.page_load_timeout)
        return driver

    def get_top_webpages(self, web_search_results: dict) -> dict:
        """Return ``{site label: url}`` for the first hits of a search result.

        Args:
            web_search_results: mapping expected to look like
                ``{"webPages": {"value": [{"siteName" | "name": ..., "url": ...}, ...]}}``.

        Returns:
            Dict with at most ``self.n_webpages_to_scrape`` entries (also stored
            in ``self.webpages``). The label is the hit's ``siteName`` when that
            key is present, otherwise its ``name``; hits with neither key or
            without a ``url`` are skipped. Hits sharing a label overwrite each
            other. An empty dict is returned when there is nothing to read.
        """
        self.webpages = {}
        print("Getting urls of top webpages")

        # get dict of site names and urls
        if not isinstance(web_search_results, dict) or 'webPages' not in web_search_results:
            print("No webpages found")
            return self.webpages

        web_pages = web_search_results['webPages']
        search_hits = web_pages.get('value') if isinstance(web_pages, dict) else None

        for result in search_hits or []:
            if isinstance(result, dict) and result.get("url") is not None:
                if "siteName" in result:
                    self.webpages[result["siteName"]] = result["url"]
                elif "name" in result:
                    self.webpages[result["name"]] = result["url"]
            if len(self.webpages) >= self.n_webpages_to_scrape:
                break

        return self.webpages

    def extract_text_with_selenium(self, url):
        """Load ``url`` in headless Chrome and return the page's visible text.

        Returns:
            The text extracted from the rendered page source, or the sentinel
            string ``"Page contents not loaded in time"`` when Selenium reports
            a timeout.

        Raises:
            SeleniumExtractionError: for any other driver failure. The browser
                is shut down first; the next call starts a fresh one.
        """
        try:
            if self.driver is None:
                # A previous failure shut the browser down; start a new one so
                # one bad page does not break every subsequent page.
                self.driver = self._create_driver()

            # Open the URL in the browser
            print("Selenium DEBUG: getting url")
            self.driver.get(url)
            time.sleep(0.1)

            # Wait for the page body to be present
            WebDriverWait(self.driver, self.body_wait_timeout).until(
                EC.presence_of_element_located((By.TAG_NAME, 'body'))
            )
            print("Selenium DEBUG: found body")
            # Get page source and parse it once the body exists
            page_source = self.driver.page_source
            print("Selenium DEBUG: found page source")
            soup = BeautifulSoup(page_source, 'html.parser')
            return soup.get_text(separator='\n')

        except TimeoutException:
            # Either the page load or the wait for <body> ran out of time.
            error_message = (
                "Page not ready within time limit "
                f"(page load {self.page_load_timeout}s, body wait {self.body_wait_timeout}s)"
            )
            print(f"An error occurred with Selenium:: {error_message}")
            return "Page contents not loaded in time"

        except Exception as e:
            error_message = str(e)
            print(f"An error occurred with Selenium: {error_message}")
            self.safe_quit_selenium()
            if "disconnected: not connected to DevTools" in error_message:
                print(f"Error occurred: {error_message}")
                raise SeleniumExtractionError(f"DevTools disconnection error for URL: {url}") from e
            raise SeleniumExtractionError(
                f"An unexpected error occurred while extracting: {error_message}"
            ) from e

    def safe_quit_selenium(self):
        """Shut the browser down without raising, then kill leftover drivers.

        ``self.driver`` is reset to ``None`` so the next extraction knows it
        has to start a new browser.
        """
        try:
            if self.driver is not None:
                self.driver.quit()
        except Exception as e:
            print(f"Error during driver quit: {e}")
        finally:
            self.driver = None
            self.kill_chrome_processes()

    def kill_chrome_processes(self):
        """Kill every running chromedriver process (best effort).

        Note: this matches by process name only, so chromedriver processes
        started by other programs on the same machine are killed too.
        """
        try:
            import psutil
        except ImportError:
            # Cleanup is best effort; do not mask the error being handled.
            print("psutil is not installed; skipping chromedriver cleanup")
            return

        # or "chrome" depending on your setup; ".exe" covers Windows
        target_names = {"chromedriver", "chromedriver.exe"}
        for proc in psutil.process_iter():
            try:
                # check whether the process name matches
                if proc.name() in target_names:
                    proc.kill()
            except psutil.Error:
                # Process vanished or is not ours to inspect/kill.
                continue

    def extract_text_with_requests(self, url):
        """Fetch ``url`` with ``requests`` and return its text, or ``None`` on any failure."""
        try:
            # Fetch the content from the URL with headers
            print("Requests DEBUG: getting url")
            response = requests.get(url, headers=self.requests_headers, timeout=self.requests_timeout)
            response.raise_for_status()  # Check if the request was successful
            print("Requests DEBUG: parsing text")
            # Parse text
            soup = BeautifulSoup(response.text, 'html.parser')
            return soup.get_text(separator='\n')  # Using '\n' to preserve some structure

        except Timeout:
            print(f"Request timed out after {self.requests_timeout} seconds for URL: {url}")
            return None
        except RequestException as e:
            print(f"Error fetching the URL {url}: {e}")
            return None
        except Exception as e:
            print(f"Unexpected error occurred while processing {url}: {e}")
            return None

    def scrape_webpage(self, url: str) -> str:
        """Scrape ``url`` with both methods and return the longer text.

        Returns:
            The longer of the Selenium and requests texts (requests wins ties),
            whichever one is available when the other is ``None``, or ``None``
            when neither produced text.

        Raises:
            SeleniumExtractionError: only when Selenium failed *and* requests
                produced nothing either.
        """
        try:
            selenium_text = self.extract_text_with_selenium(url)
        except SeleniumExtractionError as selenium_error:
            # Selenium failed hard; the plain HTTP fetch may still succeed.
            requests_text = self.extract_text_with_requests(url)
            if requests_text is None:
                # Nothing usable at all: surface the Selenium failure as before.
                raise
            print(f"Selenium failed for {url} ({selenium_error}); using requests result")
            return requests_text

        requests_text = self.extract_text_with_requests(url)

        if selenium_text is None:
            return requests_text
        if requests_text is None:
            return selenium_text
        return selenium_text if len(selenium_text) > len(requests_text) else requests_text

    @staticmethod
    def clean_text(text):
        """Collapse all whitespace runs in ``text`` to single spaces.

        Returns ``None`` unchanged. Declared static because it uses no instance
        state; callable as ``WebScraper.clean_text(t)`` or ``scraper.clean_text(t)``.
        """
        if text is None:
            return None
        # Replace multiple newlines with a single newline
        text = re.sub(r'\n+', '\n', text)
        # Remove leading/trailing whitespace from each line
        text = '\n'.join(line.strip() for line in text.splitlines())
        # Remove extra spaces between words
        text = re.sub(r'\s+', ' ', text)
        return text

Connect to the SQLite databases and create the target table

In [None]:
# Source database holding the raw web-search results per firm.
websearch_db_path = "firms_web_search_results.db"
# Target database that receives the scraped website text.
# NOTE(review): the doubled ".db.db" suffix looks like a typo for a distinct
# target file name -- confirm the intended path before relying on this file.
websites_db_path = "firms_web_search_results.db.db"

conn_websearch = sqlite3.connect(websearch_db_path)
conn_websites = sqlite3.connect(websites_db_path)
cursor_websearch = conn_websearch.cursor()
cursor_websites = conn_websites.cursor()

# One row per firm, one TEXT column per scraped attribute.
create_scrapings_table_sql = '''
CREATE TABLE IF NOT EXISTS firm_web_search_website_scrapings (
               id INTEGER PRIMARY KEY AUTOINCREMENT,
               Firm_Name TEXT NOT NULL,
               Registered_Address TEXT,
               CEO TEXT,
               Establishment_Year TEXT,
               Number_Of_Employees TEXT,
               Revenue_Size TEXT,
               Website TEXT,
               NAICS_Code TEXT,
               SIC_Code TEXT,
               Status TEXT
               )
               '''
cursor_websites.execute(create_scrapings_table_sql)

## Loop to construct database

In [None]:
site_scraper = WebScraper()
start_time = time.time()

# Table in the target database: one row per firm, one column per entry of
# `fields`. Every read and write of scraped text must go through this table.
target_table = "firm_web_search_website_scrapings"

try:
    # Get all firms from the web search results database. Queries run on the
    # cursor: sqlite3.Connection has execute() but no fetchall()/fetchone().
    cursor_websearch.execute(''' SELECT id, Firm_Name FROM firms_web_search_results ''')
    firm_web_search_results = cursor_websearch.fetchall()

    for firm_id, firm_name in firm_web_search_results:

        # Check if firm already exists in the target database, insert row if it doesnt
        cursor_websites.execute(
            f"SELECT id FROM {target_table} WHERE id = ? AND Firm_Name = ?",
            (firm_id, firm_name),
        )
        firm_row = cursor_websites.fetchone()
        if firm_row is None:
            print("Inserting new firm:, ", firm_name)
            try:
                # Reuse the source id so that the same (id, Firm_Name) pair
                # addresses the firm in both databases and re-runs find the
                # row again instead of inserting duplicates.
                cursor_websites.execute(
                    f"INSERT INTO {target_table} (id, Firm_Name) VALUES (?, ?)",
                    (firm_id, firm_name),
                )
            except sqlite3.IntegrityError as e:
                # The id is already taken by a row with a different name.
                print(f"Cannot insert firm '{firm_name}' with id {firm_id}: {e}")
                continue
            # Persist the new row even if no field gets filled in below.
            conn_websites.commit()
        else:
            print("Found row for firm, ", firm_name)

        # Now iterate through each field's search results for the given firm.
        # `field` comes from the module-level `fields` list (trusted column
        # names), which is why it can be interpolated into the SQL text.
        for field in fields:

            # Skip fields that already hold scraped data in the target table
            cursor_websites.execute(
                f"SELECT {field} FROM {target_table} "
                f"WHERE id = ? AND Firm_Name = ? AND {field} IS NOT NULL",
                (firm_id, firm_name),
            )
            if cursor_websites.fetchone() is not None:
                print(f"Field '{field}' already has data for firm '{firm_name}', skipping.")
                continue

            # Target database has no value, so fill in
            # Get the websearch results
            cursor_websearch.execute(
                f"SELECT {field} FROM firms_web_search_results WHERE id = ? AND Firm_Name = ?",
                (firm_id, firm_name),
            )
            search_row = cursor_websearch.fetchone()
            if search_row is None or search_row[0] is None:
                # No web search data available for this firm/field
                continue

            try:
                web_search_result = json.loads(search_row[0])
            except (TypeError, ValueError) as e:
                print(f"Invalid web search JSON for {firm_name} and {field}: {e}")
                continue
            if not isinstance(web_search_result, dict):
                print(f"Unexpected web search payload for {firm_name} and {field}, skipping.")
                continue

            # Get the website URLs from the web search results
            try:
                webpages = site_scraper.get_top_webpages(web_search_result)
                website_info = {}

                for website_name, website_url in webpages.items():
                    print(f"Getting Contents of the website of {website_name} with url {website_url}, t = {round(time.time() - start_time,2)}")
                    result = site_scraper.scrape_webpage(website_url)
                    website_info[website_name] = result

                website_info = json.dumps(website_info)

                # Update cell value in the target table
                cursor_websites.execute(
                    f"UPDATE {target_table} SET {field} = ? WHERE id = ? AND Firm_Name = ?",
                    (website_info, firm_id, firm_name),
                )

                conn_websites.commit()

            except SeleniumExtractionError as e:
                print(f"Error extracting data for {firm_name} and {field}: {e}")
                continue

finally:
    # Release the browser and the database handles even if the loop failed.
    active_driver = getattr(site_scraper, "driver", None)
    if active_driver is not None:
        try:
            active_driver.quit()
        except Exception as e:
            print(f"Error during driver quit: {e}")

    cursor_websearch.close()
    conn_websearch.close()
    cursor_websites.close()
    conn_websites.close()