In [1]:
#pip install undetected-chromedriver selenium beautifulsoup4

In [1]:
import time
import random
import re
import math
import csv
import os
from datetime import date
from urllib.parse import quote_plus, urljoin

import undetected_chromedriver as uc
from bs4 import BeautifulSoup


# ===================== CONFIG =====================
# Root URL of the storefront; search URLs and relative product links are built from it.
BASE = "https://www.amazon.eg"

# Evaluated once when this cell runs, so the whole run shares one date stamp.
SCRAPE_DATE = date.today().isoformat()  # YYYY-MM-DD

# Output directory; created here as a side effect if it does not exist yet.
DATA_DIR = "Data"
os.makedirs(DATA_DIR, exist_ok=True)

# One CSV per scrape date, e.g. Data/amazon_eg_2025-12-25.csv
OUT_CSV = os.path.join(DATA_DIR, f"amazon_eg_{SCRAPE_DATE}.csv")

MAX_PAGES_PER_QUERY = 7  # cap pages per query


# ===================== SEARCH MAP (groups) =====================
# Maps a category label -> list of keyword groups.
# Each inner list is a "keyword group" -> we join it into one query string.
# Example: ["oil", "cooking oil"] -> "oil cooking oil"
SEARCH_MAP = {
    "Grocery": [
        ["oil", "cooking oil"],
        ["rice"],
        ["sugar"],
        ["pasta"],
        ["tea"],
        ["coffee"],
        ["milk"],
        ["tuna"],
        ["tomato paste"],
        ["detergent"],
        ["dishwashing liquid"],
        ["diapers"]
    ],

    "Electronics": [
        ["smartphone"],
        ["mobile phone"],
        ["laptop"],
        ["tablet"],
        ["smart watch"],
        ["earbuds"],
        ["power bank"],
        ["headphones"]
    ],

    "Home_Appliances": [
        ["air fryer"],
        ["microwave"],
        ["coffee machine"],
        ["electric kettle"],
        ["blender"],
        ["vacuum cleaner"]
    ],

    "TV_and_Screens": [
        ["smart tv"],
        ["tv"],
        ["led tv"],
        ["uhd tv"],
        ["android tv"]
    ]
}


# ===================== HELPERS =====================
def clean_text(x: str) -> str:
    """Collapse every whitespace run in ``x`` to one space and trim the ends.

    Falsy input (``None`` or an empty string) yields ``""``.
    """
    raw = x if x else ""
    collapsed = re.sub(r"\s+", " ", raw)
    return collapsed.strip()


def is_blocked(html: str) -> bool:
    """Return True when ``html`` contains one of the known robot-check phrases.

    Matching is case-insensitive and runs over the raw page source.
    """
    block_markers = r"Robot Check|Enter the characters you see|Sorry, we just need to make sure|captcha"
    hit = re.search(block_markers, html, re.I)
    return hit is not None


def build_driver(headless: bool = False):
    """Create an undetected-chromedriver Chrome instance with the scraper's flags.

    Args:
        headless: add ``--headless=new`` when True.

    Returns:
        The started driver, with a 60 second page-load timeout.
    """
    chrome_flags = []

    # headless may increase blocking; start with False
    if headless:
        chrome_flags.append("--headless=new")

    chrome_flags.extend([
        "--window-size=1280,900",
        "--lang=en-US",
        "--disable-blink-features=AutomationControlled",
        "--no-sandbox",
        "--disable-dev-shm-usage",
        "--user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
        "AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
    ])

    options = uc.ChromeOptions()
    for flag in chrome_flags:
        options.add_argument(flag)

    browser = uc.Chrome(options=options)
    browser.set_page_load_timeout(60)
    return browser


def human_scroll(driver):
    """Scroll down the page in 2-4 evenly spaced steps, pausing 1-2 s after each.

    The last step stops short of the bottom (fraction ``steps / (steps + 1)``).
    """
    total_steps = random.randint(2, 4)
    for step in range(1, total_steps + 1):
        fraction = step / (total_steps + 1)
        script = f"window.scrollTo(0, document.body.scrollHeight*{fraction});"
        driver.execute_script(script)
        time.sleep(random.uniform(1.0, 2.0))


def fetch_html(driver, url: str) -> str:
    """Load ``url``, wait, scroll through the page, wait again, return the source.

    Both waits are randomized (3.5-5.5 s after load, 1.5-2.5 s after scrolling).
    """
    driver.get(url)

    first_pause = random.uniform(3.5, 5.5)
    time.sleep(first_pause)

    human_scroll(driver)

    second_pause = random.uniform(1.5, 2.5)
    time.sleep(second_pause)

    page_source = driver.page_source
    return page_source


def get_search_url(query: str, page: int) -> str:
    """Build the search URL for ``query`` at result page ``page``.

    The query is URL-encoded with ``quote_plus`` (spaces become ``+``).
    """
    encoded_query = quote_plus(query)
    return "{}/s?k={}&page={}".format(BASE, encoded_query, page)


# ===================== EXTRACTORS =====================
def extract_title_and_url(card):
    """
    Pull the product title and absolute URL out of one search-result card.

    Title, in order of preference:
      1) the aria-label of h2.a-size-base-plus
      2) the text of the first span inside that h2
    URL, in order of preference:
      1) the anchor that wraps the h2
      2) any link in the card whose href contains /dp/

    Returns:
      (title, url); either may be "" when nothing matched.
    """
    title = ""
    url = ""

    heading = card.select_one("h2.a-size-base-plus")
    if heading is not None:
        label = heading.get("aria-label")
        if label:
            title = clean_text(label)

        if not title:
            inner_span = heading.select_one("span")
            if inner_span:
                title = clean_text(inner_span.get_text())

        wrapping_link = heading.find_parent("a", href=True)
        if wrapping_link and wrapping_link.get("href"):
            url = urljoin(BASE, wrapping_link.get("href"))

    if url:
        return title, url

    # No anchor around the heading: fall back to any product-detail link.
    detail_link = card.select_one('a[href*="/dp/"]')
    if detail_link and detail_link.get("href"):
        url = urljoin(BASE, detail_link.get("href"))

    return title, url


def extract_price(card) -> str:
    """Return the text of the card's first ``span.a-price > span.a-offscreen``, or ""."""
    node = card.select_one("span.a-price > span.a-offscreen")
    if not node:
        return ""
    return clean_text(node.get_text())


def extract_old_price(card) -> str:
    """Return the struck-through price text, or "" when the card has none.

    Looks for a block labelled "List:" first, then "Was:"; the first one with
    non-empty text wins.
    """
    selector_template = 'div[aria-hidden^="{}"] span.a-price.a-text-price span.a-offscreen'

    # Old price: List: or Was:
    for label in ("List:", "Was:"):
        node = card.select_one(selector_template.format(label))
        if not node:
            continue
        text = clean_text(node.get_text())
        if text:
            return text
    return ""


def extract_rating_reviews(card):
    """Return ``(rating, reviews)`` as raw text from a search-result card.

    ``rating`` is the text of ``span.a-icon-alt`` (e.g. "4.1 out of 5 stars").
    ``reviews`` comes from the span inside the #customerReviews link, falling
    back to the first ``span.a-size-base`` in the card. Missing parts are "".
    """
    star_node = card.select_one("span.a-icon-alt")
    rating = clean_text(star_node.get_text()) if star_node else ""

    count_node = card.select_one('a[href*="#customerReviews"] span')
    if not count_node:
        count_node = card.select_one("span.a-size-base")

    if count_node:
        reviews = clean_text(count_node.get_text())
    else:
        reviews = ""

    return rating, reviews


def extract_stock_status(card):
    """
    Classify a card's availability.

    An aria-label mentioning "stock" or "unavailable" is checked first; if it
    is missing or inconclusive, the card's full text is scanned instead.

    Returns:
      stock_status: in_stock / out_of_stock / unknown
      stock_text: the aria-label message, a fixed summary string, or ""
    """
    in_stock_phrases = ("left in stock", "in stock")

    label_node = card.select_one('span[aria-label*="stock" i], span[aria-label*="unavailable" i]')
    label = label_node.get("aria-label") if label_node else None
    msg = clean_text(label) if label else ""

    if msg:
        lowered = msg.lower()
        if any(phrase in lowered for phrase in in_stock_phrases):
            return "in_stock", msg
        if any(phrase in lowered for phrase in ("currently unavailable", "out of stock", "unavailable")):
            return "out_of_stock", msg

    # In the text scan the unavailable phrases are tested before the in-stock ones.
    card_text = card.get_text(" ", strip=True).lower()
    if any(phrase in card_text for phrase in ("currently unavailable", "out of stock")):
        return "out_of_stock", "Currently unavailable / Out of stock"
    if any(phrase in card_text for phrase in in_stock_phrases):
        return "in_stock", "In stock"

    return "unknown", ""


# ===================== PARSING =====================
def parse_search_page(html: str, category: str, group_keywords: list[str], query: str):
    """Turn one search-results page into a list of row dicts.

    Cards without an ASIN, or with neither a title nor a URL, are skipped.

    Args:
        html: page source of a search-results page.
        category: label stored in the "category" column.
        group_keywords: original keyword group, stored joined with " | ".
        query: the query string that was used in the URL.

    Returns:
        list of dicts, one per product card, all with the same key order.
    """
    document = BeautifulSoup(html, "html.parser")
    cards = document.select('div[data-component-type="s-search-result"][data-asin]')

    collected = []
    for card in cards:
        asin = (card.get("data-asin") or "").strip()
        if asin == "":
            continue

        title, url = extract_title_and_url(card)
        if not (title or url):
            continue

        rating, reviews = extract_rating_reviews(card)
        stock_status, stock_text = extract_stock_status(card)

        record = {}
        record["scrape_date"] = SCRAPE_DATE
        record["category"] = category
        record["keyword_group"] = " | ".join(group_keywords)  # keep original group
        record["query"] = query  # joined group query used in URL
        record["asin"] = asin
        record["title"] = title
        record["url"] = url
        record["price"] = extract_price(card)
        record["old_price"] = extract_old_price(card)
        record["rating"] = rating
        record["reviews"] = reviews
        record["stock_status"] = stock_status
        record["stock_text"] = stock_text
        collected.append(record)

    return collected


# ===================== PAGE COUNT =====================
def extract_total_results(html: str) -> int:
    """
    Read the total result count from the page text.

    Recognised forms, tried in this order:
      1) "1-48 of over 10,000 results for" / "1-48 of 312 results for"
      2) "12 results for" / "1 result for" (no "of" part)

    Form 2 is only tried when form 1 is absent, so pages that matched before
    still yield the same number.

    Returns:
      The count as int, or 0 when no count could be found or parsed.
    """
    soup = BeautifulSoup(html, "html.parser")
    text = soup.get_text(" ", strip=True)

    m = re.search(r"of\s+(?:over\s+)?([\d,]+)\s+results", text, re.I)
    if m is None:
        # NOTE(review): assumes result sets that fit on one page are rendered
        # as "N results for" without the "1-N of" prefix - confirm against
        # live markup.
        m = re.search(r"\b([\d,]+)\s+results?\s+for\b", text, re.I)
    if m is None:
        return 0

    digits = m.group(1).replace(",", "")
    try:
        return int(digits)
    except ValueError:
        # e.g. the match consisted of commas only
        return 0


def compute_pages_to_scrape(first_page_html: str, max_pages: int) -> int:
    """Estimate how many result pages to fetch for a query.

    Returns 0 when no result count can be read from page 1; otherwise the
    estimated page count, capped at ``max_pages``.
    """
    result_count = extract_total_results(first_page_html)
    if result_count > 0:
        # Commonly ~48 items per page on desktop (may vary)
        page_estimate = max(1, math.ceil(result_count / 48))
        return min(page_estimate, max_pages)
    return 0


# ===================== MAIN =====================
def scrape_daily(search_map: dict, max_pages_per_query: int = MAX_PAGES_PER_QUERY, out_csv: str = OUT_CSV, headless: bool = False):
    """
    Scrape every keyword group in ``search_map`` and save the rows to ``out_csv``.

    For each group the keywords are joined into one query. Page 1 is fetched
    first to estimate the page count, then up to ``max_pages_per_query`` pages
    are parsed. Paging for a query stops at the first page that yields no rows.

    Rows collected so far are written to ``out_csv`` even when the run is cut
    short (CAPTCHA, driver error, Ctrl+C); the exception still propagates.

    Args:
        search_map: category -> list of keyword groups (each a list of str).
        max_pages_per_query: upper bound on pages fetched per query.
        out_csv: destination CSV path; only written when there is at least one row.
        headless: passed through to build_driver.

    Returns:
        list of row dicts collected during the run.

    Raises:
        RuntimeError: when a fetched page looks like a CAPTCHA / robot check.
    """
    driver = build_driver(headless=headless)
    all_rows = []

    try:
        # warm session
        driver.get(BASE)
        time.sleep(random.uniform(2.0, 4.0))

        for category, keyword_groups in search_map.items():
            for group in keyword_groups:
                # group is list[str]
                group_keywords = group
                query = " ".join(group_keywords).strip()

                if not query:
                    continue

                # ---- page 1 first ----
                url1 = get_search_url(query, 1)
                html1 = fetch_html(driver, url1)

                if is_blocked(html1):
                    raise RuntimeError(
                        "CAPTCHA / Robot Check detected. Run headless=False, solve it once, then rerun."
                    )

                pages_to_scrape = compute_pages_to_scrape(html1, max_pages_per_query)
                if pages_to_scrape == 0:
                    print(f"[{SCRAPE_DATE}] SKIP (no results): {category} | {group_keywords}")
                    continue

                rows1 = parse_search_page(html1, category, group_keywords, query)
                print(f"[{SCRAPE_DATE}] {category} | {group_keywords} | page 1/{pages_to_scrape} -> {len(rows1)} items")
                all_rows.extend(rows1)

                # early stop if page1 has no cards
                if not rows1:
                    continue

                time.sleep(random.uniform(3.0, 6.0))

                # ---- remaining pages ----
                for p in range(2, pages_to_scrape + 1):
                    urlp = get_search_url(query, p)
                    htmlp = fetch_html(driver, urlp)

                    if is_blocked(htmlp):
                        raise RuntimeError(
                            "CAPTCHA / Robot Check detected during paging. Reduce pages/delays or run headless=False."
                        )

                    rowsp = parse_search_page(htmlp, category, group_keywords, query)
                    print(f"[{SCRAPE_DATE}] {category} | {group_keywords} | page {p}/{pages_to_scrape} -> {len(rowsp)} items")
                    all_rows.extend(rowsp)

                    # stop early if empty
                    if not rowsp:
                        break

                    time.sleep(random.uniform(3.0, 6.0))

    finally:
        try:
            driver.quit()
        finally:
            # Save inside ``finally`` so an interrupted run keeps the rows it
            # already collected instead of discarding them.
            if all_rows:
                with open(out_csv, "w", newline="", encoding="utf-8-sig") as f:
                    writer = csv.DictWriter(f, fieldnames=list(all_rows[0].keys()))
                    writer.writeheader()
                    writer.writerows(all_rows)

    return all_rows


if __name__ == "__main__":
    # Full daily run with the module-level configuration.
    data = scrape_daily(
        headless=False,  # start with False
        out_csv=OUT_CSV,
        max_pages_per_query=MAX_PAGES_PER_QUERY,
        search_map=SEARCH_MAP,
    )

    print(f"DONE: {len(data)} -> {OUT_CSV}")



KeyboardInterrupt



In [4]:
import time
import random
import re
import math
import csv
from datetime import date
from urllib.parse import quote_plus, urljoin

import undetected_chromedriver as uc
from bs4 import BeautifulSoup


# Root URL of the storefront; search URLs and relative product links are built from it.
BASE = "https://www.amazon.eg"

# ===================== Daily collection settings =====================
SCRAPE_DATE = date.today().isoformat()
OUT_CSV = f"amazon_eg_{SCRAPE_DATE}.csv"

# ===================== Max pages =====================
MAX_PAGES_PER_QUERY = 5  # if a query has more pages than this, only 5 are collected

# Maps a category label -> list of query strings.
SEARCH_MAP = {
    # This category is the easiest to match because the models are very clear
    "Smartphones": [
        "iPhone 16 128GB", "iPhone 15 128GB", "Samsung Galaxy S24 Ultra",
        "Samsung Galaxy A55", "Xiaomi Redmi Note 13", "realme 12 Pro"
    ],

    # Key category for detecting fake "White Friday" discounts
    "Laptops_and_Screens": [
        "MacBook Air M2 13", "HP Victus 15", "Lenovo IdeaPad 3",
        "Samsung TV 43 inch Crystal", "LG TV 50 inch UHD", "Toshiba TV 43 inch"
    ],

    # Inflation category - focused on specific brands to keep comparisons accurate
    "Grocery_Inflation_Index": [
        "Crystal sunflower oil 0.8L", "Sultana Rice 1kg", "Lipton Tea 100 Bags",
        "Nescafe Red Cup 200g", "Almarai Milk Full Cream 1L", "Fern Ghee 700g",
        "Persil Gel 3L", "Pampers Size 4", "Dettol Liquid 500ml"
    ],

    # Home appliances (High Ticket Items)
    "Home_Appliances": [
        "Philips Air Fryer XL", "Black and Decker Espresso Machine",
        "Tornado Electric Kettle 1.7L", "Fresh Microwave 25L"
    ],

    # Branded accessories (High Quality Data)
    "Tech_Accessories": [
        "AirPods Pro 2", "Samsung Buds FE", "Anker PowerCore 20000",
        "Apple Watch Series 9", "Xiaomi Smart Band 8"
    ]
}



# ===================== Helpers =====================
def clean_text(x: str) -> str:
    """Normalize whitespace: runs become one space, leading/trailing space is removed.

    ``None`` and ``""`` both give ``""``.
    """
    source = x or ""
    squeezed = re.sub(r"\s+", " ", source)
    return squeezed.strip()


def is_blocked(html: str) -> bool:
    """Tell whether ``html`` looks like a robot-check / CAPTCHA page.

    Case-insensitive search for a fixed set of phrases in the page source.
    """
    phrases = r"Robot Check|Enter the characters you see|Sorry, we just need to make sure|captcha"
    if re.search(phrases, html, re.I):
        return True
    return False


def build_driver(headless: bool = False):
    """Start an undetected-chromedriver Chrome session configured for scraping.

    Args:
        headless: add ``--headless=new`` when True.

    Returns:
        The driver, with its page-load timeout set to 60 seconds.
    """
    user_agent_flag = (
        "--user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
        "AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36"
    )
    fixed_flags = (
        "--window-size=1280,900",
        "--lang=en-US",
        "--disable-blink-features=AutomationControlled",
        "--no-sandbox",
        "--disable-dev-shm-usage",
        user_agent_flag,
    )

    options = uc.ChromeOptions()
    if headless:
        options.add_argument("--headless=new")
    for flag in fixed_flags:
        options.add_argument(flag)

    chrome = uc.Chrome(options=options)
    chrome.set_page_load_timeout(60)
    return chrome


def human_scroll(driver):
    """Scroll the page downwards in 2-4 equal increments with a 1-2 s pause after each."""
    step_count = random.randint(2, 4)
    denominator = step_count + 1
    for numerator in range(1, denominator):
        ratio = numerator / denominator
        driver.execute_script(
            f"window.scrollTo(0, document.body.scrollHeight*{ratio});"
        )
        time.sleep(random.uniform(1.0, 2.0))


def fetch_html(driver, url: str) -> str:
    """Navigate to ``url`` and return the page source after waiting and scrolling.

    Sequence: load, sleep 3.5-5.5 s, scroll in steps, sleep 1.5-2.5 s.
    """
    driver.get(url)

    pause_after_load = random.uniform(3.5, 5.5)
    time.sleep(pause_after_load)

    human_scroll(driver)

    pause_after_scroll = random.uniform(1.5, 2.5)
    time.sleep(pause_after_scroll)

    return driver.page_source


def get_search_url(query: str, page: int) -> str:
    """Return the search-results URL for ``query`` (URL-encoded) and ``page``."""
    k_param = quote_plus(query)
    return "{}/s?k={}&page={}".format(BASE, k_param, page)


# ===================== Extractors =====================
def extract_title_and_url(card):
    """Return ``(title, url)`` for one search-result card.

    The title comes from the aria-label of ``h2.a-size-base-plus``, or from the
    first span inside it. The URL comes from the anchor wrapping that h2, or
    from any link in the card containing ``/dp/``. Missing parts are "".
    """
    title = ""
    url = ""

    heading = card.select_one("h2.a-size-base-plus")
    if heading is not None:
        aria_title = heading.get("aria-label")
        if aria_title:
            title = clean_text(aria_title)
        if not title:
            first_span = heading.select_one("span")
            if first_span:
                title = clean_text(first_span.get_text())

        parent_anchor = heading.find_parent("a", href=True)
        if parent_anchor and parent_anchor.get("href"):
            url = urljoin(BASE, parent_anchor.get("href"))

    if url:
        return title, url

    fallback_anchor = card.select_one('a[href*="/dp/"]')
    if fallback_anchor and fallback_anchor.get("href"):
        url = urljoin(BASE, fallback_anchor.get("href"))

    return title, url


def extract_price(card) -> str:
    """Return the card's current price text, or "" when no price element is present."""
    price_node = card.select_one("span.a-price > span.a-offscreen")
    if price_node:
        return clean_text(price_node.get_text())
    return ""


def extract_old_price(card) -> str:
    """Return the previous price ("List:" preferred over "Was:"), or "" if absent."""
    for label in ("List:", "Was:"):
        selector = 'div[aria-hidden^="' + label + '"] span.a-price.a-text-price span.a-offscreen'
        node = card.select_one(selector)
        old_price = clean_text(node.get_text()) if node else ""
        if old_price:
            return old_price
    return ""


def extract_rating_reviews(card):
    """Return ``(rating, reviews)`` text for a card; each is "" when not found.

    The reviews text is taken from the span inside the #customerReviews link
    when present, otherwise from the first ``span.a-size-base``.
    """
    rating = ""
    stars = card.select_one("span.a-icon-alt")
    if stars:
        rating = clean_text(stars.get_text())

    reviews = ""
    counter = card.select_one('a[href*="#customerReviews"] span')
    if not counter:
        counter = card.select_one("span.a-size-base")
    if counter:
        reviews = clean_text(counter.get_text())

    return rating, reviews


def extract_stock_status(card):
    """Return ``(stock_status, stock_text)`` for a search-result card.

    ``stock_status`` is "in_stock", "out_of_stock" or "unknown". The aria-label
    of a stock/unavailable span is consulted first; when it is missing or
    matches nothing, the card's full text is scanned.
    """
    available_phrases = ("left in stock", "in stock")

    aria_span = card.select_one('span[aria-label*="stock" i], span[aria-label*="unavailable" i]')
    raw_label = aria_span.get("aria-label") if aria_span else None
    aria_msg = clean_text(raw_label) if raw_label else ""

    if aria_msg:
        lowered_msg = aria_msg.lower()
        if any(phrase in lowered_msg for phrase in available_phrases):
            return "in_stock", aria_msg
        if any(phrase in lowered_msg for phrase in ("currently unavailable", "out of stock", "unavailable")):
            return "out_of_stock", aria_msg

    lowered_card = card.get_text(" ", strip=True).lower()
    if any(phrase in lowered_card for phrase in ("currently unavailable", "out of stock")):
        return "out_of_stock", "Currently unavailable / Out of stock"
    if any(phrase in lowered_card for phrase in available_phrases):
        return "in_stock", "In stock"

    return "unknown", ""


# ===================== Parse page =====================
def parse_search_page(html: str, category: str, query: str):
    """Parse one search-results page into a list of row dicts.

    Cards with an empty ASIN, or with neither a title nor a URL, are dropped.

    Args:
        html: page source of the results page.
        category: label stored in the "category" column.
        query: search string stored in the "query" column.

    Returns:
        list of dicts, one per kept card.
    """
    page = BeautifulSoup(html, "html.parser")
    kept = []

    for card in page.select('div[data-component-type="s-search-result"][data-asin]'):
        asin = (card.get("data-asin") or "").strip()
        if asin == "":
            continue

        title, url = extract_title_and_url(card)
        if not (title or url):
            continue

        rating, reviews = extract_rating_reviews(card)
        stock_status, stock_text = extract_stock_status(card)

        kept.append(dict(
            scrape_date=SCRAPE_DATE,
            category=category,
            query=query,
            asin=asin,
            title=title,
            url=url,
            price=extract_price(card),
            old_price=extract_old_price(card),
            rating=rating,
            reviews=reviews,
            stock_status=stock_status,
            stock_text=stock_text,
        ))

    return kept


# ===================== Page count logic =====================
def extract_total_results(html: str) -> int:
    """
    Read the total number of search results and return it as an int.

    Amazon usually writes "1-48 of over 10,000 results for" or
    "1-48 of 312 results for"; the final number is extracted.

    The results-header element is searched first, then the whole page text.
    The whole page is now also consulted when the header element exists but
    holds no count. In each text the "of N results" form is tried before the
    shorter "N result(s) for" form, so pages that matched before still yield
    the same number.

    Returns:
        The count, or 0 when nothing could be found or parsed.
    """
    soup = BeautifulSoup(html, "html.parser")

    # Candidate texts, most specific first.
    candidates = []
    el = soup.select_one("span.sg-col-inner .a-section.a-spacing-small.a-spacing-top-small")
    if el:
        header_text = el.get_text(" ", strip=True)
        if header_text:
            candidates.append(header_text)
    # fallback: search in the whole page text
    candidates.append(soup.get_text(" ", strip=True))

    patterns = (
        # pattern: of 312 results / of over 10,000 results
        r"of\s+(?:over\s+)?([\d,]+)\s+results",
        # NOTE(review): assumes result sets that fit on one page are rendered
        # as "N results for" without the "1-N of" prefix - confirm against
        # live markup.
        r"\b([\d,]+)\s+results?\s+for\b",
    )

    for txt in candidates:
        for pattern in patterns:
            m = re.search(pattern, txt, re.I)
            if not m:
                continue
            num = m.group(1).replace(",", "")
            try:
                return int(num)
            except ValueError:
                # e.g. the match consisted of commas only
                return 0

    return 0


def compute_pages_to_scrape(first_page_html: str, max_pages: int) -> int:
    """Return how many pages to fetch for a query, or 0 when no result count is found.

    The estimate is ``ceil(total / 48)``, at least 1, capped at ``max_pages``.
    """
    found = extract_total_results(first_page_html)
    if not found > 0:
        return 0

    # Amazon usually shows 48 results per page on desktop (sometimes 24/16).
    # 48 is assumed and rounded up; if that is slightly off, the caller still
    # stops early on an empty page.
    page_estimate = math.ceil(found / 48)
    page_estimate = max(1, page_estimate)
    return min(page_estimate, max_pages)


# ===================== Main Scraper =====================
def scrape_daily(search_map: dict, max_pages_per_query: int = MAX_PAGES_PER_QUERY, out_csv: str = OUT_CSV, headless: bool = False):
    """
    Scrape every query in ``search_map`` and save the rows to ``out_csv``.

    Page 1 of each query is fetched first to estimate the page count, then up
    to ``max_pages_per_query`` pages are parsed. Paging for a query stops at
    the first page that yields no rows.

    Rows collected so far are written to ``out_csv`` even when the run is cut
    short (CAPTCHA, driver error, Ctrl+C); the exception still propagates.

    Args:
        search_map: category -> list of query strings.
        max_pages_per_query: upper bound on pages fetched per query.
        out_csv: destination CSV path; only written when there is at least one row.
        headless: passed through to build_driver.

    Returns:
        list of row dicts collected during the run.

    Raises:
        RuntimeError: when a fetched page looks like a CAPTCHA / robot check.
    """
    driver = build_driver(headless=headless)
    all_rows = []

    try:
        # Warm session
        driver.get(BASE)
        time.sleep(random.uniform(2.0, 4.0))

        for category, queries in search_map.items():
            for q in queries:
                # ---- Fetch page 1 first (to detect pages/results) ----
                url1 = get_search_url(q, 1)
                html1 = fetch_html(driver, url1)

                if is_blocked(html1):
                    raise RuntimeError("CAPTCHA / Robot Check ظهر. شغّلي headless=False وحليه مرة ثم أعيدي التشغيل.")

                pages_to_scrape = compute_pages_to_scrape(html1, max_pages_per_query)

                if pages_to_scrape == 0:
                    print(f"[{SCRAPE_DATE}] SKIP (no results): Category='{category}' Query='{q}'")
                    continue

                # ---- Parse page 1 ----
                rows1 = parse_search_page(html1, category=category, query=q)
                print(f"[{SCRAPE_DATE}] Category='{category}' Query='{q}' Page=1/{pages_to_scrape} -> {len(rows1)} items")
                all_rows.extend(rows1)

                # Early stop when the first page has no actual products
                if not rows1:
                    continue

                time.sleep(random.uniform(3.0, 6.0))

                # ---- Remaining pages, up to the limit ----
                for p in range(2, pages_to_scrape + 1):
                    urlp = get_search_url(q, p)
                    htmlp = fetch_html(driver, urlp)

                    if is_blocked(htmlp):
                        raise RuntimeError("CAPTCHA / Robot Check ظهر أثناء الصفحات. قللي عدد الصفحات/زودي delays.")

                    rowsp = parse_search_page(htmlp, category=category, query=q)
                    print(f"[{SCRAPE_DATE}] Category='{category}' Query='{q}' Page={p}/{pages_to_scrape} -> {len(rowsp)} items")
                    all_rows.extend(rowsp)

                    # Stop early when a page comes back empty
                    if not rowsp:
                        break

                    time.sleep(random.uniform(3.0, 6.0))

    finally:
        try:
            driver.quit()
        finally:
            # Save inside ``finally`` so an interrupted run keeps the rows it
            # already collected instead of discarding them.
            if all_rows:
                with open(out_csv, "w", newline="", encoding="utf-8-sig") as f:
                    writer = csv.DictWriter(f, fieldnames=list(all_rows[0].keys()))
                    writer.writeheader()
                    writer.writerows(all_rows)

    return all_rows


if __name__ == "__main__":
    # Full run with the module-level configuration.
    data = scrape_daily(
        headless=False,
        out_csv=OUT_CSV,
        max_pages_per_query=MAX_PAGES_PER_QUERY,  # collects up to 5 pages
        search_map=SEARCH_MAP,
    )
    print(f"DONE: {len(data)} -> {OUT_CSV}")


[2025-12-25] Category='Mobiles' Query='iPhone 16' Page=1/4 -> 51 items
[2025-12-25] Category='Mobiles' Query='iPhone 16' Page=2/4 -> 58 items
[2025-12-25] Category='Mobiles' Query='iPhone 16' Page=3/4 -> 60 items
[2025-12-25] Category='Mobiles' Query='iPhone 16' Page=4/4 -> 60 items
[2025-12-25] Category='Mobiles' Query='iPhone 15' Page=1/4 -> 48 items
[2025-12-25] Category='Mobiles' Query='iPhone 15' Page=2/4 -> 52 items
[2025-12-25] Category='Mobiles' Query='iPhone 15' Page=3/4 -> 52 items
[2025-12-25] Category='Mobiles' Query='iPhone 15' Page=4/4 -> 56 items
[2025-12-25] Category='Mobiles' Query='iPhone 14' Page=1/4 -> 48 items
[2025-12-25] Category='Mobiles' Query='iPhone 14' Page=2/4 -> 55 items
[2025-12-25] Category='Mobiles' Query='iPhone 14' Page=3/4 -> 60 items
[2025-12-25] Category='Mobiles' Query='iPhone 14' Page=4/4 -> 60 items
[2025-12-25] Category='Mobiles' Query='Samsung Galaxy S' Page=1/4 -> 50 items
[2025-12-25] Category='Mobiles' Query='Samsung Galaxy S' Page=2/4 -> 5

In [None]:
# Flat keyword lists (one search query per string), grouped by theme.

# 1. Electronics & Gadgets (Targeting high price fluctuations)
electronics_keywords = [
    "iPhone 15", "iPhone 14", "Samsung Galaxy S23", "Samsung Galaxy A54",
    "Xiaomi Redmi Note 12", "Realme 11", "MacBook Air M2", "HP Pavilion",
    "Dell Vostro", "Lenovo IdeaPad", "ASUS Vivobook", "AirPods Pro",
    "Samsung Buds", "Smart Watch Ultra", "Power Bank 20000mAh",
    "Air Fryer", "Espresso Machine", "Microwave", "Electric Kettle"
]

# 2. Groceries & Essentials (Targeting inflation tracking)
grocery_keywords = [
    "Sunflower Oil 1L", "Corn Oil", "White Rice 1kg", "Pasta 400g",
    "Sugar 1kg", "Ghee", "Full Cream Milk 1L", "Instant Coffee 200g",
    "Tea Bags 100", "Cheddar Cheese", "Laundry Detergent Powder",
    "Dishwashing Liquid", "Shampoo 400ml", "Toilet Paper"
]

# 3. All Keywords (If you want to run one big loop)
# Electronics first, then groceries; the order of each list is preserved.
search_queries = electronics_keywords + grocery_keywords

# Example of how to use it in your scraper:
# for query in search_queries:
#     print(f"Scraping results for: {query}")
#     # Your scraping logic here...

In [5]:
import re
from urllib.parse import urljoin
from bs4 import BeautifulSoup

# Root URL of the storefront; relative product links are resolved against it.
BASE = "https://www.amazon.eg"

def clean_text(x: str) -> str:
    """Collapse whitespace runs to single spaces and strip both ends; falsy input gives ""."""
    value = "" if not x else x
    single_spaced = re.sub(r"\s+", " ", value)
    return single_spaced.strip()

def extract_title_and_url(card):
    """
    Find the product link and title in a search card.

    Card markup varies, so several anchor selectors are tried in order and the
    first one that matches is used.
    Return: (title, url); either may be "".
    """
    anchor_selectors = (
        # 1) Most common: h2 a
        "h2 a.a-link-normal[href]",
        # 2) Sometimes anchor has different classes
        'a.a-link-normal.s-underline-text.s-underline-link-text.s-link-style[href]',
        # 3) Fallback: image container link often exists
        'span[data-component-type="s-product-image"] a.a-link-normal[href]',
        # 4) Any product link that looks like /dp/...
        'a[href*="/dp/"]',
    )

    anchor = None
    for selector in anchor_selectors:
        anchor = card.select_one(selector)
        if anchor:
            break

    url = ""
    title = ""
    if anchor:
        url = urljoin(BASE, anchor.get("href"))

        # Prefer the span inside the anchor; otherwise use the h2 aria-label.
        inner = anchor.select_one("span")
        inner_text = clean_text(inner.get_text()) if inner else ""
        if inner_text:
            title = inner_text
        else:
            heading = card.select_one("h2")
            if heading and heading.get("aria-label"):
                title = clean_text(heading.get("aria-label"))

    # Another fallback: h2 span directly
    if not title:
        heading_span = card.select_one("h2 span")
        if heading_span:
            title = clean_text(heading_span.get_text())

    return title, url

def extract_price(card):
    """Return the card's current price text, or "" when there is no price element."""
    # current price
    node = card.select_one("span.a-price > span.a-offscreen")
    if not node:
        return ""
    return clean_text(node.get_text())

def extract_old_price(card):
    """
    Return the old (struck-through) price text, or "" when the card has none.

    The old price sits in a block whose aria-hidden starts with "List:" or
    "Was:"; the text is read from span.a-price.a-text-price span.a-offscreen.
    """
    # Prefer "List:" then "Was:" (swap the tuple order to change priority)
    for label in ("List:", "Was:"):
        node = card.select_one(
            'div[aria-hidden^="{}"] span.a-price.a-text-price span.a-offscreen'.format(label)
        )
        if not node:
            continue
        text = clean_text(node.get_text())
        if text:
            return text

    return ""

def extract_rating_reviews(card):
    """Return ``(rating, reviews)`` text from a card; missing values are ""."""
    # e.g. "4.1 out of 5 stars"
    stars = card.select_one("span.a-icon-alt")
    rating = clean_text(stars.get_text()) if stars else ""

    # reviews count commonly in: a[href*="#customerReviews"] span, or span.a-size-base
    counter = card.select_one('a[href*="#customerReviews"] span')
    if not counter:
        counter = card.select_one("span.a-size-base")
    reviews = clean_text(counter.get_text()) if counter else ""

    return rating, reviews

def parse_search_page(html: str, query: str):
    """Parse one search-results page into a list of row dicts tagged with ``query``.

    Cards with an empty ASIN, or with neither a URL nor a title, are skipped.
    """
    page = BeautifulSoup(html, "html.parser")
    cards = page.select('div[data-component-type="s-search-result"][data-asin]')

    found = []
    for card in cards:
        asin = (card.get("data-asin") or "").strip()
        if asin == "":
            continue

        title, url = extract_title_and_url(card)

        # Skip junk results without url/title
        if not (url or title):
            continue

        rating, reviews = extract_rating_reviews(card)
        found.append(dict(
            query=query,
            asin=asin,
            title=title,
            url=url,
            price=extract_price(card),
            old_price=extract_old_price(card),
            rating=rating,
            reviews=reviews,
        ))

    return found


In [6]:
import time, random, csv
from urllib.parse import quote_plus
from selenium import webdriver
from selenium.webdriver.chrome.options import Options

# Root URL of the storefront; search URLs are built from it.
BASE = "https://www.amazon.eg"

def selenium_html(url: str, headless=False) -> str:
    """Open ``url`` in a fresh Chrome session and return the page source.

    A new driver is started for every call and always quit afterwards, also
    when loading fails.
    """
    flags = ["--headless=new"] if headless else []
    flags.extend([
        "--window-size=1280,900",
        "--disable-blink-features=AutomationControlled",
        "--lang=en-US",
        "user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
        "AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
    ])

    opts = Options()
    for flag in flags:
        opts.add_argument(flag)

    browser = webdriver.Chrome(options=opts)
    try:
        browser.get(url)
        time.sleep(4)

        # help load cards
        browser.execute_script("window.scrollTo(0, document.body.scrollHeight*0.7);")
        time.sleep(2)

        page_source = browser.page_source
    finally:
        browser.quit()
    return page_source

def scrape_queries(queries, pages_per_query=3, out_csv="amazon_eg_multi.csv", headless=False):
    """Scrape a fixed number of result pages per query and save the rows to ``out_csv``.

    Rows collected so far are written to ``out_csv`` even when a page fetch
    raises or the run is interrupted; the exception still propagates.

    Args:
        queries: iterable of search strings.
        pages_per_query: number of result pages fetched for every query.
        out_csv: destination CSV path; only written when there is at least one row.
        headless: passed through to selenium_html.

    Returns:
        list of row dicts collected during the run.
    """
    all_rows = []

    try:
        for q in queries:
            for p in range(1, pages_per_query + 1):
                url = f"{BASE}/s?k={quote_plus(q)}&page={p}"
                html = selenium_html(url, headless=headless)

                # IMPORTANT: uses parse_search_page from section (1)
                rows = parse_search_page(html, query=q)

                print(f"Query='{q}' Page={p} -> {len(rows)} items")
                all_rows.extend(rows)

                time.sleep(random.uniform(3, 6))
    finally:
        # Save inside ``finally`` so a failure on a later page does not
        # discard the rows already collected.
        if all_rows:
            with open(out_csv, "w", newline="", encoding="utf-8-sig") as f:
                w = csv.DictWriter(f, fieldnames=list(all_rows[0].keys()))
                w.writeheader()
                w.writerows(all_rows)

    return all_rows

if __name__ == "__main__":
    # Sample run: five pages for each of four broad queries.
    ITEMS_TO_SEARCH = ["mobile", "iphone", "samsung", "xiaomi redmi"]
    data = scrape_queries(
        ITEMS_TO_SEARCH,
        pages_per_query=5,
        out_csv="amazon_eg_multi.csv",
        headless=False,
    )
    print(f"DONE {len(data)}")


Query='mobile' Page=1 -> 48 items
Query='mobile' Page=2 -> 51 items
Query='mobile' Page=3 -> 51 items
Query='mobile' Page=4 -> 51 items
Query='mobile' Page=5 -> 51 items
Query='iphone' Page=1 -> 48 items
Query='iphone' Page=2 -> 48 items
Query='iphone' Page=3 -> 53 items
Query='iphone' Page=4 -> 53 items
Query='iphone' Page=5 -> 53 items
Query='samsung' Page=1 -> 60 items
Query='samsung' Page=2 -> 58 items
Query='samsung' Page=3 -> 58 items
Query='samsung' Page=4 -> 58 items
Query='samsung' Page=5 -> 58 items
Query='xiaomi redmi' Page=1 -> 50 items
Query='xiaomi redmi' Page=2 -> 51 items
Query='xiaomi redmi' Page=3 -> 51 items
Query='xiaomi redmi' Page=4 -> 51 items
Query='xiaomi redmi' Page=5 -> 50 items
DONE 1052
