# RAG Code Generation System with FAISS

This notebook implements a complete RAG (Retrieval-Augmented Generation) system for code generation using:
- **HumanEval Dataset**: The retrieval corpus of example tasks and solutions
- **Sentence Transformers**: For embedding the task prompts
- **FAISS**: For vector similarity search
- **OpenRouter API**: For code generation with hosted LLMs

## Table of Contents
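1. Installation
2. Import Libraries
3. Setup API Key
4. Dataset Functions
5. Embedding Functions
6. FAISS Index Functions
7. Code Generation Functions
8. Main Pipeline Setup
9. Code Generation Function
10. Utility Functions
11. Initialize the Pipeline
12. Example 1: Calculate Median
13. Example 2: Palindrome Check
14. Example 3: Custom Task
15. Test Generated Code
16. Save/Load FAISS Index (Optional)
17. Batch Processing Multiple Tasks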

## 1. Installation

Install all required packages:

In [29]:
!pip install datasets sentence-transformers faiss-cpu openai python-dotenv torch -q




## 2. Import Libraries

In [30]:
import os
import numpy as np
from typing import List, Dict, Tuple
from datasets import load_dataset
from sentence_transformers import SentenceTransformer
import faiss
from openai import OpenAI
from getpass import getpass

print("✓ All libraries imported successfully")

✓ All libraries imported successfully


## 3. Setup API Key

Get your OpenRouter API key from: https://openrouter.ai/

In [31]:
# Enter your OpenRouter API key
OPENROUTER_API_KEY = getpass("Enter your OpenRouter API key: ")

# Or set it directly (not recommended for shared notebooks)
# OPENROUTER_API_KEY = "your-api-key-here"
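
For non-interactive runs, the key can also come from an environment variable, with the prompt kept as a fallback. The following is a minimal sketch under assumptions: `resolve_api_key` is a hypothetical helper, and the variable name `OPENROUTER_API_KEY` is a naming choice made here, not something the notebook or OpenRouter prescribes.

```python
import os
from getpass import getpass


def resolve_api_key(env_var="OPENROUTER_API_KEY", prompt_fn=getpass):
    """Return the key from the environment, or prompt for it if unset.

    Both the helper and the default variable name are illustrative
    assumptions, not part of the notebook's pipeline.
    """
    key = os.environ.get(env_var, "").strip()
    if key:
        return key
    # Fall back to an interactive prompt that does not echo the input
    return prompt_fn("Enter your OpenRouter API key: ").strip()
```

With this helper defined, `OPENROUTER_API_KEY = resolve_api_key()` would replace the direct `getpass` call.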

## 4. Dataset Functions

Load the HumanEval dataset and keep the fields the pipeline uses:

In [32]:
def load_humaneval_dataset():
    """Load and process the HumanEval dataset."""
    print("Loading HumanEval dataset...")
    dataset = load_dataset("openai/openai_humaneval", split="test")
    
    examples = []
    for item in dataset:
        examples.append({
            'task_id': item['task_id'],
            'prompt': item['prompt'],
            'canonical_solution': item['canonical_solution'],
            'entry_point': item['entry_point']
        })
    
    print(f"✓ Loaded {len(examples)} examples")
    return examples


def extract_prompts(examples):
    """Extract all prompts from examples."""
    return [ex['prompt'] for ex in examples]

## 5. Embedding Functions

In [33]:
def create_embeddings(texts, model_name="sentence-transformers/all-MiniLM-L6-v2"):
    """
    Create embeddings for a list of texts.
    
    Args:
        texts: List of text strings
        model_name: Name of the sentence transformer model
        
    Returns:
        Tuple of (model, embeddings_array)
    """
    print(f"Loading embedding model: {model_name}")
    model = SentenceTransformer(model_name)
    
    print(f"Creating embeddings for {len(texts)} texts...")
    embeddings = model.encode(texts, show_progress_bar=True)
    embeddings_array = np.array(embeddings).astype('float32')
    
    print(f"✓ Created embeddings with shape: {embeddings_array.shape}")
    return model, embeddings_array

## 6. FAISS Index Functions

In [34]:
def build_faiss_index_normalized(embeddings, doc_ids=None):
    """
    Build a FAISS index over L2-normalized vectors, searched by inner product.
    
    Why normalize and use inner product:
    - For unit-length vectors, inner product equals cosine similarity
    - Cosine similarity compares direction and ignores vector magnitude
    - Scores are easy to read: higher means more similar, with 1.0 an exact match
    
    Args:
        embeddings: Numpy array of embeddings
        doc_ids: Optional array of document IDs (uses indices if None)
        
    Returns:
        FAISS IndexIDMap with normalized vectors
    """
    print("Building normalized FAISS index with Inner Product...")
    
    # Get dimension
    dim = embeddings.shape[1]
    
    # Normalize embeddings to unit length (L2 normalization)
    # This makes Inner Product equivalent to Cosine Similarity
    norm_embeddings = embeddings.copy()
    faiss.normalize_L2(norm_embeddings)
    print(f"✓ Normalized {len(norm_embeddings)} vectors")
    
    # Create IDs if not provided
    if doc_ids is None:
        doc_ids = np.arange(len(embeddings)).astype('int64')
    else:
        doc_ids = np.array(doc_ids).astype('int64')
    
    # Create an exact (brute-force) inner-product index
    base_index = faiss.IndexFlatIP(dim)
    
    # Wrap with IndexIDMap to maintain document IDs
    faiss_index = faiss.IndexIDMap(base_index)
    
    # Add normalized vectors with IDs
    faiss_index.add_with_ids(norm_embeddings, doc_ids)
    
    print(f"✓ FAISS index built with {faiss_index.ntotal} vectors")
    print(f"  Using: IndexIDMap(IndexFlatIP) for cosine similarity")
    
    return faiss_index


def search_similar(query, embedding_model, faiss_index, k=3):
    """
    Search for k most similar examples using cosine similarity.
    
    Args:
        query: Query text
        embedding_model: Sentence transformer model
        faiss_index: FAISS IndexIDMap
        k: Number of results to return
        
    Returns:
        Tuple of (similarities, indices)
        Note: Higher similarity = more similar (opposite of L2 distance)
    """
    # Encode query
    query_embedding = embedding_model.encode([query]).astype('float32')
    
    # Normalize query vector
    faiss.normalize_L2(query_embedding)
    
    # Search (returns similarity scores, not distances)
    similarities, indices = faiss_index.search(query_embedding, k)
    
    return similarities[0], indices[0]
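
The claim that inner product over normalized vectors equals cosine similarity can be checked by hand. The sketch below uses plain Python on two toy 2-D vectors (the vectors and helper names are made up for illustration and do not come from the pipeline). It also shows that, for unit vectors, squared L2 distance is `2 - 2 * cosine`, so both metrics rank neighbors identically once the vectors are normalized.

```python
import math


def l2_normalize(vec):
    """Scale a vector to unit length."""
    norm = math.sqrt(sum(x * x for x in vec))
    return [x / norm for x in vec]


def inner_product(a, b):
    return sum(x * y for x, y in zip(a, b))


def cosine_similarity(a, b):
    """Cosine similarity computed directly from the unnormalized vectors."""
    return inner_product(a, b) / (
        math.sqrt(inner_product(a, a)) * math.sqrt(inner_product(b, b))
    )


# Toy vectors chosen for easy arithmetic (both have length 5)
a = [3.0, 4.0]
b = [4.0, 3.0]

a_unit, b_unit = l2_normalize(a), l2_normalize(b)
ip_of_normalized = inner_product(a_unit, b_unit)
cosine = cosine_similarity(a, b)
squared_l2 = sum((x - y) ** 2 for x, y in zip(a_unit, b_unit))

print(round(ip_of_normalized, 6), round(cosine, 6))  # 0.96 0.96
print(math.isclose(squared_l2, 2 - 2 * cosine))      # True
```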

## 7. Code Generation Functions

In [35]:
def build_context(examples):
    """Build context string from retrieved examples."""
    context_parts = []
    for i, ex in enumerate(examples, 1):
        context_parts.append(f"Example {i}:")
        context_parts.append(f"Task: {ex['prompt'].strip()}")
        context_parts.append(f"Solution:\n{ex['canonical_solution'].strip()}")
        context_parts.append("")
    return "\n".join(context_parts)


def create_prompt(task_description, context):
    """Create the full prompt for code generation."""
    return f"""Based on the following examples of Python coding tasks and solutions, generate a complete function for the new task.

{context}

New Task:
{task_description}

Generate a complete, working Python function that solves this task. Include the function signature and implementation. Only return the code, no explanations."""


def extract_code(response):
    """Extract code from the LLM response."""
    # Prefer a ```python fence, then fall back to a bare ``` fence
    for fence in ("```python", "```"):
        if fence in response:
            start = response.find(fence) + len(fence)
            end = response.find("```", start)
            # A truncated response may lack the closing fence; keep the rest
            if end == -1:
                end = len(response)
            return response[start:end].strip()
    return response.strip()


def generate_code(task_description, retrieved_examples, api_key, 
                 model="deepseek/deepseek-coder", max_tokens=500, temperature=0.2):
    """
    Generate code using OpenRouter API.
    
    Args:
        task_description: Natural language description of the task
        retrieved_examples: List of similar code examples
        api_key: OpenRouter API key
        model: Model name to use
        max_tokens: Maximum tokens for generation
        temperature: Temperature for generation
        
    Returns:
        Generated code as a string
    """
    client = OpenAI(
        base_url="https://openrouter.ai/api/v1",
        api_key=api_key
    )
    
    context = build_context(retrieved_examples)
    prompt = create_prompt(task_description, context)
    
    print("🤖 Generating code...")
    response = client.chat.completions.create(
        model=model,
        messages=[
            {
                "role": "system",
                "content": "You are an expert Python programmer. Generate clean, efficient, and well-documented code."
            },
            {
                "role": "user",
                "content": prompt
            }
        ],
        max_tokens=max_tokens,
        temperature=temperature
    )
    
    generated_code = response.choices[0].message.content
    return extract_code(generated_code)
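
Models do not always label their fences with `python`; some use `py` or another tag, and a response cut off by `max_tokens` may have no closing fence at all. As an alternative to string searching, the following sketch uses a regular expression that accepts any language tag and tolerates a missing closing fence. `extract_first_code_block` is a hypothetical name, and this is one possible approach, not the notebook's method.

```python
import re

# Three backticks, built by repetition so this example contains no literal fence
FENCE = "`" * 3

# Opening fence, optional language tag, newline, then a lazy capture that
# stops at the closing fence or at the end of the text (truncated response).
_BLOCK_RE = re.compile(
    re.escape(FENCE) + r"[A-Za-z0-9_+-]*[ \t]*\n(.*?)(?:" + re.escape(FENCE) + r"|\Z)",
    re.DOTALL,
)


def extract_first_code_block(response: str) -> str:
    """Return the body of the first fenced block, or the whole text if none."""
    match = _BLOCK_RE.search(response)
    if match:
        return match.group(1).strip()
    return response.strip()


complete = f"Here you go:\n{FENCE}py\nx = 1\n{FENCE}\nDone."
truncated = f"{FENCE}python\nx = 2"

print(extract_first_code_block(complete))   # x = 1
print(extract_first_code_block(truncated))  # x = 2
print(extract_first_code_block("x = 3"))    # x = 3
```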

## 8. Main Pipeline Setup

In [36]:
def setup_rag_pipeline(api_key, embedding_model_name="sentence-transformers/all-MiniLM-L6-v2"):
    """
    Setup the complete RAG pipeline with normalized FAISS index.
    
    Args:
        api_key: OpenRouter API key
        embedding_model_name: Name of embedding model to use
        
    Returns:
        Dictionary containing all pipeline components
    """
    print("="*80)
    print("🚀 Setting up RAG Pipeline with Normalized FAISS")
    print("="*80 + "\n")
    
    # Load dataset
    examples = load_humaneval_dataset()
    
    # Create embeddings
    prompts = extract_prompts(examples)
    embedding_model, embeddings = create_embeddings(prompts, embedding_model_name)
    
    # Build normalized FAISS index with Inner Product
    index = build_faiss_index_normalized(embeddings)
    
    print("\n✓ Pipeline setup complete!\n")
    
    return {
        'examples': examples,
        'embedding_model': embedding_model,
        'faiss_index': index,
        'api_key': api_key
    }


## 9. Code Generation Function

In [37]:
def generate_code_for_task(pipeline, task_description, n_examples=3, 
                           generation_model="x-ai/grok-4-fast:free", verbose=True):
    """
    Generate code for a given task using the RAG pipeline.
    
    Args:
        pipeline: Dictionary containing pipeline components from setup_rag_pipeline
        task_description: Natural language description of the coding task
        n_examples: Number of similar examples to retrieve
        generation_model: Model to use for code generation
        verbose: Whether to print retrieval information
        
    Returns:
        Dictionary containing generated code and retrieved examples
    """
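    # Retrieve similar examples (higher similarity = more similar)
    similarities, indices = search_similar(
        task_description,
        pipeline['embedding_model'],
        pipeline['faiss_index'],
        k=n_examples
    )
    
    # FAISS pads results with -1 when fewer than k vectors are available
    valid = indices != -1
    similarities, indices = similarities[valid], indices[valid]
    retrieved_examples = [pipeline['examples'][idx] for idx in indices]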
    
    if verbose:
        print(f"📚 Retrieved {len(retrieved_examples)} similar examples:")
        print("="*80)
        for i, (ex, sim) in enumerate(zip(retrieved_examples, similarities), 1):
            print(f"{i}. {ex['task_id']} (similarity: {sim:.4f})")
            print(f"   {ex['prompt'][:100]}...")
            print()
    
    # Generate code
    generated_code = generate_code(
        task_description,
        retrieved_examples,
        pipeline['api_key'],
        model=generation_model
    )
    
    return {
        "task_description": task_description,
        "generated_code": generated_code,
        "retrieved_examples": [
            {
                "task_id": ex['task_id'],
                "prompt": ex['prompt'],
                "canonical_solution": ex['canonical_solution'],
                "similarity": float(sim)
            }
            for ex, sim in zip(retrieved_examples, similarities)
        ]
    }
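
Top-k retrieval always returns `n_examples` neighbors, even when the lower-ranked ones are only loosely related to the task. One option is to drop examples below a similarity cutoff before building the prompt. The sketch below is an assumption-laden illustration: `filter_by_similarity` is a hypothetical helper, and the threshold values are arbitrary and would need tuning for a given embedding model.

```python
def filter_by_similarity(examples, similarities, min_similarity=0.5):
    """Keep (example, score) pairs whose score is at least `min_similarity`.

    The default cutoff is an arbitrary placeholder, not a recommended value.
    """
    return [
        (ex, float(sim))
        for ex, sim in zip(examples, similarities)
        if float(sim) >= min_similarity
    ]


# Toy stand-ins shaped like the retrieval results used in this notebook
retrieved = [
    {"task_id": "HumanEval/47"},
    {"task_id": "HumanEval/4"},
    {"task_id": "HumanEval/21"},
]
scores = [0.8026, 0.6628, 0.5341]

kept = filter_by_similarity(retrieved, scores, min_similarity=0.6)
print([ex["task_id"] for ex, _ in kept])  # ['HumanEval/47', 'HumanEval/4']
```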


## 10. Utility Functions

In [38]:
def print_result(result):
    """Pretty print the generation result."""
    print("\n" + "="*80)
    print("📝 TASK DESCRIPTION:")
    print("="*80)
    print(result["task_description"])
    
    print("\n" + "="*80)
    print("💻 GENERATED CODE:")
    print("="*80)
    print(result["generated_code"])
    
    print("\n" + "="*80)
    print("📚 RETRIEVED EXAMPLES:")
    print("="*80)
    for i, ex in enumerate(result["retrieved_examples"], 1):
        print(f"\n{i}. {ex['task_id']} (cosine similarity: {ex['similarity']:.4f})")
        print(f"   {ex['prompt'][:150]}...")

## 11. Initialize the Pipeline

Run this cell to set up the entire RAG system:

In [39]:
# Initialize the pipeline
pipeline = setup_rag_pipeline(
    api_key=OPENROUTER_API_KEY,
    embedding_model_name="sentence-transformers/all-MiniLM-L6-v2"
)

🚀 Setting up RAG Pipeline with Normalized FAISS

Loading HumanEval dataset...
✓ Loaded 164 examples
Loading embedding model: sentence-transformers/all-MiniLM-L6-v2
Creating embeddings for 164 texts...



✓ Created embeddings with shape: (164, 384)
Building normalized FAISS index with Inner Product...
✓ Normalized 164 vectors
✓ FAISS index built with 164 vectors
  Using: IndexIDMap(IndexFlatIP) for cosine similarity

✓ Pipeline setup complete!



## 12. Example 1: Calculate Median

In [40]:
task1 = """
def calculate_median(numbers: List[float]) -> float:
    \"\"\" Calculate the median of a list of numbers.
    >>> calculate_median([3, 1, 2, 4, 5])
    3.0
    >>> calculate_median([1, 2, 3, 4])
    2.5
    \"\"\"
"""

result1 = generate_code_for_task(
    pipeline=pipeline,
    task_description=task1,
    n_examples=3,
    generation_model="x-ai/grok-4-fast:free"
)

print_result(result1)

📚 Retrieved 3 similar examples:
1. HumanEval/47 (similarity: 0.8026)
   

def median(l: list):
    """Return median of elements in the list l.
    >>> median([3, 1, 2, 4, 5...

2. HumanEval/4 (similarity: 0.6628)
   from typing import List


def mean_absolute_deviation(numbers: List[float]) -> float:
    """ For a ...

3. HumanEval/21 (similarity: 0.5341)
   from typing import List


def rescale_to_unit(numbers: List[float]) -> List[float]:
    """ Given li...

🤖 Generating code...

📝 TASK DESCRIPTION:

def calculate_median(numbers: List[float]) -> float:
    """ Calculate the median of a list of numbers.
    >>> calculate_median([3, 1, 2, 4, 5])
    3.0
    >>> calculate_median([1, 2, 3, 4])
    2.5
    """


💻 GENERATED CODE:
from typing import List


def calculate_median(numbers: List[float]) -> float:
    """ Calculate the median of a list of numbers.
    >>> calculate_median([3, 1, 2, 4, 5])
    3.0
    >>> calculate_median([1, 2, 3, 4])
    2.5
    """
    numbers = sorted(number

## 13. Example 2: Palindrome Check

In [41]:
task2 = """
def is_palindrome(s: str) -> bool:
    \"\"\" Check if a string is a palindrome (ignoring spaces and case).
    >>> is_palindrome("A man a plan a canal Panama")
    True
    >>> is_palindrome("hello")
    False
    \"\"\"
"""

result2 = generate_code_for_task(
    pipeline=pipeline,
    task_description=task2,
    n_examples=3,
    verbose=True
)

print_result(result2)

📚 Retrieved 3 similar examples:
1. HumanEval/48 (similarity: 0.8870)
   

def is_palindrome(text: str):
    """
    Checks if given string is a palindrome
    >>> is_palind...

2. HumanEval/10 (similarity: 0.7355)
   

def is_palindrome(string: str) -> bool:
    """ Test if given string is a palindrome """
    retur...

3. HumanEval/112 (similarity: 0.6228)
   
def reverse_delete(s,c):
    """Task
    We are given two strings s and c, you have to deleted all ...

🤖 Generating code...

📝 TASK DESCRIPTION:

def is_palindrome(s: str) -> bool:
    """ Check if a string is a palindrome (ignoring spaces and case).
    >>> is_palindrome("A man a plan a canal Panama")
    True
    >>> is_palindrome("hello")
    False
    """


💻 GENERATED CODE:
def is_palindrome(s: str) -> bool:
    """ Check if a string is a palindrome (ignoring spaces and case).
    >>> is_palindrome("A man a plan a canal Panama")
    True
    >>> is_palindrome("hello")
    False
    """
    cleaned = ''.join(c.lower() for c

## 14. Example 3: Custom Task

Try your own coding task!

In [42]:
# Define your own task here
custom_task = """
def find_duplicates(nums: List[int]) -> List[int]:
    \"\"\" Find all duplicate numbers in a list.
    >>> find_duplicates([1, 2, 3, 2, 4, 3])
    [2, 3]
    >>> find_duplicates([1, 2, 3, 4])
    []
    \"\"\"
"""

result_custom = generate_code_for_task(
    pipeline=pipeline,
    task_description=custom_task,
    n_examples=3
)

print_result(result_custom)

📚 Retrieved 3 similar examples:
1. HumanEval/26 (similarity: 0.7875)
   from typing import List


def remove_duplicates(numbers: List[int]) -> List[int]:
    """ From a lis...

2. HumanEval/34 (similarity: 0.6059)
   

def unique(l: list):
    """Return sorted unique elements in a list
    >>> unique([5, 3, 5, 2, 3,...

3. HumanEval/104 (similarity: 0.5869)
   
def unique_digits(x):
    """Given a list of positive integers x. return a sorted list of all 
    ...

🤖 Generating code...

📝 TASK DESCRIPTION:

def find_duplicates(nums: List[int]) -> List[int]:
    """ Find all duplicate numbers in a list.
    >>> find_duplicates([1, 2, 3, 2, 4, 3])
    [2, 3]
    >>> find_duplicates([1, 2, 3, 4])
    []
    """


💻 GENERATED CODE:
from collections import Counter

c = Counter(nums)
added = set()
result = []
for n in nums:
    if c[n] > 1 and n not in added:
        result.append(n)
        added.add(n)
return result

📚 RETRIEVED EXAMPLES:

1. HumanEval/26 (cosine similarity: 0.7875)
   from 

## 15. Test Generated Code

Check that the generated code parses as valid Python. This is a syntax check only; it does not run the code or verify that it is correct:

In [43]:
import ast

def validate_code(code: str) -> bool:
    """Check if generated code is valid Python."""
    try:
        ast.parse(code)
        return True
    except SyntaxError as e:
        print(f"Syntax Error: {e}")
        return False

# Test the generated code
if validate_code(result1["generated_code"]):
    print("✅ Valid Python code generated!")
else:
    print("❌ Syntax error in generated code")

✅ Valid Python code generated!
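
A syntax check says nothing about behavior. Since the task descriptions in this notebook carry `>>>` examples in their docstrings, one way to go a step further is to execute the generated function and run those examples with the standard-library `doctest` module. The sketch below is a minimal illustration; `run_docstring_examples` is a hypothetical helper, and the sample code is a hand-written stand-in, not model output. Note that `exec` runs the code with full privileges, so this is only appropriate for output you are willing to execute.

```python
import doctest


def run_docstring_examples(code: str, entry_point: str):
    """Execute `code`, then run the doctest examples of `entry_point`.

    Returns (failed, attempted). WARNING: exec() runs the generated code
    without any sandboxing.
    """
    namespace = {}
    exec(code, namespace)
    func = namespace[entry_point]

    # Parse the >>> examples out of the function's docstring
    parser = doctest.DocTestParser()
    test = parser.get_doctest(func.__doc__ or "", namespace, entry_point, None, 0)

    runner = doctest.DocTestRunner(verbose=False)
    result = runner.run(test, out=lambda text: None)  # discard the failure report
    return result.failed, result.attempted


# Hand-written sample standing in for generated code
sample = '''
def calculate_median(numbers):
    """Calculate the median of a list of numbers.
    >>> calculate_median([3, 1, 2, 4, 5])
    3.0
    >>> calculate_median([1, 2, 3, 4])
    2.5
    """
    ordered = sorted(numbers)
    mid = len(ordered) // 2
    if len(ordered) % 2:
        return float(ordered[mid])
    return (ordered[mid - 1] + ordered[mid]) / 2
'''

print(run_docstring_examples(sample, "calculate_median"))  # (0, 2)
```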


## 16. Save/Load FAISS Index (Optional)

Save the index to disk for faster future runs:

In [44]:
# Save index
faiss.write_index(pipeline['faiss_index'], "humaneval_faiss_index.bin")
print("✓ Index saved to humaneval_faiss_index.bin")

# To load later:
# loaded_index = faiss.read_index("humaneval_faiss_index.bin")
# pipeline['faiss_index'] = loaded_index

✓ Index saved to humaneval_faiss_index.bin
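
The saved index holds the vectors and their integer IDs, not the example dictionaries, so a reloaded index is only useful together with the same `examples` list in the same order. A minimal sketch of persisting that list as JSON next to the index follows; the helper names and the file name `humaneval_examples.json` are assumptions made for illustration.

```python
import json
import tempfile
from pathlib import Path


def save_examples(examples, path):
    """Write the list of example dicts to a JSON file."""
    Path(path).write_text(
        json.dumps(examples, ensure_ascii=False, indent=2), encoding="utf-8"
    )


def load_examples(path):
    """Read the list of example dicts back, preserving order."""
    return json.loads(Path(path).read_text(encoding="utf-8"))


# Toy record shaped like the notebook's examples (contents are made up)
demo = [
    {
        "task_id": "Demo/0",
        "prompt": "def f(x):\n    ...",
        "canonical_solution": "    return x\n",
        "entry_point": "f",
    }
]

with tempfile.TemporaryDirectory() as tmp:
    path = Path(tmp) / "humaneval_examples.json"
    save_examples(demo, path)
    restored = load_examples(path)

print(restored == demo)  # True
```

In the notebook, `save_examples(pipeline['examples'], "humaneval_examples.json")` would sit beside the `faiss.write_index` call.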


## 17. Batch Processing Multiple Tasks

In [None]:
tasks = [
    "def reverse_string(s: str) -> str: # Reverse a string",
    "def count_vowels(text: str) -> int: # Count vowels in text",
    "def is_prime(n: int) -> bool: # Check if number is prime"
]

results = []
for i, task in enumerate(tasks, 1):
    print(f"\n{'='*80}")
    print(f"Processing Task {i}/{len(tasks)}")
    print(f"{'='*80}")
    
    result = generate_code_for_task(
        pipeline=pipeline,
        task_description=task,
        n_examples=2,
        verbose=False
    )
    results.append(result)
    print(f"\n✓ Generated code for: {task[:50]}...")
    print(result["generated_code"])

print(f"\n✅ Completed {len(results)} tasks!")


Processing Task 1/3
🤖 Generating code...

✓ Generated code for: def reverse_string(s: str) -> str: # Reverse a str...
def reverse_string(s: str) -> str:
    """Reverse a string.
    >>> reverse_string('hello')
    'olleh'
    """
    return s[::-1]

Processing Task 2/3
🤖 Generating code...

✓ Generated code for: def count_vowels(text: str) -> int: # Count vowels...
def count_vowels(text: str) -> int:
    vowels = "aeiouAEIOU"
    return sum(c in vowels for c in text)

Processing Task 3/3
🤖 Generating code...

✓ Generated code for: def is_prime(n: int) -> bool: # Check if number is...
import math

def is_prime(n: int) -> bool:
    """Return True if a given number is prime, and False otherwise."""
    if n < 2:
        return False
    for k in range(2, int(math.sqrt(n)) + 1):
        if n % k == 0:
            return False
    return True

✅ Completed 3 tasks!
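
Each task in the loop triggers one API call, so a single network or rate-limit error would abort the whole batch. One way to make the loop more forgiving is to record failures and continue. The sketch below is an illustration under assumptions: `run_batch` is a hypothetical helper, and `generate_fn` stands in for whatever function produces a result for one task.

```python
def run_batch(tasks, generate_fn):
    """Run `generate_fn` on every task, collecting results and failures.

    Returns (results, failures); a failure records the task and the error.
    """
    results, failures = [], []
    for task in tasks:
        try:
            results.append(generate_fn(task))
        except Exception as exc:  # broad on purpose: keep the batch going
            failures.append({"task": task, "error": repr(exc)})
    return results, failures


def fake_generate(task):
    """Stand-in generator that fails on one input, for demonstration only."""
    if "boom" in task:
        raise RuntimeError("simulated API error")
    return {"task_description": task, "generated_code": "pass"}


done, failed = run_batch(["task a", "boom", "task b"], fake_generate)
print(len(done), len(failed))  # 2 1
```

In the notebook, `generate_fn` could be `lambda t: generate_code_for_task(pipeline, t, n_examples=2, verbose=False)`.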


## 🎯 Summary

You've successfully created a RAG code generation system that:
- ✅ Loads the HumanEval dataset
- ✅ Creates embeddings using Sentence Transformers
- ✅ Builds a FAISS vector index for fast retrieval
- ✅ Retrieves similar code examples
- ✅ Generates new code using LLMs served through OpenRouter

### Next Steps:
1. Try different embedding models for better retrieval
2. Experiment with various generation models
3. Adjust `n_examples` based on task complexity
4. Save the index for faster future runs
5. Implement evaluation metrics for generated code
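
For step 5, functional correctness on HumanEval is commonly reported as pass@k: the probability that at least one of k sampled solutions passes the unit tests. With n samples per task, of which c pass, the usual unbiased estimate is `1 - C(n - c, k) / C(n, k)`. The sketch below only computes that formula; obtaining n and c requires running each sample against the task's tests, which this notebook does not do, and the numbers in the usage lines are made up.

```python
from math import comb


def pass_at_k(n: int, c: int, k: int) -> float:
    """Estimate pass@k from n samples of which c passed the tests."""
    if not 0 <= c <= n:
        raise ValueError("c must be between 0 and n")
    if not 1 <= k <= n:
        raise ValueError("k must be between 1 and n")
    # comb(n - c, k) is 0 when fewer than k samples failed, giving 1.0
    return 1.0 - comb(n - c, k) / comb(n, k)


# Made-up counts: 10 samples for one task, 3 of them passing
print(round(pass_at_k(10, 3, 1), 4))  # 0.3
print(round(pass_at_k(10, 3, 5), 4))  # 0.9167
```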

In [None]:
# ==================== Evaluation Functions ====================

def evaluate_faiss_accuracy(index, embeddings, ids, k=5):
    """
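    Evaluate retrieval accuracy for FAISS index using self-retrieval.
    Each prompt should retrieve itself as the nearest neighbor. Because every
    query vector is also stored in the index, this is a sanity check on index
    construction and ID mapping, not a measure of quality on unseen queries.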
    
    Args:
        index: FAISS index
        embeddings: np.ndarray of normalized embeddings
        ids: list of task_ids in the same order
        k: number of neighbors to check
    
    Returns:
        dict with accuracy@1, accuracy@k, and MRR
    """
    total = len(ids)
    correct_at_1 = 0
    correct_at_k = 0
    reciprocal_ranks = []

    for i, emb in enumerate(embeddings):
        query = np.expand_dims(emb, axis=0).astype("float32")
        faiss.normalize_L2(query)
        D, I = index.search(query, k)

        retrieved_ids = [ids[j] for j in I[0] if j != -1]

        # Hit at rank 1 (guard against an empty result list)
        if retrieved_ids and ids[i] == retrieved_ids[0]:
            correct_at_1 += 1
            reciprocal_ranks.append(1.0)
        else:
            # Hit within the top k, but not at rank 1
            if ids[i] in retrieved_ids:
                correct_at_k += 1
                rank = retrieved_ids.index(ids[i]) + 1
                reciprocal_ranks.append(1.0 / rank)
            else:
                reciprocal_ranks.append(0.0)

    acc1 = correct_at_1 / total
    acck = (correct_at_1 + correct_at_k) / total
    mrr = sum(reciprocal_ranks) / total

    return {"accuracy@1": acc1, f"accuracy@{k}": acck, "MRR": mrr}


In [None]:
# Sanity-check the FAISS index with self-retrieval
embeddings = pipeline['embedding_model'].encode(
    extract_prompts(pipeline['examples'])
).astype("float32")
faiss.normalize_L2(embeddings)

# Use a separate name so the batch `results` list is not overwritten
eval_results = evaluate_faiss_accuracy(
    pipeline['faiss_index'],
    embeddings,
    [ex['task_id'] for ex in pipeline['examples']],
    k=5
)
print("\n📊 FAISS Retrieval Evaluation:")
for metric, value in eval_results.items():
    print(f"  {metric}: {value:.4f}")



📊 FAISS Retrieval Evaluation:
  accuracy@1: 1.0000
  accuracy@5: 1.0000
  MRR: 1.0000
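
Perfect scores are expected here: the search is exact and every query vector is itself in the index, so each prompt finds itself at rank 1. The metrics become informative only with queries that are not in the index. To show how the three numbers behave when retrieval is imperfect, the sketch below computes them in plain Python on a made-up ranking; `retrieval_metrics` is a hypothetical helper and the IDs are toy data.

```python
def retrieval_metrics(ranked_ids_per_query, expected_ids, k):
    """Compute accuracy@1, accuracy@k, and MRR from ranked result lists."""
    hits_at_1 = hits_at_k = 0
    reciprocal_ranks = []
    for ranked, expected in zip(ranked_ids_per_query, expected_ids):
        top_k = ranked[:k]
        if expected in top_k:
            rank = top_k.index(expected) + 1  # 1-based rank of the correct ID
            hits_at_k += 1
            hits_at_1 += rank == 1
            reciprocal_ranks.append(1.0 / rank)
        else:
            reciprocal_ranks.append(0.0)
    total = len(expected_ids)
    return {
        "accuracy@1": hits_at_1 / total,
        f"accuracy@{k}": hits_at_k / total,
        "MRR": sum(reciprocal_ranks) / total,
    }


# Toy rankings: a hit at rank 1, a hit at rank 2, and a miss
rankings = [["a", "x", "y"], ["x", "b", "y"], ["x", "y", "z"]]
expected = ["a", "b", "c"]

metrics = retrieval_metrics(rankings, expected, k=3)
print({name: round(value, 4) for name, value in metrics.items()})
# {'accuracy@1': 0.3333, 'accuracy@3': 0.6667, 'MRR': 0.5}
```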
