# core

> Core routines, including the definition of the main `formalyzer` workflow.

In [None]:
#| default_exp core

In [None]:
#| hide
from nbdev.showdoc import *

## Basic File I/O

In [None]:
#| export
import os 

def read_text_file(filename: str, encoding: str = "utf-8") -> str:
    """Read a whole text file and return its contents as a single string.

    A leading `~` in `filename` is expanded to the user's home directory.
    The file is decoded with `encoding` (UTF-8 by default) instead of the
    platform's locale encoding, so non-ASCII text reads the same everywhere.
    """
    with open(os.path.expanduser(filename), encoding=encoding) as f:
        return f.read()

In [None]:
recc_info = read_text_file("../example/recc_info.txt") 
print(recc_info)

Reccomender Name: Teacher Person 
Title: Professor of Cleverness 

Address: 
Department of Curiosities
Generic University 
1337 Generic Pl. 
Springfield, WA 31416 USA

Phone: 555-123-1337
Email: teacher.person@generic.edu



In [None]:
#| export
def read_urls_file(filename: str) -> list[str]:
    """Read a text file that lists one submission-site URL per line.

    Surrounding whitespace is stripped from every line, and lines that are
    empty or contain only whitespace are dropped, so stray spaces or blank
    lines in the file never end up being treated as URLs.
    """
    stripped_lines = (line.strip() for line in read_text_file(filename).splitlines())
    return [url for url in stripped_lines if url]

In [None]:
urls = read_urls_file("../example/sample_urls.txt") 
print(f"{len(urls)} urls in list")
for i, url in enumerate(urls): 
    print(f"{i+1} of {len(urls)}: {url}")

2 urls in list
1 of 2: http://localhost:8000/sample_form.html
2 of 2: http://localhost:8000/sample_form2.html


In [None]:
#| export
from pypdf import PdfReader
import logging
logging.getLogger("pypdf").setLevel(logging.ERROR)

def read_pdf_text(filename: str) -> str:
    "extract the text of every page of a PDF and join the pages with newlines"
    pdf = PdfReader(os.path.expanduser(filename))  # `~` in the path is expanded first
    page_texts = [pg.extract_text() for pg in pdf.pages]
    return "\n".join(page_texts)

In [None]:
letter_text = read_pdf_text("../example/sample_letter.pdf")
print(letter_text)

   Dear Graduate Admissions Committee,  I am writing to recommend Student Person for admission to your graduate program. Having worked closely with them for two years in both teaching and research capacities, I can say they are among the strongest students I have encountered in over a decade of academic work.  Student Person took several of my advanced courses — Quantum Rollercoasters, Physics of Impossible Machines, and a seminar on Neural Networks for Curious Minds. They also worked with me on an independent research project. In every setting, they showed sharp intellectual ability, creative thinking, and real persistence. Their coursework went beyond surface-level competence; they clearly grasped the deeper principles at play. As a researcher, they brought fresh perspectives while staying receptive to guidance.  What stands out most is their dependability. They consistently met deadlines and produced high-quality work. During our independent project, they actually moved ahead of sch

## Parsing HTML (Form)

In [None]:
#| export
from bs4 import BeautifulSoup
import json, re

def group_radio_buttons(soup, name):
    """Collapse all radio inputs sharing `name` into one field dict.

    The group's label is the text of the <label> whose `for` attribute points
    at the first radio's id (empty string if there is none); its options are
    the non-empty `value` attributes of the radios, in document order.
    """
    buttons = soup.find_all('input', {'type': 'radio', 'name': name})

    label_text = ''
    if buttons:
        first_id = buttons[0].get('id', '')
        if first_id:
            label_tag = soup.find('label', {'for': first_id})
            if label_tag:
                label_text = label_tag.get_text(strip=True)

    options = []
    for button in buttons:
        value = button.get('value')
        if value:  # radios without a usable value are left out
            options.append(value)

    return {
        'id': name,  # the group is addressed by its shared name, not an element id
        'label': label_text,
        'type': 'radio',
        'options': options,
        'prefilled': False,
    }

In [None]:
#| export
def scrape_form_fields(html: str) -> list[dict]:
    """Collect a description of every fillable field found in `html`.

    Each <input>, <select> and <textarea> that has an id (or, failing that, a
    name) becomes a dict with keys 'id', 'label', 'type', 'options' and
    'prefilled'.  Hidden/submit/button inputs are ignored, and radio buttons
    that share a name are reported once, as a single grouped field.
    """
    soup = BeautifulSoup(html, 'html.parser')
    ignored_types = ('hidden', 'submit', 'button')
    found = []
    radio_groups_done = set()

    for tag in soup.find_all(['input', 'select', 'textarea']):
        ident = tag.get('id') or tag.get('name', '')
        kind = tag.get('type', tag.name)  # select/textarea have no type attr: use the tag name
        if not ident or kind in ignored_types:
            continue

        if kind == 'radio':
            group = tag.get('name', '')
            if group and group not in radio_groups_done:
                radio_groups_done.add(group)
                found.append(group_radio_buttons(soup, group))
            continue

        is_select = tag.name == 'select'
        label_tag = soup.find('label', {'for': ident})

        if is_select:
            option_texts = (opt.get_text(strip=True) for opt in tag.find_all('option'))
            choices = [text for text in option_texts if text]
        else:
            choices = None

        found.append({
            'id': ident,
            'label': label_tag.get_text(strip=True) if label_tag else '',
            'type': kind,
            'options': choices,
            # a non-empty value attribute counts as prefilled, except on selects
            'prefilled': bool(tag.get('value', '')) and not is_select,
        })

    return found

In [None]:
html = read_text_file("../example/sample_form.html") 
fields = scrape_form_fields(html) 
[f['id'] for f in fields][10:30]

['title',
 'phone',
 'email',
 'addr1',
 'addr2',
 'city',
 'state',
 'zip',
 'country',
 'months_known',
 'years_range',
 'capacity',
 'rating_intellectual',
 'rating_scientific',
 'rating_research',
 'rating_prev_work',
 'rating_lab',
 'rating_oral',
 'rating_writing',
 'rating_originality']

## LLM Usage
Next, we prompt the LLM to decide which form fields apply and what value each one should get:

In [None]:
#| export
def trim_fields(fields: list[dict]) -> list[dict]:
    """Return slimmed-down copies of `fields` so we send fewer tokens to the LLM.

    Prefilled fields are left out entirely.  For the rest, the 'prefilled' key
    is removed, 'options' is kept only for select and radio fields, and an
    empty 'label' is dropped.

    The input dicts are never modified, so the same `fields` list can be
    trimmed again or reused afterwards (e.g. for filling the form), and
    trimming an already-trimmed list is a no-op.
    """
    trimmed = []
    for field in fields:
        if field.get('prefilled'): continue
        slim = {key: val for key, val in field.items() if key != 'prefilled'}  # copy, don't mutate
        if slim.get('type') not in ('select', 'radio'): slim.pop('options', None)
        if slim.get('label') == '': slim.pop('label', None)
        trimmed.append(slim)
    return trimmed

In [None]:
print(f"Fields JSON length:  {len(json.dumps(fields, separators=(',', ':')))} characters")
trimmed = trim_fields(fields) 
print(f"Trimmed JSON length: {len(json.dumps(trimmed, separators=(',', ':')))} characters")

In [None]:
#| export
def make_prompt(fields:list[dict], recc_info:str, letter_text:str) -> str:
    """Assemble the full instruction text sent to the LLM.

    The prompt contains the recommender info, the letter text, the form fields
    serialized as compact JSON, and the instructions on the expected reply
    (a JSON array of {"id", "value"} objects, without comments).
    """
    fields_json = json.dumps(fields, separators=(',', ':'))  # compact: no spaces, fewer tokens
    prompt_lines = [
        "You are filling out a graduate school recommendation form.",
        "",
        "RECOMMENDER INFO:",
        f"{recc_info}",
        "",
        "RECOMMENDATION LETTER:",
        f"{letter_text}",
        "",
        "FORM FIELDS TO FILL:",
        fields_json,
        "",
        "For each field, provide the field ID and value to fill. ",
        "Be careful to use ONLY the exact field IDs provided! ",
        "For dropdowns, pick from the options listed.",
        "Pay attention to groups of radio buttons (grouped via div or similar id prefixes) as they may form likert scales.",
        'Return as valid JSON array: [{"id": "form_xxx", "value": "..."}]',
        "Do not add any comments to the JSON. ",
        "",  # the prompt ends with a newline
    ]
    return "\n".join(prompt_lines)

In [None]:
prompt = make_prompt(trimmed, recc_info, letter_text)
print(f"Prompt is {len(prompt)} characters")
prompt[2000:3000] # brief inspection

In [None]:
#| export
# Matches either a double-quoted JSON string (which must be kept verbatim)
# or a JS-style `//` / `/* */` comment (which must be removed).
_JSON_STRING_OR_COMMENT = re.compile(r'"(?:\\.|[^"\\])*"|//[^\n]*|/\*.*?\*/', re.DOTALL)


def _strip_json_comments(text: str) -> str:
    "remove // and /* */ comments from JSON-like text without touching string values"
    return _JSON_STRING_OR_COMMENT.sub(
        lambda m: m.group(0) if m.group(0).startswith('"') else '', text)


def _parse_llm_json(content_text: str):
    "pull the JSON payload out of an LLM reply (```json fenced or bare) and decode it"
    fenced = re.search(r'```json\s*(.*?)\s*```', content_text, re.DOTALL)
    json_str = fenced.group(1) if fenced else content_text.strip()
    try:
        return json.loads(json_str)
    except json.JSONDecodeError:
        # The LLM may have added JS-style comments despite being told not to.
        # Strip them only now, and only outside of strings, so that values
        # such as "https://example.edu" are never truncated at the `//`.
        return json.loads(_strip_json_comments(json_str))


def get_field_mappings(
        fields: list[dict],  # list of form fields
        recc_info: str,      # info on recommending person
        letter_text: str,    # text of recc letter
        model='claude-sonnet-4-20250514',  # LLM choice, e.g. "ollama/qwen2.5:14b" 
        debug=False,        # print debugging/status info
        ) -> list[dict]:    # decoded JSON reply; the prompt asks for [{"id": ..., "value": ...}, ...]
    """Use LLM to map recommender info and letter to form fields

    The fields are trimmed, turned into a prompt and sent as a single chat
    message.  `claudette` is used when the model name contains "claude",
    `lisette` otherwise.  The reply is decoded as JSON, taken from a ```json
    fenced block if there is one; `json.JSONDecodeError` is raised if the
    reply still cannot be parsed after comments have been stripped.
    """
    if 'claude' in model.lower():
        from claudette import Chat
    else:
        from lisette import Chat

    prompt = make_prompt(trim_fields(fields), recc_info, letter_text)

    chat = Chat(model=model)
    if debug: print(f"  Prompt length is {len(prompt)} characters")
    response = chat(prompt)
    # the two chat libraries return differently shaped response objects
    content_text = response.content[0].text if hasattr(response, 'content') else response.choices[0].message.content
    if debug: print(f"LLM response:\n{content_text}\n")
    return _parse_llm_json(content_text)

## Filling in the Form

In [None]:
#| export
async def get_element_info(page, field_id, field_type=None):
    """Given an id or a name, find the element on the page and get its info.

    Radio groups (`field_type == 'radio'`) are looked up by name and the first
    button of the group is used; any other field is looked up by id or name.
    Waits up to 1000 (Playwright timeouts are in milliseconds) for the element.

    Returns a tuple `(locator, tag, input_type)`, where `tag` is the
    lower-cased tag name and `input_type` is the element's `type` property.
    """
    # Use quoted attribute selectors rather than `#id`: ids/names containing
    # characters such as '.', ':', '[' or a leading digit are not valid in an
    # unescaped `#id` selector.  Backslashes and quotes are escaped for CSS.
    quoted = str(field_id).replace('\\', '\\\\').replace('"', '\\"')
    if field_type == 'radio':
        elem = page.locator(f'[name="{quoted}"]').first
    else:
        elem = page.locator(f'[id="{quoted}"], [name="{quoted}"]')
    await elem.wait_for(timeout=1000)
    tag = await elem.evaluate('el => el.tagName.toLowerCase()')
    input_type = await elem.evaluate('el => el.type')
    return elem, tag, input_type

In [None]:
#| export
async def should_skip(elem, tag, input_type, skip_prefilled) -> bool:
    "return True when the element already holds a value and should be left untouched"
    # selects and radio buttons are never skipped; neither is anything when skipping is off
    if not skip_prefilled or tag == 'select' or input_type == 'radio':
        return False
    existing = await elem.input_value()
    return bool(existing)

In [None]:
#| export
async def fill_element(page, elem, tag, input_type, field_id, value):
    """Actually fill in this element.

    Selects are set by option label, radio groups by clicking the button whose
    `value` attribute equals `value`, and everything else via `elem.fill`.
    `value` comes from LLM-produced JSON and may be a number or null, so it is
    converted to a string first (None becomes the empty string).
    """
    text = '' if value is None else str(value)
    if tag == 'select':
        await elem.select_option(label=text)
    elif input_type == 'radio':
        # escape backslashes and quotes so arbitrary names/values stay valid
        # inside the quoted CSS attribute selectors
        def css_quote(s): return str(s).replace('\\', '\\\\').replace('"', '\\"')
        radio = page.locator(f'[name="{css_quote(field_id)}"][value="{css_quote(text)}"]')
        await radio.click()
    else:
        await elem.fill(text)

In [None]:
#| export
async def fill_form(page, mappings, fields, skip_prefilled=True, debug=False):
    """Fill form fields using Playwright

    `mappings` is the LLM's list of {'id': ..., 'value': ...} dicts and
    `fields` the scraped field list (used only to look up each field's type).

    Returns a dict with three lists: 'filled' and 'skipped' hold field ids,
    'errors' holds {'id', 'error'} dicts.  Filling is best-effort: a field
    that cannot be filled, or a mapping entry that is not a dict with both
    'id' and 'value', is reported under 'errors' and the remaining entries
    are still processed.
    """
    # Build a type lookup from fields
    field_types = {f['id']: f['type'] for f in fields}

    results = {'filled': [], 'skipped': [], 'errors': []}
    total = len(mappings)
    for i, item in enumerate(mappings):
        # The mappings are LLM output: never let one malformed entry abort the form.
        if not isinstance(item, dict) or 'id' not in item or 'value' not in item:
            bad_id = item.get('id') if isinstance(item, dict) else None
            print(f"  Error: mapping {i+1} of {total} is malformed: {item!r}")
            results['errors'].append({'id': bad_id, 'error': 'malformed mapping (need "id" and "value")'})
            continue

        field_id, value = item['id'], item['value']
        if debug: print(f"Mapping {i+1} of {total}:  Processing {field_id}...")
        try:
            field_type = field_types.get(field_id)
            elem, tag, input_type = await get_element_info(page, field_id, field_type)

            if await should_skip(elem, tag, input_type, skip_prefilled):
                results['skipped'].append(field_id)
                continue

            await fill_element(page, elem, tag, input_type, field_id, value)
            results['filled'].append(field_id)
        except Exception as e:  # deliberate catch-all: report the field and move on
            print(f"  Error filling {field_id}: {e}")
            results['errors'].append({'id': field_id, 'error': str(e)[:50]})  # keep the summary short
    return results

In [None]:
#| export
async def upload_pdf(page, pdf_path):
    """Upload the recommendation letter PDF

    The file is attached to the first `<input type="file">` on the page.
    A leading `~` in `pdf_path` is expanded here, because the path arrives as
    the user typed it and is otherwise passed on to Playwright unchanged.
    """
    file_input = page.locator('input[type="file"]').first
    await file_input.set_input_files(os.path.expanduser(pdf_path))

In [None]:
#| export
async def process_url(page, url, recc_info, letter_text, pdf_path, model, debug=False):
    """Handle one recommendation form from start to finish.

    Steps: open `url` in `page`, scrape its form fields, ask the LLM for the
    field values, fill the form in, then attach the letter PDF.  With `debug`
    on, a progress line is printed around every step.  Nothing in this
    function submits the form.
    """
    await page.goto(url)
    page_html = await page.content()

    if debug:
        print("Scraping form fields")
    form_fields = scrape_form_fields(page_html)
    if debug:
        print(f"Found {len(form_fields)} fields")
        print(f"Calling LLM ({model}) to get field mappings")

    llm_mappings = get_field_mappings(form_fields, recc_info, letter_text, model=model, debug=debug)
    if debug:
        print(f"Got {len(llm_mappings)} mappings from LLM")
        print("Filling in form")

    outcome = await fill_form(page, llm_mappings, form_fields, debug=debug)
    if debug:
        print(f"Filled: {len(outcome['filled'])}, Errors: {len(outcome['errors'])}")
        print("Uploading PDF") 

    await upload_pdf(page, pdf_path)
    if debug:
        print("Uploaded PDF")

## `formalyzer` CLI script

In [None]:
#| export
def read_inputs(recc_info: str, pdf_path: str, urls: str):
    """Load everything the workflow needs from the given paths.

    Returns `(recommender info text, letter text, list of URLs)`.  `urls` may
    be a single http(s) URL, used as-is, or the path of a text file of URLs.
    """
    info_path = os.path.expanduser(recc_info)
    letter_path = os.path.expanduser(pdf_path)

    info_text = read_text_file(info_path)
    letter_text = read_pdf_text(letter_path)

    is_single_url = re.match(r"https?://", urls) is not None
    url_list = [urls] if is_single_url else read_urls_file(urls)
    return info_text, letter_text, url_list

In [None]:
#| export
async def setup_browser(cdp_url: str = "http://localhost:9222"):
    """Connect to Chrome with remote debugging

    Attaches over the Chrome DevTools Protocol to a browser that is expected
    to be listening at `cdp_url` already, and reuses the first of its existing
    contexts.

    Returns `(pw, browser, context)`; the caller is responsible for closing
    the browser connection and stopping `pw`.  If connecting fails, the
    Playwright driver started here is stopped before the error is re-raised.
    """
    from playwright.async_api import async_playwright

    pw = await async_playwright().start()
    try:
        browser = await pw.chromium.connect_over_cdp(cdp_url)
        context = browser.contexts[0]  # first context the connected browser exposes
    except Exception:
        await pw.stop()  # don't leak the driver when the browser is unreachable
        raise
    return pw, browser, context

In [None]:
#| export
async def run_formalyzer(recc_info: str, letter_text: str, urls: list, pdf_path: str, model: str, debug=False):
    """Main async workflow

    Connects to the browser, then opens every non-blank URL in its own new
    tab and processes it.  The browser connection and the Playwright driver
    are shut down afterwards, whether or not processing raised.
    """
    pw, browser, context = await setup_browser()
    try:
        for i, url in enumerate(urls):
            if not url.strip(): continue  # skip empty urls
            print(f"\nURL {i+1} of {len(urls)}: {url}")
            page = await context.new_page()  # get a new tab
            await process_url(page, url, recc_info, letter_text, pdf_path, model, debug=debug)
    finally:
        # nested so the driver is stopped even if closing the browser fails
        try:
            await browser.close()
        finally:
            await pw.stop()

In [None]:
#| export
from fastcore.script import call_parse
import asyncio

@call_parse
def main(
    recc_info: str,   # text file with recommender name, address, etc
    pdf_path: str,    # name of PDF recc letter
    urls: str,        # txt file w/ one URL per line, or a single http(s) URL
    model: str='claude-sonnet-4-20250514',  # 'ollama/qwen2.5:14b' for local model
    debug: bool=False,  # best to always turn this on, actually
    ):
    "Fill in the recommendation form at each URL from the recommender info and letter, and attach the letter PDF"
    recc_info, letter_text, urls = read_inputs(recc_info, pdf_path, urls)
    # read_inputs expands `~` only for its own reads; the upload step further
    # down needs a real path too, so expand it before passing it along.
    pdf_path = os.path.expanduser(pdf_path)
    if debug:
        print(f"recc_info ({len(recc_info)} characters) =\n", recc_info)
        print(f"letter_text ({len(letter_text)} characters)=\n", letter_text)
        print("urls =\n", urls)
    
    asyncio.run(run_formalyzer(recc_info, letter_text, urls, pdf_path, model, debug))

In [None]:
#main("~/recc_info.txt", "~/recc_letter.pdf","~/recc_urls.txt", debug=True)

In [None]:
#| hide
import nbdev; nbdev.nbdev_export()