In [None]:
!pip install smolagents colorama nest_asyncio 'smolagents[litellm]' duckduckgo_search --quiet

In [None]:
!pip install langchain_openai langchain-mcp-adapters langgraph fastmcp --quiet

# ACP demo
Inspired by https://github.com/nicknochnack/ACPWalkthrough

In [11]:
import nest_asyncio
nest_asyncio.apply()

### smol agent for webserch

In [12]:
%%writefile sm_agent.py

from collections.abc import AsyncGenerator
from acp_sdk.models import Message, MessagePart
from acp_sdk.server import Context, RunYield, RunYieldResume, Server
from smolagents import ToolCallingAgent, ToolCollection, CodeAgent, DuckDuckGoSearchTool, LiteLLMModel, VisitWebpageTool
import logging 

server = Server()

#local model
model_name = "qwen.qwen2.5-coder-32b-instruct"
model = LiteLLMModel(
    model_id=f"lm_studio/{model_name}",
    api_base="http://127.0.0.1:1234/v1",
    api_key="your-api-key",
    num_ctx=32000,
)

@server.agent()
async def fraudulent_email_search_agent(input: list[Message], context: Context) -> AsyncGenerator[RunYield, RunYieldResume]:
    "This is a CodeAgent which helps to find traces of fraud related to an email address."
    agent = CodeAgent(tools=[DuckDuckGoSearchTool(), VisitWebpageTool()], 
                      model=model, 
                      max_steps=10, 
                      planning_interval=1)

    prompt = input[0].parts[0].content
    response = agent.run(prompt)

    yield Message(parts=[MessagePart(content=str(response))])


if __name__ == "__main__":
    server.run(port=8001)

Overwriting sm_agent.py


In [9]:
%%bash --bg
# Start fraudulent_email_search_agent in background
# If you want to see the ACP server output start it in a terminal

python sm_agent.py

In [13]:
# test fraudulent_email_search_agent
import asyncio
from acp_sdk.client import Client
from colorama import Fore 

async def example() -> None:
    async with Client(base_url="http://localhost:8001") as client:
        run = await client.run_sync(
            agent="fraudulent_email_search_agent", input="is this fraudulent? alice@fedex.destroyer.com?"
        )
        print(Fore.YELLOW + run.output[0].parts[0].content + Fore.RESET)

if __name__ == "__main__":
    asyncio.run(example())

[33mThe search results did not find any relevant information about "fedex.destroyer.com" in relation to FedEx's official domains or services. However, based on the pattern of the domain name and common phishing tactics, it is highly likely that "alice@fedex.destroyer.com" is a fraudulent email address.

Here are the key points indicating this:

1. **Unofficial Domain:** FedEx does not use domains that look like "fedex.destroyer.com". The official FedEx website is [www.fedex.com](https://www.fedex.com/).

2. **Suspicious Domain Structure:** The inclusion of "destroyer" in the domain name seems unusual and could be an attempt to mimic a legitimate domain while misleading recipients.

3. **Common Phishing Tactics:** Scammers often create fake websites or use similar-looking domain names to deceive people into believing they are communicating with an official entity like FedEx. They may request sensitive information, such as login credentials or payment details, through these fraudulent a

In [14]:
%%writefile lg_calendar_agent.py

from collections.abc import AsyncGenerator
from datetime import datetime, timedelta
from typing import TypedDict
import random

from acp_sdk.models import Message
from acp_sdk.models.models import MessagePart
from acp_sdk.server import RunYield, RunYieldResume, Server
from langchain_core.runnables import RunnableLambda
from langgraph.graph import StateGraph

class CalendarState(TypedDict):
    meeting_request: str
    requested_time: str
    availability_status: str
    alternative_times: str
    calendar_response: str

def parse_meeting_request(state: CalendarState) -> dict:
    """Parse the meeting request to extract time information"""
    request = state["meeting_request"].lower()
    
    if "tomorrow" in request and "10 am" in request:
      requested_time = "Tomorrow at 10:00 AM"
    elif "10 am" in request:
      requested_time = "10:00 AM (date unspecified)"
    elif "meeting" in request:
      requested_time = "Time not clearly specified"
    else:
      requested_time = "No meeting time found"
    
    return {"requested_time": requested_time}

def check_availability(state: CalendarState) -> dict:
    """Mock calendar check - randomly determine availability"""
    
    # Mock calendar conflicts (70% available, 30% conflict)
    is_available = random.choice([True, True, True, False])
    
    if is_available:
      status = "✅ AVAILABLE"
    else:
      status = "❌ CONFLICT DETECTED"
    
    return {"availability_status": status}

def suggest_alternatives(state: CalendarState) -> dict:
    """Suggest alternative meeting times if not available"""
    
    if "AVAILABLE" in state["availability_status"]:
      alternatives = "No alternatives needed - original time works perfectly!"
    else:
      # Generate mock alternative times
      tomorrow = datetime.now() + timedelta(days=1)
      alt_times = [
          tomorrow.replace(hour=14, minute=0).strftime("%A %B %d at 2:00 PM"),
          tomorrow.replace(hour=16, minute=30).strftime("%A %B %d at 4:30 PM"),
          (tomorrow + timedelta(days=1)).replace(hour=10, minute=0).strftime("%A %B %d at 10:00 AM")
      ]
      alternatives = f"Alternative times available:\n• {alt_times[0]}\n• {alt_times[1]}\n• {alt_times[2]}"
    
    return {"alternative_times": alternatives}

def create_calendar_response(state: CalendarState) -> dict:
    """Create the final calendar response"""
    
    if "AVAILABLE" in state["availability_status"]:
      response = f"""
    📅 CALENDAR CHECK COMPLETE
    
    Meeting Request: {state['meeting_request']}
    Requested Time: {state['requested_time']}
    Status: {state['availability_status']}
    
    ✅ CONFIRMED: I'm available for the meeting at the requested time!
    
    📝 Recommendation: Send confirmation email accepting the meeting invitation.
      """.strip()
    else:
      response = f"""
    📅 CALENDAR CHECK COMPLETE
    
    Meeting Request: {state['meeting_request']}
    Requested Time: {state['requested_time']}
    Status: {state['availability_status']}
    
    ⚠️ SCHEDULING CONFLICT: I have another commitment at that time.
    
    {state['alternative_times']}
    
    📝 Recommendation: Send email proposing alternative meeting times.
      """.strip()
    
    return {"calendar_response": response}

# Build the workflow
workflow = StateGraph(CalendarState)

workflow.add_node("parse_request", RunnableLambda(parse_meeting_request))
workflow.add_node("check_availability", RunnableLambda(check_availability))
workflow.add_node("suggest_alternatives", RunnableLambda(suggest_alternatives))
workflow.add_node("create_response", RunnableLambda(create_calendar_response))

workflow.set_entry_point("parse_request")
workflow.add_edge("parse_request", "check_availability")
workflow.add_edge("check_availability", "suggest_alternatives")
workflow.add_edge("suggest_alternatives", "create_response")
workflow.set_finish_point("create_response")

graph = workflow.compile()
server = Server()

def join_acp_messages(msgs: list[Message]) -> str:
    return " ".join(
        part.content
        for m in msgs
        for part in getattr(m, "parts", [])
        if getattr(part, "content", None)
    )

@server.agent()
async def calendar_availability_agent(input: list[Message]) -> AsyncGenerator[RunYield, RunYieldResume]:
    """LangGraph agent that checks calendar availability for meeting requests and suggests alternatives."""
    query_text = join_acp_messages(input)
    
    state: CalendarState = {
      "meeting_request": query_text,
      "requested_time": "",
      "availability_status": "",
      "alternative_times": "",
      "calendar_response": ""
    }
    
    async for event in graph.astream(state, stream_mode="updates"):
        for value in event.items():
            yield {"update": value}
        output = event
    
    yield MessagePart(content=output.get("create_response", {}).get("calendar_response", "Calendar check failed"))

server.run(port=8004)


Overwriting lg_calendar_agent.py


In [15]:
%%bash --bg
# Start calendar_availability_agent in background
# If you want to see the ACP server output start it in a terminal

python lg_calendar_agent.py

In [16]:
import asyncio
from acp_sdk.client import Client
from colorama import Fore

async def example() -> None:
    async with Client(base_url="http://localhost:8004") as client:
        run = await client.run_sync(
            agent="calendar_availability_agent",
            input="Check my availability for the meeting tomorrow at 10 AM mentioned in Alice's email"
        )
        print(Fore.YELLOW + run.output[0].parts[0].content + Fore.RESET)

if __name__ == "__main__":
    asyncio.run(example())


[33m📅 CALENDAR CHECK COMPLETE

    Meeting Request: Check my availability for the meeting tomorrow at 10 AM mentioned in Alice's email
    Requested Time: Tomorrow at 10:00 AM
    Status: ✅ AVAILABLE

    ✅ CONFIRMED: I'm available for the meeting at the requested time!

    📝 Recommendation: Send confirmation email accepting the meeting invitation.[39m


## Email agent with MCP

In [None]:
%%writefile mcp_emails.py

import time
import random
from fastmcp import FastMCP

mcp = FastMCP("Emails")

@mcp.tool()
def new_email() -> dict:
    """
    Retrieve a new email from the inbox.
      
    Returns:
        dict: Email object containing email_id, from, to, subject, and body fields
    """
    
    # Define both possible emails
    legitimate_email = {
        "email_id": "email_001_2025",
        "from": "alicek@msn.com",
        "to": "bob.sampler@gmail.com",
        "subject": "Meeting Reminder",
        "body": "Hi Bob,\n\nJust a reminder that we have a meeting scheduled for tomorrow at 10 AM.\n\nBest,\nAlice"
    }
    
    spam_email = {
        "email_id": "email_0041_2025",
        "from": "darlene@outdns.hyosung.co.kr",
        "to": "bob.sampler@gmail.com",
        "subject": "Urgent: Claim your inheritance",
        "body": "Congratulations! You've been selected to receive $10 million. Please send your bank details immediately to claim your prize."
    }
    
    # 50% chance to return spam
    return random.choice([legitimate_email, spam_email])
    # return legitimate_email


@mcp.tool()
def delete_email(email: dict) -> str:
    """
    Delete an email from the inbox.
    
    Args:
      email (dict): Email object containing at least an email_id field
      
    Returns:
      str: Confirmation message that the email was deleted
    """
    email_id = email.get("email_id", "unknown")
    return f"Email {email_id} has been successfully deleted from your inbox."


@mcp.tool()
def send_email(to: str, subject: str, body: str) -> str:
    """
    Send an email response.
    
    Args:
      to (str): Recipient email address
      subject (str): Email subject line
      body (str): Email body content
      
    Returns:
      str: Confirmation that email was sent
    """
    # Generate a mock sent email ID
    sent_id = f"sent_{random.randint(1000, 9999)}"
    
    return f"✅ Email sent successfully!\n\nEmail ID: {sent_id}\nTo: {to}\nSubject: {subject}\nBody: {body[:100]}{'...' if len(body) > 100 else 
    ''}\n\nDelivered to recipient's inbox."



if __name__ == "__main__":
    mcp.run(transport="stdio")

In [None]:
%%writefile langchain_agent.py

from collections.abc import AsyncGenerator
from datetime import datetime
from functools import reduce
from typing import TypedDict
from functools import reduce

from acp_sdk.models import Message
from acp_sdk.models.models import MessagePart
from acp_sdk.server import RunYield, RunYieldResume, Server
from langchain_core.runnables import RunnableLambda
from langgraph.graph import StateGraph
from langchain_openai import ChatOpenAI
from langchain_mcp_adapters.client import MultiServerMCPClient
from langgraph.prebuilt import create_react_agent

class AgentState(TypedDict):
    input: str
    response: str


llm = ChatOpenAI(
    model="qwen2.5-7b-instruct-1m",
    temperature=0,
    max_tokens=None,
    timeout=None,
    max_retries=2,
    api_key="none",
    base_url="http://127.0.0.1:1234/v1",
)


async def email_agent(llm):
    "This is an agent can handle your emails."
    client = MultiServerMCPClient(
        {
            "emails": {
                "command": "python",
                "args": ["./mcp_emails.py"],
                "transport": "stdio",
            }
        }
    )
    tools = await client.get_tools()
    agent = create_react_agent(llm, tools)
    return agent


async def get_emails(state: AgentState):
    agent = await email_agent(llm)
    response = await agent.ainvoke({"messages": state['input'] + "always returnm the full email"})
    answer_text = response["messages"][-1].content
    return {"response": answer_text}


# create graph
workflow = StateGraph(AgentState)

# add nodes
workflow.add_node("get_emails", RunnableLambda(get_emails))

# connect nodes
workflow.set_entry_point("get_emails")
workflow.set_finish_point("get_emails")

graph = workflow.compile()

server = Server()


def join_acp_messages(msgs: list[Message]) -> str:
    """
    Flatten ACP messages into one prompt string.
    Adjust the attr names if your Message model differs.
    """
    return " ".join(
        part.content
        for m in msgs
        for part in getattr(m, "parts", [])
        if getattr(part, "content", None)
    )


@server.agent()
async def lang_graph_email_agent(input: list[Message]) -> AsyncGenerator[RunYield, RunYieldResume]:
    """LangGraph agent that handles emails."""
    query_text = join_acp_messages(input)
    print(f"QUERY:\n{query_text}\n\n")
    state: AgentState = {"input": query_text, "response": ""}
    output = None
    async for event in graph.astream(state, stream_mode="updates"):
        for value in event.items():
            yield {"update": value}
        output = event
    print(f"OUT:\n {output}")
    yield MessagePart(content=output.get("get_emails", {}).get("response", ""))


server.run(port=8003)

In [18]:
%%bash --bg
# Start lang_graph_email_agent in background
# If you want to see the ACP server output start it in a terminal

python langchain_agent.py

In [20]:
import asyncio
from acp_sdk.client import Client
from acp_sdk.models import Message, MessagePart, MessageCompletedEvent, GenericEvent
from colorama import Fore 

async def example() -> None:
    async with Client(base_url="http://localhost:8003") as client:
        run = await client.run_sync(
            agent="lang_graph_email_agent", input="Am I received any new email?"
        )
        print(Fore.YELLOW + run.output[0].parts[0].content + Fore.RESET)


if __name__ == "__main__":
    asyncio.run(example())

[33mYou have received a new email. Here it is:

- Email ID: `email_001_2025`
- From: `alicek@msn.com`
- To: `bob.sampler@gmail.com`
- Subject: `Meeting Reminder`
- Body:
  ```
  Hi Bob,

  Just a reminder that we have a meeting scheduled for tomorrow at 10 AM.

  Best,
  Alice
  ```[39m


## Agent orchestrator
All existing agents are assigned to the orchestrator via the ACP client, which is able to call the right agent for the task bbased on the ACP server description. It's able to accomplish complex multistep tasks that involves multiple agents

In [24]:
# Import the implementation from the file
import asyncio 
from acp_sdk.client import Client
from smolagents import LiteLLMModel
from fastacp import AgentCollection, ACPCallingAgent, ActionStep
from colorama import Fore 

model_name = "qwen.qwen2.5-coder-32b-instruct"
model = LiteLLMModel(
    model_id=f"lm_studio/{model_name}",
    api_base="http://127.0.0.1:1234/v1",
    api_key="your-api-key",
    num_ctx=32000,
)

async def run_workflow() -> None:
    async with Client(base_url="http://localhost:8004") as calendar, Client(base_url="http://localhost:8003") as email, Client(base_url="http://localhost:8001") as fraud:
        agent_collection = await AgentCollection.from_acp(calendar, email, fraud)  
        acp_agents = {agent.name: {'agent':agent, 'client':client} for client, agent in agent_collection.agents}
        print(acp_agents) 
        acpagent = ACPCallingAgent(acp_agents=acp_agents, model=model)
        result = await acpagent.run("""
                                  Check my emails. Verify the sender email address for fraud/spam and in case of spam delete the email. 
                                  If I receive a meeting invitation, check my calendar availability. 
                                  If available, send acceptance. If not available, suggest alternative times.
                                  If something else just summarize it for me.
                                  """)

        # result = await acpagent.run("Am I received any new email? If yes is it fraud or legit? IMPORTANT: If spam you have to delete the email, otherwise give me summary")
        # result = await acpagent.run("any new emails? if yes is it fraud or legit?")
        # result = await acpagent.run("is aaa@xed34.ce looks fraudulent?")
        print(Fore.YELLOW + f"Final result: {result}" + Fore.RESET)

if __name__ == '__main__': 
    asyncio.run(run_workflow())

{'calendar_availability_agent': {'agent': AgentManifest(name='calendar_availability_agent', description='LangGraph agent that checks calendar availability for meeting requests and suggests alternatives.', metadata=Metadata(annotations=None, documentation=None, license=None, programming_language=None, natural_languages=None, framework=None, capabilities=None, domains=None, tags=None, created_at=None, updated_at=None, author=None, contributors=None, links=None, dependencies=None, recommended_models=None), input_content_types=['*/*'], output_content_types=['*/*']), 'client': <acp_sdk.client.client.Client object at 0x7b55f9c8c410>}, 'lang_graph_email_agent': {'agent': AgentManifest(name='lang_graph_email_agent', description='LangGraph agent that handles emails.', metadata=Metadata(annotations=None, documentation=None, license=None, programming_language=None, natural_languages=None, framework=None, capabilities=None, domains=None, tags=None, created_at=None, updated_at=None, author=None, co

### Stop the agents

In [7]:
def cleanup_agents():
    """Quick cleanup function for notebook."""
    import subprocess
    
    ports = [8001, 8003, 8004]  # Your agent ports
    for port in ports:
        try:
            result = subprocess.run(['lsof', '-t', f'-i:{port}'],
                                capture_output=True, text=True)
            if result.stdout.strip():
              pid = int(result.stdout.strip())
              subprocess.run(['kill', str(pid)])
              print(f"✅ Killed agent on port {port} (PID: {pid})")
        except:
            pass
    print("🧹 Agent cleanup complete!")




✅ Killed agent on port 8001 (PID: 1616536)
🧹 Agent cleanup complete!


In [None]:
# Run this when you want to stop agents
cleanup_agents()