In [None]:
import logging
import os
import re
from concurrent.futures import ThreadPoolExecutor

import numpy as np
import pandas as pd
import requests
from bs4 import BeautifulSoup

In [None]:
# Function to extract Product ID (ASIN) from URL or HTML
def get_product_id(url, soup=None):
    """Return the product ID (ASIN) for a product page.

    The ID is read from the URL when possible: the path segment that follows
    ``/dp/`` or ``/product/``, otherwise a standalone 10-character upper-case
    alphanumeric path segment. If the URL yields nothing and a parsed page is
    supplied, the ``data-asin`` attribute of the first ``<div>`` carrying one
    is used instead.

    Args:
        url: Product page URL. Non-string values are ignored.
        soup: Optional parsed page (BeautifulSoup-like object exposing ``find``).

    Returns:
        The product ID string, or ``"Unknown"`` when none can be determined.
    """
    candidate = ""
    if isinstance(url, str):
        for marker in ("/dp/", "/product/"):
            if marker in url:
                # The ID ends at the next path separator, query string or
                # fragment, so "/dp/B0XXXXXXXX?ref=..." does not leak "?ref=...".
                candidate = re.split(r"[/?#]", url.split(marker, 1)[1], maxsplit=1)[0]
                break
        else:
            match = re.search(r"/([A-Z0-9]{10})(?:[/?]|$)", url)
            if match:
                candidate = match.group(1)
    if candidate:
        return candidate

    if soup is not None:
        try:
            asin = soup.find("div", attrs={"data-asin": True})["data-asin"]
        except Exception:
            # Best effort: no such <div> (find returned None) or an
            # unexpected page structure.
            return "Unknown"
        # An empty attribute value is as useless as a missing one.
        return asin or "Unknown"
    return "Unknown"

In [None]:
# Function to extract discount percentage
def extract_discount_percentage(soup):
    """Return the discount badge text from a parsed product page.

    Reads the first ``<span class="savingsPercentage">``.

    Returns:
        The stripped badge text, ``"No discount"`` when no such span exists,
        or ``"Error: <message>"`` if the lookup raises.
    """
    try:
        badge = soup.find("span", class_="savingsPercentage")
        return badge.text.strip() if badge else "No discount"
    except Exception as err:
        return f"Error: {str(err)}"


In [None]:
# Function to extract actual price
def extract_actual_price(soup):
    """Return the list (pre-discount) price text from a parsed product page.

    Looks for the first ``<span class="a-price a-text-price">`` and reads the
    ``<span class="a-offscreen">`` nested inside it.

    Args:
        soup: Parsed product page (BeautifulSoup-like object exposing ``find``).

    Returns:
        The stripped price text; ``"Actual price not found"`` when either span
        is missing; or ``"Error: <message>"`` if the lookup itself raises.
    """
    try:
        price_block = soup.find("span", class_="a-price a-text-price")
        if not price_block:
            return "Actual price not found"
        offscreen = price_block.find("span", class_="a-offscreen")
        if not offscreen:
            # Without this guard a missing inner span surfaced as an
            # AttributeError message in the price column.
            return "Actual price not found"
        return offscreen.text.strip()
    except Exception as e:
        return f"Error: {str(e)}"

In [None]:
# Function to extract 'About this item' (feature bullets)
def extract_about_product(soup):
    """Collect the feature bullets of a parsed product page into one string.

    Reads every ``<span class="a-list-item">`` inside
    ``<div id="feature-bullets">``.

    Returns:
        The stripped bullet texts joined with ``"; "``; a fixed message when
        the section is absent; or ``"Error: <message>"`` if the lookup raises.
    """
    try:
        bullets_container = soup.find("div", id="feature-bullets")
        if bullets_container:
            items = bullets_container.find_all("span", class_="a-list-item")
            # One semicolon-separated line keeps the value CSV friendly.
            return "; ".join(item.text.strip() for item in items)
        return "No About this item section found."
    except Exception as err:
        return f"Error: {str(err)}"

In [None]:
# Function to extract specifications from the product page
def extract_specifications(soup):
    """Extract the label/value specification table from a parsed product page.

    Looks inside the first
    ``<div class="a-section a-spacing-small a-spacing-top-small">`` for a
    ``<table class="a-normal a-spacing-micro">`` and reads every row that has
    exactly two ``<td>`` cells as ``label -> value``.

    Args:
        soup: Parsed product page (BeautifulSoup-like object exposing ``find``).

    Returns:
        dict: Mapping of specification label to value. Always a dict, because
        callers add keys to the result: it is empty when the section or table
        is missing, and holds whatever was parsed so far if parsing raises
        (the error is logged as a warning instead of being returned).
    """
    specifications = {}
    try:
        specifications_section = soup.find("div", class_="a-section a-spacing-small a-spacing-top-small")
        if not specifications_section:
            return specifications

        table = specifications_section.find("table", class_="a-normal a-spacing-micro")
        rows = table.find_all("tr") if table else []

        for row in rows:
            cols = row.find_all("td")
            # Only plain "label | value" rows are meaningful here.
            if len(cols) == 2:
                label = cols[0].get_text(strip=True)
                value = cols[1].get_text(strip=True)
                specifications[label] = value
    except Exception as exc:
        logging.getLogger(__name__).warning("Could not parse specifications: %s", exc)
    return specifications

In [None]:
# Function to detect product category based on URL, title, or brand
def detect_category(url, title, brand):
    """Classify a product into a coarse category by keyword matching.

    The URL, title and brand are concatenated and lower-cased, then checked
    against keyword rules in order; the first rule that matches wins.

    Args:
        url: Product URL (may be an empty string).
        title: Product title.
        brand: Product brand.

    Returns:
        One of ``"phone"``, ``"computer"``, ``"tablet"``, ``"watch"``,
        ``"earburd"`` or ``"Other"``.
    """
    text = f"{url} {title} {brand}".lower()

    # NOTE(review): "earburd" looks like a typo for "earbud". The label is kept
    # unchanged because it is written to the exported data and downstream
    # consumers may match on it - confirm before renaming.
    earbud_label = "earburd"

    def mentions(*keywords):
        """Return True if any keyword occurs as a substring of the text."""
        return any(keyword in text for keyword in keywords)

    # Apple product lines (order matters: first match wins).
    if mentions("iphone"):
        return "phone"
    if mentions("macbook"):
        return "computer"
    if mentions("ipad"):
        return "tablet"
    if mentions("imac"):
        return "computer"
    if mentions("watch"):  # also covers "apple watch"
        return "watch"
    if mentions("airpods"):
        return earbud_label

    # Samsung product lines. Specific families are tested before the phone
    # rule so that a Galaxy Tab or Galaxy Buds is not labelled as a phone.
    if mentions("galaxy tab"):
        return "tablet"
    if mentions("galaxy buds"):
        return earbud_label
    # Phone families: Galaxy S/A/M followed by a model number, Z and Note.
    # This replaces a `"s" in text` test that was true for nearly any text.
    if re.search(r"\bgalaxy[\s-]+(?:[sam]\s?\d|z\b|note\b)", text):
        return "phone"
    if mentions("samsung") and mentions("laptop", "monitor"):
        return "computer"

    # Generic computers: laptops, desktops, monitors. "pc" must be a whole
    # word so that e.g. "3pcs" does not count as a computer.
    if mentions("laptop", "notebook", "desktop", "computer", "monitor", "display") or re.search(r"\bpc\b", text):
        return "computer"

    if mentions("tablet"):
        return "tablet"

    # "earbud" also matches the plural "earbuds".
    if mentions("earbud", "headphone", "earphone"):
        return earbud_label

    return "Other"


In [None]:
# Function to extract Product Info
def extract_product_data(product_url):
    """Fetch one product page and extract its listing fields.

    Any failure (network error, HTTP error status, unexpected page structure)
    is logged as a warning and reported as a record whose fields are all empty
    strings, so that one bad URL does not abort a batch run.

    Args:
        product_url: Absolute URL of the product page.

    Returns:
        dict with the keys ``brand``, ``product_id``, ``title``, ``price``,
        ``actual_price``, ``discount_percentage``, ``rating``, ``reviews``,
        ``availability`` and ``specifications``. On success ``specifications``
        is a dict that also holds the feature bullets under
        ``"About this item"``; on failure every value is ``""``.
    """
    # NOTE(review): the User-Agent is empty here, presumably redacted - set a
    # real browser string before running; TODO confirm.
    HEADERS = {'User-Agent': '', 'Accept-Language': 'en-US, en;q=0.5'}
    try:
        webpage = requests.get(product_url, headers=HEADERS, timeout=10)
        # Error pages carry no product data; fail fast so the cause is logged.
        webpage.raise_for_status()
        soup = BeautifulSoup(webpage.content, "html.parser")

        brand_tag = soup.find("a", id="bylineInfo") or soup.find("span", id="bylineInfo")
        brand = brand_tag.text.strip() if brand_tag else ""
        product_id = get_product_id(product_url, soup)
        title = _first_text(soup, "span", {"id": "productTitle"})

        price_whole = _first_text(soup, "span", {"class": "a-price-whole"})
        price_fraction = _first_text(soup, "span", {"class": "a-price-fraction"})
        # The whole-part span may already end with its own decimal separator;
        # dropping it avoids "1,299..00" and is a no-op otherwise.
        price = f"{price_whole.rstrip('.')}.{price_fraction}" if price_whole and price_fraction else ""

        rating = _first_text(soup, "span", {"class": "a-icon-alt"})
        review_count = _first_text(soup, "span", {"id": "acrCustomerReviewText"})

        # A missing inner <span> must not discard the whole record.
        availability_div = soup.find("div", attrs={"id": "availability"})
        availability_span = availability_div.find("span") if availability_div else None
        availability = availability_span.text.strip() if availability_span else "Not Available"

        discount_percentage = extract_discount_percentage(soup)
        actual_price = extract_actual_price(soup)
        about_product = extract_about_product(soup)
        specifications = extract_specifications(soup)
        if not isinstance(specifications, dict):
            # Guard against a non-dict result (e.g. an error string), which
            # cannot take the "About this item" key assigned below.
            specifications = {}

        # Merge 'about_product' into 'specifications'.
        if "About this item" in specifications:
            specifications["About this item"] += "; " + about_product
        else:
            specifications["About this item"] = about_product

        # Without a discount the list price equals the current price.
        if discount_percentage == "No discount":
            actual_price = price

        return {
            "brand": brand,
            "product_id": product_id,
            "title": title,
            "price": price,
            "actual_price": actual_price,
            "discount_percentage": discount_percentage,
            "rating": rating,
            "reviews": review_count,
            "availability": availability,
            "specifications": specifications,
        }
    except Exception as exc:
        # Deliberately broad: one failing URL must not break the batch.
        logging.getLogger(__name__).warning("Failed to scrape %s: %s", product_url, exc)
        return {
            "brand": "",
            "product_id": "",
            "title": "",
            "price": "",
            "actual_price": "",
            "discount_percentage": "",
            "rating": "",
            "reviews": "",
            "availability": "",
            "specifications": "",
        }


def _first_text(soup, tag_name, attrs, default=""):
    """Return the stripped text of the first tag matching ``attrs``, else ``default``."""
    node = soup.find(tag_name, attrs=attrs)
    return node.text.strip() if node else default

In [None]:
def clean_brand(raw_brand):
    """Normalise a scraped brand string to a canonical brand name.

    Args:
        raw_brand: Brand text as scraped. Anything that is not a string
            (``None``, ``pd.NA``, ``NaN``) is treated as unknown instead of
            raising on ``.lower()``.

    Returns:
        ``"Samsung"`` or ``"Apple"`` when the text mentions that brand
        (case-insensitive, Samsung checked first), otherwise ``""``.
    """
    if not isinstance(raw_brand, str):
        return ""

    text = raw_brand.lower()

    if "samsung" in text:
        return "Samsung"
    if "apple" in text:
        return "Apple"

    return ""


In [None]:
# Main Code
if __name__ == '__main__':
    # NOTE(review): the User-Agent is empty here, presumably redacted - set a
    # real browser string before running; TODO confirm.
    HEADERS = {'User-Agent': '', 'Accept-Language': 'en-US, en;q=0.5'}
    base_url = "https://www.amazon.com/s?k=macbook&crid=1SPSF1ATIYFYD&sprefix=macbook%2Caps%2C422&ref=nb_sb_ss_p13n-expert-pd-ops-ranker_1_7"
    page_count = 20    # number of search-result pages to walk
    max_workers = 10   # concurrent product-page downloads

    # Session cookies are credentials and must never be committed to the
    # notebook. Supply them through environment variables; unset ones are
    # sent as empty values.
    cookies = {
        'session-id': os.environ.get("AMAZON_SESSION_ID", ""),
        'at': os.environ.get("AMAZON_AT", ""),
        'session-token': os.environ.get("AMAZON_SESSION_TOKEN", ""),
        'ubid-main': os.environ.get("AMAZON_UBID_MAIN", ""),
    }

    # Step 1: collect product links from every search-result page.
    links_list = []
    for page in range(1, page_count + 1):
        URL = f"{base_url}&page={page}"
        try:
            webpage = requests.get(URL, headers=HEADERS, cookies=cookies, timeout=10)
            webpage.raise_for_status()
        except requests.RequestException as exc:
            # One unreachable page should not abort the whole crawl.
            logging.warning("Skipping search page %d: %s", page, exc)
            continue
        soup = BeautifulSoup(webpage.content, "html.parser")
        links = soup.find_all("a", attrs={'class': 'a-link-normal s-no-outline'})
        # Anchors without an href cannot be followed.
        links_list.extend(
            "https://www.amazon.com" + link.get('href') for link in links if link.get('href')
        )

    if not links_list:
        raise RuntimeError(
            "No product links were collected; the search pages may have been "
            "blocked or their markup may have changed."
        )

    # Step 2: scrape the product pages concurrently (result order is kept).
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        results = list(executor.map(extract_product_data, links_list))

    amazon_df5 = pd.DataFrame(results)

    # Step 3: brand clean-up. Blank brands are filled from the neighbouring
    # rows (previous first, then next) before being normalised.
    # NOTE(review): this assumes adjacent search results share a brand - confirm.
    filled_brand = amazon_df5["brand"].replace("", pd.NA).ffill().bfill()
    # fillna("") keeps clean_brand from ever receiving pd.NA when no row had a brand.
    amazon_df5["brand"] = filled_brand.fillna("").apply(clean_brand)

    # Category
    amazon_df5["category"] = amazon_df5.apply(
        lambda row: detect_category("", row["title"], row["brand"]),
        axis=1
    )

    # Step 4: drop rows without a title (failed scrapes) and export.
    amazon_df5["title"] = amazon_df5["title"].replace('', pd.NA)
    amazon_df5 = (
        amazon_df5
        .dropna(subset=['title'])
        .rename(columns={'specifications': 'about_product'})
    )

    amazon_df5.to_csv("amazon_data_optimized.csv", header=True, index=False)
    print("Data saved to amazon_data_optimized.csv")


Data saved to amazon_data_optimized.csv


  amazon_df5["brand"] = amazon_df5["brand"].fillna(method="ffill")
  amazon_df5["brand"] = amazon_df5["brand"].fillna(method="bfill")
The behavior will change in pandas 3.0. This inplace method will never work because the intermediate object on which we are setting values always behaves as a copy.

For example, when doing 'df[col].method(value, inplace=True)', try using 'df.method({col: value}, inplace=True)' or df[col] = df[col].method(value) instead, to perform the operation inplace on the original object.


  amazon_df5['title'].replace('', pd.NA, inplace=True)


In [None]:

# Treat blank brands as missing, then fill each gap from the nearest known
# brand: the previous row first, then the following row for any leading gaps.
# Series.ffill()/bfill() replace the deprecated fillna(method=...) form.
amazon_df5["brand"] = amazon_df5["brand"].replace("", pd.NA).ffill().bfill()


  amazon_df5["brand"] = amazon_df5["brand"].fillna(method="ffill")
  amazon_df5["brand"] = amazon_df5["brand"].fillna(method="bfill")


In [None]:
from pathlib import Path

import pandas as pd
from google.colab import drive

# Mount Google Drive so the export lands in the shared project folder.
drive.mount('/content/drive')

# Destination on the mounted Drive (Colab path); the folder is created if missing.
output_file = "MERGED_DATA.csv"
output_path = Path("/content/drive/My Drive/CAPSTONE2 - NHÓM 5/Data")
output_path.mkdir(parents=True, exist_ok=True)
output_full_path = output_path.joinpath(output_file)

# Report what is about to be written, and where.
print("DataFrame shape:", amazon_df5.shape)
print("Lưu tới:", output_full_path)

amazon_df5.to_csv(output_full_path, header=True, index=False)
print(f"Data saved to {output_full_path}")

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).
DataFrame shape: (5, 11)
Lưu tới: /content/drive/My Drive/CAPSTONE2-NHÓM 5/Data/SAMSUNG WATCH.csv
Data saved to /content/drive/My Drive/CAPSTONE2-NHÓM 5/Data/SAMSUNG WATCH.csv
