In [12]:
# Install required packages into the environment of the running kernel.
import subprocess
import sys

REQUIRED_PACKAGES = [
    "python-frontmatter",
    "minsearch",
    "sentence-transformers",
    "pydantic-ai",
    "tqdm",
    "pandas",
    "numpy",
    "requests",  # imported directly by the data-ingestion cell
]

# check_call raises CalledProcessError when pip exits non-zero, so the success
# message below is only printed after pip has actually succeeded (a `!` shell
# escape would ignore the exit status and print it unconditionally).
subprocess.check_call(
    [sys.executable, "-m", "pip", "install", "--quiet", *REQUIRED_PACKAGES]
)

print("‚úÖ Packages installed successfully!")


‚úÖ Packages installed successfully!


In [21]:
# =============================================================================
# CONFIGURATION: Set your OpenAI API Key
# =============================================================================
# The key is read from the OPENAI_API_KEY environment variable. Either export it
# before starting Jupyter, or uncomment the assignment under "Option 1" below.

import os

# Option 1: Set directly in the notebook (not recommended for production)
# os.environ['OPENAI_API_KEY'] = 'sk-your-api-key-here'

# Option 2: Check if already set in environment
if not os.environ.get('OPENAI_API_KEY'):
    # Missing or empty: tell the user how to provide the key.
    print("\n".join([
        "‚ö†Ô∏è  OpenAI API key not set",
        "   To use the agent features, uncomment the line above and add your key",
        "   or set OPENAI_API_KEY in your environment before starting Jupyter",
    ]))
else:
    print("‚úÖ OpenAI API key is already set")

‚ö†Ô∏è  OpenAI API key not set
   To use the agent features, uncomment the line above and add your key
   or set OPENAI_API_KEY in your environment before starting Jupyter


In [13]:
# Project: Tech Interview Handbook AI Agent
# Repository: yangshun/tech-interview-handbook

# =============================================================================
# DAY 1: DATA INGESTION
# =============================================================================

import io
import zipfile
import requests
import frontmatter

def read_repo_data(repo_owner, repo_name, branch='main', timeout=60):
    """
    Download and parse all markdown files from a GitHub repository.

    The repository is fetched as one zip archive from codeload.github.com and
    every entry whose name ends in ``.md`` or ``.mdx`` (case-insensitive) is
    parsed with ``frontmatter``.

    Args:
        repo_owner: GitHub username or organization
        repo_name: Repository name
        branch: Name of the branch to download (defaults to ``'main'``)
        timeout: Timeout in seconds passed to ``requests.get`` so a stalled
            connection cannot hang the notebook indefinitely

    Returns:
        List of dictionaries, one per markdown file. Each is the result of
        ``post.to_dict()`` plus a ``'filename'`` key holding the entry's path
        with the leading archive folder removed.

    Raises:
        Exception: If the download does not return HTTP status 200.
    """
    url = (
        f'https://codeload.github.com/{repo_owner}/{repo_name}'
        f'/zip/refs/heads/{branch}'
    )
    resp = requests.get(url, timeout=timeout)

    if resp.status_code != 200:
        raise Exception(f"Failed to download repository: {resp.status_code}")

    repository_data = []

    # The context manager closes the archive even if iteration fails part-way.
    with zipfile.ZipFile(io.BytesIO(resp.content)) as zf:
        for file_info in zf.infolist():
            filename = file_info.filename

            if not filename.lower().endswith(('.md', '.mdx')):
                continue

            try:
                with zf.open(file_info) as f_in:
                    content = f_in.read().decode('utf-8', errors='ignore')

                post = frontmatter.loads(content)
                data = post.to_dict()

                # Strip repo prefix from filename
                _, filename_repo = filename.split('/', maxsplit=1)
                data['filename'] = filename_repo
                repository_data.append(data)
            except Exception as e:
                # Best effort: report the problematic file and keep going.
                print(f"Error processing {filename}: {e}")
                continue

    return repository_data

# Download Tech Interview Handbook
REPO_OWNER = "yangshun"
REPO_NAME = "tech-interview-handbook"

print(f"Downloading {REPO_OWNER}/{REPO_NAME}...")
tech_interview_docs = read_repo_data(REPO_OWNER, REPO_NAME)

# Fail with an explicit message instead of an IndexError on the sample lookup
# below when the archive contained no markdown files.
if not tech_interview_docs:
    raise RuntimeError(f"No markdown documents found in {REPO_OWNER}/{REPO_NAME}")

print(f"‚úÖ Downloaded {len(tech_interview_docs)} documents")
print(f"üìÑ Sample: {tech_interview_docs[0]['filename']}")

Downloading yangshun/tech-interview-handbook...
‚úÖ Downloaded 82 documents
üìÑ Sample: CODE_OF_CONDUCT.md
‚úÖ Downloaded 82 documents
üìÑ Sample: CODE_OF_CONDUCT.md


In [14]:
# =============================================================================
# DAY 2: CHUNKING
# =============================================================================

def sliding_window(seq, size, step):
    """Cut ``seq`` into windows of at most ``size`` items, advancing by ``step``.

    Args:
        seq: Any sliceable sequence with a length (e.g. a string or list)
        size: Maximum number of items per window; must be positive
        step: Offset between the starts of consecutive windows; must be positive

    Returns:
        List of ``{'start': offset, 'content': seq[offset:offset + size]}``
        dictionaries. No further window is produced once one reaches the end
        of ``seq``. An empty ``seq`` yields an empty list.

    Raises:
        ValueError: If ``size`` or ``step`` is not positive.
    """
    if size <= 0 or step <= 0:
        raise ValueError("size and step must be positive")

    total = len(seq)
    windows = []

    for offset in range(0, total, step):
        window_end = offset + size
        windows.append({'start': offset, 'content': seq[offset:window_end]})

        # This window already covers the tail of the sequence: stop here.
        if window_end >= total:
            return windows

    return windows

def chunk_documents(docs, size=2000, step=1000):
    """Turn documents into chunk records, splitting long content into windows.

    Args:
        docs: Iterable of dictionaries; the text is read from the ``'content'``
            key (treated as ``''`` when absent), all other keys are metadata
        size: Content longer than this is split with ``sliding_window``
        step: Offset between consecutive windows of a split document

    Returns:
        Flat list of dictionaries. A document whose content fits in ``size``
        contributes one record (metadata plus ``'content'``). A longer one
        contributes one record per window, each holding ``'start'``,
        ``'content'`` and the document's metadata. Input dictionaries are not
        modified.
    """
    all_chunks = []

    for document in docs:
        # Work on a shallow copy so the caller's dictionary keeps its content.
        metadata = document.copy()
        text = metadata.pop('content', '')

        if len(text) <= size:
            metadata['content'] = text
            all_chunks.append(metadata)
            continue

        for piece in sliding_window(text, size=size, step=step):
            piece.update(metadata)
            all_chunks.append(piece)

    return all_chunks

# Window length and stride used when splitting the handbook documents.
CHUNK_SIZE = 2000
CHUNK_STEP = 1000

tech_interview_chunks = chunk_documents(
    tech_interview_docs,
    size=CHUNK_SIZE,
    step=CHUNK_STEP,
)

chunk_total = len(tech_interview_chunks)
doc_total = len(tech_interview_docs)
print(f"‚úÖ Created {chunk_total} chunks from {doc_total} documents")

‚úÖ Created 535 chunks from 82 documents


In [15]:
# =============================================================================
# DAY 3: SEARCH ENGINE
# =============================================================================

from minsearch import Index

print("Building search index...")

# Fields indexed as free text; keyword_fields is intentionally left empty.
SEARCH_TEXT_FIELDS = ["content", "title", "description", "filename"]

tech_index = Index(text_fields=SEARCH_TEXT_FIELDS, keyword_fields=[])
tech_index.fit(tech_interview_chunks)

print("‚úÖ Search index built")

# Smoke test: run one query and show where the top hits come from
test_query = "How to prepare for coding interviews?"
print(f"\nTesting search: '{test_query}'")

results = tech_index.search(test_query, num_results=3)

for rank, hit in enumerate(results, start=1):
    source_file = hit['filename']
    # Fall back to the file path when the chunk has no title
    heading = hit.get('title', source_file)
    preview = hit.get('content', '')[:100]

    print(f"\n{rank}. {heading}")
    print(f"   File: {source_file}")
    print(f"   Preview: {preview}...")

Building search index...
‚úÖ Search index built

Testing search: 'How to prepare for coding interviews?'

1. Coding interviews: Everything you need to prepare
   File: apps/website/contents/coding-interview-prep.md
   Preview: emember the questions they have practiced before.

Instead, this is how to prepare for your Software...

2. Coding interviews: Everything you need to prepare
   File: apps/website/contents/coding-interview-prep.md
   Preview: rately determine time and space complexity and optimize them.
1. **Technical competency** - Translat...

3. Coding interviews: Everything you need to prepare
   File: apps/website/contents/coding-interview-prep.md
   Preview:  language should be used for interviews? Generally, we want higher level languages that have many st...


In [18]:
# =============================================================================
# DAY 4: AGENT + TOOL INTEGRATION
# =============================================================================

import os
from pydantic_ai import Agent
from pydantic import BaseModel
from typing import Any, List
from pydantic_ai.models.openai import OpenAIModel

# Set up OpenAI
# Read the key once; the test sections below check OPENAI_API_KEY before calling the agent.
OPENAI_API_KEY = os.environ.get('OPENAI_API_KEY')

if OPENAI_API_KEY:
    print("‚úÖ OpenAI API key configured")
else:
    print("\n".join([
        "‚ö†Ô∏è  OPENAI_API_KEY not set. Please set it before using the agent:",
        "   import os",
        "   os.environ['OPENAI_API_KEY'] = 'your-api-key-here'",
    ]))

# Define response model
# Structured result the agent is asked to produce (passed as `output_type` when
# the agent is created).
# NOTE(review): documented with `#` comments rather than a class docstring,
# since a model docstring may end up in the generated schema — confirm before
# adding one.
class InterviewResponse(BaseModel):
    answer: str  # answer text that is printed to the user
    sources: List[str]  # cited sources; printed comma-joined after the answer

# System prompt for the agent
# This text is handed to the Agent as its system prompt, so it is part of the
# agent's behavior: edit it deliberately, not cosmetically.
system_prompt = """You are an expert technical interviewing coach. You help software engineers 
prepare for technical interviews by providing clear, practical advice based on proven strategies.

When answering questions:
1. Be specific and actionable
2. Use examples where helpful
3. Cite sources from the Tech Interview Handbook when available
4. Keep answers concise but thorough
5. Break down complex topics into digestible parts

Always base your answers on the search results provided."""

# Create the agent
# - 'openai:gpt-4o-mini' selects the model by its string identifier
# - output_type makes the agent return an InterviewResponse (answer + sources)
# - NOTE(review): retries=2 presumably bounds how often a failed tool call or
#   invalid structured output is retried — confirm against the pydantic-ai docs
tech_agent = Agent(
    'openai:gpt-4o-mini',
    system_prompt=system_prompt,
    output_type=InterviewResponse,
    retries=2
)

# Tool: Search the Tech Interview Handbook
# NOTE(review): the docstring is deliberately left word-for-word unchanged —
# agent frameworks commonly expose a tool's docstring to the model as its
# description, which presumably applies here too.
@tech_agent.tool
def search_handbook(context, query: str) -> str:
    """Search the Tech Interview Handbook for relevant information.
    
    Args:
        query: The search query to find relevant content
        
    Returns:
        A formatted string containing the top search results
    """
    hits = tech_index.search(query, num_results=5)

    if not hits:
        return "No relevant information found in the handbook."

    def render(position, hit):
        # One numbered entry: title (or file path), source path, 300-char excerpt
        source = hit['filename']
        title = hit.get('title', source)
        excerpt = hit.get('content', '')[:300]
        return f"[{position}] {title}\nSource: {source}\n{excerpt}...\n"

    return "\n".join(render(position, hit) for position, hit in enumerate(hits, 1))

# Test the agent
print("\n" + "="*60)
print("Testing Agent")
print("="*60)

if OPENAI_API_KEY:
    test_question = "What are the most important data structures for coding interviews?"
    print(f"\nQuestion: {test_question}")
    print("\nAnswer:")

    try:
        # Use await in async context (Jupyter)
        import asyncio
        result = asyncio.run(tech_agent.run(test_question))
        print(result.data.answer)
        print(f"\nüìö Sources: {', '.join(result.data.sources)}")
    except Exception as e:
        print(f"Error: {e}")
else:
    print("\n‚ö†Ô∏è  Skipping test - set OPENAI_API_KEY to test the agent")
    print("Example:")
    print("  import os")
    print("  os.environ['OPENAI_API_KEY'] = 'sk-...'")
    print("  # Then re-run this cell")

‚ö†Ô∏è  OPENAI_API_KEY not set. Please set it before using the agent:
   import os
   os.environ['OPENAI_API_KEY'] = 'your-api-key-here'

Testing Agent

‚ö†Ô∏è  Skipping test - set OPENAI_API_KEY to test the agent
Example:
  import os
  os.environ['OPENAI_API_KEY'] = 'sk-...'
  # Then re-run this cell


In [20]:
# =============================================================================
# DAY 5: LOGGING & EVALUATION
# =============================================================================

import json
import time
from datetime import datetime

# Simple conversation logger: one dictionary per question asked via ask_agent_async
conversation_history = []

async def ask_agent_async(question: str, log: bool = True):
    """Ask the agent a question and optionally log the interaction.

    Args:
        question: Text passed to ``tech_agent.run``
        log: When true, the response record is appended to
            ``conversation_history``

    Returns:
        Dictionary with the keys ``timestamp`` (ISO string), ``question``,
        ``answer``, ``sources``, ``elapsed_time`` (seconds, rounded to two
        decimals) and ``status`` (``"success"`` or ``"error"``). On error,
        ``answer`` is ``None``, ``sources`` is ``[]`` and an extra ``error``
        key holds the exception message. Exceptions raised by the agent are
        reported this way rather than propagated.
    """
    # perf_counter is monotonic, so the measured duration cannot be skewed by
    # wall-clock adjustments.
    start_time = time.perf_counter()

    answer, sources, error = None, [], None
    try:
        result = await tech_agent.run(question)
        # The agent is created with `output_type`, so the structured response
        # is read from `result.output`.
        answer = result.output.answer
        sources = result.output.sources
    except Exception as e:
        error = str(e)

    response = {
        "timestamp": datetime.now().isoformat(),
        "question": question,
        "answer": answer,
        "sources": sources,
        "elapsed_time": round(time.perf_counter() - start_time, 2),
        "status": "success" if error is None else "error",
    }
    if error is not None:
        response["error"] = error

    if log:
        conversation_history.append(response)

    return response

# Test evaluation
print("\n" + "="*60)
print("Testing Evaluation System")
print("="*60)

if not OPENAI_API_KEY:
    print("\n‚ö†Ô∏è  Skipping evaluation tests - OPENAI_API_KEY not set")
    print("Set your API key in the configuration cell at the top and re-run this cell")
else:
    test_questions = [
        "What is Big O notation?",
        "How should I prepare for system design interviews?",
        "What companies have the best interview processes?"
    ]

    for q in test_questions:
        print(f"\nüìù Q: {q}")
        response = await ask_agent_async(q)
        
        if response["status"] == "success":
            print(f"‚úÖ A: {response['answer'][:150]}...")
            print(f"‚è±Ô∏è  Time: {response['elapsed_time']}s")
            print(f"üìö Sources: {len(response['sources'])} references")
        else:
            print(f"‚ùå Error: {response['error']}")

    # Summary
    print("\n" + "="*60)
    print("Conversation Summary")
    print("="*60)
    print(f"Total questions: {len(conversation_history)}")
    successful = sum(1 for r in conversation_history if r['status'] == 'success')
    print(f"Successful: {successful}")
    print(f"Failed: {len(conversation_history) - successful}")

    if successful > 0:
        avg_time = sum(r['elapsed_time'] for r in conversation_history if r['status'] == 'success') / successful
        print(f"Average response time: {avg_time:.2f}s")

# You can save the conversation history to a file
# with open('conversation_log.json', 'w') as f:
#     json.dump(conversation_history, f, indent=2)


Testing Evaluation System

‚ö†Ô∏è  Skipping evaluation tests - OPENAI_API_KEY not set
Set your API key in the configuration cell at the top and re-run this cell
