In [None]:
from bs4 import BeautifulSoup
import requests
import re
import chromadb
import uuid
from ollama import chat, ChatResponse, show

In [None]:
# Notebook configuration: edit these values before running the cells below.
CHROMA_URL = "10.10.2.8" # Host of your ChromaDB instance (passed as `host` to chromadb.HttpClient, so not a full URL); can be localhost
CHROMA_PORT = 8000 # Port for ChromaDB instance
OLLAMA_MODEL = "gemma3" # Ollama model to use for chat
BILL_URL = "https://www.congress.gov/119/bills/hr1/generated/BILLS-119hr1eas.html" # URL to the bill text; get it from congress.gov by searching for a bill and clicking the HTML link

In [None]:

def extract_bill_text_from_html(html: str) -> str:
    """
    Reduces a bill's HTML page to plain text, one non-empty line per output line.
    Page chrome (nav, header, footer), scripts and styles are discarded, and every hyperlink is
    replaced by its own text so no markup survives.
    Args:
        html (str): The HTML content of the bill page.
    Returns:
        str: The visible text, with each line stripped and blank lines removed.
    """
    document = BeautifulSoup(html, 'html.parser')

    # Drop elements that never carry bill text
    for unwanted in document.select("nav, header, footer, script, style"):
        unwanted.decompose()

    # Flatten every anchor to the text it wraps
    for link in document.find_all('a'):
        link.replace_with(link.get_text())

    # One text node per line, then trim each line and skip those that end up empty
    raw_text = document.get_text(separator='\n')
    trimmed_lines = (raw_line.strip() for raw_line in raw_text.splitlines())
    return '\n'.join(kept for kept in trimmed_lines if kept)

def scrape_congress_bill(url, timeout=30):
    """
    Scrapes the bill text from the given URL and returns the cleaned text.
    Args:
        url (str): The URL of the bill page on congress.gov.
        timeout (float or tuple, optional): Seconds to wait for the server before giving up,
            passed straight to requests.get. Defaults to 30. Without a timeout, requests waits
            indefinitely and a stalled connection would hang the notebook.
    Returns:
        str: The cleaned bill text.
    Raises:
        requests.HTTPError: If the server answers with an error status.
        requests.Timeout: If the server does not respond within `timeout`.
    """
    response = requests.get(url, timeout=timeout)
    response.raise_for_status()
    # Force UTF-8 decoding of the body regardless of what requests detected for the response
    response.encoding = 'utf-8'
    return extract_bill_text_from_html(response.text)

def has_toc(body: str) -> bool:
    """
    Checks if the bill text contains a table of contents.
    Heuristic: the "TITLE I—" heading is expected once in the table of contents and once more
    where the title itself begins, so more than one occurrence is taken to mean a table of
    contents is present. Only TITLE I is counted: counting every title heading would report
    any multi-title bill (TITLE I—, TITLE II—, ...) as having a table of contents even when
    it has none.
    Args:
        body (str): The text of the bill.
    Returns:
        bool: True if a table of contents is found, False otherwise.
    """
    # \b keeps "SUBTITLE I—" from counting; \s+ and \s* tolerate irregular whitespace
    # (including line breaks) between the word, the numeral and the dash.
    return len(re.findall(r'\bTITLE\s+I\s*—', body)) > 1
def validate_bill_text(body: str) -> bool:
    """
    Reports whether the text looks like a bill by searching for a title heading.
    A title heading is the word "TITLE", a Roman numeral, and an em dash (for example "TITLE I—").
    Args:
        body (str): The text of the bill.
    Returns:
        bool: True if at least one title heading occurs in the text, False otherwise.
    """
    title_heading = re.compile(r'TITLE\s+[IVXLCDM]+\s*—')
    return title_heading.search(body) is not None

def extract_body(body: str) -> str:
    """
    Extracts the main body of the bill text, removing the table of contents if present.
    A table of contents is assumed when the "TITLE I—" heading occurs at least twice: the first
    occurrence is the table-of-contents entry and the second is where the body starts. With
    fewer than two occurrences the text is returned unchanged apart from stripping.
    Args:
        body (str): The text of the bill.
    Returns:
        str: The main body of the bill text, with the table of contents removed if it exists.
    """
    # Match with a regex instead of a literal find so irregular whitespace around the numeral
    # or dash does not hide a heading; \b keeps "SUBTITLE I—" from counting.
    title_one_headings = list(re.finditer(r'\bTITLE\s+I\s*—', body))
    if len(title_one_headings) < 2:
        # No second heading to cut at. Slicing from a failed lookup (-1) would keep only the
        # final character of the bill, so return the whole text instead.
        return body.strip()
    return body[title_one_headings[1].start():].strip()

In [None]:
def get_sections(body:str) -> list:
    """
    Cuts the bill body into sections at every "SEC." marker.
    Each section runs from a "SEC." followed by whitespace up to the next "SEC." (or the end of
    the text). Anything before the first marker is not returned.
    Args:
        body (str): The text of the bill body.
    Returns:
        list: The stripped section texts, each starting with "SEC.", in document order.
    """
    sec_pattern = re.compile(r'(SEC\.\s+.*?)(?=SEC\.|$)', re.DOTALL)
    trimmed = (found.group(1).strip() for found in sec_pattern.finditer(body))
    return [chunk for chunk in trimmed if chunk]
def get_subsections(section: str) -> list:
    """
    Cuts a section into subsections at each line that opens with a single lowercase letter in
    parentheses ("(a)", "(b)", ...).
    Text ahead of the first such line (for example the section heading) is not returned.
    Args:
        section (str): The text of the section.
    Returns:
        list: The stripped subsection texts in document order; empty if no marker is found.
    """
    subsec_pattern = re.compile(r'\n\([a-z]\)\s+.*?(?=\n\([a-z]\)|$)', re.DOTALL)
    trimmed = (found.group(0).strip() for found in subsec_pattern.finditer(section))
    return [chunk for chunk in trimmed if chunk]
def get_divisions(subsection: str) -> list:
    """
    Cuts a subsection into "divisions": the numbered items that start on a new line with an
    opening curly quote followed by a number in parentheses ("“(1)", "“(2)", ...).
    "Division" is this notebook's own name for these items, not an official term.
    Args:
        subsection (str): The text of the subsection.
    Returns:
        list: The stripped division texts in document order; empty if no marker is found.
    """
    div_pattern = re.compile(r'\n“\(\d+\)\s+.*?(?=\n“\(\d+\)|$)', re.DOTALL)
    trimmed = (found.group(0).strip() for found in div_pattern.finditer(subsection))
    return [chunk for chunk in trimmed if chunk]

def extract_section_title(section: str) -> str:
    """
    Extracts the title of a section from the section text.
    Args:
        section (str): The text of the section.
    Returns:
        str: The heading line of the section: everything from "SEC" up to the next newline, or
        to the end of the text when the section is a single line.
    Raises:
        ValueError: If "SEC" is not found in the section text.
    """
    sec_index = section.index("SEC")
    # str.find returns -1 when there is no newline; str.index would raise instead, which made
    # the single-line fallback below unreachable.
    newline_index = section.find("\n", sec_index)
    if newline_index == -1:
        return section[sec_index:].strip()
    return section[sec_index:newline_index].strip()
def extract_section_number(section: str) -> str:
    """
    Extracts the section number from the section text.
    Args:
        section (str): The text of the section.
    Returns:
        str: The section number, which is the text following "SEC." up to the next period. If
        no further period exists, everything after "SEC." is returned (stripped).
    Raises:
        ValueError: If "SEC." is not found in the section text.
    """
    sec_index = section.index("SEC.")
    number_start = sec_index + 4  # first character after "SEC."
    # str.find returns -1 when no closing period exists; str.index would raise instead, which
    # made the fallback below unreachable.
    end_index = section.find(".", number_start)
    if end_index == -1:
        # Skip the "SEC." prefix here too, so the result is still just the number text
        return section[number_start:].strip()
    return section[number_start:end_index].strip()

def collapse_whitespace(text: str) -> str:
    """
    Normalizes whitespace: every run of whitespace (spaces, tabs, newlines, ...) becomes one
    space, and leading/trailing whitespace is removed. Bill text arrives with irregular
    line breaks and spacing, so this makes each chunk a single tidy line.
    Args:
        text (str): The text to collapse whitespace in.
    Returns:
        str: The text with collapsed whitespace.
    """
    single_spaced = re.sub(r'\s+', ' ', text)
    return single_spaced.strip()
def number_to_letter(number: int) -> str:
    """
    Maps a 1-based position to the lettering scheme used for subsections of bills.
    1 -> 'a', 2 -> 'b', ..., 26 -> 'z', 27 -> 'aa', 28 -> 'ab', ..., 52 -> 'az', 53 -> 'ba',
    ..., 702 -> 'zz'. 702 is the last position expressible with two letters.
    Args:
        number (int): The number to convert (1 to 702).
    Returns:
        str: The corresponding letter or two-letter combination.
    Raises:
        ValueError: If the number is not between 1 and 702 inclusive.
    """
    if number > 702 or number < 1:  # 702 is 'zz'
        raise ValueError("Number must be between 1 and 702 inclusive.")
    # Zero-based position split into "which block of 26" and "offset within the block"
    block, offset = divmod(number - 1, 26)
    last_letter = chr(ord('a') + offset)
    if block == 0:
        return last_letter
    # Block 1 is prefixed with 'a', block 2 with 'b', and so on
    return chr(ord('a') + block - 1) + last_letter
    
def get_section_by_number(sections: list, section_number: str) -> str:
    """
    Finds the first section whose extracted section number equals `section_number`.
    Sections are examined in list order and the search stops at the first match.
    Args:
        sections (list): A list of sections, each starting with "SEC.".
        section_number (str): The section number to search for.
    Returns:
        str: The section text if found, None otherwise.
    """
    matching = (
        candidate
        for candidate in sections
        if extract_section_number(candidate) == section_number
    )
    return next(matching, None)
    
def build_messages(user: str, system: str = None) -> list:
    """
    Assembles the message list expected by the Ollama chat API.
    Args:
        user (str): The user's message.
        system (str, optional): The system message to set the context. Defaults to None.
    Returns:
        list: Role/content dictionaries: the system message first when one is given (and
        non-empty), then the user's message.
    """
    user_message = {"role": "user", "content": user}
    if not system:
        return [user_message]
    return [{"role": "system", "content": system}, user_message]

def get_ollama_response(messages: list) -> str:
    """
    Runs one chat completion against Ollama with the model named in OLLAMA_MODEL and returns
    the text of the reply.
    The call goes through the ollama package's module-level `chat` function, so it talks to
    whatever instance that package targets by default (a locally running one, per the original
    author's note); supporting a remote instance requires changes here.
    Args:
        messages (list): A list of messages formatted for the Ollama chat API.
    Returns:
        str: The content of the response message from the Ollama API.
    Note:
        The reply is not validated here; any exception raised by `chat` propagates to the caller.
    """
    reply: ChatResponse = chat(messages=messages, model=OLLAMA_MODEL)
    return reply.message.content

In [None]:
def query_vector_store(query: str):
    """
    Searches the ChromaDB collection for the stored bill chunks closest to the query text.
    Reads the module-level `collection` variable, so the cell that creates the collection
    must have been run first.
    Args:
        query (str): The query text to search for in the bill sections.
    Returns:
        The raw result of `collection.query` for this single query text, limited to 10 results.
    Note:
        Errors raised by the ChromaDB client are not handled here and propagate to the caller.
    """
    return collection.query(n_results=10, query_texts=[query])

def query_bill(query: str) -> dict:
    """
    Answers a question about the bill with retrieval-augmented generation: the question is used
    to look up relevant chunks in the vector store (ChromaDB), the full text of the sections
    those chunks belong to is put into the prompt, and the selected Ollama model answers.
    Reads the module-level `sections` list, so the cell that builds it must have been run first.
    Args:
        query (str): The question or query about the bill.
    Returns:
        dict: A dictionary with three keys:
            "relevant_sections" (str): the text of the matched sections, separated by blank lines;
            "relevant_section_numbers" (list): the matched section numbers, de-duplicated, in
                the order the vector store returned them;
            "response" (str): the model's answer.
    Note:
        No validation is performed: if nothing relevant is found, the model is still asked the
        question, just without any bill text in the prompt.
    """
    results = query_vector_store(query)

    # dict.fromkeys de-duplicates while keeping first-seen order. A set would discard the order
    # the vector store returned (presumably best match first) and make the prompt vary by run.
    rel_section_numbers = list(dict.fromkeys(
        metadata["section_number"] for metadata in results['metadatas'][0]
    ))

    rel_sections_str = ""
    for section_number in rel_section_numbers:
        section = get_section_by_number(sections, section_number)
        if section:
            rel_sections_str += f"{section}\n\n"

    system_prompt = "You are a helpful assistant that provides information about US Congress bills. You will be given a question about a bill and the relevant sections of the bill. Use the sections to answer the question. If the question is not related to the bill, respond with something like 'There's nothing in the bill related to that query'."
    user_prompt = ""
    if rel_sections_str:
        user_prompt = f"Relevant sections of the bill:\n\n{rel_sections_str}\n\n"
    user_prompt += f"Question: {query}"

    messages = build_messages(user_prompt, system_prompt)
    response = get_ollama_response(messages)
    return {
        "relevant_sections": rel_sections_str,
        "relevant_section_numbers": rel_section_numbers,
        "response": response
    }
    

In [None]:
# Download the bill page at BILL_URL from congress.gov and reduce it to cleaned plain text
scraped_text = scrape_congress_bill(BILL_URL)

In [None]:
# Keep only the body of the bill text, dropping the table of contents if one is detected
body = extract_body(scraped_text)

In [None]:
# Split the bill body into its "SEC." sections.
# query_bill reads this module-level `sections` list, so run this cell before asking questions.

sections = get_sections(body)

In [None]:
# Divide the sections into subsections and divisions, and prepare the data for insertion into ChromaDB.
# Each entry of bill_data is one division together with the metadata needed to cite where it came from.
bill_data = []
for section in sections:
    number = extract_section_number(section)
    print(f"Processing section: {number}")
    title = extract_section_title(section)

    # A section without subsection markers is handled as one subsection
    subsections = get_subsections(section) or [section]
    for subsection_count, subsection in enumerate(subsections, start=1):
        # Likewise, a subsection without division markers is handled as one division
        divisions = get_divisions(subsection) or [subsection]
        for division_count, division in enumerate(divisions, start=1):
            record = {
                'section_title': title,
                'section_number': number,
                # The letter comes from the subsection's position, not from the text itself
                'subsection_letter': number_to_letter(subsection_count),
                'division_number': division_count,
                'division_text': collapse_whitespace(division)
            }
            bill_data.append(record)

In [None]:
# Connect to the ChromaDB server at CHROMA_URL:CHROMA_PORT
chroma_client = chromadb.HttpClient(host=CHROMA_URL, port=CHROMA_PORT)
# Ping the server; presumably this fails right away if it is unreachable (confirm against the chromadb docs)
chroma_client.heartbeat()

In [None]:
# Create or get the collection for the bill.
# The collection is always named "Bill", so it is reused across runs of this notebook.
# If you want to delete the collection first, uncomment the next line

#chroma_client.delete_collection("Bill")
collection = chroma_client.get_or_create_collection("Bill")

In [None]:
# Add the bill data to the ChromaDB collection
# Each document is a division of a subsection, with metadata for section title, section number, and subsection letter
# This allows for citations and sourcing in the chat responses
#
# IDs are derived from BILL_URL plus the division's position in bill_data, and the write is an
# upsert, so re-running this cell overwrites the same entries instead of storing every division
# again under fresh random IDs. Divisions are sent in batches rather than one request each.
UPSERT_BATCH_SIZE = 100  # number of divisions sent to ChromaDB per request

for batch_start in range(0, len(bill_data), UPSERT_BATCH_SIZE):
    batch = bill_data[batch_start:batch_start + UPSERT_BATCH_SIZE]
    collection.upsert(
        documents=[d['division_text'] for d in batch],
        metadatas=[{
            'section_title': d['section_title'],
            'section_number': d['section_number'],
            'subsection_letter': d['subsection_letter'],
            'division_number': d['division_number']
        } for d in batch],
        # uuid5 is deterministic: the same bill URL and position always yield the same ID
        ids=[
            str(uuid.uuid5(uuid.NAMESPACE_URL, f"{BILL_URL}#division-{batch_start + offset}"))
            for offset in range(len(batch))
        ]
    )

In [None]:
# Question to ask about the bill
# Edit this to ask your question about the bill

question = "What, if anything, does this bill say about items covered by NFA?"
# query_bill returns a dict with the keys "relevant_sections", "relevant_section_numbers" and "response"
investigation = query_bill(question)

In [None]:
# Show the model's answer; the section text it was given is in investigation["relevant_sections"]
print(investigation["response"])
