
# Google Forms Autofill (YAML-driven, Selenium)
รองรับ:
- `multiple_choice_grid` (Likert: 1 คำตอบต่อ 1 แถว, ตั้ง `weights.default` และ `row_overrides` ได้)
- `choice` (radio), `checkbox`, `dropdown`, `scale`, `text`, `paragraph`, `date`, `time`
- การกรอกหลายหน้า (Next/Submit)

> ก่อนรัน แก้ค่าพารามิเตอร์ในเซลล์ **Config** ให้ตรงกับฟอร์มของคุณ และเตรียมไฟล์ `config_quota.yaml` ไว้โฟลเดอร์เดียวกัน


In [28]:

# !pip install -U selenium webdriver-manager pyyaml pandas
# ถ้ามีอยู่แล้ว ข้ามได้


## Config

In [None]:

FORM_URL = "https://docs.google.com/forms/d/e/================================/viewform"
EDIT_URL = "https://docs.google.com/forms/d/==================================/edit"  # ใช้ดู/แก้ในเบราว์เซอร์เอง
YAML_PATH = "C:\othai\maget\gforms-quota-orchestrator\config_quota.yaml"   # วางไฟล์ YAML ในโฟลเดอร์เดียวกับ .ipynb
N_SUBMISSIONS = 1                  # จำนวนครั้งที่จะส่งฟอร์ม
HEADLESS = False                   # True = ซ่อนหน้าต่างเบราว์เซอร์
PAGE_TIMEOUT = 12                  # วินาที: รอโหลด page/body
DELAY_RANGE = (0.2, 0.3)           # หน่วงสุ่มระหว่าง action ลดโดนบล็อก


## Imports & helpers

In [30]:

import os, re, time, random, json, math, yaml, sys
import pandas as pd
from typing import Dict, Any, List, Optional

from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.common.exceptions import (
    NoSuchElementException, TimeoutException, StaleElementReferenceException,
    ElementClickInterceptedException
)
from webdriver_manager.chrome import ChromeDriverManager

def dprint(*a):
    print("[debug]", *a)

def norm_text(s: str) -> str:
    return re.sub(r"\s+", " ", (s or "")).strip()

def sleep_jitter(lo=None, hi=None):
    lo = DELAY_RANGE[0] if lo is None else lo
    hi = DELAY_RANGE[1] if hi is None else hi
    time.sleep(random.uniform(lo, hi))

def pick_weighted(weights: Dict[str, float]) -> Optional[str]:
    items = [(str(k), float(v)) for k, v in (weights or {}).items()]
    items = [(k, max(0.0, v)) for k, v in items if k]
    if not items:
        return None
    total = sum(v for _, v in items)
    if total <= 0:
        return random.choice([k for k, _ in items])
    r = random.uniform(0.0, total)
    acc = 0.0
    for k, v in items:
        acc += v
        if r <= acc:
            return k
    return items[-1][0]

def wait(driver, timeout=PAGE_TIMEOUT):
    return WebDriverWait(driver, timeout, poll_frequency=0.2, ignored_exceptions=(
        NoSuchElementException, StaleElementReferenceException
    ))

def find_elem(ctx, by, sel):
    try:
        return ctx.find_element(by, sel)
    except Exception:
        return None

def find_elems(ctx, by, sel):
    try:
        return ctx.find_elements(by, sel)
    except Exception:
        return []

def scroll_into_view(driver, el):
    try:
        driver.execute_script("arguments[0].scrollIntoView({block:'center', inline:'center'});", el)
        sleep_jitter()
    except Exception:
        pass


## Load YAML

In [31]:

def load_yaml(path: str) -> Dict[str, Any]:
    with open(path, "r", encoding="utf-8") as f:
        cfg = yaml.safe_load(f) or {}
    ans = cfg.get("answers", [])
    out = []
    for a in ans:
        a = dict(a)
        m = a.get("match", {}) or {}
        # รองรับชื่อกุญแจสองแบบ
        key = m.get("title_contains") or m.get("question_contains") or ""
        m["title_contains"] = norm_text(key)
        a["match"] = m

        # แก้ชื่อ type เก่า
        if a.get("type") == "grid":
            a["type"] = "multiple_choice_grid"
        out.append(a)
    cfg["answers"] = out
    return cfg

cfg = load_yaml(YAML_PATH)
print("Loaded YAML items:", len(cfg.get("answers", [])))


Loaded YAML items: 25


## Start Selenium (Chrome)

In [32]:
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.chrome.service import Service
import os, platform, subprocess

def make_driver(headless=True, size=(1300, 1024), quiet=False, use_wdm=True, **kw):
    # รองรับการเรียกแบบ headless=… หรือ HEADLESS=…
    if 'HEADLESS' in kw:
        headless = kw.pop('HEADLESS')

    opts = Options()
    if headless:
        opts.add_argument("--headless=new")  # ไม่เด้งหน้าต่าง
    opts.add_argument(f"--window-size={size[0]},{size[1]}")

    # ช่วยกันปัญหาใน Docker/CI หรือ root (Linux)
    if os.environ.get("CI") or (os.name == "posix" and hasattr(os, "geteuid") and os.geteuid() == 0):
        opts.add_argument("--no-sandbox")
        opts.add_argument("--disable-dev-shm-usage")

    # เลือกวิธีจัดการไดรเวอร์
    if use_wdm:
        from webdriver_manager.chrome import ChromeDriverManager
        service = Service(ChromeDriverManager().install())
    else:
        service = Service()  # Selenium ≥ 4.6 จะหาไดรเวอร์ให้เอง

    # ซ่อนหน้าต่างคอนโซล/ตัด log ของ chromedriver
    if quiet:
        try:
            service.log_output = subprocess.DEVNULL
        except Exception:
            pass
        if platform.system() == "Windows":
            try:
                service.creationflags = subprocess.CREATE_NO_WINDOW
            except Exception:
                pass

    driver = webdriver.Chrome(service=service, options=opts)
    driver.set_page_load_timeout(30)
    return driver


## Question matching

In [33]:

def get_question_containers(driver):
    # ปกติ Google Forms ใช้ <div role="listitem"> เป็นคอนเทนเนอร์ของคำถาม
    items = find_elems(driver, By.CSS_SELECTOR, 'div[role="listitem"]')
    if not items:
        # บางธีม
        items = find_elems(driver, By.XPATH, '//div[contains(@class,"Qr7Oae") or contains(@class,"geS5n")]')
    return items

def container_text(el):
    try:
        return norm_text(el.text)
    except Exception:
        return ""

def match_question_container(containers, title_contains: str):
    key = norm_text(title_contains)
    if not key:
        return None
    for c in containers:
        txt = container_text(c)
        if key and key in txt:
            return c
    return None


## Fillers per type

In [34]:

def fill_text_or_paragraph(container, value: str):
    # input text
    inp = find_elem(container, By.CSS_SELECTOR, 'input[type="text"], input[type="email"], input[type="tel"], input')
    if inp:
        scroll_into_view(driver, inp)
        try:
            inp.clear()
        except Exception:
            pass
        sleep_jitter()
        inp.send_keys(value)
        sleep_jitter()
        return True
    # textarea
    ta = find_elem(container, By.TAG_NAME, "textarea")
    if ta:
        scroll_into_view(driver, ta)
        try:
            ta.clear()
        except Exception:
            pass
        sleep_jitter()
        ta.send_keys(value)
        sleep_jitter()
        return True
    return False

def click_choice(container, label: str, role: str):
    # role = 'radio' or 'checkbox'
    target = norm_text(label)
    # หา aria-label ตรงๆ
    xp = f'.//div[@role="{role}" and contains(@aria-label, "{target}")]'
    btns = find_elems(container, By.XPATH, xp)
    for b in btns:
        try:
            scroll_into_view(driver, b); b.click(); sleep_jitter(); return True
        except ElementClickInterceptedException:
            driver.execute_script("arguments[0].click();", b); sleep_jitter(); return True
        except Exception:
            pass
    # หาโดยอิงข้อความใน span/label ใกล้เคียง
    spans = find_elems(container, By.XPATH, f'.//*[self::span or self::div][contains(normalize-space(.), "{target}")]')
    for sp in spans:
        # ไต่ขึ้นหา ancestor ที่เป็นปุ่ม
        anc = sp
        for _ in range(5):
            role_attr = anc.get_attribute("role")
            if role_attr == role:
                try:
                    scroll_into_view(driver, anc); anc.click(); sleep_jitter(); return True
                except Exception:
                    try:
                        driver.execute_script("arguments[0].click();", anc); sleep_jitter(); return True
                    except Exception:
                        pass
            try:
                anc = anc.find_element(By.XPATH, "./..")
            except Exception:
                break
        # ถ้าไม่เจอ ancestor เป็นปุ่ม ลองคลิกตัว sp เอง
        try:
            scroll_into_view(driver, sp); sp.click(); sleep_jitter(); return True
        except Exception:
            pass
    return False

def open_and_pick_dropdown(container, label: str):
    btn = find_elem(container, By.XPATH, './/div[@role="listbox" or @role="combobox"]')
    if not btn:
        btn = find_elem(container, By.XPATH, './/div[contains(@class,"e2CuFe") or contains(@class,"ry3kXd")]')
    if not btn:
        return False
    try:
        scroll_into_view(driver, btn); btn.click(); sleep_jitter()
    except Exception:
        try:
            driver.execute_script("arguments[0].click();", btn); sleep_jitter()
        except Exception:
            return False

    target = norm_text(label)
    # หา option ใน overlay ของ Google
    for xp in [
        f'//div[@role="option" and .//*[contains(normalize-space(.), "{target}")]]',
        f'//div[@role="option" and contains(normalize-space(.), "{target}")]',
        f'//span[contains(normalize-space(.), "{target}")]/ancestor::div[@role="option"][1]',
    ]:
        opts = find_elems(driver, By.XPATH, xp)
        if opts:
            opt = opts[0]
            try:
                scroll_into_view(driver, opt); opt.click(); sleep_jitter(); return True
            except Exception:
                try:
                    driver.execute_script("arguments[0].click();", opt); sleep_jitter(); return True
                except Exception:
                    pass
    # fallback: ปิดเมนู
    try:
        btn.click()
    except Exception:
        pass
    return False

def fill_multiple_choice_grid(container, spec: Dict[str, Any]) -> bool:
    # spec: rows, columns, weights.default, weights.row_overrides
    rows = [norm_text(x) for x in spec.get("rows", [])]
    cols = [norm_text(x) for x in spec.get("columns", [])]
    weights_default = spec.get("weights", {}).get("default", {}) or {c: 1 for c in cols}
    row_overrides = spec.get("weights", {}).get("row_overrides", {}) or {}

    table = find_elem(container, By.XPATH, './/*[@role="table" or @role="grid"]')
    if not table:
        table = container

    ok_any = False
    all_rows = find_elems(table, By.XPATH, './/*[@role="row"]')
    for row_label in rows:
        row_el = None
        for r in all_rows:
            rh = find_elem(r, By.XPATH, './/*[@role="rowheader"]')
            rh_txt = norm_text((rh.get_attribute("aria-label") if rh else "") or r.text)
            if row_label and row_label in rh_txt:
                row_el = r; break
        if not row_el:
            cand = find_elems(table, By.XPATH, f'.//*[contains(normalize-space(.), "{row_label}")]/ancestor::*[@role="row"][1]')
            row_el = cand[0] if cand else None
        if not row_el:
            continue

        w = row_overrides.get(row_label, weights_default) or weights_default
        weighted_map = {norm_text(k): float(v) for k, v in w.items() if norm_text(k)}
        for c in cols:
            weighted_map.setdefault(c, 0.0)
        picked_col = pick_weighted(weighted_map) or (random.choice(cols) if cols else None)
        if not picked_col:
            continue

        clicked = False
        radios = find_elems(row_el, By.XPATH, './/*[@role="radio"]')
        for rb in radios:
            lab = norm_text(rb.get_attribute("aria-label") or rb.text)
            if picked_col in lab:
                try:
                    scroll_into_view(driver, rb); rb.click(); sleep_jitter(); clicked = True; break
                except Exception:
                    try:
                        driver.execute_script("arguments[0].click();", rb); sleep_jitter(); clicked = True; break
                    except Exception:
                        pass
        if not clicked:
            cells = find_elems(row_el, By.XPATH, './/*[@role="gridcell"]')
            for gc in cells:
                lab = norm_text(gc.get_attribute("aria-label") or gc.text)
                if picked_col in lab:
                    try:
                        scroll_into_view(driver, gc); gc.click(); sleep_jitter(); clicked = True; break
                    except Exception:
                        try:
                            driver.execute_script("arguments[0].click();", gc); sleep_jitter(); clicked = True; break
                        except Exception:
                            pass

        ok_any = ok_any or clicked

    return ok_any


## Fill all on a page

In [35]:

def fill_all_on_page(driver, answers: List[Dict[str, Any]]) -> List[int]:
    containers = get_question_containers(driver)
    done = []
    for i, spec in enumerate(answers):
        m = spec.get("match", {}) or {}
        key = m.get("title_contains") or ""
        if not key:
            continue
        cont = match_question_container(containers, key)
        if not cont:
            continue  # ยังไม่ใช่หน้านี้

        qtype = spec.get("type")
        ok = False

        if qtype in ("text", "text_rand"):
            quota = spec.get("quota", {})
            value = pick_weighted(quota) if quota else spec.get("value", "ตัวอย่างคำตอบ")
            ok = fill_text_or_paragraph(cont, value)

        elif qtype in ("paragraph", "paragraph_rand"):
            quota = spec.get("quota", {})
            value = pick_weighted(quota) if quota else spec.get("value", "ตัวอย่างย่อหน้าคำตอบ")
            ok = fill_text_or_paragraph(cont, value)

        elif qtype == "choice":
            quota = spec.get("quota", {})
            label = pick_weighted(quota) if quota else None
            if not label and spec.get("choices"):
                label = random.choice(spec["choices"])
            if label:
                ok = click_choice(cont, label, role="radio")

        elif qtype == "checkbox":
            presence = spec.get("presence_quota", {})
            labels = list(presence.keys())
            min_k = int(spec.get("min_k", 1))
            max_k = int(spec.get("max_k", max(1, len(labels))))
            k = random.randint(min_k, min(max_k, len(labels))) if labels else 0
            chosen = []
            pool = labels.copy()
            for _ in range(k):
                if not pool: break
                pick = pick_weighted({lab: presence.get(lab,1) for lab in pool})
                if pick is None: break
                chosen.append(pick); pool.remove(pick)
            ok = any(click_choice(cont, lab, role="checkbox") for lab in chosen)

        elif qtype == "dropdown":
            quota = spec.get("quota", {})
            label = pick_weighted(quota) if quota else None
            if not label and spec.get("choices"):
                label = random.choice(spec["choices"])
            if label:
                ok = open_and_pick_dropdown(cont, label)

        elif qtype == "scale":
            vmin = int(spec.get("min", 1)); vmax = int(spec.get("max", 5))
            val = str(int(spec.get("value", (vmin + vmax)//2)))
            ok = click_choice(cont, val, role="radio")

        elif qtype in ("date","time"):
            value = spec.get("value", "")
            ok = fill_text_or_paragraph(cont, value) if value else True

        elif qtype == "multiple_choice_grid":
            ok = fill_multiple_choice_grid(cont, spec)

        if ok:
            done.append(i)
    return done


## Navigation (Next / Submit)

In [36]:

NEXT_LABELS = ["ถัดไป", "ต่อไป", "Next", "Continue", "Proceed"]
SUBMIT_LABELS = ["ส่ง", "ส่งแบบฟอร์ม", "Submit", "บันทึก", "Save"]
BACK_LABELS = ["ย้อนกลับ", "Previous", "Back"]

def try_click_button_by_labels(driver, labels: List[str]) -> bool:
    labs = [norm_text(x) for x in labels]
    # ตรงตัว: ปุ่มมีข้อความ
    for lab in labs:
        for xp in [
            f'//button[.//*[normalize-space(text())="{lab}"] or normalize-space(text())="{lab}"]',
            f'//*[@role="button"][.//*[normalize-space(text())="{lab}"] or normalize-space(text())="{lab}"]'
        ]:
            btns = find_elems(driver, By.XPATH, xp)
            if btns:
                btn = btns[0]
                try:
                    scroll_into_view(driver, btn); btn.click(); sleep_jitter(); return True
                except Exception:
                    try:
                        driver.execute_script("arguments[0].click();", btn); sleep_jitter(); return True
                    except Exception:
                        pass
    # aria-label
    for lab in labs:
        xp = f'//*[@role="button" and @aria-label="{lab}"]'
        btns = find_elems(driver, By.XPATH, xp)
        if btns:
            btn = btns[0]
            try:
                scroll_into_view(driver, btn); btn.click(); sleep_jitter(); return True
            except Exception:
                try:
                    driver.execute_script("arguments[0].click();", btn); sleep_jitter(); return True
                except Exception:
                    pass
    return False

def is_thankyou_page(driver) -> bool:
    body = find_elem(driver, By.TAG_NAME, "body")
    if not body: return False
    txt = norm_text(body.text)
    markers = ["คำตอบของคุณถูกบันทึกแล้ว", "Your response has been recorded", "เราได้รับคำตอบแล้ว"]
    return any(m in txt for m in markers)

def click_next_or_submit(driver) -> Optional[str]:
    # ลองกด Submit ก่อน
    if try_click_button_by_labels(driver, SUBMIT_LABELS):
        return "submit"
    # ลองกด Next
    if try_click_button_by_labels(driver, NEXT_LABELS):
        return "next"
    # กันคลิก Back โดยพลาด
    # สุดท้าย: กดปุ่มที่อยู่ท้ายสุด (แต่ถ้าเป็น Back ให้ข้าม)
    cand = find_elems(driver, By.XPATH, '(//*[@role="button" or self::button])[position() > last()-3]')
    for btn in cand[::-1]:
        label = norm_text(btn.text or btn.get_attribute("aria-label") or "")
        if any(b == label for b in BACK_LABELS):
            continue
        try:
            scroll_into_view(driver, btn); btn.click(); sleep_jitter(); return "clicked"
        except Exception:
            try:
                driver.execute_script("arguments[0].click();", btn); sleep_jitter(); return "clicked"
            except Exception:
                pass
    return None


## Runner

In [37]:

def submit_once(driver, cfg: Dict[str, Any]) -> bool:
    answers = cfg.get("answers", [])
    for step in range(1, 20):  # safety: ไม่เกิน 20 หน้า
        try:
            wait(driver).until(EC.presence_of_element_located((By.TAG_NAME, "body")))
        except TimeoutException:
            pass

        done_idx = fill_all_on_page(driver, answers)
        dprint(f"Page {step}: filled {done_idx}")

        act = click_next_or_submit(driver)
        if not act:
            break  # ไม่มีปุ่มจะไปต่อแล้ว (อาจเป็นหน้าสุดท้าย)
        sleep_jitter()

        if is_thankyou_page(driver):
            return True

    return is_thankyou_page(driver)

def run_submitter(n=N_SUBMISSIONS, url=FORM_URL, headless=HEADLESS):
    recs = []
    for i in range(1, n+1):
        drv = make_driver(headless=headless)
        drv.get(url)
        ok = submit_once(drv, cfg)
        recs.append({"run": i, "status": "OK" if ok else "FAIL"})
        try:
            drv.quit()
        except Exception:
            pass
        time.sleep(random.uniform(*DELAY_RANGE))
    return pd.DataFrame(recs)

print("พร้อมรันแล้ว (ตั้งค่า N_SUBMISSIONS ในส่วน Config)")


พร้อมรันแล้ว (ตั้งค่า N_SUBMISSIONS ในส่วน Config)


## Run once (manual trigger)

In [38]:

# ปลดคอมเมนต์บรรทัดล่างเพื่อรันทันที
# df = run_submitter()
# df
print("ตัวอย่าง: ปิดการรันอัตโนมัติไว้ก่อน ให้ตรวจ YAML/ฟอร์มให้ตรงก่อนแล้วค่อยรัน")


ตัวอย่าง: ปิดการรันอัตโนมัติไว้ก่อน ให้ตรวจ YAML/ฟอร์มให้ตรงก่อนแล้วค่อยรัน


In [39]:
df = run_submitter()
df


[debug] Page 1: filled []
[debug] Page 2: filled [0, 1, 2, 3, 4]


Unnamed: 0,run,status
0,1,FAIL
