# aMapReduce Framework

Agentics enables scalable execution of LLM workflows by implementing a MapReduce framework that allows asynchronous LLM calls to be blended with regular Python code.

In [1]:
! uv pip install agentics-py


import os
from pathlib import Path
import sys
from getpass import getpass

from dotenv import find_dotenv, load_dotenv

# Work out where the notebook is running before touching any paths or secrets.
IN_COLAB = "google.colab" in sys.modules
print("In Colab:", IN_COLAB)

CURRENT_PATH = ""

if not IN_COLAB:
    # Local run: work from the current directory and load whatever .env file
    # find_dotenv() locates.
    CURRENT_PATH = os.getcwd()
    load_dotenv(find_dotenv())
else:
    CURRENT_PATH = "/content/drive/MyDrive/"
    # Colab run: mount Google Drive, then copy the API key from Colab's
    # userdata store into the environment.
    from google.colab import drive

    drive.mount("/content/drive")
    from google.colab import userdata

    os.environ["GEMINI_API_KEY"] = userdata.get("GEMINI_API_KEY")

# Fall back to an interactive prompt when the key is still missing or empty.
if not os.environ.get("GEMINI_API_KEY"):
    os.environ["GEMINI_API_KEY"] = getpass("Enter your GEMINI_API_KEY:")

# Root directory used by the rest of the notebook to build data file paths.
base = Path(CURRENT_PATH)

[2mUsing Python 3.11.13 environment at: /Users/millendroy/anaconda3/envs/agentics[0m
[2mAudited [1m1 package[0m [2min 19ms[0m[0m
In Colab: False


Let us first define an aType to represent stock market data for the Dow Jones index, and populate it with historical data.

In [3]:
from agentics import Agentics as AG
from typing import Optional
from pydantic import BaseModel, Field
from datetime import datetime

## Define the data model for stock market data


# Schema for one trading day of the index. Every field is optional and defaults
# to None, so a state can start with only the raw price columns and have the
# derived fields (daily_range, news, explanation_report) filled in later.
#
# NOTE: comments are used instead of a class docstring on purpose, because a
# docstring on a pydantic model becomes part of the model's schema description.
class StockMarketState(BaseModel):
    # Raw columns of the historical price data.
    date: Optional[str] = None
    open: Optional[float] = None
    high: Optional[float] = None
    low: Optional[float] = None
    close: Optional[float] = None
    volume: Optional[float] = None
    # Derived value. The description states the unit (percent of the low)
    # because this notebook fills the field with (high - low) / low * 100,
    # not with the plain price difference.
    daily_range: Optional[float] = Field(
        None,
        description="""The difference between the high and low prices for the day, expressed as a percentage of the low price.""",
    )
    # Free-text search results attached to the day.
    news: Optional[str] = Field(
        None,
        description="""Text reporting a list of news headlines relevant to the stock for the day.""",
    )
    # Target field for the generated explanation.
    explanation_report: Optional[str] = Field(
        None,
        description="A detailed explanation of the stock market state for the day.",
    )


## Load the CSV rows as a group of StockMarketState objects
dj_data = AG.from_csv(Path(base, "data/dow_jones.csv"), atype=StockMarketState)

## Preview the first three states
for state in dj_data[:3]:
    print(state)

[32m2025-10-04 13:02:02.605[0m | [34m[1mDEBUG   [0m | [36magentics.core.agentics[0m:[36mfrom_csv[0m:[36m312[0m - [34m[1mImporting Agentics of type StockMarketState from CSV /Users/millendroy/Projects/Agentic_Energy/Agentics_for_EnergyArbitrage_Battery/tutorials/data/dow_jones.csv[0m


date='2016-07-01' open=17924.240234 high=18002.380859 low=17916.910156 close=17949.369141 volume=82160000.0 daily_range=None news=None explanation_report=None
date='2016-06-30' open=17712.759766 high=17930.609375 low=17711.800781 close=17929.990234 volume=133030000.0 daily_range=None news=None explanation_report=None
date='2016-06-29' open=17456.019531 high=17704.509766 low=17456.019531 close=17694.679688 volume=106380000.0 daily_range=None news=None explanation_report=None


In [4]:
# Render all states of the group. This call is the last expression of the
# cell, so the notebook shows its return value as the cell output.
dj_data.pretty_print()

"Atype : <class '__main__.StockMarketState'>\ndate: '2016-07-01'\nopen: 17924.240234\nhigh: 18002.380859\nlow: 17916.910156\nclose: 17949.369141\nvolume: 82160000.0\ndaily_range: null\nnews: null\nexplanation_report: null\n\ndate: '2016-06-30'\nopen: 17712.759766\nhigh: 17930.609375\nlow: 17711.800781\nclose: 17929.990234\nvolume: 133030000.0\ndaily_range: null\nnews: null\nexplanation_report: null\n\ndate: '2016-06-29'\nopen: 17456.019531\nhigh: 17704.509766\nlow: 17456.019531\nclose: 17694.679688\nvolume: 106380000.0\ndaily_range: null\nnews: null\nexplanation_report: null\n\ndate: '2016-06-28'\nopen: 17190.509766\nhigh: 17409.720703\nlow: 17190.509766\nclose: 17409.720703\nvolume: 112190000.0\ndaily_range: null\nnews: null\nexplanation_report: null\n\ndate: '2016-06-27'\nopen: 17355.210938\nhigh: 17355.210938\nlow: 17063.080078\nclose: 17140.240234\nvolume: 138740000.0\ndaily_range: null\nnews: null\nexplanation_report: null\n\ndate: '2016-06-24'\nopen: 17946.630859\nhigh: 17946.630

## Amap

Amap functions enable async execution of a function over all the states of an AG. Agentics supports 1:1 maps, which map every state of an AG into a state of the same type.

In the following example we define a simple function that computes the daily_range of the stock, and we pass it to an amap function, which applies it to all states asynchronously.

In [None]:
## Note that input and output are both StockMarketState objects
async def get_daily_variation_percentage(state: StockMarketState) -> StockMarketState:
    """Store the day's high-low spread, as a percentage of the low, in ``daily_range``.

    Args:
        state: The state to update in place. ``high`` and ``low`` may be any
            values accepted by ``float()``.

    Returns:
        The same ``state`` object. ``daily_range`` is left untouched when
        ``high`` or ``low`` is missing, or when ``low`` is zero, so that one
        incomplete row does not abort a whole ``amap`` run.
    """
    # Both prices are optional on the model; without them there is nothing to compute.
    if state.high is None or state.low is None:
        return state

    high = float(state.high)
    low = float(state.low)
    # A zero low would make the percentage undefined (division by zero).
    if low == 0:
        return state

    state.daily_range = (high - low) / low * 100
    return state


## Apply the function to all states using amap; the awaited result is rebound to dj_data
dj_data = await dj_data.amap(get_daily_variation_percentage)

## Show the computed value for the first three states
for state in dj_data[:3]:
    print(f"Date: {state.date}, Daily Range: {state.daily_range}")

## aReduce

Reduce functions enable executing operations on the entire list of elements (states) within an Agentics group. Although reduce operations are intrinsically synchronous—since they consider all states at once—they are defined as async functions to allow for internal async calls (such as fetching news or running LLMs).

In the following example we will use a reduce function to get the top 10 days with the highest variation in the market.

In [None]:
async def get_highest_volatility_days(
    states: list[StockMarketState],
) -> list[StockMarketState]:
    """Return the ten states with the largest absolute ``daily_range``.

    The result is ordered from most to least volatile; fewer than ten states
    are returned when the input is shorter than that.
    """

    def volatility(day):
        # States without a computed range rank as zero volatility.
        if day.daily_range is None:
            return 0
        return abs(day.daily_range)

    # Sort a copy so the caller's sequence is left untouched.
    ranked = list(states)
    ranked.sort(key=volatility, reverse=True)
    return ranked[:10]


# Apply the reduce function over the whole group to get the top 10 days with
# highest volatility, then print the formatted result.
highest_volatility_days = await dj_data.areduce(get_highest_volatility_days)
print(highest_volatility_days.pretty_print())

Now let's use self transduction to provide an explanation for the market volatility

## Complex AMAPs

aMap functions can contain external API and LLM calls. This way we can use Agentics as a scale-out framework for complex workflows.

In [None]:
from ddgs import DDGS


## Define a function to get news for a given date using the DDGS search engine
## Note that similar functionality can be implemented using MCP tools in AGs
async def get_news(state):
    """Search the web for market news on ``state.date`` and store it in ``state.news``.

    Args:
        state: Object with a ``date`` attribute; its ``news`` attribute is
            overwritten with the stringified search results.

    Returns:
        The same ``state`` object, updated in place.
    """
    # Local import keeps this cell self-contained.
    import asyncio

    def _search():
        # Up to 10 text results for the day in question.
        return DDGS().text(
            f"What happened to the stock market and dow jones on {state.date}",
            max_results=10,
        )

    # DDGS().text is a blocking call. Running it in a worker thread keeps the
    # event loop free, so the searches for different states can overlap
    # instead of executing one after another.
    state.news = str(await asyncio.to_thread(_search))
    return state


# Now get news for the top 10 days with highest volatility using amap
highest_volatility_days = await highest_volatility_days.amap(get_news)

# Print only the first result for brevity
print(highest_volatility_days[0])
# Alternative compact preview (shows only the first 200 characters of the news):
# print(f"Date: {highest_volatility_days[0].date}, Daily Range: {highest_volatility_days[0].daily_range}, News: {highest_volatility_days[0].news[:200]}")

In [None]:
from agentics.core.llm_connections import get_llm_provider

highest_volatility_days.instructions = """Explain the reasons why the market went down or up 
given the high volatility in the stock market on this day based on the news provided. 
Provide a concise summary."""
highest_volatility_days.llm = (
    get_llm_provider()
)  ## You can choose between "openai", "watsonx", "gemini", "vllm_crewai"
highest_volatility_explanations = await highest_volatility_days.self_transduction(
    ["date", "open", "high", "low", "close", "volume", "daily_range", "news"],
    ["explanation_report"],
)

for state in highest_volatility_explanations:
    print(
        f"Date: {state.date}, Daily Range: {state.daily_range}\nExplanation: {state.explanation_report}..."
    )

## Well Done
You are now fully equipped to work with agentics and apply it to your data.
Congratulations and please contribute back to the community if you feel this is exciting. 