## Parallel Workflows in LangGraph (Conceptual Model)

Parallel workflows allow **independent computation paths** to execute concurrently from the same input state and later converge. This pattern is essential when tasks do not depend on each other's intermediate results.

### Key Architectural Properties

**Fan-out:**
A single upstream state triggers multiple downstream nodes simultaneously.

**Fan-in:**
Results from parallel nodes converge into a single downstream node.

**Deterministic Merge:**
State updates must be conflict-free or explicitly reducible.

---

## Core Rules That Make Parallelism Work

### 1. Simultaneous Execution

Nodes that:

* Depend on the same input keys
* Do **not** depend on each other's outputs

can run in parallel. This reduces latency and avoids unnecessary sequencing.
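
As a rough illustration of why independence matters, the sketch below runs three stand-in tasks concurrently with the standard library's thread pool. It does not use LangGraph. `evaluate_all` and the three counting functions are hypothetical placeholders; the only point is that every task reads the same input and none reads another task's result.

```python
from concurrent.futures import ThreadPoolExecutor

# Hypothetical stand-ins for independent evaluators: each reads only the essay.
def word_count(essay: str) -> int:
    return len(essay.split())

def sentence_count(essay: str) -> int:
    return essay.count(".")

def char_count(essay: str) -> int:
    return len(essay)

def evaluate_all(essay: str) -> dict:
    tasks = {"words": word_count, "sentences": sentence_count, "chars": char_count}
    with ThreadPoolExecutor(max_workers=len(tasks)) as pool:
        # Submit every task before collecting any result, so none waits on another.
        futures = {name: pool.submit(fn, essay) for name, fn in tasks.items()}
        return {name: future.result() for name, future in futures.items()}

results = evaluate_all("Democracy is a way of life. It needs informed citizens.")
```

Because no task consumes another's output, the results are the same whatever order the tasks finish in.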

---

### 2. Partial State Updates (Critical Constraint)

Each parallel node **must return only the keys it owns**.

**Why:**
If two parallel nodes return the full state, both of them write every key, including keys they never changed (e.g., `essay`). LangGraph then receives two writes to a key that has no reducer, cannot decide which one is authoritative, and raises a conflict error.

**Correct pattern**

```python
return {"clarity_fb": "...", "scores": [8]}
```

**Incorrect pattern**

```python
return state  # causes merge conflict
```
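
The conflict can be reproduced with a small stand-in merge function. This is a minimal sketch of the rule, not LangGraph's implementation: `merge_updates` is a hypothetical helper, and it ignores reducers entirely so that any key written twice in one step is treated as a conflict.

```python
# Hypothetical merge helper: NOT LangGraph's code, only the rule it enforces
# for keys that have no reducer.
def merge_updates(state: dict, updates: list[dict]) -> dict:
    merged = dict(state)
    written = set()
    for update in updates:
        for key, value in update.items():
            if key in written:
                # Two branches wrote the same key in the same step.
                raise ValueError(f"conflicting writes to {key!r}")
            written.add(key)
            merged[key] = value
    return merged

state = {"essay": "some text"}

# Partial updates: each branch returns only the key it owns.
partial = merge_updates(state, [{"clarity_fb": "clear"}, {"language_fb": "fluent"}])

# Full-state returns: both branches also write "essay", so the merge is ambiguous.
try:
    merge_updates(
        state,
        [{**state, "clarity_fb": "clear"}, {**state, "language_fb": "fluent"}],
    )
    conflict = None
except ValueError as exc:
    conflict = str(exc)
```

The partial updates merge cleanly, while the full-state returns fail on `essay` even though neither branch changed it.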

---

### 3. Reducers for Shared Keys

When multiple parallel nodes update the **same state key**, a reducer defines how values are merged.

In this project:

* `scores` is written by three parallel nodes
* Reducer: `operator.add`
* Behavior: list concatenation

Without a reducer, LangGraph rejects concurrent writes.
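
To see what list concatenation means here, the following sketch folds three single-item updates into an initially empty list with `operator.add`. The score values are made up for illustration, and the fold via `functools.reduce` is only a way to show the arithmetic, not a claim about how LangGraph applies reducers internally.

```python
import operator
from functools import reduce

# One single-item list per parallel node (the values are made up).
branch_updates = [[8], [7], [9]]

# operator.add on lists is concatenation, so folding the updates
# into the initial empty list collects every score.
scores = reduce(operator.add, branch_updates, [])
```

The resulting order follows the order in which the updates are applied, so downstream code should not rely on a particular position for a particular evaluator.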

---

### 4. Structured Outputs for Reliability

Each LLM node uses a **Pydantic schema** to enforce:

* Typed outputs
* Bounded values
* Machine-safe merging

This prevents malformed or ambiguous data during fan-in.
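
The bounded-value guarantee can be checked without calling an LLM. The sketch below builds a schema of the same shape used in this project and shows Pydantic rejecting an out-of-range score; the feedback strings and scores are invented for the example.

```python
from pydantic import BaseModel, Field, ValidationError

class EvalSchema(BaseModel):
    feedback: str = Field(description="Detailed feedback")
    score: int = Field(description="Score from 0-10", ge=0, le=10)

# A score inside the bounds validates normally.
ok = EvalSchema(feedback="Well argued.", score=8)

# A score outside the bounds never reaches the shared state.
try:
    EvalSchema(feedback="Too generous.", score=14)  # violates le=10
    rejected = False
except ValidationError:
    rejected = True
```

An invalid value fails at the node that produced it, rather than skewing the average computed at fan-in.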

---

## Project: UPSC Essay Evaluator (Parallel Evaluation)


### Evaluation Dimensions (Parallel)

* Clarity of Thought
* Depth of Analysis
* Language Quality

Each dimension is evaluated **independently** and simultaneously.

---

## Step 1: State and Schema Definition

```python
import operator
from typing import Annotated, List, TypedDict
from pydantic import BaseModel, Field

class EvalSchema(BaseModel):
    feedback: str = Field(description="Detailed feedback")
    score: int = Field(description="Score from 0-10", ge=0, le=10)

class UPSCState(TypedDict):
    essay: str
    language_fb: str
    analysis_fb: str
    clarity_fb: str
    scores: Annotated[List[int], operator.add]
    final_score: float
    summary: str
```

**Why this works**

* `scores` uses a reducer
* All other fields are written by exactly one node
* No ambiguous ownership
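
The reducer is attached to the key purely through the type annotation. As a small sketch of where that information lives, the standard library can list which keys carry `Annotated` metadata. The trimmed-down `UPSCState` below is for illustration only, and LangGraph's own way of reading the metadata may differ.

```python
import operator
from typing import Annotated, List, TypedDict, get_type_hints

class UPSCState(TypedDict):
    essay: str
    scores: Annotated[List[int], operator.add]

# include_extras=True keeps the Annotated wrapper instead of stripping it.
hints = get_type_hints(UPSCState, include_extras=True)

# Annotated types carry their extra arguments in __metadata__.
reducers = {
    key: hint.__metadata__[0]
    for key, hint in hints.items()
    if hasattr(hint, "__metadata__")
}
```

Only `scores` shows up in `reducers`; every other key is a plain type and therefore has exactly one permitted writer per step.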

---

## Step 2: Parallel Evaluation Nodes (Partial Updates Only)

```python
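# Assumes `structured_model = model.with_structured_output(EvalSchema)`,
# where `model` is a LangChain chat model (defined later in the notebook as `llm`).
def eval_language(state: UPSCState):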
    output = structured_model.invoke(f"Evaluate language: {state['essay']}")
    return {"language_fb": output.feedback, "scores": [output.score]}

def eval_analysis(state: UPSCState):
    output = structured_model.invoke(f"Evaluate depth: {state['essay']}")
    return {"analysis_fb": output.feedback, "scores": [output.score]}

def eval_clarity(state: UPSCState):
    output = structured_model.invoke(f"Evaluate clarity: {state['essay']}")
    return {"clarity_fb": output.feedback, "scores": [output.score]}
```

**Invariant**

* Each node writes:

  * One feedback field
  * One score contribution
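
The invariant can be checked offline by swapping the model for a stub. This is a sketch under assumptions: `StubStructuredModel` is a hypothetical stand-in that mimics only the `.invoke()` call and the two attributes the nodes read, and the fixed score of 7 is arbitrary.

```python
from types import SimpleNamespace

# Hypothetical stub that mimics the `.invoke()` call used by the nodes.
class StubStructuredModel:
    def invoke(self, prompt: str):
        return SimpleNamespace(feedback=f"stub feedback ({len(prompt)} chars)", score=7)

structured_model = StubStructuredModel()

def eval_clarity(state: dict) -> dict:
    output = structured_model.invoke(f"Evaluate clarity: {state['essay']}")
    return {"clarity_fb": output.feedback, "scores": [output.score]}

# The node receives the whole state but returns only the keys it owns.
update = eval_clarity({"essay": "Democracy is a way of life.", "scores": []})
```

A test like this catches a node that accidentally returns `essay` or another node's feedback key before the graph is ever compiled.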

---

## Step 3: Fan-In Aggregator Node

```python
# `model` is the plain chat model (no structured output); `.content` holds the reply text.
def final_eval(state: UPSCState):
    avg = sum(state["scores"]) / len(state["scores"])
    summary_prompt = (
        f"Summarize: {state['language_fb']}, "
        f"{state['analysis_fb']}, {state['clarity_fb']}"
    )
    summary = model.invoke(summary_prompt).content
    return {"final_score": avg, "summary": summary}
```

**Aggregator responsibilities**

* Read-only access to parallel outputs
* Deterministic reduction
* No further parallel writes
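
One edge case the aggregator may want to handle is an empty `scores` list, which would make the average divide by zero. The helper below is a hypothetical addition, not part of the original node, and raising `ValueError` is just one reasonable choice.

```python
def average_score(scores: list[int]) -> float:
    # An empty list would make sum(...) / len(...) raise ZeroDivisionError.
    if not scores:
        raise ValueError("no scores were collected from the parallel nodes")
    return sum(scores) / len(scores)

avg = average_score([8, 7, 9])
```

Failing with an explicit message points at the real cause (no evaluator contributed a score) instead of an arithmetic error.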

---

## Step 4: Graph Construction (Fan-Out / Fan-In)

```python
from langgraph.graph import StateGraph, START, END

builder = StateGraph(UPSCState)

builder.add_node("language", eval_language)
builder.add_node("analysis", eval_analysis)
builder.add_node("clarity", eval_clarity)
builder.add_node("aggregator", final_eval)

builder.add_edge(START, "language")
builder.add_edge(START, "analysis")
builder.add_edge(START, "clarity")

builder.add_edge("language", "aggregator")
builder.add_edge("analysis", "aggregator")
builder.add_edge("clarity", "aggregator")

builder.add_edge("aggregator", END)

workflow = builder.compile()
```

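**Parallelism is defined purely by edges**: nodes that share an upstream node and have no edges between them are scheduled in the same step, so you write no thread or async code yourself.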
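
To make the idea of parallelism from topology concrete, the sketch below groups the nodes of this graph into waves: a node joins a wave once all of its upstream nodes have finished. `execution_waves` is a hypothetical helper written for this explanation; it mirrors the idea, not LangGraph's actual scheduler.

```python
# Hypothetical helper: groups nodes into "waves" that could run together.
def execution_waves(edges, start="START", end="END"):
    nodes = {n for edge in edges for n in edge} - {start, end}
    parents = {n: {src for src, dst in edges if dst == n} for n in nodes}
    done, waves = {start}, []
    while nodes - done:
        # A node is ready once every upstream node has finished.
        ready = sorted(n for n in nodes if n not in done and parents[n] <= done)
        if not ready:
            raise ValueError("cycle or unreachable node in edge list")
        waves.append(ready)
        done.update(ready)
    return waves

edges = [
    ("START", "language"), ("START", "analysis"), ("START", "clarity"),
    ("language", "aggregator"), ("analysis", "aggregator"), ("clarity", "aggregator"),
    ("aggregator", "END"),
]
waves = execution_waves(edges)
```

The three evaluators land in the first wave and the aggregator in the second, which matches the fan-out and fan-in shape declared by the edges.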

---

## Why This Design Is Correct

* No shared mutable state without reducers
* Deterministic merges
* Independent reasoning paths
* Grows predictably: each additional evaluator adds one node, two edges, and one feedback key

---

## Interview Questions and Answers (Parallel Workflows in LangGraph)

### Q1. Why do parallel nodes in LangGraph return partial state instead of full state?

**Answer:**
Because multiple nodes execute concurrently. Returning the full state would cause conflicting writes for keys that were not actually modified. Partial updates ensure deterministic merges.

---

### Q2. What happens if two parallel nodes update the same key without a reducer?

**Answer:**
LangGraph raises an `InvalidUpdateError` and halts execution, because a key without a reducer can receive only one value per step. Concurrent writes to the same key require an explicit reducer to define merge semantics.

---

### Q3. Why is `operator.add` commonly used as a reducer?

**Answer:**
It provides deterministic, associative merging for lists or numeric accumulations. This makes it ideal for collecting scores, messages, or logs from parallel nodes.

---

### Q4. Can parallel nodes depend on each other's outputs?

**Answer:**
No. Any dependency introduces an execution order, which violates parallel execution. Dependent logic must be placed downstream in a fan-in node.

---

### Q5. How does LangGraph achieve parallelism without explicit async code?

**Answer:**
Parallelism is derived from graph topology. Nodes with no interdependencies and the same upstream edge are executed concurrently by the runtime.

---

### Q6. Why are structured outputs critical in parallel LLM workflows?

**Answer:**
They enforce type safety and bounded values, preventing malformed outputs from corrupting shared state during merges.

---

### Q7. How would you add another evaluation dimension?

**Answer:**

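* Add a feedback key for the new dimension to the state
* Add a new node that writes that key and appends its score to `scores`
* Add an edge from `START` to that node
* Add an edge from the node to the aggregator
* Include the new feedback key in the aggregator's summary prompt

The reducer on `scores` and the existing evaluator nodes need no changes.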
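
The edge bookkeeping for a new dimension can be sketched with plain tuples. `fan_out_in_edges` is a hypothetical helper and `"structure"` is an invented fourth dimension; in the real graph each tuple would correspond to one `builder.add_edge(...)` call.

```python
def fan_out_in_edges(dimensions, aggregator="aggregator"):
    edges = []
    for name in dimensions:
        edges.append(("START", name))      # fan-out edge
        edges.append((name, aggregator))   # fan-in edge
    edges.append((aggregator, "END"))
    return edges

before = fan_out_in_edges(["language", "analysis", "clarity"])
after = fan_out_in_edges(["language", "analysis", "clarity", "structure"])
```

Adding the dimension contributes exactly two edges and leaves every existing edge untouched.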

---




# 1️⃣ Install Dependencies (Notebook Cell)

```python
# Unpinned so pip resolves mutually compatible releases; pin versions once the notebook runs.
!pip install -U langgraph langchain-openai langchain-core pydantic
```

---

# 2️⃣ Imports

```python
import operator
from typing import TypedDict, List, Annotated

from pydantic import BaseModel, Field
from langgraph.graph import StateGraph, START, END
from langchain_openai import ChatOpenAI
```

---

# 3️⃣ Initialize GPT Model (Direct API Key)

```python
llm = ChatOpenAI(
    model="gpt-4o-mini",
    api_key="sk-PASTE-YOUR-API-KEY-HERE",
    temperature=0.0,
    max_retries=2,
    timeout=30
)
```
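
Pasting a key into a notebook makes it easy to leak when the notebook is shared. As an alternative sketch, the key can be read from an environment variable instead. `load_api_key` is a hypothetical helper, and `OPENAI_API_KEY` is assumed as the variable name because it is the one the OpenAI client libraries conventionally read.

```python
import os

def load_api_key(env_var: str = "OPENAI_API_KEY") -> str:
    # Read the key from the environment instead of hard-coding it in a cell.
    key = os.environ.get(env_var, "").strip()
    if not key:
        raise RuntimeError(f"Set the {env_var} environment variable before running the notebook")
    return key
```

The returned value can then be passed as `api_key=load_api_key()` when constructing `ChatOpenAI`.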

---

# 4️⃣ Structured Output Schema (LLM Contract)

```python
class EvalSchema(BaseModel):
    feedback: str = Field(description="Detailed feedback")
    score: int = Field(description="Score from 0 to 10", ge=0, le=10)
```

This ensures:

* Schema-conformant output (field names and types are validated)
* Safe numeric bounds
* Machine-readable responses

---

# 5️⃣ Define Global Graph State (With Reducer)

```python
class UPSCState(TypedDict):
    essay: str

    language_fb: str
    analysis_fb: str
    clarity_fb: str

    scores: Annotated[List[int], operator.add]

    final_score: float
    summary: str
```

Key rule applied:

* `scores` uses `operator.add` to merge parallel writes

---

# 6️⃣ Helper Function for Structured GPT Calls

```python
def run_structured_eval(prompt: str) -> EvalSchema:
    structured_llm = llm.with_structured_output(EvalSchema)
    return structured_llm.invoke(prompt)
```

---

# 7️⃣ Parallel Evaluation Nodes

Each node:

* Reads `essay`
* Writes only its own keys
* Appends one score

---

### 🟦 Language Evaluation Node

```python
def eval_language(state: UPSCState):
    result = run_structured_eval(
        f"Evaluate the language quality of this UPSC essay:\n\n{state['essay']}"
    )
    return {
        "language_fb": result.feedback,
        "scores": [result.score]
    }
```

---

### 🟦 Depth of Analysis Node

```python
def eval_analysis(state: UPSCState):
    result = run_structured_eval(
        f"Evaluate the depth of analysis in this UPSC essay:\n\n{state['essay']}"
    )
    return {
        "analysis_fb": result.feedback,
        "scores": [result.score]
    }
```

---

### 🟦 Clarity of Thought Node

```python
def eval_clarity(state: UPSCState):
    result = run_structured_eval(
        f"Evaluate the clarity of thought in this UPSC essay:\n\n{state['essay']}"
    )
    return {
        "clarity_fb": result.feedback,
        "scores": [result.score]
    }
```

---

# 8️⃣ Aggregator Node (Fan-In)

This node runs **after all parallel nodes finish**.

```python
def final_evaluation(state: UPSCState):
    avg_score = sum(state["scores"]) / len(state["scores"])

    summary_prompt = f"""
    Combine the following feedback into one final evaluation:

    Language Feedback: {state['language_fb']}
    Analysis Feedback: {state['analysis_fb']}
    Clarity Feedback: {state['clarity_fb']}
    """

    summary = llm.invoke(summary_prompt).content

    return {
        "final_score": avg_score,
        "summary": summary
    }
```

---

# 9️⃣ Build the LangGraph (Parallel Topology)

```python
builder = StateGraph(UPSCState)

# Register nodes
builder.add_node("language", eval_language)
builder.add_node("analysis", eval_analysis)
builder.add_node("clarity", eval_clarity)
builder.add_node("aggregator", final_evaluation)

# Fan-out (parallel execution)
builder.add_edge(START, "language")
builder.add_edge(START, "analysis")
builder.add_edge(START, "clarity")

# Fan-in
builder.add_edge("language", "aggregator")
builder.add_edge("analysis", "aggregator")
builder.add_edge("clarity", "aggregator")

builder.add_edge("aggregator", END)

workflow = builder.compile()
```

---

# 🔟 Execute the Workflow

```python
essay_text = """
Democracy is not merely a system of governance but a way of life.
It depends on informed citizens, ethical leadership, and strong institutions.
"""

initial_state = {
    "essay": essay_text,
    "scores": []
}

result = workflow.invoke(initial_state)

print("Final Score:", result["final_score"])
print("\nFinal Summary:\n", result["summary"])
```

---

## Execution Flow (What Actually Happens)

1. Essay enters at `START`
2. Three evaluator nodes run **in parallel**
3. Each node:

   * Calls GPT independently
   * Returns partial state
4. `scores` list is merged via reducer
5. Aggregator calculates average + summary
6. Graph exits at `END`
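
The steps above can be traced end to end without LangGraph or an API key. The following is a simulation under stated assumptions: the evaluators are stubs with fixed, made-up scores, `apply_updates` is a hypothetical stand-in for the runtime's merge step, and a thread pool plays the role of the parallel step.

```python
import operator
from concurrent.futures import ThreadPoolExecutor

# Stub evaluators with fixed scores; a real node would call the LLM instead.
def eval_language(state):
    return {"language_fb": "fluent", "scores": [8]}

def eval_analysis(state):
    return {"analysis_fb": "shallow in places", "scores": [6]}

def eval_clarity(state):
    return {"clarity_fb": "well structured", "scores": [7]}

REDUCERS = {"scores": operator.add}

def apply_updates(state, updates):
    merged = dict(state)
    for update in updates:
        for key, value in update.items():
            if key in REDUCERS:
                # Keys with a reducer are combined with the existing value.
                merged[key] = REDUCERS[key](merged[key], value)
            else:
                merged[key] = value
    return merged

def aggregate(state):
    return {"final_score": sum(state["scores"]) / len(state["scores"])}

def run(essay):
    state = {"essay": essay, "scores": []}                          # step 1
    nodes = [eval_language, eval_analysis, eval_clarity]
    with ThreadPoolExecutor(max_workers=len(nodes)) as pool:
        updates = list(pool.map(lambda node: node(state), nodes))   # steps 2-3
    state = apply_updates(state, updates)                           # step 4
    return apply_updates(state, [aggregate(state)])                 # step 5

result = run("Democracy is a way of life.")
```

`pool.map` returns results in submission order, so the merged `scores` list is stable here even though the stubs run concurrently.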

---



