In [1]:
import time
import subprocess
from typing import Any, Annotated, TypedDict, Literal

from langchain_openai import ChatOpenAI
from langgraph.graph import MessagesState, StateGraph, START
from langgraph.graph.message import AnyMessage, add_messages
from langgraph.prebuilt import tools_condition, ToolNode
from langchain_core.messages import AIMessage, HumanMessage, SystemMessage
from langchain_core.tools import tool, StructuredTool

import pyrealsense2 as rs
import numpy as np
import cv2
import base64
from openai import OpenAI
from PIL import Image
import io
from IPython.display import display, Image as IPImage, HTML


In [3]:
import os, getpass

def _set_env(var: str):
    """Make sure the environment variable `var` holds a non-empty value.

    An existing non-empty value is left untouched. Otherwise the value is
    requested through a hidden ``getpass`` prompt (labelled ``"<var>: "``)
    and stored in ``os.environ``.
    """
    if os.environ.get(var):
        return
    os.environ[var] = getpass.getpass(f"{var}: ")

# Prompt (once) for each API key that is not already in the environment.
_set_env("OPENAI_API_KEY")
_set_env("LANGCHAIN_API_KEY")

# Fixed LangChain tracing settings used by this notebook.
os.environ.update(
    {
        "LANGCHAIN_TRACING_V2": "true",
        "LANGCHAIN_PROJECT": "langchain-academy",
    }
)

In [27]:
def initialize_camera(width: int = 1280, height: int = 720, fps: int = 30):
    """Start a RealSense pipeline that streams BGR color frames.

    Args:
        width: Requested color frame width in pixels (default 1280).
        height: Requested color frame height in pixels (default 720).
        fps: Requested frame rate in frames per second (default 30).

    Returns:
        The started ``rs.pipeline``. The caller is responsible for calling
        ``stop()`` on it when finished.
    """
    # Initialize the pipeline
    pipe = rs.pipeline()
    config = rs.config()

    # Enable only the color stream, in BGR byte order.
    # NOTE(review): the requested mode presumably has to be one the connected
    # camera supports - confirm against the librealsense documentation.
    config.enable_stream(rs.stream.color, width, height, rs.format.bgr8, fps)

    # Start streaming
    pipe.start(config)
    return pipe

def capture_frame(pipe, warmup_frames: int = 20):
    """Grab one color frame from a started pipeline and return it as RGB.

    Args:
        pipe: A started pipeline, as returned by ``initialize_camera``.
        warmup_frames: Number of frames to read and discard before the frame
            that is returned, to let the camera stabilize. Defaults to 20;
            a value of 0 or less skips the warm-up.

    Returns:
        The captured image as a numpy array converted from BGR to RGB, or
        None when the frameset contains no color frame.
    """
    # Skip the initial frames to stabilize the camera
    for _ in range(warmup_frames):
        pipe.wait_for_frames()

    color_frame = pipe.wait_for_frames().get_color_frame()
    if not color_frame:
        return None

    # The stream delivers BGR data; downstream code (PIL) expects RGB
    bgr_image = np.asanyarray(color_frame.get_data())
    return cv2.cvtColor(bgr_image, cv2.COLOR_BGR2RGB)

def prepare_image_for_api(image):
    """Encode an image array as JPEG for use in an API request.

    Args:
        image: Image data accepted by ``PIL.Image.fromarray``.

    Returns:
        A ``(base64_text, jpeg_bytes)`` tuple: the JPEG encoded as a UTF-8
        base64 string, and the same JPEG as raw bytes.
    """
    # Encode in memory; nothing is written to disk
    with io.BytesIO() as buffer:
        Image.fromarray(image).save(buffer, format='JPEG')
        jpeg_bytes = buffer.getvalue()

    encoded = base64.b64encode(jpeg_bytes).decode('utf-8')
    return encoded, jpeg_bytes


def analyze_image_with_openai(base64_image, prompt, model="gpt-4o", max_tokens=300):
    """Ask an OpenAI chat model a question about a base64-encoded JPEG.

    Args:
        base64_image: Base64 text of a JPEG image (without the ``data:`` URL
            prefix, which is added here).
        prompt: The question or instruction sent alongside the image.
        model: Name of the chat model to call. Defaults to "gpt-4o".
        max_tokens: Upper bound on the length of the reply. Defaults to 300.

    Returns:
        The text content of the first choice in the response. On any
        failure (including failure to construct the client, e.g. a missing
        API key) the string ``"Error analyzing image: <details>"`` is
        returned instead of raising.
    """
    user_message = {
        "role": "user",
        "content": [
            {"type": "text", "text": prompt},
            {
                "type": "image_url",
                "image_url": {
                    "url": f"data:image/jpeg;base64,{base64_image}"
                }
            }
        ]
    }

    try:
        # Built inside the try so that client-construction errors are reported
        # the same way as request errors.
        client = OpenAI()
        response = client.chat.completions.create(
            model=model,
            messages=[user_message],
            max_tokens=max_tokens,
        )
        return response.choices[0].message.content
    except Exception as e:
        return f"Error analyzing image: {str(e)}"



In [43]:
def capture_and_analyze_frame(prompt: str):
    """
    Take a single picture with the camera and ask OpenAI a question about it.

    Args:
        prompt: The question or instruction to answer about the captured image.

    Returns:
        result: The text returned by the image analysis step, or None if no frame could be captured.
    """
    pipe = initialize_camera()

    try:
        frame = capture_frame(pipe)
        if frame is None:
            print("No frame captured.")
            return None

        # Only the base64 text is sent to the API; the raw JPEG bytes are not needed here
        base64_image, _ = prepare_image_for_api(frame)
        return analyze_image_with_openai(base64_image, prompt)

    finally:
        # Always release the camera, whether or not the capture or analysis succeeded
        pipe.stop()


def list_usb_devices() -> list[str]:
    """
    Retrieves a list of all USB devices connected to the system using the lsusb command.

    Returns:
        list: One string per USB device, as printed by lsusb. The list is
        empty when lsusb prints nothing.

    Raises:
        subprocess.CalledProcessError: If lsusb exits with a non-zero status.
        FileNotFoundError: If the lsusb executable is not available.
    """
    result = subprocess.run(['lsusb'], capture_output=True, text=True, check=True)

    # splitlines() plus the blank filter means empty output gives [] rather than ['']
    stripped_lines = (line.strip() for line in result.stdout.splitlines())
    return [line for line in stripped_lines if line]


In [44]:
# Graph state schema. It has the same shape as the built-in "MessagesState" TypedDict,
# but is declared explicitly so that custom keys can be added alongside "messages".
class GraphsState(TypedDict):
    """State passed to and returned from every node of the graph."""
    # Conversation history. The add_messages annotation names the function LangGraph uses
    # to combine a node's returned messages with the existing list
    # (presumably appending rather than replacing - see the LangGraph docs).
    messages: Annotated[list[AnyMessage], add_messages] 
    # Custom keys for additional data can be added here such as - conversation_id: str

In [45]:
# Plain Python callables exposed to the model as tools.
tools = [
    capture_and_analyze_frame,
    list_usb_devices,
]

# Chat model, plus a variant that knows about the tools above
llm = ChatOpenAI(model="gpt-4o")
llm_with_tools = llm.bind_tools(tools)


You are a helpful assistant with two primary capabilities:

1. Vision Analysis
   You can analyze images captured by a camera and provide insightful, detailed observations in response to user queries.
   When a vision-related question is asked, carefully examine the provided image and describe relevant features, objects, contexts, or any other notable elements.

2. Device Management
   You can identify and manage devices connected to the system, offering precise information and instructions based on their specifications or status.
   When a device-related question is asked, reference the current list of connected devices (or relevant data) to provide accurate, in-depth answers, including guidance on setup, configuration, troubleshooting, or usage.

Instructions

- For vision-related queries, provide clear, comprehensive image analyses.
- For device-related queries, deliver precise, actionable details that address the user’s request.
- Always strive to give concise, user-friendly, and contextually appropriate responses.

In [56]:
# System Message
# Fixed instructions describing the assistant's two roles (vision analysis and
# device management). The `assistant` node places this message in front of the
# conversation on every LLM call.
# NOTE: the triple-quoted literal is passed as written, so its leading space and
# the indentation of its continuation lines are part of the prompt text.
sys_msg = SystemMessage(
    content=(""" You are a helpful assistant with two main capabilities:
    1. Vision Analysis: You can analyze images captured by the camera and provide detailed descriptions based on the user's query.
    2. Device Management: You can identify and manage devices connected to the system.
    
    For vision-related queries, you'll analyze images and provide detailed observations.
    For device-related queries, you'll provide precise and detailed responses based on the input lists"""
    )
)

# Node
def assistant(state):
    """Run the tool-enabled LLM on the conversation and return its reply.

    Args:
        state: Graph state; its "messages" list holds the conversation so far.

    Returns:
        A state update of the form ``{"messages": [reply]}``, where ``reply``
        is what ``llm_with_tools.invoke`` returned.
    """
    # The system message always comes first, followed by the history
    conversation = [sys_msg] + state["messages"]
    reply = llm_with_tools.invoke(conversation)
    return {"messages": [reply]}

# Graph setup 
# The graph's state schema is GraphsState, so node outputs are state updates
# keyed by "messages".
graph = StateGraph(GraphsState)
# "assistant" calls the tool-enabled LLM on the conversation so far.
graph.add_node("assistant", assistant)
# "tools" wraps the same tool list that was bound to the LLM.
graph.add_node("tools", ToolNode(tools))

# Define edges
# Every run enters at the assistant node.
graph.add_edge(START, "assistant")
# tools_condition (from langgraph.prebuilt) decides where to go after the assistant.
# NOTE(review): presumably it routes to "tools" when the last AI message requests a
# tool call and ends the run otherwise - confirm against the LangGraph docs.
graph.add_conditional_edges(
    "assistant",
    tools_condition,
)
# After the tools node runs, control returns to the assistant.
graph.add_edge("tools", "assistant")

# Compile graph
# graph_runnable is the object the cells below call .invoke() on.
graph_runnable = graph.compile()

In [59]:
# Run the graph on a single vision question; `messages` ends up holding the final graph state
messages = graph_runnable.invoke(
    {"messages": [HumanMessage(content="What are the objects you see in the image")]}
)

# Show every message of the exchange in order
for m in messages['messages']:
    m.pretty_print()


What are the objects you see in the image
Tool Calls:
  capture_and_analyze_frame (call_koLTWaQHg3LOH1nefiIHWjAA)
 Call ID: call_koLTWaQHg3LOH1nefiIHWjAA
  Args:
    prompt: Identify the objects present in the scene from the captured image.
Name: capture_and_analyze_frame

The image shows a few objects and elements:

1. A person wearing a jacket and a gray shirt.
2. A television or monitor displaying an image of a person, located to the left of the scene.
3. A small green and black object on the table, possibly a computer mouse or a remote.
4. A table or desk surface.
5. A window with blinds in the background. 

The setting appears to be indoors, in a room with a desk or table.

The image contains the following objects and elements:

1. A person wearing a jacket and a gray shirt.
2. A television or monitor displaying an image of a person, located to the left of the scene.
3. A small green and black object on the table, possibly a computer mouse or remote.
4. A table or desk surface.
5

In [61]:
# Run the graph on a single device question; `messages` ends up holding the final graph state
messages = graph_runnable.invoke(
    {"messages": [HumanMessage(content="is there a camera connected")]}
)

# Show every message of the exchange in order
for m in messages['messages']:
    m.pretty_print()


is there a camera connected
Tool Calls:
  list_usb_devices (call_r5bDSXhnJlNhBldhgqvW25AM)
 Call ID: call_r5bDSXhnJlNhBldhgqvW25AM
  Args:
Name: list_usb_devices

["Bus 002 Device 010: ID 8086:0b07 Intel Corp. RealSense D435", "Bus 002 Device 002: ID 0bda:0420 Realtek Semiconductor Corp. 4-Port USB 3.0 Hub", "Bus 002 Device 001: ID 1d6b:0003 Linux Foundation 3.0 root hub", "Bus 001 Device 005: ID 062a:38df MosArt Semiconductor Corp. TIETI UltraSlim KB", "Bus 001 Device 004: ID 1bcf:08a0 Sunplus Innovation Technology Inc. Gaming mouse [Philips SPK9304]", "Bus 001 Device 003: ID 0bda:5420 Realtek Semiconductor Corp. 4-Port USB 2.0 Hub", "Bus 001 Device 002: ID 13d3:3549 IMC Networks Bluetooth Radio", "Bus 001 Device 001: ID 1d6b:0002 Linux Foundation 2.0 root hub"]

Yes, there is a camera connected. The connected camera is an Intel RealSense D435, identified by the device ID `8086:0b07`.
