[![Open in Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/github/langchain-ai/langchain-academy/blob/main/module-4/map-reduce.ipynb) [![Open in LangChain Academy](https://cdn.prod.website-files.com/65b8cd72835ceeacd4449a53/66e9eba12c7b7688aa3dbb5e_LCA-badge-green.svg)](https://academy.langchain.com/courses/take/intro-to-langgraph/lessons/58239947-lesson-3-map-reduce)

# Map-reduce

## Review

We're building up to a multi-agent research assistant that ties together all of the modules from this course.

To build this multi-agent assistant, we've been introducing a few LangGraph controllability topics.

We just covered parallelization and sub-graphs.

## Goals

Now, we're going to cover [map-reduce](https://docs.langchain.com/oss/python/langgraph/use-graph-api#map-reduce-and-the-send-api).

In [2]:
%%capture --no-stderr
%pip install -U langchain_openai langgraph

In [20]:
import os, getpass

def _set_env(var: str):
    if not os.environ.get(var):
        os.environ[var] = getpass.getpass(f"{var}: ")

_set_env("OPENAI_API_KEY")


We'll use [LangSmith](https://docs.langchain.com/langsmith/home) for [tracing](https://docs.langchain.com/langsmith/observability-concepts).

In [None]:
_set_env("LANGSMITH_API_KEY")
os.environ["LANGSMITH_TRACING"] = "true"
os.environ["LANGSMITH_PROJECT"] = "langchain-academy"

## Problem

Map-reduce operations are essential for efficient task decomposition and parallel processing.

A map-reduce operation has two phases:

(1) `Map` - Break a task into smaller sub-tasks, processing each sub-task in parallel.

(2) `Reduce` - Aggregate the results across all of the completed, parallelized sub-tasks.
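
As a rough illustration of the two phases outside of LangGraph, the sketch below maps a function over sub-tasks in a thread pool and then reduces the results to a single value. The `score_subtask` helper and the sample inputs are hypothetical, chosen only to show the pattern.

```python
from concurrent.futures import ThreadPoolExecutor


def score_subtask(text: str) -> tuple[str, int]:
    # Hypothetical sub-task: pair each string with its length.
    return text, len(text)


def map_reduce(subtasks: list[str]) -> str:
    # Map: run every sub-task in parallel; pool.map keeps the input order.
    with ThreadPoolExecutor() as pool:
        results = list(pool.map(score_subtask, subtasks))
    # Reduce: aggregate all results into one answer (here, the longest string).
    best_text, _ = max(results, key=lambda pair: pair[1])
    return best_text


print(map_reduce(["sun", "galaxy", "moon"]))  # galaxy
```

The graph built later in this notebook follows the same shape, with LLM calls in place of `score_subtask` and `max`.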

Let's design a system that will do two things:

(1) `Map` - Create a set of quotes about a topic.

(2) `Reduce` - Pick the best quote from the list.

We'll use an LLM to do the quote generation and selection.

In [6]:
subjects_prompt = """Generate a list of 3 sub-topics related to this overall theme: {topic}."""
quote_prompt = """Write a short inspirational quote about {subject}."""
best_quote_prompt = """Below are several quotes about {topic}. Choose the most inspiring one and return its ID (starting from 0). Quotes:\n\n {quotes}"""


## State

### Parallelizing quote generation

First, let's define the entry point of the graph that will:

* Take a user input topic
* Produce a list of quote subjects from it
* Send each subject to our quote generation node

Our state has a `quotes` key, which will accumulate quotes from parallelized quote generation.

In [8]:
import operator
from typing import Annotated
from typing_extensions import TypedDict
from pydantic import BaseModel  # base class for the structured-output schemas below

class Subjects(BaseModel):
    subjects: list[str]

class BestQuote(BaseModel):
    id: int

class OverallState(TypedDict):
    topic: str
    subjects: list
    quotes: Annotated[list, operator.add]
    best_selected_quote: str
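
To make the role of the reducer concrete, here is a small plain-Python sketch of what `operator.add` does when several parallel nodes each return a one-item list for the same key. The `apply_updates` helper is hypothetical and only imitates the merging idea; it is not LangGraph's internal implementation.

```python
import operator


def apply_updates(current: list, updates: list[list]) -> list:
    # Fold each partial update into the running value with the reducer,
    # the same function attached to `quotes` via Annotated[list, operator.add].
    for update in updates:
        current = operator.add(current, update)
    return current


# Three parallel branches, each returning a single-item list.
partial_updates = [["quote A"], ["quote B"], ["quote C"]]
print(apply_updates([], partial_updates))
# ['quote A', 'quote B', 'quote C']
```

Without a reducer, each write to the key would replace the previous value instead of extending it.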


Generate subjects for quotes.

In [9]:
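def generate_topics(state: OverallState):
    # `model` is looked up when the node runs, so it only has to be defined
    # (see the Compile section) before the graph is executed.
    prompt = f"List 3 themes or ideas related to {state['topic']} that can inspire people."
    # Ask for output that matches the Subjects schema.
    response = model.with_structured_output(Subjects).invoke(prompt)
    return {"subjects": response.subjects}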


Here is the magic: we use [Send](https://docs.langchain.com/oss/python/langgraph/graph-api/#send) to create a quote for each subject.

This is very useful! It can automatically parallelize quote generation for any number of subjects.

* `generate_quote`: the name of the node in the graph
* `{"subject": s}`: the state to send

`Send` allows you to pass any state that you want to `generate_quote`! It does not have to align with `OverallState`.

In this case, `generate_quote` is using its own internal state, and we can populate this via `Send`.

In [10]:
from langgraph.types import Send
def continue_to_quotes(state: OverallState):
    return [Send("generate_quote", {"subject": s}) for s in state["subjects"]]
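
The fan-out can be pictured with a plain-Python stand-in. In the sketch below, `FakeSend` and `run_fan_out` are hypothetical names that only mimic the idea of pairing a target node with its own input; the real `Send` class lives in `langgraph.types`, and the real scheduling is done by LangGraph.

```python
from dataclasses import dataclass
from typing import Any, Callable


@dataclass
class FakeSend:
    # Hypothetical stand-in for Send: a target node name plus the state for that one call.
    node: str
    arg: dict[str, Any]


def continue_to_quotes(state: dict) -> list[FakeSend]:
    # One packet per subject, so the number of branches follows the data.
    return [FakeSend("generate_quote", {"subject": s}) for s in state["subjects"]]


def run_fan_out(sends: list[FakeSend], nodes: dict[str, Callable[[dict], dict]]) -> list[dict]:
    # Call the named node once per packet (sequentially here; LangGraph runs them in parallel).
    return [nodes[send.node](send.arg) for send in sends]


nodes = {"generate_quote": lambda s: {"quotes": [f"A quote about {s['subject']}"]}}
sends = continue_to_quotes({"topic": "space", "subjects": ["Planets", "Stars"]})
print(run_fan_out(sends, nodes))
# [{'quotes': ['A quote about Planets']}, {'quotes': ['A quote about Stars']}]
```

Note that each call receives only `{"subject": ...}`, not the full overall state, which is why the worker node can declare its own smaller state type.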


### Quote generation (map)

Now, we just define a node that will create our quotes, `generate_quote`!

We write them back out to `quotes` in `OverallState`!

This key has a reducer that will combine lists.

In [11]:
class QuoteState(TypedDict):
    subject: str

class Quote(BaseModel):
    quote: str

def generate_quote(state: QuoteState):
    prompt = f"Write a short inspirational quote about {state['subject']}."
    response = model.with_structured_output(Quote).invoke(prompt)
    return {"quotes": [response.quote]}


### Best quote selection (reduce)

Now, we add logic to pick the best quote.

In [12]:
def best_quote(state: OverallState):
    quotes = "\n\n".join(state["quotes"])
    prompt = f"Here are several quotes about {state['topic']}. Choose the most inspiring one and return its ID (starting from 0). Quotes:\n\n{quotes}"
    response = model.with_structured_output(BestQuote).invoke(prompt)
    return {"best_selected_quote": state["quotes"][response.id]}
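
Since the prompt asks the model for an ID, it can help to show the IDs explicitly and to guard against an out-of-range answer. The helpers below are a hedged sketch, not part of the original notebook: `format_with_ids` and `pick_by_id` are hypothetical names, and falling back to the first quote is just one possible policy.

```python
def format_with_ids(quotes: list[str]) -> str:
    # Prefix each quote with its zero-based ID so the model can refer to it unambiguously.
    return "\n\n".join(f"{i}: {q}" for i, q in enumerate(quotes))


def pick_by_id(quotes: list[str], idx: int) -> str:
    # Return the chosen quote, falling back to the first one if the ID is out of range.
    # Assumes `quotes` is non-empty.
    if 0 <= idx < len(quotes):
        return quotes[idx]
    return quotes[0]


quotes = ["Dream big.", "Keep going.", "Start now."]
print(format_with_ids(quotes))
print(pick_by_id(quotes, 1))  # Keep going.
print(pick_by_id(quotes, 7))  # Dream big.
```

In `best_quote`, the same idea would mean building the prompt from `format_with_ids(state["quotes"])` and indexing through `pick_by_id` rather than directly.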


## Compile

In [None]:
# ✅ Imports
import operator
from typing import Annotated
from typing_extensions import TypedDict
from pydantic import BaseModel
from langchain_openai import ChatOpenAI
from langgraph.types import Send
from langgraph.graph import END, StateGraph, START
from IPython.display import Image
import os, getpass

# ✅ Set API key
if "OPENAI_API_KEY" not in os.environ:
    os.environ["OPENAI_API_KEY"] = getpass.getpass("OPENAI_API_KEY: ")

# ✅ Prompts
subjects_prompt = """Generate a list of 3 sub-topics that are all related to this overall topic: {topic}."""
fact_prompt = """Generate an interesting fact about {subject}."""
best_fact_prompt = """Below are a bunch of facts about {topic}. Select the most fascinating one!
Return the ID of the best one, starting 0 as the ID for the first fact. Facts:\n\n{facts}"""
summary_prompt = """Write a short 2-line summary combining all the best points about {topic} from these facts:\n\n{facts}"""

# ✅ LLM
model = ChatOpenAI(model="gpt-4o", temperature=0)


In [19]:
# ✅ MAP-REDUCE (FACTS VERSION — OFFLINE SIMULATION)

# --- Imports ---
import operator
from typing import Annotated
from typing_extensions import TypedDict
from pydantic import BaseModel
from langgraph.types import Send
from langgraph.graph import END, StateGraph, START
from IPython.display import Image

# --- Mock Model to Simulate LLM Behavior ---
class MockModel:
    def with_structured_output(self, schema):
        # returns itself to allow chaining
        return self

    def invoke(self, prompt):
        # Return dummy structured outputs based on the schema
        if "list of 3 sub-topics" in prompt:
            class Dummy:
                subjects = ["Planets", "Stars", "Galaxies"]
            return Dummy()
        elif "interesting fact about" in prompt:
            class Dummy:
                fact = f"Fun fact about {prompt.split('about ')[-1].strip('.')}: It's fascinating!"
            return Dummy()
        elif "Select the most fascinating one" in prompt:
            class Dummy:
                id = 1
            return Dummy()
        else:
            class Dummy:
                text = "Default response"
            return Dummy()

# --- Initialize Mock Model ---
model = MockModel()

# --- Prompts (kept for learning structure) ---
subjects_prompt = """Generate a list of 3 sub-topics that are all related to this overall topic: {topic}."""
fact_prompt = """Generate an interesting fact about {subject}."""
best_fact_prompt = """Below are a bunch of facts about {topic}. Select the most fascinating one!
Return the ID of the best one, starting 0 as the ID for the first fact. Facts:\n\n{facts}"""

# --- Data Models (for structure) ---
class Subjects(BaseModel):
    subjects: list[str]

class BestFact(BaseModel):
    id: int

class Fact(BaseModel):
    fact: str

# --- State Definitions ---
class OverallState(TypedDict):
    topic: str
    subjects: list
    facts: Annotated[list, operator.add]
    best_selected_fact: str

class FactState(TypedDict):
    subject: str

# --- Node 1: Generate sub-topics ---
def generate_topics(state: OverallState):
    prompt = subjects_prompt.format(topic=state["topic"])
    response = model.with_structured_output(Subjects).invoke(prompt)
    return {"subjects": response.subjects}

# --- Node 2: Continue to facts (Map phase dispatcher) ---
def continue_to_facts(state: OverallState):
    return [Send("generate_fact", {"subject": s}) for s in state["subjects"]]

# --- Node 3: Generate facts (Map phase worker) ---
def generate_fact(state: FactState):
    prompt = fact_prompt.format(subject=state["subject"])
    response = model.with_structured_output(Fact).invoke(prompt)
    return {"facts": [response.fact]}

# --- Node 4: Select best fact (Reduce phase) ---
def best_fact(state: OverallState):
    facts = "\n\n".join(state["facts"])
    prompt = best_fact_prompt.format(topic=state["topic"], facts=facts)
    response = model.with_structured_output(BestFact).invoke(prompt)
    return {"best_selected_fact": state["facts"][response.id]}

# --- Build the Graph ---
graph = StateGraph(OverallState)
graph.add_node("generate_topics", generate_topics)
graph.add_node("generate_fact", generate_fact)
graph.add_node("best_fact", best_fact)
graph.add_edge(START, "generate_topics")
graph.add_conditional_edges("generate_topics", continue_to_facts, ["generate_fact"])
graph.add_edge("generate_fact", "best_fact")
graph.add_edge("best_fact", END)

# --- Compile the Graph ---
app = graph.compile()
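# A bare Image(...) is only rendered when it is the last expression in a cell, so display it explicitly.
from IPython.display import display
display(Image(app.get_graph().draw_mermaid_png()))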

# --- Execute the Graph ---
for s in app.stream({"topic": "space"}):
    print(s)

print("\n✅ Best Selected Fact:")
print(s["best_fact"]["best_selected_fact"])


{'generate_topics': {'subjects': ['Planets', 'Stars', 'Galaxies']}}
{'generate_fact': {'facts': ["Fun fact about Planets: It's fascinating!"]}}
{'generate_fact': {'facts': ["Fun fact about Stars: It's fascinating!"]}}
{'generate_fact': {'facts': ["Fun fact about Galaxies: It's fascinating!"]}}
{'best_fact': {'best_selected_fact': "Fun fact about Stars: It's fascinating!"}}

✅ Best Selected Fact:
Fun fact about Stars: It's fascinating!


## Studio

**⚠️ Notice**

Since filming these videos, we've updated Studio so that it can now be run locally and accessed through your browser. This is the preferred way to run Studio instead of using the Desktop App shown in the video. It is now called _LangSmith Studio_ instead of _LangGraph Studio_. Detailed setup instructions are available in the "Getting Setup" guide at the start of the course. You can find a description of Studio [here](https://docs.langchain.com/langsmith/studio), and specific details for local deployment [here](https://docs.langchain.com/langsmith/quick-start-studio#local-development-server).  
To start the local development server, run the following command in your terminal in the `/studio` directory in this module:

```
langgraph dev
```

You should see the following output:
```
- 🚀 API: http://127.0.0.1:2024
- 🎨 Studio UI: https://smith.langchain.com/studio/?baseUrl=http://127.0.0.1:2024
- 📚 API Docs: http://127.0.0.1:2024/docs
```

Open your browser and navigate to the **Studio UI** URL shown above.

Let's load the above graph in the Studio UI, which uses `module-4/studio/map_reduce.py` set in `module-4/studio/langgraph.json`.