### Parallel Execution

In [None]:
import operator
from typing import Annotated, Any
from typing_extensions import TypedDict
from langgraph.graph import StateGraph, START, END


class InputState(TypedDict):
    """Graph state holding the single string that the parallel branches extend.

    The ``operator.add`` metadata attached to ``string_value`` is the reducer
    LangGraph uses to merge updates for that key: string updates are
    concatenated instead of overwriting one another.
    """

    # Declared with a reducer (rather than as a plain ``str``) so that several
    # nodes can write to this key within the same step.
    string_value: Annotated[str, operator.add]


"""
In this case (though not only in this case):

1. A node completes its execution and returns a partial state update (e.g., {"string_value": "a"}).
2. LangGraph determines where execution should go next based on the graph structure.
3. If multiple execution paths converge on the next node (as in our parallel execution examples), LangGraph needs to combine their results.
4. This is when the annotation's operation executes: LangGraph uses the specified reducer (like operator.add) to combine the values from all converging paths for each annotated key. More generally, the reducer is applied to every update a node returns for an annotated key, not only where paths converge.
5. The combined state is then passed to the next node in the graph.
"""

In [None]:
def append_a(state: InputState) -> Any:
    """Contribute the letter ``"a"`` to ``string_value``.

    Only the new fragment is returned. ``string_value`` is declared with an
    ``operator.add`` reducer, so LangGraph concatenates this fragment onto the
    value already held in the graph state. Returning the whole (mutated) state
    instead would send the existing text through the reducer a second time and
    duplicate it whenever it is non-empty.

    Args:
        state: Current graph state; read only for logging and never mutated.

    Returns:
        The partial state update ``{"string_value": "a"}``.
    """
    print(f"append_a: Current string_value='{state['string_value']}'")
    return {"string_value": "a"}


def append_b(state: InputState) -> Any:
    """Contribute the letter ``"b"`` to ``string_value``.

    Only the new fragment is returned. ``string_value`` is declared with an
    ``operator.add`` reducer, so LangGraph concatenates this fragment onto the
    value already held in the graph state. Returning the whole (mutated) state
    instead would send the existing text through the reducer a second time and
    duplicate it whenever it is non-empty.

    Args:
        state: Current graph state; read only for logging and never mutated.

    Returns:
        The partial state update ``{"string_value": "b"}``.
    """
    print(f"append_b: Current string_value='{state['string_value']}'")
    return {"string_value": "b"}


def append_c(state: InputState) -> Any:
    """Contribute the letter ``"c"`` to ``string_value``.

    Only the new fragment is returned. ``string_value`` is declared with an
    ``operator.add`` reducer, so LangGraph concatenates this fragment onto the
    value already held in the graph state. Returning the whole (mutated) state
    instead would send the existing text through the reducer a second time and
    duplicate it whenever it is non-empty.

    Args:
        state: Current graph state; read only for logging and never mutated.

    Returns:
        The partial state update ``{"string_value": "c"}``.
    """
    print(f"append_c: Current string_value='{state['string_value']}'")
    return {"string_value": "c"}


def combine_strings(state: InputState) -> Any:
    """Upper-case the merged ``string_value`` and return it as a state update.

    A fresh dict is returned instead of assigning into ``state``, so the
    mapping handed to this node is left untouched.

    NOTE: ``string_value`` carries an ``operator.add`` reducer, so LangGraph
    appends this update to the stored value rather than replacing it. The
    final graph state therefore holds the merged text followed by its
    upper-case form (e.g. ``"abcABC"``).

    Args:
        state: Current graph state containing the concatenated string.

    Returns:
        A partial state update ``{"string_value": <upper-cased string>}``.
    """
    combined_string = state["string_value"]
    print(f"combine_strings: Combined string_value='{combined_string}'")
    final_string = combined_string.upper()
    print(f"combine_strings: Final string_value='{final_string}'")
    return {"string_value": final_string}

In [None]:
"""
The edges define the flow:

From START to start_node
From start_node to all three append nodes (creating parallel paths)
From each append node to combine_node (merging parallel paths)
From combine_node to END
"""

# Assemble the fan-out / fan-in graph:
#   START -> start_node -> {append_a, append_b, append_c} -> combine_node -> END
builder = StateGraph(InputState)

# start_node is a pure pass-through, so it emits an empty update. Echoing the
# state back (``lambda state: state``) would push the current string_value
# through the operator.add reducer again and double any non-empty input.
builder.add_node("start_node", lambda state: {})
builder.add_node("append_a_node", append_a)
builder.add_node("append_b_node", append_b)
builder.add_node("append_c_node", append_c)
builder.add_node("combine_node", combine_strings)

builder.add_edge(START, "start_node")

# Fan out: three edges leaving start_node create the parallel branches.
builder.add_edge("start_node", "append_a_node")
builder.add_edge("start_node", "append_b_node")
builder.add_edge("start_node", "append_c_node")

# Fan in: every branch feeds combine_node.
builder.add_edge("append_a_node", "combine_node")
builder.add_edge("append_b_node", "combine_node")
builder.add_edge("append_c_node", "combine_node")

builder.add_edge("combine_node", END)

graph = builder.compile()

# Empty accumulator used as the input for the run further down.
initial_state = {"string_value": ""}
"""
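Result Explanation
When you run this graph:

1. start_node hands the initial state to all three append nodes, which run in parallel in the same step.
2. Each append node contributes its own character as an update to string_value.
3. When the results flow to combine_node, the Annotated[str, operator.add] type annotation causes the three updates to be combined using string concatenation.
4. combine_node converts the combined string to uppercase and returns it as another update to string_value.
5. Because the operator.add reducer applies to every update, combine_node's uppercase text is appended to the existing value rather than replacing it. The final string_value therefore contains the three appended characters (in some order) followed by their uppercase form, for example "abcABC".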
"""

In [None]:
from IPython.display import Image, display

# Render the compiled graph as a Mermaid-generated PNG and show it inline.
display(Image(graph.get_graph().draw_mermaid_png()))

In [None]:
# Run the graph once from initial_state and print the final state it returns.
result = graph.invoke(initial_state)
print(result)

### Mapreduce with Send API

In [None]:
import operator
import random
from typing import Any, Annotated, List
from typing_extensions import TypedDict
from langgraph.graph import StateGraph, START, END
from langgraph.types import Send
from IPython.display import Image, display


class State(TypedDict):
    """Shared state for the map-reduce graph."""

    # Accumulator: operator.add is the reducer, so every update written to this
    # key is concatenated onto the existing string.
    string_value: Annotated[str, operator.add]

    # Letters to fan out over; no reducer is attached to this key.
    letters: List[str]


def generate_letters(_: State) -> Any:
    """Pick a random batch of letters for the map phase.

    The incoming state is ignored. Between 5 and 10 letters (inclusive) are
    drawn with replacement from ``a``-``e`` using the global ``random`` module,
    so the result is only reproducible if the caller seeds ``random``.

    Returns:
        A partial state update ``{"letters": [...]}``.
    """
    alphabet = ["a", "b", "c", "d", "e"]
    count = random.randint(5, 10)
    picked = random.choices(alphabet, k=count)
    print(f"Generated letters: {picked}")
    return {"letters": picked}



"""
For each letter: The code loops through each character in the letters list (e.g., if letters is ['a', 'b', 'c'])
Creates a Send object: For each letter, it creates a new Send object with:

First parameter "append_letter": This is the name of the target node in the graph
Second parameter {"letter": letter}: This is the dictionary that will be passed to that node as its input state when it is executed

In other words, the loop creates one Send object per letter in letters, and each Send object triggers its own run of the append_letter node.

This is the "mapping" phase of MapReduce

Result is a list of Send objects: This gives us something like:

[
  Send("append_letter", {"letter": "a"}),
  Send("append_letter", {"letter": "b"}),
  Send("append_letter", {"letter": "c"})
]

When a Send object like Send("append_letter", {"letter": "a"}) is processed, LangGraph:

Schedules a separate run of the append_letter node
Passes the provided dictionary {"letter": "a"} to that run as its input state (it is not merged into the overall graph state first)
Applies whatever append_letter returns to the graph state as a normal update


So the actual state received by append_letter looks like:

{
  "letter": "a"  # This is supplied by the Send operation
}

Inside append_letter, we can access state["letter"] even though it's not part of the original State type definition, because the input of a Send-triggered node is whatever payload the Send carries. The State annotation on append_letter is only a static type hint: it helps during development but doesn't restrict the actual runtime behavior.
"""
def generate_random_nodes(state: State):
    """Build one ``Send`` per letter held in ``state["letters"]``.

    Every ``Send`` targets the ``"append_letter"`` node and carries the payload
    ``{"letter": <letter>}``. The list preserves the order of the letters.
    """
    pending = state["letters"]
    print(f"Using letters from state: {pending}")
    sends = []
    for ch in pending:
        sends.append(Send("append_letter", {"letter": ch}))
    return sends


"""
This function gets executed once for each letter
It extracts the letter from the state and returns it as part of the string_value
This is equivalent to the "map" workers in MapReduce
"""
def append_letter(state: State) -> Any:
    """Turn a single letter into an update for ``string_value``.

    Args:
        state: Mapping that must contain a ``"letter"`` key, which is the
            payload built by ``generate_random_nodes``.

    Returns:
        A partial state update ``{"string_value": <letter>}``.
    """
    ch = state["letter"]
    print(f"Appending '{ch}'")
    return dict(string_value=ch)


def combine_strings(state: State) -> Any:
    """Upper-case the accumulated ``string_value`` and return it as an update.

    Logs the text both before and after the conversion. The input mapping is
    only read, never modified.

    Returns:
        A partial state update ``{"string_value": <upper-cased string>}``.
    """
    merged = state["string_value"]
    print(f"combine_strings: Combined string_value='{merged}'")
    shouted = merged.upper()
    print(f"combine_strings: Final string_value='{shouted}'")
    return {"string_value": shouted}


# Assemble the map-reduce graph:
#   START -> start_node -> generate_letters -(one Send per letter)-> append_letter
#         -> combine_node -> END
builder = StateGraph(State)

# start_node is a pure pass-through, so it emits an empty update. Echoing the
# state back (``lambda state: state``) would push the current string_value
# through the operator.add reducer again and double any non-empty input.
builder.add_node("start_node", lambda state: {})
builder.add_node("generate_letters", generate_letters)
builder.add_node("append_letter", append_letter)
builder.add_node("combine_node", combine_strings)

builder.add_edge(START, "start_node")
builder.add_edge("start_node", "generate_letters")

# add_conditional_edges is used with generate_random_nodes to create the dynamic mapping.
# The third parameter ["append_letter"] tells LangGraph which nodes can be targeted by the Send objects.
# This allows the graph to dynamically spawn multiple executions of the append_letter node.
builder.add_conditional_edges(
    "generate_letters", generate_random_nodes, ["append_letter"]
)
builder.add_edge("append_letter", "combine_node")
builder.add_edge("combine_node", END)

graph = builder.compile()

In [None]:
# Render the compiled graph as a Mermaid-generated PNG and show it inline.
display(Image(graph.get_graph().draw_mermaid_png()))

In [None]:
# Start from an empty accumulator; "letters" is not supplied here because the
# generate_letters node returns it as a state update.
initial_state = {"string_value": ""}
result = graph.invoke(initial_state)

# Print the final state returned by the run.
print(result)