# Stage 3: Fetch Pages


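This notebook implements the third stage of the pipeline: it reads the pagination criteria from the most recent run under `processing/1_pagination/`, fetches every page of each table from an LLM (via OpenRouter), records per-page metrics (latency, token usage, errors) and accuracy against the source table, and saves the merged CSVs and a metrics JSON per table under `processing/2_fetched_pages/<timestamp>/`.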

In [1]:
# Import Required Libraries
import os
import glob
import json
import pandas as pd
import time
import requests
from datetime import datetime
from pathlib import Path

In [2]:
# Identify Latest Folder Under Pagination
# Run folders are named by timestamp (e.g. 20250831_091630, see the output below),
# so the lexicographically greatest name is the most recent run.
pagination_root = 'processing/1_pagination/'
folders = (
    sorted(
        entry for entry in os.listdir(pagination_root)
        if os.path.isdir(os.path.join(pagination_root, entry))
    )
    if os.path.isdir(pagination_root)
    else []
)
latest_folder = folders[-1] if folders else None
pagination_path = os.path.join(pagination_root, latest_folder) if latest_folder else None
if pagination_path is None:
    # Explicit raise instead of `assert` so the check is not stripped under `python -O`,
    # and a missing root gives this message instead of a bare listdir error.
    raise FileNotFoundError(f'No pagination folder found under {pagination_root!r}.')
print('Using pagination folder:', pagination_path)

Using pagination folder: processing/1_pagination/20250831_091630


In [3]:
# Load Pagination Criteria
# Sorted so tables are processed in the same order on every run; glob's own
# ordering depends on the filesystem.
json_files = sorted(glob.glob(os.path.join(pagination_path, '*.json')))
tables = []
for jf in json_files:
    # JSON is UTF-8 by specification; don't depend on the platform default encoding.
    with open(jf, 'r', encoding='utf-8') as f:
        obj = json.load(f)
    tables.append({'path': jf, 'meta': obj['meta'], 'criteria': obj['pagination_criteria']})
print(f'Loaded {len(tables)} tables.')

Loaded 5 tables.


In [4]:
import re

class TableGenerator_JSON():
    TEMPLATE = """
    List %s - as many as possible to fit into response.
    The response will be formatted as JSON shown below.
    Each element of the response will contain %d fields: %s.
    Do not output any additional text that is not in JSON format.
    %s
    
    """   

    def _norm_field(self, s):
        s = s.lower().replace(" ","_").replace("-","_").replace(".", "").replace(",","_")\
                .replace("(", "").replace(")", "").replace(":", "").replace('"','').replace("'","")\
                .replace("/", "")
        return re.sub('_+', '_', s)
        
    def generate_prompts(self, query, fields, paging: dict | None):
        system_msg = "You are a retriever of facts."

        num_fields = len(fields)
        fields_json = []
        fields = [f for f in fields]
        for field in fields:
            fields_json.append('"%s": "%s"' % ('_'.join(field.replace("-", " ").split()), field))
        response_format = ', '.join(fields)
        if paging:
            paging_criteria = ('Only fetch the results where values for %s match: %s.' % (paging['field'], paging['value']))
        else:
            paging_criteria = ''
        user_msg = self.TEMPLATE % (query, num_fields, response_format, paging_criteria)
        return system_msg, user_msg

    def parse_llm_response(self, response): 
        res = []
        try:
            if not response.startswith("[") and "[" in response:
                response = response[response.find("["):]

            if not response.endswith("]") and "]" in response:
                response = response[:response.rfind("]")+1]

            if '[' not in response and ']' not in response and '{' in response and '}' in response:
                response = '[' + response + ']'    

            response_json = json.loads(response)

            if isinstance(response_json, dict) and len(response_json.keys()) == 1:
                response_json = list(response_json.values())[0]    
        except:  
            split_response = response.split("{")
            response_json = []
            for s in split_response[1:]:
                split_s = s.split("}")
                if len(split_s) > 1:
                    content = split_s[0]
                    attributes = content.split(",")
                    elements = {}
                    for attr in attributes:
                        knv = attr.split(":")   
                        if len(knv) > 1:
                            parsed_k = "%s" % knv[0].replace('"','').strip()
                            parsed_v = "%s" % knv[1].replace('"','').strip()
                            elements[parsed_k] = parsed_v

                    response_json.append(elements)  

        df = pd.DataFrame.from_records(response_json) 
        return df

In [None]:
# Fetch Pages Using LLM and Record Metrics (Parallelized)
from io import StringIO
from concurrent.futures import ThreadPoolExecutor, as_completed

PROVIDE_SOURCE_TABLE = True  # If True, include source table as CSV in the prompt
LLM_TIMEOUT = 30  # seconds per request; NOTE(review): may be tight for a reasoning model with a large max_tokens — confirm
LLM_MODEL = 'x-ai/grok-3-mini'  # Use this model for all criteria

OPENROUTER_API_KEY = os.environ.get('OPENROUTER_API_KEY', '')
if not OPENROUTER_API_KEY:
    # Without a key every request would go out with an empty bearer token. The run
    # would then only fail later with an "empty merged DataFrame" error, so fail
    # here, before the output folder is created.
    raise RuntimeError('OPENROUTER_API_KEY is not set; export it before running this cell.')

output_root = 'processing/2_fetched_pages/'
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
output_folder = os.path.join(output_root, timestamp)
os.makedirs(output_folder, exist_ok=True)

def compute_metrics(merged_df, source_columns, metrics, source_df):
    """Summarize one (table, method, criteria-model) fetch run.

    Args:
        merged_df: DataFrame of all fetched pages concatenated; may be None or empty.
        source_columns: expected column names, or None when unknown.
        metrics: per-page dicts with 'latency', 'usage' and 'error' entries.
        source_df: ground-truth DataFrame, or None to skip accuracy.

    Returns:
        dict with:
            row_count: 0 for None.
            column_consistency: None when it cannot be checked.
            error_rate: None when there are no pages.
            avg_latency
            sum_tokens: None when no page reported total_tokens.
            accuracy: accuracy_metrics(...) result, or None.
    """
    # One guard for both None and empty, so a None merged_df no longer crashes below.
    has_rows = merged_df is not None and not merged_df.empty
    row_count = 0 if merged_df is None else len(merged_df)
    col_consistency = (set(merged_df.columns) == set(source_columns)) if source_columns and has_rows else None

    total_pages = len(metrics)
    error_count = sum(1 for m in metrics if m.get('error'))
    error_rate = error_count / total_pages if total_pages > 0 else None

    latencies = [m['latency'] for m in metrics if m.get('latency') is not None]
    avg_latency = sum(latencies) / len(latencies) if latencies else None

    token_counts = [m['usage']['total_tokens'] for m in metrics if m.get('usage') and 'total_tokens' in m['usage']]
    sum_tokens = sum(token_counts) if token_counts else None

    acc_metrics = accuracy_metrics(merged_df, source_df) if source_df is not None and has_rows else None
    return {
        'row_count': row_count,
        'column_consistency': col_consistency,
        'error_rate': error_rate,
        'avg_latency': avg_latency,
        'sum_tokens': sum_tokens,
        'accuracy': acc_metrics
    }

# Shared prompt builder / response parser, used by fetch_page_task (prompts) and
# fetch_page_llm (parsing). TableGenerator_JSON keeps no per-instance state (only the
# class-level TEMPLATE), so the worker threads of the fetch loop can share it.
table_generator = TableGenerator_JSON()

def fetch_page_llm(prompt, model, api_key, system_msg: str = '', timeout: float | None = None, max_tokens: int = 100000):
    """Request one page from OpenRouter's chat-completions endpoint and parse it.

    Args:
        prompt: user message.
        model: OpenRouter model id, e.g. 'x-ai/grok-3-mini'.
        api_key: sent as a bearer token.
        system_msg: system message.
        timeout: request timeout in seconds; None means LLM_TIMEOUT.
        max_tokens: completion token cap forwarded in the payload.

    Returns:
        (content, latency, usage, error).
        On success: content is the DataFrame from table_generator.parse_llm_response,
        latency is the elapsed seconds of the HTTP request, usage is the response's
        'usage' dict, and error is None.
        On any failure: the first three are None and error is the message string.
    """
    url = 'https://openrouter.ai/api/v1/chat/completions'
    headers = {'Authorization': f'Bearer {api_key}', 'Content-Type': 'application/json'}
    payload = {
        'model': model,
        'messages': [
            {'role': 'system', 'content': system_msg},
            {'role': 'user', 'content': prompt}
        ],
        'max_tokens': max_tokens
    }
    start = time.perf_counter()  # monotonic, unlike time.time()
    try:
        resp = requests.post(url, headers=headers, json=payload,
                             timeout=LLM_TIMEOUT if timeout is None else timeout)
        latency = time.perf_counter() - start
        resp.raise_for_status()
        result = resp.json()
        choices = result.get('choices') or []
        if not choices:
            # A body without choices (e.g. an error payload) used to be recorded as a
            # successful, empty page; report it as an error instead.
            raise ValueError(f"No choices in response: {result.get('error', result)}")
        raw_content = choices[0]['message'].get('content')
        if raw_content is None:
            raise ValueError('Response message has no content')
        content = table_generator.parse_llm_response(raw_content)
        usage = result.get('usage', {})
        return content, latency, usage, None
    except Exception as e:
        print(f"[ERROR] Failed to fetch page: {e}")
        return None, None, None, str(e)

# Accuracy metric functions (from old/2_Metrics_calculation.ipynb)
def _comparable_rows(df, cols):
    """Return the distinct rows of df[cols] as whitespace-stripped strings."""
    text = df[cols].astype(str)
    for col in cols:
        text[col] = text[col].str.strip()
    # Deduplicate only AFTER normalizing. Otherwise 1887 and "1887" (or "Latin" and
    # "Latin ") stay separate rows, and match counts can exceed the row totals.
    return text.drop_duplicates().reset_index(drop=True)


def accuracy_metrics(merged_df, source_df):
    """Row-level recall / precision / F1 of fetched rows against the source table.

    Only columns present in both frames are compared. Values are compared as
    stripped strings, and each frame is reduced to its distinct rows first.

    Returns:
        dict with row_recall (share of distinct source rows found), row_precision
        (share of distinct fetched rows that are in the source) and row_f1.
        Each value is None when undefined (empty denominator, or precision + recall == 0).
        With no shared columns nothing can match, so recall and precision are 0.0
        for non-empty frames.
    """
    common_cols = [col for col in source_df.columns if col in merged_df.columns]
    if common_cols:
        src = _comparable_rows(source_df, common_cols)
        pred = _comparable_rows(merged_df, common_cols)
        # Both sides are unique on all common columns, so the inner merge is one-to-one
        # and its size is the number of rows the two tables share.
        matched = src.merge(pred, how='inner', on=common_cols).shape[0]
        total_rows, total_pred_rows = src.shape[0], pred.shape[0]
    else:
        # merge() would raise MergeError without common columns.
        matched, total_rows, total_pred_rows = 0, len(source_df), len(merged_df)

    row_recall = matched / total_rows if total_rows > 0 else None
    row_precision = matched / total_pred_rows if total_pred_rows > 0 else None
    if row_precision is not None and row_recall is not None and (row_precision + row_recall) > 0:
        row_f1 = 2 * row_precision * row_recall / (row_precision + row_recall)
    else:
        row_f1 = None
    return {
        'row_recall': row_recall,
        'row_precision': row_precision,
        'row_f1': row_f1
    }

def fetch_page_task(args):
    """Thread-pool worker for one page: build the prompt, call the LLM, package the outcome.

    `args` is the tuple (meta, crit_obj, page_key, model_name, source_columns,
    source_csv_str). A page_key of 'ALL' means no paging filter. model_name only
    labels the log line; the request itself always goes to LLM_MODEL.

    Returns a dict with keys 'content', 'latency', 'usage', 'error' and 'page_key'.
    """
    meta, crit_obj, page_key, model_name, source_columns, source_csv_str = args

    page_content = None
    if page_key != 'ALL':
        page_content = {'field': crit_obj.get('criteria', ''), 'value': page_key}

    system_msg, user_msg = table_generator.generate_prompts(
        meta.get('query_without_cutoff'), source_columns, page_content
    )
    include_source = PROVIDE_SOURCE_TABLE and source_csv_str
    if include_source:
        user_msg = user_msg + f"\n\nSource table as CSV:\n{source_csv_str}"

    print(f"[FETCH] Table: {meta.get('name')}, Page: {page_content}, Model: {model_name}")
    content, latency, usage, error = fetch_page_llm(user_msg, LLM_MODEL, OPENROUTER_API_KEY, system_msg)
    return dict(content=content, latency=latency, usage=usage, error=error, page_key=page_key)

FETCH_MAX_WORKERS = 8  # concurrent page requests per criterion

# Fields copied from each run's results into the per-table metrics JSON.
RESULT_FIELDS = ('criteria', 'metrics', 'row_count', 'column_consistency',
                 'error_rate', 'avg_latency', 'sum_tokens', 'accuracy')


def _load_source_table(meta):
    """Load the ground-truth CSV named by meta['source_file'].

    Returns (source_df, source_columns, source_csv_str).
    The DataFrame and column list are loaded regardless of PROVIDE_SOURCE_TABLE,
    because the prompt needs the columns and accuracy needs the ground truth.
    The CSV string is only filled when PROVIDE_SOURCE_TABLE is True, since it is
    embedded in the prompt. If loading fails, (None, None, '') is returned.
    """
    source_csv_path = meta.get('source_file')
    if not source_csv_path or not os.path.exists(source_csv_path):
        return None, None, ''
    try:
        source_df = pd.read_csv(source_csv_path)
    except Exception as e:
        print(f'[WARNING] Could not load source CSV for {meta.get("name")}: {e}')
        return None, None, ''
    # Keep a copy of the source next to the fetched pages. Failing here must not
    # discard the already-loaded source (it used to leave source_df set but
    # source_columns None).
    copy_name = meta.get('file') or os.path.basename(source_csv_path)
    try:
        source_df.to_csv(os.path.join(output_folder, copy_name), index=False)
    except Exception as e:
        print(f'[WARNING] Could not copy source CSV for {meta.get("name")}: {e}')
    source_csv_str = source_df.to_csv(index=False) if PROVIDE_SOURCE_TABLE else ''
    return source_df, list(source_df.columns), source_csv_str


def _criteria_list(method, crit):
    """Expand one pagination method into (criteria_model, criterion) pairs.

    For 'llm', `crit` maps model name -> recommendation(s), and only each model's top
    recommendation is kept (empty entries are skipped). Any other method yields
    its single criterion tagged with LLM_MODEL.
    """
    if method != 'llm':
        return [(LLM_MODEL, crit)]
    pairs = []
    for model_name, model_criteria in crit.items():
        if not model_criteria:
            continue
        top_crit = model_criteria[0] if isinstance(model_criteria, list) else model_criteria
        pairs.append((model_name, top_crit))
    return pairs


def _fetch_pages(meta, crit_obj, model_name, source_columns, source_csv_str):
    """Fetch every page of one criterion concurrently and merge the results.

    Returns (merged_df, metrics).

    merged_df starts with source_columns (when given), followed by any extra
    columns the LLM produced. Rows are concatenated in page order, so row order
    does not depend on thread completion timing.

    metrics has one entry per page, also in page order, with keys latency, usage,
    error and page_key. A worker that raised is recorded as an error instead of
    being dropped, so error_rate is not undercounted.
    """
    page_keys = list(crit_obj.get('pages') or [])
    page_frames = {}
    page_metrics = {}
    with ThreadPoolExecutor(max_workers=FETCH_MAX_WORKERS) as executor:
        future_to_idx = {
            executor.submit(fetch_page_task, (meta, crit_obj, page_key, model_name, source_columns, source_csv_str)): idx
            for idx, page_key in enumerate(page_keys)
        }
        for future in as_completed(future_to_idx):
            idx = future_to_idx[future]
            page_key = page_keys[idx]
            try:
                res = future.result()
            except Exception as e:
                print(f"[ERROR] Exception in future for page {page_key}: {e}")
                page_metrics[idx] = {'latency': None, 'usage': None, 'error': str(e), 'page_key': page_key}
                continue
            page_metrics[idx] = {'latency': res['latency'], 'usage': res['usage'],
                                 'error': res['error'], 'page_key': page_key}
            content = res['content']
            if content is None:
                continue
            if isinstance(content, pd.DataFrame):
                page_frames[idx] = content
            else:
                print(f"[WARNING] Skipping non-CSV response for table {meta.get('name')}, page {page_key}, model {model_name}")
    template = pd.DataFrame(columns=source_columns) if source_columns else pd.DataFrame()
    # A single concat (instead of one per page) avoids quadratic copying.
    merged_df = pd.concat([template] + [page_frames[i] for i in sorted(page_frames)], ignore_index=True)
    metrics = [page_metrics[i] for i in sorted(page_metrics)]
    return merged_df, metrics


for table in tables:
    meta = table['meta']
    criteria = table['criteria']
    source_df, source_columns, source_csv_str = _load_source_table(meta)
    table_results = {}
    for method, crit in criteria.items():
        for criteria_model, crit_obj in _criteria_list(method, crit):
            model_name = criteria_model.replace('/', '_')
            if not source_columns:
                # Prompts list the expected columns; without them every page would fail
                # before any request is sent, so stop with a clear message instead.
                raise ValueError(
                    f'No source columns for table {meta.get("name")} (source_file missing or unreadable); '
                    f'cannot build prompts for method {method}, model {model_name}'
                )
            merged_df, metrics = _fetch_pages(meta, crit_obj, model_name, source_columns, source_csv_str)
            if merged_df.empty:
                raise ValueError(f'Merged DataFrame is empty for table {meta.get("name")}, method {method}, model {model_name}')
            csv_name = f"{meta.get('id','')}_{meta.get('name','')}_{method}_{model_name}.csv"
            csv_path = os.path.join(output_folder, csv_name)
            merged_df.to_csv(csv_path, index=False)
            print(f'Saved merged CSV: {csv_path}')
            metric_result = compute_metrics(merged_df, source_columns, metrics, source_df)
            table_results[f'{method}_{model_name}'] = {
                'merged_df': merged_df,
                'metrics': metrics,
                'criteria': crit_obj,
                **metric_result
            }
    out_json = {
        'meta': meta,
        'results': {
            key: {field: res.get(field) for field in RESULT_FIELDS}
            for key, res in table_results.items()
        }
    }
    json_name = f"{meta.get('id','')}_{meta.get('name','')}_metrics.json"
    json_path = os.path.join(output_folder, json_name)
    with open(json_path, 'w') as f:
        json.dump(out_json, f, indent=2)
    print(f'Saved JSON metadata: {json_path}')

[FETCH] Table: english_latin_rivalry_1887_2012, Page: None, Model: x-ai_grok-3-mini
     Year  Latin  English   Winner
0    1887     16        0    Latin
1    1888     38        0    Latin
2    1889      4       10  English
3    1890      0       22  English
4    1891     14       10    Latin
..    ...    ...      ...      ...
121  2008     36        0    Latin
122  2009     27       16    Latin
123  2010     54       12    Latin
124  2011     50        0    Latin
125  2012     44       15    Latin

[126 rows x 4 columns]
Saved merged CSV: processing/2_fetched_pages/20250831_104628/25_english_latin_rivalry_1887_2012_naive_x-ai_grok-3-mini.csv
[FETCH] Table: english_latin_rivalry_1887_2012, Page: {'field': 'English', 'value': 0}, Model: x-ai_grok-3-mini
[FETCH] Table: english_latin_rivalry_1887_2012, Page: {'field': 'English', 'value': 4}, Model: x-ai_grok-3-mini
[FETCH] Table: english_latin_rivalry_1887_2012, Page: {'field': 'English', 'value': 5}, Model: x-ai_grok-3-mini
[FETCH] Table