In [1]:
import toml
import traceback
from typing import List, Dict, Any, Optional, TypedDict
from pydantic import BaseModel, Field

from langchain_openai import AzureChatOpenAI
from langchain.prompts import ChatPromptTemplate
from langgraph.graph import StateGraph, END

import pymupdf4llm

from arcgis.gis import GIS
from arcgis.geocoding import geocode

from rich.pretty import pprint
from IPython.display import Markdown

In [None]:
# using GPT-4o
# TODO: test with GPT-4.1
# Connection settings are read from index 2 of the "configs" entry in ../config.toml.
llm_config = toml.load("../config.toml")["configs"][2]

# Collect the client arguments first, then build the Azure OpenAI chat model from them.
azure_openai_kwargs = {
    "api_version": llm_config["api_version"],
    "azure_deployment": llm_config["deployment_name"],
    "api_key": llm_config["api_key"],
    "azure_endpoint": llm_config["api_endpoint"],
    "model": llm_config["model_name"],
    "temperature": 0,
}
llm = AzureChatOpenAI(**azure_openai_kwargs)

# Send a trivial message; the response is shown as the cell output.
llm.invoke("hi")

Set up a connection to ArcGIS (the `GIS` object) so the geocoder can be used.


In [5]:
# Create the GIS connection; no URL or credentials are passed here.
# NOTE(review): `gis` is not referenced again in this notebook — presumably geocode()
# relies on the connection created by this call; confirm.
gis = GIS()

Load in the PDF file and export it to Markdown to preview it.


In [3]:
# Source PDF, addressed with a relative path under ../files.
file_name = "PST_Inspection_Report_Ohio_University.pdf"
path = f"../files/{file_name}"  # reused later as the graph's `pdf_path` input
# Convert the PDF to Markdown text and render it as the cell output for a visual check.
md_text = pymupdf4llm.to_markdown(path)
Markdown(md_text)

Create Pydantic models to control the LLM output, plus a `TypedDict` that holds the graph state.


In [6]:
# One successfully geocoded address.
# Instances are built by geocode_address(): `site_id` is copied from the source
# InspectionReportAddress so the coordinates can be matched back to it.
# (Kept as `#` comments rather than a class docstring so the model's schema is unchanged.)
class GeocodedAddress(BaseModel):
    site_id: str = Field(
        description="This will be provided by the incoming data.",
    )
    # Address text after whitespace trimming / newline replacement in geocode_address().
    address: str
    # Taken from the geocoder's location "y" (latitude) and "x" (longitude) values.
    latitude: float
    longitude: float


# One address extracted from the inspection report.
# This model is nested inside InspectionReportDetail, so its field descriptions are
# part of the structured-output schema; they are documented with `#` comments only
# to leave that schema untouched.
class InspectionReportAddress(BaseModel):
    # Identifier that is later copied onto the matching GeocodedAddress.
    site_id: str = Field(
        description="A random id of the address",
    )
    # Address text as found in the document.
    address: str
    # Defaults to False when not set.
    is_site_address: bool = Field(
        default=False,
        description="Whether this address is the main address in the document.",
    )


# Structured result requested from the LLM for one inspection report PDF.
# This class is passed to llm.with_structured_output(), so field names and
# descriptions are part of the request; documentation is kept in `#` comments so
# the schema itself stays unchanged.
class InspectionReportDetail(BaseModel):
    customer_id: str
    university_name: str
    inspection_summary: str = Field(
        description="A summary of the inspection, including the date, type, and any other relevant information. Limit to 5-6 sentences.",
    )
    # NOTE(review): the description below is not a raw string, so '\n' is an actual
    # newline character rather than the two characters backslash + n — confirm this
    # is the intended wording.
    addresses_from_document: List[InspectionReportAddress] = Field(
        description="List of any addresses found in the document. Do not add any '\n' characters to the address.",
    )
    certificate_number: str
    site_number: str
    inspection_type: str
    # Stored as free text; no date parsing or validation is applied here.
    inspection_date: str


# Define the state for our graph
#
# A TypedDict only describes the shape of a plain dict: it cannot supply default
# values, so none are declared here. `total=False` marks every key as optional,
# which matches how the nodes read the state (always through `state.get(...)`);
# a key is simply absent until the node that produces it has run.
class GraphState(TypedDict, total=False):
    pdf_path: str  # Input PDF path
    pdf_text_as_markdown: Optional[str]  # Text extracted by pymupdf4llm
    inspection_report: Optional[InspectionReportDetail]  # Structured data from LLM
    geocoded_addresses: List[GeocodedAddress]  # Addresses that geocoded successfully
    arcgis_json: Optional[Dict[str, Any]]  # Feature payload built by the last node
    error: Optional[str]  # Error text recorded by a node, if any

Define a function to be used by the graph when we need to geocode an address


In [7]:
def geocode_address(
    address: InspectionReportAddress,
) -> Optional[GeocodedAddress]:
    """Look up coordinates for one report address with the ArcGIS geocoder.

    The address text is trimmed and its newlines are replaced by spaces before
    it is sent to ``geocode``. The first candidate returned is used.

    Args:
        address: The address entry extracted from the report.

    Returns:
        A ``GeocodedAddress`` carrying the same ``site_id``, the cleaned address
        text and the candidate's coordinates, or ``None`` when the geocoder
        returns nothing or any exception is raised (a message is printed in
        both cases).
    """
    try:
        cleaned_address = address.address.strip().replace("\n", " ")
        candidates = geocode(cleaned_address)
        if not candidates:
            print(f"Geocoding failed for address: {address}")
            return None

        first_location = candidates[0]["location"]
        return GeocodedAddress(
            site_id=address.site_id,
            address=cleaned_address,
            latitude=first_location["y"],
            longitude=first_location["x"],
        )
    except Exception as exc:
        # Best effort: one bad address must not stop the rest of the run.
        print(f"Error geocoding address {address}: {exc}")
        return None

Create the base PDF parser chain that the graph will call.


In [8]:
# Prompt with a single system message: the extraction instructions followed by the
# PDF's Markdown text, which is filled in through the `text` template variable.
prompt = ChatPromptTemplate.from_messages(
    [
        (
            "system",
            """
            You are an expert at scraping PDFs for information. You will be given the text of a PDF document in Markdown format. You will return a JSON object with the information you find that matches the given schema.
            
            PDF text: {text}
            """,
        )
    ]
)

# Pipe the prompt into the LLM, configured with InspectionReportDetail as the
# structured-output schema and method="function_calling".
# Presumably invoking the chain yields an InspectionReportDetail instance — the
# parse node stores the result under "inspection_report"; confirm.
pdf_parser_chain = prompt | llm.with_structured_output(
    InspectionReportDetail, method="function_calling"
)

Set up the "nodes" that will make up our graph.


In [16]:
# Node 1: Read the PDF file to markdown text
def read_pdf(state: GraphState) -> Dict[str, Any]:
    """Convert the PDF referenced by ``state["pdf_path"]`` to Markdown text.

    Returns:
        ``{"pdf_text_as_markdown": <text>}`` on success, otherwise
        ``{"error": <message>}`` when the path is missing/empty or the
        conversion raises (the message then contains the full traceback).
    """
    print("--- Reading PDF ---")
    source_path = state.get("pdf_path")
    if source_path:
        try:
            return {"pdf_text_as_markdown": pymupdf4llm.to_markdown(source_path)}
        except Exception as exc:
            print(f"Error reading PDF: {exc}")
            return {"error": f"Failed to read PDF: {traceback.format_exc()}"}
    return {"error": "PDF path not provided in state."}


# Node 2: Parse the PDF text using the LLM chain
def parse_pdf_content(state: GraphState) -> Dict[str, Any]:
    """Run the module-level ``pdf_parser_chain`` over the extracted Markdown.

    Returns:
        ``{"inspection_report": <chain result>}`` on success, otherwise
        ``{"error": <message>}`` when no Markdown text is in the state or the
        chain raises (the message then contains the full traceback).
    """
    print("--- Parsing PDF Content ---")
    markdown_text = state.get("pdf_text_as_markdown")
    if markdown_text:
        try:
            report = pdf_parser_chain.invoke({"text": markdown_text})
        except Exception as exc:
            print(f"Error parsing PDF content: {exc}")
            return {
                "error": f"LLM failed to parse PDF content: {traceback.format_exc()}"
            }
        return {"inspection_report": report}
    return {"error": "PDF text not available for parsing."}


# Node 3: Geocode addresses found in the report
def geocode_report_addresses(state: GraphState) -> Dict[str, Any]:
    """Geocode every address listed in the parsed inspection report.

    Entries that are not ``InspectionReportAddress`` instances are skipped, and
    addresses the geocoder cannot resolve are left out of the result.

    Returns:
        A state update with ``"geocoded_addresses"`` (possibly empty). When
        geocoding raised for one or more addresses, ``"error"`` is also set to
        any error text already in the state followed by the new messages.
    """
    print("--- Geocoding Addresses ---")
    inspection_report = state.get("inspection_report")
    if not inspection_report or not inspection_report.addresses_from_document:
        print("No addresses found in the report to geocode.")
        # It's okay if there are no addresses, just move on.
        return {"geocoded_addresses": []}

    geocoded_list = []
    errors = []
    for address in inspection_report.addresses_from_document:
        if not isinstance(address, InspectionReportAddress):
            # Print the entry itself: an invalid entry (None, wrong type) has no
            # `.address` attribute, so reading it here would raise.
            print(f"Skipping invalid address entry: {address!r}")
            continue

        address_text = address.address.strip()  # stripped for cleaner logs
        print(f"Geocoding: {address_text}")
        try:
            geocoded_result = geocode_address(address)
            if geocoded_result:
                geocoded_list.append(geocoded_result)
        except Exception as e:
            # geocode_address handles its own failures; this is a last line of
            # defence so one address can never abort the whole node.
            error_msg = f"Failed to geocode address '{address_text}': {e}"
            print(error_msg)
            errors.append(error_msg)

    update_dict: Dict[str, Any] = {"geocoded_addresses": geocoded_list}
    if errors:
        # The "error" key may be absent or explicitly None; treat both as empty
        # so the concatenation below cannot fail.
        previous_error = state.get("error") or ""
        update_dict["error"] = (
            previous_error + "\nGeocoding Errors:\n" + "\n".join(errors)
        )

    return update_dict


# Node 4: Convert all properties to an ArcGIS compliant JSON object
def convert_to_arcgis_json(state: GraphState) -> Dict[str, Any]:
    """Build a single-feature ArcGIS JSON payload from the parsed report.

    Every non-list field of the inspection report becomes a feature attribute.
    The point geometry comes from the geocoded address whose ``site_id`` matches
    an address flagged ``is_site_address``; when there is no such match, the
    first geocoded address is used.

    Returns:
        ``{"arcgis_json": {"features": [feature]}}`` on success, otherwise
        ``{"error": <message>}`` when the report is missing or no address was
        geocoded (existing error text in the state is kept in front of the
        new message).
    """
    print("--- Converting to ArcGIS JSON ---")
    inspection_report = state.get("inspection_report")
    if not inspection_report:
        return {"error": "Inspection report not found in state."}

    # The key may be absent, None or an empty list; none of these can be indexed.
    geocoded_addresses = state.get("geocoded_addresses") or []
    if not geocoded_addresses:
        message = "No geocoded addresses available to build the feature geometry."
        previous_error = state.get("error")
        return {
            "error": f"{previous_error}\n{message}" if previous_error else message
        }

    # Iterating the model yields (field name, value) pairs; list-valued fields
    # (the address list) are not valid flat attributes and are left out.
    attributes = {
        field_name: value
        for field_name, value in inspection_report
        if not isinstance(value, list)
    }

    # Prefer the address the report marks as the main site address.
    site_ids = {
        report_address.site_id
        for report_address in inspection_report.addresses_from_document
        if report_address.is_site_address
    }
    addr = next(
        (geocoded for geocoded in geocoded_addresses if geocoded.site_id in site_ids),
        geocoded_addresses[0],
    )

    feature = {
        "attributes": attributes,
        "geometry": {
            "x": addr.longitude,
            "y": addr.latitude,
            "spatialReference": {"wkid": 4326},
        },
    }

    # return the ArcGIS compliant JSON object
    return {"arcgis_json": {"features": [feature]}}

Initialize the graph nodes, entry points and edges that control the flow of the graph


In [None]:
workflow = StateGraph(GraphState)

# Add nodes
workflow.add_node("read_pdf", read_pdf)
workflow.add_node("parse_pdf", parse_pdf_content)
workflow.add_node("geocode_addresses", geocode_report_addresses)
workflow.add_node("convert_to_arcgis_json", convert_to_arcgis_json)


def _continue_unless_error(next_node: str):
    """Return a router that ends the run if the state holds an error, else goes to `next_node`."""

    def _route(state: GraphState) -> str:
        return END if state.get("error") else next_node

    return _route


# Define edges
workflow.set_entry_point("read_pdf")
# Reading and parsing are prerequisites for everything downstream. If either one
# fails, stop immediately so later nodes do not overwrite the original error
# message with a less specific one.
workflow.add_conditional_edges(
    "read_pdf",
    _continue_unless_error("parse_pdf"),
    {"parse_pdf": "parse_pdf", END: END},
)
workflow.add_conditional_edges(
    "parse_pdf",
    _continue_unless_error("geocode_addresses"),
    {"geocode_addresses": "geocode_addresses", END: END},
)
# Geocoding problems are not fatal: the conversion node decides what to do with
# whatever was geocoded.
workflow.add_edge("geocode_addresses", "convert_to_arcgis_json")
workflow.add_edge("convert_to_arcgis_json", END)  # End after the ArcGIS JSON is built


# Compile the graph
app = workflow.compile()

print("Graph compiled successfully!")

Execute the graph, then print the final state and a short summary of the results.


In [None]:
# --- Configuration ---
print_debug_steps = False  # Set to False to hide intermediate step outputs

# --- Execution ---
# Define the input PDF path (`path` is set in the PDF preview cell earlier in the notebook)
initial_state = {"pdf_path": path}

print(f"--- Running Graph with Input: {initial_state} ---")
print(f"--- Debug Steps Printing: {'ENABLED' if print_debug_steps else 'DISABLED'} ---")


final_state_from_stream: Optional[GraphState] = (
    None  # Initialize to hold the last state
)

# Stream the execution step by step, yielding the full state after each step
for state_after_step in app.stream(initial_state, stream_mode="values"):
    # Print intermediate steps only if the flag is True
    if print_debug_steps:
        print("-" * 30)
        print("Current State:")
        # state_after_step is the full GraphState dictionary at this point
        pprint(state_after_step, expand_all=True)
        print("-" * 30)

    # Always keep track of the latest state, regardless of printing
    final_state_from_stream = state_after_step


# After the loop, final_state_from_stream holds the final state.
# The separators below use a real newline ("\n") so a blank line is printed.
print("\n" + "=" * 40)
print("--- Final State (from stream) ---")
print("=" * 40)

if final_state_from_stream:
    # Print the final state regardless of the debug flag
    pprint(final_state_from_stream, expand_all=True)

    # Access and print final state elements
    print("\nAccessing final state elements:")
    report = final_state_from_stream.get("inspection_report")
    if report:
        print(f"  University Name: {report.university_name}")
        print(f"  Inspection Date: {report.inspection_date}")
    else:
        print("  Inspection report not found in final state.")

    # The key can be missing or None when the run stopped early; treat both as empty.
    geocoded_addrs = final_state_from_stream.get("geocoded_addresses") or []
    print(f"  Number of Geocoded Addresses: {len(geocoded_addrs)}")
    for gc_addr in geocoded_addrs:
        print(f"    - {gc_addr.address}: ({gc_addr.latitude}, {gc_addr.longitude})")

    error = final_state_from_stream.get("error")
    if error:
        print(f"\n  Errors encountered during execution: {error}")
    else:
        print("\n  No errors reported in final state.")
else:
    print("Graph execution did not yield a final state.")