In [1]:
import os
import cv2

from langchain.agents import tool
from langchain.chat_models import init_chat_model
from langgraph.graph import StateGraph, START, END
from langgraph.graph.message import add_messages
from langchain_core.messages import (
    AIMessage,
    BaseMessage,
    HumanMessage,
    ToolMessage,
)

from datetime import datetime

from typing import Annotated, Sequence
from typing_extensions import TypedDict

from dotenv import dotenv_values

from motor_control import StepperMotorController


# Initialization
## API KEY
env_values = dotenv_values(".env")
os.environ["OPENAI_API_KEY"] = env_values["OPENAI_API_KEY"]

## Base chatbot
llm = init_chat_model("openai:gpt-4.1")

## Stepper motor controller
# controller = StepperMotorController("COM3")
# controller.connect()

In [2]:
def enumerate_cameras_detailed():
    """Enumerate cameras with more details"""
    index = 0
    cameras = []

    while True:
        cap = cv2.VideoCapture(index, cv2.CAP_DSHOW)  # Use DirectShow on Windows

        if not cap.isOpened():
            break

        # Get camera properties
        width = cap.get(cv2.CAP_PROP_FRAME_WIDTH)
        height = cap.get(cv2.CAP_PROP_FRAME_HEIGHT)
        fps = cap.get(cv2.CAP_PROP_FPS)

        cameras.append(
            {
                "index": index,
                "width": int(width),
                "height": int(height),
                "fps": int(fps),
            }
        )

        cap.release()
        index += 1

    return cameras


# Get detailed camera info
cameras = enumerate_cameras_detailed()
for cam in cameras:
    print(f"Camera {cam['index']}: {cam['width']}x{cam['height']}")

Camera 0: 640x480
Camera 1: 1280x720
Camera 2: 640x480


In [3]:
## Message State
class State(TypedDict):
    # Messages have the type "list". The `add_messages` function
    # in the annotation defines how this state key should be updated
    # (in this case, it appends messages to the list, rather than overwriting them)
    messages: Annotated[Sequence[BaseMessage], add_messages]
    # The 'next' field indicates where to route to next
    next: str
    sender: str

In [5]:
def Capture_Image(camera_index: int = 0, save_path: str | None = None) -> dict:
    """
    Capture an image from the USB microscope (webcam).

    Args:
        camera_index: Camera device index (usually 0 for default camera)
        save_path: Optional path to save the captured image

    Returns:
        dict: Contains status, image data, and file path
    """
    try:
        # Initialize camera
        cap = cv2.VideoCapture(camera_index)

        if not cap.isOpened():
            return {"status": "error", "message": "Could not open camera"}

        # Set camera properties for better quality
        cap.set(cv2.CAP_PROP_FRAME_WIDTH, 640)
        cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 480)
        cap.set(cv2.CAP_PROP_AUTOFOCUS, 0)  # Disable autofocus for manual control

        # Capture frame
        ret, frame = cap.read()
        cap.release()

        if not ret:
            return {"status": "error", "message": "Failed to capture image"}

        # Save image if path provided
        if save_path:
            cv2.imwrite(save_path, frame)
        else:
            # Generate timestamped filename
            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            save_path = f"microscope_image_{timestamp}.jpg"
            cv2.imwrite(save_path, frame)

        return {
            "status": "success",
            "message": "Image captured successfully",
            "image_path": save_path,
            "image_data": frame,
            "shape": frame.shape,
        }

    except Exception as e:
        return {"status": "error", "message": f"Capture failed: {str(e)}"}


Capture_Image(2)

{'status': 'success',
 'message': 'Image captured successfully',
 'image_path': 'microscope_image_20250619_142607.jpg',
 'image_data': array([[[121, 125, 119],
         [120, 124, 118],
         [115, 129, 112],
         ...,
         [ 76, 112,  83],
         [ 86, 114,  90],
         [ 83, 111,  87]],
 
        [[126, 129, 120],
         [122, 125, 116],
         [114, 128, 111],
         ...,
         [ 80, 111,  78],
         [ 83, 111,  87],
         [ 78, 106,  82]],
 
        [[126, 131, 122],
         [124, 129, 120],
         [118, 126, 113],
         ...,
         [ 89, 109,  89],
         [ 84, 107,  91],
         [ 82, 105,  89]],
 
        ...,
 
        [[ 90,  98,  93],
         [ 98, 106, 101],
         [100, 105, 101],
         ...,
         [ 89, 125,  91],
         [ 92, 123,  90],
         [ 93, 124,  91]],
 
        [[100, 104, 103],
         [100, 104, 103],
         [100, 105, 101],
         ...,
         [ 86, 124,  90],
         [ 88, 125,  93],
         [ 88, 

In [None]:
@tool
def Change_Position(steps: int) -> None:
    if steps < 0:
        controller.move_motor("right", abs(steps))
    controller.move_motor("left", steps)


@tool
def Capture_Image(camera_index: int = 0, save_path: str | None = None) -> dict:
    """
    Capture an image from the USB microscope (webcam).

    Args:
        camera_index: Camera device index (usually 0 for default camera)
        save_path: Optional path to save the captured image

    Returns:
        dict: Contains status, image data, and file path
    """
    try:
        # Initialize camera
        cap = cv2.VideoCapture(camera_index)

        if not cap.isOpened():
            return {"status": "error", "message": "Could not open camera"}

        # Set camera properties for better quality
        cap.set(cv2.CAP_PROP_FRAME_WIDTH, 640)
        cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 480)
        cap.set(cv2.CAP_PROP_AUTOFOCUS, 0)  # Disable autofocus for manual control

        # Capture frame
        ret, frame = cap.read()
        cap.release()

        if not ret:
            return {"status": "error", "message": "Failed to capture image"}

        # Save image if path provided
        if save_path:
            cv2.imwrite(save_path, frame)
        else:
            # Generate timestamped filename
            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            save_path = f"microscope_image_{timestamp}.jpg"
            cv2.imwrite(save_path, frame)

        return {
            "status": "success",
            "message": "Image captured successfully",
            "image_path": save_path,
            "image_data": frame,
            "shape": frame.shape,
        }

    except Exception as e:
        return {"status": "error", "message": f"Capture failed: {str(e)}"}

In [None]:
graph_builder = StateGraph(State)