In [8]:

from langgraph.graph import StateGraph, END
from typing import TypedDict, Annotated, List
from langchain_core.messages import AnyMessage, SystemMessage, HumanMessage, AIMessage, ChatMessage
from langchain_openai import ChatOpenAI
from pydantic import BaseModel
from tavily import TavilyClient
import os
from IPython.display import Image
from langgraph.checkpoint.memory import MemorySaver

from dotenv import load_dotenv

_ = load_dotenv()

In [9]:
# Set up the chat model and the Tavily search client
model = ChatOpenAI(
    temperature=0,
    model="gpt-4o-mini",
)
# The Tavily key is read from the environment (populated by load_dotenv above);
# a missing key raises KeyError here rather than failing later during a search.
tavily = TavilyClient(
    api_key=os.environ["TAVILY_API_KEY"],
)

In [10]:
# Create classes 
class AgentState(TypedDict):
    """Shared state passed between the nodes of the essay-writing graph."""
    # Essay topic / question supplied by the user; read by the planner,
    # research-plan and generation nodes.
    task: str
    # Outline text produced by plan_node.
    plan: str
    # Latest essay text produced by generation_node.
    draft: str
    # Feedback text produced by reflection_node.
    critique: str
    # Search-result snippets accumulated by the two research nodes.
    content: List[str]
    # Number of drafts generated so far; incremented by generation_node.
    revision_number: int
    # Threshold compared against revision_number in should_continue.
    max_revisions: int

# Structured-output schema passed to model.with_structured_output(...) by the
# research nodes. Documented with comments rather than a docstring so the
# schema itself is left exactly as it was.
class Queries(BaseModel):
    # Search queries to run against Tavily.
    queries: List[str]

In [11]:
# Create the prompts for the different nodes

# System prompt used by plan_node: asks for a high-level essay outline.
PLAN_PROMPT = """You are an expert writer tasked with writing a high level outline of an essay. \
Write such an outline for the user provided topic. Give an outline of the essay along with any relevant notes \
or instructions for the sections."""

# System prompt used by generation_node. The `{content}` placeholder is filled
# with the gathered research snippets via str.format.
# Note: each trailing backslash joins the next line with no separator, so the
# space before it is required to keep sentences apart.
WRITER_PROMPT = """You are an essay assistant tasked with writing excellent 5-paragraph essays. \
Generate the best essay possible for the user's request and the initial outline. \
If the user provides critique, respond with a revised version of your previous attempts. \
Utilize all the information below as needed: 

------

{content}"""

# System prompt used by reflection_node: asks for a critique of the draft.
REFLECTION_PROMPT = """You are a teacher grading an essay submission. \
Generate critique and recommendations for the user's submission. \
Provide detailed recommendations, including requests for length, depth, style, etc."""

# System prompt used by research_plan_node: asks for search queries for the task.
RESEARCH_PLAN_PROMPT = """You are a researcher charged with providing information that can \
be used when writing the following essay. Generate a list of search queries that will gather \
any relevant information. Only generate 3 queries max."""


# System prompt used by research_critique_node: asks for search queries that
# support the revisions requested in the critique.
RESEARCH_CRITIQUE_PROMPT = """You are a researcher charged with providing information that can \
be used when making any requested revisions (as outlined below). \
Generate a list of search queries that will gather any relevant information. Only generate 3 queries max."""


In [12]:
# Create the nodes

def plan_node(state: AgentState):
    """Ask the model for an essay outline of ``state['task']``.

    Returns a partial state update whose "plan" key holds the model's reply text.
    """
    system_prompt = SystemMessage(content=PLAN_PROMPT)
    topic = HumanMessage(content=state['task'])
    outline = model.invoke([system_prompt, topic])
    return {"plan": outline.content}

def research_plan_node(state: AgentState):
    """Gather research snippets for the essay task.

    Asks the model for a structured list of search queries (``Queries``) based
    on ``state['task']``, runs each query through Tavily with at most two
    results, and collects the ``content`` field of every result.

    Returns a partial state update whose "content" key holds the previously
    collected snippets (if any) followed by the new ones.
    """
    queries = model.with_structured_output(Queries).invoke([
        SystemMessage(content=RESEARCH_PLAN_PROMPT),
        HumanMessage(content=state['task'])
    ])
    # Work on a copy so the list held by the incoming state is never mutated
    # in place; `or []` also covers a "content" key that is present but None.
    content = list(state.get('content') or [])
    for q in queries.queries:
        response = tavily.search(query=q, max_results=2)
        content.extend(r['content'] for r in response['results'])
    return {"content": content}

def generation_node(state: AgentState):
    """Write the essay draft, or revise it when a critique is available.

    The system prompt is WRITER_PROMPT with the research snippets from
    ``state['content']`` joined by blank lines. The user message carries the
    task and the plan. When both a previous draft and a critique are present
    in the state, they are appended to the conversation so the model can
    actually produce the revision that WRITER_PROMPT asks for.

    Returns a partial state update with the new "draft" and with
    "revision_number" incremented by one (counting from 0 when absent).
    """
    # `.get(...) or []` tolerates a missing or None "content" entry.
    research = "\n\n".join(state.get('content') or [])
    user_message = HumanMessage(
        content=f"{state['task']}\n\nHere is my plan:\n\n{state['plan']}")
    messages = [
        SystemMessage(
            content=WRITER_PROMPT.format(content=research)
        ),
        user_message,
    ]

    # On revision passes, show the model its previous attempt and the
    # feedback on it; on the first pass neither exists yet.
    previous_draft = state.get("draft")
    critique = state.get("critique")
    if previous_draft and critique:
        messages.append(AIMessage(content=previous_draft))
        messages.append(HumanMessage(
            content=f"Here is the critique of your previous draft:\n\n{critique}"))

    response = model.invoke(messages)
    return {
        "draft": response.content,
        "revision_number": state.get("revision_number", 0) + 1
    }


def reflection_node(state: AgentState):
    """Ask the model to critique the current draft.

    Returns a partial state update whose "critique" key holds the model's
    reply text.
    """
    grader_prompt = SystemMessage(content=REFLECTION_PROMPT)
    submission = HumanMessage(content=state['draft'])
    feedback = model.invoke([grader_prompt, submission])
    return {"critique": feedback.content}

def research_critique_node(state: AgentState):
    """Gather additional research snippets driven by the critique.

    Asks the model for a structured list of search queries (``Queries``) based
    on ``state['critique']``, runs each query through Tavily with at most two
    results, and collects the ``content`` field of every result.

    Returns a partial state update whose "content" key holds the previously
    collected snippets (if any) followed by the new ones.
    """
    queries = model.with_structured_output(Queries).invoke([
        SystemMessage(content=RESEARCH_CRITIQUE_PROMPT),
        HumanMessage(content=state['critique'])
    ])
    # Work on a copy so the list held by the incoming state is never mutated
    # in place; `.get(...) or []` tolerates a missing or None "content" entry.
    content = list(state.get('content') or [])
    for q in queries.queries:
        response = tavily.search(query=q, max_results=2)
        content.extend(r['content'] for r in response['results'])
    return {"content": content}

def should_continue(state):
    """Route after generation: stop, or send the draft to reflection.

    Returns END once ``revision_number`` is strictly greater than
    ``max_revisions``; otherwise returns the node name "reflect".
    """
    revisions_exhausted = state["revision_number"] > state["max_revisions"]
    return END if revisions_exhausted else "reflect"


In [13]:
# Build the graph
builder = StateGraph(AgentState)

# Register each node under the name the edges below refer to.
for node_name, node_fn in (
    ("planner", plan_node),
    ("generate", generation_node),
    ("reflect", reflection_node),
    ("research_plan", research_plan_node),
    ("research_critique", research_critique_node),
):
    builder.add_node(node_name, node_fn)

builder.set_entry_point("planner")

# After each draft, either finish or loop back through reflection.
builder.add_conditional_edges(
    "generate",
    should_continue,
    {END: END, "reflect": "reflect"},
)

# Fixed edges: plan -> research -> generate, and reflect -> research -> generate.
for source, target in (
    ("planner", "research_plan"),
    ("research_plan", "generate"),
    ("reflect", "research_critique"),
    ("research_critique", "generate"),
):
    builder.add_edge(source, target)

# Compile with an in-memory checkpointer so state is tracked per thread_id.
checkpointer = MemorySaver()
graph = builder.compile(checkpointer=checkpointer)

# Visualize the graph (only possible once `graph` has been compiled)
# Image(graph.get_graph().draw_png())

In [14]:
# Set up the stream
# The thread_id selects which checkpointed conversation this run belongs to.
thread = {"configurable": {"thread_id": "1"}}

initial_state = {
    'task': "What is OpenTelemetry and how does it work?",
    "max_revisions": 2,
    "revision_number": 0,
}

# Print each streamed event as it arrives.
for s in graph.stream(initial_state, thread):
    print(s)


{'planner': {'plan': '**Outline for Essay: "What is OpenTelemetry and How Does It Work?"**\n\n**I. Introduction**\n   A. Definition of OpenTelemetry\n      1. Overview of its purpose in observability\n      2. Importance in modern software development\n   B. Brief history and evolution\n      1. Origins of OpenTelemetry from OpenTracing and OpenCensus\n      2. The merging of projects and establishment of OpenTelemetry\n   C. Thesis statement\n      1. OpenTelemetry is a powerful framework for observability that provides a standardized way to collect, process, and export telemetry data from applications.\n\n**II. Understanding Observability**\n   A. Definition of observability in software systems\n      1. Explanation of key concepts: metrics, logs, and traces\n   B. Importance of observability in cloud-native applications\n      1. Challenges of distributed systems\n      2. Need for real-time monitoring and troubleshooting\n\n**III. Components of OpenTelemetry**\n   A. Data Collectio

In [15]:
# Read the final draft from the checkpointed state of this thread instead of
# relying on the loop variable `s` left over from the streaming cell, which
# only works when the last streamed event happens to come from "generate".
final_state = graph.get_state(thread)
print(final_state.values["draft"])

**What is OpenTelemetry and How Does It Work?**

**I. Introduction**

OpenTelemetry is an open-source observability framework designed to enhance the monitoring and analysis of software applications. Its primary purpose is to provide a unified, vendor-neutral approach for collecting and exporting telemetry data, which is crucial for understanding application performance and behavior. In modern software development, where applications are increasingly complex and distributed, OpenTelemetry plays a vital role in ensuring that developers and operators can effectively monitor their systems. The framework has evolved from the merging of two significant projects, OpenTracing and OpenCensus, which laid the groundwork for a standardized observability solution. This essay will explore the components and workings of OpenTelemetry, demonstrating its significance as a powerful framework for observability that standardizes the collection, processing, and exportation of telemetry data from applicati