# Setup and Utilities

In [1]:
!pip install crawl4ai aiohttp nest_asyncio pymongo pillow -q
import nest_asyncio
nest_asyncio.apply()

# Utilities

In [2]:
import re
import os
import json
from datetime import datetime, timezone
from bson import ObjectId


def get_current_mongo_date():
    """Return the current UTC time as a MongoDB Extended JSON date.

    Returns:
        A dict of the form ``{"$date": "YYYY-MM-DDTHH:MM:SS.mmmZ"}``.
    """
    stamp = datetime.now(timezone.utc).isoformat(timespec='milliseconds')
    # isoformat() renders the UTC offset as "+00:00"; use the "Z" suffix instead.
    return {"$date": stamp.replace("+00:00", "Z")}

def get_slug(url):
    """Extract the category slug from a category URL.

    The slug is the run of lowercase letters, digits and dashes that follows
    ``category/`` and precedes the ``-c<digits>`` suffix.

    Returns:
        The slug string, or ``None`` when the URL does not match.
    """
    found = re.search(r'category/([a-z0-9-]+)-c\d+', url)
    if found is None:
        return None
    return found.group(1)

def save_json(data, path):
    """Write *data* to *path* as pretty-printed UTF-8 JSON, overwriting it.

    Missing parent directories are created first.

    Args:
        data: Any JSON-serialisable value.
        path: Destination file path; may be a bare filename.
    """
    parent = os.path.dirname(path)
    # dirname() is "" for a bare filename and os.makedirs("") raises, so only
    # create directories when the path actually has a directory component.
    if parent:
        os.makedirs(parent, exist_ok=True)
    with open(path, "w", encoding="utf-8") as f:
        json.dump(data, f, ensure_ascii=False, indent=2)

def append_json_list(data, path):
    """Append the items of *data* to the JSON list stored at *path*.

    The file is created (with its parent directories) when it does not exist.
    The whole list is re-serialised on every call.

    Args:
        data: List of JSON-serialisable items to append.
        path: Path of the JSON file holding a list; may be a bare filename.

    Raises:
        json.JSONDecodeError: If the existing file is not valid JSON.
        TypeError: If the existing file does not contain a list.
    """
    parent = os.path.dirname(path)
    # os.makedirs("") raises, so skip it for bare filenames.
    if parent:
        os.makedirs(parent, exist_ok=True)

    existing = []
    if os.path.exists(path):
        with open(path, "r", encoding="utf-8") as f:
            existing = json.load(f)

    # Re-open in "w" mode so the file is truncated: rewriting in place from
    # offset 0 leaves stale trailing bytes whenever the new serialisation is
    # shorter than the old one (e.g. the file was written with other indent).
    with open(path, "w", encoding="utf-8") as f:
        json.dump(existing + data, f, ensure_ascii=False, indent=2)

def load_json_file(filepath):
    """Load and return the JSON content of *filepath*.

    Returns:
        The parsed JSON value, or an empty list when the file does not exist
        or does not contain valid JSON.
    """
    if not os.path.exists(filepath):
        return []
    with open(filepath, mode='r', encoding='utf-8') as handle:
        try:
            parsed = json.load(handle)
        except json.JSONDecodeError:
            return []
    return parsed


def save_json_file(filepath, data):
    """Overwrite *filepath* with *data* serialised as indented UTF-8 JSON.

    Unlike ``save_json`` the argument order is (path, data) and no parent
    directory is created.
    """
    with open(filepath, mode='w', encoding='utf-8') as out:
        json.dump(data, out, indent=2, ensure_ascii=False)

def append_csv_row(writer, row):
    """Write a single *row* through *writer* (any object with ``writerow``).

    Returns ``None``; the value returned by ``writerow`` is discarded.
    """
    writer.writerow(row)
    return None

def find_or_create_document(name, data_list, json_path, csv_writer):
    """Return the ``_id`` of the document called *name*, creating it if needed.

    Names are compared case-insensitively after stripping whitespace.  When no
    document matches, a new one is appended to *data_list*, the whole list is
    saved to *json_path*, and a flat row is written through *csv_writer*.

    Args:
        name: Display name to look up.
        data_list: In-memory list of documents (mutated when a document is
            created).
        json_path: JSON file that mirrors *data_list*.
        csv_writer: Writer that receives the new row as a dict.

    Returns:
        The stored ``_id`` value of the matching document, or
        ``{"$oid": <new id>}`` for a newly created one.
    """
    # Look for an existing name (compared lower-cased and stripped).
    wanted = name.lower().strip()
    existing = next(
        (doc for doc in data_list if doc['name'].lower().strip() == wanted),
        None,
    )
    if existing is not None:
        return existing['_id']

    # Not found: create a new document.
    new_id = str(ObjectId())
    now = get_current_mongo_date()
    clean_name = name.strip()
    data_list.append({
        "_id": {"$oid": new_id},
        "name": clean_name,
        "createdAt": now,
        "updatedAt": now
    })
    save_json_file(json_path, data_list)

    # Mirror the new document into the CSV, with flattened id and dates.
    csv_row = {
        "_id": new_id,
        "name": clean_name,
        "createdAt": now["$date"],
        "updatedAt": now["$date"]
    }
    append_csv_row(csv_writer, csv_row)

    return {"$oid": new_id}

# Crawl Root Parent Category Link

In [3]:
import asyncio
import os
import json
import csv
from datetime import datetime
from crawl4ai import AsyncWebCrawler, CrawlerRunConfig
from bson import ObjectId
import re

# Page the root-category crawl is started from.
URL = "https://www.lottemart.vn"

# Output files for the category documents (flat CSV and JSON).
CATEGORIES_CSV_FILE = "output/data/categories_data.csv"
CATEGORIES_DATA_FILE = "output/data/categories_data.json"
# Output files for the closure-table rows (ancestor / descendant / depth).
CATEGORY_PATHS_JSON = "output/data/category_paths.json"
CATEGORY_PATHS_CSV = "output/data/category_paths.csv"


def get_slug(url):
    """Return the slug embedded in a category URL, or ``None`` if absent.

    A category URL contains ``category/<slug>-c<digits>``; the slug consists of
    lowercase letters, digits and dashes.
    """
    hit = re.search(r"category/([a-z0-9-]+)-c\d+", url)
    if not hit:
        return None
    return hit.group(1)


# Helper: current time in MongoDB Extended JSON date format.
def get_current_mongo_date():
    """Return the current UTC time as ``{"$date": "YYYY-MM-DDTHH:MM:SS.mmmZ"}``.

    Uses a timezone-aware ``datetime.now(timezone.utc)`` instead of the
    naive ``datetime.utcnow()``, which is deprecated since Python 3.12.  The
    produced string format is unchanged.
    """
    # Local import: this cell only imports ``datetime`` from the module.
    from datetime import timezone

    stamp = datetime.now(timezone.utc).isoformat(timespec="milliseconds")
    # An aware UTC datetime renders its offset as "+00:00"; keep the "Z" form.
    return {"$date": stamp.replace("+00:00", "Z")}


# Append rows to a CSV file (creating it when missing), skipping duplicates.
def append_csv(data: list, file_path: str, fieldnames: list):
    """Append the rows of *data* that are not already present in *file_path*.

    A row is identified by ``(ancestor, descendant)`` when it has both keys,
    otherwise by ``_id``.  Rows with neither key are skipped.  Rows whose key
    already exists in the file, or occurred earlier in the same batch, are not
    written again.

    Args:
        data: List of row dicts.
        file_path: Destination CSV path; may be a bare filename.
        fieldnames: Column order passed to ``csv.DictWriter``.
    """
    if not data:
        return

    parent = os.path.dirname(file_path)
    # os.makedirs("") raises, so skip it for bare filenames.
    if parent:
        os.makedirs(parent, exist_ok=True)

    no_key = object()

    def record_key(row):
        """Return the identity of *row*, or the ``no_key`` sentinel."""
        if "ancestor" in row and "descendant" in row:
            return (row["ancestor"], row["descendant"])
        if "_id" in row:
            return row["_id"]
        return no_key

    known = set()
    if os.path.exists(file_path):
        with open(file_path, "r", newline="", encoding="utf-8") as csvfile:
            for row in csv.DictReader(csvfile):
                key = record_key(row)
                if key is not no_key:
                    known.add(key)

    new_rows = []
    for row in data:
        key = record_key(row)
        if key is no_key or key in known:
            continue
        # Remember the key so a duplicate later in this batch is dropped too.
        known.add(key)
        new_rows.append(row)

    if not new_rows:
        return

    # An existing but empty file still needs its header row.
    needs_header = (
        not os.path.exists(file_path) or os.path.getsize(file_path) == 0
    )

    with open(file_path, mode="a", newline="", encoding="utf-8") as csvfile:
        writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
        if needs_header:
            writer.writeheader()
        writer.writerows(new_rows)


async def crawl_root_parent_category_links(crawler):
    """Crawl the home-page menu and persist the root (top-level) categories.

    Every internal link selected by ``.col-menu > ul > li > a`` that has an
    href, a non-empty text and a category slug becomes one category document
    with ``parentCategory`` set to ``None`` plus one closure-table
    self-reference row (depth 0).

    Files written:
        * ``CATEGORIES_DATA_FILE`` and ``output/links/root_parent_cat_links.json``
          (overwritten with the category documents),
        * ``CATEGORY_PATHS_JSON`` (overwritten with the closure rows),
        * ``CATEGORIES_CSV_FILE`` and ``CATEGORY_PATHS_CSV`` (appended to).

    Args:
        crawler: An object exposing ``await crawler.arun(url=..., config=...)``
            whose result has a ``links`` mapping with an ``"internal"`` list.
    """
    config = CrawlerRunConfig(
        css_selector=".col-menu > ul > li > a",
    )

    os.makedirs("output/links", exist_ok=True)
    os.makedirs(os.path.dirname(CATEGORIES_DATA_FILE), exist_ok=True)
    os.makedirs(os.path.dirname(CATEGORY_PATHS_JSON), exist_ok=True)

    result = await crawler.arun(url=URL, config=config)

    formatted_data = []
    csv_data = []
    closure_json_data = []
    closure_csv_data = []

    # One timestamp shared by every document created in this run.
    now = get_current_mongo_date()

    internal_links = (result.links or {}).get("internal") or []
    for link_obj in internal_links:
        href = link_obj.get("href")
        # "text" may be present but None, so do not rely on the .get default.
        name = (link_obj.get("text") or "").strip()
        # get_slug() runs a regex over its argument and raises TypeError on
        # None, so only call it once the href is known to be set.
        slug = get_slug(href) if href else None

        if not (href and name and slug):
            continue

        object_id_str = str(ObjectId())

        # Category document
        formatted_data.append(
            {
                "_id": {"$oid": object_id_str},
                "href": href,
                "slug": slug,
                "name": name,
                "parentCategory": None,
                "createdAt": now,
                "updatedAt": now,
            }
        )

        csv_data.append(
            {
                "_id": object_id_str,
                "name": name,
                "slug": slug,
                "parent_category": "",
            }
        )

        # Closure table: a root category only has its self-reference, depth 0.
        closure_json_data.append(
            {
                "ancestor": {"$oid": object_id_str},
                "descendant": {"$oid": object_id_str},
                "depth": 0,
            }
        )

        closure_csv_data.append(
            {"ancestor": object_id_str, "descendant": object_id_str, "depth": 0}
        )

    # Main JSON file; it also acts as the aggregate categories file.
    with open(CATEGORIES_DATA_FILE, "w", encoding="utf-8") as f:
        json.dump(formatted_data, f, ensure_ascii=False, indent=2)

    # Separate copy containing only the root categories.
    with open("output/links/root_parent_cat_links.json", "w", encoding="utf-8") as f:
        json.dump(formatted_data, f, ensure_ascii=False, indent=2)

    # Categories CSV.
    # NOTE(review): the JSON files are overwritten while the CSV files are
    # appended to, and ids are freshly generated on every run, so re-running
    # this cell accumulates another set of root rows in the CSV — confirm
    # whether the CSV should be reset first.
    # NOTE(review): the category-tree crawl later appends rows with a different
    # column set (_id, href, slug, name, parentCategory, createdAt, updatedAt)
    # to this same CSV — confirm which schema consumers expect.
    append_csv(
        csv_data, CATEGORIES_CSV_FILE, ["_id", "name", "slug", "parent_category"]
    )

    # Closure-table files.
    with open(CATEGORY_PATHS_JSON, "w", encoding="utf-8") as f:
        json.dump(closure_json_data, f, ensure_ascii=False, indent=2)

    append_csv(
        closure_csv_data, CATEGORY_PATHS_CSV, ["ancestor", "descendant", "depth"]
    )

    print(f"Collected {len(formatted_data)} parent categories")
    print(f"Saved to: {CATEGORIES_DATA_FILE} and {CATEGORIES_CSV_FILE}")
    print(f"Closure table saved to: {CATEGORY_PATHS_JSON} and {CATEGORY_PATHS_CSV}")


async def main():
    """Open one crawler session and collect the root categories with it."""
    async with AsyncWebCrawler() as session:
        await crawl_root_parent_category_links(session)


if __name__ == "__main__":
    asyncio.run(main())

Collected 27 parent categories
Saved to: output/data/categories_data.json and output/data/categories_data.csv
Closure table saved to: output/data/category_paths.json and output/data/category_paths.csv


# Crawl All Category Links

In [None]:
import asyncio
import os
import json
import csv
import signal
import time
from datetime import datetime, timezone as UTC
from bson import ObjectId
from crawl4ai import AsyncWebCrawler, CrawlerRunConfig, BrowserConfig

# ====== Files (names/paths kept the same as in the existing notebook) ======
ROOT_PARENT_CATEGORIES_FILE = "output/links/root_parent_cat_links.json"
CATEGORIES_DATA_FILE        = "output/data/categories_data.json"
LAST_CATEGORIES_FILE        = "output/links/last_categories_data.json"
CATEGORIES_CSV_FILE         = "output/data/categories_data.csv"
CATEGORY_PATHS_JSON         = "output/data/category_paths.json"
CATEGORY_PATHS_CSV          = "output/data/category_paths.csv"

# ====== Checkpoint/Progress files ======
PROGRESS_FILE              = "output/links/crawl_progress_point.json"
COMPLETED_ROOTS_FILE       = "output/links/completed_roots.json"

# ====== Browser config ======
browser_config = BrowserConfig(headless=True, verbose=True)

# ====== Watchdog parameters ======
STALL_SECONDS   = 120          # if there is no progress for longer than this -> checkpoint & stop
SLEEP_BEFORE_EXIT = 5          # sleep a few seconds before exiting to avoid spamming
PRINT_EVERY     = 50           # print a log line roughly every 50 categories written

def load_json(filepath):
    """Parse and return the JSON content of the UTF-8 file at *filepath*.

    Errors from opening or parsing the file propagate to the caller.
    """
    with open(filepath, mode="r", encoding="utf-8") as src:
        content = json.load(src)
    return content

def append_csv(data, path, fieldnames):
    """Append the row dicts in *data* to the CSV file at *path*.

    The parent directory is created when needed.  A header row is written when
    the file does not exist yet or exists but is empty.  Nothing is written
    when *data* is empty.

    Args:
        data: List of row dicts.
        path: Destination CSV path; may be a bare filename.
        fieldnames: Column order passed to ``csv.DictWriter``.
    """
    parent = os.path.dirname(path)
    # os.makedirs("") raises, so skip it for bare filenames.
    if parent:
        os.makedirs(parent, exist_ok=True)

    if not data:
        return

    # A zero-byte file has no header either, so treat it like a missing file.
    write_header = not os.path.exists(path) or os.path.getsize(path) == 0
    with open(path, "a", newline="", encoding="utf-8") as f:
        writer = csv.DictWriter(f, fieldnames=fieldnames)
        if write_header:
            writer.writeheader()
        writer.writerows(data)

def get_selector(level, has_children):
    """Build the CSS selector for the child-category links of a page.

    Args:
        level: Depth of the page being crawled; 0 selects the root-level menu
            list, any other value the list nested under the active item.
        has_children: ``True`` to select items carrying the ``sub`` class,
            ``False`` to select items without it.

    Returns:
        The CSS selector string targeting the ``<a>`` elements.
    """
    if level == 0:
        container = ".aside-left.offcanvas-body > .f-item.opened .all-cates > "
    else:
        container = ".all-cates.ps-3 li.sub.active > .subcate > .inner > ul > "
    item = "li.sub" if has_children else "li:not(.sub)"
    return f"{container}{item} > a"

async def crawl_category_links(crawler, url, level, has_children):
    """Crawl *url* and return the child-category links matching one selector.

    The selector comes from ``get_selector(level, has_children)``.

    Returns:
        A list of ``{"name": ..., "href": ...}`` dicts built from the internal
        links of the crawl result; an empty list when nothing matched or when
        any exception was raised (the error is printed).
    """
    # Wait condition: the category menu must be present and populated.
    wait_condition = """() => {
      const menu = document.querySelector('.aside-left.offcanvas-body .cate-menu-list');
      return menu !== null && menu.children.length > 0;
    }"""

    config = CrawlerRunConfig(
        css_selector=get_selector(level, has_children),
        page_timeout=60000,  # 60 seconds
        wait_for=f"js:{wait_condition}",
        delay_before_return_html=2.0,  # extra 2 s after the condition holds so the menu finishes rendering
        stream=False,  # streaming is not needed for a simple link crawl
        remove_overlay_elements=True,  # drop popups/overlays that may cover the menu
        js_code=[
            # Scroll to trigger lazy loading, if any.
            "window.scrollTo(0, 300);",
            "await new Promise(r => setTimeout(r, 500));",
        ],
    )

    try:
        result = await crawler.arun(url=url, config=config)

        if not (result.links and 'internal' in result.links):
            return []

        found = []
        for entry in result.links['internal']:
            target = entry.get("href")
            label = entry.get("text", "").strip()
            if target and label:
                found.append({"name": label, "href": target})
        return found

    except Exception as e:
        print(f"Error crawling {url} at level {level}: {str(e)}")
        return []

def _oid_to_str(_id):
    """Chuyển _id có thể là dict {'$oid': ...} hoặc ObjectId hoặc str -> str."""
    if isinstance(_id, dict) and "$oid" in _id:
        return _id["$oid"]
    if isinstance(_id, ObjectId):
        return str(_id)
    return str(_id)

def _ensure_files():
    """Create the output directories and seed the completed-roots file."""
    output_paths = (
        ROOT_PARENT_CATEGORIES_FILE,
        CATEGORIES_DATA_FILE,
        LAST_CATEGORIES_FILE,
        CATEGORIES_CSV_FILE,
        CATEGORY_PATHS_JSON,
        CATEGORY_PATHS_CSV,
    )
    for target in output_paths:
        os.makedirs(os.path.dirname(target), exist_ok=True)

    # The completed-roots file may not exist yet; start it as an empty list.
    if os.path.exists(COMPLETED_ROOTS_FILE):
        return
    save_json([], COMPLETED_ROOTS_FILE)

def _load_completed_roots():
    """Return the set of completed root ids stored in COMPLETED_ROOTS_FILE.

    Any failure while loading or converting the file yields an empty set.
    """
    try:
        stored = load_json(COMPLETED_ROOTS_FILE)
        return set(stored) if stored else set()
    except Exception:
        return set()

def _save_completed_roots(s):
    """Persist the ids in *s* to COMPLETED_ROOTS_FILE as a sorted list."""
    ordered = sorted(s)
    save_json(ordered, COMPLETED_ROOTS_FILE)

def _load_progress():
    """Return the saved checkpoint, or ``None`` when missing or unreadable."""
    if os.path.exists(PROGRESS_FILE):
        try:
            return load_json(PROGRESS_FILE)
        except Exception:
            return None
    return None

def _save_progress(state):
    """Write the checkpoint *state* to PROGRESS_FILE and log a one-line summary."""
    save_json(state, PROGRESS_FILE)
    root_idx = state.get('root_idx')
    stack_size = len(state.get('stack', []))
    print(f"[checkpoint] Saved progress to {PROGRESS_FILE} (root_idx={root_idx}, stack={stack_size})")

def _clear_progress():
    """Delete the checkpoint file if it exists."""
    if not os.path.exists(PROGRESS_FILE):
        return
    os.remove(PROGRESS_FILE)

def _mk_category_doc(href, name, slug, parent_oid):
    """Build a category document with a freshly generated ``_id``.

    Args:
        href: URL of the category page.
        name: Display name of the category.
        slug: Slug of the category.
        parent_oid: Id string of the parent category; a falsy value stores
            ``None`` as ``parentCategory``.

    Returns:
        A dict with ``_id``, ``href``, ``slug``, ``name``, ``parentCategory``,
        ``createdAt`` and ``updatedAt`` (both set to the same timestamp).
    """
    timestamp = get_current_mongo_date()
    new_id = str(ObjectId())
    parent_ref = {"$oid": parent_oid} if parent_oid else None
    return {
        "_id": {"$oid": new_id},
        "href": href,
        "slug": slug,
        "name": name,
        "parentCategory": parent_ref,
        "createdAt": timestamp,
        "updatedAt": timestamp,
    }

def _mk_closure_rows(parent_closures, child_oid):
    """
    Từ parent_closures (danh sách dict {'ancestor': {'$oid': ...}, 'depth': int})
    tạo ra closure mới cho child: tất cả depth +1, kèm self-depth=0.
    Trả về (closures_json, closures_csv_rows)
    """
    closures_json = []
    closures_csv = []

    # self reference depth=0
    closures_json.append({"ancestor": {"$oid": child_oid}, "descendant": {"$oid": child_oid}, "depth": 0})
    closures_csv.append({"ancestor": child_oid, "descendant": child_oid, "depth": 0})

    # kế thừa từ parent, tăng depth
    for c in parent_closures:
        anc = _oid_to_str(c["ancestor"]["$oid"]) if isinstance(c["ancestor"], dict) else _oid_to_str(c["ancestor"])
        depth = int(c.get("depth", 0)) + 1
        closures_json.append({"ancestor": {"$oid": anc}, "descendant": {"$oid": child_oid}, "depth": depth})
        closures_csv.append({"ancestor": anc, "descendant": child_oid, "depth": depth})
    return closures_json, closures_csv

def _now_iso():
    return datetime.now(UTC).isoformat()

async def crawl_all_categories_with_checkpoint():
    """Crawl every root category down to its leaves, with resumable checkpoints.

    For each root in ROOT_PARENT_CATEGORIES_FILE a depth-first traversal is
    run over an explicit stack.  Every visited node is crawled twice (children
    with and without sub-categories).  Children are appended to the category
    and closure-table JSON/CSV files; nodes without children are appended to
    LAST_CATEGORIES_FILE.  Progress is saved to PROGRESS_FILE periodically and
    when the watchdog detects a stall; finished roots are recorded in
    COMPLETED_ROOTS_FILE.

    Fixes relative to the previous version:
        * The stall clock now starts at "now" when resuming.  Restoring it
          from the checkpoint made every resume that happened more than
          STALL_SECONDS after the checkpoint stop again immediately.
        * A restored stack is discarded when its root is already completed,
          so it can no longer be processed under the next root.
        * A checkpoint file that does not contain a JSON object is ignored.
    """
    _ensure_files()

    if not os.path.exists(ROOT_PARENT_CATEGORIES_FILE):
        print(f"[error] Missing root categories: {ROOT_PARENT_CATEGORIES_FILE}")
        return

    # 1) Load the list of root categories.
    root_categories = load_json(ROOT_PARENT_CATEGORIES_FILE) or []
    if not root_categories:
        print("[warn] No root categories found.")
        return

    # 2) Restore the checkpoint, if there is a usable one.
    progress = _load_progress()
    completed_roots = _load_completed_roots()

    if isinstance(progress, dict):
        start_root_idx = int(progress.get("root_idx", 0))
        # Each stack item: {"id", "href", "name", "level", "closures": [...]}
        stack = progress.get("stack") or []
        seen = set(progress.get("seen_ids") or [])
        categories_written = int(progress.get("categories_written", 0))
        print(f"[resume] Resuming at root_idx={start_root_idx}, stack={len(stack)}, seen={len(seen)}")
    else:
        # Fresh state.
        start_root_idx = 0
        stack = []
        seen = set()  # node ids already visited
        categories_written = 0

    # The watchdog measures inactivity within *this* run, so it always starts
    # now; the timestamp stored in the checkpoint is informational only.
    last_progress_ts = time.time()

    def save_checkpoint(current_root_idx):
        """Persist the live traversal state for *current_root_idx*."""
        _save_progress({
            "root_idx": current_root_idx,
            "stack": stack,
            "seen_ids": sorted(seen),
            "categories_written": categories_written,
            "last_progress_ts": _now_iso(),
        })

    # 3) One crawler session for the whole run.
    async with AsyncWebCrawler(config=browser_config) as crawler:
        for root_idx in range(start_root_idx, len(root_categories)):
            root_cat = root_categories[root_idx]

            root_oid = _oid_to_str(root_cat["_id"])
            if root_oid in completed_roots:
                # Only a restored stack can be non-empty here, and it belongs
                # to this finished root: drop it instead of carrying it over.
                stack = []
                continue

            # Empty stack means this root has not been started: push the root.
            if not stack:
                stack = [{
                    "id": root_oid,
                    "href": root_cat.get("href"),
                    "name": root_cat.get("name", "Unknown"),
                    "level": 0,
                    "closures": [{"ancestor": {"$oid": root_oid}, "depth": 0}],
                }]

            print(f"[root {root_idx+1}/{len(root_categories)}] Start: {root_cat.get('name', 'Unknown')} | stack={len(stack)}")

            while stack:
                # Watchdog: on a stall, checkpoint (still on this root) and stop.
                # NOTE(review): the clock is only reset after children are
                # written, so a long run of leaf nodes also counts as a stall
                # — confirm that this is intended.
                if (time.time() - last_progress_ts) > STALL_SECONDS:
                    save_checkpoint(root_idx)
                    print(f"[stall] No progress > {STALL_SECONDS}s. Pausing run. (wrote={categories_written})")
                    await asyncio.sleep(SLEEP_BEFORE_EXIT)
                    return

                node = stack.pop()  # DFS
                node_id = node["id"]
                node_href = node["href"]
                node_name = node["name"]
                level = node["level"]
                closures = node["closures"]

                if not node_href:
                    continue
                # NOTE(review): `seen` holds node ids, and every child gets a
                # freshly generated id, so this does not detect the same URL
                # being reached twice — confirm whether href-level
                # de-duplication is needed.
                if node_id in seen:
                    continue
                seen.add(node_id)

                # Crawl the children (those with and those without sub-categories).
                children_with_sub = await crawl_category_links(crawler, node_href, level=level, has_children=True)
                children_without_sub = await crawl_category_links(crawler, node_href, level=level, has_children=False)

                children = []
                if children_with_sub:
                    children.extend(children_with_sub)
                if children_without_sub:
                    children.extend(children_without_sub)

                # Normalise the children: drop incomplete entries, de-duplicate by href.
                uniq = {}
                for ch in children:
                    href = ch.get("href")
                    name = ch.get("name", "").strip()
                    slug = get_slug(href) if href else None
                    if href and name and slug:
                        uniq[href] = (href, name, slug)

                if not uniq:
                    # Leaf: record it for the product-link crawl.
                    append_json_list([{
                        "_id": {"$oid": node_id},
                        "href": node_href,
                        "name": node_name,
                    }], LAST_CATEGORIES_FILE)
                    continue

                # Has children: build the document and closure rows for each
                # child, then append them to the output files.
                cat_rows_csv = []
                cat_rows_json = []
                closure_rows_json = []
                closure_rows_csv = []

                for href, name, slug in uniq.values():
                    # Child document.
                    doc = _mk_category_doc(href, name, slug, parent_oid=node_id)
                    child_oid = _oid_to_str(doc["_id"])

                    # Closure rows for the child.
                    cjson, ccsv = _mk_closure_rows(closures, child_oid)
                    closure_rows_json.extend(cjson)
                    closure_rows_csv.extend(ccsv)

                    # Category rows.
                    cat_rows_json.append(doc)
                    cat_rows_csv.append({
                        "_id": child_oid, "href": href, "slug": slug, "name": name,
                        "parentCategory": node_id,
                        "createdAt": doc["createdAt"]["$date"] if isinstance(doc["createdAt"], dict) else doc["createdAt"],
                        "updatedAt": doc["updatedAt"]["$date"] if isinstance(doc["updatedAt"], dict) else doc["updatedAt"],
                    })

                    # Push the child so it is traversed next; its closure list
                    # is itself at depth 0 plus the parent's chain one deeper.
                    child_closures = [{"ancestor": {"$oid": child_oid}, "depth": 0}] + [
                        {"ancestor": {"$oid": _oid_to_str(c["ancestor"]["$oid"]) if isinstance(c["ancestor"], dict) else _oid_to_str(c["ancestor"])},
                         "depth": int(c.get("depth", 0)) + 1}
                        for c in closures
                    ]
                    stack.append({
                        "id": child_oid,
                        "href": href,
                        "name": name,
                        "level": level + 1,
                        "closures": child_closures,
                    })

                # Write the batch to the output files.
                if cat_rows_json:
                    append_json_list(cat_rows_json, CATEGORIES_DATA_FILE)
                if cat_rows_csv:
                    append_csv(cat_rows_csv, CATEGORIES_CSV_FILE,
                               ["_id", "href", "slug", "name", "parentCategory", "createdAt", "updatedAt"])
                if closure_rows_json:
                    append_json_list(closure_rows_json, CATEGORY_PATHS_JSON)
                if closure_rows_csv:
                    append_csv(closure_rows_csv, CATEGORY_PATHS_CSV, ["ancestor", "descendant", "depth"])

                categories_written += len(cat_rows_json)
                last_progress_ts = time.time()
                if categories_written % PRINT_EVERY == 0 and categories_written > 0:
                    print(f"[progress] wrote {categories_written} categories. stack={len(stack)}")

                # Lightweight periodic checkpoint; to avoid writing too often
                # it only happens when the stack is large or the written
                # count hits a multiple of PRINT_EVERY * 4.
                if categories_written % (PRINT_EVERY * 4) == 0 or len(stack) > 500:
                    save_checkpoint(root_idx)

            # Stack exhausted => this root is complete.
            completed_roots.add(root_oid)
            _save_completed_roots(completed_roots)
            # The checkpoint only described this root, so remove it.
            _clear_progress()
            print(f"[root done] {root_cat.get('name','Unknown')}")

    # 4) Reaching this point means every root has been processed.
    _clear_progress()
    print("[done] All root categories are fully crawled to leaf.")

# ——— Entry Point ———
if __name__ == "__main__":
    try:
        asyncio.run(crawl_all_categories_with_checkpoint())
    except KeyboardInterrupt:
        # The user pressed STOP.
        print("\n[interrupt] KeyboardInterrupt detected, saving checkpoint...")
        # No traversal state is reachable from here, so nothing is actually
        # saved at this point despite the message above; the run relies on
        # the periodic checkpoints written inside the crawl function.  To
        # capture the live state, it would have to be moved to module scope.
        time.sleep(SLEEP_BEFORE_EXIT)
        raise

[done] All root categories are fully crawled to leaf.


In [None]:
# Sanity-check the categories CSV by counting its lines
# (a manual count on 25/9/2025 gave 296 categories).
with open("output/data/categories_data.csv", "r", encoding="utf-8") as f:
    count = sum(1 for _ in f)

print("Tổng số dòng (bao gồm header) =", count)
print("Số categories (bỏ header)    =", count - 1)

Tổng số dòng (bao gồm header) = 420
Số categories (bỏ header)    = 419


# Crawl All Product Links

In [None]:
import asyncio
import os
import json
from urllib.parse import urlparse
from crawl4ai import AsyncWebCrawler, CrawlerRunConfig, CacheMode, BrowserConfig

# JSON list of category documents whose product links are crawled by main().
LAST_CATEGORIES_FILE = 'output/links/last_categories_data.json'
# Retry limit used by crawl_products_in_category().
MAX_RETRIES = 3

async def crawl_products_in_category(crawler, category_doc):
    """Collect the product links of one category and save them to disk.

    The page is scrolled to the bottom every two seconds until the number of
    loaded product tiles reaches the total shown on the page.  All internal
    links containing ``/product/`` are then written to
    ``output/links/products/<category_id>/product_links.json``.

    One initial attempt plus up to MAX_RETRIES retries are made.  An attempt
    counts as failed when it raises or when the crawl result reports
    ``success == False``.

    Args:
        crawler: An object exposing ``await crawler.arun(url=..., config=...)``.
        category_doc: Dict with ``href`` and ``_id``; ``_id`` may be
            ``{"$oid": ...}`` or a plain id string.
    """
    category_url = category_doc.get('href')
    raw_id = category_doc.get('_id')
    # Accept both the Extended JSON form and a plain string id.
    category_id = raw_id.get('$oid') if isinstance(raw_id, dict) else raw_id

    if not category_url or not category_id:
        print("[SKIPPED] Missing href or _id in category document.")
        return

    # Starts a timer (once per page) that keeps scrolling to the bottom.
    js_code = """
    (function() {
        if (!window._crawl4ai_scrollIntervalStarted) {
            window._crawl4ai_scrollIntervalStarted = true;

            window.scrollTo(0, document.body.scrollHeight);
            console.log('Initial scroll executed.');

            window._crawl4ai_scrollIntervalId = setInterval(() => {
                const currentHeight = document.body.scrollHeight;
                window.scrollTo(0, currentHeight);
            }, 2000);
        }
    })();
    """

    # True once at least as many tiles are loaded as the page's total count;
    # it also stops the scrolling timer.  A total of 0 never satisfies it.
    wait_condition = """() => {
        const totalElement = document.querySelector('.txt-number .red-color');
        const totalText = totalElement ? totalElement.textContent.trim() : '0';
        const totalExpected = Number(totalText) || 0;

        const loadedItems = document.querySelectorAll('.proudct-list .col');
        const loadedCount = loadedItems.length;

        if (totalExpected > 0 && loadedCount >= totalExpected) {
            if (window._crawl4ai_scrollIntervalId) {
                clearInterval(window._crawl4ai_scrollIntervalId);
                delete window._crawl4ai_scrollIntervalId;
            }
            if (window._crawl4ai_scrollIntervalStarted) {
                delete window._crawl4ai_scrollIntervalStarted;
            }
            return true;
        }
        return false;
    }"""

    config = CrawlerRunConfig(
        cache_mode=CacheMode.DISABLED,
        js_code=js_code,
        # page_timeout=60 * 60 * 1000,
        # NOTE(review): the longer timeout above is disabled; confirm that the
        # library default is enough for categories with many products.
        wait_for=f"js:{wait_condition}",
        exclude_external_links=True,
        stream=True
    )

    # attempt 0 is the initial try; attempts 1..MAX_RETRIES are the retries.
    for attempt in range(MAX_RETRIES + 1):
        try:
            result = await crawler.arun(url=category_url, config=config)
            # A failed crawl may be reported on the result instead of being
            # raised; treat it as a failure so that it is retried rather than
            # saved as an empty link list.
            if not getattr(result, "success", True):
                raise RuntimeError(
                    getattr(result, "error_message", None) or "crawl reported failure"
                )
            await asyncio.sleep(2)

            hrefs = []
            if result.links and 'internal' in result.links:
                for link_obj in result.links['internal']:
                    href = link_obj.get('href')
                    if href and '/product/' in href:
                        hrefs.append(href)

            print(f"[OK] Crawled {len(hrefs)} product links from: {category_url}")

            # === Save into a directory named after the category _id ===
            category_output_dir = os.path.join('output/links/products', str(category_id))
            os.makedirs(category_output_dir, exist_ok=True)

            output_file_path = os.path.join(category_output_dir, 'product_links.json')
            with open(output_file_path, 'w', encoding='utf-8') as f:
                json.dump(hrefs, f, ensure_ascii=False, indent=2)

            return

        except Exception as e:
            if attempt < MAX_RETRIES:
                print(f"[Retry {attempt + 1}/{MAX_RETRIES}] Failed to crawl {category_url}: {e}")
                await asyncio.sleep(2)
            else:
                # Report the last error here; there is no further retry.
                print(f"[FAILED] Giving up on {category_url} after {MAX_RETRIES} retries: {e}")

# Browser settings handed to AsyncWebCrawler in main(); unlike the category
# crawl, this cell runs with headless=False.
browser_config = BrowserConfig(
    headless=False,
    verbose=True
)


async def main():
    """Crawl the product links of every category in LAST_CATEGORIES_FILE.

    Categories are processed one after another within a single crawler
    session.
    """
    async with AsyncWebCrawler(config=browser_config) as session:
        with open(LAST_CATEGORIES_FILE, mode='r', encoding='utf-8') as src:
            leaf_categories = json.load(src)

        for leaf in leaf_categories:
            await crawl_products_in_category(session, leaf)

    print("[DONE] Finished crawling all categories.")

if __name__ == "__main__":
    asyncio.run(main())

# Crawl Product Detail

In [None]:
import os
import json
import csv
import re
import time
import asyncio
from urllib.parse import urlparse
from datetime import datetime
from bson import ObjectId

from crawl4ai import (
    AsyncWebCrawler,
    CrawlerRunConfig,
    CacheMode,
    MemoryAdaptiveDispatcher,
    BrowserConfig,
)
from crawl4ai.extraction_strategy import JsonCssExtractionStrategy

# ====== PATHS ======
# NOTE(review): PROGRESS_FILE and STALL_SECONDS are also defined by the
# category-crawl cell; in a shared kernel these assignments rebind them.
BASE_PRODUCTS_LINKS_FOLDER = "output/links/products"  # folder holding <category_id>/product_links.json
BASE_OUTPUT_FOLDER         = "output/data"
BRANDS_JSON_PATH           = os.path.join(BASE_OUTPUT_FOLDER, "brands_data.json")
COUNTRIES_JSON_PATH        = os.path.join(BASE_OUTPUT_FOLDER, "countries_of_origin.json")
PROGRESS_FILE              = os.path.join(BASE_OUTPUT_FOLDER, "crawl_progress.json")

PRODUCTS_CSV_FILE          = os.path.join(BASE_OUTPUT_FOLDER, "products_data.csv")
PRODUCTS_JSONL_FILE        = os.path.join(BASE_OUTPUT_FOLDER, "products_data.jsonl")
SPECS_JSONL_FILE           = os.path.join(BASE_OUTPUT_FOLDER, "specifications.jsonl")

# ====== RUNTIME CONFIG ======
CHUNK_SIZE            = 50     # number of URLs handled per batch (lower it if RAM is short)
CHECKPOINT_INTERVAL   = 25     # checkpoint after every N successfully processed products
STALL_SECONDS         = 180    # if there is no progress for N seconds -> checkpoint & stop
PAGE_TIMEOUT_MS       = 45000  # per-page timeout
DELAY_BEFORE_RETURN   = 1.2
MAX_SESSIONS          = 3      # concurrent sessions (drop to 1 if RAM still runs out)

# ====== SCHEMA (depends on the page layout) ======
# CSS extraction schema for a product page: name, price and short description
# as text, plus one {name, value} entry per specification table row.
product_schema = {
    "name": "Product Details",
    "baseSelector": "div.product-custom.product-single",
    "fields": [
        {"name": "name", "selector": "h2.field-name",  "type": "text"},
        {"name": "price", "selector": "div.field-price","type": "text"},
        {"name": "description", "selector": "div.short-desc", "type": "text"},
        {
            "name": "specifications",
            "type": "list",
            "selector": "table > tbody > tr",
            "fields": [
                {"name": "name",  "selector": "th", "type": "text"},
                {"name": "value", "selector": "td", "type": "text"},
            ],
        }
    ],
}

# ====== UTILS ======
def ensure_dirs():
    """Create BASE_OUTPUT_FOLDER (and its parents) when it does not exist."""
    target_dir = BASE_OUTPUT_FOLDER
    os.makedirs(target_dir, exist_ok=True)

def normalize_url(u):
    """Reduce a URL to ``scheme://netloc/path`` without a trailing slash.

    Query string and fragment are dropped.

    Returns:
        The normalised URL, or *u* unchanged when it cannot be parsed.
    """
    try:
        p = urlparse(u)
    # Catch only parse failures (malformed URL or unsupported input type); a
    # bare ``except`` would also swallow KeyboardInterrupt and SystemExit.
    except (ValueError, TypeError, AttributeError):
        return u
    return f"{p.scheme}://{p.netloc}{p.path}".rstrip("/")

def extract_barcode(url):
    """Return the second-to-last dash-separated segment of the URL path.

    The path (without trailing slashes) is split on ``-``; ``None`` is
    returned when it contains no dash.
    NOTE(review): presumably product paths end in ``...-<barcode>-<suffix>``
    — verify against real product URLs.
    """
    trimmed = urlparse(url).path.rstrip("/")
    # Only the last two dash-separated pieces matter, so split from the right.
    tail = trimmed.rsplit("-", 2)
    if len(tail) < 2:
        return None
    return tail[-2]

def clean_price(raw_price: str) -> int:
    """Convert a raw price text to an integer amount.

    A discount marker of the form ``-<digits>%`` is removed first, then every
    remaining digit is concatenated.

    Returns:
        The parsed integer, or 0 for empty input or text without digits.
    """
    if not raw_price:
        return 0
    without_discount = re.sub(r"-\d+%", "", raw_price)
    digits = "".join(re.findall(r"\d", without_discount))
    if not digits:
        return 0
    return int(digits)

def normalize_numeric_value(value_str):
    """Extract a number from a text such as ``"500 g"`` or ``"1.5kg"``.

    Everything except digits and dots is removed before parsing.

    Returns:
        An ``int`` when the value is whole, otherwise a ``float`` rounded to
        at most 10 decimals.  The input is returned unchanged when it is
        empty, not a string, or holds no parseable number.
    """
    if not value_str or not isinstance(value_str, str):
        return value_str
    s = re.sub(r"[^\d.]", "", value_str)
    try:
        v = float(s)
    # float() raises ValueError for "" or "1.2.3"; a bare ``except`` would
    # also swallow KeyboardInterrupt and SystemExit.
    except ValueError:
        return value_str
    if v.is_integer():
        return int(v)
    return float(f"{v:.10f}".rstrip("0").rstrip("."))

def append_jsonl(path, obj):
    """Append *obj* as one JSON line to the UTF-8 file at *path*.

    Missing parent directories are created first.

    Args:
        path: Destination ``.jsonl`` path; may be a bare filename.
        obj: Any JSON-serialisable value.
    """
    parent = os.path.dirname(path)
    # os.makedirs("") raises, so skip it for bare filenames.
    if parent:
        os.makedirs(parent, exist_ok=True)
    with open(path, "a", encoding="utf-8") as f:
        f.write(json.dumps(obj, ensure_ascii=False) + "\n")

def load_or_init_list(path):
    """Return the list stored in the JSON file at *path*, else an empty list.

    Whatever ``load_json_file`` returns is used only when it is a list.
    """
    loaded = load_json_file(path)
    if isinstance(loaded, list):
        return loaded
    return []

def find_or_create_document(name, data_list, json_path, csv_writer):
    """Return an ``{"$oid": ...}`` reference to the document called *name*.

    Names are compared case-insensitively after stripping whitespace.  When
    no document matches, a new one is appended to *data_list*, the list is
    saved to *json_path*, and (if *csv_writer* is given) an ``_id``/``name``
    row is written; a CSV failure is only logged.

    Returns:
        ``None`` for an empty or blank *name*, otherwise a new
        ``{"$oid": <id>}`` dict.
    """
    if not name or not name.strip():
        return None

    wanted = name.lower().strip()
    match = next(
        (d for d in data_list if d.get("name","").lower().strip() == wanted),
        None,
    )
    if match is not None:
        return {"$oid": match["_id"]["$oid"]}

    oid = str(ObjectId())
    now = get_current_mongo_date()
    data_list.append(
        {"_id":{"$oid": oid}, "name": name.strip(), "createdAt": now, "updatedAt": now}
    )
    save_json_file(json_path, data_list)

    if csv_writer is None:
        return {"$oid": oid}

    try:
        csv_writer.writerow({"_id": oid, "name": name.strip()})
    except Exception as e:
        print(f"[WARN] CSV write failed ({json_path}): {e}")

    return {"$oid": oid}

# ====== PROGRESS (checkpoint) ======
def load_progress():
    """Load the checkpoint from PROGRESS_FILE, repairing missing or bad fields.

    Returns a fresh default checkpoint when the file is missing, empty,
    undecodable, or does not contain a JSON object. Otherwise missing keys are
    filled with defaults and wrongly-typed values are reset.
    """
    defaults = {
        "processed_urls": [],          # list[str] of normalised URLs already handled
        "current_category": None,      # category_id being processed
        "cursor": 0,                   # index of the link being processed in that category
        "total_processed": 0,
        "total_failed": 0,
    }
    data = load_json_file(PROGRESS_FILE)
    # load_json_file yields [] for a missing/corrupt file; any other non-dict
    # payload (e.g. a non-empty list) cannot be used as a checkpoint either.
    if not data or not isinstance(data, dict):
        return defaults

    # Make sure every expected key exists and has the expected type.
    for key, value in defaults.items():
        data.setdefault(key, value)
    if not isinstance(data["processed_urls"], list):
        data["processed_urls"] = []
    for counter in ("cursor", "total_processed", "total_failed"):
        if not isinstance(data[counter], int):
            data[counter] = 0
    return data

def save_progress(pg):
    """Persist the checkpoint dict `pg` to PROGRESS_FILE.

    The data is written to a sibling temporary file first and then moved over
    PROGRESS_FILE with os.replace, so an interrupted write cannot leave a
    truncated checkpoint that would reset the resume state.
    """
    tmp_path = f"{PROGRESS_FILE}.tmp"
    save_json_file(tmp_path, pg)
    os.replace(tmp_path, PROGRESS_FILE)

# ====== PARSER ======
def parse_extracted_to_docs(result, category_id, brands_writer, countries_writer, brands_data, countries_data):
    """
    Convert one crawl result into the records written for a product.

    `result` must expose `.extracted_content` (a JSON string) and `.url`.
    Only the first extracted item is used.

    Returns a 3-tuple:
      - products CSV row (dict)
      - product document (dict)        -> written to JSONL
      - specifications document (dict) -> written to JSONL
    or (None, None, None) when the extracted content is empty.
    """
    extracted = json.loads(result.extracted_content or "[]")
    if not extracted:
        return None, None, None

    # Specification labels (lower-cased) that identify the brand / the origin.
    brand_labels = ("nhãn hiệu", "thương hiệu", "brand", "nhà sản xuất")
    origin_labels = ("nơi sản xuất", "xuất xứ", "quốc gia", "origin", "country of origin")

    first = extracted[0]
    barcode = extract_barcode(result.url)
    name = first.get("name", "") or ""
    price = clean_price(first.get("price", ""))
    description = first.get("description", "") or ""
    specs = first.get("specifications", []) or []

    # One pass: pull out brand and origin (the last occurrence wins) and keep
    # every other specification with its value normalised.
    brand_name = ""
    country_name = ""
    rest_specs = []
    for spec in specs:
        label = (spec.get("name") or "").lower().strip()
        if label in brand_labels:
            brand_name = spec.get("value") or ""
        elif label in origin_labels:
            country_name = spec.get("value") or ""
        else:
            rest_specs.append({
                "name": spec.get("name", ""),
                "value": normalize_numeric_value(spec.get("value")),
            })

    brand_id = None
    if brand_name:
        brand_id = find_or_create_document(brand_name, brands_data, BRANDS_JSON_PATH, brands_writer)
    ctry_id = None
    if country_name:
        ctry_id = find_or_create_document(country_name, countries_data, COUNTRIES_JSON_PATH, countries_writer)

    now = get_current_mongo_date()
    pid = str(ObjectId())

    # Flat row for the CSV export: references are stored as plain id strings.
    csv_row = {
        "_id": pid,
        "barcode": barcode,
        "name": name,
        "price": price,
        "description": description,
        "brand": brand_id["$oid"] if brand_id else None,
        "country_of_origin": ctry_id["$oid"] if ctry_id else None,
        "category": category_id,
    }

    # Full document: references keep the {"$oid": ...} wrapper.
    product_json = {
        "_id": {"$oid": pid},
        "barcode": barcode,
        "name": name,
        "price": price,
        "description": description,
        "brand": brand_id,
        "countryOfOrigin": ctry_id,
        "category": {"$oid": category_id},
        "attributes": rest_specs,
        "createdAt": now,
        "updatedAt": now,
    }

    specs_json = {
        "product": {"$oid": pid},
        "specifications": rest_specs,
    }

    return csv_row, product_json, specs_json

async def iter_arun_many(crawler, urls, config, dispatcher=None):
    """Yield results of ``crawler.arun_many`` whatever shape it returns.

    Handles three cases:
      - the call returns an async iterable directly;
      - the call returns an awaitable that resolves to a list (batch mode);
      - the call returns an awaitable that resolves to an async iterable
        (streaming mode), which a plain ``for`` cannot iterate.
    """
    res = crawler.arun_many(urls=urls, config=config, dispatcher=dispatcher)
    # Resolve a coroutine/awaitable first; async generators are not awaitable.
    if hasattr(res, "__await__"):
        res = await res
    if hasattr(res, "__aiter__"):
        async for r in res:
            yield r
    else:
        for r in res:
            yield r

# ====== CRAWL 1 CATEGORY (batched, streaming IO, checkpoint) ======
async def crawl_products_for_category(category_id, products_writer, brands_writer, countries_writer,
                                     brands_data, countries_data, progress):
    """Crawl the pending product links of one category, batch by batch.

    Links are read from ``<BASE_PRODUCTS_LINKS_FOLDER>/<category_id>/product_links.json``;
    those whose normalised URL is already in ``progress["processed_urls"]``
    are skipped. Each successful result is appended to the products CSV and
    the two JSONL files, and `progress` is updated in place and saved
    before every batch, every CHECKPOINT_INTERVAL products and after every
    batch.

    A result that is unsuccessful, has no extracted content, or parses to no
    product is counted in ``progress["total_failed"]`` and is not marked as
    processed.
    """
    import gc

    product_links_path = os.path.join(BASE_PRODUCTS_LINKS_FOLDER, category_id, "product_links.json")
    if not os.path.exists(product_links_path):
        print(f"[SKIP] Missing product_links.json for {category_id}")
        return

    with open(product_links_path, "r", encoding="utf-8") as f:
        all_links = json.load(f) or []

    processed = set(progress["processed_urls"])
    links = [u for u in all_links if normalize_url(u) not in processed]

    # NOTE(review): the cursor indexes the already-filtered list, so a non-zero
    # cursor would skip links that were never attempted -- confirm the intent.
    start_idx = 0
    if progress["current_category"] == category_id:
        start_idx = min(progress["cursor"], len(links))

    print(f"[CATEGORY] {category_id}: {len(links)} pending (cursor={start_idx})")

    for start in range(start_idx, len(links), CHUNK_SIZE):
        batch = links[start:start+CHUNK_SIZE]

        # Checkpoint the position before the batch starts.
        progress["current_category"] = category_id
        progress["cursor"] = start
        save_progress(progress)

        browser_config = BrowserConfig(headless=True, verbose=False)
        run_config = CrawlerRunConfig(
            cache_mode=CacheMode.ENABLED,
            extraction_strategy=JsonCssExtractionStrategy(product_schema),
            page_timeout=PAGE_TIMEOUT_MS,
            delay_before_return_html=DELAY_BEFORE_RETURN,
            stream=False,
        )
        dispatcher = MemoryAdaptiveDispatcher(max_session_permit=MAX_SESSIONS)

        # A fresh crawler is opened for every batch.
        async with AsyncWebCrawler(config=browser_config) as crawler:
            async for result in iter_arun_many(crawler, batch, run_config, dispatcher):
                url_norm = normalize_url(result.url)

                rows = None
                if result.success and result.extracted_content and result.extracted_content != "[]":
                    rows = parse_extracted_to_docs(
                        result,
                        category_id,
                        brands_writer,
                        countries_writer,
                        brands_data,
                        countries_data
                    )

                # The parser signals "no product" with (None, None, None), which
                # is a truthy tuple, so the row itself has to be checked.
                csv_row, product_json, specs_json = rows or (None, None, None)
                if csv_row is None:
                    progress["total_failed"] += 1
                    continue

                products_writer.writerow(csv_row)
                append_jsonl(PRODUCTS_JSONL_FILE, product_json)
                append_jsonl(SPECS_JSONL_FILE,  specs_json)

                progress["processed_urls"].append(url_norm)
                progress["total_processed"] += 1
                if (progress["total_processed"] % CHECKPOINT_INTERVAL) == 0:
                    save_progress(progress)

        # Checkpoint after the batch and release memory before the next one.
        save_progress(progress)
        gc.collect()
        await asyncio.sleep(0.5)

    print(f"[DONE CAT] {category_id}")

# ====== MAIN ======
async def main():
    """Crawl every category folder under BASE_PRODUCTS_LINKS_FOLDER, resumably.

    Output CSV files are opened in append mode and get a header only when they
    are empty. Categories are visited in sorted order; when the checkpoint
    names a current category that still exists, the run restarts from it.
    Brands and countries are saved after each category, and totals are printed
    at the end.
    """
    ensure_dirs()

    brands_json = load_or_init_list(BRANDS_JSON_PATH)
    countries_json = load_or_init_list(COUNTRIES_JSON_PATH)

    # List the category folders.
    if not os.path.isdir(BASE_PRODUCTS_LINKS_FOLDER):
        print(f"Missing folder: {BASE_PRODUCTS_LINKS_FOLDER}")
        return
    # os.listdir returns entries in arbitrary order; sorting keeps the order
    # identical between runs so that resuming from an index is meaningful.
    category_ids = sorted(
        d for d in os.listdir(BASE_PRODUCTS_LINKS_FOLDER)
        if os.path.isdir(os.path.join(BASE_PRODUCTS_LINKS_FOLDER, d))
    )
    if not category_ids:
        print("No category folders found.")
        return

    progress = load_progress()
    print(f"[RESUME] processed_urls={len(progress['processed_urls'])}, total_processed={progress['total_processed']}, total_failed={progress['total_failed']}")

    # Resume from the checkpointed category when it is still present.
    start_cat_idx = 0
    if progress["current_category"] and progress["current_category"] in category_ids:
        start_cat_idx = category_ids.index(progress["current_category"])

    # Append mode keeps rows written by earlier runs (resume-friendly).
    with open(PRODUCTS_CSV_FILE, "a", encoding="utf-8", newline="") as f_products, \
         open(os.path.join(BASE_OUTPUT_FOLDER, "brands_data.csv"), "a", encoding="utf-8", newline="") as f_brands_csv, \
         open(os.path.join(BASE_OUTPUT_FOLDER, "countries_of_origin.csv"), "a", encoding="utf-8", newline="") as f_countries_csv:

        products_writer = csv.DictWriter(
            f_products,
            fieldnames=["_id","barcode","name","price","description","brand","country_of_origin","category"],
            quoting=csv.QUOTE_MINIMAL
        )
        brands_writer    = csv.DictWriter(f_brands_csv,    fieldnames=["_id","name"])
        countries_writer = csv.DictWriter(f_countries_csv, fieldnames=["_id","name"])

        # Write a header only into empty files; checking the position (rather
        # than file existence) also covers a file that exists but is empty.
        if f_products.tell() == 0:
            products_writer.writeheader()
        if f_brands_csv.tell() == 0:
            brands_writer.writeheader()
        if f_countries_csv.tell() == 0:
            countries_writer.writeheader()

        for cat_id in category_ids[start_cat_idx:]:
            progress["current_category"] = cat_id
            progress["cursor"] = 0
            save_progress(progress)

            await crawl_products_for_category(
                category_id=cat_id,
                products_writer=products_writer,
                brands_writer=brands_writer,
                countries_writer=countries_writer,
                brands_data=brands_json,
                countries_data=countries_json,
                progress=progress,
            )

            # Category finished: clear the position and persist brands/countries.
            progress["current_category"] = None
            progress["cursor"] = 0
            save_progress(progress)
            save_json_file(BRANDS_JSON_PATH,    brands_json)
            save_json_file(COUNTRIES_JSON_PATH, countries_json)

    print(f"Total_processed: {progress['total_processed']}")
    print(f"Total_failed:    {progress['total_failed']}")
    print(f"Brands:          {len(brands_json)}")
    print(f"Countries:       {len(countries_json)}")
    print("Done!")

# Entry point: run the async crawl to completion when executed as the main module.
if __name__ == "__main__":
    asyncio.run(main())
