In [None]:
# Submit selected pages to Gemini

This file submits the sets of pages selected by the `select_pages_for_gemini.ipynb` notebook to Gemini.

Instead of submitting all pages (there are ~4 million pages with dates, which might cost about $3000 in total), we submit only the pages selected to fit a $300 budget.

In [1]:
%load_ext autoreload
%autoreload 2

In [2]:
from wiki_dump_extractor import WikiAvroDumpExtractor, llm_utils
import json
import db_utils
import tempfile
from tqdm.auto import tqdm

# Gemini model used for every batch request; it is also embedded in the name
# of the results database so that each model gets its own store.
model_name = "gemini-2.0-flash"
# Full Wikipedia dump (with its index directory) from which the smaller,
# topic-specific dumps below are extracted.
wiki_dump = WikiAvroDumpExtractor("wiki_dump.avro", index_dir="wiki_dump_idx")


In [3]:
def process_batch(pages_batch_and_index):
    """Submit one batch of pages to the model and return the batch result.

    Args:
        pages_batch_and_index: A ``(pages_batch, index)`` pair. ``pages_batch``
            is an iterable of pages and ``index`` is used to build the batch
            name (``batch_<index>``).

    Returns:
        Whatever ``llm_utils.process_batch_request`` returns. ``process_dump``
        unpacks it as ``(events_by_page, failed, errored, usage)``.
    """
    # Local import keeps this cell self-contained (``os`` is not imported at
    # the top of the notebook).
    import os

    pages_batch, index = pages_batch_and_index
    batch_name = f"batch_{index}"
    requests = [
        llm_utils.PageEventExtractionRequest.from_page(
            page=page,
            model_settings={"temperature": 0, "top_p": 0.95},
        )
        for page in pages_batch
    ]

    # Reserve a unique path, then close the handle right away: the JSONL is
    # written by path below, and reopening a still-open temporary file fails
    # on some platforms (notably Windows).
    with tempfile.NamedTemporaryFile(delete=False, suffix=".jsonl") as f:
        jsonl_path = f.name

    try:
        llm_utils.PageEventExtractionRequest.list_to_batch_jsonl(
            requests=requests, path=jsonl_path
        )
        return llm_utils.process_batch_request(
            jsonl_path=jsonl_path,
            batch_name=batch_name,
            bucket_name="wikipage-extraction",
            model=model_name,
        )
    finally:
        # delete=False means nobody else removes the file; without this each
        # batch leaves a JSONL behind in the temp directory. Cleanup is
        # deliberately best-effort so it can never mask the real result/error.
        try:
            os.unlink(jsonl_path)
        except OSError:
            pass


# Notebook-level result holders. `process_dump` builds its own local
# accumulators under the same names; these globals are only overwritten when a
# cell assigns the values returned by `process_dump`.
all_events = 0
all_usage = dict(promptTokenCount=0, candidatesTokenCount=0)
all_failed, all_errored = {}, {}


def process_dump(dump, db_path):
    """Run event extraction over every not-yet-stored page of ``dump``.

    Pages whose title already has a non-empty entry in the database at
    ``db_path`` are skipped. The remaining pages are sent through
    ``process_batch`` in batches of 1,000 by two workers, and every batch
    outcome (events, failures, errors) is written to the database under the
    page title.

    Args:
        dump: Dump object exposing ``process_page_batches_in_parallel``.
        db_path: Path of the database opened via ``db_utils.open_plyvel_db``.

    Returns:
        A tuple ``(event_count, usage_totals, failed, errored)``: the total
        number of extracted events, the summed token usage, and the merged
        ``failed`` / ``errored`` mappings from all batches.
    """
    event_count = 0
    usage_totals = {"promptTokenCount": 0, "candidatesTokenCount": 0}
    failed_pages = {}
    errored_pages = {}

    def as_key(page_title):
        # Database keys are UTF-8 encoded page titles.
        return page_title.encode("utf-8")

    with db_utils.open_plyvel_db(db_path, replace=False) as events_by_page_db:

        def not_stored_yet(page):
            # Anything already stored for this title (events, failure or
            # error) makes the page ineligible for resubmission.
            return not events_by_page_db.get(as_key(page.title))

        batch_results = dump.process_page_batches_in_parallel(
            process_fn=process_batch,
            batch_size=1_000,
            page_filter=not_stored_yet,
            num_workers=2,
        )

        for events_by_page, failed, errored, usage in tqdm(batch_results):
            # Update the running totals first ...
            event_count += sum(map(len, events_by_page.values()))
            failed_pages.update(failed)
            errored_pages.update(errored)
            for token_kind in ("promptTokenCount", "candidatesTokenCount"):
                usage_totals[token_kind] += usage[token_kind]

            # ... then persist the batch. Events are written last, so they
            # take precedence if a title appears in more than one mapping.
            for page_title, failure in failed.items():
                events_by_page_db.put(
                    as_key(page_title), json.dumps(failure).encode("utf-8")
                )
            for page_title, error in errored.items():
                events_by_page_db.put(as_key(page_title), error.encode("utf-8"))
            for page_title, events in events_by_page.items():
                events_by_page_db.put(
                    as_key(page_title), json.dumps(events).encode("utf-8")
                )

    return event_count, usage_totals, failed_pages, errored_pages


## Top historical pages


In [4]:
historical_dump = WikiAvroDumpExtractor("historical_pages.avro")
historical_pages = {page.title for page in historical_dump.iter_pages()}
# A bare `len(...)` in the middle of a cell is evaluated and thrown away, so
# print the count explicitly to actually show it.
print(len(historical_pages))

# Guarded so that re-running the notebook does not resubmit these pages;
# change the condition to True to launch the extraction.
if False:
    all_events, all_usage, all_failed, all_errored = process_dump(
        historical_dump, db_path=f"events_extracted_by_page_{model_name}_db"
    )
    print("Failed", len(all_failed))
    print("Errored", len(all_errored))
    print("Events", all_events)
    print("Usage", all_usage)


ValueError: cannot read header - is it an avro file?

## Top places


In [5]:
# Collect the distinct titles of the pages in the places dump and show how
# many there are.
places_dump = WikiAvroDumpExtractor("geospatial/places_with_over_20_dates.avro")
places_pages = {page.title for page in places_dump.iter_pages()}
len(places_pages)

117871

In [6]:
# Guarded so that re-running the notebook does not resubmit these pages;
# change the condition to True to launch the extraction.
if False:
    all_events, all_usage, all_failed, all_errored = process_dump(
        places_dump,
        db_path=f"events_extracted_by_page_{model_name}_db",
    )
    for label, value in (
        ("Failed", len(all_failed)),
        ("Errored", len(all_errored)),
        ("Events", all_events),
        ("Usage", all_usage),
    ):
        print(label, value)


## BC pages


In [7]:
import glob
from pathlib import Path

# Build (once) a dump containing every page that appears in any of the
# negative-year date files; skipped when the dump file already exists.
target = Path("negative_dates_dump.avro")
if not target.exists():
    negative_year_titles = set()
    for file_path in glob.glob("dates_by_year/-*.avro"):
        negative_year_titles |= set(db_utils.avro_to_pandas(file_path).page)

    pages_with_negative_dates = sorted(negative_year_titles)
    print(f"Total pages in negative year files: {len(pages_with_negative_dates)}")
    wiki_dump.extract_pages_titles_to_new_dump(pages_with_negative_dates, target)

In [8]:
# Open the dump by its explicit path instead of the shared `target` variable:
# other cells reassign `target` to different dumps, so relying on it here
# would open the wrong file whenever cells are run out of order.
negative_dates_dump = WikiAvroDumpExtractor(Path("negative_dates_dump.avro"))

# Guarded so that re-running the notebook does not resubmit these pages;
# change the condition to True to launch the extraction.
if False:
    all_events, all_usage, all_failed, all_errored = process_dump(
        negative_dates_dump, db_path=f"events_extracted_by_page_{model_name}_db"
    )
    print("Failed", len(all_failed))
    print("Errored", len(all_errored))
    print("Events", all_events)
    print("Usage", all_usage)


## 1949 pages


In [67]:
dates_1949_df = db_utils.avro_to_pandas("dates_by_year/1949.avro")
# Sorted (as for the negative-year pages) so the title list is identical on
# every run; the iteration order of a plain set of strings is not stable
# between interpreter sessions.
pages_1949 = sorted(set(dates_1949_df.page))
len(pages_1949)


116731

In [68]:
# Build the 1949 dump once (skipped if the file already exists), then open it.
target = Path("y1949_dump.avro")
if not target.exists():
    wiki_dump.extract_pages_titles_to_new_dump(pages_1949, target)
y1949_dump = WikiAvroDumpExtractor(target)

# Guarded so that re-running the notebook does not resubmit these pages;
# change the condition to True to launch the extraction.
if False:
    all_events, all_usage, all_failed, all_errored = process_dump(
        y1949_dump,
        db_path=f"events_extracted_by_page_{model_name}_db",
    )
    for label, value in (
        ("Failed", len(all_failed)),
        ("Errored", len(all_errored)),
        ("Events", all_events),
        ("Usage", all_usage),
    ):
        print(label, value)

0it [00:00, ?it/s]

## Pages 250-1920 with over 10 dates


In [20]:
# Read with an explicit UTF-8 encoding: the file holds page titles, and the
# default text encoding of `open` depends on the platform locale.
with open("pages_250_1920_with_over_10_dates.json", "r", encoding="utf-8") as f:
    y250_1920_10plus_event_pages = json.load(f)

In [21]:
# Build the 250-1920 dump from the selected page titles, unless the dump file
# already exists from a previous run.
target = Path("y250_1920_10plus_events_dump.avro")
if not target.exists():
    wiki_dump.extract_pages_titles_to_new_dump(y250_1920_10plus_event_pages, target)


0it [00:00, ?it/s]

In [22]:
# Open the dump by its explicit path instead of the shared `target` variable:
# other cells reassign `target` to different dumps, so relying on it here
# would open the wrong file whenever cells are run out of order.
y250_1920_10plus_events_dump = WikiAvroDumpExtractor(
    Path("y250_1920_10plus_events_dump.avro")
)

# Guarded so that re-running the notebook does not resubmit these pages;
# change the condition to True to launch the extraction.
if False:
    all_events, all_usage, all_failed, all_errored = process_dump(
        y250_1920_10plus_events_dump,
        db_path=f"events_extracted_by_page_{model_name}_db",
    )
    print("Failed", len(all_failed))
    print("Errored", len(all_errored))
    print("Events", all_events)
    print("Usage", all_usage)

0it [00:00, ?it/s]

gs://wikipage-extraction/batch_1/batch_1.jsonl
Launching batch_1...
Creating BatchPredictionJob
BatchPredictionJob created. Resource name: projects/39028607318/locations/us-central1/batchPredictionJobs/6875829654020685824
To use this BatchPredictionJob in another session:
bpj = aiplatform.BatchPredictionJob('projects/39028607318/locations/us-central1/batchPredictionJobs/6875829654020685824')
View Batch Prediction Job:
https://console.cloud.google.com/ai/platform/locations/us-central1/batch-predictions/6875829654020685824?project=39028607318
BatchPredictionJob projects/39028607318/locations/us-central1/batchPredictionJobs/6875829654020685824 current state:
3
gs://wikipage-extraction/batch_0/batch_0.jsonl
Launching batch_0...
Creating BatchPredictionJob
BatchPredictionJob created. Resource name: projects/39028607318/locations/us-central1/batchPredictionJobs/66387017436495872
To use this BatchPredictionJob in another session:
bpj = aiplatform.BatchPredictionJob('projects/39028607318/locat

In [16]:
# Count the pages that have an entry in the results database (extracted
# events as well as stored failures/errors). The path is derived from
# `model_name`, like everywhere else in this notebook, so the count always
# refers to the database the extraction cells write to.
with db_utils.open_plyvel_db(
    f"events_extracted_by_page_{model_name}_db", replace=False
) as db:
    counter = sum(1 for _ in db.iterator())
counter


372188

In [None]:
Roughly 150k pages processed per $100 spent.

1.488

In [None]:
About 67 cents per 1,000 pages.