In [1]:
!pip install -q openai python-dotenv nest_asyncio

In [8]:
import os
import asyncio
import nest_asyncio
from typing import List, Dict, Any, Optional
from google.colab import userdata
import openai

In [3]:
nest_asyncio.apply()

In [5]:
# Configure OpenAI API
OPENAI_API_KEY = userdata.get("OPENAI_API_KEY")
if not OPENAI_API_KEY:
    raise ValueError("Please set OPENAI_API_KEY in your environment variables")

In [9]:
# Base Agent Class (adapted from your code)
class BaseAgent:
    """Base class for all agents using OpenAI's API."""

    def __init__(self, name: str, role: str, system_prompt: str, model: str = "gpt-3.5-turbo"):
        self.name = name
        self.role = role
        self.system_prompt = system_prompt
        self.model = model
        self.execution_time = 0
        self.token_usage = {"prompt_tokens": 0, "completion_tokens": 0, "total_tokens": 0}
        self.client = openai.OpenAI(api_key=OPENAI_API_KEY)

    async def _make_api_call(self, messages: list, temperature: float = 0.7) -> str:
        """Make an API call to OpenAI and track metrics."""
        start_time = asyncio.get_event_loop().time()

        try:
            response = await asyncio.get_event_loop().run_in_executor(
                None,
                lambda: self.client.chat.completions.create(
                    model=self.model,
                    messages=messages,
                    temperature=temperature
                )
            )

            self.execution_time = asyncio.get_event_loop().time() - start_time

            # Track token usage
            if hasattr(response, 'usage') and response.usage:
                self.token_usage = {
                    "prompt_tokens": response.usage.prompt_tokens,
                    "completion_tokens": response.usage.completion_tokens,
                    "total_tokens": response.usage.total_tokens
                }

            return response.choices[0].message.content

        except Exception as e:
            self.execution_time = asyncio.get_event_loop().time() - start_time
            raise Exception(f"API call failed for {self.name}: {str(e)}")

    async def process(self, input_data: Any, context: Optional[Dict] = None) -> str:
        """Process input using the system prompt."""
        messages = [
            {"role": "system", "content": self.system_prompt},
            {"role": "user", "content": str(input_data)}
        ]

        if context:
            messages.insert(1, {"role": "system", "content": f"Context: {context}"})

        return await self._make_api_call(messages)

    def get_metrics(self) -> Dict:
        """Get execution metrics for this agent."""
        return {
            "name": self.name,
            "role": self.role,
            "execution_time": self.execution_time,
            "token_usage": self.token_usage
        }

In [10]:
# 1. Sequential Pattern
class SequentialOrchestrator:
    def __init__(self, agents: List[BaseAgent]):
        self.agents = agents

    async def execute(self, initial_input: str) -> str:
        current_input = initial_input
        for agent in self.agents:
            print(f"🔹 {agent.name} is processing...")
            current_input = await agent.process(current_input)
            print(f"   Response length: {len(current_input)} characters")
        return current_input

# 2. Hierarchical Pattern
class HierarchicalOrchestrator:
    def __init__(self, manager: BaseAgent, specialists: Dict[str, BaseAgent]):
        self.manager = manager
        self.specialists = specialists

    async def execute(self, task: str) -> str:
        # Manager analyzes and delegates
        analysis = await self.manager.process(
            f"Analyze this task and determine which specialists are needed: {task}"
        )
        print(f"🔍 Analysis: {analysis[:200]}...")

        # Get required specialists
        response = await self.manager.process(
            f"Based on this analysis, which specialists should handle this task? {analysis}\n"
            f"Available specialists: {', '.join(self.specialists.keys())}\n"
            "Return just the role names, comma-separated."
        )

        # Execute tasks in parallel
        specialist_roles = [r.strip() for r in response.split(",") if r.strip() in self.specialists]
        print(f"🧑‍💻 Delegating to specialists: {', '.join(specialist_roles)}")

        tasks = []
        for role in specialist_roles:
            if role in self.specialists:
                tasks.append(self.specialists[role].process(task))

        # Gather and combine results
        results = await asyncio.gather(*tasks)
        combined = "\n\n".join(results)

        # Final integration
        return await self.manager.process(
            f"Combine these specialist outputs into a cohesive response: {combined}"
        )

# 3. Map-Reduce Pattern
class MapReduceOrchestrator:
    def __init__(self, mapper: BaseAgent, reducer: BaseAgent):
        self.mapper = mapper
        self.reducer = reducer

    async def execute(self, chunks: List[str]) -> str:
        # Map phase
        print(f"🗺️ Mapping {len(chunks)} chunks...")
        map_tasks = [self.mapper.process(chunk) for chunk in chunks]
        map_results = await asyncio.gather(*map_tasks)

        # Reduce phase
        print("🔍 Reducing results...")
        combined = "\n---\n".join(map_results)
        return await self.reducer.process(
            f"Combine these mapped results into a final output:\n{combined}"
        )

# 4. Producer-Reviewer Pattern
class ProducerReviewerOrchestrator:
    def __init__(self, producer: BaseAgent, reviewer: BaseAgent, max_iterations: int = 2):
        self.producer = producer
        self.reviewer = reviewer
        self.max_iterations = max_iterations

    async def execute(self, task: str) -> str:
        current_output = await self.producer.process(task)
        print("🔄 Initial output generated")

        for i in range(self.max_iterations):
            # Get feedback
            feedback = await self.reviewer.process(
                f"Review this output and provide specific improvement suggestions (be concise):\n{current_output}"
            )

            # Check if review is satisfactory
            is_approved = await self.reviewer.process(
                f"Answer only 'yes' or 'no': Is this output satisfactory?\n{current_output}"
            )

            if "yes" in is_approved.lower():
                print("✅ Review passed")
                return current_output

            print(f"🔄 Iteration {i+1}: Refining based on feedback...")
            # Refine based on feedback
            current_output = await self.producer.process(
                f"Improve this based on the feedback: {feedback}\n\nOriginal output:\n{current_output}"
            )

        print("⏹️ Max iterations reached")
        return current_output

# 5. Consensus Pattern
class ConsensusOrchestrator:
    def __init__(self, agents: List[BaseAgent], judge: BaseAgent):
        self.agents = agents
        self.judge = judge

    async def execute(self, task: str) -> str:
        print(f"🤝 Getting responses from {len(self.agents)} agents...")
        # Get responses from all agents
        tasks = [agent.process(task) for agent in self.agents]
        responses = await asyncio.gather(*tasks)

        # Print individual responses
        for i, (agent, response) in enumerate(zip(self.agents, responses)):
            print(f"\n👤 {agent.name} says:\n{response[:200]}...")

        # Get consensus
        print("\n🧑‍⚖️ Getting consensus...")
        return await self.judge.process(
            "Reach a consensus from these responses. "
            "Identify the best aspects of each and combine them into a single, improved response.\n\n" +
            "\n---\n".join(responses)
        )

# Example usage
async def run_examples():
    # Initialize agents
    writer = BaseAgent(
        "Writer",
        "Creative Writer",
        "You are a creative writer who generates engaging content."
    )

    editor = BaseAgent(
        "Editor",
        "Editor",
        "You are a meticulous editor who provides constructive feedback. Be concise and specific in your suggestions."
    )

    # Example 1: Sequential Pattern
    print("\n" + "="*50)
    print("=== Sequential Pattern ===")
    print("Writer will write a story, then Editor will refine it")
    sequential = SequentialOrchestrator([writer, editor])
    result = await sequential.execute("Write a short story about AI")
    print("\n📝 Final output:")
    print(result[:500] + "...")

    # Example 2: Hierarchical Pattern
    print("\n" + "="*50)
    print("=== Hierarchical Pattern ===")
    manager = BaseAgent(
        "Manager",
        "Project Manager",
        "You are a project manager who delegates tasks effectively."
    )
    hierarchical = HierarchicalOrchestrator(
        manager,
        {"writer": writer, "editor": editor}
    )
    result = await hierarchical.execute("Create a report on climate change impacts")
    print("\n📊 Final report:")
    print(result[:500] + "...")

    # Example 3: Map-Reduce Pattern
    print("\n" + "="*50)
    print("=== Map-Reduce Pattern ===")
    mapper = BaseAgent(
        "Mapper",
        "Text Analyzer",
        "You analyze text and extract key points."
    )
    reducer = BaseAgent(
        "Reducer",
        "Summary Generator",
        "You combine multiple analyses into a coherent summary."
    )
    map_reduce = MapReduceOrchestrator(mapper, reducer)
    chunks = [
        "Artificial intelligence is transforming industries by automating tasks and providing insights from large datasets.",
        "Machine learning enables computers to learn from data without being explicitly programmed for specific tasks.",
        "Natural language processing helps computers understand, interpret, and generate human language effectively."
    ]
    result = await map_reduce.execute(chunks)
    print("\n📋 Combined summary:")
    print(result)

    # Example 4: Producer-Reviewer Pattern
    print("\n" + "="*50)
    print("=== Producer-Reviewer Pattern ===")
    producer_reviewer = ProducerReviewerOrchestrator(writer, editor)
    result = await producer_reviewer.execute("Write a blog post about renewable energy")
    print("\n✍️ Final blog post:")
    print(result[:500] + "...")

    # Example 5: Consensus Pattern
    print("\n" + "="*50)
    print("=== Consensus Pattern ===")
    expert1 = BaseAgent(
        "Expert1",
        "AI Ethics Expert",
        "You are an expert in AI ethics and responsible AI development."
    )
    expert2 = BaseAgent(
        "Expert2",
        "Technology Futurist",
        "You analyze technology trends and their future implications."
    )
    judge = BaseAgent(
        "Judge",
        "Consensus Builder",
        "You find common ground between different perspectives and synthesize them into a coherent view."
    )
    consensus = ConsensusOrchestrator([expert1, expert2], judge)
    result = await consensus.execute("What are the ethical implications of AI in healthcare?")
    print("\n🤝 Consensus:")
    print(result)

# Run the examples
if __name__ == "__main__":
    import asyncio
    asyncio.run(run_examples())


=== Sequential Pattern ===
Writer will write a story, then Editor will refine it
🔹 Writer is processing...
   Response length: 1938 characters
🔹 Editor is processing...
   Response length: 462 characters

📝 Final output:
This story beautifully explores the theme of connection between humans and AI. To enhance it further, consider delving deeper into the ethical implications of AI companionship, such as boundaries and privacy concerns. Additionally, adding more dialogue between Aurora and Lily could bring their relationship to life and showcase their emotional growth. Lastly, consider incorporating moments of conflict or challenges to add more depth to their journey together....

=== Hierarchical Pattern ===
🔍 Analysis: To create a report on climate change impacts, you will need specialists with expertise in climate science, environmental science, and data analysis. These specialists can provide insights on the curre...
🧑‍💻 Delegating to specialists: writer, editor

📊 Final report:
**T