<a href="https://colab.research.google.com/github/Menon-Vineet/Python-Codes/blob/main/Audio_retrieve.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import re
import json
import time
import zipfile
from pathlib import Path
from urllib.parse import urlparse

import requests
from playwright.sync_api import sync_playwright, TimeoutError as PWTimeoutError

# ============================
# CONFIG
# ============================
# Pages to scrape: comprehension-oral tests 20 through 40 (inclusive).
_URL_TEMPLATE = "https://login.formation-tcfcanada.com/comprehension-oral-test-{}-/"
URLS = [_URL_TEMPLATE.format(test_no) for test_no in range(20, 41)]

# Output locations; the report and debug screenshots live inside OUT_DIR.
OUT_DIR = Path("french_audio")
ZIP_PATH = OUT_DIR.with_suffix(".zip")
REPORT_PATH = OUT_DIR.joinpath("download_report.json")
DEBUG_DIR = OUT_DIR.joinpath("_debug")

# Chrome profile handed to Playwright's persistent context
# (user_data_dir and the --profile-directory launch argument).
USER_DATA_DIR = r"C:\Users\Vineet\pw_chrome_profile"
PROFILE_DIR_NAME = "Profile 5"

# File extensions that mark a URL as audio.
AUDIO_EXTS = (".mp3", ".m4a", ".aac", ".wav", ".ogg", ".webm")

# Timing knobs.
NAV_TIMEOUT_MS = 120 * 1000      # page.goto timeout, in milliseconds
CAPTURE_WINDOW_SEC = 12          # seconds to keep listening after triggering playback
POST_TRIGGER_WAIT_SEC = 1.2      # pause after each successful play-button click
BETWEEN_PAGES_SLEEP_SEC = 2.0    # pause between consecutive test pages

# Resume: a page is skipped when at least MIN_FILES_PER_PAGE non-empty files
# for it are already present in OUT_DIR.
RESUME_SKIP_COMPLETED_PAGES = True
MIN_FILES_PER_PAGE = 30

# If you get challenged, we pause and let you solve it manually.
ENABLE_LOGIN_PAUSE = True
MAX_LOGIN_PAUSES = 20

# Passed as `timeout` to requests.get: (connect, read) seconds.
REQ_TIMEOUT = (20, 90)

# Heuristics used to classify network responses / DOM URLs as audio.
AUDIO_MIME_HINTS = ("audio/", "application/octet-stream")
URL_HINTS = ("audio", "mp3", "m4a", "media", "sound", "listen", "download")


# ============================
# HELPERS
# ============================
def page_number_from_url(url: str) -> int:
    """Return the number following ``test-`` in *url*, or -1 when there is none.

    The match is case-insensitive and takes the first ``test-<digits>`` found.
    """
    match = re.search(r"test-(\d+)", url, flags=re.IGNORECASE)
    if match is None:
        return -1
    return int(match.group(1))


def safe_filename(name: str) -> str:
    """Make *name* usable as a file name.

    Each run of the characters ``< > : " / \\ | ? *`` collapses to a single
    underscore; leading and trailing whitespace is then stripped.
    """
    forbidden_run = re.compile(r'[<>:"/\\|?*]+')
    cleaned = forbidden_run.sub("_", name)
    return cleaned.strip()


def ext_from_url(url: str) -> str:
    """Return the audio extension that the URL's path ends with.

    The path is lower-cased before matching against AUDIO_EXTS (first match
    in tuple order wins). Falls back to ``".mp3"`` when nothing matches.
    """
    lowered_path = urlparse(url).path.lower().split("?")[0]
    matching = (ext for ext in AUDIO_EXTS if lowered_path.endswith(ext))
    return next(matching, ".mp3")


def zip_folder(folder: Path, zip_path: Path):
    """Write every file under *folder* (recursively) into a deflated zip at *zip_path*.

    Entries are stored under their path relative to *folder* and are added in
    sorted order so repeated runs produce the same entry order.

    If *zip_path* itself lies inside *folder*, the archive is skipped instead
    of being added to itself while it is still being written.
    """
    archive_target = Path(zip_path).resolve()
    with zipfile.ZipFile(zip_path, "w", zipfile.ZIP_DEFLATED) as z:
        for p in sorted(folder.rglob("*")):
            if not p.is_file():
                continue
            # The archive exists on disk as soon as ZipFile opens it, so rglob
            # would otherwise pick it up when it lives inside `folder`.
            if p.resolve() == archive_target:
                continue
            z.write(p, arcname=str(p.relative_to(folder)))


def to_requests_cookies(playwright_cookies: list[dict]) -> dict:
    """Flatten cookie dicts (each with ``"name"`` and ``"value"`` keys) into a name -> value map.

    When the same name appears more than once, the last occurrence wins.
    """
    jar = {}
    for cookie in playwright_cookies:
        jar[cookie["name"]] = cookie["value"]
    return jar


def looks_like_audio(url: str, mime: str | None = None) -> bool:
    """Decide whether *url* (optionally with its response MIME type) is audio.

    Rules, applied in order:

    1. A URL whose path (query and fragment removed) ends with one of
       AUDIO_EXTS is always accepted.
    2. With no MIME type (e.g. a URL read from the DOM), fall back to the
       URL_HINTS substring heuristic.
    3. With a MIME type: accept ``audio/*``; accept the other
       AUDIO_MIME_HINTS entries (generic binary) only when the URL also
       carries a hint; reject everything else.

    Rule 3 is the fix: previously the URL-hint fallback also ran when a MIME
    type was known, so e.g. a ``text/javascript`` response whose URL merely
    contained "media" or "download" was classified as audio.

    Args:
        url: Candidate URL; ``None``/empty is treated as an empty string.
        mime: Response MIME type if known; parameters after ``;`` are ignored.

    Returns:
        True if the URL should be treated as an audio resource.
    """
    u = (url or "").lower()
    base = u.split("?", 1)[0].split("#", 1)[0]
    if base.endswith(tuple(AUDIO_EXTS)):
        return True

    has_url_hint = any(h in u for h in URL_HINTS)
    if not mime:
        # No type information at all: the URL text is the only evidence.
        return has_url_hint

    ml = mime.split(";", 1)[0].strip().lower()
    if ml.startswith("audio/"):
        return True
    if ml in AUDIO_MIME_HINTS:
        # Generic binary type: ambiguous, so require a hint in the URL too.
        return has_url_hint
    # A known, non-audio type (script, stylesheet, HTML, image, ...).
    return False


def download_with_session(url: str, out_path: Path, cookies: dict, referer: str) -> tuple[bool, str]:
    """Stream *url* to *out_path* using the given cookies and Referer header.

    The body is first written to a hidden sibling file (``.<name>.part``) and
    only renamed onto *out_path* after the whole response has been received.
    An interrupted transfer therefore never leaves a truncated file at
    *out_path*, which the resume logic would otherwise mistake for a finished
    download. The leading dot also keeps the temporary file out of the
    ``Page N Q *`` glob used when counting completed files.

    A response is rejected (nothing is written to *out_path*) when:
      * the HTTP status is an error (``raise_for_status``),
      * the Content-Type is ``text/html`` (an HTML page, not audio),
      * the body is empty.

    Args:
        url: Audio URL to fetch.
        out_path: Final destination; parent directories are created as needed.
        cookies: Name -> value cookie mapping sent with the request.
        referer: Value for the ``Referer`` request header.

    Returns:
        ``(True, "")`` on success, otherwise ``(False, <error message>)``.
        Exceptions are reported through the return value, never raised.
    """
    headers = {
        "Referer": referer,
        "User-Agent": (
            "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
            "AppleWebKit/537.36 (KHTML, like Gecko) "
            "Chrome/143.0.0.0 Safari/537.36"
        ),
        "Accept": "*/*",
    }
    tmp_path = out_path.with_name(f".{out_path.name}.part")
    try:
        with requests.get(url, headers=headers, cookies=cookies, stream=True, timeout=REQ_TIMEOUT) as r:
            r.raise_for_status()

            content_type = (r.headers.get("Content-Type") or "").lower()
            if content_type.startswith("text/html"):
                return False, f"unexpected content type: {content_type}"

            out_path.parent.mkdir(parents=True, exist_ok=True)
            written = 0
            with open(tmp_path, "wb") as f:
                for chunk in r.iter_content(chunk_size=1024 * 256):
                    if chunk:
                        f.write(chunk)
                        written += len(chunk)

        if written == 0:
            return False, "empty response body"

        # Publish the finished file in one step.
        tmp_path.replace(out_path)
        return True, ""
    except Exception as e:
        return False, str(e)
    finally:
        # Remove the temp file on any failure path; after a successful
        # rename it no longer exists and this is a no-op.
        try:
            tmp_path.unlink(missing_ok=True)
        except OSError:
            pass


def trigger_audio_on_page(page):
    """Best-effort attempt to start audio playback on *page*.

    Clicks up to 120 elements matching a set of play-button selectors, pausing
    POST_TRIGGER_WAIT_SEC after each successful click, then scrolls to the
    bottom of the document and back to the top. Every failure is swallowed.
    """
    # try click many "Écouter"/play style controls
    selector = ", ".join(
        (
            "button:has-text('Écouter')",
            "button:has-text('Lecture')",
            "button[aria-label*='play' i]",
            "button[title*='play' i]",
            ".plyr__control",
            ".jp-play",
            ".btn-play",
            ".audio-play",
            ".play",
        )
    )
    controls = page.locator(selector)
    try:
        limit = min(controls.count(), 120)
        for control_index in range(limit):
            try:
                controls.nth(control_index).click(timeout=800)
                time.sleep(POST_TRIGGER_WAIT_SEC)
            except Exception:
                # A control that cannot be clicked is simply skipped.
                pass
    except Exception:
        pass

    # scroll jiggle: bottom of the page, then back to the top
    try:
        for scroll_js in (
            "window.scrollTo(0, document.body.scrollHeight)",
            "window.scrollTo(0, 0)",
        ):
            page.evaluate(scroll_js)
            time.sleep(0.6)
    except Exception:
        pass


def harvest_audio_srcs_from_dom(page) -> list[str]:
    """Collect the ``src`` of every ``<audio>`` and ``<source>`` element on *page*.

    The collection runs in the browser via ``page.evaluate`` and de-duplicates
    with a JS ``Set``. Returns an empty list when evaluation fails or yields
    a falsy result.
    """
    collect_srcs_js = """
    () => {
      const urls = new Set();
      const push = (u) => { if (u && typeof u === 'string') urls.add(u); };
      document.querySelectorAll('audio').forEach(a => {
        push(a.src);
        a.querySelectorAll('source').forEach(s => push(s.src));
      });
      document.querySelectorAll('source').forEach(s => push(s.src));
      return Array.from(urls);
    }
    """
    try:
        found = page.evaluate(collect_srcs_js)
    except Exception:
        return []
    return found or []


def count_downloaded_for_page(page_num_label: int) -> int:
    """Count non-empty files in OUT_DIR named ``Page <page_num_label> Q *``.

    Returns 0 when OUT_DIR does not exist.
    """
    if not OUT_DIR.exists():
        return 0

    total = 0
    for candidate in OUT_DIR.glob(f"Page {page_num_label} Q *"):
        # Zero-byte files do not count as downloaded.
        if candidate.is_file() and candidate.stat().st_size > 0:
            total += 1
    return total


def dedupe_preserve_order(urls: list[str]) -> list[str]:
    """Return *urls* without duplicates, keeping first-seen order.

    Falsy entries (``""``, ``None``) and ``blob:`` URLs are dropped. The
    ``blob:`` check is case-insensitive, matching how the callers in this file
    filter blob URLs before appending to the capture list.
    """
    wanted = (u for u in urls if u and not u.lower().startswith("blob:"))
    # dict preserves insertion order, so fromkeys() de-duplicates in order.
    return list(dict.fromkeys(wanted))


def on_test_page(page) -> bool:
    """Positive confirmation that *page* shows real test content.

    True only when the lower-cased body text contains at least two of the
    expected marker phrases; any error while reading the page yields False.
    """
    markers = ("lecteur audio", "limite de temps", "compréhension orale", "question")
    try:
        text = (page.inner_text("body") or "").lower()
    except Exception:
        return False
    hits = [marker for marker in markers if marker in text]
    return len(hits) >= 2


def on_cloudflare_or_login(page) -> bool:
    """Detect a Cloudflare/login interstitial.

    True when the lower-cased body text contains any challenge or login phrase
    AND the page does not pass ``on_test_page``. Errors yield False.
    """
    interstitial_phrases = (
        "verifying you are human",
        "needs to review the security",
        "checking your browser",
        "cloudflare",
        "captcha",
        "se connecter",
        "connexion",
        "mot de passe",
        "identifiant",
    )
    try:
        text = (page.inner_text("body") or "").lower()
        if not any(phrase in text for phrase in interstitial_phrases):
            return False
        # A real test page may mention some of these words; it takes precedence.
        return not on_test_page(page)
    except Exception:
        return False


def ensure_ready(page, url, debug_tag: str, pause_counter: list[int], max_unknown_retries: int = 15):
    """Navigate *page* to *url* and return once test content is confirmed.

    Each iteration navigates, waits briefly, then classifies the page:

    * test content visible -> return;
    * Cloudflare/login interstitial (and ENABLE_LOGIN_PAUSE is on) -> take a
      screenshot, pause for the user to solve it manually, then retry on the
      SAME page object (no new tab);
    * anything else -> wait and retry, at most *max_unknown_retries* times.

    The bound on the last case is the fix: previously a page that was neither
    test content nor a recognised interstitial (error page, outage, or
    ENABLE_LOGIN_PAUSE disabled) made this loop re-navigate forever.

    Args:
        page: Playwright page to drive.
        url: Target test page URL.
        debug_tag: Prefix for debug screenshot file names in DEBUG_DIR.
        pause_counter: One-element list used as a mutable counter of login
            pauses, shared across calls.
        max_unknown_retries: How many times an unrecognised page is retried
            before giving up.

    Raises:
        RuntimeError: After more than MAX_LOGIN_PAUSES login pauses, or when
            the page stays unrecognised for more than *max_unknown_retries*
            attempts.
    """
    unknown_retries = 0
    while True:
        try:
            page.goto(url, wait_until="domcontentloaded", timeout=NAV_TIMEOUT_MS)
        except PWTimeoutError:
            # A navigation timeout is not fatal by itself; the content checks
            # below decide whether the page is usable.
            pass

        time.sleep(1.5)

        if on_test_page(page):
            return

        if ENABLE_LOGIN_PAUSE and on_cloudflare_or_login(page):
            pause_counter[0] += 1
            shot = DEBUG_DIR / f"{debug_tag}_login_pause_{pause_counter[0]}.png"
            try:
                page.screenshot(path=str(shot), full_page=True)
            except Exception:
                pass

            if pause_counter[0] > MAX_LOGIN_PAUSES:
                raise RuntimeError(f"Too many login pauses. Last screenshot: {shot}")

            print("\n[LOGIN REQUIRED] Cloudflare/login detected.")
            print(f"[debug] Screenshot saved: {shot}")
            print("In the Playwright Chrome window:")
            print("1) Complete Cloudflare (wait until it finishes).")
            print("2) Log in.")
            print("3) After login, OPEN the target test page and confirm you see 'Lecteur audio' + timer.")
            input("Press ENTER only when the TEST CONTENT is visible… ")

            # Critical: give CF a moment to set cookies after you see content
            time.sleep(6)
            # Loop continues, but we stay on SAME PAGE OBJECT, no “retry tab”.
            continue

        # Not clearly test, not clearly CF — retry in-place, but not forever.
        unknown_retries += 1
        if unknown_retries > max_unknown_retries:
            shot = DEBUG_DIR / f"{debug_tag}_not_ready.png"
            try:
                page.screenshot(path=str(shot), full_page=True)
            except Exception:
                pass
            raise RuntimeError(
                f"Page never showed test content after {max_unknown_retries} retries. "
                f"Screenshot: {shot}"
            )
        time.sleep(2)


# ============================
# MAIN
# ============================
def main():
    """Scrape every page in URLS: capture audio URLs, download them, write a report, zip the result.

    For each page the flow is: optionally skip it if enough files already
    exist, navigate (with manual Cloudflare/login pauses), trigger playback,
    collect audio URLs from CDP network events and from the DOM, then download
    each URL as ``Page <n> Q <i><ext>`` into OUT_DIR. A failure on one page is
    recorded in the report and does not stop the run.

    Changes from the previous version:

    * Cookies sent with each download are limited to those the browser would
      send to that audio URL (``context.cookies([audio_url])``). Calling
      ``context.cookies()`` without URLs returns every cookie stored in the
      persistent Chrome profile, all of which were forwarded to each audio host.
    * The report is written and the browser context closed in ``finally``, so
      an unexpected error or Ctrl-C still leaves a report of the work done.
    """
    OUT_DIR.mkdir(parents=True, exist_ok=True)
    DEBUG_DIR.mkdir(parents=True, exist_ok=True)

    report = {
        "user_data_dir": USER_DATA_DIR,
        "profile_directory": PROFILE_DIR_NAME,
        "pages": [],
        "failures": [],
    }

    # One-element list so ensure_ready can mutate the shared counter.
    login_pause_counter = [0]

    with sync_playwright() as p:
        context = p.chromium.launch_persistent_context(
            user_data_dir=USER_DATA_DIR,
            channel="chrome",
            headless=False,
            slow_mo=120,
            args=[
                f'--profile-directory={PROFILE_DIR_NAME}',
                "--start-maximized",
                "--no-first-run",
                "--no-default-browser-check",
                "--autoplay-policy=no-user-gesture-required",
            ],
            # NOTE(review): with the Python API, `no_viewport=True` may be what
            # is needed for --start-maximized to take effect — confirm.
            viewport=None,
            locale="en-CA",
        )

        try:
            # ONE TAB for the whole run (reduces CF suspicion)
            page = context.new_page()

            # CDP capture once, keep it alive
            captured = []
            seen = set()

            cdp = context.new_cdp_session(page)
            cdp.send("Network.enable")

            def on_cdp_response(evt):
                # Record each audio-looking response URL once, in arrival order.
                try:
                    resp = evt.get("response", {})
                    u = resp.get("url", "")
                    mime = resp.get("mimeType") or ""
                    if u and looks_like_audio(u, mime):
                        if u.lower().startswith("blob:"):
                            return
                        if u not in seen:
                            seen.add(u)
                            captured.append(u)
                except Exception:
                    # Never let a malformed event break the event callback.
                    pass

            cdp.on("Network.responseReceived", on_cdp_response)

            for idx, url in enumerate(URLS, start=1):
                page_num = page_number_from_url(url)
                # Fall back to the list position when the URL has no test number.
                page_num_label = page_num if page_num != -1 else idx

                if RESUME_SKIP_COMPLETED_PAGES:
                    already = count_downloaded_for_page(page_num_label)
                    if already >= MIN_FILES_PER_PAGE:
                        print(f"\n=== Page {page_num_label}: already have {already} files → SKIP ===")
                        continue

                print(f"\n=== Page {page_num_label}: {url} ===")

                try:
                    # reset capture buffers per page
                    captured.clear()
                    seen.clear()

                    ensure_ready(page, url, f"page_{page_num_label}", login_pause_counter)

                    # Trigger network audio
                    trigger_audio_on_page(page)

                    # DOM harvest (some srcs appear here)
                    dom_urls = harvest_audio_srcs_from_dom(page)
                    for u in dom_urls:
                        if u and looks_like_audio(u) and not u.lower().startswith("blob:") and u not in seen:
                            seen.add(u)
                            captured.append(u)

                    # wait capture window
                    for t in range(CAPTURE_WINDOW_SEC):
                        print(f"[info] Capturing network… {t+1}/{CAPTURE_WINDOW_SEC} | urls: {len(captured)}", end="\r")
                        time.sleep(1)
                    print()

                    captured_urls = dedupe_preserve_order(list(captured))

                    if not captured_urls:
                        shot = DEBUG_DIR / f"page_{page_num_label}_no_urls.png"
                        try:
                            page.screenshot(path=str(shot), full_page=True)
                        except Exception:
                            pass
                        raise RuntimeError(f"No audio URLs captured. Screenshot: {shot}")

                    print(f"[info] Captured {len(captured_urls)} audio URLs. Downloading…")

                    downloaded_files = []
                    failures_here = []

                    for q_idx, audio_url in enumerate(captured_urls, start=1):
                        ext = ext_from_url(audio_url)
                        out_name = safe_filename(f"Page {page_num_label} Q {q_idx}{ext}")
                        out_path = OUT_DIR / out_name

                        if out_path.exists() and out_path.stat().st_size > 0:
                            downloaded_files.append(out_name)
                            print(f"[SKIP] {out_name}")
                            continue

                        # Only the cookies that apply to this audio URL, not
                        # the whole profile's cookie store.
                        req_cookies = to_requests_cookies(context.cookies([audio_url]))

                        ok, err = download_with_session(audio_url, out_path, cookies=req_cookies, referer=url)
                        if ok:
                            downloaded_files.append(out_name)
                            print(f"[OK] {out_name}")
                        else:
                            failures_here.append({"audio_url": audio_url, "error": err})
                            report["failures"].append({"page_url": url, "audio_url": audio_url, "error": err})
                            print(f"[FAIL] {out_name}: {err}")

                    report["pages"].append(
                        {
                            "page_url": url,
                            "page_number": page_num_label,
                            "captured_audio_urls_count": len(captured_urls),
                            "downloaded_files": downloaded_files,
                            "page_failures": failures_here,
                        }
                    )

                except Exception as e:
                    # Per-page boundary: record the failure and move on.
                    err = str(e)
                    print(f"[ERROR] Page {page_num_label} failed: {err}")
                    report["pages"].append(
                        {
                            "page_url": url,
                            "page_number": page_num_label,
                            "captured_audio_urls_count": 0,
                            "downloaded_files": [],
                            "page_failures": [{"error": err}],
                        }
                    )

                time.sleep(BETWEEN_PAGES_SLEEP_SEC)
        finally:
            # Persist whatever was collected, even on an unexpected exit,
            # and always release the browser.
            try:
                REPORT_PATH.write_text(json.dumps(report, indent=2, ensure_ascii=False), encoding="utf-8")
            finally:
                context.close()

    # Zip only after a normal run, once the report is on disk.
    zip_folder(OUT_DIR, ZIP_PATH)

    print("\nDone.")
    print(f"Saved folder: {OUT_DIR.resolve()}")
    print(f"Zip:         {ZIP_PATH.resolve()}")
    print(f"Report:      {REPORT_PATH.resolve()}")
    print(f"Debug shots: {DEBUG_DIR.resolve()}")

# Run the scraper only when this file is executed directly, not when imported.
if __name__ == "__main__":
    main()
