# Webpage Schema Builder — Gemini 2.5 Flash Lite — JS rendering fallback
Adds a JavaScript-rendering fetcher with Playwright. Flow:
1) Try fast static fetch + extraction.
2) Optionally render the page with Chromium, wait for content, auto-expand common accordions, then re-extract.
3) Keep whichever extraction has more content or more FAQs, and append any FAQs found to the extracted text.

Outputs:
- `schemas_bundle.docx`
- `schemas_bundle.txt`
- Optional `scrape_previews.zip`

Toggles at the top of the code:
- `USE_JS_RENDER = True` to enable Playwright fallback
- `DEBUG_SCRAPES` prints previews
- `SAVE_SCRAPES` writes raw scraped text files

In [None]:
# Install packages
!pip -q install google-generativeai trafilatura beautifulsoup4 pandas tldextract tqdm python-docx playwright
# Install Chromium for Playwright (one-time in a fresh Colab)
!python -m playwright install --with-deps chromium > /dev/null 2>&1

In [None]:
# Imports and settings
import os, io, re, json, requests, zipfile, asyncio
import pandas as pd
from bs4 import BeautifulSoup, Tag
import trafilatura
import tldextract
from tqdm.auto import tqdm
from docx import Document
from docx.shared import Pt
from docx.oxml.ns import qn
from docx.oxml import OxmlElement

from playwright.async_api import async_playwright

import google.generativeai as genai
from google.colab import files

# Let pandas show up to 200 characters per cell when rendering DataFrames.
pd.set_option('display.max_colwidth', 200)

# Toggles
USE_JS_RENDER = True        # enable Playwright fallback
DEBUG_SCRAPES = True        # print title/meta/H1/FAQ/content previews for each URL
SAVE_SCRAPES = True         # write each page's scraped text under scrape_previews/

# Caps
CONTENT_CAP = 50000         # characters kept from extracted content

In [None]:
# API key
# Read the key with getpass instead of input(): input() echoes the typed value
# into the cell output, where it is saved (and shared) with the notebook.
from getpass import getpass

print("Paste your Google AI Studio API key. Create one at https://aistudio.google.com/app/api-keys")
API_KEY = getpass("API key: ").strip()
if not API_KEY:
    raise ValueError("API key missing")
genai.configure(api_key=API_KEY)

Paste your Google AI Studio API key. Create one at https://aistudio.google.com/app/api-keys
[REDACTED: an API key was captured in this saved output; revoke and rotate that key]


In [None]:
# Upload CSV with columns: url, primary_keywords
uploaded = files.upload()
if not uploaded:
    raise ValueError("No file uploaded")
# Only the first uploaded file is used.
csv_name = next(iter(uploaded))
df = pd.read_csv(io.BytesIO(uploaded[csv_name]))

# Fail early with a clear message instead of a bare KeyError further down.
missing_cols = [c for c in ("url", "primary_keywords") if c not in df.columns]
if missing_cols:
    raise ValueError(f"CSV is missing required column(s): {', '.join(missing_cols)}")

# Drop rows without a URL *before* casting to str; otherwise a missing value
# becomes the literal string "nan" and would be fetched as if it were a URL.
df = df.dropna(subset=['url']).copy()
df['url'] = df['url'].astype(str).str.strip()
df = df[df['url'] != ''].reset_index(drop=True)
df['primary_keywords'] = df['primary_keywords'].fillna('').astype(str)

def parse_kw_list(s: str):
    """Split a ';'- or ','-separated keyword string into a de-duplicated list.

    Each entry is stripped and blank entries are dropped. Duplicates are
    detected case-insensitively; the first spelling seen is kept and the
    original order is preserved.
    """
    unique = {}
    for piece in re.split(r'[;,]', s):
        keyword = piece.strip()
        if keyword:
            # setdefault keeps the first spelling for a given lowercase key.
            unique.setdefault(keyword.lower(), keyword)
    return list(unique.values())

# Parse each row's primary_keywords string into a list stored in 'seed_keywords'.
df['seed_keywords'] = df['primary_keywords'].apply(parse_kw_list)
print("Rows loaded:", len(df))
# Last expression in the cell: displays the first rows as the cell output.
df.head()

Saving New Microsoft Excel Worksheet.csv to New Microsoft Excel Worksheet (2).csv
Rows loaded: 10


Unnamed: 0,url,primary_keywords,seed_keywords
0,https://www.wingtactical.com/ar-15/,ar-15 parts,[ar-15 parts]
1,https://www.wingtactical.com/ar-15-magazines-accessories/,ar-15 magazine,[ar-15 magazine]
2,https://www.wingtactical.com/lower-receiver-parts/,ar-15 lower receiver kit,[ar-15 lower receiver kit]
3,https://www.wingtactical.com/ar-15-lower-receiver-parts-kit/,ar-15 lower parts kit,[ar-15 lower parts kit]
4,https://www.wingtactical.com/handguards-rails/,ar-15 handguards,[ar-15 handguards]


In [None]:
# Static HTTP fetch and extraction
# One shared session so connections and the browser-like User-Agent are reused.
session = requests.Session()
session.headers.update({
    "User-Agent": "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120 Safari/537.36"
})

def fetch_html(url: str, timeout=25) -> str:
    """Fetch *url* with the shared session and return the decoded body.

    Args:
        url: Page address to GET (redirects are followed).
        timeout: Seconds passed to ``session.get`` as the request timeout.

    Returns:
        The response body as text.

    Raises:
        requests.HTTPError: If the response status is 4xx/5xx.
        requests.RequestException: On connection or timeout failures.
    """
    r = session.get(url, timeout=timeout, allow_redirects=True)
    r.raise_for_status()
    # When the server declares no charset, requests falls back to ISO-8859-1
    # for text/* responses, which garbles UTF-8 pages. Use the encoding
    # detected from the body in that case.
    if "charset" not in r.headers.get("Content-Type", "").lower():
        r.encoding = r.apparent_encoding
    return r.text

In [None]:
# Playwright JS-rendered HTML
async def _render_html_async(url: str, timeout_ms: int = 20000) -> str:
    """Render *url* in headless Chromium and return the resulting HTML.

    Images, media and fonts are blocked to speed up loading. After the DOM is
    loaded the function waits (best effort) for network idle, clicks up to 50
    elements per selector that commonly toggle accordions/FAQs, waits 500 ms,
    and returns ``page.content()``.

    Args:
        url: Page address to open.
        timeout_ms: Timeout in milliseconds used for navigation and for the
            network-idle wait.

    Returns:
        The serialized HTML of the page after the expansion attempts.

    Raises:
        Exception: Whatever Playwright raises for launch or navigation
            failures; the browser is closed before the error propagates.
    """
    async with async_playwright() as p:
        browser = await p.chromium.launch(headless=True)
        try:
            context = await browser.new_context(user_agent="Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120 Safari/537.36")
            page = await context.new_page()

            # Block heavy assets to speed up
            async def route_handler(route, request):
                if request.resource_type in ("image", "media", "font"):
                    return await route.abort()
                return await route.continue_()

            await context.route("**/*", route_handler)
            await page.goto(url, wait_until="domcontentloaded", timeout=timeout_ms)

            # Give the page time to load dynamic sections. Best effort: pages
            # with constant background traffic never become idle.
            # `except Exception` (not a bare except) so task cancellation and
            # KeyboardInterrupt still propagate.
            try:
                await page.wait_for_load_state("networkidle", timeout=timeout_ms)
            except Exception:
                pass

            # Try to auto-expand accordions and FAQs
            selectors = [
                "details summary",
                "[aria-controls]",
                "[aria-expanded='false']",
                ".accordion button,.accordion .toggle,.faq button,.faq .question,.collapsible .trigger",
                "button[role='tab'],[role='button']"
            ]
            for sel in selectors:
                try:
                    elements = await page.query_selector_all(sel)
                except Exception:
                    continue
                for el in elements[:50]:
                    # Hidden, detached or covered elements fail to click; skip them.
                    try:
                        await el.click(timeout=500)
                    except Exception:
                        continue

            # Wait a moment for content to expand
            await page.wait_for_timeout(500)
            return await page.content()
        finally:
            # Closing the browser also closes its contexts; runs on errors too.
            await browser.close()

def render_html_js(url: str, timeout_ms: int = 20000) -> str:
    """Synchronously return the JS-rendered HTML of *url*.

    ``asyncio.run`` cannot be called from a thread that already has a running
    event loop, which is the case inside Jupyter/Colab cells. When such a loop
    is detected the coroutine is run on a fresh loop in a worker thread;
    otherwise ``asyncio.run`` is called directly.

    Args:
        url: Page address to render.
        timeout_ms: Timeout in milliseconds forwarded to the renderer.

    Returns:
        The rendered HTML string.
    """
    import concurrent.futures

    try:
        asyncio.get_running_loop()
    except RuntimeError:
        # No loop running in this thread (plain script): run directly.
        return asyncio.run(_render_html_async(url, timeout_ms))

    # A loop is already running (notebook): use a private loop in another thread
    # and block until it finishes. Exceptions re-raise from .result().
    with concurrent.futures.ThreadPoolExecutor(max_workers=1) as pool:
        future = pool.submit(lambda: asyncio.run(_render_html_async(url, timeout_ms)))
        return future.result()

In [None]:
# FAQ parsing helpers
def parse_faqs_from_jsonld(soup: BeautifulSoup):
    """Collect (question, answer) pairs from FAQPage JSON-LD in *soup*.

    Every ``<script type="application/ld+json">`` is parsed; top-level lists
    and ``@graph`` arrays are flattened. For each object whose ``@type``
    (or ``type``) includes "FAQPage" (case-insensitive), each ``mainEntity``
    entry contributes its ``name``/``headline`` as the question and
    ``acceptedAnswer.text`` as the answer, both reduced to plain text.

    Malformed pieces (invalid JSON, missing or non-string ``@type``,
    ``acceptedAnswer`` that is not an object) are skipped instead of raising.

    Returns:
        A list of ``(question, answer)`` tuples with non-empty members.
    """
    faqs = []
    for s in soup.find_all("script", attrs={"type": "application/ld+json"}):
        try:
            data = json.loads(s.get_text(strip=True))
        except ValueError:
            # Not valid JSON (json.JSONDecodeError is a ValueError): ignore it.
            continue
        items = data if isinstance(data, list) else [data]
        for it in items:
            if not isinstance(it, dict):
                continue
            if isinstance(it.get("@graph"), list):
                candidates = [x for x in it["@graph"] if isinstance(x, dict)]
            else:
                candidates = [it]
            for c in candidates:
                typ = c.get("@type") or c.get("type")
                raw_types = typ if isinstance(typ, list) else [typ]
                # @type may be absent (None) or hold non-strings; calling
                # .lower() on those would raise, so keep strings only.
                types = [t.lower() for t in raw_types if isinstance(t, str)]
                if "faqpage" not in types:
                    continue
                main = c.get("mainEntity") or []
                if isinstance(main, dict):
                    main = [main]
                if not isinstance(main, list):
                    continue
                for q in main:
                    if not isinstance(q, dict):
                        continue
                    qname = q.get("name") or q.get("headline") or ""
                    accepted = q.get("acceptedAnswer") or {}
                    if isinstance(accepted, list):
                        accepted = accepted[0] if accepted else {}
                    # Only an object can carry a "text" field.
                    answer = (accepted.get("text") or "") if isinstance(accepted, dict) else ""
                    # Questions/answers may contain HTML markup; keep text only.
                    qname = BeautifulSoup(str(qname), "html.parser").get_text(" ", strip=True)
                    answer = BeautifulSoup(str(answer), "html.parser").get_text(" ", strip=True)
                    if qname and answer:
                        faqs.append((qname, answer))
    return faqs

def parse_faqs_from_dom(soup: BeautifulSoup):
    """Heuristically collect (question, answer) pairs from the page markup.

    Two passes feed one list:
      1. Every ``<details>`` with a ``<summary>``: the summary text is the
         question, the rest of the element's text is the answer.
      2. Every element whose class matches faq/accordion/collapsible/toggle:
         each h2/h3/h4/button/summary inside it is a question, and the text
         of its following sibling tags, up to the next such tag, is the
         answer (kept only when longer than 5 characters).

    Returns:
        Pairs in discovery order with case-insensitive duplicates removed.
    """
    heading_tags = ["h2", "h3", "h4", "button", "summary"]
    pairs = []

    # Pass 1: native disclosure widgets.
    for details in soup.find_all("details"):
        summary = details.find("summary")
        if summary is None:
            continue
        question = summary.get_text(" ", strip=True)
        # Full <details> text minus the first occurrence of the question.
        answer = details.get_text(" ", strip=True).replace(question, "", 1).strip()
        if question and answer:
            pairs.append((question, answer))

    # Pass 2: containers whose class name hints at an FAQ/accordion widget.
    def has_faq_like_class(c):
        if not c:
            return False
        joined = " ".join(c if isinstance(c, list) else [c])
        return re.search(r"(faq|accordion|collapsible|toggle)", joined, re.I) is not None

    for block in soup.find_all(True, class_=has_faq_like_class):
        for heading in block.find_all(heading_tags):
            question = heading.get_text(" ", strip=True)
            chunks = []
            for sibling in heading.next_siblings:
                if not isinstance(sibling, Tag):
                    continue
                if sibling.name in heading_tags:
                    # The next question starts here.
                    break
                chunks.append(sibling.get_text(" ", strip=True))
            answer = " ".join(filter(None, chunks)).strip()
            if question and len(answer) > 5:
                pairs.append((question, answer))

    # Keep the first occurrence of each (question, answer), ignoring case.
    unique = {}
    for question, answer in pairs:
        unique.setdefault((question.lower(), answer.lower()), (question, answer))
    return list(unique.values())

In [None]:
# Strip page chrome (navigation, headers, footers, ...) from raw HTML
def clean_html(html: str) -> str:
    """Return *html* re-serialized with boilerplate elements removed.

    The markup is parsed with ``html.parser``; every element matching one of
    the CSS selectors below is decomposed, and the remaining tree is returned
    as a string.
    """
    tree = BeautifulSoup(html, "html.parser")

    boilerplate_selectors = (
        # Semantic landmarks and their ARIA roles
        'nav', 'header', 'footer',
        '[role="navigation"]', '[role="banner"]', '[role="contentinfo"]',
        # Common class names
        '.nav', '.navbar', '.navigation', '.menu', '.header', '.footer',
        # Common ids
        '#nav', '#navbar', '#navigation', '#menu', '#header', '#footer',
        # Theme-style wrappers
        '.site-header', '.site-footer', '.page-header', '.page-footer',
        '.main-navigation', '.primary-navigation', '.secondary-navigation',
        # Sidebars, widgets and cookie notices
        '.sidebar', '.widget', '.cookie-banner', '.cookie-notice',
        '[class*="cookie"]', '[id*="cookie"]',
        # Breadcrumb trails
        '.breadcrumb', '.breadcrumbs',
        # Anything labelled as navigation or menu
        '[aria-label*="navigation"]', '[aria-label*="menu"]',
    )

    for css in boilerplate_selectors:
        for node in tree.select(css):
            node.decompose()

    return str(tree)

# Main extraction with cleaned HTML
def extract_main_content(html: str):
    """Extract title, meta description, H1s, main text and FAQs from *html*.

    The HTML is first passed through ``clean_html``. The main text comes from
    trafilatura when it yields more than 200 characters, otherwise from the
    whole cleaned document's text. FAQs are read from JSON-LD, falling back
    to DOM heuristics, and appended to the text as a "FAQs:" block.

    Returns:
        dict with keys ``title`` (<=300 chars), ``meta_description``
        (<=500 chars), ``h1`` (up to 5 strings), ``content`` (<=CONTENT_CAP
        chars) and ``faqs`` (list of (question, answer) tuples).
    """
    # Clean the HTML first
    cleaned_html = clean_html(html)

    extracted = trafilatura.extract(cleaned_html, include_comments=False, include_tables=False, target_language=None)
    soup = BeautifulSoup(cleaned_html, "html.parser")

    title = (soup.title.string or "").strip() if soup.title and soup.title.string else ""
    meta_desc = ""
    md = soup.find("meta", attrs={"name": "description"})
    if md and md.get("content"):
        meta_desc = md["content"].strip()
    h1s = [h.get_text(strip=True) for h in soup.find_all("h1")]
    base_text = extracted if extracted and len(extracted) > 200 else soup.get_text(" ", strip=True)
    base_text = re.sub(r"\s+", " ", base_text).strip()

    faqs = parse_faqs_from_jsonld(soup)
    if not faqs:
        faqs = parse_faqs_from_dom(soup)

    if faqs:
        faq_block = "\n\nFAQs:\n" + "\n".join([f"Q: {q}\nA: {a}" for q,a in faqs])
        # The FAQ block sits at the end, so a plain [:CONTENT_CAP] slice would
        # cut it off first on long pages. Trim the body instead so the FAQs
        # survive; short pages are unaffected.
        room = max(CONTENT_CAP - len(faq_block) - 1, 0)
        combined = f"{base_text[:room]}\n{faq_block}"
    else:
        combined = base_text

    return {
        "title": title[:300],
        "meta_description": meta_desc[:500],
        "h1": h1s[:5],
        "content": combined[:CONTENT_CAP],
        "faqs": faqs
    }

In [None]:
# Gemini model and prompts
# Model id handed to genai.GenerativeModel in call_gemini.
MODEL = "gemini-2.5-flash-lite"

# System instruction sent with every request.
# NOTE(review): this lists four return fields, while the user prompt built by
# build_prompt additionally asks for a 'publisher' object; confirm which is intended.
SYSTEM_INSTRUCTIONS = (
    "You are an SEO and structured data assistant. "
    "Given a page's extracted content and seed keywords, write a concise WebPage JSON-LD that includes: "
    "description, keywords, and knowsAbout. "
    "Return JSON with fields: description, keywords (array), knowsAbout (array of objects with 'name' and 'description'), and schema_jsonld (string). "
    "The schema_jsonld must be valid JSON-LD for schema.org WebPage. "
    "The description must reflect the page. "
    "Mix the seed keywords with new relevant ones. Avoid near-duplicates and keep them page-specific."
)

def build_prompt(url, page, seed_keywords, domain_hint):
    """Build the user prompt sent to the model for one page.

    Args:
        url: Page URL, inserted verbatim.
        page: Mapping read with ``.get`` for the keys 'title',
            'meta_description', 'h1' and 'content'.
        seed_keywords: Seed keywords for the page; interpolated with its
            default string form.
        domain_hint: Domain string shown on the "Domain:" line.

    Returns:
        The prompt text. Doubled braces in the template render as literal
        ``{`` / ``}`` in the JSON examples.
    """
    return f"""URL: {url}
Domain: {domain_hint}

Title: {page.get('title','')}
Meta Description: {page.get('meta_description','')}
H1: {page.get('h1', [])}

Extracted content (truncated):
{page.get('content','')}

Seed keywords:
{seed_keywords}

Task:
1) Write a one paragraph description of the page that fits the content.
2) Propose a clean list of 5 to 20 page-specific keywords. Include some of the seed keywords if relevant.
3) Propose a focused 'knowsAbout' list of 3 to 12 entities or topics. Each entity should have:
   - "name": the name of the entity
   - "description": a short 1-sentence description of the entity
4) Identify the publisher/organization from the domain and content. Create a publisher object with:
   - "name": the organization/brand name
   - "url": the organization's homepage URL
   - "knowsAbout": 3-8 topics/areas the organization specializes in (simple strings, no descriptions)
5) Produce a JSON-LD for schema.org 'WebPage' with:
   - '@context'
   - '@type': 'WebPage'
   - 'url'
   - 'name' if clear
   - 'description' from step 1
   - 'keywords' as a comma separated string from step 2
   - 'mainEntityOfPage' set to the URL
   - 'publisher' object in this exact format:
     {{
       "@type": "Organization",
       "name": "Organization Name",
       "url": "https://organization-url.com",
       "knowsAbout": ["topic1", "topic2", "topic3"]
     }}
   - 'about' array with objects in this exact format:
     {{
       "@type": "Thing",
       "name": "name of entity",
       "description": "short description of entity"
     }}

Return JSON with keys: description, keywords (array of strings), knowsAbout (array of objects with 'name' and 'description'), publisher (object with 'name', 'url', and 'knowsAbout' array of strings), schema_jsonld (string).
Only return JSON. No commentary."""

def call_gemini(prompt: str):
    """Send *prompt* to the configured Gemini model and normalize its JSON reply.

    The reply text is parsed as JSON, first from a fenced code block if one is
    present, then as a whole, then from the outermost ``{...}`` span.

    Returns:
        dict with ``description`` (str), ``keywords`` (list of non-empty
        str), ``knowsAbout`` (list) and ``schema_jsonld`` (str).

    Raises:
        ValueError: If no JSON object can be parsed from the reply.
    """
    model = genai.GenerativeModel(MODEL, system_instruction=SYSTEM_INSTRUCTIONS)
    resp = model.generate_content(prompt)
    text = resp.text or ""
    # Prefer the contents of a ```json fenced block when the model adds one.
    m = re.search(r"```(?:json)?\s*(\{[\s\S]*?\})\s*```", text)
    if m:
        text = m.group(1)
    try:
        data = json.loads(text)
    except ValueError:
        # Fall back to the widest {...} span when the JSON is wrapped in prose.
        m2 = re.search(r"(\{[\s\S]*\})", text)
        if not m2:
            raise ValueError("Model did not return JSON") from None
        data = json.loads(m2.group(1))
    if not isinstance(data, dict):
        raise ValueError("Model did not return a JSON object")

    # The model sometimes returns keywords as one comma-separated string;
    # iterating that directly would yield single characters.
    keywords = data.get("keywords") or []
    if isinstance(keywords, str):
        keywords = re.split(r"[;,]", keywords)
    elif not isinstance(keywords, list):
        keywords = []

    knows_about = data.get("knowsAbout")
    if not isinstance(knows_about, list):
        knows_about = []

    # schema_jsonld may arrive as a nested object instead of a string; str()
    # on a dict gives a Python repr that is not valid JSON, so serialize it.
    schema = data.get("schema_jsonld") or ""
    if isinstance(schema, (dict, list)):
        schema = json.dumps(schema, ensure_ascii=False)

    return {
        # `or ""` keeps a JSON null from becoming the text "None".
        "description": str(data.get("description") or "").strip(),
        "keywords": [str(k).strip() for k in keywords if str(k).strip()],
        "knowsAbout": knows_about,
        "schema_jsonld": str(schema).strip(),
    }

In [None]:
# De-cannibalization
def dedupe_ordered(items):
    """Return *items* without case-insensitive duplicates, order preserved.

    The first spelling of each value is the one kept.
    """
    first_seen = {}
    for item in items:
        # Dicts keep insertion order, so values() lists first occurrences in order.
        first_seen.setdefault(item.lower(), item)
    return list(first_seen.values())

def decannibalize(current, already_used, protected):
    """Drop keywords already used elsewhere, unless they are protected.

    Args:
        current: Candidate keyword strings.
        already_used: Lowercased keywords to exclude.
        protected: Lowercased keywords that are always kept.

    Returns:
        The surviving keywords, de-duplicated case-insensitively in order.
    """
    survivors = [
        kw for kw in current
        if kw.lower() in protected or kw.lower() not in already_used
    ]
    return dedupe_ordered(survivors)

def decannibalize_entities(current, already_used, protected):
    """Drop entities whose name was already used, unless protected.

    Args:
        current: Iterable of entity dicts with a 'name' key; non-dict items
            are skipped. ``None`` is treated as empty.
        already_used: Lowercased entity names to exclude.
        protected: Lowercased entity names that are always kept.

    Returns:
        The surviving entity dicts in order. Names are compared
        case-insensitively and only the first entity per name is kept,
        matching how ``decannibalize`` de-duplicates keywords.
    """
    cleaned = []
    seen = set()
    for entity in current or []:
        if not isinstance(entity, dict):
            continue
        # 'name' may be missing or None; coerce so .lower() cannot fail.
        low = str(entity.get('name') or '').lower()
        if low in seen:
            continue
        if low in protected or low not in already_used:
            seen.add(low)
            cleaned.append(entity)
    return cleaned

In [None]:
# DOCX helpers
def add_paragraph_clean(document, text, bold=False, mono=False):
    """Append a paragraph with zero spacing before/after and return it.

    Args:
        document: python-docx document to append to.
        text: Paragraph text, added as a single run.
        bold: Whether the run is bold.
        mono: When true, the run is set in Consolas.

    Returns:
        The new paragraph object.
    """
    p = document.add_paragraph()
    p.paragraph_format.space_before = Pt(0)
    p.paragraph_format.space_after = Pt(0)
    run = p.add_run(text)
    run.bold = bold
    if mono:
        run.font.name = 'Consolas'
        rPr = run._element.get_or_add_rPr()
        # Reuse an existing w:rFonts element instead of appending another, so
        # the run properties never end up with two of them.
        rFonts = rPr.find(qn('w:rFonts'))
        if rFonts is None:
            rFonts = OxmlElement('w:rFonts')
            rPr.append(rFonts)
        rFonts.set(qn('w:ascii'), 'Consolas')
        rFonts.set(qn('w:hAnsi'), 'Consolas')
    return p

def add_schema_block(document, schema_text):
    """Append *schema_text* as one Consolas paragraph, one line per text line.

    The paragraph has zero spacing before/after. Lines of *schema_text* are
    written into a single run separated by line breaks, so the block stays
    one paragraph.

    Args:
        document: python-docx document to append to.
        schema_text: Multi-line text to render (e.g. a wrapped JSON-LD script).
    """
    p = document.add_paragraph()
    p.paragraph_format.space_before = Pt(0)
    p.paragraph_format.space_after = Pt(0)
    run = p.add_run()
    run.font.name = 'Consolas'
    rPr = run._element.get_or_add_rPr()
    # Reuse an existing w:rFonts element instead of appending another, so the
    # run properties never end up with two of them.
    rFonts = rPr.find(qn('w:rFonts'))
    if rFonts is None:
        rFonts = OxmlElement('w:rFonts')
        rPr.append(rFonts)
    rFonts.set(qn('w:ascii'), 'Consolas')
    rFonts.set(qn('w:hAnsi'), 'Consolas')
    for i, line in enumerate(schema_text.splitlines()):
        if i > 0:
            run.add_break()
        run.add_text(line)

In [None]:
# Process and export
# Output file names for the two bundles.
txt_path = "schemas_bundle.txt"
docx_path = "schemas_bundle.docx"

# Lowercased keywords / entity names already assigned to earlier URLs; used to
# keep later pages from repeating them.
used_keywords = set()
used_knows = set()

if SAVE_SCRAPES:
    os.makedirs("scrape_previews", exist_ok=True)

# One DOCX for the whole run; body text defaults to Calibri 11 pt.
document = Document()
style = document.styles['Normal']
style.font.name = 'Calibri'
style.font.size = Pt(11)

# For each CSV row: scrape the page, ask the model for a schema, post-process
# it, and append the result (or an error note) to both the TXT and the DOCX.
with open(txt_path, "w", encoding="utf-8") as txtf:
    first_written = False
    for row in tqdm(df.itertuples(index=False), total=len(df)):
        url = row.url
        seeds = row.seed_keywords

        try:
            html = fetch_html(url)
            page = extract_main_content(html)

            # Try a JS-rendered fetch when it is enabled and the static result
            # looks thin: no FAQs found, or fewer than 1500 characters of content.
            if USE_JS_RENDER and (not page.get('faqs') or len(page.get('content','')) < 1500):
                try:
                    html_js = render_html_js(url)
                    page_js = extract_main_content(html_js)
                    # Switch only when the render is strictly better. With >=
                    # on the FAQ count, a tie (typically 0 vs 0) replaced the
                    # static page even when the render had less content.
                    more_content = len(page_js.get('content','')) > len(page.get('content',''))
                    more_faqs = len(page_js.get('faqs',[])) > len(page.get('faqs',[]))
                    if more_content or more_faqs:
                        page = page_js
                except Exception as e_js:
                    # Best effort: keep the static result if rendering fails.
                    print(f"JS render failed for {url}: {e_js}")

            ext = tldextract.extract(url)
            # Attribute name differs between tldextract versions; try both.
            domain_hint = getattr(ext, "top_domain_under_public_suffix", "") or getattr(ext, "registered_domain", "")

            if DEBUG_SCRAPES:
                print("URL:", url)
                print("TITLE:", page.get("title",""))
                print("META:", page.get("meta_description",""))
                print("H1:", page.get("h1", []))
                print(f"FAQS found: {len(page.get('faqs', []))}")
                if page.get('faqs'):
                    for i, (q,a) in enumerate(page['faqs'][:2]):
                        print(f"Q{i+1}:", q)
                        print(f"A{i+1}:", a[:250], "...")
                print("CONTENT PREVIEW:", page.get("content","")[:1500], "...\n")

            if SAVE_SCRAPES:
                # File name: URL with non-alphanumerics collapsed to '_', max 100 chars.
                safe = re.sub(r'[^a-zA-Z0-9]+', '_', url)[:100].strip('_') or "page"
                with open(f"scrape_previews/{safe}.txt", "w", encoding="utf-8") as f:
                    f.write(f"URL: {url}\n\n")
                    f.write(f"TITLE: {page.get('title','')}\n")
                    f.write(f"META: {page.get('meta_description','')}\n")
                    f.write(f"H1: {page.get('h1', [])}\n")
                    f.write(f"FAQS: {len(page.get('faqs', []))}\n\n")
                    if page.get('faqs'):
                        for q,a in page['faqs']:
                            f.write(f"Q: {q}\nA: {a}\n\n")
                    f.write(page.get("content",""))

            # Build prompt and call Gemini
            prompt = build_prompt(url, page, seeds, domain_hint)
            result = call_gemini(prompt)

            # Remove keywords/entities already used by earlier URLs; this
            # page's own seed keywords are always kept.
            kw = decannibalize(result.get('keywords', []), used_keywords, {s.lower() for s in seeds})
            ka = decannibalize_entities(result.get('knowsAbout', []), used_knows, set())
            used_keywords.update([k.lower() for k in kw])
            used_knows.update([entity.get('name', '').lower() for entity in ka if isinstance(entity, dict)])

            # 'about' entries derived from the surviving entities (named ones only).
            about = [
                {
                    "@type": "Thing",
                    "name": entity.get('name', ''),
                    "description": entity.get('description', '')
                }
                for entity in ka if isinstance(entity, dict) and entity.get('name')
            ]

            schema_text = result.get('schema_jsonld', '').strip()
            try:
                obj = json.loads(schema_text)
                if not isinstance(obj, dict):
                    raise TypeError("schema_jsonld is not a JSON object")
                # Overwrite the fields we control so they match the
                # de-cannibalized keywords/entities and the real URL.
                obj['url'] = url
                obj['mainEntityOfPage'] = url
                obj['description'] = result.get('description', '')
                obj['keywords'] = ", ".join(kw)
                if ka:
                    obj['about'] = about
                schema_text = json.dumps(obj, ensure_ascii=False, indent=2)
            except (ValueError, TypeError):
                # The model's schema string was unusable: build a minimal
                # WebPage object from what we have and drop empty fields.
                obj = {
                    "@context": "https://schema.org",
                    "@type": "WebPage",
                    "url": url,
                    "mainEntityOfPage": url,
                    "name": page.get('title') or None,
                    "description": result.get('description', ''),
                    "keywords": ", ".join(kw),
                    "about": about if ka else None,
                }
                obj = {k: v for k, v in obj.items() if v}
                schema_text = json.dumps(obj, ensure_ascii=False, indent=2)

            # Wrap schema in script tags
            wrapped_schema = '<script type="application/ld+json">\n' + schema_text.rstrip() + '\n</script>'

            # TXT
            if first_written:
                txtf.write("\n")
            txtf.write(url.strip() + "\n")
            txtf.write(wrapped_schema)
            first_written = True

            # DOCX
            add_paragraph_clean(document, url.strip(), bold=True)
            add_schema_block(document, wrapped_schema)
            spacer = document.add_paragraph()
            spacer.paragraph_format.space_before = Pt(0)
            spacer.paragraph_format.space_after = Pt(6)

        except Exception as e:
            # Per-URL boundary: record the failure in both outputs and move on.
            err = f"ERROR for {url}: {e}"
            print(err)
            if first_written:
                txtf.write("\n")
            txtf.write(url.strip() + "\n" + err)
            first_written = True
            add_paragraph_clean(document, url.strip(), bold=True)
            add_paragraph_clean(document, err, mono=False)
            spacer = document.add_paragraph()
            spacer.paragraph_format.space_before = Pt(0)
            spacer.paragraph_format.space_after = Pt(6)

# Write the DOCX next to the TXT bundle and report both file names.
document.save(docx_path)
print("Created:", txt_path, "and", docx_path)

# Bundle the raw scrape previews (flat, no directory prefix inside the zip).
previews_dir = "scrape_previews"
previews_zip = "scrape_previews.zip"
if SAVE_SCRAPES and os.path.isdir(previews_dir):
    with zipfile.ZipFile(previews_zip, "w", zipfile.ZIP_DEFLATED) as archive:
        for entry in os.listdir(previews_dir):
            archive.write(os.path.join(previews_dir, entry), arcname=entry)
    print("Created: scrape_previews.zip")

# Trigger browser downloads from Colab.
for bundle in (txt_path, docx_path):
    files.download(bundle)
if SAVE_SCRAPES and os.path.exists(previews_zip):
    files.download(previews_zip)

  0%|          | 0/10 [00:00<?, ?it/s]

URL: https://www.wingtactical.com/ar-15/
TITLE: AR-15 Parts for Rifle Customization | Wing Tactical
META: Improve performance and personalize your shooting experience with AR-15 parts from Wing Tactical, your number one source for quality AR parts and accessories.
H1: ['AR-15 Parts']
FAQS found: 4
Q1: V Seven Titanium AR-15 Buffer Retainer Spring Kit
A1: MSRP: $13.00 Was: Now: $12.61 ...
Q2: Strike Industries AR-15 Charging Handle with Extended Latch
A2: MSRP: $46.95 Was: Now: $39.95 - $42.95 ...
CONTENT PREVIEW: Whether you’re building a custom AR-15 or replacing an existing component to optimize performance, Wing Tactical has the quality AR-15 parts that will take your rifle to the next level. Our carefully-curated catalog of AR parts and accessories caters to everyone from law enforcement officers to competition shooters and civilian rifle enthusiasts looking for quality above all else. Explore our collection of AR-15 accessories and start customizing your rifle. AR-15 Parts and Acc

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>