# Building a Scraper tool

In [None]:
from langchain_core.runnables import Runnable
from langchain_openai import ChatOpenAI
from langchain_groq import ChatGroq

In [None]:
import nest_asyncio
nest_asyncio.apply() # to allow asynchronous calls in a jupyter notebook

In [None]:
import json
import re
from typing import List

from langchain_core.messages import AIMessage
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import PydanticOutputParser
from pydantic import BaseModel, Field


# One extracted price entry; all three fields are required strings.
# NOTE(review): this model is reachable from `Prices`, which is handed to
# PydanticOutputParser. The class docstring and the Field descriptions are
# presumably emitted in the format instructions sent to the LLM, so treat them
# as prompt text rather than ordinary documentation - confirm before rewording.
class PriceInformation(BaseModel):
    """Information about a pricing"""
    price: str = Field(..., description="The price value as a string (e.g. $19.99)")
    currency: str = Field(..., description="The currency symbol or code (e.g. $ or USD)")
    description: str = Field(..., description="A brief description of the item or service")

# Top-level schema the model output is expected to follow: a single required
# `prices` list of PriceInformation entries.
# NOTE(review): the docstring below is presumably part of the schema text shown
# to the LLM - confirm before rewording it.
class Prices(BaseModel):
    """Identifying all pricing information present in the screenshot or text."""
    prices: List[PriceInformation] = Field(..., description="A list of pricing information found on the page")


# Parser bound to the `Prices` schema; its format instructions are injected into the scraper prompt.
parser = PydanticOutputParser(pydantic_object=Prices)

In [None]:
# Single-message multimodal prompt: a text part with the extraction
# instructions, followed by the screenshot as a base64 PNG data URL.
# `format_instructions` is pre-filled via .partial(), so `img` is the only
# variable left for the caller to supply at invoke time.
# NOTE: the indentation inside the triple-quoted string is part of the text
# that is sent to the model.
scraper_prompt = ChatPromptTemplate.from_messages(
    [
        (
            "user", 
            [
                {
                    "type": "text",
                    "text": """You are a specialized AI who is a part of a multi-agent system tasked with the scraping pricing information from screenshots of websites.
                    Below you are given a screenshot of a website. Analyze the image and extract all pricing details.
                    Return them as a JSON-formatted string that conforms to the following schema:
                    {format_instructions}
                    Return only a JSON-formatted string and nothing else.
                    If no pricing information is found, return `None`."""
                },
                {
                    "type": "image_url",
                    "image_url": {"url": "data:image/png;base64,{img}"}
                }
            ]
        )
    ]
).partial(format_instructions=parser.get_format_instructions())

In [None]:
import base64
import os
import json

def encode_image(image_path):
    """Return the file at ``image_path`` as a base64-encoded text string.

    The file is read in binary mode in one go, base64-encoded, and the
    resulting bytes are decoded as UTF-8.
    """
    with open(image_path, "rb") as handle:
        raw_bytes = handle.read()
    encoded = base64.b64encode(raw_bytes)
    return encoded.decode('utf-8')
  
class Scraper:
    """Run a screenshot through the ``prompt | mm_llm`` chain and log the reply.

    Every reply's ``content`` is JSON-encoded and appended as one line to
    ``save_file``.
    """

    def __init__(self, mm_llm, prompt, save_file: str = "scraped_data.jsonl"):
        """Keep the collaborators, build the chain and ensure ``save_file`` exists.

        Args:
            mm_llm: Model piped after ``prompt`` to form the chain.
            prompt: Prompt piped into ``mm_llm``; invoked with an ``img`` value.
            save_file: Path of the file that replies are appended to.
        """
        self.mm_llm, self.prompt = mm_llm, prompt
        # Taken from the module-level ``parser``; ``run`` does not use it yet.
        self.parser = parser
        self.chain = prompt | mm_llm
        self.save_file = save_file

        # Create an empty file on first use; an existing file is left untouched.
        if not os.path.isfile(save_file):
            open(save_file, "w").close()

    def run(self, img_path: str):
        """Scrape the image at ``img_path`` and return the chain's raw result.

        Side effect: appends ``json.dumps(result.content)`` plus a newline to
        ``save_file``.
        """
        # TODO(dominic): Currently just playing with images but eventually this needs to take in an AgentState
        # TODO(dominic): Investigate if we should use the parser as well to create a pydantic object.
        encoded = encode_image(image_path=img_path)
        response = self.chain.invoke({"img": encoded})

        # We may need to parse the response before storing it.
        with open(self.save_file, "a") as sink:
            sink.write(json.dumps(response.content) + "\n")

        return response

        
# Smoke test: scrape one local screenshot through ChatGroq with a vision model
# and show the raw reply text (the same text is also appended to the save file).
scraper = Scraper(
    mm_llm=ChatGroq(model="llama-3.2-90b-vision-preview"),
    prompt = scraper_prompt,
)
result = scraper.run("../my_screenshots/lightning_ai_pricing.png")
print(result.content)

### A simplified integration into a graph

#### Synchronous setup

In [None]:
import random
from langchain_core.runnables import RunnableLambda, Runnable
from langgraph.graph import START, END, StateGraph
from langchain_groq import ChatGroq
from typing import TypedDict, Optional, Union

# State dictionary handed from node to node in the graph.
class AgentState(TypedDict):
    should_scrape: bool  # Written by the decision agent
    image_path: Optional[str]  # Image path can be None initially
    prediction: Optional[dict]  # Holds the agent's action decision, e.g. {"action": "Scraper"}
    observation: Optional[Union[str, dict]]  # The result from the Scraper or any output
    # NOTE(review): Scraper.run assigns the object returned by chain.invoke to
    # `observation` (it reads `.content` from that same object), so the
    # str/dict annotation above looks too narrow - confirm.


# Step 1: Define a simple decision agent
class RandomDecisionAgent(Runnable):
    """Stand-in agent that flips a coin to decide whether to scrape."""

    def run(self, state: AgentState):
        """Record a random decision in ``state`` and return the same dict.

        Sets ``should_scrape`` to a random boolean and ``prediction`` to
        ``{"action": "Scraper"}`` or ``{"action": "END"}`` accordingly.
        """
        scrape = random.choice([True, False])
        state['should_scrape'] = scrape
        state['prediction'] = {"action": "Scraper" if scrape else "END"}
        return state

    def invoke(self, state: AgentState, *args, **kwargs):
        """Runnable entry point; extra arguments are accepted and ignored."""
        return self.run(state)


# Instantiate the decision agent
decision_agent = RandomDecisionAgent()

# Step 2: Define the Scraper class that uses a multimodal model chain
class Scraper(Runnable):
    """Graph node that sends the screenshot in the state through the chain.

    The chain result is stored under ``state['observation']`` and its
    ``content`` is appended, JSON-encoded, as one line to ``save_file``.
    """

    def __init__(self, mm_llm, prompt, save_file: str = "scraped_data.jsonl"):
        """Build ``prompt | mm_llm`` and make sure ``save_file`` exists."""
        super().__init__()
        self.chain = prompt | mm_llm
        self.save_file = save_file

        # Create an empty file on first use; an existing file is left untouched.
        if not os.path.isfile(save_file):
            open(save_file, "w").close()

    def run(self, state: AgentState):
        """Scrape ``state['image_path']`` and return the updated state.

        Raises:
            ValueError: If the state has no (or an empty) ``image_path``.
        """
        screenshot = state.get('image_path')
        if not screenshot:
            raise ValueError("Image path is missing from the state.")

        # The prompt expects the screenshot as base64 text under "img".
        encoded = encode_image(screenshot)
        response = self.chain.invoke({"img": encoded})
        state['observation'] = response

        # Append the reply to the JSONL log.
        with open(self.save_file, "a") as sink:
            sink.write(json.dumps(response.content) + "\n")

        return state

    def invoke(self, state: AgentState, *args, **kwargs):
        """Runnable entry point; extra arguments are accepted and ignored."""
        return self.run(state)


# Step 3: Instantiate the LLM and Scraper tool
llm = ChatGroq(model="llama-3.2-90b-vision-preview", max_tokens=4096)
scraper_tool = Scraper(mm_llm=llm, prompt=scraper_prompt)  # `scraper_prompt` is the multimodal prompt defined earlier in this notebook

# Step 4: Build the LangGraph
graph_builder = StateGraph(AgentState)

# Add the decision agent node
graph_builder.add_node("agent", decision_agent)
graph_builder.add_edge(START, "agent")

# Add the Scraper node
# The node name must match the "Scraper" action string the agent writes into
# state["prediction"], because the router returns that string as the next node.
graph_builder.add_node("Scraper", scraper_tool)
graph_builder.add_edge("Scraper", END)

# Router for the conditional edge leaving "agent"; it reads the agent's `prediction`.
def select_tool(state: AgentState):
    """Return the next node name, or ``END`` when the agent chose "END".

    Any action other than the literal string "END" is returned unchanged and
    is therefore used as the name of the node to run next.
    """
    chosen = state["prediction"]["action"]
    return END if chosen == "END" else chosen

graph_builder.add_conditional_edges("agent", select_tool)

# Compile the graph
graph = graph_builder.compile()

# Step 5: Run the graph with an initial state
# Only `image_path` is supplied here; the remaining AgentState keys are written by the nodes.
initial_state = AgentState(image_path="../my_screenshots/lightning_ai_pricing.png")

# Execute the graph synchronously
result = graph.invoke(initial_state)

# Print the full value returned by the graph run (not only the observation)
print(result)


In [None]:
from IPython.display import Image, display

try:
    display(Image(graph.get_graph().draw_mermaid_png()))
except Exception as exc:
    # Rendering is optional and requires some extra dependencies. Stay
    # best-effort, but say why the diagram is missing instead of hiding it.
    print(f"Skipping graph rendering: {exc!r}")

In [None]:
def stream_graph_updates(initial_state: AgentState):
    """Stream the module-level ``graph`` and print one line per node output.

    Outputs that contain an ``observation`` key print only that value; all
    other outputs are printed whole.
    """
    for update in graph.stream(initial_state):
        for node, payload in update.items():
            if "observation" not in payload:
                print(f"{node} event: {payload}")
                continue
            print(f"{node} output: {payload['observation']}")

stream_graph_updates(initial_state=initial_state)

#### Asynchronous setup

In [None]:
import random
from langchain_core.runnables import RunnableLambda, Runnable
from langgraph.graph import START, END, StateGraph
from langchain_groq import ChatGroq
import asyncio
import aiofiles

from typing import TypedDict, Optional, Union

# State dictionary handed from node to node in the graph.
class AgentState(TypedDict):
    should_scrape: bool  # Written by the decision agent
    image_path: Optional[str]  # Path of the screenshot to scrape
    prediction: Optional[dict]  # Holds the agent's action decision, e.g. {"action": "Scraper"}
    observation: Optional[Union[str, dict]]  # The result from the Scraper or any tool more generally
    # NOTE(review): Scraper.run assigns the object returned by chain.ainvoke to
    # `observation` and later code reads `.content` from it, so the str/dict
    # annotation above looks too narrow - confirm.


# Step 1: Define a simple decision agent
class RandomDecisionAgent(Runnable):
    """Randomly decides if we should scrape, to simulate an Agent.

    The decision itself needs no awaiting, so it lives in a plain helper that
    both the async and the sync entry points share.
    """

    @staticmethod
    def _decide(state: AgentState):
        """Write a random decision into ``state`` and return the same dict."""
        state['should_scrape'] = random.choice([True, False])
        if state['should_scrape']:
            state['prediction'] = {"action": "Scraper"}
        else:
            state['prediction'] = {"action": "END"}
        return state

    async def run(self, state: AgentState):
        """Async form of the decision; returns the updated state."""
        return self._decide(state)

    async def ainvoke(self, state: AgentState, *args, **kwargs):
        """Async Runnable entry point; extra arguments are accepted and ignored."""
        return await self.run(state)

    def invoke(self, state: AgentState, *args, **kwargs):
        """Sync Runnable entry point; returns the updated state.

        Previously this returned ``asyncio.create_task(...)``, i.e. a Task
        object instead of the state, and it failed outright when no event loop
        was running. The decision is synchronous, so compute it directly.
        """
        return self._decide(state)

decision_agent = RandomDecisionAgent()

# Step 2: Define the Scraper class that uses a multimodal model chain
# This can't be a chain built out of LCEL since we need to encode the image first. 
# ideally in our actual use case, one of the parameters of AgentState will be the encoded screenshot
class Scraper(Runnable):
    """Graph node that sends the screenshot in the state through the chain.

    ``run``/``ainvoke`` use the chain's async API and ``aiofiles``. ``invoke``
    performs the same steps with blocking calls so the node also returns a
    real state when the graph is executed synchronously.
    """

    def __init__(self, mm_llm, prompt, save_file: str = "scraped_data.jsonl"):
        """Build ``prompt | mm_llm`` and make sure ``save_file`` exists."""
        super().__init__()
        self.chain = prompt | mm_llm
        self.save_file = save_file

        # Create an empty file on first use; an existing file is left untouched.
        if not os.path.isfile(self.save_file):
            with open(self.save_file, "w"):
                pass

    @staticmethod
    def _load_image(state: AgentState):
        """Return the base64 text of the screenshot referenced by ``state``.

        Raises:
            ValueError: If the state has no (or an empty) ``image_path``.
        """
        img_path = state.get('image_path')
        if not img_path:
            raise ValueError("Image path is missing from the state.")

        print("Encoding image...")
        return encode_image(img_path)

    @staticmethod
    def _to_json_line(result):
        """Serialise ``result.content`` as one JSONL line."""
        return json.dumps(result.content) + "\n"

    async def run(self, state: AgentState):
        """Scrape asynchronously and return the updated state."""
        img = self._load_image(state)

        print("Passing image to chain...")
        result = await self.chain.ainvoke({"img": img})
        state['observation'] = result

        async with aiofiles.open(self.save_file, "a") as f:
            await f.write(self._to_json_line(result))

        return state

    async def ainvoke(self, state: AgentState, *args, **kwargs):
        """Async Runnable entry point; extra arguments are accepted and ignored."""
        return await self.run(state)

    def invoke(self, state: AgentState, *args, **kwargs):
        """Sync Runnable entry point; returns the updated state.

        Previously this returned ``asyncio.create_task(...)``, i.e. a Task
        object instead of the state, and it failed outright when no event loop
        was running. It now runs the blocking equivalents of ``run``.
        """
        img = self._load_image(state)

        print("Passing image to chain...")
        result = self.chain.invoke({"img": img})
        state['observation'] = result

        with open(self.save_file, "a") as f:
            f.write(self._to_json_line(result))

        return state


# Step 3: Instantiate the LLM and Scraper tool
llm = ChatGroq(model="llama-3.2-90b-vision-preview", max_tokens=4096)
scraper_tool = Scraper(mm_llm=llm, prompt=scraper_prompt)  # `scraper_prompt` is the multimodal prompt defined earlier in this notebook

# Step 4: Build the LangGraph
graph_builder = StateGraph(AgentState)

# Add the decision agent node
graph_builder.add_node("agent", decision_agent)
graph_builder.add_edge(START, "agent")

# Add the Scraper node
# The node name must match the "Scraper" action string the agent writes into
# state["prediction"], because the router returns that string as the next node.
graph_builder.add_node("Scraper", scraper_tool)
graph_builder.add_edge("Scraper", END)

# Router for the conditional edge leaving "agent"; it reads the agent's `prediction`.
def select_tool(state: AgentState):
    """Return the next node name, or ``END`` when the agent chose "END".

    Any action other than the literal string "END" is returned unchanged and
    is therefore used as the name of the node to run next.
    """
    chosen = state["prediction"]["action"]
    return END if chosen == "END" else chosen

# After "agent" runs, select_tool picks the next step: END or the node named by the action.
graph_builder.add_conditional_edges("agent", select_tool)

# Compile the graph
graph = graph_builder.compile()

In [None]:
from IPython.display import Image, display

try:
    display(Image(graph.get_graph().draw_mermaid_png()))
except Exception as exc:
    # Rendering is optional and requires some extra dependencies. Stay
    # best-effort, but say why the diagram is missing instead of hiding it.
    print(f"Skipping graph rendering: {exc!r}")

In [None]:
# Step 5: Run the graph with an initial state using astream()
initial_state = AgentState(image_path="../my_screenshots/lightning_ai_pricing.png")
async def run_graph_and_collect_result(initial_state: AgentState):
    """Stream the module-level ``graph`` asynchronously and return the last event.

    Each streamed event is printed, followed by the Scraper observation's
    ``content`` when the event comes from the "Scraper" node (an empty string
    otherwise). Returns ``None`` if the stream yields nothing.
    """
    last_event = None

    async for update in graph.astream(initial_state):
        # Log every step so intermediate results are visible while streaming.
        print("Intermediate event:", update)
        if 'Scraper' in update:
            observation_text = update['Scraper']['observation'].content
        else:
            observation_text = ""
        print("Intermediate observation:", observation_text)

        # Whatever arrives last is what the caller gets back.
        last_event = update

    return last_event

# Run the async function to execute the graph
# NOTE: a bare top-level `await` works in a notebook/IPython cell, not in a plain script.
result = await run_graph_and_collect_result(initial_state)

# Print the final observation
# `result` is the last streamed event and is indexed by node name, so the
# "Scraper" key is absent when the last event did not come from that node.
if result:
    print(result['Scraper']['observation'].content if "Scraper" in result else "No scraping")


In [None]:
result

### Extracting JSON string from result

The following code is from https://python.langchain.com/docs/how_to/structured_output/

In [None]:
from langchain_groq import ChatGroq

llm = ChatGroq(model="llama3-8b-8192")



In [None]:
import json
import re
from typing import List

from langchain_core.messages import AIMessage
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import PydanticOutputParser
from pydantic import BaseModel, Field


# One extracted price entry; all three fields are required strings.
# NOTE(review): this model is reachable from `Prices`, which is handed to
# PydanticOutputParser. The class docstring and the Field descriptions are
# presumably emitted in the format instructions sent to the LLM, so treat them
# as prompt text rather than ordinary documentation - confirm before rewording.
class PriceInformation(BaseModel):
    """Information about a pricing"""
    price: str = Field(..., description="The price value as a string (e.g. $19.99)")
    currency: str = Field(..., description="The currency symbol or code (e.g. $ or USD)")
    description: str = Field(..., description="A brief description of the item or service")

# Top-level schema the model output is expected to follow: a single required
# `prices` list of PriceInformation entries.
# NOTE(review): the docstring below is presumably part of the schema text shown
# to the LLM - confirm before rewording it.
class Prices(BaseModel):
    """Identifying all pricing information present in the screenshot or text."""
    prices: List[PriceInformation] = Field(..., description="A list of pricing information found on the page")


# Parser bound to the `Prices` schema; its format instructions are injected into the scraper prompt.
parser = PydanticOutputParser(pydantic_object=Prices)




In [None]:
print(parser.get_format_instructions())

In [None]:
# Single-message multimodal prompt: a text part with the extraction
# instructions, followed by the screenshot as a base64 PNG data URL.
# `format_instructions` is pre-filled via .partial(), so `img` is the only
# variable left for the caller to supply at invoke time.
# NOTE: the indentation inside the triple-quoted string is part of the text
# that is sent to the model.
scraper_prompt = ChatPromptTemplate.from_messages(
    [
        (
            "user", 
            [
                {
                    "type": "text",
                    "text": """You are a specialized AI who is a part of a multi-agent system tasked with the scraping pricing information from screenshots of websites.
                    Below you are given a screenshot of a website. Analyze the image and extract all pricing details.
                    Return them as a JSON-formatted string that conforms to the following schema:
                    {format_instructions}
                    Return only a JSON-formatted string and nothing else.
                    If no pricing information is found, return `None`."""
                },
                {
                    "type": "image_url",
                    "image_url": {"url": "data:image/png;base64,{img}"}
                }
            ]
        )
    ]
).partial(format_instructions=parser.get_format_instructions())

In [None]:
print(scraper_prompt.messages[0].prompt[0].template)

In [None]:
import base64
import os
import json

def encode_image(image_path):
    """Return the file at ``image_path`` as a base64-encoded text string.

    The file is read in binary mode in one go, base64-encoded, and the
    resulting bytes are decoded as UTF-8.
    """
    with open(image_path, "rb") as handle:
        raw_bytes = handle.read()
    encoded = base64.b64encode(raw_bytes)
    return encoded.decode('utf-8')
  
class Scraper:
    """Run a screenshot through the ``prompt | mm_llm`` chain and log the reply.

    Every reply's ``content`` is JSON-encoded and appended as one line to
    ``save_file``.
    """

    def __init__(self, mm_llm, prompt, save_file: str = "scraped_data.jsonl"):
        """Keep the collaborators, build the chain and ensure ``save_file`` exists.

        Args:
            mm_llm: Model piped after ``prompt`` to form the chain.
            prompt: Prompt piped into ``mm_llm``; invoked with an ``img`` value.
            save_file: Path of the file that replies are appended to.
        """
        self.mm_llm, self.prompt = mm_llm, prompt
        self.chain = prompt | mm_llm
        self.save_file = save_file

        # Create an empty file on first use; an existing file is left untouched.
        if not os.path.isfile(save_file):
            open(save_file, "w").close()

    def run(self, img_path: str):
        """Scrape the image at ``img_path`` and return the chain's raw result.

        Side effect: appends ``json.dumps(result.content)`` plus a newline to
        ``save_file``.
        """
        # TODO(dominic): Currently just playing with images but eventually this needs to take in an AgentState
        encoded = encode_image(image_path=img_path)
        response = self.chain.invoke({"img": encoded})

        # We may need to parse the response before storing it.
        with open(self.save_file, "a") as sink:
            sink.write(json.dumps(response.content) + "\n")

        return response

        
# Smoke test: scrape one local screenshot through ChatGroq with a vision model
# and show the raw reply text (the same text is also appended to the save file).
scraper = Scraper(
    mm_llm=ChatGroq(model="llama-3.2-90b-vision-preview"),
    prompt = scraper_prompt
)
result = scraper.run("../my_screenshots/lightning_ai_pricing.png")
print(result.content)

In [None]:
from langchain_groq import ChatGroq

llm = ChatGroq(model="llama3-8b-8192")

In [None]:
from typing import List

from langchain_core.output_parsers import PydanticOutputParser
from langchain_core.prompts import ChatPromptTemplate
from pydantic import BaseModel, Field


# Schema for one person mentioned in the text; both fields are required.
# NOTE(review): the docstring and Field descriptions are presumably included
# in the parser's format instructions shown to the LLM - confirm before rewording.
class Person(BaseModel):
    """Information about a person."""

    name: str = Field(..., description="The name of the person")
    height_in_meters: float = Field(
        ..., description="The height of the person expressed in meters."
    )


# Top-level schema handed to the output parser: a list of Person entries.
# NOTE(review): the docstring is presumably part of the schema text shown to
# the LLM - confirm before rewording.
class People(BaseModel):
    """Identifying information about all people in a text."""

    people: List[Person]


# Set up a parser
# NOTE: this rebinds the module-level name `parser`, which earlier cells bound
# to the `Prices` parser.
parser = PydanticOutputParser(pydantic_object=People)

# Prompt
# `format_instructions` is pre-filled via .partial(), leaving `query` as the
# only variable the caller has to supply.
prompt = ChatPromptTemplate.from_messages(
    [
        (
            "system",
            "Answer the user query. Wrap the output in `json` tags\n{format_instructions}",
        ),
        ("human", "{query}"),
    ]
).partial(format_instructions=parser.get_format_instructions())

In [None]:
query = "Anna is 23 years old and she is 6 feet tall"

# Preview the fully rendered prompt. The input is passed as an explicit mapping,
# in the same form the chain is invoked with, rather than relying on a bare
# string being matched to the template's single remaining variable.
print(prompt.invoke({"query": query}).to_string())

In [None]:
# Pipe the prompt into the model and the model's reply into the People parser.
chain = prompt | llm | parser

# As the last expression of the cell, the chain's return value is displayed by the notebook.
chain.invoke({"query": query})