# Agentic AI Implementation: Campaign Execution & Committee Presentation

This notebook shows how to implement agentic AI solutions for the **Campaign Execution & Committee Presentation** value stream, applying techniques from the GAI-3101 Custom Agentic AI Solutions course.

## Business Context

**Process Overview**: Transform raw BigQuery campaign data into polished PowerPoint presentations for committee review.

**Current State**: 40-hour manual process

**Target State**: 16.5 hours with AI assistance (59% reduction)
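
The 59% figure follows directly from the two hour counts above. A quick check, using only the numbers stated in this section (the variable names are illustrative):

```python
# Hour counts taken from the Business Context above.
current_hours = 40.0
target_hours = 16.5

hours_saved = current_hours - target_hours   # effort removed per run
reduction = hours_saved / current_hours      # fraction of the current effort

print(f"Hours saved: {hours_saved}")
print(f"Reduction: {reduction:.2%}")
```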

### Value Stream Steps
1. **Input Validation** - Verify campaign data completeness
2. **Data Preparation** - Query and transform BigQuery data
3. **Monitoring Setup** - Configure Looker dashboards
4. **Analytical Review** - Generate insights and analysis
5. **Availability Confirmation** - Schedule and verify stakeholder availability
6. **PPT Assembly** - Build final presentation

---

## Course Techniques Applied

| Lab Module | Technique | Application |
|------------|-----------|-------------|
| Lab 1 | Simple Python Agent | Foundation agent architecture |
| Lab 2 | Round Robin Communication | Multi-agent coordination |
| Lab 3 | Reactive Agent | Real-time data monitoring |
| Lab 4 | Deliberative Agent | Planning & execution workflow |
| Lab 5 | Long-Term Memory | Campaign history & learnings |
| Lab 6 | Observation Tools | BigQuery/Looker integration |
| Lab 7 | Action Tools | File creation, email, PPT generation |
| Lab 8 | Hierarchical Planning | Task decomposition |
| Lab 9 | Rule-Based Reasoning | Validation & quality checks |
| Lab 10 | Robustness Evaluation | Testing & reliability |
| Lab 11 | Personal Assistant | End-to-end workflow |
| Lab 12 | Error Recovery | Resilient operations |

---

## Part 1: Environment Setup

Install required packages and configure API keys.

In [None]:
# Install required packages
%pip install -qU \
    "openai==1.*" \
    "langchain==0.3.*" \
    "langchain-openai==0.3.*" \
    "langgraph==0.5.*" \
    "autogen-agentchat==0.6.*" \
    "autogen-ext[openai]==0.6.*" \
    python-pptx \
    pandas \
    google-cloud-bigquery

In [None]:
import os
import getpass
import json
import asyncio
from datetime import datetime
from typing import TypedDict, Annotated, List, Dict, Any, Optional
from abc import ABC, abstractmethod
from enum import Enum

# Configure OpenAI API Key
if "OPENAI_API_KEY" not in os.environ:
    os.environ["OPENAI_API_KEY"] = getpass.getpass("Enter your OpenAI API key: ")

print("✅ Environment configured successfully")
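
Because the install cell pins versions, it can help to confirm what actually got installed before running later cells. A minimal sketch using the standard library's `importlib.metadata`; the helper name `installed_versions` is hypothetical, and the package list simply mirrors the install cell:

```python
from importlib.metadata import PackageNotFoundError, version
from typing import Dict, List, Optional


def installed_versions(packages: List[str]) -> Dict[str, Optional[str]]:
    """Map each distribution name to its installed version, or None if missing."""
    found: Dict[str, Optional[str]] = {}
    for name in packages:
        try:
            found[name] = version(name)
        except PackageNotFoundError:
            found[name] = None
    return found


# Distribution names copied from the install cell above.
report = installed_versions([
    "openai", "langchain", "langchain-openai", "langgraph",
    "autogen-agentchat", "autogen-ext", "python-pptx",
    "pandas", "google-cloud-bigquery",
])
for name, ver in report.items():
    print(f"{name:24s} {ver or 'NOT INSTALLED'}")
```

A `None` entry means the package is not visible to the current kernel, which usually points to an install into a different environment.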

---

## Part 2: Foundation - Simple Python Agent (Lab 1)

Building the foundational agent architecture following the course pattern.

In [None]:
from openai import OpenAI

# Initialize OpenAI client
client = OpenAI()

class Agent(ABC):
    """
    Base Agent class following Lab 1 patterns.
    All campaign agents inherit from this foundation.
    """
    
    def __init__(self, name: str, model: str = "gpt-4.1-mini"):
        self.name = name
        self.model = model
        self.system_prompt = ""
        self.memory = []  # Short-term memory
        
    def reset(self):
        """Reset agent state between runs."""
        self.memory = []
    
    @abstractmethod
    def _select_action(self, sensors: dict) -> str:
        """Select an action based on sensor inputs."""
        pass
    
    def act(self, sensors: dict) -> str:
        """Execute the agent's action cycle."""
        action = self._select_action(sensors)
        self.memory.append({"sensors": sensors, "action": action})
        return action
    
    def _call_llm(self, user_message: str) -> str:
        """Call the LLM with the system prompt and user message."""
        response = client.chat.completions.create(
            model=self.model,
            messages=[
                {"role": "system", "content": self.system_prompt},
                {"role": "user", "content": user_message}
            ]
        )
        # message.content can be None (for example on a refusal); normalize to str
        return response.choices[0].message.content or ""

print("✅ Base Agent class defined")
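
To see the act cycle without spending API calls, the same contract can be exercised by a rule-only subclass. The sketch below re-declares a trimmed base class (`SketchAgent`, a stand-in so the snippet runs on its own) and a hypothetical `BudgetCheckAgent`; neither is part of the course labs.

```python
from abc import ABC, abstractmethod


class SketchAgent(ABC):
    """Trimmed stand-in for the base Agent: same act cycle, no LLM client."""

    def __init__(self, name: str):
        self.name = name
        self.memory = []  # short-term memory of (sensors, action) pairs

    @abstractmethod
    def _select_action(self, sensors: dict) -> str:
        """Choose an action from the sensor inputs."""

    def act(self, sensors: dict) -> str:
        action = self._select_action(sensors)
        self.memory.append({"sensors": sensors, "action": action})
        return action


class BudgetCheckAgent(SketchAgent):
    """Hypothetical rule-only agent: flags overspend without calling a model."""

    def _select_action(self, sensors: dict) -> str:
        data = sensors.get("campaign_data", {})
        if data.get("spend", 0) > data.get("budget", 0):
            return "FLAG_OVERSPEND"
        return "OK"


agent = BudgetCheckAgent("BudgetCheck")
first = agent.act({"campaign_data": {"budget": 50000, "spend": 45000}})
second = agent.act({"campaign_data": {"budget": 10000, "spend": 15000}})
print(first, second, len(agent.memory))
```

Each call to `act` records one memory entry, so the memory length equals the number of decisions made since the last reset.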

### Campaign-Specific Agent Types

Define specialized agents for each value stream step.

In [None]:
class ValidationAgent(Agent):
    """
    Step 1: Input Validation Agent
    Verifies campaign data completeness and quality.
    """
    
    def __init__(self):
        super().__init__("ValidationAgent")
        self.system_prompt = """
You are a Data Validation Agent responsible for verifying campaign data quality.

Your responsibilities:
- Check data completeness (all required fields present)
- Verify data format correctness
- Identify anomalies or outliers
- Report validation status with specific issues

Required campaign fields:
- campaign_id, campaign_name, start_date, end_date
- budget, spend, impressions, clicks, conversions
- channel, target_audience, status

Output your findings in a structured format.
"""
    
    def _select_action(self, sensors: dict) -> str:
        campaign_data = sensors.get("campaign_data", {})
        prompt = f"""Validate the following campaign data:
        
{json.dumps(campaign_data, indent=2)}

Provide a validation report with:
1. VALID/INVALID status
2. Missing fields (if any)
3. Data quality issues (if any)
4. Recommendations for fixes
"""
        return self._call_llm(prompt)


class DataPreparationAgent(Agent):
    """
    Step 2: Data Preparation Agent
    Queries and transforms BigQuery data.
    """
    
    def __init__(self):
        super().__init__("DataPreparationAgent")
        self.system_prompt = """
You are a Data Preparation Agent responsible for querying and transforming campaign data.

Your responsibilities:
- Generate SQL queries for BigQuery
- Transform raw data into analysis-ready format
- Calculate key metrics (CTR, CPA, ROAS, etc.)
- Prepare data aggregations and summaries

Output SQL queries and transformation steps.
"""
    
    def _select_action(self, sensors: dict) -> str:
        requirements = sensors.get("requirements", {})
        prompt = f"""Generate data preparation steps for the following requirements:

{json.dumps(requirements, indent=2)}

Provide:
1. BigQuery SQL query
2. Data transformation steps
3. Calculated metrics formulas
4. Output schema
"""
        return self._call_llm(prompt)


class MonitoringAgent(Agent):
    """
    Step 3: Monitoring Setup Agent
    Configures Looker dashboards.
    """
    
    def __init__(self):
        super().__init__("MonitoringAgent")
        self.system_prompt = """
You are a Monitoring Setup Agent responsible for configuring campaign dashboards.

Your responsibilities:
- Design dashboard layouts
- Define key performance indicators (KPIs)
- Configure alert thresholds
- Set up automated reporting schedules

Output Looker configuration and dashboard specifications.
"""
    
    def _select_action(self, sensors: dict) -> str:
        campaign_metrics = sensors.get("metrics", {})
        prompt = f"""Configure monitoring dashboard for these metrics:

{json.dumps(campaign_metrics, indent=2)}

Provide:
1. Dashboard layout recommendation
2. KPI definitions with targets
3. Alert threshold configurations
4. Visualization specifications
"""
        return self._call_llm(prompt)


class AnalysisAgent(Agent):
    """
    Step 4: Analytical Review Agent
    Generates insights and analysis.
    """
    
    def __init__(self):
        super().__init__("AnalysisAgent")
        self.system_prompt = """
You are an Analysis Agent responsible for generating campaign insights.

Your responsibilities:
- Analyze campaign performance trends
- Identify success factors and areas for improvement
- Compare against benchmarks and historical data
- Generate actionable recommendations

Output structured insights and recommendations.
"""
    
    def _select_action(self, sensors: dict) -> str:
        performance_data = sensors.get("performance_data", {})
        prompt = f"""Analyze the following campaign performance data:

{json.dumps(performance_data, indent=2)}

Provide:
1. Executive Summary (2-3 sentences)
2. Key Performance Highlights
3. Areas of Concern
4. Recommendations (prioritized)
5. Next Steps
"""
        return self._call_llm(prompt)


class SchedulingAgent(Agent):
    """
    Step 5: Availability Confirmation Agent
    Schedules and verifies stakeholder availability.
    """
    
    def __init__(self):
        super().__init__("SchedulingAgent")
        self.system_prompt = """
You are a Scheduling Agent responsible for coordinating committee presentations.

Your responsibilities:
- Check stakeholder availability
- Propose meeting times
- Send calendar invitations
- Handle scheduling conflicts

Output scheduling recommendations and calendar actions.
"""
    
    def _select_action(self, sensors: dict) -> str:
        stakeholders = sensors.get("stakeholders", [])
        preferred_times = sensors.get("preferred_times", [])
        prompt = f"""Schedule a committee presentation meeting:

Stakeholders: {json.dumps(stakeholders)}
Preferred Times: {json.dumps(preferred_times)}

Provide:
1. Recommended meeting time(s)
2. Required attendees
3. Meeting agenda draft
4. Calendar invitation content
"""
        return self._call_llm(prompt)


class PresentationAgent(Agent):
    """
    Step 6: PPT Assembly Agent
    Builds final presentation.
    """
    
    def __init__(self):
        super().__init__("PresentationAgent")
        self.system_prompt = """
You are a Presentation Assembly Agent responsible for creating committee presentations.

Your responsibilities:
- Structure presentation content
- Design slide layouts
- Incorporate data visualizations
- Ensure brand consistency

Output detailed slide-by-slide content.
"""
    
    def _select_action(self, sensors: dict) -> str:
        analysis = sensors.get("analysis", "")
        campaign_data = sensors.get("campaign_data", {})
        prompt = f"""Create a committee presentation based on:

Campaign Data:
{json.dumps(campaign_data, indent=2)}

Analysis:
{analysis}

Provide detailed content for each slide:
1. Title Slide
2. Executive Summary
3. Campaign Overview
4. Performance Metrics
5. Key Insights
6. Recommendations
7. Next Steps
8. Appendix (if needed)
"""
        return self._call_llm(prompt)

print("✅ Campaign-specific agents defined")

### Test Simple Agent Interaction

In [None]:
# Sample campaign data for testing
sample_campaign = {
    "campaign_id": "CAMP-2024-001",
    "campaign_name": "Q4 Holiday Campaign",
    "start_date": "2024-11-01",
    "end_date": "2024-12-31",
    "budget": 50000,
    "spend": 45000,
    "impressions": 2500000,
    "clicks": 75000,
    "conversions": 1500,
    "channel": "Digital Display",
    "target_audience": "Adults 25-54",
    "status": "Active"
}

# Test Validation Agent
validation_agent = ValidationAgent()
validation_result = validation_agent.act({"campaign_data": sample_campaign})

print("=" * 60)
print("VALIDATION AGENT RESULT")
print("=" * 60)
print(validation_result)

---

## Part 3: Multi-Agent Communication (Lab 2 - AutoGen)

Implementing round-robin communication between agents using the AutoGen framework.

In [None]:
from autogen_agentchat.agents import AssistantAgent
from autogen_agentchat.teams import RoundRobinGroupChat
from autogen_agentchat.conditions import TextMentionTermination
from autogen_ext.models.openai import OpenAIChatCompletionClient

# Initialize model client
model_client = OpenAIChatCompletionClient(
    model="gpt-4.1-mini",
    api_key=os.environ["OPENAI_API_KEY"]
)

# Define AutoGen agents for campaign workflow
validator_autogen = AssistantAgent(
    name="DataValidator",
    model_client=model_client,
    system_message="""You are a Data Validator. Your role is to:
    1. Check campaign data for completeness
    2. Verify data quality and format
    3. Report any issues found
    
    When data is valid, say 'DATA_VALIDATED' to pass to the next agent.
    """
)

analyst_autogen = AssistantAgent(
    name="DataAnalyst",
    model_client=model_client,
    system_message="""You are a Data Analyst. Your role is to:
    1. Analyze campaign performance metrics
    2. Calculate KPIs (CTR, CPA, ROAS)
    3. Generate insights and trends
    
    When analysis is complete, say 'ANALYSIS_COMPLETE' to pass to the next agent.
    """
)

presenter_autogen = AssistantAgent(
    name="PresentationCreator",
    model_client=model_client,
    system_message="""You are a Presentation Creator. Your role is to:
    1. Structure presentation content from analysis
    2. Create slide outlines
    3. Draft talking points
    
    When presentation is ready, say 'TERMINATE' to complete the workflow.
    """
)

print("✅ AutoGen agents configured")

In [None]:
# Create Round-Robin Group Chat
termination_condition = TextMentionTermination("TERMINATE")

campaign_team = RoundRobinGroupChat(
    participants=[validator_autogen, analyst_autogen, presenter_autogen],
    termination_condition=termination_condition,
    max_turns=10
)

print("✅ Round-Robin team configured")

In [None]:
# Run the multi-agent workflow
async def run_campaign_workflow():
    task = f"""Process this campaign data and create a committee presentation:
    
    {json.dumps(sample_campaign, indent=2)}
    
    Workflow:
    1. DataValidator: Validate the data
    2. DataAnalyst: Analyze performance
    3. PresentationCreator: Create presentation outline
    """
    
    result = await campaign_team.run(task=task)
    return result

# Execute workflow
result = await run_campaign_workflow()

print("\n" + "=" * 60)
print("MULTI-AGENT WORKFLOW RESULT")
print("=" * 60)
for message in result.messages:
    print(f"\n[{message.source}]:")
    content = str(message.content)  # content is not always a plain string
    print(f"{content[:500]}..." if len(content) > 500 else content)
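
The team above takes turns in a fixed order and stops when a message mentions the termination word or the turn limit is reached. The plain-Python model below illustrates that control flow with canned replies. It is not AutoGen's implementation; `run_round_robin` and the stub speakers are hypothetical.

```python
from itertools import cycle
from typing import Callable, List, Tuple

# A speaker is (name, respond), where respond maps the message history to a reply.
Speaker = Tuple[str, Callable[[List[str]], str]]


def run_round_robin(
    speakers: List[Speaker],
    task: str,
    stop_word: str = "TERMINATE",
    max_turns: int = 10,
) -> List[Tuple[str, str]]:
    """Cycle through speakers in order until stop_word appears or turns run out."""
    transcript = [("user", task)]
    turn_order = cycle(speakers)
    for _ in range(max_turns):
        name, respond = next(turn_order)
        reply = respond([text for _, text in transcript])
        transcript.append((name, reply))
        if stop_word in reply:
            break
    return transcript


# Stub speakers with canned replies standing in for the LLM-backed agents.
speakers = [
    ("DataValidator", lambda history: "All fields present. DATA_VALIDATED"),
    ("DataAnalyst", lambda history: "KPIs calculated. ANALYSIS_COMPLETE"),
    ("PresentationCreator", lambda history: "Outline drafted. TERMINATE"),
]

transcript = run_round_robin(speakers, "Process the Q4 Holiday Campaign")
for name, text in transcript:
    print(f"[{name}] {text}")
```

Note that only the last agent's stop word ends the run. The intermediate markers (`DATA_VALIDATED`, `ANALYSIS_COMPLETE`) are conventions between the prompts, not signals the loop acts on.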

---

## Part 4: Deliberative Agent with LangGraph (Lab 4)

Implementing a planning-based workflow using LangGraph's StateGraph.

In [None]:
from langgraph.graph import StateGraph, END
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from pydantic import BaseModel, Field

# Initialize LangChain LLM
llm = ChatOpenAI(model="gpt-4.1-mini")

# Define State Schema
class CampaignWorkflowState(TypedDict):
    """State for the campaign workflow."""
    campaign_data: dict           # Raw campaign data
    plan: str                     # Execution plan
    current_task: int             # Current task number
    task_count: int               # Total number of tasks
    validation_result: str        # Validation outcome
    prepared_data: dict           # Transformed data
    analysis: str                 # Analysis results
    presentation: str             # Presentation content
    final_result: str             # Final output

print("✅ LangGraph state schema defined")

In [None]:
# Define Pydantic model for structured planning output
class CampaignPlan(BaseModel):
    """Structured plan for campaign workflow."""
    plan: str = Field(description="Detailed step-by-step plan")
    task_count: int = Field(description="Number of tasks in the plan")

# Planning prompt
planning_prompt = ChatPromptTemplate.from_messages([
    ("system", """
You are a Campaign Workflow Planner.

Purpose and Goals:
* Break down campaign processing into smaller, manageable tasks
* Develop a plan of action with clear steps
* Prioritize tasks based on dependencies
* Each task must be completable by an AI Agent

Standard workflow steps:
1. Validate input data
2. Prepare and transform data
3. Analyze performance metrics
4. Generate insights
5. Create presentation content
6. Finalize deliverables

Template:
1. **Task Name**
   * Purpose: [Purpose of Task]
   * Goal: [Goal of Task]
   * Success Criteria: [Criteria for Task]

Output only the plan.
"""),
    ("user", "Create a plan for processing this campaign: {campaign_data}")
])

# Create planning chain with structured output
planning_chain = planning_prompt | llm.with_structured_output(CampaignPlan)

print("✅ Planning chain configured")

In [None]:
# Define workflow nodes

def planner_node(state: CampaignWorkflowState) -> dict:
    """Planning node - creates execution plan."""
    print("\n=== PLANNER NODE ===")
    campaign_data = state["campaign_data"]
    
    result = planning_chain.invoke({"campaign_data": json.dumps(campaign_data)})
    
    print(f"Plan created with {result.task_count} tasks")
    
    return {
        "plan": result.plan,
        "task_count": result.task_count,
        "current_task": 1
    }


def validation_node(state: CampaignWorkflowState) -> dict:
    """Validation node - validates input data."""
    print("\n=== VALIDATION NODE ===")
    
    validation_agent = ValidationAgent()
    result = validation_agent.act({"campaign_data": state["campaign_data"]})
    
    print("Validation complete")
    
    return {
        "validation_result": result,
        "current_task": state["current_task"] + 1
    }


def preparation_node(state: CampaignWorkflowState) -> dict:
    """Data preparation node - transforms data."""
    print("\n=== PREPARATION NODE ===")
    
    # The agent drafts SQL and transformation steps for reference only;
    # its output is not stored in state. Metrics are computed in Python below.
    prep_agent = DataPreparationAgent()
    result = prep_agent.act({"requirements": state["campaign_data"]})
    
    def ratio(numerator, denominator, scale=1):
        """Rounded ratio, or None when the denominator is zero."""
        return round((numerator / denominator) * scale, 2) if denominator else None
    
    # Calculate derived metrics (None marks a metric that is undefined)
    campaign = state["campaign_data"]
    prepared = {
        **campaign,
        "ctr": ratio(campaign["clicks"], campaign["impressions"], 100),
        "cpa": ratio(campaign["spend"], campaign["conversions"]),
        "conversion_rate": ratio(campaign["conversions"], campaign["clicks"], 100),
        "budget_utilization": ratio(campaign["spend"], campaign["budget"], 100)
    }
    
    print("Data prepared with calculated metrics")
    
    return {
        "prepared_data": prepared,
        "current_task": state["current_task"] + 1
    }


def analysis_node(state: CampaignWorkflowState) -> dict:
    """Analysis node - generates insights."""
    print("\n=== ANALYSIS NODE ===")
    
    analysis_agent = AnalysisAgent()
    result = analysis_agent.act({"performance_data": state["prepared_data"]})
    
    print("Analysis complete")
    
    return {
        "analysis": result,
        "current_task": state["current_task"] + 1
    }


def presentation_node(state: CampaignWorkflowState) -> dict:
    """Presentation node - creates presentation content."""
    print("\n=== PRESENTATION NODE ===")
    
    pres_agent = PresentationAgent()
    result = pres_agent.act({
        "analysis": state["analysis"],
        "campaign_data": state["prepared_data"]
    })
    
    print("Presentation content created")
    
    return {
        "presentation": result,
        "current_task": state["current_task"] + 1
    }


def finalizer_node(state: CampaignWorkflowState) -> dict:
    """Finalizer node - compiles final output."""
    print("\n=== FINALIZER NODE ===")
    
    final_result = f"""
=== CAMPAIGN PROCESSING COMPLETE ===

Campaign: {state['campaign_data']['campaign_name']}
Campaign ID: {state['campaign_data']['campaign_id']}

--- Validation ---
{state['validation_result'][:500]}...

--- Key Metrics ---
CTR: {state['prepared_data'].get('ctr', 'N/A')}%
CPA: ${state['prepared_data'].get('cpa', 'N/A')}
Conversion Rate: {state['prepared_data'].get('conversion_rate', 'N/A')}%
Budget Utilization: {state['prepared_data'].get('budget_utilization', 'N/A')}%

--- Analysis Summary ---
{state['analysis'][:500]}...

--- Presentation Created ---
Presentation content ready for review.

Tasks Completed: {state['current_task']}/{state['task_count']}
"""
    
    print("Workflow finalized")
    
    return {"final_result": final_result}

print("✅ Workflow nodes defined")
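
As a sanity check on the formulas in `preparation_node`, here they are applied on their own to the sample campaign's numbers. This is a standalone sketch; the dictionary repeats only the numeric fields needed for the calculation.

```python
# Figures copied from the sample campaign used throughout this notebook.
campaign = {
    "budget": 50000,
    "spend": 45000,
    "impressions": 2500000,
    "clicks": 75000,
    "conversions": 1500,
}

metrics = {
    # Click-through rate: clicks per impression, as a percentage
    "ctr": round(campaign["clicks"] / campaign["impressions"] * 100, 2),
    # Cost per acquisition: spend per conversion, in currency units
    "cpa": round(campaign["spend"] / campaign["conversions"], 2),
    # Conversion rate: conversions per click, as a percentage
    "conversion_rate": round(campaign["conversions"] / campaign["clicks"] * 100, 2),
    # Budget utilization: spend as a percentage of budget
    "budget_utilization": round(campaign["spend"] / campaign["budget"] * 100, 2),
}

for name, value in metrics.items():
    print(f"{name}: {value}")
```

For the sample campaign this gives a CTR of 3.0%, a CPA of $30.00, a conversion rate of 2.0%, and 90.0% budget utilization.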

In [None]:
# Build the workflow graph
workflow = StateGraph(CampaignWorkflowState)

# Add nodes
workflow.add_node("planner", planner_node)
workflow.add_node("validator", validation_node)
workflow.add_node("preparer", preparation_node)
workflow.add_node("analyzer", analysis_node)
workflow.add_node("presenter", presentation_node)
workflow.add_node("finalizer", finalizer_node)

# Define edges
workflow.set_entry_point("planner")
workflow.add_edge("planner", "validator")
workflow.add_edge("validator", "preparer")
workflow.add_edge("preparer", "analyzer")
workflow.add_edge("analyzer", "presenter")
workflow.add_edge("presenter", "finalizer")
workflow.add_edge("finalizer", END)

# Compile the workflow
campaign_app = workflow.compile()

print("✅ Campaign workflow compiled")

In [None]:
# Visualize the workflow
from IPython.display import Image, display

try:
    display(Image(campaign_app.get_graph().draw_mermaid_png()))
except Exception as e:
    print(f"Graph visualization not available: {e}")
    print("\nWorkflow structure:")
    print("planner → validator → preparer → analyzer → presenter → finalizer → END")

In [None]:
# Run the deliberative workflow
initial_state = {
    "campaign_data": sample_campaign,
    "plan": "",
    "current_task": 0,
    "task_count": 0,
    "validation_result": "",
    "prepared_data": {},
    "analysis": "",
    "presentation": "",
    "final_result": ""
}

result = campaign_app.invoke(initial_state)

print("\n" + "=" * 60)
print("DELIBERATIVE WORKFLOW RESULT")
print("=" * 60)
print(result["final_result"])
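
Each node in this workflow returns only the keys it changed, and the graph merges that partial update into the shared state before the next node runs. The plain-Python sketch below models that behaviour for a linear pipeline. It is an illustration, not LangGraph's implementation; `run_linear` and the toy nodes are hypothetical.

```python
from typing import Callable, List

Node = Callable[[dict], dict]


def run_linear(nodes: List[Node], state: dict) -> dict:
    """Apply nodes in order, merging each partial update into a copy of the state."""
    current = dict(state)
    for node in nodes:
        update = node(current)
        # Keys in the update overwrite existing values; other keys are kept.
        current = {**current, **update}
    return current


def plan(state: dict) -> dict:
    return {"task_count": 2, "current_task": 1}


def validate(state: dict) -> dict:
    return {
        "validation_result": "VALID",
        "current_task": state["current_task"] + 1,
    }


final = run_linear(
    [plan, validate],
    {"campaign_data": {"campaign_id": "CAMP-2024-001"}, "current_task": 0},
)
print(final)
```

Keys a node never mentions, such as `campaign_data` here, pass through unchanged, which is why the nodes above can return small dictionaries.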

---

## Part 5: Observation & Action Tools (Labs 6-7)

Implementing tools for data observation and action execution.

In [None]:
# Define Observation Tools

def query_bigquery(query: str) -> dict:
    """
    Observation Tool: Query BigQuery for campaign data.
    
    Args:
        query: SQL query string
        
    Returns:
        Query results as dictionary
    """
    # Simulated BigQuery response
    print(f"📊 Executing BigQuery: {query[:50]}...")
    
    # Return simulated data
    return {
        "rows": [
            sample_campaign  # Use our sample data
        ],
        "total_rows": 1,
        "query_time_ms": 234
    }


def check_looker_dashboard(dashboard_id: str) -> dict:
    """
    Observation Tool: Check Looker dashboard status.
    
    Args:
        dashboard_id: Looker dashboard identifier
        
    Returns:
        Dashboard status and metrics
    """
    print(f"📈 Checking Looker dashboard: {dashboard_id}")
    
    return {
        "dashboard_id": dashboard_id,
        "status": "active",
        "last_refresh": datetime.now().isoformat(),
        "tiles": 8,
        "filters_active": ["date_range", "campaign_id"]
    }


def get_stakeholder_calendars(stakeholder_ids: List[str]) -> dict:
    """
    Observation Tool: Check stakeholder calendar availability.
    
    Args:
        stakeholder_ids: List of stakeholder identifiers
        
    Returns:
        Availability information
    """
    print(f"📅 Checking calendars for: {stakeholder_ids}")
    
    return {
        "stakeholders": stakeholder_ids,
        "available_slots": [
            {"date": "2024-12-02", "time": "10:00 AM", "duration": "1 hour"},
            {"date": "2024-12-03", "time": "2:00 PM", "duration": "1 hour"},
            {"date": "2024-12-04", "time": "11:00 AM", "duration": "1 hour"}
        ],
        "conflicts": []
    }

print("✅ Observation tools defined")

In [None]:
# Define Action Tools

def create_ppt_file(content: dict, filename: str) -> dict:
    """
    Action Tool: Create PowerPoint presentation file.
    
    Args:
        content: Presentation content dictionary
        filename: Output filename
        
    Returns:
        Creation status
    """
    print(f"📄 Creating PowerPoint: {filename}")
    
    # Simulated PPT creation
    return {
        "status": "created",
        "filename": filename,
        "slides": len(content.get("slides", [])),
        "path": f"/output/{filename}",
        "size_kb": 1250
    }


def send_email(to: List[str], subject: str, body: str, attachments: Optional[List[str]] = None) -> dict:
    """
    Action Tool: Send email notification.
    
    Args:
        to: List of recipient email addresses
        subject: Email subject line
        body: Email body content
        attachments: Optional list of attachment paths
        
    Returns:
        Send status
    """
    print(f"📧 Sending email to: {to}")
    
    return {
        "status": "sent",
        "recipients": to,
        "subject": subject,
        "timestamp": datetime.now().isoformat(),
        "message_id": "MSG-12345"
    }


def schedule_meeting(attendees: List[str], datetime_str: str, title: str, agenda: str) -> dict:
    """
    Action Tool: Schedule calendar meeting.
    
    Args:
        attendees: List of attendee emails
        datetime_str: Meeting datetime
        title: Meeting title
        agenda: Meeting agenda
        
    Returns:
        Scheduling status
    """
    print(f"📆 Scheduling meeting: {title}")
    
    return {
        "status": "scheduled",
        "meeting_id": "MTG-67890",
        "title": title,
        "datetime": datetime_str,
        "attendees": attendees,
        "calendar_link": "https://calendar.example.com/mtg-67890"
    }


def upload_to_sharepoint(file_path: str, destination: str) -> dict:
    """
    Action Tool: Upload file to SharePoint.
    
    Args:
        file_path: Local file path
        destination: SharePoint destination folder
        
    Returns:
        Upload status
    """
    print(f"☁️ Uploading to SharePoint: {destination}")
    
    return {
        "status": "uploaded",
        "source": file_path,
        "destination": destination,
        "sharepoint_url": f"https://sharepoint.example.com/{destination}",
        "timestamp": datetime.now().isoformat()
    }

print("✅ Action tools defined")

In [None]:
# Test Tools
print("Testing Observation Tools:")
print("=" * 40)

# Test BigQuery
bq_result = query_bigquery("SELECT * FROM campaigns WHERE campaign_id = 'CAMP-2024-001'")
print(f"BigQuery Result: {bq_result['total_rows']} rows in {bq_result['query_time_ms']}ms")

# Test Looker
looker_result = check_looker_dashboard("DASH-001")
print(f"Looker Status: {looker_result['status']}, {looker_result['tiles']} tiles")

print("\nTesting Action Tools:")
print("=" * 40)

# Test PPT Creation
ppt_result = create_ppt_file({"slides": [{}, {}, {}]}, "Q4_Campaign_Report.pptx")
print(f"PPT Created: {ppt_result['filename']} ({ppt_result['slides']} slides)")

# Test Email
email_result = send_email(
    to=["stakeholder@example.com"],
    subject="Campaign Report Ready",
    body="Your Q4 campaign report is ready for review."
)
print(f"Email Sent: {email_result['status']}")
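
One way to expose observation and action tools to an agent is a small registry that dispatches by name and tags each tool with its kind, so a caller can, for example, block side-effecting tools until they are explicitly allowed. A hedged sketch: `ToolRegistry`, the `kind` labels, and the stub tools are assumptions for illustration, not part of the course labs.

```python
from typing import Any, Callable, Dict


class ToolRegistry:
    """Hypothetical name-based dispatcher that separates observations from actions."""

    KINDS = ("observation", "action")

    def __init__(self) -> None:
        self._tools: Dict[str, Dict[str, Any]] = {}

    def register(self, name: str, kind: str, func: Callable[..., dict]) -> None:
        if kind not in self.KINDS:
            raise ValueError(f"kind must be one of {self.KINDS}, got {kind!r}")
        self._tools[name] = {"kind": kind, "func": func}

    def call(self, name: str, allow_actions: bool = False, **kwargs: Any) -> dict:
        tool = self._tools[name]  # raises KeyError for an unknown tool
        if tool["kind"] == "action" and not allow_actions:
            # Actions have side effects, so they are opt-in per call.
            return {"status": "blocked", "tool": name}
        return tool["func"](**kwargs)


# Stub tools standing in for the simulated tools in this part.
def fake_query(query: str) -> dict:
    return {"total_rows": 1, "query": query}


def fake_email(to: list, subject: str) -> dict:
    return {"status": "sent", "recipients": to, "subject": subject}


registry = ToolRegistry()
registry.register("query_bigquery", "observation", fake_query)
registry.register("send_email", "action", fake_email)

observed = registry.call("query_bigquery", query="SELECT 1")
blocked = registry.call("send_email", to=["a@example.com"], subject="Report")
sent = registry.call("send_email", allow_actions=True, to=["a@example.com"], subject="Report")
print(observed["total_rows"], blocked["status"], sent["status"])
```

Keeping observations callable by default while gating actions mirrors the split between Labs 6 and 7: reading data is low risk, while sending email or uploading files is not.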

---

## Part 6: Rule-Based Reasoning (Lab 9)

Implementing validation and quality check rules.

In [None]:
class RuleBasedValidator:
    """
    Rule-based validation system for campaign data.
    Applies deterministic rules for quality assurance.
    """
    
    def __init__(self):
        self.rules = []
        self._define_rules()
    
    def _define_rules(self):
        """Define validation rules."""
        
        # Rule 1: Required fields check
        self.rules.append({
            "name": "required_fields",
            "description": "Check all required fields are present",
            "check": lambda data: self._check_required_fields(data)
        })
        
        # Rule 2: Budget validation
        self.rules.append({
            "name": "budget_validation",
            "description": "Spend should not exceed budget",
            "check": lambda data: data.get("spend", 0) <= data.get("budget", 0)
        })
        
        # Rule 3: Date validation
        self.rules.append({
            "name": "date_validation",
            "description": "End date should be after start date",
            "check": lambda data: data.get("end_date", "") > data.get("start_date", "")
        })
        
        # Rule 4: Conversion rate sanity check
        self.rules.append({
            "name": "conversion_sanity",
            "description": "Conversions should not exceed clicks",
            "check": lambda data: data.get("conversions", 0) <= data.get("clicks", 0)
        })
        
        # Rule 5: Click-through sanity check
        self.rules.append({
            "name": "click_sanity",
            "description": "Clicks should not exceed impressions",
            "check": lambda data: data.get("clicks", 0) <= data.get("impressions", 0)
        })
        
        # Rule 6: Positive values check
        self.rules.append({
            "name": "positive_values",
            "description": "Numeric values should be non-negative",
            "check": lambda data: all(
                data.get(field, 0) >= 0 
                for field in ["budget", "spend", "impressions", "clicks", "conversions"]
            )
        })
    
    def _check_required_fields(self, data: dict) -> bool:
        """Check all required fields are present."""
        required = [
            "campaign_id", "campaign_name", "start_date", "end_date",
            "budget", "spend", "impressions", "clicks", "conversions"
        ]
        return all(field in data and data[field] is not None for field in required)
    
    def validate(self, data: dict) -> dict:
        """Run all validation rules."""
        results = {
            "valid": True,
            "passed": [],
            "failed": []
        }
        
        for rule in self.rules:
            try:
                if rule["check"](data):
                    results["passed"].append(rule["name"])
                else:
                    results["failed"].append({
                        "rule": rule["name"],
                        "description": rule["description"]
                    })
                    results["valid"] = False
            except Exception as e:
                results["failed"].append({
                    "rule": rule["name"],
                    "description": f"Error: {str(e)}"
                })
                results["valid"] = False
        
        return results

print("✅ Rule-based validator defined")

In [None]:
# Test Rule-Based Validation
validator = RuleBasedValidator()

# Test with valid data
print("Testing with VALID campaign data:")
print("=" * 40)
result = validator.validate(sample_campaign)
print(f"Valid: {result['valid']}")
print(f"Passed Rules: {result['passed']}")
print(f"Failed Rules: {result['failed']}")

# Test with invalid data
print("\nTesting with INVALID campaign data:")
print("=" * 40)
invalid_campaign = {
    "campaign_id": "CAMP-BAD",
    "campaign_name": "Bad Campaign",
    "start_date": "2024-12-31",
    "end_date": "2024-01-01",  # End before start
    "budget": 10000,
    "spend": 15000,  # Over budget
    "impressions": 1000,
    "clicks": 2000,  # More clicks than impressions
    "conversions": 50
}
result = validator.validate(invalid_campaign)
print(f"Valid: {result['valid']}")
print(f"Passed Rules: {result['passed']}")
print("Failed Rules:")
for failure in result['failed']:
    print(f"  - {failure['rule']}: {failure['description']}")
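
The date rule in `RuleBasedValidator` compares strings, which works for zero-padded ISO dates (`YYYY-MM-DD`) but can misorder other formats without raising an error. A stricter variant parses the values first. This is a sketch, assuming dates arrive as ISO strings; `end_after_start` is a hypothetical helper, not part of the validator above.

```python
from datetime import date


def end_after_start(data: dict) -> bool:
    """Return True only if both dates parse as ISO dates and end is after start."""
    try:
        start = date.fromisoformat(data["start_date"])
        end = date.fromisoformat(data["end_date"])
    except (KeyError, TypeError, ValueError):
        # Missing, non-string, or unparseable dates fail the rule.
        return False
    return end > start


good = end_after_start({"start_date": "2024-11-01", "end_date": "2024-12-31"})
reversed_dates = end_after_start({"start_date": "2024-12-31", "end_date": "2024-01-01"})
missing = end_after_start({"start_date": "2024-11-01"})
garbage = end_after_start({"start_date": "not-a-date", "end_date": "2024-12-31"})
print(good, reversed_dates, missing, garbage)
```

A function like this could replace the lambda in the `date_validation` rule, since every rule only needs a callable that takes the data dictionary and returns a boolean.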

---

## Part 7: Hierarchical Planning (Lab 8)

Implementing task decomposition with dependencies.

In [None]:
from pydantic import BaseModel, Field

# Define Pydantic models for hierarchical planning
class TaskDependencies(BaseModel):
    """Model for task dependencies."""
    task1_depends_on: str = Field(description="What task 1 depends on, or 'None'")
    task2_depends_on: str = Field(description="What task 2 depends on, or 'None'")
    task3_depends_on: str = Field(description="What task 3 depends on, or 'None'")
    task4_depends_on: str = Field(description="What task 4 depends on, or 'None'")
    task5_depends_on: str = Field(description="What task 5 depends on, or 'None'")
    task6_depends_on: str = Field(description="What task 6 depends on, or 'None'")

class TaskAssignments(BaseModel):
    """Model for task assignments to agents."""
    task1_assignee: str = Field(description="Agent assigned to task 1")
    task2_assignee: str = Field(description="Agent assigned to task 2")
    task3_assignee: str = Field(description="Agent assigned to task 3")
    task4_assignee: str = Field(description="Agent assigned to task 4")
    task5_assignee: str = Field(description="Agent assigned to task 5")
    task6_assignee: str = Field(description="Agent assigned to task 6")

# Define hierarchical state
class HierarchicalState(TypedDict):
    goal: str
    tasks: List[str]
    dependencies: dict
    assignments: dict
    review: str

print("✅ Hierarchical planning models defined")

In [None]:
# Campaign Workflow Tasks
CAMPAIGN_TASKS = [
    "Validate input campaign data",
    "Query and prepare data from BigQuery",
    "Configure Looker monitoring dashboards",
    "Perform analytical review and generate insights",
    "Confirm stakeholder availability and schedule",
    "Assemble final PowerPoint presentation"
]

# Task dependencies (what each task depends on)
TASK_DEPENDENCIES = {
    "Validate input campaign data": None,
    "Query and prepare data from BigQuery": "Validate input campaign data",
    "Configure Looker monitoring dashboards": "Query and prepare data from BigQuery",
    "Perform analytical review and generate insights": "Query and prepare data from BigQuery",
    "Confirm stakeholder availability and schedule": None,
    "Assemble final PowerPoint presentation": "Perform analytical review and generate insights"
}

# Agent assignments
AGENT_ASSIGNMENTS = {
    "Validate input campaign data": "ValidationAgent",
    "Query and prepare data from BigQuery": "DataPreparationAgent",
    "Configure Looker monitoring dashboards": "MonitoringAgent",
    "Perform analytical review and generate insights": "AnalysisAgent",
    "Confirm stakeholder availability and schedule": "SchedulingAgent",
    "Assemble final PowerPoint presentation": "PresentationAgent"
}

print("Campaign Workflow Hierarchy:")
print("=" * 50)
for i, task in enumerate(CAMPAIGN_TASKS, 1):
    dep = TASK_DEPENDENCIES.get(task, "None")
    agent = AGENT_ASSIGNMENTS.get(task, "Unknown")
    print(f"{i}. {task}")
    print(f"   Depends on: {dep or 'None'}")
    print(f"   Assigned to: {agent}")

In [None]:
class HierarchicalPlanner:
    """
    Hierarchical planner that decomposes tasks and manages dependencies.
    """
    
    def __init__(self, tasks: List[str], dependencies: dict, assignments: dict):
        self.tasks = tasks
        self.dependencies = dependencies
        self.assignments = assignments
        self.completed = set()
    
    def get_ready_tasks(self) -> List[str]:
        """Get tasks that are ready to execute (dependencies met)."""
        ready = []
        for task in self.tasks:
            if task not in self.completed:
                dep = self.dependencies.get(task)
                if dep is None or dep in self.completed:
                    ready.append(task)
        return ready
    
    def complete_task(self, task: str):
        """Mark a task as completed."""
        self.completed.add(task)
    
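    def get_execution_order(self) -> List[str]:
        """Get an execution order that respects dependencies."""
        order = []
        temp_completed = set()
        
        while len(order) < len(self.tasks):
            progressed = False
            for task in self.tasks:
                if task not in temp_completed:
                    dep = self.dependencies.get(task)
                    if dep is None or dep in temp_completed:
                        order.append(task)
                        temp_completed.add(task)
                        progressed = True
            if not progressed:
                # A full pass scheduled nothing: cyclic or unknown dependency
                raise ValueError("Unresolvable task dependencies (cycle or unknown task)")
        
        return order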
    
    def get_plan_summary(self) -> str:
        """Generate a summary of the hierarchical plan."""
        order = self.get_execution_order()
        summary = "Hierarchical Execution Plan:\n"
        summary += "=" * 40 + "\n"
        
        for i, task in enumerate(order, 1):
            agent = self.assignments.get(task, "Unknown")
            dep = self.dependencies.get(task, "None")
            summary += f"\nStep {i}: {task}\n"
            summary += f"  Agent: {agent}\n"
            summary += f"  Depends on: {dep or 'None'}\n"
        
        return summary

# Create and test the planner
planner = HierarchicalPlanner(CAMPAIGN_TASKS, TASK_DEPENDENCIES, AGENT_ASSIGNMENTS)
print(planner.get_plan_summary())
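
The planner above allows one prerequisite per task. As a minimal sketch, assuming a task may later need several prerequisites, the standard-library `graphlib.TopologicalSorter` (Python 3.9+) can group tasks into waves that could run in parallel, and it reports cyclic dependencies instead of looping. The short task names and the `MULTI_DEPENDENCIES` mapping are hypothetical and are not part of the notebook's `TASK_DEPENDENCIES`.

```python
from graphlib import CycleError, TopologicalSorter

# Hypothetical mapping: task -> set of prerequisite tasks (illustrative names only)
MULTI_DEPENDENCIES = {
    "validate": set(),
    "prepare_data": {"validate"},
    "configure_dashboards": {"prepare_data"},
    "analyze": {"prepare_data"},
    "confirm_availability": set(),
    "assemble_ppt": {"analyze", "confirm_availability"},
}

def execution_waves(dependencies):
    """Group tasks into waves; every task in a wave has all prerequisites done."""
    sorter = TopologicalSorter(dependencies)
    try:
        sorter.prepare()  # raises CycleError if the graph contains a cycle
    except CycleError as exc:
        raise ValueError(f"Cyclic task dependencies: {exc.args[1]}") from exc

    waves = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())  # sorted only for stable output
        waves.append(ready)
        sorter.done(*ready)  # mark the whole wave complete
    return waves

waves = execution_waves(MULTI_DEPENDENCIES)
for number, wave in enumerate(waves, 1):
    print(f"Wave {number}: {wave}")
```

Flattening the waves gives a sequential order comparable to `get_execution_order()`, while the wave boundaries show which agents could work concurrently.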

---

## Part 8: Error Recovery (Lab 12)

Implementing resilient operations with retry logic and fallbacks.

In [None]:
import time
from dataclasses import dataclass
from functools import wraps

@dataclass
class RetryConfig:
    """Configuration for retry behavior."""
    max_retries: int = 3
    initial_delay: float = 1.0
    backoff_multiplier: float = 2.0
    max_delay: float = 30.0

def with_retry(max_retries: int = 3, delay: float = 1.0, backoff: float = 2.0):
    """
    Decorator for automatic retry with exponential backoff.
    
    Args:
        max_retries: Maximum number of retry attempts
        delay: Initial delay between retries (seconds)
        backoff: Multiplier for delay on each retry
    """
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            current_delay = delay
            last_exception = None
            
            for attempt in range(max_retries + 1):
                try:
                    return func(*args, **kwargs)
                except Exception as e:
                    last_exception = e
                    if attempt < max_retries:
                        print(f"⚠️ Attempt {attempt + 1} failed: {str(e)}")
                        print(f"   Retrying in {current_delay:.1f}s...")
                        time.sleep(current_delay)
                        current_delay *= backoff
                    else:
                        print(f"❌ All {max_retries + 1} attempts failed")
            
            raise last_exception
        return wrapper
    return decorator


class ResilientAgent(Agent):
    """
    Agent with built-in error recovery capabilities.
    """
    
    def __init__(self, name: str, fallback_response: str = "Unable to process request"):
        super().__init__(name)
        self.fallback_response = fallback_response
        self.error_count = 0
        self.success_count = 0
    
    @with_retry(max_retries=2, delay=1.0, backoff=2.0)
    def _call_llm_with_retry(self, user_message: str) -> str:
        """Call LLM with automatic retry."""
        return self._call_llm(user_message)
    
    def _select_action(self, sensors: dict) -> str:
        """Select action with error handling."""
        try:
            result = self._call_llm_with_retry(
                f"Process this input: {json.dumps(sensors)}"
            )
            self.success_count += 1
            return result
        except Exception as e:
            self.error_count += 1
            print(f"⚠️ Agent {self.name} falling back due to: {str(e)}")
            return self.fallback_response
    
    def get_health_status(self) -> dict:
        """Get agent health metrics."""
        total = self.success_count + self.error_count
        success_rate = (self.success_count / total * 100) if total > 0 else 100
        
        return {
            "agent": self.name,
            "total_requests": total,
            "successes": self.success_count,
            "errors": self.error_count,
            "success_rate": f"{success_rate:.1f}%",
            "status": "healthy" if success_rate >= 90 else "degraded"
        }

print("✅ Error recovery patterns defined")
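
`RetryConfig` above declares a `max_delay`, while `with_retry` grows the delay without a cap. The following sketch shows one way the wait before each retry could be computed with that cap applied. The helper name `backoff_delays` is hypothetical, and its defaults simply mirror the `RetryConfig` values.

```python
def backoff_delays(max_retries=3, initial_delay=1.0,
                   backoff_multiplier=2.0, max_delay=30.0):
    """Return the wait in seconds before each retry, capped at max_delay."""
    return [
        min(initial_delay * backoff_multiplier ** attempt, max_delay)
        for attempt in range(max_retries)
    ]

default_schedule = backoff_delays()
capped_schedule = backoff_delays(max_retries=6, initial_delay=4.0)

print(default_schedule)  # [1.0, 2.0, 4.0]
print(capped_schedule)   # [4.0, 8.0, 16.0, 30.0, 30.0, 30.0]
```

With the cap in place, a long outage never pushes a single wait beyond `max_delay`, which keeps the worst-case total retry time easy to estimate.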

In [None]:
# Circuit Breaker Pattern
class CircuitBreaker:
    """
    Circuit breaker to prevent cascading failures.
    """
    
    def __init__(self, failure_threshold: int = 5, recovery_timeout: float = 60.0):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.failures = 0
        self.last_failure_time = None
        self.state = "CLOSED"  # CLOSED, OPEN, HALF_OPEN
    
    def record_failure(self):
        """Record a failure."""
        self.failures += 1
        self.last_failure_time = time.time()
        
        if self.failures >= self.failure_threshold:
            self.state = "OPEN"
            print(f"🔴 Circuit breaker OPENED after {self.failures} failures")
    
    def record_success(self):
        """Record a success."""
        self.failures = 0
        self.state = "CLOSED"
    
    def can_proceed(self) -> bool:
        """Check if operation can proceed."""
        if self.state == "CLOSED":
            return True
        
        if self.state == "OPEN":
            # Check if recovery timeout has passed
            if time.time() - self.last_failure_time >= self.recovery_timeout:
                self.state = "HALF_OPEN"
                print("🟡 Circuit breaker HALF_OPEN, testing...")
                return True
            return False
        
        return True  # HALF_OPEN allows one request
    
    def get_status(self) -> dict:
        """Get circuit breaker status."""
        return {
            "state": self.state,
            "failures": self.failures,
            "threshold": self.failure_threshold,
            "recovery_timeout": self.recovery_timeout
        }

# Test circuit breaker
cb = CircuitBreaker(failure_threshold=3)
print("Circuit Breaker Status:", cb.get_status())
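
The state transitions are easier to follow when the breaker wraps a call. Below is a minimal, self-contained sketch that assumes a condensed stand-in (`MiniBreaker`) with the same three states and an injectable clock, so the recovery timeout can be simulated without sleeping. `MiniBreaker` and `guarded_call` are hypothetical names used for illustration only and are not the notebook's `CircuitBreaker`.

```python
import time

class MiniBreaker:
    """Condensed stand-in for a circuit breaker, with an injectable clock."""

    def __init__(self, failure_threshold=2, recovery_timeout=60.0, clock=time.time):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.clock = clock
        self.failures = 0
        self.opened_at = None
        self.state = "CLOSED"

    def can_proceed(self):
        if self.state == "OPEN":
            if self.clock() - self.opened_at < self.recovery_timeout:
                return False
            self.state = "HALF_OPEN"  # timeout elapsed: allow a trial call
        return True

    def record_failure(self):
        self.failures += 1
        if self.state == "HALF_OPEN" or self.failures >= self.failure_threshold:
            self.state = "OPEN"
            self.opened_at = self.clock()

    def record_success(self):
        self.failures = 0
        self.state = "CLOSED"

def guarded_call(breaker, func, fallback):
    """Run func through the breaker; return fallback when blocked or failing."""
    if not breaker.can_proceed():
        return fallback
    try:
        result = func()
    except Exception:
        breaker.record_failure()
        return fallback
    breaker.record_success()
    return result

def failing():
    raise RuntimeError("service down")

now = [0.0]  # simulated clock, advanced manually
breaker = MiniBreaker(failure_threshold=2, recovery_timeout=60.0, clock=lambda: now[0])

states = []
guarded_call(breaker, failing, "fallback")
states.append(breaker.state)  # one failure, still CLOSED
guarded_call(breaker, failing, "fallback")
states.append(breaker.state)  # threshold reached, OPEN
blocked = guarded_call(breaker, lambda: "ok", "fallback")  # rejected while OPEN
now[0] = 61.0  # move past the recovery timeout
recovered = guarded_call(breaker, lambda: "ok", "fallback")
states.append(breaker.state)  # trial call succeeded, CLOSED again

print(states, blocked, recovered)
```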

---

## Part 9: Complete End-to-End Workflow (Lab 11)

Bringing all components together into a complete campaign processing system.

In [None]:
class CampaignProcessingSystem:
    """
    Complete end-to-end campaign processing system.
    Integrates the course techniques into a single workflow (tool calls are simulated).
    """
    
    def __init__(self):
        # Initialize agents
        self.validation_agent = ValidationAgent()
        self.data_prep_agent = DataPreparationAgent()
        self.monitoring_agent = MonitoringAgent()
        self.analysis_agent = AnalysisAgent()
        self.scheduling_agent = SchedulingAgent()
        self.presentation_agent = PresentationAgent()
        
        # Initialize rule-based validator
        self.rule_validator = RuleBasedValidator()
        
        # Initialize hierarchical planner
        self.planner = HierarchicalPlanner(
            CAMPAIGN_TASKS, 
            TASK_DEPENDENCIES, 
            AGENT_ASSIGNMENTS
        )
        
        # Initialize circuit breaker
        self.circuit_breaker = CircuitBreaker()
        
        # Processing state
        self.state = {}
    
    def process_campaign(self, campaign_data: dict) -> dict:
        """
        Process a campaign through the complete workflow.
        
        Args:
            campaign_data: Raw campaign data dictionary
            
        Returns:
            Processing results dictionary
        """
        print("\n" + "=" * 60)
        print("CAMPAIGN PROCESSING SYSTEM")
        print("=" * 60)
        print(f"Processing: {campaign_data.get('campaign_name', 'Unknown')}")
        print(f"Campaign ID: {campaign_data.get('campaign_id', 'Unknown')}")
        
        results = {
            "campaign_id": campaign_data.get("campaign_id"),
            "status": "processing",
            "steps": {}
        }
        
        # Step 1: Rule-Based Validation
        print("\n--- Step 1: Rule-Based Validation ---")
        validation_result = self.rule_validator.validate(campaign_data)
        results["steps"]["validation"] = validation_result
        
        if not validation_result["valid"]:
            print("❌ Validation failed")
            results["status"] = "validation_failed"
            return results
        print("✅ Validation passed")
        
        # Step 2: AI-Powered Analysis
        print("\n--- Step 2: AI-Powered Analysis ---")
        if self.circuit_breaker.can_proceed():
            try:
                analysis = self.analysis_agent.act({"performance_data": campaign_data})
                results["steps"]["analysis"] = analysis
                self.circuit_breaker.record_success()
                print("✅ Analysis complete")
            except Exception as e:
                self.circuit_breaker.record_failure()
                results["steps"]["analysis"] = f"Error: {str(e)}"
                print(f"⚠️ Analysis error: {str(e)}")
        else:
            results["steps"]["analysis"] = "Circuit breaker open"
            print("⚠️ Analysis skipped (circuit breaker open)")
        
        # Step 3: Data Preparation
        print("\n--- Step 3: Data Preparation ---")
        prepared_data = {
            **campaign_data,
            "ctr": round((campaign_data["clicks"] / campaign_data["impressions"]) * 100, 2),
            "cpa": round(campaign_data["spend"] / campaign_data["conversions"], 2),
            "conversion_rate": round((campaign_data["conversions"] / campaign_data["clicks"]) * 100, 2),
            "budget_utilization": round((campaign_data["spend"] / campaign_data["budget"]) * 100, 2)
        }
        results["steps"]["prepared_data"] = prepared_data
        print(f"✅ Data prepared (CTR: {prepared_data['ctr']}%, CPA: ${prepared_data['cpa']})")
        
        # Step 4: Presentation Generation
        print("\n--- Step 4: Presentation Generation ---")
        presentation = self.presentation_agent.act({
            "analysis": results["steps"].get("analysis", ""),
            "campaign_data": prepared_data
        })
        results["steps"]["presentation"] = presentation
        print("✅ Presentation content generated")
        
        # Step 5: Tool Actions
        print("\n--- Step 5: Tool Actions ---")
        ppt_result = create_ppt_file(
            {"slides": [{}, {}, {}, {}, {}, {}]},
            f"{campaign_data['campaign_id']}_report.pptx"
        )
        results["steps"]["ppt_created"] = ppt_result
        
        upload_result = upload_to_sharepoint(
            ppt_result["path"],
            f"campaigns/{campaign_data['campaign_id']}/"
        )
        results["steps"]["uploaded"] = upload_result
        
        # Finalize
        results["status"] = "completed"
        results["completed_at"] = datetime.now().isoformat()
        
        print("\n" + "=" * 60)
        print("✅ CAMPAIGN PROCESSING COMPLETE")
        print("=" * 60)
        
        return results

print("✅ Complete processing system defined")
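
Step 3 divides by `impressions`, `conversions`, `clicks`, and `budget`, so a campaign that passes validation with, for example, zero conversions would raise `ZeroDivisionError`. The sketch below shows one possible guard, assuming that `None` is an acceptable placeholder for an undefined ratio. The helpers `safe_ratio` and `derive_metrics` and the example record are hypothetical.

```python
def safe_ratio(numerator, denominator, scale=1.0, digits=2):
    """Return numerator / denominator * scale, rounded; None if denominator is 0."""
    if not denominator:
        return None
    return round(numerator / denominator * scale, digits)

def derive_metrics(campaign):
    """Compute the same four derived metrics as Step 3, guarding each division."""
    return {
        "ctr": safe_ratio(campaign["clicks"], campaign["impressions"], scale=100),
        "cpa": safe_ratio(campaign["spend"], campaign["conversions"]),
        "conversion_rate": safe_ratio(campaign["conversions"], campaign["clicks"], scale=100),
        "budget_utilization": safe_ratio(campaign["spend"], campaign["budget"], scale=100),
    }

# Hypothetical record with zero conversions (not the notebook's sample_campaign)
example = {
    "impressions": 20000,
    "clicks": 500,
    "conversions": 0,
    "spend": 1250.0,
    "budget": 5000.0,
}
metrics = derive_metrics(example)
print(metrics)
```

Any code that formats these values, such as the CPA print statement, would need to handle `None` explicitly, for instance by printing "N/A".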

In [None]:
# Run the complete workflow
system = CampaignProcessingSystem()
final_results = system.process_campaign(sample_campaign)

print("\n" + "=" * 60)
print("FINAL RESULTS SUMMARY")
print("=" * 60)
print(f"\nStatus: {final_results['status']}")
print(f"Campaign ID: {final_results['campaign_id']}")
print(f"Completed: {final_results.get('completed_at', 'N/A')}")

print("\nKey Metrics:")
if "prepared_data" in final_results["steps"]:
    # Named `metrics` rather than `pd` to avoid shadowing the common pandas alias
    metrics = final_results["steps"]["prepared_data"]
    print(f"  - CTR: {metrics['ctr']}%")
    print(f"  - CPA: ${metrics['cpa']}")
    print(f"  - Conversion Rate: {metrics['conversion_rate']}%")
    print(f"  - Budget Utilization: {metrics['budget_utilization']}%")

---

## Summary: Course Techniques Applied

This notebook demonstrated the application of all 12 lab modules from GAI-3101 to the Campaign Execution & Committee Presentation use case:

| Module | Technique | Implementation |
|--------|-----------|----------------|
| **Lab 1** | Simple Python Agent | `Agent` base class with `_select_action()` and `act()` methods |
| **Lab 2** | Round Robin Communication | AutoGen `RoundRobinGroupChat` with validator, analyst, presenter |
| **Lab 3** | Reactive Agent | Real-time monitoring patterns (implicit in tool responses) |
| **Lab 4** | Deliberative Agent | LangGraph `StateGraph` with planner → executor → finalizer |
| **Lab 5** | Long-Term Memory | Agent memory arrays and state persistence |
| **Lab 6** | Observation Tools | `query_bigquery()`, `check_looker_dashboard()`, `get_stakeholder_calendars()` |
| **Lab 7** | Action Tools | `create_ppt_file()`, `send_email()`, `schedule_meeting()`, `upload_to_sharepoint()` |
| **Lab 8** | Hierarchical Planning | `HierarchicalPlanner` with task dependencies |
| **Lab 9** | Rule-Based Reasoning | `RuleBasedValidator` with deterministic quality checks |
| **Lab 10** | Robustness Evaluation | Validation testing with valid/invalid data |
| **Lab 11** | Personal Assistant | `CampaignProcessingSystem` end-to-end workflow |
| **Lab 12** | Error Recovery | `@with_retry` decorator, `CircuitBreaker`, `ResilientAgent` |

### Business Impact

- **Time Savings**: 40 hours → 16.5 hours (59% reduction)
- **Error Reduction**: Rule-based validation catches data issues early
- **Consistency**: Standardized analysis and presentation format
- **Scalability**: Multi-agent architecture enables parallel processing
- **Resilience**: Retries, fallbacks, and a circuit breaker help the workflow complete despite transient failures

---

## Next Steps

1. **Connect to Real Data Sources**: Replace simulated tools with actual BigQuery, Looker, and SharePoint integrations
2. **Add Long-Term Memory**: Implement vector database for campaign history and learnings
3. **Enhance Monitoring**: Add real-time alerts and dashboard updates
4. **Production Deployment**: Containerize and deploy to Google Cloud Run
5. **User Interface**: Build a Streamlit or Gradio interface for campaign submission