In [349]:
from typing import Annotated, Optional, List, Literal
from langchain_core.tools import tool
from typing_extensions import TypedDict
from datetime import datetime

from langgraph.checkpoint.memory import MemorySaver
from langgraph.graph import StateGraph, START, END, MessagesState
from langgraph.graph.message import add_messages
from langgraph.prebuilt import ToolNode, tools_condition
from pydantic import BaseModel, Field
import requests
from rich import print_json
from dotenv import load_dotenv
import os
import json
from langgraph.types import Command, interrupt
from IPython.display import Image, display
from langchain_openai import ChatOpenAI
from langchain_core.messages import SystemMessage, HumanMessage, AIMessage
from langfuse import Langfuse
from langfuse.callback import CallbackHandler
from concurrent.futures import ThreadPoolExecutor
import asyncio

# Load variables from a local .env file into os.environ; later cells read
# credentials such as OPENAI_API_KEY from the environment.
load_dotenv()

True

In [350]:
# NOTE(review): presumably opts out of graph-visualisation rendering; the variable
# name is the only evidence here — confirm which library reads it.
os.environ["LANGCHAIN_DISABLE_GRAPH_VIZ"] = "true"

# load_dotenv() already exported OPENAI_API_KEY if it is in .env, so re-assigning it
# is a no-op when set and a confusing TypeError (None into os.environ) when unset.
# Fail fast with an actionable message instead.
if not os.getenv("OPENAI_API_KEY"):
    raise RuntimeError("OPENAI_API_KEY is not set; add it to your environment or .env file")

def init_chat_model(model_name, model_provider="openai", temperature=0):
    """Create a chat model client.

    Args:
        model_name: Model identifier passed through to the provider client.
        model_provider: Only ``"openai"`` is supported (maps to ``ChatOpenAI``).
        temperature: Sampling temperature forwarded to the client.

    Raises:
        ValueError: If ``model_provider`` is anything other than ``"openai"``.
    """
    if model_provider != "openai":
        raise ValueError(f"Unsupported model provider: {model_provider}")
    return ChatOpenAI(model=model_name, temperature=temperature)

# Initialize the model
model = init_chat_model("gpt-4.1-nano", model_provider="openai")

# Langfuse tracing callback. Credentials come from the environment (.env) instead of
# being hardcoded in the notebook, where they would leak with every shared copy.
# NOTE: rotate any Langfuse keys that were ever inlined in this notebook.
langfuse_handler = CallbackHandler(
    secret_key=os.getenv("LANGFUSE_SECRET_KEY"),
    public_key=os.getenv("LANGFUSE_PUBLIC_KEY"),
    host=os.getenv("LANGFUSE_HOST", "https://cloud.langfuse.com"),
)

In [351]:
from typing import Optional, List, Literal, Dict, Any

# Integer constrained to the inclusive range 0..100 (used for Evaluation.score).
Score = Annotated[int, Field(le=100, ge=0)]

class Diarization(BaseModel):
    # Diarization payload of a video (VideoAttributes.diarization).
    # NOTE(review): the content of each speaker field is not visible here — confirm
    # against the video API.
    # Both default to None: under Pydantic v2 an Optional[...] annotation without a
    # default is still *required*, so a payload that omits either key would fail.
    speaker0: Optional[str] = None
    speaker1: Optional[str] = None


class VideoAttributes(BaseModel):
    # Attribute payload of a video resource (Video.attributes).
    # Every Optional field defaults to None: under Pydantic v2 an Optional[...]
    # annotation without a default is still *required*, so an API response that
    # merely omits one of these keys would fail validation of the whole graph state.
    # createdAt, duration and isActive remain required.
    createdAt: datetime
    updatedAt: Optional[datetime] = None
    applicationId: Optional[str] = None
    url: Optional[str] = None
    playbackId: Optional[str] = None
    assetId: Optional[str] = None
    duration: float
    isActive: bool
    question: Optional[str] = None
    signedUrl: Optional[str] = None
    # Text consumed by identify_speakers and extract_questions.
    transcript: Optional[str] = None
    jobId: Optional[str] = None
    source: Optional[str] = None
    diarization: Optional[Diarization] = None
    summary: Optional[str] = None
    description: Optional[str] = None
    developerId: Optional[str] = None


class Video(BaseModel):
    # Top-level video resource: identifier, resource type and its attribute payload.
    # Stored as Evaluation.video from the "data" field fetched by get_interview_state.
    id: str
    type: str
    attributes: VideoAttributes

# Per-answer grading produced by grade_answer and attached to QuestionAnswerPair.
class GradingDetails(BaseModel):
    """Detailed evaluation of an interview answer."""

    # Headline verdict: numeric score (validated to 0..100) plus a qualitative label.
    score: int = Field(
        description="Score on scale of 0-100",
        ge=0,
        le=100,
    )
    rating: str = Field(
        description="Qualitative rating (Strong Yes, Yes, No, Strong No)",
    )

    # Supporting evidence for the verdict.
    key_facts: List[str] = Field(
        description="Key technical facts/concepts demonstrated in the answer",
    )
    strengths: List[str] = Field(
        description="Strengths of the answer",
    )
    weaknesses: List[str] = Field(
        description="Weaknesses or areas for improvement in the answer",
    )
    justification: str = Field(
        description="Brief justification for the score and rating",
    )

# One extracted interview exchange, optionally carrying its grade.
class QuestionAnswerPair(BaseModel):
    """A pair of question and answer from the interview."""

    # Text pulled from the transcript by extract_questions.
    question_text: str = Field(
        description="The full text of the question asked by the interviewer",
    )
    answer_text: str = Field(
        description="The full text of the answer given by the interviewee",
    )

    # Grading fields; the defaults ("" / 0 / None) stand for "not graded yet".
    rating: Optional[str] = Field(
        default="",
        description="Rating of the answer (Strong Yes, Yes, No, Strong No)",
    )
    score: Optional[int] = Field(
        default=0,
        ge=0,
        le=100,
        description="Score for this answer on a scale of 0-100",
    )
    grading_details: Optional[GradingDetails] = Field(
        description="Detailed grading information",
        default=None,
    )
    
class QuestionAnswerPairs(BaseModel):
    """Collection of question-answer pairs from an interview."""
    # Required list (no default). This wrapper is not referenced by the visible cells.
    qa_pairs: List[QuestionAnswerPair]

class Evaluation(BaseModel):
    # LangGraph state for one interview run (StateGraph(Evaluation)); each node
    # returns a dict that updates a subset of these fields.

    # Overall interview score (0..100), set by grade_interview_answers.
    score: Score = 0
    # Video record fetched from `url` by get_interview_state.
    video: Optional[Video] = None
    # Extracted, and later graded, question/answer pairs.
    qa_pairs: List[QuestionAnswerPair] = Field(default_factory=list)
    # API URL of the interview video; the only required field.
    url: str
    # Error message reported by a node; "" means no error has been recorded.
    error: Optional[str] = ""
    # {"interviewer": <label>, "interviewee": <label>} from identify_speakers.
    speaker_identification: Optional[Dict[str, str]] = None

In [352]:
async def get_interview_state(state: Evaluation):
    """Fetch the interview video record for ``state.url`` from the video API.

    The bearer token is read from the ``G2I_API_TOKEN`` environment variable
    (e.g. via ``.env``) rather than being hardcoded in the notebook.
    NOTE: rotate any token that was ever inlined in this notebook.

    Returns:
        ``{"video": <the response's "data" payload>}`` on success, or
        ``{"error": <message>}`` if the token is missing or the request fails.
    """
    token = os.getenv("G2I_API_TOKEN")
    if not token:
        return {"error": "G2I_API_TOKEN is not set; cannot fetch interview video"}

    try:
        # requests is blocking; run it in a worker thread so the event loop is not
        # stalled, and bound the wait so a dead connection cannot hang the graph.
        response = await asyncio.to_thread(
            requests.get,
            state.url,
            headers={"authorization": f"Bearer {token}"},
            timeout=30,  # seconds
        )
        # Surface HTTP failures instead of silently storing video=None.
        response.raise_for_status()
    except requests.RequestException as e:
        print(f"Error fetching interview video: {e}")
        return {"error": f"Error fetching interview video: {e}"}

    return {"video": response.json().get("data")}

async def identify_speakers(state: Evaluation):
    """Determine which diarized speaker label is the interviewer and which the interviewee.

    Sends ``state.video.attributes.transcript`` to the chat model with a JSON schema
    for structured output. Both role values are constrained via ``enum`` to
    ``"speaker0"``/``"speaker1"``, so the labels later interpolated into the
    extraction prompt are always ones the transcript actually uses.

    Returns:
        ``{"speaker_identification": {"interviewer": ..., "interviewee": ...}}`` on
        success; otherwise ``{"error": <message>}``.
    """
    print("Processing speakers from video data")
    try:
        # Raises AttributeError when state.video is None (no data was fetched).
        transcript = state.video.attributes.transcript
        
        if not transcript or not transcript.strip():
            return {"error": "No transcript available for speaker identification"}
        
        # The only labels the prompt allows; enforced through "enum" below so the
        # model cannot answer with free-form text such as "Speaker A".
        speaker_labels = ["speaker0", "speaker1"]
        speaker_schema = {
            "title": "SpeakerIdentification",
            "description": "Identification of speakers in an interview transcript",
            "type": "object",
            "properties": {
                "interviewer": {
                    "type": "string",
                    "enum": speaker_labels,
                    "description": "Label for the interviewer (speaker0 or speaker1)"
                },
                "interviewee": {
                    "type": "string",
                    "enum": speaker_labels,
                    "description": "Label for the interviewee (speaker0 or speaker1)"
                }
            },
            "required": ["interviewer", "interviewee"]
        }
        
        messages = [
            SystemMessage(content="""
                You are an AI assistant specialized in analyzing technical interview transcripts.
                
                Your task is to identify which speaker is the interviewer and which is the interviewee in the provided transcript.
                
                The interviewer typically:
                - Asks most of the questions
                - Guides the conversation
                - Introduces coding problems or technical concepts
                - Evaluates responses
                
                The interviewee typically:
                - Answers questions
                - Explains their reasoning
                - Provides solutions to coding problems
                - Demonstrates technical knowledge
                
                Return your analysis with the labels "interviewer" and "interviewee" assigned to either "speaker0" or "speaker1".
            """),
            HumanMessage(content=f"Here is the interview transcript:\n\n{transcript}")
        ]
        
        # A dict schema makes with_structured_output return a plain dict.
        structured_llm = model.with_structured_output(speaker_schema)
        
        try:
            response = await structured_llm.ainvoke(messages)
            # Indexing here also surfaces a malformed response (KeyError) via the except below.
            print(f"Identified speakers: Interviewer={response['interviewer']}, Interviewee={response['interviewee']}")
            return {"speaker_identification": response}
            
        except Exception as e:
            print(f"Error identifying speakers: {e}")
            return {"error": f"Error identifying speakers: {e}"}
            
    except AttributeError as e:
        print(f"Error accessing transcript: {e}")
        return {"error": "Could not access transcript for speaker identification"}

In [353]:
def _parse_qa_response(response_text: str) -> List[QuestionAnswerPair]:
    """Parse ``QUESTION:``/``ANSWER:``/``TOPIC:`` formatted model output into pairs.

    Markers may be wrapped in Markdown bold (``**QUESTION:**``); the optional
    asterisks are consumed so they do not leak into the extracted text. Text
    before the first ``QUESTION:`` is ignored; ``TOPIC:`` and everything after it
    in a block is dropped. Blocks without an ``ANSWER:`` marker, or with an empty
    question/answer, are skipped with a printed message.
    """
    import re

    question_re = re.compile(r"\*{0,2}QUESTION:\*{0,2}")
    answer_re = re.compile(r"\*{0,2}ANSWER:\*{0,2}")
    topic_re = re.compile(r"\*{0,2}TOPIC:\*{0,2}")

    qa_pairs: List[QuestionAnswerPair] = []
    # Element 0 is whatever preamble precedes the first QUESTION: marker.
    for block in question_re.split(response_text)[1:]:
        parts = answer_re.split(block, maxsplit=1)
        if len(parts) < 2:
            print("Error parsing a Q&A block: no ANSWER: marker found")
            continue
        question_text = parts[0].strip()
        # TOPIC is requested from the model but QuestionAnswerPair has no field for it.
        answer_text = topic_re.split(parts[1], maxsplit=1)[0].strip()
        if not question_text or not answer_text:
            print("Error parsing a Q&A block: empty question or answer")
            continue
        qa_pairs.append(QuestionAnswerPair(question_text=question_text, answer_text=answer_text))
    return qa_pairs


async def extract_questions(state: Evaluation):
    """Extract technical question-answer pairs from the interview transcript.

    Makes a single chat-model request asking for 5-7 pairs in a plain-text
    ``QUESTION:/ANSWER:/TOPIC:`` format, then parses the reply with
    ``_parse_qa_response``.

    Returns:
        ``{"qa_pairs": [...]}`` on success, or ``{"error": ..., "qa_pairs": []}``
        when the transcript is missing/empty or any step raises.
    """
    print("Extracting detailed question-answer pairs from interview transcript")
    try:
        # Raises AttributeError (handled below) when state.video is None.
        transcript = state.video.attributes.transcript
        
        if not transcript or not transcript.strip():
            return {"error": "No transcript", "qa_pairs": []}
        
        # speaker_identification is None (not absent) when identify_speakers failed,
        # so a getattr default never applies; normalise to an empty mapping.
        speakers = state.speaker_identification or {}
        interviewer = speakers.get("interviewer")
        interviewee = speakers.get("interviewee")
        if interviewer and interviewee:
            speaker_note = f"The interviewer is {interviewer} and the interviewee is {interviewee}."
        else:
            speaker_note = "The speaker roles are unknown; infer them from the conversation."
        
        extraction_prompt = f"""
        Analyze this technical interview and extract exactly 5-7 question-answer pairs.
        {speaker_note}
        
        For each pair, format your response as:
        
        QUESTION: [Full question text]
        ANSWER: [Full answer text]
        TOPIC: [Technical topic]
        
        Make sure to capture multi-part questions and their complete answers.
        Include relevant code examples in both questions and answers.
        Focus on substantial technical questions, not conversational remarks.
        
        Transcript:
        {transcript}
        """
        
        extraction_response = await model.ainvoke(extraction_prompt)
        qa_pairs = _parse_qa_response(extraction_response.content)
        
        print(f"Extracted {len(qa_pairs)} detailed question-answer pairs")
        return {"qa_pairs": qa_pairs}
        
    except Exception as e:
        print(f"Error extracting question-answer pairs: {e}")
        return {"error": f"Error extracting question-answer pairs: {e}", "qa_pairs": []}

In [354]:
async def grade_answer(qa_pair: QuestionAnswerPair) -> QuestionAnswerPair:
    """Grade a single question-answer pair with the chat model.

    The model returns a dict matching ``grading_schema``; it is validated into a
    ``GradingDetails`` and copied onto a new ``QuestionAnswerPair``.

    Args:
        qa_pair: The pair to grade. It is NOT modified: it belongs to the graph
            state, and mutating it in place would change state outside the update
            the node returns.

    Returns:
        A copy of ``qa_pair`` with ``score``, ``rating`` and ``grading_details`` set.

    Raises:
        Whatever the model call raises, or a pydantic ValidationError if the
        model's output does not satisfy ``GradingDetails``.
    """
    grading_schema = {
        "title": "GradingDetails",
        "type": "object",
        "properties": {
            "score": {
                "type": "integer",
                "description": "Score on scale of 0-100 based on technical accuracy, completeness, and clarity",
                "minimum": 0,
                "maximum": 100
            },
            "rating": {
                "type": "string",
                "enum": ["Strong Yes", "Yes", "No", "Strong No"],
                "description": "Overall hiring recommendation based on this answer"
            },
            "key_facts": {
                "type": "array",
                "items": {"type": "string"},
                "description": "List of key technical facts or concepts demonstrated in the answer"
            },
            "strengths": {
                "type": "array",
                "items": {"type": "string"},
                "description": "List of strengths in the answer"
            },
            "weaknesses": {
                "type": "array",
                "items": {"type": "string"},
                "description": "List of weaknesses or areas for improvement in the answer"
            },
            "justification": {
                "type": "string",
                "description": "Brief justification for the score and rating"
            }
        },
        "required": ["score", "rating", "key_facts", "strengths", "weaknesses", "justification"]
    }
    
    # A dict schema makes with_structured_output return a plain dict.
    structured_model = model.with_structured_output(grading_schema)
    
    system_prompt = """You are an expert technical interviewer for JavaScript/TypeScript engineering roles.
    Your task is to grade the candidate's answer to a technical interview question. 
    
    Evaluation criteria:
    - Technical accuracy (40%): Is the answer technically correct and demonstrate understanding?
    - Completeness (30%): Does the answer address all parts of the question thoroughly?
    - Clarity & communication (30%): Is the answer well-structured, clear, and easy to follow?
    
    Scoring guide:
    - 90-100: Exceptional answer that demonstrates deep expertise
    - 75-89: Strong answer with minor areas for improvement
    - 60-74: Acceptable answer with some gaps or misconceptions
    - 40-59: Weak answer with significant gaps or errors
    - 0-39: Very poor answer showing lack of understanding
    
    Rating guide:
    - Strong Yes: Exceptional answer, candidate demonstrates mastery
    - Yes: Good answer, candidate shows competence
    - No: Problematic answer, candidate needs improvement
    - Strong No: Poor answer, candidate lacks fundamental understanding
    
    Provide a structured evaluation with a numerical score, qualitative rating, key facts demonstrated, 
    strengths, weaknesses, and brief justification."""
    
    human_prompt = f"""Question: {qa_pair.question_text}
    
    Candidate's Answer: {qa_pair.answer_text}
    
    Please evaluate this answer according to the criteria."""
    
    messages = [
        SystemMessage(content=system_prompt),
        HumanMessage(content=human_prompt)
    ]
    
    grading_result = await structured_model.ainvoke(messages)
    
    # Validate first (0..100 score etc.) so only checked values reach the pair.
    details = GradingDetails(
        score=grading_result["score"],
        rating=grading_result["rating"],
        key_facts=grading_result["key_facts"],
        strengths=grading_result["strengths"],
        weaknesses=grading_result["weaknesses"],
        justification=grading_result["justification"]
    )
    
    # Return a graded copy rather than mutating the state-owned input object.
    return qa_pair.model_copy(
        update={
            "score": details.score,
            "rating": details.rating,
            "grading_details": details,
        }
    )

async def parallel_grade_answers(qa_pairs: List[QuestionAnswerPair]) -> List[QuestionAnswerPair]:
    """Run ``grade_answer`` on every pair concurrently.

    Results are returned in the same order as ``qa_pairs`` (``asyncio.gather``
    preserves argument order). If any grading call raises, that exception
    propagates to the caller.
    """
    return await asyncio.gather(*map(grade_answer, qa_pairs))

async def grade_interview_answers(state: Evaluation):
    """Grade all question-answer pairs in parallel and compute an overall score.

    The overall score is the mean of the per-answer scores that are not None,
    rounded to the nearest integer.

    Returns:
        ``{"qa_pairs": graded_pairs, "score": overall_score}`` on success;
        ``{"error": ...}`` if there is nothing to grade (keeping any error an
        earlier node recorded) or if grading raises.
    """
    print("Grading interview answers in parallel...")
    
    if not state.qa_pairs:
        # Preserve an upstream error (e.g. from extract_questions) rather than
        # replacing it with a less specific message.
        return {"error": state.error or "No question-answer pairs to grade"}
    
    try:
        graded_pairs = await parallel_grade_answers(state.qa_pairs)
    except Exception as e:
        # Report like the other nodes instead of aborting the whole graph run.
        print(f"Error grading answers: {e}")
        return {"error": f"Error grading answers: {e}"}
    
    # Sum and count the same subset; the old code summed only non-None scores but
    # divided by every pair, and floor division biased the average downward.
    scores = [pair.score for pair in graded_pairs if pair.score is not None]
    overall_score = round(sum(scores) / len(scores)) if scores else 0
    
    print(f"Grading complete. Overall interview score: {overall_score}/100")
    
    return {"qa_pairs": graded_pairs, "score": overall_score}

In [355]:
# Evaluation pipeline as a straight chain of async nodes:
# fetch video -> identify speakers -> extract Q&A -> grade answers.
workflow = StateGraph(Evaluation)

_pipeline_nodes = [
    ("get_interview_state", get_interview_state),
    ("identify_speakers", identify_speakers),
    ("extract_questions", extract_questions),
    ("grade_interview_answers", grade_interview_answers),
]
for _node_name, _node_fn in _pipeline_nodes:
    workflow.add_node(_node_name, _node_fn)

# Wire START -> each node in listed order -> END.
_stops = [START, *(name for name, _ in _pipeline_nodes), END]
for _upstream, _downstream in zip(_stops, _stops[1:]):
    workflow.add_edge(_upstream, _downstream)

# In-memory checkpointer; runs are keyed by the thread_id passed in the config.
memory = MemorySaver()
graph = workflow.compile(checkpointer=memory)

async def process_interview(url: str, thread_id: Optional[str] = None):
    """Run the evaluation graph end to end for one interview video URL.

    Args:
        url: API URL of the interview video (stored as ``Evaluation.url``).
        thread_id: Checkpointer thread id. Defaults to a fresh UUID, so concurrent
            runs never share checkpoints; the former timestamp-based id could
            collide when two runs started within the clock's resolution.

    Returns:
        The final graph state as returned by ``graph.ainvoke`` — for a
        Pydantic-state graph this is a dict of state values, not an ``Evaluation``.
    """
    import uuid

    initial_state = Evaluation(url=url)
    config = {
        "configurable": {"thread_id": thread_id or str(uuid.uuid4())},
        "callbacks": [langfuse_handler],
    }
    return await graph.ainvoke(initial_state, config)

In [356]:
# Example execution
# Replace with your actual interview URL
interview_url = "https://core.g2i.co/api/v2/videos/250c4ef3-114a-4709-8fdc-3419d48f8908"

# Run the interview processing workflow
try:
    result = await process_interview(interview_url)
    
    # Print the interview summary
    if hasattr(result, 'error') and result.error:
        print(f"Error processing interview: {result.error}")
    else:
        pass
        # print_interview_summary(result)
except Exception as e:
    print(f"Error running interview workflow: {e}")

Processing speakers from video data
Identified speakers: Interviewer=speaker0, Interviewee=speaker1
Extracting detailed question-answer pairs from interview transcript
Extracted 5 detailed question-answer pairs
Grading interview answers in parallel...
Grading complete. Overall interview score: 85/100
