In [8]:
import io
import os
import urllib.request
from datetime import datetime, timedelta, timezone
from typing import List

from agno.agent import Agent
from agno.models.azure import AzureOpenAI
from agno.run.response import RunResponse
from agno.tools.resend import ResendTools
from agno.tools.zoom import ZoomTools
from agno.utils.log import logger
from agno.workflow import Workflow
from pydantic import BaseModel, Field
from pypdf import PdfReader


In [10]:
class ScreeningResult(BaseModel):
    # Structured verdict produced by the screening agent for one candidate.
    # `#` comments are used instead of a docstring so the generated JSON
    # schema (built from the Field descriptions) stays exactly as it was.
    name: str = Field(
        description="The name of the candidate",
    )
    email: str = Field(
        description="The email address of the candidate",
    )
    # The workflow compares this value against its selection threshold.
    score: float = Field(
        description="The score achieved by the candidate",
    )
    feedback: str = Field(
        description="Feedback provided to the candidate",
    )


class CandidateScheduledCall(BaseModel):
    # Structured result returned by the interview-scheduler agent.
    # `#` comments are used instead of a docstring so the generated JSON
    # schema (built from the Field descriptions) stays exactly as it was.
    name: str = Field(
        description="The name of the candidate",
    )
    email: str = Field(
        description="The email of the candidate",
    )
    # Kept as free-form text; the workflow only checks that it is non-empty.
    call_time: str = Field(
        description="The scheduled time for the candidate's call",
    )
    url: str = Field(
        description="The url of the call",
    )

class Email(BaseModel):
    # Structured output of the email-writer agent: the message to be sent.
    subject: str = Field(description="The subject of the email")
    # Previously misspelled `bofy`; the workflow reads `email.content.body`,
    # so the misspelling raised AttributeError when building the send prompt.
    body: str = Field(description="The body of the email")



In [None]:
# Fixed UTC+05:30 offset for India Standard Time (India observes no DST).
_IST = timezone(timedelta(hours=5, minutes=30), "IST")
# Candidates must score strictly above this (on the 0-10 scale) to be selected.
_MIN_SELECTION_SCORE = 7.0
# Upper bound, in seconds, on how long a single resume download may block.
_PDF_FETCH_TIMEOUT_SECONDS = 30


def _azure_gpt4o() -> AzureOpenAI:
    """Build a gpt-4o Azure OpenAI model configured from the environment.

    Credentials are read from ``AZURE_OPENAI_API_KEY`` and
    ``AZURE_OPENAI_ENDPOINT`` so that no secret is stored in source control.
    SECURITY: an API key was previously hard-coded in this file; treat that
    key as compromised and rotate it.
    """
    return AzureOpenAI(
        id="gpt-4o",
        api_key=os.getenv("AZURE_OPENAI_API_KEY"),
        azure_endpoint=os.getenv("AZURE_OPENAI_ENDPOINT"),
    )


class EmployeeRecruitmentWorkflow(Workflow):
    """Screen resumes, schedule interviews and email the selected candidates.

    The workflow chains four agents:

    1. ``screening_agent`` scores each resume against the job description.
    2. ``interview_scheduler_agent`` books a Zoom call for each selected
       candidate.
    3. ``email_writer_agent`` drafts the invitation email.
    4. ``email_sender_agent`` sends the drafted email through Resend.
    """

    screening_agent: Agent = Agent(
        description="You are an HR agent that screens candidates for a job interview.",
        model=_azure_gpt4o(),
        instructions=[
            "You are an expert HR agent that screens candidates for a job interview.",
            "You are given a candidate's name and resume and job description.",
            "You need to screen the candidate and determine if they are a good fit for the job.",
            "You need to provide a score for the candidate from 0 to 10.",
            "You need to provide a feedback for the candidate on why they are a good fit or not.",
        ],
        response_model=ScreeningResult,
    )

    interview_scheduler_agent: Agent = Agent(
        description="You are an interview scheduler agent that schedules interviews for candidates.",
        model=_azure_gpt4o(),
        instructions=[
            "You are an interview scheduler agent that schedules interviews for candidates.",
            "You need to schedule interviews for the candidates using the Zoom tool.",
            "You need to schedule the interview for the candidate at the earliest possible time between 10am and 6pm.",
            "Check if the candidate and interviewer are available at the time and if the time is free in the calendar.",
            # The original text embedded a literal "{current_time}" placeholder
            # that was never filled in; run() now supplies the real time in
            # every scheduling request instead.
            "You are in IST timezone and the current time is provided in each request. So schedule the call in future time with reference to current time.",
        ],
        tools=[
            ZoomTools(
                account_id=os.getenv("ZOOM_ACCOUNT_ID"),
                client_id=os.getenv("ZOOM_CLIENT_ID"),
                client_secret=os.getenv("ZOOM_CLIENT_SECRET"),
            )
        ],
        response_model=CandidateScheduledCall,
    )

    email_writer_agent: Agent = Agent(
        description="You are an expert email writer agent that writes emails to selected candidates.",
        model=_azure_gpt4o(),
        instructions=[
            "You are an expert email writer agent that writes emails to selected candidates.",
            "You need to write an email and send it to the candidates using the Resend tool.",
            "You represent the company and the job position.",
            "You need to write an email that is concise and to the point.",
            "You need to write an email that is friendly and professional.",
            "You need to write an email that is not too long and not too short.",
            "You need to write an email that is not too formal and not too informal.",
        ],
        response_model=Email,
    )

    email_sender_agent: Agent = Agent(
        description="You are an expert email sender agent that sends emails to selected candidates.",
        model=_azure_gpt4o(),
        instructions=[
            "You are an expert email sender agent that sends emails to selected candidates.",
            "You need to send an email to the candidate using the Resend tool.",
            "You will be given the email subject and body and you need to send it to the candidate.",
        ],
        tools=[ResendTools(from_email="email@agno.com")],
    )

    def extract_text_from_pdf(self, pdf_url: str) -> str:
        """Download a PDF and return the text of all its pages.

        Args:
            pdf_url: HTTP(S) URL of the PDF document.

        Returns:
            The text of every page joined with newlines, with leading and
            trailing whitespace removed. Empty when no text is extractable.

        Raises:
            ValueError: If ``pdf_url`` is not an http:// or https:// URL.
            urllib.error.URLError: If the download fails; an error HTTP
                status raises its ``HTTPError`` subclass.
        """
        # urlopen() would also accept file:// and similar schemes; resumes are
        # only ever expected to come from the web.
        if not pdf_url.lower().startswith(("http://", "https://")):
            raise ValueError(f"Unsupported resume URL: {pdf_url}")

        with urllib.request.urlopen(
            pdf_url, timeout=_PDF_FETCH_TIMEOUT_SECONDS
        ) as response:
            pdf_bytes = response.read()

        reader = PdfReader(io.BytesIO(pdf_bytes))
        # Guard against a page yielding no text (e.g. a scanned image).
        page_texts = (page.extract_text() or "" for page in reader.pages)
        return "\n".join(page_texts).strip()

    def run(self, candidate_resume_urls: List[str], job_description: str) -> RunResponse:
        """Screen every resume and invite the candidates that qualify.

        Args:
            candidate_resume_urls: URLs of the candidates' PDF resumes.
            job_description: Text of the job posting to screen against.

        Returns:
            A ``RunResponse`` whose content states how many candidates were
            selected for an interview.

        Raises:
            ValueError: If ``candidate_resume_urls`` is empty.
        """
        if not candidate_resume_urls:
            raise ValueError("candidate_resume_urls cannot be empty")

        selected_candidates: List[ScreeningResult] = []

        for resume_url in candidate_resume_urls:
            # Reuse previously extracted text so a re-run of the same session
            # does not download and parse the same PDF again.
            if resume_url in self.session_state:
                resume_content = self.session_state[resume_url]
            else:
                resume_content = self.extract_text_from_pdf(resume_url)
                self.session_state[resume_url] = resume_content

            if not resume_content:
                logger.error(f"Could not process resume from URL: {resume_url}")
                continue

            prompt = f"Candidate resume: {resume_content}, Job description: {job_description}"
            screening_result = self.screening_agent.run(prompt)
            logger.info(screening_result)

            # The content is only a ScreeningResult when the model's reply was
            # parsed successfully; anything else cannot be scored.
            verdict = screening_result.content if screening_result else None
            if not isinstance(verdict, ScreeningResult):
                logger.error(f"Screening returned no structured result for URL: {resume_url}")
                continue
            if verdict.score > _MIN_SELECTION_SCORE:
                selected_candidates.append(verdict)

        for candidate in selected_candidates:
            # Give the scheduler an explicit reference point so it books a
            # slot in the future rather than guessing the current date.
            current_time = datetime.now(_IST).strftime("%Y-%m-%d %H:%M %Z")
            prompt = (
                f"Schedule a 1hr call with Candidate name: {candidate.name}, "
                f"Candidate email: {candidate.email} and the interviewer would be "
                "Manthan Gupts with email manthan@agno.com. "
                f"The current time in IST is {current_time}."
            )
            scheduled_call = self.interview_scheduler_agent.run(prompt)
            logger.info(scheduled_call.content)

            call = scheduled_call.content
            if not (
                isinstance(call, CandidateScheduledCall)
                and call.url
                and call.call_time
            ):
                logger.error(f"Could not schedule a call for {candidate.email}")
                continue

            prompt = f"Write an email to Candidate name: {candidate.name}, Candidate email: {candidate.email} for the call scheduled at {call.call_time} with the url {call.url} and congratulate them for the interview from John Doe designation Senior Software Engineer and email john@agno.com"
            email = self.email_writer_agent.run(prompt)
            logger.info(email.content)

            draft = email.content
            if not isinstance(draft, Email):
                logger.error(f"Could not draft an email for {candidate.email}")
                continue

            prompt = f"Send email to {candidate.email} with subject {draft.subject} and body {draft.body}"
            self.email_sender_agent.run(prompt)

        return RunResponse(
            content=f"Selected {len(selected_candidates)} candidates for the interview.",
        )


# Backward-compatible alias for the original, misspelled class name.
EmployeeRecuritmentWorkflow = EmployeeRecruitmentWorkflow


In [None]:
# Create the recruitment workflow with its default configuration.
workflow = EmployeeRecruitmentWorkflow()

In [None]:
# Run the workflow: each resume is screened against the job description below.
result = workflow.run(
    candidate_resume_urls=[
      # TODO: add resume PDF URLs here; run() is written to raise on an empty list.
    ],
    job_description="""
        We are hiring for backend and systems engineers!
        Join our team building the future of agentic software

        Apply if:
        🧠 You know your way around Python, typescript, docker, and AWS.
        ⚙️ Love to build in public and contribute to open source.
        🚀 Are ok dealing with the pressure of an early-stage startup.
        🏆 Want to be a part of the biggest technological shift since the internet.
        🌟 Bonus: experience with infrastructure as code.
        🌟 Bonus: starred Agno repo.
    """,
)

In [None]:
# Show the workflow's summary message (how many candidates were selected).
result.content