# L3: Agentic Sales Pipeline

<p style="background-color:#fff6e4; padding:15px; border-width:3px; border-color:#f5ecda; border-style:solid; border-radius:6px"> ⏳ <b>Note <code>(Kernel Starting)</code>:</b> This notebook takes about 30 seconds to be ready to use. You may start and watch the video while you wait.</p>

## Initial Imports

In [1]:
# Warning control
import warnings
warnings.filterwarnings('ignore')

# Load environment variables
from dotenv import load_dotenv
load_dotenv("/home/jovyan/Albert/.env")

import os
import yaml
from crewai import Agent, Task, Crew

In [2]:
from crewai_tools import SerperDevTool, ScrapeWebsiteTool
from pydantic import BaseModel, Field
from typing import Dict, Optional, List, Set, Tuple

In [3]:
from crewai import Flow
from crewai.flow.flow import listen, start

<p style="background-color:#fff6ff; padding:15px; border-width:3px; border-color:#efe6ef; border-style:solid; border-radius:6px"> 💻 &nbsp; <b>Access <code>requirements.txt</code> and <code>helper.py</code> files:</b> 1) click on the <em>"File"</em> option on the top menu of the notebook and then 2) click on <em>"Open"</em>. For more help, please see the <em>"Appendix - Tips and Help"</em> Lesson.</p>

## Load API tokens for our 3rd party APIs

In [4]:
os.environ['OPENAI_MODEL_NAME'] = 'gpt-4o-mini'
#os.environ['OPENAI_MODEL_NAME'] = 'gpt-4o-0806'

In [5]:
os.

SyntaxError: invalid syntax (1690411367.py, line 1)

In [None]:
 llm = ChatOpenAI(model="gpt-4o-0806", temperature=0)

In [None]:
OPENAI_API_KEY

## Loading Tasks and Agents YAML files

In [6]:
# Define file paths for YAML configurations
files = {
    'lead_agents': '/home/jovyan/Dongping/crewai/L3/config/lead_qualification_agents.yaml',
    'lead_tasks': '/home/jovyan/Dongping/crewai/L3/config/lead_qualification_tasks.yaml',
    'email_agents': '/home/jovyan/Dongping/crewai/L3/config/email_engagement_agents.yaml',
    'email_tasks': '/home/jovyan/Dongping/crewai/L3/config/email_engagement_tasks.yaml'
}

# Load configurations from YAML files
configs = {}
for config_type, file_path in files.items():
    with open(file_path, 'r') as file:
        configs[config_type] = yaml.safe_load(file)

# Assign loaded configurations to specific variables
lead_agents_config = configs['lead_agents']
lead_tasks_config = configs['lead_tasks']
email_agents_config = configs['email_agents']
email_tasks_config = configs['email_tasks']

In [7]:
configs

{'lead_agents': {'lead_data_agent': {'role': 'Lead Data Specialist\n',
   'goal': 'Collect and analyze personal and company-level data for each lead.\n',
   'backstory': 'You are a data specialist with a keen eye for detail, responsible for gathering and analyzing both personal and company data to determine the potential of each lead.\n',
   'verbose': True,
   'allow_delegation': False},
  'cultural_fit_agent': {'role': 'Cultural Fit Analyst\n',
   'goal': "Assess the cultural alignment between the lead's company and our organization, and evaluate the overall fit.\n",
   'backstory': 'You specialize in understanding company cultures and values, ensuring that partnerships are not only strategic but also culturally aligned for long-term success.\n',
   'verbose': True,
   'allow_delegation': False},
  'scoring_validation_agent': {'role': 'Lead Scorer and Validator\n',
   'goal': 'Aggregate data, assign a lead score based on predefined criteria, and validate the scoring accuracy.\n',
   

## Create Pydantic Models for Structured Output

In [8]:
from pydantic import BaseModel, Field
from typing import Dict, Optional, List, Set, Tuple

class LeadPersonalInfo(BaseModel):
    name: str = Field(..., description="The full name of the lead.")
    job_title: str = Field(..., description="The job title of the lead.")
    role_relevance: int = Field(..., ge=0, le=10, description="A score representing how relevant the lead's role is to the decision-making process (0-10).")
    professional_background: Optional[str] = Field(..., description="A brief description of the lead's professional background.")

class CompanyInfo(BaseModel):
    company_name: str = Field(..., description="The name of the company the lead works for.")
    industry: str = Field(..., description="The industry in which the company operates.")
    company_size: int = Field(..., description="The size of the company in terms of employee count.")
    revenue: Optional[float] = Field(None, description="The annual revenue of the company, if available.")
    market_presence: int = Field(..., ge=0, le=10, description="A score representing the company's market presence (0-10).")

class LeadScore(BaseModel):
    score: int = Field(..., ge=0, le=100, description="The final score assigned to the lead (0-100).")
    scoring_criteria: List[str] = Field(..., description="The criteria used to determine the lead's score.")
    validation_notes: Optional[str] = Field(None, description="Any notes regarding the validation of the lead score.")

class LeadScoringResult(BaseModel):
    personal_info: LeadPersonalInfo = Field(..., description="Personal information about the lead.")
    company_info: CompanyInfo = Field(..., description="Information about the lead's company.")
    lead_score: LeadScore = Field(..., description="The calculated score and related information for the lead.")

## Importing Tools

## Lead Qualification Crew, Agents and Tasks

In [9]:
# Creating Agents
lead_data_agent = Agent(
  config=lead_agents_config['lead_data_agent'],
  tools=[SerperDevTool(), ScrapeWebsiteTool()]
)

cultural_fit_agent = Agent(
  config=lead_agents_config['cultural_fit_agent'],
  tools=[SerperDevTool(), ScrapeWebsiteTool()]
)

scoring_validation_agent = Agent(
  config=lead_agents_config['scoring_validation_agent'],
  tools=[SerperDevTool(), ScrapeWebsiteTool()]
)

# Creating Tasks
lead_data_task = Task(
  config=lead_tasks_config['lead_data_collection'],
  agent=lead_data_agent
)

cultural_fit_task = Task(
  config=lead_tasks_config['cultural_fit_analysis'],
  agent=cultural_fit_agent
)

scoring_validation_task = Task(
  config=lead_tasks_config['lead_scoring_and_validation'],
  agent=scoring_validation_agent,
  context=[lead_data_task, cultural_fit_task],
  output_pydantic=LeadScoringResult
)

# Creating Crew
lead_scoring_crew = Crew(
  agents=[
    lead_data_agent,
    cultural_fit_agent,
    scoring_validation_agent
  ],
  tasks=[
    lead_data_task,
    cultural_fit_task,
    scoring_validation_task
  ],
  verbose=True
)

In [10]:
lead_scoring_crew, lead_scoring_crew.agents, lead_scoring_crew.tasks

(Crew(id=9a9f3ecc-c144-4b03-94fb-cb875c0b3290, process=Process.sequential, number_of_agents=3, number_of_tasks=3),
 [Agent(role=Lead Data Specialist
  , goal=Collect and analyze personal and company-level data for each lead.
  , backstory=You are a data specialist with a keen eye for detail, responsible for gathering and analyzing both personal and company data to determine the potential of each lead.
  ),
  Agent(role=Cultural Fit Analyst
  , goal=Assess the cultural alignment between the lead's company and our organization, and evaluate the overall fit.
  , backstory=You specialize in understanding company cultures and values, ensuring that partnerships are not only strategic but also culturally aligned for long-term success.
  ),
  Agent(role=Lead Scorer and Validator
  , goal=Aggregate data, assign a lead score based on predefined criteria, and validate the scoring accuracy.
  , backstory=You are an expert in evaluating lead quality through data-driven analysis and ensuring the sco

## Email Engagement Crew

In [11]:
# Creating Agents
email_content_specialist = Agent(
  config=email_agents_config['email_content_specialist']
)

engagement_strategist = Agent(
  config=email_agents_config['engagement_strategist']
)

# Creating Tasks
email_drafting = Task(
  config=email_tasks_config['email_drafting'],
  agent=email_content_specialist
)

engagement_optimization = Task(
  config=email_tasks_config['engagement_optimization'],
  agent=engagement_strategist
)

# Creating Crew
email_writing_crew = Crew(
  agents=[
    email_content_specialist,
    engagement_strategist
  ],
  tasks=[
    email_drafting,
    engagement_optimization
  ],
  verbose=True
)



## Creating Complete Sales Flow

In [12]:
from crewai import Flow
from crewai.flow.flow import listen, start

class SalesPipeline(Flow):
    @start()
    def fetch_leads(self):
        # Pull our leads from the database
        leads = [
            {
                "lead_data": {
                    "name": "João Moura",
                    "job_title": "Director of Engineering",
                    "company": "Clearbit",
                    "email": "joao@clearbit.com",
                    "use_case": "Using AI Agent to do better data enrichment."
                },
            },
        ]
        return leads

    @listen(fetch_leads)
    def score_leads(self, leads):
        scores = lead_scoring_crew.kickoff_for_each(leads)
        self.state["score_crews_results"] = scores
        return scores

    @listen(score_leads)
    def store_leads_score(self, scores):
        # Here we would store the scores in the database
        return scores

    @listen(score_leads)
    def filter_leads(self, scores):
        return [score for score in scores if score['lead_score'].score > 70]

    @listen(filter_leads)
    def write_email(self, leads):
        scored_leads = [lead.to_dict() for lead in leads]
        emails = email_writing_crew.kickoff_for_each(scored_leads)
        return emails

    @listen(write_email)
    def send_email(self, emails):
        # Here we would send the emails to the leads
        return emails

flow = SalesPipeline()

## Plotting the Flow

In [None]:
flow.plot()

In [None]:
from IPython.display import IFrame

IFrame(src='./crewai_flow.html', width='150%', height=600)

## Flow Kickoff

In [14]:
import nest_asyncio
nest_asyncio.apply()

In [None]:
#emails = await flow.kickoff()

In [15]:
emails = flow.kickoff()



[1m[95m# Agent:[00m [1m[92mLead Data Specialist[00m
[95m## Task:[00m [92mCollect and analyze the following information about the lead:
- Personal Information:
  - Name: Obtain the full name of the lead.
  - Job Title: Determine the lead's current job title.
  - Role Relevance: Assess how relevant the lead's role is to the decision-making process on a scale from 0 to 10.
  - Professional Background: Optionally, gather a brief description of the lead's professional background.

- Company Information:
  - Company Name: Identify the name of the company the lead works for.
  - Industry: Determine the industry in which the company operates.
  - Company Size: Estimate the size of the company in terms of employee count.
  - Revenue: If available, collect information on the annual revenue of the company.
  - Market Presence: Evaluate the company's market presence on a scale from 0 to 10.

- Our Company and Product:
  - Company Name: CrewAI
  - Product: Multi-Agent Orchestration Platfor

2024-11-26 23:14:18,144 - 139977248014400 - llm.py-llm:170 - ERROR: LiteLLM call failed: litellm.APIError: APIError: OpenAIException - Invalid response object Traceback (most recent call last):
  File "/home/jovyan/envs/llm_agent_crewai/lib/python3.11/site-packages/litellm/litellm_core_utils/llm_response_utils/convert_dict_to_response.py", line 380, in convert_to_model_response_object
    assert response_object["choices"] is not None and isinstance(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
AssertionError


received_args={'response_object': {'id': None, 'choices': None, 'created': None, 'model': None, 'object': None, 'service_tier': None, 'system_fingerprint': None, 'usage': None, 'llm-gateway-requestid': '496bc8ce8a94419fa91f8e6311b3e891', 'responseCode': 501, 'responseDesc': 'LLM Request Failed', 'Error': "Error code: 404 - {'error': {'code': 'DeploymentNotFound', 'message': 'The API deployment for this resource does not exist. If you created the deployment wi



LiteLLM.Info: If you need to debug this error, use `litellm.set_verbose=True'.

[1m[95m# Agent:[00m [1m[92mLead Data Specialist[00m
[95m## Task:[00m [92mCollect and analyze the following information about the lead:
- Personal Information:
  - Name: Obtain the full name of the lead.
  - Job Title: Determine the lead's current job title.
  - Role Relevance: Assess how relevant the lead's role is to the decision-making process on a scale from 0 to 10.
  - Professional Background: Optionally, gather a brief description of the lead's professional background.

- Company Information:
  - Company Name: Identify the name of the company the lead works for.
  - Industry: Determine the industry in which the company operates.
  - Company Size: Estimate the size of the company in terms of employee count.
  - Revenue: If available, collect information on the annual revenue of the company.
  - Market Presence: Evaluate the company's market presence on a scale from 0 to 10.

- Our Company an

2024-11-26 23:14:18,377 - 139977248014400 - llm.py-llm:170 - ERROR: LiteLLM call failed: litellm.APIError: APIError: OpenAIException - Invalid response object Traceback (most recent call last):
  File "/home/jovyan/envs/llm_agent_crewai/lib/python3.11/site-packages/litellm/litellm_core_utils/llm_response_utils/convert_dict_to_response.py", line 380, in convert_to_model_response_object
    assert response_object["choices"] is not None and isinstance(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
AssertionError

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/jovyan/envs/llm_agent_crewai/lib/python3.11/site-packages/litellm/llms/OpenAI/openai.py", line 860, in completion
    raise e
  File "/home/jovyan/envs/llm_agent_crewai/lib/python3.11/site-packages/litellm/llms/OpenAI/openai.py", line 811, in completion
    return convert_to_model_response_object(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  



LiteLLM.Info: If you need to debug this error, use `litellm.set_verbose=True'.

[Flow._execute_single_listener] Error in method score_leads: litellm.APIError: APIError: OpenAIException - Invalid response object Traceback (most recent call last):
  File "/home/jovyan/envs/llm_agent_crewai/lib/python3.11/site-packages/litellm/litellm_core_utils/llm_response_utils/convert_dict_to_response.py", line 380, in convert_to_model_response_object
    assert response_object["choices"] is not None and isinstance(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
AssertionError

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/jovyan/envs/llm_agent_crewai/lib/python3.11/site-packages/litellm/llms/OpenAI/openai.py", line 860, in completion
    raise e
  File "/home/jovyan/envs/llm_agent_crewai/lib/python3.11/site-packages/litellm/llms/OpenAI/openai.py", line 811, in completion
    return convert_to_model_response_o

Traceback (most recent call last):
  File "/home/jovyan/envs/llm_agent_crewai/lib/python3.11/site-packages/litellm/litellm_core_utils/llm_response_utils/convert_dict_to_response.py", line 380, in convert_to_model_response_object
    assert response_object["choices"] is not None and isinstance(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
AssertionError

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/jovyan/envs/llm_agent_crewai/lib/python3.11/site-packages/litellm/llms/OpenAI/openai.py", line 860, in completion
    raise e
  File "/home/jovyan/envs/llm_agent_crewai/lib/python3.11/site-packages/litellm/llms/OpenAI/openai.py", line 811, in completion
    return convert_to_model_response_object(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/jovyan/envs/llm_agent_crewai/lib/python3.11/site-packages/litellm/litellm_core_utils/llm_response_utils/convert_dict_to_response.py", line 583, in 

## Usage Metrics and Costs

Let’s see how much it would cost each time if this crew runs at scale.

In [None]:
import pandas as pd

# Convert UsageMetrics instance to a DataFrame
df_usage_metrics = pd.DataFrame([flow.state["score_crews_results"][0].token_usage.dict()])

# Calculate total costs
costs = 0.150 * df_usage_metrics['total_tokens'].sum() / 1_000_000
print(f"Total costs: ${costs:.4f}")

# Display the DataFrame
df_usage_metrics

In [None]:
import pandas as pd

# Convert UsageMetrics instance to a DataFrame
df_usage_metrics = pd.DataFrame([emails[0].token_usage.dict()])

# Calculate total costs
costs = 0.150 * df_usage_metrics['total_tokens'].sum() / 1_000_000
print(f"Total costs: ${costs:.4f}")

# Display the DataFrame
df_usage_metrics

## Inspecting Results

In [None]:
scores = flow.state["score_crews_results"]

In [None]:
import pandas as pd
from IPython.display import display, HTML

lead_scoring_result = scores[0].pydantic

# Create a dictionary with the nested structure flattened
data = {
    'Name': lead_scoring_result.personal_info.name,
    'Job Title': lead_scoring_result.personal_info.job_title,
    'Role Relevance': lead_scoring_result.personal_info.role_relevance,
    'Professional Background': lead_scoring_result.personal_info.professional_background,
    'Company Name': lead_scoring_result.company_info.company_name,
    'Industry': lead_scoring_result.company_info.industry,
    'Company Size': lead_scoring_result.company_info.company_size,
    'Revenue': lead_scoring_result.company_info.revenue,
    'Market Presence': lead_scoring_result.company_info.market_presence,
    'Lead Score': lead_scoring_result.lead_score.score,
    'Scoring Criteria': ', '.join(lead_scoring_result.lead_score.scoring_criteria),
    'Validation Notes': lead_scoring_result.lead_score.validation_notes
}

# Convert the dictionary to a DataFrame
df = pd.DataFrame.from_dict(data, orient='index', columns=['Value'])

# Reset the index to turn the original column names into a regular column
df = df.reset_index()

# Rename the index column to 'Attribute'
df = df.rename(columns={'index': 'Attribute'})

# Create HTML table with bold attributes and left-aligned values
html_table = df.style.set_properties(**{'text-align': 'left'}) \
                     .format({'Attribute': lambda x: f'<b>{x}</b>'}) \
                     .hide(axis='index') \
                     .to_html()

# Display the styled HTML table
display(HTML(html_table))

## Results

In [None]:
import textwrap

result_text = emails[0].raw
wrapped_text = textwrap.fill(result_text, width=80)
print(wrapped_text)

## How Complex Can it Get?

In [None]:
from crewai import Flow
from crewai.flow.flow import listen, start, and_, or_, router

class SalesPipeline(Flow):
    
  @start()
  def fetch_leads(self):
    # Pull our leads from the database
    # This is a mock, in a real-world scenario, this is where you would
    # fetch leads from a database
    leads = [
      {
        "lead_data": {
          "name": "João Moura",
          "job_title": "Director of Engineering",
          "company": "Clearbit",
          "email": "joao@clearbit.com",
          "use_case": "Using AI Agent to do better data enrichment."
        },
      },
    ]
    return leads

  @listen(fetch_leads)
  def score_leads(self, leads):
    scores = lead_scoring_crew.kickoff_for_each(leads)
    self.state["score_crews_results"] = scores
    return scores

  @listen(score_leads)
  def store_leads_score(self, scores):
    # Here we would store the scores in the database
    return scores

  @listen(score_leads)
  def filter_leads(self, scores):
    return [score for score in scores if score['lead_score'].score > 70]

  @listen(and_(filter_leads, store_leads_score))
  def log_leads(self, leads):
    print(f"Leads: {leads}")

  @router(filter_leads, paths=["high", "medium", "low"])
  def count_leads(self, scores):
    if len(scores) > 10:
      return 'high'
    elif len(scores) > 5:
      return 'medium'
    else:
      return 'low'

  @listen('high')
  def store_in_salesforce(self, leads):
    return leads

  @listen('medium')
  def send_to_sales_team(self, leads):
    return leads

  @listen('low')
  def write_email(self, leads):
    scored_leads = [lead.to_dict() for lead in leads]
    emails = email_writing_crew.kickoff_for_each(scored_leads)
    return emails

  @listen(write_email)
  def send_email(self, emails):
    # Here we would send the emails to the leads
    return emails

## Plotting the Flow

In [None]:
flow = SalesPipeline()
flow.plot()

In [None]:
from IPython.display import IFrame

IFrame(src='./crewai_flow_complex.html', width='150%', height=600)