In [1]:
#CELL 1

import os, re, json, time, asyncio, random, hashlib
from datetime import datetime, timezone
from typing import Any, Dict, List, Optional, Tuple
from urllib.parse import urlsplit, urlunsplit

import httpx
import pandas as pd
import aiosqlite
from dateutil import parser as dateparser
from langgraph.graph import StateGraph, START, END

# =========================
# Config / Tunables (minimal)
# =========================
INPUT_PATH = os.environ.get("SHOP_CRAWLER_INPUT", "D:/museai/data/Indian d2c brands.xlsx")
OUT_DIR    = os.environ.get("SHOP_CRAWLER_OUT", "./out")
DB_PATH    = os.environ.get("SHOP_CRAWLER_DB", "D:/museai/data/db/crawler_meta.db")

# LLM — compulsory
LLM_MODEL       = os.environ.get("LLM_MODEL", "gpt-5-nano")
OPENAI_API_KEY  = os.environ.get("OPENAI_API_KEY")

# Politeness / concurrency
GLOBAL_CONCURRENCY   = int(os.environ.get("GLOBAL_CONCURRENCY", 20))
PER_HOST_CONCURRENCY = int(os.environ.get("PER_HOST_CONCURRENCY", 2))
HOST_GAP_SECONDS     = float(os.environ.get("HOST_GAP_SECONDS", 1.0))
REQUEST_TIMEOUT      = float(os.environ.get("REQUEST_TIMEOUT", 18.0))

# Agent3 specifics (safe defaults)
AGENT3_CONCURRENCY       = int(os.environ.get("AGENT3_CONCURRENCY", "6"))
SHOPIFY_DELAY_SECONDS    = float(os.environ.get("SHOPIFY_DELAY_SECONDS", "3.0"))
WORDPRESS_DELAY_SECONDS  = float(os.environ.get("WORDPRESS_DELAY_SECONDS", "1.2"))
FRESH_START_AGENT3       = int(os.environ.get("FRESH_START_AGENT3", "0"))  # 0=merge, 1=truncate latest tables first
A2_ONLY_NEW = int(os.environ.get("A2_ONLY_NEW", "1"))
# Debug / logging
VERBOSE = True
PRINT_EVERY = 1                    # log every site
DEBUG_MAX_BRANDS = 10              # limit to first 10 brands for debug

def log(*a):
    if VERBOSE:
        print(*a, flush=True)

UA_POOL = [
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/126.0 Safari/537.36",
    "Mozilla/5.0 (Macintosh; Intel Mac OS X 13_5) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/126.0 Safari/537.36",
    "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/126.0 Safari/537.36",
]
BASE_HEADERS = {
    "Accept": "application/xml, text/xml;q=0.9, text/plain;q=0.7, */*;q=0.6",
    "Accept-Language": "en-US,en;q=0.8",
    "Accept-Encoding": "gzip, deflate",
    "Cache-Control": "no-cache",
    "Pragma": "no-cache",
}

ROBOTS_PATH = "/robots.txt"

# --- ensure folders exist (Windows-safe when DB_PATH has a directory) ---
_db_dir = os.path.dirname(DB_PATH)
if _db_dir:
    os.makedirs(_db_dir, exist_ok=True)
os.makedirs(OUT_DIR, exist_ok=True)


In [2]:
# CELL 2 — Utils + DB (with safe migrations for legacy NOT NULL columns) + PoliteFetcher

# =========================
# Utils
# =========================

def now_iso() -> str:
    return datetime.now(timezone.utc).isoformat()

def _looks_like_domain_or_url(val: str) -> bool:
    if not isinstance(val, str): return False
    s = val.strip()
    if not s or " " in s: return False
    return bool(re.search(r"([a-z0-9-]+\.)+[a-z]{2,}", s, re.I))

def _norm_root(u: str) -> Optional[str]:
    if not isinstance(u, str): return None
    s = u.strip()
    if not s: return None
    if not re.match(r"^https?://", s, re.I):
        s = "https://" + s
    parts = urlsplit(s)
    if not parts.netloc: return None
    return urlunsplit((parts.scheme, parts.netloc, "", "", ""))

def _with_www(base: str) -> Optional[str]:
    if not base: return None
    parts = urlsplit(base)
    host = parts.netloc
    if host.startswith("www."): return base
    return urlunsplit((parts.scheme, "www."+host, "", "", ""))

def _without_www(base: str) -> Optional[str]:
    if not base: return None
    parts = urlsplit(base)
    host = parts.netloc
    if host.startswith("www."):
        return urlunsplit((parts.scheme, host[4:], "", "", ""))
    return base


# =========================
# Minimal DB (+ product_sitemaps migrations)
# =========================
SCHEMA_SQL = r"""
PRAGMA journal_mode=WAL;
PRAGMA synchronous=NORMAL;

CREATE TABLE IF NOT EXISTS sites (
  canonical_base TEXT PRIMARY KEY,
  brand          TEXT,
  first_seen_at  TEXT,
  last_seen_at   TEXT
);

CREATE TABLE IF NOT EXISTS discoveries (
  id INTEGER PRIMARY KEY AUTOINCREMENT,
  base_url        TEXT NOT NULL,
  robots_url      TEXT,
  robots_status   INTEGER,
  llm_model       TEXT,
  primary_sitemap TEXT,
  sitemaps_json   TEXT,
  platform        TEXT,
  success         INTEGER,
  reason          TEXT,
  extracted_at    TEXT
);

CREATE INDEX IF NOT EXISTS idx_disc_base ON discoveries(base_url);

/* One row per site for product sitemap expansion (new shape) */
CREATE TABLE IF NOT EXISTS product_sitemaps (
  site                   TEXT,
  platform               TEXT,
  primary_sitemap        TEXT,
  sitemap_kind           TEXT,
  product_sitemaps_json  TEXT,   -- JSON array of product-sitemap URLs (deduped)
  urls_count             INTEGER,
  fetched_at             TEXT,
  status                 INTEGER, -- 1=ok, 0=fail, 2=skipped
  reason                 TEXT
);

CREATE TABLE IF NOT EXISTS product_urls (
  site          TEXT,
  platform      TEXT,
  sitemap_url   TEXT,
  url           TEXT,
  lastmod       TEXT,
  changefreq    TEXT,
  images_json   TEXT,
  fetched_at    TEXT,
  PRIMARY KEY(site, url)        -- de-dupe across retries; URL belongs to a site
);
CREATE INDEX IF NOT EXISTS idx_product_urls_url ON product_urls(url);
CREATE INDEX IF NOT EXISTS idx_product_urls_site_time ON product_urls(site, fetched_at);
CREATE INDEX IF NOT EXISTS idx_product_urls_site_sitemap ON product_urls(site, sitemap_url);

CREATE TABLE IF NOT EXISTS product_urls_sitemaps (
  sitemap_url    TEXT PRIMARY KEY,
  site           TEXT,
  platform       TEXT,
  fetched_at     TEXT,
  http_status    INTEGER,
  content_type   TEXT,
  content_length INTEGER,
  etag           TEXT,
  last_modified  TEXT,
  urls_found     INTEGER,
  status         INTEGER,   -- 1=ok, 0=fail
  reason         TEXT
);

"""

class DB:
    def __init__(self, path: str):
        self.path = path
        self._db: Optional[aiosqlite.Connection] = None
        self._ps_cols: Dict[str, Dict[str, Any]] = {}  # column_name -> {notnull:int, dflt:str|None}

    async def __aenter__(self):
        self._db = await aiosqlite.connect(self.path)
        await self._db.executescript(SCHEMA_SQL)
        await self._db.commit()
        await self._migrate_product_sitemaps()
        await self._migrate_agent4_tables()   # <-- add this
        return self

    async def _migrate_agent4_tables(self):
        await self._db.execute("""
        CREATE TABLE IF NOT EXISTS product_urls (
          site TEXT, platform TEXT, sitemap_url TEXT, url TEXT,
          lastmod TEXT, changefreq TEXT, images_json TEXT, fetched_at TEXT,
          PRIMARY KEY(site, url)
        )""")
        await self._db.execute("CREATE INDEX IF NOT EXISTS idx_product_urls_url ON product_urls(url)")
        await self._db.execute("CREATE INDEX IF NOT EXISTS idx_product_urls_site_time ON product_urls(site, fetched_at)")
        await self._db.execute("CREATE INDEX IF NOT EXISTS idx_product_urls_site_sitemap ON product_urls(site, sitemap_url)")

        await self._db.execute("""
        CREATE TABLE IF NOT EXISTS product_urls_sitemaps (
          sitemap_url TEXT PRIMARY KEY,
          site TEXT, platform TEXT, fetched_at TEXT,
          http_status INTEGER, content_type TEXT, content_length INTEGER,
          etag TEXT, last_modified TEXT, urls_found INTEGER,
          status INTEGER, reason TEXT
        )""")
        await self._db.commit()

    async def upsert_product_urls_bulk(self, site: str, platform: str, sitemap_url: str, rows: List[Dict[str, Any]]):
        """rows: [{url,lastmod,changefreq,images_json}]"""
        if not rows:
            return 0
        now = now_iso()
        await self._db.execute("BEGIN")
        await self._db.executemany(
            """
            INSERT INTO product_urls(site, platform, sitemap_url, url, lastmod, changefreq, images_json, fetched_at)
            VALUES(?,?,?,?,?,?,?,?)
            ON CONFLICT(site, url) DO UPDATE SET
              platform=excluded.platform,
              sitemap_url=excluded.sitemap_url,
              lastmod=COALESCE(excluded.lastmod, product_urls.lastmod),
              changefreq=COALESCE(excluded.changefreq, product_urls.changefreq),
              images_json=COALESCE(NULLIF(excluded.images_json,''), product_urls.images_json),
              fetched_at=excluded.fetched_at
            """,
            [
                (site, platform, sitemap_url, r.get("url"), r.get("lastmod"),
                 r.get("changefreq"), r.get("images_json") or "", now)
                for r in rows if r.get("url")
            ]
        )
        await self._db.commit()
        return len(rows)

    async def mark_product_sitemap(self, sitemap_url: str, site: str, platform: str,
                                   http_status: int, content_type: str, content_length: int,
                                   etag: str, last_modified: str,
                                   urls_found: int, status: int, reason: str):
        await self._db.execute("""
            INSERT INTO product_urls_sitemaps(sitemap_url, site, platform, fetched_at,
              http_status, content_type, content_length, etag, last_modified,
              urls_found, status, reason)
            VALUES(?,?,?,?,?,?,?,?,?,?,?,?)
            ON CONFLICT(sitemap_url) DO UPDATE SET
              site=excluded.site,
              platform=excluded.platform,
              fetched_at=excluded.fetched_at,
              http_status=excluded.http_status,
              content_type=excluded.content_type,
              content_length=excluded.content_length,
              etag=excluded.etag,
              last_modified=excluded.last_modified,
              urls_found=excluded.urls_found,
              status=excluded.status,
              reason=excluded.reason
        """, (
            sitemap_url, site, platform, now_iso(),
            http_status, content_type or "", int(content_length or 0),
            etag or "", last_modified or "",
            int(urls_found or 0), int(status or 0), (reason or "")[:240]
        ))
        await self._db.commit()


    async def __aexit__(self, exc_type, exc, tb):
        if self._db:
            await self._db.commit()
            await self._db.close()

    async def _migrate_product_sitemaps(self):
        """Ensure product_sitemaps has required columns, a UNIQUE(site), and capture legacy cols."""
        # Ensure table exists
        await self._db.execute("""
        CREATE TABLE IF NOT EXISTS product_sitemaps (
          site TEXT, platform TEXT, primary_sitemap TEXT, sitemap_kind TEXT,
          product_sitemaps_json TEXT, urls_count INTEGER, fetched_at TEXT,
          status INTEGER, reason TEXT
        )""")
        await self._db.commit()

        # Add missing modern columns
        required = {
            "site":"TEXT","platform":"TEXT","primary_sitemap":"TEXT","sitemap_kind":"TEXT",
            "product_sitemaps_json":"TEXT","urls_count":"INTEGER","fetched_at":"TEXT",
            "status":"INTEGER","reason":"TEXT"
        }
        cur = await self._db.execute("PRAGMA table_info(product_sitemaps)")
        info = await cur.fetchall()  # cid, name, type, notnull, dflt_value, pk
        have = {r[1] for r in info}
        for col, coltype in required.items():
            if col not in have:
                await self._db.execute(f"ALTER TABLE product_sitemaps ADD COLUMN {col} {coltype}")
        await self._db.commit()

        # Refresh info and cache notnull/defaults for dynamic UPSERTs
        cur = await self._db.execute("PRAGMA table_info(product_sitemaps)")
        info = await cur.fetchall()
        self._ps_cols = {r[1]: {"notnull": int(r[3] or 0), "dflt": r[4]} for r in info}

        # Ensure UNIQUE index on site for ON CONFLICT(site)
        # Deduplicate existing rows that would violate uniqueness
        cur = await self._db.execute("""
            SELECT site, MIN(rowid) AS keep_id, COUNT(*) AS c
            FROM product_sitemaps
            WHERE site IS NOT NULL AND TRIM(site) <> ''
            GROUP BY site
            HAVING c > 1
        """)
        dups = await cur.fetchall()
        for site, keep_id, _ in dups:
            await self._db.execute("DELETE FROM product_sitemaps WHERE site=? AND rowid<>?", (site, keep_id))
        await self._db.commit()

        # Create UNIQUE index if missing
        cur = await self._db.execute("PRAGMA index_list(product_sitemaps)")
        idx_rows = await cur.fetchall()  # seq, name, unique, origin, partial
        has_unique_on_site = False
        for _, idx_name, is_unique, *_ in idx_rows:
            if is_unique:
                cur2 = await self._db.execute(f"PRAGMA index_info({idx_name})")
                cols = [r[2] for r in await cur2.fetchall()]
                if cols == ["site"]:
                    has_unique_on_site = True
                    break
        if not has_unique_on_site:
            await self._db.execute("CREATE UNIQUE INDEX IF NOT EXISTS ux_product_sitemaps_site ON product_sitemaps(site)")
            await self._db.commit()

        # Friendly secondary index
        await self._db.execute("CREATE INDEX IF NOT EXISTS idx_ps_site ON product_sitemaps(site)")
        await self._db.commit()

    async def upsert_site(self, canonical_base: str, brand: str):
        now = now_iso()
        await self._db.execute(
            """
            INSERT INTO sites(canonical_base, brand, first_seen_at, last_seen_at)
            VALUES(?,?,?,?)
            ON CONFLICT(canonical_base) DO UPDATE SET
              brand=excluded.brand,
              last_seen_at=excluded.last_seen_at
            """,
            (canonical_base, brand, now, now),
        )
        await self._db.commit()

    async def insert_discovery(self, rec: Dict[str, Any]):
        cols = [
            "base_url","robots_url","robots_status","llm_model","primary_sitemap",
            "sitemaps_json","platform","success","reason","extracted_at"
        ]
        vals = [rec.get(c) for c in cols]
        placeholders = ",".join(["?"]*len(cols))
        await self._db.execute(
            f"INSERT INTO discoveries({','.join(cols)}) VALUES({placeholders})",
            tuple(vals)
        )
        await self._db.commit()

    async def merge_product_sitemaps(
        self,
        site: str,
        platform: Optional[str],
        primary_sitemap: Optional[str],
        sitemap_kind: Optional[str],
        new_urls: List[str],
        status: int,
        reason: str
    ):
        """Upsert one row per site; merge/dedupe arrays. Compatible with legacy NOT NULL columns."""
        site = (site or "").rstrip("/")

        # Load existing URLs
        cur = await self._db.execute("SELECT product_sitemaps_json FROM product_sitemaps WHERE site=?", (site,))
        row = await cur.fetchone()
        old_urls: List[str] = []
        if row and row[0]:
            try:
                old_urls = json.loads(row[0])
            except Exception:
                old_urls = []

        merged, seen = [], set()
        for u in (old_urls + new_urls):
            u = (u or "").strip()
            if u and u not in seen:
                merged.append(u); seen.add(u)

        arr_json = json.dumps(merged, ensure_ascii=False)
        now = now_iso()

        # ---------- Dynamic column handling (legacy-safe) ----------
        # Base set we always write
        insert_cols = [
            "site","platform","primary_sitemap","sitemap_kind",
            "product_sitemaps_json","urls_count","fetched_at","status","reason"
        ]
        insert_vals = [
            site, platform, primary_sitemap, sitemap_kind,
            arr_json, len(merged), now, status, reason[:240]
        ]

        # If legacy NOT NULL columns exist (e.g., 'product_sitemap', 'url'), populate them
        def legacy_value(col: str) -> Any:
            if col == "product_sitemap":
                # fill with primary_sitemap (or first product sitemap) to satisfy NOT NULL
                return primary_sitemap or (merged[0] if merged else "")
            if col == "url":
                # old schemas often had a single 'url' — store primary or first
                return primary_sitemap or (merged[0] if merged else "")
            # sensible defaults for other unknown NOT NULL columns
            return ""  # empty string keeps NOT NULL happy without meaning

        for col, meta in self._ps_cols.items():
            if col in insert_cols:
                continue
            if int(meta.get("notnull", 0)) == 1:
                insert_cols.append(col)
                insert_vals.append(legacy_value(col))

        # Build UPDATE list mirroring what we inserted (excluding PK)
        update_assignments = [f"{c}=excluded.{c}" for c in insert_cols if c != "site"]

        sql_upsert = f"""
            INSERT INTO product_sitemaps({', '.join(insert_cols)})
            VALUES({', '.join(['?']*len(insert_cols))})
            ON CONFLICT(site) DO UPDATE SET
              {', '.join(update_assignments)}
        """

        try:
            await self._db.execute(sql_upsert, tuple(insert_vals))
        except Exception:
            # Fallback for very old DBs with no UNIQUE(site): manual update/insert
            set_clause = ", ".join([f"{c}=?" for c in insert_cols if c != "site"])
            params_update = [v for c, v in zip(insert_cols, insert_vals) if c != "site"] + [site]
            await self._db.execute(f"UPDATE product_sitemaps SET {set_clause} WHERE site=?", params_update)
            cur2 = await self._db.execute("SELECT changes()")
            changed = (await cur2.fetchone() or [0])[0]
            if not changed:
                await self._db.execute(
                    f"INSERT INTO product_sitemaps({', '.join(insert_cols)}) VALUES({', '.join(['?']*len(insert_cols))})",
                    tuple(insert_vals)
                )

        await self._db.commit()


# =========================
# HTTP client (polite & 429-safe)
# =========================
# =========================
# HTTP client (polite & 429-safe)
# =========================
# =========================
# HTTP client (polite & 429-safe)
# =========================
class PoliteFetcher:
    def __init__(
        self,
        global_concurrency: Optional[int]=None,
        per_host_concurrency: Optional[int]=None,
        force_http2: Optional[bool]=None,
        timeout: Optional[float]=None,
    ):
        self._client: Optional[httpx.AsyncClient] = None
        self._gsem = asyncio.Semaphore(global_concurrency or GLOBAL_CONCURRENCY)
        self._per_host_conc = per_host_concurrency or PER_HOST_CONCURRENCY
        self._host_sem: Dict[str, asyncio.Semaphore] = {}
        self._host_next: Dict[str, float] = {}
        self._lock = asyncio.Lock()
        self._http2 = True if force_http2 is None else bool(force_http2)
        self._timeout = timeout or REQUEST_TIMEOUT

    async def __aenter__(self):
        self._client = httpx.AsyncClient(
            headers={**BASE_HEADERS, "User-Agent": random.choice(UA_POOL)},
            timeout=self._timeout,
            follow_redirects=True,
            http2=self._http2,
        )
        return self

    async def __aexit__(self, exc_type, exc, tb):
        if self._client:
            await self._client.aclose()

    async def _throttle(self, base: str):
        async with self._lock:
            now = time.monotonic()
            next_allowed = self._host_next.get(base)
            if next_allowed is not None and now < next_allowed:
                await asyncio.sleep(next_allowed - now)
            self._host_next[base] = time.monotonic() + HOST_GAP_SECONDS + random.uniform(0, 0.35)

    async def get(self, url: str, *, headers: Optional[Dict[str, str]] = None, allow_redirects: bool = True) -> httpx.Response:
        parts = urlsplit(url)
        base = urlunsplit((parts.scheme, parts.netloc, "", "", ""))
        if base not in self._host_sem:
            self._host_sem[base] = asyncio.Semaphore(self._per_host_conc)
        async with self._gsem, self._host_sem[base]:
            await self._throttle(base)
            try:
                # start from client defaults, rotate UA, then let per-call headers override
                per_req_headers = dict(self._client.headers)
                per_req_headers["User-Agent"] = random.choice(UA_POOL)
                if headers:
                    per_req_headers.update(headers)
                return await self._client.get(
                    url,
                    headers=per_req_headers,
                    follow_redirects=allow_redirects,  # map allow_redirects -> httpx follow_redirects
                )
            except Exception as e:
                # preserve the exception message in the response body for debugging
                return httpx.Response(0, request=httpx.Request("GET", url), content=str(e).encode("utf-8"))

    async def get_with_backoff(
        self,
        url: str,
        *,
        retries: int = 3,
        headers: Optional[Dict[str, str]] = None,
        allow_redirects: bool = True,
    ) -> httpx.Response:
        r = await self.get(url, headers=headers, allow_redirects=allow_redirects)
        if r.status_code in (429, 403, 503, 520, 522) and retries > 0:
            ra = r.headers.get("Retry-After")
            try:
                delay = float(ra) if ra and re.match(r"^[0-9.]+$", ra) else random.uniform(3.5, 8.0)
            except Exception:
                delay = random.uniform(3.5, 8.0)
            await asyncio.sleep(delay)
            try:
                if self._client:
                    self._client.headers["User-Agent"] = random.choice(UA_POOL)
            except Exception:
                pass
            return await self.get_with_backoff(
                url,
                retries=retries - 1,
                headers=headers,
                allow_redirects=allow_redirects,
            )
        return r



In [3]:
#CELL 3

# =========================
# LLM wrapper (compulsory) — Chat Completions only, with origin + hints
# =========================
class LLM:
    def __init__(self, model: str, api_key: Optional[str]):
        if not api_key:
            raise RuntimeError("OPENAI_API_KEY required: LLM is compulsory for extraction")
        self.model = model
        from openai import OpenAI
        self.client = OpenAI(api_key=api_key)

    def _make_hints(self, robots_text: str) -> List[str]:
        urls = re.findall(r"https?://[^\s#]+", robots_text, flags=re.I)
        seen = set(); out = []
        for u in urls:
            u = u.strip().rstrip(").,;")
            if u not in seen:
                out.append(u); seen.add(u)
        return out[:25]

    async def extract(self, robots_text: str, origin_base: str) -> Dict[str, Any]:
        hints = self._make_hints(robots_text)
        sys_prompt = (
            "You are a precise extractor for robots.txt files.\n"
            "Output ONLY JSON with keys: primary_sitemap (string|null), sitemaps (array), "
            "platform ('shopify'|'wordpress'|'other'), success (bool), reason (string).\n"
            "Prefer https://<host>/sitemap.xml if present; for WordPress prefer product sitemaps when available.\n"
            "Do not invent URLs; resolve relative paths using the provided ORIGIN."
        )
        user_block = f"ORIGIN: {origin_base}\nROBOTS.TXT:\n{robots_text[:48000]}\n"
        if hints:
            user_block += "HINTS:\n" + "\n".join(hints)

        resp = self.client.chat.completions.create(
            model=self.model,
            messages=[{"role": "system", "content": sys_prompt},
                      {"role": "user", "content": user_block}],
            response_format={"type": "json_object"},
        )
        out_text = resp.choices[0].message.content or "{}"
        m = re.search(r"\{.*\}", out_text, re.S)
        data = json.loads(m.group(0)) if m else json.loads(out_text)

        def _abs(u: str) -> bool:
            return bool(re.match(r"^https?://", u, re.I))

        sitemaps = [u.strip() for u in (data.get("sitemaps") or []) if isinstance(u, str) and _abs(u.strip())]
        primary = data.get("primary_sitemap")
        if not (isinstance(primary, str) and _abs(primary.strip())):
            primary = None
        else:
            primary = primary.strip()

        platform = str(data.get("platform") or "other").lower()
        if platform not in {"shopify", "wordpress", "other"}:
            platform = "other"

        success = bool(data.get("success")) and (primary is not None or len(sitemaps) > 0)
        reason = str(data.get("reason") or ("ok" if success else "no sitemap lines"))[:240]

        return {
            "primary_sitemap": primary,
            "sitemaps": sitemaps,
            "platform": platform,
            "success": success,
            "reason": reason,
        }

    async def extract_product_sitemaps(self, sitemap_text: str, origin_base: str) -> List[str]:
        """
        Return an array of absolute URLs that are PRODUCT sitemap files only.
        Shopify: sitemap_products_*.xml
        WordPress: product-sitemap.xml, product-*.xml (exclude product_tag, product_cat, product_variation, attributes)
        For mixed or index sitemaps, return only the product sitemap files.
        """
        sys_prompt = (
            "You are a precise extractor for XML/HTML sitemaps. "
            "Output ONLY JSON with keys: products (array of absolute URLs), success (bool), reason (string).\n"
            "Include ONLY product sitemap files:\n"
            "- Shopify: URLs like 'sitemap_products_*.xml'.\n"
            "- WordPress: URLs like 'product-sitemap.xml' or 'product-*.xml'.\n"
            "EXCLUDE category/tag/attribute/variation sitemaps (e.g., product_cat, product-tag, product_tag, "
            "product-attribute, product_variation). Do NOT invent URLs. Resolve relative URLs against ORIGIN."
        )
        user = f"ORIGIN: {origin_base}\nSITEMAP CONTENT (raw):\n{sitemap_text[:48000]}\n"

        resp = self.client.chat.completions.create(
            model=self.model,
            messages=[{"role": "system", "content": sys_prompt},
                      {"role": "user", "content": user}],
            response_format={"type": "json_object"},
        )
        out_text = resp.choices[0].message.content or "{}"
        try:
            data = json.loads(out_text)
        except Exception:
            m = re.search(r"\{.*\}", out_text, re.S)
            data = json.loads(m.group(0)) if m else {"products":[]}

        products = []
        for u in (data.get("products") or []):
            if isinstance(u, str) and re.match(r"^https?://", u.strip(), re.I):
                products.append(u.strip())
        # de-dupe
        seen = set(); out = []
        for u in products:
            if u not in seen:
                out.append(u); seen.add(u)
        return out


In [4]:
#CELL 4

# =========================
# Agent 1: load brands
# =========================
async def agent1_load_brands(db: DB, path: str) -> List[Dict[str,str]]:
    if not os.path.exists(path):
        raise FileNotFoundError(f"INPUT_PATH not found: {path}")
    if path.lower().endswith('.csv'):
        df = pd.read_csv(path)
    else:
        df = pd.read_excel(path)
    found: List[Dict[str,str]] = []
    for _, row in df.iterrows():
        cells = [str(v) for v in row.to_dict().values() if pd.notna(v)]
        url_val = None
        for c in cells:
            if _looks_like_domain_or_url(c):
                url_val = c.strip(); break
        base = _norm_root(url_val) if url_val else None
        if not base: continue
        brand = None
        for cname in df.columns:
            if 'brand' in str(cname).lower():
                v = row[cname]
                brand = str(v).strip() if pd.notna(v) else None
                break
        brand = brand or base
        found.append({"brand": brand, "base": base})
    # dedupe by base
    uniq, seen = [], set()
    for r in found:
        if r["base"] not in seen:
            uniq.append(r); seen.add(r["base"])
    for r in uniq:
        await db.upsert_site(r["base"], r["brand"])
    return uniq

# =========================
# Helper: resolve canonical base (follows redirects)
# =========================
async def resolve_canonical_base(fetch: PoliteFetcher, base: str) -> str:
    try:
        r = await fetch.get_with_backoff(base.rstrip('/'), retries=2)
        if r.status_code and str(r.url):
            p = urlsplit(str(r.url))
            if p.scheme and p.netloc:
                canon = urlunsplit((p.scheme, p.netloc, "", "", ""))
                log(f"[A2] canonical: {base} → {canon}")
                return canon
    except Exception:
        pass
    return base

def make_variants(canon: str) -> List[str]:
    p = urlsplit(canon)
    https_host = p.netloc
    https_base = urlunsplit(("https", https_host, "", "", ""))

    with_www = https_base if https_host.startswith("www.") else urlunsplit(("https", "www."+https_host, "", "", ""))
    without_www = https_base if not https_host.startswith("www.") else urlunsplit(("https", https_host[4:], "", "", ""))

    https_trials = [
        https_base.rstrip('/') + ROBOTS_PATH,
        with_www.rstrip('/') + ROBOTS_PATH,
        without_www.rstrip('/') + ROBOTS_PATH,
    ]
    http_trials = [u.replace("https://", "http://", 1) for u in https_trials]

    seen = set(); all_u = []
    for u in https_trials + http_trials:
        if u not in seen:
            all_u.append(u); seen.add(u)
    return all_u

# =========================
# Agent 2: robots → LLM extract (ONLY)
# =========================
async def fetch_robots(fetch: PoliteFetcher, base: str) -> Tuple[str, int, Optional[str], Dict[str,str]]:
    canon = await resolve_canonical_base(fetch, base)
    trials = make_variants(canon)

    last_status, last_url = 0, trials[0]
    for u in trials:
        log(f"[A2] robots try: {u}")
        r = await fetch.get_with_backoff(u, retries=2)
        ct = r.headers.get("content-type", "")
        clen = r.headers.get("content-length", "")
        log(f"[A2] robots status: {u} → {r.status_code} (ct={ct}, len={clen})")
        if r.status_code == 200:
            preview = (r.text or "")[:240].replace("\r", "\\r").replace("\n", "\\n")
            log(f"[A2] robots head: {preview}")
            return u, r.status_code, (r.text or ""), {"content-type": ct}
        last_status, last_url = r.status_code, u

    return last_url, last_status, None, {"content-type": ""}

async def agent2_probe(db: DB, rows: List[Dict[str,str]]):
    if not OPENAI_API_KEY:
        raise RuntimeError("OPENAI_API_KEY is required — LLM is compulsory for extraction")
    llm = LLM(LLM_MODEL, OPENAI_API_KEY)

    if DEBUG_MAX_BRANDS > 0:
        rows = rows[:DEBUG_MAX_BRANDS]
    total = len(rows)
    log(f"[A2] Starting robots+LLM on {total} site(s)...")

    async with PoliteFetcher() as fetch:
        sem = asyncio.Semaphore(GLOBAL_CONCURRENCY)

        async def one(i: int, r: Dict[str,str]):
            base = r["base"]
            try:
                async with sem:
                    if (i % PRINT_EVERY) == 0:
                        log(f"[A2] [{i}/{total}] start: {base}")
                    robots_url, status, text, meta = await fetch_robots(fetch, base)
                    if status == 200 and text:
                        p = urlsplit(robots_url) if robots_url else urlsplit(base)
                        origin = f"{p.scheme}://{p.netloc}"
                        data = await llm.extract(text, origin_base=origin)
                        rec = {
                            "base_url": base,
                            "robots_url": robots_url,
                            "robots_status": status,
                            "llm_model": LLM_MODEL,
                            "primary_sitemap": data.get("primary_sitemap"),
                            "sitemaps_json": json.dumps(data.get("sitemaps") or [], ensure_ascii=False),
                            "platform": data.get("platform"),
                            "success": 1 if data.get("success") else 0,
                            "reason": data.get("reason"),
                            "extracted_at": now_iso(),
                        }
                        await db.insert_discovery(rec)
                        ok = "OK" if rec["success"] else "NO-SM"
                        log(f"[A2] [{i}/{total}] {ok}: {base} → platform={rec['platform']} "
                            f"primary={rec['primary_sitemap'] or '-'}")
                    else:
                        rec = {
                            "base_url": base,
                            "robots_url": robots_url,
                            "robots_status": status,
                            "llm_model": LLM_MODEL,
                            "primary_sitemap": None,
                            "sitemaps_json": json.dumps([], ensure_ascii=False),
                            "platform": None,
                            "success": 0,
                            "reason": f"robots_fetch_failed:{status}",
                            "extracted_at": now_iso(),
                        }
                        await db.insert_discovery(rec)
                        log(f"[A2] [{i}/{total}] FAIL: {base} → robots {status}")
            except Exception as e:
                rec = {
                    "base_url": base,
                    "robots_url": None,
                    "robots_status": None,
                    "llm_model": LLM_MODEL,
                    "primary_sitemap": None,
                    "sitemaps_json": json.dumps([], ensure_ascii=False),
                    "platform": None,
                    "success": 0,
                    "reason": f"exception:{type(e).__name__}:{str(e)[:180]}",
                    "extracted_at": now_iso(),
                }
                await db.insert_discovery(rec)
                log(f"[A2] [{i}/{total}] EXC: {base} → {type(e).__name__}: {e}")

        tasks = [one(i, r) for i, r in enumerate(rows, start=1)]
        await asyncio.gather(*tasks)

# LangGraph nodes
#CELL 4

from typing import TypedDict
import sqlite3

class AState(TypedDict, total=False):
    brands: List[Dict[str, str]]

# --- nodes ---
async def node_load(state: AState) -> AState:
    async with DB(DB_PATH) as db:
        rows = await agent1_load_brands(db, INPUT_PATH)
    print(f"[Agent1] Loaded brands: {len(rows)}", flush=True)
    for r in rows[:min(5, len(rows))]:
        print("   ↳", r["brand"], "→", r["base"], flush=True)
    return {"brands": rows}

async def node_probe(state: AState) -> AState:
    # Honor SKIP_A2
    if os.environ.get("SKIP_A2", "0") == "1":
        print("[A2] SKIP_A2=1 → skipping Agent2 (robots+LLM).", flush=True)
        return state

    brands = state.get("brands", []) or []
    total_all = len(brands)

    # Only-new mode (skip bases already in sites_latest)
    only_new = os.environ.get("A2_ONLY_NEW", "1") == "1"
    bases_in_latest = set()
    if only_new:
        try:
            with sqlite3.connect(DB_PATH) as conn:
                try:
                    cur = conn.execute("SELECT site FROM sites_latest")
                    bases_in_latest = { (row[0] or "").rstrip("/") for row in cur.fetchall() }
                except sqlite3.OperationalError:
                    bases_in_latest = set()
        except Exception:
            bases_in_latest = set()

    rows = []
    if only_new and bases_in_latest:
        for r in brands:
            b = (r["base"] or "").rstrip("/")
            if b not in bases_in_latest:
                rows.append(r)
        print(f"[A2] Only-new mode on → {len(rows)} site(s) to process (filtered from {total_all}).", flush=True)
        if not rows:
            print("[A2] Nothing to do (all sites already present in sites_latest).", flush=True)
            return state
    else:
        rows = brands
        print(f"[A2] Preparing to process {min(len(rows), DEBUG_MAX_BRANDS) if DEBUG_MAX_BRANDS>0 else len(rows)} site(s) (of {len(rows)})...", flush=True)

    async with DB(DB_PATH) as db:
        await agent2_probe(db, rows)

    print("[Agent2] Probing complete", flush=True)
    return state

# --- build & compile a fresh graph every time this cell runs ---
def build_graph():
    g = StateGraph(AState)
    g.add_node("load_brands", node_load)
    g.add_node("probe", node_probe)
    g.add_edge(START, "load_brands")
    g.add_edge("load_brands", "probe")
    g.add_edge("probe", END)
    return g.compile()

app = build_graph()



In [5]:
# CELL 5 — Agent 3: LLM-only expansion (skip product primaries)

import sqlite3, re, random, asyncio
from urllib.parse import urlsplit, urlunsplit

# Concurrency knobs
A3_CONCURRENCY = int(os.environ.get("AGENT3_CONCURRENCY", os.environ.get("A3_CONCURRENCY", "6")))
A3_PER_HOST    = int(os.environ.get("A3_PER_HOST", "1"))
A3_MAX_SITES   = int(os.environ.get("A3_MAX_SITES", "0"))   # 0 = all

# Fresh start switch (truncate product_sitemaps before writing)
# Fresh start switch (truncate product_sitemaps before writing)
# => FRESH_START_AGENT3="1" means DO fresh; "0" means incremental.
FRESH_START_AGENT3 = os.environ.get("FRESH_START_AGENT3", "0") == "1"

# Skip rows already OK (only relevant when NOT fresh)
A3_SKIP_IF_ALREADY_OK = (os.environ.get("A3_SKIP_IF_ALREADY_OK", "1") == "1") and (not FRESH_START_AGENT3)

# NEW: exclude product primaries entirely (default ON)
A3_EXCLUDE_PRODUCT_PRIMARIES = os.environ.get("A3_EXCLUDE_PRODUCT_PRIMARIES", "1") == "1"

def _origin_from_url(u: str) -> str:
    p = urlsplit(u)
    return urlunsplit((p.scheme, p.netloc, "", "", ""))

async def agent3_expand_product_sitemaps():
    # Ensure schema exists and optionally fresh-start
    async with DB(DB_PATH) as _db:
        if FRESH_START_AGENT3:
            await _db._db.execute("DELETE FROM product_sitemaps")
            await _db._db.commit()
            print(f"[A3] FRESH_START_AGENT3={int(FRESH_START_AGENT3)} → cleared product_sitemaps.", flush=True)
        else:
            print(f"[A3] FRESH_START_AGENT3={int(FRESH_START_AGENT3)} → incremental; "
                f"A3_SKIP_IF_ALREADY_OK={int(A3_SKIP_IF_ALREADY_OK)}.", flush=True)


    # Load exactly what's in sites_latest
    try:
        with sqlite3.connect(DB_PATH) as conn:
            cur = conn.execute("""
                SELECT site,
                       LOWER(COALESCE(platform,'other')) AS platform,
                       COALESCE(sitemap,'') AS sitemap,
                       LOWER(COALESCE(sitemap_kind,'')) AS kind
                FROM sites_latest
                WHERE sitemap IS NOT NULL
            """)
            rows = [(r[0] or "", r[1] or "other", r[2] or "", r[3] or "") for r in cur.fetchall()]
    except Exception as e:
        print(f"[A3] ERROR reading sites_latest: {e}", flush=True)
        return

    # Optional cap
    if A3_MAX_SITES > 0:
        rows = rows[:A3_MAX_SITES]

    # Optionally skip already-OK sites unless fresh
    if A3_SKIP_IF_ALREADY_OK:
        try:
            with sqlite3.connect(DB_PATH) as conn:
                done = { (s or "").rstrip("/") for (s,) in conn.execute(
                    "SELECT site FROM product_sitemaps WHERE status=1 AND urls_count>0"
                ).fetchall() }
            rows = [r for r in rows if (r[0] or "").rstrip("/") not in done]
        except Exception:
            pass

    # Split into: to-skip (already product primaries) vs to-expand
    if A3_EXCLUDE_PRODUCT_PRIMARIES:
        skip = [(s,p,sm,k) for (s,p,sm,k) in rows if k == "product"]
        todo = [(s,p,sm,k) for (s,p,sm,k) in rows if k != "product"]
    else:
        skip, todo = [], rows

    print(f"[A3] Expanding product sitemaps for {len(todo)} site(s) …", flush=True)
    if skip:
        # Record product primaries as-is (no fetch/LLM)
        async with DB(DB_PATH) as db:
            for s,p,sm,k in skip:
                await db.merge_product_sitemaps(
                    site=s, platform=p, primary_sitemap=sm, sitemap_kind=k,
                    new_urls=[sm], status=1, reason="primary already product sitemap; not expanded"
                )

    if not todo:
        print("[Agent3] Product sitemap expansion complete.", flush=True)
        return

    if not OPENAI_API_KEY:
        raise RuntimeError("OPENAI_API_KEY is required for LLM extraction in A3")
    llm = LLM(LLM_MODEL, OPENAI_API_KEY)

    # Use HTTP/1.1 to avoid some TLS/0 issues seen with certain CDNs
    async with PoliteFetcher(
        global_concurrency=A3_CONCURRENCY,
        per_host_concurrency=A3_PER_HOST,
        force_http2=False
    ) as fetch, DB(DB_PATH) as db:

        async def handle_one(idx: int, site: str, platform: str, primary: str, kind: str):
            label = f"[A3] [{idx}] {site}"
            primary = (primary or "").strip()

            # Validate primary; no guessing/alternates
            if not re.match(r"^https?://", primary, re.I):
                await db.merge_product_sitemaps(site, platform, primary, kind, [], 0, "invalid primary sitemap URL")
                print(f"{label} primary invalid: {primary!r}", flush=True)
                return

            # Gentle pacing by platform
            if platform == "shopify":
                await asyncio.sleep(SHOPIFY_DELAY_SECONDS + random.uniform(0.0, 0.6))
            elif platform == "wordpress":
                await asyncio.sleep(WORDPRESS_DELAY_SECONDS + random.uniform(0.0, 0.4))
            else:
                await asyncio.sleep(0.3 + random.uniform(0.0, 0.3))

            # Fetch exactly the stored primary
            r = await fetch.get_with_backoff(primary, retries=3)
            ct = r.headers.get("content-type", "")
            clen = r.headers.get("content-length", "")
            head = (r.text or "")[:200].replace("\n", " ").replace("\r", " ")
            print(f"{label} fetch: {primary} → {r.status_code} (ct={ct}, len={clen}) head='{head}'", flush=True)

            if r.status_code != 200 or not (r.text or "").strip():
                await db.merge_product_sitemaps(site, platform, primary, kind, [], 0, f"primary fetch failed: {r.status_code}")
                print(f"{label} primary fetch failed: {r.status_code}", flush=True)
                return

            # LLM-only extraction from this primary text
            origin = _origin_from_url(primary)
            products = await llm.extract_product_sitemaps(r.text, origin_base=origin)

            # De-dupe; no heuristics
            out, seen = [], set()
            for u in (products or []):
                u = (u or "").strip()
                if u and u not in seen:
                    out.append(u); seen.add(u)

            if out:
                await db.merge_product_sitemaps(site, platform, primary, kind, out, 1, "ok")
                print(f"{label} → product_sitemaps: {len(out)}", flush=True)
            else:
                await db.merge_product_sitemaps(site, platform, primary, kind, [], 0, "llm-empty")
                print(f"{label} → product_sitemaps: 0 (llm-empty)", flush=True)

        tasks = [handle_one(i, s, p, sm, sk) for i, (s,p,sm,sk) in enumerate(todo, start=1)]
        await asyncio.gather(*tasks)

    print("[Agent3] Product sitemap expansion complete.", flush=True)


In [6]:
# CELL 6 — Agent 4: collect product URLs from product_sitemaps (robust, keep-all fallback)

import asyncio, json, os, re, time, textwrap, sqlite3, random
from typing import List, Dict, Any, Tuple, Optional
from urllib.parse import urlsplit
from lxml import etree

# Tuning
A4_CONCURRENCY    = int(os.environ.get("A4_CONCURRENCY", "6"))
A4_PER_HOST       = int(os.environ.get("A4_PER_HOST", "1"))
A4_SKIP_IF_DONE   = os.environ.get("A4_SKIP_IF_DONE", "1") == "1"
A4_WRITE_NDJSON   = os.environ.get("A4_WRITE_NDJSON", "0") == "1"
# Empty default = keep ALL URLs (no products-only filter)
A4_URL_INCLUDE_RE = os.environ.get("A4_URL_INCLUDE_REGEX", "").strip()
A4_FOLLOW_CHILD   = os.environ.get("A4_FOLLOW_CHILDREN", "0") == "1"
# If a fetched "sitemap" is actually HTML (not XML), store the URL itself as a single product URL
A4_TREAT_NONXML_AS_SINGLE = os.environ.get("A4_TREAT_NONXML_AS_SINGLE", "1") == "1"

def _regex_keep() -> Optional[re.Pattern]:
    """Return a compiled regex if provided, else None (no filtering)."""
    return re.compile(A4_URL_INCLUDE_RE, re.I) if A4_URL_INCLUDE_RE else None

def _gzip_unwrap(b: bytes) -> bytes:
    if len(b) >= 2 and b[:2] == b"\x1f\x8b":
        import gzip
        try:
            return gzip.decompress(b)
        except Exception:
            return b
    return b

def _safe_xml_root(xml_text: str) -> Optional[etree._Element]:
    """Try to parse XML and return the root, or None on failure."""
    try:
        parser = etree.XMLParser(recover=True, huge_tree=True)
        return etree.fromstring(xml_text.encode("utf-8", errors="ignore"), parser=parser)
    except Exception:
        return None

def _looks_like_xml_sitemap(text: str, content_type: str) -> bool:
    """Heuristic: is this response likely an XML sitemap (urlset or sitemapindex)?"""
    ct = (content_type or "").lower()
    if "xml" in ct:
        return True
    # quick sniff in the first ~2KB
    head = text[:2048].lower()
    return ("<urlset" in head) or ("<sitemapindex" in head)

def _parse_urlset(xml_text: str) -> List[Dict[str, Any]]:
    """
    Return list of {url,lastmod,changefreq,images_json} from a standard <urlset>.
    If it's not a <urlset>, returns [].
    """
    out: List[Dict[str, Any]] = []
    root = _safe_xml_root(xml_text)
    if root is None:
        return out

    # ensure we're looking at a urlset-like doc
    ln = etree.QName(root.tag).localname.lower() if hasattr(root, "tag") else ""
    if ln != "urlset":
        return out

    urls = root.xpath("//*[local-name()='url']")
    for node in urls:
        locs = node.xpath("./*[local-name()='loc']/text()")
        if not locs:
            continue
        loc = (locs[0] or "").strip()
        lastmod  = (node.xpath("./*[local-name()='lastmod']/text()") or [None])[0]
        changefq = (node.xpath("./*[local-name()='changefreq']/text()") or [None])[0]
        imgs: List[Dict[str, Any]] = []
        for img in node.xpath(".//*[local-name()='image'] | .//*[local-name()='image:image']"):
            iloc   = (img.xpath("./*[local-name()='loc']/text()") or [None])[0]
            ititle = (img.xpath("./*[local-name()='title']/text()") or [None])[0]
            icap   = (img.xpath("./*[local-name()='caption']/text()") or [None])[0]
            if iloc:
                imgs.append({"loc": iloc, "title": ititle, "caption": icap})
        out.append({
            "url": loc,
            "lastmod": (lastmod or "").strip() or None,
            "changefreq": (changefq or "").strip() or None,
            "images_json": json.dumps(imgs, ensure_ascii=False) if imgs else "",
        })
    return out

def _parse_child_sitemaps(xml_text: str) -> List[str]:
    """When a product sitemap is itself an index; disabled by default unless A4_FOLLOW_CHILD=1."""
    kids: List[str] = []
    root = _safe_xml_root(xml_text)
    if root is None:
        return kids
    ln = etree.QName(root.tag).localname.lower()
    if ln != "sitemapindex":
        return kids
    for loc in root.xpath("//*[local-name()='sitemap']/*[local-name()='loc']/text()"):
        u = (loc or "").strip()
        if u:
            kids.append(u)
    return kids

def _host(u: str) -> str:
    try:
        return urlsplit(u).netloc
    except Exception:
        return "unknown"

async def _a4_load_tasks() -> List[Tuple[str, str, str]]:
    """
    Return list of (site, platform, sitemap_url) to process.
    Only from product_sitemaps where status=1, urls_count>0.
    If A4_SKIP_IF_DONE=1, skip sitemaps already processed successfully.
    """
    tasks: List[Tuple[str, str, str]] = []
    with sqlite3.connect(DB_PATH) as conn:
        cur = conn.execute("""
          SELECT site, COALESCE(platform,'other'), product_sitemaps_json
          FROM product_sitemaps
          WHERE status=1 AND urls_count>0 AND product_sitemaps_json IS NOT NULL
        """)
        rows = cur.fetchall()

        done = set()
        if A4_SKIP_IF_DONE:
            done = {
                r[0] for r in conn.execute(
                    "SELECT sitemap_url FROM product_urls_sitemaps WHERE status=1 AND urls_found>0"
                ).fetchall()
            }

        for site, platform, arr_json in rows:
            try:
                arr = json.loads(arr_json) if arr_json else []
            except Exception:
                arr = []
            for sm in arr:
                su = (sm or "").strip()
                if not su:
                    continue
                if A4_SKIP_IF_DONE and su in done:
                    continue
                tasks.append(((site or "").rstrip('/'), (platform or "other").lower(), su))
    return tasks

async def agent4_collect_product_urls():
    keep_re = _regex_keep()
    tasks = await _a4_load_tasks()
    if not tasks:
        print("[A4] Nothing to do (no new product sitemaps).", flush=True)
        return

    async with PoliteFetcher(
        global_concurrency=A4_CONCURRENCY,
        per_host_concurrency=A4_PER_HOST,
        force_http2=False
    ) as fetch, DB(DB_PATH) as db:

        async def handle_one(idx: int, site: str, platform: str, sm_url: str):
            label = f"[A4] [{idx}] {site} :: {sm_url}"
            try:
                # platform pacing
                if platform == "shopify":
                    await asyncio.sleep(SHOPIFY_DELAY_SECONDS + random.uniform(0.0, 0.6))
                elif platform == "wordpress":
                    await asyncio.sleep(WORDPRESS_DELAY_SECONDS + random.uniform(0.0, 0.4))
                else:
                    await asyncio.sleep(0.35 + random.uniform(0.0, 0.3))

                t0 = time.perf_counter()
                r = await fetch.get_with_backoff(sm_url, retries=3)
                body = _gzip_unwrap(r.content or b"")
                try:
                    text = body.decode("utf-8")
                except Exception:
                    try:
                        text = body.decode("latin-1")
                    except Exception:
                        text = body.decode("utf-8", errors="ignore")

                head = text.replace("\r", " ").replace("\n", " ")
                print(f"{label} → {r.status_code} ct={r.headers.get('content-type','')} len={len(body)} head='{textwrap.shorten(head, 240)}'", flush=True)

                if r.status_code != 200 or not text.strip():
                    await db.mark_product_sitemap(
                        sitemap_url=sm_url, site=site, platform=platform,
                        http_status=r.status_code, content_type=r.headers.get("content-type", ""),
                        content_length=len(body), etag=r.headers.get("etag", ""),
                        last_modified=r.headers.get("last-modified", ""),
                        urls_found=0, status=0, reason=f"fetch_failed:{r.status_code}"
                    )
                    return

                # Decide how to handle the payload
                is_xml_sitemap = _looks_like_xml_sitemap(text, r.headers.get("content-type", ""))

                # Optionally follow child sitemaps if this is a <sitemapindex>
                if is_xml_sitemap and A4_FOLLOW_CHILD:
                    for child in _parse_child_sitemaps(text):
                        tasks.append((site, platform, child))

                entries: List[Dict[str, Any]] = []
                if is_xml_sitemap:
                    # Standard sitemap <urlset>
                    entries = _parse_urlset(text)
                else:
                    # Not an XML sitemap (likely a product page) → store the URL itself
                    if A4_TREAT_NONXML_AS_SINGLE:
                        if (not keep_re) or (keep_re and keep_re.search(sm_url)):
                            entries = [{
                                "url": sm_url,
                                "lastmod": None,
                                "changefreq": None,
                                "images_json": "",
                            }]

                # Keep ALL URLs by default; ensure 'url' exists
                entries = [e for e in entries if e.get("url")]

                # Upsert URLs
                n = await db.upsert_product_urls_bulk(
                    site=site, platform=platform, sitemap_url=sm_url, rows=entries
                )

                # Optional sidecar NDJSON
                if A4_WRITE_NDJSON and n > 0:
                    nd_dir = os.path.join(OUT_DIR, "products")
                    os.makedirs(nd_dir, exist_ok=True)
                    nd_path = os.path.join(nd_dir, f"{_host(sm_url)}.ndjson")
                    with open(nd_path, "a", encoding="utf-8") as f:
                        for e in entries:
                            f.write(json.dumps({
                                "site": site, "platform": platform,
                                "sitemap": sm_url, "url": e["url"],
                                "lastmod": e.get("lastmod"), "changefreq": e.get("changefreq")
                            }, ensure_ascii=False) + "\n")

                dt_ms = int((time.perf_counter() - t0) * 1000)
                await db.mark_product_sitemap(
                    sitemap_url=sm_url, site=site, platform=platform,
                    http_status=r.status_code, content_type=r.headers.get("content-type", ""),
                    content_length=len(body), etag=r.headers.get("etag", ""),
                    last_modified=r.headers.get("last-modified", ""),
                    urls_found=len(entries), status=1, reason=f"ok,{dt_ms}ms"
                )
                print(f"{label} → URLs stored: {len(entries)}", flush=True)

            except Exception as e:
                # Never let one failure kill the whole batch
                try:
                    await db.mark_product_sitemap(
                        sitemap_url=sm_url, site=site, platform=platform,
                        http_status=0, content_type="", content_length=0,
                        etag="", last_modified="", urls_found=0, status=0,
                        reason=f"exception:{type(e).__name__}"
                    )
                finally:
                    print(f"{label} → ERROR: {type(e).__name__}: {e}", flush=True)

        jobs = [handle_one(i, s, p, sm) for i, (s, p, sm) in enumerate(tasks, start=1)]
        # Exceptions are handled inside handle_one; gather won't cancel siblings
        await asyncio.gather(*jobs)

    print("[Agent4] Product URL collection complete.", flush=True)


In [7]:
import os
import re
import json
import time
import asyncio
import sqlite3
import aiohttp
from urllib.parse import urlparse

# --- Config ---
DB_PATH = os.environ.get("DB_PATH", r"D:\museai\data\db\crawler_meta.db")
A4_SRC_TABLE = "product_sitemaps"
A4_POP_TABLE = "populated_product_sitemaps"
A4_LLM_MODEL = os.environ.get("A4_LLM_MODEL", "gpt-5-nano")
A4_CONCURRENCY = int(os.environ.get("A4_CONCURRENCY", "6"))
A4_LLM_CONCURRENCY = int(os.environ.get("A4_LLM_CONCURRENCY", "2"))
A4_LLM_MIN_DELAY_MS = int(os.environ.get("A4_LLM_MIN_DELAY_MS", "120"))
A4_LLM_MAX_RETRIES = int(os.environ.get("A4_LLM_MAX_RETRIES", "5"))
A4_MIN_CONFIDENCE = float(os.environ.get("A4_MIN_CONFIDENCE", "0.65"))
A4_OVERWRITE = os.environ.get("A4_OVERWRITE", "0") == "1"
HTTP_TIMEOUT = int(os.environ.get("A4_HTTP_TIMEOUT", "12"))

ALLOWED_CATS = [
    "Men", "Women", "Unisex", "Kidswear",
    "Accessories", "Jewellery", "Footwear", "Lingerie",
    "Beauty", "Home & Living", "Sportswear", "Electronics", "Other"
]
PRODUCT_CATS = {"Accessories","Jewellery","Footwear","Lingerie","Beauty","Home & Living","Sportswear","Electronics"}

# --- DB helpers ---
def _db(conn, sql, args=()):
    cur = conn.cursor()
    cur.execute(sql, args)
    conn.commit()
    return cur

def _get_columns(conn, table):
    rows = conn.execute(f"PRAGMA table_info({table})").fetchall()
    return [r[1] for r in rows]

def _clone_populated_table_schema(conn):
    _db(conn, f"""
        CREATE TABLE IF NOT EXISTS {A4_POP_TABLE} AS
        SELECT *,
               CAST(NULL AS TEXT) AS website_for,
               CAST(NULL AS TEXT) AS tags_json,
               CAST(0 AS INTEGER) AS a4_status,
               CAST(NULL AS REAL) AS a4_confidence,
               CAST('' AS TEXT) AS a4_model,
               datetime('now') AS updated_at
        FROM {A4_SRC_TABLE} WHERE 0;
    """)
    for col, typ in [
        ("website_for", "TEXT"),
        ("tags_json", "TEXT"),
        ("a4_status", "INTEGER"),
        ("a4_confidence", "REAL"),
        ("a4_model", "TEXT"),
        ("updated_at", "TEXT"),
    ]:
        try:
            _db(conn, f"ALTER TABLE {A4_POP_TABLE} ADD COLUMN {col} {typ}")
        except sqlite3.OperationalError:
            pass
    _db(conn, f"""
        CREATE UNIQUE INDEX IF NOT EXISTS idx_{A4_POP_TABLE}_site_psmap
        ON {A4_POP_TABLE}(site, product_sitemap);
    """)

def _upsert_missing_rows(conn):
    source_cols = _get_columns(conn, A4_SRC_TABLE)
    target_cols = _get_columns(conn, A4_POP_TABLE)
    shared_cols = [c for c in source_cols if c in target_cols]
    cols_csv = ", ".join(shared_cols)
    _db(conn, f"""
        INSERT INTO {A4_POP_TABLE} ({cols_csv})
        SELECT {cols_csv}
        FROM {A4_SRC_TABLE} s
        WHERE s.status=1
          AND NOT EXISTS (
                SELECT 1
                FROM {A4_POP_TABLE} p
                WHERE p.site=s.site AND p.product_sitemap=s.product_sitemap
          );
    """)

def _backfill_new_rows_from_existing(conn):
    _db(conn, f"""
        UPDATE {A4_POP_TABLE} AS p
           SET website_for = (
                   SELECT website_for FROM {A4_POP_TABLE}
                    WHERE site=p.site AND a4_status=1 AND website_for IS NOT NULL
                    ORDER BY updated_at DESC LIMIT 1
               ),
               tags_json = (
                   SELECT tags_json FROM {A4_POP_TABLE}
                    WHERE site=p.site AND a4_status=1 AND tags_json IS NOT NULL
                    ORDER BY updated_at DESC LIMIT 1
               ),
               a4_status = 1,
               a4_confidence = (
                   SELECT a4_confidence FROM {A4_POP_TABLE}
                    WHERE site=p.site AND a4_status=1 AND a4_confidence IS NOT NULL
                    ORDER BY updated_at DESC LIMIT 1
               ),
               a4_model = (
                   SELECT a4_model FROM {A4_POP_TABLE}
                    WHERE site=p.site AND a4_status=1 AND a4_model IS NOT NULL
                    ORDER BY updated_at DESC LIMIT 1
               ),
               updated_at = datetime('now')
         WHERE (p.a4_status IS NULL OR p.a4_status=0)
           AND EXISTS (SELECT 1 FROM {A4_POP_TABLE} q WHERE q.site=p.site AND q.a4_status=1);
    """)

def _sites_to_process(conn):
    if A4_OVERWRITE:
        rows = conn.execute(f"""
            SELECT DISTINCT site
            FROM {A4_SRC_TABLE}
            WHERE status=1
            ORDER BY site
        """).fetchall()
    else:
        rows = conn.execute(f"""
            SELECT DISTINCT s.site
            FROM {A4_SRC_TABLE} s
            WHERE s.status=1
              AND NOT EXISTS (
                    SELECT 1 FROM {A4_POP_TABLE} p
                    WHERE p.site = s.site AND p.a4_status = 1
              )
            ORDER BY s.site
        """).fetchall()
    return [r[0] for r in rows]

def _write_site_result(conn, site, website_for, tags, status, confidence, model_used):
    tags_json = json.dumps(tags, ensure_ascii=False) if tags is not None else None
    _db(conn, f"""
        UPDATE {A4_POP_TABLE}
           SET website_for = ?,
               tags_json = ?,
               a4_status = ?,
               a4_confidence = ?,
               a4_model = ?,
               updated_at = datetime('now')
         WHERE site = ?;
    """, (website_for, tags_json, int(status or 0), confidence, model_used or "", site))

# --- HTTP helpers ---
def _normalize_home(site: str) -> str:
    if not site:
        return ""
    s = site.strip()
    if not s.startswith("http://") and not s.startswith("https://"):
        s = "https://" + s
    return s

def _candidate_paths():
    return ["/collections/all", "/shop", "/products"]

def _is_html_like(ct: str, body: str) -> bool:
    if ct and "html" in ct.lower():
        return True
    return "<html" in (body or "").lower()

async def _one_get(session: aiohttp.ClientSession, url: str):
    try:
        async with session.get(url, allow_redirects=True) as resp:
            ct = resp.headers.get("content-type", "")
            text = await resp.text(errors="ignore")
            return url, resp.status, ct, text, ""
    except Exception as e:
        return url, 0, "", "", f"{type(e).__name__}: {e}"

async def _fetch_best_html(session_https: aiohttp.ClientSession,
                           session_http: aiohttp.ClientSession,
                           site: str):
    host = site.replace("https://", "").replace("http://", "").strip("/")
    https_roots = [f"https://{host}", f"https://www.{host}"]
    http_roots  = [f"http://{host}",  f"http://www.{host}"]

    for base in https_roots + http_roots:
        url, st, ct, body, err = await _one_get(session_https if base.startswith("https") else session_http, base)
        if st == 200 and _is_html_like(ct, body):
            return url, st, ct, body, ""
        if st in (403, 503) and ("cloudflare" in body.lower() or "just a moment" in body.lower()):
            return url, st, ct, body, ""

    base = https_roots[0]
    tasks = [asyncio.create_task(_one_get(session_https, base.rstrip("/") + p)) for p in _candidate_paths()]
    done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED, timeout=HTTP_TIMEOUT)
    for d in done:
        url, st, ct, body, err = d.result()
        if (st == 200 and _is_html_like(ct, body)) or (st in (403, 503) and ("cloudflare" in body.lower() or "just a moment" in body.lower())):
            for p in pending: p.cancel()
            return url, st, ct, body, ""
    for p in pending:
        try: p.cancel()
        except Exception: pass
    return "", 0, "", "", "All attempts failed"

def _squash_html(html: str, max_chars=4000) -> str:
    if not html:
        return ""
    text = re.sub(r"(?is)<script.*?>.*?</script>", " ", html)
    text = re.sub(r"(?is)<style.*?>.*?</style>", " ", text)
    text = re.sub(r"(?is)<[^>]+>", " ", text)
    text = re.sub(r"\s+", " ", text).strip()
    return text[:max_chars]

# --- Category rules ---
def _enforce_single_main_category(raw_value):
    if not raw_value:
        return None
    parts = [p.strip() for p in str(raw_value).split(",") if p.strip()]
    parts = [p for p in parts if p in ALLOWED_CATS]
    if not parts:
        return None
    for p in parts:
        if p in PRODUCT_CATS:
            return p
    if "Unisex" in parts or ("Men" in parts and "Women" in parts):
        return "Unisex"
    if "Kidswear" in parts:
        return "Kidswear"
    if "Men" in parts:
        return "Men"
    if "Women" in parts:
        return "Women"
    return parts[0]

# --- LLM plumbing (rate-limit safe) ---
_llm_sem = asyncio.Semaphore(A4_LLM_CONCURRENCY)
_llm_lock = asyncio.Lock()
_last_llm_ts = 0.0

async def _llm_throttle():
    global _last_llm_ts
    async with _llm_lock:
        now = time.monotonic()
        wait = max(0.0, (_last_llm_ts + A4_LLM_MIN_DELAY_MS/1000.0) - now)
        if wait > 0:
            await asyncio.sleep(wait)
        _last_llm_ts = time.monotonic()

def _extract_json_loose(s: str):
    if not s:
        return None
    m = re.search(r'\{.*\}', s, flags=re.DOTALL)
    if not m:
        return None
    try:
        return json.loads(m.group(0))
    except Exception:
        return None

async def _llm_complete_json(messages):
    api_key = os.environ.get("OPENAI_API_KEY")
    if not api_key:
        raise RuntimeError("OPENAI_API_KEY not set")

    try:
        from openai import OpenAI
        client = OpenAI(api_key=api_key)
        use_modern = True
    except Exception:
        use_modern = False

    backoff = 0.8
    for attempt in range(1, A4_LLM_MAX_RETRIES + 1):
        await _llm_sem.acquire()
        try:
            await _llm_throttle()
            if use_modern:
                try:
                    resp = client.chat.completions.create(
                        model=A4_LLM_MODEL,
                        response_format={"type": "json_object"},
                        messages=messages,
                    )
                    content = resp.choices[0].message.content
                except Exception:
                    resp = client.chat.completions.create(
                        model=A4_LLM_MODEL,
                        messages=messages,
                    )
                    content = resp.choices[0].message.content
            else:
                import openai
                openai.api_key = api_key
                resp = openai.ChatCompletion.create(model=A4_LLM_MODEL, messages=messages)
                content = resp["choices"][0]["message"]["content"]

            try:
                return json.loads(content)
            except Exception:
                data = _extract_json_loose(content)
                if data is not None:
                    return data
                raise RuntimeError("LLM returned non-JSON")
        except Exception:
            if attempt >= A4_LLM_MAX_RETRIES:
                raise
            base = backoff * (2 ** (attempt - 1))
            jitter = 0.15 + 0.25 * (attempt % 3)
            await asyncio.sleep(base + jitter)
        finally:
            _llm_sem.release()

# --- Prompts & classifiers ---
def _system_prompt():
    return (
        "You are a precise catalog classifier for Indian fashion/e-commerce brands.\n"
        "Decide with NO guesswork. If insufficient evidence, set status=0.\n\n"
        "Return exactly ONE main category for `website_for` using ONLY this set (case-sensitive):\n"
        f"{', '.join(ALLOWED_CATS)}\n\n"
        "Selection rules:\n"
        "1) If the brand is primarily a product category (Footwear, Accessories, Jewellery, Lingerie, Beauty, Home & Living, Sportswear, Electronics), choose that ONLY.\n"
        "2) Otherwise, audience: Men, Women, Unisex, Kidswear.\n"
        "3) If you cannot determine confidently, set status=0.\n\n"
        "Also provide 2–6 concise assortment `tags` (array of strings). No duplicates."
    )

def _examples_block():
    e1 = """{
  "status": 1,
  "website_for": "Footwear",
  "tags": ["Sneakers", "Casual Shoes", "Slip-ons"],
  "confidence": 0.9
}"""
    e2 = """{
  "status": 1,
  "website_for": "Unisex",
  "tags": ["Streetwear", "Oversized T-shirts", "Hoodies"],
  "confidence": 0.86
}"""
    e0 = """{
  "status": 0,
  "website_for": null,
  "tags": [],
  "confidence": 0.0
}"""
    return f"Valid examples:\n{e1}\n\n{e2}\n\nIf unsure:\n{e0}\n"

async def _classify_brand_only(site: str):
    host = urlparse(_normalize_home(site)).netloc or site
    user = (
        f"Brand/Homepage Host: {host}\n\n"
        "No page content provided.\n\n"
        "Output JSON ONLY (no code fences, no extra keys) with this schema:\n"
        "{\n"
        '  "status": 0 or 1,\n'
        '  "website_for": string from allowed set or null,\n'
        '  "tags": array of 2-6 short strings,\n'
        '  "confidence": float 0..1\n'
        "}\n\n"
        f"{_examples_block()}"
        "Constraints:\n"
        "- ONE value for website_for only.\n"
        "- status=0 if unsure; do not guess.\n"
        "- Use your general knowledge of Indian brands if applicable; if still unsure, return status=0.\n"
    )
    msgs = [{"role": "system", "content": _system_prompt()},
            {"role": "user", "content": user}]
    try:
        data = await _llm_complete_json(msgs)
    except Exception:
        return dict(status=0, website_for=None, tags=[], confidence=None, model=A4_LLM_MODEL)

    status = int(data.get("status", 0))
    website_for = _enforce_single_main_category(data.get("website_for"))
    tags = data.get("tags") or []
    if isinstance(tags, str):
        try:
            maybe = json.loads(tags)
            tags = maybe if isinstance(maybe, list) else [t.strip() for t in tags.split(",") if t.strip()]
        except Exception:
            tags = [t.strip() for t in tags.split(",") if t.strip()]
    if not isinstance(tags, list):
        tags = []
    tags = [str(t).strip() for t in tags if str(t).strip()]
    if len(tags) > 6:
        tags = tags[:6]
    try:
        conf = float(data.get("confidence")) if data.get("confidence") is not None else None
    except Exception:
        conf = None

    if website_for not in ALLOWED_CATS:
        status, website_for, tags, conf = 0, None, [], None
    if status not in (0, 1):
        status, website_for, tags, conf = 0, None, [], None

    return dict(status=status, website_for=website_for, tags=tags, confidence=conf, model=A4_LLM_MODEL)

async def _classify_with_html(site: str, homepage_html: str):
    api_key = os.environ.get("OPENAI_API_KEY")
    if not api_key:
        return dict(status=0, website_for=None, tags=[], confidence=None, model=A4_LLM_MODEL)

    squashed = _squash_html(homepage_html, 4000)
    host = urlparse(_normalize_home(site)).netloc or site
    have_html = bool(squashed.strip())
    html_block = f"Homepage text (truncated):\n{squashed}\n\n" if have_html else "Homepage text unavailable.\n\n"

    user = (
        f"Brand/Homepage Host: {host}\n\n"
        f"{html_block}"
        "Output JSON ONLY (no code fences, no extra keys) with this schema:\n"
        "{\n"
        '  "status": 0 or 1,\n'
        '  "website_for": string from allowed set or null,\n'
        '  "tags": array of 2-6 short strings,\n'
        '  "confidence": float 0..1\n'
        "}\n\n"
        f"{_examples_block()}"
        "Constraints:\n"
        "- ONE value for website_for only.\n"
        "- status=0 if unsure; do not guess.\n"
        "- You MAY use both the provided text and your general knowledge; if still unsure, return status=0.\n"
    )
    msgs = [{"role": "system", "content": _system_prompt()},
            {"role": "user", "content": user}]
    try:
        data = await _llm_complete_json(msgs)
    except Exception:
        return dict(status=0, website_for=None, tags=[], confidence=None, model=A4_LLM_MODEL)

    status = int(data.get("status", 0))
    website_for = _enforce_single_main_category(data.get("website_for"))
    tags = data.get("tags") or []
    if isinstance(tags, str):
        try:
            maybe = json.loads(tags)
            tags = maybe if isinstance(maybe, list) else [t.strip() for t in tags.split(",") if t.strip()]
        except Exception:
            tags = [t.strip() for t in tags.split(",") if t.strip()]
    if not isinstance(tags, list):
        tags = []
    tags = [str(t).strip() for t in tags if str(t).strip()]
    if len(tags) > 6:
        tags = tags[:6]
    try:
        conf = float(data.get("confidence")) if data.get("confidence") is not None else None
    except Exception:
        conf = None

    if website_for not in ALLOWED_CATS:
        status, website_for, tags, conf = 0, None, [], None
    if status not in (0, 1):
        status, website_for, tags, conf = 0, None, [], None

    return dict(status=status, website_for=website_for, tags=tags, confidence=conf, model=A4_LLM_MODEL)

# --- Orchestrator ---
async def agent4_populate_audience_tags():
    print("[A4] Starting Agent 4 (LLM-first audience & tag classifier)…", flush=True)

    with sqlite3.connect(DB_PATH) as conn:
        _clone_populated_table_schema(conn)
        _upsert_missing_rows(conn)
        _backfill_new_rows_from_existing(conn)
        sites = _sites_to_process(conn)

    if not sites:
        print("[A4] Nothing to classify (either empty or already populated).", flush=True)
        return

    headers = {
        "Accept-Encoding": "gzip, deflate",
        "User-Agent": ("Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
                       "AppleWebKit/537.36 (KHTML, like Gecko) "
                       "Chrome/118.0.0.0 Safari/537.36"),
        "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
        "Accept-Language": "en-IN,en;q=0.9",
        "Cache-Control": "no-cache",
    }
    timeout = aiohttp.ClientTimeout(sock_connect=6, sock_read=10)
    connector_https = aiohttp.TCPConnector(limit_per_host=6, ttl_dns_cache=300)
    connector_http  = aiohttp.TCPConnector(ssl=False, limit_per_host=6, ttl_dns_cache=300)

    sem_sites = asyncio.Semaphore(A4_CONCURRENCY)

    async with aiohttp.ClientSession(headers=headers, timeout=timeout, connector=connector_https) as session_https, \
               aiohttp.ClientSession(headers=headers, timeout=timeout, connector=connector_http) as session_http:

        async def handle_site(site):
            async with sem_sites:
                brand_only = await _classify_brand_only(site)
                accepted = (brand_only["status"] == 1 and
                            brand_only.get("website_for") in ALLOWED_CATS and
                            (brand_only.get("confidence") or 0) >= A4_MIN_CONFIDENCE)
                if accepted:
                    result = brand_only
                    mode = "brand-only"
                else:
                    url, st, ct, body, err = await _fetch_best_html(session_https, session_http, site)
                    if st == 0:
                        print(f"[A4] {site} fetch issue: status=0 ct='' err='{err}'", flush=True)
                    elif st != 200 or (ct and "html" not in ct.lower()):
                        print(f"[A4] {site} fetch non-HTML/blocked: status={st} ct='{ct}' url={url}", flush=True)
                    result = await _classify_with_html(site, body)
                    mode = "html-fallback"

                with sqlite3.connect(DB_PATH) as conn:
                    _write_site_result(
                        conn,
                        site=site,
                        website_for=result.get("website_for"),
                        tags=result.get("tags"),
                        status=result.get("status", 0),
                        confidence=result.get("confidence"),
                        model_used=result.get("model"),
                    )

                label = result.get("website_for") or "UNKNOWN"
                conf  = result.get("confidence")
                print(f"[A4] {site} [{mode}] → status={result.get('status',0)} website_for={label} conf={conf} tags={result.get('tags')}", flush=True)

        await asyncio.gather(*(handle_site(s) for s in sites))

    print("[A4] Agent 4 done.", flush=True)

# --- Main (Agent 5 intentionally skipped) ---
# def _run_main(coro):
#     try:
#         loop = asyncio.get_running_loop()
#     except RuntimeError:
#         loop = None
#     if loop and loop.is_running():
#         import nest_asyncio; nest_asyncio.apply()
#         return asyncio.create_task(coro)
#     else:
#         asyncio.run(coro)
#         return None



# if __name__ == "__main__":
#     t = _run_main(agent4_populate_audience_tags())
#     await t
    

# ---------------------- Main (Agent 5 temporarily skipped) ----------------------

In [1]:
# agent5_fresh_start_patched.ipynb cell
# ------------------------------------------------------------
# Agent 5 — Fresh-start, simple & loud progress. (PATCHED)
# Reads:  populated_product_sitemaps (status=1)
# Uses:   ONLY product_sitemaps_json
# Skips:  website_for LIKE '%Kidswear%'
# Writes: product_urls (upsert) + a5_processed_candidates (resume)
# Exports: D:\museai\data\exports\A5_snapshot.json
#
# Patch highlights:
# - Fixes the "https:// <gap>" bug via aggressive whitespace stripping.
# - Tries www/no-www variants automatically for sitemap fetches.
# - Adds WordPress discovery: wp-sitemap.xml, sitemap_index.xml, wp-sitemap-posts-product-*.xml,
#   Yoast/RankMath product maps (product-sitemap*.xml).
# - Site-level fallback: if a site has < MIN_GOOD (default=5) product URLs in DB after Phase 2,
#   we run discovery (robots/common/WP) and ingest from those sitemaps to "replace" the weak coverage
#   with good product URLs (non-destructive upsert).
# - Keeps your concurrency + heartbeat style and your table layout.
# ------------------------------------------------------------

import os, re, json, time, random, asyncio, sqlite3, aiohttp
import nest_asyncio; nest_asyncio.apply()
from urllib.parse import urlsplit, urlunsplit, parse_qsl, urlencode, urljoin
from collections import defaultdict, Counter
from datetime import datetime
from typing import Dict, List, Tuple, Any, Optional
from xml.etree import ElementTree as ET

# ==============================
# Config (override via env if needed)
# ==============================
DB_PATH = os.getenv("CRAWLER_DB", os.getenv("DB_PATH", r"D:\museai\data\db\crawler_meta.db"))

POP_TABLE = os.getenv("A5_POP_TABLE", "populated_product_sitemaps")
PRODUCT_URLS_TABLE = os.getenv("A5_PRODUCT_URLS_TABLE", "product_urls")
PROCESSED_TABLE = os.getenv("A5_PROCESSED_TABLE", "a5_processed_candidates")

EXPORT_DIR = os.getenv("A5_EXPORT_DIR", r"D:\museai\data\exports")
SNAPSHOT_FILE = os.path.join(EXPORT_DIR, "A5_snapshot.json")

# Pacing (balanced: visible progress, low 429s)
A5_CONCURRENCY       = int(os.getenv("A5_CONCURRENCY", "8"))
A5_PER_HOST_MIN_S    = float(os.getenv("A5_PER_HOST_MIN_S", "0.9"))
A5_MAX_RETRIES       = int(os.getenv("A5_MAX_RETRIES", "3"))
A5_CONNECT_TIMEOUT_S = float(os.getenv("A5_CONNECT_TIMEOUT_S", "12"))
A5_READ_TIMEOUT_S    = float(os.getenv("A5_READ_TIMEOUT_S", "25"))
# Fallback (for heavy/slow sitemaps only)
A5_FALLBACK_CONNECT_S = float(os.getenv("A5_FALLBACK_CONNECT_S", "20"))
A5_FALLBACK_READ_S    = float(os.getenv("A5_FALLBACK_READ_S", "90"))


# Heartbeat
HEARTBEAT_SEC        = int(os.getenv("A5_HEARTBEAT_SEC", "20"))

# Filters
SKIP_WEBSITE_FOR = os.getenv("A5_SKIP_WEBSITE_FOR", "Kidswear").lower().strip()
ONLY_SITES = {s.strip().lower() for s in os.getenv("A5_ONLY_SITES", "").split(",") if s.strip()}
SKIP_SITES = {s.strip().lower() for s in os.getenv("A5_SKIP_SITES", "").split(",") if s.strip()}

# Minimum "good" URLs per site; if below this we run discovery fallback
MIN_GOOD = int(os.getenv("A5_MIN_GOOD", "5"))

_HEADERS = {
    "User-Agent": ("Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
                   "AppleWebKit/537.36 (KHTML, like Gecko) "
                   "Chrome/127.0.0.0 Safari/537.36"),
    "Accept": "application/xml,text/xml,application/xhtml+xml,text/html;q=0.9,*/*;q=0.8",
    "Accept-Language": "en-US,en;q=0.9",
    "Cache-Control": "no-cache",
    "Pragma": "no-cache",
}

TRANSIENT_STATUSES = {429, 503}  # skip now, retry on next run
FORBIDDEN_STATUSES = {403}

# ==============================
# Logging
# ==============================
def log(msg: str):
    print(f"[A5 {datetime.now().strftime('%H:%M:%S')}] {msg}", flush=True)

# ==============================
# FS & DB
# ==============================
def ensure_dirs():
    os.makedirs(EXPORT_DIR, exist_ok=True)

def connect_db() -> sqlite3.Connection:
    conn = sqlite3.connect(DB_PATH)
    conn.execute("PRAGMA journal_mode=WAL;")
    conn.execute("PRAGMA synchronous=NORMAL;")
    conn.execute("PRAGMA foreign_keys=ON;")
    return conn

def table_columns(conn: sqlite3.Connection, table: str) -> Dict[str, str]:
    cols = {}
    for cid, name, ctype, notnull, dflt, pk in conn.execute(f"PRAGMA table_info({table});"):
        cols[name] = (ctype or "").upper()
    return cols

def migrate_add_missing_columns(conn: sqlite3.Connection, table: str, required: Dict[str, str]):
    existing = table_columns(conn, table)
    for col, decl in required.items():
        if col not in existing:
            conn.execute(f"ALTER TABLE {table} ADD COLUMN {decl};")
    conn.commit()

def ensure_product_urls_table(conn: sqlite3.Connection):
    conn.execute(f"""
        CREATE TABLE IF NOT EXISTS {PRODUCT_URLS_TABLE}(
            site TEXT NOT NULL,
            url TEXT NOT NULL,
            source TEXT NOT NULL DEFAULT '',
            is_required INTEGER NOT NULL DEFAULT 1,
            discovered_at TEXT NOT NULL,
            PRIMARY KEY(site, url)
        );
    """)
    migrate_add_missing_columns(conn, PRODUCT_URLS_TABLE, {
        "source":       "source TEXT NOT NULL DEFAULT ''",
        "is_required":  "is_required INTEGER NOT NULL DEFAULT 1",
        "discovered_at":"discovered_at TEXT"
    })
    conn.execute(f"CREATE INDEX IF NOT EXISTS idx_{PRODUCT_URLS_TABLE}_site ON {PRODUCT_URLS_TABLE}(site);")
    conn.execute(f"CREATE INDEX IF NOT EXISTS idx_{PRODUCT_URLS_TABLE}_src ON {PRODUCT_URLS_TABLE}(source);")
    conn.commit()

def ensure_processed_table(conn: sqlite3.Connection):
    conn.execute(f"""
        CREATE TABLE IF NOT EXISTS {PROCESSED_TABLE}(
            site TEXT NOT NULL,
            url TEXT NOT NULL,
            processed_ok INTEGER,
            status INTEGER,
            note TEXT NOT NULL DEFAULT '',
            processed_at TEXT,
            PRIMARY KEY(site, url)
        );
    """)
    migrate_add_missing_columns(conn, PROCESSED_TABLE, {
        "processed_ok": "processed_ok INTEGER",
        "status":       "status INTEGER",
        "note":         "note TEXT NOT NULL DEFAULT ''",
        "processed_at": "processed_at TEXT"
    })
    conn.execute(f"CREATE INDEX IF NOT EXISTS idx_{PROCESSED_TABLE}_site ON {PROCESSED_TABLE}(site);")
    conn.commit()

def has_products_for_source(conn: sqlite3.Connection, site: str, source: str) -> bool:
    if not source:
        return False
    cur = conn.execute(f"SELECT 1 FROM {PRODUCT_URLS_TABLE} WHERE site=? AND source=? LIMIT 1;", (site, source))
    return cur.fetchone() is not None

def already_processed(conn: sqlite3.Connection, site: str, url: str) -> bool:
    if not url:
        return False
    # normalize url key
    url = canonicalize_url(url)
    cur = conn.execute(f"SELECT 1 FROM {PROCESSED_TABLE} WHERE site=? AND url=? LIMIT 1;", (site, url))
    if cur.fetchone() is not None:
        return True
    if has_products_for_source(conn, site, url):
        return True
    return False

def mark_processed(conn: sqlite3.Connection, site: str, url: str, ok: int, status: int, note: str):
    url = canonicalize_url(url)
    conn.execute(f"""
        INSERT INTO {PROCESSED_TABLE}(site, url, processed_ok, status, note, processed_at)
        VALUES(?,?,?,?,?,datetime('now'))
        ON CONFLICT(site, url) DO UPDATE SET
            processed_ok=excluded.processed_ok,
            status=excluded.status,
            note=excluded.note,
            processed_at=excluded.processed_at;
    """, (site, url, ok, status, note))
    conn.commit()

def upsert_product_url(conn: sqlite3.Connection, site: str, url: str, source: str, is_required: int = 1):
    if not url:
        return
    url = canonicalize_url(url)
    source = canonicalize_url(source or site)
    conn.execute(f"""
        INSERT INTO {PRODUCT_URLS_TABLE}(site, url, source, is_required, discovered_at)
        VALUES(?,?,?,?,datetime('now'))
        ON CONFLICT(site, url) DO UPDATE SET
            source=excluded.source,
            is_required=excluded.is_required,
            discovered_at=excluded.discovered_at;
    """, (site.rstrip("/"), url, source or site, int(is_required)))
    conn.commit()

def count_products_for_site(conn: sqlite3.Connection, site: str) -> int:
    cur = conn.execute(f"SELECT COUNT(1) FROM {PRODUCT_URLS_TABLE} WHERE site=?", (site,))
    row = cur.fetchone()
    return int(row[0] or 0)

# ==============================
# URL utils
# ==============================
_SPACE_RE = re.compile(r"\s+")
_DROP_PARAMS = {"utm_source","utm_medium","utm_campaign","utm_term","utm_content",
                "gclid","fbclid","igshid","mc_cid","mc_eid","yclid","pid","cid","affid","ref"}

def strip_spaces(u: Optional[str]) -> str:
    if not u:
        return ""
    # aggressive: remove all whitespace anywhere (fixes "https:// kisah.in")
    return _SPACE_RE.sub("", str(u))

def canonicalize_url(url: Optional[str]) -> str:
    if not url:
        return ""
    if not isinstance(url, str):
        url = str(url)
    try:
        url = strip_spaces(url).strip()
        p = urlsplit(url)
        qs = parse_qsl(p.query, keep_blank_values=True)
        qs = [(k, v) for (k, v) in qs
              if k not in _DROP_PARAMS and not k.lower().startswith("utm_") and k.lower() != "variant"]
        new_q = urlencode(qs, doseq=True)
        path = re.sub(r"/+", "/", p.path)
        # strip trailing slash except root
        if path != "/" and path.endswith("/"):
            path = path[:-1]
        return urlunsplit((p.scheme or "https", p.netloc.lower(), path, new_q, ""))  # strip fragment
    except Exception:
        return strip_spaces(url).strip()

def host_of(u: Optional[str]) -> str:
    try:
        return urlsplit(strip_spaces(u) or "").netloc.lower()
    except Exception:
        return ""

def same_site(url: Optional[str], site: Optional[str]) -> bool:
    h1, h2 = host_of(url), host_of(site)
    if not h1 or not h2:
        return False
    return (h1 == h2) or h1.endswith("." + h2) or h2.endswith("." + h1)

def toggle_www(u: str) -> str:
    u = canonicalize_url(u)
    p = urlsplit(u)
    host = p.netloc
    if host.startswith("www."):
        host = host[4:]
    else:
        host = "www." + host
    return urlunsplit((p.scheme or "https", host, p.path or "/", p.query, ""))

_PRODUCT_URL_RE = re.compile(
    r"(/products?/)|"
    r"(/collections/[^/]+/products/)|"
    r"(/p/)|(/item/)|(/shop/[^/?#]+$)|"
    r"(/product-[^/?#]+$)|"
    r"(\.html?\b)",
    re.I
)
def looks_like_product_page_url(url: str) -> bool:
    return bool(url and _PRODUCT_URL_RE.search(url))

# ==============================
# XML parsing
# ==============================
def strip_ns(tag: str) -> str:
    return tag.split("}", 1)[1] if "}" in tag else tag

def parse_xml(text: str) -> Tuple[str, List[str]]:
    try:
        root = ET.fromstring(text)
    except Exception:
        return "", []
    tag = strip_ns(root.tag).lower()
    locs: List[str] = []
    if tag == "sitemapindex":
        for sm in root.iter():
            if strip_ns(sm.tag).lower() == "loc" and sm.text:
                locs.append(sm.text.strip())
        return "sitemapindex", locs
    elif tag == "urlset":
        for el in root.iter():
            if strip_ns(el.tag).lower() == "loc" and el.text:
                locs.append(el.text.strip())
        return "urlset", locs
    return "", []

def xmlish(text: str, ctype: str) -> bool:
    if ctype and "xml" in ctype.lower():
        return True
    head = (text or "")[:1024].lower()
    return ("<urlset" in head) or ("<sitemapindex" in head) or head.strip().startswith("<?xml")

# ==============================
# Throttle / HTTP
# ==============================
class DomainThrottler:
    def __init__(self, min_gap_s: float):
        self.min_gap_s = max(0.0, float(min_gap_s))
        self._locks: Dict[str, asyncio.Lock] = {}
        self._last: Dict[str, float] = {}

    async def wait(self, host: str):
        if host not in self._locks:
            self._locks[host] = asyncio.Lock()
        async with self._locks[host]:
            now = time.monotonic()
            last = self._last.get(host, 0.0)
            wait_s = self.min_gap_s - (now - last)
            if wait_s > 0:
                await asyncio.sleep(wait_s + random.uniform(0.05, 0.25))
            self._last[host] = time.monotonic()

async def fetch_text(session: aiohttp.ClientSession, url: str) -> Tuple[int, str, str]:
    """
    Basic fetch (kept for compatibility). Use get_xmlish() wrapper for sitemap work.
    """
    url = canonicalize_url(url)
    backoff = 0.9
    status, ctype, text = 0, "", ""
    for attempt in range(1, A5_MAX_RETRIES + 1):
        try:
            async with session.get(url, headers=_HEADERS, allow_redirects=True) as resp:
                status = resp.status
                ctype = resp.headers.get("content-type","")
                raw = await resp.read()
                try:    text = raw.decode("utf-8")
                except: text = raw.decode("latin-1", errors="ignore")
                if status in TRANSIENT_STATUSES and attempt < A5_MAX_RETRIES:
                    await asyncio.sleep(backoff * attempt + random.uniform(0, 0.6))
                    continue
                return status, ctype, text
        except Exception:
            if attempt < A5_MAX_RETRIES:
                await asyncio.sleep(backoff * attempt + random.uniform(0, 0.6))
                continue
            return 0, "", ""
    return status, ctype, text

async def get_xmlish(session: aiohttp.ClientSession, url: str) -> Tuple[str, int, str, str]:
    """
    Try url, then www/no-www variant. Returns (final_url, status, ctype, text).
    """
    url = canonicalize_url(url)
    status, ctype, text = await fetch_text(session, url)
    if status == 200 and xmlish(text, ctype):
        return url, status, ctype, text
    # try toggle variant
    alt = toggle_www(url)
    if alt != url:
        status2, ctype2, text2 = await fetch_text(session, alt)
        if status2 == 200 and xmlish(text2, ctype2):
            return alt, status2, ctype2, text2
        # If original was OK (even if not xml), prefer original outcome
        if status == 200:
            return url, status, ctype, text
        return alt, status2, ctype2, text2
    return url, status, ctype, text

# ==============================
# Discovery helpers (fallback when site < MIN_GOOD)
# ==============================
async def robots_sitemaps(session: aiohttp.ClientSession, site: str) -> List[str]:
    base = canonicalize_url(site.rstrip("/"))
    robots = urljoin(base + "/", "robots.txt")
    u, status, ctype, text = await get_xmlish(session, robots)  # may not be XML, but we only want text
    out: List[str] = []
    if status == 200 and text:
        for line in text.splitlines():
            if line.lower().startswith("sitemap:"):
                raw = line.split(":", 1)[1].strip()
                if raw:
                    out.append(canonicalize_url(raw))
    return list(dict.fromkeys(out))

async def common_sitemaps(session: aiohttp.ClientSession, site: str) -> List[str]:
    base = canonicalize_url(site.rstrip("/"))
    candidates = [
        "sitemap.xml",
        "sitemap_index.xml",
        "sitemap-products.xml",
        "sitemap_product.xml",
        "sitemap_products_1.xml",
        "product-sitemap.xml",
        "product-sitemap1.xml",
    ]
    out: List[str] = []
    for path in candidates:
        sm = urljoin(base + "/", path)
        final, status, ctype, text = await get_xmlish(session, sm)
        if status == 200 and xmlish(text, ctype):
            out.append(final)
    # dedupe
    return list(dict.fromkeys(out))

async def wordpress_sitemaps(session: aiohttp.ClientSession, site: str) -> List[str]:
    base = canonicalize_url(site.rstrip("/"))
    seeds = [
        urljoin(base + "/", "wp-sitemap.xml"),
        urljoin(base + "/", "sitemap_index.xml"),
        urljoin(base + "/", "product-sitemap.xml"),
        urljoin(base + "/", "product-sitemap1.xml"),
    ]
    productish: List[str] = []
    seen = set()
    for sm in seeds:
        final, status, ctype, text = await get_xmlish(session, sm)
        if status != 200 or not xmlish(text, ctype):
            continue
        tag, locs = parse_xml(text)
        if tag == "sitemapindex":
            for loc in locs:
                ll = (loc or "").lower()
                if any(k in ll for k in ("product","wc-product","tulaproduct","woo","wp-sitemap-posts-product")):
                    loc_c = canonicalize_url(loc)
                    if loc_c not in seen:
                        seen.add(loc_c); productish.append(loc_c)
        elif tag == "urlset":
            if any(k in final.lower() for k in ("product","tulaproduct","wp-sitemap-posts-")):
                loc_c = canonicalize_url(final)
                if loc_c not in seen:
                    seen.add(loc_c); productish.append(loc_c)
    # Fallback: collect all wp-sitemap-posts-* urlsets (we'll filter URLs later)
    final, status, ctype, text = await get_xmlish(session, urljoin(base + "/", "wp-sitemap.xml"))
    if status == 200 and xmlish(text, ctype):
        tag, locs = parse_xml(text)
        for loc in locs:
            if "wp-sitemap-posts-" in (loc or ""):
                loc_c = canonicalize_url(loc)
                if loc_c not in seen:
                    seen.add(loc_c); productish.append(loc_c)
    return productish

# ==============================
# Core processing
# ==============================
async def process_urlset_into_products(conn: sqlite3.Connection, site: str, urlset_url: str,
                                       locs: List[str], source_hint: str) -> int:
    urls = [canonicalize_url(u) for u in locs if u and same_site(u, site)]
    # If the urlset filename doesn't scream "product", filter by path
    need_filter = not any(k in urlset_url.lower() for k in ("product", "products", "wp-sitemap-posts-product"))
    chosen = [u for u in urls if looks_like_product_page_url(u)] if need_filter else urls
    added = 0
    for pu in chosen:
        upsert_product_url(conn, site, pu, source=source_hint, is_required=1)
        added += 1
    return added

async def process_sitemap(conn: sqlite3.Connection, site: str, sitemap_url: str,
                          session: aiohttp.ClientSession,
                          throttler: DomainThrottler,
                          stats: Dict[str,int],
                          added_by_site: Dict[str,int]):
    if already_processed(conn, site, sitemap_url):
        stats["skipped"] += 1
        return

    host = host_of(sitemap_url)
    if host:
        await throttler.wait(host)

    final_url, status, ctype, text = await get_xmlish(session, sitemap_url)

    if status in TRANSIENT_STATUSES or status == 0:
        # skip for now; resume next run
        log(f"[skip] {site} {final_url} → transient {status}")
        return
    if status in FORBIDDEN_STATUSES or status >= 400:
        mark_processed(conn, site, final_url, 0, status, "unusable")
        stats["processed"] += 1
        log(f"[bad] {site} {final_url} → status={status} unusable")
        return

    if xmlish(text, ctype):
        tag, locs = parse_xml(text)
        locs = [canonicalize_url(u) for u in locs if u]
        if tag == "sitemapindex":
            children = [u for u in locs if same_site(u, site)]
            grabbed = 0
            for child in children:
                if already_processed(conn, site, child):
                    continue
                ch = host_of(child)
                if ch:
                    await throttler.wait(ch)
                c_final, s2, ct2, t2 = await get_xmlish(session, child)
                if s2 in TRANSIENT_STATUSES or s2 == 0:
                    log(f"[skip-child] {site} {c_final} → transient {s2}")
                    continue
                if s2 == 200 and xmlish(t2, ct2):
                    tag2, urls2 = parse_xml(t2)
                    urls2 = [u for u in urls2 if same_site(u, site)]
                    if tag2 == "urlset":
                        got = await process_urlset_into_products(conn, site, c_final, urls2, source_hint=c_final)
                        grabbed += got
                        mark_processed(conn, site, c_final, 1, 200, f"urlset:{got}")
                    else:
                        mark_processed(conn, site, c_final, 1, 200, f"{tag2}")
                else:
                    if s2 in FORBIDDEN_STATUSES or s2 >= 400:
                        mark_processed(conn, site, c_final, 0, s2, "child-unusable")
            mark_processed(conn, site, final_url, 1, 200, f"sitemapindex:{len(children)}")
            stats["processed"] += 1
            stats["added"] += grabbed
            added_by_site[site] += grabbed
            log(f"[index] {site} {final_url} → children={len(children)} products={grabbed}")
            return

        elif tag == "urlset":
            got = await process_urlset_into_products(conn, site, final_url, locs, source_hint=final_url)
            mark_processed(conn, site, final_url, 1, 200, f"urlset:{got}")
            stats["processed"] += 1
            stats["added"] += got
            added_by_site[site] += got
            log(f"[urlset] {site} {final_url} → +{got}")
            return

    # Non-XML
    mark_processed(conn, site, final_url, 0, status, "unusable")
    stats["processed"] += 1
    log(f"[bad] {site} {final_url} → status={status} unusable")

# ==============================
# FINAL PROBE (helper)
# ==============================
async def probe_sitemap(conn, site: str, sm_url: str, session: aiohttp.ClientSession, throttler: DomainThrottler) -> int:
    host = host_of(sm_url)
    if host:
        await throttler.wait(host)
    final, status, ctype, text = await get_xmlish(session, sm_url)
    if status != 200 or not xmlish(text, ctype):
        return 0
    tag, locs = parse_xml(text)
    if tag != "urlset" or not locs:
        return 0
    chosen = [u for u in locs if looks_like_product_page_url(u) and same_site(u, site)]
    added = 0
    for u in chosen:
        upsert_product_url(conn, site, u, source=sm_url, is_required=1)
        added += 1
    return added

# ==============================

# ==============================
# ==============================
# PROBE-style fetchers (httpx → aiohttp → curl) for fallback
# ==============================
import gzip, subprocess, shutil

HEADERS_ROTATION = [
    {
        "User-Agent": ("Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
                       "AppleWebKit/537.36 (KHTML, like Gecko) "
                       "Chrome/127.0.0.0 Safari/127.0.0.0"),
        "Accept": "application/xml,text/xml,application/xhtml+xml,text/html;q=0.9,*/*;q=0.8",
        "Accept-Language": "en-US,en;q=0.9",
        "Accept-Encoding": "gzip, deflate",
        "Cache-Control": "no-cache",
        "Pragma": "no-cache",
        "Connection": "keep-alive",
        "DNT": "1",
    },
    {
        "User-Agent": ("Mozilla/5.0 (Macintosh; Intel Mac OS X 13_5) "
                       "AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.5 Safari/605.1.15"),
        "Accept": "*/*",
        "Accept-Language": "en-US,en;q=0.9",
        "Accept-Encoding": "gzip, deflate",
        "Connection": "keep-alive",
        "DNT": "1",
    },
]

def site_base(u: str) -> str:
    p = urlsplit(u or "")
    return (p.scheme or "https").lower() + "://" + (p.netloc or "").lower()

def canonical_keep_query(u: str) -> str:
    if not u: return ""
    p = urlsplit(strip_spaces(u))
    path = re.sub(r"/+", "/", p.path or "/")
    return urlunsplit((p.scheme or "https", (p.netloc or "").lower(), path, p.query, ""))

async def _fetch_text_httpx(url: str):
    try:
        import httpx
    except Exception as e:
        return None, f"httpx-missing:{type(e).__name__}", ""
    timeout = httpx.Timeout(connect=A5_CONNECT_TIMEOUT_S, read=A5_READ_TIMEOUT_S, write=30.0, pool=30.0)
    async with httpx.AsyncClient(http2=True, timeout=timeout, verify=True, trust_env=True, follow_redirects=True) as client:
        for attempt in range(1, A5_MAX_RETRIES+1):
            for headers in HEADERS_ROTATION:
                try:
                    r = await client.get(url, headers=headers)
                    content = r.content
                    try:    text = content.decode("utf-8")
                    except: text = content.decode("latin-1", errors="ignore")
                    if r.status_code in TRANSIENT_STATUSES and attempt < A5_MAX_RETRIES:
                        await asyncio.sleep(0.7*attempt + random.uniform(0,0.5))
                        continue
                    return r.status_code, r.headers.get("content-type",""), text
                except Exception as e:
                    last = f"httpx:{type(e).__name__}:{str(e)[:120]}"
                    if attempt < A5_MAX_RETRIES:
                        await asyncio.sleep(0.5*attempt + random.uniform(0,0.5))
                        continue
                    return None, last, ""
    return None, "httpx-unknown", ""

async def _fetch_text_aiohttp(url: str):
    timeout = aiohttp.ClientTimeout(total=None, connect=A5_CONNECT_TIMEOUT_S, sock_read=A5_READ_TIMEOUT_S)
    async with aiohttp.ClientSession(timeout=timeout) as session:
        for attempt in range(1, A5_MAX_RETRIES+1):
            for headers in HEADERS_ROTATION:
                try:
                    async with session.get(url, headers=headers, allow_redirects=True) as resp:
                        raw = await resp.read()
                        ctype = resp.headers.get("content-type","")
                        if (urlsplit(url).path or "").lower().endswith(".gz"):
                            try:
                                raw = gzip.decompress(raw)
                                ctype = "application/xml"
                            except Exception:
                                pass
                        try:    text = raw.decode("utf-8")
                        except: text = raw.decode("latin-1", errors="ignore")
                        if resp.status in TRANSIENT_STATUSES and attempt < A5_MAX_RETRIES:
                            await asyncio.sleep(0.7*attempt + random.uniform(0,0.5))
                            continue
                        return resp.status, ctype, text
                except Exception as e:
                    last = f"aiohttp:{type(e).__name__}:{str(e)[:120]}"
                    if attempt < A5_MAX_RETRIES:
                        await asyncio.sleep(0.5*attempt + random.uniform(0,0.5))
                        continue
                    return None, last, ""
    return None, "aiohttp-unknown", ""

def _fetch_text_curl_blocking(url: str):
    if not shutil.which("curl"):
        return None, "curl-missing", ""
    for headers in HEADERS_ROTATION:
        args = ["curl","-LsS","--compressed","--max-time",str(int(A5_CONNECT_TIMEOUT_S + A5_READ_TIMEOUT_S))]
        for k,v in headers.items():
            args += ["-H", f"{k}: {v}"]
        args += [url]
        try:
            cp = subprocess.run(args, capture_output=True, check=False)
            body = cp.stdout or b""
            try:    text = body.decode("utf-8")
            except: text = body.decode("latin-1", errors="ignore")
            ctype = "application/xml" if xmlish(text, "application/xml") else ""
            if xmlish(text, ctype):
                return 200, ctype, text
            return 0, ctype, text
        except Exception as e:
            return None, f"curl:{type(e).__name__}:{str(e)[:120]}", ""

class _ProbeThrottle:
    def __init__(self, gap=A5_PER_HOST_MIN_S):
        self.gap = max(0.0, float(gap))
        self._locks = {}
        self._last  = {}

    async def wait(self, url: str):
        host = urlsplit(url).netloc.lower()
        if host not in self._locks:
            self._locks[host] = asyncio.Lock()
        async with self._locks[host]:
            now = time.monotonic()
            last = self._last.get(host, 0.0)
            wait = self.gap - (now - last)
            if wait > 0:
                await asyncio.sleep(wait + random.uniform(0.05, 0.25))
            self._last[host] = time.monotonic()

async def _fetch_text_probe(url: str, throttler: _ProbeThrottle):
    await throttler.wait(url)
    s, ctype, text = await _fetch_text_httpx(url)
    if s is None or s >= 400 or not xmlish(text, ctype or ""):
        s2, c2, t2 = await _fetch_text_aiohttp(url)
        if s2 and s2 < 400 and xmlish(t2, c2 or ""):
            return s2, c2, t2
        s3, c3, t3 = await asyncio.to_thread(_fetch_text_curl_blocking, url)
        return (s3 or 0), (c3 or ""), (t3 or "")
    return (s or 0), (ctype or ""), (text or "")

def _load_exact_sitemaps_for_site(conn: sqlite3.Connection, site: str) -> List[str]:
    base = site_base(site)
    rows = conn.execute(f"""
        SELECT site, COALESCE(product_sitemaps_json,'')
        FROM {POP_TABLE}
        WHERE status=1
    """).fetchall()
    out: List[str] = []
    for row_site, pjson in rows:
        if not row_site or not pjson:
            continue
        if not same_site(canonicalize_url(row_site), base):
            continue
        urls: List[str] = []
        try:
            data = json.loads(pjson)
            if isinstance(data, list):
                urls = [canonical_keep_query(u) for u in data if isinstance(u, str) and u.strip()]
            elif isinstance(data, dict):
                for key in ("sitemaps","product_sitemaps","urls","products"):
                    if key in data and isinstance(data[key], list):
                        urls += [canonical_keep_query(u) for u in data[key] if isinstance(u, str) and u.strip()]
        except Exception:
            urls = []
        out.extend(urls)
    # de-dupe and keep only same-site
    out = [u for u in dict.fromkeys(out) if same_site(u, base)]
    return out

async def _harvest_products_from_sitemap(site: str, sm_url: str, throttler: _ProbeThrottle) -> List[str]:
    status, ctype, txt = await _fetch_text_probe(sm_url, throttler)
    if status in FORBIDDEN_STATUSES:
        log(f"[fallback] {site} [{status}] forbidden → {sm_url}")
        return []
    if status == 0 and not txt:
        log(f"[fallback] {site} [0] network error → {sm_url}")
        return []
    if not xmlish(txt, ctype or ""):
        preview = (txt or "")[:120].replace("\n"," ")
        log(f"[fallback] {site} [{status}] not-xml/empty → {sm_url} body='{preview}'")
        return []

    tag, locs = parse_xml(txt)
    locs = [canonical_keep_query(u) for u in (locs or []) if u]
    products: List[str] = []

    if tag == "sitemapindex":
        children = [u for u in locs if same_site(u, site)]
        log(f"[fallback] {site} index children={len(children)} ← {sm_url}")
        for child in children:
            s2, ct2, t2 = await _fetch_text_probe(child, throttler)
            if s2 in FORBIDDEN_STATUSES or not xmlish(t2, ct2 or ""):
                continue
            ttag, urls2 = parse_xml(t2)
            if ttag == "urlset":
                urls2 = [canonical_keep_query(u) for u in urls2 if u and same_site(u, site)]
                chosen = [u for u in urls2 if looks_like_product_page_url(u)] or urls2
                products.extend(chosen)
        return list(dict.fromkeys(products))

    if tag == "urlset":
        urls = [u for u in locs if same_site(u, site)]
        chosen = [u for u in urls if looks_like_product_page_url(u)] or urls
        log(f"[fallback] {site} urlset +{len(chosen)} ← {sm_url}")
        return list(dict.fromkeys(chosen))

    log(f"[fallback] {site} unknown tag={tag} ← {sm_url}")
    return []

# ==============================
# Fallback discovery (patched with slow-session timings)

# ==============================
# ==============================
# Site-level fallback using PROBE logic and exact DB sitemaps
# ==============================
async def fallback_discovery_for_site(conn: sqlite3.Connection, site: str,
                                      session: aiohttp.ClientSession,   # unused but kept to match signature
                                      throttler: DomainThrottler,       # unused but kept to match signature
                                      stats: Dict[str,int],
                                      added_by_site: Dict[str,int]) -> int:
    # 1) Load EXACT sitemaps saved for this site (no guessing)
    sitemaps = _load_exact_sitemaps_for_site(conn, site)
    if not sitemaps:
        log(f"[fallback] {site} → no exact sitemaps in DB; skipping")
        return 0

    log(f"[fallback] {site} → probing {len(sitemaps)} DB sitemap(s) with httpx→aiohttp→curl")

    gained = 0
    per_host = _ProbeThrottle(gap=A5_PER_HOST_MIN_S)

    # 2) Sequential per-site (polite), but we still keep overall agent concurrency elsewhere
    for sm in sitemaps:
        products = await _harvest_products_from_sitemap(site, sm, per_host)
        if not products:
            mark_processed(conn, site, sm, 0, 200, "fallback-probe:0")
            continue
        # 3) Upsert products (source=that sitemap) and mark processed
        added_here = 0
        for pu in products:
            if same_site(pu, site):
                upsert_product_url(conn, site, pu, source=sm, is_required=1)
                gained += 1
                added_here += 1
        mark_processed(conn, site, sm, 1, 200, f"fallback-probe:+{added_here}")
        stats["processed"] += 1
        stats["added"] += added_here
        added_by_site[site] += added_here

    return gained





# ==============================
# Load candidates ONLY from product_sitemaps_json
# ==============================
def load_candidates(conn: sqlite3.Connection):
    """
    Returns two lists:
      direct_products:  List[(site, url)]             where sitemap_kind='product'
      sitemap_sources:  List[(site, sitemap_url)]     other kinds (treat as product sitemaps)
    """
    sql = f"""
        SELECT site,
               LOWER(COALESCE(sitemap_kind,'')) AS kind,
               COALESCE(product_sitemaps_json,'') AS pjson,
               COALESCE(website_for,'') AS website_for
        FROM {POP_TABLE}
        WHERE status = 1
    """
    rows = conn.execute(sql).fetchall()

    direct_products: List[Tuple[str, str]] = []
    sitemap_sources: List[Tuple[str, str]] = []

    for site, kind, pjson, website_for in rows:
        if not site:
            continue
        if SKIP_WEBSITE_FOR and website_for and SKIP_WEBSITE_FOR in website_for.lower():
            continue
        s_norm = canonicalize_url(site.rstrip("/"))
        if ONLY_SITES and s_norm.lower() not in ONLY_SITES:
            continue
        if s_norm.lower() in SKIP_SITES:
            continue

        urls: List[str] = []
        if pjson:
            try:
                data = json.loads(pjson)
                if isinstance(data, list):
                    urls = [canonicalize_url(u) for u in data if isinstance(u, str) and u.strip()]
                elif isinstance(data, dict):
                    for key in ("sitemaps","products","product_sitemaps","urls"):
                        if key in data and isinstance(data[key], list):
                            urls.extend([canonicalize_url(u) for u in data[key] if isinstance(u, str) and u.strip()])
            except Exception:
                urls = []

        if not urls:
            continue

        if (kind or "") == "product":
            for u in urls:
                cu = canonicalize_url(u)
                if cu:
                    direct_products.append((s_norm, cu))
        else:
            for u in urls:
                cu = canonicalize_url(u)
                if cu:
                    sitemap_sources.append((s_norm, cu))

    # de-dupe
    direct_products = list(dict.fromkeys(direct_products))
    sitemap_sources = list(dict.fromkeys(sitemap_sources))
    return direct_products, sitemap_sources

def choose_sitemaps_for_site(site: str, db_json: List[str], discovered: List[str]) -> List[str]:
    """
    Always prefer DB JSON product_sitemaps if present.
    Otherwise fallback to discovered sitemaps.
    """
    if db_json:
        return db_json
    return discovered


# ==============================
# Export (pretty JSON)
# ==============================
def export_snapshot():
    ensure_dirs()
    conn = connect_db()

    # latest meta per site
    rows = conn.execute(f"""
        SELECT site, COALESCE(platform,''), COALESCE(website_for,'')
        FROM {POP_TABLE}
        WHERE status=1
        ORDER BY COALESCE(updated_at, discovered_at, '') DESC
    """).fetchall()
    meta: Dict[str, Dict[str, str]] = {}
    for site, platform, website_for in rows:
        if site and site not in meta:
            if SKIP_WEBSITE_FOR and website_for and SKIP_WEBSITE_FOR in website_for.lower():
                continue
            meta[site] = {"platform": platform or "", "website_for": website_for or ""}

    rows = conn.execute(f"""
        SELECT site,
               CASE WHEN source IS NULL OR source=='' THEN site ELSE source END AS norm_source,
               is_required,
               url
        FROM {PRODUCT_URLS_TABLE}
        ORDER BY site, norm_source, url
    """).fetchall()

    grouped: Dict[str, Dict[str, Dict[str, Any]]] = defaultdict(lambda: defaultdict(lambda: {
        "is_required": 1, "products": []
    }))
    for site, source, is_required, url in rows:
        if site not in meta:
            continue
        grouped[site][source]["is_required"] = int(is_required or 1)
        if url:
            grouped[site][source]["products"].append(url)

    # map kind for sources from POP (best effort)
    kind_map: Dict[Tuple[str, str], str] = {}
    rows = conn.execute(f"""
        SELECT site, COALESCE(product_sitemaps_json,''), LOWER(COALESCE(sitemap_kind,'')) AS kind
        FROM {POP_TABLE}
        WHERE status=1
    """).fetchall()
    for site, pjson, kind in rows:
        if not pjson:
            continue
        try:
            data = json.loads(pjson)
            urls = data if isinstance(data, list) else []
            for u in urls:
                if isinstance(u, str) and u.strip():
                    kind_map[(site.rstrip("/"), canonicalize_url(u.strip()))] = (kind or "").lower()
        except Exception:
            continue

    out_sites = []
    for site in sorted(grouped.keys()):
        sitemaps_list = []
        for source, bundle in grouped[site].items():
            k = kind_map.get((site.rstrip("/"), canonicalize_url(source)), "")
            sitemaps_list.append({
                "sitemap_url": source,
                "kind": k or ("product" if canonicalize_url(source) == canonicalize_url(site) else ""),
                "is_required": bundle["is_required"],
                "product_count": len(bundle["products"]),
                "products": bundle["products"],
            })
        out_sites.append({
            "site": site,
            "platform": meta.get(site, {}).get("platform", ""),
            "website_for": meta.get(site, {}).get("website_for", ""),
            "sitemaps": sitemaps_list
        })

    payload = {
        "generated_at_utc": datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%SZ"),
        "total_sites": len(out_sites),
        "sites": out_sites
    }
    with open(SNAPSHOT_FILE, "w", encoding="utf-8") as f:
        json.dump(payload, f, ensure_ascii=False, indent=2)
    log(f"Wrote snapshot → {SNAPSHOT_FILE}")

# ==============================
# Main
# ==============================
async def run_agent5():
    ensure_dirs()
    conn = connect_db()
    ensure_product_urls_table(conn)
    ensure_processed_table(conn)

    # Load candidates only from product_sitemaps_json
    direct_products, sitemap_sources = load_candidates(conn)

    # Startup summary
    per_site_direct = Counter([s for s, _ in direct_products])
    per_site_maps   = Counter([s for s, _ in sitemap_sources])

    log("=== Fresh start ===")
    log(f"DB: {DB_PATH}")
    log(f"Sites with direct products (kind='product'): {len(per_site_direct)} (items: {len(direct_products)})")
    log(f"Sites with sitemaps from JSON:            {len(per_site_maps)} (items: {len(sitemap_sources)})")
    if ONLY_SITES: log(f"ONLY_SITES={sorted(list(ONLY_SITES))}")
    if SKIP_SITES: log(f"SKIP_SITES={sorted(list(SKIP_SITES))}")

    timeout   = aiohttp.ClientTimeout(total=None, connect=A5_CONNECT_TIMEOUT_S, sock_read=A5_READ_TIMEOUT_S)
    throttler = DomainThrottler(A5_PER_HOST_MIN_S)
    sem       = asyncio.Semaphore(A5_CONCURRENCY)

    # progress counters
    stats = {
        "processed": 0,
        "added": 0,
        "skipped": 0
    }
    added_by_site: Dict[str, int] = defaultdict(int)

    async def heartbeat():
        while True:
            await asyncio.sleep(HEARTBEAT_SEC)
            log(f"Heartbeat: processed={stats['processed']} added={stats['added']} skipped={stats['skipped']}")

    async def process_direct(site: str, product_url: str):
        # treat as trusted product (source=site)
        upsert_product_url(conn, site, product_url, source=site, is_required=1)
        mark_processed(conn, site, product_url, 1, 200, "direct-from-json")
        stats["processed"] += 1
        stats["added"] += 1
        added_by_site[site] += 1
        log(f"[direct] {site} → +1")

    async def process_map(site: str, sitemap_url: str, session: aiohttp.ClientSession):
        await process_sitemap(conn, site, sitemap_url, session, throttler, stats, added_by_site)

    # Phase 1: push all direct-products first (fast visible output)
    if direct_products:
        log("Phase 1: writing direct products...")
        for site, url in direct_products:
            await process_direct(site, url)

    # Phase 2: crawl sitemaps (concurrent)
    log("Phase 2: crawling product sitemaps...")
    async with aiohttp.ClientSession(timeout=timeout) as session:
        async def _job(s, u):
            async with sem:
                await process_map(s, u, session)

        # Heartbeat task
        hb = asyncio.create_task(heartbeat())

        chosen_sitemaps = []
        for s, u in sitemap_sources:
            chosen = choose_sitemaps_for_site(s, [u], [])  # DB JSON always wins
            for cu in chosen:
                chosen_sitemaps.append((s, cu))

        tasks = [asyncio.create_task(_job(s, u))
                for (s, u) in chosen_sitemaps]

        if tasks:
            await asyncio.gather(*tasks)

        hb.cancel()
        try:
            await hb
        except asyncio.CancelledError:
            pass

                # --------- Site-level fallback (always if needed, respectful) ---------
        all_sites = sorted(set([s for s,_ in direct_products] + [s for s,_ in sitemap_sources]))
        low_sites = [s for s in all_sites if count_products_for_site(conn, s) < MIN_GOOD]

        if low_sites:
            log(f"[fallback-phase] sites_needing_help={len(low_sites)}")
            for s in low_sites:
                gained = await fallback_discovery_for_site(conn, s, session, throttler, stats, added_by_site)
                total = count_products_for_site(conn, s)
                log(f"[fallback-summary] {s} gained={gained} total_now={total}")



    # Per-site summary
    for site, n in sorted(added_by_site.items(), key=lambda kv: kv[1], reverse=True):
        log(f"Summary: {site} → +{n} products")

    export_snapshot()

# ==============================
# Runner
# ==============================
def _run_main(coro):
    try:
        loop = asyncio.get_running_loop()
        if loop.is_running():
            return asyncio.create_task(coro)
    except RuntimeError:
        pass
    return asyncio.run(coro)

if __name__ == "__main__":
    t = _run_main(run_agent5())
    if t is not None:
        try:
            await t
        except NameError:
            pass


[A5 15:12:10] === Fresh start ===
[A5 15:12:10] DB: D:\museai\data\db\crawler_meta.db
[A5 15:12:10] Sites with direct products (kind='product'): 5 (items: 758)
[A5 15:12:10] Sites with sitemaps from JSON:            124 (items: 241)
[A5 15:12:10] Phase 1: writing direct products...
[A5 15:12:10] [direct] https://www.clovia.com → +1
[A5 15:12:10] [direct] https://theancestrystore.in → +1
[A5 15:12:10] [direct] https://theancestrystore.in → +1
[A5 15:12:10] [direct] https://theancestrystore.in → +1
[A5 15:12:10] [direct] https://theancestrystore.in → +1
[A5 15:12:10] [direct] https://theancestrystore.in → +1
[A5 15:12:10] [direct] https://theancestrystore.in → +1
[A5 15:12:10] [direct] https://theancestrystore.in → +1
[A5 15:12:10] [direct] https://theancestrystore.in → +1
[A5 15:12:10] [direct] https://theancestrystore.in → +1
[A5 15:12:10] [direct] https://theancestrystore.in → +1
[A5 15:12:10] [direct] https://theancestrystore.in → +1
[A5 15:12:10] [direct] https://theancestrystore.in

In [1]:
# ==============================
# Agent 6 — Polite product crawler (JSON-first, isolated, resumable)
# Reads:   A5 DB (READONLY): populated_product_sitemaps + product_urls
# Writes:  D:\museai\data\a6\...
# Selection: deterministic rank (Unisex > Men > Women > Accessories > Footwear > Jewellery)
# Crawl:   Shopify .js → .json → HTML JSON-LD → OG (strict Color/Size, variants/media full)
# Resumes: manifest.json (etag, last_modified, content_hash) + state.json
# Tracks:  registry/sites_status.json → completed sites auto-skipped + fill up to TOP_N
# Merges:  merged/structured_latest.json (site → products), cleaned of U+2028/U+2029
# ==============================

import os
import re
import json
import time
import random
import glob
import asyncio
import sqlite3
import hashlib
from collections import defaultdict
from datetime import datetime, timezone
from typing import Dict, List, Optional, Tuple, Any
from urllib.parse import urlsplit

import aiohttp

# ------------------------------
# Config (env)
# ------------------------------
DB_PATH                = os.getenv("CRAWLER_DB", os.getenv("DB_PATH", r"D:\museai\data\db\crawler_meta.db"))
POP_TABLE              = os.getenv("A5_POP_TABLE", "populated_product_sitemaps")
PRODUCT_URLS_TABLE     = os.getenv("A5_PRODUCT_URLS_TABLE", "product_urls")

A6_OUTPUT_ROOT         = os.getenv("A6_OUTPUT_DIR", r"D:\museai\data\a6")
A6_TOP_N               = int(os.getenv("A6_TOP_N", "15"))
A6_LIMIT_PER_SITE      = int(os.getenv("A6_LIMIT_PER_SITE", "0")) or None
A6_DRY_RUN             = os.getenv("A6_DRY_RUN", "false").strip().lower() in ("1","true","yes","y")
A6_BATCH_SIZE          = int(os.getenv("A6_BATCH_SIZE", "200"))
A6_ERROR_BUDGET        = float(os.getenv("A6_ERROR_BUDGET", "0.25"))
A6_USE_CONDITIONAL_GET = os.getenv("A6_USE_CONDITIONAL_GET", "true").strip().lower() in ("1","true","yes","y")

GLOBAL_CONCURRENCY     = int(os.getenv("A6_GLOBAL_CONCURRENCY", "40"))
PER_HOST_WORKERS       = int(os.getenv("A6_PER_HOST_WORKERS", "2"))
REQUEST_TIMEOUT        = int(os.getenv("A6_REQUEST_TIMEOUT", "18"))
RETRIES                = int(os.getenv("A6_RETRIES", "3"))
DEFAULT_RPS            = float(os.getenv("A6_DEFAULT_RPS", "1.0"))
MIN_RPS                = float(os.getenv("A6_MIN_RPS", "0.08"))
MAX_RPS                = float(os.getenv("A6_MAX_RPS", "2.0"))
BASE_JITTER            = float(os.getenv("A6_BASE_JITTER", "0.30"))
PROXY_URL              = os.environ.get("A6_PROXY_URL", "").strip() or None

# Selection filter
A6_REQUIRE_FLAG_ONLY   = os.getenv("A6_REQUIRE_FLAG_ONLY", "true").strip().lower() in ("1","true","yes","y")

# NEW: cap the number of 403s per site before skipping the rest
A6_FORBID_LIMIT        = int(os.getenv("A6_FORBID_LIMIT", "2"))  # stop a site after this many 403s

# Manual-done sites list
A6_MANUALLY_DONE_SITES = {
    re.sub(r"^\s*https?://", "", s.strip().lower())
    for s in os.getenv(
        "A6_MANUALLY_DONE_SITES",
        "www.sassafras.in, bonkerscorner.com, vastrado.com, trapin.co, offduty.in"
    ).split(",")
    if s.strip()
}

FAST_UA = (
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
    "AppleWebKit/537.36 (KHTML, like Gecko) "
    "Chrome/127.0.0.0 Safari/537.36"
)

HEADERS_HTML = {
    "User-Agent": FAST_UA,
    "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
    "Accept-Language": "en-US,en;q=0.9",
    "Cache-Control": "no-cache",
    "Pragma": "no-cache",
}
HEADERS_JSON = {
    "User-Agent": FAST_UA,
    "Accept": "application/json,text/html;q=0.8,*/*;q=0.5",
    "Accept-Language": "en-US,en;q=0.9",
    "Cache-Control": "no-cache",
    "Pragma": "no-cache",
}

# ------------------------------
# Paths & small utils
# ------------------------------
A6_REGISTRY_DIR  = os.path.join(A6_OUTPUT_ROOT, "registry")
A6_SITES_STATUS  = os.path.join(A6_REGISTRY_DIR, "sites_status.json")

def now_iso() -> str:
    return datetime.now(timezone.utc).isoformat()

def ts_for_filename() -> str:
    return datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%SZ")

def log(msg: str):
    print(f"[A6 {datetime.now(timezone.utc).strftime('%H:%M:%S')}] {msg}", flush=True)

def ensure_dirs():
    os.makedirs(A6_OUTPUT_ROOT, exist_ok=True)
    os.makedirs(os.path.join(A6_OUTPUT_ROOT, "selection"), exist_ok=True)
    os.makedirs(os.path.join(A6_OUTPUT_ROOT, "sites"), exist_ok=True)
    os.makedirs(os.path.join(A6_OUTPUT_ROOT, "snapshots"), exist_ok=True)
    os.makedirs(os.path.join(A6_OUTPUT_ROOT, "merged"), exist_ok=True)
    os.makedirs(A6_REGISTRY_DIR, exist_ok=True)

def site_dir(host: str) -> str:
    safe = re.sub(r"[^A-Za-z0-9._-]", "_", host)
    p = os.path.join(A6_OUTPUT_ROOT, "sites", safe)
    os.makedirs(p, exist_ok=True)
    return p

def host_only(u: str) -> str:
    try:
        return urlsplit(u).netloc.lower()
    except Exception:
        return (u or "").lower()

def host_key(url: str) -> str:
    p = urlsplit(url)
    return f"{p.scheme}://{p.netloc}"

def sha1_of(obj: Any) -> str:
    try:
        raw = json.dumps(obj, ensure_ascii=False, separators=(",", ":"), sort_keys=True).encode("utf-8")
    except Exception:
        raw = str(obj).encode("utf-8", errors="ignore")
    return hashlib.sha1(raw).hexdigest()

def product_key_from_url(url: str) -> str:
    try:
        path = urlsplit(url).path.strip("/")
        parts = [p for p in path.split("/") if p]
        for i, p in enumerate(parts):
            if p.lower() == "products" and i + 1 < len(parts):
                handle = parts[i + 1].split("?")[0].strip().replace(".html", "")
                if handle:
                    return f"shopify:{handle}"
    except Exception:
        pass
    return f"url:{url}"

def canonical_product_key(raw_product: dict, url: str) -> str:
    # Prefer Shopify handle; else fall back to URL-based key
    handle = (raw_product or {}).get("handle")
    if isinstance(handle, str) and handle.strip():
        return f"shopify:{handle.strip()}"
    return product_key_from_url(url)

# ------------------------------
# Registry helpers (completed sites)
# ------------------------------
def read_json(path: str, default):
    try:
        with open(path, "r", encoding="utf-8") as f:
            return json.load(f)
    except Exception:
        return default

def write_json(path: str, obj: Any):
    tmp = path + ".tmp"
    with open(tmp, "w", encoding="utf-8", newline="\n") as f:
        json.dump(obj, f, ensure_ascii=False, indent=2)
    os.replace(tmp, path)

def append_jsonl(path: str, obj: Any):
    with open(path, "a", encoding="utf-8") as f:
        f.write(json.dumps(obj, ensure_ascii=False) + "\n")

def load_sites_status() -> dict:
    return read_json(A6_SITES_STATUS, {})

def save_sites_status(d: dict):
    write_json(A6_SITES_STATUS, d)

def mark_site_completed(host: str, total: int, done: int, failed: int):
    st = load_sites_status()
    st[host] = {
        "completed": True,
        "last_completed_at": now_iso(),
        "total_urls": total,
        "done": done,
        "failed": failed,
    }
    save_sites_status(st)

def is_site_completed(host: str) -> bool:
    st = load_sites_status()
    return bool(st.get(host, {}).get("completed"))

# ------------------------------
# DB (read-only)
# ------------------------------
def connect_db_ro() -> sqlite3.Connection:
    uri = f"file:{DB_PATH}?mode=ro"
    return sqlite3.connect(uri, uri=True)

def load_sites_and_urls() -> Tuple[Dict[str, Dict[str, str]], Dict[str, List[str]]]:
    conn = connect_db_ro()
    cur = conn.cursor()

    site_meta: Dict[str, Dict[str, str]] = {}
    for site, platform, website_for in cur.execute(
        f"SELECT site, COALESCE(platform,''), COALESCE(website_for,'') FROM {POP_TABLE} WHERE status=1;"
    ):
        s = (site or "").rstrip("/")
        if not s:
            continue
        site_meta[s] = {"platform": platform or "", "website_for": website_for or ""}

    site_urls: Dict[str, List[str]] = defaultdict(list)
    if A6_REQUIRE_FLAG_ONLY:
        q = f"SELECT site, url FROM {PRODUCT_URLS_TABLE} WHERE is_required=1 ORDER BY site, url;"
    else:
        q = f"SELECT site, url FROM {PRODUCT_URLS_TABLE} ORDER BY site, url;"

    for site, url in cur.execute(q):
        if site and url:
            site_urls[site.rstrip("/")].append(url)

    conn.close()

    # de-dupe
    for s in list(site_urls.keys()):
        site_urls[s] = list(dict.fromkeys(site_urls[s]))

    if A6_REQUIRE_FLAG_ONLY:
        for s in list(site_urls.keys()):
            if not site_urls[s]:
                site_urls.pop(s, None)

    return site_meta, site_urls

# ------------------------------
# Ranking
# ------------------------------
CATEGORY_ORDER = ["unisex", "men", "women", "accessories", "footwear", "jewellery"]

def category_rank(website_for: str) -> int:
    w = (website_for or "").strip().lower()
    for i, cat in enumerate(CATEGORY_ORDER):
        if cat in w:
            return i
    if "apparel" in w or "clothing" in w:
        return 0
    return len(CATEGORY_ORDER)

def score_site(site: str, meta: Dict[str, str], urls: List[str]) -> Tuple:
    wfor = meta.get("website_for", "")
    plat = (meta.get("platform", "") or "").lower()
    cat = category_rank(wfor)
    n = len(urls)
    shopify_bonus = 1 if "shopify" in plat else 0
    host = urlsplit(site).netloc.lower()

    manual = 1 if (host in A6_MANUALLY_DONE_SITES or any(host == d or host.endswith("." + d) for d in A6_MANUALLY_DONE_SITES)) else 0
    return (manual, cat, -n, -shopify_bonus, host)

def build_selection(site_meta, site_urls, top_n: int) -> Dict[str, Any]:
    scored = []
    for s, urls in site_urls.items():
        if not urls:
            continue
        sc = score_site(s, site_meta.get(s, {}), urls)
        scored.append((sc, s, site_meta.get(s, {}), len(urls)))
    scored.sort(key=lambda x: x[0])

    ranked = []
    for idx, (sc, s, meta, count) in enumerate(scored, 1):
        ranked.append({
            "rank": idx,
            "site": s,
            "host": urlsplit(s).netloc,
            "platform": meta.get("platform", ""),
            "website_for": meta.get("website_for", ""),
            "product_count_in_db": count,
            "score_tuple": list(sc),
            "manual_done": bool(sc[0]),
        })

    # Choose up to top_n, **skipping completed**, keep pulling further down
    chosen_effective = []
    for row in ranked:
        host = row["host"].lower()
        if is_site_completed(host):
            continue
        chosen_effective.append(row)
        if len(chosen_effective) >= top_n:
            break

    return {
        "generated_at_utc": now_iso(),
        "db_path": DB_PATH,
        "requested_top_n": top_n,
        "total_sites_considered": len(ranked),
        "ranking_rule": {
            "order": CATEGORY_ORDER,
            "tie_breakers": ["manual_done(last)", "category", "product_count(desc)", "platform(shopify bonus)", "host(A–Z)"],
        },
        "ranked_sites": ranked,
        "chosen": chosen_effective,
    }

# ------------------------------
# Polite limiter
# ------------------------------
class AdaptiveHostLimiter:
    def __init__(self, default_rps=DEFAULT_RPS, min_rps=MIN_RPS, max_rps=MAX_RPS, base_jitter=BASE_JITTER):
        self.default_rps = float(default_rps)
        self.min_rps = float(min_rps)
        self.max_rps = float(max_rps)
        self.base_jitter = float(base_jitter)
        self._state: Dict[str, Dict[str, Any]] = {}

    def _ensure(self, host: str):
        if host not in self._state:
            self._state[host] = {"rps": self.default_rps, "next_at": 0.0, "cooldown_until": 0.0, "consec_429": 0, "last_status": None}
        return self._state[host]

    async def acquire(self, host: str):
        st = self._ensure(host)
        now = time.monotonic()
        if st["cooldown_until"] > now:
            await asyncio.sleep(st["cooldown_until"] - now)
            now = time.monotonic()
        min_delay = 1.0 / max(self.min_rps, st["rps"])
        jitter = random.uniform(0.0, self.base_jitter * min_delay)
        wait_until = max(st["next_at"], now)
        sleep_for = wait_until - now
        if sleep_for > 0:
            await asyncio.sleep(sleep_for)
            now = time.monotonic()
        st["next_at"] = now + min_delay + jitter

    def feedback(self, host: str, status: int, headers: Optional[dict]):
        st = self._ensure(host)
        st["last_status"] = status
        if 200 <= status < 300:
            st["consec_429"] = 0
            st["rps"] = min(self.max_rps, st["rps"] * 1.10)
            return
        if status in (0, 404):
            st["rps"] = max(self.min_rps, st["rps"] * 0.90)
            return
        if status in (429, 403, 503, 520, 522):
            cooldown = 0.0
            if headers:
                ra = headers.get("Retry-After") or headers.get("retry-after")
                if ra:
                    try:
                        cooldown = float(ra)
                    except:
                        pass
            if status == 429:
                st["consec_429"] += 1
                st["rps"] = max(self.min_rps, st["rps"] * 0.5)
                extra = 10.0 * st["consec_429"]
                cooldown = max(cooldown, min(90.0, 12.0 + extra))
            else:
                st["rps"] = max(self.min_rps, st["rps"] * 0.5)
                cooldown = max(cooldown, 30.0)
            st["cooldown_until"] = time.monotonic() + cooldown
            return
        st["rps"] = max(self.min_rps, st["rps"] * 0.95)

limiter = AdaptiveHostLimiter()

# ------------------------------
# HTTP + Shopify helpers
# ------------------------------
async def fetch_text(session, url: str, headers: dict, timeout=REQUEST_TIMEOUT, max_retries=RETRIES,
                     cond_etag: Optional[str]=None, cond_lastmod: Optional[str]=None) -> Tuple[int, dict, str]:
    host = host_key(url)
    last_status, last_headers, last_text = 0, {}, ""
    use_headers = dict(headers)
    if A6_USE_CONDITIONAL_GET:
        if cond_etag:
            use_headers["If-None-Match"] = cond_etag
        if cond_lastmod:
            use_headers["If-Modified-Since"] = cond_lastmod

    for _ in range(max_retries + 1):
        await limiter.acquire(host)
        try:
            async with session.get(
                url,
                headers=use_headers,
                timeout=aiohttp.ClientTimeout(total=timeout),
                proxy=PROXY_URL,
                allow_redirects=True,
            ) as r:
                last_status = r.status
                last_headers = dict(r.headers)
                txt = await r.text(errors="ignore")
                last_text = txt
                limiter.feedback(host, r.status, last_headers)
                if r.status in (200, 304):
                    return r.status, last_headers, txt
                if r.status in (429, 403, 503, 520, 522):
                    continue
                return r.status, last_headers, txt
        except Exception:
            limiter.feedback(host, 0, None)
            continue
    return last_status, last_headers, last_text

async def fetch_json(session, url: str, referer: Optional[str]=None,
                     cond_etag: Optional[str]=None, cond_lastmod: Optional[str]=None) -> Tuple[int, dict, Any]:
    headers = dict(HEADERS_JSON)
    if referer:
        headers["Referer"] = referer
    status, hdrs, txt = await fetch_text(session, url, headers, cond_etag=cond_etag, cond_lastmod=cond_lastmod)
    data = None
    if status == 200 and txt:
        try:
            data = json.loads(txt)
        except Exception:
            try:
                data = json.loads(txt.strip())
            except Exception:
                data = None
    return status, hdrs, data

def text_only(s: Optional[str]) -> Optional[str]:
    if s is None:
        return None
    try:
        return re.sub(r"<[^>]+>", "", s).strip()
    except Exception:
        return s

def safe_json_loads(s: str):
    try:
        return json.loads(s)
    except Exception:
        try:
            fixed = s.replace("\n", " ").strip()
            if fixed.startswith("{") and fixed.endswith("}"):
                fixed2 = fixed.replace("}{", "}|{")
                parts = fixed2.split("|")
                return [json.loads(p) for p in parts]
        except Exception:
            return None
    return None

def find_product_node(json_obj):
    def is_product_type(t):
        if isinstance(t, list): return any(str(x).lower().endswith("product") for x in t)
        if isinstance(t, str):  return str(t).lower().endswith("product")
        return False
    def walk(node):
        if isinstance(node, dict):
            if "@type" in node and is_product_type(node["@type"]):
                return node
            if "@graph" in node and isinstance(node["@graph"], list):
                for sub in node["@graph"]:
                    r = walk(sub)
                    if r: return r
            for v in node.values():
                r = walk(v)
                if r: return r
        elif isinstance(node, list):
            for it in node:
                r = walk(it)
                if r: return r
        return None
    return walk(json_obj)

def flatten_jsonld_product(node: dict) -> dict:
    out = {}
    try:
        def pick_first(x):
            return (x[0] if isinstance(x, list) and x else x)
        out["title"] = pick_first(node.get("name"))
        out["body_html"] = pick_first(node.get("description"))
        out["vendor"] = None
        out["product_type"] = pick_first(node.get("category"))
        img = node.get("image")
        if isinstance(img, list): out["image"] = {"src": pick_first(img)}
        elif isinstance(img, dict): out["image"] = {"src": pick_first(img.get("url") or img.get("contentUrl"))}
        else: out["image"] = {"src": pick_first(img)} if img else None
        offers = node.get("offers")
        if isinstance(offers, list): offers = offers[0]
        variants = []
        if isinstance(offers, dict):
            price = offers.get("price")
            if price is not None:
                variants = [{"price": price}]
        out["variants"] = variants
        out["options"] = []
        out["media"] = []
    except Exception:
        pass
    return out

def strict_colors_from_options(product: dict) -> List[str]:
    colors = []
    for opt in (product.get("options") or []):
        name = (opt.get("name") or "").strip().lower()
        if name in ("color", "colour") or "color" in name or "colour" in name:
            for v in (opt.get("values") or []):
                s = str(v).strip()
                if s: colors.append(s)
    seen=set(); out=[]
    for c in colors:
        k=c.lower()
        if k not in seen:
            out.append(c); seen.add(k)
    return out

def strict_sizes_from_options(product: dict) -> List[str]:
    sizes = []
    for opt in (product.get("options") or []):
        name = (opt.get("name") or "").strip().lower()
        if "size" in name:
            for v in (opt.get("values") or []):
                s = str(v).strip()
                if s: sizes.append(s)
    seen=set(); out=[]
    for s in sizes:
        k=s.lower()
        if k not in seen:
            out.append(s); seen.add(k)
    return out

def image_urls(product: dict) -> List[str]:
    out=[]
    for it in (product.get("images") or []):
        if isinstance(it, str):
            u = it.strip()
            if u: out.append(u)
        elif isinstance(it, dict):
            u = (it.get("src") or it.get("url") or "").strip()
            if u: out.append(u)
    for m in (product.get("media") or []):
        if isinstance(m, dict):
            if m.get("media_type") == "image" and m.get("src"):
                out.append(m["src"])
            elif isinstance(m.get("preview_image"), dict) and m["preview_image"].get("src"):
                out.append(m["preview_image"]["src"])
    seen=set(); ded=[]
    for u in out:
        if u and u not in seen:
            ded.append(u); seen.add(u)
    return ded

def variant_summary(variants: List[dict]) -> dict:
    if not isinstance(variants, list): variants=[]
    prices, cap_prices, skus, barcodes = [], [], [], []
    any_available = False
    for v in variants:
        try: prices.append(float(str(v.get("price","")).replace(",","")))
        except: pass
        try: cap_prices.append(float(str(v.get("compare_at_price","")).replace(",","")))
        except: pass
        if v.get("available") is True: any_available = True
        if v.get("sku"): skus.append(str(v.get("sku")))
        if v.get("barcode"): barcodes.append(str(v.get("barcode")))
    def _min_nonempty(a):
        a2=[x for x in a if x or x==0]
        return (min(a2) if a2 else None)
    def _max_nonempty(a):
        a2=[x for x in a if x or x==0]
        return (max(a2) if a2 else None)
    skus = list(dict.fromkeys(skus))
    barcodes = list(dict.fromkeys(barcodes))
    return {
        "variants_count": len(variants),
        "any_variant_available": any_available,
        "price_min": _min_nonempty(prices),
        "price_max": _max_nonempty(prices),
        "compare_at_price_min": _min_nonempty(cap_prices),
        "compare_at_price_max": _max_nonempty(cap_prices),
        "skus": skus or None,
        "barcodes": barcodes or None,
    }

def flatten_product_record(product: dict, url: str, site_name: str, via: str, http_meta: dict, lastmod_hint=None) -> dict:
    rec = {
        "site_name": site_name,
        "url": url,
        "fetched_at": now_iso(),
        "source": via,
        "status": 200,
        "lastmod": lastmod_hint,
        "source_sitemap": None,
    }
    for k in ["id","title","handle","body_html","vendor","product_type","tags",
              "template_suffix","published_scope","created_at","updated_at","published_at"]:
        rec[k] = product.get(k)
    rec["body_text"] = text_only(product.get("body_html") or "") if product.get("body_html") else None
    rec["option_names"] = [opt.get("name") for opt in (product.get("options") or [])]
    rec["options_json"] = json.dumps(product.get("options") or [], ensure_ascii=False)
    rec["colors"] = strict_colors_from_options(product) or None
    rec["sizes"] = strict_sizes_from_options(product) or None
    imgs = image_urls(product)
    rec["images"] = imgs or None
    rec["images_count"] = len(imgs)
    feat = product.get("image") or {}
    if isinstance(feat, dict):
        rec["featured_image"] = feat.get("src") or feat.get("url")
        rec["featured_alt"] = feat.get("alt")
    else:
        rec["featured_image"] = None
        rec["featured_alt"] = None
    rec["media_json"] = json.dumps(product.get("media") or [], ensure_ascii=False)
    rec["media_count"] = len(product.get("media") or [])
    variants = product.get("variants") or []
    rec["variants_json"] = json.dumps(variants, ensure_ascii=False)
    rec.update(variant_summary(variants))
    rec["http_etag"] = http_meta.get("etag")
    rec["http_last_modified"] = http_meta.get("last_modified")
    return rec

# ------------------------------
# Fetch product (Shopify-first)
# ------------------------------
async def fetch_shopify_product(session, product_url: str,
                                etag: Optional[str]=None, last_mod: Optional[str]=None) -> Tuple[str, int, dict | None, dict]:
    """Return (via, status, product_dict_or_none, http_meta). via in {'shopify.js','shopify.json','json-ld','opengraph','-','not-modified'}"""
    handle = None
    try:
        path = urlsplit(product_url).path.strip("/")
        parts = [p for p in path.split("/") if p]
        for i, p in enumerate(parts):
            if p.lower() == "products" and i + 1 < len(parts):
                handle = parts[i+1].split("?")[0].strip("/").replace(".html", "")
    except Exception:
        handle = None

    # 1) /products/{handle}.js/.json
    if handle:
        base = host_key(product_url)
        for ext in (".js", ".json"):
            api = f"{base}/products/{handle}{ext}"
            status, hdrs, data = await fetch_json(session, api, referer=product_url,
                                                  cond_etag=etag, cond_lastmod=last_mod)
            if status == 304:
                return "not-modified", 304, None, {"etag": hdrs.get("ETag"), "last_modified": hdrs.get("Last-Modified")}
            if status == 200 and isinstance(data, (dict, list)):
                if ext == ".js":
                    product = data if (isinstance(data, dict) and "title" in data and "variants" in data) else (data.get("product") if isinstance(data, dict) else None)
                    if isinstance(product, dict) and product.get("title"):
                        return "shopify.js", 200, product, {"etag": hdrs.get("ETag"), "last_modified": hdrs.get("Last-Modified")}
                else:
                    product = data.get("product") if isinstance(data, dict) else None
                    if isinstance(product, dict) and product.get("title"):
                        return "shopify.json", 200, product, {"etag": hdrs.get("ETag"), "last_modified": hdrs.get("Last-Modified")}

    # 2) HTML → JSON-LD → OG
    status, hdrs, html_text = await fetch_text(session, product_url, HEADERS_HTML,
                                               cond_etag=etag, cond_lastmod=last_mod)
    if status == 304:
        return "not-modified", 304, None, {"etag": hdrs.get("ETag"), "last_modified": hdrs.get("Last-Modified")}
    if status == 200 and html_text:
        try:
            product_node = None
            for blk in re.findall(r'<script[^>]+type=["\']application/ld\+json["\'][^>]*>(.*?)</script>',
                                  html_text, flags=re.I|re.S):
                parsed_any = safe_json_loads(blk.strip())
                if parsed_any is None:
                    continue
                cand = find_product_node(parsed_any)
                if cand is not None:
                    product_node = cand
                    break
            if product_node is not None:
                mapped = flatten_jsonld_product(product_node)
                return "json-ld", 200, mapped, {"etag": hdrs.get("ETag"), "last_modified": hdrs.get("Last-Modified")}
            # OG fallback
            og = {}
            for m in re.findall(r'<meta\s+(?:name|property)=["\'](og:[^"\']+)["\']\s+content=["\']([^"\']+)["\']', html_text, flags=re.I):
                og[m[0].lower()] = m[1]
            mapped = {
                "title": og.get("og:title"),
                "body_html": og.get("og:description"),
                "image": {"src": og.get("og:image")} if og.get("og:image") else None,
                "variants": [],
                "options": [],
                "media": [],
            }
            return "opengraph", 200, mapped, {"etag": hdrs.get("ETag"), "last_modified": hdrs.get("Last-Modified")}
        except Exception:
            pass
    return "-", status, None, {"etag": hdrs.get("ETag") if isinstance(hdrs, dict) else None,
                               "last_modified": hdrs.get("Last-Modified") if isinstance(hdrs, dict) else None}

# ------------------------------
# Crawl one site
# ------------------------------
# ------------------------------
# Crawl one site  (worker-queue; hard-stop after A6_FORBID_LIMIT×403)
# ------------------------------
# ------------------------------
# Crawl one site  (worker-queue; hard-stop after A6_FORBID_LIMIT×403; serialized writes)
# ------------------------------
async def crawl_site(site: str, urls: List[str], meta: Dict[str,str]):
    host = urlsplit(site).netloc
    sdir = site_dir(host)
    jsonl_path = os.path.join(sdir, f"products-{datetime.now(timezone.utc).strftime('%Y%m%d')}.jsonl")
    manifest_path = os.path.join(sdir, "manifest.json")
    state_path = os.path.join(sdir, "state.json")
    grouped_path = os.path.join(sdir, "grouped-latest.json")

    manifest = read_json(manifest_path, default={})
    state = read_json(state_path, default={"pending": [], "done": [], "failed": [], "last_run": None})
    state["last_run"] = now_iso()

    already = set(state.get("pending", [])) | set(state.get("failed", [])) | set(state.get("done", []))
    fresh = [u for u in urls if u not in already]
    state["pending"].extend(fresh)

    if A6_LIMIT_PER_SITE:
        state["pending"] = state["pending"][:A6_LIMIT_PER_SITE]

    write_json(state_path, state)

    total_target = len(state["pending"])
    if total_target == 0:
        log(f"[site] {host} nothing to do.")
        return

    global_sem = asyncio.Semaphore(GLOBAL_CONCURRENCY)
    per_host_sem = asyncio.Semaphore(PER_HOST_WORKERS)

    # NEW: per-site hard-stop and counters
    stop_event = asyncio.Event()
    forbidden_count = 0
    error_count = 0
    processed = 0

    # NEW: serialize writes to JSON files to avoid Windows PermissionError
    write_lock = asyncio.Lock()

    # Work queue
    q: asyncio.Queue[str] = asyncio.Queue()
    for u in state["pending"]:
        q.put_nowait(u)

    async with aiohttp.ClientSession() as session:

        async def persist_state_and_manifest():
            # single place that writes both files under the lock
            async with write_lock:
                write_json(manifest_path, manifest)
                write_json(state_path, state)

        async def handle_one(u: str, idx: int):
            nonlocal error_count, processed, forbidden_count

            async with global_sem, per_host_sem:
                pkey = product_key_from_url(u)
                et = manifest.get(pkey, {}).get("etag") if A6_USE_CONDITIONAL_GET else None
                lm = manifest.get(pkey, {}).get("last_modified") if A6_USE_CONDITIONAL_GET else None

                # If you add a WP fallback later, call fetch_product_any here
                via, status, product, http_meta = await fetch_shopify_product(session, u, etag=et, last_mod=lm)

            # 403 handling & cap
            if status == 403:
                forbidden_count += 1
                state["failed"].append(u)
                error_count += 1
                log(f"[{host}] {idx}/{total_target} 403 (#{forbidden_count}) → {u}")
                if forbidden_count >= A6_FORBID_LIMIT and not stop_event.is_set():
                    log(f"[{host}] hit {A6_FORBID_LIMIT}×403 → skipping remainder of site")
                    stop_event.set()
                # persist quickly so state shows the failure
                await persist_state_and_manifest()
                return

            if status == 304:
                state["done"].append(u)
                processed += 1
                if idx % 25 == 1:
                    log(f"[{host}] {idx}/{total_target} 304 not modified → {u}")
                # light persistence every so often
                if processed % A6_BATCH_SIZE == 0:
                    await persist_state_and_manifest()
                return

            if status != 200 or product is None:
                error_count += 1
                state["failed"].append(u)
                if idx % 10 == 1 or status in (429,403,503):
                    log(f"[{host}] {idx}/{total_target} ERR status={status} via={via} → {u}")
                # persist error so we don’t re-try on crash
                await persist_state_and_manifest()
                return

            # Success path
            flat = flatten_product_record(product, u, site, via, http_meta, lastmod_hint=None)
            raw_obj = {"site_name": site, "url": u, "fetched_at": flat.get("fetched_at"), "raw_product": product}

            content_hash = sha1_of(product)
            pkey = product_key_from_url(u)
            prior_hash = manifest.get(pkey, {}).get("content_hash")
            if prior_hash != content_hash:
                # append_jsonl is atomic-ish; no lock needed here
                append_jsonl(jsonl_path, raw_obj)

            manifest[pkey] = {
                "url": u,
                "etag": http_meta.get("etag"),
                "last_modified": http_meta.get("last_modified"),
                "content_hash": content_hash,
                "last_seen": flat["fetched_at"],
                "status": 200,
            }

            state["done"].append(u)
            processed += 1
            if (idx % 25 == 1) or (via not in ("shopify.js","shopify.json")):
                log(f"[{host}] {idx}/{total_target} via={via} 200 → {u}")

            # Periodic persistence (serialized)
            if processed % A6_BATCH_SIZE == 0:
                await persist_state_and_manifest()

        async def worker(worker_id: int):
            # Pull next item while stop not set
            while not stop_event.is_set():
                try:
                    u = q.get_nowait()
                except asyncio.QueueEmpty:
                    return
                idx = total_target - q.qsize()  # progress-ish
                try:
                    await handle_one(u, idx)
                finally:
                    q.task_done()

                # Error budget guard:
                # - Ignore while we’re tripping the 403 cap (it has its own stop rule)
                # - Only apply once we have a minimum sample size to avoid 1/1 = 100%
                min_samples = max(20, PER_HOST_WORKERS * 4)
                total_seen = processed + error_count
                if (forbidden_count == 0) and (total_seen >= min_samples):
                    frac = (error_count) / max(1, total_seen)
                    if frac > A6_ERROR_BUDGET and not stop_event.is_set():
                        log(f"[{host}] error budget exceeded ({error_count}/{total_seen}); pausing")
                        stop_event.set()
                        return

        # Launch limited workers
        workers = [asyncio.create_task(worker(i+1)) for i in range(PER_HOST_WORKERS)]
        try:
            await asyncio.gather(*workers)
        finally:
            # Ensure last persisted state even on early stop
            await persist_state_and_manifest()

    # Recompute remaining
    remaining = [u for u in state["pending"] if u not in state["done"] and u not in state["failed"]]
    state["pending"] = remaining
    # Final write under lock (single-threaded here, but keep consistent)
    async def _final_persist():
        async with write_lock:
            write_json(manifest_path, manifest)
            write_json(state_path, state)
    # run the small coroutine to persist
    try:
        loop = asyncio.get_running_loop()
        if loop.is_running():
            await _final_persist()
    except RuntimeError:
        # fallback if not in a loop; unlikely here
        pass

    # Optional: mark completed if 403 cap tripped
    if stop_event.is_set() and forbidden_count >= A6_FORBID_LIMIT:
        mark_site_completed(host, total=len(urls), done=len(state.get("done", [])), failed=len(state.get("failed", [])))
        log(f"[site] {host} marked completed due to repeated 403s (done={len(state['done'])}, failed={len(state['failed'])})")

    # Grouped output
    if not A6_DRY_RUN:
        grouped = []
        try:
            with open(jsonl_path, "r", encoding="utf-8") as f:
                for line in f:
                    try:
                        obj = json.loads(line)
                        grouped.append({"url": obj.get("url"), "product": obj.get("raw_product")})
                    except Exception:
                        continue
            write_json(grouped_path, grouped)
        except FileNotFoundError:
            pass

    log(f"[site] {host} done={len(state['done'])} failed={len(state['failed'])} remaining={len(state['pending'])}")



# ------------------------------
# Structured merge (site → products JSON) with cleaning & upsert
# ------------------------------
LS = "\u2028"
PS = "\u2029"

def _clean_text(x):
    if isinstance(x, str):
        if LS in x or PS in x:
            x = x.replace(LS, " ").replace(PS, " ")
    return x

def _clean_obj(o):
    if isinstance(o, dict):
        return {k: _clean_obj(_clean_text(v)) for k, v in o.items()}
    if isinstance(o, list):
        return [_clean_obj(_clean_text(v)) for v in o]
    return _clean_text(o)

def run_structured_merge(a6_root=A6_OUTPUT_ROOT):
    merged_dir = os.path.join(a6_root, "merged")
    sites_dir  = os.path.join(a6_root, "sites")
    out_path   = os.path.join(merged_dir, "structured_latest.json")
    os.makedirs(merged_dir, exist_ok=True)

    # Read ALL historical per-site jsonl files
    all_rows = []
    for sdir in sorted(glob.glob(os.path.join(sites_dir, "*"))):
        for fp in sorted(glob.glob(os.path.join(sdir, "products-*.jsonl"))):
            try:
                with open(fp, "r", encoding="utf-8") as f:
                    for line in f:
                        try:
                            all_rows.append(json.loads(line))
                        except Exception:
                            continue
            except FileNotFoundError:
                continue

    # Upsert by canonical product key (prefer handle, else URL); keep newest fetched_at
    latest_by_key: Dict[str, dict] = {}
    for row in all_rows:
        url = row.get("url") or ""
        key = canonical_product_key(row.get("raw_product") or {}, url)
        ts  = row.get("fetched_at") or ""
        prev = latest_by_key.get(key)
        if (not prev) or (ts > prev.get("fetched_at", "")):
            latest_by_key[key] = row

    # Group by site
    per_site: Dict[str, dict] = {}
    for row in latest_by_key.values():
        site = row.get("site_name") or ""
        host = host_only(site or row.get("url") or "")
        bucket = per_site.setdefault(site, {"site": site, "host": host, "products": []})
        bucket["products"].append({"url": row.get("url"), "product": row.get("raw_product")})

    # Sort products within each site for stable output
    for b in per_site.values():
        b["products"].sort(key=lambda x: (x.get("url") or ""))

    out = {
        "generated_at_utc": now_iso(),
        "total_sites": len(per_site),
        "total_products": sum(len(b["products"]) for b in per_site.values()),
        "sites": list(per_site.values()),
    }
    out = _clean_obj(out)

    with open(out_path, "w", encoding="utf-8", newline="\n") as f:
        json.dump(out, f, ensure_ascii=False, indent=2)
    log(f"Structured merge → {out_path}")

# ------------------------------
# Top-level Agent 6
# ------------------------------
async def run_agent6():
    ensure_dirs()
    site_meta, site_urls = load_sites_and_urls()

    selection = build_selection(site_meta, site_urls, top_n=A6_TOP_N)
    sel_ts = ts_for_filename()
    selection_path = os.path.join(A6_OUTPUT_ROOT, "selection", f"run_{sel_ts}_selection.json")
    write_json(selection_path, selection)
    log(f"Selection saved → {selection_path}")

    chosen_hosts = [e["site"] for e in selection["chosen"]]
    if not chosen_hosts:
        log("No sites chosen (all completed or none available).")
        run_structured_merge(A6_OUTPUT_ROOT)
        return

    log(f"Chosen {len(chosen_hosts)} site(s) to crawl (completed skipped, filled to TOP_N if possible):")
    for e in selection["chosen"]:
        log(f"  #{e['rank']:>2} {e['host']}  products={e['product_count_in_db']}  manual={e['manual_done']}")

    if A6_DRY_RUN:
        log("DRY RUN enabled — not fetching or writing site products.")
        run_structured_merge(A6_OUTPUT_ROOT)
        return

    # Crawl each selected site (skip manual-done but mark completed)
    for site in chosen_hosts:
        urls = site_urls.get(site, [])
        host = host_only(site)
        if not urls:
            log(f"[site] {site} has 0 URLs; skipping")
            continue

        if host in A6_MANUALLY_DONE_SITES:
            sdir = site_dir(host)
            state_path = os.path.join(sdir, "state.json")
            state = read_json(state_path, default={"pending": [], "done": [], "failed": [], "last_run": None})
            already_done = set(state.get("done", []))
            new_done = [u for u in urls if u not in already_done]
            state["done"] = list(already_done.union(new_done))
            state["pending"] = [u for u in state.get("pending", []) if u not in state["done"]]
            state["last_run"] = now_iso()
            write_json(state_path, state)

            manifest_path = os.path.join(sdir, "manifest.json")
            manifest = read_json(manifest_path, default={})
            write_json(manifest_path, manifest)

            mark_site_completed(host, total=len(urls), done=len(state["done"]), failed=len(state["failed"]))
            log(f"[site] {host} marked as DONE (manual). done={len(state['done'])} pending={len(state['pending'])}")
            continue

        await crawl_site(site, urls, site_meta.get(site, {}))

        # Mark completed if nothing pending
        state = read_json(os.path.join(site_dir(host), "state.json"), default={})
        if not state.get("pending"):
            mark_site_completed(host, total=len(urls), done=len(state.get("done", [])), failed=len(state.get("failed", [])))

    # Snapshot
    snap = {
        "generated_at_utc": now_iso(),
        "db_path": DB_PATH,
        "requested_top_n": A6_TOP_N,
        "dry_run": A6_DRY_RUN,
        "sites_processed": chosen_hosts,
    }
    snap_path = os.path.join(A6_OUTPUT_ROOT, "snapshots", f"A6_run_{sel_ts}_summary.json")
    write_json(snap_path, snap)
    log(f"Wrote A6 snapshot → {snap_path}")

    # Final structured merge (site → products)
    run_structured_merge(A6_OUTPUT_ROOT)

# ------------------------------
# Runner shim
# ------------------------------
def _run_main(coro):
    try:
        loop = asyncio.get_running_loop()
        if loop.is_running():
            return asyncio.create_task(coro)
    except RuntimeError:
        pass
    return asyncio.run(coro)

if __name__ == "__main__":
    t = _run_main(run_agent6())
    if t is not None:
        try:
            await t
        except NameError:
            pass


[A6 18:53:37] Selection saved → D:\museai\data\a6\selection\run_20250916T185337Z_selection.json
[A6 18:53:37] Chosen 15 site(s) to crawl (completed skipped, filled to TOP_N if possible):
[A6 18:53:37]   #28 www.jaypore.com  products=24757  manual=False
[A6 18:53:37]   #29 www.libas.in  products=17634  manual=False
[A6 18:53:37]   #30 www.berrylush.com  products=15716  manual=False
[A6 18:53:37]   #31 wforwoman.com  products=15074  manual=False
[A6 18:53:37]   #32 www.koskii.com  products=9796  manual=False
[A6 18:53:37]   #33 okhai.org  products=8928  manual=False
[A6 18:53:37]   #34 www.superkicks.in  products=7301  manual=False
[A6 18:53:37]   #35 www.kazo.com  products=6521  manual=False
[A6 18:53:37]   #36 www.styched.in  products=6466  manual=False
[A6 18:53:37]   #37 www.tjori.com  products=6106  manual=False
[A6 18:53:37]   #38 suta.in  products=5071  manual=False
[A6 18:53:37]   #39 torani.in  products=4949  manual=False
[A6 18:53:37]   #40 www.campussutra.com  products=4333  m

CancelledError: 