In [None]:
%load_ext autoreload
%autoreload 2

In [None]:
import nest_asyncio

# NOTE(review): presumably needed because the notebook kernel already runs an
# asyncio event loop and later library calls must be able to nest inside it —
# confirm this is still required.
nest_asyncio.apply()

# Imports

In [None]:
from itertools import islice
from typing import Literal, Optional

from dotenv import load_dotenv
from IPython.display import Image, display
from langchain_community.document_loaders import PyPDFLoader
from langchain_core.messages import HumanMessage, SystemMessage
from langchain_openai import ChatOpenAI
from langgraph.graph import END, START, StateGraph
from langsmith import traceable
from pydantic import BaseModel
from typing_extensions import TypedDict

load_dotenv()

# Vanilla workflow

In [None]:
class State(BaseModel):
    """Record passed between the steps of the vanilla (non-graph) workflow."""

    # Raw user request; interpolated into the prompts built in this cell.
    input: str
    # Category assigned by run_workflow() from classify_message(); None until then.
    type: Optional[
        Literal["write_article", "generate_table_of_contents", "review_article"]
    ] = None
    # Declared for generated text but never assigned by the functions in this cell.
    output: Optional[str] = None


# Structured-output schema handed to model.with_structured_output() in
# classify_message(): a single field restricted to the three supported categories.
# (Documented with comments rather than a docstring so the schema itself is untouched.)
class MessageType(BaseModel):
    type: Literal["write_article", "generate_table_of_contents", "review_article"]


# Chat model shared by every function in this cell.
# NOTE(review): credentials are presumably read from the environment populated
# by load_dotenv() — confirm.
model = ChatOpenAI(model="gpt-4.1-mini")


@traceable
def classify_message(state: State) -> str:
    """Ask the model which kind of request ``state.input`` is.

    Args:
        state: Workflow state; only ``state.input`` is read.

    Returns:
        The ``type`` field of the parsed ``MessageType`` reply, i.e. one of
        ``"write_article"``, ``"generate_table_of_contents"`` or
        ``"review_article"``. (The previous ``-> State`` annotation was wrong:
        a category string is returned, not a state object.)
    """
    # Bind the MessageType schema so the reply is parsed into that model.
    classifier = model.with_structured_output(MessageType)
    messages = [
        SystemMessage(
            content="You are a helpful assistant. You will classify the message into one of the following categories: 'write_article', 'generate_table_of_contents', 'review_article'."
        ),
        HumanMessage(content=f"Classify the message: {state.input}"),
    ]
    return classifier.invoke(messages).type


@traceable
def write_article(state: State) -> str:
    """Generate an article about the topic in ``state.input``.

    Args:
        state: Workflow state; only ``state.input`` is read.

    Returns:
        The ``content`` of the model's reply. (The previous ``-> State``
        annotation was wrong: the reply content is returned, not a state
        object.)
    """
    messages = [
        SystemMessage(
            content="You are a writer. You will write an article about the topic provided."
        ),
        HumanMessage(content=f"Write an article about {state.input}"),
    ]
    return model.invoke(messages).content


@traceable
def generate_table_of_contents(state: State) -> str:
    """Generate a table of contents for an article about ``state.input``.

    Args:
        state: Workflow state; only ``state.input`` is read.

    Returns:
        The ``content`` of the model's reply. (The previous ``-> State``
        annotation was wrong: the reply content is returned, not a state
        object.)
    """
    messages = [
        SystemMessage(
            content="You are a writer. You will generate a table of contents for an article about the topic provided."
        ),
        HumanMessage(
            content=f"Generate a table of contents for an article about {state.input}"
        ),
    ]
    return model.invoke(messages).content


@traceable
def review_article(state: State) -> str:
    """Ask the model to review the article referred to by ``state.input``.

    Args:
        state: Workflow state; only ``state.input`` is read.

    Returns:
        The ``content`` of the model's reply. (The previous ``-> State``
        annotation was wrong: the reply content is returned, not a state
        object.)
    """
    messages = [
        SystemMessage(
            content="You are a writer. You will review the article for the topic provided."
        ),
        HumanMessage(content=f"Review the article for the topic {state.input}"),
    ]
    return model.invoke(messages).content


@traceable
def run_workflow(message: str) -> str:
    """Classify ``message`` and hand it to the matching handler.

    Args:
        message: The user's request.

    Returns:
        The selected handler's result, or a fixed apology string when the
        classified type has no handler.
    """
    state = State(input=message)
    state.type = classify_message(state)

    # One handler per supported category; anything else falls through below.
    handlers = {
        "write_article": write_article,
        "generate_table_of_contents": generate_table_of_contents,
        "review_article": review_article,
    }
    handler = handlers.get(state.type)
    if handler is None:
        return "I'm sorry, I don't know how to handle that message."
    return handler(state)


# Demo call: runs the classify-then-dispatch workflow end to end and prints the result.
print(run_workflow("Write an article about the meaning of life"))

In [None]:
# Second demo call, phrased as a table-of-contents request.
print(run_workflow("Make a table of contents for an article about the meaning of life"))

# LangGraph implementation

In [None]:
class State(TypedDict):
    """State schema passed to ``StateGraph`` for the LangGraph workflow."""

    # User request supplied in the dict given to chain.invoke().
    input: str
    # Category returned by the classify_message node and read by route_message().
    type: Literal["write_article", "generate_table_of_contents", "review_article"]
    # Text returned under "output" by whichever generation/revision node runs.
    output: str


# Structured-output schema handed to model.with_structured_output() in
# classify_message(): a single field restricted to the three supported categories.
# (Documented with comments rather than a docstring so the schema itself is untouched.)
class MessageType(BaseModel):
    type: Literal["write_article", "generate_table_of_contents", "review_article"]


def classify_message(message: State) -> dict:
    """Graph node: classify the user's request and return its category.

    The function is registered with ``workflow.add_node``, and LangGraph calls
    a node with the current graph state, so ``message`` is the state mapping
    rather than a bare string. Previously the whole mapping was interpolated
    into the prompt; the request text is now read from its ``"input"`` key.

    Args:
        message: The graph state (its ``"input"`` entry is read). A plain
            string is still accepted for backward compatibility with direct
            calls.

    Returns:
        A partial state update ``{"type": <category>}``.
    """
    # Accept both the graph state and a raw string.
    request_text = message if isinstance(message, str) else message["input"]

    model_with_str_output = model.with_structured_output(MessageType)
    messages = [
        SystemMessage(
            content="You are a writer. You will classify the message into one of the following categories: 'write_article', 'generate_table_of_contents', 'review_article'."
        ),
        HumanMessage(content=f"Classify the message: {request_text}"),
    ]
    return {"type": model_with_str_output.invoke(messages).type}


def route_message(state: State) -> str:
    """Pick the name of the node that should handle the classified message.

    Args:
        state: Graph state; only ``state["type"]`` is read.

    Returns:
        The target node name. (The previous ``-> State`` annotation was wrong:
        a node-name string is returned.)

    Raises:
        ValueError: If ``state["type"]`` is not one of the known categories.
    """
    # Message category -> name of the node registered on the graph.
    routes = {
        "write_article": "generate_article_content",
        "generate_table_of_contents": "generate_table_of_contents",
        "review_article": "revise_article_content",
    }
    message_type = state["type"]
    if message_type not in routes:
        raise ValueError(f"Invalid message type: {state['type']}")
    return routes[message_type]


def generate_table_of_contents(state: State) -> State:
    """Graph node: draft a table of contents for the topic in ``state["input"]``.

    Returns a partial state update with the model's reply under ``"output"``.
    """
    system_prompt = SystemMessage(
        content="You are an expert writer specialized in SEO. Provided with a topic, you will generate the table of contents for a short article."
    )
    user_prompt = HumanMessage(
        content=f"Generate the table of contents of an article about {state['input']}"
    )
    reply = model.invoke([system_prompt, user_prompt])
    return {"output": reply.content}


def generate_article_content(state: State) -> dict:
    """Graph node: write article content for the topic in ``state["input"]``.

    Args:
        state: Graph state; only ``state["input"]`` is read.

    Returns:
        A partial state update ``{"output": <model reply>}``. (The previous
        ``-> str`` annotation was wrong: a dict is returned.)
    """
    messages = [
        SystemMessage(
            content="You are an expert writer specialized in SEO. Provided with a topic and a table of contents, you will generate the content of the article."
        ),
        HumanMessage(
            content=f"Generate the content of an article about {state['input']}"
        ),
    ]
    return {"output": model.invoke(messages).content}


def revise_article_content(state: State) -> dict:
    """Graph node: ask the model to revise the article text in ``state["input"]``.

    Args:
        state: Graph state; only ``state["input"]`` is read.

    Returns:
        A partial state update ``{"output": <model reply>}``. (The previous
        ``-> str`` annotation was wrong: a dict is returned.)
    """
    messages = [
        SystemMessage(
            content="You are an expert writer specialized in SEO. Provided with a topic, a table of contents and a content, you will revise the content of the article to make it less than 1000 characters."
        ),
        HumanMessage(
            content=f"Revise the content of the following article:\n\n{state['input']}"
        ),
    ]
    return {"output": model.invoke(messages).content}


# Assemble the graph: classify first, then branch to exactly one worker node.
workflow = StateGraph(State)

workflow.add_node("classify_message", classify_message)
workflow.add_conditional_edges(
    "classify_message",
    route_message,
    # route_message() returns node names directly, so every key maps to itself.
    {
        node_name: node_name
        for node_name in (
            "generate_article_content",
            "generate_table_of_contents",
            "revise_article_content",
        )
    },
)

# Worker nodes that the conditional edge above can select.
workflow.add_node("generate_table_of_contents", generate_table_of_contents)
workflow.add_node("generate_article_content", generate_article_content)
workflow.add_node("revise_article_content", revise_article_content)

# Entry edge, then one terminal edge per worker.
workflow.add_edge(START, "classify_message")
workflow.add_edge("generate_table_of_contents", END)
workflow.add_edge("generate_article_content", END)
workflow.add_edge("revise_article_content", END)

chain = workflow.compile()

# Render the compiled graph inline as a Mermaid PNG.
display(
    Image(
        chain.get_graph().draw_mermaid_png(),
    )
)

In [None]:
# Sample text sent through the graph together with a request to improve it.
ARTICLE = """
LangGraph is a library for building workflows. It helps you build workflows that are easy to understand and maintain.
"""

state = chain.invoke({"input": "I wrote this article please improve it:\n\n" + ARTICLE})

# Show the classification only when the key is present and not None.
if state.get("type") is not None:
    print("Type:", state["type"], "\n--- --- ---\n", sep="\n")

# Show the generated text, or say so when no node wrote an output.
if "output" not in state:
    print("No output detected!")
else:
    print("Output:", state["output"], sep="\n")

## Exercise

Implement a LangGraph workflow that takes the content of a PDF file and, depending on the type of document, processes it in a different way. Mock the processing for now.

In [None]:
# Rebinds the notebook-level `model`, this time with temperature=0, for the
# document-classification exercise below.
model = ChatOpenAI(model="gpt-4.1-mini", temperature=0)


class State(TypedDict):
    """State schema passed to ``StateGraph`` for the document-processing graph."""

    # Location of the PDF; read by classify_document().
    path: str
    # Category returned by classify_document() and read by route_message().
    type: Literal["financial", "legal", "marketing", "pets", "other"]
    # Extracted text returned by classify_document() alongside the category.
    document: str
    # Result message returned by whichever process_*_document node runs.
    output: str


# Structured-output schema handed to model.with_structured_output() in
# classify_document(): a single field restricted to the five document categories.
# (Documented with comments rather than a docstring so the schema itself is untouched.)
class DocumentType(BaseModel):
    type: Literal["financial", "legal", "marketing", "pets", "other"]


def get_first_n_pages(file_path: str, n: int = 5) -> str:
    """Return the text of the first ``n`` loaded pages of a PDF as one string.

    Args:
        file_path: Path of the PDF handed to ``PyPDFLoader``.
        n: Number of leading pages to keep (default 5). ``None`` or a negative
            value keeps the original list-slice semantics (``pages[:n]``).

    Returns:
        The ``page_content`` of the selected pages joined by blank lines;
        an empty string when no page is selected.
    """
    loader = PyPDFLoader(file_path)
    page_iter = loader.lazy_load()
    if n is not None and n >= 0:
        # Stop pulling from the lazy loader once n pages have been read,
        # instead of loading the whole PDF and discarding the rest.
        selected = islice(page_iter, n)
    else:
        # Fall back to the original behaviour for None / negative n.
        selected = list(page_iter)[:n]
    return "\n\n".join(page.page_content for page in selected)


def classify_document(state: State) -> dict:
    """Graph node: extract the PDF text and ask the model for its category.

    Reads ``state["path"]`` and returns a partial state update containing the
    chosen ``"type"`` and the extracted ``"document"`` text.
    """
    text = get_first_n_pages(state["path"])
    classifier = model.with_structured_output(DocumentType)
    system_prompt = SystemMessage(
        content="You are an expert document classifier. You will classify a document into one of the following categories: 'financial', 'legal', 'marketing', 'pets', 'other'."
    )
    user_prompt = HumanMessage(content=f"Classify the document: {text}")
    verdict = classifier.invoke([system_prompt, user_prompt])
    return {"type": verdict.type, "document": text}


def route_message(state: State) -> str:
    """Pick the name of the node that should process the classified document.

    Args:
        state: Graph state; only ``state["type"]`` is read.

    Returns:
        The target node name, ``"process_<type>_document"``. (The previous
        ``-> State`` annotation was wrong: a node-name string is returned.)

    Raises:
        ValueError: If ``state["type"]`` is not one of the known categories.
    """
    # Document category -> name of the node registered on the graph.
    routes = {
        "financial": "process_financial_document",
        "legal": "process_legal_document",
        "marketing": "process_marketing_document",
        "pets": "process_pets_document",
        "other": "process_other_document",
    }
    document_type = state["type"]
    if document_type not in routes:
        raise ValueError(f"Invalid message type: {state['type']}")
    return routes[document_type]


def process_financial_document(state: State) -> State:
    """Mock node for financial documents; ``state`` is accepted but not read."""
    update = {"output": "Financial document processed"}
    return update


def process_legal_document(state: State) -> State:
    """Mock node for legal documents; ``state`` is accepted but not read."""
    update = {"output": "Legal document processed"}
    return update


def process_marketing_document(state: State) -> State:
    """Mock node for marketing documents; ``state`` is accepted but not read."""
    update = {"output": "Marketing document processed"}
    return update


def process_pets_document(state: State) -> State:
    """Mock node for pets documents; ``state`` is accepted but not read."""
    update = {"output": "Pets document processed"}
    return update


def process_other_document(state: State) -> State:
    """Mock node for uncategorised documents; ``state`` is accepted but not read."""
    update = {"output": "Other document processed"}
    return update


# Build the document-processing graph: classify the PDF, then branch to the
# mock processor that matches its category.
workflow = StateGraph(State)

workflow.add_node("classify_document", classify_document)
workflow.add_node("process_financial_document", process_financial_document)
workflow.add_node("process_legal_document", process_legal_document)
workflow.add_node("process_marketing_document", process_marketing_document)
workflow.add_node("process_pets_document", process_pets_document)
workflow.add_node("process_other_document", process_other_document)

workflow.add_conditional_edges(
    "classify_document",
    route_message,
    # Keys must equal the strings route_message() returns. The last key used to
    # be "other", which route_message() never returns (it returns
    # "process_other_document"), so documents classified as "other" could not
    # be routed.
    {
        "process_financial_document": "process_financial_document",
        "process_legal_document": "process_legal_document",
        "process_marketing_document": "process_marketing_document",
        "process_pets_document": "process_pets_document",
        "process_other_document": "process_other_document",
    },
)

# Entry edge, then one terminal edge per processor.
workflow.add_edge(START, "classify_document")
workflow.add_edge("process_financial_document", END)
workflow.add_edge("process_legal_document", END)
workflow.add_edge("process_marketing_document", END)
workflow.add_edge("process_pets_document", END)
workflow.add_edge("process_other_document", END)

chain = workflow.compile()

# Render the compiled graph inline as a Mermaid PNG.
display(Image(chain.get_graph().draw_mermaid_png()))

In [None]:
# Run the document graph on a sample PDF (path is relative to the notebook's
# working directory). The final state is stored in `state` but not displayed.
state = chain.invoke({"path": "assets/dogs.pdf"})