In [1]:
import re, csv, json, asyncio, time, urllib.parse
from typing import Optional, Dict, List, Tuple
import httpx
from bs4 import BeautifulSoup

# ---------------- CONFIG ----------------
FORUM_BASE   = "https://www.agnoshealth.com"   # site root; thread pages are requested at {FORUM_BASE}/forums/<id>
START_ID     = 1          # first thread id probed by the scan
MISS_GAP     = 300        # stop after this many consecutive misses AFTER the last hit
CONCURRENCY  = 16         # concurrent requests (tune if needed)
REQ_TIMEOUT  = 25         # passed as `timeout=` to every client.get call
PAUSE_EACH   = 0.05       # small pause per request for politeness (seconds, fed to asyncio.sleep)
OUTPUT_JSON  = "agnos_forum_threads.json"   # final JSON file written by run_full_scan_async
OUTPUT_CSV   = "agnos_forum_threads.csv"    # final CSV file written by run_full_scan_async

# Default request headers installed on the shared httpx.AsyncClient.
UA = {
    # NOTE(review): the contact address looks like a placeholder — confirm before scanning the live site.
    "User-Agent": "AgnosRAGBot/0.6 (+contact: you@example.com)",
    "Accept-Language": "th,en;q=0.8",
}

# ------------- helpers -------------
def clean(s: Optional[str]) -> Optional[str]:
    """Collapse each whitespace run to a single space and trim the ends.

    Returns None when `s` is None or when nothing but whitespace remains.
    """
    if s is None:
        return None
    squeezed = re.sub(r"\s+", " ", s)
    trimmed = squeezed.strip()
    return trimmed if trimmed else None

def url_for_id(tid: int) -> str:
    """Return the forum URL for thread id `tid`, rooted at FORUM_BASE."""
    thread_url = f"{FORUM_BASE}/forums/{tid}"
    return thread_url

def extract_thread_id_from_url(url: str) -> str:
    """Return the last non-empty path segment of `url`.

    When the URL path has no segments at all, `url` itself is returned unchanged.
    """
    path = urllib.parse.urlparse(url).path
    parts = list(filter(None, path.split("/")))
    if not parts:
        return url
    return parts[-1]

def parse_thread_html(html: str, url: str) -> Optional[Dict]:
    """
    Turn one forum page into a row dict.

    The page counts as a thread when the category selector or the title
    selector yields text; otherwise None is returned. The doctor's answer is
    optional and stays None when no matching paragraph exists.
    """
    soup = BeautifulSoup(html, "lxml")

    def text_of(node) -> Optional[str]:
        # Space-joined, whitespace-normalised text of one element.
        return clean(node.get_text(" ", strip=True))

    # Category lives in <span class="text-primary_blue-500 line-clamp-1">.
    category_node = soup.select_one("span.text-primary_blue-500.line-clamp-1")
    thread_category = text_of(category_node) if category_node else None

    # Title is the first <p class="text-sm text-gray-500 line-clamp-3">.
    title_node = soup.select_one("p.text-sm.text-gray-500.line-clamp-3")
    title = text_of(title_node) if title_node else None

    # Neither marker found -> this is not a thread page.
    if not thread_category and not title:
        return None

    # Answer: prefer the first <p class="mt-4"> that follows the label text,
    # otherwise fall back to the first <p class="mt-4"> anywhere on the page.
    answer_by_doctor = None
    label = soup.find(string=re.compile(r"คำตอบโดยแพทย์ผู้เชี่ยวชาญ"))
    label_parent = getattr(label, "parent", None) if label else None
    if label_parent:
        following_p = label_parent.find_next("p", class_="mt-4")
        if following_p:
            answer_by_doctor = text_of(following_p)
    if not answer_by_doctor:
        fallback_p = soup.find("p", class_="mt-4")
        if fallback_p:
            answer_by_doctor = text_of(fallback_p)

    return {
        "url": url,
        "thread_id": extract_thread_id_from_url(url),
        "title": title,
        "thread_category": thread_category,
        "answer_by_doctor": answer_by_doctor,
        "title_category": clean(f"{title or ''} {thread_category or ''}"),
    }


# ------------- save utils -------------
def save_json(data: List[Dict], path: str):
    """Write `data` to `path` as UTF-8 JSON, indented by 2, keeping non-ASCII characters unescaped."""
    dump_options = {"ensure_ascii": False, "indent": 2}
    with open(path, mode="w", encoding="utf-8") as out_file:
        json.dump(data, out_file, **dump_options)

def save_csv(data: List[Dict], path: str):
    """Write `data` to `path` as a UTF-8 CSV with a header row.

    Only the fixed column set below is written; keys missing from a row become
    empty cells and any other keys are ignored.
    """
    columns = [
        "url",
        "thread_id",
        "title",
        "thread_category",
        "answer_by_doctor",
        "title_category",
    ]
    with open(path, "w", newline="", encoding="utf-8") as out_file:
        writer = csv.DictWriter(out_file, fieldnames=columns)
        writer.writeheader()
        writer.writerows({col: record.get(col) for col in columns} for record in data)



# ------------- async HTTP -------------
async def fetch_text(client: httpx.AsyncClient, url: str) -> Optional[str]:
    """GET `url` (following redirects) and return the body text on HTTP 200.

    Best-effort: any other status code, and any exception raised while
    requesting or reading the response, yields None.
    """
    try:
        response = await client.get(url, follow_redirects=True, timeout=REQ_TIMEOUT)
        return response.text if response.status_code == 200 else None
    except Exception:
        # Deliberately broad: a failed request is treated the same as a missing page.
        return None

async def fetch_and_parse(client: httpx.AsyncClient, tid: int) -> Optional[Dict]:
    """Fetch the page for thread id `tid` and parse it.

    Returns None when the fetch yields no body or the page does not parse as a thread.
    """
    thread_url = url_for_id(tid)
    page = await fetch_text(client, thread_url)
    return parse_thread_html(page, thread_url) if page else None


# ------------- forward scan with miss-streak stop -------------
CHECKPOINT_EVERY_HITS = 50     # rewrite the partial JSON checkpoint after every N hits
HEARTBEAT_SECS        = 3.0    # seconds between progress lines printed by the heartbeat task
VERBOSE_HIT_LOGS      = True   # print a short line for each hit (can set False to quiet)

async def scan_all_threads(start_id: int = START_ID, miss_gap: int = MISS_GAP, concurrency: int = CONCURRENCY) -> List[Dict]:
    """
    Scan thread ids upward from `start_id` and collect every page that parses as a thread.

    Ids are dispatched in bursts of up to `concurrency * 8` tasks, and at most
    `concurrency` of them hold the semaphore (sleep + request) at any moment.
    Each burst is awaited in full before the next one is dispatched.

    Stop rule: no id further than `miss_gap` past the highest hit is ever
    dispatched. Until the first hit arrives the gap is measured from
    `start_id`, so a scan that never finds a thread (site down, bot blocked,
    start past the end) stops after `miss_gap` ids instead of running forever.

    Side effects: prints a progress line every HEARTBEAT_SECS seconds, prints
    one line per hit when VERBOSE_HIT_LOGS is set, and rewrites
    "agnos_forum_threads_partial.json" every CHECKPOINT_EVERY_HITS hits.

    Returns the parsed rows sorted by numeric thread id.
    """
    results: List[Dict] = []
    last_hit_id = 0          # highest id that parsed as a thread (0 = none yet)
    miss_streak = 0          # misses seen above last_hit_id; reporting only
    next_id = start_id
    processed = 0
    hits = 0
    t0 = time.time()
    last_beat = t0
    last_proc = 0

    sem = asyncio.Semaphore(concurrency)
    lock = asyncio.Lock()              # to serialize console output/checkpoints
    stop_event = asyncio.Event()

    def gap_exhausted() -> bool:
        # Before any hit, anchor the gap just below start_id so the scan is bounded.
        anchor = max(last_hit_id, start_id - 1)
        return (next_id - 1 - anchor) >= miss_gap

    async with httpx.AsyncClient(headers=UA) as client:
        async def worker(tid: int):
            nonlocal last_hit_id, miss_streak, processed, hits
            async with sem:
                await asyncio.sleep(PAUSE_EACH)
                parsed = await fetch_and_parse(client, tid)

                async with lock:
                    processed += 1
                    if parsed is not None:
                        # hit
                        hits += 1
                        if tid > last_hit_id:
                            last_hit_id = tid
                            miss_streak = 0
                        if VERBOSE_HIT_LOGS:
                            cat = (parsed.get("thread_category") or "")[:40]
                            ttl = (parsed.get("title") or "")[:40]
                            print(f"[hit] id={tid} cat='{cat}' title='{ttl}'")
                        # checkpoint
                        results.append(parsed)
                        if hits % CHECKPOINT_EVERY_HITS == 0:
                            save_json(results, "agnos_forum_threads_partial.json")
                    else:
                        # miss
                        if tid > last_hit_id:
                            miss_streak += 1

        async def heartbeat():
            nonlocal last_beat, last_proc
            while True:
                try:
                    # Wake up as soon as the scan finishes instead of sleeping a full interval.
                    await asyncio.wait_for(stop_event.wait(), timeout=HEARTBEAT_SECS)
                    return
                except asyncio.TimeoutError:
                    pass
                async with lock:
                    now = time.time()
                    dt = now - last_beat
                    delta_proc = processed - last_proc
                    rps = (delta_proc / dt) if dt > 0 else 0.0
                    print(f"[{now - t0:6.1f}s] next_id={next_id} last_hit={last_hit_id} "
                          f"hits={hits} processed={processed} miss_streak={miss_streak} ~{rps:.1f} req/s")
                    last_beat, last_proc = now, processed

        hb_task = asyncio.create_task(heartbeat())

        # launch in bursts; stop when the miss gap has been fully dispatched
        try:
            while True:
                burst: List[asyncio.Task] = []
                for _ in range(concurrency * 8):
                    async with lock:
                        if gap_exhausted():
                            break
                        tid = next_id
                        next_id += 1
                    burst.append(asyncio.create_task(worker(tid)))
                if not burst:
                    # nothing left to dispatch -> done
                    break
                # Waiting for the whole burst means last_hit_id is final for every
                # id handed out so far before the next stop check runs.
                await asyncio.gather(*burst)
        finally:
            stop_event.set()
            await hb_task

    # final sort & return
    results.sort(key=lambda r: int(r["thread_id"]))
    return results


async def run_full_scan_async():
    """Run the scan with the module-level settings, print a summary, and save the results.

    Writes the rows to OUTPUT_JSON and OUTPUT_CSV (even when no rows were found)
    and returns them.
    """
    print(f"Scanning from id {START_ID} with miss gap {MISS_GAP} and concurrency {CONCURRENCY} …")
    started_at = time.time()
    rows = await scan_all_threads(start_id=START_ID, miss_gap=MISS_GAP, concurrency=CONCURRENCY)
    dt = time.time() - started_at

    print(f"\nFound {len(rows)} threads in {dt:.1f}s.")
    if rows:
        print(f"ID range covered: {rows[0]['thread_id']} → {rows[-1]['thread_id']}")
        print("Sample row:")
        print(json.dumps(rows[0], ensure_ascii=False, indent=2))

    # JSON first, then CSV — same order as the confirmation lines below.
    for write_rows, target in ((save_json, OUTPUT_JSON), (save_csv, OUTPUT_CSV)):
        write_rows(rows, target)
    print(f"\nSaved JSON: {OUTPUT_JSON}")
    print(f"Saved CSV : {OUTPUT_CSV}")
    return rows

In [2]:
# Top-level `await` works in a Jupyter/IPython cell; a plain Python script
# would need asyncio.run(run_full_scan_async()) instead.
rows = await run_full_scan_async()


Scanning from id 1 with miss gap 300 and concurrency 16 …
[hit] id=9 cat='ขี้หูอุดตัน (Impacted cerumen)' title='มีอาการได้เสียงกรอบแกลบในหูมั้ยมีอาการเจ'
[hit] id=16 cat='ภาวะการปรับตัวผิดปกติ (Adjustment disord' title='ทำแบบสอบถามเดือนละครั้งนี่ครั้งที่3แล้วค'
[hit] id=14 cat='เยื่อหุ้มหัวใจอักเสบ (Acute pericarditis' title='สวัสดีค่ะ หนูมีอาการเจ็บแน่นหน้าอกเหมือน'
[hit] id=8 cat='ผื่นตามร่างกาย (Dermatitis unspecified)' title='สวัสดีค่ะคุณหมอ รบกวนสอบถามหน่อยค่ะว่า น'
[hit] id=10 cat='ภาวะผิดปกติที่เกิดจากสารเสพติด (Substanc' title='ผู้ป่วยโรคนี้ต้องรักษาด้วยวิธีไหนคะ'
[hit] id=7 cat='การอักเสบของอุ้งเชิงกราน (Pelvic inflamm' title='ประจำเดือนไม่มาเกือบ3เดือน แต่ตรวจขึ้นขี'
[hit] id=1 cat='ผื่นตามร่างกาย (Dermatitis unspecified)' title='เป็นคล้ายๆผดผื่นที่ก้นค่ะน้อง9เดือน ผดนู'
[   3.3s] next_id=129 last_hit=16 hits=7 processed=8 miss_streak=0 ~2.4 req/s
[hit] id=6 cat='ผื่นตามร่างกาย (Dermatitis unspecified)' title='เวลาหลังอาบน้ำทุกครั้ง(ยกเว้นอาบน้ำจากเค'
[hit] id=11 cat='กรดไหล