In [None]:
pip install webdriver-manager

In [None]:
pip install --upgrade selenium

In [None]:
pip install fake_useragent

In [None]:
from bs4 import BeautifulSoup
import requests
import csv
import re
import time
import random
import logging
import pandas as pd
from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.common.by import By
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from webdriver_manager.chrome import ChromeDriverManager
from fake_useragent import UserAgent

# Collect Data

#### VPN or proxy rotator recommended

In [None]:
# Configure root logging at INFO level and create this notebook's logger;
# the scraping functions below report progress and failures through it.
logging.basicConfig(level=logging.INFO) # logger used for debugging
logger = logging.getLogger(__name__)

# Shared user-agent generator; the scraping functions read `ua.random`
# before launching each browser session.
ua = UserAgent() # generate user agents

# Shared, mutable list: the scraping functions append the user agent that was
# in use whenever an attempt raises, and re-draw until they get one not in it.
bad_uas = [] # list to hold unreliable user agents

In [None]:
# Number of search-result pages to scrape.
# NOTE: 1 is only a runnable placeholder default -- set this to the actual
# number of pages in the search results before running the notebook.
num_pages = 1

# Product-page URLs collected from the search results; a set is used to avoid
# duplicate links.
product_links = set()

In [None]:
# product_data: one dict per product page that was scraped successfully.
# faulty_links: product pages that couldn't be reached within the number of
#               tries specified by max_retries.
product_data, faulty_links = [], []

In [None]:
def scrape_product_list(i, max_retries):
    """Scrape one search-results page and add its product links to ``product_links``.

    Every attempt launches a fresh headless Chrome session with a randomly
    drawn user agent. If anything raises while loading or parsing the page,
    that user agent is recorded in ``bad_uas`` and the page is retried.

    Args:
        i: Page number of the search results. Used in log messages and meant
            to be interpolated into the results URL.
        max_retries: Maximum number of attempts for this page.

    Returns:
        True if some attempt succeeded, False if every attempt failed (or
        ``max_retries`` is not positive).
    """
    # Cap on user-agent draws per attempt. Without it, a `bad_uas` list that
    # already covers every agent `ua.random` can return would spin forever.
    max_ua_draws = 50

    for attempt in range(max_retries):
        chrome_options = Options()
        chrome_options.add_argument("--headless") # headless browser
        chrome_options.add_argument("--no-sandbox")
        chrome_options.add_argument("--disable-dev-shm-usage")

        # Draw a user agent that has not failed before, giving up after
        # max_ua_draws draws and reusing the last one drawn.
        user_agent = ua.random
        draws = 1
        while user_agent in bad_uas and draws < max_ua_draws:
            user_agent = ua.random
            draws += 1
        if user_agent in bad_uas:
            logger.warning(f"No unused user agent found after {draws} draws, reusing {user_agent}")
        chrome_options.add_argument(f'user-agent={user_agent}')

        driver = webdriver.Chrome(service=Service(ChromeDriverManager().install()), options=chrome_options)

        try:
            logger.info(f"Attempting page {i} with user agent {user_agent} (attempt {attempt+1})")
            # NOTE: placeholder URL -- replace it with the real search-results
            # URL, interpolating {i} wherever the page number appears.
            driver.get(f'link to product list, use an f string to help denote how the url changes as you go to a new page')
            # Dynamic delay: block (up to 10 s) until the pagination strip is present.
            WebDriverWait(driver, 10).until(EC.presence_of_element_located((By.CLASS_NAME, 's-pagination-strip')))

            soup = BeautifulSoup(driver.page_source, 'lxml')

            product_list = soup.find_all(attrs={"data-asin": re.compile("B0.{8}")})
            logger.info(f"Found {len(product_list)} products on page {i}")

            for product in product_list:
                product_link = product.find('a', href=re.compile(".+B0.+"))
                # Only site-relative hrefs are kept; they are made absolute here.
                if product_link is not None and 'amazon.com' not in product_link['href']:
                    product_links.add('https://www.amazon.com' + product_link['href'])

            time.sleep(random.uniform(1.0, 5.0)) # random delay after successful scrape and before proceeding to next page

            return True # success; the finally clause still closes the browser

        except Exception as e:
            logger.error(f"Failed with user agent {user_agent}: {e}")
            if user_agent not in bad_uas: # a reused agent may already be listed
                bad_uas.append(user_agent)
            if attempt == max_retries - 1:
                logger.error(f"Max retries reached for page {i}, moving on.")
            time.sleep(random.uniform(1.0, 5.0)) # random delay after failed user-agent and before retry

        finally:
            driver.quit()

    return False

In [None]:
# (output key, label used in log messages, lookups tried in order).
# Each lookup is (tag name, keyword filters passed to BeautifulSoup.find).
_PRODUCT_FIELD_LOOKUPS = (
    ('Name', 'name', (
        ('span', {'class_': 'a-size-large product-title-word-break'}),
    )),
    ('Rating', 'rating', (
        ('span', {'class_': 'a-icon-alt'}),
    )),
    ('Number of Ratings', 'the number of ratings', (
        ('span', {'id': 'acrCustomerReviewText'}),
    )),
    ('Monthly Purchases', 'the number of monthly purchases', (
        ('span', {'id': 'social-proofing-faceout-title-tk_bought'}),
    )),
    ('Price', 'price', (
        ('span', {'class_': 'a-price aok-align-center reinventPricePriceToPayMargin priceToPay'}),
        ('div', {'id': 'corePrice_feature_div'}),
        ('span', {'class_': 'a-price a-text-price a-size-medium apexPriceToPay'}),
    )),
    ('Product Information', 'product information', (
        ('div', {'id': 'productDetails_feature_div'}),
        ('div', {'id': 'detailBulletsWithExceptions_feature_div'}),
        ('div', {'id': 'prodDetails'}),
    )),
)


def _first_text(soup, lookups):
    """Return the stripped text of the first lookup that yields non-empty text, else None."""
    for tag_name, filters in lookups:
        element = soup.find(tag_name, **filters)
        if element is None:
            continue
        text = element.text.strip()
        if text: # an empty element falls through to the next lookup
            return text
    return None


def scrape_product_page(link, max_retries, iteration):
    """Scrape one product page and append its data to ``product_data``.

    Every attempt launches a fresh headless Chrome session with a randomly
    drawn user agent. If loading or parsing raises, that user agent is
    recorded in ``bad_uas`` and the page is retried; once the last attempt
    fails, ``link`` is appended to ``faulty_links``.

    The appended dict has the keys 'Name', 'Rating', 'Number of Ratings',
    'Monthly Purchases', 'Price' and 'Product Information'. A field that
    cannot be found (or has no text) is logged and stored as None.

    Args:
        link: URL of the product page.
        max_retries: Maximum number of attempts for this page.
        iteration: Running product number, used only in log messages.

    Returns:
        True if some attempt succeeded, False if every attempt failed (or
        ``max_retries`` is not positive).
    """
    # Cap on user-agent draws per attempt. Without it, a `bad_uas` list that
    # already covers every agent `ua.random` can return would spin forever.
    max_ua_draws = 50

    for attempt in range(max_retries):
        chrome_options = Options()
        chrome_options.add_argument("--headless") # headless browser
        chrome_options.add_argument("--no-sandbox")
        chrome_options.add_argument("--disable-dev-shm-usage")

        # Draw a user agent that has not failed before, giving up after
        # max_ua_draws draws and reusing the last one drawn.
        user_agent = ua.random
        draws = 1
        while user_agent in bad_uas and draws < max_ua_draws:
            user_agent = ua.random
            draws += 1
        if user_agent in bad_uas:
            logger.warning(f"No unused user agent found after {draws} draws, reusing {user_agent}")
        chrome_options.add_argument(f'user-agent={user_agent}')

        driver = webdriver.Chrome(service=Service(ChromeDriverManager().install()), options=chrome_options)

        try:
            logger.info(f"Attempting product #{iteration}, {link}, with user agent {user_agent} (attempt {attempt+1})")
            driver.get(link)
            # Dynamic delay: block (up to 10 s) until the element with id 'prodDetails' is present.
            # NOTE(review): a page without that element times out here and is
            # retried, so the alternative product-information containers below
            # only help when 'prodDetails' is also present -- confirm intended.
            WebDriverWait(driver, 10).until(EC.presence_of_element_located((By.ID, 'prodDetails')))
            soup = BeautifulSoup(driver.page_source, 'lxml')

            product = {}
            for key, label, lookups in _PRODUCT_FIELD_LOOKUPS:
                product[key] = _first_text(soup, lookups)
                if product[key] is None:
                    logger.error(f"Failed to find {label} of {link}: no matching element with text")

            product_info = product['Product Information']
            if product_info is not None:
                # Drop the trailing warranty & support and feedback sections, in that order.
                for marker in ("Warranty & Support", "Feedback"):
                    cut = product_info.find(marker)
                    if cut != -1:
                        product_info = product_info[:cut].strip()
                # Trimming can leave nothing behind; store that as missing.
                product['Product Information'] = product_info or None

            product_data.append(product)
            time.sleep(random.uniform(1.0, 5.0)) # random delay after successful scrape and before proceeding to next link

            return True # success; the finally clause still closes the browser

        except Exception as e:
            logger.error(f"Failed with user agent {user_agent}: {e}")
            if user_agent not in bad_uas: # a reused agent may already be listed
                bad_uas.append(user_agent)
            if attempt == max_retries - 1:
                logger.error(f"Max retries reached for {link}, moving on.")
                faulty_links.append(link)
            time.sleep(random.uniform(1.0, 5.0)) # random delay after failed user-agent and before retry

        finally:
            driver.quit()

    return False

In [None]:
# Scrape every search-results page, allowing up to 5 attempts per page.

try:
    for page_number in range(1, num_pages + 1):
        scrape_product_list(page_number, max_retries=5)

finally:
    # Runs even if the loop above is interrupted, so partial progress is reported.
    logger.info(f"Total product links found: {len(product_links)}")

In [None]:
# Unpack the set of links into a list so it becomes subscriptable.
# Caution: after this cell runs, product_links no longer has .add(), which
# scrape_product_list relies on.
product_links = [*product_links]

In [None]:
# Scrape every collected product page, allowing up to 3 attempts per page.

try:
    # Products are numbered from 1 so log lines are easy to match to a link while debugging.
    for product_number, link in enumerate(product_links, start=1):
        # Reset bad_uas occasionally to avoid running out of reliable user
        # agents; it doesn't necessarily have to be every 40 products.
        if product_number % 40 == 0:
            bad_uas = []
        scrape_product_page(link, 3, product_number)

finally:
    # Runs even if the loop above is interrupted, so partial progress is reported.
    logger.info(f"Info of {len(product_data)} products found")
    logger.info(product_data[:5])
    logger.info(f"Failed to scrape {len(faulty_links)} links")
    logger.info(faulty_links)

In [None]:
# Print each field of the first scraped product as "field: value" for a quick sanity check.

for field_name, field_value in product_data[0].items():
    print(field_name, field_value, sep=": ")

In [None]:
# Build a pandas DataFrame from the scraped records (one row per product dict)
# and preview the first five rows.

product_data_df = pd.DataFrame(data=product_data)
product_data_df.head(5)

In [None]:
# Count the missing values in each column.
product_data_df.isnull().sum()

In [None]:
# Write the scraped data to CSV without the DataFrame index column.
# NOTE: 'name of csv' is a placeholder -- replace it with the desired output path.
product_data_df.to_csv(path_or_buf='name of csv', index=False)