In [5]:
import asyncio
import json
import urllib.parse
from typing import List, Dict, Union
from httpx import AsyncClient, Response
from parsel import Selector
from urllib.parse import urlencode, quote_plus
# from loguru import logger as log

# initialize an async httpx client
client = AsyncClient(
    # enable http2
    http2=True,
    # add basic browser like headers to prevent getting blocked
    headers={
        "Accept-Language": "en-US,en;q=0.9",
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/96.0.4664.110 Safari/537.36",
        "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8",
        "Accept-Encoding": "gzip, deflate, br",
        "Cookie": "intl_splash=false"
    },
)



In [6]:
def parse_search(response: Response) -> List[Dict]:
    """parse search data from search pages"""
    selector = Selector(response.text)
    data = []
    for item in selector.xpath("//ol[@class='sku-item-list']/li[@class='sku-item']"):
        name = item.xpath(".//h4[@class='sku-title']/a/text()").get()
        link = item.xpath(".//h4[@class='sku-title']/a/@href").get()
        price = item.xpath(".//div[@data-testid='customer-price']/span/text()").get()
        price = int(price[price.index("$") + 1:].replace(",", "").replace(".", "")) // 100 if price else None
        original_price = item.xpath(".//div[@data-testid='regular-price']/span/text()").get()
        original_price = int(original_price[original_price.index("$") + 1:].replace(",", "").replace(".", "")) // 100 if original_price else None
        sku = item.xpath(".//div[@class='sku-model']/div[2]/span[@class='sku-value']/text()").get()
        model = item.xpath(".//div[@class='sku-model']/div[1]/span[@class='sku-value']/text()").get()
        rating = item.xpath(".//p[contains(text(),'out of 5')]/text()").get()
        rating_count = item.xpath(".//span[contains(@class,'c-reviews')]/text()").get()
        is_sold_out = bool(item.xpath(".//strong[text()='Sold Out']").get())
        image = item.xpath(".//img[contains(@class,'product-image')]/@src").get()

        data.append({
            "name": name,
            "link": "https://www.bestbuy.com" + link,
            "image": image,
            "sku": sku,
            "model": model,
            "price": price,
            "original_price": original_price,
            "save": f"{round((1 - price / original_price) * 100, 2):.2f}%" if price and original_price else None,
            "rating": float(rating[rating.index(" "):rating.index(" out")].strip()) if rating else None,
            "rating_count": int(rating_count.replace("(", "").replace(")", "").replace(",", "")) if rating_count and rating_count != "Not Yet Reviewed" else None,
            "is_sold_out": is_sold_out,
        })
    total_count = selector.xpath("//span[@class='item-count']/text()").get()
    total_count = int(total_count.split(" ")[0]) // 18 # convert the total items to pages, 18 items in each page

    return {"data": data, "total_count": total_count}




In [7]:
async def scrape_search(
        search_query: str, sort: Union["-bestsellingsort", "-Best-Discount"] = None, max_pages=None
        ) -> List[Dict]:
    """scrape search data from bestbuy search"""

    def form_search_url(page_number: int):
        """form the search url"""
        base_url = "https://www.bestbuy.com/site/searchpage.jsp?"
        # search parameters
        params = {
            "st": quote_plus(search_query),
            "sp": sort, # None = best match
            "cp": page_number
        }
        return base_url + urlencode(params)

    first_page = await client.get(form_search_url(1))
    data = parse_search(first_page)
    search_data = data["data"]
    total_count = data["total_count"]

    # get the number of total search pages to scrape
    if max_pages and max_pages < total_count:
        total_count = max_pages

    print(f"scraping search pagination, {total_count - 1} more pages")
    # add the remaining pages to a scraping list to scrape them concurrently
    to_scrape = [
        client.get(form_search_url(page_number))
        for page_number in range(2, total_count + 1)
    ]
    for response in asyncio.as_completed(to_scrape):
        response = await response
        data = parse_search(response)["data"]
        search_data.extend(data)
    
    print(f"scraped {len(search_data)} products from search pages")
    return search_data

In [8]:
async def run():
    search_data = await scrape_search(
        search_query="macbook",
        max_pages=3
    )
    # save the results to a JSOn file
    with open("search.json", "w", encoding="utf-8") as file:
        json.dump(search_data, file, indent=2, ensure_ascii=False)    


In [9]:
asyncio.run(run())

AttributeError: module 'asyncio' has no attribute 'run'