In [1]:
from langchain_openai import ChatOpenAI

from langgraph.checkpoint.memory import MemorySaver
from langgraph.graph import START, MessagesState, StateGraph
from langgraph.graph.message import add_messages

from langchain_core.messages import (
    HumanMessage, 
    AIMessage, 
    SystemMessage,
    BaseMessage,
    trim_messages
)

from langchain_core.prompts import (
    ChatPromptTemplate, 
    MessagesPlaceholder,
    HumanMessagePromptTemplate,
    SystemMessagePromptTemplate
)

from typing import Sequence

from typing_extensions import Annotated, TypedDict

import base64
from mimetypes import guess_type

import os

In [2]:
# Never hardcode the key, and never blank out one that is already exported:
# reuse the value from the environment and only prompt (without echoing the
# input) when nothing usable is configured.
if not os.environ.get("OPENAI_API_KEY"):
    import getpass  # only needed on the interactive fallback path

    os.environ["OPENAI_API_KEY"] = getpass.getpass("OpenAI API key: ")

In [3]:
# Single chat model instance used by the graph node further down.
MODEL_NAME = "gpt-4o"
model = ChatOpenAI(model=MODEL_NAME)

## Image to URL

In [4]:
def local_image_to_data_url(image_path):
    """Return the contents of a local image file as a base64 ``data:`` URL.

    The MIME type is guessed from the file name; when it cannot be guessed,
    ``image/png`` is used.
    """
    guessed_type = guess_type(image_path)[0]
    mime_type = guessed_type if guessed_type is not None else 'image/png'

    # Read the raw bytes first, then turn them into base64 text.
    with open(image_path, "rb") as handle:
        raw_bytes = handle.read()
    payload = base64.b64encode(raw_bytes).decode('utf-8')

    return "data:" + mime_type + ";base64," + payload

In [5]:
# History window for the prompt. Because `len` is the token counter, the
# budget below is measured in whole messages rather than model tokens.
HISTORY_WINDOW = 5

trimmer = trim_messages(
    strategy="last",
    token_counter=len,
    max_tokens=HISTORY_WINDOW,
    allow_partial=False,
    include_system=True,
    # start_on="human",
)

## Prompt Template

In [None]:
# Used for {object} whenever the invoking code does not pass an `object` value.
DEFAULT_TARGET_OBJECT = "dog"

# Prompt layout:
#   1. system - role-play setting: what the model is, its objective, how to treat walls.
#   2. human  - the instructions given so far. `call_model` renders the history to
#               ONE string, so it is substituted as text ({messages}) instead of
#               through a MessagesPlaceholder, which only accepts a list of messages.
#   3. human  - the task: summarise, describe the image, answer in the command format.
#               Written as a ("human", ...) template so {object} is substituted;
#               a literal HumanMessage(...) instance would send "{object}" verbatim.
#   4. human  - the camera image, with the URL and detail level filled in per call.
# TODO: add few-shot examples (target visible, empty scene, obstacles, wall,
#       continuation of earlier instructions).
image_prompt_template = ChatPromptTemplate.from_messages(
    [
        (
            "system",
            "You are a robot controller. You will give instructions to a jetbot "
            "to navigate to a {object} based on an image input on what the robot sees. "
            "If there is a wall in front, turn away from the wall.",
        ),
        ("human", "These are the instructions you have given so far.{messages}"),
        (
            "human",
            "Now, quickly summarize the recent instructions you have given so far. "
            "Describe the following image. "
            "If there is a {object} describe which direction the robot should travel "
            "to approach the {object}. "
            "Else, specify a direction the robot should move to find a new {object}. "
            "Strictly write your response in this format: "
            "'<reasoning>.[<command><value>]', where reasoning is the rationale behind "
            "the chosen move. Available commands are t for turn value (int) in degrees "
            "positive for left and negative for right. "
            "And f for forward value (int) seconds",
        ),
        HumanMessagePromptTemplate.from_template(
            [{"image_url": {"url": "{image_path}", "detail": "{detail_parameter}"}}]
        ),
    ]
).partial(object=DEFAULT_TARGET_OBJECT)

## Example

## Test

In [8]:

def message_to_string(trimmings):
    """Render a sequence of messages as one numbered block of text.

    Each message becomes its own line of the form
    ``Instruction Number <n>:<content>`` (numbering starts at 1), followed by a
    closing line that marks the end of the list. Every line, including the
    closing one, is preceded by a newline so the block can be appended directly
    after an introductory sentence.

    :param trimmings: iterable of objects exposing a ``content`` attribute.
    :return: the rendered text; for an empty iterable only the closing line.
    """
    # The f-string also copes with non-string content, where `str + content`
    # would raise TypeError.
    lines = [
        f"\nInstruction Number {number}:{message.content}"
        for number, message in enumerate(trimmings, start=1)
    ]
    # The closing marker goes on its own line; previously it was glued onto the
    # end of the last instruction and misspelled.
    lines.append("\nEnd of Instructions you have given so far.")
    return "".join(lines)

In [9]:
class State(TypedDict):
    """State carried through the graph and stored by the checkpointer."""

    # Conversation history; node output is merged in by the `add_messages` reducer.
    messages: Annotated[Sequence[BaseMessage], add_messages]
    # Passed to the prompt as the image `detail` setting.
    detail_parameter: str
    # Passed to the prompt as the image URL (http(s) URL or data URL).
    image_path: str
    # What the robot should look for. Declared here because the invocation
    # cells pass an "object" key; without the field it is not part of the state
    # this node receives.
    object: str


def call_model(state: State):
    """Build the image prompt from the current state and query the model.

    The message history is trimmed, rendered to a single string and handed to
    `image_prompt_template` together with the image settings.

    :param state: current graph state.
    :return: state update containing the model response under "messages".
    """
    trimmed_messages = trimmer.invoke(state["messages"])
    prompt_inputs = {
        "messages": message_to_string(trimmed_messages),
        "detail_parameter": state["detail_parameter"],
        "image_path": state["image_path"],
    }
    # Forward the target only when the caller supplied one, so invocations that
    # omit it behave exactly as before.
    target_object = state.get("object")
    if target_object is not None:
        prompt_inputs["object"] = target_object

    prompt = image_prompt_template.invoke(prompt_inputs)
    # Debug aid: shows the fully rendered prompt (large when the image is a data URL).
    print(prompt)
    response = model.invoke(prompt)
    return {"messages": [response]}


# Single-node graph: START -> "model".
workflow = StateGraph(state_schema=State)
workflow.add_node("model", call_model)
workflow.add_edge(START, "model")

# The checkpointer keeps the state between invocations, so `messages`
# accumulates across calls.
memory = MemorySaver()
app = workflow.compile(checkpointer=memory)

In [None]:
# The thread id identifies the conversation; later cells reuse this config.
config = {"configurable": {"thread_id": "abc456"}}

# Target and image settings for the first frame.
object = "dog"
detail_parameter = 'high'
path = "images/imageofdog.png"
# url = local_image_to_data_url(path)
url = "https://thumbs.dreamstime.com/b/empty-grass-field-24508096.jpg"

graph_input = {"object": object, "detail_parameter": detail_parameter, "image_path": url}
output = app.invoke(graph_input, config)

# Show only the newest message, i.e. the model's answer for this frame.
output["messages"][-1].pretty_print()

In [None]:
# Second frame: `object` and `detail_parameter` keep the values assigned in the
# previous cell; only the image changes.
# object = "dog"
# detail_parameter = 'high'
path = "images/corner.png"
# url = local_image_to_data_url(path)
url = "https://hips.hearstapps.com/hmg-prod/images/dog-puppy-on-garden-royalty-free-image-1586966191.jpg?crop=0.752xw:1.00xh;0.175xw,0&resize=1200:*"

graph_input = {"object": object, "detail_parameter": detail_parameter, "image_path": url}
output = app.invoke(graph_input, config)

# Show only the newest message, i.e. the model's answer for this frame.
output["messages"][-1].pretty_print()

In [None]:
# Print every message stored for the thread selected by `config`.
state = app.get_state(config).values
stored_messages = state["messages"]

for message in stored_messages:
    message.pretty_print()

## BRG8 to PNG

In [None]:
from PIL import Image
import numpy as np

def brg8_to_png(brg8_data, width, height, output_path):
    """
    Converts BRG8 data to PNG format and saves it.

    :param brg8_data: Flat pixel data, three 8-bit channel values per pixel in
        B, R, G order: a bytes-like object (bytes, bytearray, memoryview), a
        sequence of ints, or a numpy array.
    :param width: Width of the image.
    :param height: Height of the image.
    :param output_path: Path to save the PNG file.
    :raises ValueError: If the data does not hold exactly width * height * 3 values.
    """
    if isinstance(brg8_data, (bytes, bytearray, memoryview)):
        # np.array() would treat a bytes object as a single string scalar;
        # frombuffer reads it as one uint8 per byte.
        pixels = np.frombuffer(brg8_data, dtype=np.uint8)
    else:
        # No copy for arrays that are already uint8; other input is converted.
        pixels = np.asarray(brg8_data, dtype=np.uint8)

    expected_size = height * width * 3
    if pixels.size != expected_size:
        raise ValueError(
            f"expected {expected_size} values for a {width}x{height} image "
            f"with 3 channels, got {pixels.size}"
        )

    # Reshape the data into (height, width, 3) format
    brg_image = pixels.reshape((height, width, 3))

    # Convert BRG to RGB by rearranging the channels
    # NOTE(review): this assumes the stored order really is B, R, G. If the
    # source is BGR8 (the usual OpenCV/ROS layout) the order must be [2, 1, 0]
    # - confirm against the camera driver.
    rgb_image = brg_image[:, :, [1, 2, 0]]  # [Red, Green, Blue]

    # Create a Pillow Image from the RGB data
    img = Image.fromarray(rgb_image, 'RGB')

    # Save as PNG
    img.save(output_path)
    print(f"Image saved to {output_path}")

# Demo: push one frame of random noise through the converter.
width, height = 640, 480  # Image dimensions
value_count = height * width * 3  # three channel values per pixel
brg8_data = np.random.randint(0, 256, size=value_count, dtype=np.uint8)

# Writes the result to output.png.
brg8_to_png(brg8_data, width, height, "output.png")
