In [6]:
pip install "pydantic>=2"

Note: you may need to restart the kernel to use updated packages.


# Multi‑Agent Revenue & Churn Analyzer — Planner → Executor → Validator → Reflection

This notebook demonstrates a production‑minded multi‑agent pipeline operating on `data/ar 7.xlsx`.

Goals:
- Show deterministic tooling for all numeric work (Pandas)
- Planner-driven task decomposition
- Memory separation (short-term & long-term)
- Token-aware chunking strategy for LLM contexts
- Validator + retry/self‑healing + Reflection loop


In [2]:
# Section 1 — Environment & repo sanity checks
import os
from pathlib import Path
import pandas as pd

DATA_PATH = Path("data/ar 7.xlsx")
assert DATA_PATH.exists(), f"Input file not found: {DATA_PATH}"
print("Found:", DATA_PATH)

# quick smoke-load (first 5 rows)
df_smoke = pd.read_excel(DATA_PATH, engine="openpyxl")
df_smoke.head()


Found: data/ar 7.xlsx


Unnamed: 0,S. no.,Entity\nUpto Mar 2024,Entity April 2024,Entity grouped,Employee Name,Entity (A),GRR/NRR Tagging,Customer Name,Brand Name (Temp),Cohort ID,...,2024-03-01 00:00:00,2024-04-01 00:00:00,2024-05-01 00:00:00,2024-06-01 00:00:00,2024-07-01 00:00:00,2024-08-01 00:00:00,2024-09-01 00:00:00,2024-10-01 00:00:00,2024-11-01 00:00:00,2024-12-01 00:00:00
0,1,GCSF,GCSF,GCSF,,GCSF,Gaming,2K Games,,,...,54615.32,60466.33,66330.33,16704.666667,5659.98,0.0,17470.0,25503.333333,25133.333333,26034.0
1,2,Core,Core,Core,,Core,One-time,One-time_APAC,One-time_APAC,,...,20482.163,26705.116954,16632.543,17345.0,26832.662169,21430.162892,11369.447711,8329.178313,11112.859759,11699.164016
2,3,Core,Core,Core,,Core,One-time,One-time_USA,One-time_USA,,...,3157.083,1149.5175,3000.0,0.0,6500.179525,1935.0,23400.0,40721.864153,44566.0,21090.34924
3,4,Core,Core,Core,,Core,One-time,One-time_EMEA,One-time_EMEA,,...,0.0,349.2845,580.97,4300.56,-0.179525,4417.62,4417.62,4351.22,2517.06,5243.0
4,5,Core,Core,Core,,Core,Core APAC,Agency Brandmap,Agency Brandmap,,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0


In [3]:
# Section 2 — Deterministic data loader for ar 7.xlsx
from agents.tools import DataLoader

# deterministic load
df_full = DataLoader.load_excel(DATA_PATH.as_posix())
print(f"Rows: {len(df_full)}, Revenue sum: {df_full['Revenue'].sum():.2f}")

# show chunking API (deterministic, row-based)
for i, chunk in enumerate(DataLoader.chunk_dataframe(df_full, chunk_size=5000)):
    print(f"chunk {i+1}: {len(chunk)} rows")
    if i >= 1:
        break


Rows: 4656, Revenue sum: 50820390.84
chunk 1: 4656 rows


In [4]:
# Section 3 — Core deterministic tools: assign_quarter, aggregate_by_region, compute_churn
from agents.tools import assign_quarter, Aggregator, ChurnCalculator

# assign quarter
df_q = assign_quarter(df_full, date_col="Date")
assert "Quarter" in df_q.columns

# quarter-region aggregation
agg_q = Aggregator.aggregate_by_quarter_region(df_q)
agg_q.head()

# client x quarter pivot (for churn)
client_q = Aggregator.client_quarterly_revenue(df_q)
client_q.head()


Quarter,Client ID,2024Q1,2024Q2,2024Q3,2024Q4
0,2K Games,172752.3,143501.326667,23129.98,76670.666667
1,2Sodas,4000.0,0.0,0.0,0.0
2,4AllPromos,25850.86,86785.9118,101972.6925,92653.1041
3,A.O. Smith Corporation,87164.52,74300.0,72375.0,86250.0
4,"AEP Energy, Inc.",37725.0,37725.0,63425.0,76275.0


In [5]:
# Section 4 — Pydantic schemas & structured JSON contracts
from pydantic import BaseModel, Field
from typing import List, Dict, Any

class RowModel(BaseModel):
    Client_ID: str = Field(..., alias="Client ID")
    Region: str
    Country: str
    Date: str
    Revenue: float

class ChunkSummary(BaseModel):
    rows: int
    revenue_sum: float

class QuarterAggregate(BaseModel):
    Quarter: str
    Region: str
    Country: str
    TotalRevenue: float
    NumRows: int

# validate a small sample
sample = {"rows": 10, "revenue_sum": 1234.0}
ChunkSummary(**sample)


ChunkSummary(rows=10, revenue_sum=1234.0)

In [6]:
# Section 5 — Token management & chunking strategy
import math

N = len(df_full)
CHUNK_SIZE = 5000
chunks = math.ceil(N / CHUNK_SIZE)
print(f"Total rows={N}, chunk_size={CHUNK_SIZE}, chunks={chunks}")

# simple token estimator: 1 token ≈ 4 chars
def estimate_tokens(text: str) -> int:
    return max(1, len(text) // 4)

# simulate token usage for summaries
summaries = [f"chunk_{i}: rows={min(CHUNK_SIZE, max(0, N - i*CHUNK_SIZE))}" for i in range(chunks)]
total_tokens = sum(estimate_tokens(s) for s in summaries) + estimate_tokens("analysis_prompt")
print(f"Estimated tokens for summaries + prompt: {total_tokens}")

# guard: if token budget exceeded, reduce summaries (keep top-K)
TOKEN_BUDGET = 16000
if total_tokens > TOKEN_BUDGET:
    print("Token budget exceeded — will compress summaries before sending to any LLM")
else:
    print("Within token budget — safe to include aggregated summaries in context")


Total rows=4656, chunk_size=5000, chunks=1
Estimated tokens for summaries + prompt: 7
Within token budget — safe to include aggregated summaries in context


In [None]:
# Section 6 — Memory manager: short-term vs long-term + compression
from agents.tools import MemoryManager

mem = MemoryManager(path="outputs/memory.json")
mem.store_step_summary("demo_chunk", {"rows": 1000, "revenue": 50000.0})
print("Short-term keys:", list(mem.short_term.keys()))
print("Long-term keys (compressed):", list(mem.long_term.keys())[:10])


Short-term keys: ['demo_chunk']
Long-term keys (compressed): ['summary:demo_chunk']


In [8]:
# Section 7 — Stateful Orchestrator (Planner → Tool Executor → Validator → Reflection Loop)
from agents import Orchestrator

orc = Orchestrator(data_path="data/ar 7.xlsx")
print("Planned steps:", [s['step'] for s in orc.planner.plan('Revenue + Churn')])
res = orc.run()
print("Run completed. Suggestions:", res['suggestions'])


Planned steps: ['load_data', 'assign_quarter', 'aggregate_quarter_region', 'client_quarterly', 'compute_churn', 'validate', 'reflect']
Run completed. Suggestions: [{'pair': '2024Q1->2024Q2', 'suggestion': 'Investigate top lost clients and run retention offers.'}, {'pair': '2024Q2->2024Q3', 'suggestion': 'Investigate top lost clients and run retention offers.'}, {'pair': '2024Q3->2024Q4', 'suggestion': 'Investigate top lost clients and run retention offers.'}]


In [9]:
# Section 8 — Planner agent: task decomposition & tool decision-making
plan = orc.planner.plan("Revenue + Churn analysis")
for step in plan:
    print(step)

# Planner emits ordered subtasks and can attach resource hints or idempotency keys (demo)
plan[0]['idempotency_key'] = 'load-data-v1'
plan[0]['resource_hint'] = {'memory_mb': 200}
plan[0]


{'step': 'load_data'}
{'step': 'assign_quarter'}
{'step': 'aggregate_quarter_region'}
{'step': 'client_quarterly'}
{'step': 'compute_churn'}
{'step': 'validate'}
{'step': 'reflect'}


{'step': 'load_data',
 'idempotency_key': 'load-data-v1',
 'resource_hint': {'memory_mb': 200}}

In [10]:
# Section 9 — Execution agent: tool-first executor (deterministic calculations)
# Demonstrate that all numeric calculations are performed with Pandas/Numpy
sample_agg = agg_q.sample(3)
print(sample_agg)

# Example: compute top-5 regions for a given quarter (deterministic)
q = agg_q['Quarter'].unique()[0]
top_regions = agg_q[agg_q['Quarter']==q].nlargest(5, 'TotalRevenue')
print(top_regions[['Region','Country','TotalRevenue']])


   Quarter Region   Country   TotalRevenue  NumRows
42  2024Q4   APAC     India  889964.538474       36
43  2024Q4   APAC  Malaysia       0.000000        3
38  2024Q3  LATAM    Mexico   29813.485096       12
   Region    Country  TotalRevenue
13    USA        USA  7.838531e+06
8    EMEA         UK  1.754655e+06
0    APAC      India  1.025424e+06
2    APAC  Singapore  4.759419e+05
6    EMEA     Sweden  4.031967e+05


In [11]:
# Section 10 — Validator agent: assertions, schema validation & retry logic
from agents.tools import Validator
ok, msg = Validator.totals_match(df_q, agg_q)
print('Validator.totals_match =>', ok, msg)

# Example: simulate mismatch and show retry decision (conceptual)
if not ok:
    print('Triggering retry for aggregation step...')


Validator.totals_match => True Totals match


In [12]:
# Section 11 — Reflection agent: critique, contradiction checks & autonomous repair
# Reflections were produced by the orchestrator and persisted in memory
print('Reflections:', res['reflections'])
print('Suggestions:', res['suggestions'])


Reflections: [{'pair': '2024Q1->2024Q2', 'net_change': -191660.75, 'note': 'significant_loss'}, {'pair': '2024Q2->2024Q3', 'net_change': -185108.63290973334, 'note': 'significant_loss'}, {'pair': '2024Q3->2024Q4', 'net_change': -245642.69294534653, 'note': 'significant_loss'}]
Suggestions: [{'pair': '2024Q1->2024Q2', 'suggestion': 'Investigate top lost clients and run retention offers.'}, {'pair': '2024Q2->2024Q3', 'suggestion': 'Investigate top lost clients and run retention offers.'}, {'pair': '2024Q3->2024Q4', 'suggestion': 'Investigate top lost clients and run retention offers.'}]


In [13]:
# Section 12 — Orchestration: state transitions, idempotency & retry/backoff
# show execution trace/log
for e in res['log'][:10]:
    print(e)

# idempotency keys and checkpointing are supported via step metadata (Planner may attach keys)


{'step': {'step': 'load_data'}, 'status': 'ok', 'attempts': 1}
{'step': {'step': 'assign_quarter'}, 'status': 'ok', 'attempts': 1}
{'step': {'step': 'aggregate_quarter_region'}, 'status': 'ok', 'attempts': 1}
{'step': {'step': 'client_quarterly'}, 'status': 'ok', 'attempts': 1}
{'step': {'step': 'compute_churn'}, 'status': 'ok', 'attempts': 1}
{'step': {'step': 'validate'}, 'status': 'ok', 'attempts': 1}
{'step': {'step': 'reflect'}, 'status': 'ok', 'attempts': 1}


In [14]:
# Section 13 — Observability: structured logging, metrics, trace events
import json
print('Memory long-term snapshot (keys):', list(mem.long_term.keys())[:10])

trace_event = {"run_id": "demo-1", "step": "aggregate_quarter_region", "duration_ms": 23}
print('Trace event (JSON):', json.dumps(trace_event))


Memory long-term snapshot (keys): ['summary:demo_chunk']
Trace event (JSON): {"run_id": "demo-1", "step": "aggregate_quarter_region", "duration_ms": 23}
