# L3: Agentic Sales Pipeline

# Import libs

In [30]:
# Warning control
import warnings
import os
import yaml
import nest_asyncio
import pandas as pd
import textwrap

from pydantic import BaseModel, Field
from typing import Dict, Optional, List, Set, Tuple

from dotenv import load_dotenv

from crewai import Agent, Task, Crew, Flow
from crewai_tools import BaseTool, SerperDevTool, ScrapeWebsiteTool
from crewai.flow.flow import listen, start

from IPython.display import IFrame, display, HTML

# Load variables from a local .env file before anything below reads or
# overrides the environment.
load_dotenv()  # take environment variables from .env
# Suppress every warning for the rest of the session.
warnings.filterwarnings("ignore")

# Set unconditionally: this replaces any OPENAI_MODEL_NAME value already in
# the environment (including one just loaded from .env).
# NOTE(review): presumably read by crewai to pick the default LLM — confirm.
os.environ["OPENAI_MODEL_NAME"] = "gpt-4o"

# Loading Tasks and Agents YAML files

In [31]:
# Define file paths for YAML configurations, keyed by the role each file plays
files = {
    "lead_agents": "config/L3_lead_qualification_agents.yaml",
    "lead_tasks": "config/L3_lead_qualification_tasks.yaml",
    "email_agents": "config/L3_email_engagement_agents.yaml",
    "email_tasks": "config/L3_email_engagement_tasks.yaml",
}

# Load configurations from YAML files
configs = {}
for config_type, file_path in files.items():
    # Decode explicitly as UTF-8 so non-ASCII text in the agent/task prompts
    # is read the same way on every platform instead of depending on the
    # locale's default encoding.
    with open(file_path, "r", encoding="utf-8") as file:
        configs[config_type] = yaml.safe_load(file)

# Assign loaded configurations to specific variables
lead_agents_config = configs["lead_agents"]
lead_tasks_config = configs["lead_tasks"]
email_agents_config = configs["email_agents"]
email_tasks_config = configs["email_tasks"]

# Create Pydantic Models for Structured Output

In [32]:
# Structured-output schema describing the lead as a person: who they are and
# how relevant their role is (0-10) to the decision-making process.
class LeadPersonalInfo(BaseModel):
    name: str = Field(..., description="The full name of the lead.")
    job_title: str = Field(..., description="The job title of the lead.")
    role_relevance: int = Field(
        ...,
        ge=0,
        le=10,
        description="A score representing how relevant the lead's role is to the decision-making process (0-10).",
    )
    # Defaults to None, like the other Optional fields in these schemas.
    # With `...` the field was still required, so a result that simply left
    # the background out failed validation even though the field is optional.
    professional_background: Optional[str] = Field(
        None, description="A brief description of the lead's professional background."
    )


# Structured-output schema for the company a lead works for.
# Every field is required except `revenue`, which defaults to None;
# `market_presence` is constrained to the inclusive range 0-10.
class CompanyInfo(BaseModel):
    company_name: str = Field(
        default=...,
        description="The name of the company the lead works for.",
    )
    industry: str = Field(
        default=...,
        description="The industry in which the company operates.",
    )
    company_size: int = Field(
        default=...,
        description="The size of the company in terms of employee count.",
    )
    revenue: Optional[float] = Field(
        default=None,
        description="The annual revenue of the company, if available.",
    )
    market_presence: int = Field(
        default=...,
        ge=0,
        le=10,
        description="A score representing the company's market presence (0-10).",
    )


# Structured-output schema for the final verdict on a lead: a 0-100 score,
# the criteria behind it, and optional validation notes (None when absent).
class LeadScore(BaseModel):
    score: int = Field(
        default=...,
        ge=0,
        le=100,
        description="The final score assigned to the lead (0-100).",
    )
    scoring_criteria: List[str] = Field(
        default=...,
        description="The criteria used to determine the lead's score.",
    )
    validation_notes: Optional[str] = Field(
        default=None,
        description="Any notes regarding the validation of the lead score.",
    )


# Top-level structured output of the scoring task: bundles the personal,
# company, and score sub-models. All three parts are required.
class LeadScoringResult(BaseModel):
    personal_info: LeadPersonalInfo = Field(
        default=...,
        description="Personal information about the lead.",
    )
    company_info: CompanyInfo = Field(
        default=...,
        description="Information about the lead's company.",
    )
    lead_score: LeadScore = Field(
        default=...,
        description="The calculated score and related information for the lead.",
    )

# Lead Qualification Crew, Agents and Tasks

In [33]:
# Each agent is declared next to the task it owns. Every agent gets its own
# SerperDevTool and ScrapeWebsiteTool instances.

# Lead data collection
lead_data_agent = Agent(
    tools=[SerperDevTool(), ScrapeWebsiteTool()],
    config=lead_agents_config["lead_data_agent"],
)
lead_data_task = Task(
    agent=lead_data_agent,
    config=lead_tasks_config["lead_data_collection"],
)

# Cultural fit analysis
cultural_fit_agent = Agent(
    tools=[SerperDevTool(), ScrapeWebsiteTool()],
    config=lead_agents_config["cultural_fit_agent"],
)
cultural_fit_task = Task(
    agent=cultural_fit_agent,
    config=lead_tasks_config["cultural_fit_analysis"],
)

# Scoring and validation: receives both earlier tasks as context and must
# produce a LeadScoringResult.
scoring_validation_agent = Agent(
    tools=[SerperDevTool(), ScrapeWebsiteTool()],
    config=lead_agents_config["scoring_validation_agent"],
)
scoring_validation_task = Task(
    agent=scoring_validation_agent,
    config=lead_tasks_config["lead_scoring_and_validation"],
    output_pydantic=LeadScoringResult,
    context=[lead_data_task, cultural_fit_task],
)

# Assemble the crew; tasks are listed in the order collection, fit, scoring
lead_scoring_crew = Crew(
    tasks=[lead_data_task, cultural_fit_task, scoring_validation_task],
    agents=[lead_data_agent, cultural_fit_agent, scoring_validation_agent],
    verbose=True,
)



# Email Engagement Crew

In [34]:
# Each agent is declared next to the task it owns; neither agent is given tools.

# Drafting the email
email_content_specialist = Agent(
    config=email_agents_config["email_content_specialist"],
)
email_drafting = Task(
    agent=email_content_specialist,
    config=email_tasks_config["email_drafting"],
)

# Optimizing the draft for engagement
engagement_strategist = Agent(
    config=email_agents_config["engagement_strategist"],
)
engagement_optimization = Task(
    agent=engagement_strategist,
    config=email_tasks_config["engagement_optimization"],
)

# Assemble the crew; drafting is listed before optimization
email_writing_crew = Crew(
    tasks=[email_drafting, engagement_optimization],
    agents=[email_content_specialist, engagement_strategist],
    verbose=True,
)



# Creating the Complete Sales Flow

In [35]:
class SalesPipeline(Flow):
    """Flow that scores leads and writes follow-up emails for the qualified ones.

    The steps are chained through the ``@start`` / ``@listen`` decorators:
    ``fetch_leads`` -> ``score_leads`` -> (``store_leads_score`` and
    ``filter_leads``) -> ``write_email`` -> ``send_email``.
    """

    # A lead qualifies for an email only when its score is strictly greater
    # than this value. Kept as a class attribute so a subclass can tune it
    # without re-implementing filter_leads.
    SCORE_THRESHOLD = 70

    @start()
    def fetch_leads(self):
        """Return the leads to process, one kickoff-input dict per lead."""
        # Pull our leads from the database
        # (placeholder: a single hard-coded lead stands in for the query)
        leads = [
            {
                "lead_data": {
                    "name": "João Moura",
                    "job_title": "Director of Engineering",
                    "company": "Clearbit",
                    "email": "joao@clearbit.com",
                    "use_case": "Using AI Agent to do better data enrichment.",
                },
            },
        ]
        return leads

    @listen(fetch_leads)
    def score_leads(self, leads):
        """Run the lead-scoring crew once per lead and return the results."""
        scores = lead_scoring_crew.kickoff_for_each(leads)
        # Also kept in flow state so the results remain reachable after the
        # flow has finished (kickoff() only hands back the final step's value).
        self.state["score_crews_results"] = scores
        return scores

    @listen(score_leads)
    def store_leads_score(self, scores):
        """Persistence hook for the scores; currently a pass-through."""
        # Here we would store the scores in the database
        return scores

    @listen(score_leads)
    def filter_leads(self, scores):
        """Keep only the results whose lead score exceeds ``SCORE_THRESHOLD``."""
        return [
            score
            for score in scores
            if score["lead_score"].score > self.SCORE_THRESHOLD
        ]

    @listen(filter_leads)
    def write_email(self, leads):
        """Run the email-writing crew once per qualified lead."""
        # The email crew takes plain dicts as inputs, so convert each result
        scored_leads = [lead.to_dict() for lead in leads]
        emails = email_writing_crew.kickoff_for_each(scored_leads)
        return emails

    @listen(write_email)
    def send_email(self, emails):
        """Delivery hook for the emails; currently a pass-through."""
        # Here we would send the emails to the leads
        return emails


flow = SalesPipeline()

## Visualize Flow

In [36]:
# Render the flow graph; the cell output reports it is saved as crewai_flow.html.
flow.plot()
# Embed that HTML file inline. This must stay the cell's last expression so
# the notebook displays the IFrame.
IFrame(src="crewai_flow.html", width="100%", height=200)

Plot saved as crewai_flow.html


# Flow Kickoff

In [37]:
# Patch asyncio so event loops can be nested.
# NOTE(review): presumably required because the notebook already runs an
# event loop and kickoff() needs to run its own — confirm.
nest_asyncio.apply()
# Run the whole pipeline. Later cells index `emails` as a list of email crew
# results (emails[0].token_usage, emails[0].raw).
emails = flow.kickoff()



[1m[95m# Agent:[00m [1m[92mLead Data Specialist[00m
[95m## Task:[00m [92mCollect and analyze the following information about the lead:
- Personal Information:
  - Name: Obtain the full name of the lead.
  - Job Title: Determine the lead's current job title.
  - Role Relevance: Assess how relevant the lead's role is to the decision-making process on a scale from 0 to 10.
  - Professional Background: Optionally, gather a brief description of the lead's professional background.

- Company Information:
  - Company Name: Identify the name of the company the lead works for.
  - Industry: Determine the industry in which the company operates.
  - Company Size: Estimate the size of the company in terms of employee count.
  - Revenue: If available, collect information on the annual revenue of the company.
  - Market Presence: Evaluate the company's market presence on a scale from 0 to 10.

- Our Company and Product:
  - Company Name: CrewAI
  - Product: Multi-Agent Orchestration Platfor



[1m[95m# Agent:[00m [1m[92mEmail Content Writer[00m
[95m## Task:[00m [92mCraft a highly personalized email using the lead's name, job title, company information, and any relevant personal or company achievements. The email should speak directly to the lead's interests and the needs of their company. This is not as cold outreach as it is a follow up to a lead form, so keep it short and to the point. Don't use any salutations or closing remarks, nor too complex sentences.
Our Company and Product: - Company Name: CrewAI - Product: Multi-Agent Orchestration Platform - ICP: Enterprise companies looking into Agentic automation. - Pitch: We are a platform that allows you to orchestrate AI Agents for automations to any vertical.
Use the following information: Personal Info: {'name': 'João Moura', 'job_title': 'Director of Engineering', 'role_relevance': 9, 'professional_background': 'João Moura holds high relevance in decision-making within technology and product development, with exp

# Usage Metrics and Costs

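The cost estimates below multiply token counts by per-million-token rates; see [OpenAI API's pricing](https://openai.com/api/pricing/) for the current rates.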

In [38]:
# Token usage recorded on the first scoring result, shown as a plain dict.
flow.state["score_crews_results"][0].token_usage.dict()

{'total_tokens': 57075,
 'prompt_tokens': 51825,
 'completion_tokens': 5250,
 'successful_requests': 21}

In [39]:
# Convert the UsageMetrics of every scoring result to a DataFrame, one row per
# scored lead, so the printed total covers all scoring runs rather than only
# the first one.
df_usage_metrics = pd.DataFrame(
    [result.token_usage.dict() for result in flow.state["score_crews_results"]]
)

# Calculate total costs: 2.50 per 1M prompt tokens plus 10 per 1M completion
# tokens. The rates are hard-coded here — verify them against the current
# pricing of the configured model.
costs = (2.50 * df_usage_metrics["prompt_tokens"] / 1_000_000) + (
    10 * df_usage_metrics["completion_tokens"] / 1_000_000
)

print(f"Total costs: ${costs.sum():.4f}")


# Display the DataFrame
df_usage_metrics

Total costs: $0.1821


Unnamed: 0,total_tokens,prompt_tokens,completion_tokens,successful_requests
0,57075,51825,5250,21


In [40]:
# Convert the UsageMetrics of every email result to a DataFrame, one row per
# written email, so the printed total covers all email runs rather than only
# the first one.
email_usage_metrics = pd.DataFrame([email.token_usage.dict() for email in emails])

# Calculate total costs: 2.50 per 1M prompt tokens plus 10 per 1M completion
# tokens. The rates are hard-coded here — verify them against the current
# pricing of the configured model.
if email_usage_metrics.empty:
    # No lead passed the score filter, so no email was written. Report a zero
    # total instead of failing on the missing token columns.
    email_costs = pd.Series(dtype="float64")
else:
    email_costs = (2.50 * email_usage_metrics["prompt_tokens"] / 1_000_000) + (
        10 * email_usage_metrics["completion_tokens"] / 1_000_000
    )

print(f"Total costs: ${email_costs.sum():.4f}")

# Display the DataFrame
email_usage_metrics

Total costs: $0.0073


Unnamed: 0,total_tokens,prompt_tokens,completion_tokens,successful_requests
0,1840,1478,362,3


# Inspecting Results

In [41]:
# Scoring results that the flow's score_leads step saved in its state.
scores = flow.state["score_crews_results"]

In [42]:
# Structured (LeadScoringResult) output of the first scored lead
lead_scoring_result = scores[0].pydantic

# Create a dictionary with the nested structure flattened:
# one display label per field of the three sub-models.
data = {
    "Name": lead_scoring_result.personal_info.name,
    "Job Title": lead_scoring_result.personal_info.job_title,
    "Role Relevance": lead_scoring_result.personal_info.role_relevance,
    "Professional Background": lead_scoring_result.personal_info.professional_background,
    "Company Name": lead_scoring_result.company_info.company_name,
    "Industry": lead_scoring_result.company_info.industry,
    "Company Size": lead_scoring_result.company_info.company_size,
    "Revenue": lead_scoring_result.company_info.revenue,
    "Market Presence": lead_scoring_result.company_info.market_presence,
    "Lead Score": lead_scoring_result.lead_score.score,
    "Scoring Criteria": ", ".join(lead_scoring_result.lead_score.scoring_criteria),
    "Validation Notes": lead_scoring_result.lead_score.validation_notes,
}

# Convert the dictionary to a DataFrame (one row per attribute)
df = pd.DataFrame.from_dict(data, orient="index", columns=["Value"])

# Reset the index to turn the original column names into a regular column
df = df.reset_index()

# Rename the index column to 'Attribute'
df = df.rename(columns={"index": "Attribute"})

# Create HTML table with bold attributes and left-aligned values.
# The values are text produced by the scoring crew, so they are HTML-escaped
# before rendering. Escaping is applied to each cell before its formatter
# runs, which keeps the <b> wrapper added below intact.
html_table = (
    df.style.set_properties(**{"text-align": "left"})
    .format({"Attribute": lambda x: f"<b>{x}</b>"}, escape="html")
    .hide(axis="index")
    .to_html()
)

# Display the styled HTML table
display(HTML(html_table))

Attribute,Value
Name,João Moura
Job Title,Director of Engineering
Role Relevance,9
Professional Background,"João Moura holds high relevance in decision-making within technology and product development, with experience as a former Director of Engineering at Clearbit and current involvement with CrewAI."
Company Name,Clearbit
Industry,B2B marketing technology
Company Size,150
Revenue,
Market Presence,8
Lead Score,92


In [43]:
# Raw text of the first generated email, re-flowed to at most 80 columns
# per line for readable output.
result_text = emails[0].raw
wrapped_text = "\n".join(textwrap.wrap(result_text, width=80))
print(wrapped_text)

João, as Director of Engineering at Clearbit, your tech leadership is crucial
for driving innovation. CrewAI's Multi-Agent Orchestration Platform is designed
to seamlessly fit into Clearbit’s data-driven strategies, unlocking
transformative AI capabilities. Imagine the potential of enhanced operational
efficiency through adaptable AI-agent processes tailored to your needs. Ready to
explore this synergy with CrewAI? Schedule a meeting now to discuss how we can
elevate your B2B marketing technology. Let's set the benchmark for AI
orchestration together. Click here to book your personalized session today!
