# Building LLM Agents with Llama 3 and LangGraph

## What You'll Learn

In this notebook, you will learn how to build and run LLM (Large Language Model) agents using Llama 3 and LangGraph. These agents will be capable of processing natural language inputs, generating comprehensive plans, and handling complex workflows. By the end of this notebook, you will understand how to:

1. Set up and configure a language model using Llama 3.
2. Define and manage the state for agents in a workflow.
3. Implement and customize agent classes for specific tasks.
4. Construct and compile a workflow graph using LangGraph.
5. Execute the workflow and handle outputs effectively.

## Basic Concepts

Before diving into the implementation, it's essential to understand some basic concepts:

- **LLM (Large Language Model):** A machine learning model trained on vast amounts of text data to understand and generate human-like language. Llama 3 is an example of such a model.

- **Agent:** A software component that interacts with the LLM to perform specific tasks, such as generating responses or processing information. In this notebook, we implement agents that can handle various aspects of a workflow.

- **State:** A shared data structure that stores the context and data required by agents. The state is critical for maintaining continuity and passing information between different parts of the workflow.

- **Workflow Graph:** A structured representation of the workflow, where nodes represent agents and edges define the flow of information and control. LangGraph is used to construct and manage these workflow graphs.

- **Prompt Engineering:** The process of crafting prompts that guide the LLM's responses. Proper prompt engineering is crucial for ensuring the model generates relevant and accurate outputs.

Understanding these concepts will provide a solid foundation as we proceed with the practical implementation of LLM agents and workflows in this notebook.

## 1. Setting Up the Environment

Before we dive into building our agent, we need to set up the necessary environment. This involves installing required packages and ensuring our Python environment is ready for development.

In [None]:
# Install the packages used in this notebook (termcolor provides colored terminal output)
%pip install termcolor langgraph pyaudio faster-whisper numpy requests

In [None]:
# Import necessary libraries
from termcolor import colored
import json
import requests
from faster_whisper import WhisperModel
import queue

## 2. Model Configuration

In this section, we set up the configuration for the Ollama model.

### Setting Up the Ollama Model

The `setup_ollama_model` function returns a configuration dictionary containing the endpoint, model name, temperature, request headers, and stop token. Agents read this dictionary when they are initialized, so every agent talks to the same model with the same settings.

In [None]:
def setup_ollama_model(model, temperature=0, stop=None):
    """
    Sets up the Ollama model configuration.

    Parameters:
    model (str): The name of the model to use.
    temperature (float): The temperature setting for the model.
    stop (str): The stop token for the model.

    Returns:
    dict: Configuration for the Ollama model.
    """
    return {
        "model_endpoint": "http://localhost:11434/api/generate",
        "model": model,
        "temperature": temperature,
        "headers": {"Content-Type": "application/json"},
        "stop": stop,
    }


# Example configuration
ollama_config = setup_ollama_model(model="llama3:instruct")
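
To make the role of each configuration field concrete, the sketch below shows how such a dictionary might be turned into a request body for Ollama's `/api/generate` endpoint. The helper name `build_generate_payload` is hypothetical and not part of this notebook. The `options`, `temperature`, `stop`, and `stream` fields follow Ollama's REST API, where `stop` is a list of strings.

```python
import json


def build_generate_payload(config: dict, prompt: str) -> dict:
    """Hypothetical helper: map the notebook's config dict to an Ollama request body."""
    options = {"temperature": config.get("temperature", 0)}
    stop = config.get("stop")
    if stop is not None:
        # Ollama expects a list of stop sequences, so wrap a single string.
        options["stop"] = [stop] if isinstance(stop, str) else list(stop)
    return {
        "model": config["model"],
        "prompt": prompt,
        "stream": False,  # ask for one JSON response instead of a token stream
        "options": options,
    }


example_config = {
    "model_endpoint": "http://localhost:11434/api/generate",
    "model": "llama3:instruct",
    "temperature": 0,
    "headers": {"Content-Type": "application/json"},
    "stop": None,
}
payload = build_generate_payload(example_config, "Say hello in one word.")
print(json.dumps(payload, indent=2))

# Sending the request would then look roughly like this (requires a running Ollama server):
# requests.post(example_config["model_endpoint"], headers=example_config["headers"], json=payload)
```

No network call is made here, so the snippet can be run without Ollama installed.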

In [None]:
# Model settings

# MODEL_TYPE: Defines the type of Whisper model to use. Options include "small", "medium", "large", etc.
# Smaller models are faster but less accurate, while larger models are more accurate but require more resources.
MODEL_TYPE = "small"

# RUN_TYPE: Specifies whether the model should run on a CPU or GPU. Set to "gpu" for GPU acceleration if available.
RUN_TYPE = "cpu"  # Change to "gpu" if you have a GPU available

# For CPU usage:
# NUM_WORKERS: Number of workers that can run the model in parallel when transcribe() is called
# from multiple Python threads. More workers can raise throughput at the cost of memory.
NUM_WORKERS = 10

# CPU_THREADS: Number of threads to use for CPU operations. This should ideally match the number of CPU cores available.
CPU_THREADS = 4

# For GPU usage:
# GPU_DEVICE_INDICES: List of GPU indices to use. For example, [0, 1] will use the first two GPUs.
GPU_DEVICE_INDICES = [0, 1, 2, 3]

# VAD_FILTER: Voice Activity Detection filter flag. When True, the model will filter out non-speech audio segments.
VAD_FILTER = True

# Visualization (expected max number of characters for LENGTH_IN_SEC audio)
# MAX_SENTENCE_CHARACTERS: The maximum number of characters expected in a single line of transcription.
# This helps in formatting the display of transcribed text.
MAX_SENTENCE_CHARACTERS = 80

# Audio settings

# STEP_IN_SEC: The length of each audio chunk in seconds. This defines the duration of audio data captured in one go.
STEP_IN_SEC: int = 1

# LENGTH_IN_SEC: Maximum duration of audio data to process at once. This sets the maximum length of audio data that will be processed together.
LENGTH_IN_SEC: int = 6

# NB_CHANNELS: The number of audio channels. 1 for mono, 2 for stereo.
NB_CHANNELS = 1

# RATE: The sample rate of the audio data (in Hz). Common rates include 16000 (16kHz) and 44100 (44.1kHz).
RATE = 16000

# CHUNK: The number of audio frames read per buffer. Setting it to RATE makes each buffer hold 1 second of audio.
CHUNK = RATE

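# INPUT_DEVICE_ID: Index of the audio input device (microphone) to record from.
# Device indices are machine-specific, so adjust this value for your system.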
INPUT_DEVICE_ID = 3
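
As a quick sanity check on the audio settings, the following sketch works out how many bytes one chunk and one full window of audio occupy. It repeats the constants from the cell above so it runs on its own; `BYTES_PER_SAMPLE` is a name introduced here and assumes the 16-bit PCM format (`pyaudio.paInt16`) used later in the notebook.

```python
RATE = 16000          # samples per second, as configured above
NB_CHANNELS = 1       # mono
STEP_IN_SEC = 1
LENGTH_IN_SEC = 6
BYTES_PER_SAMPLE = 2  # 16-bit PCM uses 2 bytes per sample (assumption: paInt16)

bytes_per_step = RATE * NB_CHANNELS * BYTES_PER_SAMPLE * STEP_IN_SEC
bytes_per_window = bytes_per_step * LENGTH_IN_SEC

print(f"One {STEP_IN_SEC}-second chunk: {bytes_per_step} bytes")       # 32000 bytes
print(f"Full {LENGTH_IN_SEC}-second window: {bytes_per_window} bytes")  # 192000 bytes
```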



In [None]:
# Function to create the Whisper model
def create_whisper_model() -> WhisperModel:
    if RUN_TYPE.lower() == "gpu":
        whisper = WhisperModel(
            MODEL_TYPE,
            device="cuda",
            compute_type="float16",
            device_index=GPU_DEVICE_INDICES,
            download_root="./models",
        )
    elif RUN_TYPE.lower() == "cpu":
        whisper = WhisperModel(
            MODEL_TYPE,
            device="cpu",
            compute_type="int8",
            num_workers=NUM_WORKERS,
            cpu_threads=CPU_THREADS,
            download_root="./models",
        )
    else:
        raise ValueError(f"Invalid run type: {RUN_TYPE}")

    print("Loaded model")
    return whisper

# Load the model
model = create_whisper_model()
print("Whisper model is ready to use.")

## 3. Defining the Agent Graph State

In this step, we define the structure of the state that our agents will use to store and communicate information. This state acts as a shared memory that different components of the system can access and modify. We use the `TypedDict` from the `typing` module to define the expected structure and types of data within the state. This helps ensure consistency and correctness when accessing or updating the state, making it easier to manage complex workflows and data dependencies.

The `AgentGraphState` class includes fields for the start and end markers of the workflow, the audio and length queues, a recording flag, and the responses from the recording and transcription agents. The `get_agent_graph_state` function is used to retrieve specific parts of the state based on a key, facilitating modular and reusable access to the state data.

In [None]:
from typing import TypedDict, Annotated
from langgraph.graph.message import add_messages

# Reducer for queues
def queue_reducer(existing_queue: queue.Queue, new_data) -> queue.Queue:
    if isinstance(new_data, list):
        for item in new_data:
            existing_queue.put(item)
    else:
        existing_queue.put(new_data)
    return existing_queue


# Define the state object for the agent graph
class AgentGraphState(TypedDict):
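    """
    This class defines the structure of the agent graph state.
    
    Attributes:
    start_chain (list): A list marking that the workflow has started.
    audio_queue (queue.Queue): Raw audio chunks waiting to be transcribed.
    length_queue (queue.Queue): The most recent chunks, capped at LENGTH_IN_SEC items.
    recording (bool): Whether the microphone is currently recording.
    recording_response (list): A list to store outputs from the audio agent.
    transcription_response (list): A list to store outputs from the transcript agent.
    end_chain (list): A list to store the final outputs or end states.
    """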
    start_chain: Annotated[list, add_messages]
    audio_queue: Annotated[queue.Queue, queue_reducer]
    length_queue: Annotated[queue.Queue, queue_reducer]
    recording: bool
    recording_response: Annotated[list, add_messages]
    transcription_response: Annotated[list, add_messages]
    end_chain: Annotated[list, add_messages]


# Function to retrieve specific parts of the agent state
def get_agent_graph_state(state: AgentGraphState, state_key: str):
    """
    Retrieves specific parts of the agent state based on the provided key.
    
    Parameters:
    state (AgentGraphState): The current state of the agent.
    state_key (str): The key indicating which part of the state to retrieve.
    
    Returns:
    list or None: The requested state data or None if the key is not recognized.
    """
    if state_key == "transcription_all":
        return state["transcription_response"]
    elif state_key == "transcription_latest":
        return state["transcription_response"][-1] if state["transcription_response"] else []
    elif state_key == "audio_queue":
        return state["audio_queue"]
    elif state_key == "audio_latest":
        # queue.Queue is not subscriptable, so peek at its underlying deque
        audio_queue = state["audio_queue"]
        return audio_queue.queue[-1] if not audio_queue.empty() else []
    elif state_key == "length_queue":
        return state["length_queue"]
    elif state_key == "length_latest":
        length_queue = state["length_queue"]
        return length_queue.queue[-1] if not length_queue.empty() else []
    elif state_key == "recording_all":
        return state["recording_response"]
    elif state_key == "recording_latest":
        return state["recording_response"][-1] if state["recording_response"] else []
    else:
        return None


# Initial state setup
state = {
    "start_chain": [],
    "recording": False,
    "audio_queue": queue.Queue(),
    "length_queue": queue.Queue(maxsize=LENGTH_IN_SEC),
    "recording_response": [],
    "transcription_response": [],
    "end_chain": [],
}
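
The reducer's behavior is easiest to see in isolation. The sketch below repeats `queue_reducer` so that it runs on its own, then adds a single item and a list of items. Peeking at the newest element through the `.queue` attribute relies on the deque that `queue.Queue` uses internally, which is an implementation detail rather than a documented interface.

```python
import queue


def queue_reducer(existing_queue: queue.Queue, new_data) -> queue.Queue:
    """Same logic as the reducer above, repeated so the snippet runs on its own."""
    if isinstance(new_data, list):
        for item in new_data:
            existing_queue.put(item)
    else:
        existing_queue.put(new_data)
    return existing_queue


audio_queue = queue.Queue()
queue_reducer(audio_queue, b"chunk-1")                # a single item is appended
queue_reducer(audio_queue, [b"chunk-2", b"chunk-3"])  # a list is appended item by item

size_before = audio_queue.qsize()  # number of items waiting
newest = audio_queue.queue[-1]     # peek at the newest item without removing it
oldest = audio_queue.get()         # remove and return the oldest item (FIFO order)

print(size_before, newest, oldest)  # 3 b'chunk-3' b'chunk-1'
```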

## 4. Agent Class Definition

In this section, we define the `Agent` class, which serves as a base class for different types of agents in our system. An agent is a component that interacts with the language model to perform specific tasks, such as generating responses or processing information. The `Agent` class manages the configuration and state associated with the language model, allowing for easy setup and reuse of model configurations across different agents.

The class includes methods for initializing the agent with a specific model configuration and updating the agent's state. The state encapsulates the context or memory of the agent, enabling it to maintain continuity across interactions.

In [None]:
from typing import Any


class Agent:
    def __init__(self, state: AgentGraphState, model_config: dict):
        """
        Initializes the agent with a state and model configuration.

        Parameters:
        state (AgentGraphState): The initial state of the agent, containing necessary context and data.
        model_config (dict): Configuration settings for the model, including endpoint, model name, temperature, etc.
        """
        self.state = state
        self.model_endpoint = model_config.get("model_endpoint")
        self.model_name = model_config.get("model")
        self.temperature = model_config.get("temperature", 0)  # Default temperature is 0
        self.headers = model_config.get("headers", {"Content-Type": "application/json"})
        self.stop = model_config.get("stop")

    def update_state(self, key: str, value: Any):
        """
        Updates the agent's state with a new key-value pair.

        Parameters:
        key (str): The key in the state dictionary to update.
        value (any): The new value to associate with the specified key.
        """
        # Only update keys that already exist, so a typo cannot silently add a new state entry
        if key in self.state:
            self.state[key] = value
        else:
            print(f"Warning: Attempting to update a non-existing state key '{key}'.")

## 5. Utility Functions

Utility functions are auxiliary functions that assist with various common tasks within the notebook. They help keep the codebase clean and modular by encapsulating frequently used logic in separate functions. In this case, we have two utility functions: `check_for_content` and `get_current_utc_datetime`.

- `check_for_content`: This function checks if a variable has a content attribute and returns its value if it exists. This is useful for handling different data types that may or may not have a content attribute.
- `get_current_utc_datetime`: This function returns the current date and time in UTC format. This can be useful for timestamping events or logging.

In [None]:
from datetime import datetime, timezone

# Check if an attribute of the state dict has content
def check_for_content(var):
    """
    Checks if the provided variable has a 'content' attribute and returns it.

    Parameters:
    var (Any): The variable to check.

    Returns:
    Any: The 'content' attribute if it exists, otherwise the original variable.
    """
    try:
        return var.content
    except AttributeError:
        return var


# Get the current date and time in UTC
def get_current_utc_datetime():
    """
    Returns the current date and time in UTC.

    Returns:
    str: The current date and time in UTC, formatted as 'YYYY-MM-DD HH:MM:SS UTC'.
    """
    now_utc = datetime.now(timezone.utc)
    return now_utc.strftime("%Y-%m-%d %H:%M:%S UTC")
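
A short usage sketch for `check_for_content` follows. The function is repeated so the snippet is self-contained, and `SimpleNamespace` is used as a hypothetical stand-in for a message object that exposes a `content` attribute.

```python
from types import SimpleNamespace


def check_for_content(var):
    """Same behavior as the helper above, repeated so the snippet is self-contained."""
    try:
        return var.content
    except AttributeError:
        return var


# A stand-in for a message object with a `content` attribute (hypothetical).
message = SimpleNamespace(content="transcribed text")

from_message = check_for_content(message)        # the attribute value is returned
from_string = check_for_content("plain string")  # a plain value is returned unchanged

print(from_message)
print(from_string)
```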

## 6. AudioAgent

In [None]:
import threading
import os
import wave
import pyaudio
import logging

class AudioAgent(Agent):

    def __init__(
        self,
        state: AgentGraphState,
        audio_folder: str = "mp3_audio_files",
        listen_filename: str = "tmp_listen",
    ):
        super().__init__(
            state, model_config={}
        )
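        self.audio_folder = audio_folder
        self.listen_filename = listen_filename
        self.recording = False
        self.record_lock = threading.Lock()
        # Make sure the output folder exists before any WAV file is written
        os.makedirs(self.audio_folder, exist_ok=True)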

    def listen(self) -> None:
        thread = threading.Thread(target=self.record_audio)
        input("Press ENTER to START recording...")
        with self.record_lock:
            self.recording = True
            self.update_state("recording", True)
        thread.start()
        input("Press ENTER to STOP recording...")
        with self.record_lock:
            self.recording = False
            # self.update_state("recording", False)
        thread.join()

    # Function to record audio from the microphone
    def record_audio(self):
        audio = pyaudio.PyAudio()
        stream = audio.open(
            format=pyaudio.paInt16,
            channels=NB_CHANNELS,
            rate=RATE,
            input=True,
            frames_per_buffer=CHUNK,  # 1 second of audio
            input_device_index=INPUT_DEVICE_ID,  # Specify the selected input device
        )

        print("-" * 80)
        print("Microphone initialized, recording started...")

        print("-" * 80)
        print("TRANSCRIPTION")
        print("-" * 80)
        audio_frames = []
        try:
            while self.recording:
                audio_data = b""
                for _ in range(STEP_IN_SEC):
                    chunk = stream.read(RATE)  # Read 1 second of audio data
                    audio_data += chunk
                    audio_frames.append(chunk)
                    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
                    file_name = f"{self.audio_folder}/{self.listen_filename}_{timestamp}.wav"
                    self.save_audio(audio, file_name, audio_frames)
                    # Put the 1-second audio data into the shared queue
                    # (update_state would replace the queue object with raw bytes)
                    self.state["audio_queue"].put(audio_data)
                    self.update_state("recording_response", file_name)
                    print(colored(f"Audio File Recorder 🎧: {file_name}", "cyan"))

        finally:
            stream.stop_stream()
            stream.close()
            audio.terminate()
            # self.update_state("recording", False)
            print("Microphone recording stopped.")

    def save_audio(self, audio, file_name, frames):
        try:
            with wave.open(file_name, "wb") as wf:
                wf.setnchannels(NB_CHANNELS)
                wf.setsampwidth(audio.get_sample_size(pyaudio.paInt16))
                wf.setframerate(RATE)
                wf.writeframes(b"".join(frames))
        except Exception as e:
            logging.error(f"Failed when trying to save audio: {e}")

    def invoke(
        self,
    ) -> dict:

        try:
            self.listen()  # Blocks until the user stops the recording
            return self.state
        except requests.RequestException as e:
            print(f"Error in invoking model! {str(e)}")
            return {"error": str(e)}
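
The start/stop pattern in `listen` and `record_audio` can be tried without a microphone. The sketch below is a variant of that pattern, not the notebook's implementation: it swaps the lock-guarded boolean for a `threading.Event`, and `FakeRecorder` is a hypothetical stand-in for `AudioAgent` that appends dummy bytes instead of reading from PyAudio.

```python
import threading
import time


class FakeRecorder:
    """Hypothetical stand-in for AudioAgent that needs no microphone."""

    def __init__(self):
        self.stop_event = threading.Event()
        self.first_chunk = threading.Event()
        self.chunks = []

    def record(self):
        # Keep producing chunks until the main thread asks us to stop.
        while not self.stop_event.is_set():
            self.chunks.append(b"\x00" * 4)  # pretend this came from stream.read()
            self.first_chunk.set()
            time.sleep(0.01)


recorder = FakeRecorder()
worker = threading.Thread(target=recorder.record)
worker.start()                        # corresponds to the first ENTER press
recorder.first_chunk.wait(timeout=1)  # wait until at least one chunk has arrived
recorder.stop_event.set()             # corresponds to the second ENTER press
worker.join()                         # wait for the recording loop to exit cleanly

print(f"Captured {len(recorder.chunks)} chunk(s)")
```

An `Event` avoids reading a plain attribute from two threads, which is one reason it is often preferred for stop signals.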

In [None]:
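import re
import sys
import time

import numpy as np


class TranscriptAgent(Agent):

    # Function to process audio and get transcription
    @staticmethod
    def process_audio(audio_queue, length_queue):
        # Keep going until every queued audio chunk has been consumed
        while not audio_queue.empty():
            if length_queue.qsize() >= LENGTH_IN_SEC: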
                with length_queue.mutex:
                    length_queue.queue.clear()
                    print()

            try:
                audio_data = audio_queue.get(timeout=1)
            except queue.Empty:
                continue

            transcription_start_time = time.time()
            length_queue.put(audio_data)

            # Concatenate audio data in the length_queue
            audio_data_to_process = b""
            for i in range(length_queue.qsize()):
                # We index it so it won't get removed
                audio_data_to_process += length_queue.queue[i]

            try:
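                # Convert 16-bit PCM to float32 in the range [-1.0, 1.0)
                audio_np = (
                    np.frombuffer(audio_data_to_process, np.int16).astype(np.float32)
                    / 32768.0
                )
                # `model` is the WhisperModel returned by create_whisper_model()
                segments, _info = model.transcribe(audio_np, vad_filter=VAD_FILTER)
                transcription = " ".join(segment.text.strip() for segment in segments)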
                transcription = re.sub(r"\[.*\]", "", transcription)
                transcription = re.sub(r"\(.*\)", "", transcription)
            except Exception as e:
                print(e)
                transcription = "Error"

            transcription_end_time = time.time()

            # Display transcription
            transcription_to_visualize = transcription.ljust(MAX_SENTENCE_CHARACTERS, " ")
            transcription_postprocessing_end_time = time.time()

            sys.stdout.write("\033[K" + transcription_to_visualize + "\r")

            audio_queue.task_done()

            # overall_elapsed_time = (
            #     transcription_postprocessing_end_time - transcription_start_time
            # )
            # transcription_elapsed_time = transcription_end_time - transcription_start_time
            # postprocessing_elapsed_time = (
            #     transcription_postprocessing_end_time - transcription_end_time
            # )
            # stats["overall"].append(overall_elapsed_time)
            # stats["transcription"].append(transcription_elapsed_time)
            # stats["postprocessing"].append(postprocessing_elapsed_time)

        print("Audio processing stopped.")

    def invoke(self, audio_queue, length_queue) -> dict:
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        print(colored(f"Transcript 📝: {timestamp}", "yellow"))
        self.update_state("recording", True)
        try:
            # self.process_audio(audio_queue, length_queue)
            # self.update_state("researcher_response", response_formatted)
            # print(colored(f"Researcher 🕵️‍♂️: {response_formatted}", "yellow"))
            return self.state
        except requests.RequestException as e:
            print(f"Error in invoking model! {str(e)}")
            return {"error": str(e)}
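
The normalization step in `process_audio` is worth checking by hand: signed 16-bit samples span -32768 to 32767, so dividing by 32768 maps them into [-1.0, 1.0). The sketch below does the same conversion using only the standard library; `pcm16_to_float` is a name introduced here for illustration, and it assumes native byte order, which is what a local recording produces.

```python
import array
import struct


def pcm16_to_float(pcm_bytes: bytes) -> list:
    """Convert native-endian 16-bit PCM bytes to floats in the range [-1.0, 1.0)."""
    samples = array.array("h")  # "h" = signed 16-bit integers
    samples.frombytes(pcm_bytes)
    return [sample / 32768.0 for sample in samples]


# Three hand-picked samples: silence, half scale, and the most negative value.
raw = struct.pack("=3h", 0, 16384, -32768)
normalized = pcm16_to_float(raw)
print(normalized)  # [0.0, 0.5, -1.0]
```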

## 7. End Node

The `EndNodeAgent` class is a specialized agent that marks the conclusion of the workflow in the agent graph. It extends the base `Agent` class and is primarily responsible for updating the state to indicate the end of the process. This agent is useful for workflows that require a clear termination point, ensuring that the system knows when all processing is complete.

In [None]:
class EndNodeAgent(Agent):
    def invoke(self) -> AgentGraphState:
        """
        Marks the end of the workflow by updating the state.

        This method updates the 'end_chain' key in the state to signify that
        the workflow has reached its conclusion. It can be used to perform any
        finalization tasks or simply to denote that the agent has completed its role.

        Returns:
        AgentGraphState: The updated state of the agent.
        """
        self.update_state("end_chain", "end_chain")
        return self.state

In [None]:
class StartNodeAgent(Agent):
    def invoke(self) -> AgentGraphState:
        """
        Marks the start of the workflow by updating the state.

        This method updates the 'start_chain' key in the state to signify that
        the workflow has begun. It can be used to perform any initialization
        tasks before the audio and transcript agents run.

        Returns:
        AgentGraphState: The updated state of the agent.
        """
        self.update_state("start_chain", "start_chain")
        return self.state

In [None]:
def should_continue(data):
    """
    Determines the next step in the workflow based on the recording flag.

    Parameters:
    data (dict): The data containing the agent's state.

    Returns:
    str: The next node to execute ('transcript' to keep transcribing, 'end' to finish).
    """
    # Read the recording flag from the shared state
    recording = data["recording"]

    # Keep transcribing while the recorder is active; otherwise finish
    if recording:
        print(colored("Still recording...", "green"))
        return "transcript"
    else:
        print(colored("Recording has finished", "red"))
        return "end"
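
To see how this router interacts with a recursion limit, the library-free sketch below follows the router's decision step by step. It is not how LangGraph is implemented: `should_continue_quiet` and `run_router` are hypothetical names, and the sketch simply stops at the limit, whereas LangGraph raises an error when its recursion limit is exceeded.

```python
def should_continue_quiet(data: dict) -> str:
    """Same decision as should_continue, without the colored printing."""
    return "transcript" if data["recording"] else "end"


def run_router(state: dict, stop_after: int, recursion_limit: int = 10) -> list:
    """Hypothetical driver: follow the router until it returns 'end' or the limit is hit."""
    visited = []
    for step in range(recursion_limit):
        if step == stop_after:
            state["recording"] = False  # simulate the recorder finishing
        next_node = should_continue_quiet(state)
        visited.append(next_node)
        if next_node == "end":
            break
    return visited


finished = run_router({"recording": True}, stop_after=2)
cut_off = run_router({"recording": True}, stop_after=99, recursion_limit=3)

print(finished)  # ['transcript', 'transcript', 'end']
print(cut_off)   # ['transcript', 'transcript', 'transcript']
```

The second call shows why the `recording` flag must eventually become `False`: otherwise the loop only ends because the limit is reached.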

## 8. Creating and Compiling the Agent Graph

In this section, we define the structure and flow of the agent-based system using the `StateGraph` class from the `langgraph` library. The graph consists of nodes, each representing a specific agent, and edges, which define the flow or sequence of operations. This setup enables the modeling of complex workflows where different agents can interact and pass information.

- **`create_graph`:** This function initializes the `StateGraph` with a specific state structure (`AgentGraphState`). It then adds nodes for the `StartNodeAgent`, `AudioAgent`, `TranscriptAgent`, and `EndNodeAgent`. The "start" node is the entry point and the "end" node is the finish point. From "start", edges lead to both "audio" and "transcript"; "audio" leads to "end", while a conditional edge driven by `should_continue` either loops "transcript" back to itself or routes it to "end".
- **`compile_workflow`:** This function compiles the defined graph into a workflow that can be executed. The compiled workflow manages the execution of the nodes in the defined order, handling the flow of data and control through the system.


In [None]:
from langgraph.graph import StateGraph, END

def create_graph() -> StateGraph:
    """
    Creates and configures the state graph for the agent workflow.

    This function initializes the graph, adds the necessary nodes (agents), and
    sets up the edges defining the flow of the workflow.

    Returns:
    StateGraph: The configured state graph for the workflow.
    """
    graph = StateGraph(AgentGraphState)
    # graph.add_node(
    #     "planner",
    #     lambda state: PlannerAgent(
    #         state=state,
    #         model_config=ollama_config,
    #     ).invoke(
    #         research_question=state["research_question"],
    #         feedback=lambda: get_agent_graph_state(
    #             state=state, state_key="reviewer_latest"
    #         ),
    #         prompt=planner_prompt_template,
    #     ),
    # )

    graph.add_node(
        "start",
        lambda state: StartNodeAgent(
            state=state,
            model_config=ollama_config,
        ).invoke(),
    )

    graph.add_node(
        "audio",
        lambda state: AudioAgent(
            state=state,
        ).invoke(
        ),
    )

    graph.add_node(
        "transcript",
        lambda state: TranscriptAgent(
            state=state,
            model_config=ollama_config,
        ).invoke(
            audio_queue=get_agent_graph_state(
                state=state, state_key="audio_queue"
            ),
            length_queue=get_agent_graph_state(
                state=state, state_key="length_queue"
            ),
        ),
    )

    graph.add_node(
        "end",
        lambda state: EndNodeAgent(
            state=state,
            model_config=ollama_config,
        ).invoke(),
    )

    # Set the entry and finish points for the workflow
    graph.set_entry_point("start")
    graph.set_finish_point("end")

    # Define the flow of the graph
    graph.add_edge("start", "audio")
    graph.add_edge("start", "transcript")

    graph.add_conditional_edges(
        "transcript", should_continue, {"transcript": "transcript", "end": "end"}
    )
    graph.add_edge("audio", "end")

    return graph


def compile_workflow(graph: StateGraph):
    """
    Compiles the state graph into an executable workflow.

    This function compiles the graph, enabling the defined nodes and edges to
    be executed in sequence as per the workflow's logic.

    Parameters:
    graph (StateGraph): The state graph defining the workflow.

    Returns:
    Any: The compiled workflow ready for execution.
    """
    workflow = graph.compile()
    return workflow

## 9. Running the Workflow

In this final section, we execute the workflow defined in the agent graph. We start by creating the graph using the `create_graph` function and then compiling it into an executable workflow with `compile_workflow`. The workflow is then run with specific inputs and configurations.

- **`graph = create_graph()`:** This initializes the graph structure, including all nodes and edges as defined previously.
- **`workflow = compile_workflow(graph)`:** This compiles the graph into a runnable workflow, preparing it for execution.
- **`iterations = 10`:** This variable sets the recursion limit for the workflow, determining how many iterations the workflow should allow.
- **`verbose = True`:** If set to `True`, the system will print detailed information about each state change during the workflow execution.
- **`dict_inputs = {"audio_queue": ..., "length_queue": ...}`:** A dictionary containing the initial inputs to the workflow: an empty audio queue and a length queue capped at `LENGTH_IN_SEC` items.
- **`limit = {"recursion_limit": iterations}`:** This sets the limit for the number of iterations, preventing infinite loops or excessive processing.

The loop iterates over the events generated by the workflow, printing the state at each step if `verbose` is enabled.

In [None]:
# Create the graph and compile the workflow
graph = create_graph()
workflow = compile_workflow(graph)
print("Graph and workflow created.")

# Define workflow parameters
iterations = 10
verbose = True
dict_inputs = {
    "audio_queue": queue.Queue(),
    "length_queue": queue.Queue(maxsize=LENGTH_IN_SEC),
}
limit = {"recursion_limit": iterations}

# Execute the workflow and print state changes
for event in workflow.stream(dict_inputs, limit):
    if verbose:
        print("\nState Dictionary:", event)
    else:
        print("\n")