Session 3: Feedback Loops & Multi-Agent Systems.
In Session 2, you focused on LangroidAgent and CrewAgentWrapper.
In Session 3, we use CrewAI’s orchestration engine + Agno’s fault-tolerant pipeline to coordinate multi-agent collaboration.

In [None]:
%pip install agno
# Introduction to agno
## reference https://www.agno.com/, https://github.com/agno-agi/agno

In [None]:
# Cell 1 — Imports

import os, time, asyncio, json, logging
from typing import List, Dict, Any

from crewai import Agent as CrewAgent, Crew, Task as CrewTask
from agno import Pipeline, RetryPolicy, Task as AgnoTask, PipelineLogger

from src.agents.orchestration import crew_agents, Orchestrator
from src.utils.logging_utils import setup_logging

# Initialize structured logger
log = setup_logging("session3.log", level=logging.INFO, logger_name="Session3")
log.info("Multi-agent session 3 starting...")


In [None]:
# Cell 2 — Initialize CrewAI multi-agent system

# Initialize orchestrator with pre-defined crew_agents
orchestrator = Orchestrator(crew_agents=crew_agents)

# Inspect agents
for role, agent in crew_agents.items():
    print(f"{role}: name={agent.name}, role={agent.role}")

# 📁 Edit in:
# src/agents/agent_setup.py
# TODO1 (Session 3) :  
# - Initialize crew_agents (retrieval, summarizer, writer)
# - Initialize Orchestrator with crew_agents
# - Return orchestrator object


In [None]:
# Cell 3 — Define workflow pipeline (retrieval -> summarizer -> writer)

workflow_definition = [
    {"step": "retrieval", "desc": "Retrieve related documents"},
    {"step": "summarizer", "desc": "Summarize retrieved content"},
    {"step": "writer", "desc": "Write summary report to file"}
]

print(json.dumps(workflow_definition, indent=2))


In [None]:
# Cell 4 — Create feedback/retry mechanism

retry_policy = RetryPolicy(max_attempts=2, backoff_factor=1.5)

async def feedback_condition(task_result):
    """Check if a task failed and needs retry."""
    if not task_result or "failed" in str(task_result).lower():
        log.warning("Feedback: Detected failure -> triggering retry")
        return True
    return False

# 📁 Edit in:
# src/agents/fault_tolerance.py
# TODO2 (Session 3): 
# - Implement `retry_task_with_feedback()`
# - Wrap any agent.handle coroutine
# - Use RetryPolicy to retry on failure


In [None]:
# Cell 5 — Execute workflow with feedback

user_goal = "Summarize all available order and payment policies into a single report."

# Orchestrator will run the workflow with feedback & retries
results = await orchestrator.run_plan_with_crew(
    plan=workflow_definition
)

print(json.dumps(results, indent=2))

# 📁 Edit in:
# src/agents/orchestration.py
# TODO3 (Session 3): Lines ~80–140
# - Implement `run_plan_with_crew()`
# - Call corresponding CrewAgentWrapper for each role
# - Integrate retry_task_with_feedback() for fault tolerance
# - Aggregate results


In [None]:
# Cell 6 — Structured logging for monitoring

pipeline_logger = PipelineLogger(log_dir="./logs", json_format=True)

pipeline = Pipeline(
    tasks=[
        AgnoTask(name="Retrieval", func=lambda: print("retrieval")),
        AgnoTask(name="Summarizer", func=lambda: print("summarize")),
        AgnoTask(name="Writer", func=lambda: print("write"))
    ],
    retry_policy=retry_policy,
    logger=pipeline_logger,
)

await pipeline.run()


Output:

./logs/session3.log

session3_results.json

Multi-agent workflow reports for feedback vs no-feedback experiments