In [60]:
pip install selenium webdriver-manager fake_useragent 

Note: you may need to restart the kernel to use updated packages.


In [88]:
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.chrome.options import Options
from webdriver_manager.chrome import ChromeDriverManager
from fake_useragent import UserAgent
import random
import time
import requests
from bs4 import BeautifulSoup

In [89]:
def get_headers():
    ua = UserAgent()
    return {"User-Agent": ua.random}

# ✅ Use Proxies (Optional - Add your own proxies here)
PROXIES = [
    "http://138.128.91.65:8000",
    "http://192.252.208.67:14282",
    "http://185.199.229.156:7492"
]

def get_random_proxy():
    return random.choice(PROXIES)

In [90]:
from urllib.parse import urlparse, urljoin
yelp_url = "https://www.yelp.com/biz/mix-kitchen-and-bar-ithaca-11?osq=Reservations"
def extract_business_slug(yelp_url):
    """Extracts the business slug from a Yelp URL"""
    parsed_url = urlparse(yelp_url)
    path_parts = parsed_url.path.split("/")
    
    if len(path_parts) > 2 and path_parts[1] == "biz":
        business_slug = path_parts[2]  
        clean_url = urljoin("https://www.yelp.com", f"/biz/{business_slug}")  
    return clean_url

clean_yelp_url = extract_business_slug(yelp_url)
clean_yelp_url

'https://www.yelp.com/biz/mix-kitchen-and-bar-ithaca-11'

In [91]:
def fetch_page(clean_yelp_url, max_retries=3):
    session = requests.Session()
    
    for attempt in range(max_retries):
        try:
            headers = get_headers()
            proxy = {"http": get_random_proxy(), "https": get_random_proxy()}  # Rotate proxies
            response = session.get(clean_yelp_url, headers=headers, proxies=proxy, timeout=10)

            if response.status_code == 200:
                return response
            
            elif response.status_code in [403, 429]:
                print(f"Blocked! Changing User-Agent & Proxy... ({attempt+1}/{max_retries})")
                time.sleep(random.uniform(5, 10))  # Sleep before retrying

        except requests.exceptions.RequestException as e:
            print(f"Request failed: {e}")

    print("Max retries reached. Exiting...")
    return None

In [92]:
import random
def get_driver():
    options = Options()
    options.add_argument("--headless")  
    options.add_argument("--disable-gpu")
    options.add_argument("--no-sandbox")       
    options.add_argument("--window-size=1920,1080")
    options.add_argument("user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/134.0.0.0 Safari/537.36")  # Fake user-agent
    time.sleep(random.uniform(10, 20))  # Increased wait time
    driver = webdriver.Chrome(service=Service(ChromeDriverManager().install()), options=options)
    return driver

In [93]:
def click_next_page(driver):
    try:
        next_button = driver.find_element(By.XPATH, "//a[contains(@class, 'next')]")
        driver.execute_script("arguments[0].click();", next_button)
        time.sleep(random.uniform(5, 10))  # <---- Add delay
        return True
    except:
        return False

In [94]:
def scroll_page(driver):
    last_height = driver.execute_script("return document.body.scrollHeight")
    
    for _ in range(3):  
        driver.execute_script("window.scrollTo(0, document.body.scrollHeight);")
        time.sleep(random.uniform(5, 10))  
        new_height = driver.execute_script("return document.body.scrollHeight")
        if new_height == last_height:
            break  
        last_height = new_height

In [95]:
def scrape_yelp_reviews(clean_yelp_url):
    time.sleep(random.uniform(7, 14))
    driver = get_driver()  # Initialize WebDriver
    driver.get(clean_yelp_url)
    
    all_reviews = []
    page = 1

    while True:
        time.sleep(random.uniform(4, 15))  # Random delay to avoid detection

        review_sections = driver.find_elements(By.XPATH, "//ul[@class= 'list__09f24__ynIEd']//li")
        print(f"Found {len(review_sections)} reviews on page {page}.")

        if not review_sections:
            print("No reviews found on this page.")
            break 
        individual_reviews = review_sections.find_elements(By.XPATH, "//ul[@class= 'list__09f24__ynIEd']//li")
        
        page_reviews = []
        for review in individual_reviews:
            try:
                reviewer_name = review.find_element(By.XPATH, ".//a[contains(@class, 'css-1lx1e1r')]").text
                reviewer_location = review.find_element(By.XPATH, ".//span[contains(@class, 'css-qgunke')]").text
                
                rating_element = review.find_elements(By.XPATH, ".//div[contains(@class, 'y-css-dnttlc')]")
                if rating_element:
                    rating = rating_element[0].get_attribute("aria-label")  
                else:
                    continue

                review_text = review.find_element(By.XPATH, ".//p[contains(@class, 'raw__09f24__T4Ezm')]").text
                review_date = review.find_element(By.XPATH, ".//span[contains(@class, 'y-css-1d8mpv1')]").text

                helpful = review.find_element(By.XPATH, ".//span[contains(@class, 'y-css-ghxju8') and text()='Helpful']/following-sibling::span[contains(@class, 'y-css-7nL72w')]").text
                thanks = review.find_element(By.XPATH, ".//span[contains(@class, 'y-css-ghxju8') and text()='Thanks']/following-sibling::span[contains(@class, 'y-css-7nL72w')]").text
                love = review.find_element(By.XPATH, ".//span[contains(@class, 'y-css-ghxju8') and text()='Love this']/following-sibling::span[contains(@class, 'y-css-7nL72w')]").text
                oh_no = review.find_element(By.XPATH, ".//span[contains(@class, 'y-css-ghxju8') and text()='Oh no']/following-sibling::span[contains(@class, 'y-css-7nL72w')]").text

                review_data = {
                    "Reviewer Name": reviewer_name,
                    "Location": reviewer_location,
                    "Star Rating": rating,
                    "Review Text": review_text,
                    "Date": review_date,
                    "Helpful": helpful,
                    "Thanks": thanks,
                    "Love": love,
                    "Oh No": oh_no
                }
                page_reviews.append(review_data)
            except Exception as e:
                print(f"Error extracting review: {e}") 
        
        all_reviews.extend(page_reviews)  
        print(f"Stored {len(page_reviews)} reviews from Page {page}.")
        
        if page > 2:
            print(f"Reached page limit {page}")
            break

        if not click_next_page(driver):
            print("No more pages to scrape reviews from")
            break
        
        page += 1

    driver.quit()
    return all_reviews

df_reviews = scrape_yelp_reviews(clean_yelp_url)
print(df_reviews)

Found 0 reviews on page 1.
No reviews found on this page.
[]


In [96]:
import asyncio
import random
from playwright.async_api import async_playwright

async def get_driver():
    """Set up Playwright browser with stealth settings to avoid detection."""
    playwright = await async_playwright().start()
    browser = await playwright.chromium.launch(headless=True)  # Run in headless mode for stealth
    context = await browser.new_context(
        user_agent="Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36",
        viewport={"width": 1920, "height": 1080}
    )
    page = await context.new_page()
    return playwright, browser, page

async def scrape_yelp_reviews(clean_yelp_url):
    """Scrapes Yelp reviews using Playwright while handling CAPTCHA & pagination."""
    playwright, browser, page = await get_driver()
    await page.goto(clean_yelp_url, timeout=60000)
    await page.wait_for_load_state("networkidle")

    page_content = await page.content()
    print(page_content[:1000])  # Print first 1000 characters of the page

    all_reviews = []
    page_number = 1

    while True:
        await asyncio.sleep(random.uniform(2, 4))

        review_sections = await page.query_selector_all("//section[contains(@aria-label, 'Recommended Reviews')]//ul[contains(@class, 'list__09f24__ynIEd')]/li")
        
        if not review_sections:
            print("No reviews found on this page.")
            break

        print(f" Found {len(review_sections)} reviews on Page {page_number}.")
        page_reviews = []

        for review in review_sections:
            try:
                reviewer_name = await review.query_selector_eval(".//h4[contains(@class, 'css-1l1q8er')]", "el => el.textContent") or "N/A"
                reviewer_location = await review.query_selector_eval(".//span[contains(@class, 'css-qgunke')]", "el => el.textContent") or "N/A"
                rating_element = await review.query_selector(".//div[contains(@class, 'y-css-dnttlc')]")
                rating = await rating_element.get_attribute("aria-label") if rating_element else "N/A"
                review_text = await review.query_selector_eval(".//p[contains(@class, 'raw__09f24__T4Ezm')]", "el => el.textContent") or "N/A"
                review_date = await review.query_selector_eval(".//span[contains(@class, 'y-css-1d8mpv1')]", "el => el.textContent") or "N/A"

                helpful = await review.query_selector_eval(".//span[contains(@class, 'y-css-ghxju8') and text()='Helpful']/following-sibling::span", "el => el.textContent") or "0"
                thanks = await review.query_selector_eval(".//span[contains(@class, 'y-css-ghxju8') and text()='Thanks']/following-sibling::span", "el => el.textContent") or "0"
                love = await review.query_selector_eval(".//span[contains(@class, 'y-css-ghxju8') and text()='Love this']/following-sibling::span", "el => el.textContent") or "0"
                oh_no = await review.query_selector_eval(".//span[contains(@class, 'y-css-ghxju8') and text()='Oh no']/following-sibling::span", "el => el.textContent") or "0"

                review_data = {
                    "Reviewer Name": reviewer_name.strip(),
                    "Location": reviewer_location.strip(),
                    "Star Rating": rating.strip(),
                    "Review Text": review_text.strip(),
                    "Review Date": review_date.strip(),
                    "Helpful": helpful.strip(),
                    "Thanks": thanks.strip(),
                    "Love": love.strip(),
                    "Oh No": oh_no.strip(),
                }
                page_reviews.append(review_data)

            except Exception as e:
                print(f"⚠️ Error extracting review: {e}")

        all_reviews.extend(page_reviews)
        print(f"Stored {len(page_reviews)} reviews from Page {page_number}.")

        if page_number >= 2:
            print("Reached page limit, stopping.")
            break

        success = await click_next_page(page)
        if not success:
            print("No more pages to scrape.")
            break

        page_number += 1

    await browser.close()
    return all_reviews

async def click_next_page(page, max_retries=5):
    """Handles 'Next' button clicks with exponential backoff for 503 errors."""
    retries = 0
    wait_time = 2

    while retries < max_retries:
        try:
            await asyncio.sleep(random.uniform(2, 3))
            next_button = await page.query_selector("//a[contains(@class, 'next-link')]")
            if next_button:
                await next_button.scroll_into_view_if_needed()
                await asyncio.sleep(random.uniform(1, 2))
                await next_button.click()
                print("Clicked 'Next' button, loading next page...")
                await asyncio.sleep(random.uniform(4, 6))
                return True
        except Exception as e:
            print(f"No more pages or 'Next' button not found: {e}")
            return False

        retries += 1
        print(f"503 Error! Retrying in {wait_time} seconds... (Attempt {retries}/{max_retries})")
        await asyncio.sleep(wait_time)
        wait_time *= 2

    print("Max retries reached. Skipping page.")
    return False

# Run the scraper
clean_yelp_url = "https://www.yelp.com/biz/sik-kitchen-and-bar-ithaca-3"
asyncio.run(scrape_yelp_reviews(clean_yelp_url))

RuntimeError: asyncio.run() cannot be called from a running event loop