<a href="https://colab.research.google.com/github/edyoda/AI-Agent-Development-GENAIMD240525/blob/main/Session_5_Workflow%2C_Memory_%26_RAG.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Special Note — Go through the Workflow section (further below) before moving on to Memory & RAG

In [None]:
!pip install -U "autogen-agentchat" "autogen-ext[openai,azure]"

In [None]:
from autogen_agentchat.agents import AssistantAgent, UserProxyAgent
from autogen_agentchat.ui import Console
from autogen_ext.models.openai import OpenAIChatCompletionClient

from google.colab import userdata

# Define the model client shared by every agent in this notebook. Any other
# client that implements the `ChatCompletionClient` interface can be used instead.
oai_model_client = OpenAIChatCompletionClient(
    model="gpt-4o",
    # NOTE(review): 0.85 is a fairly high sampling temperature; consider lowering it
    # if you need more repeatable tool calls and workflow routing between runs.
    temperature=0.85,
    # The key is read from Colab's secret store (`google.colab.userdata`), not hard-coded.
    api_key=userdata.get("OPENAI_API_KEY"),
)

In [None]:
from autogen_agentchat.agents import AssistantAgent
from autogen_agentchat.ui import Console
from autogen_core.memory import ListMemory, MemoryContent, MemoryMimeType
from autogen_ext.models.openai import OpenAIChatCompletionClient

In [None]:
# Initialize user memory: a simple list-backed store of user preferences that is
# handed to the assistant below through `memory=[user_memory]`.
user_memory = ListMemory()

# Add user preferences to memory. Top-level `await` works here because notebook
# cells run inside an event loop; in a plain script, wrap this in an async function.
# Preference 1 is intended to steer which `units` value the agent passes to `get_weather`.
await user_memory.add(MemoryContent(content="The rain should be in inch", mime_type=MemoryMimeType.TEXT))

# Preference 2 is a dietary constraint for any recipe the agent writes.
await user_memory.add(MemoryContent(content="Meal recipe must be vegan", mime_type=MemoryMimeType.TEXT))


async def get_weather(city: str, units: str = "cm") -> str:
    """Return a canned rainfall report for ``city`` (demo tool, no real lookup).

    Both unit branches describe the same rainfall amount, so the answer does
    not depend on which unit the agent happens to request.

    Args:
        city: Name of the city; it is only interpolated into the reply text.
        units: ``"cm"`` (default) or ``"inch"``. Matching is case-insensitive
            and ignores surrounding whitespace.

    Returns:
        A one-line rainfall report, or an apology naming the unsupported unit.
    """
    rain_inches = 23  # single source of truth for the fake measurement
    unit = units.strip().lower()
    if unit == "cm":
        # 1 inch = 2.54 cm; one decimal place keeps the reply readable.
        return f"The rain in {city} is {rain_inches * 2.54:.1f} cm"
    if unit == "inch":
        return f"The rain in {city} is {rain_inches} inch"
    # The city is not what failed here; report the unit that could not be handled.
    return f"Sorry, I don't support the unit '{units}' for {city}."


# Assistant that can call `get_weather` as a tool and is given the stored user
# preferences through `memory`. The plain async function is wrapped as a tool;
# AgentChat derives the tool's schema from its signature and type hints.
assistant_agent = AssistantAgent(
    name="assistant_agent",
    model_client=oai_model_client,
    tools=[get_weather],
    memory=[user_memory],
)


In [None]:
# Run the agent with a task. The stored "inch" preference is expected to lead the
# agent to call `get_weather` with inch units. `run_stream` yields messages as they
# are produced and `Console` prints them (top-level `await` only works in a notebook).
stream = assistant_agent.run_stream(task="How much is rain in New York?")
await Console(stream)

In [None]:
# The stored "vegan" preference should steer the recipe (e.g. towards a vegetable broth).
stream = assistant_agent.run_stream(task="Write brief meal recipe with broth")
await Console(stream)

In [None]:
import re
from typing import List

import aiofiles
import aiohttp
from autogen_core.memory import Memory, MemoryContent, MemoryMimeType


class SimpleDocumentIndexer:
    """Basic document indexer for AutoGen Memory.

    Fetches each source (an ``http(s)://`` URL or a local file path), strips
    HTML markup when the content contains tag-like markup, splits the text into
    fixed-size character chunks, and stores every non-empty chunk in ``memory``
    as a ``MemoryContent`` tagged with its source and chunk index.
    """

    # Whole <script>/<style> elements: their bodies are code, not prose, and would
    # otherwise leak into the index once the surrounding tags are removed.
    _SCRIPT_STYLE_RE = re.compile(r"<(script|style)\b[^>]*>.*?</\1\s*>", re.IGNORECASE | re.DOTALL)
    # HTML comments (including conditional comments).
    _COMMENT_RE = re.compile(r"<!--.*?-->", re.DOTALL)
    # Any remaining tag.
    _TAG_RE = re.compile(r"<[^>]*>")
    # Something that actually looks like markup ("<p", "</div", "<!DOCTYPE"), as
    # opposed to a stray "<" and ">" in plain text such as "a < b > c".
    _TAG_LIKE_RE = re.compile(r"<[A-Za-z!/][^>]*>")
    _WS_RE = re.compile(r"\s+")

    def __init__(self, memory: Memory, chunk_size: int = 1500) -> None:
        """Create an indexer that writes into ``memory``.

        Args:
            memory: Memory store that receives one entry per chunk.
            chunk_size: Number of characters per chunk (before stripping).

        Raises:
            ValueError: if ``chunk_size`` is not positive (it is used as a range step).
        """
        if chunk_size <= 0:
            raise ValueError(f"chunk_size must be positive, got {chunk_size}")
        self.memory = memory
        self.chunk_size = chunk_size

    async def _fetch_content(self, source: str) -> str:
        """Fetch content from URL or file.

        Raises:
            aiohttp.ClientResponseError: if the server answers with a 4xx/5xx
                status, so error pages are never indexed as documentation.
            OSError: if a local file cannot be opened.
        """
        if source.startswith(("http://", "https://")):
            async with aiohttp.ClientSession() as session:
                async with session.get(source) as response:
                    response.raise_for_status()
                    return await response.text()
        async with aiofiles.open(source, "r", encoding="utf-8") as f:
            return await f.read()

    def _strip_html(self, text: str) -> str:
        """Remove HTML tags, decode entities and normalize whitespace."""
        import html  # local import keeps this cell self-contained

        text = self._SCRIPT_STYLE_RE.sub(" ", text)
        text = self._COMMENT_RE.sub(" ", text)
        text = self._TAG_RE.sub(" ", text)
        # Decode entities (&amp;, &lt;, &#39;, ...) only after tags are gone, so an
        # escaped "&lt;b&gt;" in the text is not mistaken for a tag.
        text = html.unescape(text)
        text = self._WS_RE.sub(" ", text)
        return text.strip()

    def _split_text(self, text: str) -> List[str]:
        """Split text into fixed-size chunks.

        Each slice is ``chunk_size`` characters long (the last one may be
        shorter) and is then stripped of surrounding whitespace. Slices that
        end up empty are dropped, so no blank entries reach memory.
        """
        pieces = (text[i : i + self.chunk_size].strip() for i in range(0, len(text), self.chunk_size))
        return [piece for piece in pieces if piece]

    async def index_documents(self, sources: List[str]) -> int:
        """Index documents into memory.

        Sources are processed independently: a failure on one source is
        reported with ``print`` and the remaining sources are still indexed.

        Args:
            sources: URLs (``http://`` / ``https://``) or local file paths.

        Returns:
            Total number of chunks added to memory across all fully indexed sources.
        """
        total_chunks = 0

        for source in sources:
            try:
                content = await self._fetch_content(source)

                # Only strip markup when something tag-like is present.
                if self._TAG_LIKE_RE.search(content):
                    content = self._strip_html(content)

                chunks = self._split_text(content)

                for i, chunk in enumerate(chunks):
                    await self.memory.add(
                        MemoryContent(
                            content=chunk, mime_type=MemoryMimeType.TEXT, metadata={"source": source, "chunk_index": i}
                        )
                    )

                total_chunks += len(chunks)

            except Exception as e:
                # Deliberately best-effort: report and keep indexing the other sources.
                print(f"Error indexing {source}: {str(e)}")

        return total_chunks


In [None]:
import requests
from bs4 import BeautifulSoup
from urllib.parse import urljoin

def extract_urls_from_domain(domain, timeout=10):
    """Return the same-site links found on the single page at ``domain``.

    Only the page ``domain`` itself is fetched; the function does not crawl.
    A link is kept when, after resolving it against ``domain`` and dropping
    any ``#fragment``, it has the same scheme and host as ``domain`` and its
    path starts with ``domain``'s path.

    Args:
        domain: Absolute URL of the page to scan, e.g. ``"https://www.edyoda.com"``.
        timeout: Seconds to wait for the HTTP response before giving up.

    Returns:
        A sorted list of unique absolute URLs; empty if the request fails.
    """
    from urllib.parse import urldefrag, urlparse

    base = urlparse(domain)
    urls = set()
    try:
        # Without a timeout an unresponsive host would hang the cell forever.
        response = requests.get(domain, timeout=timeout)
        # Do not harvest links from a 4xx/5xx error page.
        response.raise_for_status()
    except requests.exceptions.RequestException as e:
        print(f"Error while fetching {domain}: {e}")
        return []

    soup = BeautifulSoup(response.content, 'html.parser')
    for link in soup.find_all('a', href=True):
        # "#section" anchors point at the same document; drop them to avoid duplicates.
        full_url = urldefrag(urljoin(domain, link['href'])).url
        parsed = urlparse(full_url)
        # Compare parsed components instead of a raw string prefix, so a look-alike
        # host such as "https://www.edyoda.com.evil.net" is not treated as same-site.
        if (
            parsed.scheme == base.scheme
            and parsed.netloc.lower() == base.netloc.lower()
            and parsed.path.startswith(base.path)
        ):
            urls.add(full_url)

    return sorted(urls)

# Collect same-site links from the EdYoda home page (needs network access), then keep
# only the "micro-degree" pages; a trailing "/" is ignored when matching the suffix.
# NOTE(review): `filtered_urls` is not used by any later cell visible here.
all_urls = extract_urls_from_domain("https://www.edyoda.com")
filtered_urls = [url for url in all_urls if url.rstrip('/').endswith('micro-degree')]

In [None]:
!pip install chromadb

In [None]:
!pip install pyautogen[chromadb]

In [None]:
!pip install chromadb

In [None]:
import os
from pathlib import Path

from autogen_agentchat.agents import AssistantAgent
from autogen_agentchat.ui import Console
from autogen_ext.memory.chromadb import ChromaDBVectorMemory, PersistentChromaDBVectorMemoryConfig
from autogen_ext.models.openai import OpenAIChatCompletionClient

# Initialize vector memory backed by a persistent ChromaDB collection stored on disk
# under ~/.chromadb_autogen, so it outlives a single run of this cell.

rag_memory = ChromaDBVectorMemory(
    config=PersistentChromaDBVectorMemoryConfig(
        collection_name="autogen_docs",
        persistence_path=os.path.join(str(Path.home()), ".chromadb_autogen"),
        k=3,  # Return top 3 results
        score_threshold=0.4,  # Minimum similarity score; weaker matches are not returned
    )
)

# Because the store is persistent, re-running the indexing below without clearing
# first would add a second copy of every chunk.
await rag_memory.clear()  # Clear existing memory


# Index AutoGen documentation
async def index_autogen_docs() -> None:
    """Index a fixed set of AutoGen documentation pages into ``rag_memory``.

    Uses ``SimpleDocumentIndexer`` and ``rag_memory`` from earlier cells and
    prints a one-line summary of how many chunks were stored.
    """
    doc_urls = [
        "https://raw.githubusercontent.com/microsoft/autogen/main/README.md",
        "https://microsoft.github.io/autogen/dev/user-guide/agentchat-user-guide/tutorial/agents.html",
        "https://microsoft.github.io/autogen/dev/user-guide/agentchat-user-guide/tutorial/teams.html",
        "https://microsoft.github.io/autogen/dev/user-guide/agentchat-user-guide/tutorial/termination.html",
    ]
    doc_indexer = SimpleDocumentIndexer(memory=rag_memory)
    n_chunks = await doc_indexer.index_documents(doc_urls)
    print(f"Indexed {n_chunks} chunks from {len(doc_urls)} AutoGen documents")


# Run the indexing once. Top-level `await` works because notebook cells run in an event loop.
await index_autogen_docs()

In [None]:
# Create our RAG assistant agent
rag_assistant = AssistantAgent(
    name="rag_assistant", model_client=oai_model_client, memory=[rag_memory]
)

# Ask questions about AutoGen
stream = rag_assistant.run_stream(task="What is AgentChat?")
await Console(stream)

# Remember to close the memory when done
await rag_memory.close()

# Workflow

In [None]:
from autogen_agentchat.agents import AssistantAgent
from autogen_agentchat.teams import DiGraphBuilder, GraphFlow
from autogen_ext.models.openai import OpenAIChatCompletionClient

# Reuse the OpenAI model client created at the top of the notebook.
client = oai_model_client

# Two-step sequential pipeline: the writer drafts, then the reviewer critiques.
writer = AssistantAgent(
    "writer",
    model_client=client,
    system_message="Draft a short paragraph on climate change.",
)
reviewer = AssistantAgent(
    "reviewer",
    model_client=client,
    system_message="Review the draft and suggest improvements.",
)

# Declare both nodes and the single writer -> reviewer edge in one chain,
# then build (and validate) the directed graph.
builder = DiGraphBuilder()
builder.add_node(writer).add_node(reviewer).add_edge(writer, reviewer)
graph = builder.build()

# The flow executes the participants in the order dictated by the graph.
flow = GraphFlow([writer, reviewer], graph=graph)


In [None]:
# Stream the workflow and print each raw event/message object as it arrives.
# Use `asyncio.run(...)` and wrap the below in an async function when running in a script.
stream = flow.run_stream(task="Write a short paragraph about climate change.")
async for event in stream:  # type: ignore
    print(event)
# Use Console(flow.run_stream(...)) for better formatting in console.

In [None]:
from autogen_agentchat.agents import AssistantAgent
from autogen_agentchat.teams import DiGraphBuilder, GraphFlow
from autogen_agentchat.ui import Console
from autogen_ext.models.openai import OpenAIChatCompletionClient

# Reuse the OpenAI model client defined at the top of the notebook
client = oai_model_client

# Create the writer agent (the entry node of the graph)
writer = AssistantAgent("writer", model_client=client, system_message="Draft a short paragraph on climate change.")

# Create two editor agents; each receives the writer's draft on its own branch
editor1 = AssistantAgent("editor1", model_client=client, system_message="Edit the paragraph for grammar.")

editor2 = AssistantAgent("editor2", model_client=client, system_message="Edit the paragraph for style.")

# Create the final reviewer agent, which merges the output of both branches
final_reviewer = AssistantAgent(
    "final_reviewer",
    model_client=client,
    system_message="Consolidate the grammar and style edits into a final version.",
)

# Build the workflow graph
builder = DiGraphBuilder()
builder.add_node(writer).add_node(editor1).add_node(editor2).add_node(final_reviewer)

# Fan-out from writer to editor1 and editor2
builder.add_edge(writer, editor1)
builder.add_edge(writer, editor2)

# Fan-in both editors into final reviewer.
# NOTE(review): with two incoming edges, final_reviewer is expected to run only after
# both editors have replied — confirm against GraphFlow's node-activation rules.
builder.add_edge(editor1, final_reviewer)
builder.add_edge(editor2, final_reviewer)

# Build and validate the graph
graph = builder.build()

# Create the flow; participants come from the builder, so they match the graph's nodes
flow = GraphFlow(
    participants=builder.get_participants(),
    graph=graph,
)

# Run the workflow and render each message in the notebook output
await Console(flow.run_stream(task="Write a short paragraph about climate change."))


# Message Filtering

# Agents Created
researcher:

Role: Summarizes key facts about climate change.

Behavior: Uses the LLM (via model_client) with a system message guiding its actions.

analyst:

Role: Reviews and suggests improvements to the research summary.

presenter:

Role: Creates a presentation slide based on the reviewed summary.

# MessageFilterAgent Logic
These wrap the analyst and presenter agents so that each one only *sees* specific messages from the conversation history. The graph edges, not the filters, decide when each agent runs:

filtered_analyst:

Only processes the latest (last, count=1) message from researcher.

filtered_presenter:

Only processes the latest message from analyst.

This ensures a clean pipeline where:

```text
researcher → analyst → presenter
```
Each agent runs after the previous one in the chain and receives only that agent's latest output as its input.

# Graph Flow Setup
DiGraphBuilder builds a directed graph:

```text
researcher --> filtered_analyst --> filtered_presenter
```
GraphFlow wraps the agents and graph, managing:

Execution sequence.

Data passing between agents.

# Execution
```python
await Console(flow.run_stream(task="Summarize key facts about climate change."))
```
Runs the whole agent workflow in streaming mode:

Starts with the researcher.

Passes result to analyst.

Then to presenter.

Console renders the interaction in terminal-like output.

# Overall Flow
You start with a task: “Summarize key facts about climate change.”

researcher responds with a summary.

analyst refines that summary.

presenter creates a presentation slide from the refined version.

# Why Use This Pattern?
Separation of concerns: Each agent has a defined role.

Composable: Easy to extend with more roles (e.g., fact_checker).

Filterable: Controls when and how agents act.

In [None]:
from autogen_agentchat.agents import AssistantAgent, MessageFilterAgent, MessageFilterConfig, PerSourceFilter
from autogen_agentchat.teams import DiGraphBuilder, GraphFlow
from autogen_agentchat.ui import Console
from autogen_ext.models.openai import OpenAIChatCompletionClient

# Reuse the model client defined at the top of the notebook
client = oai_model_client

# Create agents: researcher -> analyst -> presenter, each with a single-purpose system message
researcher = AssistantAgent(
    "researcher", model_client=client, system_message="Summarize key facts about climate change."
)
analyst = AssistantAgent("analyst", model_client=client, system_message="Review the summary and suggest improvements.")
presenter = AssistantAgent(
    "presenter", model_client=client, system_message="Prepare a presentation slide based on the final summary."
)

# Apply message filtering. Each wrapper reuses its inner agent's name, so the
# presenter's `source="analyst"` filter matches messages from the filtered analyst.
# The analyst is given only the researcher's most recent message; the user's task
# message is not part of its filtered input.
filtered_analyst = MessageFilterAgent(
    name="analyst",
    wrapped_agent=analyst,
    filter=MessageFilterConfig(per_source=[PerSourceFilter(source="researcher", position="last", count=1)]),
)

# The presenter is given only the analyst's most recent message.
filtered_presenter = MessageFilterAgent(
    name="presenter",
    wrapped_agent=presenter,
    filter=MessageFilterConfig(per_source=[PerSourceFilter(source="analyst", position="last", count=1)]),
)

# Build the flow: a linear chain whose nodes are the filtered wrappers
# (the unwrapped analyst/presenter are never added to the graph directly).
builder = DiGraphBuilder()
builder.add_node(researcher).add_node(filtered_analyst).add_node(filtered_presenter)
builder.add_edge(researcher, filtered_analyst).add_edge(filtered_analyst, filtered_presenter)

# Create the flow
flow = GraphFlow(
    participants=builder.get_participants(),
    graph=builder.build(),
)

# Run the flow and render each message in the notebook output
await Console(flow.run_stream(task="Summarize key facts about climate change."))


## Complex Workflow
# Agents and Roles
generator

Role: Generates creative ideas.

Prompt: “Generate a list of creative ideas.”

reviewer

Role: Reviews ideas.

Prompt: “Review ideas and provide feedbacks, or just 'APPROVE' for final approval.”

Special Behavior: Can either give feedback or approve. This controls the loop.

summarizer_core

Role: Summarizes the initial user request and final feedback.

It is wrapped in a MessageFilterAgent so that it only sees the messages it needs; the conditional edge (below) is what makes it run only at the end.

# Filtered Message Logic
```python
filtered_summarizer = MessageFilterAgent(
    name="summary",
    wrapped_agent=summarizer_core,
    filter=MessageFilterConfig(
        per_source=[
            PerSourceFilter(source="user", position="first", count=1),
            PerSourceFilter(source="reviewer", position="last", count=1),
        ]
    ),
)
```
The filter restricts the summarizer's input to exactly two messages (the conditional edge shown next decides *when* it runs):

The initial user request (first message from user).

The final approval (last message from reviewer).

# Graph with Conditional Loop
```python
builder.add_edge(reviewer, filtered_summarizer, condition=lambda msg: "APPROVE" in msg.to_model_text())
builder.add_edge(reviewer, generator, condition=lambda msg: "APPROVE" not in msg.to_model_text())
```
This creates a loop:

If the reviewer says "APPROVE", the flow proceeds to the summary.

Otherwise, it loops back to generator for improved ideas.

The flow begins from the generator, as explicitly defined:

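```python
builder.set_entry_point(generator)
```
An explicit entry point is needed because every node has an incoming edge (generator is re-entered from reviewer), so the graph has no natural starting node.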

# Flow Summary
```text
User Task ➜ generator ➜ reviewer
          ⤷ if "APPROVE" ➜ summarizer
          ⤷ else        ➜ generator (loop)
```
The flow:

Generates creative ideas (e.g., on plastic waste reduction).

Reviewer checks quality.

If good → approve → summarizer wraps up.

If not good → feedback → generator reworks.

# Execution
```python
await Console(flow.run_stream(task="Brainstorm ways to reduce plastic waste."))
```
This runs the flow in streaming mode and shows each step in the console as it happens.

# Why Use This Pattern?
Interactive refinement: Keeps looping until quality is approved.

Natural escalation: Summarization only occurs after final approval.

Reusable template: You can easily swap in agents for other domains (e.g., product ideas, marketing plans, research proposals).

In [None]:
from autogen_agentchat.agents import AssistantAgent, MessageFilterAgent, MessageFilterConfig, PerSourceFilter
from autogen_agentchat.teams import (
    DiGraphBuilder,
    GraphFlow,
)
from autogen_agentchat.ui import Console
from autogen_ext.models.openai import OpenAIChatCompletionClient

# Reuse the model client defined at the top of the notebook
model_client = oai_model_client

# Agents
generator = AssistantAgent("generator", model_client=model_client, system_message="Generate a list of creative ideas.")
reviewer = AssistantAgent(
    "reviewer",
    model_client=model_client,
    system_message="Review ideas and provide feedbacks, or just 'APPROVE' for final approval.",
)
summarizer_core = AssistantAgent(
    "summary", model_client=model_client, system_message="Summarize the user request and the final feedback."
)

# Filtered summarizer: the wrapped agent is given only the user's first message
# (the original task) and the reviewer's last message (the approval).
filtered_summarizer = MessageFilterAgent(
    name="summary",
    wrapped_agent=summarizer_core,
    filter=MessageFilterConfig(
        per_source=[
            PerSourceFilter(source="user", position="first", count=1),
            PerSourceFilter(source="reviewer", position="last", count=1),
        ]
    ),
)

# Build graph with conditional loop: reviewer -> summary when the reply contains
# "APPROVE", reviewer -> generator otherwise. The two conditions are exact complements,
# so one of the two edges is always taken.
# NOTE(review): `"APPROVE" in ...` is a case-sensitive substring test, so replies such
# as "DISAPPROVE" or "I can't APPROVE this yet" are also routed to the summary.
# NOTE(review): no termination condition or turn limit is passed to GraphFlow below, so
# if the reviewer never approves, the generator/reviewer loop has no visible bound here.
builder = DiGraphBuilder()
builder.add_node(generator).add_node(reviewer).add_node(filtered_summarizer)
builder.add_edge(generator, reviewer)
builder.add_edge(reviewer, filtered_summarizer, condition=lambda msg: "APPROVE" in msg.to_model_text())
builder.add_edge(reviewer, generator, condition=lambda msg: "APPROVE" not in msg.to_model_text())
# generator has an incoming edge from reviewer, so no node in this graph is a source node.
builder.set_entry_point(generator)  # Set entry point to generator. Required if there are no source nodes.
graph = builder.build()

# Create the flow
flow = GraphFlow(
    participants=builder.get_participants(),
    graph=graph,
)

# Run the flow and pretty print the output in the console
await Console(flow.run_stream(task="Brainstorm ways to reduce plastic waste."))


---------- TextMessage (user) ----------
Brainstorm ways to reduce plastic waste.
---------- TextMessage (generator) ----------
1. Develop a subscription-based reusable packaging service for online retailers that allows customers to return packaging for reuse.

2. Create a community platform for sharing DIY tutorials on creating household items from upcycled plastic waste.

3. Launch an app that helps consumers locate nearby stores with sustainable, plastic-free product alternatives.

4. Design biodegradable or compostable plastics made from food waste or agricultural byproducts.

5. Initiate a neighborhood-based plastic waste collection program that rewards participants with points redeemable for local discounts or eco-friendly products.

6. Implement a deposit return scheme where consumers receive a small refund for returning used plastic containers and bottles.

7. Partner with local artists to create large, public art installations made from collected plastic waste to raise awarene