## Lab 03 Complex Crew Setups: Building Flows

Agentic Sales Pipeline


In [1]:
import yaml
from crewai import LLM, Agent, Crew, Task

In [2]:
from dotenv import load_dotenv

# Load environment variables from the local .env file into this process.
# NOTE(review): presumably this is where the API keys for the LLM and the
# search tool come from — confirm which variables the .env file must define.
load_dotenv()

True

In [3]:
# Map each configuration name to the YAML file that defines it
files = {
    "lead_agents": "./config/lead_qualification_agents.yaml",
    "lead_tasks": "./config/lead_qualification_tasks.yaml",
    "email_agents": "./config/email_engagement_agents.yaml",
    "email_tasks": "./config/email_engagement_tasks.yaml",
}

# Load configurations from YAML files.
# The encoding is pinned to UTF-8 so the files are decoded the same way on
# every platform instead of depending on the locale's default encoding.
configs = {}
for config_type, file_path in files.items():
    with open(file_path, "r", encoding="utf-8") as file:
        configs[config_type] = yaml.safe_load(file)

# Assign loaded configurations to specific variables
lead_agents_config = configs["lead_agents"]
lead_tasks_config = configs["lead_tasks"]
email_agents_config = configs["email_agents"]
email_tasks_config = configs["email_tasks"]

In [4]:
from typing import List, Optional

from pydantic import BaseModel, Field

In [5]:
class LeadPersonalInfo(BaseModel):
    # Details about the individual lead. `name`, `job_title` and
    # `role_relevance` are required; `professional_background` is optional.
    name: str = Field(..., description="The full name of the lead.")
    job_title: str = Field(..., description="The job title of the lead.")
    # Bounded score: validation rejects values outside 0..10.
    role_relevance: int = Field(
        ...,
        ge=0,
        le=10,
        description="A score representing how relevant the lead's role is to the decision-making process (0-10).",
    )
    # Defaults to None, like the other Optional fields in these models
    # (`revenue`, `validation_notes`), so a result that omits the background
    # still validates instead of failing on a missing key.
    professional_background: Optional[str] = Field(
        None, description="A brief description of the lead's professional background."
    )


class CompanyInfo(BaseModel):
    # Details about the lead's employer. Every field is required except
    # `revenue`, which falls back to None when it is not supplied.
    company_name: str = Field(
        default=...,
        description="The name of the company the lead works for.",
    )
    industry: str = Field(
        default=...,
        description="The industry in which the company operates.",
    )
    company_size: int = Field(
        default=...,
        description="The size of the company in terms of employee count.",
    )
    revenue: Optional[float] = Field(
        default=None,
        description="The annual revenue of the company, if available.",
    )
    # Bounded score: validation rejects values outside 0..10.
    market_presence: int = Field(
        default=...,
        description="A score representing the company's market presence (0-10).",
        ge=0,
        le=10,
    )


class LeadScore(BaseModel):
    # Final score for a lead plus the reasoning behind it.
    # `score` is bounded: validation rejects values outside 0..100.
    score: int = Field(
        default=...,
        description="The final score assigned to the lead (0-100).",
        ge=0,
        le=100,
    )
    scoring_criteria: List[str] = Field(
        default=...,
        description="The criteria used to determine the lead's score.",
    )
    # Optional free-text notes; None when not supplied.
    validation_notes: Optional[str] = Field(
        default=None,
        description="Any notes regarding the validation of the lead score.",
    )


class LeadScoringResult(BaseModel):
    # Top-level structured result: bundles the three required sub-models
    # (person, company, score) into one object.
    personal_info: LeadPersonalInfo = Field(
        default=...,
        description="Personal information about the lead.",
    )
    company_info: CompanyInfo = Field(
        default=...,
        description="Information about the lead's company.",
    )
    lead_score: LeadScore = Field(
        default=...,
        description="The calculated score and related information for the lead.",
    )

In [6]:
from crewai_tools import ScrapeWebsiteTool, SerperDevTool

/home/zeyang/git-repos/ai-courses/deeplearning_ai/crewai_practical_multi_ai_agents/.venv/lib/python3.12/site-packages/pydantic/fields.py:1093: PydanticDeprecatedSince20: Using extra keyword arguments on `Field` is deprecated and will be removed. Use `json_schema_extra` instead. (Extra keys: 'required'). Deprecated in Pydantic V2.0 to be removed in V3.0. See Pydantic V2 Migration Guide at https://errors.pydantic.dev/2.11/migration/
  warn(


### Lead Qualification Crew, Agents and Tasks


In [7]:
# Single LLM handle passed to every agent created in this notebook.
# Configured with a low temperature and streaming disabled; timeout=60 is
# presumably in seconds — TODO confirm against the LLM class documentation.
llm = LLM(model="openai/gpt-4.1-mini", temperature=0.2, timeout=60, stream=False)

In [8]:
# Creating Agents
def _build_lead_agent(config_key: str) -> Agent:
    """Create one lead-qualification agent from its entry in the YAML config.

    Each agent gets its own freshly constructed search and scraping tools and
    shares the notebook-wide ``llm``.
    """
    return Agent(
        **lead_agents_config[config_key],
        llm=llm,
        tools=[SerperDevTool(), ScrapeWebsiteTool()],
    )


lead_data_agent = _build_lead_agent("lead_data_agent")
cultural_fit_agent = _build_lead_agent("cultural_fit_agent")
scoring_validation_agent = _build_lead_agent("scoring_validation_agent")

In [9]:
# Creating Tasks
# Each task takes its settings from the YAML config and is bound to one agent.
lead_data_task = Task(
    **lead_tasks_config["lead_data_collection"],
    agent=lead_data_agent,
)

cultural_fit_task = Task(
    **lead_tasks_config["cultural_fit_analysis"],
    agent=cultural_fit_agent,
)

# The scoring task receives the two tasks above as context and declares
# LeadScoringResult as its pydantic output model.
scoring_validation_task = Task(
    **lead_tasks_config["lead_scoring_and_validation"],
    agent=scoring_validation_agent,
    output_pydantic=LeadScoringResult,
    context=[lead_data_task, cultural_fit_task],
)

In [10]:
# Creating Crew
# Groups the three lead-qualification agents with their tasks; the tasks are
# listed in the order data collection -> cultural fit -> scoring/validation.
lead_scoring_crew = Crew(
    verbose=True,
    tasks=[lead_data_task, cultural_fit_task, scoring_validation_task],
    agents=[lead_data_agent, cultural_fit_agent, scoring_validation_agent],
)

### Email Engagement Crew


In [11]:
# Creating Agents
# Both email agents are configured from YAML and share the notebook-wide LLM;
# unlike the lead-qualification agents, they are given no tools here.
email_content_specialist = Agent(
    **email_agents_config["email_content_specialist"],
    llm=llm,
)

engagement_strategist = Agent(
    **email_agents_config["engagement_strategist"],
    llm=llm,
)

In [12]:
# Creating Tasks
# One task per email agent, each configured from the email tasks YAML.
email_drafting = Task(
    **email_tasks_config["email_drafting"],
    agent=email_content_specialist,
)

engagement_optimization = Task(
    **email_tasks_config["engagement_optimization"],
    agent=engagement_strategist,
)

In [13]:
# Creating Crew
# Pairs the two email agents with their tasks; drafting is listed before
# engagement optimization.
email_writing_crew = Crew(
    verbose=True,
    tasks=[email_drafting, engagement_optimization],
    agents=[email_content_specialist, engagement_strategist],
)

### Creating Complete Sales Flow


In [14]:
from crewai import Flow
from crewai.flow.flow import listen, start

In [15]:
class SalesPipeline(Flow):
    """Linear sales flow: fetch leads, score them, filter, write and send emails.

    Steps are chained with ``@listen``: each step receives the return value of
    the step it listens to.
    """

    @start()
    def fetch_leads(self) -> list[dict[str, dict[str, str]]]:
        """Return the leads to process (a hard-coded sample instead of a database query)."""
        sample_lead = {
            "name": "João Moura",
            "job_title": "Director of Engineering",
            "company": "Clearbit",
            "email": "joao@clearbit.com",
            "use_case": "Using AI Agent to do better data enrichment.",
        }
        return [{"lead_data": sample_lead}]

    @listen(fetch_leads)
    def score_leads(self, leads: list[dict[str, dict[str, str]]]) -> list:
        """Run the lead-scoring crew for each lead and keep the results in flow state."""
        crew_outputs = lead_scoring_crew.kickoff_for_each(leads)
        # Stored so the results can be inspected after the flow has finished.
        self.state["score_crews_results"] = crew_outputs
        return crew_outputs

    @listen(score_leads)
    def store_leads_score(self, scores: list) -> list:
        """Placeholder for persisting the scores; currently returns them unchanged."""
        return scores

    @listen(score_leads)
    def filter_leads(self, scores: list) -> list:
        """Keep only the results whose lead score is strictly greater than 70."""
        qualified = []
        for crew_output in scores:
            if crew_output["lead_score"].score > 70:
                qualified.append(crew_output)
        return qualified

    @listen(filter_leads)
    def write_email(self, leads: list) -> list:
        """Convert each filtered result to a dict and run the email crew for each one."""
        email_inputs = [qualified_lead.to_dict() for qualified_lead in leads]
        return email_writing_crew.kickoff_for_each(email_inputs)

    @listen(write_email)
    def send_email(self, emails: list) -> list:
        """Placeholder for sending the emails; currently returns them unchanged."""
        return emails


flow = SalesPipeline()

### Plotting the Flow


In [16]:
# Render the flow graph to an HTML file; with no filename given, the recorded
# run saved it as crewai_flow.html.
flow.plot()

Plot saved as crewai_flow.html


In [17]:
from IPython.display import IFrame

In [None]:
# Embed the saved flow diagram inline (bare last expression, so it is displayed).
IFrame(src="./crewai_flow.html", width="150%", height=600)

### Flow Kickoff


In [20]:
# Run the whole flow using the notebook's top-level await.
# Later cells index the result as a list (emails[0].raw, emails[0].token_usage);
# presumably it is the return value of the last flow step — TODO confirm.
emails = await flow.kickoff_async()

Output()

Output()

Output()

Output()

Output()

Output()

Output()

Output()

Output()

Output()

Output()

Output()

Output()

Output()

### Usage Metrics and Costs


In [21]:
import pandas as pd

In [31]:
# Flat USD price per one million tokens used for the estimate below.
# NOTE(review): this single rate is applied to total_tokens (prompt and
# completion alike) — confirm it matches the pricing of the model set in `llm`.
usd_per_million_tokens = 0.150

# Token usage of the first lead-scoring result, as a one-row DataFrame
first_score_usage = flow.state["score_crews_results"][0].token_usage
df_usage_metrics = pd.DataFrame([first_score_usage.model_dump()])

# Estimated cost for that result
costs = usd_per_million_tokens * df_usage_metrics["total_tokens"].sum() / 1_000_000
print(f"Total costs: ${costs:.4f}")

# Display the DataFrame
df_usage_metrics

Total costs: $0.0031


Unnamed: 0,total_tokens,prompt_tokens,cached_prompt_tokens,completion_tokens,successful_requests
0,20368,18401,0,1967,9


In [32]:
# Flat USD price per one million tokens used for the estimate below.
# NOTE(review): this single rate is applied to total_tokens (prompt and
# completion alike) — confirm it matches the pricing of the model set in `llm`.
usd_per_million_tokens = 0.150

# Token usage of the first email-crew result, as a one-row DataFrame
first_email_usage = emails[0].token_usage
df_usage_metrics = pd.DataFrame([first_email_usage.model_dump()])

# Estimated cost for that result
costs = usd_per_million_tokens * df_usage_metrics["total_tokens"].sum() / 1_000_000
print(f"Total costs: ${costs:.4f}")

# Display the DataFrame
df_usage_metrics

Total costs: $0.0002


Unnamed: 0,total_tokens,prompt_tokens,cached_prompt_tokens,completion_tokens,successful_requests
0,1238,968,0,270,2


### Inspecting Results


In [24]:
# Lead-scoring results that SalesPipeline.score_leads stored in the flow state
scores = flow.state["score_crews_results"]

In [25]:
from IPython.display import HTML, display

In [26]:
# Structured (pydantic) result for the first scored lead
lead_scoring_result = scores[0].pydantic

# Short handles for the three nested sections of the result
personal_info = lead_scoring_result.personal_info
company_info = lead_scoring_result.company_info
score_info = lead_scoring_result.lead_score

# Flatten the nested structure into one label -> value mapping
data = {
    "Name": personal_info.name,
    "Job Title": personal_info.job_title,
    "Role Relevance": personal_info.role_relevance,
    "Professional Background": personal_info.professional_background,
    "Company Name": company_info.company_name,
    "Industry": company_info.industry,
    "Company Size": company_info.company_size,
    "Revenue": company_info.revenue,
    "Market Presence": company_info.market_presence,
    "Lead Score": score_info.score,
    "Scoring Criteria": ", ".join(score_info.scoring_criteria),
    "Validation Notes": score_info.validation_notes,
}

# One row per attribute: the dict keys become the index, are then moved into
# a regular column, and that column is renamed to 'Attribute'
df = (
    pd.DataFrame.from_dict(data, orient="index", columns=["Value"])
    .reset_index()
    .rename(columns={"index": "Attribute"})
)


def _bold(label):
    """Wrap an attribute label in HTML bold tags."""
    return f"<b>{label}</b>"


# Build the HTML table: left-aligned cells, bold attribute names, no index column
table_styler = df.style.set_properties(**{"text-align": "left"})
table_styler = table_styler.format({"Attribute": _bold})
table_styler = table_styler.hide(axis="index")
html_table = table_styler.to_html()

# Display the styled HTML table
display(HTML(html_table))

Attribute,Value
Name,João Moura
Job Title,Director of Engineering
Role Relevance,9
Professional Background,"João Moura is a results-driven engineering leader with close to 20 years of experience in software engineering and AI. He has led enterprise AI product development at Clearbit, focusing on data enrichment and marketing intelligence solutions."
Company Name,Clearbit
Industry,"Business Intelligence, Data Enrichment, Marketing Intelligence"
Company Size,250
Revenue,20.000000
Market Presence,8
Lead Score,84


In [27]:
import textwrap

In [28]:
# Raw text of the first generated email
result_text = emails[0].raw

# Wrap each line separately. Calling textwrap.fill() on the whole text would
# replace the newlines with spaces and merge every paragraph into one block;
# wrapping line by line keeps the paragraph breaks (blank lines stay blank).
wrapped_text = "\n".join(
    textwrap.fill(line, width=80) for line in result_text.splitlines()
)

print(wrapped_text)

Thanks for reaching out to CrewAI. Our Multi-Agent Orchestration Platform is
designed to help enterprise teams like yours automate complex workflows across
any vertical. Imagine reducing manual tasks while increasing efficiency through
seamless AI agent collaboration.  Ready to see how CrewAI can transform your
automation strategy? Schedule a quick demo this week to explore tailored
solutions for your business.    [Book Your Demo Now]    Have specific challenges
or goals? Reply with your top priorities, and we’ll prepare a customized plan to
address them immediately.    Don’t miss the chance to accelerate your automation
journey—let’s get started today.    [Schedule a Meeting]


### How Complex Can it Get?


In [29]:
# from crewai import Flow
from crewai.flow.flow import and_, listen, router, start

In [33]:
class SalesPipeline(Flow):
    """Branching variant of the sales flow.

    After scoring and filtering, a router step returns one of the labels
    "high", "medium" or "low" based on how many leads passed the filter, and
    the steps listening on those labels handle each case. A separate logging
    step listens on both the filter and the store step via ``and_``.
    """

    @start()
    def fetch_leads(self):
        """Return the leads to process.

        This is a mock: in a real-world scenario the leads would be fetched
        from a database.
        """
        sample_lead = {
            "name": "João Moura",
            "job_title": "Director of Engineering",
            "company": "Clearbit",
            "email": "joao@clearbit.com",
            "use_case": "Using AI Agent to do better data enrichment.",
        }
        return [{"lead_data": sample_lead}]

    @listen(fetch_leads)
    def score_leads(self, leads):
        """Run the lead-scoring crew for each lead and keep the results in flow state."""
        crew_outputs = lead_scoring_crew.kickoff_for_each(leads)
        self.state["score_crews_results"] = crew_outputs
        return crew_outputs

    @listen(score_leads)
    def store_leads_score(self, scores):
        """Placeholder for persisting the scores; currently returns them unchanged."""
        return scores

    @listen(score_leads)
    def filter_leads(self, scores):
        """Keep only the results whose lead score is strictly greater than 70."""
        qualified = []
        for crew_output in scores:
            if crew_output["lead_score"].score > 70:
                qualified.append(crew_output)
        return qualified

    @listen(and_(filter_leads, store_leads_score))
    def log_leads(self, leads):
        """Print the leads; registered on both filter_leads and store_leads_score."""
        print(f"Leads: {leads}")

    @router(filter_leads)
    def count_leads(self, scores):
        """Return a routing label based on the number of filtered leads.

        More than 10 -> "high", more than 5 -> "medium", otherwise "low".
        """
        lead_count = len(scores)
        if lead_count > 10:
            return "high"
        if lead_count > 5:
            return "medium"
        return "low"

    @listen("high")
    def store_in_salesforce(self, leads):
        """Handler for the "high" label; currently returns the leads unchanged."""
        return leads

    @listen("medium")
    def send_to_sales_team(self, leads):
        """Handler for the "medium" label; currently returns the leads unchanged."""
        return leads

    @listen("low")
    def write_email(self, leads):
        """Handler for the "low" label: run the email crew for each lead."""
        email_inputs = [qualified_lead.to_dict() for qualified_lead in leads]
        return email_writing_crew.kickoff_for_each(email_inputs)

    @listen(write_email)
    def send_email(self, emails):
        """Placeholder for sending the emails; currently returns them unchanged."""
        return emails

In [34]:
# Rebind `flow` to an instance of the SalesPipeline class defined most recently,
# then render its graph; the recorded run saved it as crewai_flow_complex.html.
flow = SalesPipeline()
flow.plot(filename="crewai_flow_complex")

Plot saved as crewai_flow_complex.html


In [None]:
from IPython.display import IFrame

# Embed the saved diagram of the branching flow inline.
IFrame(src="./crewai_flow_complex.html", width="150%", height=600)