<a href="https://colab.research.google.com/github/MayankPandey2004/InterviewPracticePartner/blob/main/InterviewPracticePartner.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
!pip install --q pinecone openai autogen
!pip install --q autogen-agentchat autogen-ext[openai,azure]

In [2]:
import os
from pinecone import Pinecone
from openai import OpenAI
from typing import List
from google.colab import userdata
from autogen_agentchat.agents import AssistantAgent, UserProxyAgent
from autogen_ext.models.openai import OpenAIChatCompletionClient
from autogen_agentchat.conditions import TextMentionTermination, MaxMessageTermination
from autogen_agentchat.teams import SelectorGroupChat
from autogen_agentchat.agents import UserProxyAgent

In [3]:
# --- Credentials -------------------------------------------------------------
# Both keys are read from the Colab secret store via `userdata.get`; nothing is
# hard-coded in the notebook.
PINECONE_API_KEY = userdata.get('pinecone_api')
OPENAI_API_KEY = userdata.get('openai_api_key')

# --- Model / index configuration ----------------------------------------------
OPENAI_MODEL = "gpt-4o-mini"
EMBEDDING_MODEL_ID = "text-embedding-3-small"
# Passed as `dimensions=` to the embeddings endpoint.
# NOTE(review): presumably this must equal the Pinecone index dimension - confirm.
DIMENSION = 512
INDEX_HOST = "https://interview-questions-pfcnu58.svc.aped-4627-b74a.pinecone.io"
INDEX_NAME = "interview-questions"
DEFAULT_NAMESPACE = "Frontend"

# Chat-completion client shared by the AutoGen agents. The model id comes from
# OPENAI_MODEL so that editing the constant above actually changes the model.
model_client = OpenAIChatCompletionClient(
        model=OPENAI_MODEL,
        api_key=OPENAI_API_KEY,
        max_tokens=2000
    )

In [4]:
from openai import OpenAI
from typing import List

def embed_query(openai_client: OpenAI, query_text: str) -> List[float]:
    try:
        response = openai_client.embeddings.create(
            model="text-embedding-3-small",
            input=[query_text],
            dimensions=DIMENSION
        )
        query_vector = response.data[0].embedding
        return query_vector
    except Exception as e:
        print(f"An error occurred during embedding: {e}")
        return []

In [5]:
# Pinecone client authenticated with the key loaded in the configuration cell.
pc = Pinecone(api_key=PINECONE_API_KEY)
# Shared OpenAI client; used by search_pinecone_rag() through embed_query() and
# later handed to InterviewSystem.
openai_client = OpenAI(api_key=OPENAI_API_KEY)
# Index handle addressed by host URL (INDEX_NAME is not used for the lookup).
index = pc.Index(host=INDEX_HOST)

In [6]:
def search_pinecone_rag(query_text: str):
    """
    Searches the Pinecone index with a query, embeds the text using
    'text-embedding-3-small' (512 dimensions), and returns the retrieved documents.

    Args:
        query_text (str): The search query provided by the assistant agent.

    Returns:
        str: A formatted string of the relevant document questions and answers,
        or a short message explaining why nothing could be returned.
    """
    # NOTE(review): this function is passed to retriever_agent via `tools=[...]`;
    # the docstring above is presumably surfaced to the model as the tool
    # description, so keep it written for that audience - confirm.

    # 1. Embed the query. Relies on the notebook-level `openai_client`.
    query_vector = embed_query(openai_client, query_text)

    if not query_vector:
        return "Error: Could not generate embedding for the query."

    # 2. Query the notebook-level Pinecone `index`, asking for metadata only.
    try:
        res = index.query(
            namespace=DEFAULT_NAMESPACE,
            vector=query_vector,
            top_k=3,
            include_metadata=True,
            include_values=False
        )
    except Exception as e:
        return f"Error occurred during Pinecone query: {e}"

    # 3. Format the results.
    no_results_message = "No relevant documents found in the vector store."
    if not res.matches:
        return no_results_message

    context = []
    for match in res.matches:
        # A match may carry no metadata at all; treat that like missing fields
        # instead of failing on `in None`.
        metadata = match.metadata or {}
        if 'question' in metadata and 'answer' in metadata:
            context.append(f"Question: {metadata['question']}\nAnswer: {metadata['answer']}")

    # Matches without usable Q/A metadata would otherwise yield an empty string,
    # which gives the calling agent nothing to act on.
    if not context:
        return no_results_message

    # Send the raw context back to the LLM for synthesis
    return "\n---\n".join(context)

In [7]:
# Interviewer agent: asks the questions and critiques the Candidate's answers
# using the response template spelled out in its system prompt. Its prompt
# tells it to reply with 'TERMINATE' when the interview is over.
interviewer = AssistantAgent(
    name="interviewer",
    model_client = model_client,
    # No `tools=` argument here: only retriever_agent is given the search tool.
    system_message="""You are a professional RAG Interviewer. Your mission is to conduct a Frontend development interview using RAG context received from the other agents.
1.  **START**: Receive the first question from the qa_handler_agent and immediately ask it to the Candidate.
2.  **RESPONSE**: When the Candidate answers, you **MUST** follow this structured, multi-part response template for your analysis and next question:
    ### üéØ Analysis and Next Question

    #### üìù Analysis of Candidate's Answer
    **Score**: [Brief rating/assessment based on the hidden stored Answer.]
    **Evaluation**: [Contrast the Candidate's answer with the stored RAG Answer from the conversation history. Be critical but fair.]

    <hr>

    #### ‚ùì Next Question
    **Strategy**: [State your reasoning: e.g., 'Follow-up on X' or 'Move to next topic Y'.]
    [Ask the next question text here, ensuring it is sourced from your history or a new RAG call if needed.]

3.  **Constraints**: You MUST adhere to the analysis structure. Do NOT reveal the stored Answer to the Candidate. Use the RAG context for all questions. Reply with 'TERMINATE' only when the interview is over."""
)
# Retriever agent: the only agent given the `search_pinecone_rag` tool. Its
# prompt asks it to run one search and pass the raw tool output on.
retriever_agent= AssistantAgent(
    name="retriever_agent",
    model_client = model_client,
    tools = [search_pinecone_rag],
    system_message="""You are a professional RAG system acting as a **Retriever**.
    1.  **Initial Action**: Use the 'search_pinecone_rag' tool with the query: 'Frontend development interview questions'.
    2. **Send Data**: Send the raw output from the tool *directly* to the **qa_handler_agent**. Your task is complete after this step."""
)

# QA handler agent: prompted to take the retriever's raw Q/A text and emit only
# the first question for the interviewer. It has no tools.
qa_handler_agent = AssistantAgent(
    name="qa_handler_agent",
    model_client=model_client,
    system_message="""You are a QA Handler. Your task is to process the raw RAG output received from the retriever_agent.
    1. **Context Storage**: Store the **entire** incoming raw text (all Q/A pairs) in your memory.
    2. **Question Extraction**: Extract the **first, clean question text** from the raw output.
    3. **Output**: Output ONLY that single, extracted question text and send it directly to the **interviewer** agent. Do not include any context, headers, or conversational fillers."""
)
# Human participant in the group chat. The name "Candidate" is the label the
# interviewer's system prompt uses for the person being interviewed.
candidate_proxy = UserProxyAgent(
    name="Candidate"
)

In [8]:
def selector_func_rag_interview(messages) -> str | None:
    """Choose the next speaker for the RAG interview group chat.

    Turn order:
        anything else / empty history -> retriever_agent
        retriever_agent               -> qa_handler_agent
        qa_handler_agent              -> interviewer
        interviewer                   -> Candidate
        Candidate                     -> interviewer

    Args:
        messages: Conversation so far. Only the ``source`` attribute of the
            last item is read.

    Returns:
        The name of the agent that should speak next.
    """
    # With no history there is no last speaker to inspect; start the pipeline
    # at the retriever rather than raising IndexError on messages[-1].
    if not messages:
        return retriever_agent.name

    next_speaker_by_last = {
        retriever_agent.name: qa_handler_agent.name,
        qa_handler_agent.name: interviewer.name,
        interviewer.name: candidate_proxy.name,
        candidate_proxy.name: interviewer.name,
    }

    # Any source that is not one of the four agents (for example the initial
    # task message) routes to the retriever, which kicks off the retrieval step.
    return next_speaker_by_last.get(messages[-1].source, retriever_agent.name)

In [9]:
# Stop condition: the two conditions are combined with `|`, i.e. a 30-message
# cap OR a mention of the text "TERMINATE" (the word the interviewer's system
# prompt tells it to reply with when the interview is over).
termination = MaxMessageTermination(max_messages=30) | TextMentionTermination("TERMINATE")
# Group chat over the four agents; speaker order is supplied by
# selector_func_rag_interview.
# NOTE(review): `team` is not run anywhere in the visible cells - confirm whether
# it is still needed alongside InterviewSystem.
team = SelectorGroupChat(
    [interviewer, retriever_agent, candidate_proxy,qa_handler_agent],
    model_client=model_client,
    termination_condition=termination,
    selector_func=selector_func_rag_interview,
    allow_repeated_speaker=True,
)

In [10]:
"""
Fixed Agentic AI Interview System
- Proper async handling for Colab
- Tracks asked questions to avoid repetition
- Better follow-up logic
- Processing delays for better UX
"""

from enum import Enum
from dataclasses import dataclass
from typing import Optional, Dict, List, Any
import json
import re
import time
from autogen_agentchat.agents import AssistantAgent
from autogen_ext.models.openai import OpenAIChatCompletionClient
from autogen_agentchat.messages import TextMessage

class DifficultyLevel(Enum):
    """Question difficulty tiers.

    InterviewSystem looks members up by upper-cased name and lower-cases the
    member name when building search queries.
    """
    EASY = 1
    MEDIUM = 2
    HARD = 3

@dataclass
class Question:
    """One interview question as recorded after it has been asked."""
    question: str         # question text shown to the candidate
    expected_answer: str  # reference answer the candidate's reply is graded against
    difficulty: str       # difficulty label taken from the retrieved record
    topic: str            # topic label taken from the retrieved record

@dataclass
class InterviewState:
    """Mutable progress record for a single interview session."""
    questions_asked: List[Question]       # questions in the order they were asked
    user_answers: List[str]               # candidate answers, parallel to questions_asked
    scores: List[float]                   # evaluation scores, parallel to questions_asked
    current_difficulty: DifficultyLevel   # difficulty used when fetching the next question
    topics_covered: set                   # topic labels of the questions asked so far
    asked_question_texts: set  # Track to avoid duplicates
    total_questions: int = 0              # number of questions recorded so far
    max_questions: int = 8                # interview ends once this many are recorded

class InterviewSystem:
    """Main interview orchestrator"""

    def __init__(self, openai_client_obj, pinecone_index, openai_api_key, namespace="Frontend"):
        """Store the external handles and start from an empty interview state.

        Args:
            openai_client_obj: OpenAI client object, kept as ``self.openai_client``.
            pinecone_index: Pinecone index handle, kept as ``self.index``.
            openai_api_key: API key, kept as ``self.api_key`` and also used to
                build this instance's own chat-completion model client.
            namespace: Pinecone namespace to query.
        """
        # External collaborators and settings.
        self.openai_client, self.index = openai_client_obj, pinecone_index
        self.namespace, self.api_key = namespace, openai_api_key

        # Each instance gets its own model client rather than sharing the
        # notebook-level one.
        client_settings = dict(
            model="gpt-4o-mini",
            api_key=openai_api_key,
            max_tokens=2000,
        )
        self.model_client = OpenAIChatCompletionClient(**client_settings)

        # Nothing asked yet; the first question is fetched at EASY difficulty.
        empty_progress = dict(
            questions_asked=[],
            user_answers=[],
            scores=[],
            topics_covered=set(),
            asked_question_texts=set(),
        )
        self.state = InterviewState(current_difficulty=DifficultyLevel.EASY, **empty_progress)

    def _search_rag(self, query: str, top_k: int = 5) -> List[Dict]:
        """Search Pinecone for questions, returning multiple to choose from"""
        from openai import OpenAI

        # Embed query
        openai_temp = OpenAI(api_key=self.api_key)
        response = openai_temp.embeddings.create(
            model="text-embedding-3-small",
            input=[query],
            dimensions=512
        )
        query_vector = response.data[0].embedding

        if not query_vector:
            return []

        try:
            res = self.index.query(
                namespace=self.namespace,
                vector=query_vector,
                top_k=top_k,
                include_metadata=True,
                include_values=False
            )

            results = []
            if res.matches:
                for match in res.matches:
                    if 'question' in match.metadata and 'answer' in match.metadata:
                        q_text = match.metadata['question']

                        # Skip if already asked
                        if q_text in self.state.asked_question_texts:
                            continue

                        results.append({
                            'question': q_text,
                            'expected_answer': match.metadata['answer'],
                            'difficulty': match.metadata.get('difficulty', 'medium'),
                            'topic': match.metadata.get('topic', 'Frontend')
                        })

            return results

        except Exception as e:
            print(f"RAG search error: {e}")
            return []

    def run_interview(self):
        """Main interview loop"""

        print("üéØ Starting Frontend Development Interview")
        print("=" * 50)
        print("\nI'll ask you a series of questions. Answer to the best of your ability.")
        print("Type 'quit' to exit early.\n")

        while self.state.total_questions < self.state.max_questions:
            # Step 1: Get question from RAG
            question_data = self._get_next_question()
            if not question_data:
                print("‚ùå Could not retrieve new question. Ending interview.")
                break

            # Step 2: Ask the question
            user_answer = self._ask_question(question_data['question'])
            if user_answer.lower() in ['quit', 'exit', 'terminate']:
                print("\nüëã Interview ended early.")
                break

            # Step 3: Evaluate the answer
            evaluation = self._evaluate_answer_sync(
                question_data['expected_answer'],
                user_answer
            )

            # Step 4: Store results
            self._update_state(question_data, user_answer, evaluation)

            # Step 5: OPTIONAL follow-up (max 1 per question)
            if evaluation.get('needs_followup', False) and evaluation.get('followup_question'):
                followup_q = evaluation['followup_question']
                print(f"\nüîç Follow-up: {followup_q}")
                followup_answer = input("Your answer: ")

                # Processing delay for follow-up
                if followup_answer.lower() not in ['quit', 'exit', 'skip']:
                    print("\n‚è≥ Processing", end="", flush=True)
                    for _ in range(2):
                        time.sleep(0.4)
                        print(".", end="", flush=True)
                    print(" ‚úì Follow-up noted.\n")
                    time.sleep(0.5)

            # Step 6: Decide next action
            next_action = self._decide_next_step(evaluation)

            if next_action['action'] == 'conclude':
                break

            # Update difficulty for next question
            try:
                self.state.current_difficulty = DifficultyLevel[next_action['next_difficulty'].upper()]
            except:
                pass

        # Generate final report
        self._generate_final_report()

    def _get_next_question(self) -> Optional[Dict]:
        """Retrieve next question from RAG, avoiding repeats"""

        # Build query with variety
        difficulty_str = self.state.current_difficulty.name.lower()

        # Try to get questions on new topics
        covered_str = ', '.join(self.state.topics_covered) if self.state.topics_covered else ''

        if covered_str:
            query = f"Frontend development {difficulty_str} interview question not about {covered_str}"
        else:
            query = f"Frontend development {difficulty_str} interview question"

        # Search RAG for multiple options
        candidates = self._search_rag(query, top_k=10)

        if not candidates:
            # Fallback: try broader search
            candidates = self._search_rag("Frontend interview question", top_k=10)

        # Return first unasked question
        return candidates[0] if candidates else None

    def _ask_question(self, question: str) -> str:
        """Ask question to user"""
        print(f"\nüìù Question {self.state.total_questions + 1}:")
        print(f"{question}\n")

        answer = input("Your answer: ")

        # Add processing delay with visual feedback
        print("\n‚è≥ Processing your answer", end="", flush=True)
        for _ in range(3):
            time.sleep(0.5)
            print(".", end="", flush=True)
        print(" Done!\n")

        return answer

    def _evaluate_answer_sync(self, expected: str, actual: str) -> Dict:
        """Evaluate answer using direct OpenAI API call"""
        from openai import OpenAI

        try:
            client = OpenAI(api_key=self.api_key)

            response = client.chat.completions.create(
                model="gpt-4o-mini",
                messages=[
                    {
                        "role": "system",
                        "content": """You are an expert technical interviewer evaluator.

Evaluate the candidate's answer and provide ONLY a JSON response:
{
    "score": 0.0 to 1.0,
    "feedback": "brief 1-2 sentence evaluation",
    "understanding_level": "excellent/good/partial/poor",
    "needs_followup": true/false,
    "followup_question": "one specific follow-up question if needed, otherwise null"
}

Scoring rubric:
- 0.9-1.0: Excellent, comprehensive answer covering all key points
- 0.7-0.89: Good answer, minor gaps
- 0.5-0.69: Partial understanding, missing key details
- 0.3-0.49: Significant gaps
- 0.0-0.29: Incorrect or minimal understanding

Follow-ups should probe depth or clarify gaps, not repeat the question. Set needs_followup to true if answer was partial (0.4-0.7 range) and there's something specific to probe.

Return ONLY valid JSON, no markdown, no other text."""
                    },
                    {
                        "role": "user",
                        "content": f"""Expected Answer: {expected}

Candidate Answer: {actual}

Evaluate and respond with JSON only."""
                    }
                ],
                temperature=0.3,
                max_tokens=500
            )

            response_text = response.choices[0].message.content

            # Parse JSON from response
            # Remove markdown code blocks if present
            response_text = re.sub(r'```json\s*|\s*```', '', response_text)
            response_text = response_text.strip()

            # Find JSON object
            json_match = re.search(r'\{[^{}]*(?:\{[^{}]*\}[^{}]*)*\}', response_text, re.DOTALL)
            if json_match:
                eval_data = json.loads(json_match.group())
                # Ensure score is float
                eval_data['score'] = float(eval_data.get('score', 0.5))

                # Ensure followup_question is None if not needed
                if not eval_data.get('needs_followup', False):
                    eval_data['followup_question'] = None

                return eval_data

        except Exception as e:
            print(f"‚ö†Ô∏è Evaluation error: {e}")

        # Fallback: simple scoring
        score = 0.6 if len(actual) > 50 else 0.4
        return {
            "score": score,
            "feedback": "Answer recorded and reviewed.",
            "understanding_level": "partial",
            "needs_followup": False,
            "followup_question": None
        }

    def _update_state(self, question_data: Dict, answer: str, evaluation: Dict):
        """Record one answered question in the interview state and print its result.

        Args:
            question_data: Dict with ``question``, ``expected_answer``,
                ``difficulty`` and ``topic`` keys.
            answer: The candidate's answer text.
            evaluation: Evaluation dict; ``score`` is required, ``feedback`` is
                optional.
        """
        q_obj = Question(
            question=question_data['question'],
            expected_answer=question_data['expected_answer'],
            difficulty=question_data['difficulty'],
            topic=question_data['topic']
        )

        # Resolve the display values before mutating anything. The evaluator's
        # JSON comes from a model and may omit feedback; a missing field must
        # not raise after the state has already been half-updated.
        score = evaluation['score']
        feedback = evaluation.get('feedback') or "No feedback provided."

        self.state.questions_asked.append(q_obj)
        self.state.user_answers.append(answer)
        self.state.scores.append(score)
        self.state.topics_covered.add(question_data['topic'])
        self.state.asked_question_texts.add(question_data['question'])  # Track asked questions
        self.state.total_questions += 1

        print(f"\n‚úì Score: {score:.2f}")
        print(f"üí¨ Feedback: {feedback}")

    def _decide_next_step(self, evaluation: Dict) -> Dict:
        """Decide next action"""

        avg_score = sum(self.state.scores) / len(self.state.scores) if self.state.scores else 0

        if self.state.total_questions >= self.state.max_questions:
            return {"action": "conclude", "next_difficulty": "medium", "next_topic": "any"}

        # Adaptive difficulty
        if avg_score >= 0.8:
            next_diff = "hard"
        elif avg_score >= 0.6:
            next_diff = "medium"
        else:
            next_diff = "easy"

        return {
            "action": "continue",
            "next_difficulty": next_diff,
            "next_topic": "any",
            "reasoning": f"Performance is {avg_score:.2f}"
        }

    def _generate_final_report(self):
        """Generate and display final interview report"""

        if not self.state.scores:
            print("\n‚ùå No questions completed.")
            return

        avg_score = sum(self.state.scores) / len(self.state.scores)

        print("\n" + "=" * 50)
        print("üìä INTERVIEW COMPLETE - FINAL REPORT")
        print("=" * 50)

        print(f"\nüìà Overall Score: {avg_score:.2f}/1.00 ({avg_score*100:.1f}%)")

        if avg_score >= 0.8:
            level = "Excellent ‚≠ê"
        elif avg_score >= 0.6:
            level = "Good ‚úì"
        elif avg_score >= 0.4:
            level = "Fair ~"
        else:
            level = "Needs Improvement ‚ö†Ô∏è"

        print(f"üéØ Performance Level: {level}")
        print(f"‚ùì Questions Answered: {self.state.total_questions}")
        print(f"üìö Topics Covered: {', '.join(self.state.topics_covered)}")

        print("\nüìù Question Breakdown:")
        for i, (q, score) in enumerate(zip(self.state.questions_asked, self.state.scores), 1):
            emoji = "üü¢" if score >= 0.7 else "üü°" if score >= 0.5 else "üî¥"
            print(f"  {emoji} Q{i}. [{q.topic}] {score:.2f}")

        print("\nüí° Recommendations:")
        weak_topics = [q.topic for q, s in zip(self.state.questions_asked, self.state.scores) if s < 0.6]
        if weak_topics:
            print(f"  üìö Focus on: {', '.join(set(weak_topics))}")
        else:
            print("  üéâ Great job! Keep practicing to maintain proficiency.")

        strong_topics = [q.topic for q, s in zip(self.state.questions_asked, self.state.scores) if s >= 0.8]
        if strong_topics:
            print(f"  üí™ Strong areas: {', '.join(set(strong_topics))}")

        print("\n" + "=" * 50)
# Build the orchestrator from the notebook-level OpenAI client, Pinecone index,
# API key and namespace, then start the interactive session (blocks on input()).
interview_system = InterviewSystem(openai_client, index, OPENAI_API_KEY, DEFAULT_NAMESPACE)
interview_system.run_interview()

üéØ Starting Frontend Development Interview

I'll ask you a series of questions. Answer to the best of your ability.
Type 'quit' to exit early.


üìù Question 1:
How can Web Components be used in front-end development, and what are the key specifications involved in creating them?

Your answer: Web Components allow developers to create reusable, encapsulated, and framework-independent UI elements. They are used to build custom UI widgets, design systems, micro-frontends, and components that work in any framework (React, Angular, Vue) or plain HTML.

‚è≥ Processing your answer... Done!


‚úì Score: 0.70
üí¨ Feedback: The candidate provided a good overview of Web Components and their use cases but missed key technical details about the specifications and implementation.

üîç Follow-up: Can you explain the role of Shadow DOM in Web Components and how it contributes to encapsulation?
Your answer: quit

üìù Question 2:
What are web workers and how can they improve performance?

Your an