In [1]:
pip install selenium webdriver-manager pandas

Note: you may need to restart the kernel to use updated packages.



[notice] A new release of pip is available: 24.2 -> 25.3
[notice] To update, run: python.exe -m pip install --upgrade pip


In [2]:
import os
# Show the kernel's working directory; the relative output file names used
# later in this notebook are resolved against it.
print("File đang được lưu tại:", os.getcwd())

File đang được lưu tại: c:\VsCode\Python_AI\code\báo cáo tháng 12


In [1]:
import os
import re
import time
import random
import json
import pandas as pd

from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.chrome.service import Service
from webdriver_manager.chrome import ChromeDriverManager
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.common.exceptions import (
    TimeoutException,
    StaleElementReferenceException,
    WebDriverException,
)

# =========================
# CONFIG
# =========================
# Search-results page to crawl; the query string is the URL-encoded text "laptop cũ".
SEARCH_URL = "https://cellphones.com.vn/catalogsearch/result?q=laptop%20c%C5%A9"

# The run crawls the slice [START_INDEX, START_INDEX + TARGET_LINKS) of the collected link list.
START_INDEX = 6400
TARGET_LINKS = 1000
TOTAL_NEEDED = START_INDEX + TARGET_LINKS

# Output CSV name (encodes the crawled index range) and its field separator.
OUT_FILE = f"cellphones_laptop_specs_{START_INDEX}_{START_INDEX + TARGET_LINKS}.csv"
SEP = ","

# JSON file used to persist the collected product links between runs.
OUT_LINKS_FILE = "collected_links.json"
# When True the browser is not quit at the end of the run.
KEEP_BROWSER_AFTER_RUN = True

# Number of attempts per product page, and the WebDriverWait timeout in seconds.
MAX_RETRY_PER_PRODUCT = 2
PAGE_WAIT = 12

# Bounds (seconds) for the default random pause, and the fixed pause after clicking "load more".
SLEEP_PAGE_MIN = 0.25
SLEEP_PAGE_MAX = 0.55
SLEEP_AFTER_CLICK = 0.25

# ====== REUSE CHROME (FIX NEW TAB / WINDOW) ======
# If True: the code first tries to attach to an already-running Chrome (remote debugging)
ATTACH_EXISTING_CHROME = False
DEBUG_PORT = 9222
# Prefer a dedicated user-data-dir to avoid conflicts
USER_DATA_DIR = None

# =========================
# SELENIUM SETUP
# =========================
# Shared WebDriver and WebDriverWait instances; created lazily by ensure_driver().
driver = None
wait = None

def fast_sleep(a=SLEEP_PAGE_MIN, b=SLEEP_PAGE_MAX):
    """Pause for a random duration drawn uniformly between ``a`` and ``b`` seconds."""
    pause_seconds = random.uniform(a, b)
    time.sleep(pause_seconds)

def is_invalid_session(err: Exception) -> bool:
    """Return True when the error message indicates the browser session is gone.

    The check is a case-insensitive substring match on ``str(err)`` against
    the two known "dead session" messages.
    """
    rendered = str(err)
    lowered = rendered.lower() if rendered else ""
    dead_session_markers = (
        "invalid session id",
        "session deleted because of page crash",
    )
    return any(marker in lowered for marker in dead_session_markers)

def build_driver_attach():
    """
    Attach to a Chrome instance that is already running with remote debugging
    enabled on ``DEBUG_PORT`` (i.e. started with --remote-debugging-port).

    Returns:
        (driver, wait): the attached WebDriver and a WebDriverWait bound to it
        with a timeout of ``PAGE_WAIT`` seconds.
    """
    chrome_options = webdriver.ChromeOptions()
    debugger_address = f"127.0.0.1:{DEBUG_PORT}"
    chrome_options.add_experimental_option("debuggerAddress", debugger_address)

    chrome_service = Service(ChromeDriverManager().install())
    attached = webdriver.Chrome(service=chrome_service, options=chrome_options)
    return attached, WebDriverWait(attached, PAGE_WAIT)

def build_driver_new():
    """
    Launch a new Chrome window and return ``(driver, wait)``.

    When ``USER_DATA_DIR`` is set, Chrome is started with that profile
    directory so a persistent profile (cookies/login) can be reused across
    runs. When it is ``None`` no profile argument is passed, which is the
    previous behaviour.

    Returns:
        (driver, wait): the new WebDriver and a WebDriverWait bound to it
        with a timeout of ``PAGE_WAIT`` seconds.
    """
    options = webdriver.ChromeOptions()
    # options.add_argument("--headless=new")
    options.add_argument("--start-maximized")
    options.add_argument("--disable-blink-features=AutomationControlled")
    options.add_argument("--disable-dev-shm-usage")
    options.add_argument("--no-sandbox")
    # Only pass a profile directory when one was configured.
    if USER_DATA_DIR:
        options.add_argument(f"--user-data-dir={USER_DATA_DIR}")
    options.add_argument(
        "user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
        "AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0 Safari/537.36"
    )
    drv = webdriver.Chrome(service=Service(ChromeDriverManager().install()), options=options)
    w = WebDriverWait(drv, PAGE_WAIT)
    return drv, w

def ensure_driver():
    """Create the shared ``driver``/``wait`` globals on first use.

    Does nothing when a driver already exists. When ``ATTACH_EXISTING_CHROME``
    is enabled, attaching to a running Chrome is tried first; if that fails a
    warning is printed and a new Chrome is launched instead.
    """
    global driver, wait
    if driver is None:
        if ATTACH_EXISTING_CHROME:
            try:
                driver, wait = build_driver_attach()
            except Exception as e:
                print(f"[WARN] Không attach được Chrome (port {DEBUG_PORT}). Sẽ mở Chrome mới. Lý do: {e}")
            else:
                return

        driver, wait = build_driver_new()

def restart_driver():
    """Quit the current browser (best effort) and create a fresh driver.

    Errors raised while quitting are ignored because the session may already
    be dead. Only ``Exception`` is caught, so KeyboardInterrupt/SystemExit
    still stop the run.
    """
    global driver, wait
    if driver is not None:
        try:
            driver.quit()
        except Exception:
            # The old session is being discarded anyway; a failed quit is not actionable.
            pass
    driver = None
    wait = None
    ensure_driver()

# =========================
# PARSING HELPERS (GIỮ NGUYÊN)
# =========================
def clean_text(text):
    """Flatten a possibly multi-line value into a single tidy line.

    Newlines become ", ", carriage returns are dropped, surrounding
    whitespace is trimmed and runs of whitespace collapse to one space.
    Falsy input yields an empty string.
    """
    if text:
        single_line = text.replace('\n', ', ')
        single_line = single_line.replace('\r', '')
        return re.sub(r'\s+', ' ', single_line.strip())
    return ""

def first_match(text, patterns):
    """Return the first regex hit among ``patterns``, or "" when none match.

    Patterns are tried in order with ``re.I | re.DOTALL``. For the first
    pattern that matches, the stripped text of capture group 1 is returned.
    If the pattern has no groups, or group 1 is optional and did not take
    part in the match, the stripped whole match is returned instead.

    Args:
        text: string to search; falsy values yield "".
        patterns: iterable of regular-expression strings.

    Returns:
        The stripped matched text, or "".
    """
    if not text:
        return ""
    for pattern in patterns:
        found = re.search(pattern, text, flags=re.I | re.DOTALL)
        if found is None:
            continue
        captured = found.group(1) if found.groups() else None
        if captured is None:
            # group(1) is None when an optional group did not participate;
            # fall back to the whole match instead of crashing on None.strip().
            captured = found.group(0)
        return captured.strip()
    return ""

def parse_cpu(text):
    """Extract CPU brand, model detail, core count and thread count from a spec string.

    Args:
        text: raw CPU specification text (may be empty).

    Returns:
        Tuple ``(cpu_brand, cpu_detail, cpu_cores, cpu_threads)``. ``cpu_brand``
        is "Intel", "AMD", "Apple" or "Other"; the other three are strings that
        are empty when nothing was recognised.
    """
    text = clean_text(text)
    cpu_cores = first_match(text, [r"(\d+)\s*lõi"])
    cpu_threads = first_match(text, [r"(\d+)\s*luồng"])

    if "Intel" in text:
        cpu_brand = "Intel"
    elif "AMD" in text:
        cpu_brand = "AMD"
    elif "Apple" in text or re.search(r"\bM\d\b", text):
        # Any stand-alone "M<digit>" token counts as Apple silicon, not just M1-M3.
        cpu_brand = "Apple"
    else:
        cpu_brand = "Other"

    cpu_detail = first_match(text, [
        # Accept both "Core i5 12450H" and the hyphenated "Core i5-12450H".
        r"(Core\s+i\d[\s\-]+[\w\-]+)",
        r"(Ryzen\s+\d[\s\-]+[\w\-]+)",
        # The Pro/Max/Ultra suffix is optional, so a bare "M2" at the end of the text still matches.
        r"\b(M\d(?:\s+(?:Pro|Max|Ultra))?)\b",
    ])
    return cpu_brand, cpu_detail, cpu_cores, cpu_threads

def parse_ram(text):
    """Extract RAM size (number of GB) and RAM type from a spec string.

    Args:
        text: raw RAM specification text (may be empty).

    Returns:
        Tuple ``(ram_size, ram_type)`` of strings; each is "" when not found.
    """
    text = clean_text(text)
    ram_size = first_match(text, [r"(\d+)\s*GB"])
    # LPDDR must be tried first: the plain DDR pattern also matches inside
    # "LPDDR5" and would otherwise report it as "DDR5".
    ram_type = first_match(text, [r"(LPDDR\d\w*)", r"(DDR\d\w*)"])
    return ram_size, ram_type

def parse_storage(text):
    """Extract storage capacity (e.g. "512GB") and storage kind (SSD/HDD/eMMC).

    Returns:
        Tuple ``(storage, storage_type)`` of strings; each is "" when not found.
    """
    normalized = clean_text(text)
    capacity_patterns = [r"(\d+\s*(?:GB|TB))"]
    kind_patterns = [r"(SSD|HDD|eMMC)"]
    capacity = first_match(normalized, capacity_patterns)
    kind = first_match(normalized, kind_patterns)
    return capacity, kind

# =========================
# CORE FUNCTIONS (GIỮ NGUYÊN LOGIC)
# =========================
def click_show_more_specs():
    """Scroll to the specification section and click its "show details" button.

    Returns:
        True when the button was clicked, False when any step failed
        (section or button missing, timeout, ...).
    """
    try:
        spec_section = driver.find_element(By.ID, "thong-so-ky-thuat")
        driver.execute_script("arguments[0].scrollIntoView({block: 'center'});", spec_section)
        fast_sleep(0.15, 0.30)

        button_locator = (By.CSS_SELECTOR, ".button__show-modal-technical")
        show_button = wait.until(EC.element_to_be_clickable(button_locator))
        # Click via JavaScript rather than a native WebDriver click.
        driver.execute_script("arguments[0].click();", show_button)
        fast_sleep(0.20, 0.35)
    except Exception:
        return False
    return True

def collect_specs_from_dom():
    """Read the technical-specification table of the current page into a dict.

    Each row with at least two ``td`` cells contributes one entry. The key is
    the first cell lower-cased with colons removed and surrounding whitespace
    trimmed; the value is the stripped text of the second cell. Rows with an
    empty key or value are skipped.

    Returns:
        dict mapping normalised spec name -> raw spec text. On any error a
        message is printed and the entries gathered so far are returned.
    """
    specs = {}
    try:
        rows = driver.find_elements(By.CSS_SELECTOR, ".technical-content .technical-content-item")
        for row in rows:
            cols = row.find_elements(By.TAG_NAME, "td")
            if len(cols) < 2:
                continue
            # Strip AFTER removing the colon so a label like "CPU :" becomes
            # "cpu" rather than "cpu " (which would never match a lookup key).
            key_clean = cols[0].text.replace(':', '').strip().lower()
            val = cols[1].text.strip()
            if key_clean and val:
                specs[key_clean] = val
    except Exception as e:
        print(f"Lỗi khi crawl bảng thông số: {e}")
    return specs

def _read_text_or_empty(css):
    """Return the stripped text of the first element matching ``css``, or "" if it cannot be read.

    A dead browser session is re-raised instead of being reported as "no
    text", so the caller's restart/retry logic can react to it.
    """
    try:
        return driver.find_element(By.CSS_SELECTOR, css).text.strip()
    except Exception as e:
        if is_invalid_session(e):
            raise
        return ""


def extract_one_product(link):
    """Open a product page and return its basic info and parsed specifications.

    Args:
        link: URL of the product page.

    Returns:
        dict with the keys ``link``, ``title``, ``price_sale``, ``price_base``,
        ``brand``, ``cpu_brand``, ``cpu_detail``, ``cpu_cores``, ``cpu_threads``,
        ``ram_size``, ``ram_type``, ``storage_size``, ``storage_type``,
        ``gpu_info``, ``screen_size``, ``screen_res``, ``screen_tech``,
        ``battery`` and ``raw_specs_count``. Fields that could not be read are
        empty strings.

    Raises:
        Exception: errors from loading the page, and dead-session errors
        detected while reading the title/prices, propagate to the caller.
    """
    ensure_driver()
    driver.get(link)

    # Best-effort wait for the title; a timeout is fine, a dead session is not.
    try:
        wait.until(EC.presence_of_element_located((By.CSS_SELECTOR, ".box-product-name h1")))
    except Exception as e:
        if is_invalid_session(e):
            raise
    fast_sleep(0.15, 0.30)

    data = {"link": link, "title": "", "price_sale": "", "price_base": "", "brand": ""}
    data["title"] = _read_text_or_empty(".box-product-name h1")
    data["price_sale"] = _read_text_or_empty(".box-product-price .sale-price")
    data["price_base"] = _read_text_or_empty(".box-product-price .base-price")

    # Brand is inferred from the title: Apple by keyword, otherwise the word after "Laptop".
    title_lower = data["title"].lower()
    if "macbook" in title_lower or "apple" in title_lower:
        data["brand"] = "Apple"
    elif data["title"]:
        parts = data["title"].split()
        if len(parts) > 1 and parts[0].lower() == "laptop":
            data["brand"] = parts[1]

    click_show_more_specs()
    raw_specs = collect_specs_from_dom()

    def get_spec(keys):
        """Return the value of the first key in ``keys`` present in the spec table, else ""."""
        for k in keys:
            if k in raw_specs:
                return raw_specs[k]
        return ""

    cpu_text = get_spec(["loại cpu", "bộ vi xử lý", "cpu"])
    ram_text = get_spec(["dung lượng ram", "ram", "bộ nhớ ram"])
    disk_text = get_spec(["ổ cứng", "dung lượng ổ cứng", "bộ nhớ trong"])
    gpu_text = get_spec(["loại card đồ họa", "card đồ họa", "vga"])
    screen_text = get_spec(["kích thước màn hình", "màn hình"])
    res_text = get_spec(["độ phân giải màn hình", "độ phân giải"])
    battery_text = get_spec(["pin", "dung lượng pin"])
    screen_tech_text = get_spec(["công nghệ màn hình"])

    cpu_brand, cpu_detail, cpu_cores, cpu_threads = parse_cpu(cpu_text)
    ram_size, ram_type = parse_ram(ram_text)
    storage_size, storage_type = parse_storage(disk_text)

    # A dedicated "RAM type" row takes precedence when it holds a recognisable
    # type. LPDDR is tried before DDR because the DDR pattern also matches
    # inside "LPDDR5"; if nothing is recognised the value parsed from the RAM
    # row is kept instead of being overwritten with "".
    raw_ram_type = get_spec(["loại ram"])
    if raw_ram_type:
        ram_type = first_match(raw_ram_type, [r"(LPDDR\d\w*)", r"(DDR\d\w*)"]) or ram_type

    data.update({
        "cpu_brand": cpu_brand,
        "cpu_detail": cpu_detail,
        "cpu_cores": cpu_cores,
        "cpu_threads": cpu_threads,
        "ram_size": ram_size,
        "ram_type": ram_type,
        "storage_size": storage_size,
        "storage_type": storage_type,
        # NOTE(review): gpu_info is stored raw (not passed through clean_text) unlike the fields below.
        "gpu_info": gpu_text,
        "screen_size": clean_text(screen_text),
        "screen_res": clean_text(res_text),
        "screen_tech": clean_text(screen_tech_text),
        "battery": clean_text(battery_text),
        "raw_specs_count": len(raw_specs)
    })

    return data

# =========================
# COLLECT LINKS (FIX RESUME 851)
# =========================
def _ensure_on_search_page():
    """Make sure the browser is showing the search-results page.

    If the current URL does not start with ``SEARCH_URL`` the page is loaded,
    followed by a best-effort wait for links to appear and a short pause.
    Only ``Exception`` is swallowed in the best-effort steps, so
    KeyboardInterrupt still stops the run.
    """
    ensure_driver()
    try:
        cur = driver.current_url or ""
    except Exception:
        # Unknown current URL: treat it as "not on the search page".
        cur = ""

    if cur.startswith(SEARCH_URL):
        return

    driver.get(SEARCH_URL)
    try:
        wait.until(EC.presence_of_all_elements_located((By.CSS_SELECTOR, "a[href]")))
    except Exception:
        # Best effort: callers poll the DOM for product links themselves.
        pass
    fast_sleep(0.20, 0.40)

def _get_search_container():
    """Return the most specific element that wraps the search results.

    Selectors are tried from most to least specific; the first one that
    resolves wins. If all of them fail, a final lookup of ``<body>`` by tag
    name is made and its error (if any) propagates to the caller.
    """
    selectors = [
        "div#search-catalog-page",
        "div[id*='search-catalog-page']",
        "main",
        "body"
    ]
    for css in selectors:
        try:
            return driver.find_element(By.CSS_SELECTOR, css)
        except Exception:
            # Try the next, less specific selector. Only Exception is caught
            # so that KeyboardInterrupt is not swallowed here.
            continue
    return driver.find_element(By.TAG_NAME, "body")

def _get_product_links_in_order():
    """Return the unique laptop product URLs currently in the DOM, in document order.

    Anchors are taken from the search container. Only hrefs of the form
    ``https://cellphones.com.vn/laptop-<slug>.html`` are kept. Anchors that go
    stale while being read are skipped.

    Returns:
        list of URL strings without duplicates, first occurrence first.
    """
    container = _get_search_container()

    anchors = container.find_elements(By.CSS_SELECTOR, "a[href*='/laptop-'][href$='.html']")
    if not anchors:
        anchors = container.find_elements(By.CSS_SELECTOR, "a[href$='.html']")

    links = []
    # A set gives O(1) duplicate checks; this function is polled repeatedly
    # while the DOM grows, so list membership tests would cost O(n^2).
    seen = set()
    for a in anchors:
        try:
            u = a.get_attribute("href")
        except StaleElementReferenceException:
            continue

        if not u or u in seen:
            continue
        if re.search(r"^https://cellphones\.com\.vn/laptop-[^/]+\.html$", u) is None:
            continue

        seen.add(u)
        links.append(u)

    return links

def _click_show_more_button():
    """Click the "load more" button of the result list.

    Returns:
        True after the click was issued; False (with a printed message) when
        the button could not be found or clicked.
    """
    try:
        load_more_locator = (By.CSS_SELECTOR, "button.button.load-more-btn")
        load_more = wait.until(EC.element_to_be_clickable(load_more_locator))
        driver.execute_script("arguments[0].scrollIntoView({block:'center'});", load_more)
        time.sleep(0.10)
        driver.execute_script("arguments[0].click();", load_more)
    except Exception as e:
        print("Không click được nút 'Xem thêm' (load-more-btn):", e)
        return False
    return True

def collect_links_from_search(limit, max_click_rounds=1200):
    """Collect up to ``limit`` product links from the search page, resuming from disk.

    Links saved earlier in ``OUT_LINKS_FILE`` are loaded first. If they do not
    reach ``limit``, the "load more" button is clicked repeatedly and newly
    visible links are appended (order preserved, no duplicates). The link file
    is rewritten only when new links were found, via a temporary file that is
    then renamed over the target, so an interrupted write does not damage the
    existing file.

    Args:
        limit: number of links wanted.
        max_click_rounds: upper bound on "load more" clicks.

    Returns:
        list of at most ``limit`` URLs.

    Raises:
        WebDriverException: browser errors other than a dead session while
        waiting for the DOM to grow, and any browser error raised outside the
        guarded steps.
    """
    _ensure_on_search_page()

    # Load previously collected links, if any.
    links = []
    if os.path.exists(OUT_LINKS_FILE):
        try:
            with open(OUT_LINKS_FILE, "r", encoding="utf-8") as f:
                loaded = json.load(f) or []
            if not isinstance(loaded, list) or not all(isinstance(u, str) for u in loaded):
                raise ValueError("expected a JSON list of URL strings")
            links = loaded
            print(f"Nạp {len(links)} link từ file: {OUT_LINKS_FILE}")
        except Exception as e:
            print("Không thể nạp file links, sẽ thu lại:", e)
            links = []

    if len(links) >= limit:
        return links[:limit]

    # Mirror of `links` for O(1) duplicate checks while merging.
    seen = set(links)

    # Make sure the DOM already shows some products (best effort).
    try:
        wait.until(lambda d: len(_get_product_links_in_order()) > 0)
    except Exception:
        pass

    stable_rounds = 0
    last_dom = 0

    for r in range(max_click_rounds):
        if len(links) >= limit:
            print(f"Đã đủ {len(links)}/{limit} link.")
            break

        dom_before = len(_get_product_links_in_order())
        # Stability is judged on DOM growth, not on len(links), so resuming works.
        print(f"[Round {r+1}] DOM hiện có: {dom_before} | File links: {len(links)} | Need: {limit}")

        driver.execute_script("window.scrollTo(0, document.body.scrollHeight);")
        time.sleep(0.15)

        clicked = _click_show_more_button()
        if not clicked:
            print("Không click được 'Xem thêm'. Dừng.")
            break

        time.sleep(SLEEP_AFTER_CLICK)

        try:
            wait.until(lambda d: len(_get_product_links_in_order()) > dom_before)
        except TimeoutException:
            pass
        except WebDriverException as e:
            if is_invalid_session(e):
                print("Session chết khi chờ DOM tăng. Restart driver và quay lại trang...")
                restart_driver()
                _ensure_on_search_page()
            else:
                raise

        dom_after_links = _get_product_links_in_order()
        dom_after = len(dom_after_links)

        # Merge newly visible links, keeping first-seen order.
        before_merge_len = len(links)
        for u in dom_after_links:
            if u not in seen:
                seen.add(u)
                links.append(u)

        # Persist only when something new was found; write to a temp file and
        # rename it so the previous file survives an interrupted write.
        if len(links) > before_merge_len:
            tmp_path = OUT_LINKS_FILE + ".tmp"
            try:
                with open(tmp_path, "w", encoding="utf-8") as f:
                    json.dump(links, f, ensure_ascii=False, indent=2)
                os.replace(tmp_path, OUT_LINKS_FILE)
            except Exception as e:
                print("Không lưu được OUT_LINKS_FILE:", e)

        # Count consecutive rounds in which the DOM did not grow.
        if dom_after <= dom_before or dom_after == last_dom:
            stable_rounds += 1
        else:
            stable_rounds = 0

        last_dom = dom_after

        # Stop only after the DOM has stopped growing for several rounds.
        if stable_rounds >= 10:
            print("DOM không tăng nữa sau nhiều vòng. Dừng.")
            break

    return links[:limit]

# =========================
# MAIN
# =========================
# Rows gathered so far. Defined before the try so the finally clause can
# persist partial results when the run is interrupted or crashes.
results = []
crawl_completed = False
try:
    print(f"--- Bắt đầu: Cần tổng {TOTAL_NEEDED} link để lấy từ vị trí {START_INDEX} ---")

    all_links = collect_links_from_search(limit=TOTAL_NEEDED)
    links_to_crawl = all_links[START_INDEX: TOTAL_NEEDED]
    print(f"--- Kết quả: Có {len(all_links)} link. Sẽ crawl {len(links_to_crawl)} link ---")

    for idx, link in enumerate(links_to_crawl):
        real_idx = START_INDEX + idx + 1
        print(f"[{idx+1}/{len(links_to_crawl)}] (Total Index: {real_idx}) Processing: {link}")

        ok = False
        last_err = None

        for attempt in range(1, MAX_RETRY_PER_PRODUCT + 1):
            try:
                info = extract_one_product(link)
                results.append(info)
                print(f"   -> Success: {info.get('title','')} - {info.get('price_sale','')}")
                ok = True
                break
            except Exception as e:
                last_err = e
                if is_invalid_session(e):
                    # The browser session is gone: start a new one before retrying.
                    print(f"   -> Driver/session chết (attempt {attempt}). Restart driver rồi thử lại...")
                    restart_driver()
                    fast_sleep(0.30, 0.55)
                    continue
                else:
                    print(f"   -> Error (attempt {attempt}): {e}")
                    fast_sleep(0.25, 0.50)

        if not ok:
            # Keep a placeholder row so the output has one row per link.
            print(f"   -> FAILED sau {MAX_RETRY_PER_PRODUCT} lần: {last_err}")
            results.append({"link": link, "title": "", "price_sale": "", "price_base": "", "brand": "",
                            "cpu_brand": "", "cpu_detail": "", "cpu_cores": "", "cpu_threads": "",
                            "ram_size": "", "ram_type": "", "storage_size": "", "storage_type": "",
                            "gpu_info": "", "screen_size": "", "screen_res": "", "screen_tech": "",
                            "battery": "", "raw_specs_count": 0})

    df = pd.DataFrame(results)
    df.to_csv(OUT_FILE, index=False, encoding="utf-8-sig", sep=SEP)
    crawl_completed = True
    print(f"\nĐã lưu file: {OUT_FILE}")
    if not df.empty:
        print(df[['title', 'cpu_detail', 'ram_size', 'storage_size', 'gpu_info']].head())

finally:
    # If the run stopped early (interrupt, crash), save what was crawled to a
    # separate file so a complete OUT_FILE from an earlier run is never
    # overwritten with partial data.
    if results and not crawl_completed:
        partial_file = f"partial_{OUT_FILE}"
        try:
            pd.DataFrame(results).to_csv(partial_file, index=False, encoding="utf-8-sig", sep=SEP)
            print(f"\nĐã lưu tạm {len(results)} kết quả vào: {partial_file}")
        except Exception as e:
            print("Không lưu được kết quả tạm:", e)

    if KEEP_BROWSER_AFTER_RUN:
        print("KEEP_BROWSER_AFTER_RUN=True - giữ Chrome mở để lần sau tiếp tục.")
        print("Nếu muốn lần sau dùng đúng tab cũ, hãy dùng chế độ ATTACH_EXISTING_CHROME=True và mở Chrome bằng remote debugging.")
    else:
        try:
            if driver:
                driver.quit()
        except Exception:
            # Best-effort shutdown; the session may already be gone.
            pass


--- Bắt đầu: Cần tổng 7400 link để lấy từ vị trí 6400 ---
Nạp 6454 link từ file: collected_links.json
[Round 1] DOM hiện có: 18 | File links: 6454 | Need: 7400
[Round 2] DOM hiện có: 38 | File links: 6454 | Need: 7400
[Round 3] DOM hiện có: 58 | File links: 6454 | Need: 7400
[Round 4] DOM hiện có: 78 | File links: 6454 | Need: 7400
[Round 5] DOM hiện có: 116 | File links: 6454 | Need: 7400
[Round 6] DOM hiện có: 151 | File links: 6454 | Need: 7400
[Round 7] DOM hiện có: 171 | File links: 6454 | Need: 7400
[Round 8] DOM hiện có: 231 | File links: 6454 | Need: 7400
[Round 9] DOM hiện có: 267 | File links: 6454 | Need: 7400
[Round 10] DOM hiện có: 305 | File links: 6454 | Need: 7400
[Round 11] DOM hiện có: 325 | File links: 6454 | Need: 7400
[Round 12] DOM hiện có: 343 | File links: 6454 | Need: 7400
[Round 13] DOM hiện có: 362 | File links: 6454 | Need: 7400
[Round 14] DOM hiện có: 397 | File links: 6454 | Need: 7400
[Round 15] DOM hiện có: 453 | File links: 6454 | Need: 7400
[Round 16] 

KeyboardInterrupt: 