<center>
    <p style="text-align:center">
        <img alt="phoenix logo" src="https://raw.githubusercontent.com/Arize-ai/phoenix-assets/9e6101d95936f4bd4d390efc9ce646dc6937fb2d/images/socal/github-large-banner-phoenix.jpg" width="1000"/>
        <br>
        <br>
        <a href="https://docs.arize.com/phoenix/">Docs</a>
        |
        <a href="https://github.com/Arize-ai/phoenix">GitHub</a>
        |
        <a href="https://arize-ai.slack.com/join/shared_invite/zt-2w57bhem8-hq24MB6u7yE_ZF_ilOYSBw#/shared-invite/email">Community</a>
    </p>
</center>
<h1 align="center">Tracing CrewAI with Arize Phoenix - Routing Workflow</h1>

In [None]:
%pip install -q arize-phoenix opentelemetry-sdk opentelemetry-exporter-otlp crewai crewai_tools openinference-instrumentation-crewai

## Set up Keys and Dependencies

Note: For this Colab notebook, you'll need:

*   OpenAI API key (https://openai.com/)
*   Serper API key (https://serper.dev/)
*   Phoenix API key (https://app.phoenix.arize.com/)

In [None]:
import getpass
import os

# Prompt the user for their API keys if they haven't been set
openai_key = os.getenv("OPENAI_API_KEY", "OPENAI_API_KEY")
serper_key = os.getenv("SERPER_API_KEY", "SERPER_API_KEY")

if openai_key == "OPENAI_API_KEY":
    openai_key = getpass.getpass("Please enter your OPENAI_API_KEY: ")

if serper_key == "SERPER_API_KEY":
    serper_key = getpass.getpass("Please enter your SERPER_API_KEY: ")

# Set the environment variables with the provided keys
os.environ["OPENAI_API_KEY"] = openai_key
os.environ["SERPER_API_KEY"] = serper_key

if "PHOENIX_API_KEY" not in os.environ:
    os.environ["PHOENIX_API_KEY"] = getpass.getpass("Enter your Phoenix API key: ")

os.environ["PHOENIX_CLIENT_HEADERS"] = f"api_key={os.environ['PHOENIX_API_KEY']}"
os.environ["PHOENIX_COLLECTOR_ENDPOINT"] = "https://app.phoenix.arize.com/"
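The cell above repeats the same "check, then prompt" pattern for each key. As a minimal sketch, the pattern could be folded into one helper. The name `ensure_env` and its injectable `prompt` parameter are hypothetical, introduced here only for illustration, and are not part of Phoenix or CrewAI.

```python
import getpass
import os


def ensure_env(name: str, prompt=getpass.getpass) -> str:
    """Return os.environ[name], prompting only if it is unset or empty.

    Hypothetical helper for illustration. `prompt` is injectable so the
    function can be exercised without an interactive terminal.
    """
    value = os.environ.get(name, "").strip()
    if not value:
        # Ask once, then store the answer so later cells can read it.
        value = prompt(f"Please enter your {name}: ").strip()
        os.environ[name] = value
    return value
```

With this helper, each key would need a single call such as `ensure_env("OPENAI_API_KEY")`, and a key that is already exported in the environment is never prompted for.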

## Configure Tracing

In [None]:
from phoenix.otel import register

tracer_provider = register(
    project_name="crewai-agents", endpoint="https://app.phoenix.arize.com/v1/traces"
)
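The `endpoint` passed to `register` above is the collector base URL followed by `v1/traces`, the conventional OTLP-over-HTTP traces path. A small sketch of deriving one from the other, assuming a hypothetical helper name `traces_endpoint` (not a Phoenix function):

```python
import os
from urllib.parse import urljoin


def traces_endpoint(base_url: str) -> str:
    """Append the OTLP/HTTP traces path to a collector base URL.

    Hypothetical helper for illustration only.
    """
    # A trailing slash makes urljoin append instead of replacing the last segment.
    if not base_url.endswith("/"):
        base_url += "/"
    return urljoin(base_url, "v1/traces")


# Falls back to the hosted URL used in this notebook when the variable is unset.
base = os.environ.get("PHOENIX_COLLECTOR_ENDPOINT", "https://app.phoenix.arize.com/")
print(traces_endpoint(base))
```

Deriving the value this way keeps the environment variable and the `register` call from drifting apart if the collector URL changes.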

## Instrument CrewAI

In [None]:
from openinference.instrumentation.crewai import CrewAIInstrumentor

CrewAIInstrumentor().instrument(skip_dep_check=True, tracer_provider=tracer_provider)

## Define your Working Agents

In [None]:
import nest_asyncio
from crewai import Agent, Crew, Process, Task
from crewai.flow import Flow, listen, router, start
from pydantic import BaseModel

research_analyst = Agent(
    role="Senior Research Analyst",
    goal="Gather and summarize data on the requested topic.",
    backstory="Expert in tech market trends.",
    allow_delegation=False,
)

content_strategist = Agent(
    role="Tech Content Strategist",
    goal="Craft an article outline based on provided research.",
    backstory="Storyteller who turns data into narratives.",
    allow_delegation=False,
)

From here, there are two ways to route a query: through a dedicated routing Agent, or through the ```@router()``` decorator in Flows, which lets you define conditional routing logic based on the output of a method.
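Both options share the same shape: classify the query into a label, then dispatch to the handler for that label. The following is a framework-free sketch of that shape, not CrewAI code. The keyword classifier stands in for an LLM or rule-based router, and every name in it (`classify`, `HANDLERS`, `dispatch`) is hypothetical.

```python
from typing import Callable, Dict


def classify(query: str) -> str:
    """Toy classifier: a keyword match stands in for the real router."""
    return "research" if "research" in query.lower() else "content"


def do_research(query: str) -> str:
    return f"[research] {query}"


def do_outline(query: str) -> str:
    return f"[content] {query}"


# Label -> handler table; adding a route means adding one entry here.
HANDLERS: Dict[str, Callable[[str], str]] = {
    "research": do_research,
    "content": do_outline,
}


def dispatch(query: str) -> str:
    """Classify the query, then call the handler registered for its label."""
    label = classify(query)
    return HANDLERS[label](query)
```

In Option 1 the classifier is itself an Agent. In Option 2 it is a plain method whose return value selects the next step.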

### Option 1: Use a Router Agent to classify the query and run the corresponding Agent

In [None]:
routerAgent = Agent(
    role="Router",
    goal="Classify each query as either 'research' or 'content'.",
    backstory="Triage bot for content workflows.",
    verbose=False,
)


def route(user_input: str, router):
    router_task = Task(
        description=user_input, agent=router, expected_output="One word: 'research' or 'content'"
    )
    router_classify = Crew(
        agents=[router], tasks=[router_task], process=Process.sequential, verbose=False
    )
    router_results = router_classify.kickoff()
    return router_results


def type_of_task(router_results):
    if isinstance(router_results, list):
        result = router_results[0]
        result_text = result.text if hasattr(result, "text") else str(result)
    else:
        result_text = (
            router_results.text if hasattr(router_results, "text") else str(router_results)
        )
    task_type = result_text.strip().lower()

    return task_type


def working_agent(task_type, user_input: str):
    if "research" in task_type:
        agent = research_analyst
        label = "Research Analyst"
    else:
        agent = content_strategist
        label = "Content Strategist"

    work_task = Task(description=user_input, agent=agent, expected_output="Agent response")
    worker_crew = Crew(agents=[agent], tasks=[work_task], process=Process.sequential, verbose=True)
    work_results = worker_crew.kickoff()
    if isinstance(work_results, list):
        output = work_results[0].text if hasattr(work_results[0], "text") else str(work_results[0])
    else:
        output = work_results.text if hasattr(work_results, "text") else str(work_results)

    print(f"\n=== Routed to {label} ({task_type}) ===\n{output}\n")
    return output

#### Example Runs

In [None]:
# Classify each example query with the router, then run the matching worker agent
for query in [
    "Please research the latest AI safety papers.",
    "Outline an article on AI safety trends.",
]:
    router_output = route(query, routerAgent)
    task_output = type_of_task(router_output)
    working_agent(task_output, query)
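An LLM router may return its label with punctuation, quotes, or extra words around it. `working_agent` handles this with a substring check and silently falls back to the content strategist. As a sketch, the label could instead be normalized explicitly before dispatch. `normalize_label` is a hypothetical helper, and defaulting to `'content'` is an assumption that mirrors the fallback in `working_agent`.

```python
import re


def normalize_label(raw: str, default: str = "content") -> str:
    """Map free-form router text to 'research' or 'content'.

    Hypothetical helper: returns the first known label found as a whole
    word, or `default` when neither label appears.
    """
    for word in re.findall(r"[a-z]+", raw.lower()):
        if word in ("research", "content"):
            return word
    return default
```

Making the default explicit gives you one place to log or raise when the router returns something unexpected.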

### Option 2: Use the ```@router()``` decorator to define routing logic

In [None]:
nest_asyncio.apply()


# Define Flow State
class RoutingState(BaseModel):
    query: str = ""
    route: str = ""


# Define Structured Flow
class RoutingFlow(Flow[RoutingState]):
    def __init__(self, query: str):
        super().__init__(state=RoutingState(query=query))

    @start()
    def handle_query(self):
        print(f"📥 Incoming Query: {self.state.query}")

    @router(handle_query)
    def decide_route(self):
        if "research" in self.state.query.lower():
            self.state.route = "research"
            return "research"
        else:
            self.state.route = "outline"
            return "outline"

    @listen("research")
    def run_research(self):
        task = Task(
            description=self.state.query,
            expected_output="Summary of findings on the requested topic",
            agent=research_analyst,
        )
        crew = Crew(
            agents=[research_analyst], tasks=[task], process=Process.sequential, verbose=True
        )
        crew.kickoff()

    @listen("outline")
    def run_content_strategy(self):
        task = Task(
            description=self.state.query,
            expected_output="An article outline about the given topic",
            agent=content_strategist,
        )
        crew = Crew(
            agents=[content_strategist], tasks=[task], process=Process.sequential, verbose=True
        )
        crew.kickoff()

#### Example Runs

In [None]:
queries = [
    "Please research the latest AI safety papers.",
    "Outline an article on AI safety trends.",
]

for query in queries:
    flow = RoutingFlow(query=query)
    flow.kickoff()

### Check your Phoenix project to view the traces and spans from your runs.