In [211]:
from firecrawl import FirecrawlApp
from pydantic import BaseModel, Field
from typing import List, TypedDict , Optional, Type
from dotenv import load_dotenv
from langchain_openai import ChatOpenAI
from langchain_community.tools.tavily_search import TavilySearchResults
from langchain_core.callbacks import (
    AsyncCallbackManagerForToolRun,
    CallbackManagerForToolRun,
)
import os
import requests
from langchain_core.tools import BaseTool
from langgraph.graph import START,END, StateGraph
from langchain_core.prompts import ChatPromptTemplate

In [212]:
# Load variables from a local .env file into the process environment so the
# later os.getenv(...) lookups (FIRECRAWL_API_KEY, AIRTABLE_API_KEY) can find them.
load_dotenv()


True

In [213]:
# Shared chat model, constructed with no explicit arguments; it is used by
# `structured_response` and `create_email_drafts` for structured output.
llm = ChatOpenAI()

In [247]:
class State(TypedDict):
    """Shared state passed between the nodes of the lead-generation graph."""

    # Search hits written by `search`; each hit is read through its 'url' key.
    web_search: List[dict]
    # URLs produced by `structured_response` and consumed by `get_contact`.
    structured_response: List[str]
    # Parallel lists filled by `get_contact`, one entry per extracted contact.
    Company_Names: List[str]
    Ceos: List[str]
    emails: List[str]
    phonenumbers: List[str]
    # Status message returned by the Airtable tool in `record`.
    create_record: str
    # One dict per generated draft, as built in `create_email_drafts`
    # (keys: to_email, draft, Company_Name, ceo_name).
    email_drafts: List[dict]
    # All drafts joined into a single display string.
    email_drafts_markdown: str
    

In [253]:
def search(state: State) -> State:
    """Run a Tavily web search and store the hits in ``state["web_search"]``.

    If the incoming state carries a non-empty query string under
    ``web_search`` (which is how the ``graph.invoke`` call in this file seeds
    the graph), that string is used as the search query. Otherwise the
    built-in default query is used.

    Args:
        state: Graph state; ``web_search`` may hold a query string on entry.

    Returns:
        The same state object with ``web_search`` replaced by whatever the
        Tavily tool returned (at most 5 results are requested).
    """
    default_query = 'inurl:about intext:"Chief Executive Officer" "real estate"'

    # Honour a caller-supplied query instead of silently discarding it.
    seeded = state.get("web_search")
    if isinstance(seeded, str) and seeded.strip():
        search_query = seeded
    else:
        search_query = default_query

    tavily_tool = TavilySearchResults(max_results=5)
    state["web_search"] = tavily_tool.invoke(search_query)
    return state

In [216]:
# Structured-output schema passed to `llm.with_structured_output` in
# `structured_response`; the model's answer is parsed into this shape.
class SearchResponse(BaseModel):
    # URLs returned by the model; read back as `.url` by `structured_response`.
    url: List[str]

def structured_response(state: State) -> State:
    """Have the LLM return the search-result URLs as a structured list.

    The URLs are first pulled out of ``state["web_search"]`` and then sent
    through a prompt whose output is parsed into ``SearchResponse``.

    Args:
        state: Graph state; ``web_search`` is expected to be a list of dicts
            that each carry a ``'url'`` entry.

    Returns:
        A partial state update ``{"structured_response": [...urls...]}``.
        The list is empty (and no LLM call is made) when no usable URL is
        found in the search results.
    """
    web_search = state["web_search"]

    # NOTE(review): the search tool may hand back something other than a list
    # of hits (e.g. an error message) -- confirm; such input is treated as
    # "no results" here rather than raising.
    hits = web_search if isinstance(web_search, list) else []
    urls = [hit["url"] for hit in hits if isinstance(hit, dict) and hit.get("url")]

    # Nothing to extract: skip the LLM round-trip entirely.
    if not urls:
        return {"structured_response": []}

    prompt = """
    You are a senior search assistant, you have access to search some url dictionary from a search assistant. Your role is to extract all urls from the {web_search}. Return them as a list.
    """

    chat_prompt = ChatPromptTemplate.from_messages(
        [
            ("system", prompt),
            ("human", "{web_search}"),
        ]
    )

    structure = llm.with_structured_output(SearchResponse)

    structured = chat_prompt | structure

    # Named `parsed` so the local does not shadow this function's own name.
    parsed = structured.invoke({"web_search": urls})

    return {"structured_response": parsed.url}



In [227]:
def get_contact(state:State):
    """Extract contact details for every URL in ``state["structured_response"]``.

    Each URL is sent to Firecrawl's ``extract`` call through a small LangChain
    tool. Extraction is best-effort: a URL whose extraction fails contributes
    no contact, and the failure is logged instead of being silently dropped.

    Args:
        state: Graph state; ``structured_response`` holds the URLs to process.

    Returns:
        The same state object with the parallel lists ``phonenumbers``,
        ``emails``, ``Company_Names`` and ``Ceos`` filled in (one entry per
        successfully extracted contact; missing fields are empty strings).
    """
    import logging  # local import so this cell does not depend on the import cell

    logger = logging.getLogger(__name__)

    structured_response=state["structured_response"]
    firecrawl_api_key = os.getenv('FIRECRAWL_API_KEY')
    app = FirecrawlApp(api_key=firecrawl_api_key)

    class ContactInfo(BaseModel):
        phonenumber: str
        email: str
        Company_Name: str
        Ceo: str

    class WebsitecontactInfo(BaseModel):
        info: List[ContactInfo]

    class ContactExtractorInput(BaseModel):
        url: str = Field(..., description="Website URL to extract contact information from")

    class CustomContactExtractorTool(BaseTool):
        name: str = "website_contact_extractor"
        description: str = "Extracts company contact information from website URLs including phone numbers and emails"
        args_schema: Type[BaseModel] = ContactExtractorInput

        def _run(
            self,
            url: str,
            run_manager: Optional[CallbackManagerForToolRun] = None
        ) -> WebsitecontactInfo:
            """Extract one contact from ``url``; return an empty result on failure."""
            try:
                response = app.extract([url], {
                    'prompt': '''Extract all company contact information including:
                                - Phone number
                                - Email address
                                - Ceo's Name
                                -Company Name
                                Return structured JSON data''',
                    'schema': {
                        'type': 'object',
                        'properties': {
                            'phonenumber': {'type': 'string'},
                            'email': {'type': 'string'},
                            'Company_Name':{'type':'string'},
                            'Ceo':{'type': 'string'}
                        }
                    }
                })

                # NOTE(review): assumes the response is subscriptable and that
                # its 'data' entry is a dict -- confirm against the installed
                # firecrawl version.
                data = response['data']
                # `or ''` also covers keys that are present but null, which
                # `data.get(key, '')` would pass through as None and thereby
                # fail validation, discarding the whole contact.
                contact_info = ContactInfo(
                    phonenumber=data.get('phonenumber') or '',
                    email=data.get('email') or '',
                    Company_Name=data.get('Company_Name') or '',
                    Ceo=data.get('Ceo') or ''
                )
                return WebsitecontactInfo(info=[contact_info])

            except Exception:
                # Best-effort by design: one bad URL must not abort the run,
                # but the failure should be visible.
                logger.warning("Contact extraction failed for %s", url, exc_info=True)
                return WebsitecontactInfo(info=[])

        async def _arun(
            self,
            url: str,
            run_manager: Optional[AsyncCallbackManagerForToolRun] = None
        ) -> WebsitecontactInfo:
            """Async entry point; delegates to the synchronous implementation."""
            # The async callback manager is not forwarded: `_run` is typed for
            # the sync manager and does not use it.
            return self._run(url)

    tool = CustomContactExtractorTool()
    phonenumbers = []
    emails = []
    Company_Names= []
    Ceos=[]

    for url in structured_response:
        contact = tool.invoke({"url": url})

        # `info` holds at most one contact per URL (see `_run`).
        for info in contact.info:
            phonenumbers.append(info.phonenumber)
            emails.append(info.email)
            Company_Names.append(info.Company_Name)
            Ceos.append(info.Ceo)

    state["phonenumbers"] = phonenumbers
    state["emails"] = emails
    state["Company_Names"]=Company_Names
    state["Ceos"]=Ceos
    return state


In [228]:
def record(state: State):
    """Write the selected contacts to the Airtable "Email Campaign" table.

    Only the contacts at positions 2 and 3 of the state lists are recorded,
    the same window that ``create_email_drafts`` uses.

    Args:
        state: Graph state providing ``emails``, ``phonenumbers``,
            ``Company_Names`` and ``Ceos`` as parallel lists.

    Returns:
        A partial state update ``{"create_record": <status message>}``.

    Raises:
        ValueError: If the ``AIRTABLE_API_KEY`` environment variable is unset.
    """
    emails = state["emails"]
    phonenumbers = state["phonenumbers"]
    Company_Names= state["Company_Names"]
    Ceos=state["Ceos"]

    class AirtableInput(BaseModel):
        emails: List[str] = Field(..., description="List of email addresses")
        phonenumbers: List[str] = Field(..., description="List of phone numbers")
        Company_Names: List[str] = Field(..., description="List of company names")
        Ceos: List[str] = Field(..., description="List of ceos names")

    class AirtableTool(BaseTool):
        name: str = "airtable_recorder"
        description: str = "Adds contact information to Airtable database"
        args_schema: Type[BaseModel] = AirtableInput

        def _run(
            self,
            emails: List[str],
            phonenumbers: List[str],
            Company_Names:List[str],
            Ceos: List[str],

            run_manager: Optional[CallbackManagerForToolRun] = None
        ) -> str:
            """Create Airtable records and return a human-readable status string."""
            # Positions 2 and 3 only; zip() stops at the shortest list.
            window = slice(2, 4)
            records = [
                {
                    "fields": {
                        "Email Address": email,
                        "Phone Number": phone,
                        "Company Name": company_name,
                        "CEO NAME": ceo_name,
                    }
                }
                for email, phone, company_name, ceo_name in zip(
                    emails[window],
                    phonenumbers[window],
                    Company_Names[window],
                    Ceos[window],
                )
            ]

            url = "https://api.airtable.com/v0/appTbNJ0F0exvFSFw/Email%20Campaign"
            api_token = os.getenv('AIRTABLE_API_KEY')
            if not api_token:
                raise ValueError("AIRTABLE_API_KEY environment variable not found")

            # Nothing in the selected window: skip the request instead of
            # POSTing an empty record list.
            if not records:
                return "No contact records to add to Airtable"

            headers = {
                "Authorization": f"Bearer {api_token}",
                "Content-Type": "application/json"
            }

            data = {"records": records}

            try:
                # Explicit timeout so a stalled connection cannot hang the graph.
                response = requests.post(url, headers=headers, json=data, timeout=30)
                response.raise_for_status()
            except requests.exceptions.RequestException as err:
                # RequestException covers HTTP errors as well as connection
                # failures and timeouts.
                return f"Error creating Airtable record: {str(err)}"
            return "Airtable record created successfully"

        async def _arun(
            self,
            emails: List[str],
            phonenumbers: List[str],
            Company_Names: List[str],
            Ceos: List[str],
            run_manager: Optional[AsyncCallbackManagerForToolRun] = None
        ) -> str:
            """Async entry point; delegates to the synchronous implementation."""
            # The async callback manager is not forwarded: `_run` is typed for
            # the sync manager and does not use it.
            return self._run(emails, phonenumbers, Company_Names, Ceos)

    tool = AirtableTool()
    create_record = tool.invoke({"emails": emails, "phonenumbers": phonenumbers,"Company_Names":Company_Names, "Ceos":Ceos})
    return {"create_record": create_record}

In [229]:

# Structured-output schema for one generated email; passed to
# `llm.with_structured_output` in `create_email_drafts`.
class MessageDraft(BaseModel):
    # Subject line of the email.
    subject: str = Field(..., description="Subject of the email")
    # Body text of the email.
    body: str = Field(..., description="Body content of the email")

def create_email_drafts(state: State):
    """Generate one personalised email draft per selected contact.

    Only the contacts at positions 2 and 3 of the state lists are used, the
    same window that ``record`` writes to Airtable. Each draft is produced by
    the LLM as a ``MessageDraft`` (subject and body).

    Args:
        state: Graph state providing ``emails``, ``Company_Names`` and
            ``Ceos`` as parallel lists.

    Returns:
        The same state object with ``email_drafts`` set to a list of dicts
        (keys ``to_email``, ``draft``, ``Company_Name``, ``ceo_name``) and
        ``email_drafts_markdown`` set to all drafts joined into one string.
    """
    emails = state["emails"]
    Company_Names = state["Company_Names"]
    Ceos=state["Ceos"]

    structured_output = llm.with_structured_output(MessageDraft)

    # Positions 2 and 3 only; zip() stops at the shortest list.
    window = slice(2, 4)

    drafts = []
    for email, company, ceo in zip(emails[window], Company_Names[window], Ceos[window]):
        # The opening sentence was reworded: the original read
        # "...for Cribblr,your name is Auriel an AI agency...", which presents
        # Auriel, not Cribblr, as the agency.
        message = structured_output.invoke(
            f"You are Auriel, a top-performing tech sales executive for Cribblr, an AI agency specializing in voice agents for real estate. Your task is to craft a highly personalized email that demonstrates a deep understanding of {company}'s business requirements. Highlight how Cribblr's voice agents can solve specific real estate challenges faced by {company}. Maintain a professional yet conversational tone, follow email best practices (clear structure, strong CTAs), and make relevant references using available company research. You have access to the email address: {email}, CEO's name:{ceo} and the company name: {company}."
        )
        drafts.append({
            "to_email": email,
            "draft": message,
            "Company_Name":company,
            "ceo_name": ceo
        })

    state["email_drafts"] = drafts

    # Build the display string. The keys are always present, so a `.get`
    # default would never apply; `or` makes the placeholders show up for
    # empty values (e.g. a contact with no email address).
    formatted_emails = "\n\n".join(
        f"To: {draft['to_email'] or 'Not provided'}\n\nCompany: {draft['Company_Name'] or 'N/A'}\nCeo: {draft['ceo_name'] or 'N/A'}\nSubject: {draft['draft'].subject}\n\n{draft['draft'].body}"
        for draft in drafts
    )
    state["email_drafts_markdown"] = formatted_emails

    return state


In [256]:
# Assemble the pipeline as a strictly linear graph over `State`.
workflow = StateGraph(State)

# (node name, node function) pairs, listed in execution order.
pipeline = [
    ("tavily_tool", search),
    ("structured", structured_response),
    ("get_contact", get_contact),
    ("record", record),
    ("create_email_drafts", create_email_drafts),
]

for node_name, node_fn in pipeline:
    workflow.add_node(node_name, node_fn)

# Chain each node to its successor; the last node leads to END.
node_names = [node_name for node_name, _ in pipeline]
for source, target in zip(node_names, node_names[1:] + [END]):
    workflow.add_edge(source, target)

workflow.set_entry_point("tavily_tool")
graph = workflow.compile()

In [None]:
from IPython.display import Markdown

# Seed the graph with the query string under `web_search`; the `search` node
# replaces this key with the Tavily results.
initial_state = {"web_search": 'inurl:about intext:"Chief Executive Officer" "real estate"'}
result = graph.invoke(initial_state)

# Last expression of the cell: shows the generated drafts.
Markdown(result["email_drafts_markdown"])

web_search: [{'url': 'https://www.brookfield.com/about-us/leadership/brian-kingston', 'content': "Brian Kingston is Chief Executive Officer of Brookfield's Real Estate business. In this role, he is responsible for investments, operations and the expansion"}, {'url': 'https://trumarkrem.com/about-us/chief-executive-officer/', 'content': '... Real Estate Management. ... Jason Trueblood: Chief Executive Officer. After graduated College in 2012, Jason joined a wealth management group with Morgan'}, {'url': 'https://corporate.colliers.com/about-us/board-of-directors/person-details/default.aspx?ItemId=8441c0ec-f183-489b-9681-d174ba19faab', 'content': 'Chris McLernon is the Chief Executive Officer, Real Estate Services | Global. Chris takes responsibility for client relationships, operational excellence, and'}, {'url': 'https://www.us.jll.com/en/about-jll/christian-ulbrich', 'content': 'Christian Ulbrich is Chief Executive Officer and President of JLL, a leading global real estate services an

To: info@brownstonemgt.com

Company: Brownstone Property Group
Ceo: Josh Blackman
Subject: Empowering Brownstone Property Group with Cribblr's Voice Agents

Dear Josh Blackman,

I hope this email finds you well. As a top-performing tech sales executive at Cribblr, I have been thoroughly impressed by Brownstone Property Group's commitment to innovation and excellence in the real estate industry.

Having delved into Brownstone Property Group's business requirements, it is evident that the challenges you face in the real estate landscape are multifaceted. From enhancing customer engagement to streamlining property search processes, the need for advanced technological solutions is paramount.

This is where Cribblr's voice agents stand out as a game-changing solution for Brownstone Property Group. Our AI agency specializes in creating voice agents tailored to the real estate sector, offering innovative features that can revolutionize your operations.

Our voice agents can assist in providing personalized property recommendations to potential buyers, improving the overall customer experience. Additionally, they streamline communication, enabling faster response times and enhancing client interactions.

I believe that leveraging Cribblr's voice agents can address the specific challenges faced by Brownstone Property Group, ultimately driving efficiency and customer satisfaction.

I would welcome the opportunity to discuss how Cribblr can tailor our solutions to meet your unique needs. Please let me know a convenient time for a call or meeting to explore this further.

Looking forward to the possibility of collaborating with Brownstone Property Group and contributing to your continued success in the real estate industry.

Warm regards,
Auriel
Top-Performing Tech Sales Executive
Cribblr

P.S. I have attached a brief overview of Cribblr's voice agents for your review. Feel free to reach out if you have any questions or require further information.

To: 

Company: RE/MAX
Ceo: Erik Carlson
Subject: Personalized Solution for RE/MAX: Enhance Real Estate Operations with Cribblr's Voice Agents

Dear Mr. Carlson,

I hope this email finds you well. As a top-performing tech sales executive at Cribblr, specializing in voice agents for real estate, I have taken the time to thoroughly research RE/MAX's business requirements and challenges.

I understand that RE/MAX, as a leading real estate company, faces the ongoing challenge of efficiently managing a large volume of property listings and client inquiries. This can often lead to time-consuming processes and potential missed opportunities.

Cribblr's advanced voice agents offer a tailored solution to streamline your real estate operations. Our voice agents are equipped with natural language processing capabilities, enabling seamless interaction with clients and providing quick responses to inquiries. By leveraging our voice agents, RE/MAX can significantly improve response times, enhance customer engagement, and ultimately drive more successful transactions.

Furthermore, our voice agents are designed to integrate seamlessly with your existing systems, ensuring a smooth transition and minimal disruption to your operations. With Cribblr's innovative technology, RE/MAX can stay ahead in the competitive real estate market while delivering exceptional customer service.

I would welcome the opportunity to discuss how Cribblr's voice agents can specifically address RE/MAX's unique challenges and contribute to your continued success. Please let me know a convenient time for a call or meeting to further explore this partnership.

Thank you for considering Cribblr as a strategic partner in revolutionizing real estate operations. I look forward to the possibility of working together to achieve mutual success.

Warm regards,
Auriel
Tech Sales Executive
Cribblr