In [1]:
import datetime
from pathlib import Path
import time
import re
from requests_html import HTML
import pandas as pd

In [2]:
from selenium import webdriver
from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.by import By
from webdriver_manager.chrome import ChromeDriverManager

In [3]:
BASE_DIR = Path.cwd()
DATA_DIR = BASE_DIR / "data" # os.path.join(BASE_DIR, 'data')
if not DATA_DIR.exists(): # os.path.exists(DATA_DIR)
    DATA_DIR.mkdir(exist_ok=True) # os.makedirs(DATA_DIR, exist_ok=True)
    
product_category_links_output = DATA_DIR / "category-products.csv"
product_output = DATA_DIR / "products.csv"

In [4]:
options = Options()
options.add_argument("--headless")

driver = webdriver.Chrome(options=options,service=Service(ChromeDriverManager().install()))




[WDM] - Current google-chrome version is 103.0.5060
[WDM] - Get LATEST chromedriver version for 103.0.5060 google-chrome
[WDM] - Driver [C:\Users\kamil\.wdm\drivers\chromedriver\win32\103.0.5060.53\chromedriver.exe] found in cache


In [5]:
categories = [
    {"name": "toys-and-games", "url": "https://www.amazon.com/gp/bestsellers/toys-and-games"},
    {"name": "electronics", "url": "https://www.amazon.com/gp/bestsellers/electronics"},
    {"name": "books", "url": "https://www.amazon.com/gp/bestsellers/books"}
]

In [6]:
regex_options = [
    r"https://www.amazon.com/gp/product/(?P<product_id>[\w-]+)/",
    r"https://www.amazon.com/dp/(?P<product_id>[\w-]+)/",
    r"https://www.amazon.com/(?P<slug>[\w-]+)/dp/(?P<product_id>[\w-]+)/",
]

def extract_product_id_from_url(url):
    product_id = None
    for regex_str in regex_options:
        regex = re.compile(regex_str)
        match = regex.match(url)
        if match != None:
            try:
                product_id = match['product_id']
            except:
                pass
    return product_id

In [7]:
def clean_page_links(page_links=[], category=None):
    final_page_links = []
    for url in page_links:
        product_id = extract_product_id_from_url(url)
        if product_id != None:
            final_page_links.append({"url": url, "product_id": product_id, "category": category})
    return final_page_links

In [8]:
def scrape_category_product_links(categories=[]):
    all_product_links = []
    for category in categories:
        time.sleep(1.5)
        url = category.get("url")
        driver.get(url)
        body_el = driver.find_element(By.CSS_SELECTOR, "body")
        html_str = body_el.get_attribute("innerHTML")
        html_obj = HTML(html=html_str)
        page_links = [f"https://www.amazon.com{x}" for x in html_obj.links if x.startswith("/")]
        cleaned_links = clean_page_links(page_links=page_links, category=category)
        all_product_links += cleaned_links
    
    return all_product_links

In [9]:
def extract_categories_and_save(categories=[]):
    all_product_links = scrape_category_product_links(categories)
    category_df = pd.DataFrame(all_product_links)
    category_df.to_csv(product_category_links_output, index=False)

In [10]:
extract_categories_and_save(categories=categories)

In [11]:
def scrape_product_page(
    url, 
    title_lookup = "#productTitle",
    price_lookup = ".apexPriceToPay"
):
    driver.get(url)
    time.sleep(1.2)
    body_el = driver.find_element(By.CSS_SELECTOR, "body")
    html_str = body_el.get_attribute("innerHTML")
    html_obj = HTML(html=html_str)
    product_title = html_obj.find(title_lookup, first=True).text
    product_price = html_obj.find(price_lookup, first=True).text
    return product_title, product_price

In [12]:
def perform_scrape(cleaned_items=[]):
    data_extracted = []
    for obj in cleaned_items:
        link = obj['url']
        product_id = obj['product_id']
        title, price = (None, None)
        try:
            title, price = scrape_product_page(link)
        except:
            pass
        if title != None and price != None:
            print(link, title, price)
        product_data = {
            "url": link,
            "product_id": product_id,
            "title": title,
            "price": price
        }
        data_extracted.append(product_data)
        
    return data_extracted

In [13]:
# extracted_data = perform_scrape(cleaned_items=cleaned_links)

In [14]:
# print(extracted_data)

In [15]:
def row_scrape_event(row, *args, **kwargs):
    link = row['url']
    scraped = 0
    try:
        scraped = row['scraped']
    except:
        pass
#     print('link')
    if scraped == 1 or scraped == "1":
        return row
    product_id = row['product_id']
    title, price = (None, None)
    try:
        title, price = scrape_product_page(link)
    except:
        pass
    row['title'] = title
    row['price'] = price
    row['scraped'] = 1
    row['timestamp'] = datetime.datetime.now().timestamp()
#     print(title, price)
    return row

In [16]:
df = pd.read_csv(product_category_links_output)
df.head()

Unnamed: 0,url,product_id,category
0,https://www.amazon.com/Crayola-Colored-Pencil-...,B00006RVTS,"{'name': 'toys-and-games', 'url': 'https://www..."
1,https://www.amazon.com/SEQUENCE-Original-SEQUE...,B00000IVAK,"{'name': 'toys-and-games', 'url': 'https://www..."
2,https://www.amazon.com/Hoyle-Waterproof-Clear-...,B000J3Z7TC,"{'name': 'toys-and-games', 'url': 'https://www..."
3,https://www.amazon.com/Bentgo-Kids-Prints-Camo...,B07R2CNSTK,"{'name': 'toys-and-games', 'url': 'https://www..."
4,https://www.amazon.com/Funko-Pop-Marvel-Spider...,B09L1NML8K,"{'name': 'toys-and-games', 'url': 'https://www..."


In [17]:
df.shape

(90, 3)

In [18]:
df_sub = df.copy() # df.head(n=10)

In [None]:
df_sub = df_sub.apply(row_scrape_event, axis=1)

In [None]:
# df.to_csv(product_output, index=False)

In [None]:
products_df = pd.read_csv(product_output)
# products_df.head()

In [None]:
final_df = pd.concat([products_df, df_sub])
final_df.to_csv(product_output, index=False)

In [None]:
final_df.tail()