**Asadullah Yousaf**

Portfolio: [link](https://asadyousaf03.github.io/Asadullah/)

Email: asaduyousaf@gmail.com

-------------------------------------------------------------------------------

# Multi-Agent CRM Automation System

This Google Colab notebook implements an AI-powered multi-agent system for automating HubSpot CRM operations (e.g., creating/updating contacts and deals) and sending email notifications. It uses free-tier tools: Google Gemini 2.5 Flash for the language model and Hugging Face embeddings for query classification.

## Overview
- **Purpose**: Process user queries to perform CRM tasks and send confirmation emails.
- **Agents**:
  - **Global Orchestrator (Supervisor)**: Analyzes queries and delegates tasks.
  - **HubSpot Agent**: Handles CRM operations (create/update contacts/deals).
  - **Email Agent**: Sends email notifications.
- **Features**:
  - Robust error handling for invalid queries or API failures.
  - Logging to `agent.log` for debugging.
  - Free-tier compatible with rate-limiting delays.

## Setup Instructions
Follow these steps to run the notebook:

1. **Create API Accounts**:
   - **Google Gemini**: Get a free API key from [Google AI Studio](https://aistudio.google.com/app/apikey).
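   - **HubSpot**: Sign up for a free developer account at [HubSpot Developers](https://developers.hubspot.com) and get an access token. The notebook reads it from the `HUBSPOT_API_KEY` entry in `config.json` and passes it to the client as `access_token`.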
   - **SMTP (Email)**: Use Gmail (enable 2FA, generate an [App Password](https://myaccount.google.com/security)) or another free SMTP service.

2. **Optionally provide `HF_TOKEN`** (a Hugging Face token; not required, but recommended).

3. **Upload `config.json`**:
   - In Colab, click the **Files** tab (left sidebar).
   - Click the upload icon and select your `config.json`.

4. **Install Dependencies**:
   Run the cell below to install required Python packages.

5. **Run the Notebook**:
   - The final cell prompts for a query, for example: "Create a new contact for Asadullah with email asadullahyousaf786@gmail.com and send a confirmation email to him."
   - Check outputs and `agent.log` (in Files tab) for results or errors.
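
Before running the setup cell, it can help to confirm that `config.json` contains every key the notebook reads. The following is a minimal sketch, not part of the notebook: `missing_config_keys` is a hypothetical helper, and the values shown are placeholders. The key names are the ones the setup cell loads.

```python
import json

# Keys the notebook's setup cell reads from config.json.
REQUIRED_KEYS = [
    "GOOGLE_API_KEY", "HUBSPOT_API_KEY", "SMTP_SERVER",
    "SMTP_PORT", "SMTP_USER", "SMTP_PASS", "FROM_EMAIL",
]

def missing_config_keys(config: dict) -> list:
    """Return required keys that are absent or empty (hypothetical helper)."""
    return [k for k in REQUIRED_KEYS if config.get(k) in (None, "")]

# Placeholder values only; substitute real credentials in your own file.
example = json.loads('{"GOOGLE_API_KEY": "your-key", "SMTP_PORT": 587}')
print(missing_config_keys(example))
```

An empty list means every expected key is present; anything else names the entries still to be filled in.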

## Install Dependencies


In [1]:
!pip install -qU langgraph langchain-google-genai hubspot-api-client transformers langchain-core

## Importing Necessary Libraries

In [2]:
import os
import json
import logging
from typing import List
from langchain_core.messages import AIMessage, HumanMessage, SystemMessage, ToolMessage, BaseMessage
from langchain_google_genai import ChatGoogleGenerativeAI
from langgraph.graph import StateGraph, MessagesState, END
from langgraph.prebuilt import ToolNode
from langchain_core.tools import tool
from hubspot import HubSpot
from hubspot.crm.contacts import SimplePublicObjectInputForCreate, PublicObjectSearchRequest, SimplePublicObjectInput
from hubspot.crm.deals import SimplePublicObjectInput as DealSimplePublicObjectInput
from hubspot.crm.contacts.exceptions import ApiException as ContactsApiException
from hubspot.crm.deals.exceptions import ApiException as DealsApiException
import smtplib
from email.mime.text import MIMEText
from transformers import AutoTokenizer, AutoModel
from torch import cosine_similarity
import torch
from pydantic import BaseModel, Field
import time


## Initial Setup: Logging and API Clients
This code sets up logging to agent.log, loads API keys from config.json, and initializes Gemini Flash 2.5, HubSpot client, and Hugging Face embeddings for query classification. Upload config.json to Colab’s /content/ directory before running. Check agent.log for debugging.
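
One detail worth knowing when copying JSON values into environment variables: `os.environ` only accepts strings, so a numeric value such as a port must be converted on the way in and parsed on the way out. A small sketch with placeholder values (the server name is an assumption for illustration):

```python
import os

# Placeholder settings; a JSON file may store the port as a number.
settings = {"SMTP_SERVER": "smtp.example.com", "SMTP_PORT": 587}

for key, value in settings.items():
    # Assigning a non-string to os.environ raises TypeError, so convert first.
    os.environ[key] = str(value)

# Parse back to int where a number is needed, e.g. for smtplib.SMTP.
port = int(os.environ["SMTP_PORT"])
print(port)
```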

In [3]:
# Set up logging
logging.basicConfig(level=logging.INFO, filename='agent.log', format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

# Load configurations from JSON file
try:
    with open('/content/config.json', 'r') as f:
        config = json.load(f)
except FileNotFoundError:
    logger.error("config.json not found. Please create it with API keys.")
    raise

os.environ["GOOGLE_API_KEY"] = config["GOOGLE_API_KEY"]
os.environ["HUBSPOT_API_KEY"] = config["HUBSPOT_API_KEY"]
os.environ["SMTP_SERVER"] = config["SMTP_SERVER"]
# Environment variables must be strings; the port may be stored as a number in JSON
os.environ["SMTP_PORT"] = str(config["SMTP_PORT"])
os.environ["SMTP_USER"] = config["SMTP_USER"]
os.environ["SMTP_PASS"] = config["SMTP_PASS"]
os.environ["FROM_EMAIL"] = config["FROM_EMAIL"]

# Initialize Gemini Flash 2.5 LLM
llm = ChatGoogleGenerativeAI(model="gemini-2.5-flash")

# Initialize HubSpot client
api_client = HubSpot(access_token=os.environ["HUBSPOT_API_KEY"])

# Initialize Hugging Face embedder
embedder_model_name = "sentence-transformers/all-MiniLM-L6-v2"
tokenizer = AutoTokenizer.from_pretrained(embedder_model_name)
embedder = AutoModel.from_pretrained(embedder_model_name)

def get_embedding(text: str):
    inputs = tokenizer(text, return_tensors="pt", padding=True, truncation=True)
    with torch.no_grad():
        embeddings = embedder(**inputs).last_hidden_state.mean(dim=1)
    return embeddings


## Query Classification and Tool Definitions
This section defines how user queries are classified and sets up tools for HubSpot CRM and email operations:


*   #### Query Classification:
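    The classify_query function uses Hugging Face embeddings to match queries (e.g., "create a contact") to agents (hubspot_agent or email_agent) by weighted cosine similarity. It overrides the result with hubspot_agent when the query contains CRM-related keywords such as "contact" or "deal". The supervisor fallback applies only if no description scores above -1, and the result is logged as a hint rather than used for routing.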
*   #### HubSpot Tools:
    Functions (create_contact, update_contact, create_deal, update_deal) interact with HubSpot’s API to manage contacts and deals, handling errors (e.g., duplicate contacts) and logging results to agent.log.
    
*  #### Email Tool:
    The send_email function sends email notifications via SMTP, with error handling for invalid addresses or server issues.
*   #### Usage:
    These tools are called by agents based on the classified query. Ensure config.json has valid SMTP settings for emails. Check agent.log for detailed execution logs.
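
The weighted-similarity idea behind the classifier can be sketched without any model download. The example below is illustrative only: toy 2-D vectors stand in for sentence embeddings, and the `threshold` parameter is an assumption added here to show one way of falling back to the supervisor; the notebook's own classifier does not apply a threshold.

```python
import math

def cosine(a, b):
    """Cosine similarity of two equal-length vectors."""
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return dot / norm if norm else 0.0

def classify(query_vec, agent_vecs, threshold=0.5):
    """Pick the agent with the highest weighted similarity, or 'supervisor'
    when nothing clears the threshold (threshold is an assumed value)."""
    best_agent, best_score = "supervisor", threshold
    for agent, examples in agent_vecs.items():
        for vec, weight in examples:
            score = cosine(query_vec, vec) * weight
            if score > best_score:
                best_agent, best_score = agent, score
    return best_agent

# Toy 2-D vectors stand in for real sentence embeddings.
toy = {
    "hubspot_agent": [([1.0, 0.0], 1.0)],
    "email_agent": [([0.0, 1.0], 1.0)],
}
print(classify([0.9, 0.1], toy))    # hubspot_agent
print(classify([-1.0, -1.0], toy))  # supervisor
```

Lowering a description's weight makes it harder for that description to win, which is how "manage CRM operations" (weight 0.8) is de-emphasized relative to the more specific phrases.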






In [4]:
# Classifier with weighted scoring
agent_descriptions = {
    "hubspot_agent": [
        ("create a contact", 1.0),
        ("update a contact", 1.0),
        ("create a deal", 1.0),
        ("update deal", 1.0),
        ("manage CRM operations", 0.8),
        ("add new contact", 1.0),
        ("edit contact", 1.0)
    ],
    "email_agent": [
        ("send an email", 1.0),
        ("email notification", 1.0),
        ("confirm action via email", 1.0)
    ]
}
embedded_descriptions = {
    agent: [(get_embedding(desc), weight) for desc, weight in descs]
    for agent, descs in agent_descriptions.items()
}

def classify_query(query: str) -> str:
    query_emb = get_embedding(query)
    max_score = -1
    best_agent = "supervisor"
    for agent, embeddings in embedded_descriptions.items():
        for emb, weight in embeddings:
            sim = cosine_similarity(query_emb, emb).item() * weight
            if sim > max_score:
                max_score = sim
                best_agent = agent
    logger.info(f"Query: {query}, Classified as: {best_agent}, Score: {max_score}")
    crm_keywords = ["contact", "deal", "create", "update", "crm"]
    if any(keyword in query.lower() for keyword in crm_keywords):
        best_agent = "hubspot_agent"
        logger.info(f"Overriding to hubspot_agent due to CRM keywords in query")
    return best_agent

# HubSpot Tools
@tool
def create_contact(properties: dict):
    """Create a new contact in HubSpot with the given properties."""
    logger.info(f"Creating contact with properties: {properties}")
    try:
        if 'email' in properties:
            email = properties['email']
            filter_group = {
                "filters": [{"propertyName": "email", "operator": "EQ", "value": email}]
            }
            search_request = PublicObjectSearchRequest(filter_groups=[filter_group])
            search_response = api_client.crm.contacts.search_api.do_search(public_object_search_request=search_request)
            if search_response.total > 0:
                existing_id = search_response.results[0].id
                logger.info(f"Contact already exists with ID: {existing_id}")
                return {
                    "success": False,
                    "error": f"Contact already exists with ID: {existing_id}",
                    "id": existing_id
                }

        input_obj = SimplePublicObjectInputForCreate(properties=properties)
        response = api_client.crm.contacts.basic_api.create(simple_public_object_input_for_create=input_obj)
        result = {"success": True, "id": response.id, "properties": response.properties}
        logger.info(f"Contact created successfully: {result}")
        return result
    except ContactsApiException as e:
        error_msg = f"Failed to create contact: {str(e)}"
        logger.error(error_msg)
        return {"success": False, "error": error_msg}

@tool
def update_contact(contact_id: str, properties: dict):
    """Update an existing contact in HubSpot with the given contact_id and properties."""
    logger.info(f"Updating contact {contact_id} with properties: {properties}")
    try:
        input_obj = SimplePublicObjectInput(properties=properties)
        response = api_client.crm.contacts.basic_api.update(
            contact_id=contact_id,
            simple_public_object_input=input_obj
        )
        result = {"success": True, "id": response.id, "properties": response.properties}
        logger.info(f"Contact updated successfully: {result}")
        return result
    except ContactsApiException as e:
        error_msg = f"Failed to update contact: {str(e)}"
        logger.error(error_msg)
        return {"success": False, "error": error_msg}

@tool
def create_deal(properties: dict):
    """Create a new deal in HubSpot with the given properties."""
    logger.info(f"Creating deal with properties: {properties}")
    try:
        input_obj = DealSimplePublicObjectInput(properties=properties)
        response = api_client.crm.deals.basic_api.create(simple_public_object_input=input_obj)
        result = {"success": True, "id": response.id, "properties": response.properties}
        logger.info(f"Deal created successfully: {result}")
        return result
    except DealsApiException as e:
        error_msg = f"Failed to create deal: {str(e)}"
        logger.error(error_msg)
        return {"success": False, "error": error_msg}

@tool
def update_deal(deal_id: str, properties: dict):
    """Update an existing deal in HubSpot with the given deal_id and properties."""
    logger.info(f"Updating deal {deal_id} with properties: {properties}")
    try:
        input_obj = DealSimplePublicObjectInput(properties=properties)
        response = api_client.crm.deals.basic_api.update(
            deal_id=deal_id,
            simple_public_object_input=input_obj
        )
        result = {"success": True, "id": response.id, "properties": response.properties}
        logger.info(f"Deal updated successfully: {result}")
        return result
    except DealsApiException as e:
        error_msg = f"Failed to update deal: {str(e)}"
        logger.error(error_msg)
        return {"success": False, "error": error_msg}

hubspot_tools = [create_contact, update_contact, create_deal, update_deal]

# Email Tool
@tool
def send_email(to_email: str, subject: str, body: str):
    """Send an email notification."""
    logger.info(f"Sending email to {to_email} with subject: {subject}")
    logger.info(f"SMTP Config: server={os.environ['SMTP_SERVER']}, port={os.environ['SMTP_PORT']}, user={os.environ['SMTP_USER']}")
    msg = MIMEText(body)
    msg['Subject'] = subject
    msg['From'] = os.environ["FROM_EMAIL"]
    msg['To'] = to_email
    try:
        # The context manager closes the connection even if login or sending fails
        with smtplib.SMTP(os.environ["SMTP_SERVER"], int(os.environ["SMTP_PORT"])) as server:
            server.starttls()
            server.login(os.environ["SMTP_USER"], os.environ["SMTP_PASS"])
            server.sendmail(os.environ["FROM_EMAIL"], to_email, msg.as_string())
        result = {"success": True, "message": f"Email sent to {to_email}"}
        logger.info(f"Email sent successfully: {result}")
        return result
    except Exception as e:
        error_msg = f"Failed to send email: {str(e)}"
        logger.error(error_msg)
        return {"success": False, "error": error_msg}

email_tools = [send_email]

## Agent Creation
This section sets up agents for task execution:


*   #### Routing Schemas:
    TransferToHubspot and TransferToEmail define tasks for HubSpot and Email agents (e.g., create contact, send email).
*   #### Agent Factory:
    create_agent_node builds agents using Gemini Flash 2.5 to process tasks and call tools.

*   #### HubSpot Agent:
    Manages CRM tasks, summarizing results (e.g., "Contact created").
*   #### Email Agent:
    Sends email notifications, summarizing outcomes (e.g., "Email sent").
*   #### Usage:
    Ensure valid email addresses in tasks. Check agent.log for details/errors.
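
Each agent finds its current task by scanning the message history backwards for the most recent AI message that carries a tool call. The sketch below shows that lookup in isolation; `Msg` is a hypothetical stand-in for a chat message, not a LangChain class.

```python
from dataclasses import dataclass, field

@dataclass
class Msg:
    """Stand-in for a chat message (hypothetical, not a LangChain class)."""
    role: str
    content: str = ""
    tool_calls: list = field(default_factory=list)

def latest_task(messages):
    """Return task_description from the most recent AI message with tool calls."""
    for msg in reversed(messages):
        if msg.role == "ai" and msg.tool_calls:
            return msg.tool_calls[0]["args"].get("task_description", "")
    return ""

history = [
    Msg("human", "Create a contact and email them."),
    Msg("ai", tool_calls=[{"name": "TransferToHubspot",
                           "args": {"task_description": "Create contact"}}]),
]
print(latest_task(history))  # Create contact
```

Scanning in reverse means a later delegation always takes precedence over an earlier one, and an empty string is returned when no delegation has happened yet.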


In [5]:
# Routing Schemas for Supervisor
class TransferToHubspot(BaseModel):
    """Delegate task to HubSpot Agent."""
    task_description: str = Field(description="Clear task for HubSpot Agent, e.g., 'Create contact with properties: {'firstname': 'Asadullah', 'email': 'asaduyousaf@gmail.com'}'")
# Use a valid email address where you want the confirmation email to be sent.
class TransferToEmail(BaseModel):
    """Delegate task to Email Agent."""
    task_description: str = Field(description="Clear task for Email Agent, e.g., 'Send confirmation email to asaduyousaf@gmail.com with subject \"Contact Created\" and body containing contact ID'")

# Agent Node Factory
from langchain_core.prompts import ChatPromptTemplate

def create_agent_node(llm, tools, system_prompt):
    def agent_node(state: MessagesState):
        try:
            # Extract task_description from supervisor's tool call
            task = ""
            for msg in reversed(state["messages"]):
                if isinstance(msg, AIMessage) and msg.tool_calls:
                    task = msg.tool_calls[0]["args"].get("task_description", "")
                    break

            prompt = ChatPromptTemplate.from_messages([
                SystemMessage(content=system_prompt + f"\nCurrent task: {task}"),
                *state["messages"],
            ])
            bound_llm = llm.bind_tools(tools)
            response = bound_llm.invoke(prompt.format_messages())
            logger.info(f"Agent ({tools[0].name if tools else 'supervisor'}): {response.content}")
            return {"messages": [response]}
        except Exception as e:
            logger.error(f"Agent node error: {str(e)}")
            return {"messages": [AIMessage(content=f"Error: {str(e)}")]}
    return agent_node

# HubSpot Agent
hubspot_system_prompt = (
    "You are the HubSpot Agent. Use the current task to prepare dynamic payloads and call tools like create_contact, update_contact, etc.\n"
    "After receiving tool results, output a summary (e.g., 'Contact created with ID: xxx'). Do not call more tools if task is complete.\n"
    "Handle errors and include them in the summary."
)
hubspot_agent = create_agent_node(llm, hubspot_tools, hubspot_system_prompt)
hubspot_tool_node = ToolNode(tools=hubspot_tools)

# Email Agent
email_system_prompt = (
    "You are the Email Agent. Use the current task to generate email content and call send_email.\n"
    "After receiving tool results, output a summary (e.g., 'Email sent successfully'). Do not call more tools if task is complete.\n"
    "Handle errors and include them in the summary."
)
email_agent = create_agent_node(llm, email_tools, email_system_prompt)
email_tool_node = ToolNode(tools=email_tools)


## Supervisor and Workflow Setup
This section defines the supervisor agent and the workflow graph:


*   #### Supervisor Agent:
    The supervisor_agent uses Gemini Flash 2.5 to analyze queries and delegate tasks via TransferToHubspot or TransferToEmail, based on the prompt instructions. It tracks task history, handles errors, and outputs a final summary when complete.
*   #### Graph Definition:
    The StateGraph connects the supervisor, HubSpot, and Email agents with their tools, defining the workflow structure.
*   #### Routing:
    The supervisor_router directs tasks to hubspot_agent, email_agent, or ends the workflow, while agent_router manages tool interactions.
*   #### Usage:
    This orchestrates the system’s flow. Check agent.log for supervisor actions and errors.
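
The supervisor's routing rules can be read as a small decision function. The sketch below restates them with plain values so they can be tested without building a graph; `route` is a hypothetical name, and the `END` string here is a placeholder for the constant that LangGraph exports.

```python
END = "__end__"  # placeholder sentinel; LangGraph exports its own END constant

def route(tool_call_name, content):
    """Mirror the supervisor routing rules using plain values."""
    if tool_call_name is None:
        # No delegation: stop on a final summary, otherwise ask the supervisor again.
        return END if "Final summary" in content else "supervisor"
    return {"TransferToHubspot": "hubspot_agent",
            "TransferToEmail": "email_agent"}.get(tool_call_name, END)

print(route("TransferToHubspot", ""))
print(route(None, "Final summary: contact created and email sent."))
print(route(None, "Thinking..."))
```

Note the third case: a reply with no tool call and no "Final summary" text goes back to the supervisor, so the run continues until a summary appears or the recursion limit is reached.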


In [6]:

# Supervisor (Orchestrator)
supervisor_system_prompt = (
    "You are the Global Orchestrator Agent. Analyze the query and history to delegate tasks:\n"
    "- Call TransferToHubspot for CRM operations (create/update contacts/deals).\n"
    "- Call TransferToEmail for email notifications, only after a successful CRM operation.\n"
    "Steps:\n"
    "1. Review history for completed tasks (look for ToolMessages with 'success': true).\n"
    "2. If CRM needed and not done, call TransferToHubspot with task_description.\n"
    "3. If CRM succeeded (e.g., contact ID in history), call TransferToEmail with task including ID and details.\n"
    "4. If both CRM and email complete, output 'Final summary: [brief overall summary]' without calling tools.\n"
    "5. Handle errors by including them in next task or final summary. Do not repeat completed tasks.\n"
    "Provide reasoning in your response, then call the tool if needed."
)
supervisor_bound_llm = llm.bind_tools([TransferToHubspot, TransferToEmail])

def supervisor_agent(state: MessagesState):
    try:
        prompt = ChatPromptTemplate.from_messages([
            SystemMessage(content=supervisor_system_prompt),
            *state["messages"],
        ])
        response = supervisor_bound_llm.invoke(prompt.format_messages())
        logger.info(f"Supervisor: {response.content}")
        return {"messages": [response]}
    except Exception as e:
        logger.error(f"Supervisor error: {str(e)}")
        return {"messages": [AIMessage(content=f"Error: {str(e)}")]}

# Graph Definition
graph = StateGraph(state_schema=MessagesState)
graph.add_node("supervisor", supervisor_agent)
graph.add_node("hubspot_agent", hubspot_agent)
graph.add_node("hubspot_tools", hubspot_tool_node)
graph.add_node("email_agent", email_agent)
graph.add_node("email_tools", email_tool_node)

# Supervisor Router
def supervisor_router(state: MessagesState):
    last_msg = state["messages"][-1]
    if not isinstance(last_msg, AIMessage) or not last_msg.tool_calls:
        if "Final summary" in last_msg.content:
            return END
        return "supervisor"

    tool_call_name = last_msg.tool_calls[0]["name"]
    if tool_call_name == "TransferToHubspot":
        return "hubspot_agent"
    elif tool_call_name == "TransferToEmail":
        return "email_agent"
    return END

graph.set_entry_point("supervisor")
graph.add_conditional_edges("supervisor", supervisor_router, {"hubspot_agent": "hubspot_agent", "email_agent": "email_agent", END: END, "supervisor": "supervisor"})

# Agent-Tool Loops
def agent_router(state: MessagesState, agent_name: str):
    last_msg = state["messages"][-1]
    if isinstance(last_msg, AIMessage) and last_msg.tool_calls:
        return f"{agent_name}_tools"
    return "supervisor"

graph.add_conditional_edges("hubspot_agent", lambda s: agent_router(s, "hubspot"), {"hubspot_tools": "hubspot_tools", "supervisor": "supervisor"})
graph.add_edge("hubspot_tools", "hubspot_agent")
graph.add_conditional_edges("email_agent", lambda s: agent_router(s, "email"), {"email_tools": "email_tools", "supervisor": "supervisor"})
graph.add_edge("email_tools", "email_agent")

# Compile
app = graph.compile()

## Running the Workflow
This section executes the multi-agent system with a user query:



*   #### User Input:
    Enter a query via input() (e.g., "Create a new contact for Asadullah with email asadullahyousaf786@gmail.com").
*   #### Classification:
    Logs and prints an embedding-based initial classification of the query (e.g., hubspot_agent for CRM). The supervisor then makes the actual routing decision.
*   #### Execution:
    Processes the query through agents, showing results in the console.
*   #### Errors:
    Logs issues to agent.log and displays them.
*   #### Usage:
    Input a clear query. Try "Update contact ID 12345" or "Send email to your-email@example.com". Check agent.log for errors.




In [7]:
# Prompt user to enter a query
user_query = input("Enter your query (e.g., 'Create a new contact for Asadullah with email asadullahyousaf786@gmail.com and send a confirmation email to him'): ")

# Log initial classification
initial_agent = classify_query(user_query)
logger.info(f"Embedding-based initial classification: {initial_agent}")
print(f"Embedding-based initial classification: {initial_agent}")

# Stream the workflow with rate limiting
try:
    for event in app.stream({"messages": [HumanMessage(content=user_query)]}, config={"recursion_limit": 20}):
        for key, value in event.items():
            print(f"Output from {key}:")
            if "messages" in value:
                for msg in value["messages"]:
                    if isinstance(msg, AIMessage):
                        print(f"AI: {msg.content}")
                        if msg.tool_calls:
                            print(f"Tool Calls: {msg.tool_calls}")
                    elif isinstance(msg, ToolMessage):
                        print(f"Tool Result: {msg.content}")
                    elif isinstance(msg, HumanMessage):
                        print(f"Human: {msg.content}")
            print("---")
            time.sleep(6)  # Delay to respect Gemini free-tier rate limits
except Exception as e:
    logger.error(f"Stream error: {str(e)}")
    print(f"Error during streaming: {str(e)}")

Enter your query (e.g., 'Create a new contact for Asadullah with email asadullahyousaf786@gmail.com and send a confirmation email to him'): asadullahyousaf0786@gmail.com
Embedding-based initial classification: email_agent
Output from supervisor:
AI: 
Tool Calls: [{'name': 'TransferToHubspot', 'args': {'task_description': "Create contact with properties: {'email': 'asadullahyousaf0786@gmail.com'}"}, 'id': 'e6052001-0d67-4a82-92c9-04a904a4f508', 'type': 'tool_call'}]
---
Output from hubspot_agent:
AI: 
Tool Calls: [{'name': 'create_contact', 'args': {'properties': {'email': 'asadullahyousaf0786@gmail.com'}}, 'id': 'a2803c89-2eb6-4587-92cd-4dee8cfa74fa', 'type': 'tool_call'}]
---
Output from hubspot_tools:
Tool Result: {"success": true, "id": "293923150574", "properties": {"createdate": "2025-10-21T18:43:31.493Z", "email": "asadullahyousaf0786@gmail.com", "hs_all_contact_vids": "293923150574", "hs_associated_target_accounts": "0", "hs_currently_enrolled_in_prospecting_agent": "false", "hs

## Handling Invalid Queries
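- **Unclear Queries** (e.g., "hello world"): The supervisor is expected to reply without delegating. Because the router ends the workflow only when the reply contains "Final summary", any other reply is sent back to the supervisor, and the run stops once the `recursion_limit` of 20 is reached.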
- **Invalid CRM Inputs** (e.g., missing email): HubSpot tools return errors (e.g., `{"success": False, "error": "..."}`), logged to `agent.log`.
- **Invalid Email Inputs** (e.g., bad email address): Email tool returns errors, handled by the email agent.
- **Logs**: Check `agent.log` in the Files tab for detailed error information.
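
One way to catch obviously malformed addresses before they reach the email tool is a lightweight pre-check. The sketch below is an assumption for illustration, not part of the notebook: `looks_like_email` is a hypothetical helper, and the pattern is deliberately simple rather than a full address validator.

```python
import re

# Deliberately simple pattern: one "@", no whitespace, a dot in the domain.
# It is an assumption for illustration, not a full RFC 5322 validator.
EMAIL_RE = re.compile(r"^[^@\s]+@[^@\s]+\.[^@\s]+$")

def looks_like_email(address: str) -> bool:
    """Return True when the address has a plausible local@domain.tld shape."""
    return bool(EMAIL_RE.match(address))

print(looks_like_email("user@example.com"))
print(looks_like_email("not-an-email"))
```

A check like this only filters out typos; whether a mailbox actually exists is still decided by the SMTP server.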

## Troubleshooting
- **config.json Error**: Ensure the file is uploaded and contains valid keys.
- **Rate Limits**: Gemini free-tier may throttle requests; the 6-second delay helps.
- **API Errors**: Verify HubSpot/SMTP credentials. Check `agent.log` for specifics.
- **Dependencies**: Re-run the install cell if import errors occur.
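
If the fixed delay is not enough to stay under the rate limit, retrying with exponential backoff is a common alternative. The following is a minimal sketch under stated assumptions: `call_with_backoff` is a hypothetical helper, and the retry count and base delay are illustrative values, not settings from the notebook.

```python
import time

def call_with_backoff(fn, retries=3, base_delay=6.0, sleep=time.sleep):
    """Call fn(); on failure wait base_delay * 2**attempt and retry.
    The retry count and delays are assumed values, not from the notebook."""
    for attempt in range(retries):
        try:
            return fn()
        except Exception:
            if attempt == retries - 1:
                raise  # out of retries: surface the last error
            sleep(base_delay * 2 ** attempt)

# Demo with a function that fails twice, then succeeds; sleep is stubbed out.
calls = {"n": 0}

def flaky():
    calls["n"] += 1
    if calls["n"] < 3:
        raise RuntimeError("rate limited")
    return "ok"

delays = []
print(call_with_backoff(flaky, sleep=delays.append))
print(delays)
```

Passing `sleep` as a parameter keeps the demo instant; in real use the default `time.sleep` applies the waits.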

## Notes
- This system is modular and reusable for other CRM or email tasks.
- Free-tier tools ensure no cost for testing.
- For production, consider upgrading Gemini or HubSpot plans for higher quotas.
- Contact the developer for customization or support.
