**TITLE:** MULTI-AGENT INTERVIEWING SYSTEM

**DEVELOPERS:**

------

# Setup Instructions for Jupyter Notebook

This notebook installs essential packages for working with LangChain, OpenAI, and other data handling tools. 

### Important Notes:
- **Google Colab Users**: If you are using Google Colab, also install the `google-colab` package (see Step 1).
- **GPU Configuration**: If using Google Colab, you can enable GPU for faster performance by going to:
  - **Runtime** > **Change runtime type** > **Hardware accelerator** and selecting **GPU**.
  
---

## Step 1: Install General Utilities and Google Colab Packages


In [None]:
# Install general utilities and widgets
%pip install pandas opendatasets nest_asyncio ipywebrtc ipywidgets IPython 

In [None]:
# Only run this cell if using google-colab, else skip it
%pip install google-colab

---

## Step 2: Install OpenAI, LangChain, and Related Tools
These packages are necessary for using OpenAI’s language models and LangChain's toolkit for search, document processing, and data handling.

---


In [None]:
# OpenAI and related LangChain tools
%pip install openai langchain_openai

# LangChain Community Tools for search and document handling
%pip install langchain_community

# Typing extensions and Pydantic
%pip install typing_extensions pydantic

# LangGraph and experimental LangChain tools
%pip install langgraph langchain_experimental

In [None]:
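# For agent tools (docx2txt backs Docx2txtLoader; beautifulsoup4 backs BeautifulSoupTransformer)
%pip install pypdf docx2txt wikipedia duckduckgo-search beautifulsoup4 playwright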

!playwright install

---

## Step 3: Database Utilities, SQLAlchemy, and FAISS for Vector Storage

- **Database Utilities**: Install SQLAlchemy for database interactions.
- **FAISS**: Choose `faiss-cpu` for CPU environments or `faiss-gpu` if you've enabled GPU support on Colab.
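
One way to pick between the two packages is to check for an NVIDIA driver before installing. The sketch below is an illustration, not part of the original notebook: `choose_faiss_package` is a hypothetical helper, and it assumes that finding the `nvidia-smi` executable on the PATH is a good-enough hint that a GPU is available.

```python
import shutil


def choose_faiss_package() -> str:
    """Return the FAISS package name suggested for this machine.

    Assumption: `nvidia-smi` on PATH means an NVIDIA driver (and GPU) is present.
    """
    has_nvidia_driver = shutil.which("nvidia-smi") is not None
    return "faiss-gpu" if has_nvidia_driver else "faiss-cpu"


print(choose_faiss_package())
```

The printed name can then be passed to `%pip install` in the FAISS cell.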

---


In [None]:
# Database utilities and SQLAlchemy
%pip install SQLAlchemy

# FAISS for vector storage and retrieval
%pip install faiss-cpu

In [None]:
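# Sanity check: which Python is in use and where playwright was installed
# (`where` is Windows-only; use `!which python` on Linux, macOS, or Colab)
!where python
!pip show playwright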


## General Imports
This cell includes the essential imports needed to use LangChain, OpenAI, and other data handling tools in any Jupyter Notebook or Python environment.


In [None]:
# General imports for data handling, display, and LangChain functionality
import os
import opendatasets as od
import nest_asyncio

from ipywebrtc import AudioRecorder, CameraStream
from IPython.display import Audio, display, clear_output
import ipywidgets as widgets

import openai
from openai import OpenAI
from langchain_openai import ChatOpenAI

# LangChain and related tools
from langchain_community.tools import DuckDuckGoSearchResults
from langchain_community.document_loaders import AsyncChromiumLoader
from langchain_community.document_transformers import BeautifulSoupTransformer
from langchain_community.tools import WikipediaQueryRun
from langchain_community.utilities import WikipediaAPIWrapper
from langchain_community.document_loaders.csv_loader import CSVLoader
from langchain_community.document_loaders import PyPDFLoader
from langchain_community.document_loaders import Docx2txtLoader
from langchain_core.documents import Document
from langchain_openai import OpenAIEmbeddings
from langchain_community.vectorstores import FAISS
from langchain_text_splitters import CharacterTextSplitter
from langchain.tools.retriever import create_retriever_tool

# LangChain Agents and supporting libraries
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_core.messages import HumanMessage, trim_messages
from langchain.output_parsers.openai_functions import JsonOutputFunctionsParser
from pydantic import BaseModel
from typing import Annotated, Literal, Sequence, List
import functools
import operator
from typing_extensions import TypedDict
from langchain_core.messages import BaseMessage
from langgraph.graph import END, StateGraph, START
from langgraph.prebuilt import create_react_agent
from langchain_experimental.tools import PythonREPLTool
from langgraph.graph import MessagesState
from langgraph.checkpoint.memory import MemorySaver


## Google Colab Specific Imports
This cell should be run only if you're using Google Colab.


In [None]:
# Google Colab specific imports
from google.colab import output
from google.colab import userdata
from google.colab import files


In [None]:
nest_asyncio.apply()

# Get API Keys

In [None]:
# Retrieve the API key: Colab secrets first, then the environment, then a prompt
try:
    from google.colab import userdata  # only available on Google Colab
    open_ai_api_key = userdata.get('OPENAI_API_KEY')
except Exception:
    # Not on Colab (or the secret is missing): fall back to the environment or a prompt
    open_ai_api_key = os.getenv('OPENAI_API_KEY') or input("Enter your OpenAI API key: ")

# Confirm setup without echoing the full secret
if open_ai_api_key:
    # Set the API key as an environment variable for universal access within the notebook
    os.environ['OPENAI_API_KEY'] = open_ai_api_key
    print(f"API key successfully set (ends with ...{open_ai_api_key[-4:]})")
else:
    print("API key not set. Please check your setup.")


In [None]:
# Todo: 
# Need to have an alternative that grabs a HuggingFace API key and interfaces with free models there (Llama-3-8B)

# Create Tools

In [None]:
from langchain_core.tools import tool


## 1. Speech-to-text

This tool lets the user record speech and converts it to text using OpenAI's Whisper model.



In [None]:
client = OpenAI()

In [None]:
# for colab
output.enable_custom_widget_manager()

In [None]:
def setup_audio_recorder():
    camera = CameraStream(constraints={'audio': True, 'video': False})
    recorder = AudioRecorder(stream=camera)
    display(recorder)
    return recorder

In [None]:
def save_recording(recorder):
    audio_data = recorder.audio.value
    if audio_data:
        with open("recording.webm", "wb") as f:
            f.write(audio_data)
        return "recording.webm"
    else:
        print("No audio data was captured. Please try again.")
        return None

In [None]:
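import subprocess

def convert_to_wav(input_filename, output_filename="my_recording.wav"):
    if input_filename and os.path.exists(input_filename):
        # Pass the arguments as a list so file names with spaces or shell characters are safe
        command = ["ffmpeg", "-i", input_filename, "-ac", "1", "-f", "wav",
                   output_filename, "-y", "-hide_banner", "-loglevel", "panic"]
        try:
            result = subprocess.run(command, check=False)
        except FileNotFoundError:
            print("ffmpeg was not found. Please install it and try again.")
            return None
        if result.returncode == 0 and os.path.exists(output_filename):
            return output_filename
        else:
            print("Conversion failed.")
            return None
    else:
        print("Input file does not exist.")
        return None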

In [None]:
def transcribe_audio(filename):
    with open(filename, "rb") as audio_file:
        transcription = client.audio.transcriptions.create(
            model="whisper-1",
            file=audio_file
        )
    print("")
    print("Transcription:", transcription.text)
    return transcription.text

In [None]:
def record_and_transcribe_candidate_answer():
    """Record and transcribe a candidate's spoken answer to an interviewer's question."""
    # Set up the recorder
    recorder = setup_audio_recorder()

    # Create a save button
    print("")
    save_button = widgets.Button(description="Save Recording")

    # This dictionary will store the transcribed text
    transcription_result = {}

    # Define the callback function for the save button
    def on_save_clicked(button):
        # Save the recording
        webm_file = save_recording(recorder)
        if webm_file:
            # Convert to wav format
            wav_file = convert_to_wav(webm_file)
            if wav_file:
                # Transcribe the audio and store the result
                transcription_result['text'] = transcribe_audio(wav_file)

    save_button.on_click(on_save_clicked)
    display(save_button)

    # Return the transcription result dictionary
    return transcription_result

In [None]:
# Todo:
# Try to do live transcription, rather than recording a file. 
# Take a look at https://gist.github.com/Vaibhavs10/a48d141534cc8d877937d421bb828d8e
# and https://github.com/VRSEN/langchain-agents-tutorial/blob/main/main.py

# FOSS alternative pipeline, that doesn't rely on OpenAI models
# Using HF free API instead 
# Something like https://github.com/nyrahealth/CrisperWhisper?tab=readme-ov-file#31-usage-with--transformers

## 2. Text Input

In [None]:
def setup_text_input():
    text_input = widgets.Textarea(
        placeholder="Type your answer here...",
        description="Answer:",
        layout=widgets.Layout(width='500px', height='100px')
    )
    display(text_input)
    return text_input

In [None]:
def submit_text_input(text_widget):
    user_text = text_widget.value
    if user_text.strip():
        print("\nInput:\n", user_text)
        return user_text
    else:
        print("No input was provided. Please type your answer and try again.")
        return None

In [None]:
def record_and_submit_text():
    """Record a candidate's written answer to interviewer questions that require written output, such as code."""
    # Set up the text input widget
    text_widget = setup_text_input()

    # Create a submit button
    print("")
    submit_button = widgets.Button(description="Save Answer")

    # This variable will store the submitted text
    submission_result = {}

    # Define the callback function for the submit button
    def on_submit_clicked(button):
        # Capture the user's text input and store it in the dictionary
        submission_result['text'] = submit_text_input(text_widget)

    submit_button.on_click(on_submit_clicked)
    display(submit_button)

    # Return right away; the callback fills this dictionary once the user clicks the button
    return submission_result
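
Both `record_and_transcribe_candidate_answer` and `record_and_submit_text` return their result dictionary immediately; it stays empty until the user clicks the button and the callback runs. The sketch below illustrates that pattern without any widget library. `FakeButton` and `collect_answer` are hypothetical names used only for this example, and `FakeButton` is a stand-in for `widgets.Button`, not part of ipywidgets.

```python
class FakeButton:
    """Hypothetical stand-in for a widget button that stores click callbacks."""

    def __init__(self):
        self._callbacks = []

    def on_click(self, callback):
        self._callbacks.append(callback)

    def click(self):
        # Simulate the user pressing the button.
        for callback in self._callbacks:
            callback(self)


def collect_answer(button, read_value):
    """Return a dict that is filled in only when the button is clicked."""
    result = {}

    def on_clicked(_button):
        result["text"] = read_value()

    button.on_click(on_clicked)
    return result


button = FakeButton()
pending_answer = collect_answer(button, read_value=lambda: "def add(a, b): return a + b")
print(pending_answer)  # nothing has been clicked yet
button.click()
print(pending_answer)  # the callback has now stored the text
```

Code that reads the dictionary therefore has to run in a later cell, after the user has clicked the button.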

## 3. CV Reader

CV Reader for PDF and DOCX files.

Instead of a CV, you can upload a LinkedIn profile export, which is available in PDF format.

This tool can easily be swapped for another file-reading service, e.g., Azure DI, LlamaParse, or custom parsing with pypdf.
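
Swapping the reader is easiest when the choice of parser is kept in one place. The sketch below is an illustration rather than the notebook's code: `LOADER_BY_EXTENSION` and `pick_loader_name` are hypothetical names, and the registry stores loader names as strings so that the example runs without LangChain installed.

```python
from pathlib import Path

# Hypothetical registry: file extension -> name of the loader class to use.
LOADER_BY_EXTENSION = {
    ".pdf": "PyPDFLoader",
    ".docx": "Docx2txtLoader",
}


def pick_loader_name(file_path: str) -> str:
    """Return the loader registered for the file's extension (case-insensitive)."""
    extension = Path(file_path).suffix.lower()
    try:
        return LOADER_BY_EXTENSION[extension]
    except KeyError:
        raise ValueError(f"Unsupported file type: {extension or 'no extension'}") from None


print(pick_loader_name("CV - 2024-1.PDF"))
```

Adding another reader then means adding one entry to the registry instead of another `elif` branch.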

In [None]:
# colab version

def upload_and_filter_file():
    # Upload a single file
    uploaded = files.upload()

    # Check if only one file was uploaded
    if len(uploaded) != 1:
        print("Please upload exactly one file.")
        return None

    # Get the uploaded file name and data
    file_name, file_data = next(iter(uploaded.items()))

    # Check if the file is .pdf or .docx
    if not file_name.endswith(('.pdf', '.docx')):
        print("Invalid file type. Please upload only .pdf or .docx files.")
        return None

    # Save the uploaded bytes to the /content/ directory
    file_path = f'/content/{file_name}'
    with open(file_path, 'wb') as f:
        f.write(file_data)

    return file_path

cv_file_path = upload_and_filter_file()

In [None]:
# local jupyter notebook

cv_file_path = r'C:\Users\DMA\Downloads\CV - 2024-1.pdf'

In [None]:
cv_file_path

In [None]:
def create_cv_retriever(file_path, k):
    pages = []

    if file_path.endswith('.pdf'):
        loader = PyPDFLoader(file_path)
    elif file_path.endswith('.docx'):
        loader = Docx2txtLoader(file_path)
    else:
        raise ValueError("Unsupported file type.")

    for page in loader.load():
        pages.append(page)

    text_splitter = CharacterTextSplitter(chunk_size=1000, chunk_overlap=0)
    texts = text_splitter.split_documents(pages)

    embeddings = OpenAIEmbeddings()
    db = FAISS.from_documents(texts, embeddings)

    retriever = db.as_retriever(search_kwargs={"k": k})

    return retriever

In [None]:
cv_retriever = create_cv_retriever(cv_file_path, 5)

In [None]:
cv_tool = create_retriever_tool(
    cv_retriever,
    "search_candidate_info",
    "Searches and returns candidate's profile with experience, education, and skills.",
)

In [None]:
# todo: 
# Free alternative for embeddings that doesn't use OpenAI

## 4. Hiring Company Info Scraper

In [None]:
def get_wikipedia_content(query):
    """Fetches content from Wikipedia based on a query."""
    wikipedia = WikipediaQueryRun(api_wrapper=WikipediaAPIWrapper())
    wikipedia_content = wikipedia.run(query)
    return wikipedia_content

In [None]:
def get_websites_links(query):
    """Fetches a list of website links based on a search query using DuckDuckGo."""
    search = DuckDuckGoSearchResults(output_format="list")
    search_results = search.invoke(query)
    return [result["link"] for result in search_results]

In [None]:
def load_websites_content(websites):
    """Loads the HTML content of a list of websites."""
    content_list = []
    for website in websites:
        loader = AsyncChromiumLoader([website])
        html_content = loader.load()
        content_list.append(html_content)
    return content_list

In [None]:
def transform_html_content(html_content_list, tags=("span", "p", "b", "h3", "h4")):
    """Transforms HTML content to extract specific tags using BeautifulSoup."""
    transformed_content = []
    bs_transformer = BeautifulSoupTransformer()
    for html in html_content_list:
        docs_transformed = bs_transformer.transform_documents(html, tags_to_extract=tags)
        for doc in docs_transformed:
            transformed_content.append(doc.page_content)
    return transformed_content
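
To make the effect of tag filtering concrete, the sketch below keeps only the text found inside a chosen set of tags, using the standard library's `html.parser`. It is a simplified illustration and not how `BeautifulSoupTransformer` is implemented; `TagTextExtractor`, `extract_text`, and the sample HTML are hypothetical.

```python
from html.parser import HTMLParser


class TagTextExtractor(HTMLParser):
    """Collect text found inside a chosen set of tags (hypothetical helper)."""

    def __init__(self, tags):
        super().__init__()
        self.tags = set(tags)
        self.depth = 0  # how many wanted tags are currently open
        self.chunks = []

    def handle_starttag(self, tag, attrs):
        if tag in self.tags:
            self.depth += 1

    def handle_endtag(self, tag):
        if tag in self.tags and self.depth > 0:
            self.depth -= 1

    def handle_data(self, data):
        text = data.strip()
        if self.depth > 0 and text:
            self.chunks.append(text)


def extract_text(html, tags=("span", "p", "b", "h3", "h4")):
    """Return the text inside the wanted tags, joined by single spaces."""
    parser = TagTextExtractor(tags)
    parser.feed(html)
    parser.close()
    return " ".join(parser.chunks)


sample = "<div>menu<p>Deloitte is a <b>professional services</b> network.</p><script>x()</script></div>"
print(extract_text(sample))
```

Navigation text and scripts that sit outside the wanted tags are dropped, which keeps noise out of the retriever.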

In [None]:
def get_web_content(query):
    """Main function to gather content from Wikipedia and websites based on a query."""
    content = []

    wikipedia_content = get_wikipedia_content(query)
    content.append(wikipedia_content)

    website_links = get_websites_links(f"What is {query}?")

    html_content_list = load_websites_content(website_links)

    transformed_content = transform_html_content(html_content_list)

    content.extend(transformed_content)
    return content

In [None]:
query = "Deloitte Company"
websites_content = get_web_content(query)
websites_content

In [None]:
def create_company_info_retriever(websites_content, k):
    docs = []

    for website_content in websites_content:
        doc = Document(page_content=website_content)
        docs.append(doc)

    text_splitter = CharacterTextSplitter(chunk_size=1000, chunk_overlap=0)
    texts = text_splitter.split_documents(docs)

    embeddings = OpenAIEmbeddings()  # need a FOSS alternative
    db = FAISS.from_documents(texts, embeddings)

    retriever = db.as_retriever(search_kwargs={"k": k})

    return retriever

In [None]:
company_info_retriever = create_company_info_retriever(websites_content, 5)

In [None]:
# todo: update this tool so it gets correct data, this is copied from the cv

company_info_tool = create_retriever_tool(
    company_info_retriever,
    "search_company_info",
    "Searches and returns company's profile with company's details to be considered by HR Specialist.",
)

## 5. Querying a Dataset

This is an optional tool for enhancing the process of hard skills review.

The dataset can be changed depending on the needs of users.

In [None]:
ds = "https://www.kaggle.com/datasets/syedmharis/software-engineering-interview-questions-dataset"

In [None]:
def get_kaggle_ds(dataset_url):
    od.download(dataset_url)

In [None]:
# Load CSV

# Set the file path to the downloaded data and the encoding of the file
file_path = r"C:\Users\DMA\Downloads\Software Questions.csv"
encoding = "ISO-8859-1"  # Latin-1 (Western European) encoding

loader = CSVLoader(file_path=file_path, encoding=encoding)
docs = loader.load()


In [None]:
# Define text splitter

text_splitter = CharacterTextSplitter(chunk_size=1000, chunk_overlap=0)
texts = text_splitter.split_documents(docs)


### 5.1a Using OpenAI Embeddings 

In [None]:
def create_questions_dataset_retriever(texts, k):
    embeddings = OpenAIEmbeddings()
    db = FAISS.from_documents(texts, embeddings)

    retriever = db.as_retriever(search_kwargs={"k": k})

    return retriever

In [None]:
create_questions_dataset_retriever(texts=texts, k=5)

### 5.1b Using HuggingFace Embeddings 

To represent each chunk as a high-dimensional vector, we’ll use Hugging Face's pre-trained model `sentence-transformers/all-MiniLM-L6-v2`. This model is efficient and well-suited for generating text embeddings.


We’ll define a simple helper class to handle embedding generation using the Hugging Face model.

In [None]:
from transformers import AutoTokenizer, AutoModel
import torch
import numpy as np

class HuggingFaceEmbeddings:
    def __init__(self, model_name="sentence-transformers/all-MiniLM-L6-v2"):
        # Load the model and tokenizer from Hugging Face
        self.tokenizer = AutoTokenizer.from_pretrained(model_name)
        self.model = AutoModel.from_pretrained(model_name)

    def embed_texts(self, texts):
        # Generate embeddings for each text
        embeddings = []
        for text in texts:
            inputs = self.tokenizer(text, return_tensors="pt", padding=True, truncation=True, max_length=512)
            with torch.no_grad():
                outputs = self.model(**inputs)
                embeddings.append(outputs.last_hidden_state.mean(dim=1).squeeze().numpy())
        return np.array(embeddings)
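
The helper averages `last_hidden_state` over all token positions, which works while each text is embedded on its own and no padding is present. If texts were batched, padded positions would need to be excluded from the average. The sketch below shows that masked mean on plain Python lists; `masked_mean_pool` is a hypothetical helper, and the toy vectors are made up for illustration.

```python
def masked_mean_pool(token_vectors, attention_mask):
    """Average token vectors, counting only positions where the mask is 1."""
    dim = len(token_vectors[0])
    totals = [0.0] * dim
    kept = 0
    for vector, keep in zip(token_vectors, attention_mask):
        if keep:
            kept += 1
            for i, value in enumerate(vector):
                totals[i] += value
    if kept == 0:
        raise ValueError("attention_mask selects no tokens")
    return [total / kept for total in totals]


# Two real tokens followed by one padding position (toy 2-dimensional vectors).
tokens = [[1.0, 2.0], [3.0, 4.0], [100.0, 100.0]]
mask = [1, 1, 0]
print(masked_mean_pool(tokens, mask))
```

With tensors, the same idea amounts to multiplying the hidden states by the attention mask before summing and dividing by the number of unmasked tokens.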

Now, let’s generate embeddings for each of the text chunks.

In [None]:
# Initialize the embedding model
embeddings_model = HuggingFaceEmbeddings()

# Generate embeddings for each chunk of text
embeddings = embeddings_model.embed_texts([text.page_content for text in texts])

After this step, `embeddings` will contain a vector representation of each document chunk.

To make our embeddings searchable, we’ll use FAISS to create an index. This allows us to find the most similar embeddings to any query.

In [None]:
import faiss

# Initialize the FAISS index
embedding_dim = embeddings.shape[1]  # Dimension of embeddings
faiss_index = faiss.IndexFlatL2(embedding_dim)

# Add the embeddings to the FAISS index
faiss_index.add(embeddings)
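
`IndexFlatL2` performs an exact, brute-force search: it compares the query with every stored vector and ranks them by squared Euclidean distance. The pure-Python sketch below mirrors that idea on toy data; `flat_l2_search` is a hypothetical function written for illustration and is not FAISS's implementation.

```python
def flat_l2_search(stored_vectors, query, k):
    """Return (distances, indices) of the k stored vectors closest to query."""
    scored = []
    for index, vector in enumerate(stored_vectors):
        squared_distance = sum((a - b) ** 2 for a, b in zip(vector, query))
        scored.append((squared_distance, index))
    scored.sort()  # smallest distance first; ties broken by index
    top = scored[:k]
    return [d for d, _ in top], [i for _, i in top]


vectors = [[0.0, 0.0], [1.0, 1.0], [5.0, 5.0]]
distances, indices = flat_l2_search(vectors, query=[0.9, 1.2], k=2)
print(indices, distances)
```

Because every vector is compared, the result is exact, and the cost grows linearly with the number of stored chunks.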

Finally, we’ll define a `retriever` function that, given a query, will embed it and retrieve the most similar document chunks from the FAISS index.

In [None]:
def retriever(query, texts, embeddings_model, faiss_index, k=5):
    # Generate embedding for the query
    query_embedding = embeddings_model.embed_texts([query])[0]
    
    # Search FAISS index for the top-k similar chunks
    distances, indices = faiss_index.search(np.array([query_embedding]), k)
    
    # Retrieve the corresponding text chunks (FAISS pads with -1 when fewer than k vectors exist)
    results = [texts[i].page_content for i in indices[0] if i != -1]
    return results


To test it:

In [None]:
# Define your query
query = "What is the topic of interest?"

# Call the retriever with the required arguments
results = retriever(query, texts, embeddings_model, faiss_index, k=5)

# Print the top results
print("Top similar chunks:")
for i, result in enumerate(results, 1):
    print(f"{i}. {result}")


### 5.2 Define the tool for agents

In [None]:
# Build the retriever first, then wrap it as a tool that agents can call.
# `create_retriever_tool` already returns a tool, so no `@tool` decorator is needed.
questions_database_tool = create_retriever_tool(
    create_questions_dataset_retriever(texts=texts, k=5),
    "search_subject_matter_questions",
    "Searches and returns subject matter questions for checking hard skills.",
)

-----

# Initialize Agents

In [None]:
# Todo:
# Need to test this with OAI key
# Test each of the tools are working

# Create LangGraph agents, give them roles, assign interactions and tools to each

# Implement user-agent interaction
# LangGraph - https://github.com/langchain-ai/langgraph/blob/main/docs/docs/how-tos/human_in_the_loop/wait-user-input.ipynb

# Add a FOSS alternative for models

In [None]:
llm = ChatOpenAI(model_name="gpt-4o")  # need a FOSS alternative

In [None]:
def display_input_form_with_return():
    # Capture inputs
    print("Voice input")
    print("")
    voice_input = record_and_transcribe_candidate_answer()
    print("")
    print("")
    print("Text input")
    print("")
    written_input = record_and_submit_text()

    # Define what happens on submit
    def on_submit(button):
        clear_output()
        print("Submitted successfully. Moving to the next step...")

    # Create the submit button and link to the on_submit action
    print("")
    print("================================================")
    print("Please click the Submit button to send your answers")
    print("")
    submit_button = widgets.Button(description="Submit")
    submit_button.on_click(on_submit)

    display(submit_button)

    # Return right away; the callbacks above fill both dictionaries once the user interacts
    return voice_input, written_input

In [None]:
voice, text_input = display_input_form_with_return()

In [None]:
answer = f"Answer: {voice.get('text', '') if voice else ''}\n\n{text_input.get('text', '') if text_input else ''}"
answer

In [None]:
def call_model(state: MessagesState):
    response = llm.invoke(state["messages"])
    return {"messages": response}

In [None]:
memory = MemorySaver()

In [None]:
builder = StateGraph(MessagesState)
builder.add_node("call_model", call_model)
builder.add_edge(START, "call_model")
graph = builder.compile(checkpointer=memory)

In [None]:
from IPython.display import display, Image
from langchain_core.runnables.graph import MermaidDrawMethod

display(
    Image(
        graph.get_graph().draw_mermaid_png(
            draw_method=MermaidDrawMethod.API,
        )
    )
)

In [None]:
config = {"configurable": {"thread_id": "1"}}

In [None]:
input_message = {"type": "user", "content": answer}
for chunk in graph.stream({"messages": [input_message]}, config, stream_mode="values"):
    chunk["messages"][-1].pretty_print()


# 3. Agents (DMA)

In [None]:
from langgraph.prebuilt import ToolNode

from pydantic import BaseModel

In [None]:
llm = ChatOpenAI(model_name="gpt-4o")  # need a FOSS alternative

In [None]:
tools = []
tool_node = ToolNode(tools)

In [None]:
# We are going to "bind" all tools to the model
# We have the ACTUAL tools from above, but we also need a mock tool to ask a human
# Since `bind_tools` takes in tools but also just tool definitions,
# We can define a tool definition for `ask_human`
class AskHuman(BaseModel):
    """Ask the human a question"""

    question: str
  
llm = llm.bind_tools(tools + [AskHuman])

In [None]:
# Define the function that determines whether to continue or not
def should_continue(state):
    messages = state["messages"]
    last_message = messages[-1]
    # If there is no function call, then we finish
    if not last_message.tool_calls:
        return "end"
    # If tool call is asking Human, we return that node
    # You could also add logic here to let some system know that there's something that requires Human input
    # For example, send a slack message, etc
    elif last_message.tool_calls[0]["name"] == "AskHuman":
        return "ask_human"
    # Otherwise a regular tool was requested, so we continue to the tool node
    else:
        return "continue"
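
The routing rule can be checked without calling a model. The sketch below repeats the same three-way rule so that it runs on its own, and feeds it stand-in messages; `fake_ai_message` is a hypothetical helper that mimics only the `tool_calls` attribute of a real AI message.

```python
from types import SimpleNamespace


def should_continue(state):
    """Same routing rule as above, repeated so the example runs on its own."""
    last_message = state["messages"][-1]
    if not last_message.tool_calls:
        return "end"
    if last_message.tool_calls[0]["name"] == "AskHuman":
        return "ask_human"
    return "continue"


def fake_ai_message(*tool_names):
    """Hypothetical stand-in for an AI message carrying tool calls."""
    return SimpleNamespace(tool_calls=[{"name": name} for name in tool_names])


for message in (
    fake_ai_message(),
    fake_ai_message("AskHuman"),
    fake_ai_message("search_candidate_info"),
):
    print(should_continue({"messages": [message]}))
```

Only the first tool call is inspected, so a reply that requests `AskHuman` after another tool would be routed to the tool node first.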


In [None]:
# Define the function that calls the llm
def call_model(state):
    messages = state["messages"]
    response = llm.invoke(messages)
    # We return a list, because this will get added to the existing list
    return {"messages": [response]}

In [None]:
# We define a fake node to ask the human
def ask_human(state):
    pass

Build the graph

In [None]:
from langgraph.graph import END, StateGraph

In [None]:
# Define a new graph
workflow = StateGraph(MessagesState)

# Define the three nodes we will cycle between
workflow.add_node("agent", call_model)
workflow.add_node("action", tool_node)
workflow.add_node("ask_human", ask_human)

# Set the entrypoint as `agent`
# This means that this node is the first one called
workflow.add_edge(START, "agent")

In [None]:
# We now add a conditional edge
workflow.add_conditional_edges(
    # First, we define the start node. We use `agent`.
    # This means these are the edges taken after the `agent` node is called.
    "agent",
    # Next, we pass in the function that will determine which node is called next.
    should_continue,
    # Finally we pass in a mapping.
    # The keys are strings, and the values are other nodes.
    # END is a special node marking that the graph should finish.
    # What will happen is we will call `should_continue`, and then the output of that
    # will be matched against the keys in this mapping.
    # Based on which one it matches, that node will then be called.
    {
        # If `continue`, then we call the tool node (`action`).
        "continue": "action",
        # We may ask the human
        "ask_human": "ask_human",
        # Otherwise we finish.
        "end": END,
    },
)

In [None]:
# We now add a normal edge from `action` to `agent`.
# This means that after the `action` (tool) node is called, the `agent` node is called next.
workflow.add_edge("action", "agent")

# After we get back the human response, we go back to the agent
workflow.add_edge("ask_human", "agent")

In [None]:
# Set up memory
from langgraph.checkpoint.memory import MemorySaver

memory = MemorySaver()

In [None]:
# Finally, we compile it!
# This compiles it into a LangChain Runnable,
# meaning you can use it as you would any other runnable
# We add a breakpoint BEFORE the `ask_human` node so it never executes
app = workflow.compile(checkpointer=memory, interrupt_before=["ask_human"])

In [None]:
display(Image(app.get_graph().draw_mermaid_png()))

In [None]:
from langchain_core.messages import HumanMessage

config = {"configurable": {"thread_id": "2"}}
input_message = HumanMessage(
    content="Ask the user where they are"
)
for event in app.stream({"messages": [input_message]}, config, stream_mode="values"):
    event["messages"][-1].pretty_print()

In [None]:
tool_call_id = app.get_state(config).values["messages"][-1].tool_calls[0]["id"]

from langchain_core.messages import ToolMessage
tool_message = [ToolMessage(tool_call_id=tool_call_id, content="san francisco")]

# We now update the state
# Notice that we are also specifying `as_node="ask_human"`
# This will apply this update as this node,
# which will make it so that afterwards it continues as normal
app.update_state(config, {"messages": tool_message}, as_node="ask_human")

# We can check the state
# We can see that the state currently has the `agent` node next
# This is based on how we define our graph,
# where after the `ask_human` node goes (which we just triggered)
# there is an edge to the `agent` node
app.get_state(config).next

In [None]:
for event in app.stream(None, config, stream_mode="values"):
    event["messages"][-1].pretty_print()

In [None]:
# Retrieve the current state of the application
state = app.get_state(config).values

# Access the list of messages from the state
messages = state["messages"]

# Iterate through each message and print its content
for message in messages:
    print(f"{message.type.capitalize()} Message: {message.content}")


## 3.1. Define Agents & Roles

### 3.1.1. Define Base Agent class

In [None]:
from typing import List, Dict, Optional
from langchain_core.messages import HumanMessage, SystemMessage, AIMessage

class Agent:
    def __init__(self, name: str, role: str, skills: List[str], llm):
        """
        Base class for an agent.

        Args:
            name (str): The name of the agent.
            role (str): The role of the agent (e.g., HR, Manager, etc.).
            skills (List[str]): The list of skills the agent possesses.
            llm: A language model instance for generating responses.
        """
        self.name = name
        self.role = role
        self.skills = skills
        self.llm = llm

    def process_task(self, task: str, context: Optional[List[Dict[str, str]]] = None) -> str:
        """
        Process a task assigned to the agent.

        Args:
            task (str): The task or question to process.
            context (Optional[List[Dict[str, str]]]): Additional context for the task.

        Returns:
            str: The agent's response.
        """
        messages = [
            SystemMessage(content=f"You are {self.name}, a {self.role}. Your skills include: {', '.join(self.skills)}. Respond to the task based on your role and skills.")
        ]
        
        # Include provided context
        if context:
            for msg in context:
                if msg['role'] == 'human':
                    messages.append(HumanMessage(content=msg['content']))
                elif msg['role'] == 'ai':
                    messages.append(AIMessage(content=msg['content']))
        
        # Add the task as the final input
        messages.append(HumanMessage(content=task))
        
        # Generate response
        response = self.llm.invoke(messages)
        return response.content


### 3.1.1. Define Orchestrator Agent

In [None]:
class OrchestratorAgent(Agent):
    def __init__(self, name: str, role: str, skills: List[str], llm):
        super().__init__(name, role, skills, llm)

    def generate_scenario(self, job_description: str, company_info: str, candidate_cv: str) -> Dict:
        """
        Generate a scenario for the interview based on input data.

        Args:
            job_description (str): The job description.
            company_info (str): Information about the company.
            candidate_cv (str): The candidate's CV.

        Returns:
            Dict: A structured interview scenario.
        """
        import json  # local import keeps this method self-contained

        task = (
            "Analyze the job description, company information, and candidate CV to generate an interview scenario. "
            "Include priorities, suggested questions, and areas to focus on. "
            "Reply with a single JSON object and no surrounding text."
        )
        context = [
            {"role": "human", "content": f"Job Description: {job_description}"},
            {"role": "human", "content": f"Company Info: {company_info}"},
            {"role": "human", "content": f"Candidate CV: {candidate_cv}"}
        ]
        response = self.process_task(task, context)
        try:
            return json.loads(response)
        except json.JSONDecodeError:
            # The model did not return valid JSON; keep the raw text instead of failing
            return {"raw": response}

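    def assign_tasks(self, graph, scenario: Dict) -> Dict:
        """
        Assign tasks to other agents based on the generated scenario.

        Args:
            graph: LangGraph graph instance.
            scenario (Dict): The structured interview scenario.

        Returns:
            Dict: Tasks keyed by the name of the agent node that should handle them.
        """
        # LangGraph edges connect node names and carry no task payload, so the
        # assignments are returned to the caller instead of being stored on edges.
        # Assumption: `graph.nodes` is a mapping keyed by node name.
        assignments = {}
        for agent_task in scenario.get("agents", []):
            agent_name = agent_task["role"]
            # Only keep tasks for agents that exist as nodes in the graph
            if agent_name in graph.nodes:
                assignments[agent_name] = agent_task["tasks"]
        return assignments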

    def process_feedback(self, feedback: Dict) -> str:
        """
        Process feedback from agents to synthesize a final report.

        Args:
            feedback (Dict): Feedback data from agents.

        Returns:
            str: A synthesized report on the candidate.
        """
        task = "Synthesize the following feedback into a unified candidate assessment report."
        context = [{"role": "human", "content": f"Feedback: {feedback}"}]
        return self.process_task(task, context)
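
`assign_tasks` expects the scenario as a dictionary, while a chat model returns text and often wraps JSON in a Markdown code fence. A tolerant parser is sketched below under those assumptions; `parse_scenario` and the sample reply are hypothetical and not part of the notebook.

```python
import json
import re

FENCE = "`" * 3  # the Markdown code-fence marker, built here to keep this block readable
FENCED_REPLY = re.compile(rf"^{FENCE}(?:json)?\s*(.*?)\s*{FENCE}$", re.DOTALL)


def parse_scenario(reply: str) -> dict:
    """Parse a model reply into a dict, tolerating a surrounding code fence."""
    text = reply.strip()
    fenced = FENCED_REPLY.match(text)
    if fenced:
        text = fenced.group(1)
    try:
        parsed = json.loads(text)
    except json.JSONDecodeError:
        return {"raw": reply}  # keep the text so nothing is lost
    return parsed if isinstance(parsed, dict) else {"raw": reply}


reply = FENCE + 'json\n{"agents": [{"role": "hr", "tasks": ["Ask about teamwork"]}]}\n' + FENCE
print(parse_scenario(reply)["agents"][0]["role"])
```

Falling back to a `{"raw": ...}` dictionary keeps downstream `.get("agents", [])` calls from failing when the model ignores the requested format.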


### 3.1.2. Define HR Agent

In [None]:
class HumanResourcesAgent(Agent):
    def __init__(self, llm):
        """
        Initialize the Human Resources Agent with specific skills.
        """
        super().__init__(
            name="Jessica Taylor",
            role="Human Resources Specialist",
            skills=[
                "Sourcing Candidates",
                "Screening Resumes",
                "Interviewing Techniques",
                "Candidate Assessment",
                "Offer Negotiation",
                "Reference Checking",
                "Talent Pool Development",
                "Job Description Writing",
                "Onboarding Coordination",
                "Effective Communication",
                "Active Listening",
                "Empathy and Rapport Building",
                "Persuasion and Influence",
            ],
            llm=llm
        )

    def generate_behavioral_questions(self, job_description: str, company_values: str) -> List[str]:
        """
        Generate a list of behavioral questions based on the job description and company values.

        Args:
            job_description (str): The job description.
            company_values (str): The company's core values.

        Returns:
            List[str]: A list of behavioral interview questions.
        """
        task = (
            "Generate behavioral interview questions tailored to the following job description "
            "and company values. Focus on assessing adaptability, teamwork, and problem-solving skills."
        )
        context = [
            {"role": "human", "content": f"Job Description: {job_description}"},
            {"role": "human", "content": f"Company Values: {company_values}"}
        ]
        response = self.process_task(task, context)
        return [q.strip() for q in response.split("\n") if q.strip()]  # Assumes one question per line; blank lines are dropped

    def ensure_compliance(self, questions: List[str]) -> List[str]:
        """
        Check the list of questions for compliance with non-discrimination policies.

        Args:
            questions (List[str]): The list of interview questions.

        Returns:
            List[str]: A filtered list of compliant questions.
        """
        task = (
            "Review the following list of interview questions for compliance with "
            "non-discrimination policies. Remove or revise any questions that could be biased."
        )
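        questions_text = "\n".join(questions)  # one question per line, so commas inside questions stay unambiguous
        context = [{"role": "human", "content": f"Questions:\n{questions_text}"}]
        response = self.process_task(task, context)
        return [q.strip() for q in response.split("\n") if q.strip()]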

    def generate_interview_agenda(self, role_requirements: str, company_info: str) -> str:
        """
        Generate a structured interview agenda based on the role requirements and company information.

        Args:
            role_requirements (str): The role requirements.
            company_info (str): Information about the company.

        Returns:
            str: A detailed interview agenda.
        """
        task = (
            "Create a structured interview agenda for the role described below. Include time allocations "
            "for introductions, behavioral questions, technical questions, and a Q&A session."
        )
        context = [
            {"role": "human", "content": f"Role Requirements: {role_requirements}"},
            {"role": "human", "content": f"Company Information: {company_info}"}
        ]
        response = self.process_task(task, context)
        return response
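
Both `generate_behavioral_questions` and `ensure_compliance` assume the model returns one question per line. In practice, model output often contains blank lines or list markers, so a small cleanup step may help. The following is a minimal sketch; `parse_question_list` is a hypothetical helper and not part of the classes above.

```python
import re
from typing import List

# Hypothetical helper (not part of the agent classes above): turns a raw,
# newline-separated LLM response into a clean list of questions.
_LIST_MARKER = re.compile(r"^\s*(?:\d+[.)]|[-*•])\s*")

def parse_question_list(response: str) -> List[str]:
    questions = []
    for line in response.splitlines():
        cleaned = _LIST_MARKER.sub("", line).strip()  # drop "1.", "2)", "-", "*"
        if cleaned:  # skip blank lines
            questions.append(cleaned)
    return questions

raw = "1. Tell me about a time you adapted to change.\n\n- How do you handle conflict?\n"
print(parse_question_list(raw))
```

A helper like this could replace the bare `split("\n")` calls if the model's formatting turns out to be inconsistent.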


### 3.1.3. Define Manager Agent

In [None]:
class ManagerAgent(Agent):
    def __init__(self, llm):
        """
        Initialize the Hiring Manager Agent with specific skills.
        """
        super().__init__(
            name="Michael Brown",
            role="Hiring Manager",
            skills=[
                # Core Hiring and Candidate Assessment Skills
                "Interviewing Techniques",
                "Candidate Evaluation",
                "Behavioral Assessment",
                "Decision-Making",
                "Reference Checking",
                # Soft Skills
                "Effective Communication",
                "Active Listening",
                "Empathy and Rapport Building",
                "Time Management",
                # Technical Skills
                "Proficiency in Applicant Tracking Systems (ATS)",
                "Data-Driven Hiring Decisions"
            ],
            llm=llm
        )

    def assess_cultural_fit(self, candidate_answers: str, company_values: str) -> str:
        """
        Assess the candidate's cultural fit based on their answers and the company's values.

        Args:
            candidate_answers (str): Candidate's responses to cultural fit questions.
            company_values (str): The company's core values.

        Returns:
            str: An evaluation of the candidate's cultural fit.
        """
        task = (
            "Based on the following candidate responses, evaluate how well they align with "
            "the company's core values."
        )
        context = [
            {"role": "human", "content": f"Candidate Answers: {candidate_answers}"},
            {"role": "human", "content": f"Company Values: {company_values}"}
        ]
        response = self.process_task(task, context)
        return response

    def evaluate_leadership_potential(self, scenario_response: str) -> str:
        """
        Evaluate the candidate's leadership potential based on their response to a scenario.

        Args:
            scenario_response (str): The candidate's response to a leadership scenario.

        Returns:
            str: An assessment of the candidate's leadership potential.
        """
        task = (
            "Evaluate the candidate's leadership potential based on their response to the following scenario. "
            "Focus on their decision-making, problem-solving, and ability to inspire others."
        )
        context = [{"role": "human", "content": f"Scenario Response: {scenario_response}"}]
        response = self.process_task(task, context)
        return response

    def make_data_driven_decision(self, candidate_metrics: Dict[str, float]) -> str:
        """
        Make a hiring recommendation based on candidate metrics.

        Args:
            candidate_metrics (Dict[str, float]): A dictionary of candidate metrics (e.g., skills, experience, cultural fit).

        Returns:
            str: A hiring recommendation.
        """
        task = (
            "Based on the following candidate metrics, make a data-driven recommendation on whether to move forward with this candidate."
        )
        context = [{"role": "human", "content": f"Candidate Metrics: {candidate_metrics}"}]
        response = self.process_task(task, context)
        return response

    def summarize_interview_feedback(self, feedback: List[Dict[str, str]]) -> str:
        """
        Summarize feedback from multiple interviewers into a cohesive report.

        Args:
            feedback (List[Dict[str, str]]): A list of feedback dictionaries from other interviewers.

        Returns:
            str: A cohesive summary of the feedback.
        """
        task = (
            "Summarize the feedback from the following interviewers into a cohesive report. "
            "Highlight the candidate's strengths, weaknesses, and overall fit for the role."
        )
        context = [{"role": "human", "content": f"Feedback: {feedback}"}]
        response = self.process_task(task, context)
        return response


### 3.1.4. Define Field Specialist Agent

Possible improvement: generate the specific role and skills dynamically, depending on the field the interview targets, perhaps with the help of another agent.

In [None]:
class FieldSpecialistAgent(Agent):
    def __init__(self, llm):
        """
        Initialize the Field Specialist Agent with specific skills.
        """
        super().__init__(
            name="Emily Johnson",
            role="Field Specialist",
            skills=[
                # Core Technical and Functional Skills
                "Technical Expertise in Field Operations",
                "Problem-Solving in Real-Time Scenarios",
                "Data Collection and Reporting",
                "Equipment Handling and Maintenance",
                "Compliance with Safety Standards",
                # Collaboration and Teamwork
                "Team Coordination",
                "Effective Communication",
                "Adaptability",
                "Conflict Resolution",
                # Technical Skills
                "Proficiency in Field-Specific Software",
                "Report Writing and Documentation"
            ],
            llm=llm
        )

    def generate_technical_questions(self, role_specific_requirements: str) -> List[str]:
        """
        Generate technical questions based on role-specific requirements.

        Args:
            role_specific_requirements (str): The technical skills and responsibilities of the role.

        Returns:
            List[str]: A list of technical interview questions.
        """
        task = (
            "Create a list of technical questions tailored to the following role-specific requirements. "
            "Focus on assessing the candidate's expertise in practical, field-specific scenarios."
        )
        context = [{"role": "human", "content": f"Role Requirements: {role_specific_requirements}"}]
        response = self.process_task(task, context)
        return [q.strip() for q in response.split("\n") if q.strip()]  # Assumes one question per line; blank lines are dropped

    def simulate_real_world_scenario(self, scenario_description: str) -> str:
        """
        Simulate a real-world scenario and provide the candidate with a task to solve.

        Args:
            scenario_description (str): A description of the real-world scenario.

        Returns:
            str: A description of the simulated task for the candidate.
        """
        task = (
            "Design a real-world scenario based on the description below. Provide a detailed task for the candidate to solve, "
            "focusing on their problem-solving skills and adaptability."
        )
        context = [{"role": "human", "content": f"Scenario Description: {scenario_description}"}]
        response = self.process_task(task, context)
        return response

    def evaluate_adaptability(self, candidate_response: str) -> str:
        """
        Evaluate the candidate's adaptability based on their response to a scenario.

        Args:
            candidate_response (str): The candidate's response to a field-specific scenario.

        Returns:
            str: An evaluation of the candidate's adaptability.
        """
        task = (
            "Evaluate the candidate's adaptability based on their response to the following field-specific scenario. "
            "Focus on their ability to adjust to unexpected changes and find effective solutions."
        )
        context = [{"role": "human", "content": f"Candidate Response: {candidate_response}"}]
        response = self.process_task(task, context)
        return response

    def review_certifications(self, certifications: List[str]) -> str:
        """
        Review and validate the candidate's certifications for the role.

        Args:
            certifications (List[str]): A list of the candidate's certifications.

        Returns:
            str: An assessment of the relevance and validity of the certifications.
        """
        task = (
            "Review the following certifications to determine their relevance and validity for the role. "
            "Provide feedback on their applicability to the field."
        )
        context = [{"role": "human", "content": f"Certifications: {', '.join(certifications)}"}]
        response = self.process_task(task, context)
        return response


## Example workflow:

### Scenario Overview

#### Job Description:
"Senior Field Operations Engineer responsible for managing industrial equipment, leading on-site teams, and troubleshooting hydraulic and mechanical systems under tight deadlines."

#### Company Values:
"Innovation, Teamwork, Adaptability."

#### Candidate's Background:
* Experience: 8 years in field operations.
* Skills: Troubleshooting mechanical systems, team leadership, safety compliance.
* Certifications: Certified Field Technician, OSHA Safety Certification.
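
For use in the workflow, the scenario above can be written down as a plain dictionary. This is only a sketch: the key names (`job_description`, `company_info`, `candidate_cv`) are taken from the node functions defined later in this notebook, and folding the candidate's background into a single CV string is an assumption.

```python
# Scenario inputs as a plain dict. Key names mirror those read by the
# workflow nodes later in the notebook; the CV formatting is an assumption.
candidate_background = {
    "Experience": "8 years in field operations.",
    "Skills": "Troubleshooting mechanical systems, team leadership, safety compliance.",
    "Certifications": "Certified Field Technician, OSHA Safety Certification.",
}

scenario_inputs = {
    "job_description": (
        "Senior Field Operations Engineer responsible for managing industrial "
        "equipment, leading on-site teams, and troubleshooting hydraulic and "
        "mechanical systems under tight deadlines."
    ),
    "company_info": "Innovation, Teamwork, Adaptability.",
    "candidate_cv": "\n".join(f"{key}: {value}" for key, value in candidate_background.items()),
}

print(scenario_inputs["candidate_cv"])
```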

### Initiate agents

In [None]:
orchestrator = OrchestratorAgent(
    name="Orchestrator",
    role="Coordinator",
    skills=["Scenario Planning", "Task Delegation", "Feedback Synthesis"],
    llm=llm
)

hr_agent = HumanResourcesAgent(llm)
manager_agent = ManagerAgent(llm)
field_specialist_agent = FieldSpecialistAgent(llm)

### Create Graph

In [None]:
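# Define the workflow graph
# NOTE: MessagesState only declares a `messages` key. The nodes below read and write
# other keys (job_description, scenario, final_report, ...), which need their own state schema.
workflow = StateGraph(MessagesState)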
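
`MessagesState` declares only a `messages` key, while the node functions in the next cells read and write keys such as `job_description` and `final_report`. One option is a dedicated state schema. The sketch below uses only the standard library; `InterviewWorkflowState` is a hypothetical name, and its field list is inferred from the nodes in this notebook. With LangGraph installed, a schema like this could be passed to `StateGraph` in place of `MessagesState`.

```python
from typing import List, TypedDict

# Hypothetical state schema; fields are inferred from the node functions.
# total=False lets each node return only the keys it produces.
class InterviewWorkflowState(TypedDict, total=False):
    job_description: str
    company_info: str
    candidate_cv: str
    scenario: str
    behavioral_questions: List[str]
    cultural_fit: str
    leadership_potential: str
    technical_questions: List[str]
    adaptability_evaluation: str
    final_report: str

# Each node returns a partial update that is merged into the shared state.
state: InterviewWorkflowState = {"job_description": "Senior Field Operations Engineer"}
state.update({"scenario": "On-site hydraulic failure under a tight deadline."})
print(sorted(state))
```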


In [None]:
# Nodes (Tasks)
def collect_candidate_details(state):
    job_description = state.get("job_description", "Senior Software Engineer for backend development.")
    company_info = state.get("company_info", "TechCorp values innovation, collaboration, and customer focus.")
    candidate_cv = state.get("candidate_cv", "Experienced backend engineer with Python and cloud expertise.")
    return {"job_description": job_description, "company_info": company_info, "candidate_cv": candidate_cv}

def orchestrator_generate_scenario(state):
    scenario = orchestrator.generate_scenario(
        state["job_description"], state["company_info"], state["candidate_cv"]
    )
    return {"scenario": scenario}

def hr_generate_questions(state):
    behavioral_questions = hr_agent.generate_behavioral_questions(
        state["job_description"], state["company_info"]
    )
    return {"behavioral_questions": behavioral_questions}

def manager_evaluate_candidate(state):
    cultural_fit = manager_agent.assess_cultural_fit(
        candidate_answers="I believe in open communication and teamwork.",
        company_values=state["company_info"],
    )
    leadership_potential = manager_agent.evaluate_leadership_potential(
        scenario_response="I prioritize clear delegation and support team members during challenges."
    )
    return {"cultural_fit": cultural_fit, "leadership_potential": leadership_potential}

def field_specialist_tasks(state):
    technical_questions = field_specialist_agent.generate_technical_questions(
        role_specific_requirements=state["job_description"]
    )
    adaptability_evaluation = field_specialist_agent.evaluate_adaptability(
        candidate_response="I inspected system pressure levels and found obstructions in valves."
    )
    return {
        "technical_questions": technical_questions,
        "adaptability_evaluation": adaptability_evaluation,
    }

def orchestrator_synthesize_feedback(state):
    feedback = {
        "HR Feedback": state.get("behavioral_questions"),
        "Manager Feedback": {
            "Cultural Fit": state.get("cultural_fit"),
            "Leadership Potential": state.get("leadership_potential"),
        },
        "Field Specialist Feedback": {
            "Technical Questions": state.get("technical_questions"),
            "Adaptability Evaluation": state.get("adaptability_evaluation"),
        },
    }
    final_report = orchestrator.process_feedback(feedback)
    return {"final_report": final_report}

In [None]:
# Create the graph
workflow.add_node("collect_candidate_details", collect_candidate_details)
workflow.add_node("orchestrator_generate_scenario", orchestrator_generate_scenario)
workflow.add_node("hr_generate_questions", hr_generate_questions)
workflow.add_node("manager_evaluate_candidate", manager_evaluate_candidate)
workflow.add_node("field_specialist_tasks", field_specialist_tasks)
workflow.add_node("orchestrator_synthesize_feedback", orchestrator_synthesize_feedback)

# Define the workflow edges
workflow.set_entry_point("collect_candidate_details")
workflow.add_edge("collect_candidate_details", "orchestrator_generate_scenario")
workflow.add_edge("orchestrator_generate_scenario", "hr_generate_questions")
workflow.add_edge("orchestrator_generate_scenario", "manager_evaluate_candidate")
workflow.add_edge("orchestrator_generate_scenario", "field_specialist_tasks")
workflow.add_edge("hr_generate_questions", "orchestrator_synthesize_feedback")
workflow.add_edge("manager_evaluate_candidate", "orchestrator_synthesize_feedback")
workflow.add_edge("field_specialist_tasks", "orchestrator_synthesize_feedback")
workflow.add_edge("orchestrator_synthesize_feedback", END)
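
The edges above form a fan-out/fan-in shape: once the scenario is generated, three agent nodes can run independently, and the synthesis node depends on all of them. As a dependency-free illustration (this is not LangGraph's own scheduler), the same dependencies can be inspected with the standard library's `graphlib`.

```python
from graphlib import TopologicalSorter

# Map each node to the set of nodes it depends on (its predecessors),
# mirroring the add_edge calls above.
dependencies = {
    "orchestrator_generate_scenario": {"collect_candidate_details"},
    "hr_generate_questions": {"orchestrator_generate_scenario"},
    "manager_evaluate_candidate": {"orchestrator_generate_scenario"},
    "field_specialist_tasks": {"orchestrator_generate_scenario"},
    "orchestrator_synthesize_feedback": {
        "hr_generate_questions",
        "manager_evaluate_candidate",
        "field_specialist_tasks",
    },
}

sorter = TopologicalSorter(dependencies)
sorter.prepare()
stages = []
while sorter.is_active():
    ready = sorted(sorter.get_ready())  # nodes whose predecessors are all done
    stages.append(ready)
    sorter.done(*ready)

for number, stage in enumerate(stages, start=1):
    print(number, stage)
```

Nodes that appear in the same stage have no dependency on each other, which is what allows the three agent nodes to be processed side by side.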


In [None]:
app = workflow.compile()

In [None]:
display(Image(app.get_graph().draw_mermaid_png()))

In [None]:
# Example run
state = app.invoke({
    "job_description": "Senior Software Engineer for backend development.",
    "company_info": "TechCorp values innovation, collaboration, and customer focus.",
    "candidate_cv": "Experienced backend engineer with Python and cloud expertise.",
})

In [None]:
print(state["final_report"])


# Agents 2 (DMA)


In [None]:
import functools
import operator
from typing import Annotated, Sequence, TypedDict

from colorama import Fore, Style
from langchain.agents import AgentExecutor, create_openai_tools_agent
from langchain.output_parsers.openai_functions import JsonOutputFunctionsParser
from langchain_community.tools.tavily_search import TavilySearchResults
from langchain_core.messages import BaseMessage, HumanMessage
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_core.language_models.chat_models import BaseChatModel
from langchain_openai import ChatOpenAI
from langgraph.graph import END, StateGraph

Define Names of the Agents

These are just strings, but each one is typed several times below, so changing or mistyping one would be easy to miss. Storing them in a single place at the top avoids that.

In [None]:
HR_AGENT_NAME = "hr_agent"
MANAGER_NAME = "manager_agent"
SPECIALIST_NAME = "specialist_agent"
FEEDBACK_NAME = "feedback_agent"
ORCHESTRATOR_NAME = "orchestrator"
MEMBERS = [HR_AGENT_NAME, MANAGER_NAME, SPECIALIST_NAME, FEEDBACK_NAME]
OPTIONS = ["FINISH"] + MEMBERS

In [None]:
llm = ChatOpenAI(model_name="gpt-4o")  # need a FOSS alternative

In [None]:
def create_agent(llm: BaseChatModel, tools: list, system_prompt: str):
    prompt_template = ChatPromptTemplate.from_messages(
        [
            ("system", system_prompt),
            MessagesPlaceholder(variable_name="messages"),
            MessagesPlaceholder(variable_name="agent_scratchpad"),
        ]
    )
    agent = create_openai_tools_agent(llm, tools, prompt_template)
    agent_executor = AgentExecutor(agent=agent, tools=tools)  # type: ignore
    return agent_executor

In [None]:
class AgentState(TypedDict):
    messages: Annotated[Sequence[BaseMessage], operator.add]
    next: str

In [None]:
def agent_node(state, agent, name):
    result = agent.invoke(state)
    return {"messages": [HumanMessage(content=result["output"], name=name)]}

In [None]:
router_function_def = {
    "name": "route",
    "description": "Select the next role.",
    "parameters": {
        "title": "routeSchema",
        "type": "object",
        "properties": {
            "next": {
                "title": "next",
                "anyOf": [
                    {"enum": OPTIONS},
                ],
            }
        },
        "required": ["next"],
    },
}
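
The schema restricts `next` to the values in `OPTIONS`, but the parsed model output is still just a dictionary. A small guard can catch unexpected values before they reach the graph. The sketch below is self-contained and repeats the names locally; `validate_route` is a hypothetical helper.

```python
# Local copies of the names defined earlier, so the sketch runs on its own.
MEMBERS = ["hr_agent", "manager_agent", "specialist_agent", "feedback_agent"]
OPTIONS = ["FINISH"] + MEMBERS

def validate_route(parsed: dict) -> str:
    """Return the chosen route, or raise if it is missing or not allowed."""
    choice = parsed.get("next")
    if choice not in OPTIONS:
        raise ValueError(f"Unexpected route {choice!r}; expected one of {OPTIONS}")
    return choice

print(validate_route({"next": "hr_agent"}))
```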

Define Orchestrator Agent

In [None]:
# todo: 
# update placeholder for our project

ORCHESTRATOR_SYSTEM_PROMPT = """
You are a supervisor tasked with managing a conversation between the following workers: {members}. Given the following user request, respond with the worker to act next. Each worker will perform a task and respond with their results and status. The end goal is to provide a good travel itinerary for the user, with things to see and do, practical tips on how to deal with language difficulties, and a nice visualization that goes with the travel plan (in the form of an image path, the visualizer will save the image for you and you only need the path).

Make sure you call on each team member ({members}) at least once. Do not call the visualizer again if you've already received an image file path. Do not call any team member a second time unless they didn't provide enough details or a valid response and you need them to redo their work. When finished, respond with FINISH, but before you do, make sure you have a travel itinerary, language tips for the location, and an image file-path. If you don't have all of these, call the appropriate team member to get the missing information.
"""

This time we have three messages. The first is the `ORCHESTRATOR_SYSTEM_PROMPT` we defined above. The second is a `MessagesPlaceholder` for the `messages` variable (the conversation context so far). The third is a short system message that reminds the team supervisor what its task is and which options it can choose from.

In [None]:
orchestrator_prompt_template = ChatPromptTemplate.from_messages(
    [
        ("system", ORCHESTRATOR_SYSTEM_PROMPT),
        MessagesPlaceholder(variable_name="messages"),
        (
            "system",
            "Given the conversation above, who should act next?"
            " Or should we FINISH? Select one of: {options}",
        ),
    ]
).partial(options=", ".join(OPTIONS), members=", ".join(MEMBERS))

So the orchestrator is basically going to act like a router between our agents, deciding who is up next.

In [None]:
orchestrator_chain = (
    orchestrator_prompt_template
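    # JsonOutputFunctionsParser reads the legacy `function_call` field, so the router is bound as a function, not a tool
    | llm.bind(functions=[router_function_def], function_call={"name": "route"})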
    | JsonOutputFunctionsParser()
)

Define rest of the agents

System prompts

In [None]:
# todo: 
# update placeholder for our project

HR_SYSTEM_PROMPT = """
You are a helpful assistant that can suggest and review travel itinerary plans, providing critical feedback on how the trip can be enriched for enjoyment of the local culture. If the plan already includes local experiences, you can mention that the plan is satisfactory, with rationale.

Assume a general interest in popular tourist destinations and local culture, do not ask the user any follow-up questions.

You have access to a web search function for additional or up-to-date research if needed. You are not required to use this if you already have sufficient information to answer the question.
"""

MANAGER_SYSTEM_PROMPT = """
You are a helpful assistant that can review travel plans, providing feedback on important/critical tips about how best to address language or communication challenges for the given destination. If the plan already includes language tips, you can mention that the plan is satisfactory, with rationale.

You have access to a web search function for additional or up-to-date research if needed. You are not required to use this if you already have sufficient information to answer the question.
"""

SPECIALIST_SYSTEM_PROMPT = """
You are a helpful assistant that can generate images based on a detailed description. You are part of a travel agent team and your job is to look at the location and travel itinerary and then generate an appropriate image to go with the travel plan. You have access to a function that will generate the image as long as you provide a good description including the location and visual characteristics of the image you want to generate. This function will download the image and return the path of the image file to you.

Make sure you provide the image, and then communicate back as your response only the path to the image file you generated. You do not need to give any other textual feedback, just the path to the image file.
"""

FEEDBACK_SYSTEM_PROMPT = """
You are a helpful assistant that can generate images based on a detailed description. You are part of a travel agent team and your job is to look at the location and travel itinerary and then generate an appropriate image to go with the travel plan. You have access to a function that will generate the image as long as you provide a good description including the location and visual characteristics of the image you want to generate. This function will download the image and return the path of the image file to you.

Make sure you provide the image, and then communicate back as your response only the path to the image file you generated. You do not need to give any other textual feedback, just the path to the image file.
"""

Define agents

In [None]:
# todo: 
# Add list of tools each agent can use

hr_agent = create_agent(llm=llm, tools=[], system_prompt=HR_SYSTEM_PROMPT)
hr_agent_node = functools.partial(
    agent_node, agent=hr_agent, name=HR_AGENT_NAME
)

manager_agent = create_agent(llm=llm, tools=[], system_prompt=MANAGER_SYSTEM_PROMPT)
manager_agent_node = functools.partial(
    agent_node, agent=manager_agent, name=MANAGER_NAME
)

specialist_agent = create_agent(llm=llm, tools=[], system_prompt=SPECIALIST_SYSTEM_PROMPT)
specialist_agent_node = functools.partial(
    agent_node, agent=specialist_agent, name=SPECIALIST_NAME
)

feedback_agent = create_agent(llm=llm, tools=[], system_prompt=FEEDBACK_SYSTEM_PROMPT)
feedback_agent_node = functools.partial(
    agent_node, agent=feedback_agent, name=FEEDBACK_NAME
)

Create the graph

In [None]:
workflow = StateGraph(AgentState)
workflow.add_node(HR_AGENT_NAME, hr_agent_node)
workflow.add_node(MANAGER_NAME, manager_agent_node)
workflow.add_node(SPECIALIST_NAME, specialist_agent_node)
workflow.add_node(FEEDBACK_NAME, feedback_agent_node)
workflow.add_node(ORCHESTRATOR_NAME, orchestrator_chain)

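# Worker agents report back to the orchestrator; the feedback agent is terminal and only leads to END.
for member in MEMBERS:
    if member != FEEDBACK_NAME:
        workflow.add_edge(member, ORCHESTRATOR_NAME)

workflow.add_edge(FEEDBACK_NAME, END)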

In [None]:
conditional_map = {name: name for name in MEMBERS}
conditional_map["FINISH"] = FEEDBACK_NAME
workflow.add_conditional_edges(
    ORCHESTRATOR_NAME, lambda x: x["next"], conditional_map
)
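
The conditional edges above turn the orchestrator's `next` value into the name of the node to run. The loop below imitates that control flow in plain Python, with stub functions instead of LLM calls. Everything in it is illustrative: `stub_router` and `run_stub_graph` are hypothetical, and the rule "call each worker once, then finish" merely stands in for the model's decision.

```python
from typing import Dict, List

WORKERS = ["hr_agent", "manager_agent", "specialist_agent"]  # local stand-ins
FINAL_NODE = "feedback_agent"

def stub_router(state: Dict[str, List[str]]) -> Dict[str, str]:
    """Stand-in for the orchestrator chain: pick the first worker not yet called."""
    for worker in WORKERS:
        if worker not in state["messages"]:
            return {"next": worker}
    return {"next": "FINISH"}

def run_stub_graph() -> List[str]:
    state: Dict[str, List[str]] = {"messages": []}
    route_map = {name: name for name in WORKERS}
    route_map["FINISH"] = FINAL_NODE  # same idea as conditional_map
    while True:
        node = route_map[stub_router(state)["next"]]
        state["messages"].append(node)  # a real node would append its output
        if node == FINAL_NODE:
            return state["messages"]  # the final node leads to END

print(run_stub_graph())
```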

In [None]:
workflow.set_entry_point(ORCHESTRATOR_NAME)

travel_agent_graph = workflow.compile()

Visualize graph

In [None]:
display(Image(travel_agent_graph.get_graph().draw_mermaid_png()))

# Agents 3 (DMA) - I'm getting there OK

## Workflow Structure

- **Orchestration Stage:**
  - **Information Gathering:**
    - Obtain user documents (CV, Job description) using the `FileUpload` tool.
    - Retrieve company information via the `WebScraper` tool.
    - Create a "Scene" to outline the interview structure.

- **Interview Stages:**
  - **HR Stage:** Introduce the user, ask general and behavioral questions, and evaluate responses.
  - **Manager Stage:** Introduce the team, ask role-specific questions, and evaluate responses.
  - **Technical/Field Expert Stage:** Pose field-specific questions, assess technical knowledge, and evaluate responses.

- **Feedback Stage:** Compile evaluations from all stages and generate final feedback for the user.

---

## Graphs and Subgraphs

### Subgraphs
Each stage is implemented as a **subgraph**, encapsulating its sub-stages (nodes) and logic. Each task is a node.

### Parent Graph
A **parent graph** orchestrates the overall workflow by chaining the subgraphs. It also manages data flow between stages through transformers or shared state objects.

Example of a parent graph:
```python
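from typing import TypedDict

# Sketch only: StateGraph has no `add_subgraph` method and `add_edge` takes no
# `transformer` argument. A subgraph with its own state schema is wrapped in a
# node function that converts the state before and after the call.
class ParentState(TypedDict, total=False):
    orchestration: OrchestrationState
    hr: HRState

# Define the flow between subgraphs
def transfer_to_hr(orchestration_state: OrchestrationState) -> HRState:
    return HRState(
        cv=orchestration_state.user_documents.get("CV", ""),
        job_description=orchestration_state.user_documents.get("Job Description", ""),
        company_info=orchestration_state.company_info,
        scene=orchestration_state.scene
    )

# Orchestration subgraph wrapped as a node
def run_orchestration(state: ParentState) -> dict:
    result = orchestration_graph.compile().invoke(OrchestrationState())
    return {"orchestration": OrchestrationState(**result)}

# HR stage subgraph wrapped as a node
def run_hr(state: ParentState) -> dict:
    result = hr_graph.compile().invoke(transfer_to_hr(state["orchestration"]))
    return {"hr": HRState(**result)}

parent_graph = StateGraph(ParentState)
parent_graph.add_node('orchestration', run_orchestration)
parent_graph.add_node('hr', run_hr)
parent_graph.add_edge(START, 'orchestration')
parent_graph.add_edge('orchestration', 'hr')
parent_graph.add_edge('hr', END)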
```

---

## `AgentState` and Data Flow

**Utilizing Multiple `AgentState` Classes:**

Each stage can have its own `AgentState` class to encapsulate relevant data, enhancing modularity and clarity. For example:

- **Orchestration State:**
  ```python
  from pydantic import BaseModel

  class OrchestrationState(BaseModel):
      user_documents: dict = {}
      company_info: str = ""
      scene: str = ""
  ```

- **Interview State:**
  ```python
  from pydantic import BaseModel

  class InterviewState(BaseModel):
      scene: str = ""
      user_documents: dict = {}
      company_info: str = ""
      questions: list = []
      user_responses: dict = {}
      evaluation: str = ""
  ```

### How Data Flows Between Stages
- Use **transformers** to extract data from one state and initialize the next.
- Example: Passing data from `OrchestrationState` to `InterviewState`.

```python
# define orchestration_graph
# final output result_orchestration_state 

# Extract data from OrchestrationState
# (InterviewState has no `cv` or `job_description` fields; both documents travel in `user_documents`)
hr_initial_state = InterviewState(
    user_documents=result_orchestration_state.user_documents,
    company_info=result_orchestration_state.company_info,
    scene=result_orchestration_state.scene
)

# define hr_graph
# final output result_hr_state 

# Execute HR graph with pre-initialized starting state
result_hr_state = hr_app.invoke(hr_initial_state)

# ----- #

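# Combining Orchestration and HR 'apps'
# Note: StateGraph has no `add_subgraph` method and `add_edge` takes no `transformer`
# argument, so each compiled subgraph is wrapped in a node function that converts the state.
from typing import TypedDict

class ParentState(TypedDict, total=False):
    orchestration: OrchestrationState
    hr: InterviewState

# Define data transfer and flow
def transfer_to_hr(orchestration_state: OrchestrationState) -> InterviewState:
    return InterviewState(
        user_documents=orchestration_state.user_documents,
        company_info=orchestration_state.company_info,
        scene=orchestration_state.scene
    )

def run_orchestration(state: ParentState) -> dict:
    result = orchestration_graph.compile().invoke(OrchestrationState())
    return {"orchestration": OrchestrationState(**result)}

def run_hr(state: ParentState) -> dict:
    result = hr_app.invoke(transfer_to_hr(state["orchestration"]))
    return {"hr": InterviewState(**result)}

# Define a parent graph and chain the two wrapped stages
parent_graph = StateGraph(ParentState)
parent_graph.add_node('orchestration', run_orchestration)
parent_graph.add_node('hr', run_hr)
parent_graph.add_edge(START, 'orchestration')
parent_graph.add_edge('orchestration', 'hr')
parent_graph.add_edge('hr', END)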
```
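
The transformer idea itself does not depend on LangGraph or Pydantic. As a dependency-free sketch, the same hand-off can be written with dataclasses. The class names carry a `Sketch` suffix to mark them as hypothetical stand-ins for the state classes above, and the sample values are made up.

```python
from dataclasses import dataclass, field
from typing import Dict, List

@dataclass
class OrchestrationSketch:
    user_documents: Dict[str, str] = field(default_factory=dict)
    company_info: str = ""
    scene: str = ""

@dataclass
class InterviewSketch:
    scene: str = ""
    user_documents: Dict[str, str] = field(default_factory=dict)
    company_info: str = ""
    questions: List[str] = field(default_factory=list)

def to_interview(source: OrchestrationSketch) -> InterviewSketch:
    """Copy only the fields the interview stage needs."""
    return InterviewSketch(
        scene=source.scene,
        user_documents=dict(source.user_documents),  # copy to avoid shared mutation
        company_info=source.company_info,
    )

orchestration_result = OrchestrationSketch(
    user_documents={"CV": "CV text", "Job Description": "Job description text"},
    company_info="Company info text",
    scene="Panel interview in three stages",
)
interview_start = to_interview(orchestration_result)
print(interview_start)
```

Copying the documents dictionary keeps later changes in one stage from leaking into the other, which is one reason to prefer an explicit transformer over sharing a single mutable object.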

Imports

In [None]:
from pydantic import BaseModel, Field
from langgraph.graph import StateGraph, START, END
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_core.tools import tool

Initialize models

In [None]:
# OAI only for now, need to add HF
llm = ChatOpenAI(model_name="gpt-4o-mini")

## 2.1. Define Orchestration State

In [None]:
class OrchestrationState(BaseModel):
    user_documents: dict = Field(default_factory=dict)
    company_info: str = ""
    scene: str = ""

Define the tools for the Orchestration stage

Tools (placeholders)

In [None]:
@tool
def FileUpload(description: str) -> dict:
    """
    Simulate a file upload process.

    Args:
        description (str): A brief description of the file upload context.

    Returns:
        dict: A dictionary containing simulated content for 'CV' and 'Job Description'.
    """
    return {"CV": "Candidate's CV content...", "Job Description": "Job description content..."}

@tool
def WebScraper(url: str) -> str:
    """
    Simulate web scraping to extract company information.

    Args:
        url (str): The URL of the website to scrape.

    Returns:
        str: Simulated company information extracted from the website.
    """
    return "Company information scraped from website..."

@tool
def UserInput(prompt: str) -> str:
    """
    Prompt the user for input.

    Args:
        prompt (str): The message displayed to the user.

    Returns:
        str: The user's input as a string.
    """
    return input(prompt)
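
The tools above return canned values. When they are replaced with real implementations, the upload step could read text files from disk. The following is a sketch under assumptions: `load_user_documents` and its path parameters are hypothetical, and only plain-text files are handled (PDF parsing would need a library such as `pypdf`).

```python
import tempfile
from pathlib import Path
from typing import Dict

def load_user_documents(cv_path: str, job_description_path: str) -> Dict[str, str]:
    """Hypothetical replacement for the FileUpload placeholder: read two text files."""
    return {
        "CV": Path(cv_path).read_text(encoding="utf-8"),
        "Job Description": Path(job_description_path).read_text(encoding="utf-8"),
    }

# Demonstration with temporary files, so the sketch runs anywhere.
with tempfile.TemporaryDirectory() as tmp:
    cv_file = Path(tmp) / "cv.txt"
    jd_file = Path(tmp) / "job_description.txt"
    cv_file.write_text("8 years in field operations.", encoding="utf-8")
    jd_file.write_text("Senior Field Operations Engineer.", encoding="utf-8")
    documents = load_user_documents(str(cv_file), str(jd_file))

print(documents["CV"])
```

The returned dictionary uses the same `CV` and `Job Description` keys as the placeholder, so the rest of the orchestration stage would not need to change.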


Define Nodes for the three sub-stages of the Orchestration stage:
- Document upload
- Scrape information about company
- Create a "Scene" to outline the interview structure.


In [None]:
# Define the system prompt for the Scene
# placeholder

GENERATE_SCENE_SYSTEM_PROMPT = """
    Based on the following:
    - CV: {cv}
    - Job Description: {job_description}
    - Company Info: {company_info}

    Generate a "scene" for conducting an interview.
"""

In [None]:
# File Upload Node
def upload_documents(state: OrchestrationState):
    # Tools created with @tool are invoked with a dict of arguments
    documents = FileUpload.invoke({"description": "Upload user documents (CV, job description)."})
    return {"user_documents": documents}

# Web Scraper Node
def scrape_company_info(state: OrchestrationState):
    company_info = WebScraper.invoke({"url": "https://company.website"})
    return {"company_info": company_info}

# Scene Generation Node
# todo: update system prompt
def generate_scene(state: OrchestrationState):
    cv = state.user_documents.get("CV", "")
    job_description = state.user_documents.get("Job Description", "")
    company_info = state.company_info

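    # Fill the prompt placeholders before sending the prompt to the model
    system_prompt = GENERATE_SCENE_SYSTEM_PROMPT.format(
        cv=cv, job_description=job_description, company_info=company_info
    )
    response = llm.invoke([{"role": "system", "content": system_prompt}])
    return {"scene": response.content.strip()}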
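
The scene prompt contains the placeholders `{cv}`, `{job_description}`, and `{company_info}`, which are only filled once the template is formatted. A minimal illustration with `str.format`, using a local copy of the template and made-up sample values:

```python
# Local copy of the template so the sketch runs on its own.
SCENE_TEMPLATE = """
Based on the following:
- CV: {cv}
- Job Description: {job_description}
- Company Info: {company_info}

Generate a "scene" for conducting an interview.
"""

# Sample values are invented for illustration.
filled_prompt = SCENE_TEMPLATE.format(
    cv="8 years in field operations.",
    job_description="Senior Field Operations Engineer.",
    company_info="Innovation, Teamwork, Adaptability.",
)
print(filled_prompt)
```

If the template is passed to the model without this step, the model sees the literal placeholder names instead of the documents.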

Create the graph for the orchestration stage

In [None]:
# Create the graph for orchestration
orchestration_graph = StateGraph(OrchestrationState)

# Add the nodes for each of the "sub-stages"
orchestration_graph.add_node('upload_documents', upload_documents)
orchestration_graph.add_node('scrape_company_info', scrape_company_info)
orchestration_graph.add_node('generate_scene', generate_scene)

# Define the flow of tasks
orchestration_graph.add_edge(START, 'upload_documents')
orchestration_graph.add_edge('upload_documents', 'scrape_company_info')
orchestration_graph.add_edge('scrape_company_info', 'generate_scene')
orchestration_graph.add_edge('generate_scene', END)

# Compile the orchestration graph
orchestration_app = orchestration_graph.compile()

Visualize orchestration graph

In [None]:
display(Image(orchestration_app.get_graph().draw_mermaid_png()))

In [None]:
initial_orchestration_state = OrchestrationState()
result_orchestration_state = orchestration_app.invoke(initial_orchestration_state)
result_orchestration_state = OrchestrationState(**result_orchestration_state)
print(type(result_orchestration_state))
print(result_orchestration_state.scene)


## 2.2. Interview Stage

In [None]:
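class InterviewState(BaseModel):
    scene: str = ""
    cv: str = ""               # set by transfer_to_interview, read by generate_questions
    job_description: str = ""  # set by transfer_to_interview, read by generate_questions
    user_documents: dict = {}
    company_info: str = ""
    questions: list = []
    user_responses: dict = {}
    evaluation: str = ""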

### 2.2.1. Transfer data from previous stage

To ensure seamless data transfer from the Orchestration graph to the Interview sub-graphs, we need to:
1. Extract relevant data from the OrchestrationState (e.g., `User Documents`, `Company Info`, `Scene`).
2. Transform it into an appropriate format for the InterviewState.
3. Use a transformer function in the parent graph to bridge the data between the two sub-graphs.
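
The three steps above can be sketched end to end without LangGraph. This is a minimal illustration, not the notebook's implementation: `OrchestrationSketch`, `InterviewSketch`, `run_parent`, `fake_orchestrate`, and `fake_interview` are hypothetical stand-ins for the real state classes and compiled sub-graphs.

```python
from dataclasses import dataclass, field

@dataclass
class OrchestrationSketch:
    """Stand-in for OrchestrationState."""
    user_documents: dict = field(default_factory=dict)
    company_info: str = ""
    scene: str = ""

@dataclass
class InterviewSketch:
    """Stand-in for InterviewState."""
    cv: str = ""
    job_description: str = ""
    company_info: str = ""
    scene: str = ""
    evaluation: str = ""

def transfer(source: OrchestrationSketch) -> InterviewSketch:
    """Steps 1 and 2: extract the relevant fields and reshape them."""
    return InterviewSketch(
        cv=source.user_documents.get("CV", ""),
        job_description=source.user_documents.get("Job Description", ""),
        company_info=source.company_info,
        scene=source.scene,
    )

def run_parent(orchestrate, interviews: dict) -> dict:
    """Step 3: the parent flow bridges the stages through transfer()."""
    orchestration_state = orchestrate(OrchestrationSketch())
    # Each interview starts from a fresh state built from the same source data.
    return {name: run(transfer(orchestration_state)) for name, run in interviews.items()}

# Hypothetical stand-ins for the compiled sub-graphs.
def fake_orchestrate(state: OrchestrationSketch) -> OrchestrationSketch:
    state.user_documents = {"CV": "Python developer", "Job Description": "Backend role"}
    state.company_info = "Example Corp"
    state.scene = "Friendly first-round interview"
    return state

def fake_interview(state: InterviewSketch) -> InterviewSketch:
    state.evaluation = f"Evaluated CV: {state.cv}"
    return state

results = run_parent(fake_orchestrate, {"hr": fake_interview, "manager": fake_interview})
print(results["hr"].evaluation)
```

Building a new interview state per sub-stage keeps one interview's questions and responses from leaking into the next.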

In [None]:
def transfer_to_interview(orchestration_state: OrchestrationState) -> InterviewState:
    return InterviewState(
        cv=orchestration_state.user_documents.get("CV", ""),
        job_description=orchestration_state.user_documents.get("Job Description", ""),
        company_info=orchestration_state.company_info,
        scene=orchestration_state.scene
    )


### 2.2.1. Define tools for interview stage

In [None]:
@tool
def UserInput(prompt: str) -> str:
    """
    Prompt the user for input (console-based, for demonstration purposes).

    Args:
        prompt (str): The message displayed to the user prompting them for input.

    Returns:
        str: The user's input as a string.
    
    Note:
        This function uses Python's `input()` function to capture input. In a real-world scenario, 
        it would be used to obtain actual user input interactively.
    """
    return input(prompt)

There will be three sub-stages within the Interview stage: 
- The HR interview
- The Manager interview
- The Technical interview

These are quite similar, so they share the base State (InterviewState) and the tools. 

For each of the sub-stages, there are going to be the following tasks, which we will define as nodes for our graph: 
1. Introduction: Introduces the candidate and sets the stage for the interview.
2. Generate Questions Node: Uses the `Scene`, `User Documents`, and `Company Information` to generate a list of interview questions.
3. Ask Questions Node: Iterates through the generated questions, asks the user, and records responses.
4. Write Evaluation Node: Summarizes the candidate’s responses and writes an evaluation.
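
Because the sub-stages differ mainly in the interviewer's role, one way to avoid copying node functions is a small factory that builds a node per role. The sketch below is an assumption about how this could be organized, not part of the notebook: `SketchState`, `make_generate_questions`, and `fake_model` are hypothetical names, and the fake model stands in for the real LLM call so the example runs offline.

```python
from dataclasses import dataclass, field
from typing import Callable

@dataclass
class SketchState:
    """Stand-in for InterviewState, trimmed to what this sketch needs."""
    scene: str = ""
    questions: list = field(default_factory=list)

def make_generate_questions(role: str, ask_model: Callable[[str], str]):
    """Build a question-generation node for one interviewer role."""
    def generate_questions(state: SketchState) -> SketchState:
        prompt = (
            f"You are {role} conducting an interview.\n"
            f"Scene: {state.scene}\n"
            "Generate between 1 and 3 tailored questions, one per line."
        )
        # Keep non-empty lines only, capped at 3 questions.
        lines = [ln.strip() for ln in ask_model(prompt).splitlines() if ln.strip()]
        state.questions = lines[:3]
        return state
    return generate_questions

# Fake model so the sketch runs offline; a real node would call the LLM here.
def fake_model(prompt: str) -> str:
    return "Question A?\nQuestion B?\n\nQuestion C?\nQuestion D?"

hr_node = make_generate_questions("an HR assistant", fake_model)
manager_node = make_generate_questions("a hiring manager", fake_model)
print(hr_node(SketchState(scene="Backend role")).questions)
```

Each returned function has the same `state -> state` shape as the nodes defined in this notebook, so it could be registered with `add_node` in the same way.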


### 2.2.2. HR Interview sub-stage

#### Nodes

In [None]:
# Introduction
def introduction(state: InterviewState):
    print(f"Welcome to the interview! Here's an overview: {state.scene}")
    return state

In [None]:
# Generating questions

def generate_questions(state: InterviewState):
    system_prompt = f"""
    You are an HR assistant conducting an interview.
    Use the following information to generate between 1 and 3 tailored questions:

    Scene: {state.scene}
    CV: {state.cv}
    Job Description: {state.job_description}
    Company Information: {state.company_info}
"""
    response = llm.invoke([{"role": "system", "content": system_prompt}])
    # Keep non-empty lines only, capped at 3 questions
    lines = [line.strip() for line in response.content.splitlines() if line.strip()]
    state.questions = lines[:3]
    return state
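
Model output often arrives as a numbered or bulleted list with blank lines in between, so splitting on newlines alone can leave list markers or empty strings in the questions. The helper below is a minimal sketch of a stricter parser; `parse_questions` is a hypothetical name, and the sample text is made up.

```python
import re

def parse_questions(raw: str, max_questions: int = 3) -> list[str]:
    """Split raw model output into clean question strings."""
    questions = []
    for line in raw.splitlines():
        # Drop leading list markers such as "1.", "2)", "-" or "*".
        cleaned = re.sub(r"^\s*(?:\d+[.)]|[-*])\s*", "", line).strip()
        if cleaned:
            questions.append(cleaned)
    return questions[:max_questions]

sample = (
    "1. Why this role?\n\n"
    "2) Tell me about a conflict.\n"
    "- What are your salary expectations?\n"
    "* Extra question"
)
print(parse_questions(sample))
```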

In [None]:
# Ask questions, using the UserInput tool

def ask_questions(state: InterviewState):
    while state.questions:
        question = state.questions.pop(0)
        # Tools created with @tool are called through .invoke()
        response = UserInput.invoke({"prompt": question})
        state.user_responses[question] = response
    return state

In [None]:
# Writing the final evaluation

def write_evaluation(state: InterviewState):
    system_prompt = f"""
    Based on the following user responses, write a brief evaluation of the candidate:

    Responses: {state.user_responses}
    """
    response = llm.invoke([{"role": "system", "content": system_prompt}])
    state.evaluation = response.content.strip()
    print("Evaluation:", state.evaluation)
    return state


#### Subgraph

In [None]:
# Create the HR sub-graph
hr_graph = StateGraph(InterviewState)

# Add nodes
hr_graph.add_node('introduction', introduction)
hr_graph.add_node('generate_questions', generate_questions)
hr_graph.add_node('ask_questions', ask_questions)
hr_graph.add_node('write_evaluation', write_evaluation)

# Define edges
hr_graph.add_edge(START, 'introduction')
hr_graph.add_edge('introduction', 'generate_questions')
hr_graph.add_edge('generate_questions', 'ask_questions')
hr_graph.add_edge('ask_questions', 'write_evaluation')
hr_graph.add_edge('write_evaluation', END)

# Compile the HR graph
hr_app = hr_graph.compile()

In [None]:
# visualize graph

display(Image(hr_app.get_graph().draw_mermaid_png()))

### 2.2.3. Manager Interview sub-stage

#### Nodes

In [None]:
# Introduction
def introduction(state: InterviewState):
    print(f"Welcome to the interview! Here's an overview: {state.scene}")
    return state

In [None]:
# Generating questions

def generate_questions(state: InterviewState):
    system_prompt = f"""
    You are a hiring manager conducting an interview.
    Use the following information to generate between 1 and 3 tailored questions:

    Scene: {state.scene}
    CV: {state.cv}
    Job Description: {state.job_description}
    Company Information: {state.company_info}
"""
    response = llm.invoke([{"role": "system", "content": system_prompt}])
    # Keep non-empty lines only, capped at 3 questions
    lines = [line.strip() for line in response.content.splitlines() if line.strip()]
    state.questions = lines[:3]
    return state

In [None]:
# Ask questions, using the UserInput tool

def ask_questions(state: InterviewState):
    while state.questions:
        question = state.questions.pop(0)
        # Tools created with @tool are called through .invoke()
        response = UserInput.invoke({"prompt": question})
        state.user_responses[question] = response
    return state

In [None]:
# Writing the final evaluation

def write_evaluation(state: InterviewState):
    system_prompt = f"""
    Based on the following user responses, write a brief evaluation of the candidate:

    Responses: {state.user_responses}
    """
    response = llm.invoke([{"role": "system", "content": system_prompt}])
    state.evaluation = response.content.strip()
    print("Evaluation:", state.evaluation)
    return state


#### Subgraph

In [None]:
# Create the Manager sub-graph
manager_graph = StateGraph(InterviewState)

# Add nodes
manager_graph.add_node('introduction', introduction)
manager_graph.add_node('generate_questions', generate_questions)
manager_graph.add_node('ask_questions', ask_questions)
manager_graph.add_node('write_evaluation', write_evaluation)

# Define edges
manager_graph.add_edge(START, 'introduction')
manager_graph.add_edge('introduction', 'generate_questions')
manager_graph.add_edge('generate_questions', 'ask_questions')
manager_graph.add_edge('ask_questions', 'write_evaluation')
manager_graph.add_edge('write_evaluation', END)

# Compile the Manager graph
manager_app = manager_graph.compile()

In [None]:
# visualize graph

display(Image(manager_app.get_graph().draw_mermaid_png()))

-----
# Stretch goal: TTS

Example:

Define model and TTS pipelines

In [None]:
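from transformers import pipeline

# Text-generation model used by the loop below (assumption: "gpt2" as a small placeholder)
text_generator = pipeline("text-generation", model="gpt2")

# Load the TTS model (assumption: a transformers-compatible VITS checkpoint;
# "espnet/kan-bayashi_ljspeech_vits" needs the ESPnet toolkit, not this pipeline)
tts_pipeline = pipeline("text-to-speech", model="facebook/mms-tts-eng")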


Generate and Play Text with TTS in Real-Time

Create a loop where the language model generates text in small chunks. Each chunk will be converted to speech and played immediately.
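
Fixed-size token chunks can end in the middle of a sentence, which may make the synthesized speech sound choppy. One possible variation, sketched here as an assumption rather than as the notebook's method, buffers the streamed text and hands the TTS step one complete sentence at a time. `sentence_chunks` is a hypothetical helper and the sample stream is made up.

```python
import re
from typing import Iterable, Iterator

def sentence_chunks(pieces: Iterable[str]) -> Iterator[str]:
    """Buffer streamed text and yield it one complete sentence at a time."""
    buffer = ""
    for piece in pieces:
        buffer += piece
        # Emit every sentence already terminated by ., ! or ? plus whitespace.
        while True:
            match = re.search(r"[.!?]\s+", buffer)
            if match is None:
                break
            yield buffer[:match.end()].strip()
            buffer = buffer[match.end():]
    if buffer.strip():
        yield buffer.strip()  # flush whatever is left at the end

stream = ["Once upon a ti", "me, there was a fox. It ", "ran fast! Where did", " it go"]
chunks = list(sentence_chunks(stream))
print(chunks)
```

Each yielded string could then be passed to the TTS pipeline in place of the raw token chunk.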

In [None]:
import IPython.display as ipd

def generate_and_play_text(prompt, max_chunks=5, chunk_size=50):
    generated_text = ""
    
    # Generate text in chunks
    for _ in range(max_chunks):
        # Generate a chunk of text
        output = text_generator(prompt + generated_text, max_new_tokens=chunk_size, do_sample=True)
        new_text = output[0]["generated_text"][len(prompt + generated_text):]
        
        # Append the new text to the generated text
        generated_text += new_text
        print(new_text)  # Print the generated text chunk

        # Generate TTS for the current chunk
        audio = tts_pipeline(new_text)

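        # Autoplay the audio chunk in the notebook
        # (the transformers TTS pipeline returns a dict with "audio" and "sampling_rate")
        ipd.display(ipd.Audio(audio["audio"], rate=audio["sampling_rate"], autoplay=True))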
        
        # Add a short delay to simulate real-time generation if needed
        # time.sleep(1)  # Uncomment if you want to control the timing

# Example usage
generate_and_play_text("Once upon a time,")
