In [1]:
from crewai import Crew, Task, Agent
from dotenv import load_dotenv
import os
import yaml

load_dotenv()

openai_api_key = os.getenv("OPENAI_API_KEY")
serper_api_key = os.getenv("SERPER_API_KEY")
os.environ["OPENAI_MODEL_NAME"] = 'gpt-4o-mini'

* 'fields' has been removed


In [2]:
files = {
    'lead_qualification_agents':'9_agentic_sale_pipeline/lead_qualification/agents.yaml',
    'lead_qualification_tasks':'9_agentic_sale_pipeline/lead_qualification/tasks.yaml',
    'email_engagement_agents':'9_agentic_sale_pipeline/email_engagement/agents.yaml',
    'email_engagement_tasks':'9_agentic_sale_pipeline/email_engagement/tasks.yaml'
}


with open(files['lead_qualification_agents'], 'r') as f:
    lead_qualification_agents = yaml.safe_load(f)

with open(files['lead_qualification_tasks'], 'r') as f:
    lead_qualification_tasks = yaml.safe_load(f)

with open(files['email_engagement_agents'], 'r') as f:
    email_engagement_agents = yaml.safe_load(f)

with open(files['email_engagement_tasks'], 'r') as f:
    email_engagement_tasks = yaml.safe_load(f)

In [11]:
from pprint import pprint
pprint(lead_qualification_tasks)

{'cultural_fit_analysis': {'description': 'Assess the cultural alignment '
                                          "between the lead's company and our "
                                          'organization by considering the '
                                          'following:\n'
                                          '  - Cultural Values: Analyze the '
                                          "company's publicly stated values "
                                          'and internal culture (e.g., '
                                          'innovation, sustainability, '
                                          'employee engagement).\n'
                                          '  - Strategic Alignment: Evaluate '
                                          "how well the company's goals and "
                                          'mission align with our '
                                          "organization's strategic "
                                          'obj

In [3]:
from pydantic import BaseModel, Field
from typing import Dict, Optional, List, Set, Tuple

class LeadPersonalInfo(BaseModel):
    name: str = Field(..., description="The fullname of the lead.")
    job_title: str = Field(..., description="The job title of the lead.")
    role_relevance: int = Field(..., ge=0, le=10, description="A score representing how relevant is the lead's role to the decision making process (0-10).")
    professional_background: Optional[str] = Field(..., description="A brief description of the lead's professional background.")

class CompanyInfo(BaseModel):
    company_name: str = Field(..., description="The name of the lead's company.")
    industry: str = Field(..., description="The industry of the lead's company in which it operates.")
    company_size: int = Field(..., description="The size of the lead's company in terms of number of employees.")
    revenue: Optional[float] = Field(..., description="The revenue of the lead's company in USD if available.")
    market_presence: int = Field(..., ge=0, le=10, description="A score representing the lead's company's market presence (0-10).")

class LeadScore(BaseModel):
    score: int = Field(..., ge=0, le=100, description="The score assigned to the lead(0-100).")
    scoring_criteria: List[str] = Field(..., description="The criteria used to assign the score to the lead.")
    validation_comments: Optional[str] = Field(None, description="Additional comments or observations that support the lead score.")

class LeadScoringResult(BaseModel):
    personal_info: LeadPersonalInfo = Field(..., description="Personal information about the lead.")
    company_info: CompanyInfo = Field(..., description="Company information about the lead's company.")
    lead_score: LeadScore = Field(..., description="The score assigned to the lead.")


In [4]:
from crewai_tools import SerperDevTool, ScrapeWebsiteTool

serper_tool = SerperDevTool()
scrape_tool = ScrapeWebsiteTool()

## Lead Qualification Crew

In [12]:
#Creating Agents
lead_data_agent = Agent(
    config=lead_qualification_agents['lead_data_agent'],
    tools=[serper_tool, scrape_tool]
)

cultural_fit_agent = Agent(
    config=lead_qualification_agents['cultural_fit_agent'],
    tools=[serper_tool, scrape_tool]
)

scoring_validation_agent = Agent(
    config=lead_qualification_agents['scoring_validation_agent'],
    tools=[serper_tool, scrape_tool]
)

#Creating Tasks
lead_data_task = Task(
    config=lead_qualification_tasks['lead_data_collection'],
    agent=lead_data_agent
)

cultural_fit_task = Task(
    config=lead_qualification_tasks['cultural_fit_analysis'],
    agent=cultural_fit_agent
)

scoring_validation_task = Task(
    config=lead_qualification_tasks['lead_scoring_and_validation'],
    agent=scoring_validation_agent,
    context=[lead_data_task, cultural_fit_task],
    output_pydantic=LeadScoringResult
)

#Creating Crew
lead_qualification_crew = Crew(
    agents=[lead_data_agent, cultural_fit_agent, scoring_validation_agent],
    tasks=[lead_data_task, cultural_fit_task, scoring_validation_task],
    verbose=True
)




## Email Engagement Crew

In [13]:
#Creating Agents
email_content_specialist = Agent(
    config=email_engagement_agents['email_content_sepcialist'],
 
)

engagement_strategist = Agent(
    config=email_engagement_agents['engagement_strategist'],

)


#Creating Tasks
email_drafting_task = Task(
    config=email_engagement_tasks['email_drafting'],
    agent=email_content_specialist
)

engagement_optimization_task = Task(
    config=email_engagement_tasks['engagement_optimization'],
    agent=engagement_strategist
)

#Creating Crew
email_engagement_crew = Crew(
    agents=[email_content_specialist, engagement_strategist],
    tasks=[email_drafting_task, engagement_optimization_task],
    verbose=True
)





## Flow

In [34]:
from crewai import Flow
from crewai.flow.flow import listen, start

class SalesPipelineFlow(Flow):
    @start()
    def fetch_leads(self):
        #Pull our leads from the database, here we'll use a mock data
        leads = [
            {
                "lead_data":{
                    "name" : "Ryan Merritt",
                    "job title" : "CPO",
                    "company" : "Deep Sync",
                    "email" : "ryan@deepsync.com",
                    "use_case" : "AI-powered agentic automation enablement"
                }
            }
        ]
        return leads
    
    @listen(fetch_leads)
    def score_leads(self, leads):
        scores = lead_qualification_crew.kickoff_for_each(leads)
        self.state["score_crew_results"] = scores
        return scores
    
    @listen(score_leads)
    def store_leads_score(self, scores):
        #Code to store the scroes in the database
        return scores
    
    @listen(score_leads)
    def filter_leads(self, scores):
        return [score for score in scores if score['lead_score'].score > 70]

    @listen(filter_leads)
    def write_email(self, filtered_leads):
        leads = [filtered_lead.to_dict() for filtered_lead in filtered_leads]
        emails = email_engagement_crew.kickoff_for_each(leads)
        return emails

    @listen(write_email)
    def send_emails(self, emails):
        #Code to send emails to the leads
        return emails
    
flow = SalesPipelineFlow()



In [28]:
flow.plot()

Plot saved as crewai_flow.html


In [40]:
emails =  flow.kickoff()



[1m[95m# Agent:[00m [1m[92mLead Data Specialist[00m
[95m## Task:[00m [92mCollect and analyze the following information about the lead:
- Personal Information:
  - Name: Obtain the full name of the lead.
  - Job Title: Determine the lead's current job title.
  - Role Relevance: Assess how relevant the lead's role is to the decision-making process on a scale from 0 to 10.
  - Professional Background: Optionally, gather a brief description of the lead's professional background.

- Company Information:
  - Company Name: Identify the name of the company the lead works for.
  - Industry: Determine the industry in which the company operates.
  - Company Size: Estimate the size of the company in terms of employee count.
  - Revenue: If available, collect information on the annual revenue of the company.
  - Market Presence: Evaluate the company's market presence on a scale from 0 to 10.

- Our Company and Product:
  - Company Name: CrewAI
  - Product: Multi-Agent Orchestration Platfor

  u.setup()




[1m[95m# Agent:[00m [1m[92mLead Data Specialist[00m
[95m## Thought:[00m [92mThought: I have found several relevant links, particularly to LinkedIn and the company website. Now, I will read the website content to gather detailed information about Ryan Merritt's background and role at Deep Sync.[00m
[95m## Using tool:[00m [92mRead website content[00m
[95m## Tool Input:[00m [92m
"{\"website_url\": \"https://www.linkedin.com/in/ryan-merritt-2522baab\"}"[00m
[95m## Tool Output:[00m [92m

Ryan Merritt - Deep Sync | LinkedIn
Skip to main content
LinkedIn
Articles
People
Learning
Jobs
Games
Get the app
Join now
Sign in
Sign in to view Ryan’s full profile
Sign in
Welcome back
Email or phone
Password
Show
Forgot password?
Sign in
or
By clicking Continue to join or sign in, you agree to LinkedIn’s User Agreement , Privacy Policy , and Cookie Policy .
New to LinkedIn? Join now
or
New to LinkedIn? Join now
By clicking Continue to join or sign in, you agree to LinkedIn’s User 



[1m[95m# Agent:[00m [1m[92mEmail Content Writer[00m
[95m## Task:[00m [92mCraft a highly personalized email using the lead's name, job title, company information, and any relevant personal or company achievements. The email should speak directly to the lead's interests and the needs of their company. This is not as cold outreach as it is a follow up to a lead form, so keep it short and to the point. Don't use any salutations or closing remarks, nor too complex sentences.
Our Company and Product: - Company Name: CrewAI - Product: Multi-Agent Orchestration Platform - ICP: Enterprise companies looking into Agentic automation. - Pitch: We are a platform that allows you to orchestrate AI Agents for automations to any vertical.
Use the following information: Personal Info: {'name': 'Ryan Merritt', 'job_title': 'CPO', 'role_relevance': 9, 'professional_background': 'Ryan has extensive experience in product management, having led multi-million dollar products and teams. He has been com

In [52]:
import pandas as pd
from IPython.display import display, HTML

lead_scoring_result = scores[0].pydantic

# Create a dictionary with the nested structure flattened
data = {
    'Name': lead_scoring_result.personal_info.name,
    'Job Title': lead_scoring_result.personal_info.job_title,
    'Role Relevance': lead_scoring_result.personal_info.role_relevance,
    'Professional Background': lead_scoring_result.personal_info.professional_background,
    'Company Name': lead_scoring_result.company_info.company_name,
    'Industry': lead_scoring_result.company_info.industry,
    'Company Size': lead_scoring_result.company_info.company_size,
    'Revenue': lead_scoring_result.company_info.revenue,
    'Market Presence': lead_scoring_result.company_info.market_presence,
    'Lead Score': lead_scoring_result.lead_score.score,
    'Scoring Criteria': ', '.join(lead_scoring_result.lead_score.scoring_criteria),
    'Validation Comments': lead_scoring_result.lead_score.validation_comments
}

# Convert the dictionary to a DataFrame
df = pd.DataFrame.from_dict(data, orient='index', columns=['Value'])

# Reset the index to turn the original column names into a regular column
df = df.reset_index()

# Rename the index column to 'Attribute'
df = df.rename(columns={'index': 'Attribute'})

# Create HTML table with bold attributes and left-aligned values
html_table = df.style.set_properties(**{'text-align': 'left'}) \
                     .format({'Attribute': lambda x: f'<b>{x}</b>'}) \
                     .hide(axis='index') \
                     .to_html()

# Display the styled HTML table
display(HTML(html_table))

Attribute,Value
Name,Ryan Merritt
Job Title,CPO
Role Relevance,9
Professional Background,"Ryan Merritt has over 5 years of experience in product management, previously working at companies such as Redfin and LiveRamp. He has a strong technical background with experience in business applications and engineering gained from early roles at Tesla Motors and FiveStars. He is known for his empathetic approach toward stakeholders and was pivotal in leading multi-million dollar products."
Company Name,Deep Sync
Industry,Marketing Services
Company Size,150
Revenue,
Market Presence,7
Lead Score,85


In [50]:
import textwrap

result_text = emails[0].raw
wrapped_text = textwrap.fill(result_text, width=80)
print(wrapped_text)

Ryan, unlock the full potential of your product management expertise with
CrewAI’s Multi-Agent Orchestration Platform. Imagine automating your business
intelligence processes seamlessly, tailored to your unique workflows. Maximize
your team’s efficiency and drive deeper impact in the BI sector.   Ready to
discuss how we can enhance your offerings? Schedule a quick 15-minute meeting
[here](#) to explore the possibilities. Don't miss out on this opportunity to
elevate your capabilities. Alternatively, reply to this email with your
availability, and let’s connect!   Take action now and transform Deep Sync's
automation strategy today!


## Usage

In [56]:
import pandas as pd

# Convert UsageMetrics instance to a DataFrame
df_usage_metrics = pd.DataFrame([flow.state["score_crew_results"][0].token_usage.dict()])

# Calculate total costs
costs = 0.150 * df_usage_metrics['total_tokens'].sum() / 1_000_000
print(f"Total costs: ${costs:.4f}")

# Display the DataFrame
df_usage_metrics

Total costs: $0.0086


/var/folders/3g/r3ht76p907vdqxh325xx85tm0000ks/T/ipykernel_80003/2196624912.py:4: PydanticDeprecatedSince20: The `dict` method is deprecated; use `model_dump` instead. Deprecated in Pydantic V2.0 to be removed in V3.0. See Pydantic V2 Migration Guide at https://errors.pydantic.dev/2.10/migration/
  df_usage_metrics = pd.DataFrame([flow.state["score_crew_results"][0].token_usage.dict()])


Unnamed: 0,total_tokens,prompt_tokens,cached_prompt_tokens,completion_tokens,successful_requests
0,57497,54636,34048,2861,17


In [57]:
import pandas as pd

# Convert UsageMetrics instance to a DataFrame
df_usage_metrics = pd.DataFrame([emails[0].token_usage.dict()])

# Calculate total costs
costs = 0.150 * df_usage_metrics['total_tokens'].sum() / 1_000_000
print(f"Total costs: ${costs:.4f}")

# Display the DataFrame
df_usage_metrics

Total costs: $0.0002


/var/folders/3g/r3ht76p907vdqxh325xx85tm0000ks/T/ipykernel_80003/99626748.py:4: PydanticDeprecatedSince20: The `dict` method is deprecated; use `model_dump` instead. Deprecated in Pydantic V2.0 to be removed in V3.0. See Pydantic V2 Migration Guide at https://errors.pydantic.dev/2.10/migration/
  df_usage_metrics = pd.DataFrame([emails[0].token_usage.dict()])


Unnamed: 0,total_tokens,prompt_tokens,cached_prompt_tokens,completion_tokens,successful_requests
0,1347,1078,0,269,2


## More Complex Flow

In [59]:
from crewai import Flow
from crewai.flow.flow import listen, start, and_, or_, router

class SalesPipelineFlow(Flow):
    @start()
    def fetch_leads(self):
        #Pull our leads from the database, here we'll use a mock data
        leads = [
            {
                "lead_data":{
                    "name" : "Ryan Merritt",
                    "job_title" : "CPO",
                    "company" : "Deep Sync",
                    "email" : "ryan@deepsync.com",
                    "use_case" : "AI-powered agentic automation enablement"
                }
            }
        ]
        return leads
    
    @listen(fetch_leads)
    def score_leads(self, leads):
        scores = lead_qualification_crew.kickoff_for_each(leads)
        self.state["score_crew_results"] = scores
        return scores
    
    @listen(score_leads)
    def store_leads_score(self, scores):
        #Code to store the scroes in the database
        return scores
    
    @listen(score_leads)
    def filter_leads(self, scores):
        return [score for score in scores if score['lead_score'].score > 70]

    @listen(and_(filter_leads, store_leads_score))
    def log_leads(self, filtered_leads):
        print(f"Filtered leads: {filtered_leads}")
        
    @router(filter_leads)
    def count_leads(self, scores):
        if len(scores) > 10:
            return "high"
        elif len(scores) > 5:
            return "medium"
        else:
            return "low"
        
    @listen('high')
    def store_in_salesforce(self, leads):
        return leads
    
    @listen('medium')
    def send_to_sales_rep(self, leads):
        return leads
    
    @listen('low')
    def write_email(self, leads):
        leads = [lead.to_dict() for lead in leads]
        emails=email_engagement_crew.kickoff_for_each(leads)
        return emails
    
    def send_email(self, emails):
        return emails
    
flow = SalesPipelineFlow()


In [60]:
flow.plot()


Plot saved as crewai_flow.html
