In [1]:
# Cell 1: Install required libraries
%pip install selenium webdriver-manager

Note: you may need to restart the kernel to use updated packages.


In [None]:
# Cell 2: Full scraping script with deduplication

import time
import csv
import urllib.parse
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from webdriver_manager.chrome import ChromeDriverManager

def clean_website(display_text, href):
    """
    Build a tidy website value for a listing.

    The link's visible text is preferred; ``href`` is used only when the
    text is missing or blank. If the chosen value parses as an absolute URL
    (it has both a scheme and a network location), it is rebuilt from
    scheme, host and path only, which drops the query string (e.g. UTM
    parameters) and any fragment.

    Args:
        display_text: Visible text of the link; may be None or empty.
        href: The link's href attribute; may be None or empty.

    Returns:
        The cleaned website string, or "N/A" when neither input yields a
        non-empty value.
    """
    text = display_text.strip() if display_text else ""
    # href can be None when the anchor carries no href attribute; treat it
    # like an empty string instead of failing on .strip().
    fallback = href.strip() if href else ""
    website = text or fallback
    if not website:
        return "N/A"

    # Only absolute URLs are rewritten; bare text such as "www.example.com"
    # has no scheme/netloc after parsing and is returned unchanged.
    parsed = urllib.parse.urlparse(website)
    if parsed.scheme and parsed.netloc:
        website = f"{parsed.scheme}://{parsed.netloc}{parsed.path}"
    return website

def get_contact_details(driver, contact_card, max_attempts=2):
    """
    Open a listing's "Contact" tooltip and read its details.

    The "Contact" button inside ``contact_card`` is scrolled into view and
    clicked via JavaScript, then the tooltip's list items are classified:
    a ``mailto:`` link gives the email, an ``http`` link gives the website,
    ``tel:`` links are ignored, and an item without any link is taken as
    the location text.

    Args:
        driver: Selenium WebDriver used for script execution and waiting.
        contact_card: Element that contains the "Contact" button.
        max_attempts: How many times to try before giving up.

    Returns:
        A ``(website, email, location)`` tuple; each field is "N/A" when it
        could not be extracted. Failures are printed and retried, never
        raised to the caller.
    """
    website = email = location = "N/A"
    wait = WebDriverWait(driver, 10)

    for attempt in range(max_attempts):
        try:
            contact_button = contact_card.find_element(
                By.XPATH, './/button[@aria-label="Contact"]'
            )
            driver.execute_script("arguments[0].scrollIntoView(true);", contact_button)
            driver.execute_script("arguments[0].click();", contact_button)
            # Give the tooltip longer to render on retries.
            time.sleep(2 if attempt else 1)

            # NOTE(review): the tooltip is looked up page-wide, not inside
            # contact_card - confirm only one tooltip is visible at a time.
            tooltip = wait.until(EC.visibility_of_element_located(
                (By.CSS_SELECTOR, 'div.tooltip_tooltip-inner___wDGV')
            ))

            li_elements = tooltip.find_elements(
                By.XPATH, './/li[contains(@class, "styles_item__9pNrw")]'
            )
            # Reset values so a partial earlier attempt cannot leak through.
            website, email, location = "N/A", "N/A", "N/A"
            for li in li_elements:
                # find_elements returns an empty list instead of raising, so
                # only items that truly have no link are treated as location.
                anchors = li.find_elements(By.TAG_NAME, "a")
                if not anchors:
                    text_val = li.text.strip()
                    if text_val:
                        location = text_val
                    continue

                a_tag = anchors[0]
                # An anchor without an href yields None; normalise to "".
                href = a_tag.get_attribute("href") or ""
                if href.startswith("mailto:"):
                    # Drop the scheme and any "?subject=..." style parameters.
                    address = href[len("mailto:"):].split("?", 1)[0].strip()
                    email = address or "N/A"
                elif href.startswith("http"):
                    website = clean_website(a_tag.text, href)
                # tel: links (phone numbers) and other schemes are skipped.

            if website != "N/A" or email != "N/A" or location != "N/A":
                break
        except Exception as e:
            # Best-effort scraping: report the failure and retry.
            print(f"Attempt {attempt+1} failed to extract contact details: {e}")
            time.sleep(2)
    return website, email, location

def scrape_current_page(driver):
    """
    Scrape all listings from the page currently loaded in ``driver``.

    Business-name cards and contact cards are collected with two separate
    selectors and paired by position; if the two lists differ in length,
    the extra elements of the longer one are ignored.

    Args:
        driver: Selenium WebDriver positioned on a category page.

    Returns:
        A list of dictionaries with keys: Business Name, Website, Location,
        Email. Values that could not be extracted are "N/A".
    """
    page_data = []

    business_cards = driver.find_elements(
        By.CSS_SELECTOR,
        "div.paper_paper__EGeEb.paper_outline__bqVmn.card_card__yyGgu.card_noPadding__OOiac.styles_wrapper__Jg8fe",
    )
    contact_cards = driver.find_elements(
        By.CSS_SELECTOR,
        "div.card_cardContent__4Js_A.styles_footerWrapper__fzSEA",
    )

    # zip stops at the shorter list, i.e. min(len(business), len(contact)).
    for i, (business_card, contact_card) in enumerate(zip(business_cards, contact_cards)):
        # Extract business name.
        try:
            bn_elem = business_card.find_element(
                By.CSS_SELECTOR,
                "p.typography_heading-xs__osRhC.typography_appearance-default__t8iAq",
            )
            name_text = bn_elem.text.strip()
            # The first card may not have rendered its text yet; wait once
            # and re-read. The check is made on the raw text, before the
            # "N/A" default is applied, so the retry can actually trigger.
            if i == 0 and not name_text:
                time.sleep(2)
                name_text = bn_elem.text.strip()
            business_name = name_text or "N/A"
        except Exception:
            # Best-effort: a missing or unreadable name must not abort the page.
            business_name = "N/A"

        # Extract contact details from the corresponding contact card.
        website, email, location = get_contact_details(driver, contact_card)

        page_data.append({
            "Business Name": business_name,
            "Website": website,
            "Location": location,
            "Email": email,
        })
    return page_data

def scrape_category(driver, category, max_items=40):
    """
    Collect up to ``max_items`` listings for one category.

    Loads the category's first page, scrapes it, and keeps following the
    "next" pagination link until enough listings are gathered, the link has
    no href, or looking it up / navigating fails. Every record is tagged
    with a "Category" key. The result is truncated to ``max_items``.
    """
    collected = []
    driver.get(f"https://www.trustpilot.com/categories/{category}")
    time.sleep(2)

    while len(collected) < max_items:
        for listing in scrape_current_page(driver):
            collected.append({"Category": category, **listing})
        print(f"Collected {len(collected)} listings so far for {category}...")
        if len(collected) >= max_items:
            break

        try:
            next_link = driver.find_element(By.CSS_SELECTOR, 'a[name="pagination-button-next"]')
            target = next_link.get_attribute("href")
            if not target:
                # Pagination link present but without a destination: stop.
                break
            driver.get(urllib.parse.urljoin("https://www.trustpilot.com", target))
            time.sleep(2)
        except Exception as e:
            print(f"No next page found or error navigating: {e}")
            break

    return collected[:max_items]

def deduplicate_records(records):
    """
    Return the records with duplicates removed, keeping first occurrences.

    Two records are duplicates when their Category, Business Name, Website,
    Email and Location values are equal after stripping surrounding
    whitespace and lower-casing. Input order is preserved.
    """
    key_fields = ("Category", "Business Name", "Website", "Email", "Location")
    first_seen = {}
    for record in records:
        fingerprint = tuple(record[field].strip().lower() for field in key_fields)
        # setdefault keeps the earliest record for each fingerprint; dicts
        # preserve insertion order, so the output order matches the input.
        first_seen.setdefault(fingerprint, record)
    return list(first_seen.values())

def main():
    """
    Scrape the configured categories and save the results to a CSV file.

    Starts a headless Chrome session, scrapes up to 100 listings for each
    category, removes duplicate records and writes the result to
    ``trustpilot_data.csv``. A failure while writing the file is printed
    rather than raised.
    """
    categories = ["beauty_wellbeing", "food_beverages_tobacco", "electronics_technology"]
    all_data = []

    options = Options()
    options.add_argument("--headless")
    options.add_argument("--disable-gpu")

    service = Service(ChromeDriverManager().install())
    driver = webdriver.Chrome(service=service, options=options)

    try:
        # Loop through categories and scrape up to max_items for each.
        for cat in categories:
            print(f"Scraping category: {cat}")
            cat_data = scrape_category(driver, cat, max_items=100)
            all_data.extend(cat_data)
            time.sleep(1)
    finally:
        # Always shut the browser down, even if scraping raised, so no
        # headless Chrome process is left behind.
        driver.quit()

    # Deduplicate records.
    all_data = deduplicate_records(all_data)

    # Write the collected data to CSV.
    csv_file = "trustpilot_data.csv"
    fieldnames = ["Category", "Business Name", "Website", "Location", "Email"]
    try:
        with open(csv_file, "w", newline="", encoding="utf-8") as f:
            writer = csv.DictWriter(f, fieldnames=fieldnames)
            writer.writeheader()
            writer.writerows(all_data)
        print(f"Scraping complete. {len(all_data)} deduplicated records saved to {csv_file}")
    except Exception as e:
        # Top-level boundary: report the problem instead of crashing.
        print("Error writing CSV:", e)

# Entry point: run the scraper when this code is executed directly, but not
# when it is imported as a module.
if __name__ == "__main__":
    main()
