In [2]:
import time
import os
import random
import pandas as pd
from dotenv import load_dotenv
load_dotenv()  
from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.common.by import By
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.common.exceptions import TimeoutException, NoSuchElementException,WebDriverException, UnexpectedAlertPresentException

In [3]:
def setup_driver():
    """Create and return a Chrome WebDriver configured for scraping.

    The browser is started with a fixed set of Chrome switches and a desktop
    user-agent string. The chromedriver location is read from the
    ``DRIVER_PATH`` environment variable; when that variable is unset or
    empty, ``Service()`` is created with its defaults so Selenium resolves
    the driver on its own.

    Returns:
        The ``webdriver.Chrome`` instance. The caller is responsible for
        calling ``quit()`` on it.
    """
    options = Options()

    chrome_switches = (
        "--disable-gpu",
        "--no-sandbox",
        "start-maximized",
        "disable-infobars",
        "--disable-extensions",
        "--disable-dev-shm-usage",
        "--disable-notifications",
    )
    for switch in chrome_switches:
        options.add_argument(switch)

    # Page-load strategy is a WebDriver capability, not a Chrome command-line
    # switch: passing it through add_argument() has no effect. Set it on the
    # Options object so that driver.get() waits for the page to fully load.
    options.page_load_strategy = "normal"

    user_agent = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36"
    options.add_argument(f"user-agent={user_agent}")

    driver_path = os.getenv("DRIVER_PATH")
    # Avoid handing an explicit None to Service when DRIVER_PATH is missing.
    service = Service(driver_path) if driver_path else Service()
    return webdriver.Chrome(service=service, options=options)

In [5]:
def extract_reviews(driver, review_url, retries=2):
    """Load a review page and return the text of every review body on it.

    Args:
        driver: Selenium WebDriver used to load the page.
        review_url: URL of the review page to fetch.
        retries: Maximum number of attempts at loading and parsing the page.

    Returns:
        A list of stripped review-body strings from the first attempt that
        completes. If every attempt fails, whatever the last attempt managed
        to collect is returned (an empty list when nothing was collected or
        when ``retries`` is 0).
    """
    review_selector = 'div[data-hook="review"]'
    body_selector = 'span[data-hook="review-body"]'

    page_reviews = []
    for attempt in range(retries):
        # Start every attempt with a fresh list: if a WebDriver error (for
        # example a stale element) interrupts parsing half-way through, the
        # retry must not append the same reviews a second time.
        page_reviews = []
        try:
            driver.get(review_url)
            WebDriverWait(driver, 30).until(
                EC.presence_of_element_located((By.CSS_SELECTOR, review_selector))
            )
            for item in driver.find_elements(By.CSS_SELECTOR, review_selector):
                try:
                    body = item.find_element(By.CSS_SELECTOR, body_selector)
                    page_reviews.append(body.text.strip())
                except NoSuchElementException:
                    # Review container without a body element: skip it.
                    continue
            return page_reviews
        except (TimeoutException, WebDriverException, UnexpectedAlertPresentException) as e:
            print(f"Error while trying to load {review_url}: {e}")
            if attempt < retries - 1:
                time.sleep(random.uniform(2, 3))  # Random delay before retrying

    return page_reviews

In [9]:
# URL pieces come from the environment (loaded from .env at import time).
base_url = os.getenv("AMAZON_REVIEW_URL")
url_suffix = os.getenv("REVIEW_URL_SUFFIX")


def amazon_product_page_url(asin, page_number, star_rating):
    """Build the review-listing URL for one ASIN, page and star filter.

    Note the positional order: ``page_number`` comes before ``star_rating``.

    Args:
        asin: Product identifier inserted between ``base_url`` and ``url_suffix``.
        page_number: Page index; used both in the path and in ``pageNumber=``.
        star_rating: Star filter word; ``_star`` is appended to it to form
            the ``filterByStar`` value (e.g. ``"five"`` -> ``five_star``).

    Returns:
        The assembled URL string.
    """
    path = f"{base_url}{asin}{url_suffix}{page_number}"
    query = "&".join(
        (
            "ie=UTF8",
            f"pageNumber={page_number}",
            "formatType=current_format",
            f"filterByStar={star_rating}_star",
        )
    )
    return path + "?" + query + "#reviews-filter-bar"

In [10]:
def is_asin_not_found(driver):
    """Report whether the currently loaded page shows the not-found marker.

    Looks for any element whose text contains 'Looking for something?'
    (presumably the banner of Amazon's missing-page screen — verify against
    the live site).

    Args:
        driver: Selenium WebDriver with the page to inspect already loaded.

    Returns:
        True when such an element is present, False when the lookup raises
        NoSuchElementException.
    """
    marker_xpath = "//*[contains(text(), 'Looking for something?')]"
    try:
        marker = driver.find_element(By.XPATH, marker_xpath)
    except NoSuchElementException:
        return False
    return marker is not None

In [None]:
# Read ASINs from the Excel sheet
file_name = os.getenv("FILE_PATH")
sheet_name = 'Sheet1'
df = pd.read_excel(file_name, sheet_name=sheet_name)

# Blank status/review cells are read back as float NaN columns. Cast to object
# so that string statuses and review text can be written into them without a
# dtype conflict.
df = df.astype(object)

# Normalise the headers: column 0 holds the ASIN, column 1 the status, and the
# remaining columns are 1-based review slots (Review_1, Review_2, ...).
df.columns = ['ASIN', 'status'] + [f'Review_{i}' for i in range(1, len(df.columns) - 1)]

asins = df.iloc[0:, 0].tolist()  # One entry per data row (sheet row 2 is DataFrame index 0)

star_ratings = ["five", "four", "three", "two", "one"]
# Rows carrying one of these statuses were fully handled by an earlier run.
finished_statuses = ['done', 'ASIN not found', 'No reviews found']
max_pages = 10  # Highest review page requested per star rating

driver = setup_driver()

# NOTE(review): to_excel() below writes to a path, which replaces the file;
# any sheet other than `sheet_name` would not survive — confirm the workbook
# only contains this sheet.
try:
    for index, asin in enumerate(asins):
        # Skip ASINs whose status shows they were already processed
        if df.iat[index, 1] in finished_statuses:
            continue

        rating_reviews_dict = {rating: [] for rating in star_ratings}

        # Check if ASIN is found or not.
        # Keyword arguments are used on purpose: the helper's positional order
        # is (asin, page_number, star_rating).
        alternate_url = amazon_product_page_url(asin, page_number=1, star_rating="five")
        driver.get(alternate_url)
        if is_asin_not_found(driver):
            print(f"ASIN {asin} not found.")
            df.iat[index, 1] = 'ASIN not found'
            # Persist immediately, otherwise this status is lost unless a
            # later ASIN happens to trigger a save.
            df.to_excel(file_name, sheet_name=sheet_name, index=False)
            continue

        # Check the first page for all star ratings using the alternate URL
        for star_rating in star_ratings:
            review_url = amazon_product_page_url(asin, page_number=1, star_rating=star_rating)
            print(f"Checking first page for {star_rating}-star rating using alternate URL for ASIN {asin}...")
            reviews = extract_reviews(driver, review_url)
            if reviews:
                rating_reviews_dict[star_rating].extend(reviews)
                print(f"Found reviews for {star_rating}-star rating on the first page using alternate URL for ASIN {asin}.")
            else:
                print(f"No reviews found on the first page for {star_rating}-star rating using alternate URL.")

        if not any(rating_reviews_dict.values()):
            print(f"No reviews found for ASIN {asin} using alternate URL.")
            df.iat[index, 1] = 'No reviews found'
            df.to_excel(file_name, sheet_name=sheet_name, index=False)
            continue

        # Fetch the remaining pages using the alternate URL
        for star_rating in star_ratings:
            if not rating_reviews_dict[star_rating]:
                # Page 1 was empty for this rating, so later pages cannot hold
                # reviews; skipping avoids waiting out the page timeout.
                continue
            for page_num in range(2, max_pages + 1):  # Page 1 was already fetched above
                review_url = amazon_product_page_url(asin, page_number=page_num, star_rating=star_rating)
                print(f"Fetching page {page_num} for {star_rating}-star rating using alternate URL for ASIN {asin}...")
                reviews = extract_reviews(driver, review_url)
                if reviews:
                    rating_reviews_dict[star_rating].extend(reviews)
                    print(f"Found more reviews on page {page_num} for {star_rating}-star rating using alternate URL.")
                else:
                    print(f"No more reviews found on page {page_num} for {star_rating}-star rating using alternate URL. Stopping here.")
                    break

        # Merge all reviews into a single list, highest rating first
        all_reviews = []
        for rating in star_ratings:
            all_reviews.extend(rating_reviews_dict[rating])

        # Ensure DataFrame has enough columns
        num_existing_columns = len(df.columns)
        num_needed_columns = len(all_reviews) + 2  # Account for ASIN and "status" columns
        if num_needed_columns > num_existing_columns:
            # The frame already holds Review_1 .. Review_{n-2}, so the first new
            # header is Review_{n-1}; starting one lower would duplicate the
            # last existing header.
            new_columns = [f'Review_{i}' for i in range(num_existing_columns - 1, num_needed_columns - 1)]
            df = pd.concat(
                [df, pd.DataFrame(index=df.index, columns=new_columns, dtype=object)],
                axis=1,
            )

        # Insert the reviews starting from the third column
        for col_num, review in enumerate(all_reviews, start=2):
            df.iat[index, col_num] = review

        # Mark the ASIN as "done" in the status column
        df.iat[index, 1] = 'done'

        # Save the updated DataFrame back to the Excel file
        df.to_excel(file_name, sheet_name=sheet_name, index=False)

        print(f"Reviews for ASIN {asin} have been fetched and saved to the Excel sheet.")
finally:
    # Always release the browser, even when a page load raises.
    driver.quit()

print("All reviews have been successfully saved to the Excel sheet.")
