## Contributor: Akash Yadav

# GESIS Eurobarometer bulk downloader (≤ 2015) — Datasets → Purpose-of-use → Stata `.dta`

This notebook implements the flow confirmed with your screenshots:

1. From the **GESIS Eurobarometer study overview**, collect projects with year ≤ 2015.
2. For each project page:
   - Prefer **"Study description"** link.
   - If missing, fall back to the **DOI** in the remarks.
3. Resolve to the **GESIS Search dataset page** (`https://search.gesis.org/research_data/...`).
4. On the dataset page:
   - Open **Downloads → Datasets**.
   - Select the purpose from the dropdown: **"for further education and qualification"**.
   - Then click only **Stata `.dta`** items to download.

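Downloads go to `DOWNLOAD_ROOT/ZAxxxx/` (by default `/content/drive/MyDrive/gesis_downloads/ZAxxxx/` on the mounted Google Drive), and a resume-friendly manifest is written to:
`DOWNLOAD_ROOT/download_manifest_le_2015.csv` (the suffix follows `YEAR_CUTOFF`).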

**Credentials** are requested at runtime and stored only in RAM.

Headless mode is enabled by default (needed if no X server / DISPLAY).


In [None]:
# Install dependencies (run once), then restart the kernel.
!pip -q install playwright beautifulsoup4 lxml tqdm requests

# Install Chromium + system deps
!python -m playwright install --with-deps chromium


In [None]:
import re
import csv
from dataclasses import dataclass
from datetime import datetime
from getpass import getpass
from pathlib import Path
from typing import List, Optional, Tuple
from google.colab import drive
from pathlib import Path


import requests
from bs4 import BeautifulSoup
from tqdm.auto import tqdm

from playwright.async_api import async_playwright, TimeoutError as PWTimeoutError

# Overview page listing the Eurobarometer studies; scraped by
# list_projects_from_overview().
STUDY_OVERVIEW_URL = (
    "https://www.gesis.org/en/eurobarometer-data-service/data-and-documentation/"
    "standard-special-eb/study-overview"
)

# Only studies whose extracted year is <= this value are processed.
# The value is also embedded in the manifest file name below.
YEAR_CUTOFF = 2015

# Labels tried, in order, when choosing the purpose of use in the download
# dialog (two English capitalisations and one German label).
PURPOSE_CANDIDATES = [
    "for further education and qualification",
    "For further education and qualification",
    "zur weiteren Ausbildung und Qualifikation",
]

# Mount Google Drive first: DOWNLOAD_ROOT below lives on the mounted drive.
drive.mount("/content/drive")

# Root folder for all downloads; created if it does not exist yet.
DOWNLOAD_ROOT = Path("/content/drive/MyDrive/gesis_downloads").resolve()
DOWNLOAD_ROOT.mkdir(parents=True, exist_ok=True)

# CSV log of download attempts; read by manifest_has() and appended to by
# append_manifest().
MANIFEST = DOWNLOAD_ROOT / f"download_manifest_le_{YEAR_CUTOFF}.csv"

print("DOWNLOAD_ROOT:", DOWNLOAD_ROOT)
print("MANIFEST:", MANIFEST)

# Browser-like User-Agent sent with every request made through `session`.
UA = (
    "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) "
    "AppleWebKit/537.36 (KHTML, like Gecko) "
    "Chrome/120.0 Safari/537.36"
)

# Shared HTTP session used for the plain (non-browser) scraping requests.
session = requests.Session()
session.headers.update({"User-Agent": UA})

# Matches a bare DOI such as "10.1234/abc" in free text.
DOI_RE = re.compile(r"\b10\.\d{4,9}/\S+\b")
# Matches ".dta" at the end of a string or before ?, #, whitespace or ")".
# NOTE(review): not referenced anywhere else in the visible part of this file.
DTA_RE = re.compile(r"\.dta(?:$|[?#]|\s|\))", re.I)

@dataclass
class EBProject:
    """One Eurobarometer study collected from the GESIS study overview page."""

    # Link text of the study on the overview page.
    title: str
    # Absolute URL of the study's project page.
    project_url: str
    # Year extracted from the title via extract_year().
    year: int
    # Study number in the form "ZA1234", or None if the title has none.
    za_id: Optional[str]
    # DOI link found on the project page; set only when the DOI fallback is used.
    doi_url: Optional[str] = None
    # Final (post-redirect) dataset page URL; None until resolved.
    dataset_url: Optional[str] = None

def _abs_url(base: str, href: str) -> str:
    """Return ``href`` as an absolute URL, resolved against ``base``."""
    absolute = requests.compat.urljoin(base, href)
    return absolute

def extract_year(text: str) -> Optional[int]:
    """Return the last four-digit year (1900-2099) mentioned in ``text``.

    ZA study numbers ("ZA No. 2031", "ZA 1960", "ZA2031") are removed before
    scanning. Otherwise a study number that happens to look like a year, and
    that follows the real year in the title, would be returned instead.

    Args:
        text: Free text such as a study title.

    Returns:
        The last year found as an ``int``, or ``None`` if there is none.
    """
    # Same ZA-number shapes that extract_za_id() recognises.
    without_za = re.sub(r"\bZA\s*(?:No\.?\s*)?\d{3,5}\b", " ", text)
    years = re.findall(r"\b(19\d{2}|20\d{2})\b", without_za)
    return int(years[-1]) if years else None

def extract_za_id(text: str) -> Optional[str]:
    """Return the study number in ``text`` normalised to ``"ZA<digits>"``.

    The "ZA No. 1234" spelling is searched for first across the whole text;
    only if it is absent is the shorter "ZA1234" / "ZA 1234" form tried.
    Returns ``None`` when neither form occurs.
    """
    patterns = (
        r"\bZA\s*No\.?\s*(\d{3,5})\b",
        r"\bZA\s*(\d{3,5})\b",
    )
    for pattern in patterns:
        hit = re.search(pattern, text)
        if hit is not None:
            return "ZA" + hit.group(1)
    return None

def list_projects_from_overview() -> List[EBProject]:
    """Scrape the study overview page into a list of ``EBProject`` entries.

    A link is kept when its text starts with "Eurobarometer", contains "ZA",
    has a non-empty href and yields a year. Entries are de-duplicated by
    project URL: the position of the first occurrence is kept, the data of
    the last occurrence wins.

    Raises:
        requests.HTTPError: if the overview page returns an error status.
    """
    response = session.get(STUDY_OVERVIEW_URL, timeout=60)
    response.raise_for_status()
    overview = BeautifulSoup(response.text, "lxml")

    by_url = {}
    for anchor in overview.select("a[href]"):
        label = (anchor.get_text(" ", strip=True) or "").strip()
        if not (label.startswith("Eurobarometer") and "ZA" in label):
            continue

        raw_href = anchor.get("href")
        if not raw_href:
            continue

        year = extract_year(label)
        if year is None:
            continue

        project_url = _abs_url(STUDY_OVERVIEW_URL, raw_href)
        by_url[project_url] = EBProject(
            title=label,
            project_url=project_url,
            year=year,
            za_id=extract_za_id(label),
        )

    return list(by_url.values())

def get_dataset_or_doi_from_project_page(project_url: str) -> Tuple[Optional[str], Optional[str]]:
    """Find the dataset link or DOI on a study's project page.

    Lookup order:
      1. a link whose text contains "study description" -> ``(url, None)``
      2. a link whose href points at doi.org            -> ``(None, url)``
      3. a bare DOI in the page text                    -> ``(None, doi.org url)``
    Returns ``(None, None)`` when nothing is found.

    Raises:
        requests.HTTPError: if the project page returns an error status.
    """
    response = session.get(project_url, timeout=60)
    response.raise_for_status()
    document = BeautifulSoup(response.text, "lxml")
    anchors = document.select("a[href]")

    # 1) Preferred: the "Study description" link.
    for anchor in anchors:
        label = (anchor.get_text(" ", strip=True) or "").strip().lower()
        if "study description" in label:
            return _abs_url(project_url, anchor["href"].strip()), None

    # 2) A DOI hyperlink ("dx.doi.org/" also contains "doi.org/").
    for anchor in anchors:
        target = anchor["href"].strip()
        if "doi.org/" in target:
            return None, _abs_url(project_url, target)

    # 3) Last resort: a DOI written out in the page text.
    doi_match = DOI_RE.search(document.get_text(" ", strip=True))
    if doi_match is None:
        return None, None
    return None, f"https://doi.org/{doi_match.group(0)}"

def resolve_to_final_url(url: str) -> str:
    """Follow redirects from ``url`` and return the URL finally reached.

    Raises:
        requests.HTTPError: if the final response has an error status.
    """
    response = session.get(url, timeout=60, allow_redirects=True)
    response.raise_for_status()
    final_url = response.url
    return final_url

# Scrape the overview, keep studies up to the cutoff year, and order them
# by (year, title).
projects = list_projects_from_overview()
targets = [proj for proj in projects if proj.year <= YEAR_CUTOFF]
targets.sort(key=lambda proj: (proj.year, proj.title))
print(f"Found {len(targets)} Eurobarometer projects with year <= {YEAR_CUTOFF}.")
targets[:5]


In [None]:
# Resolve every target to its final dataset page URL.
# Studies that cannot be resolved are dropped from `targets`, but the reason
# is recorded in `resolve_failures` and printed, so nothing disappears
# silently.
resolved: List[EBProject] = []
resolve_failures: List[Tuple[EBProject, str]] = []

for p in tqdm(targets, desc="Resolving dataset URLs from project pages"):
    try:
        dataset_url, doi_url = get_dataset_or_doi_from_project_page(p.project_url)
        if dataset_url:
            p.dataset_url = resolve_to_final_url(dataset_url)
        elif doi_url:
            p.doi_url = doi_url
            p.dataset_url = resolve_to_final_url(doi_url)
        else:
            p.dataset_url = None
    except Exception as exc:
        # Best effort: one failing study must not stop the others.
        resolve_failures.append((p, repr(exc)))
        continue

    if p.dataset_url:
        resolved.append(p)
    else:
        resolve_failures.append((p, "no 'Study description' link or DOI found"))

targets = resolved
print("Targets with resolved dataset_url:", len(targets))
print("Example dataset_url:", targets[0].dataset_url if targets else "(none)")

if resolve_failures:
    print(f"Could not resolve {len(resolve_failures)} project(s):")
    for failed_project, reason in resolve_failures:
        print(f"  - {failed_project.title} ({failed_project.project_url}): {reason}")


In [None]:
def manifest_has(za_id: str, filename: str) -> bool:
    """Return True if the manifest has an "ok" row for this study and file.

    A missing manifest file counts as "not downloaded yet".
    """
    if not MANIFEST.exists():
        return False

    wanted = (za_id, filename, "ok")
    with MANIFEST.open("r", newline="", encoding="utf-8") as handle:
        return any(
            (record.get("za_id"), record.get("filename"), record.get("status")) == wanted
            for record in csv.DictReader(handle)
        )

def append_manifest(row: dict):
    """Append one row to the manifest CSV.

    The header line is written first when the manifest file does not exist
    yet. ``row`` is keyed by the column names listed below.
    """
    columns = ["timestamp", "za_id", "study_title", "dataset_url", "filename", "saved_to", "status", "note"]
    # Decide before opening: append mode creates the file.
    needs_header = not MANIFEST.exists()

    with MANIFEST.open("a", newline="", encoding="utf-8") as handle:
        writer = csv.DictWriter(handle, fieldnames=columns)
        if needs_header:
            writer.writeheader()
        writer.writerow(row)

async def accept_cookies_if_present(page):
    """Click the first visible cookie-consent button, if there is one.

    Tries a fixed list of English and German button labels (exact,
    case-insensitive match) and stops after the first successful click.
    Any error for one label is ignored and the next label is tried.
    """
    labels = ("Accept all", "Accept", "I agree", "Agree", "Alle akzeptieren", "Akzeptieren")
    for label in labels:
        try:
            exact_name = re.compile(rf"^{re.escape(label)}$", re.I)
            button = page.get_by_role("button", name=exact_name)
            if await button.count() == 0 or not await button.first.is_visible():
                continue
            await button.first.click(timeout=1500)
        except Exception:
            continue
        break

async def is_logged_in(page, email_hint: str | None = None) -> bool:
    """Best-effort check whether the current page shows a logged-in session.

    Returns True if ``email_hint`` (when given) appears as text on the page,
    or if a logout link ("Logout", "Log out", "Abmelden") is present.
    Any error during the check is treated as "not logged in".
    """
    logout_links = "a:has-text('Logout'), a:has-text('Log out'), a:has-text('Abmelden')"
    try:
        if email_hint and await page.locator(f"text={email_hint}").count() > 0:
            return True
        return await page.locator(logout_links).count() > 0
    except Exception:
        return False


async def ensure_logged_in(page, email: str, password: str):
    """Best-effort login on the current page; a no-op when already logged in.

    Steps:
      1. Return immediately if ``is_logged_in`` reports a session.
      2. Look for a "Login" / "Log in" link. If it has a real href, navigate
         to it, resolved against the current page URL so relative hrefs
         work. Otherwise click a login link or button.
      3. If the browser is now on a login page, fill in the credentials.
      4. Wait two seconds for the session to settle.

    Every step is deliberately best-effort: failures are swallowed so the
    caller can decide what to do based on the resulting page state.

    NOTE(review): ``is_login_page_url`` and ``login_if_needed`` are not
    defined in the visible part of this file; they must exist before this
    function is called.

    Args:
        page: Playwright page to log in on.
        email: GESIS account email.
        password: GESIS account password.
    """
    # Local import keeps this function self-contained.
    from urllib.parse import urljoin

    if await is_logged_in(page, email_hint=email):
        return

    # Prefer navigating to the login link's href (more reliable than a click
    # in headless mode).
    login_href = None
    try:
        cand = page.locator("a:has-text('Login'), a:has-text('Log in')").first
        if await cand.count() > 0:
            login_href = await cand.get_attribute("href")
    except Exception:
        pass

    href = (login_href or "").strip()
    # "#..." and "javascript:" hrefs are click handlers, not navigable URLs.
    navigable = bool(href) and not href.startswith("#") and not href.lower().startswith("javascript:")

    if navigable:
        try:
            # get_attribute() returns the raw attribute, which may be
            # relative; page.goto() needs an absolute URL.
            await page.goto(urljoin(page.url, href), wait_until="domcontentloaded")
        except Exception:
            pass
    else:
        # Fallback: click and hope it navigates.
        try:
            await page.locator("a:has-text('Login'), a:has-text('Log in'), button:has-text('Login'), button:has-text('Log in')").first.click(timeout=8000)
            await page.wait_for_timeout(1500)
        except Exception:
            pass

    # If on the SSO/login domain, fill in the credentials.
    if is_login_page_url(page.url):
        await login_if_needed(page, email, password)

    # Wait briefly for the session to settle.
    await page.wait_for_timeout(2000)

async def open_datasets_dialog(page):
    """Click "Datasets" in the Downloads panel and return the opened dialog.

    Several locator strategies are tried in order; the first click that
    succeeds wins.

    Returns:
        Locator for the visible "Download datasets" dialog.

    Raises:
        TimeoutError: if none of the strategies could click "Datasets".
        Exception: whatever Playwright raises if the dialog does not become
            visible within 15 seconds.
    """
    # Best-effort wait for the Downloads box; continue even if it never shows.
    try:
        await page.locator("text=Downloads").first.wait_for(timeout=8000)
    except Exception:
        pass

    # Locators are built lazily inside the try so every step of a strategy
    # is covered by the same error handling.
    strategies = (
        lambda: page.get_by_role("link", name=re.compile(r"^Datasets$", re.I)),
        lambda: page.get_by_role("button", name=re.compile(r"^Datasets$", re.I)),
        lambda: page.locator("a:has-text('Datasets')").first,
        lambda: page.locator("text=Datasets").first,
        lambda: page.locator("div:has-text('Downloads') a:has-text('Datasets')").first,
    )
    for build_locator in strategies:
        try:
            await build_locator().click(timeout=15000)
        except Exception:
            continue
        break
    else:
        raise TimeoutError("Could not click 'Datasets' in the Downloads panel")

    # Wait for the modal dialog to appear.
    modal = page.locator("div[role='dialog']").filter(
        has_text=re.compile(r"Download datasets", re.I)
    ).first
    await modal.wait_for(state="visible", timeout=15000)
    return modal

async def _pick_purpose_from_native_select(dialog) -> bool:
    """Try to choose a purpose in the dialog's first ``<select>`` element."""
    try:
        dropdown = dialog.locator("select").first
        await dropdown.wait_for(state="visible", timeout=5000)
    except Exception:
        return False

    for label in PURPOSE_CANDIDATES:
        try:
            await dropdown.select_option(label=label)
        except Exception:
            continue
        return True
    return False


async def _pick_purpose_from_combobox(dialog) -> bool:
    """Try to choose a purpose via a combobox-style dropdown in the dialog."""
    try:
        combo = dialog.get_by_role("combobox")
        if await combo.count() == 0:
            return False
        await combo.first.click(timeout=3000)

        for label in PURPOSE_CANDIDATES:
            # Prefer a proper option element, then fall back to plain text.
            as_option = dialog.get_by_role("option", name=re.compile(re.escape(label), re.I))
            if await as_option.count() > 0:
                await as_option.first.click(timeout=3000)
                return True
            as_text = dialog.locator(f"text={label}")
            if await as_text.count() > 0:
                await as_text.first.click(timeout=3000)
                return True
    except Exception:
        pass
    return False


async def ensure_purpose_selected(dialog):
    """Select the purpose of use inside the "Download datasets" dialog.

    Must be called after ``open_datasets_dialog()``. A native ``<select>``
    is tried first, then a combobox-style dropdown; labels come from
    ``PURPOSE_CANDIDATES``.

    Returns:
        True if a purpose was selected, False otherwise.
    """
    if await _pick_purpose_from_native_select(dialog):
        return True
    return await _pick_purpose_from_combobox(dialog)

async def handle_purpose_dialog(page):
    """Answer a page-level "Please specify a purpose" prompt, if one is shown.

    Does nothing unless the English or German prompt text is present and
    visible. Otherwise it clicks the first matching purpose label, ticks the
    first visible terms-of-use checkbox and presses the first visible
    confirmation button. Each step is best-effort; errors are ignored.
    """
    try:
        prompt = page.locator("text=Please specify a purpose")
        if not await prompt.count():
            prompt = page.locator("text=Bitte geben Sie einen Zweck")
        if not await prompt.count():
            return
        prompt_visible = await prompt.first.is_visible(timeout=1500)
    except Exception:
        return
    if not prompt_visible:
        return

    # Pick the purpose of use.
    for purpose in PURPOSE_CANDIDATES:
        try:
            choice = page.locator(f"text={purpose}")
            if await choice.count() == 0:
                continue
            await choice.first.click(timeout=1500)
        except Exception:
            continue
        break

    # Accept the terms of use.
    terms_labels = (
        "I agree to the Terms of Use",
        "I agree to the terms of use",
        "Ich stimme den Nutzungsbedingungen zu",
        "I have read and accept",
    )
    for label in terms_labels:
        try:
            checkbox = page.get_by_label(label)
            if await checkbox.count() == 0 or not await checkbox.first.is_visible():
                continue
            await checkbox.first.check()
        except Exception:
            continue
        break

    # Confirm.
    confirm_names = ("Download", "Start download", "Continue", "OK", "Confirm", "Proceed", "Submit", "Fortfahren", "Herunterladen")
    for name in confirm_names:
        try:
            button = page.get_by_role("button", name=re.compile(name, re.I))
            if await button.count() == 0 or not await button.first.is_visible():
                continue
            await button.first.click()
        except Exception:
            continue
        break

from urllib.parse import urljoin

async def discover_dta_click_targets(dialog, base_url: str):
    """List the usable ``.dta`` links inside the download dialog.

    Each result is ``{"fname": ..., "href": ..., "idx": ...}`` where ``idx``
    is the link's index among the dialog's anchors whose text contains
    ".dta". Links are skipped when they are marked disabled
    (``aria-disabled="true"`` or "disabled" in the class name) or have no
    real URL. The URL is taken from the ``href`` property, then ``data-*``
    attributes, then the raw ``href`` attribute. Results are de-duplicated
    by absolute URL (last occurrence wins) and sorted by URL.
    """
    dta_links = dialog.locator("a").filter(has_text=re.compile(r"\.dta\b", re.I))
    total = await dta_links.count()
    by_href = {}

    for position in range(total):
        details = await dta_links.nth(position).evaluate("""
            el => ({
                text: (el.innerText || '').trim(),
                hrefProp: el.href || '',
                hrefAttr: el.getAttribute('href') || '',
                ariaDisabled: el.getAttribute('aria-disabled') || '',
                className: el.className || '',
                dataHref: (el.dataset && (el.dataset.href || el.dataset.url || el.dataset.download || el.dataset.downloadUrl)) || ''
            })
        """)

        # Skip greyed-out entries.
        if (details.get("ariaDisabled") or "").lower() == "true":
            continue
        if "disabled" in (details.get("className") or "").lower():
            continue

        # First non-empty URL source wins.
        sources = ((details.get(key) or "").strip() for key in ("hrefProp", "dataHref", "hrefAttr"))
        raw_href = next((value for value in sources if value), "")
        if not raw_href or raw_href.startswith("#") or raw_href.lower().startswith("javascript:"):
            continue

        absolute = urljoin(base_url, raw_href)

        # File name: from the URL if possible, else the link text, else a
        # positional placeholder.
        in_url = re.search(r"([^/?#]+\.dta)\b", absolute, re.I)
        if in_url:
            file_name = in_url.group(1)
        else:
            in_text = re.search(r"([A-Za-z0-9_\-]+\.dta)\b", details.get("text") or "", re.I)
            file_name = in_text.group(1) if in_text else f"file_{position}.dta"

        by_href[absolute] = {"fname": file_name, "href": absolute, "idx": position}

    return [by_href[href] for href in sorted(by_href)]

async def close_dialog(dialog, page):
    """Close the download dialog: icon button first, Escape key as fallback.

    Both attempts are best-effort; errors are ignored.
    """
    # Attempt 1: a button inside the dialog that contains an <svg> icon.
    # NOTE(review): Playwright documents the `has=` locator as relative to
    # the outer match; a dialog-scoped inner locator may therefore never
    # match, leaving Escape as the effective path. Confirm against the site.
    try:
        icon_button = dialog.locator("button").filter(has=dialog.locator("svg")).first
        has_button = await icon_button.count() > 0
        if has_button:
            await icon_button.click(timeout=2000)
            return
    except Exception:
        pass

    # Attempt 2: press Escape.
    try:
        await page.keyboard.press("Escape")
    except Exception:
        pass


print("Manifest path:", MANIFEST)


In [None]:
# --- FULL MAIN DOWNLOAD LOOP (REQUEST-BASED, MODAL-SCOPED) ---

# Ask for the GESIS credentials interactively; they are kept in these two
# module-level variables and passed to the login helpers by run_all().
gesis_email = input("GESIS login email: ").strip()
gesis_password = getpass("GESIS password (input hidden): ")

# Passed to chromium.launch() in run_all().
HEADLESS = True  # Colab: keep True

# Byte prefixes (lower-case) that identify an HTML page instead of a data file.
_HTML_PREFIXES = (b"<!doctype html", b"<html", b"<head", b"<body")


def _manifest_row(proj, *, status: str, note: str,
                  filename: str = "(none)", saved_to: str = "(none)") -> dict:
    """Build one manifest row for ``proj`` in the column layout of append_manifest()."""
    return {
        "timestamp": datetime.utcnow().isoformat(),
        "za_id": proj.za_id or "(unknown)",
        "study_title": proj.title,
        "dataset_url": getattr(proj, "dataset_url", None) or "(none)",
        "filename": filename,
        "saved_to": saved_to,
        "status": status,
        "note": note,
    }


def _unique_destination(directory: Path, filename: str) -> Path:
    """Return a path in ``directory`` for ``filename`` that does not exist yet.

    An existing file is never overwritten; ``__1``, ``__2``, ... is appended
    to the stem until a free name is found.
    """
    dest = directory / filename
    if not dest.exists():
        return dest
    stem, suffix = dest.stem, dest.suffix
    counter = 1
    while (directory / f"{stem}__{counter}{suffix}").exists():
        counter += 1
    return directory / f"{stem}__{counter}{suffix}"


def _looks_like_html(content_type: str, data: bytes) -> bool:
    """Return True if a response is an HTML page (login/interstitial), not a file."""
    # Lower-cased so the common "<!DOCTYPE html>" spelling is recognised too.
    head = data[:400].lstrip().lower()
    return "text/html" in content_type.lower() or head.startswith(_HTML_PREFIXES)


async def _download_item(context, page, dialog, item: dict, za_dir: Path) -> Tuple[Path, str]:
    """Download one ``.dta`` item and return ``(saved_path, mode)``.

    ``mode`` is "request" when the authenticated HTTP request delivered the
    file, or "fallback" when the link had to be clicked in the browser.
    HTML responses and bodies of 1 KiB or less are never saved.
    """
    # Attempt A: direct authenticated request using the browser context's cookies.
    resp = await context.request.get(
        item["href"],
        timeout=180000,
        headers={"Referer": page.url},
    )
    if resp.ok:
        data = await resp.body()
        content_type = resp.headers.get("content-type") or ""
        if not _looks_like_html(content_type, data) and len(data) > 1024:
            dest = _unique_destination(za_dir, item["fname"])
            dest.write_bytes(data)
            return dest, "request"

    # Attempt B: click the same anchor inside the current dialog.
    link = dialog.locator("a").filter(has_text=re.compile(r"\.dta\b", re.I)).nth(item["idx"])
    async with page.expect_download(timeout=180000) as download_info:
        await link.click(button="left", timeout=15000)
    download = await download_info.value

    # The suggested name comes from the server: keep only its final path
    # component so it cannot point outside za_dir.
    real_name = Path(download.suggested_filename or item["fname"]).name or item["fname"]
    dest = _unique_destination(za_dir, real_name)
    await download.save_as(str(dest))
    return dest, "fallback"


async def _download_and_record(context, page, dialog, proj, za_dir: Path, item: dict) -> None:
    """Download one item unless the manifest already lists it, then log the outcome.

    The manifest "filename" column always holds the discovered name
    (``item["fname"]``), because that is the key ``manifest_has`` looks up
    on the next run. The name actually written to disk is in "saved_to".
    """
    filename = item["fname"]
    if proj.za_id and manifest_has(proj.za_id, filename):
        return

    try:
        dest, used = await _download_item(context, page, dialog, item, za_dir)
        append_manifest(_manifest_row(
            proj,
            status="ok",
            note=f"mode=B; used={used}; href={item['href']}",
            filename=filename,
            saved_to=str(dest),
        ))
    except Exception as e:
        await page.screenshot(path=str(za_dir / "debug_download_failed.png"), full_page=True)
        append_manifest(_manifest_row(
            proj,
            status="error",
            note=f"Download failed: {repr(e)}",
            filename=filename,
        ))


async def _process_project(context, page, proj, za_dir: Path) -> None:
    """Run the full download flow for one study and log the result.

    Flow: open the dataset page, make sure we are logged in, open
    Downloads -> Datasets, select the purpose of use, download every enabled
    ``.dta`` link, close the dialog. Any unexpected error is recorded as a
    project-level "error" row instead of being raised.

    NOTE(review): ``is_login_page_url`` and ``login_if_needed`` are not
    defined in the visible part of this file; they must exist before this
    function is called.
    """
    dialog = None
    try:
        # 1) Open the dataset landing page.
        await page.goto(proj.dataset_url, wait_until="networkidle")
        await accept_cookies_if_present(page)

        # 2) Ensure we are logged in (the site may not redirect automatically).
        await ensure_logged_in(page, gesis_email, gesis_password)
        if is_login_page_url(page.url):
            # Still on the login page: complete the login and come back.
            await login_if_needed(page, gesis_email, gesis_password)
            await page.goto(proj.dataset_url, wait_until="networkidle")
            await accept_cookies_if_present(page)

        # 3) Open the modal: Downloads -> Datasets.
        dialog = await open_datasets_dialog(page)
        if await dialog.locator("text=requires a login").count() > 0:
            # The modal itself asks for a login: re-login and reopen it.
            await close_dialog(dialog, page)
            await ensure_logged_in(page, gesis_email, gesis_password)
            await page.goto(proj.dataset_url, wait_until="networkidle")
            dialog = await open_datasets_dialog(page)

        # 4) Select the purpose inside the modal.
        if not await ensure_purpose_selected(dialog):
            await page.screenshot(path=str(za_dir / "debug_no_purpose_dropdown.png"), full_page=True)
            append_manifest(_manifest_row(
                proj,
                status="error",
                note="Purpose dropdown not found/selected. Likely not logged in. See debug_no_purpose_dropdown.png",
            ))
        else:
            await page.wait_for_timeout(800)

            # 5) Find the enabled .dta links inside the modal.
            dta_items = await discover_dta_click_targets(dialog, page.url)
            if not dta_items:
                await page.screenshot(path=str(za_dir / "debug_no_dta_links.png"), full_page=True)
                append_manifest(_manifest_row(
                    proj,
                    status="warn",
                    note="No enabled .dta links in modal (still greyed?). See debug_no_dta_links.png",
                ))

            # 6) Download each file (request first, click as fallback).
            for item in dta_items:
                await _download_and_record(context, page, dialog, proj, za_dir, item)

        # 7) Close the modal.
        await close_dialog(dialog, page)
        await page.wait_for_timeout(300)

    except Exception as e:
        try:
            if dialog is not None:
                await close_dialog(dialog, page)
                await page.wait_for_timeout(300)
        except Exception:
            pass

        append_manifest(_manifest_row(
            proj,
            status="error",
            note=f"Project-level failure: {repr(e)}",
        ))


async def run_all():
    """Download the Stata files of every study in ``targets``.

    Launches Chromium (headless per ``HEADLESS``), logs in once up front,
    then processes the studies one by one. Files are saved under
    ``DOWNLOAD_ROOT/<za_id>/`` and every outcome is appended to the
    manifest. Files already listed as "ok" in the manifest are skipped, so
    the function can be re-run to resume. The browser is closed even if
    the run is interrupted by an error.
    """
    async with async_playwright() as p:
        browser = await p.chromium.launch(headless=HEADLESS)
        try:
            context = await browser.new_context(accept_downloads=True)
            page = await context.new_page()

            # Warm-up: load the search site and dismiss the cookie banner.
            await page.goto("https://search.gesis.org/", wait_until="domcontentloaded")
            await accept_cookies_if_present(page)

            # Proactive login once; each study retries the login anyway.
            try:
                await ensure_logged_in(page, gesis_email, gesis_password)
            except Exception as e:
                print(f"Proactive login failed (will retry per study): {e!r}")

            for proj in tqdm(targets, desc=f"Downloading EB studies <= {YEAR_CUTOFF}"):
                za_dir = DOWNLOAD_ROOT / (proj.za_id or "ZA_UNKNOWN")
                za_dir.mkdir(parents=True, exist_ok=True)

                if not getattr(proj, "dataset_url", None):
                    append_manifest(_manifest_row(
                        proj,
                        status="skip",
                        note="No dataset_url resolved",
                    ))
                    continue

                await _process_project(context, page, proj, za_dir)

            print("Done. Manifest saved to:", MANIFEST)
        finally:
            await browser.close()

await run_all()