# Agentic Sales Pipeline

Description: Automating a Sales Pipeline Using Agentic "Flows"
This project automates a sales pipeline using a modular, agentic system called "flows". A flow is a sequence of steps (loading, scoring, filtering, and emailing leads), and each agentic step is executed by a specialized crew, that is, a team of agents.

High-Level Use Case:
We aim to automate the following process:

1. Loading lead information
2. Researching and scoring the lead
3. Filtering out low-potential leads
4. Writing and optimizing personalized emails for qualified leads

Each agentic stage is handled by a dedicated crew, and the process is managed by the flow framework, which maintains the state before and after each execution.


Flow Breakdown:
Loading Leads:
This is the first step in the flow. It does not require any agentic logic. Simple Python code can be used to load and tabulate the lead data before passing it to the next crew.

Lead Scoring Crew:
This crew is responsible for analyzing the lead's data, conducting external research on the company (e.g., culture, size, industry fit), and assigning a lead score based on how well it aligns with the product and target market.

Filtering:
Leads with scores below a defined threshold are filtered out. Only high-scoring leads are forwarded to the next stage.

Email Writing Crew:
This crew drafts and optimizes an email tailored to the lead, with the goal of maximizing engagement and conversion.
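
The four stages above can be sketched in plain Python before any agents are involved. The snippet below is a minimal illustration, not the project's implementation: `score_lead` and `draft_email` are hypothetical stand-ins for the Lead Scoring Crew and the Email Writing Crew, the sample leads are made up, and the threshold of 70 mirrors the value used later in the flow.

```python
# Minimal sketch of the four pipeline stages using stub functions.
# score_lead and draft_email are hypothetical placeholders for the crews.

SCORE_THRESHOLD = 70  # assumed cut-off, matching the flow defined later


def load_leads():
    """Stage 1: load leads (hard-coded here instead of a database)."""
    return [
        {"name": "Lead A", "company": "Example Co", "signal": 9},
        {"name": "Lead B", "company": "Sample Ltd", "signal": 3},
    ]


def score_lead(lead):
    """Stage 2 (stub): a crew would research the lead; here we scale a number."""
    return {**lead, "score": lead["signal"] * 10}


def keep_qualified(scored, threshold=SCORE_THRESHOLD):
    """Stage 3: keep only leads scoring strictly above the threshold."""
    return [lead for lead in scored if lead["score"] > threshold]


def draft_email(lead):
    """Stage 4 (stub): a crew would write and optimize the email."""
    return f"{lead['name']}, we think {lead['company']} could benefit from our product."


def run_pipeline():
    scored = [score_lead(lead) for lead in load_leads()]
    qualified = keep_qualified(scored)
    return [draft_email(lead) for lead in qualified]


sketch_emails = run_pipeline()
print(sketch_emails)
```

Only "Lead A" reaches the email stage here, because its stub score of 90 is above the threshold while "Lead B" scores 30.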

## Initial Imports

In [4]:
# Warning control
import warnings
warnings.filterwarnings('ignore')

# Load environment variables
from helper import load_env
load_env()

import os
import yaml
from crewai import Agent, Task, Crew

## Load API tokens for our 3rd party APIs

In [5]:
os.environ['OPENAI_MODEL_NAME'] = 'gpt-4o'

## Loading Tasks and Agents YAML files

In [6]:
# Define file paths for YAML configurations
files = {
    'lead_agents': 'config/lead_qualification_agents.yaml',
    'lead_tasks': 'config/lead_qualification_tasks.yaml',
    'email_agents': 'config/email_engagement_agents.yaml',
    'email_tasks': 'config/email_engagement_tasks.yaml'
}

# Load configurations from YAML files
configs = {}
for config_type, file_path in files.items():
    with open(file_path, 'r') as file:
        configs[config_type] = yaml.safe_load(file)

# Assign loaded configurations to specific variables
lead_agents_config = configs['lead_agents']
lead_tasks_config = configs['lead_tasks']
email_agents_config = configs['email_agents']
email_tasks_config = configs['email_tasks']
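
Because `Agent(config=...)` and `Task(config=...)` read their fields from these dictionaries, a missing key tends to surface only when the object is constructed. A small pre-flight check can catch that earlier. The sketch below assumes each agent entry carries `role`, `goal`, and `backstory`, and each task entry carries `description` and `expected_output`; the helper name `find_missing_keys` and the sample dictionary are illustrative and not part of the project.

```python
# Pre-flight check for loaded YAML configs (plain dictionaries).
# The required-key sets are assumptions about what the YAML files contain.

AGENT_KEYS = {"role", "goal", "backstory"}
TASK_KEYS = {"description", "expected_output"}


def find_missing_keys(config, required):
    """Return {entry_name: sorted missing keys} for incomplete entries."""
    problems = {}
    for name, entry in config.items():
        missing = required - set(entry or {})
        if missing:
            problems[name] = sorted(missing)
    return problems


# Illustrative stand-in for a parsed agents YAML file.
sample_agents = {
    "lead_data_agent": {"role": "Lead Data Specialist", "goal": "...", "backstory": "..."},
    "cultural_fit_agent": {"role": "Cultural Fit Analyst", "goal": "..."},
}

print(find_missing_keys(sample_agents, AGENT_KEYS))
# {'cultural_fit_agent': ['backstory']}
```

In the notebook, the same helper could be applied to `lead_agents_config` with `AGENT_KEYS` and to `lead_tasks_config` with `TASK_KEYS` before building any crew.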

## Create Pydantic Models for Structured Output

In [7]:
from pydantic import BaseModel, Field
from typing import Dict, Optional, List, Set, Tuple

# Information about the lead
class LeadPersonalInfo(BaseModel):
    name: str = Field(..., description="The full name of the lead.")
    job_title: str = Field(..., description="The job title of the lead.")
    role_relevance: int = Field(..., ge=0, le=10, description="A score representing how relevant the lead's role is to the decision-making process (0-10).")
    professional_background: Optional[str] = Field(..., description="A brief description of the lead's professional background.")

# Information about the lead's company
class CompanyInfo(BaseModel):
    company_name: str = Field(..., description="The name of the company the lead works for.")
    industry: str = Field(..., description="The industry in which the company operates.")
    company_size: int = Field(..., description="The size of the company in terms of employee count.")
    revenue: Optional[float] = Field(None, description="The annual revenue of the company, if available.")
    market_presence: int = Field(..., ge=0, le=10, description="A score representing the company's market presence (0-10).")

# Information about the lead's score
class LeadScore(BaseModel):
    score: int = Field(..., ge=0, le=100, description="The final score assigned to the lead (0-100).")
    scoring_criteria: List[str] = Field(..., description="The criteria used to determine the lead's score.")
    validation_notes: Optional[str] = Field(None, description="Any notes regarding the validation of the lead score.")

        
class LeadScoringResult(BaseModel):
    personal_info: LeadPersonalInfo = Field(..., description="Personal information about the lead.")
    company_info: CompanyInfo = Field(..., description="Information about the lead's company.")
    lead_score: LeadScore = Field(..., description="The calculated score and related information for the lead.")
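
The `ge` and `le` arguments bound each score, so a value outside the range is rejected when the model is built. The sketch below imitates that behavior with a standard-library dataclass so that it runs without Pydantic installed. `BoundedLeadScore` is a hypothetical stand-in, not one of the models above, and real Pydantic models raise `ValidationError` rather than `ValueError`.

```python
from dataclasses import dataclass, field
from typing import List, Optional


@dataclass
class BoundedLeadScore:
    """Hypothetical stdlib stand-in mirroring LeadScore's 0-100 bound."""
    score: int
    scoring_criteria: List[str] = field(default_factory=list)
    validation_notes: Optional[str] = None

    def __post_init__(self):
        # Same range that Field(..., ge=0, le=100) expresses declaratively
        if not 0 <= self.score <= 100:
            raise ValueError(f"score must be between 0 and 100, got {self.score}")


ok = BoundedLeadScore(score=85, scoring_criteria=["role relevance"])

try:
    BoundedLeadScore(score=120)
    rejected = False
except ValueError:
    rejected = True

print(ok.score, rejected)
```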

## Importing Tools

In [8]:
from crewai_tools import SerperDevTool, ScrapeWebsiteTool

## Lead Qualification Crew, Agents and Tasks

In [9]:
# Create the agents that research the lead:
# data collection, cultural fit, and score validation.

lead_data_agent = Agent(
  # Researches the lead's role and company to judge whether they are a likely buyer
  config=lead_agents_config['lead_data_agent'],
  tools=[SerperDevTool(), ScrapeWebsiteTool()]
)

cultural_fit_agent = Agent(
  # Checks whether the company looks like an ideal customer for what we are selling
  config=lead_agents_config['cultural_fit_agent'],
  tools=[SerperDevTool(), ScrapeWebsiteTool()]
)

scoring_validation_agent = Agent(
  # Produces the final lead score by reviewing all of the collected information
  config=lead_agents_config['scoring_validation_agent'],
  tools=[SerperDevTool(), ScrapeWebsiteTool()]
)

# Creating Tasks
lead_data_task = Task(
  config=lead_tasks_config['lead_data_collection'],
  agent=lead_data_agent
)

cultural_fit_task = Task(
  config=lead_tasks_config['cultural_fit_analysis'],
  agent=cultural_fit_agent
)

scoring_validation_task = Task(
  config=lead_tasks_config['lead_scoring_and_validation'],
  agent=scoring_validation_agent,
  context=[lead_data_task, cultural_fit_task],
  output_pydantic=LeadScoringResult
)



# 1 - Creating the lead scoring crew
lead_scoring_crew = Crew(
  agents=[
    lead_data_agent,
    cultural_fit_agent,
    scoring_validation_agent
  ],
  tasks=[
    lead_data_task,
    cultural_fit_task,
    scoring_validation_task
  ],
  verbose=True
)

## Email Engagement Crew

In [10]:
# Creating Agents
# two agents for this crew and two tasks 
email_content_specialist = Agent(
  config=email_agents_config['email_content_specialist']
)

engagement_strategist = Agent(
  config=email_agents_config['engagement_strategist']
)

# Creating Tasks
# Drafting the email
email_drafting = Task(
  config=email_tasks_config['email_drafting'],
  agent=email_content_specialist
)

# Optimizing the draft so it is engaging for the high-scoring lead
engagement_optimization = Task(
  config=email_tasks_config['engagement_optimization'],
  agent=engagement_strategist
)

# 2 - Creating the email writing crew
email_writing_crew = Crew(
  agents=[
    email_content_specialist,
    engagement_strategist
  ],
  tasks=[
    email_drafting,
    engagement_optimization
  ],
  verbose=True
)



## Creating Complete Sales Flow

In [None]:
from crewai import Flow
from crewai.flow.flow import listen, start

# Flows let us run plain Python code before and after each crew execution,
# so we can complete the automation, inspect what the agents produce,
# and take extra actions where needed.

# The SalesPipeline class inherits from Flow
class SalesPipeline(Flow):
    # Entry point: the first method to be executed
    @start()
    def fetch_leads(self):
        # Pull our leads from the database
        leads = [
            {
                "lead_data": {
                    "name": "João Moura",
                    "job_title": "Director of Engineering",
                    "company": "Clearbit",
                    "email": "joao@clearbit.com",
                    "use_case": "Using AI Agent to do better data enrichment."
                },
            },
        ]
        return leads

    # score_leads runs after fetch_leads has finished
    # and executes the lead scoring crew on the fetched leads
    @listen(fetch_leads)
    # The list returned by fetch_leads is passed in as `leads`
    def score_leads(self, leads):
        # Kick off the lead scoring crew once for each lead
        scores = lead_scoring_crew.kickoff_for_each(leads)
        # Every flow has a state; store the scores there to use them later
        self.state["score_crews_results"] = scores
        return scores
 
 
    # Both store_leads_score and filter_leads listen to score_leads, so they can execute in parallel
    @listen(score_leads)
    def store_leads_score(self, scores):
        # Here we would store the scores in the database
        return scores
   
    @listen(score_leads)
    # Keep only the leads with a score greater than 70
    def filter_leads(self, scores):
        return [score for score in scores if score['lead_score'].score > 70]

    @listen(filter_leads)
    # Listens to filter_leads: leads with a score greater than 70 are passed to the email writing crew
    def write_email(self, leads):
        scored_leads = [lead.to_dict() for lead in leads]
        emails = email_writing_crew.kickoff_for_each(scored_leads)
        return emails

    @listen(write_email)
    # sending the emails 
    def send_email(self, emails):
        # Here we would send the emails to the leads
        return emails

flow = SalesPipeline()
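
The decorators wire the methods into a dependency graph: the method marked `@start()` runs first, and each `@listen(x)` method runs once `x` has finished, receiving its return value. The toy dispatcher below illustrates that idea in plain Python. It is a simplified model written for this explanation, not CrewAI's implementation, and the names `ToyFlow`, `on_start`, and `on_finish` are hypothetical.

```python
# Toy model of start/listen chaining. Not CrewAI's implementation.

class ToyFlow:
    def __init__(self):
        self.state = {}        # shared state, similar in spirit to Flow.state
        self._start = None     # entry-point function
        self._listeners = {}   # upstream function name -> downstream functions

    def on_start(self, func):
        """Register the entry point (plays the role of @start())."""
        self._start = func
        return func

    def on_finish(self, upstream):
        """Register a function to run after `upstream` (plays the role of @listen)."""
        def register(func):
            self._listeners.setdefault(upstream.__name__, []).append(func)
            return func
        return register

    def kickoff(self):
        """Run the start function, then every listener in dependency order."""
        order = [self._start.__name__]
        queue = [(self._start.__name__, self._start(self))]
        while queue:
            upstream, result = queue.pop(0)
            for listener in self._listeners.get(upstream, []):
                order.append(listener.__name__)
                queue.append((listener.__name__, listener(self, result)))
        return order


toy = ToyFlow()


@toy.on_start
def toy_fetch(flow_obj):
    return [40, 75, 90]  # made-up scores standing in for leads


@toy.on_finish(toy_fetch)
def toy_score(flow_obj, leads):
    flow_obj.state["scores"] = leads
    return leads


@toy.on_finish(toy_score)
def toy_filter(flow_obj, scores):
    flow_obj.state["kept"] = [s for s in scores if s > 70]
    return flow_obj.state["kept"]


order = toy.kickoff()
print(order)
print(toy.state["kept"])
```

The toy version runs listeners one after another; it only shows how return values travel from one step to the next and how shared state can be written along the way.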

## Plotting the Flow

In [None]:
# Plot the flow to visualize how its steps connect
flow.plot()

Graph saved as crewai_flow_graph.html


In [13]:
from IPython.display import IFrame

IFrame(src='./crewai_flow.html', width='150%', height=600)

## Flow Kickoff

In [None]:
# Execute the flow asynchronously, which will run all the tasks and agents in the defined order
emails = await flow.kickoff()



[1m[95m# Agent:[00m [1m[92mLead Data Specialist[00m
[95m## Task:[00m [92mCollect and analyze the following information about the lead:
- Personal Information:
  - Name: Obtain the full name of the lead.
  - Job Title: Determine the lead's current job title.
  - Role Relevance: Assess how relevant the lead's role is to the decision-making process on a scale from 0 to 10.
  - Professional Background: Optionally, gather a brief description of the lead's professional background.

- Company Information:
  - Company Name: Identify the name of the company the lead works for.
  - Industry: Determine the industry in which the company operates.
  - Company Size: Estimate the size of the company in terms of employee count.
  - Revenue: If available, collect information on the annual revenue of the company.
  - Market Presence: Evaluate the company's market presence on a scale from 0 to 10.

- Our Company and Product:
  - Company Name: CrewAI
  - Product: Multi-Agent Orchestration Platfor



[1m[95m# Agent:[00m [1m[92mLead Data Specialist[00m
[95m## Thought:[00m [92mThought: I have gathered detailed personal and professional information about João Moura. Next, I need to collect and analyze company-level data for Clearbit, including information about the industry, company size, revenue, and market presence.[00m
[95m## Using tool:[00m [92mSearch the internet[00m
[95m## Tool Input:[00m [92m
"{\"search_query\": \"Clearbit company profile\"}"[00m
[95m## Tool Output:[00m [92m

Search results: Title: Leverage 100+ business data attributes - Clearbit
Link: https://clearbit.com/attributes
Snippet: Enrich your CRM and database, manage leads efficiently, and enable marketing personalization with over 100 business data points.
---
Title: Clearbit 2025 Company Profile: Valuation, Investors, Acquisition
Link: https://pitchbook.com/profiles/company/101066-86
Snippet: Information on acquisition, funding, cap tables, investors, and executives for Clearbit. Use the Pi



[1m[95m# Agent:[00m [1m[92mLead Data Specialist[00m
[95m## Final Answer:[00m [92m
**Personal Information:**
- **Name:** João Moura
- **Job Title:** Senior Engineering Manager at Clearbit
- **Role Relevance:** 9/10 (As a Senior Engineering Manager, João is highly relevant in decision-making processes, particularly in engineering and technical projects.)
- **Professional Background:** João Moura has a strong background in technology, starting programming at 13 and managing multiple engineering teams. He has experience in international markets and has led engineering efforts at Clearbit, focusing on AI and data platforms. Previously, João was CTO at Palpiteros and has participated in global speaking engagements.

**Company Information:**
- **Company Name:** Clearbit
- **Industry:** Software, Technology, Information and Internet, Custom Software & Technical Consulting, Business Intelligence, Data and Analytics, Developer APIs, etc.
- **Company Size:** Approximately 31 employees 



[1m[95m# Agent:[00m [1m[92mCultural Fit Analyst[00m
[95m## Final Answer:[00m [92m
**Cultural Alignment Assessment Report: CrewAI and Clearbit**

**Cultural Fit Score: 8/10**

**Supporting Analysis:**

1. **Cultural Values Alignment:**
   - **Clearbit Values:**
     - Care (Empathy for customers)
     - Craft (Learning and Excellence)
     - Team (Collaboration)
     - Truth (Honest Communication)
     - Initiative (Resourcefulness)
     - Fun (Enjoyment in work)

   Clearbit’s values reflect a strong emphasis on innovation, continuous improvement, teamwork, honest communication, and customer-centric approaches. They stress the importance of understanding customer needs and maintaining high standards both in craft and conduct.

   - **CrewAI Values:**
     - Innovation through AI orchestration
     - Strategic partnerships with enterprises
     - Efficiency in process automation

   CrewAI prioritizes innovation and efficiency, aligning with Clearbit’s values of continuous im



[1m[95m# Agent:[00m [1m[92mLead Scorer and Validator[00m
[95m## Final Answer:[00m [92m
The completed lead score report is as above, detailing the scoring process, criteria used, and validation notes confirming accuracy.[00m






[1m[95m# Agent:[00m [1m[92mEmail Content Writer[00m
[95m## Task:[00m [92mCraft a highly personalized email using the lead's name, job title, company information, and any relevant personal or company achievements. The email should speak directly to the lead's interests and the needs of their company. This is not as cold outreach as it is a follow up to a lead form, so keep it short and to the point. Don't use any salutations or closing remarks, nor too complex sentences.
Our Company and Product: - Company Name: CrewAI - Product: Multi-Agent Orchestration Platform - ICP: Enterprise companies looking into Agentic automation. - Pitch: We are a platform that allows you to orchestrate AI Agents for automations to any vertical.
Use the following information: Personal Info: {'name': 'Jane Doe', 'job_title': 'Marketing Director', 'role_relevance': 8, 'professional_background': 'Jane has over 15 years of experience in the marketing industry, working with various Fortune 500 companies.'}

## Usage Metrics and Costs

Let’s see how much each run of these crews costs, which matters once the pipeline runs at scale.

In [15]:
import pandas as pd

# Convert UsageMetrics instance to a DataFrame
df_usage_metrics = pd.DataFrame([flow.state["score_crews_results"][0].token_usage.dict()])

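# Calculate total costs, assuming a flat rate of $0.150 per 1M tokens
# (adjust the rate to match the pricing of the model set in OPENAI_MODEL_NAME)
costs = 0.150 * df_usage_metrics['total_tokens'].sum() / 1_000_000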
print(f"Total costs: ${costs:.4f}")

# Display the DataFrame
df_usage_metrics

Total costs: $0.0083


|   | total_tokens | prompt_tokens | completion_tokens | successful_requests |
|---|---|---|---|---|
| 0 | 55553 | 51472 | 4081 | 24 |


In [None]:

# How much did it cost to run the email writing crew?


# Convert UsageMetrics instance to a DataFrame
df_usage_metrics = pd.DataFrame([emails[0].token_usage.dict()])

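# Calculate total costs, assuming a flat rate of $0.150 per 1M tokens
# (adjust the rate to match the pricing of the model set in OPENAI_MODEL_NAME)
costs = 0.150 * df_usage_metrics['total_tokens'].sum() / 1_000_000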
print(f"Total costs: ${costs:.4f}")

# Display the DataFrame
df_usage_metrics

Total costs: $0.0003


|   | total_tokens | prompt_tokens | completion_tokens | successful_requests |
|---|---|---|---|---|
| 0 | 1869 | 1435 | 434 | 3 |
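
The calculation above prices every token at the same flat rate, while many providers bill prompt and completion tokens at different rates. The helper below is a hedged sketch of that split calculation. `estimate_cost` is a hypothetical name, and both per-million rates are placeholder assumptions that should be replaced with the current prices for whichever model `OPENAI_MODEL_NAME` points to.

```python
def estimate_cost(prompt_tokens, completion_tokens,
                  prompt_rate_per_million, completion_rate_per_million):
    """Return the estimated cost in USD for one run."""
    prompt_cost = prompt_tokens * prompt_rate_per_million / 1_000_000
    completion_cost = completion_tokens * completion_rate_per_million / 1_000_000
    return prompt_cost + completion_cost


# Token counts come from the lead scoring run reported above.
# The two rates are placeholders, not quoted prices.
scoring_cost = estimate_cost(
    prompt_tokens=51_472,
    completion_tokens=4_081,
    prompt_rate_per_million=0.150,      # assumed USD per 1M prompt tokens
    completion_rate_per_million=0.600,  # assumed USD per 1M completion tokens
)
print(f"Estimated cost: ${scoring_cost:.4f}")
```

Passing the same rate for both arguments reproduces the flat-rate figure, which makes it easy to compare the two approaches.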


## Inspecting Results

In [None]:

# Access the scores stored in the flow state
scores = flow.state["score_crews_results"]

In [None]:
# Convert the Pydantic object to a pandas DataFrame

from IPython.display import display, HTML
lead_scoring_result = scores[0].pydantic

# Create a dictionary with the nested structure flattened
data = {
    'Name': lead_scoring_result.personal_info.name,
    'Job Title': lead_scoring_result.personal_info.job_title,
    'Role Relevance': lead_scoring_result.personal_info.role_relevance,
    'Professional Background': lead_scoring_result.personal_info.professional_background,
    'Company Name': lead_scoring_result.company_info.company_name,
    'Industry': lead_scoring_result.company_info.industry,
    'Company Size': lead_scoring_result.company_info.company_size,
    'Revenue': lead_scoring_result.company_info.revenue,
    'Market Presence': lead_scoring_result.company_info.market_presence,
    'Lead Score': lead_scoring_result.lead_score.score,
    'Scoring Criteria': ', '.join(lead_scoring_result.lead_score.scoring_criteria),
    'Validation Notes': lead_scoring_result.lead_score.validation_notes
}

# Convert the dictionary to a DataFrame
df = pd.DataFrame.from_dict(data, orient='index', columns=['Value'])

# Reset the index to turn the original column names into a regular column
df = df.reset_index()

# Rename the index column to 'Attribute'
df = df.rename(columns={'index': 'Attribute'})

# Create HTML table with bold attributes and left-aligned values
html_table = df.style.set_properties(**{'text-align': 'left'}) \
                     .format({'Attribute': lambda x: f'<b>{x}</b>'}) \
                     .hide(axis='index') \
                     .to_html()

# Display the styled HTML table
display(HTML(html_table))

| Attribute | Value |
|---|---|
| Name | Jane Doe |
| Job Title | Marketing Director |
| Role Relevance | 8 |
| Professional Background | Jane has over 15 years of experience in the marketing industry, working with various Fortune 500 companies. |
| Company Name | Tech Giants Inc. |
| Industry | Technology |
| Company Size | 5000 |
| Revenue | |
| Market Presence | 9 |
| Lead Score | 85 |


## Results

In [None]:
import textwrap
# Print only the final result (the email) from the email writing crew
result_text = emails[0].raw
wrapped_text = textwrap.fill(result_text, width=80)
print(wrapped_text)

# The output still contains bracketed placeholders and/or links that need to be added, removed, or corrected

Jane,    With your extensive background leading marketing initiatives at Tech
Giants Inc., you're no stranger to shaping the future of marketing in the tech
industry. CrewAI’s Multi-Agent Orchestration Platform empowers your team to
automate complex tasks across various verticals, enhancing strategic decision-
making and efficiency.    Imagine setting a new benchmark for operational
excellence by leveraging AI for automation. Are you ready to explore how our
solution can align with your strategic objectives?    Book a demo today to see
the innovative power of CrewAI in action and discover the potential to
revolutionize your operations. [Insert Link to Book a Demo]    Let's set the
pace for the future together.
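
Drafts like the one above can still contain bracketed placeholders such as `[Insert Link to Book a Demo]`. One way to catch them before the send step is a simple pattern check. This is a minimal sketch: `find_placeholders` is a hypothetical helper, and the regular expression assumes placeholders always appear inside square brackets.

```python
import re

# Matches any non-empty run of text enclosed in square brackets.
PLACEHOLDER_PATTERN = re.compile(r"\[([^\[\]]+)\]")


def find_placeholders(text):
    """Return the bracketed placeholder strings found in an email draft."""
    return PLACEHOLDER_PATTERN.findall(text)


draft = "Book a demo today. [Insert Link to Book a Demo] Let's set the pace together."
leftovers = find_placeholders(draft)
print(leftovers)
# ['Insert Link to Book a Demo']
```

A non-empty result could be used to hold the email back for manual review instead of sending it.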


## How Complex Can it Get?

In [None]:
from crewai import Flow
# and_, or_ and router are used to create complex conditions for the flow
from crewai.flow.flow import listen, start, and_, or_, router


class SalesPipeline(Flow):
    
  @start()
  def fetch_leads(self):
    # Pull our leads from the database
    # This is a mock, in a real-world scenario, this is where you would
    # fetch leads from a database
    leads = [
      {
        "lead_data": {
          "name": "João Moura",
          "job_title": "Director of Engineering",
          "company": "Clearbit",
          "email": "joao@clearbit.com",
          "use_case": "Using AI Agent to do better data enrichment."
        },
      },
    ]
    return leads

  @listen(fetch_leads)
  def score_leads(self, leads):
    scores = lead_scoring_crew.kickoff_for_each(leads)
    self.state["score_crews_results"] = scores
    return scores

  @listen(score_leads)
  def store_leads_score(self, scores):
    # Here we would store the scores in the database
    return scores

  @listen(score_leads)
  def filter_leads(self, scores):
    return [score for score in scores if score['lead_score'].score > 70]

  # Use and_ to wait for both functions to finish before executing
  @listen(and_(filter_leads, store_leads_score))
  def log_leads(self, leads):
    print(f"Leads: {leads}")

  # router lets the flow branch into multiple paths: it runs after filter_leads,
  # and the label it returns ('high', 'medium', or 'low') selects which listener executes

  @router(filter_leads, paths=["high", "medium", "low"])
  # Choose the path based on how many leads passed the filter
  def count_leads(self, scores):
    if len(scores) > 10:
      return 'high'
    elif len(scores) > 5:
      return 'medium'
    else:
      return 'low'

  # If count_leads returns 'high', store the leads in Salesforce
  @listen('high')
  def store_in_salesforce(self, leads):
    return leads
  # If count_leads returns 'medium', send the leads to the direct sales team
  @listen('medium')
  def send_to_sales_team(self, leads):
    return leads

  # If count_leads returns 'low', write the emails with the crew, the same way we did before
  @listen('low')
  def write_email(self, leads):
    # Write an email for each remaining lead; sending happens in send_email
    scored_leads = [lead.to_dict() for lead in leads]
    emails = email_writing_crew.kickoff_for_each(scored_leads)
    return emails

  @listen(write_email)
  def send_email(self, emails):
    # Here we would send the emails to the leads
    return emails
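
The routing decision in `count_leads` depends only on how many leads survive the filter, so it can be exercised on its own without running any crew. The sketch below copies those thresholds into a standalone function. `route_for` and the `HANDLERS` table are hypothetical names used for illustration, and the handlers simply return text instead of calling Salesforce, a sales team, or the email crew.

```python
def route_for(num_leads):
    """Mirror the count_leads thresholds: >10 is high, >5 is medium, else low."""
    if num_leads > 10:
        return "high"
    elif num_leads > 5:
        return "medium"
    return "low"


# Hypothetical handlers standing in for the methods that listen to each label.
HANDLERS = {
    "high": lambda batch: f"stored {len(batch)} leads in the CRM",
    "medium": lambda batch: f"sent {len(batch)} leads to the sales team",
    "low": lambda batch: f"wrote {len(batch)} emails",
}

for count in (1, 6, 11):
    batch = [f"lead-{i}" for i in range(count)]
    label = route_for(len(batch))
    print(count, label, HANDLERS[label](batch))
```

Checking the boundary values (5, 6, 10, 11) this way confirms which path a given batch size takes before the full flow is run.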

## Plotting the Flow

In [21]:
flow = SalesPipeline()
flow.plot()

Graph saved as crewai_flow_graph.html


In [22]:
from IPython.display import IFrame

IFrame(src='./crewai_flow_complex.html', width='150%', height=600)