## Parallel  node execution (Execução paralela de nós)
#### Revisão

No módulo 3, aprofundamos o conceito de interação humana, mostrando 3 casos de uso comuns:

(1) Aprovação - Podemos interromper nosso agente, expor o estado a um usuário e permitir que ele aceite uma ação.

(2) Depuração - Podemos retroceder o grafo para reproduzir ou evitar problemas.

(3) Edição - Você pode modificar o estado.

#### Objetivos
Este módulo aprofundará o conceito de interação humana, bem como os conceitos de memória discutidos no módulo 2.

Vamos explorar fluxos de trabalho multiagentes e construir um assistente de pesquisa multiagente que integre todos os módulos deste curso.

Para construir este assistente de pesquisa multiagente, primeiro discutiremos alguns tópicos de controlabilidade do LangGraph.

Começaremos com a paralelização.

Expansão e refinamento
Vamos construir um grafo linear simples que sobrescreve o estado a cada passo.

In [None]:
import os, getpass

def _set_env(var: str):
    if not os.environ.get(var):
        os.environ[var] = getpass.getpass(f"{var}: ")

_set_env("OPENAI_API_KEY")

In [None]:
from IPython.display import Image, display

from typing import Any, List
from typing_extensions import TypedDict

from langgraph.graph import StateGraph, START, END

class State(TypedDict):
    # Note, no reducer function. 
    state: List[str]

class ReturnNodeValue:
    def __init__(self, node_secret: str):
        self._value = node_secret

    def __call__(self, state: State) -> Any:
        print(f"Adding {self._value} to {state['state']}")
        return {"state": [self._value]}

# Add nodes
builder = StateGraph(State)

# Initialize each node with node_secret 
builder.add_node("a", ReturnNodeValue("I'm A"))
builder.add_node("b", ReturnNodeValue("I'm B"))
builder.add_node("c", ReturnNodeValue("I'm C"))
builder.add_node("d", ReturnNodeValue("I'm D"))

# Flow
builder.add_edge(START, "a")
builder.add_edge("a", "b")
builder.add_edge("b", "c")
builder.add_edge("c", "d")
builder.add_edge("d", END)
graph = builder.compile()

display(Image(graph.get_graph().draw_mermaid_png()))

In [None]:
graph.invoke({"state": []})

Agora, vamos executar b e c em paralelo.

E depois executar d.

Podemos fazer isso facilmente com fan-out de a para b e c, e depois fan-in para d.

As atualizações de estado são aplicadas ao final de cada etapa.

Vamos executar.

In [None]:
builder = StateGraph(State)

# Initialize each node with node_secret 
builder.add_node("a", ReturnNodeValue("I'm A"))
builder.add_node("b", ReturnNodeValue("I'm B"))
builder.add_node("c", ReturnNodeValue("I'm C"))
builder.add_node("d", ReturnNodeValue("I'm D"))

# Flow
builder.add_edge(START, "a")
builder.add_edge("a", "b")
builder.add_edge("a", "c")
builder.add_edge("b", "d")
builder.add_edge("c", "d")
builder.add_edge("d", END)
graph = builder.compile()

display(Image(graph.get_graph().draw_mermaid_png()))

Isso ocorre porque tanto b quanto c estão escrevendo na mesma chave de estado/canal na mesma etapa.

In [None]:
from langgraph.errors import InvalidUpdateError
try:
    graph.invoke({"state": []})
except InvalidUpdateError as e:
    print(f"An error occurred: {e}")

Ao usar o recurso de "fan out", precisamos garantir que estamos usando um reducer se as etapas estiverem escrevendo no mesmo canal/chave.

Como mencionado no Módulo 2, `operator.add` é uma função do módulo `operator` do Python.

Quando `operator.add` é aplicado a listas, ele realiza a concatenação das listas.

In [None]:
import operator
from typing import Annotated

class State(TypedDict):
    # The operator.add reducer fn makes this append-only
    state: Annotated[list, operator.add]

# Add nodes
builder = StateGraph(State)

# Initialize each node with node_secret 
builder.add_node("a", ReturnNodeValue("I'm A"))
builder.add_node("b", ReturnNodeValue("I'm B"))
builder.add_node("c", ReturnNodeValue("I'm C"))
builder.add_node("d", ReturnNodeValue("I'm D"))

# Flow
builder.add_edge(START, "a")
builder.add_edge("a", "b")
builder.add_edge("a", "c")
builder.add_edge("b", "d")
builder.add_edge("c", "d")
builder.add_edge("d", END)
graph = builder.compile()

display(Image(graph.get_graph().draw_mermaid_png()))

In [None]:
graph.invoke({"state": []})

Agora vemos que adicionamos ao estado as atualizações feitas em paralelo por b e c.

Aguardando a conclusão dos nós
Agora, vamos considerar um caso em que um caminho paralelo tenha mais etapas do que o outro.

In [None]:
builder = StateGraph(State)

# Initialize each node with node_secret 
builder.add_node("a", ReturnNodeValue("I'm A"))
builder.add_node("b", ReturnNodeValue("I'm B"))
builder.add_node("b2", ReturnNodeValue("I'm B2"))
builder.add_node("c", ReturnNodeValue("I'm C"))
builder.add_node("d", ReturnNodeValue("I'm D"))

# Flow
builder.add_edge(START, "a")
builder.add_edge("a", "b")
builder.add_edge("a", "c")
builder.add_edge("b", "b2")
builder.add_edge(["b2", "c"], "d")
builder.add_edge("d", END)
graph = builder.compile()

display(Image(graph.get_graph().draw_mermaid_png()))

Neste caso, b, b2 e c fazem parte da mesma etapa.

O gráfico aguardará a conclusão de todas essas etapas antes de prosseguir para a etapa d.

In [None]:
graph.invoke({"state": []})

### Definindo a ordem das atualizações de estado

No entanto, em cada etapa, não temos controle específico sobre a ordem das atualizações de estado!

Em termos simples, trata-se de uma ordem determinística definida pelo LangGraph com base na topologia do grafo, que não controlamos.

Acima, vemos que c é adicionado antes de b2.

Contudo, podemos usar um reducer personalizado para personalizar isso, por exemplo, ordenando as atualizações de estado.

In [None]:
def sorting_reducer(left, right):
    """ Combines and sorts the values in a list"""
    if not isinstance(left, list):
        left = [left]

    if not isinstance(right, list):
        right = [right]
    
    return sorted(left + right, reverse=False)

class State(TypedDict):
    # sorting_reducer will sort the values in state
    state: Annotated[list, sorting_reducer]

# Add nodes
builder = StateGraph(State)

# Initialize each node with node_secret 
builder.add_node("a", ReturnNodeValue("I'm A"))
builder.add_node("b", ReturnNodeValue("I'm B"))
builder.add_node("b2", ReturnNodeValue("I'm B2"))
builder.add_node("c", ReturnNodeValue("I'm C"))
builder.add_node("d", ReturnNodeValue("I'm D"))

# Flow
builder.add_edge(START, "a")
builder.add_edge("a", "b")
builder.add_edge("a", "c")
builder.add_edge("b", "b2")
builder.add_edge(["b2", "c"], "d")
builder.add_edge("d", END)
graph = builder.compile()

display(Image(graph.get_graph().draw_mermaid_png()))

Agora, o reducer ordena os valores de estado atualizados!

O exemplo sorting_reducer ordena todos os valores globalmente. Também podemos:

1. Escrever as saídas em um campo separado no estado durante a etapa paralela.
2. Usar um nó "sink" após a etapa paralela para combinar e ordenar essas saídas.
3. Limpar o campo temporário após a combinação.

### Trabalhando com LLMs

Agora, vamos adicionar um exemplo realista!

Queremos coletar contexto de duas fontes externas (Wikipedia e Busca na Web) e pedir a um LLM que responda a uma pergunta.

In [None]:
from langchain_openai import ChatOpenAI
llm= ChatOpenAI(model="gpt-4o", temperature= 0)

In [None]:
class State(TypedDict):
    question: str
    answer: str
    context: Annotated[list, operator.add]

In [None]:
import os, getpass
def _set_env(var: str):
    if not os.environ.get(var):
        os.environ[var] = getpass.getpass(f"{var}: ")
_set_env("TAVILY_API_KEY")

In [None]:

from langchain_core.messages import HumanMessage, SystemMessage

from langchain_community.document_loaders import WikipediaLoader
from langchain_tavily import TavilySearch  # updated since filming

def search_web(state):
    
    """ Retrieve docs from web search """

    # Search
    tavily_search = TavilySearch(max_results=3)
    data = tavily_search.invoke({"query": state['question']})
    search_docs = data.get("results", data)

     # Format
    formatted_search_docs = "\n\n---\n\n".join(
        [
            f'<Document href="{doc["url"]}">\n{doc["content"]}\n</Document>'
            for doc in search_docs
        ]
    )

    return {"context": [formatted_search_docs]} 

def search_wikipedia(state):
    
    """ Retrieve docs from wikipedia """

    # Search
    search_docs = WikipediaLoader(query=state['question'], 
                                  load_max_docs=2).load()

     # Format
    formatted_search_docs = "\n\n---\n\n".join(
        [
            f'<Document source="{doc.metadata["source"]}" page="{doc.metadata.get("page", "")}">\n{doc.page_content}\n</Document>'
            for doc in search_docs
        ]
    )

    return {"context": [formatted_search_docs]} 

def generate_answer(state):
    
    """ Node to answer a question """

    # Get state
    context = state["context"]
    question = state["question"]

    # Template
    answer_template = """Answer the question {question} using this context: {context}"""
    answer_instructions = answer_template.format(question=question, 
                                                       context=context)    
    
    # Answer
    answer = llm.invoke([SystemMessage(content=answer_instructions)]+[HumanMessage(content=f"Answer the question.")])
      
    # Append it to state
    return {"answer": answer}

# Add nodes
builder = StateGraph(State)

# Initialize each node with node_secret 
builder.add_node("search_web",search_web)
builder.add_node("search_wikipedia", search_wikipedia)
builder.add_node("generate_answer", generate_answer)

# Flow
builder.add_edge(START, "search_wikipedia")
builder.add_edge(START, "search_web")
builder.add_edge("search_wikipedia", "generate_answer")
builder.add_edge("search_web", "generate_answer")
builder.add_edge("generate_answer", END)
graph = builder.compile()

display(Image(graph.get_graph().draw_mermaid_png()))

In [None]:
result = graph.invoke({"question": "How were Nvidia's Q2 2025 earnings"})
result['answer'].content