In [41]:
import sys
import os
import pandas as pd
from Workflow.google_storage_workflow import create_folder_if_not_exists, upload_csv_to_bucket, read_csv_from_gcs
from Models.gemini_model import GeminiModel
import pandas as pd
from concurrent.futures import ProcessPoolExecutor
from typing import Any, List, Tuple, Dict 
from pydantic import BaseModel, Field
import numpy as np
from pydantic import BaseModel
from Models.gpt_model import GPTModel
from Models.gemini_model import GeminiModel
from typing import List, Dict, Tuple, Any
from concurrent.futures import ThreadPoolExecutor
from pathlib import Path
from typing import Union
from typing import Union
from bs4 import BeautifulSoup
from typing import Union
import pandas as pd
from google.cloud import storage
from urllib.parse import urlparse

In [42]:

def llm_imputer(df:pd.DataFrame, idx:int) -> Dict[str, Any]:
    """
    Run the lead-extraction LLM prompt on one scraped page and return that
    row enriched with the model's answers.

    Args:
        df: pd.DataFrame holding at least an 'html_content_cleaned' column.
        idx: index label of the row (page) to process.
    Returns:
        dict: the row at `idx` as a plain dict (the first match if the label
        is duplicated), plus one key per item in the model response.
    Raises:
        KeyError: if `idx` or the 'html_content_cleaned' column is missing.
    """

    # Only row `idx` is read or written, so work on a one-row copy instead of
    # deep-copying the whole frame once per processed row.
    row_df = df.loc[[idx]].copy()

    # Structured-output schema handed to the model.
    class SubAttribution(BaseModel):

        Has_Coffee_Service: str = Field(
            description="True, False, or undetermined. Indicates if the business has coffee makers or not as indicated in the scraped text"
        )
        Has_Coffee_Service_Explanation: str = Field(
            description="Brief CoT explanation of the reasoning behind the coffee service classification."
        )
        Has_Vending_Machine_Service: str = Field(
            description="True, False, or undetermined. Indicates if the business has vending machines or not as indicated in the scraped text"
        )
        Has_Vending_Machine_Service_Explanation: str = Field(
            description="Brief CoT explanation of the reasoning behind the vending machine service classification."
        )
        Has_Micro_Market_Service: str = Field(
            description="True, False, or undetermined. Indicates if the business has an in-house micro market or not as indicated in the scraped text"
        )
        Has_Micro_Market_Service_Explanation: str = Field(
            description="Brief CoT explanation of the reasoning behind the micro market service classification."
        )

        Decision_Maker_Name: str = Field(
            description="Full name of the decision maker for operations/procurement/office management."
        )
        Decision_Maker_Title: str = Field(
            description="Job title of the decision maker for operations/procurement/office management."
        )
        Decision_Maker_Phone: str = Field(
            description="Phone number of the decision maker for operations/procurement/office management."
        )
        Decision_Maker_Email_Address: str = Field(
            description="Email address of the decision maker for operations/procurement/office management."
        )
        Business_Name: str = Field(
            description="Name of the business."
        )
        Business_Location: str = Field(
            description="Location of the business (Full address if available)."
        )
        Business_Summary: str = Field(
            description="A brief summary of the business, including its name, location, and any relevant details."
        )
        Canteen_Value_Proposition: str = Field(
            description="A value proposition that Canteen can communicate tailored to this specific businesses' needs around coffee, micro-markets, vending machines."
        )

    # Initialize LLM
    model = GeminiModel()

    # Generate system instruction.
    # NOTE(review): the "Output Format" section below does not list
    # Business_Name / Business_Location although the schema above requires
    # them -- confirm whether the prompt should mention them too.
    sys_inst = f'''
    You are an expert business leads parser.

    ### Task
    Using a chain-of-thought style reasoning, your task is to analyze the scraped text and determine if the business has coffee makers, vending machines, or an in-house micro market.
    You will also identify the decision maker for operations/procurement/office management, including their full name, job title, phone number, and email address.

    ### Input Format
    You will be provided with scraped text from a business website. 
    The text may contain various terms and phrases that could indicate the presence of coffee makers, vending machines, or an in-house micro market. 
    You will also look for the decision maker's information, including their full name, job title, phone number, and email address.

    ### Output Format
    Provide your evaluation as a JSON object in the following structure:
    {{
        Has_Coffee_Service: str = Field(
            description="True, False, or undetermined. Indicates if the business has coffee makers or not as indicated in the scraped text"
        )
        Has_Coffee_Service_Explanation: str = Field(
            description="Brief CoT explanation of the reasoning behind the coffee service classification."
        )
        Has_Vending_Machine_Service: str = Field(
            description="True, False, or undetermined. Indicates if the business has vending machines or not as indicated in the scraped text"
        )
        Has_Vending_Machine_Service_Explanation: str = Field(
            description="Brief CoT explanation of the reasoning behind the vending machine service classification."
        )
        Has_Micro_Market_Service: str = Field(
            description="True, False, or undetermined. Indicates if the business has an in-house micro market or not as indicated in the scraped text"
        )
        Has_Micro_Market_Service_Explanation: str = Field(
            description="Brief CoT explanation of the reasoning behind the micro market service classification."
        )
        Decision_Maker_Name: str = Field(
            description="Full name of the decision maker for operations/procurement/office management. undetermined if none exists"
        )
        Decision_Maker_Title: str = Field(
            description="Job title of the decision maker for operations/procurement/office management. undetermined if none exists"
        )
        Decision_Maker_Phone: str = Field(
            description="Phone number of the decision maker for operations/procurement/office management. undetermined if none exists"
        )
        Decision_Maker_Email_Address: str = Field(
            description="Email address of the decision maker for operations/procurement/office management. undetermined if none exists"
        )
        Business_Summary: str = Field(
            description="A brief summary of the business, including its name, location, and any relevant details."
        )
        Canteen_Value_Proposition: str = Field(
            description="A value proposition that Canteen can communicate tailored to this specific businesses' needs around coffee, micro-markets, vending machines."
        )
    }}

    ### Constraints
    - The output must be a valid JSON object.
    - Do not include any additional text or explanations outside of the JSON object.
    - Ensure that the JSON object is well-structured and easy to read.


    '''

    # Generate user instruction: the cleaned page text of the requested row
    user_inst = f''' 
       {row_df.loc[idx,'html_content_cleaned']} 
    '''

    # Generate response by leveraging LLM inference
    output = dict(model.generate_response(sys_inst, user_inst, SubAttribution, response_format_flag=True))
    print(output)

    # Write every returned item into the row as its own column
    for key, value in output.items():
        row_df.loc[idx, key] = value

    return row_df.to_dict(orient='records')[0]



def process_row(args: Tuple[pd.DataFrame, int]) -> Dict[str, Any]:
    """
    Process a single row with `llm_imputer`.

    Exists so that `executor.map` can hand over one packed argument per task.

    Args:
        args (Tuple[pd.DataFrame, int]): A 2-tuple of the DataFrame and the
            index label of the row to process. Any other length raises
            ValueError when the tuple is unpacked.

    Returns:
        Dict[str, Any]: The enriched row returned by `llm_imputer`.
    """
    inference_df, idx = args
    return llm_imputer(inference_df, idx)


def parallel_llm_evals(
    inference_df: pd.DataFrame
) -> pd.DataFrame:
    """
    Apply the `llm_evaluator` function to each row of a DataFrame in parallel.

    This function parallelizes the process of generating subattributions for each
    row in the input DataFrame using multiple CPU cores.

    Args:
        inference_df (pd.DataFrame): The input DataFrame for which subattributions will be computed.
        source_type (str): The source type argument passed to the `subattributor` function.

    Returns:
        pd.DataFrame: A DataFrame containing the list of subattributions.
    """
    args_list: List[Tuple[pd.DataFrame, int, str]] = [
        (inference_df, idx) for idx, _ in inference_df.iterrows()
    ]
    
    with ProcessPoolExecutor() as executor:
        inner_list: List[Any] = list(
            executor.map(process_row, args_list)
        )

    return pd.DataFrame(inner_list)

In [43]:


def extract_text_from_html(html: Union[str, bytes]) -> str:
    """
    Strip out all HTML tags, scripts, styles, and metadata from an HTML document
    and return the visible text content.

    Parameters
    ----------
    html : str or bytes
        The raw HTML to clean.

    Returns
    -------
    str
        The visible text, one non-empty fragment per line. Every fragment is
        stripped of leading/trailing whitespace; lines are additionally split
        wherever two consecutive spaces occur.
    """
    # Parse the document
    soup = BeautifulSoup(html, "html.parser")

    # Remove elements that shouldn't contribute to visible text
    for tag in soup(["script", "style", "head", "meta", "link", "noscript"]):
        tag.extract()

    # One text node per line so block boundaries survive the flattening
    text = soup.get_text(separator="\n")

    stripped_lines = (line.strip() for line in text.splitlines())
    # Strip each fragment *after* splitting: a run of three spaces (or a tab
    # between two double spaces) would otherwise leave fragments with leading
    # whitespace, or fragments that are nothing but whitespace.
    fragments = (
        fragment.strip()
        for line in stripped_lines
        for fragment in line.split("  ")
    )
    return "\n".join(fragment for fragment in fragments if fragment)


In [44]:


def html_folder_to_dataframe(
    folder_path: Union[str, Path]
) -> pd.DataFrame:
    """
    Traverse a local folder recursively, locate all .html files,
    and assemble a pandas DataFrame with file names and raw HTML content.

    Parameters
    ----------
    folder_path : Union[str, pathlib.Path]
        Path to the root folder to traverse.

    Returns
    -------
    pd.DataFrame
        DataFrame with columns (present even when no file is found):
        - 'file_name': the name of the HTML file
        - 'html_content': the full text content of the HTML file
        Rows are ordered by sorted file path so repeated runs agree.

    Raises
    ------
    ValueError
        If `folder_path` does not exist or is not a directory.
    """
    folder = Path(folder_path)
    # is_dir() is already False for a missing path
    if not folder.is_dir():
        raise ValueError(f"Invalid folder path: {folder_path}")

    records = []
    # rglob yields entries in filesystem order; sort for reproducible output
    for file in sorted(folder.rglob("*.html")):
        if not file.is_file():
            # e.g. a directory that happens to be named "*.html"
            continue
        try:
            # errors="ignore" drops undecodable bytes instead of raising
            content = file.read_text(encoding="utf-8", errors="ignore")
        except OSError as exc:
            # Best-effort: skip unreadable files, but say which one and why
            print(f"Skipping unreadable file {file}: {exc}")
            continue
        records.append({
            "file_name": file.name,
            "html_content": content,
        })

    # Explicit columns keep the documented schema for an empty result
    return pd.DataFrame(records, columns=["file_name", "html_content"])

In [45]:


def download_html_files_to_dataframe(
    bucket_name: str,
    prefix: str,
    client: storage.Client | None = None,
    metadata_parent: str | None = None,
    metadata_child: str | None = None
) -> pd.DataFrame:
    """
    Download all .html files under a given Google Cloud Storage prefix and
    return them in a DataFrame.

    Parameters
    ----------
    bucket_name : str
        Name of the GCS bucket (e.g., "my-bucket").
    prefix : str
        Folder path prefix within the bucket (e.g., "folder/subfolder").
    client : storage.Client | None, optional
        An existing Google Cloud Storage client. If None, a new client is created.
    metadata_parent : str, optional
        If given, keep only blobs whose custom metadata "Parent" equals this value.
    metadata_child : str, optional
        If given, keep only blobs whose custom metadata "Child" equals this value.

    Returns
    -------
    pd.DataFrame
        DataFrame with columns:
        - file_name: base name of each HTML file
        - url: custom metadata "url" ("" when absent)
        - Parent Account: custom metadata "Parent" ("" when absent)
        - Child: custom metadata "Child" ("" when absent)
        - html_content: the text content of the HTML file

    Raises
    ------
    ValueError
        If the bucket does not exist or no HTML files are found.
    """
    # Initialize client
    storage_client = client if client is not None else storage.Client()
    bucket = storage_client.bucket(bucket_name)
    if not bucket.exists():
        raise ValueError(f"Bucket '{bucket_name}' does not exist.")

    records = []
    # List all blobs under the prefix
    for blob in storage_client.list_blobs(bucket_name, prefix=prefix):
        if not blob.name.lower().endswith(".html"):
            continue

        # Blob.metadata is None when the object carries no custom metadata;
        # normalise so the lookups below cannot raise AttributeError.
        metadata = blob.metadata or {}

        if metadata_parent is not None and metadata.get("Parent") != metadata_parent:
            continue
        if metadata_child is not None and metadata.get("Child") != metadata_child:
            continue

        try:
            content = blob.download_as_text(encoding="utf-8")
        except Exception as exc:
            # Best-effort: one unreadable object must not abort the batch,
            # but report it instead of dropping it silently.
            print(f"Skipping unreadable blob {blob.name!r}: {exc}")
            continue

        records.append({
            "file_name": blob.name.rsplit("/", 1)[-1],
            "url": metadata.get("url", ""),
            "Parent Account": metadata.get("Parent", ""),
            "Child": metadata.get("Child", ""),
            "html_content": content,
        })

    if not records:
        raise ValueError(f"No HTML files found under '{prefix}' in bucket '{bucket_name}'.")

    return pd.DataFrame(records)


In [46]:

def aggregator(sitemap_df:pd.DataFrame, df:pd.DataFrame, idx:int) -> pd.DataFrame:
    """
    Ask the LLM to consolidate all per-page results of one business into a
    single answer and write that answer into row `idx` of the lead table.

    Args:
        sitemap_df: pd.DataFrame, the lead table that receives the answer.
            It is copied; the caller's frame is not modified.
        df: pd.DataFrame, per-page results for one business. The raw and
            cleaned HTML columns are dropped (if present) before the frame is
            rendered into the prompt.
        idx: int, index label of the row in `sitemap_df` to update.
    Returns:
        pd.DataFrame: a copy of the whole `sitemap_df` with the items of the
        model response written into row `idx`, adding columns as needed.
    """

    sitemap_df = sitemap_df.copy()

    # Structured-output schema handed to the model.
    class SubAttribution(BaseModel):

        Has_Coffee_Service: str = Field(
            description="True, False, or undetermined. Indicates if the business has coffee makers or not as indicated in the scraped text"
        )
        Has_Coffee_Service_Explanation: str = Field(
            description="Brief CoT explanation of the reasoning behind the coffee service classification."
        )
        Has_Vending_Machine_Service: str = Field(
            description="True, False, or undetermined. Indicates if the business has vending machines or not as indicated in the scraped text"
        )
        Has_Vending_Machine_Service_Explanation: str = Field(
            description="Brief CoT explanation of the reasoning behind the vending machine service classification."
        )
        Has_Micro_Market_Service: str = Field(
            description="True, False, or undetermined. Indicates if the business has an in-house micro market or not as indicated in the scraped text"
        )
        Has_Micro_Market_Service_Explanation: str = Field(
            description="Brief CoT explanation of the reasoning behind the micro market service classification."
        )

        Decision_Maker_Name: str = Field(
            description="Full name of the decision maker for operations/procurement/office management."
        )
        Decision_Maker_Title: str = Field(
            description="Job title of the decision maker for operations/procurement/office management."
        )
        Decision_Maker_Phone: str = Field(
            description="Phone number of the decision maker for operations/procurement/office management."
        )
        Decision_Maker_Email_Address: str = Field(
            description="Email address of the decision maker for operations/procurement/office management."
        )
        Business_Name: str = Field(
            description="Name of the business."
        )
        Business_Location: str = Field(
            description="Location of the business."
        )
        Business_Summary: str = Field(
            description="A brief summary of the business, including its name, location, and any relevant details."
        )
        Canteen_Value_Proposition: str = Field(
            description="A value proposition that Canteen can communicate tailored to this specific businesses' needs around coffee, micro-markets, vending machines."
        )

    # Initialize LLM
    model = GeminiModel()

    # Generate system instruction.
    # NOTE(review): the "Output Format" section below does not list
    # Business_Name / Business_Location although the schema above requires
    # them -- confirm whether the prompt should mention them too.
    sys_inst = f'''
    You are an expert business leads parser.

    ### Task
    Using a chain-of-thought style reasoning, your task is to analyze the scraped text and determine if the business has coffee makers, vending machines, or an in-house micro market.
    You will also identify the most relevant/populated decision makers for operations/procurement/office management, including their full name, job title, phone number, and email address.

    ### Input Format
    You will be provided with scraped and parsed texts from many business websites. 

    ### Output Format
    Provide your evaluation as a JSON object in the following structure:
    {{
        Has_Coffee_Service: str = Field(
            description="True, False, or undetermined. Indicates if the business has coffee makers or not as indicated in the scraped text"
        )
        Has_Coffee_Service_Explanation: str = Field(
            description="Brief CoT explanation of the reasoning behind the coffee service classification."
        )
        Has_Vending_Machine_Service: str = Field(
            description="True, False, or undetermined. Indicates if the business has vending machines or not as indicated in the scraped text"
        )
        Has_Vending_Machine_Service_Explanation: str = Field(
            description="Brief CoT explanation of the reasoning behind the vending machine service classification."
        )
        Has_Micro_Market_Service: str = Field(
            description="True, False, or undetermined. Indicates if the business has an in-house micro market or not as indicated in the scraped text"
        )
        Has_Micro_Market_Service_Explanation: str = Field(
            description="Brief CoT explanation of the reasoning behind the micro market service classification."
        )
        Decision_Maker_Name: str = Field(
            description="Full name of the decision maker for operations/procurement/office management. undetermined if none exists"
        )
        Decision_Maker_Title: str = Field(
            description="Job title of the decision maker for operations/procurement/office management. undetermined if none exists"
        )
        Decision_Maker_Phone: str = Field(
            description="Phone number of the decision maker for operations/procurement/office management. undetermined if none exists"
        )
        Decision_Maker_Email_Address: str = Field(
            description="Email address of the decision maker for operations/procurement/office management. undetermined if none exists"
        )
        Business_Summary: str = Field(
            description="A brief summary of the business, including its name, location, and any relevant details."
        )
        Canteen_Value_Proposition: str = Field(
            description="A value proposition that Canteen can communicate tailored to this specific businesses' needs around coffee, micro-markets, vending machines."
        )
    }}

    ### Constraints
    - The output must be a valid JSON object.
    - Do not include any additional text or explanations outside of the JSON object.
    - Ensure that the JSON object is well-structured and easy to read.
    '''

    # Keep the raw/cleaned HTML out of the prompt. drop() returns a new frame,
    # so the caller's `df` is untouched; errors="ignore" tolerates frames that
    # were already stripped of these columns.
    evidence_df = df.drop(columns=['html_content_cleaned','html_content'], errors='ignore')

    # Generate user instruction: the per-page results rendered as a text table
    user_inst = f''' 
       {evidence_df.to_string(index=False)} 
    '''

    # Generate response by leveraging LLM inference
    output = dict(model.generate_response(sys_inst, user_inst, SubAttribution, response_format_flag=True))
    print(output)

    # Write every returned item into row `idx` of the lead table
    for key, value in output.items():
        sitemap_df.loc[idx, key] = value

    return sitemap_df

In [47]:


def list_html_metadata(gs_path: str) -> List[Dict[str, Any]]:
    """
    List the custom metadata of all .html files under a GCS folder path.

    Args:
        gs_path (str): GCS URI in the form "gs://BUCKET_NAME/path/to/folder/"

    Returns:
        List[Dict[str, Any]]: One dict per HTML blob holding that blob's
        user-set metadata (`blob.metadata`) and nothing else -- no name, size
        or timestamps. A blob without custom metadata contributes an empty
        dict, so the list can always be fed to `pd.DataFrame`.

    Raises:
        ValueError: If `gs_path` is not a gs:// URI or names no bucket.
    """
    # parse the gs:// URI
    parsed = urlparse(gs_path)
    if parsed.scheme != "gs":
        raise ValueError(f"Invalid GCS path: {gs_path!r}, must start with gs://")

    bucket_name = parsed.netloc
    if not bucket_name:
        # e.g. "gs:///folder": fail here rather than inside the storage client
        raise ValueError(f"Invalid GCS path: {gs_path!r}, bucket name is missing")
    prefix = parsed.path.lstrip("/")

    client = storage.Client()
    bucket = client.bucket(bucket_name)

    results: List[Dict[str, Any]] = []
    # list all blobs under prefix (recursively)
    for blob in bucket.list_blobs(prefix=prefix):
        if blob.name.lower().endswith(".html"):
            # blob.metadata is None when no custom metadata was set
            results.append(dict(blob.metadata or {}))

    return results


In [48]:
def create_parent_child_df(
    gs_path: str = "gs://data-extraction-services/Canteen-Sales_Poc/results-round-2",
) -> Tuple[pd.DataFrame, pd.DataFrame]:
    """
    Create a parent-child DataFrame from the HTML metadata in GCS.

    Args:
        gs_path (str): gs:// folder whose .html blobs are inspected. Defaults
            to the round-2 results folder this notebook works on.

    Returns:
        Tuple[pd.DataFrame, pd.DataFrame]:
            - parent_child_df: one row per distinct 'Parent-Child-ID' with the
              columns 'Parent-Child-ID', 'Parent' and 'Child', sorted by
              'Parent' and re-indexed from 0.
            - metadata_df: the metadata of every HTML file, plus the
              'Parent-Child-ID' column.

    Raises:
        KeyError: If the listed metadata has no 'Parent' or 'Child' key
            (for example when no HTML file is found).
    """
    # One row per HTML blob, columns taken from the blobs' custom metadata
    metadata_df = pd.DataFrame(list_html_metadata(gs_path))

    # Hybrid ID identifying a (parent, child) pair
    metadata_df['Parent-Child-ID'] = metadata_df['Parent'] + "-" + metadata_df['Child']

    # De-duplicate on the hybrid ID, keep the three identifying columns,
    # then order by parent. Every step returns a new frame, so metadata_df
    # keeps all its rows.
    parent_child_df = (
        metadata_df
        .drop_duplicates(subset=['Parent-Child-ID'])
        .loc[:, ['Parent-Child-ID', 'Parent', 'Child']]
        .sort_values(by='Parent')
        .reset_index(drop=True)
    )
    return parent_child_df, metadata_df

In [49]:
# Load the sales-leads sitemap CSV. In the cells visible here it is only consumed by the
# archived orchestrator further down, not by the parent/child pipeline.
sitemap_df = pd.read_csv("input/sitemap/sitemap_canteen_sales_leads.csv")
# Hits GCS: returns the de-duplicated parent/child table and the full per-file metadata frame.
parent_child_df, metadata_df = create_parent_child_df()


In [None]:
def orchestrate_sitemap_parse(
    parent_child_df: pd.DataFrame,
    bucket_name: str = "data-extraction-services",
    prefix: str = "Canteen-Sales_Poc/results-round-2",
    intermediate_dir: str = "intermediate-outputs",
) -> pd.DataFrame:
    """
    Run the full pipeline for every (Parent, Child) pair: download the pair's
    HTML pages, clean them, evaluate each page with the LLM, store the
    per-page results as CSV and aggregate them back into the pair's row.

    Args:
        parent_child_df: pd.DataFrame with 'Parent' and 'Child' columns, one
            row per pair to process.
        bucket_name: GCS bucket holding the scraped pages.
        prefix: Folder prefix inside the bucket.
        intermediate_dir: Local folder for the per-pair CSVs; created if
            missing.
    Returns:
        pd.DataFrame: Copy of `parent_child_df` with the aggregated LLM
        answers written into the rows that succeeded. Rows that failed are
        left unchanged and the error is printed.
    Raises:
        KeyError: If `parent_child_df` lacks a 'Parent' or 'Child' column.
    """
    # Without this every to_csv below fails when the folder is missing, and
    # the error would only be printed per row, skipping all aggregation.
    os.makedirs(intermediate_dir, exist_ok=True)

    # iterrows() keeps iterating the frame passed in, even though the name
    # parent_child_df is rebound to the aggregator's copy inside the loop.
    for idx, row in parent_child_df.iterrows():
        # Read the identifiers outside the try block so the except handler
        # can always name the pair it is reporting on.
        parent = row['Parent']
        child = row['Child']
        try:
            print(f"Processing {parent} - {child}")
            # Download HTML files from GCS and convert to DataFrame.
            # NOTE(review): this lists the whole prefix again for every pair
            # and filters client-side; consider listing once if it gets slow.
            pages_df = download_html_files_to_dataframe(
                bucket_name,
                prefix,
                metadata_parent=parent,
                metadata_child=child
            )
            # Clean the HTML content
            pages_df['html_content_cleaned'] = pages_df['html_content'].apply(
                lambda x: extract_text_from_html(str(x))
            )

            # Perform LLM evaluation
            pages_df = parallel_llm_evals(pages_df)

            # Path separators inside a name would point into sub-folders that
            # do not exist, so neutralise them for the file name only.
            safe_name = f"{parent}-{child}".replace("/", "_").replace(os.sep, "_")
            pages_df.to_csv(f"{intermediate_dir}/intermediate-{safe_name}.csv", index=False)

            # Aggregate the results via LLM and write back to the dataframe
            parent_child_df = aggregator(parent_child_df, pages_df, idx)
        except Exception as e:
            # Best-effort batch: report the failing pair and move on
            print(f"Error processing {parent} - {child} - {e}")
            continue

    return parent_child_df

# Long-running: downloads pages and calls the LLM for every (Parent, Child) row.
# Rebinds parent_child_df to the enriched frame, so re-running this cell starts from that enriched frame.
parent_child_df = orchestrate_sitemap_parse(parent_child_df)

Processing Aldelano Corp - Hickman Realty Group Inc.-Jack
{'Has_Coffee_Service': 'undetermined', 'Has_Coffee_Service_Explanation': 'There is no mention of coffee service in the scraped text, so it is undetermined.', 'Has_Vending_Machine_Service': 'undetermined', 'Has_Vending_Machine_Service_Explanation': 'There is no mention of vending machine service in the scraped text, so it is undetermined.', 'Has_Micro_Market_Service': 'undetermined', 'Has_Micro_Market_Service_Explanation': 'There is no mention of a micro market service in the scraped text, so it is undetermined.', 'Decision_Maker_Name': 'undetermined', 'Decision_Maker_Title': 'undetermined', 'Decision_Maker_Phone': 'undetermined', 'Decision_Maker_Email_Address': 'undetermined', 'Business_Name': 'Example Business', 'Business_Location': 'Example Location', 'Business_Summary': 'This is an example business located in an example location. More information is needed to provide a comprehensive summary.', 'Canteen_Value_Proposition': "Wi

Gemini call error on attempt 1: 503 502:Bad Gateway


{'Has_Coffee_Service': 'undetermined', 'Has_Coffee_Service_Explanation': 'The scraped text provided does not contain any information about coffee service.', 'Has_Vending_Machine_Service': 'undetermined', 'Has_Vending_Machine_Service_Explanation': 'The scraped text provided does not contain any information about vending machine service.', 'Has_Micro_Market_Service': 'undetermined', 'Has_Micro_Market_Service_Explanation': 'The scraped text provided does not contain any information about micro market service.', 'Decision_Maker_Name': 'undetermined', 'Decision_Maker_Title': 'undetermined', 'Decision_Maker_Phone': 'undetermined', 'Decision_Maker_Email_Address': 'undetermined', 'Business_Name': 'undetermined', 'Business_Location': 'undetermined', 'Business_Summary': 'The request could not be satisfied due to a CloudFront error, indicating a potential issue with the server configuration or high traffic. The specific business name and location are undetermined.', 'Canteen_Value_Proposition': "

Gemini call error on attempt 1: 503 502:Bad Gateway


{'Has_Coffee_Service': 'True', 'Has_Coffee_Service_Explanation': "The text mentions 'Cleveland, OH Office Coffee Services' and discusses options like 'bean-to-cup coffee machine', 'single-cup service', and 'traditional coffee brewer', indicating the presence of coffee service.", 'Has_Vending_Machine_Service': 'True', 'Has_Vending_Machine_Service_Explanation': "The text explicitly mentions 'Cleveland, OH Vending Services' and discusses 'snack, food, and beverage vending machines'.", 'Has_Micro_Market_Service': 'True', 'Has_Micro_Market_Service_Explanation': "The text mentions 'A Cleveland, OH Micro Market' and describes its benefits, including 'convenient access to hundreds of snacks, fresh foods, and drinks'.", 'Decision_Maker_Name': 'undetermined', 'Decision_Maker_Title': 'undetermined', 'Decision_Maker_Phone': 'undetermined', 'Decision_Maker_Email_Address': 'undetermined', 'Business_Name': 'American Food & Vending', 'Business_Location': 'Cleveland, OH', 'Business_Summary': 'American 

Gemini call error on attempt 1: 429 Resource exhausted. Please try again later. Please refer to https://cloud.google.com/vertex-ai/generative-ai/docs/error-code-429 for more details.


{'Has_Coffee_Service': 'undetermined', 'Has_Coffee_Service_Explanation': 'The provided text does not contain any information about the company having coffee service.', 'Has_Vending_Machine_Service': 'undetermined', 'Has_Vending_Machine_Service_Explanation': 'The provided text does not contain any information about the company having vending machines.', 'Has_Micro_Market_Service': 'undetermined', 'Has_Micro_Market_Service_Explanation': 'The provided text does not contain any information about the company having a micro market service.', 'Decision_Maker_Name': 'Kasey Muench', 'Decision_Maker_Title': 'Executive Director', 'Decision_Maker_Phone': 'undetermined', 'Decision_Maker_Email_Address': 'kasey@nwtntourism.com', 'Business_Name': 'Northwest Tennessee Tourism Association', 'Business_Location': 'Union City, Tennessee', 'Business_Summary': 'The Northwest Tennessee Tourism Association promotes tourism in the Northwest Tennessee region. They organize events like the NWTN Legislative Breakf

Gemini call error on attempt 2: 429 Resource exhausted. Please try again later. Please refer to https://cloud.google.com/vertex-ai/generative-ai/docs/error-code-429 for more details.


{'Has_Coffee_Service': 'undetermined', 'Has_Coffee_Service_Explanation': 'The scraped text does not provide any information to determine if the business has coffee makers or not.', 'Has_Vending_Machine_Service': 'undetermined', 'Has_Vending_Machine_Service_Explanation': 'The scraped text does not provide any information to determine if the business has vending machines or not.', 'Has_Micro_Market_Service': 'undetermined', 'Has_Micro_Market_Service_Explanation': 'The scraped text does not provide any information to determine if the business has an in-house micro market or not.', 'Decision_Maker_Name': 'Jeffrey A. McKenzie', 'Decision_Maker_Title': 'Partner', 'Decision_Maker_Phone': '+15025873594', 'Decision_Maker_Email_Address': 'Email me', 'Business_Name': 'Dentons', 'Business_Location': 'Louisville, KY', 'Business_Summary': "Dentons is the world's largest law firm, providing client services worldwide through its member firms and affiliates. Jeffrey A. McKenzie is a Partner in the Loui

Gemini call error on attempt 3: 429 Resource exhausted. Please try again later. Please refer to https://cloud.google.com/vertex-ai/generative-ai/docs/error-code-429 for more details.
Model call error on attempt 1: 429 Resource exhausted. Please try again later. Please refer to https://cloud.google.com/vertex-ai/generative-ai/docs/error-code-429 for more details.


{'Has_Coffee_Service': 'undetermined', 'Has_Coffee_Service_Explanation': 'The provided text does not mention any information related to coffee service.', 'Has_Vending_Machine_Service': 'undetermined', 'Has_Vending_Machine_Service_Explanation': 'The provided text does not mention any information related to vending machine service.', 'Has_Micro_Market_Service': 'undetermined', 'Has_Micro_Market_Service_Explanation': 'The provided text does not mention any information related to micro market service.', 'Decision_Maker_Name': 'Monica Heath', 'Decision_Maker_Title': 'undetermined', 'Decision_Maker_Phone': '(731) 352.2004', 'Decision_Maker_Email_Address': 'info@growmckenzie.com', 'Business_Name': 'McKenzie Chamber Of Commerce & Industry', 'Business_Location': '9 Broadway Street, McKenzie, TN 38201', 'Business_Summary': 'The McKenzie Chamber of Commerce & Industry is an organization dedicated to promoting business, enhancing economic and community development, and improving the quality of lif

In [None]:
#!/usr/bin/env python3
import argparse
from pathlib import Path

import pandas as pd

def concat_csvs(folder: Union[str, Path]) -> pd.DataFrame:
    """
    Read every .csv directly inside `folder` (no recursion), drop
    html_content/html_content_cleaned if they exist, and return a single
    concatenated DataFrame.

    Files are read in sorted path order so the row order is reproducible.
    Files with no parseable content (e.g. zero bytes) are skipped with a
    message. Returns an empty DataFrame when nothing could be read, which
    includes the case of a non-existent folder.
    """
    root = Path(folder)
    frames = []
    # glob() yields in filesystem order; sort for a stable result
    for csv_path in sorted(root.glob("*.csv")):
        try:
            frame = pd.read_csv(csv_path)
        except pd.errors.EmptyDataError:
            # e.g. a file left behind by an interrupted write
            print(f"Skipping empty CSV: {csv_path}")
            continue
        # errors="ignore": not every CSV carries the HTML columns
        frames.append(
            frame.drop(columns=["html_content", "html_content_cleaned"], errors="ignore")
        )

    if not frames:
        return pd.DataFrame()  # empty if no CSVs found
    return pd.concat(frames, ignore_index=True)

# Merge the per-(parent, child) CSVs written by the orchestration loop, minus the raw/cleaned HTML columns.
concat_dfs = concat_csvs('intermediate-outputs')

# Persist the merged per-page results; to_csv does not create the "output" directory, so it must exist.
concat_dfs.to_csv("output/concatenated_05092025.csv", index=False)

In [None]:
# concatenated_df = pd.read_csv("output/concatenated_05092025.csv")

In [None]:
# concatenated_df[concatenated_df['file_name'].str.contains("Bridge")]

In [12]:
# Persist the aggregated lead table without the index column.
# NOTE(review): this cell's execution count (12) is out of sequence with the cells above;
# parent_child_df must be the frame produced by the orchestration cell for this file to be meaningful.
parent_child_df.to_csv("output/parent_child.csv", index=False)

# Archive

In [None]:
def orchestrate_sitemap_parse(sitemap_df: pd.DataFrame) -> pd.DataFrame:
    """
    Run the download -> clean -> LLM evaluation -> aggregation steps for each sitemap row.

    Rows are visited by position. For every row the 'Account Name', 'Address'
    and 'City' values are used to build the storage prefix passed to
    `download_html_files_to_dataframe`; the returned pages are cleaned with
    `extract_text_from_html`, evaluated with `parallel_llm_evals`, and handed to
    `aggregator` together with the row position.

    Args:
        sitemap_df: pd.DataFrame, DataFrame containing the sitemap data.
    Returns:
        pd.DataFrame: whatever `aggregator` returned for the last row, or the
        input frame itself when it has no rows.
    """
    bucket_name = "data-extraction-services"

    def clean_html(raw_html):
        # Cast to str first so non-string cells are still passed as text.
        return extract_text_from_html(str(raw_html))

    for row_position in range(len(sitemap_df)):
        # Identify the site: these three fields form the results folder name.
        site = sitemap_df.iloc[row_position]
        account_name = site['Account Name']
        address = site['Address']
        city = site['City']
        results_prefix = f"Canteen-Sales_Poc/results/{account_name} - {address} - {city}"

        # Fetch the stored HTML pages for this site and add a cleaned-text column.
        pages_df = download_html_files_to_dataframe(bucket_name, results_prefix)
        pages_df['html_content_cleaned'] = pages_df['html_content'].apply(clean_html)

        # Per-page LLM evaluation.
        pages_df = parallel_llm_evals(pages_df)

        # Fold the per-page results back into the sitemap; the returned frame
        # replaces the one used for the following iterations.
        sitemap_df = aggregator(sitemap_df, pages_df, row_position)

    return sitemap_df

# Process every site in the sitemap; the name is rebound to the returned frame.
sitemap_df = orchestrate_sitemap_parse(sitemap_df=sitemap_df)

# local folder version
# df = html_folder_to_dataframe("./input/test/")

In [189]:
# Dump the sitemap to disk (this variant writes the index as the first column).
sitemap_df.to_csv(path_or_buf="sitemap_test.csv")

# Other Approaches

In [194]:
import requests
from typing import Optional

def fetch_html(url: str, timeout: float = 10.0) -> str:
    """
    Download a web page and return its body as text.

    Parameters
    ----------
    url : str
        Absolute URL of the page (e.g., "https://www.utilicor.ca/our-team/").
    timeout : float, optional
        Timeout in seconds passed to ``requests.get``.

    Returns
    -------
    str
        The decoded response body (``response.text``).

    Raises
    ------
    ValueError
        If ``raise_for_status`` reports an HTTP error status (4xx/5xx); the
        original ``HTTPError`` is chained as the cause.
    requests.exceptions.RequestException
        Any other request failure (connection issues, timeouts, DNS errors,
        etc.) propagates unchanged.
    """
    try:
        page = requests.get(url, timeout=timeout)
        # Convert 4xx/5xx responses into an HTTPError
        page.raise_for_status()
        html = page.text
    except requests.exceptions.HTTPError as status_error:
        message = f"Failed to fetch {url}: HTTP {page.status_code}"
        raise ValueError(message) from status_error
    # Only HTTPError is translated; every other RequestException bubbles up
    return html



In [198]:
# Dump the sitemap to disk, omitting the index column.
sitemap_df.to_csv(path_or_buf="sitemap_test.csv", index=False)

In [196]:
# Quick check of the plain-requests approach against the school district site.
fetch_html(url="https://www.jmcss.org")

''

# School Example

In [211]:
# account_name = sitemap_df.iloc[idx]['Account Name']
# address = sitemap_df.iloc[idx]['Address']
# city = sitemap_df.iloc[idx]['City']


# Load the locally saved school pages into a DataFrame.
schools_df = html_folder_to_dataframe("./input/schools/")

# Add a text-only version of each page next to the raw HTML.
schools_df['html_content_cleaned'] = schools_df['html_content'].apply(
    lambda raw_html: extract_text_from_html(str(raw_html))
)

# schools_df = parallel_llm_evals(schools_df)

# sitemap_df = aggregator(sitemap_df, schools_df, idx)

In [212]:
# Inspect the cleaned text of the first page.
schools_df['html_content_cleaned'].iloc[0]

'Jackson-Madison County School System logo\nJackson-Madison County\nSchool System\nJMCSS Directory\nBoard of Education\nDepartments\nOffice of the Superintendent\nSchools\nJMCSS Schools\nAlexander Elementary\nAndrew Jackson\nArlington Elementary\nCommunity Montessori\nDenmark Elementary\nEast Elementary\nIsaac Lane Elementary\nJackson Academic STEAM Academy\nJackson Careers and Technology School\nJackson Central-Merry High School\nJackson Central-Merry Middle School\nJCM Early College High\nLiberty Technology High School\nLincoln Elementary\nMadison Academic High School\nNorth Parkway Middle School\nNorth Side High School\nNortheast Middle School\nNova Early Learning Center\nParkview Prep Academy\nPope Elementary\nRose Hill School\nSouth Elementary School\nSouth Side High School\nThelma Barker Elementary School\nWest Bemis Middle School\nMenus\nTransportation\nweCARE\nStudents\nParents\nEmployee Portal\nCommunications Department\nJoin Our Team!\nBest By Any Measure\nJackson-Madison Cou

In [126]:
import textwrap

In [127]:
# Show the extracted text wrapped to 100 characters per line for readability.
print(textwrap.fill(extract_text_from_html(html), width=100))

About Utilibond See the Process Parts Catalogue Home About Us About Utilicor Our Team Contact Us
Testimonials Keyhole Technology What is Keyhole Technology? The Keyhole Process Benefits of Keyhole
Keyhole Application Coring Equipment Coring Equipment Overview UTC1870 Self-Propelled Coring Unit
Series 500 Heavy Duty Coring Truck MD 300 Coring Truck MPX-SS Coring Unit Minicor-3 Coring
Attachment MC-450 Coring Attachment MTC100 – Discontinued Parts Coring Drums & Center Bits Parts and
Tools Accessories Core Drum Re-Tipping Cold Weather Reinstatement System Diamond Core Bits Utilibond
What is Utilibond The Solution for Utility Cuts Utilibond Testing Utilibond Data Utilibond Additives
Reinstatement Process with Utilibond Approvals Municipal Keyhole Approvals Core Bond Testing Reports
& White Papers Resources Articles Videos Coring Tips Carbon Footprint Picture Gallery Banners and
Swag Product Bulletins FAQS Where to Buy Product Info Our Team Home Our Team The Utilicor Team is
comprised of a