# Data Curation

This python notebook is used to extract and process a dataset from the wikipedia, used to train YAPPER.

The flow is as follows:

* Extract English-written Wiki pages in science, geography, ecnonomics, and history
* Preproccess extracted text to remove artifacts and whitespace
* Semantically chunk them to ~ 500 words long chunks of text. Drop all chunks less than 400 words
* Generate metadata for each chunk
* Ensure there are no near duplicates through embedding similarity
* Split to train/eval/test
* Generate dataset summary

## Imports & Initializations

In [1]:
import requests
import mwparserfromhell
import re
import time
import html
import json
import uuid
import numpy as np
import pandas as pd
import duckdb as ddb
from matplotlib import pyplot as plt
from dotenv import dotenv_values
from tqdm import tqdm
import hashlib
from collections import deque
import asyncio
import inspect
import os
from tqdm.asyncio import tqdm_asyncio

import semchunk
from google import genai

In [2]:
# Get Env vars
config = dotenv_values("../.env")
config["WIKI_API"] = "https://en.wikipedia.org/w/api.php"
config["WIKI_BATCH_SIZE"] = 50
config["WIKI_HEADERS"] = {"User-Agent": "WikiDatasetBot/0.1 (contact: your_email@example.com)"}

In [3]:
# Duckdb connection
# con = ddb.connect("/hpc/home/bfa6/work/data/yapper/database.duckdb")

In [4]:
# Gemini client
client = genai.Client(api_key=config["GEMINI_API_KEY"])

In [5]:
# Semchunk chunker
chunker = semchunk.chunkerify(lambda text: len(text.split()), 500)

## Functions

### Wiki related functions

In [6]:
# 1) utility: ensure category prefix
def ensure_cat_prefix(cat_name: str) -> str:
    return cat_name if cat_name.startswith("Category:") else "Category:" + cat_name

# 2) generator: category members (pages / subcats / files) - top-level generator function
def category_members_generator(cat_title: str, cmtype: str = "page", cmnamespace: int = 0, limit: int = 500, delay: float = 0.0):
    """
    Yield category members for a category (page dicts from API).
    cmtype: "page" | "subcat" | "file"
    cmnamespace: namespace filter (0 for articles, 14 for categories)
    """
    params = {
        "action": "query",
        "format": "json",
        "list": "categorymembers",
        "cmtitle": cat_title,
        "cmtype": cmtype,
        "cmnamespace": cmnamespace,
        "cmlimit": str(limit),
        "formatversion": 2
    }
    cont = {}
    while True:
        p = params.copy()
        p.update(cont)
        r = requests.get(config["WIKI_API"], params=p, headers=config["WIKI_HEADERS"], timeout=30)
        r.raise_for_status()
        j = r.json()
        members = j.get("query", {}).get("categorymembers", [])
        for m in members:
            yield m
        if "continue" in j:
            cont = j["continue"]
        else:
            break
        if delay:
            time.sleep(delay)

# 3) gather titles from one seed category (BFS over subcats optional)
def gather_titles_from_category(seed_category: str, recurse: bool = False, max_depth: int = 1, max_pages_per_category: int = 1000, delay: float = 0.0):
    """
    Return a set of page titles (namespace 0) under seed_category.
    """
    seed_cat = ensure_cat_prefix(seed_category)
    queue = deque()
    queue.append((seed_cat, 0))
    seen_cats = {seed_cat}
    titles = set()

    while queue:
        cat, depth = queue.popleft()
        count = 0
        for m in category_members_generator(cat, cmtype="page", cmnamespace=0, limit=500, delay=delay):
            title = m.get("title")
            if title:
                titles.add(title)
            count += 1
            if count >= max_pages_per_category:
                break

        if not recurse or depth >= max_depth:
            continue

        for sub in category_members_generator(cat, cmtype="subcat", cmnamespace=14, limit=500, delay=delay):
            sub_title = sub.get("title")
            if sub_title and sub_title not in seen_cats:
                seen_cats.add(sub_title)
                queue.append((sub_title, depth + 1))

    return titles

# 4) fetch metadata for a list of pageids (batched)
def fetch_pages_metadata_by_ids(pageids, delay: float = 0.0):
    """
    Return list of metadata dicts for the given pageids.
    Each dict contains pageid, title, ns, fullurl, length, lastrevid, categories (list).
    """
    results = []
    for i in tqdm(range(0, len(pageids), config["WIKI_BATCH_SIZE"]), desc="Fetching metadata", leave=False):
        chunk = pageids[i:i+config["WIKI_BATCH_SIZE"]]
        params = {
            "action": "query",
            "format": "json",
            "pageids": "|".join(str(x) for x in chunk),
            "prop": "info|categories",
            "inprop": "url",
            "cllimit": "max",
            "formatversion": 2
        }
        r = requests.get(config["WIKI_API"], params=params, headers=config["WIKI_HEADERS"], timeout=30)
        r.raise_for_status()
        j = r.json()
        pages = j.get("query", {}).get("pages", [])
        for p in pages:
            rec = {
                "pageid": p.get("pageid"),
                "title": p.get("title"),
                "ns": p.get("ns"),
                "fullurl": p.get("fullurl"),
                "length": p.get("length"),
                "lastrevid": p.get("lastrevid"),
                "categories": [c.get("title") for c in (p.get("categories") or [])]
            }
            results.append(rec)
        if delay:
            time.sleep(delay)
    return results

# 5) main orchestrator (no nested functions)
def get_pages_from_categories(seed_categories, recurse: bool = False, max_depth: int = 1, max_pages_per_seed: int = 1000, delay_between_api_calls: float = 0.0):
    """
    Given a list of seed category names, return a list of enriched page metadata dicts.
    - seed_categories: list of strings (with or w/o "Category:" prefix)
    - returns list of dicts with page metadata and seed_category field.
    """
    # map pageid -> seed_category (first seed encountered)
    pageid_to_seed = {}
    for seed in tqdm(seed_categories, desc="Seed categories"):
        seed_pref = ensure_cat_prefix(seed)
        titles = gather_titles_from_category(seed_pref, recurse=recurse, max_depth=max_depth, max_pages_per_category=max_pages_per_seed, delay=delay_between_api_calls)
        # gather pageids for these titles using a query (batch)
        # We need to convert titles -> pageids in batches
        titles_list = list(titles)
        for i in tqdm(range(0, len(titles_list), config["WIKI_BATCH_SIZE"]), desc=f"Converting titles for {seed_pref}", leave=False):
            chunk = titles_list[i:i+config["WIKI_BATCH_SIZE"]]
            params = {
                "action": "query",
                "format": "json",
                "titles": "|".join(chunk),
                "prop": "info",
                "inprop": "url",
                "formatversion": 2
            }
            r = requests.get(config["WIKI_API"], params=params, headers=config["WIKI_HEADERS"], timeout=30)
            r.raise_for_status()
            j = r.json()
            pages = j.get("query", {}).get("pages", [])
            for p in pages:
                pid = p.get("pageid")
                if pid and pid not in pageid_to_seed:
                    pageid_to_seed[pid] = {"seed_category": seed_pref}
            if delay_between_api_calls:
                time.sleep(delay_between_api_calls)

    if not pageid_to_seed:
        return []

    # fetch metadata for collected pageids
    pageids = list(pageid_to_seed.keys())
    meta_list = fetch_pages_metadata_by_ids(pageids, delay=delay_between_api_calls)

    # attach seed_category
    for m in meta_list:
        pid = m.get("pageid")
        m["seed_category"] = pageid_to_seed.get(pid, {}).get("seed_category")

    # sort deterministically
    meta_list.sort(key=lambda x: (x.get("seed_category") or "", x.get("title") or ""))
    return meta_list


In [7]:
def _clean_wikitext(wikitext: str) -> str:
    """
    Convert wikitext to plain text paragraphs only.
    """
    
    # Remove comments
    text = re.sub(r'<!--.*?-->', '', wikitext, flags=re.DOTALL)
    
    # Remove all references (including nested ones)
    while '<ref' in text:
        text = re.sub(r'<ref[^>]*>.*?</ref>', '', text, flags=re.DOTALL)
        text = re.sub(r'<ref[^>]*\/>', '', text)
    
    # Remove infoboxes and templates aggressively
    def remove_nested_braces(s):
        while '{{' in s:
            start = s.find('{{')
            if start == -1:
                break
            count = 0
            i = start
            while i < len(s):
                if s[i:i+2] == '{{':
                    count += 1
                    i += 2
                elif s[i:i+2] == '}}':
                    count -= 1
                    i += 2
                    if count == 0:
                        s = s[:start] + s[i:]
                        break
                else:
                    i += 1
            else:
                break
        return s
    
    text = remove_nested_braces(text)
    
    # Remove tables
    text = re.sub(r'\{\|.*?\|\}', '', text, flags=re.DOTALL)
    
    # Remove files/images with nested brackets
    def remove_files(s):
        while True:
            match = re.search(r'\[\[(?:File|Image):', s, re.IGNORECASE)
            if not match:
                break
            start = match.start()
            count = 0
            i = start
            while i < len(s):
                if s[i:i+2] == '[[':
                    count += 1
                    i += 2
                elif s[i:i+2] == ']]':
                    count -= 1
                    i += 2
                    if count == 0:
                        s = s[:start] + s[i:]
                        break
                else:
                    i += 1
            else:
                break
        return s
    
    text = remove_files(text)
    
    # Remove categories
    text = re.sub(r'\[\[Category:.*?\]\]', '', text, flags=re.IGNORECASE)
    
    # Convert wikilinks: [[Link|Display]] -> Display, [[Link]] -> Link
    text = re.sub(r'\[\[(?:[^|\]]*\|)?([^\]]+)\]\]', r'\1', text)
    
    # Remove external links
    text = re.sub(r'\[https?://[^\]]+\]', '', text)
    
    # Remove section headers
    text = re.sub(r'={2,}.*?={2,}', '', text)
    
    # Remove HTML tags
    text = re.sub(r'<[^>]+>', '', text)
    
    # Remove formatting
    text = re.sub(r"'{2,}", '', text)
    
    # Filter lines
    lines = []
    skip_sections = {'Footnotes', 'Bibliography', 'References', 'External links', 'See also', 'Notes', 'Further reading'}
    
    for line in text.split('\n'):
        line = line.strip()
        if not line or line in skip_sections:
            continue
        if re.match(r'^[\*:;\|!{}\[\]]', line):
            continue
        lines.append(line)
    
    return '\n\n'.join(lines)


def get_wikipedia_page_text(title: str, sleep: float = 0.5) -> str:
    """
    Fetch the plain text of a Wikipedia page (English) given its title.

    Args:
        title (str): The Wikipedia page title (e.g., "Artificial intelligence").
        sleep (float): Optional pause between requests (in seconds) to be polite.

    Returns:
        str: Cleaned plain-text content of the page, or empty string if not found.
    """
    params = {
        "action": "query",
        "format": "json",
        "prop": "revisions",
        "rvprop": "content",
        "titles": title,
        "formatversion": 2,
        "redirects": 1
    }

    try:
        r = requests.get(
            config["WIKI_API"],
            params=params,
            headers=config["WIKI_HEADERS"],
            timeout=10
        )
        r.raise_for_status()
        data = r.json()
        pages = data.get("query", {}).get("pages", [])
        if not pages or "revisions" not in pages[0]:
            return ""

        # Raw wikitext
        wikitext = pages[0]["revisions"][0]["content"]
        text = _clean_wikitext(wikitext)

        time.sleep(sleep)
        return text.strip()

    except Exception as e:
        print(f"[Error] {title}: {e}")
        return ""


### Duckdb Functions

In [8]:
# Create dataset table
def create_tables() -> bool:
    ddl = """
    CREATE OR REPLACE TABLE wiki_pages (
        id UUID DEFAULT uuid() PRIMARY KEY,
        wiki_page_id INTEGER UNIQUE,
        title TEXT,
        url TEXT,
        categories JSON  -- store list of strings as JSON
    );

    CREATE OR REPLACE TABLE wiki_chunks (
        id UUID DEFAULT uuid() PRIMARY KEY,
        chunk TEXT,
        questions JSON,  -- store list of strings as JSON
        hash TEXT UNIQUE,
        page_id UUID 
    );
    """

    try:
        con.execute(ddl)
    except Exception as e:
        print(f"Failed to create tables: {e}")
        return False

    return True

In [9]:
def insert_wiki_pages(pages: list[dict]) -> bool:
    if not pages:
        return True

    try:
        data = [
            (p["wiki_page_id"], p["title"], p["url"], json.dumps(p.get("categories", [])))
            for p in pages
        ]
        con.executemany(
            "INSERT INTO wiki_pages (wiki_page_id, title, url, categories) VALUES (?, ?, ?, ?)",
            data
        )
        return True
    except Exception as e:
        print(f"Batch insert into wiki_pages failed: {e}")
        return False


def insert_wiki_chunks(chunks: list[dict]) -> bool:
    if not chunks:
        return True

    try:
        data = [
            (c["chunk"], json.dumps(c.get("questions", [])), c["hash"], c["page_id"])
            for c in chunks
        ]
        con.executemany(
            "INSERT INTO wiki_chunks (chunk, questions, hash, page_id) VALUES (?, ?, ?, ?)",
            data
        )
        return True
    except Exception as e:
        print(f"Batch insert into wiki_chunks failed: {e}")
        return False


In [10]:
def retrieve_wiki_pages():
    query = "SELECT * from wiki_pages"

    df_result = con.execute(query).df()

    return df_result.to_dict('records')

In [11]:
def generate_data_stats():
    query = """
    SELECT
        (SELECT COUNT(DISTINCT id) FROM wiki_pages) AS num_pages,
        (SELECT COUNT(DISTINCT id) FROM wiki_chunks) AS num_chunks,

        -- Chunk word counts
        (SELECT MIN(array_length(regexp_split_to_array(trim(chunk), '\\s+'), 1))
         FROM wiki_chunks WHERE chunk IS NOT NULL AND trim(chunk) <> '') AS min_chunk_length,

        (SELECT MAX(array_length(regexp_split_to_array(trim(chunk), '\\s+'), 1))
         FROM wiki_chunks WHERE chunk IS NOT NULL AND trim(chunk) <> '') AS max_chunk_length,

        (SELECT AVG(array_length(regexp_split_to_array(trim(chunk), '\\s+'), 1)::numeric)
         FROM wiki_chunks WHERE chunk IS NOT NULL AND trim(chunk) <> '') AS avg_chunk_length,

        (SELECT percentile_disc(0.5)  WITHIN GROUP (ORDER BY array_length(regexp_split_to_array(trim(chunk), '\\s+'), 1))
         FROM wiki_chunks WHERE chunk IS NOT NULL AND trim(chunk) <> '') AS median_chunk_length,

        (SELECT percentile_disc(0.25) WITHIN GROUP (ORDER BY array_length(regexp_split_to_array(trim(chunk), '\\s+'), 1))
         FROM wiki_chunks WHERE chunk IS NOT NULL AND trim(chunk) <> '') AS q1_chunk_length,

        (SELECT percentile_disc(0.75) WITHIN GROUP (ORDER BY array_length(regexp_split_to_array(trim(chunk), '\\s+'), 1))
         FROM wiki_chunks WHERE chunk IS NOT NULL AND trim(chunk) <> '') AS q3_chunk_length,

        -- Questions 
        (SELECT SUM(json_array_length(questions)) FROM wiki_chunks WHERE questions IS NOT NULL) AS num_questions, 
        (SELECT AVG(json_array_length(questions)::numeric) FROM wiki_chunks WHERE questions IS NOT NULL) AS avg_num_questions_per_chunk ;
    ;
    """
    df_result = con.execute(query).df()
    
    return df_result.to_dict('records')


In [12]:
def save_tables(dir_path: str):
    os.makedirs(dir_path, exist_ok=True)

    con.execute("SELECT * FROM wiki_pages").df().to_csv(os.path.join(dir_path, "wiki_pages.csv"), index=False)
    con.execute("SELECT * FROM wiki_chunks").df().to_csv(os.path.join(dir_path, "wiki_chunks.csv"), index=False)

### Gemini Question generation Functions

In [13]:
def ask_gemini(
    contents: str,
    model: str = "gemini-2.5-flash",
    retries: int = 3,
    backoff_factor: float = 2.0,
) -> str:
    """
    Generate text using Gemini with retry and standard exception handling.
    """
    for attempt in range(1, retries + 1):
        try:
            response = client.models.generate_content(
                model=model,
                contents=contents,
            )
            return response.text.strip()

        except (ConnectionError, TimeoutError) as e:
            print(f"Network issue on attempt {attempt}: {e}")
        except Exception as e:
            print(f"Error on attempt {attempt}: {e}")

        # Retry if not the last attempt
        if attempt < retries:
            sleep_time = backoff_factor ** (attempt - 1)
            print(f"Retrying in {sleep_time:.1f} seconds...")
            time.sleep(sleep_time)
        else:
            print("All retries failed.")
            return f"Error: {e}"

    return "Failed to generate response after multiple attempts."

In [14]:
def generate_questions(
    context: str,
    model: str = "gemini-2.5-flash",
    retries: int = 3,
    backoff_factor: float = 2.0,
) -> dict:
    """
    Generate Q&A pairs from a given context using Gemini,
    returning a JSON object like:
      {"QAs": [{"Question": "...", "Answer": "..."}]}
    """
    system_prompt = (
        "You are an expert question generator. "
        "Given the following text, generate several question–answer pairs "
        "that test understanding of its key ideas. "
        "Answers must be short and concise — ideally one short phrase or sentence, not long explanations. "
        "Respond ONLY in valid JSON with this structure:\n"
        '{"QAs": [{"Question": "<string>", "Answer": "<string>"}]}'
    )

    for attempt in range(1, retries + 1):
        try:
            response = client.models.generate_content(
                model=model,
                contents=[system_prompt, context],
                config={"response_mime_type": "application/json"},
            )

            # If response is valid JSON, .parsed gives a Python dict
            # print(response.text)
            return json.loads(response.text)

        except (ConnectionError, TimeoutError) as e:
            print(f"Network issue on attempt {attempt}: {e}")
        except Exception as e:
            print(f"Error on attempt {attempt}: {e}")

        if attempt < retries:
            sleep_time = backoff_factor ** (attempt - 1)
            print(f"Retrying in {sleep_time:.1f} seconds...")
            time.sleep(sleep_time)
        else:
            print("All retries failed.")
            return {"QAs": []}

    return {"QAs": []}

### Semantic Chunking Functions

In [15]:
def get_chunks(content: str, threshold: int = 400) -> list[str]:
    chunks = chunker(content)
    valid_chunks = [chunk for chunk in chunks if len(chunk.split()) > threshold]
    
    return valid_chunks

### Miscellaneous functions

In [16]:
def hash_text(text: str):
    text_bytes = text.encode('utf-8')
    hash_object = hashlib.sha256(text_bytes)
    hash_hex = hash_object.hexdigest()
    
    return hash_hex


### Final Functions

In [17]:
def fetch_all_articles(
    seed_categories: list, 
    recurse: bool = False, 
    max_depth: int = 0, 
    max_pages_per_seed: int = 10000, 
    delay_between_api_calls: float = 0.01
):
    # Get articles
    pages = get_pages_from_categories(
        seed_categories=seed_categories,
        recurse=recurse,
        max_depth=max_depth, 
        max_pages_per_seed=max_pages_per_seed, 
        delay_between_api_calls=delay_between_api_calls
    )

    # Save to wiki_pages
    wiki_pages = [{"wiki_page_id": page["pageid"], "title": page["title"], "url": page["fullurl"], "categories": page["categories"]} for page in pages]
    result = insert_wiki_pages(pages=wiki_pages)


    return result


In [18]:
async def generate_wiki_chunks(model: str = "gemini-2.5-flash", concurrency: int = 8, chunk_min_threshold: int = 400):
    # Get list of wiki page titles and ids
    pages = retrieve_wiki_pages()

    records = []
    sem = asyncio.Semaphore(concurrency)

    # Loop (async) to generate all the chunks
    for page in tqdm(pages, desc="Generating chunks"):
        # Get content
        content = get_wikipedia_page_text(page["title"])

        # get chunks
        chunks = get_chunks(content, threshold=chunk_min_threshold)

        # for each chunk, generate Q&As then store the chunk into the database
        async def process_chunk(chunk):
            async with sem:
                qa = generate_questions(context=chunk, model=model)
                return {
                    "chunk": chunk,
                    "questions": qa["QAs"],
                    "hash": hash_text(chunk),
                    "page_id": page["id"]
                }

        tasks = [asyncio.create_task(process_chunk(chunk)) for chunk in chunks]
        results = await asyncio.gather(*tasks)
        records.extend(results)

    # Store all chunks into the database
    insert_wiki_chunks(records)

    return True

In [19]:
async def generate_dataset(
    seed_categories: list, 
    recurse: bool = False, 
    max_depth: int = 0, 
    max_pages_per_seed: int = 10000, 
    delay_between_api_calls: float = 0.01,
    model: str = "gemini-2.5-flash",
    refresh_tables: bool = True,
    page_processing_concurrency: int = 8,
    chunk_min_threshold: int = 400
):
    # Clear tables if refresh_tables=True
    if refresh_tables:
        flag = create_tables()

        if not flag:
            raise ValueError("Failed to create tables")

    # Find the articles
    flag = fetch_all_articles(
        seed_categories=seed_categories,
        recurse=recurse,
        max_depth=max_depth, 
        max_pages_per_seed=max_pages_per_seed, 
        delay_between_api_calls=delay_between_api_calls
    )

    if not flag:
        raise ValueError("Failed to fetch articles")

    # For each article: Chunk, then save into the chunk table
    flag = await generate_wiki_chunks(model=model, concurrency=page_processing_concurrency, chunk_min_threshold=chunk_min_threshold)

    if not flag:
        raise ValueError("Failed to generate wiki chunks")

    # Print some dataset statistics
    stats = generate_data_stats()[0]

    print("Number of pages:", stats['num_pages'])
    print("Number of chunks:", stats['num_chunks'])
    print("Min chunk length (words):", stats['min_chunk_length'])
    print("Max chunk length (words):", stats['max_chunk_length'])
    print("Avg chunk length (words):", stats['avg_chunk_length'])
    print("Median chunk length (words):", stats['median_chunk_length'])
    print("Q1 (25th percentile):", stats['q1_chunk_length'])
    print("Q3 (75th percentile):", stats['q3_chunk_length'])
    print("Total questions:", stats['num_questions'])
    print("Avg questions per chunk:", stats['avg_num_questions_per_chunk'])

    return True

## Wiki generator

In [None]:
seeds = ["Science", "Geography", "Economics", "History"]
await generate_dataset(
    seed_categories=seeds, 
    recurse=True, 
    max_depth=2, 
    max_pages_per_seed=2000, 
    delay_between_api_calls=0.01,
    model="gemini-2.5-flash",
    refresh_tables=True,
    page_processing_concurrency=10
)

In [None]:
save_tables(dir_path="/hpc/home/bfa6/work/data/yapper")

# Moby Dick Book

In [1]:
import requests, re, os, time
from slugify import slugify

OUT_DIR = "/hpc/home/bfa6/work/github/yapper/dataset"
TARGET = 200
os.makedirs(OUT_DIR, exist_ok=True)

def clean(t):
    t = t.replace("\r\n","\n")
    m = re.search(r"(?mi)\*\*\* *START OF (THIS|THE) PROJECT GUTENBERG EBOOK.*?\*\*\*", t)
    if m: t = t[m.end():]
    m2 = re.search(r"(?mi)^start of (the|this) project gutenberg", t)
    if m2: t = t[m2.end():]
    m3 = re.search(r"(?mi)^\s*contents\s*$", t[:4000])
    chap = re.compile(r"(?mi)^(chapter|chap\.|book|part)\b.*", re.M)
    if m3:
        mc = chap.search(t[m3.end():])
        if mc: t = t[m3.end()+mc.start():]
    m4 = chap.search(t[:10000])
    if m4: t = t[m4.start():]
    first = t.split("\n\n",1)
    if len(first)==2 and re.search(r"(?i)project gutenberg|license|copyright", first[0]):
        t = first[1]
    return t.strip()

def pick(fmt):
    for k,v in fmt.items():
        if "text/plain" in k.lower(): return v
    return None

def download_books(n=TARGET):
    s = requests.Session()
    got = 0
    seen = set()
    saved = []
    url = "https://gutendex.com/books?languages=en&mime_type=text%2Fplain&sort=descending"
    while url and got < n:
        try:
            j = s.get(url, timeout=20).json()
        except Exception as e:
            print("page error:", e); break
        page = sorted(j.get("results",[]), key=lambda b: (b.get("download_count") or 0))
        for b in page:
            if got >= n: break
            bid = b.get("id")
            if not bid or bid in seen: 
                continue
            u = pick(b.get("formats",{}))
            if not u: 
                seen.add(bid); continue
            try:
                r = s.get(u, timeout=20)
                r.encoding = r.apparent_encoding or "utf-8"
                txt = clean(r.text)
                fn = os.path.join(OUT_DIR, slugify(f"{bid}-{b.get('title','untitled')}")[:150]+".txt")
                # avoid overwriting if file exists (very unlikely because of slug with id)
                if not os.path.exists(fn):
                    with open(fn,"w",encoding="utf8") as f: f.write(txt)
                    print(f"[{got+1}] {b.get('title')} (downloads: {b.get('download_count')})")
                    saved.append(fn); got += 1
                else:
                    print("exists, skip:", fn)
                seen.add(bid)
                time.sleep(0.25)
            except Exception as e:
                print("skip", bid, e)
                seen.add(bid)
        url = j.get("next")
        time.sleep(0.25)
    print(f"Done: saved {got} books to {OUT_DIR}")
    return saved


In [22]:
files = download_books(200)
files[:5] 

[1] School education, Home Education Series, vol. 3 (of 6) (downloads: 0)
[2] The passing of the phantoms : $b A study of evolutionary psychology and morals (downloads: 0)
[3] Cats and kittens (downloads: 0)
[4] Anthropology and modern life (downloads: 0)
[5] The girl at Silver Thistle (downloads: 0)
[6] Bear and forbear : $b or, The young skipper of lake Ucayga (downloads: 0)
[7] The life of Abdel Kader, ex-sultan of the Arabs of Algeria (downloads: 0)
[8] The crusades (downloads: 0)
[9] Codes (downloads: 0)
[10] A diary of the wreck of His Majesty's ship Challenger, on the western coast of South America, in May, 1835 : $b with an account of the subsequent encampment of the officers and crew, during a period of seven weeks, on the south coast of Chili (downloads: 0)
[11] The adventures of Harlequin (downloads: 0)
[12] Walks and talks of an American farmer in England (Part 2 of 2) : $b In the years 1850-51. (downloads: 0)
[13] Walks and talks of an American farmer in England (Part 1 of

['/hpc/home/bfa6/work/github/yapper/dataset/77188-school-education-home-education-series-vol-3-of-6.txt',
 '/hpc/home/bfa6/work/github/yapper/dataset/77186-the-passing-of-the-phantoms-b-a-study-of-evolutionary-psychology-and-morals.txt',
 '/hpc/home/bfa6/work/github/yapper/dataset/77183-cats-and-kittens.txt',
 '/hpc/home/bfa6/work/github/yapper/dataset/77181-anthropology-and-modern-life.txt',
 '/hpc/home/bfa6/work/github/yapper/dataset/77180-the-girl-at-silver-thistle.txt']

In [20]:
base = "/hpc/home/bfa6/work/github/yapper/dataset"
files = [base + "/" + file for file in os.listdir(base)]

In [21]:
all_chunks = []

for file in files:
    with open(file, "r") as f:
        book = f.read() 
    chunks = get_chunks(book)
    all_chunks += [{"chunk": chunk, "source": file} for chunk in chunks]

In [22]:
print(len(all_chunks))

18914


In [24]:
with open(base + "/chunks.json", "w") as f:
    json.dump(all_chunks, f) 

In [23]:
all_chunks[:1]

[{'chunk': '                         THE CAVE OF ELEPHANTA.\n\n [Illustration: A view of a cave, with large statues and pillars and two\n                         people standing inside.]\n\nOne of the earliest monuments of India that attracted the notice of\nEuropeans was the excavation of Elephanta, situated in a beautiful\nisland of the same name, called by the natives Goripura, or _Mountain\nCity_. This island is in the bay of Bombay, seven miles from Bombay\ncastle; it is about six miles in circumference, and composed of two long\nhills with a narrow valley between them.\n\nThe island has taken its familiar name from a colossal statue of an\nelephant, cut out of a detached mass of blackish rock unconnected with\nany stratum below. This figure has had another on its back, which the\nold travellers call a young elephant, but which, as far as we can judge\nfrom the drawing of what remains of it, has much more probably been a\ntiger. The head and neck of this elephant dropped off about

In [25]:
async def generate_chunks_qa(chunks: list, model: str = "gemini-2.5-flash", concurrency: int = 8):
    sem = asyncio.Semaphore(concurrency)

    async def process_chunk(chunk):
        async with sem:
            loop = asyncio.get_event_loop()
            qa = await loop.run_in_executor(
                None,  # Uses default ThreadPoolExecutor
                generate_questions,
                chunk["chunk"],
                model
            )
            return {
                "chunk": chunk["chunk"],
                "QAs": qa["QAs"],
                "hash": hash_text(chunk["chunk"]),
                "source": chunk["source"]
            }

    tasks = [process_chunk(chunk) for chunk in chunks]
    results = await tqdm_asyncio.gather(*tasks, desc="Generating Q&A")
    return results

In [None]:
result = await generate_chunks_qa(all_chunks)