# Reducers

In computational systems, reducers are functions or mechanisms that determine how to merge, update, or aggregate state changes. In the context of tools like LangGraph, reducers play a crucial role in handling state updates during the execution of workflows or pipelines, especially when multiple nodes or processes interact with the same part of the state.

A reducer takes the current state and the updates proposed by different components (nodes, agents, or functions) and produces a consolidated state based on predefined rules. Reducers ensure that updates are merged logically and consistently, avoiding conflicts and ambiguity. 

Reducers prevent conflicts when several nodes update the same part of the state at once, and give a structured, predictable way of updating state values.
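The following plain-Python sketch (no LangGraph required) imitates how a graph could apply all the updates produced in one step. `apply_step` and the `reducers` mapping are hypothetical names for illustration. The behavior is modeled on LangGraph's rule that a key without a reducer accepts only one update per step, while a key with a reducer folds every update into the current value.

```python
from functools import reduce
from typing import Any, Callable, Dict, List

Reducer = Callable[[Any, Any], Any]


def apply_step(state: Dict[str, Any],
               writes: List[Dict[str, Any]],
               reducers: Dict[str, Reducer]) -> Dict[str, Any]:
    """Hypothetical helper: apply all node updates from one step to `state`."""
    new_state = dict(state)  # never touch the caller's dict
    keys = {key for update in writes for key in update}
    for key in keys:
        values = [update[key] for update in writes if key in update]
        if key in reducers:
            # Fold every update into the current value, one at a time.
            new_state[key] = reduce(reducers[key], values, state.get(key))
        elif len(values) == 1:
            # No reducer: the single update simply overwrites the old value.
            new_state[key] = values[0]
        else:
            # No reducer and several updates: there is no rule to merge them.
            raise ValueError(f"Key {key!r} received {len(values)} updates in one step "
                             "but has no reducer to merge them.")
    return new_state


def concat(left, right):
    """List reducer: returns a new list, the old one is left untouched."""
    return (left or []) + right


state = {"notes": ["start"], "status": "idle"}
writes = [{"notes": ["from node A"]}, {"notes": ["from node B"], "status": "busy"}]
print(apply_step(state, writes, {"notes": concat}))
# {'notes': ['start', 'from node A', 'from node B'], 'status': 'busy'}
```

Two parallel writes to `notes` are merged, while two parallel writes to `status` (which has no reducer) would raise an error.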

## Practical example

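LangGraph ships a few ready-made reducers (for example `add_messages`, used for chat histories), and a common choice for plain lists is `operator.add` from Python's standard library: it concatenates each update to the list already in state. Keys without a reducer are simply overwritten by the latest value. This covers many cases, but is somewhat limited in agentic systems with broader scopes. No problem, though: we can implement our own custom reducer, and it's easy enough to do.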
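As a quick illustration, the sketch below declares a list field with `operator.add` through `typing.Annotated` and reads the reducer back from the type hints. The `State` schema is a hypothetical example. Reading `__metadata__` via `get_type_hints(..., include_extras=True)` is standard Python; we assume, without relying on LangGraph internals here, that this kind of metadata is what LangGraph inspects to find a key's reducer.

```python
import operator
from typing import Annotated, List, TypedDict, get_type_hints


class State(TypedDict):
    notes: Annotated[List[str], operator.add]  # list field with a reducer
    status: str                                # plain field: last value wins


hints = get_type_hints(State, include_extras=True)
notes_reducer = hints["notes"].__metadata__[0]
print(notes_reducer is operator.add)   # True

# operator.add on two lists concatenates them into a NEW list.
current = ["loaded neighborhoods"]
update = ["loaded pharmacies"]
print(notes_reducer(current, update))  # ['loaded neighborhoods', 'loaded pharmacies']
print(current)                         # ['loaded neighborhoods'] (unchanged)
```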

Let's take a simple example: suppose we want our agent to store files in a dictionary in state.

This way our agent has access to a "virtual file system" that it can read from and write to.

This is a useful context-engineering practice: it mimics writing files to a file system and reading them back with `file.read()`.
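A minimal sketch of the idea, using plain dictionaries to stand in for graph state: a hypothetical `write_file` tool returns an update for a `files` key (it never edits state directly), the reducer merges it, and a hypothetical `read_file` plays the role of `file.read()`. The function names and the `files` key are illustrative assumptions, not part of LangGraph.

```python
from typing import Dict, Optional


def merge_files(left: Optional[Dict[str, str]], right: Optional[Dict[str, str]]) -> Dict[str, str]:
    """Reducer for the hypothetical `files` key: returns a new merged dict."""
    return {**(left or {}), **(right or {})}


def write_file(name: str, content: str) -> Dict[str, Dict[str, str]]:
    """Hypothetical tool: describes the change instead of performing it."""
    return {"files": {name: content}}


def read_file(state: Dict[str, Dict[str, str]], name: str) -> str:
    """Hypothetical tool: the state-based equivalent of reading a file from disk."""
    return state["files"].get(name, f"File '{name}' not found.")


state = {"files": {}}
for update in (write_file("plan.md", "1. load data"), write_file("notes.txt", "check CRS")):
    # Apply each update through the reducer, as the graph would.
    state = {**state, "files": merge_files(state["files"], update["files"])}

print(read_file(state, "plan.md"))  # 1. load data
print(sorted(state["files"]))       # ['notes.txt', 'plan.md']
```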

### Custom Reducer *(the right way)*

Let's first see how **it should** be done:

In [None]:
from typing import Dict, List, Union

def add_file(current_files: Union[Dict[str, str], None] = None,
             new_file: Union[Dict[str, str], None] = None
) -> Dict[str, str]:
    """
    Reducer to add files to the files dictionary in state.
    Always returns a new dict; the inputs are never modified.
    """

    left = current_files or {}
    right = new_file or {}

    return {**left, **right}

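>**Note:** the syntax `{**left, **right}` is Python for: build a new dictionary containing all the keys of `left` and `right`. When a key appears in both, the value from `right` wins, so an existing file is overwritten by a newer one with the same name. Neither input is modified. Very useful.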
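A quick check of what `{**left, **right}` does on a key collision, together with a "keep-first" variant for cases where existing entries must never be replaced. `keep_first_merge` is a hypothetical name; swapping the unpacking order is the only change.

```python
from typing import Dict, Optional

left = {"a.txt": "old", "b.txt": "b"}
right = {"a.txt": "new", "c.txt": "c"}

# What add_file returns: on a shared key, the value from `right` wins.
print({**left, **right})  # {'a.txt': 'new', 'b.txt': 'b', 'c.txt': 'c'}

# Swapping the order keeps the existing value instead.
print({**right, **left})  # {'a.txt': 'old', 'c.txt': 'c', 'b.txt': 'b'}


def keep_first_merge(current: Optional[Dict[str, str]] = None,
                     new: Optional[Dict[str, str]] = None) -> Dict[str, str]:
    """Hypothetical reducer: adds only the keys that are not already in `current`."""
    return {**(new or {}), **(current or {})}


print(keep_first_merge(left, right) == {"a.txt": "old", "b.txt": "b", "c.txt": "c"})  # True
print(left)  # {'a.txt': 'old', 'b.txt': 'b'} (inputs are never modified)
```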

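Reducers should **always** follow the logic: 
```
(left, right) -> merged
```
That is, they should ALWAYS return a new object rather than modifying their inputs in place.

This is important because the object a reducer receives as `left` may still be referenced elsewhere (by a node, by the input you passed to the graph, or by a saved state snapshot). Mutating it in place leaks the change outside the reducer, which can cause subtle, hard-to-trace bugs, especially once a checkpointer is saving and restoring state.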
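The risk is easy to reproduce without LangGraph. In this sketch, `previous` stands in for anything else that still references the old value (a node, the input you passed in, or a saved snapshot); `mutating_merge` and `pure_merge` are hypothetical reducers written only for the comparison.

```python
from typing import Dict


def mutating_merge(left: Dict[str, str], right: Dict[str, str]) -> Dict[str, str]:
    left.update(right)        # WRONG: edits an object other code may still hold
    return left


def pure_merge(left: Dict[str, str], right: Dict[str, str]) -> Dict[str, str]:
    return {**left, **right}  # RIGHT: builds a new object


previous = {"a.txt": "v1"}
merged = pure_merge(previous, {"b.txt": "v1"})
print(previous)            # {'a.txt': 'v1'} (the old value is preserved)

previous = {"a.txt": "v1"}
merged = mutating_merge(previous, {"b.txt": "v1"})
print(previous)            # {'a.txt': 'v1', 'b.txt': 'v1'} (history was rewritten)
print(merged is previous)  # True
```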

Let's see an example with `List` reducers:

In [None]:
def add_to_list(current_list: Union[List[str], None] = None,
                new_item: Union[str, None] = None) -> List[str]:
    """
    Reducer to add an item to the state list
    """
    left = current_list or []
    if new_item is None:
        return left[:]   # copia immutabile
    if new_item in left:
        return left[:]   # già presente, restituisco copia
    return left + [new_item]   # nuova lista, aggiunto in coda
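
In practice a single update often carries several items at once. A possible variant, assuming updates may be either one string or a list of strings (`add_items` is a hypothetical name), keeps the same rules: no mutation of the input, no duplicates, order preserved.

```python
from typing import List, Optional, Union


def add_items(current: Optional[List[str]] = None,
              new: Union[str, List[str], None] = None) -> List[str]:
    """Append one or more items not already present; always returns a new list."""
    result = list(current or [])  # copy first, so `current` is never touched
    if new is None:
        return result
    items = [new] if isinstance(new, str) else new
    for item in items:
        if item not in result:    # skip duplicates, including repeats within `new`
            result.append(item)   # safe: `result` is our own copy
    return result


log = ["neighborhoods"]
print(add_items(log, ["pharmacies", "neighborhoods", "pharmacies"]))  # ['neighborhoods', 'pharmacies']
print(add_items(log, "statistical_zones"))  # ['neighborhoods', 'statistical_zones']
print(log)                                  # ['neighborhoods']
```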

Now let's see how it **should not** be done.

### Custom Reducer: **the wrong way**

It would be wrong to modify the existing dictionary or list in place:

In [None]:
def wrong_dict_merge(existing_dict: Union[dict, None] = None, new_dict: Union[dict, None] = None) -> dict:  
    
    if existing_dict is None:
        existing_dict = {}
    if new_dict is None:
        new_dict = {}

    for filename, content in new_dict.items():
        if filename in existing_dict:
            return existing_dict   # also buggy: stops early and drops any remaining new files
        existing_dict[filename] = content   # WRONG! mutates the existing dict in place

    return existing_dict

In [None]:
def wrong_list_add(current_list: Union[List[str], None] = None, 
                   new_entry: Union[str, None] = None
)-> List[str]:
    if current_list is None:
        current_list = []
    if new_entry is None:
        return current_list
    
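    if new_entry in current_list:
        return current_list
    # WRONG! mutates the existing list in place.
    # (list.append also returns None, so `new_list = current_list.append(...)` would return None.)
    current_list.append(new_entry)

    return current_list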

As you can see, both examples modify their input in place and return it, so neither follows the logic

```
(left, right) -> merged
```
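One way to catch these mistakes early is to unit-test reducers for purity: deep-copy the inputs, call the reducer, and check that the inputs are unchanged and that a new object came back. `is_pure_call` is a hypothetical helper, and `good_add` / `bad_add` are compact stand-ins for `add_to_list` and `wrong_list_add`.

```python
import copy
from typing import Any, Callable, List, Optional


def is_pure_call(reducer: Callable[[Any, Any], Any], left: Any, right: Any) -> bool:
    """True if `reducer(left, right)` leaves both inputs intact and returns a new object."""
    left_before, right_before = copy.deepcopy(left), copy.deepcopy(right)
    result = reducer(left, right)
    return left == left_before and right == right_before and result is not left


def good_add(current: Optional[List[str]], new: Optional[str]) -> List[str]:
    left = current or []
    return left[:] if new is None or new in left else left + [new]


def bad_add(current: Optional[List[str]], new: Optional[str]) -> List[str]:
    current = current if current is not None else []
    if new is not None and new not in current:
        current.append(new)  # mutates the input list
    return current


print(is_pure_call(good_add, ["a"], "b"))  # True
print(is_pure_call(bad_add, ["a"], "b"))   # False
```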

## RAGV4 example - integrating a custom reducer 

Let's refine our custom `DatasetState` class from `09.3_InjectedState.ipynb` by adding a custom reducer to it.

In [None]:
def merge_dictionary_entries(existing_dict: Union[dict, None] = None, new_dict: Union[dict, None] = None) -> dict:
    """
    Custom reducer to merge dictionary updates:
    returns a new dict with the keys of both inputs; on a shared key,
    the value from new_dict overwrites the one in existing_dict.
    """

    if not existing_dict:
        existing_dict = {}
    if not new_dict:
        new_dict = {}
    
    return {**existing_dict, **new_dict}


>**Note:** Reducers should be pure functions, without any print/debug statements.

Now we can annotate our custom state schema like this:

In [None]:
from typing import Annotated
from langgraph.prebuilt.chat_agent_executor import AgentState
from typing import Dict, Union
import pandas as pd
import geopandas as gpd

class DatasetState(AgentState):
    loaded: Annotated[Dict[str, Union[pd.DataFrame, gpd.GeoDataFrame]], merge_dictionary_entries]
    descriptions: Annotated[Dict[str, str], merge_dictionary_entries]

And we can try running our agent with this refined `state_schema`:

### Update RAGV4 to use custom reducer


In [None]:
from langchain_core.tools import tool, InjectedToolCallId
from langchain_core.messages import ToolMessage
import geopandas as gpd
import pandas as pd
import os
from pathlib import Path
from langgraph.prebuilt import InjectedState
from typing_extensions import Annotated
from langchain_experimental.utilities import PythonREPL
from langgraph.types import Command

DATASET_FOLDER = "./LLM_data"

The following tools stay the same...

In [None]:
# ----------------------
# Tool: list datasets
# ----------------------
@tool
def list_loadable_datasets() -> str:
    """Lists all available parquet datasets in the dataset folder."""
    files = [f for f in os.listdir(DATASET_FOLDER) if f.endswith(".parquet")]
    return "\n".join(files) if files else "No parquet datasets found."

@tool
def list_inmemory_datasets(
    state: Annotated[DatasetState, InjectedState],
    tool_call_id: Annotated[str, InjectedToolCallId]
) -> Command:
    """Lists all loaded datasets and their type (DataFrame or GeoDataFrame)."""
    if not state["loaded"]:
        output = "No loaded datasets in memory. Use list_loadable_datasets() to see available files."
    else:
        lines = [
            f"- {name}: {'GeoDataFrame' if isinstance(df, gpd.GeoDataFrame) else 'DataFrame'} (shape={df.shape})"
            for name, df in state["loaded"].items()
        ]
        output = "\n".join(lines)

    return Command(update={
        "messages": [ToolMessage(content=output, tool_call_id=tool_call_id)],
    })

# ----------------------
# Tool: python repl
# ----------------------
repl = PythonREPL()
# Now use the tool with your injected REPL
@tool
def python_repl_tool(
    code: Annotated[str, "The python code to execute"], state: Annotated[DatasetState, InjectedState], tool_call_id: Annotated[str, InjectedToolCallId]
) -> Command:
    """
    Use this to execute python code. If you want to see the output of a value,
    print it out with `print(...)`. This is visible to the user.
    """

    for name, df in state["loaded"].items():
        repl.globals[name] = df

    # Inject descriptions as a dictionary
    repl.globals["descriptions"] = state["descriptions"]
    
    try:
        result = repl.run(code)
    except BaseException as e:
        tool_err_1 = f"Failed to execute. Error: {repr(e)}"
        return Command(update={"messages": [ToolMessage(content=tool_err_1, tool_call_id=tool_call_id)]})
    
    tool_output = f"Successfully executed:\n```python\n{code}\n```\nStdout: {result}"
    return Command(update={"messages": [ToolMessage(content=tool_output, tool_call_id=tool_call_id)]})


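We only change the loader tools, separating the logic of loading a description (`load_description`) from enriching it (now done by `describe_dataset`).

This way the loaders can run in parallel: each one writes to a single state key and neither needs to read state. Otherwise a single tool would need to read `state['loaded']` while also reading and updating `state['descriptions']`. And because `loaded` and `descriptions` now have a reducer, several parallel calls that update the same key in one step are merged instead of conflicting (without a reducer, LangGraph accepts only one update per key per step and raises an error otherwise).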
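To see the effect, here is a sketch of a single step in which the model calls `load_dataset` and `load_description` for two datasets in parallel. Placeholder strings stand in for the DataFrames and description texts, and the loop is a simplified stand-in for how LangGraph applies each tool's `Command(update=...)` to the same key. The reducer is repeated so the cell runs on its own.

```python
from functools import reduce
from typing import Dict, List, Optional


def merge_dictionary_entries(existing_dict: Optional[dict] = None,
                             new_dict: Optional[dict] = None) -> dict:
    return {**(existing_dict or {}), **(new_dict or {})}


# Updates returned by four parallel tool calls in the same step (placeholder values).
tool_updates: List[Dict[str, dict]] = [
    {"loaded": {"neighborhoods": "<GeoDataFrame>"}},
    {"descriptions": {"neighborhoods": "<neighborhoods.txt contents>"}},
    {"loaded": {"public_bathrooms": "<DataFrame>"}},
    {"descriptions": {"public_bathrooms": "<public_bathrooms.txt contents>"}},
]

state = {"loaded": {}, "descriptions": {}}
for key in list(state):
    writes = [u[key] for u in tool_updates if key in u]
    # Every write to the same key is folded in through the reducer.
    state[key] = reduce(merge_dictionary_entries, writes, state[key])

print(sorted(state["loaded"]))        # ['neighborhoods', 'public_bathrooms']
print(sorted(state["descriptions"]))  # ['neighborhoods', 'public_bathrooms']
```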

In [None]:
@tool
def load_dataset(file_name: str, tool_call_id: Annotated[str, InjectedToolCallId]) -> Command:
    """
    Loads a Parquet dataset (optionally as GeoDataFrame) and updates state['loaded'][name].
    """
    update = {}

    file_stem = Path(file_name).stem
    file_name = f"{file_stem}.parquet"
    path = Path(DATASET_FOLDER) / file_name

    if not path.exists():
        tool_err = f"File '{file_name}' not found."
        return Command(
            update={
                "messages" : [ToolMessage(content=tool_err, tool_call_id=tool_call_id)]
            }
        )

    try:
        df = pd.read_parquet(path)
        if "geometry" in df.columns:
            try:
                df = gpd.read_parquet(path)
            except Exception as geo_err:
                tool_err2 = f"Failed to load as GeoDataFrame: {geo_err}"
                return Command(
                    update={
                        "messages" : [ToolMessage(content=tool_err2, tool_call_id=tool_call_id)]
                    }
                )
        # add df to update dictionary
        update[file_stem] = df

    except Exception as e:
        tool_err3 = f"Error loading dataset: {e}"
        return Command(
            update={
                "messages" : [ToolMessage(content=tool_err3, tool_call_id=tool_call_id)]
            }
        )
    return Command(update={
        "loaded": update,   
        "messages": [
            ToolMessage(f"Loaded dataset '{file_stem}' into memory.", tool_call_id=tool_call_id)
        ]
    })


@tool
def load_description(file_name: str, tool_call_id: Annotated[str, InjectedToolCallId]) -> Command:
    """
    Loads the description file for a dataset and updates state['descriptions'][name].
    """
    update = {}

    file_stem = Path(file_name).stem
    desc_path = Path(DATASET_FOLDER) / f"{file_stem}.txt"

    try:
        with open(desc_path, "r", encoding="utf-8") as f:
            desc = f.read()
    except Exception:
        desc = "Description file missing or unreadable."

    update[file_stem] = desc

    return Command(update={
        "descriptions": update,
        "messages": [
            ToolMessage(content=f"Loaded description for '{file_stem}'.", tool_call_id=tool_call_id)
        ]
    })


# ----------------------
# Tool: describe_dataset
# ----------------------
@tool
def describe_dataset(name: str, state: Annotated[DatasetState, InjectedState], tool_call_id: Annotated[str, InjectedToolCallId]) -> Command:
    """
    Generates a detailed description for a loaded dataset by combining its raw metadata 
    with structural information (type, columns, and preview).

    This function assumes the dataset and its description have already been loaded into state.
    It returns an enriched summary including:
    - the dataset type (DataFrame or GeoDataFrame),
    - the raw description (from text file),
    - a preview of the first few rows,
    - and the list of column names.

    If the dataset or its description is missing from state, it does not raise:
    it returns an error ToolMessage so the agent can recover.
    """
    loaded = state.get('loaded')
    descriptions = state.get('descriptions')

    df = loaded.get(name)
    if df is None:
        loaded_keys = list(loaded.keys())
        available_data = [f for f in os.listdir(DATASET_FOLDER) if f.endswith(".parquet")]
        tool_err = f"Dataset '{name}' not found. \nLoaded datasets are: {loaded_keys} \nAvailable datasets to load are {available_data}"
        return Command(update={"messages": [ToolMessage(content=tool_err, tool_call_id=tool_call_id)]})
        
    raw_desc = descriptions.get(name)
    if raw_desc is None:
        tool_err2 = f"Description for dataset '{name}' is missing. Be sure to run 'load_description' first."
        return Command(update={"messages": [ToolMessage(content=tool_err2, tool_call_id=tool_call_id)]})

    dtype_str = type(df).__name__
    head_str = df.head().to_string(index=False)
    cols_str = ", ".join(map(str, df.columns))  # str() in case column names are not strings

    tool_result = f"{dtype_str}\n{raw_desc}\n---\nPreview (first rows):\n{head_str}\n\nColumns: {cols_str}"
    return Command(
        update={
            "messages": [
                ToolMessage(content=tool_result, tool_call_id=tool_call_id)
                ]
            }
        )

In [14]:
prompt = (
    "You are a data analyst. Use your tools to explore and load datasets relevant to the task.\n"
    "The files you need to load are in the subdirectory at ./LLM_data\n"
    "Datasets are stored as `file_name.parquet`, while their descriptions are stored as `file_name.txt`.\n\n"
    "Always check the description before working with a DataFrame, to understand the columns and data values.\n"
    "You can check which datasets are currently loaded with the `list_inmemory_datasets()` tool, "
    "and which datasets are available to load using the `list_loadable_datasets()` tool.\n\n"
    "In your python_repl_tool(), loaded datasets will appear as variables (e.g., `quartieri`), and descriptions are stored in the `descriptions` dictionary (e.g., `descriptions['quartieri']`).\n"
    "When printing Python code, ALWAYS use `print(...)`. Do NOT rely on implicit output like `quartieri.head()`; it will not be visible."
)

A helper function for pretty printing:

In [15]:
from langchain_core.messages import convert_to_messages


def pretty_print_message(message, indent=False):
    pretty_message = message.pretty_repr(html=True)
    if not indent:
        print(pretty_message)
        return

    indented = "\n".join("\t" + c for c in pretty_message.split("\n"))
    print(indented)


def pretty_print_messages(update, last_message=False):
    is_subgraph = False
    if isinstance(update, tuple):
        ns, update = update
        # skip parent graph updates in the printouts
        if len(ns) == 0:
            return

        graph_id = ns[-1].split(":")[0]
        print(f"Update from subgraph {graph_id}:")
        print("\n")
        is_subgraph = True

    for node_name, node_update in update.items():
        update_label = f"Update from node {node_name}:"
        if is_subgraph:
            update_label = "\t" + update_label

        print(update_label)
        print("\n")

        messages = convert_to_messages(node_update["messages"])
        if last_message:
            messages = messages[-1:]

        for m in messages:
            pretty_print_message(m, indent=is_subgraph)
        print("\n")

Now we should be able to use `gpt-4o-mini` without disabling parallel tool calls! Let's see...

In [18]:
from langgraph.prebuilt import create_react_agent
from langchain_core.messages import HumanMessage

analyst_agent = create_react_agent(
    model="openai:gpt-4o-mini",  # (!) this will use parallel tool calls by default 
    tools=[list_loadable_datasets, list_inmemory_datasets, load_dataset, load_description, describe_dataset, python_repl_tool],
    prompt=prompt,
    name="data_analyst",
    state_schema=DatasetState
)

initial_state = {
    "messages": [HumanMessage(content="Load two datasets of your choice and their descriptions. Then tell me something about the datasets.")],
    "remaining_steps": 20,
    "loaded": {},
    "descriptions": {}
}

for chunk in analyst_agent.stream(initial_state):
    pretty_print_messages(chunk)

Update from node agent:


Name: data_analyst
Tool Calls:
  list_loadable_datasets (call_xWlbyqQn7MBvi6iOoTYD8tfR)
 Call ID: call_xWlbyqQn7MBvi6iOoTYD8tfR
  Args:


Update from node tools:


Name: list_loadable_datasets

neighborhoods.parquet
public_bathrooms.parquet
median_income_by_statistical_area.parquet
neighborhood_residents_data_1986to2024.parquet
neighborhood_socio_demographic_data_lastupdated2019.parquet
statistical_zones.parquet
pharmacies.parquet
points_of_interest.parquet


Update from node agent:


Name: data_analyst
Tool Calls:
  load_dataset (call_JIMjQIkd0HBEzDDjavU5m7wk)
 Call ID: call_JIMjQIkd0HBEzDDjavU5m7wk
  Args:
    file_name: neighborhoods.parquet
  load_description (call_e60Lqujs8ciGKdjG42LoLufF)
 Call ID: call_e60Lqujs8ciGKdjG42LoLufF
  Args:
    file_name: neighborhoods.txt
  load_dataset (call_LykDwIXr2SkjHLsw7oWiansA)
 Call ID: call_LykDwIXr2SkjHLsw7oWiansA
  Args:
    file_name: public_bathrooms.parquet
  load_description (call_9wws2I4sWijSLC8d7F3l4DpL)
 Ca

Working! Very nice. Let's try to load the same dataset twice:

In [21]:
another_initial_state = {
    "messages": [HumanMessage(content="Load a dataset of your choice and its description, then try to load it again. Then list the datasets in memory. "
    "List the datasets you have in memory at the end to see if we have duplicates.")],
    "remaining_steps": 20,
    "loaded": {},
    "descriptions": {}
}

for chunk in analyst_agent.stream(another_initial_state):
    pretty_print_messages(chunk)

Update from node agent:


Name: data_analyst
Tool Calls:
  list_loadable_datasets (call_X1KdpqOQbRa83SvkosEcQ4Er)
 Call ID: call_X1KdpqOQbRa83SvkosEcQ4Er
  Args:


Update from node tools:


Name: list_loadable_datasets

neighborhoods.parquet
public_bathrooms.parquet
median_income_by_statistical_area.parquet
neighborhood_residents_data_1986to2024.parquet
neighborhood_socio_demographic_data_lastupdated2019.parquet
statistical_zones.parquet
pharmacies.parquet
points_of_interest.parquet


Update from node agent:


Name: data_analyst
Tool Calls:
  load_dataset (call_bklNo0c7pbQb6x04t8RjxgGH)
 Call ID: call_bklNo0c7pbQb6x04t8RjxgGH
  Args:
    file_name: neighborhoods.parquet
  load_description (call_QSJzsw3kmNVEhaXoSAK9VAGq)
 Call ID: call_QSJzsw3kmNVEhaXoSAK9VAGq
  Args:
    file_name: neighborhoods.txt


Update from node tools:


Name: load_description

Loaded and enriched description for 'neighborhoods'.


Update from node tools:


Name: load_dataset

Loaded dataset 'neighborhoods' into 

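Interestingly, the model hallucinates that the dataset was duplicated, even though that cannot happen here: `loaded` is a dictionary keyed by dataset name, so with `merge_dictionary_entries` the second load simply replaces the existing `neighborhoods` entry. The model sees that it called `load_dataset` twice with the same name and draws a logical, but wrong, conclusion.

Apart from that, everything works as expected, and our `state_schema` now handles parallel tool calls correctly.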